Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/go-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ jobs:
run: go build

- name: Run all tests
run: go test -v -tags=router_test,integration,e2e ./...
run: go run gotest.tools/gotestsum@latest -- -tags=router_test,integration,e2e ./...

8 changes: 5 additions & 3 deletions cmd/inspect.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cmd

import (
"fmt"

"github.com/encodeous/nylon/core"
"github.com/spf13/cobra"
)
Expand All @@ -11,16 +13,16 @@ var inspectCmd = &cobra.Command{
Short: "Inspects the current state of nylon",
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 1 {
println("Usage: nylon inspect <interface>")
fmt.Println("Usage: nylon inspect <interface>")
return
}
itf := args[0]
result, err := core.IPCGet(itf)
if err != nil {
println("Error:", err.Error())
fmt.Println("Error:", err.Error())
return
}
println(result)
fmt.Print(result)
},
GroupID: "ny",
}
Expand Down
19 changes: 17 additions & 2 deletions core/ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func IPCGet(itf string) (string, error) {
if err != nil {
return "", err
}
defer conn.Close()
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))

_, err = rw.WriteString("get=nylon\n")
Expand All @@ -37,7 +38,7 @@ func IPCGet(itf string) (string, error) {
if err != nil && err != io.EOF {
return "", err
}
return res, nil
return strings.TrimSuffix(res, "\x00"), nil
}

func HandleNylonIPCGet(s *state.State, rw *bufio.ReadWriter) error {
Expand All @@ -57,6 +58,16 @@ func HandleNylonIPCGet(s *state.State, rw *bufio.ReadWriter) error {
met = n.BestEndpoint().Metric()
}
sb.WriteString(fmt.Sprintf(" Metric: %d\n", met))
sb.WriteString(fmt.Sprintf(" Endpoints:\n"))
for _, ep := range n.Eps {
nep := ep.AsNylonEndpoint()
ap, err := nep.DynEP.Get()
if err != nil {
sb.WriteString(fmt.Sprintf(" - %s (unresolved)\n", nep.DynEP.Value))
} else {
sb.WriteString(fmt.Sprintf(" - %s (resolved: %s) active=%v metric=%d\n", nep.DynEP.Value, ap.String(), nep.IsActive(), nep.Metric()))
}
}
sb.WriteString(fmt.Sprintf(" Published Routes:\n"))
rt := make([]string, 0)
if len(n.Routes) == 0 {
Expand Down Expand Up @@ -119,11 +130,15 @@ func HandleNylonIPCGet(s *state.State, rw *bufio.ReadWriter) error {
slices.Sort(rt)
sb.WriteString(strings.Join(rt, "\n") + "\n")

sb.WriteRune(0)
_, err = rw.WriteString(sb.String())
if err != nil {
return err
}
err = rw.WriteByte(0)
if err != nil {
return err
}
return rw.Flush()
default:
return fmt.Errorf("unknown command %s", cmd)
}
Expand Down
40 changes: 18 additions & 22 deletions core/nylon.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package core

import (
"context"
"net"
"net/netip"
"time"
Expand All @@ -28,26 +27,7 @@ func (n *Nylon) Init(s *state.State) error {

s.Log.Debug("init nylon")

if len(s.DnsResolvers) != 0 {
s.Log.Debug("setting custom DNS resolvers", "resolvers", s.DnsResolvers)
net.DefaultResolver = &net.Resolver{
PreferGo: true,
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
d := net.Dialer{
Timeout: time.Second * 10,
}
var err error
var conn net.Conn
for _, resolver := range s.DnsResolvers {
conn, err = d.DialContext(ctx, network, resolver)
if err == nil {
return conn, nil
}
}
return conn, err
},
}
}
state.SetResolvers(s.DnsResolvers)

// add neighbours
for _, peer := range s.GetPeers(s.Id) {
Expand All @@ -61,7 +41,7 @@ func (n *Nylon) Init(s *state.State) error {
}
cfg := s.GetRouter(peer)
for _, ep := range cfg.Endpoints {
stNeigh.Eps = append(stNeigh.Eps, state.NewEndpoint(ep, peer, false, nil))
stNeigh.Eps = append(stNeigh.Eps, state.NewEndpoint(ep, false, nil))
}

s.Neighbours = append(s.Neighbours, stNeigh)
Expand All @@ -85,6 +65,22 @@ func (n *Nylon) Init(s *state.State) error {
s.Env.RepeatTask(func(s *state.State) error {
return n.probeLinks(s, true)
}, state.ProbeDelay)
s.Env.RepeatTask(func(s *state.State) error {
// refresh dynamic endpoints
for _, neigh := range s.Neighbours {
for _, ep := range neigh.Eps {
if nep, ok := ep.(*state.NylonEndpoint); ok {
go func() {
_, err := nep.DynEP.Refresh()
if err != nil {
s.Log.Debug("failed to resolve endpoint", "ep", nep.DynEP.Value, "err", err.Error())
}
}()
}
}
}
return nil
}, state.EndpointResolveDelay)
s.Env.RepeatTask(func(s *state.State) error {
return n.probeLinks(s, false)
}, state.ProbeRecoveryDelay)
Expand Down
26 changes: 25 additions & 1 deletion core/nylon_distribution.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package core

import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
Expand All @@ -27,7 +29,29 @@ func FetchConfig(repoStr string, key state.NyPublicKey) (*state.CentralCfg, erro
}
cfgBody = file
} else if repo.Scheme == "http" || repo.Scheme == "https" {
res, err := http.Get(repo.String())
client := &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, network string, addr string) (conn net.Conn, err error) {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
addrs, err := state.ResolveName(ctx, host)
if err != nil {
return nil, err
}
for _, ip := range addrs {
var dialer net.Dialer
conn, err = dialer.DialContext(ctx, network, net.JoinHostPort(ip.String(), port))
if err == nil {
break
}
}
return
},
},
}
res, err := client.Get(repo.String())
if err != nil {
return nil, fmt.Errorf("failed to fetch %s: %w", repo.String(), err)
}
Expand Down
35 changes: 23 additions & 12 deletions core/nylon_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type EpPing struct {
TimeSent time.Time
}

func (n *Nylon) Probe(ep *state.NylonEndpoint) error {
func (n *Nylon) Probe(node state.NodeId, ep *state.NylonEndpoint) error {
token := rand.Uint64()
ping := &protocol.Ny{
Type: &protocol.Ny_ProbeOp{
Expand All @@ -26,8 +26,12 @@ func (n *Nylon) Probe(ep *state.NylonEndpoint) error {
},
},
}
peer := n.Device.LookupPeer(device.NoisePublicKey(n.env.GetNode(ep.Node()).PubKey))
err := n.SendNylon(ping, ep.GetWgEndpoint(n.Device), peer)
peer := n.Device.LookupPeer(device.NoisePublicKey(n.env.GetNode(node).PubKey))
nep, err := ep.GetWgEndpoint(n.Device)
if err != nil {
return err
}
err = n.SendNylon(ping, nep, peer)
if err != nil {
return err
}
Expand Down Expand Up @@ -76,7 +80,8 @@ func handleProbePing(s *state.State, node state.NodeId, ep conn.Endpoint) {
for _, neigh := range s.Neighbours {
for _, dep := range neigh.Eps {
dep := dep.AsNylonEndpoint()
if dep.Ep == ep.DstIPPort() && neigh.Id == node {
ap, err := dep.DynEP.Get()
if err == nil && ap == ep.DstIPPort() && neigh.Id == node {
// we have a link

// refresh wireguard ep
Expand All @@ -88,7 +93,7 @@ func handleProbePing(s *state.State, node state.NodeId, ep conn.Endpoint) {
dep.Renew()

if state.DBG_log_probe {
s.Log.Debug("probe from", "addr", dep.Ep)
s.Log.Debug("probe from", "addr", ap.String())
}
return
}
Expand All @@ -97,7 +102,7 @@ func handleProbePing(s *state.State, node state.NodeId, ep conn.Endpoint) {
// create a new link if we dont have a link
for _, neigh := range s.Neighbours {
if neigh.Id == node {
newEp := state.NewEndpoint(ep.DstIPPort(), neigh.Id, true, ep)
newEp := state.NewEndpoint(state.NewDynamicEndpoint(ep.DstIPPort().String()), true, ep)
newEp.Renew()
neigh.Eps = append(neigh.Eps, newEp)
// push route update to improve convergence time
Expand All @@ -114,7 +119,8 @@ func handleProbePong(s *state.State, node state.NodeId, token uint64, ep conn.En
for _, neigh := range s.Neighbours {
for _, dpLink := range neigh.Eps {
dpLink := dpLink.AsNylonEndpoint()
if dpLink.Ep == ep.DstIPPort() && neigh.Id == node {
ap, err := dpLink.DynEP.Get()
if err == nil && ap == ep.DstIPPort() && neigh.Id == node {
linkHealth, ok := n.PingBuf.GetAndDelete(token)
if ok {
health := linkHealth.Value()
Expand Down Expand Up @@ -145,7 +151,7 @@ func (n *Nylon) probeLinks(s *state.State, active bool) error {
for _, ep := range neigh.Eps {
if ep.IsActive() == active {
go func() {
err := n.Probe(ep.AsNylonEndpoint())
err := n.Probe(neigh.Id, ep.AsNylonEndpoint())
if err != nil {
s.Log.Debug("probe failed", "err", err.Error())
}
Expand All @@ -169,18 +175,23 @@ func (n *Nylon) probeNew(s *state.State) error {
cfg := s.GetRouter(peer)
// assumption: we don't need to connect to the same endpoint again within the scope of the same node
for _, ep := range cfg.Endpoints {
if !ep.IsValid() {
ap, err := ep.Get()
if err != nil {
continue
}
idx := slices.IndexFunc(neigh.Eps, func(link state.Endpoint) bool {
return !link.IsRemote() && link.AsNylonEndpoint().Ep == ep
lap, err := link.AsNylonEndpoint().DynEP.Get()
if err != nil {
return false
}
return !link.IsRemote() && lap == ap
})
if idx == -1 {
// add the link to the neighbour
dpl := state.NewEndpoint(ep, peer, false, nil)
dpl := state.NewEndpoint(ep, false, nil)
neigh.Eps = append(neigh.Eps, dpl)
go func() {
err := n.Probe(dpl)
err := n.Probe(peer, dpl)
if err != nil {
//s.Log.Debug("discovery probe failed", "err", err.Error())
}
Expand Down
5 changes: 4 additions & 1 deletion core/nylon_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ func nylonGc(s *state.State) error {
n := 0
for _, x := range neigh.Eps {
x := x.AsNylonEndpoint()
if !x.IsActive() {
x.DynEP.Clear()
}
if x.IsAlive() {
neigh.Eps[n] = x
n++
} else {
s.Log.Debug("removed dead endpoint", "ep", x.Ep, "to", x.Node())
s.Log.Debug("removed dead endpoint", "ep", x.DynEP.String(), "to", neigh.Id)
}
}
neigh.Eps = neigh.Eps[:n]
Expand Down
20 changes: 16 additions & 4 deletions core/nylon_wireguard.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ listen_port=%d
rcfg := s.GetRouter(peer)
endpoints := make([]conn.Endpoint, 0)
for _, nep := range rcfg.Endpoints {
endpoint, err := n.Device.Bind().ParseEndpoint(nep.String())
ap, err := nep.Get()
if err != nil {
continue
}
endpoint, err := n.Device.Bind().ParseEndpoint(ap.String())
if err != nil {
return err
}
Expand Down Expand Up @@ -166,16 +170,24 @@ func UpdateWireGuard(s *state.State) error {
return cmp.Compare(a.Metric(), b.Metric())
})
for _, ep := range links {
eps = append(eps, ep.AsNylonEndpoint().GetWgEndpoint(n.Device))
nep, err := ep.AsNylonEndpoint().GetWgEndpoint(n.Device)
if err != nil {
continue
}
eps = append(eps, nep)
}
}

// add endpoint if it is not in the list
for _, ep := range pcfg.Endpoints {
ap, err := ep.Get()
if err != nil {
continue
}
if !slices.ContainsFunc(eps, func(endpoint conn.Endpoint) bool {
return endpoint.DstIPPort() == ep
return endpoint.DstIPPort() == ap
}) {
endpoint, err := n.Device.Bind().ParseEndpoint(ep.String())
endpoint, err := n.Device.Bind().ParseEndpoint(ap.String())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion core/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func flushIO(s *state.State) error {
continue
}
if best != nil && best.IsActive() {
peer := n.Device.LookupPeer(device.NoisePublicKey(n.env.GetNode(best.Node()).PubKey))
peer := n.Device.LookupPeer(device.NoisePublicKey(n.env.GetNode(neigh.Id).PubKey))
for {
bundle := &protocol.TransportBundle{}
tLength := 0
Expand Down
4 changes: 2 additions & 2 deletions core/router_harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (m MockEndpoint) IsActive() bool {
}

func (m MockEndpoint) AsNylonEndpoint() *state.NylonEndpoint {
panic("MockEndpoint is not a NylonEndpoint")
return nil
}

func NewMockEndpoint(node state.NodeId, metric uint32) *MockEndpoint {
Expand Down Expand Up @@ -207,7 +207,7 @@ func AddLink(r *state.RouterState, ep *MockEndpoint) *MockEndpoint {
return nil
}

func RemoveLink(r *state.RouterState, ep state.Endpoint) {
func RemoveLink(r *state.RouterState, ep *MockEndpoint) {
for _, n := range r.Neighbours {
if n.Id == ep.Node() {
for i, e := range n.Eps {
Expand Down
Loading