Initial working source caching

This commit is contained in:
Mathias Hall-Andersen
2017-11-14 16:27:53 +01:00
parent 566269275e
commit 69fe86edf0
6 changed files with 63 additions and 54 deletions
+33 -28
View File
@@ -20,12 +20,13 @@ type QueueHandshakeElement struct {
}
type QueueInboundElement struct {
dropped int32
mutex sync.Mutex
buffer *[MaxMessageSize]byte
packet []byte
counter uint64
keyPair *KeyPair
dropped int32
mutex sync.Mutex
buffer *[MaxMessageSize]byte
packet []byte
counter uint64
keyPair *KeyPair
endpoint Endpoint
}
func (elem *QueueInboundElement) Drop() {
@@ -92,25 +93,13 @@ func (device *Device) addToHandshakeQueue(
}
}
func (device *Device) RoutineReceiveIncomming(IPVersion int) {
func (device *Device) RoutineReceiveIncomming(IP int, bind UDPBind) {
logDebug := device.log.Debug
logDebug.Println("Routine, receive incomming, IP version:", IPVersion)
logDebug.Println("Routine, receive incomming, IP version:", IP)
for {
// wait for bind
logDebug.Println("Waiting for UDP socket, IP version:", IPVersion)
device.net.update.Wait()
device.net.mutex.RLock()
bind := device.net.bind
device.net.mutex.RUnlock()
if bind == nil {
continue
}
// receive datagrams until conn is closed
buffer := device.GetMessageBuffer()
@@ -124,7 +113,7 @@ func (device *Device) RoutineReceiveIncomming(IPVersion int) {
var endpoint Endpoint
switch IPVersion {
switch IP {
case ipv4.Version:
size, err = bind.ReceiveIPv4(buffer[:], &endpoint)
case ipv6.Version:
@@ -181,10 +170,11 @@ func (device *Device) RoutineReceiveIncomming(IPVersion int) {
peer := value.peer
elem := &QueueInboundElement{
packet: packet,
buffer: buffer,
keyPair: keyPair,
dropped: AtomicFalse,
packet: packet,
buffer: buffer,
keyPair: keyPair,
dropped: AtomicFalse,
endpoint: endpoint,
}
elem.mutex.Lock()
@@ -396,7 +386,6 @@ func (device *Device) RoutineHandshake() {
peer.TimerAnyAuthenticatedPacketReceived()
// update endpoint
// TODO: Discover destination address also, only update on change
peer.mutex.Lock()
peer.endpoint.set = true
@@ -453,6 +442,13 @@ func (device *Device) RoutineHandshake() {
continue
}
// update endpoint
peer.mutex.Lock()
peer.endpoint.set = true
peer.endpoint.value = elem.endpoint
peer.mutex.Unlock()
logDebug.Println("Received handshake initation from", peer)
peer.TimerEphemeralKeyCreated()
@@ -521,6 +517,13 @@ func (peer *Peer) RoutineSequentialReceiver() {
}
kp.mutex.Unlock()
// update endpoint
peer.mutex.Lock()
peer.endpoint.set = true
peer.endpoint.value = elem.endpoint
peer.mutex.Unlock()
// check for keep-alive
if len(elem.packet) == 0 {
@@ -552,7 +555,8 @@ func (peer *Peer) RoutineSequentialReceiver() {
src := elem.packet[IPv4offsetSrc : IPv4offsetSrc+net.IPv4len]
if device.routingTable.LookupIPv4(src) != peer {
logInfo.Println("Packet with unallowed source IP from", peer.String())
logInfo.Println(src)
logInfo.Println("Packet with unallowed source IPv4 from", peer.String())
continue
}
@@ -577,7 +581,8 @@ func (peer *Peer) RoutineSequentialReceiver() {
src := elem.packet[IPv6offsetSrc : IPv6offsetSrc+net.IPv6len]
if device.routingTable.LookupIPv6(src) != peer {
logInfo.Println("Packet with unallowed source IP from", peer.String())
logInfo.Println(src)
logInfo.Println("Packet with unallowed source IPv6 from", peer.String())
continue
}