Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 113 additions & 13 deletions go/core/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,15 @@ func captureStackTrace() string {
}

var (
providerInitOnce sync.Once
providerInitOnce sync.Once
initialTracerProvider = otel.GetTracerProvider()
)

// TracerProvider returns the global tracer provider, creating it if needed.
func TracerProvider() *sdktrace.TracerProvider {
func TracerProvider() trace.TracerProvider {
if tp := otel.GetTracerProvider(); tp != nil {
if sdkTP, ok := tp.(*sdktrace.TracerProvider); ok {
return sdkTP
if tp != initialTracerProvider {
return tp
}
}

Expand All @@ -120,21 +121,80 @@ func TracerProvider() *sdktrace.TracerProvider {
}
})

return otel.GetTracerProvider().(*sdktrace.TracerProvider)
return otel.GetTracerProvider()
}

// registerSimpleSpanProcessor wraps the given exporter in a simple span
// processor and registers it on the provider returned by TracerProvider.
//
// The work is delegated to registerSimpleSpanProcessorOnProvider so that the
// global and provider-scoped registration paths share one implementation.
// If the provider does not expose RegisterSpanProcessor, nothing is registered.
func registerSimpleSpanProcessor(processor sdktrace.SpanExporter) {
	registerSimpleSpanProcessorOnProvider(TracerProvider(), processor)
}

// registerBatchSpanProcessor wraps the given exporter in a batch span
// processor and registers it on the provider returned by TracerProvider.
//
// The work is delegated to registerBatchSpanProcessorOnProvider so that the
// global and provider-scoped registration paths share one implementation.
// The returned function is whatever that helper returns for the provider.
func registerBatchSpanProcessor(processor sdktrace.SpanExporter) (shutdown func(context.Context) error) {
	return registerBatchSpanProcessorOnProvider(TracerProvider(), processor)
}

// Tracer returns the "genkit-tracer" tracer (instrumentation version "v1")
// obtained from the provider that TracerProvider returns.
func Tracer() trace.Tracer {
	provider := TracerProvider()
	version := trace.WithInstrumentationVersion("v1")
	return provider.Tracer("genkit-tracer", version)
}

// --- Context-scoped tracer provider support ---

// tracerProviderCtxKey is the context key under which an optional
// trace.TracerProvider is stored. It lets subsystems (e.g., the reflection
// server) use their own provider without affecting the global one.
// ContextWithTracerProvider writes the value; TracerProviderFromContext reads it.
var tracerProviderCtxKey = base.NewContextKey[trace.TracerProvider]()

// ContextWithTracerProvider derives a context that carries tp as its tracer
// provider. A nil tp is ignored and ctx is handed back unchanged.
func ContextWithTracerProvider(ctx context.Context, tp trace.TracerProvider) context.Context {
	if tp != nil {
		return tracerProviderCtxKey.NewContext(ctx, tp)
	}
	return ctx
}

// TracerProviderFromContext looks up the tracer provider attached to ctx.
// When ctx is nil or carries no provider, the result of TracerProvider is
// returned instead.
func TracerProviderFromContext(ctx context.Context) trace.TracerProvider {
	if ctx == nil {
		return TracerProvider()
	}
	scoped := tracerProviderCtxKey.FromContext(ctx)
	if scoped == nil {
		return TracerProvider()
	}
	return scoped
}

// TracerFromContext returns the "genkit-tracer" tracer (instrumentation
// version "v1") from the provider that TracerProviderFromContext resolves
// for ctx: the context-bound provider when present, the global one otherwise.
func TracerFromContext(ctx context.Context) trace.Tracer {
	provider := TracerProviderFromContext(ctx)
	version := trace.WithInstrumentationVersion("v1")
	return provider.Tracer("genkit-tracer", version)
}

// WriteTelemetryImmediate adds a telemetry server to the global tracer provider.
// Traces are saved immediately as they are finished.
// Use this when the client persists traces quickly,
// such as one that writes to a file.
func WriteTelemetryImmediate(client TelemetryClient) {
	// Registration goes through registerSimpleSpanProcessor rather than a
	// direct method call: TracerProvider() is declared to return the
	// trace.TracerProvider interface, so RegisterSpanProcessor is only
	// reachable through the helper's type assertion. Registering exactly once
	// also avoids exporting every span twice.
	registerSimpleSpanProcessor(newTelemetryServerExporter(client))
}

// WriteTelemetryBatch adds a telemetry server to the global tracer provider.
Expand All @@ -145,9 +205,48 @@ func WriteTelemetryImmediate(client TelemetryClient) {
// Callers must invoke the returned function at the end of the program to flush the final batch
// and perform other cleanup.
func WriteTelemetryBatch(client TelemetryClient) (shutdown func(context.Context) error) {
	// Registration goes through registerBatchSpanProcessor rather than direct
	// method calls: TracerProvider() is declared to return the
	// trace.TracerProvider interface, so RegisterSpanProcessor and Shutdown
	// are only reachable through the helper's type assertions. The helper
	// also supplies the shutdown function handed back to the caller.
	return registerBatchSpanProcessor(newTelemetryServerExporter(client))
}

// --- Provider-scoped exporter registration helpers ---

// registerSimpleSpanProcessorOnProvider wraps the given exporter in a simple
// span processor and attaches it to tp. Providers that do not expose a
// RegisterSpanProcessor method are left untouched.
func registerSimpleSpanProcessorOnProvider(tp trace.TracerProvider, processor sdktrace.SpanExporter) {
	type registrar interface {
		RegisterSpanProcessor(processor sdktrace.SpanProcessor)
	}

	target, supported := tp.(registrar)
	if !supported {
		return
	}
	target.RegisterSpanProcessor(sdktrace.NewSimpleSpanProcessor(processor))
}

// registerBatchSpanProcessorOnProvider wraps the given exporter in a batch
// span processor and registers it on tp.
//
// Registration only happens when tp exposes RegisterSpanProcessor; otherwise
// nothing is registered and the returned function is a no-op.
//
// The returned shutdown function is tp's own Shutdown when tp provides one.
// If tp accepts processors but has no Shutdown method, the batch processor's
// Shutdown is returned instead, so the caller can still flush the final batch
// rather than silently receiving a no-op.
func registerBatchSpanProcessorOnProvider(tp trace.TracerProvider, processor sdktrace.SpanExporter) (shutdown func(context.Context) error) {
	type registrar interface {
		RegisterSpanProcessor(processor sdktrace.SpanProcessor)
	}
	type shutDowner interface {
		Shutdown(ctx context.Context) error
	}

	r, ok := tp.(registrar)
	if !ok {
		// Nothing was registered, so there is nothing to flush or stop.
		return func(context.Context) error { return nil }
	}

	batcher := sdktrace.NewBatchSpanProcessor(processor)
	r.RegisterSpanProcessor(batcher)

	if s, ok := tp.(shutDowner); ok {
		return s.Shutdown
	}
	// The provider cannot be shut down as a whole; shut down the processor
	// we registered so queued spans are still exported.
	return batcher.Shutdown
}

// WriteTelemetryImmediateOn builds a telemetry server exporter for client and
// registers it on tp behind a simple (immediate) span processor.
func WriteTelemetryImmediateOn(tp trace.TracerProvider, client TelemetryClient) {
	exporter := newTelemetryServerExporter(client)
	registerSimpleSpanProcessorOnProvider(tp, exporter)
}

// WriteTelemetryBatchOn builds a telemetry server exporter for client and
// registers it on tp behind a batch span processor. It returns the shutdown
// function produced by registerBatchSpanProcessorOnProvider.
func WriteTelemetryBatchOn(tp trace.TracerProvider, client TelemetryClient) (shutdown func(context.Context) error) {
	exporter := newTelemetryServerExporter(client)
	shutdown = registerBatchSpanProcessorOnProvider(tp, exporter)
	return shutdown
}

const (
Expand Down Expand Up @@ -183,13 +282,14 @@ func RunInNewSpan[I, O any](
) (O, error) {
// TODO: support span links.
log := logger.FromContext(ctx)
log.Debug("span start", "name", metadata.Name)
defer log.Debug("span end", "name", metadata.Name)

if metadata == nil {
metadata = &SpanMetadata{}
}

log.Debug("span start", "name", metadata.Name)
defer log.Debug("span end", "name", metadata.Name)

parentSM := spanMetaKey.FromContext(ctx)
isRoot := metadata.IsRoot
if !isRoot && parentSM == nil {
Expand Down Expand Up @@ -236,7 +336,7 @@ func RunInNewSpan[I, O any](
opts = append(opts, trace.WithAttributes(attribute.String(spanTypeAttr, metadata.Type)))
}

ctx, span := Tracer().Start(ctx, metadata.Name, opts...)
ctx, span := TracerFromContext(ctx).Start(ctx, metadata.Name, opts...)
sm.TraceInfo = TraceInfo{
TraceID: span.SpanContext().TraceID().String(),
SpanID: span.SpanContext().SpanID().String(),
Expand Down
32 changes: 22 additions & 10 deletions go/genkit/reflection.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
"github.com/firebase/genkit/go/core/logger"
"github.com/firebase/genkit/go/core/tracing"
"github.com/firebase/genkit/go/internal"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
tracepkg "go.opentelemetry.io/otel/trace"
)

type streamingCallback[Stream any] = func(context.Context, Stream) error
Expand All @@ -54,6 +56,7 @@ type runtimeFileData struct {
// reflectionServer encapsulates everything needed to serve the Reflection API.
// It embeds *http.Server, so the server's fields (Addr, Handler) and methods
// are available directly on the value.
type reflectionServer struct {
	*http.Server
	// TracerProvider is the tracer provider associated with this server.
	// startReflectionServer sets it to a freshly created sdktrace provider so
	// the reflection server does not conflict with the global provider.
	TracerProvider  tracepkg.TracerProvider
	RuntimeFilePath string            // Path to the runtime file that was written at startup.
	activeActions   *activeActionsMap // Tracks active actions for cancellation support.
}
Expand Down Expand Up @@ -139,13 +142,17 @@ func startReflectionServer(ctx context.Context, g *Genkit, errCh chan<- error, s
}
}

// Create an isolated tracer provider for the reflection server so it doesn't conflict with the global provider.
tp := sdktrace.NewTracerProvider()

s := &reflectionServer{
Server: &http.Server{
Addr: addr,
},
activeActions: newActiveActionsMap(),
TracerProvider: tp,
activeActions: newActiveActionsMap(),
}
s.Handler = serveMux(g, s)
s.Handler = serveMux(g, s, tp)

slog.Debug("starting reflection server", "addr", s.Addr)

Expand Down Expand Up @@ -289,7 +296,7 @@ func findProjectRoot() (string, error) {
}

// serveMux returns a new ServeMux configured for the required Reflection API endpoints.
func serveMux(g *Genkit, s *reflectionServer) *http.ServeMux {
func serveMux(g *Genkit, s *reflectionServer, tp tracepkg.TracerProvider) *http.ServeMux {
mux := http.NewServeMux()
// Skip wrapHandler here to avoid logging constant polling requests.
mux.HandleFunc("GET /api/__health", func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -299,17 +306,21 @@ func serveMux(g *Genkit, s *reflectionServer) *http.ServeMux {
}
w.WriteHeader(http.StatusOK)
})
mux.HandleFunc("GET /api/actions", wrapReflectionHandler(handleListActions(g)))
mux.HandleFunc("POST /api/runAction", wrapReflectionHandler(handleRunAction(g, s.activeActions)))
mux.HandleFunc("POST /api/notify", wrapReflectionHandler(handleNotify()))
mux.HandleFunc("POST /api/cancelAction", wrapReflectionHandler(handleCancelAction(s.activeActions)))
mux.HandleFunc("GET /api/actions", wrapReflectionHandler(tp, handleListActions(g)))
mux.HandleFunc("POST /api/runAction", wrapReflectionHandler(tp, handleRunAction(g, s.activeActions)))
mux.HandleFunc("POST /api/notify", wrapReflectionHandler(tp, handleNotify(tp)))
mux.HandleFunc("POST /api/cancelAction", wrapReflectionHandler(tp, handleCancelAction(s.activeActions)))

return mux
}

// wrapReflectionHandler wraps an HTTP handler function with common logging and error handling.
func wrapReflectionHandler(h func(w http.ResponseWriter, r *http.Request) error) http.HandlerFunc {
func wrapReflectionHandler(tp tracepkg.TracerProvider, h func(w http.ResponseWriter, r *http.Request) error) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// Inject the reflection tracer provider into the request context so spans use the isolated provider.
ctx = tracing.ContextWithTracerProvider(ctx, tp)
r = r.WithContext(ctx)
logger.FromContext(ctx).Debug("request start", "method", r.Method, "path", r.URL.Path)

var err error
Expand Down Expand Up @@ -558,7 +569,7 @@ func handleCancelAction(activeActions *activeActionsMap) func(w http.ResponseWri
}

// handleNotify configures the telemetry server URL from the request.
func handleNotify() func(w http.ResponseWriter, r *http.Request) error {
func handleNotify(tp tracepkg.TracerProvider) func(w http.ResponseWriter, r *http.Request) error {
return func(w http.ResponseWriter, r *http.Request) error {
var body struct {
TelemetryServerURL string `json:"telemetryServerUrl"`
Expand All @@ -571,7 +582,8 @@ func handleNotify() func(w http.ResponseWriter, r *http.Request) error {
}

if os.Getenv("GENKIT_TELEMETRY_SERVER") == "" && body.TelemetryServerURL != "" {
tracing.WriteTelemetryImmediate(tracing.NewHTTPTelemetryClient(body.TelemetryServerURL))
// Attach telemetry exporter to the reflection tracer provider only.
tracing.WriteTelemetryImmediateOn(tp, tracing.NewHTTPTelemetryClient(body.TelemetryServerURL))
slog.Debug("connected to telemetry server", "url", body.TelemetryServerURL)
}

Expand Down
8 changes: 4 additions & 4 deletions go/genkit/reflection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestServeMux(t *testing.T) {
Server: &http.Server{},
activeActions: newActiveActionsMap(),
}
ts := httptest.NewServer(serveMux(g, s))
ts := httptest.NewServer(serveMux(g, s, tracing.TracerProvider()))
s.Addr = strings.TrimPrefix(ts.URL, "http://")
defer ts.Close()

Expand Down Expand Up @@ -314,7 +314,7 @@ func TestEarlyTraceIDTransmission(t *testing.T) {
})

s := &reflectionServer{Server: &http.Server{}, activeActions: newActiveActionsMap()}
ts := httptest.NewServer(serveMux(g, s))
ts := httptest.NewServer(serveMux(g, s, tracing.TracerProvider()))
defer ts.Close()

t.Run("headers arrive before body completes", func(t *testing.T) {
Expand Down Expand Up @@ -470,7 +470,7 @@ func TestActionCancellation(t *testing.T) {
})

s := &reflectionServer{Server: &http.Server{}, activeActions: newActiveActionsMap()}
ts := httptest.NewServer(serveMux(g, s))
ts := httptest.NewServer(serveMux(g, s, tracing.TracerProvider()))
defer ts.Close()

// Start action in background
Expand Down Expand Up @@ -527,7 +527,7 @@ func TestCancelActionEndpoint(t *testing.T) {
Server: &http.Server{},
activeActions: newActiveActionsMap(),
}
ts := httptest.NewServer(serveMux(g, s))
ts := httptest.NewServer(serveMux(g, s, tracing.TracerProvider()))
defer ts.Close()

t.Run("cancel non-existent action", func(t *testing.T) {
Expand Down