Launch
This commit is contained in:
28
internal/dnsttcore/turbotunnel/clientid.go
Normal file
28
internal/dnsttcore/turbotunnel/clientid.go
Normal file
@@ -0,0 +1,28 @@
|
||||
package turbotunnel
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
)
|
||||
|
||||
// ClientID is an abstract identifier that binds together all the communications
|
||||
// belonging to a single client session, even though those communications may
|
||||
// arrive from multiple IP addresses or over multiple lower-level connections.
|
||||
// It plays the same role that an (IP address, port number) tuple plays in a
|
||||
// net.UDPConn: it's the return address pertaining to a long-lived abstract
|
||||
// client session. The client attaches its ClientID to each of its
|
||||
// communications, enabling the server to disambiguate requests among its many
|
||||
// clients. ClientID implements the net.Addr interface.
|
||||
type ClientID [8]byte
|
||||
|
||||
func NewClientID() ClientID {
|
||||
var id ClientID
|
||||
_, err := rand.Read(id[:])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
func (id ClientID) Network() string { return "clientid" }
|
||||
func (id ClientID) String() string { return hex.EncodeToString(id[:]) }
|
||||
22
internal/dnsttcore/turbotunnel/consts.go
Normal file
22
internal/dnsttcore/turbotunnel/consts.go
Normal file
@@ -0,0 +1,22 @@
|
||||
// Package turbotunnel is facilities for embedding packet-based reliability
|
||||
// protocols inside other protocols.
|
||||
//
|
||||
// https://github.com/net4people/bbs/issues/9
|
||||
package turbotunnel
|
||||
|
||||
import "errors"
|
||||
|
||||
// QueueSize is the size of send and receive queues in QueuePacketConn and
|
||||
// RemoteMap.
|
||||
const QueueSize = 128
|
||||
|
||||
var errClosedPacketConn = errors.New("operation on closed connection")
|
||||
var errNotImplemented = errors.New("not implemented")
|
||||
|
||||
// DummyAddr is a placeholder net.Addr, for when a programming interface
|
||||
// requires a net.Addr but there is none relevant. All DummyAddrs compare equal
|
||||
// to each other.
|
||||
type DummyAddr struct{}
|
||||
|
||||
func (addr DummyAddr) Network() string { return "dummy" }
|
||||
func (addr DummyAddr) String() string { return "dummy" }
|
||||
162
internal/dnsttcore/turbotunnel/queuepacketconn.go
Normal file
162
internal/dnsttcore/turbotunnel/queuepacketconn.go
Normal file
@@ -0,0 +1,162 @@
|
||||
package turbotunnel
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// taggedPacket is a combination of a []byte and a net.Addr, encapsulating the
|
||||
// return type of PacketConn.ReadFrom.
|
||||
type taggedPacket struct {
|
||||
P []byte
|
||||
Addr net.Addr
|
||||
}
|
||||
|
||||
// QueuePacketConn implements net.PacketConn by storing queues of packets. There
|
||||
// is one incoming queue (where packets are additionally tagged by the source
|
||||
// address of the peer that sent them). There are many outgoing queues, one for
|
||||
// each remote peer address that has been recently seen. The QueueIncoming
|
||||
// method inserts a packet into the incoming queue, to eventually be returned by
|
||||
// ReadFrom. WriteTo inserts a packet into an address-specific outgoing queue,
|
||||
// which can later by accessed through the OutgoingQueue method.
|
||||
//
|
||||
// Besides the outgoing queues, there is also a one-element "stash" for each
|
||||
// remote peer address. You can stash a packet using the Stash method, and get
|
||||
// it back later by receiving from the channel returned by Unstash. The stash is
|
||||
// meant as a convenient place to temporarily store a single packet, such as
|
||||
// when you've read one too many packets from the send queue and need to store
|
||||
// the extra packet to be processed first in the next pass. It's the caller's
|
||||
// responsibility to Unstash what they have Stashed. Calling Stash does not put
|
||||
// the packet at the head of the send queue; if there is the possibility that a
|
||||
// packet has been stashed, it must be checked for by calling Unstash in
|
||||
// addition to OutgoingQueue.
|
||||
type QueuePacketConn struct {
|
||||
remotes *RemoteMap
|
||||
localAddr net.Addr
|
||||
recvQueue chan taggedPacket
|
||||
closeOnce sync.Once
|
||||
closed chan struct{}
|
||||
// What error to return when the QueuePacketConn is closed.
|
||||
err atomic.Value
|
||||
}
|
||||
|
||||
// NewQueuePacketConn makes a new QueuePacketConn, set to track recent peers
|
||||
// for at least a duration of timeout.
|
||||
func NewQueuePacketConn(localAddr net.Addr, timeout time.Duration) *QueuePacketConn {
|
||||
return &QueuePacketConn{
|
||||
remotes: NewRemoteMap(timeout),
|
||||
localAddr: localAddr,
|
||||
recvQueue: make(chan taggedPacket, QueueSize),
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// QueueIncoming queues and incoming packet and its source address, to be
|
||||
// returned in a future call to ReadFrom.
|
||||
func (c *QueuePacketConn) QueueIncoming(p []byte, addr net.Addr) {
|
||||
select {
|
||||
case <-c.closed:
|
||||
// If we're closed, silently drop it.
|
||||
return
|
||||
default:
|
||||
}
|
||||
// Copy the slice so that the caller may reuse it.
|
||||
buf := make([]byte, len(p))
|
||||
copy(buf, p)
|
||||
select {
|
||||
case c.recvQueue <- taggedPacket{buf, addr}:
|
||||
default:
|
||||
// Drop the incoming packet if the receive queue is full.
|
||||
}
|
||||
}
|
||||
|
||||
// OutgoingQueue returns the queue of outgoing packets corresponding to addr,
|
||||
// creating it if necessary. The contents of the queue will be packets that are
|
||||
// written to the address in question using WriteTo.
|
||||
func (c *QueuePacketConn) OutgoingQueue(addr net.Addr) <-chan []byte {
|
||||
return c.remotes.SendQueue(addr)
|
||||
}
|
||||
|
||||
// Stash places p in the stash for addr, if the stash is not already occupied.
|
||||
// Returns true if the packet was placed in the stash, or false if the stash was
|
||||
// already occupied. This method is similar to WriteTo, except that it puts the
|
||||
// packet in the stash queue (accessible via Unstash), rather than the outgoing
|
||||
// queue (accessible via OutgoingQueue).
|
||||
func (c *QueuePacketConn) Stash(p []byte, addr net.Addr) bool {
|
||||
return c.remotes.Stash(addr, p)
|
||||
}
|
||||
|
||||
// Unstash returns the channel that represents the stash for addr.
|
||||
func (c *QueuePacketConn) Unstash(addr net.Addr) <-chan []byte {
|
||||
return c.remotes.Unstash(addr)
|
||||
}
|
||||
|
||||
// ReadFrom returns a packet and address previously stored by QueueIncoming.
|
||||
func (c *QueuePacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
|
||||
select {
|
||||
case <-c.closed:
|
||||
return 0, nil, &net.OpError{Op: "read", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
|
||||
default:
|
||||
}
|
||||
select {
|
||||
case <-c.closed:
|
||||
return 0, nil, &net.OpError{Op: "read", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
|
||||
case packet := <-c.recvQueue:
|
||||
return copy(p, packet.P), packet.Addr, nil
|
||||
}
|
||||
}
|
||||
|
||||
// WriteTo queues an outgoing packet for the given address. The queue can later
|
||||
// be retrieved using the OutgoingQueue method.
|
||||
func (c *QueuePacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
|
||||
select {
|
||||
case <-c.closed:
|
||||
return 0, &net.OpError{Op: "write", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
|
||||
default:
|
||||
}
|
||||
// Copy the slice so that the caller may reuse it.
|
||||
buf := make([]byte, len(p))
|
||||
copy(buf, p)
|
||||
select {
|
||||
case c.remotes.SendQueue(addr) <- buf:
|
||||
return len(buf), nil
|
||||
default:
|
||||
// Drop the outgoing packet if the send queue is full.
|
||||
return len(buf), nil
|
||||
}
|
||||
}
|
||||
|
||||
// closeWithError unblocks pending operations and makes future operations fail
|
||||
// with the given error. If err is nil, it becomes errClosedPacketConn.
|
||||
func (c *QueuePacketConn) closeWithError(err error) error {
|
||||
var newlyClosed bool
|
||||
c.closeOnce.Do(func() {
|
||||
newlyClosed = true
|
||||
// Store the error to be returned by future PacketConn
|
||||
// operations.
|
||||
if err == nil {
|
||||
err = errClosedPacketConn
|
||||
}
|
||||
c.err.Store(err)
|
||||
close(c.closed)
|
||||
})
|
||||
if !newlyClosed {
|
||||
return &net.OpError{Op: "close", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close unblocks pending operations and makes future operations fail with a
|
||||
// "closed connection" error.
|
||||
func (c *QueuePacketConn) Close() error {
|
||||
return c.closeWithError(nil)
|
||||
}
|
||||
|
||||
// LocalAddr returns the localAddr value that was passed to NewQueuePacketConn.
|
||||
func (c *QueuePacketConn) LocalAddr() net.Addr { return c.localAddr }
|
||||
|
||||
func (c *QueuePacketConn) SetDeadline(t time.Time) error { return errNotImplemented }
|
||||
func (c *QueuePacketConn) SetReadDeadline(t time.Time) error { return errNotImplemented }
|
||||
func (c *QueuePacketConn) SetWriteDeadline(t time.Time) error { return errNotImplemented }
|
||||
177
internal/dnsttcore/turbotunnel/remotemap.go
Normal file
177
internal/dnsttcore/turbotunnel/remotemap.go
Normal file
@@ -0,0 +1,177 @@
|
||||
package turbotunnel
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// remoteRecord is a record of a recently seen remote peer, with the time it was
|
||||
// last seen and queues of outgoing packets.
|
||||
type remoteRecord struct {
|
||||
Addr net.Addr
|
||||
LastSeen time.Time
|
||||
SendQueue chan []byte
|
||||
Stash chan []byte
|
||||
}
|
||||
|
||||
// RemoteMap manages a mapping of live remote peers, keyed by address, to their
|
||||
// respective send queues. Each peer has two queues: a primary send queue, and a
|
||||
// "stash". The primary send queue is returned by the SendQueue method. The
|
||||
// stash is an auxiliary one-element queue accessed using the Stash and Unstash
|
||||
// methods. The stash is meant for use by callers that need to "unread" a packet
|
||||
// that's already been removed from the primary send queue.
|
||||
//
|
||||
// RemoteMap's functions are safe to call from multiple goroutines.
|
||||
type RemoteMap struct {
|
||||
// We use an inner structure to avoid exposing public heap.Interface
|
||||
// functions to users of remoteMap.
|
||||
inner remoteMapInner
|
||||
// Synchronizes access to inner.
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// NewRemoteMap creates a RemoteMap that expires peers after a timeout.
|
||||
//
|
||||
// If the timeout is 0, peers never expire.
|
||||
//
|
||||
// The timeout does not have to be kept in sync with smux's idle timeout. If a
|
||||
// peer is removed from the map while the smux session is still live, the worst
|
||||
// that can happen is a loss of whatever packets were in the send queue at the
|
||||
// time. If smux later decides to send more packets to the same peer, we'll
|
||||
// instantiate a new send queue, and if the peer is ever seen again with a
|
||||
// matching address, we'll deliver them.
|
||||
func NewRemoteMap(timeout time.Duration) *RemoteMap {
|
||||
m := &RemoteMap{
|
||||
inner: remoteMapInner{
|
||||
byAge: make([]*remoteRecord, 0),
|
||||
byAddr: make(map[net.Addr]int),
|
||||
},
|
||||
}
|
||||
if timeout > 0 {
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(timeout / 2)
|
||||
now := time.Now()
|
||||
m.lock.Lock()
|
||||
m.inner.removeExpired(now, timeout)
|
||||
m.lock.Unlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// SendQueue returns the send queue corresponding to addr, creating it if
|
||||
// necessary.
|
||||
func (m *RemoteMap) SendQueue(addr net.Addr) chan []byte {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
return m.inner.Lookup(addr, time.Now()).SendQueue
|
||||
}
|
||||
|
||||
// Stash places p in the stash corresponding to addr, if the stash is not
|
||||
// already occupied. Returns true if the p was placed in the stash, false
|
||||
// otherwise.
|
||||
func (m *RemoteMap) Stash(addr net.Addr, p []byte) bool {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
select {
|
||||
case m.inner.Lookup(addr, time.Now()).Stash <- p:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Unstash returns the channel that reads from the stash for addr.
|
||||
func (m *RemoteMap) Unstash(addr net.Addr) <-chan []byte {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
return m.inner.Lookup(addr, time.Now()).Stash
|
||||
}
|
||||
|
||||
// remoteMapInner is the inner type of RemoteMap, implementing heap.Interface.
|
||||
// byAge is the backing store, a heap ordered by LastSeen time, to facilitate
|
||||
// expiring old records. byAddr is a map from addresses to heap indices, to
|
||||
// allow looking up by address. Unlike RemoteMap, remoteMapInner requires
|
||||
// external synchonization.
|
||||
type remoteMapInner struct {
|
||||
byAge []*remoteRecord
|
||||
byAddr map[net.Addr]int
|
||||
}
|
||||
|
||||
// removeExpired removes all records whose LastSeen timestamp is more than
|
||||
// timeout in the past.
|
||||
func (inner *remoteMapInner) removeExpired(now time.Time, timeout time.Duration) {
|
||||
for len(inner.byAge) > 0 && now.Sub(inner.byAge[0].LastSeen) >= timeout {
|
||||
record := heap.Pop(inner).(*remoteRecord)
|
||||
close(record.SendQueue)
|
||||
}
|
||||
}
|
||||
|
||||
// Lookup finds the existing record corresponding to addr, or creates a new
|
||||
// one if none exists yet. It updates the record's LastSeen time and returns the
|
||||
// record.
|
||||
func (inner *remoteMapInner) Lookup(addr net.Addr, now time.Time) *remoteRecord {
|
||||
var record *remoteRecord
|
||||
i, ok := inner.byAddr[addr]
|
||||
if ok {
|
||||
// Found one, update its LastSeen.
|
||||
record = inner.byAge[i]
|
||||
record.LastSeen = now
|
||||
heap.Fix(inner, i)
|
||||
} else {
|
||||
// Not found, create a new one.
|
||||
record = &remoteRecord{
|
||||
Addr: addr,
|
||||
LastSeen: now,
|
||||
SendQueue: make(chan []byte, QueueSize),
|
||||
Stash: make(chan []byte, 1),
|
||||
}
|
||||
heap.Push(inner, record)
|
||||
}
|
||||
return record
|
||||
}
|
||||
|
||||
// heap.Interface for remoteMapInner.
|
||||
|
||||
func (inner *remoteMapInner) Len() int {
|
||||
if len(inner.byAge) != len(inner.byAddr) {
|
||||
panic("inconsistent remoteMap")
|
||||
}
|
||||
return len(inner.byAge)
|
||||
}
|
||||
|
||||
func (inner *remoteMapInner) Less(i, j int) bool {
|
||||
return inner.byAge[i].LastSeen.Before(inner.byAge[j].LastSeen)
|
||||
}
|
||||
|
||||
func (inner *remoteMapInner) Swap(i, j int) {
|
||||
inner.byAge[i], inner.byAge[j] = inner.byAge[j], inner.byAge[i]
|
||||
inner.byAddr[inner.byAge[i].Addr] = i
|
||||
inner.byAddr[inner.byAge[j].Addr] = j
|
||||
}
|
||||
|
||||
func (inner *remoteMapInner) Push(x interface{}) {
|
||||
record := x.(*remoteRecord)
|
||||
if _, ok := inner.byAddr[record.Addr]; ok {
|
||||
panic("duplicate address in remoteMap")
|
||||
}
|
||||
// Insert into byAddr map.
|
||||
inner.byAddr[record.Addr] = len(inner.byAge)
|
||||
// Insert into byAge slice.
|
||||
inner.byAge = append(inner.byAge, record)
|
||||
}
|
||||
|
||||
func (inner *remoteMapInner) Pop() interface{} {
|
||||
n := len(inner.byAddr)
|
||||
// Remove from byAge slice.
|
||||
record := inner.byAge[n-1]
|
||||
inner.byAge[n-1] = nil
|
||||
inner.byAge = inner.byAge[:n-1]
|
||||
// Remove from byAddr map.
|
||||
delete(inner.byAddr, record.Addr)
|
||||
return record
|
||||
}
|
||||
Reference in New Issue
Block a user