Skip to content

Commit 3a06785

Browse files
authored
feat: Introduce Polyamide Traffic Control (#19)
* Implement traffic control mechanism for more advanced packet redirection, filtering, and to enable support for future features
1 parent a6d1139 commit 3a06785

27 files changed

Lines changed: 749 additions & 559 deletions

cmd/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ var runCmd = &cobra.Command{
3434
}
3535
if state.DBG_pprof {
3636
go func() {
37-
log.Println(http.ListenAndServe("localhost:6060", nil))
37+
log.Println(http.ListenAndServe("0.0.0.0:6060", nil))
3838
}()
3939
}
4040

core/nylon.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,12 @@ import (
1111

1212
// Nylon struct must be thread safe, since it can receive packets through PolyReceiver
1313
type Nylon struct {
14-
PolySock *device.PolySock
15-
PingBuf *ttlcache.Cache[uint64, EpPing]
16-
Device *device.Device
17-
Tun tun.Device
18-
wgUapi net.Listener
19-
env *state.Env
20-
itfName string
14+
PingBuf *ttlcache.Cache[uint64, EpPing]
15+
Device *device.Device
16+
Tun tun.Device
17+
wgUapi net.Listener
18+
env *state.Env
19+
itfName string
2120
}
2221

2322
func (n *Nylon) Init(s *state.State) error {

core/nylon_endpoints.go

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"github.com/encodeous/nylon/protocol"
77
"github.com/encodeous/nylon/state"
88
"github.com/jellydator/ttlcache/v3"
9-
"google.golang.org/protobuf/proto"
109
"math/rand/v2"
1110
"slices"
1211
"time"
@@ -16,7 +15,7 @@ type EpPing struct {
1615
TimeSent time.Time
1716
}
1817

19-
func (n *Nylon) Probe(e *state.Env, ep *state.DynamicEndpoint) error {
18+
func (n *Nylon) Probe(ep *state.DynamicEndpoint) error {
2019
token := rand.Uint64()
2120
ping := &protocol.Ny{
2221
Type: &protocol.Ny_ProbeOp{
@@ -26,25 +25,20 @@ func (n *Nylon) Probe(e *state.Env, ep *state.DynamicEndpoint) error {
2625
},
2726
},
2827
}
29-
marshal, err := proto.Marshal(ping)
28+
peer := n.Device.LookupPeer(device.NoisePublicKey(n.env.GetNode(ep.Node()).PubKey))
29+
err := n.SendNylon(ping, ep.NetworkEndpoint().GetWgEndpoint(n.Device), peer)
3030
if err != nil {
3131
return err
3232
}
3333

34-
n.Send(marshal, ep)
3534
n.PingBuf.Set(token, EpPing{
3635
TimeSent: time.Now(),
3736
}, ttlcache.DefaultTTL)
3837
return nil
3938
}
4039

41-
func (n *Nylon) Send(packet []byte, ep *state.DynamicEndpoint) {
42-
neigh := n.env.GetRouter(ep.Node())
43-
dev := n.Device
44-
n.PolySock.Send(packet, ep.NetworkEndpoint().GetWgEndpoint(dev), dev.LookupPeer(device.NoisePublicKey(neigh.PubKey)))
45-
}
46-
47-
func HandleProbe(e *state.Env, sock *device.PolySock, pkt *protocol.Ny_Probe, endpoint conn.Endpoint, peer *device.Peer, node state.NodeId) {
40+
func handleProbe(n *Nylon, pkt *protocol.Ny_Probe, endpoint conn.Endpoint, peer *device.Peer, node state.NodeId) {
41+
e := n.env
4842
if pkt.ResponseToken == nil {
4943
// ping
5044
// build pong response
@@ -53,11 +47,11 @@ func HandleProbe(e *state.Env, sock *device.PolySock, pkt *protocol.Ny_Probe, en
5347
res.ResponseToken = &token
5448

5549
// send pong
56-
pktBytes, err := proto.Marshal(&protocol.Ny{Type: &protocol.Ny_ProbeOp{ProbeOp: pkt}})
50+
err := n.SendNylon(&protocol.Ny{Type: &protocol.Ny_ProbeOp{ProbeOp: pkt}}, endpoint, peer)
5751
if err != nil {
52+
n.env.Log.Error("Failed to send nylon packet to node", "node", node, "error", err)
5853
return
5954
}
60-
sock.Send(pktBytes, endpoint, peer)
6155

6256
e.Dispatch(func(s *state.State) error {
6357
return handleProbePing(s, node, endpoint)
@@ -147,10 +141,10 @@ func handleProbePong(s *state.State, node state.NodeId, token uint64, ep conn.En
147141
func (n *Nylon) probeLinks(s *state.State, active bool) error {
148142
// probe links
149143
for _, neigh := range s.Neighbours {
150-
for _, dpLink := range neigh.Eps {
151-
if dpLink.IsActive() == active {
144+
for _, ep := range neigh.Eps {
145+
if ep.IsActive() == active {
152146
go func() {
153-
err := n.Probe(s.Env, dpLink)
147+
err := n.Probe(ep)
154148
if err != nil {
155149
s.Log.Debug("probe failed", "err", err.Error())
156150
}
@@ -185,7 +179,7 @@ func (n *Nylon) probeNew(s *state.State) error {
185179
dpl := state.NewEndpoint(ep, peer, false, nil)
186180
neigh.Eps = append(neigh.Eps, dpl)
187181
go func() {
188-
err := n.Probe(s.Env, dpl)
182+
err := n.Probe(dpl)
189183
if err != nil {
190184
//s.Log.Debug("discovery probe failed", "err", err.Error())
191185
}

core/nylon_passive.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,7 @@ func cleanPassivePeers(s *state.State) error {
3535
for _, client := range r.Clients {
3636
cCfg := s.GetClient(client)
3737
peer := n.Device.LookupPeer(device.NoisePublicKey(cCfg.PubKey))
38-
ep := peer.GetEndpoints()
39-
if len(ep) > 1 {
40-
peer.SetEndpoints(ep[:1]) // prevent littering of too many endpoints
41-
}
38+
peer.CleanEndpoints()
4239
if peer == nil || time.Now().Sub(peer.LastReceivedPacket()) > state.ClientDeadThreshold {
4340
// dead client
4441
s.Log.Debug("passive client dead", "node", client)

core/nylon_sock.go

Lines changed: 0 additions & 46 deletions
This file was deleted.

core/nylon_tc.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package core
2+
3+
import (
4+
"github.com/encodeous/nylon/polyamide/conn"
5+
"github.com/encodeous/nylon/polyamide/device"
6+
"github.com/encodeous/nylon/protocol"
7+
"github.com/encodeous/nylon/state"
8+
"google.golang.org/protobuf/proto"
9+
)
10+
11+
const (
12+
NyProtoId = 8
13+
NyPriority = 10
14+
)
15+
16+
// polyamide traffic control for nylon
17+
18+
func (n *Nylon) InstallTC() {
19+
// bounce back packets if using system routing
20+
if n.env.UseSystemRouting {
21+
n.Device.InstallFilter(func(dev *device.Device, packet *device.TCElement) (device.TCAction, error) {
22+
if packet.Incoming() {
23+
// bounce incoming packets
24+
return device.TcBounce, nil
25+
}
26+
return device.TcPass, nil
27+
})
28+
}
29+
30+
// bounce back packets destined for the current node
31+
n.Device.InstallFilter(func(dev *device.Device, packet *device.TCElement) (device.TCAction, error) {
32+
if n.env.GetNode(n.env.Id).Address == packet.GetDst() {
33+
//dev.Log.Verbosef("BounceCur packet: %v -> %v", packet.GetSrc(), packet.GetDst())
34+
return device.TcBounce, nil
35+
}
36+
return device.TcPass, nil
37+
})
38+
39+
// handle incoming nylon packets
40+
n.Device.InstallFilter(func(dev *device.Device, packet *device.TCElement) (device.TCAction, error) {
41+
if packet.Incoming() && packet.GetIPVersion() == NyProtoId {
42+
n.handleNylonPacket(packet.Payload(), packet.FromEp, packet.FromPeer)
43+
return device.TcDrop, nil
44+
}
45+
return device.TcPass, nil
46+
})
47+
}
48+
49+
func (n *Nylon) SendNylon(pkt proto.Message, endpoint conn.Endpoint, peer *device.Peer) error {
50+
tce := n.Device.NewTCElement()
51+
offset := device.MessageTransportOffsetContent + device.PolyHeaderSize
52+
buf, err := proto.MarshalOptions{
53+
Deterministic: true,
54+
}.MarshalAppend(tce.Buffer[offset:offset], pkt)
55+
if err != nil {
56+
n.Device.PutMessageBuffer(tce.Buffer)
57+
n.Device.PutTCElement(tce)
58+
return err
59+
}
60+
tce.InitPacket(NyProtoId, uint16(len(buf)+device.PolyHeaderSize))
61+
tce.Priority = device.TcHighPriority
62+
63+
tce.ToEp = endpoint
64+
tce.ToPeer = peer
65+
66+
// TODO: Optimize? is it worth it?
67+
68+
tcs := device.NewTCState()
69+
70+
n.Device.TCBatch([]*device.TCElement{tce}, tcs)
71+
return nil
72+
}
73+
74+
func (n *Nylon) handleNylonPacket(packet []byte, endpoint conn.Endpoint, peer *device.Peer) {
75+
pkt := &protocol.Ny{}
76+
err := proto.Unmarshal(packet, pkt)
77+
if err != nil {
78+
// log skipped message
79+
n.env.Log.Debug("Failed to unmarshal packet", "err", err)
80+
return
81+
}
82+
83+
e := n.env
84+
85+
neigh := e.FindNodeBy(state.NyPublicKey(peer.GetPublicKey()))
86+
if neigh == nil {
87+
// this should not be possible
88+
panic("impossible state, peer added, but not a node in the network")
89+
return
90+
}
91+
92+
switch pkt.Type.(type) {
93+
case *protocol.Ny_SeqnoRequestOp:
94+
e.Dispatch(func(s *state.State) error {
95+
return routerHandleSeqnoRequest(s, *neigh, pkt.GetSeqnoRequestOp())
96+
})
97+
case *protocol.Ny_RouteOp:
98+
e.Dispatch(func(s *state.State) error {
99+
return routerHandleRouteUpdate(s, *neigh, pkt.GetRouteOp())
100+
})
101+
case *protocol.Ny_ProbeOp:
102+
handleProbe(n, pkt.GetProbeOp(), endpoint, peer, *neigh)
103+
}
104+
defer func() {
105+
err := recover()
106+
if err != nil {
107+
n.env.Log.Error("panic while handling poly socket: %v", err)
108+
}
109+
}()
110+
}

core/nylon_wireguard.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package core
22

33
import (
4+
"cmp"
45
"encoding/hex"
56
"fmt"
67
"github.com/encodeous/nylon/polyamide/conn"
@@ -22,15 +23,13 @@ func (n *Nylon) initWireGuard(s *state.State) error {
2223
return err
2324
}
2425

25-
dev.AllowAllInbound = true
26-
dev.UseSystemRouting = s.UseSystemRouting
27-
n.PolySock = dev.PolyListen(n)
28-
s.Log.Info("started polysock listener")
29-
3026
n.Device = dev
3127
n.Tun = tdev
3228
n.itfName = itfName
3329

30+
n.InstallTC()
31+
s.Log.Info("installed nylon traffic control filter for polysock")
32+
3433
// TODO: fully convert to code-based api
3534
err = dev.IpcSet(
3635
fmt.Sprintf(
@@ -169,11 +168,16 @@ func UpdateWireGuard(s *state.State) error {
169168
if nhNeigh != nil {
170169
links := slices.Clone(nhNeigh.Eps)
171170
slices.SortStableFunc(links, func(a, b *state.DynamicEndpoint) int {
172-
return -int(a.Metric() - b.Metric())
171+
return cmp.Compare(a.Metric(), b.Metric())
173172
})
174173
for _, ep := range links {
175174
eps = append(eps, ep.NetworkEndpoint().GetWgEndpoint(n.Device))
176175
}
176+
//if nhNeigh.Id == "melon" {
177+
// for _, link := range links {
178+
// fmt.Printf("link: %s, %d\n", link.NetworkEndpoint().Ep, link.Metric())
179+
// }
180+
//}
177181
}
178182

179183
// add endpoint if it is not in the list

core/router_utils.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package core
22

33
import (
44
"fmt"
5+
"github.com/encodeous/nylon/polyamide/device"
56
"github.com/encodeous/nylon/protocol"
67
"github.com/encodeous/nylon/state"
78
"google.golang.org/protobuf/proto"
@@ -24,11 +25,11 @@ func broadcast(s *state.State, message proto.Message, neighs []*state.Neighbour)
2425
// TODO, investigate effect of packet loss on control messages
2526
best := neigh.BestEndpoint()
2627
if best != nil && best.IsActive() {
27-
marshal, err := proto.Marshal(message)
28+
peer := n.Device.LookupPeer(device.NoisePublicKey(n.env.GetNode(best.Node()).PubKey))
29+
err := n.SendNylon(message, nil, peer)
2830
if err != nil {
2931
s.Env.Log.Error("error while broadcasting", "err", err.Error())
3032
}
31-
n.Send(marshal, best)
3233
}
3334
}
3435
}()

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/encodeous/nylon
22

3-
go 1.24.0
3+
go 1.24.4
44

55
require (
66
github.com/docker/docker v27.1.1+incompatible

integration/harness.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,10 +260,11 @@ func (i *InMemoryNetwork) virtualRouteTable(node state.NodeId, src, dst netip.Ad
260260
if n.Address == dst {
261261
select {
262262
case i.virtTun[curIdx].Outbound <- pkt: // send back into our tun to get routed by WireGuard/Polyamide
263+
return true
263264
default:
264-
panic(fmt.Sprintf("node %s's tun is not ready to accept data", n.Id))
265+
fmt.Printf("%s's tun is not ready to accept data\n", n.Id)
266+
return true
265267
}
266-
return true
267268
}
268269
}
269270
return false

0 commit comments

Comments
 (0)