Rework of entire locking system

Locking on the Device instance is now much more fined-grained,
seperating out the fields into "resources" st. most common interactions
only require a small number.
This commit is contained in:
Mathias Hall-Andersen
2018-02-02 16:40:14 +01:00
parent 1e42b14022
commit 029410b118
10 changed files with 386 additions and 239 deletions
+36 -27
View File
@@ -14,7 +14,6 @@ const (
)
type Peer struct {
id uint
isRunning AtomicBool
mutex deadlock.RWMutex
persistentKeepaliveInterval uint64
@@ -22,17 +21,20 @@ type Peer struct {
handshake Handshake
device *Device
endpoint Endpoint
stats struct {
stats struct {
txBytes uint64 // bytes send to peer (endpoint)
rxBytes uint64 // bytes received from peer
lastHandshakeNano int64 // nano seconds since epoch
}
time struct {
mutex deadlock.RWMutex
lastSend time.Time // last send message
lastHandshake time.Time // last completed handshake
nextKeepalive time.Time
}
signal struct {
newKeyPair Signal // size 1, new key pair was generated
handshakeCompleted Signal // size 1, handshake completed
@@ -41,7 +43,9 @@ type Peer struct {
messageSend Signal // size 1, message was send to peer
messageReceived Signal // size 1, authenticated message recv
}
timer struct {
// state related to WireGuard timers
keepalivePersistent Timer // set for persistent keepalives
@@ -54,17 +58,20 @@ type Peer struct {
sendLastMinuteHandshake bool
needAnotherKeepalive bool
}
queue struct {
nonce chan *QueueOutboundElement // nonce / pre-handshake queue
outbound chan *QueueOutboundElement // sequential ordering of work
inbound chan *QueueInboundElement // sequential ordering of work
}
routines struct {
mutex deadlock.Mutex // held when stopping / starting routines
starting sync.WaitGroup // routines pending start
stopping sync.WaitGroup // routines pending stop
stop Signal // size 0, stop all goroutines in peer
}
mac CookieGenerator
}
@@ -74,8 +81,22 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) {
return nil, errors.New("Device closed")
}
device.mutex.Lock()
defer device.mutex.Unlock()
// lock resources
device.state.mutex.Lock()
defer device.state.mutex.Unlock()
device.noise.mutex.RLock()
defer device.noise.mutex.RUnlock()
device.peers.mutex.Lock()
defer device.peers.mutex.Unlock()
// check if over limit
if len(device.peers.keyMap) >= MaxPeers {
return nil, errors.New("Too many peers")
}
// create peer
@@ -94,32 +115,20 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) {
peer.timer.handshakeDeadline = NewTimer()
peer.timer.handshakeTimeout = NewTimer()
// assign id for debugging
peer.id = device.idCounter
device.idCounter += 1
// check if over limit
if len(device.peers) >= MaxPeers {
return nil, errors.New("Too many peers")
}
// map public key
_, ok := device.peers[pk]
_, ok := device.peers.keyMap[pk]
if ok {
return nil, errors.New("Adding existing peer")
}
device.peers[pk] = peer
device.peers.keyMap[pk] = peer
// precompute DH
handshake := &peer.handshake
handshake.mutex.Lock()
handshake.remoteStatic = pk
handshake.precomputedStaticStatic =
device.privateKey.sharedSecret(handshake.remoteStatic)
handshake.precomputedStaticStatic = device.noise.privateKey.sharedSecret(pk)
handshake.mutex.Unlock()
// reset endpoint
@@ -134,11 +143,9 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) {
// start peer
peer.device.state.mutex.Lock()
if peer.device.isUp.Get() {
peer.Start()
}
peer.device.state.mutex.Unlock()
return peer, nil
}
@@ -166,14 +173,12 @@ func (peer *Peer) SendBuffer(buffer []byte) error {
func (peer *Peer) String() string {
if peer.endpoint == nil {
return fmt.Sprintf(
"peer(%d unknown %s)",
peer.id,
"peer(unknown %s)",
base64.StdEncoding.EncodeToString(peer.handshake.remoteStatic[:]),
)
}
return fmt.Sprintf(
"peer(%d %s %s)",
peer.id,
"peer(%s %s)",
peer.endpoint.DstToString(),
base64.StdEncoding.EncodeToString(peer.handshake.remoteStatic[:]),
)
@@ -181,8 +186,12 @@ func (peer *Peer) String() string {
func (peer *Peer) Start() {
if peer.device.isClosed.Get() {
return
}
peer.routines.mutex.Lock()
defer peer.routines.mutex.Lock()
defer peer.routines.mutex.Unlock()
peer.device.log.Debug.Println("Starting:", peer.String())
@@ -222,7 +231,7 @@ func (peer *Peer) Start() {
func (peer *Peer) Stop() {
peer.routines.mutex.Lock()
defer peer.routines.mutex.Lock()
defer peer.routines.mutex.Unlock()
peer.device.log.Debug.Println("Stopping:", peer.String())