Files
wireguard-go/device/channels.go
T

138 lines
3.6 KiB
Go
Raw Normal View History

2021-02-08 12:38:19 -08:00
/* SPDX-License-Identifier: MIT
*
2025-05-04 17:48:53 +02:00
* Copyright (C) 2017-2025 WireGuard LLC. All Rights Reserved.
2021-02-08 12:38:19 -08:00
*/
package device
2021-02-08 13:02:52 -08:00
import (
"runtime"
"sync"
)
2021-02-08 12:38:19 -08:00
// An outboundQueue is a channel of QueueOutboundElements awaiting encryption.
// An outboundQueue is ref-counted using its wg field.
// An outboundQueue created with newOutboundQueue has one reference.
// Every additional writer must call wg.Add(1).
// Every completed writer must call wg.Done().
// When no further writers will be added,
// call wg.Done to remove the initial reference.
// When the refcount hits 0, the queue's channel is closed.
type outboundQueue struct {
c chan *QueueOutboundElementsContainer
2021-02-08 12:38:19 -08:00
wg sync.WaitGroup
}
func newOutboundQueue() *outboundQueue {
q := &outboundQueue{
c: make(chan *QueueOutboundElementsContainer, QueueOutboundSize),
2021-02-08 12:38:19 -08:00
}
q.wg.Add(1)
go func() {
q.wg.Wait()
close(q.c)
}()
return q
}
// A inboundQueue is similar to an outboundQueue; see those docs.
type inboundQueue struct {
c chan *QueueInboundElementsContainer
2021-02-08 12:38:19 -08:00
wg sync.WaitGroup
}
func newInboundQueue() *inboundQueue {
q := &inboundQueue{
c: make(chan *QueueInboundElementsContainer, QueueInboundSize),
2021-02-08 12:38:19 -08:00
}
q.wg.Add(1)
go func() {
q.wg.Wait()
close(q.c)
}()
return q
}
// A handshakeQueue follows the same ref-counting scheme as an outboundQueue;
// see those docs. Unlike the other queues it carries QueueHandshakeElement
// values rather than pointers to containers.
type handshakeQueue struct {
	// c carries the queued handshake elements. newHandshakeQueue starts a
	// goroutine that closes it once wg drops to zero.
	c chan QueueHandshakeElement
	// wg is the writer reference count.
	wg sync.WaitGroup
}
// newHandshakeQueue builds a handshakeQueue with a channel buffered to
// QueueHandshakeSize and one reference already held on wg. The channel is
// closed by a background goroutine once every reference has been released.
func newHandshakeQueue() *handshakeQueue {
	queue := new(handshakeQueue)
	queue.c = make(chan QueueHandshakeElement, QueueHandshakeSize)

	// Take the initial reference before starting the closer so that it
	// cannot observe a zero count prematurely.
	queue.wg.Add(1)
	go func(hq *handshakeQueue) {
		hq.wg.Wait()
		close(hq.c)
	}(queue)

	return queue
}
2021-02-08 13:02:52 -08:00
// An autodrainingInboundQueue wraps a channel of inbound element containers.
// Values created with newAutodrainingInboundQueue have a finalizer attached
// that flushes whatever is still buffered in c.
type autodrainingInboundQueue struct {
	// c holds the queued containers.
	c chan *QueueInboundElementsContainer
}
2021-02-08 13:02:52 -08:00
// newAutodrainingInboundQueue returns a channel that will be drained when it gets GC'd.
// It is useful in cases in which is it hard to manage the lifetime of the channel.
// The returned channel must not be closed. Senders should signal shutdown using
// some other means, such as sending a sentinel nil values.
func newAutodrainingInboundQueue(device *Device) *autodrainingInboundQueue {
2021-02-08 13:02:52 -08:00
q := &autodrainingInboundQueue{
c: make(chan *QueueInboundElementsContainer, QueueInboundSize),
2021-02-08 13:02:52 -08:00
}
runtime.SetFinalizer(q, device.flushInboundQueue)
return q
}
// flushInboundQueue drains q without blocking: it receives every container
// currently buffered in q.c and returns as soon as the channel is empty.
// For each container it takes the container's lock, hands each element's
// buffer and the element itself back to the device (PutMessageBuffer,
// PutInboundElement), and then returns the container with
// PutInboundElementsContainer.
//
// nil containers are skipped. Senders are allowed to enqueue a nil sentinel
// to signal shutdown, and this function is registered as a finalizer by
// newAutodrainingInboundQueue, so dereferencing a leftover sentinel here
// would panic.
func (device *Device) flushInboundQueue(q *autodrainingInboundQueue) {
	for {
		select {
		case elemsContainer := <-q.c:
			if elemsContainer == nil {
				// Shutdown sentinel: there is nothing to release.
				continue
			}
			// The lock is acquired and not released in this function.
			// NOTE(review): presumably this waits for any in-flight holder
			// and the lock state is reset when the container is reused;
			// confirm against the container pool code.
			elemsContainer.Lock()
			for _, elem := range elemsContainer.elems {
				device.PutMessageBuffer(elem.buffer)
				device.PutInboundElement(elem)
			}
			device.PutInboundElementsContainer(elemsContainer)
		default:
			// Channel is empty; do not wait for more.
			return
		}
	}
}
type autodrainingOutboundQueue struct {
c chan *QueueOutboundElementsContainer
2021-02-08 13:02:52 -08:00
}
// newAutodrainingOutboundQueue returns a channel that will be drained when it gets GC'd.
// It is useful in cases in which is it hard to manage the lifetime of the channel.
// The returned channel must not be closed. Senders should signal shutdown using
// some other means, such as sending a sentinel nil values.
// All sends to the channel must be best-effort, because there may be no receivers.
func newAutodrainingOutboundQueue(device *Device) *autodrainingOutboundQueue {
2021-02-08 13:02:52 -08:00
q := &autodrainingOutboundQueue{
c: make(chan *QueueOutboundElementsContainer, QueueOutboundSize),
2021-02-08 13:02:52 -08:00
}
runtime.SetFinalizer(q, device.flushOutboundQueue)
return q
2021-02-08 13:02:52 -08:00
}
// flushOutboundQueue drains q without blocking: it receives every container
// currently buffered in q.c and returns as soon as the channel is empty.
// For each container it takes the container's lock, hands each element's
// buffer and the element itself back to the device (PutMessageBuffer,
// PutOutboundElement), and then returns the container with
// PutOutboundElementsContainer.
//
// nil containers are skipped. Senders are allowed to enqueue a nil sentinel
// to signal shutdown, and this function is registered as a finalizer by
// newAutodrainingOutboundQueue, so dereferencing a leftover sentinel here
// would panic.
func (device *Device) flushOutboundQueue(q *autodrainingOutboundQueue) {
	for {
		select {
		case elemsContainer := <-q.c:
			if elemsContainer == nil {
				// Shutdown sentinel: there is nothing to release.
				continue
			}
			// The lock is acquired and not released in this function.
			// NOTE(review): presumably this waits for any in-flight holder
			// and the lock state is reset when the container is reused;
			// confirm against the container pool code.
			elemsContainer.Lock()
			for _, elem := range elemsContainer.elems {
				device.PutMessageBuffer(elem.buffer)
				device.PutOutboundElement(elem)
			}
			device.PutOutboundElementsContainer(elemsContainer)
		default:
			// Channel is empty; do not wait for more.
			return
		}
	}
}