From eb6c6bb71008fef5b97287fd5ec02a4cbd6ff7d9 Mon Sep 17 00:00:00 2001 From: encodeous Date: Sun, 8 Feb 2026 10:08:50 -0500 Subject: [PATCH 1/3] feat: real time packet trace --- cmd/inspect.go | 23 ++++++++++++----- core/entrypoint.go | 1 + core/ipc.go | 62 +++++++++++++++++++++++++++++++++++++++++++++ core/nylon_tc.go | 13 +++++++--- core/nylon_trace.go | 19 ++++++++++++++ core/router.go | 3 +++ go.mod | 1 + go.sum | 2 ++ 8 files changed, 114 insertions(+), 10 deletions(-) create mode 100644 core/nylon_trace.go diff --git a/cmd/inspect.go b/cmd/inspect.go index 35fe6ba..e50898e 100644 --- a/cmd/inspect.go +++ b/cmd/inspect.go @@ -16,17 +16,28 @@ var inspectCmd = &cobra.Command{ fmt.Println("Usage: nylon inspect ") return } - itf := args[0] - result, err := core.IPCGet(itf) - if err != nil { - fmt.Println("Error:", err.Error()) - return + if ok, _ := cmd.Flags().GetBool("trace"); ok { + itf := args[0] + err := core.IPCTrace(itf) + if err != nil { + fmt.Println("Error:", err.Error()) + return + } + } else { + itf := args[0] + result, err := core.IPCGet(itf) + if err != nil { + fmt.Println("Error:", err.Error()) + return + } + fmt.Print(result) } - fmt.Print(result) }, GroupID: "ny", } func init() { rootCmd.AddCommand(inspectCmd) + + inspectCmd.Flags().BoolP("trace", "t", false, "Enables live packet routing capture") } diff --git a/core/entrypoint.go b/core/entrypoint.go index 707b95c..60d3ab1 100644 --- a/core/entrypoint.go +++ b/core/entrypoint.go @@ -234,6 +234,7 @@ func Start(ccfg state.CentralCfg, ncfg state.LocalCfg, logLevel slog.Level, conf func initModules(s *state.State) error { var modules []state.NyModule + modules = append(modules, &NylonTrace{}) modules = append(modules, &NylonRouter{}) modules = append(modules, &Nylon{}) diff --git a/core/ipc.go b/core/ipc.go index cec2dac..7fab06f 100644 --- a/core/ipc.go +++ b/core/ipc.go @@ -2,6 +2,7 @@ package core import ( "bufio" + "context" "fmt" "io" "slices" @@ -41,6 +42,37 @@ func IPCGet(itf string) (string, error) { return strings.TrimSuffix(res, "\x00"), nil } +func IPCTrace(itf string) error { + conn, err := ipc.UAPIDial(itf) + if err != nil { + return err + } + defer conn.Close() + rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + _, err = rw.WriteString("get=nylon\n") + if err != nil { + return err + } + + _, err = rw.WriteString("trace\n") + if err != nil { + return err + } + err = rw.Flush() + if err != nil { + return err + } + + for { + str, err := rw.ReadString('\n') + if err != nil { + return err + } + fmt.Print(str) + } +} + func HandleNylonIPCGet(s *state.State, rw *bufio.ReadWriter) error { cmd, err := rw.ReadString('\n') if err != nil { @@ -139,6 +171,36 @@ func HandleNylonIPCGet(s *state.State, rw *bufio.ReadWriter) error { return err } return rw.Flush() + case "trace\n": + if !state.DBG_trace_tc { + return fmt.Errorf("trace mode is not enabled") + } + ctx, cancel := context.WithCancel(context.Background()) + t := Get[*NylonTrace](s) + go func() { + _, _ = rw.ReadByte() // wait for EOF + cancel() + }() + ch := make(chan interface{}) + t.Register(ch) + defer t.Unregister(ch) + for { + select { + case <-ctx.Done(): + return nil + case msg := <-ch: + if str, ok := msg.(string); ok { + _, err := rw.WriteString(str) + if err != nil { + return err + } + err = rw.Flush() + if err != nil { + return err + } + } + } + } default: return fmt.Errorf("unknown command %s", cmd) } diff --git a/core/nylon_tc.go b/core/nylon_tc.go index 0f6c05f..79bed43 100644 --- a/core/nylon_tc.go +++ b/core/nylon_tc.go @@ -1,6 +1,7 @@ package core import ( + "fmt" "net/netip" "github.com/encodeous/nylon/polyamide/conn" @@ -18,6 +19,7 @@ const ( func (n *Nylon) InstallTC(s *state.State) { r := Get[*NylonRouter](s) + t := Get[*NylonTrace](s) if state.DBG_trace_tc { n.Device.InstallFilter(func(dev *device.Device, packet *device.TCElement) (device.TCAction, error) { @@ -33,7 +35,7 @@ func (n *Nylon) InstallTC(s *state.State) { peer != nil && src != netip.IPv4Unspecified() && src != netip.IPv6Unspecified() && dst != netip.IPv4Unspecified() && dst != netip.IPv6Unspecified() { - dev.Log.Verbosef("Unhandled TC packet: %v -> %v, peer %s", packet.GetSrc(), packet.GetDst(), peer) + t.Submit(fmt.Sprintf("Unhandled TC packet: %v -> %v, peer %s\n", packet.GetSrc(), packet.GetDst(), peer)) } } return device.TcPass, nil @@ -56,7 +58,7 @@ func (n *Nylon) InstallTC(s *state.State) { if ok && !packet.Incoming() { packet.ToPeer = entry.Peer if state.DBG_trace_tc { - dev.Log.Verbosef("Fwd packet: %v -> %v, via %s", packet.GetSrc(), packet.GetDst(), entry.Nh) + t.Submit(fmt.Sprintf("Fwd packet: %v -> %v, via %s\n", packet.GetSrc(), packet.GetDst(), entry.Nh)) } return device.TcForward, nil } @@ -69,7 +71,7 @@ func (n *Nylon) InstallTC(s *state.State) { if ok { packet.ToPeer = entry.Peer if state.DBG_trace_tc { - dev.Log.Verbosef("Fwd packet: %v -> %v, via %s", packet.GetSrc(), packet.GetDst(), entry.Nh) + t.Submit(fmt.Sprintf("Fwd packet: %v -> %v, via %s\n", packet.GetSrc(), packet.GetDst(), entry.Nh)) } return device.TcForward, nil } @@ -87,7 +89,7 @@ func (n *Nylon) InstallTC(s *state.State) { } if ttl == 0 { if state.DBG_trace_tc { - dev.Log.Verbosef("TTL Expired: %v -> %v, via %s", packet.GetSrc(), packet.GetDst()) + t.Submit(fmt.Sprintf("TTL Expired: %v -> %v\n", packet.GetSrc(), packet.GetDst())) } return device.TcBounce, nil } @@ -103,6 +105,9 @@ func (n *Nylon) InstallTC(s *state.State) { entry, ok := r.ExitTable.Lookup(packet.GetDst()) // we should only accept packets destined to us, but not our passive clients if ok && entry.Nh == s.Id { + if state.DBG_trace_tc { + t.Submit(fmt.Sprintf("Exit: %v -> %v\n", packet.GetSrc(), packet.GetDst())) + } //dev.Log.Verbosef("BounceCur packet: %v -> %v", packet.GetSrc(), packet.GetDst()) return device.TcBounce, nil } diff --git a/core/nylon_trace.go b/core/nylon_trace.go new file mode 100644 index 0000000..511fe8b --- /dev/null +++ b/core/nylon_trace.go @@ -0,0 +1,19 @@ +package core + +import ( + "github.com/dustin/go-broadcast" + "github.com/encodeous/nylon/state" +) + +type NylonTrace struct { + broadcast.Broadcaster +} + +func (n *NylonTrace) Init(s *state.State) error { + n.Broadcaster = broadcast.NewBroadcaster(1024) + return nil +} + +func (n *NylonTrace) Cleanup(s *state.State) error { + return n.Broadcaster.Close() +} diff --git a/core/router.go b/core/router.go index 5422236..0946d0d 100644 --- a/core/router.go +++ b/core/router.go @@ -97,6 +97,9 @@ func (r *NylonRouter) BroadcastRequestSeqno(src state.Source, seqno uint16, hopC } func (r *NylonRouter) Log(event RouterEvent, desc string, args ...any) { + if event == NoEpToNeighbour { + return // ignored + } r.Env.Log.Debug(fmt.Sprintf("%s %s", event.String(), desc), args...) } diff --git a/go.mod b/go.mod index 5b0b23d..953cc7b 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( github.com/distribution/reference v0.6.0 // indirect github.com/docker/go-connections v0.6.0 // indirect github.com/docker/go-units v0.5.0 // indirect + github.com/dustin/go-broadcast v0.0.0-20211018055107-71439988bd91 // indirect github.com/ebitengine/purego v0.8.4 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/logr v1.4.3 // indirect diff --git a/go.sum b/go.sum index 529e16f..0dc7603 100644 --- a/go.sum +++ b/go.sum @@ -43,6 +43,8 @@ github.com/docker/go-connections v0.6.0 h1:LlMG9azAe1TqfR7sO+NJttz1gy6KO7VJBh+pM github.com/docker/go-connections v0.6.0/go.mod h1:AahvXYshr6JgfUJGdDCs2b5EZG/vmaMAntpSFH5BFKE= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/dustin/go-broadcast v0.0.0-20211018055107-71439988bd91 h1:jAUM3D1KIrJmwx60DKB+a/qqM69yHnu6otDGVa2t0vs= +github.com/dustin/go-broadcast v0.0.0-20211018055107-71439988bd91/go.mod h1:8rK6Kbo1Jd6sK22b24aPVgAm3jlNy1q1ft+lBALdIqA= github.com/ebitengine/purego v0.8.4 h1:CF7LEKg5FFOsASUj0+QwaXf8Ht6TlFxg09+S9wz0omw= github.com/ebitengine/purego v0.8.4/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= github.com/encodeous/metric v0.0.0-20251111175231-f339c2f7c4bd h1:B32Ob80QTv5MomcVt709TsiWyD0QrpUYtnwW1jQFNlE= From 29c2954a48778b3621f26484f57aae7411a03bd2 Mon Sep 17 00:00:00 2001 From: encodeous Date: Sun, 8 Feb 2026 11:20:37 -0500 Subject: [PATCH 2/3] test(e2e): update harness to support trace output --- e2e/harness.go | 148 +++++++++++++++----------------------- e2e/logging.go | 164 +++++++++++++++++++++++++++++++++++++++++++ e2e/recovery_test.go | 39 +++++----- 3 files changed, 238 insertions(+), 113 deletions(-) create mode 100644 e2e/logging.go diff --git a/e2e/harness.go b/e2e/harness.go index 47292cd..9269767 100644 --- a/e2e/harness.go +++ b/e2e/harness.go @@ -10,8 +10,6 @@ import ( "net/netip" "os" "path/filepath" - "regexp" - "strings" "sync" "testing" "time" @@ -38,27 +36,12 @@ type Harness struct { ctx context.Context Network *testcontainers.DockerNetwork Nodes map[string]testcontainers.Container - LogBuffers map[string]*LogBuffer + LogManager *LogManager ImageName string RootDir string Subnet string Gateway string } -type LogBuffer struct { - mu sync.Mutex - buf bytes.Buffer -} - -func (l *LogBuffer) Write(p []byte) (n int, err error) { - l.mu.Lock() - defer l.mu.Unlock() - return l.buf.Write(p) -} -func (l *LogBuffer) String() string { - l.mu.Lock() - defer l.mu.Unlock() - return l.buf.String() -} // NewHarness creates a test harness with a unique subnet func NewHarness(t *testing.T) *Harness { @@ -105,7 +88,7 @@ func NewHarness(t *testing.T) *Harness { ctx: ctx, Network: newNetwork, Nodes: make(map[string]testcontainers.Container), - LogBuffers: make(map[string]*LogBuffer), + LogManager: NewLogManager(), RootDir: rootDir, Subnet: subnet, Gateway: gateway, @@ -117,27 +100,6 @@ func NewHarness(t *testing.T) *Harness { return h } -var ansiRegex = regexp.MustCompile(`\x1b\[[0-9;]*[a-zA-Z]`) - -func StripAnsi(s string) string { - return ansiRegex.ReplaceAllString(s, "") -} - -type LogConsumer struct { - Name string - Buffer *LogBuffer -} - -func (g *LogConsumer) Accept(l testcontainers.Log) { - content := string(l.Content) - // Strip ANSI codes for easier processing and cleaner output - cleanContent := StripAnsi(content) - fmt.Printf("[%s] %s", g.Name, cleanContent) - if g.Buffer != nil { - g.Buffer.Write([]byte(cleanContent)) - } -} - type NodeSpec struct { Name string IP string @@ -194,80 +156,80 @@ func (h *Harness) StartNode(name string, ip string, centralConfigPath, nodeConfi } } }, + LogConsumerCfg: &testcontainers.LogConsumerConfig{ + Consumers: []testcontainers.LogConsumer{ + &UnifiedLogConsumer{Node: name, Manager: h.LogManager}, + }, + }, Name: h.t.Name() + "-" + name, } - container, err := testcontainers.GenericContainer(h.ctx, testcontainers.GenericContainerRequest{ + cont, err := testcontainers.GenericContainer(h.ctx, testcontainers.GenericContainerRequest{ ContainerRequest: req, Started: true, }) if err != nil { h.t.Fatalf("failed to start container %s: %v", name, err) } - buffer := &LogBuffer{} h.mu.Lock() - h.LogBuffers[name] = buffer - h.Nodes[name] = container + h.Nodes[name] = cont h.mu.Unlock() - container.FollowOutput(&LogConsumer{Name: name, Buffer: buffer}) - container.StartLogProducer(h.ctx) - return container + return cont } + func (h *Harness) WaitForLog(nodeName string, pattern string) { - h.mu.Lock() - buffer, ok := h.LogBuffers[nodeName] - h.mu.Unlock() - if !ok { - h.t.Fatalf("log buffer for node %s not found", nodeName) + h.waitFor(nodeName, SourceStdout, pattern, false) +} +func (h *Harness) WaitForMatch(nodeName string, pattern string) { + h.waitFor(nodeName, SourceStdout, pattern, true) +} +func (h *Harness) WaitForTrace(nodeName string, pattern string) { + h.waitFor(nodeName, SourceTrace, pattern, false) +} +func (h *Harness) waitFor(nodeName string, source LogSource, pattern string, isRegex bool) { + sub, err := h.LogManager.Subscribe(nodeName, source, pattern, isRegex) + if err != nil { + h.t.Fatalf("failed to subscribe: %v", err) } - // Poll the buffer - timeout := time.After(WaitTimeout) - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - for { - select { - case <-timeout: - h.t.Fatalf("timed out waiting for log pattern %q in node %s", pattern, nodeName) - case <-ticker.C: - if strings.Contains(buffer.String(), pattern) { - return - } - case <-h.ctx.Done(): - h.t.Fatal("context canceled") - } + defer h.LogManager.Unsubscribe(sub) + + select { + case <-sub.MatchCh: + return + case <-time.After(WaitTimeout): + h.t.Fatalf("timed out waiting for %s pattern %q in node %s", source, pattern, nodeName) + case <-h.ctx.Done(): + h.t.Fatal("context canceled") } } -func (h *Harness) WaitForMatch(nodeName string, pattern string) { + +type managerWriter struct { + node string + source LogSource + manager *LogManager +} + +func (w *managerWriter) Write(p []byte) (n int, err error) { + w.manager.Accept(w.node, w.source, string(p)) + return len(p), nil +} + +func (h *Harness) StartTrace(nodeName string) { h.mu.Lock() - buffer, ok := h.LogBuffers[nodeName] + container, ok := h.Nodes[nodeName] h.mu.Unlock() - if !ok { - h.t.Fatalf("log buffer for node %s not found", nodeName) - } - // Compile the regex once before the loop - re, err := regexp.Compile(pattern) - if err != nil { - h.t.Fatalf("invalid regex pattern %q: %v", pattern, err) + if !ok { + h.t.Fatalf("node %s not found", nodeName) } - // Poll the buffer - timeout := time.After(WaitTimeout) - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-timeout: - h.t.Fatalf("timed out waiting for regex match %q in node %s", pattern, nodeName) - case <-ticker.C: - // Check against the compiled regex - if re.MatchString(buffer.String()) { - return - } - case <-h.ctx.Done(): - h.t.Fatal("context canceled") + go func() { + _, r, err := container.Exec(h.ctx, []string{"nylon", "inspect", "nylon0", "--trace"}) + if err != nil { + return } - } + stdoutWriter := &managerWriter{node: nodeName, source: SourceTrace, manager: h.LogManager} + stdcopy.StdCopy(stdoutWriter, stdoutWriter, r) + }() } func (h *Harness) Cleanup() { h.mu.Lock() diff --git a/e2e/logging.go b/e2e/logging.go new file mode 100644 index 0000000..be3cbd8 --- /dev/null +++ b/e2e/logging.go @@ -0,0 +1,164 @@ +//go:build e2e + +package e2e + +import ( + "context" + "fmt" + "regexp" + "strings" + "sync" + + "github.com/testcontainers/testcontainers-go" +) + +var ansiRegex = regexp.MustCompile(`\x1b\[[0-9;]*[a-zA-Z]`) + +func StripAnsi(s string) string { + return ansiRegex.ReplaceAllString(s, "") +} + +type LogSource string + +const ( + SourceStdout LogSource = "stdout" + SourceStderr LogSource = "stderr" + SourceTrace LogSource = "trace" +) + +type LogEvent struct { + Node string + Source LogSource + Content string +} + +type LogSubscription struct { + Node string + Source LogSource + Pattern string + Regex *regexp.Regexp + MatchCh chan struct{} +} + +type LogManager struct { + mu sync.RWMutex + subscribers []*LogSubscription + // history keeps track of all logs to allow matching against past logs if needed + // however, the prompt implies a streaming approach for wait. + // Let's keep a simple buffer per node/source to allow "WaitFor" to check already received logs. + history map[string]map[LogSource]*strings.Builder + historyMu sync.RWMutex +} + +func NewLogManager() *LogManager { + return &LogManager{ + subscribers: make([]*LogSubscription, 0), + history: make(map[string]map[LogSource]*strings.Builder), + } +} + +func (m *LogManager) Accept(node string, source LogSource, content string) { + m.historyMu.Lock() + if _, ok := m.history[node]; !ok { + m.history[node] = make(map[LogSource]*strings.Builder) + } + if _, ok := m.history[node][source]; !ok { + m.history[node][source] = &strings.Builder{} + } + m.history[node][source].WriteString(content) + fullContent := m.history[node][source].String() + m.historyMu.Unlock() + + m.mu.RLock() + defer m.mu.RUnlock() + + for _, sub := range m.subscribers { + if sub.Node != node || sub.Source != source { + continue + } + matched := false + if sub.Regex != nil { + if sub.Regex.MatchString(content) || sub.Regex.MatchString(fullContent) { + matched = true + } + } else if sub.Pattern != "" { + if strings.Contains(content, sub.Pattern) || strings.Contains(fullContent, sub.Pattern) { + matched = true + } + } + + if matched { + select { + case sub.MatchCh <- struct{}{}: + default: + } + } + } +} + +func (m *LogManager) Subscribe(node string, source LogSource, pattern string, isRegex bool) (*LogSubscription, error) { + sub := &LogSubscription{ + Node: node, + Source: source, + MatchCh: make(chan struct{}, 1), + } + if isRegex { + re, err := regexp.Compile(pattern) + if err != nil { + return nil, err + } + sub.Regex = re + } else { + sub.Pattern = pattern + } + + m.mu.Lock() + m.subscribers = append(m.subscribers, sub) + m.mu.Unlock() + + // Check history immediately + m.historyMu.RLock() + defer m.historyMu.RUnlock() + if h, ok := m.history[node]; ok { + if b, ok := h[source]; ok { + content := b.String() + matched := false + if sub.Regex != nil { + matched = sub.Regex.MatchString(content) + } else { + matched = strings.Contains(content, sub.Pattern) + } + if matched { + sub.MatchCh <- struct{}{} + } + } + } + + return sub, nil +} + +func (m *LogManager) Unsubscribe(sub *LogSubscription) { + m.mu.Lock() + defer m.mu.Unlock() + for i, s := range m.subscribers { + if s == sub { + m.subscribers = append(m.subscribers[:i], m.subscribers[i+1:]...) + break + } + } +} + +type UnifiedLogConsumer struct { + Node string + Manager *LogManager +} + +func (c *UnifiedLogConsumer) Accept(l testcontainers.Log) { + source := SourceStdout + if l.LogType == "stderr" { + source = SourceStderr + } + content := StripAnsi(string(l.Content)) + fmt.Printf("[%s:%s] %s", c.Node, source, content) + c.Manager.Accept(c.Node, source, content) +} diff --git a/e2e/recovery_test.go b/e2e/recovery_test.go index 0855bd3..f651b73 100644 --- a/e2e/recovery_test.go +++ b/e2e/recovery_test.go @@ -4,7 +4,6 @@ package e2e import ( "fmt" - "strings" "testing" "time" @@ -85,6 +84,9 @@ func TestRecoveryExample(t *testing.T) { t.Log("Waiting for initial convergence...") h.WaitForLog(alice, fmt.Sprintf("new.prefix=%s/32", nylonIPs[bob])) + // Start tracing on Alice to observe packet forwarding + h.StartTrace(alice) + // 4. Verify connectivity Alice -> Bob t.Log("Verifying initial connectivity Alice -> Bob (Direct)") stdout, stderr, err := h.Exec(alice, []string{"ping", "-c", "3", nylonIPs[bob]}) @@ -93,7 +95,7 @@ func TestRecoveryExample(t *testing.T) { } // Check that traffic went directly to Bob - h.WaitForLog(alice, fmt.Sprintf("Fwd packet: %s -> %s, via %s", nylonIPs[alice], nylonIPs[bob], bob)) + h.WaitForTrace(alice, fmt.Sprintf("Fwd packet: %s -> %s, via %s", nylonIPs[alice], nylonIPs[bob], bob)) // 5. Break the link Alice-Bob t.Log("Breaking link Alice <-> Bob") @@ -108,26 +110,23 @@ func TestRecoveryExample(t *testing.T) { // 6. Wait for recovery t.Log("Waiting for recovery (rerouting)...") - deadline := time.Now().Add(1 * time.Minute) - recovered := false - for time.Now().Before(deadline) { - h.Exec(alice, []string{"ping", "-c", "1", "-W", "1", nylonIPs[bob]}) - - h.mu.Lock() - buf := h.LogBuffers[alice].String() - h.mu.Unlock() - - if strings.Contains(buf, fmt.Sprintf("Fwd packet: %s -> %s, via %s", nylonIPs[alice], nylonIPs[bob], vps)) { - recovered = true - break + // Start a background pinger to trigger routing + stopPinger := make(chan struct{}) + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { + select { + case <-stopPinger: + return + case <-ticker.C: + h.Exec(alice, []string{"ping", "-c", "1", "-W", "1", nylonIPs[bob]}) + } } - time.Sleep(1 * time.Second) - } + }() - if !recovered { - h.PrintLogs(alice) - t.Fatal("Failed to recover route via VPS") - } + h.WaitForTrace(alice, fmt.Sprintf("Fwd packet: %s -> %s, via %s", nylonIPs[alice], nylonIPs[bob], vps)) + close(stopPinger) t.Log("Recovery successful! Traffic rerouted via VPS.") From e61acffd984e7125d3e6cf7e217a2ee621843958 Mon Sep 17 00:00:00 2001 From: Adam Chen Date: Sun, 8 Feb 2026 18:36:40 +0000 Subject: [PATCH 3/3] test(e2e): update recovery test to use nylon inspect --trace --- e2e/harness.go | 57 ++++++++++++++++++++++--- e2e/healthcheck_test.go | 25 +++-------- e2e/logging.go | 95 ++++++++++++++++++----------------------- e2e/recovery_test.go | 12 +++--- 4 files changed, 104 insertions(+), 85 deletions(-) diff --git a/e2e/harness.go b/e2e/harness.go index 9269767..5a8d367 100644 --- a/e2e/harness.go +++ b/e2e/harness.go @@ -10,9 +10,12 @@ import ( "net/netip" "os" "path/filepath" + "reflect" + "regexp" "sync" "testing" "time" + "unsafe" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" @@ -182,6 +185,21 @@ func (h *Harness) WaitForLog(nodeName string, pattern string) { func (h *Harness) WaitForMatch(nodeName string, pattern string) { h.waitFor(nodeName, SourceStdout, pattern, true) } +func (h *Harness) WaitForInspect(nodeName string, pattern string) { + start := time.Now() + re := regexp.MustCompile(pattern) + for { + if time.Since(start) > WaitTimeout { + stdout, _, _ := h.Exec(nodeName, []string{"nylon", "inspect", "nylon0"}) + h.t.Fatalf("timed out waiting for inspect pattern %q in node %s. Current inspect:\n%s", pattern, nodeName, stdout) + } + stdout, _, err := h.Exec(nodeName, []string{"nylon", "inspect", "nylon0"}) + if err == nil && re.MatchString(stdout) { + return + } + time.Sleep(500 * time.Millisecond) + } +} func (h *Harness) WaitForTrace(nodeName string, pattern string) { h.waitFor(nodeName, SourceTrace, pattern, false) } @@ -209,26 +227,55 @@ type managerWriter struct { } func (w *managerWriter) Write(p []byte) (n int, err error) { - w.manager.Accept(w.node, w.source, string(p)) + content := StripAnsi(string(p)) + fmt.Printf("[%s:%s] %s", w.node, w.source, content) + w.manager.Accept(w.node, w.source, content) return len(p), nil } +func GetUnexportedField(field reflect.Value) interface{} { + return reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface() +} + func (h *Harness) StartTrace(nodeName string) { h.mu.Lock() - container, ok := h.Nodes[nodeName] + cont, ok := h.Nodes[nodeName] h.mu.Unlock() if !ok { h.t.Fatalf("node %s not found", nodeName) } + h.t.Logf("Started trace for %s", nodeName) + go func() { - _, r, err := container.Exec(h.ctx, []string{"nylon", "inspect", "nylon0", "--trace"}) + time.Sleep(time.Second) + execOptions := container.ExecOptions{ + Cmd: []string{"nylon", "inspect", "nylon0", "--trace"}, + AttachStdout: true, + AttachStderr: true, + Tty: false, + } + + docker := cont.(*testcontainers.DockerContainer) + // this is very sketchy, but testcontainers doesn't provide any API + v := reflect.ValueOf(docker) + y := GetUnexportedField(v.Elem().FieldByName("provider")).(*testcontainers.DockerProvider) + cli := y.Client() + + execIDResp, err := cli.ContainerExecCreate(h.ctx, cont.GetContainerID(), execOptions) if err != nil { - return + h.t.Fatalf("Failed to start trace for %s: %v", nodeName, err) } + + resp, err := cli.ContainerExecAttach(h.ctx, execIDResp.ID, container.ExecAttachOptions{}) + if err != nil { + h.t.Fatalf("Failed to start trace for %s: %v", nodeName, err) + } + defer resp.Close() + stdoutWriter := &managerWriter{node: nodeName, source: SourceTrace, manager: h.LogManager} - stdcopy.StdCopy(stdoutWriter, stdoutWriter, r) + _, _ = stdcopy.StdCopy(stdoutWriter, stdoutWriter, resp.Reader) }() } func (h *Harness) Cleanup() { diff --git a/e2e/healthcheck_test.go b/e2e/healthcheck_test.go index b21744c..7821488 100644 --- a/e2e/healthcheck_test.go +++ b/e2e/healthcheck_test.go @@ -100,8 +100,8 @@ func TestHealthcheckPing(t *testing.T) { // 5. Wait for convergence t.Log("Waiting for convergence...") - h.WaitForMatch("node3", ".+old.router=node2.+old.prefix=10.0.1.4\\/32.+new.router=node1.+new.prefix=10.0.1.4\\/32.+") - h.WaitForLog("node1", "prefix=10.0.0.3/32") + h.WaitForInspect("node3", `10\.0\.1\.4/32 via \(nh: node2, router: node1`) + h.WaitForInspect("node1", `10\.0\.0\.3/32 via node2`) // ping from 3 to 10.0.0.4 stdout, stderr, err := h.Exec("node3", []string{"ping", "-c", "3", "10.0.1.4"}) @@ -215,7 +215,7 @@ func TestHealthcheckHTTP(t *testing.T) { // Client should route to Backup (Metric 1000). t.Log("Step A: Waiting for routing to fallback (Primary DOWN)") - h.WaitForMatch("client", "prefix=10\\.0\\.3\\.1/32.+new.nh=backup") + h.WaitForInspect("client", `10\.0\.3\.1/32 via backup`) // B. Start HTTP Server on Primary t.Log("Step B: Starting HTTP server on Primary") @@ -228,7 +228,7 @@ func TestHealthcheckHTTP(t *testing.T) { // Primary should advertise Metric 10. // Client should switch to Primary. t.Log("Step C: Waiting for routing to switch to Primary (Primary UP)") - h.WaitForMatch("client", "prefix=10\\.0\\.3\\.1/32.+new.nh=primary") + h.WaitForInspect("client", `10\.0\.3\.1/32 via primary`) // D. Stop HTTP Server t.Log("Step D: Stopping HTTP server") @@ -236,19 +236,6 @@ func TestHealthcheckHTTP(t *testing.T) { bg.Wait() // E. Wait for fallback to Backup - - time.Sleep(5 * time.Second) - - h.mu.Lock() - logs := h.LogBuffers["client"].String() - h.mu.Unlock() - - idxPrimary := strings.LastIndex(logs, "new.nh=primary") - idxBackup := strings.LastIndex(logs, "new.nh=backup") - - if idxBackup > idxPrimary { - t.Log("Verified: Route switched back to backup") - } else { - t.Fatalf("Route failed to switch back to backup. Logs:\n%s", logs) - } + t.Log("Step E: Waiting for fallback to Backup") + h.WaitForInspect("client", `10\.0\.3\.1/32 via backup`) } diff --git a/e2e/logging.go b/e2e/logging.go index be3cbd8..e8422c5 100644 --- a/e2e/logging.go +++ b/e2e/logging.go @@ -3,7 +3,6 @@ package e2e import ( - "context" "fmt" "regexp" "strings" @@ -40,63 +39,72 @@ type LogSubscription struct { MatchCh chan struct{} } +type sourceKey struct { + node string + source LogSource +} + type LogManager struct { - mu sync.RWMutex - subscribers []*LogSubscription - // history keeps track of all logs to allow matching against past logs if needed - // however, the prompt implies a streaming approach for wait. - // Let's keep a simple buffer per node/source to allow "WaitFor" to check already received logs. - history map[string]map[LogSource]*strings.Builder - historyMu sync.RWMutex + mu sync.Mutex + subscribers map[sourceKey]*LogSubscription + histories map[sourceKey][]string } func NewLogManager() *LogManager { return &LogManager{ - subscribers: make([]*LogSubscription, 0), - history: make(map[string]map[LogSource]*strings.Builder), + subscribers: make(map[sourceKey]*LogSubscription), + histories: make(map[sourceKey][]string), } } func (m *LogManager) Accept(node string, source LogSource, content string) { - m.historyMu.Lock() - if _, ok := m.history[node]; !ok { - m.history[node] = make(map[LogSource]*strings.Builder) - } - if _, ok := m.history[node][source]; !ok { - m.history[node][source] = &strings.Builder{} - } - m.history[node][source].WriteString(content) - fullContent := m.history[node][source].String() - m.historyMu.Unlock() + m.mu.Lock() + defer m.mu.Unlock() - m.mu.RLock() - defer m.mu.RUnlock() + key := sourceKey{node, source} + m.histories[key] = append(m.histories[key], content) + m.checkMatchLocked(key) +} - for _, sub := range m.subscribers { - if sub.Node != node || sub.Source != source { - continue - } +func (m *LogManager) checkMatchLocked(key sourceKey) { + sub, ok := m.subscribers[key] + if !ok { + return + } + + history := m.histories[key] + for i, content := range history { matched := false if sub.Regex != nil { - if sub.Regex.MatchString(content) || sub.Regex.MatchString(fullContent) { + if sub.Regex.MatchString(content) { matched = true } } else if sub.Pattern != "" { - if strings.Contains(content, sub.Pattern) || strings.Contains(fullContent, sub.Pattern) { + if strings.Contains(content, sub.Pattern) { matched = true } } if matched { + m.histories[key] = history[i+1:] select { case sub.MatchCh <- struct{}{}: default: } + return } } } func (m *LogManager) Subscribe(node string, source LogSource, pattern string, isRegex bool) (*LogSubscription, error) { + m.mu.Lock() + defer m.mu.Unlock() + + key := sourceKey{node, source} + if _, ok := m.subscribers[key]; ok { + return nil, fmt.Errorf("node %s source %s already has a subscriber", node, source) + } + sub := &LogSubscription{ Node: node, Source: source, @@ -112,27 +120,8 @@ func (m *LogManager) Subscribe(node string, source LogSource, pattern string, is sub.Pattern = pattern } - m.mu.Lock() - m.subscribers = append(m.subscribers, sub) - m.mu.Unlock() - - // Check history immediately - m.historyMu.RLock() - defer m.historyMu.RUnlock() - if h, ok := m.history[node]; ok { - if b, ok := h[source]; ok { - content := b.String() - matched := false - if sub.Regex != nil { - matched = sub.Regex.MatchString(content) - } else { - matched = strings.Contains(content, sub.Pattern) - } - if matched { - sub.MatchCh <- struct{}{} - } - } - } + m.subscribers[key] = sub + m.checkMatchLocked(key) return sub, nil } @@ -140,11 +129,9 @@ func (m *LogManager) Subscribe(node string, source LogSource, pattern string, is func (m *LogManager) Unsubscribe(sub *LogSubscription) { m.mu.Lock() defer m.mu.Unlock() - for i, s := range m.subscribers { - if s == sub { - m.subscribers = append(m.subscribers[:i], m.subscribers[i+1:]...) - break - } + key := sourceKey{sub.Node, sub.Source} + if current, ok := m.subscribers[key]; ok && current == sub { + delete(m.subscribers, key) } } diff --git a/e2e/recovery_test.go b/e2e/recovery_test.go index f651b73..4780a3b 100644 --- a/e2e/recovery_test.go +++ b/e2e/recovery_test.go @@ -79,24 +79,23 @@ func TestRecoveryExample(t *testing.T) { } h.StartNodes(nodeSpecs...) + h.WaitForLog(alice, "Nylon has been initialized") + // Start tracing on Alice to observe packet forwarding + h.StartTrace(alice) // 3. Wait for full convergence t.Log("Waiting for initial convergence...") h.WaitForLog(alice, fmt.Sprintf("new.prefix=%s/32", nylonIPs[bob])) - // Start tracing on Alice to observe packet forwarding - h.StartTrace(alice) - // 4. Verify connectivity Alice -> Bob t.Log("Verifying initial connectivity Alice -> Bob (Direct)") + go h.Exec(alice, []string{"ping", "-c", "10", nylonIPs[bob]}) + h.WaitForTrace(alice, fmt.Sprintf("Fwd packet: %s -> %s, via %s", nylonIPs[alice], nylonIPs[bob], bob)) stdout, stderr, err := h.Exec(alice, []string{"ping", "-c", "3", nylonIPs[bob]}) if err != nil { t.Fatalf("Initial ping failed: %v\nStdout: %s\nStderr: %s", err, stdout, stderr) } - // Check that traffic went directly to Bob - h.WaitForTrace(alice, fmt.Sprintf("Fwd packet: %s -> %s, via %s", nylonIPs[alice], nylonIPs[bob], bob)) - // 5. Break the link Alice-Bob t.Log("Breaking link Alice <-> Bob") _, _, err = h.Exec(alice, []string{"ip", "route", "add", "blackhole", dockerIPs[bob] + "/32"}) @@ -124,7 +123,6 @@ func TestRecoveryExample(t *testing.T) { } } }() - h.WaitForTrace(alice, fmt.Sprintf("Fwd packet: %s -> %s, via %s", nylonIPs[alice], nylonIPs[bob], vps)) close(stopPinger)