From f00be82e6088c4e692a1add1dfe0dcb349ed82fb Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Jan 2026 15:01:23 -0800 Subject: [PATCH 01/19] Create go-bidi-design.md --- docs/go-bidi-design.md | 634 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 634 insertions(+) create mode 100644 docs/go-bidi-design.md diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md new file mode 100644 index 0000000000..356290526c --- /dev/null +++ b/docs/go-bidi-design.md @@ -0,0 +1,634 @@ +# Genkit Go Bidirectional Streaming Features - Design Document + +## Overview + +This document describes the design for bidirectional streaming features in Genkit Go. The implementation introduces three new primitives: + +1. **BidiAction** - Core primitive for bidirectional operations +2. **BidiFlow** - BidiAction with observability, intended for user definition +3. **SessionFlow** - Stateful, multi-turn agent interactions with automatic persistence and turn semantics + +## Package Location + +All bidi types go in `go/core/x/` (experimental), which will move to `go/core/` when stabilized: + +``` +go/core/x/ +├── bidi.go # BidiActionDef, BidiFunc, BidiConnection +├── bidi_flow.go # BidiFlow with tracing +├── bidi_options.go # Option types for bidi +├── session_flow.go # SessionFlow implementation +├── bidi_test.go # Tests +``` + +High-level wrappers in `go/genkit/bidi.go`. + +Import as `corex "github.com/firebase/genkit/go/core/x"`. + +--- + +## 1. Core Type Definitions + +### 1.1 BidiAction + +```go +// BidiActionDef represents a bidirectional streaming action. +// Type parameters: +// - In: Type of each message sent to the action +// - Out: Type of the final output +// - Init: Type of initialization data (use struct{} if not needed) +// - Stream: Type of each streamed output chunk +type BidiActionDef[In, Out, Init, Stream any] struct { + name string + fn BidiFunc[In, Out, Init, Stream] + registry api.Registry + desc *api.ActionDesc +} + +// BidiFunc is the function signature for bidi actions. +type BidiFunc[In, Out, Init, Stream any] func( + ctx context.Context, + inputStream <-chan In, + init Init, + streamCallback core.StreamCallback[Stream], +) (Out, error) +``` + +### 1.2 BidiConnection + +```go +// BidiConnection represents an active bidirectional streaming session. +type BidiConnection[In, Out, Stream any] struct { + inputCh chan In // Internal, accessed via Send() + streamCh chan Stream // Internal output stream channel + doneCh chan struct{} // Closed when action completes + output Out // Final output (valid after done) + err error // Error if any (valid after done) + ctx context.Context + cancel context.CancelFunc + span tracing.Span // Trace span, ended on completion + mu sync.Mutex + closed bool +} + +// Send sends an input message to the bidi action. +func (c *BidiConnection[In, Out, Stream]) Send(input In) error + +// Close signals that no more inputs will be sent. +func (c *BidiConnection[In, Out, Stream]) Close() error + +// Stream returns an iterator for receiving streamed chunks. +// Each call returns a new iterator over the same underlying channel. +// Breaking out of the loop does NOT close the connection - you can call Stream() +// again to continue receiving. The iterator completes when the action finishes. +func (c *BidiConnection[In, Out, Stream]) Stream() iter.Seq2[Stream, error] + +// Output returns the final output after the action completes. +// Blocks until done or context cancelled. +func (c *BidiConnection[In, Out, Stream]) Output() (Out, error) + +// Done returns a channel closed when the connection completes. +func (c *BidiConnection[In, Out, Stream]) Done() <-chan struct{} +``` + +**Why iterators work for multi-turn:** Each call to `Stream()` returns an iterator over a new channel for that turn. When the agent finishes responding to an input (loops back to wait for the next input), the stream channel for that turn closes, causing the user's `for range` loop to exit naturally. Call `Stream()` again after sending the next input to get the next turn's response. + +### 1.3 BidiFlow + +```go +// BidiFlow wraps a BidiAction with flow semantics (tracing, monitoring). +type BidiFlow[In, Out, Init, Stream any] struct { + *BidiActionDef[In, Out, Init, Stream] + // Uses BidiActionDef.Name() for flow name - no separate field needed +} +``` + +### 1.4 SessionFlow + +SessionFlow adds session state management on top of BidiFlow. + +```go +// SessionFlowOutput wraps the output with session info for persistence. +type SessionFlowOutput[State, Out any] struct { + SessionID string `json:"sessionId"` + Output Out `json:"output"` + State State `json:"state"` +} + +// SessionFlow is a bidi flow with automatic session state management. +// Init = State: the initial state for new sessions (ignored when resuming an existing session). +type SessionFlow[State, In, Out, Stream any] struct { + *BidiFlow[In, SessionFlowOutput[State, Out], State, Stream] + store session.Store[State] + persistMode PersistMode +} + +// SessionFlowFunc is the function signature for session flows. +type SessionFlowFunc[State, In, Out, Stream any] func( + ctx context.Context, + inputStream <-chan In, + sess *session.Session[State], + cb core.StreamCallback[Stream], +) (Out, error) + +// PersistMode controls when session state is persisted. +type PersistMode int + +const ( + PersistOnClose PersistMode = iota // Persist only when connection closes (default) + PersistOnUpdate // Persist after each input message is processed +) +``` + +**Turn semantics**: The `SessionStreamCallback` includes a `turnDone` parameter. When the agent finishes responding to an input message, it calls `cb(ctx, lastChunk, true)` to signal the turn is complete. This allows clients to know when to prompt for the next user message. + +--- + +## 2. API Surface + +### 2.1 Defining Bidi Actions + +```go +// In go/core/x/bidi.go + +// NewBidiAction creates a BidiAction without registering it. +func NewBidiAction[In, Out, Init, Stream any]( + name string, + fn BidiFunc[In, Out, Init, Stream], +) *BidiActionDef[In, Out, Init, Stream] + +// DefineBidiAction creates and registers a BidiAction. +func DefineBidiAction[In, Out, Init, Stream any]( + r api.Registry, + name string, + fn BidiFunc[In, Out, Init, Stream], +) *BidiActionDef[In, Out, Init, Stream] +``` + +Schemas for `In`, `Out`, `Init`, and `Stream` types are automatically inferred from the type parameters using the existing JSON schema inference in `go/internal/base/json.go`. + +### 2.2 Defining Bidi Flows + +```go +// In go/core/x/bidi_flow.go + +func DefineBidiFlow[In, Out, Init, Stream any]( + r api.Registry, + name string, + fn BidiFunc[In, Out, Init, Stream], +) *BidiFlow[In, Out, Init, Stream] +``` + +### 2.3 Defining Session Flows + +```go +// In go/core/x/session_flow.go + +func DefineSessionFlow[State, In, Out, Stream any]( + r api.Registry, + name string, + fn SessionFlowFunc[State, In, Out, Stream], + opts ...SessionFlowOption[State], +) *SessionFlow[State, In, Out, Stream] + +// SessionFlowOption configures a SessionFlow. +type SessionFlowOption[State any] interface { + applySessionFlow(*sessionFlowOptions[State]) error +} + +func WithSessionStore[State any](store session.Store[State]) SessionFlowOption[State] +func WithPersistMode[State any](mode PersistMode) SessionFlowOption[State] +``` + +### 2.4 Starting Connections + +All bidi types (BidiAction, BidiFlow, SessionFlow) use the same `StreamBidi` method to start connections: + +```go +// BidiAction/BidiFlow +func (a *BidiActionDef[In, Out, Init, Stream]) StreamBidi( + ctx context.Context, + opts ...BidiOption[Init], +) (*BidiConnection[In, Out, Stream], error) + +// BidiOption for streaming +type BidiOption[Init any] interface { + applyBidi(*bidiOptions[Init]) error +} + +func WithInit[Init any](init Init) BidiOption[Init] + +// SessionFlow uses the same StreamBidi, with Init = State +// Additional option for session ID +func WithSessionID[Init any](id string) BidiOption[Init] + +func (sf *SessionFlow[State, In, Out, Stream]) StreamBidi( + ctx context.Context, + opts ...BidiOption[State], +) (*BidiConnection[In, SessionFlowOutput[State, Out], Stream], error) +``` + +### 2.5 High-Level Genkit API + +```go +// In go/genkit/bidi.go + +func DefineBidiFlow[In, Out, Init, Stream any]( + g *Genkit, + name string, + fn corex.BidiFunc[In, Out, Init, Stream], +) *corex.BidiFlow[In, Out, Init, Stream] + +func DefineSessionFlow[State, In, Out, Stream any]( + g *Genkit, + name string, + fn corex.SessionFlowFunc[State, In, Out, Stream], + opts ...corex.SessionFlowOption[State], +) *corex.SessionFlow[State, In, Out, Stream] +``` + +--- + +## 3. Session Flow Details + +### 3.1 Using StreamBidi with SessionFlow + +SessionFlow uses the same `StreamBidi` method as BidiAction and BidiFlow. Session ID is a connection option, and initial state is passed via `WithInit`: + +```go +// Define once at startup +chatAgent := genkit.DefineSessionFlow[ChatState, string, string, string](g, "chatAgent", + myAgentFunc, + corex.WithSessionStore(store), +) + +// NEW USER: Start fresh session (generates new ID, zero state) +conn1, _ := chatAgent.StreamBidi(ctx) + +// RETURNING USER: Resume existing session by ID +conn2, _ := chatAgent.StreamBidi(ctx, corex.WithSessionID[ChatState]("user-123-session")) + +// NEW USER WITH INITIAL STATE: Start with pre-populated state +conn3, _ := chatAgent.StreamBidi(ctx, corex.WithInit(ChatState{Messages: preloadedHistory})) + +// NEW USER WITH SPECIFIC ID AND INITIAL STATE +conn4, _ := chatAgent.StreamBidi(ctx, + corex.WithSessionID[ChatState]("custom-session-id"), + corex.WithInit(ChatState{Messages: preloadedHistory}), +) +``` + +The SessionFlow internally handles session creation/loading: +- If `WithSessionID` is provided and session exists in store → load existing session (WithInit ignored) +- If `WithSessionID` is provided but session doesn't exist → create new session with that ID and initial state from WithInit +- If no `WithSessionID` → generate new UUID and create session with initial state from WithInit + +The session ID is returned in `SessionFlowOutput.SessionID`, so callers can retrieve it from the final output: + +```go +output, _ := conn.Output() +sessionID := output.SessionID // Save this to resume later +``` + +### 3.2 State Persistence + +Persistence mode is configurable: + +```go +// Usage: +chatAgent := genkit.DefineSessionFlow[ChatState, string, string, string](g, "chatAgent", + fn, + corex.WithSessionStore(store), + corex.WithPersistMode(corex.PersistOnUpdate), // or PersistOnClose (default) +) +``` + +- **PersistOnClose** (default): State is persisted only when the connection closes. Better performance. +- **PersistOnUpdate**: State is persisted after each input message is processed. More durable. + +**Note**: `PersistOnUpdate` persists after each input from `inputStream` is processed, not on every `sess.UpdateState()` call. This covers the main use case (persist after each conversation turn) without requiring interface changes to `session.Session`. + +--- + +## 4. Integration with Existing Infrastructure + +### 4.1 Tracing Integration + +BidiFlows create spans that remain open for the lifetime of the connection, enabling streaming trace visualization in the Dev UI. + +```go +func (f *BidiFlow[In, Out, Init, Stream]) StreamBidi( + ctx context.Context, + opts ...BidiOption[Init], +) (*BidiConnection[In, Out, Stream], error) { + // Inject flow context + fc := &flowContext{flowName: f.Name()} + ctx = flowContextKey.NewContext(ctx, fc) + + // Start span (NOT RunInNewSpan - we manage lifecycle manually) + spanMeta := &tracing.SpanMetadata{ + Name: f.Name(), + Type: "action", + Subtype: "bidiFlow", + } + ctx, span := tracing.StartSpan(ctx, spanMeta) + + // Create connection, passing span for lifecycle management + conn, err := f.BidiActionDef.streamBidiWithSpan(ctx, span, opts...) + if err != nil { + span.End() // End span on error + return nil, err + } + return conn, nil +} + +// Inside BidiConnection, the span is ended when the action completes: +func (c *BidiConnection[...]) run() { + defer c.span.End() // End span when bidi flow completes + + // Run the action, recording events/nested spans as needed + output, err := c.fn(c.ctx, c.inputCh, c.init, c.streamCallback) + // ... +} +``` + +**Important**: The span stays open while the connection is active, allowing: +- Streaming traces to the Dev UI in real-time +- Nested spans for sub-operations (e.g., each LLM call) +- Events recorded as they happen + +### 4.2 Action Registration + +Add new action type: + +```go +// In go/core/api/action.go +const ( + ActionTypeBidiFlow ActionType = "bidi-flow" +) +``` + +### 4.3 Session Integration + +Use existing `Session` and `Store` types from `go/core/x/session` (remains a separate subpackage): + +```go +import "github.com/firebase/genkit/go/core/x/session" + +// SessionFlow holds reference to session store +type SessionFlow[State, In, Out, Stream any] struct { + store session.Store[State] + // ... +} +``` + +--- + +## 5. Example Usage + +### 5.1 Basic Echo Bidi Flow + +```go +package main + +import ( + "context" + "fmt" + + "github.com/firebase/genkit/go/core" + "github.com/firebase/genkit/go/genkit" +) + +func main() { + ctx := context.Background() + g := genkit.Init(ctx) + + // Define echo bidi flow (low-level, no turn semantics) + echoFlow := genkit.DefineBidiFlow[string, string, struct{}, string](g, "echo", + func(ctx context.Context, inputStream <-chan string, init struct{}, cb core.StreamCallback[string]) (string, error) { + var count int + for input := range inputStream { + count++ + if err := cb(ctx, fmt.Sprintf("echo: %s", input)); err != nil { + return "", err + } + } + return fmt.Sprintf("processed %d messages", count), nil + }, + ) + + // Start streaming connection + conn, err := echoFlow.StreamBidi(ctx) + if err != nil { + panic(err) + } + + // Send messages + conn.Send("hello") + conn.Send("world") + conn.Close() + + // Consume stream via iterator + for chunk, err := range conn.Stream() { + if err != nil { + panic(err) + } + fmt.Println(chunk) // "echo: hello", "echo: world" + } + + // Get final output + output, _ := conn.Output() + fmt.Println(output) // "processed 2 messages" +} +``` + +### 5.2 Chat Agent with Session Persistence + +```go +package main + +import ( + "context" + "fmt" + + "github.com/firebase/genkit/go/ai" + "github.com/firebase/genkit/go/core" + corex "github.com/firebase/genkit/go/core/x" + "github.com/firebase/genkit/go/core/x/session" + "github.com/firebase/genkit/go/genkit" + "github.com/firebase/genkit/go/plugins/googlegenai" +) + +type ChatState struct { + Messages []*ai.Message `json:"messages"` +} + +func main() { + ctx := context.Background() + store := session.NewInMemoryStore[ChatState]() + + g := genkit.Init(ctx, + genkit.WithPlugins(&googlegenai.GoogleAI{}), + genkit.WithDefaultModel("googleai/gemini-2.5-flash"), + ) + + // Define a session flow for multi-turn chat + chatAgent := genkit.DefineSessionFlow[ChatState, string, string, string](g, "chatAgent", + func(ctx context.Context, inputStream <-chan string, sess *session.Session[ChatState], cb core.StreamCallback[string]) (string, error) { + state := sess.State() + messages := state.Messages + + for userInput := range inputStream { + messages = append(messages, ai.NewUserTextMessage(userInput)) + + var responseText string + for result, err := range genkit.GenerateStream(ctx, g, + ai.WithMessages(messages...), + ) { + if err != nil { + return "", err + } + if result.Done { + responseText = result.Response.Text() + } + cb(ctx, result.Chunk.Text()) + } + // Stream channel closes here when we loop back to wait for next input + + messages = append(messages, ai.NewModelTextMessage(responseText)) + sess.UpdateState(ctx, ChatState{Messages: messages}) + } + + return "conversation ended", nil + }, + corex.WithSessionStore(store), + corex.WithPersistMode(corex.PersistOnClose), + ) + + // Start new session (generates new session ID) + conn, _ := chatAgent.StreamBidi(ctx) + + // First turn + conn.Send("Hello! Tell me about Go programming.") + for chunk, err := range conn.Stream() { + if err != nil { + panic(err) + } + fmt.Print(chunk) + } + // Loop exits when stream closes (agent finished responding) + + // Second turn - call Stream() again for next response + conn.Send("What are channels used for?") + for chunk, err := range conn.Stream() { + if err != nil { + panic(err) + } + fmt.Print(chunk) + } + + conn.Close() + + // Get session ID from final output to resume later + output, _ := conn.Output() + sessionID := output.SessionID + + // Resume session later with the saved ID + conn2, _ := chatAgent.StreamBidi(ctx, corex.WithSessionID[ChatState](sessionID)) + conn2.Send("Continue our discussion") + // ... +} +``` + +### 5.3 Bidi Flow with Initialization Data + +```go +type ChatInit struct { + SystemPrompt string `json:"systemPrompt"` + Temperature float64 `json:"temperature"` +} + +configuredChat := genkit.DefineBidiFlow[string, string, ChatInit, string](g, "configuredChat", + func(ctx context.Context, inputStream <-chan string, init ChatInit, cb core.StreamCallback[string]) (string, error) { + // Use init.SystemPrompt and init.Temperature + for input := range inputStream { + resp, _ := genkit.GenerateText(ctx, g, + ai.WithSystem(init.SystemPrompt), + ai.WithConfig(&genai.GenerateContentConfig{Temperature: &init.Temperature}), + ai.WithPrompt(input), + ) + cb(ctx, resp) + } + return "done", nil + }, +) + +conn, _ := configuredChat.StreamBidi(ctx, + corex.WithInit(ChatInit{ + SystemPrompt: "You are a helpful assistant.", + Temperature: 0.7, + }), +) +``` + +--- + +## 6. Files to Create/Modify + +### New Files + +| File | Description | +|------|-------------| +| `go/core/x/bidi.go` | BidiActionDef, BidiFunc, BidiConnection | +| `go/core/x/bidi_flow.go` | BidiFlow with tracing | +| `go/core/x/bidi_options.go` | BidiOption types | +| `go/core/x/session_flow.go` | SessionFlow implementation | +| `go/core/x/bidi_test.go` | Tests | +| `go/genkit/bidi.go` | High-level API wrappers | + +### Modified Files + +| File | Change | +|------|--------| +| `go/core/api/action.go` | Add `ActionTypeBidiFlow` constant | + +--- + +## 7. Implementation Notes + +### Error Handling +- Errors from the bidi function propagate to both `Stream()` iterator and `Output()` +- Context cancellation closes all channels and terminates the action +- Send after Close returns an error +- Errors are yielded as the second value in the `iter.Seq2[Stream, error]` iterator + +### Goroutine Management +- BidiConnection spawns a goroutine to run the action +- Proper cleanup on context cancellation using `defer` and `sync.Once` +- Channel closure follows Go idioms (sender closes) +- Trace span is ended in the goroutine's defer + +### Thread Safety +- BidiConnection uses mutex for state (closed flag) +- Send is safe to call from multiple goroutines +- Session operations are thread-safe (from existing session package) + +### Channels and Backpressure +- Both input and output channels are **unbuffered** by default (size 0) +- This provides natural backpressure: `Send()` blocks until agent reads, `cb()` blocks until user consumes +- If needed, a `WithInputBufferSize` option could be added later for specific use cases + +### Iterator Implementation and Turn Semantics +- `Stream()` returns `iter.Seq2[Stream, error]` - a Go 1.23 iterator +- Each call to `Stream()` returns an iterator over a **new channel** for that turn +- When the agent finishes responding (loops back to wait for next input), the stream channel closes +- The user's `for range` loop exits naturally when the channel closes +- Call `Stream()` again after sending the next input to get the next turn's response +- The iterator yields `(chunk, nil)` for each streamed value +- On error, the iterator yields `(zero, err)` and stops + +### Tracing +- Span is started when connection is created, ended when action completes +- Nested spans work normally within the bidi function +- Events can be recorded throughout the connection lifecycle +- Dev UI can show traces in real-time as they stream From 6c6afb7bc687c520e99a2b6bfcb756b97929781d Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Jan 2026 15:48:57 -0800 Subject: [PATCH 02/19] Update go-bidi-design.md --- docs/go-bidi-design.md | 183 +++++++++++++++++++++-------------------- 1 file changed, 93 insertions(+), 90 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 356290526c..2f7a76ad2c 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -14,7 +14,7 @@ All bidi types go in `go/core/x/` (experimental), which will move to `go/core/` ``` go/core/x/ -├── bidi.go # BidiActionDef, BidiFunc, BidiConnection +├── bidi.go # BidiAction, BidiFunc, BidiConnection ├── bidi_flow.go # BidiFlow with tracing ├── bidi_options.go # Option types for bidi ├── session_flow.go # SessionFlow implementation @@ -32,13 +32,13 @@ Import as `corex "github.com/firebase/genkit/go/core/x"`. ### 1.1 BidiAction ```go -// BidiActionDef represents a bidirectional streaming action. +// BidiAction represents a bidirectional streaming action. // Type parameters: // - In: Type of each message sent to the action // - Out: Type of the final output // - Init: Type of initialization data (use struct{} if not needed) // - Stream: Type of each streamed output chunk -type BidiActionDef[In, Out, Init, Stream any] struct { +type BidiAction[In, Out, Init, Stream any] struct { name string fn BidiFunc[In, Out, Init, Stream] registry api.Registry @@ -91,15 +91,13 @@ func (c *BidiConnection[In, Out, Stream]) Output() (Out, error) func (c *BidiConnection[In, Out, Stream]) Done() <-chan struct{} ``` -**Why iterators work for multi-turn:** Each call to `Stream()` returns an iterator over a new channel for that turn. When the agent finishes responding to an input (loops back to wait for the next input), the stream channel for that turn closes, causing the user's `for range` loop to exit naturally. Call `Stream()` again after sending the next input to get the next turn's response. +**Why iterators work for multi-turn:** Each call to `Stream()` returns an iterator over a **new channel** created for that turn. When the agent finishes responding (loops back to read the next input), it closes that turn's stream channel, causing the user's `for range` loop to exit naturally. The user then calls `Send()` with the next input and `Stream()` again to get a new iterator for the next turn's responses. ### 1.3 BidiFlow ```go -// BidiFlow wraps a BidiAction with flow semantics (tracing, monitoring). type BidiFlow[In, Out, Init, Stream any] struct { - *BidiActionDef[In, Out, Init, Stream] - // Uses BidiActionDef.Name() for flow name - no separate field needed + *BidiAction[In, Out, Init, Stream] } ``` @@ -108,19 +106,32 @@ type BidiFlow[In, Out, Init, Stream any] struct { SessionFlow adds session state management on top of BidiFlow. ```go +// Artifact represents a named collection of parts produced during a session. +// Examples: generated files, images, code snippets, etc. +type Artifact struct { + Name string `json:"name"` + Parts []*ai.Part `json:"parts"` +} + // SessionFlowOutput wraps the output with session info for persistence. type SessionFlowOutput[State, Out any] struct { - SessionID string `json:"sessionId"` - Output Out `json:"output"` - State State `json:"state"` + SessionID string `json:"sessionId"` + Output Out `json:"output"` + State State `json:"state"` + Artifacts []Artifact `json:"artifacts,omitempty"` } // SessionFlow is a bidi flow with automatic session state management. // Init = State: the initial state for new sessions (ignored when resuming an existing session). type SessionFlow[State, In, Out, Stream any] struct { *BidiFlow[In, SessionFlowOutput[State, Out], State, Stream] - store session.Store[State] - persistMode PersistMode + store session.Store[State] +} + +// SessionFlowResult is the return type for session flow functions. +type SessionFlowResult[Out any] struct { + Output Out + Artifacts []Artifact } // SessionFlowFunc is the function signature for session flows. @@ -128,20 +139,10 @@ type SessionFlowFunc[State, In, Out, Stream any] func( ctx context.Context, inputStream <-chan In, sess *session.Session[State], - cb core.StreamCallback[Stream], -) (Out, error) - -// PersistMode controls when session state is persisted. -type PersistMode int - -const ( - PersistOnClose PersistMode = iota // Persist only when connection closes (default) - PersistOnUpdate // Persist after each input message is processed -) + sendChunk core.StreamCallback[Stream], +) (SessionFlowResult[Out], error) ``` -**Turn semantics**: The `SessionStreamCallback` includes a `turnDone` parameter. When the agent finishes responding to an input message, it calls `cb(ctx, lastChunk, true)` to signal the turn is complete. This allows clients to know when to prompt for the next user message. - --- ## 2. API Surface @@ -155,14 +156,14 @@ const ( func NewBidiAction[In, Out, Init, Stream any]( name string, fn BidiFunc[In, Out, Init, Stream], -) *BidiActionDef[In, Out, Init, Stream] +) *BidiAction[In, Out, Init, Stream] // DefineBidiAction creates and registers a BidiAction. func DefineBidiAction[In, Out, Init, Stream any]( r api.Registry, name string, fn BidiFunc[In, Out, Init, Stream], -) *BidiActionDef[In, Out, Init, Stream] +) *BidiAction[In, Out, Init, Stream] ``` Schemas for `In`, `Out`, `Init`, and `Stream` types are automatically inferred from the type parameters using the existing JSON schema inference in `go/internal/base/json.go`. @@ -197,7 +198,6 @@ type SessionFlowOption[State any] interface { } func WithSessionStore[State any](store session.Store[State]) SessionFlowOption[State] -func WithPersistMode[State any](mode PersistMode) SessionFlowOption[State] ``` ### 2.4 Starting Connections @@ -206,7 +206,7 @@ All bidi types (BidiAction, BidiFlow, SessionFlow) use the same `StreamBidi` met ```go // BidiAction/BidiFlow -func (a *BidiActionDef[In, Out, Init, Stream]) StreamBidi( +func (a *BidiAction[In, Out, Init, Stream]) StreamBidi( ctx context.Context, opts ...BidiOption[Init], ) (*BidiConnection[In, Out, Stream], error) @@ -292,21 +292,7 @@ sessionID := output.SessionID // Save this to resume later ### 3.2 State Persistence -Persistence mode is configurable: - -```go -// Usage: -chatAgent := genkit.DefineSessionFlow[ChatState, string, string, string](g, "chatAgent", - fn, - corex.WithSessionStore(store), - corex.WithPersistMode(corex.PersistOnUpdate), // or PersistOnClose (default) -) -``` - -- **PersistOnClose** (default): State is persisted only when the connection closes. Better performance. -- **PersistOnUpdate**: State is persisted after each input message is processed. More durable. - -**Note**: `PersistOnUpdate` persists after each input from `inputStream` is processed, not on every `sess.UpdateState()` call. This covers the main use case (persist after each conversation turn) without requiring interface changes to `session.Session`. +State is persisted automatically when `sess.UpdateState()` is called - the existing `session.Session` implementation already persists to the configured store. No special persistence mode is needed; the user controls when to persist by calling `UpdateState()`. --- @@ -316,41 +302,11 @@ chatAgent := genkit.DefineSessionFlow[ChatState, string, string, string](g, "cha BidiFlows create spans that remain open for the lifetime of the connection, enabling streaming trace visualization in the Dev UI. -```go -func (f *BidiFlow[In, Out, Init, Stream]) StreamBidi( - ctx context.Context, - opts ...BidiOption[Init], -) (*BidiConnection[In, Out, Stream], error) { - // Inject flow context - fc := &flowContext{flowName: f.Name()} - ctx = flowContextKey.NewContext(ctx, fc) - - // Start span (NOT RunInNewSpan - we manage lifecycle manually) - spanMeta := &tracing.SpanMetadata{ - Name: f.Name(), - Type: "action", - Subtype: "bidiFlow", - } - ctx, span := tracing.StartSpan(ctx, spanMeta) - - // Create connection, passing span for lifecycle management - conn, err := f.BidiActionDef.streamBidiWithSpan(ctx, span, opts...) - if err != nil { - span.End() // End span on error - return nil, err - } - return conn, nil -} - -// Inside BidiConnection, the span is ended when the action completes: -func (c *BidiConnection[...]) run() { - defer c.span.End() // End span when bidi flow completes - - // Run the action, recording events/nested spans as needed - output, err := c.fn(c.ctx, c.inputCh, c.init, c.streamCallback) - // ... -} -``` +**Key behaviors:** +- Span starts when `StreamBidi()` is called +- Span ends when the bidi function returns (via `defer` in the connection goroutine) +- Flow context is injected so `core.Run()` works inside the bidi function +- Nested spans for sub-operations (e.g., each LLM call) work normally **Important**: The span stays open while the connection is active, allowing: - Streaming traces to the Dev UI in real-time @@ -359,13 +315,20 @@ func (c *BidiConnection[...]) run() { ### 4.2 Action Registration -Add new action type: +Add new action type and schema fields: ```go // In go/core/api/action.go const ( ActionTypeBidiFlow ActionType = "bidi-flow" ) + +// ActionDesc gets two new optional fields +type ActionDesc struct { + // ... existing fields ... + StreamSchema map[string]any `json:"streamSchema,omitempty"` // NEW: schema for streamed chunks + InitSchema map[string]any `json:"initSchema,omitempty"` // NEW: schema for initialization data +} ``` ### 4.3 Session Integration @@ -405,11 +368,11 @@ func main() { // Define echo bidi flow (low-level, no turn semantics) echoFlow := genkit.DefineBidiFlow[string, string, struct{}, string](g, "echo", - func(ctx context.Context, inputStream <-chan string, init struct{}, cb core.StreamCallback[string]) (string, error) { + func(ctx context.Context, inputStream <-chan string, init struct{}, sendChunk core.StreamCallback[string]) (string, error) { var count int for input := range inputStream { count++ - if err := cb(ctx, fmt.Sprintf("echo: %s", input)); err != nil { + if err := sendChunk(ctx, fmt.Sprintf("echo: %s", input)); err != nil { return "", err } } @@ -474,7 +437,7 @@ func main() { // Define a session flow for multi-turn chat chatAgent := genkit.DefineSessionFlow[ChatState, string, string, string](g, "chatAgent", - func(ctx context.Context, inputStream <-chan string, sess *session.Session[ChatState], cb core.StreamCallback[string]) (string, error) { + func(ctx context.Context, inputStream <-chan string, sess *session.Session[ChatState], sendChunk core.StreamCallback[string]) (corex.SessionFlowResult[string], error) { state := sess.State() messages := state.Messages @@ -486,12 +449,12 @@ func main() { ai.WithMessages(messages...), ) { if err != nil { - return "", err + return corex.SessionFlowResult[string]{}, err } if result.Done { responseText = result.Response.Text() } - cb(ctx, result.Chunk.Text()) + sendChunk(ctx, result.Chunk.Text()) } // Stream channel closes here when we loop back to wait for next input @@ -499,10 +462,17 @@ func main() { sess.UpdateState(ctx, ChatState{Messages: messages}) } - return "conversation ended", nil + return corex.SessionFlowResult[string]{ + Output: "conversation ended", + Artifacts: []corex.Artifact{ + { + Name: "summary", + Parts: []*ai.Part{ai.NewTextPart("...")}, + }, + }, + }, nil }, corex.WithSessionStore(store), - corex.WithPersistMode(corex.PersistOnClose), ) // Start new session (generates new session ID) @@ -549,7 +519,7 @@ type ChatInit struct { } configuredChat := genkit.DefineBidiFlow[string, string, ChatInit, string](g, "configuredChat", - func(ctx context.Context, inputStream <-chan string, init ChatInit, cb core.StreamCallback[string]) (string, error) { + func(ctx context.Context, inputStream <-chan string, init ChatInit, sendChunk core.StreamCallback[string]) (string, error) { // Use init.SystemPrompt and init.Temperature for input := range inputStream { resp, _ := genkit.GenerateText(ctx, g, @@ -557,7 +527,7 @@ configuredChat := genkit.DefineBidiFlow[string, string, ChatInit, string](g, "co ai.WithConfig(&genai.GenerateContentConfig{Temperature: &init.Temperature}), ai.WithPrompt(input), ) - cb(ctx, resp) + sendChunk(ctx, resp) } return "done", nil }, @@ -579,7 +549,7 @@ conn, _ := configuredChat.StreamBidi(ctx, | File | Description | |------|-------------| -| `go/core/x/bidi.go` | BidiActionDef, BidiFunc, BidiConnection | +| `go/core/x/bidi.go` | BidiAction, BidiFunc, BidiConnection | | `go/core/x/bidi_flow.go` | BidiFlow with tracing | | `go/core/x/bidi_options.go` | BidiOption types | | `go/core/x/session_flow.go` | SessionFlow implementation | @@ -615,7 +585,7 @@ conn, _ := configuredChat.StreamBidi(ctx, ### Channels and Backpressure - Both input and output channels are **unbuffered** by default (size 0) -- This provides natural backpressure: `Send()` blocks until agent reads, `cb()` blocks until user consumes +- This provides natural backpressure: `Send()` blocks until agent reads, `sendChunk()` blocks until user consumes - If needed, a `WithInputBufferSize` option could be added later for specific use cases ### Iterator Implementation and Turn Semantics @@ -632,3 +602,36 @@ conn, _ := configuredChat.StreamBidi(ctx, - Nested spans work normally within the bidi function - Events can be recorded throughout the connection lifecycle - Dev UI can show traces in real-time as they stream +- Implementation uses the existing tracer infrastructure (details left to implementation) + +### Shutdown Sequence +When `Close()` is called on a BidiConnection: +1. The input channel is closed, signaling no more inputs +2. The bidi function's `for range inputStream` loop exits +3. The function returns its final output +4. The stream channel is closed +5. The `Done()` channel is closed +6. `Output()` unblocks and returns the result + +On context cancellation: +1. Context error propagates to the bidi function +2. All channels are closed +3. `Output()` returns the context error + +### SessionFlow Internal Wrapping +The user's `SessionFlowFunc` returns `SessionFlowResult[Out]`, but `SessionFlow.StreamBidi()` returns `SessionFlowOutput[State, Out]`. Internally, SessionFlow wraps the user function: + +```go +// Simplified internal logic +result, err := userFunc(ctx, wrappedInputStream, sess, sendChunk) +if err != nil { + return SessionFlowOutput[State, Out]{}, err +} +return SessionFlowOutput[State, Out]{ + SessionID: sess.ID(), + Output: result.Output, + State: sess.State(), + Artifacts: result.Artifacts, +}, nil +``` + From e86893ba63356ee3dc0c0065f7419a3dcaecc475 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Jan 2026 15:56:28 -0800 Subject: [PATCH 03/19] Update go-bidi-design.md --- docs/go-bidi-design.md | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 2f7a76ad2c..a9d9e67121 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -173,6 +173,8 @@ Schemas for `In`, `Out`, `Init`, and `Stream` types are automatically inferred f ```go // In go/core/x/bidi_flow.go +// DefineBidiFlow creates a BidiFlow with tracing and registers it. +// Use this for user-defined bidirectional streaming operations. func DefineBidiFlow[In, Out, Init, Stream any]( r api.Registry, name string, @@ -185,6 +187,8 @@ func DefineBidiFlow[In, Out, Init, Stream any]( ```go // In go/core/x/session_flow.go +// DefineSessionFlow creates a SessionFlow with automatic session management and registers it. +// Use this for multi-turn conversational agents that need to persist state across turns. func DefineSessionFlow[State, In, Out, Stream any]( r api.Registry, name string, @@ -197,6 +201,8 @@ type SessionFlowOption[State any] interface { applySessionFlow(*sessionFlowOptions[State]) error } +// WithSessionStore sets the session store for persisting session state. +// If not provided, sessions exist only in memory for the connection lifetime. func WithSessionStore[State any](store session.Store[State]) SessionFlowOption[State] ``` @@ -211,15 +217,19 @@ func (a *BidiAction[In, Out, Init, Stream]) StreamBidi( opts ...BidiOption[Init], ) (*BidiConnection[In, Out, Stream], error) -// BidiOption for streaming +// BidiOption configures a bidi connection. type BidiOption[Init any] interface { applyBidi(*bidiOptions[Init]) error } +// WithInit provides initialization data for the bidi action. +// For SessionFlow, this sets the initial state for new sessions. func WithInit[Init any](init Init) BidiOption[Init] -// SessionFlow uses the same StreamBidi, with Init = State -// Additional option for session ID +// WithSessionID specifies an existing session ID to resume. +// If the session exists in the store, it is loaded (WithInit is ignored). +// If the session doesn't exist, a new session is created with this ID. +// If not provided, a new UUID is generated for new sessions. func WithSessionID[Init any](id string) BidiOption[Init] func (sf *SessionFlow[State, In, Out, Stream]) StreamBidi( From f594c565cb0532a2d811e5d7815b23ada7d901db Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Jan 2026 15:58:33 -0800 Subject: [PATCH 04/19] Update go-bidi-design.md --- docs/go-bidi-design.md | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index a9d9e67121..38151bdc44 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -15,14 +15,12 @@ All bidi types go in `go/core/x/` (experimental), which will move to `go/core/` ``` go/core/x/ ├── bidi.go # BidiAction, BidiFunc, BidiConnection -├── bidi_flow.go # BidiFlow with tracing -├── bidi_options.go # Option types for bidi +├── bidi_flow.go # BidiFlow ├── session_flow.go # SessionFlow implementation +├── option.go # Options ├── bidi_test.go # Tests ``` -High-level wrappers in `go/genkit/bidi.go`. - Import as `corex "github.com/firebase/genkit/go/core/x"`. --- From 05e415e5ae0669220e5f50e439540804d56163bd Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Jan 2026 15:59:27 -0800 Subject: [PATCH 05/19] Apply suggestion from @gemini-code-assist[bot] Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- docs/go-bidi-design.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 38151bdc44..2844c8dbac 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -78,9 +78,10 @@ func (c *BidiConnection[In, Out, Stream]) Close() error // Stream returns an iterator for receiving streamed chunks. // Each call returns a new iterator over the same underlying channel. // Breaking out of the loop does NOT close the connection - you can call Stream() -// again to continue receiving. The iterator completes when the action finishes. -func (c *BidiConnection[In, Out, Stream]) Stream() iter.Seq2[Stream, error] - +// For multi-turn interactions, this should be called after each Send() to get an +// iterator for that turn's response. Each call provides an iterator over a new +// channel specific to that turn. The iterator completes when the agent is done +// responding for the turn. // Output returns the final output after the action completes. // Blocks until done or context cancelled. func (c *BidiConnection[In, Out, Stream]) Output() (Out, error) From 4121ecca451c563572bd2727b031544f121d247d Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Jan 2026 16:01:10 -0800 Subject: [PATCH 06/19] Update go-bidi-design.md --- docs/go-bidi-design.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 2844c8dbac..38151bdc44 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -78,10 +78,9 @@ func (c *BidiConnection[In, Out, Stream]) Close() error // Stream returns an iterator for receiving streamed chunks. // Each call returns a new iterator over the same underlying channel. // Breaking out of the loop does NOT close the connection - you can call Stream() -// For multi-turn interactions, this should be called after each Send() to get an -// iterator for that turn's response. Each call provides an iterator over a new -// channel specific to that turn. The iterator completes when the agent is done -// responding for the turn. +// again to continue receiving. The iterator completes when the action finishes. +func (c *BidiConnection[In, Out, Stream]) Stream() iter.Seq2[Stream, error] + // Output returns the final output after the action completes. // Blocks until done or context cancelled. func (c *BidiConnection[In, Out, Stream]) Output() (Out, error) From ffc923dcf6d5751dbc8d3bf702d625d2d0b4dd90 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Jan 2026 18:53:12 -0800 Subject: [PATCH 07/19] Update go-bidi-design.md --- docs/go-bidi-design.md | 102 ++++++++++++++++++++--------------------- 1 file changed, 51 insertions(+), 51 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 38151bdc44..bbd83fc768 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -6,7 +6,7 @@ This document describes the design for bidirectional streaming features in Genki 1. **BidiAction** - Core primitive for bidirectional operations 2. **BidiFlow** - BidiAction with observability, intended for user definition -3. **SessionFlow** - Stateful, multi-turn agent interactions with automatic persistence and turn semantics +3. **Agent** - Stateful, multi-turn agent interactions with automatic persistence and turn semantics ## Package Location @@ -16,7 +16,7 @@ All bidi types go in `go/core/x/` (experimental), which will move to `go/core/` go/core/x/ ├── bidi.go # BidiAction, BidiFunc, BidiConnection ├── bidi_flow.go # BidiFlow -├── session_flow.go # SessionFlow implementation +├── agent.go # Agent implementation ├── option.go # Options ├── bidi_test.go # Tests ``` @@ -99,9 +99,9 @@ type BidiFlow[In, Out, Init, Stream any] struct { } ``` -### 1.4 SessionFlow +### 1.4 Agent -SessionFlow adds session state management on top of BidiFlow. +Agent adds session state management on top of BidiFlow. ```go // Artifact represents a named collection of parts produced during a session. @@ -111,34 +111,34 @@ type Artifact struct { Parts []*ai.Part `json:"parts"` } -// SessionFlowOutput wraps the output with session info for persistence. -type SessionFlowOutput[State, Out any] struct { +// AgentOutput wraps the output with session info for persistence. +type AgentOutput[State, Out any] struct { SessionID string `json:"sessionId"` Output Out `json:"output"` State State `json:"state"` Artifacts []Artifact `json:"artifacts,omitempty"` } -// SessionFlow is a bidi flow with automatic session state management. +// Agent is a bidi flow with automatic session state management. // Init = State: the initial state for new sessions (ignored when resuming an existing session). -type SessionFlow[State, In, Out, Stream any] struct { - *BidiFlow[In, SessionFlowOutput[State, Out], State, Stream] +type Agent[State, In, Out, Stream any] struct { + *BidiFlow[In, AgentOutput[State, Out], State, Stream] store session.Store[State] } -// SessionFlowResult is the return type for session flow functions. -type SessionFlowResult[Out any] struct { +// AgentResult is the return type for agent functions. +type AgentResult[Out any] struct { Output Out Artifacts []Artifact } -// SessionFlowFunc is the function signature for session flows. -type SessionFlowFunc[State, In, Out, Stream any] func( +// AgentFunc is the function signature for agents. +type AgentFunc[State, In, Out, Stream any] func( ctx context.Context, inputStream <-chan In, sess *session.Session[State], sendChunk core.StreamCallback[Stream], -) (SessionFlowResult[Out], error) +) (AgentResult[Out], error) ``` --- @@ -180,33 +180,33 @@ func DefineBidiFlow[In, Out, Init, Stream any]( ) *BidiFlow[In, Out, Init, Stream] ``` -### 2.3 Defining Session Flows +### 2.3 Defining Agents ```go -// In go/core/x/session_flow.go +// In go/core/x/agent.go -// DefineSessionFlow creates a SessionFlow with automatic session management and registers it. +// DefineAgent creates an Agent with automatic session management and registers it. // Use this for multi-turn conversational agents that need to persist state across turns. -func DefineSessionFlow[State, In, Out, Stream any]( +func DefineAgent[State, In, Out, Stream any]( r api.Registry, name string, - fn SessionFlowFunc[State, In, Out, Stream], - opts ...SessionFlowOption[State], -) *SessionFlow[State, In, Out, Stream] + fn AgentFunc[State, In, Out, Stream], + opts ...AgentOption[State], +) *Agent[State, In, Out, Stream] -// SessionFlowOption configures a SessionFlow. -type SessionFlowOption[State any] interface { - applySessionFlow(*sessionFlowOptions[State]) error +// AgentOption configures an Agent. +type AgentOption[State any] interface { + applyAgent(*agentOptions[State]) error } // WithSessionStore sets the session store for persisting session state. // If not provided, sessions exist only in memory for the connection lifetime. -func WithSessionStore[State any](store session.Store[State]) SessionFlowOption[State] +func WithSessionStore[State any](store session.Store[State]) AgentOption[State] ``` ### 2.4 Starting Connections -All bidi types (BidiAction, BidiFlow, SessionFlow) use the same `StreamBidi` method to start connections: +All bidi types (BidiAction, BidiFlow, Agent) use the same `StreamBidi` method to start connections: ```go // BidiAction/BidiFlow @@ -221,7 +221,7 @@ type BidiOption[Init any] interface { } // WithInit provides initialization data for the bidi action. -// For SessionFlow, this sets the initial state for new sessions. +// For Agent, this sets the initial state for new sessions. func WithInit[Init any](init Init) BidiOption[Init] // WithSessionID specifies an existing session ID to resume. @@ -230,10 +230,10 @@ func WithInit[Init any](init Init) BidiOption[Init] // If not provided, a new UUID is generated for new sessions. func WithSessionID[Init any](id string) BidiOption[Init] -func (sf *SessionFlow[State, In, Out, Stream]) StreamBidi( +func (a *Agent[State, In, Out, Stream]) StreamBidi( ctx context.Context, opts ...BidiOption[State], -) (*BidiConnection[In, SessionFlowOutput[State, Out], Stream], error) +) (*BidiConnection[In, AgentOutput[State, Out], Stream], error) ``` ### 2.5 High-Level Genkit API @@ -247,25 +247,25 @@ func DefineBidiFlow[In, Out, Init, Stream any]( fn corex.BidiFunc[In, Out, Init, Stream], ) *corex.BidiFlow[In, Out, Init, Stream] -func DefineSessionFlow[State, In, Out, Stream any]( +func DefineAgent[State, In, Out, Stream any]( g *Genkit, name string, - fn corex.SessionFlowFunc[State, In, Out, Stream], - opts ...corex.SessionFlowOption[State], -) *corex.SessionFlow[State, In, Out, Stream] + fn corex.AgentFunc[State, In, Out, Stream], + opts ...corex.AgentOption[State], +) *corex.Agent[State, In, Out, Stream] ``` --- -## 3. Session Flow Details +## 3. Agent Details -### 3.1 Using StreamBidi with SessionFlow +### 3.1 Using StreamBidi with Agent -SessionFlow uses the same `StreamBidi` method as BidiAction and BidiFlow. Session ID is a connection option, and initial state is passed via `WithInit`: +Agent uses the same `StreamBidi` method as BidiAction and BidiFlow. Session ID is a connection option, and initial state is passed via `WithInit`: ```go // Define once at startup -chatAgent := genkit.DefineSessionFlow[ChatState, string, string, string](g, "chatAgent", +chatAgent := genkit.DefineAgent[ChatState, string, string, string](g, "chatAgent", myAgentFunc, corex.WithSessionStore(store), ) @@ -286,12 +286,12 @@ conn4, _ := chatAgent.StreamBidi(ctx, ) ``` -The SessionFlow internally handles session creation/loading: +The Agent internally handles session creation/loading: - If `WithSessionID` is provided and session exists in store → load existing session (WithInit ignored) - If `WithSessionID` is provided but session doesn't exist → create new session with that ID and initial state from WithInit - If no `WithSessionID` → generate new UUID and create session with initial state from WithInit -The session ID is returned in `SessionFlowOutput.SessionID`, so callers can retrieve it from the final output: +The session ID is returned in `AgentOutput.SessionID`, so callers can retrieve it from the final output: ```go output, _ := conn.Output() @@ -346,8 +346,8 @@ Use existing `Session` and `Store` types from `go/core/x/session` (remains a sep ```go import "github.com/firebase/genkit/go/core/x/session" -// SessionFlow holds reference to session store -type SessionFlow[State, In, Out, Stream any] struct { +// Agent holds reference to session store +type Agent[State, In, Out, Stream any] struct { store session.Store[State] // ... } @@ -443,9 +443,9 @@ func main() { genkit.WithDefaultModel("googleai/gemini-2.5-flash"), ) - // Define a session flow for multi-turn chat - chatAgent := genkit.DefineSessionFlow[ChatState, string, string, string](g, "chatAgent", - func(ctx context.Context, inputStream <-chan string, sess *session.Session[ChatState], sendChunk core.StreamCallback[string]) (corex.SessionFlowResult[string], error) { + // Define an agent for multi-turn chat + chatAgent := genkit.DefineAgent[ChatState, string, string, string](g, "chatAgent", + func(ctx context.Context, inputStream <-chan string, sess *session.Session[ChatState], sendChunk core.StreamCallback[string]) (corex.AgentResult[string], error) { state := sess.State() messages := state.Messages @@ -457,7 +457,7 @@ func main() { ai.WithMessages(messages...), ) { if err != nil { - return corex.SessionFlowResult[string]{}, err + return corex.AgentResult[string]{}, err } if result.Done { responseText = result.Response.Text() @@ -470,7 +470,7 @@ func main() { sess.UpdateState(ctx, ChatState{Messages: messages}) } - return corex.SessionFlowResult[string]{ + return corex.AgentResult[string]{ Output: "conversation ended", Artifacts: []corex.Artifact{ { @@ -560,7 +560,7 @@ conn, _ := configuredChat.StreamBidi(ctx, | `go/core/x/bidi.go` | BidiAction, BidiFunc, BidiConnection | | `go/core/x/bidi_flow.go` | BidiFlow with tracing | | `go/core/x/bidi_options.go` | BidiOption types | -| `go/core/x/session_flow.go` | SessionFlow implementation | +| `go/core/x/agent.go` | Agent implementation | | `go/core/x/bidi_test.go` | Tests | | `go/genkit/bidi.go` | High-level API wrappers | @@ -626,16 +626,16 @@ On context cancellation: 2. All channels are closed 3. `Output()` returns the context error -### SessionFlow Internal Wrapping -The user's `SessionFlowFunc` returns `SessionFlowResult[Out]`, but `SessionFlow.StreamBidi()` returns `SessionFlowOutput[State, Out]`. Internally, SessionFlow wraps the user function: +### Agent Internal Wrapping +The user's `AgentFunc` returns `AgentResult[Out]`, but `Agent.StreamBidi()` returns `AgentOutput[State, Out]`. Internally, Agent wraps the user function: ```go // Simplified internal logic result, err := userFunc(ctx, wrappedInputStream, sess, sendChunk) if err != nil { - return SessionFlowOutput[State, Out]{}, err + return AgentOutput[State, Out]{}, err } -return SessionFlowOutput[State, Out]{ +return AgentOutput[State, Out]{ SessionID: sess.ID(), Output: result.Output, State: sess.State(), From 64e1f4c36c6205068e2be6703f2b87a44fb501ff Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Jan 2026 18:59:08 -0800 Subject: [PATCH 08/19] Update go-bidi-design.md --- docs/go-bidi-design.md | 44 +++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index bbd83fc768..4b56e3e7c6 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -32,19 +32,19 @@ Import as `corex "github.com/firebase/genkit/go/core/x"`. ```go // BidiAction represents a bidirectional streaming action. // Type parameters: +// - Init: Type of initialization data (use struct{} if not needed) // - In: Type of each message sent to the action // - Out: Type of the final output -// - Init: Type of initialization data (use struct{} if not needed) // - Stream: Type of each streamed output chunk -type BidiAction[In, Out, Init, Stream any] struct { +type BidiAction[Init, In, Out, Stream any] struct { name string - fn BidiFunc[In, Out, Init, Stream] + fn BidiFunc[Init, In, Out, Stream] registry api.Registry desc *api.ActionDesc } // BidiFunc is the function signature for bidi actions. -type BidiFunc[In, Out, Init, Stream any] func( +type BidiFunc[Init, In, Out, Stream any] func( ctx context.Context, inputStream <-chan In, init Init, @@ -94,8 +94,8 @@ func (c *BidiConnection[In, Out, Stream]) Done() <-chan struct{} ### 1.3 BidiFlow ```go -type BidiFlow[In, Out, Init, Stream any] struct { - *BidiAction[In, Out, Init, Stream] +type BidiFlow[Init, In, Out, Stream any] struct { + *BidiAction[Init, In, Out, Stream] } ``` @@ -122,7 +122,7 @@ type AgentOutput[State, Out any] struct { // Agent is a bidi flow with automatic session state management. // Init = State: the initial state for new sessions (ignored when resuming an existing session). type Agent[State, In, Out, Stream any] struct { - *BidiFlow[In, AgentOutput[State, Out], State, Stream] + *BidiFlow[State, In, AgentOutput[State, Out], Stream] store session.Store[State] } @@ -151,17 +151,17 @@ type AgentFunc[State, In, Out, Stream any] func( // In go/core/x/bidi.go // NewBidiAction creates a BidiAction without registering it. -func NewBidiAction[In, Out, Init, Stream any]( +func NewBidiAction[Init, In, Out, Stream any]( name string, - fn BidiFunc[In, Out, Init, Stream], -) *BidiAction[In, Out, Init, Stream] + fn BidiFunc[Init, In, Out, Stream], +) *BidiAction[Init, In, Out, Stream] // DefineBidiAction creates and registers a BidiAction. -func DefineBidiAction[In, Out, Init, Stream any]( +func DefineBidiAction[Init, In, Out, Stream any]( r api.Registry, name string, - fn BidiFunc[In, Out, Init, Stream], -) *BidiAction[In, Out, Init, Stream] + fn BidiFunc[Init, In, Out, Stream], +) *BidiAction[Init, In, Out, Stream] ``` Schemas for `In`, `Out`, `Init`, and `Stream` types are automatically inferred from the type parameters using the existing JSON schema inference in `go/internal/base/json.go`. @@ -173,11 +173,11 @@ Schemas for `In`, `Out`, `Init`, and `Stream` types are automatically inferred f // DefineBidiFlow creates a BidiFlow with tracing and registers it. // Use this for user-defined bidirectional streaming operations. -func DefineBidiFlow[In, Out, Init, Stream any]( +func DefineBidiFlow[Init, In, Out, Stream any]( r api.Registry, name string, - fn BidiFunc[In, Out, Init, Stream], -) *BidiFlow[In, Out, Init, Stream] + fn BidiFunc[Init, In, Out, Stream], +) *BidiFlow[Init, In, Out, Stream] ``` ### 2.3 Defining Agents @@ -210,7 +210,7 @@ All bidi types (BidiAction, BidiFlow, Agent) use the same `StreamBidi` method to ```go // BidiAction/BidiFlow -func (a *BidiAction[In, Out, Init, Stream]) StreamBidi( +func (a *BidiAction[Init, In, Out, Stream]) StreamBidi( ctx context.Context, opts ...BidiOption[Init], ) (*BidiConnection[In, Out, Stream], error) @@ -241,11 +241,11 @@ func (a *Agent[State, In, Out, Stream]) StreamBidi( ```go // In go/genkit/bidi.go -func DefineBidiFlow[In, Out, Init, Stream any]( +func DefineBidiFlow[Init, In, Out, Stream any]( g *Genkit, name string, - fn corex.BidiFunc[In, Out, Init, Stream], -) *corex.BidiFlow[In, Out, Init, Stream] + fn corex.BidiFunc[Init, In, Out, Stream], +) *corex.BidiFlow[Init, In, Out, Stream] func DefineAgent[State, In, Out, Stream any]( g *Genkit, @@ -375,7 +375,7 @@ func main() { g := genkit.Init(ctx) // Define echo bidi flow (low-level, no turn semantics) - echoFlow := genkit.DefineBidiFlow[string, string, struct{}, string](g, "echo", + echoFlow := genkit.DefineBidiFlow[struct{}, string, string, string](g, "echo", func(ctx context.Context, inputStream <-chan string, init struct{}, sendChunk core.StreamCallback[string]) (string, error) { var count int for input := range inputStream { @@ -526,7 +526,7 @@ type ChatInit struct { Temperature float64 `json:"temperature"` } -configuredChat := genkit.DefineBidiFlow[string, string, ChatInit, string](g, "configuredChat", +configuredChat := genkit.DefineBidiFlow[ChatInit, string, string, string](g, "configuredChat", func(ctx context.Context, inputStream <-chan string, init ChatInit, sendChunk core.StreamCallback[string]) (string, error) { // Use init.SystemPrompt and init.Temperature for input := range inputStream { From 8a8f19337d295df86985e01a131aa9b91247608d Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Sat, 17 Jan 2026 19:08:45 -0800 Subject: [PATCH 09/19] Update go-bidi-design.md --- docs/go-bidi-design.md | 121 +++++++++++++++++++++++++++-------------- 1 file changed, 79 insertions(+), 42 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 4b56e3e7c6..6a3c69bc0b 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -46,9 +46,9 @@ type BidiAction[Init, In, Out, Stream any] struct { // BidiFunc is the function signature for bidi actions. type BidiFunc[Init, In, Out, Stream any] func( ctx context.Context, - inputStream <-chan In, init Init, - streamCallback core.StreamCallback[Stream], + inCh <-chan In, + outCh chan<- Stream, ) (Out, error) ``` @@ -76,9 +76,9 @@ func (c *BidiConnection[In, Out, Stream]) Send(input In) error func (c *BidiConnection[In, Out, Stream]) Close() error // Stream returns an iterator for receiving streamed chunks. -// Each call returns a new iterator over the same underlying channel. -// Breaking out of the loop does NOT close the connection - you can call Stream() -// again to continue receiving. The iterator completes when the action finishes. +// For Agents, the iterator exits when the agent calls resp.EndTurn(), allowing +// multi-turn conversations. Call Stream() again after Send() for the next turn. +// The iterator completes permanently when the action finishes. func (c *BidiConnection[In, Out, Stream]) Stream() iter.Seq2[Stream, error] // Output returns the final output after the action completes. @@ -89,8 +89,6 @@ func (c *BidiConnection[In, Out, Stream]) Output() (Out, error) func (c *BidiConnection[In, Out, Stream]) Done() <-chan struct{} ``` -**Why iterators work for multi-turn:** Each call to `Stream()` returns an iterator over a **new channel** created for that turn. When the agent finishes responding (loops back to read the next input), it closes that turn's stream channel, causing the user's `for range` loop to exit naturally. The user then calls `Send()` with the next input and `Stream()` again to get a new iterator for the next turn's responses. - ### 1.3 BidiFlow ```go @@ -99,9 +97,27 @@ type BidiFlow[Init, In, Out, Stream any] struct { } ``` -### 1.4 Agent +### 1.4 Responder + +`Responder` wraps the output channel for agents, providing methods to send data and signal turn boundaries. + +```go +// Responder wraps the output channel with turn signaling for multi-turn agents. +type Responder[T any] struct { + ch chan<- streamChunk[T] // internal, unexported +} + +// Send sends a streamed chunk to the consumer. +func (r *Responder[T]) Send(data T) + +// EndTurn signals that the agent has finished responding to the current input. +// The consumer's Stream() iterator will exit, allowing them to send the next input. +func (r *Responder[T]) EndTurn() +``` + +### 1.5 Agent -Agent adds session state management on top of BidiFlow. +Agent adds session state management on top of BidiFlow with turn semantics. ```go // Artifact represents a named collection of parts produced during a session. @@ -135,9 +151,9 @@ type AgentResult[Out any] struct { // AgentFunc is the function signature for agents. type AgentFunc[State, In, Out, Stream any] func( ctx context.Context, - inputStream <-chan In, sess *session.Session[State], - sendChunk core.StreamCallback[Stream], + inCh <-chan In, + resp *Responder[Stream], ) (AgentResult[Out], error) ``` @@ -366,7 +382,6 @@ import ( "context" "fmt" - "github.com/firebase/genkit/go/core" "github.com/firebase/genkit/go/genkit" ) @@ -376,13 +391,11 @@ func main() { // Define echo bidi flow (low-level, no turn semantics) echoFlow := genkit.DefineBidiFlow[struct{}, string, string, string](g, "echo", - func(ctx context.Context, inputStream <-chan string, init struct{}, sendChunk core.StreamCallback[string]) (string, error) { + func(ctx context.Context, init struct{}, inCh <-chan string, outCh chan<- string) (string, error) { var count int - for input := range inputStream { + for input := range inCh { count++ - if err := sendChunk(ctx, fmt.Sprintf("echo: %s", input)); err != nil { - return "", err - } + outCh <- fmt.Sprintf("echo: %s", input) } return fmt.Sprintf("processed %d messages", count), nil }, @@ -423,7 +436,6 @@ import ( "fmt" "github.com/firebase/genkit/go/ai" - "github.com/firebase/genkit/go/core" corex "github.com/firebase/genkit/go/core/x" "github.com/firebase/genkit/go/core/x/session" "github.com/firebase/genkit/go/genkit" @@ -445,14 +457,14 @@ func main() { // Define an agent for multi-turn chat chatAgent := genkit.DefineAgent[ChatState, string, string, string](g, "chatAgent", - func(ctx context.Context, inputStream <-chan string, sess *session.Session[ChatState], sendChunk core.StreamCallback[string]) (corex.AgentResult[string], error) { + func(ctx context.Context, sess *session.Session[ChatState], inCh <-chan string, resp *corex.Responder[string]) (corex.AgentResult[string], error) { state := sess.State() messages := state.Messages - for userInput := range inputStream { + for userInput := range inCh { messages = append(messages, ai.NewUserTextMessage(userInput)) - var responseText string + var respText string for result, err := range genkit.GenerateStream(ctx, g, ai.WithMessages(messages...), ) { @@ -460,13 +472,13 @@ func main() { return corex.AgentResult[string]{}, err } if result.Done { - responseText = result.Response.Text() + respText = result.Response.Text() } - sendChunk(ctx, result.Chunk.Text()) + resp.Send(result.Chunk.Text()) } - // Stream channel closes here when we loop back to wait for next input + resp.EndTurn() // Signal turn complete, consumer's Stream() exits - messages = append(messages, ai.NewModelTextMessage(responseText)) + messages = append(messages, ai.NewModelTextMessage(respText)) sess.UpdateState(ctx, ChatState{Messages: messages}) } @@ -494,9 +506,9 @@ func main() { } fmt.Print(chunk) } - // Loop exits when stream closes (agent finished responding) + // Loop exits when agent calls resp.EndTurn() - // Second turn - call Stream() again for next response + // Second turn conn.Send("What are channels used for?") for chunk, err := range conn.Stream() { if err != nil { @@ -527,15 +539,15 @@ type ChatInit struct { } configuredChat := genkit.DefineBidiFlow[ChatInit, string, string, string](g, "configuredChat", - func(ctx context.Context, inputStream <-chan string, init ChatInit, sendChunk core.StreamCallback[string]) (string, error) { + func(ctx context.Context, init ChatInit, inCh <-chan string, outCh chan<- string) (string, error) { // Use init.SystemPrompt and init.Temperature - for input := range inputStream { + for input := range inCh { resp, _ := genkit.GenerateText(ctx, g, ai.WithSystem(init.SystemPrompt), ai.WithConfig(&genai.GenerateContentConfig{Temperature: &init.Temperature}), ai.WithPrompt(input), ) - sendChunk(ctx, resp) + outCh <- resp } return "done", nil }, @@ -593,17 +605,42 @@ conn, _ := configuredChat.StreamBidi(ctx, ### Channels and Backpressure - Both input and output channels are **unbuffered** by default (size 0) -- This provides natural backpressure: `Send()` blocks until agent reads, `sendChunk()` blocks until user consumes -- If needed, a `WithInputBufferSize` option could be added later for specific use cases - -### Iterator Implementation and Turn Semantics -- `Stream()` returns `iter.Seq2[Stream, error]` - a Go 1.23 iterator -- Each call to `Stream()` returns an iterator over a **new channel** for that turn -- When the agent finishes responding (loops back to wait for next input), the stream channel closes -- The user's `for range` loop exits naturally when the channel closes -- Call `Stream()` again after sending the next input to get the next turn's response -- The iterator yields `(chunk, nil)` for each streamed value -- On error, the iterator yields `(zero, err)` and stops +- This provides natural backpressure: `Send()` blocks until agent reads, `resp.Send()` blocks until consumer reads +- If needed, `WithInputBufferSize` / `WithOutputBufferSize` options could be added later for specific use cases + +### Turn Signaling (Agents) + +For multi-turn conversations, the consumer needs to know when the agent has finished responding to one input and is ready for the next. + +**How it works internally:** + +1. `BidiConnection.streamCh` is actually `chan streamChunk[Stream]` (internal type) +2. `streamChunk` has `data T` and `endTurn bool` fields (unexported) +3. `resp.Send(data)` sends `streamChunk{data: data}` +4. `resp.EndTurn()` sends `streamChunk{endTurn: true}` +5. `conn.Stream()` unwraps chunks, yielding only the data +6. When `Stream()` sees `endTurn: true`, it exits the iterator without yielding + +**From the agent's perspective:** +```go +for input := range inCh { + resp.Send("partial...") + resp.Send("more...") + resp.EndTurn() // Consumer's for loop exits here +} +``` + +**From the consumer's perspective:** +```go +conn.Send("question") +for chunk, err := range conn.Stream() { + fmt.Print(chunk) // Just gets string, not streamChunk +} +// Loop exited because agent called EndTurn() + +conn.Send("follow-up") +for chunk, err := range conn.Stream() { ... } +``` ### Tracing - Span is started when connection is created, ended when action completes @@ -631,7 +668,7 @@ The user's `AgentFunc` returns `AgentResult[Out]`, but `Agent.StreamBidi()` retu ```go // Simplified internal logic -result, err := userFunc(ctx, wrappedInputStream, sess, sendChunk) +result, err := userFunc(ctx, wrappedInCh, outCh, sess) if err != nil { return AgentOutput[State, Out]{}, err } From 899e9f28fd63a4e1c360f2fa6c1e4a92c9964583 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Sat, 17 Jan 2026 19:24:19 -0800 Subject: [PATCH 10/19] Update go-bidi-design.md --- docs/go-bidi-design.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 6a3c69bc0b..4e2a7bf4e4 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -281,7 +281,7 @@ Agent uses the same `StreamBidi` method as BidiAction and BidiFlow. Session ID i ```go // Define once at startup -chatAgent := genkit.DefineAgent[ChatState, string, string, string](g, "chatAgent", +chatAgent := genkit.DefineAgent(g, "chatAgent", myAgentFunc, corex.WithSessionStore(store), ) @@ -390,7 +390,7 @@ func main() { g := genkit.Init(ctx) // Define echo bidi flow (low-level, no turn semantics) - echoFlow := genkit.DefineBidiFlow[struct{}, string, string, string](g, "echo", + echoFlow := genkit.DefineBidiFlow(g, "echo", func(ctx context.Context, init struct{}, inCh <-chan string, outCh chan<- string) (string, error) { var count int for input := range inCh { @@ -456,7 +456,7 @@ func main() { ) // Define an agent for multi-turn chat - chatAgent := genkit.DefineAgent[ChatState, string, string, string](g, "chatAgent", + chatAgent := genkit.DefineAgent(g, "chatAgent", func(ctx context.Context, sess *session.Session[ChatState], inCh <-chan string, resp *corex.Responder[string]) (corex.AgentResult[string], error) { state := sess.State() messages := state.Messages @@ -538,7 +538,7 @@ type ChatInit struct { Temperature float64 `json:"temperature"` } -configuredChat := genkit.DefineBidiFlow[ChatInit, string, string, string](g, "configuredChat", +configuredChat := genkit.DefineBidiFlow(g, "configuredChat", func(ctx context.Context, init ChatInit, inCh <-chan string, outCh chan<- string) (string, error) { // Use init.SystemPrompt and init.Temperature for input := range inCh { From 157a5b1cbb1c2a4efd9cfb83639062a1df424d04 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Sat, 17 Jan 2026 19:38:51 -0800 Subject: [PATCH 11/19] Update go-bidi-design.md --- docs/go-bidi-design.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 4e2a7bf4e4..4d1dddee25 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -75,11 +75,11 @@ func (c *BidiConnection[In, Out, Stream]) Send(input In) error // Close signals that no more inputs will be sent. func (c *BidiConnection[In, Out, Stream]) Close() error -// Stream returns an iterator for receiving streamed chunks. +// Responses returns an iterator for receiving streamed response chunks. // For Agents, the iterator exits when the agent calls resp.EndTurn(), allowing -// multi-turn conversations. Call Stream() again after Send() for the next turn. +// multi-turn conversations. Call Responses() again after Send() for the next turn. // The iterator completes permanently when the action finishes. -func (c *BidiConnection[In, Out, Stream]) Stream() iter.Seq2[Stream, error] +func (c *BidiConnection[In, Out, Stream]) Responses() iter.Seq2[Stream, error] // Output returns the final output after the action completes. // Blocks until done or context cancelled. @@ -413,7 +413,7 @@ func main() { conn.Close() // Consume stream via iterator - for chunk, err := range conn.Stream() { + for chunk, err := range conn.Responses() { if err != nil { panic(err) } @@ -500,7 +500,7 @@ func main() { // First turn conn.Send("Hello! Tell me about Go programming.") - for chunk, err := range conn.Stream() { + for chunk, err := range conn.Responses() { if err != nil { panic(err) } @@ -510,7 +510,7 @@ func main() { // Second turn conn.Send("What are channels used for?") - for chunk, err := range conn.Stream() { + for chunk, err := range conn.Responses() { if err != nil { panic(err) } @@ -618,7 +618,7 @@ For multi-turn conversations, the consumer needs to know when the agent has fini 2. `streamChunk` has `data T` and `endTurn bool` fields (unexported) 3. `resp.Send(data)` sends `streamChunk{data: data}` 4. `resp.EndTurn()` sends `streamChunk{endTurn: true}` -5. `conn.Stream()` unwraps chunks, yielding only the data +5. `conn.Responses()` unwraps chunks, yielding only the data 6. When `Stream()` sees `endTurn: true`, it exits the iterator without yielding **From the agent's perspective:** @@ -633,13 +633,13 @@ for input := range inCh { **From the consumer's perspective:** ```go conn.Send("question") -for chunk, err := range conn.Stream() { +for chunk, err := range conn.Responses() { fmt.Print(chunk) // Just gets string, not streamChunk } // Loop exited because agent called EndTurn() conn.Send("follow-up") -for chunk, err := range conn.Stream() { ... } +for chunk, err := range conn.Responses() { ... } ``` ### Tracing From 6c160e5aeb0181e275281e6584ecf08b1eb4bae3 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Sat, 17 Jan 2026 19:39:17 -0800 Subject: [PATCH 12/19] Update go-bidi-design.md --- docs/go-bidi-design.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 4d1dddee25..40259cb6a3 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -461,8 +461,8 @@ func main() { state := sess.State() messages := state.Messages - for userInput := range inCh { - messages = append(messages, ai.NewUserTextMessage(userInput)) + for input := range inCh { + messages = append(messages, ai.NewUserTextMessage(input)) var respText string for result, err := range genkit.GenerateStream(ctx, g, From 83a15e32436e79fb8091a5bc3ec04b797bc4b219 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Wed, 21 Jan 2026 12:09:37 -0800 Subject: [PATCH 13/19] Create go-agent-design.md --- docs/go-agent-design.md | 444 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 444 insertions(+) create mode 100644 docs/go-agent-design.md diff --git a/docs/go-agent-design.md b/docs/go-agent-design.md new file mode 100644 index 0000000000..7553bbc8c4 --- /dev/null +++ b/docs/go-agent-design.md @@ -0,0 +1,444 @@ +# Genkit Go Agent - Design Document + +## Overview + +This document describes the design for the `Agent` primitive in Genkit Go. An Agent is a stateful, multi-turn conversational agent with automatic session persistence and turn semantics. + +For the underlying bidirectional streaming primitives (BidiAction, BidiFlow, BidiModel), see [go-bidi-design.md](go-bidi-design.md). + +## Package Location + +Agent is an AI concept and belongs in `go/ai/x/` (experimental): + +``` +go/ai/x/ +├── agent.go # Agent, AgentFunc, AgentOutput, AgentResult +├── agent_options.go # AgentOption types +├── agent_test.go # Tests +``` + +Import as `aix "github.com/firebase/genkit/go/ai/x"`. + +--- + +## 1. Core Type Definitions + +### 1.1 Responder + +`Responder` wraps the output channel for agents, providing methods to send data and signal turn boundaries. + +```go +// Responder wraps the output channel with turn signaling for multi-turn agents. +type Responder[T any] struct { + ch chan<- streamChunk[T] // internal, unexported +} + +// Send sends a streamed chunk to the consumer. +func (r *Responder[T]) Send(data T) + +// EndTurn signals that the agent has finished responding to the current input. +// The consumer's Responses() iterator will exit, allowing them to send the next input. +func (r *Responder[T]) EndTurn() +``` + +### 1.2 Agent Types + +```go +// Artifact represents a named collection of parts produced during a session. +// Examples: generated files, images, code snippets, etc. +type Artifact struct { + Name string `json:"name"` + Parts []*ai.Part `json:"parts"` +} + +// AgentOutput wraps the output with session info for persistence. +type AgentOutput[State, Out any] struct { + SessionID string `json:"sessionId"` + Output Out `json:"output"` + State State `json:"state"` + Artifacts []Artifact `json:"artifacts,omitempty"` +} + +// Agent is a bidi flow with automatic session state management. +// Init = State: the initial state for new sessions (ignored when resuming an existing session). +type Agent[State, In, Out, Stream any] struct { + *corex.BidiFlow[State, In, AgentOutput[State, Out], Stream] + store session.Store[State] +} + +// AgentResult is the return type for agent functions. +type AgentResult[Out any] struct { + Output Out + Artifacts []Artifact +} + +// AgentFunc is the function signature for agents. +type AgentFunc[State, In, Out, Stream any] func( + ctx context.Context, + sess *session.Session[State], + inCh <-chan In, + resp *Responder[Stream], +) (AgentResult[Out], error) +``` + +--- + +## 2. API Surface + +### 2.1 Defining Agents + +```go +// In go/ai/x/agent.go + +// DefineAgent creates an Agent with automatic session management and registers it. +// Use this for multi-turn conversational agents that need to persist state across turns. +func DefineAgent[State, In, Out, Stream any]( + r api.Registry, + name string, + fn AgentFunc[State, In, Out, Stream], + opts ...AgentOption[State], +) *Agent[State, In, Out, Stream] + +// AgentOption configures an Agent. +type AgentOption[State any] interface { + applyAgent(*agentOptions[State]) error +} + +// WithSessionStore sets the session store for persisting session state. +// If not provided, sessions exist only in memory for the connection lifetime. +func WithSessionStore[State any](store session.Store[State]) AgentOption[State] +``` + +### 2.2 Starting Connections + +Agent uses the same `StreamBidi` method as BidiAction and BidiFlow. Session ID is a connection option, and initial state is passed via `WithInit`: + +```go +func (a *Agent[State, In, Out, Stream]) StreamBidi( + ctx context.Context, + opts ...corex.BidiOption[State], +) (*corex.BidiConnection[In, AgentOutput[State, Out], Stream], error) + +// BidiOption from go/core/x - also used by Agent +// WithInit provides initialization data for the bidi action. +// For Agent, this sets the initial state for new sessions. +func WithInit[Init any](init Init) BidiOption[Init] + +// WithSessionID specifies an existing session ID to resume. +// If the session exists in the store, it is loaded (WithInit is ignored). +// If the session doesn't exist, a new session is created with this ID. +// If not provided, a new UUID is generated for new sessions. +func WithSessionID[Init any](id string) BidiOption[Init] +``` + +### 2.3 High-Level Genkit API + +```go +// In go/genkit/bidi.go + +func DefineAgent[State, In, Out, Stream any]( + g *Genkit, + name string, + fn aix.AgentFunc[State, In, Out, Stream], + opts ...aix.AgentOption[State], +) *aix.Agent[State, In, Out, Stream] +``` + +--- + +## 3. Session Management + +### 3.1 Using StreamBidi with Agent + +```go +// Define once at startup +chatAgent := genkit.DefineAgent(g, "chatAgent", + myAgentFunc, + aix.WithSessionStore(store), +) + +// NEW USER: Start fresh session (generates new ID, zero state) +conn1, _ := chatAgent.StreamBidi(ctx) + +// RETURNING USER: Resume existing session by ID +conn2, _ := chatAgent.StreamBidi(ctx, corex.WithSessionID[ChatState]("user-123-session")) + +// NEW USER WITH INITIAL STATE: Start with pre-populated state +conn3, _ := chatAgent.StreamBidi(ctx, corex.WithInit(ChatState{Messages: preloadedHistory})) + +// NEW USER WITH SPECIFIC ID AND INITIAL STATE +conn4, _ := chatAgent.StreamBidi(ctx, + corex.WithSessionID[ChatState]("custom-session-id"), + corex.WithInit(ChatState{Messages: preloadedHistory}), +) +``` + +The Agent internally handles session creation/loading: +- If `WithSessionID` is provided and session exists in store → load existing session (WithInit ignored) +- If `WithSessionID` is provided but session doesn't exist → create new session with that ID and initial state from WithInit +- If no `WithSessionID` → generate new UUID and create session with initial state from WithInit + +The session ID is returned in `AgentOutput.SessionID`, so callers can retrieve it from the final output: + +```go +output, _ := conn.Output() +sessionID := output.SessionID // Save this to resume later +``` + +### 3.2 State Persistence + +State is persisted automatically when `sess.UpdateState()` is called - the existing `session.Session` implementation already persists to the configured store. No special persistence mode is needed; the user controls when to persist by calling `UpdateState()`. + +### 3.3 Session Integration + +Use existing `Session` and `Store` types from `go/core/x/session`: + +```go +import "github.com/firebase/genkit/go/core/x/session" + +// Agent holds reference to session store +type Agent[State, In, Out, Stream any] struct { + store session.Store[State] + // ... +} +``` + +--- + +## 4. Turn Signaling + +For multi-turn conversations, the consumer needs to know when the agent has finished responding to one input and is ready for the next. + +### 4.1 How It Works Internally + +1. `BidiConnection.streamCh` is actually `chan streamChunk[Stream]` (internal type) +2. `streamChunk` has `data T` and `endTurn bool` fields (unexported) +3. `resp.Send(data)` sends `streamChunk{data: data}` +4. `resp.EndTurn()` sends `streamChunk{endTurn: true}` +5. `conn.Responses()` unwraps chunks, yielding only the data +6. When `Responses()` sees `endTurn: true`, it exits the iterator without yielding + +### 4.2 From the Agent's Perspective + +```go +for input := range inCh { + resp.Send("partial...") + resp.Send("more...") + resp.EndTurn() // Consumer's for loop exits here +} +``` + +### 4.3 From the Consumer's Perspective + +```go +conn.Send("question") +for chunk, err := range conn.Responses() { + fmt.Print(chunk) // Just gets string, not streamChunk +} +// Loop exited because agent called EndTurn() + +conn.Send("follow-up") +for chunk, err := range conn.Responses() { ... } +``` + +--- + +## 5. Example Usage + +### 5.1 Chat Agent with Session Persistence + +```go +package main + +import ( + "context" + "fmt" + + "github.com/firebase/genkit/go/ai" + aix "github.com/firebase/genkit/go/ai/x" + corex "github.com/firebase/genkit/go/core/x" + "github.com/firebase/genkit/go/core/x/session" + "github.com/firebase/genkit/go/genkit" + "github.com/firebase/genkit/go/plugins/googlegenai" +) + +type ChatState struct { + Messages []*ai.Message `json:"messages"` +} + +func main() { + ctx := context.Background() + store := session.NewInMemoryStore[ChatState]() + + g := genkit.Init(ctx, + genkit.WithPlugins(&googlegenai.GoogleAI{}), + genkit.WithDefaultModel("googleai/gemini-2.5-flash"), + ) + + // Define an agent for multi-turn chat + chatAgent := genkit.DefineAgent(g, "chatAgent", + func(ctx context.Context, sess *session.Session[ChatState], inCh <-chan string, resp *aix.Responder[string]) (aix.AgentResult[string], error) { + state := sess.State() + messages := state.Messages + + for input := range inCh { + messages = append(messages, ai.NewUserTextMessage(input)) + + var respText string + for result, err := range genkit.GenerateStream(ctx, g, + ai.WithMessages(messages...), + ) { + if err != nil { + return aix.AgentResult[string]{}, err + } + if result.Done { + respText = result.Response.Text() + } + resp.Send(result.Chunk.Text()) + } + resp.EndTurn() // Signal turn complete, consumer's Responses() exits + + messages = append(messages, ai.NewModelTextMessage(respText)) + sess.UpdateState(ctx, ChatState{Messages: messages}) + } + + return aix.AgentResult[string]{ + Output: "conversation ended", + Artifacts: []aix.Artifact{ + { + Name: "summary", + Parts: []*ai.Part{ai.NewTextPart("...")}, + }, + }, + }, nil + }, + aix.WithSessionStore(store), + ) + + // Start new session (generates new session ID) + conn, _ := chatAgent.StreamBidi(ctx) + + // First turn + conn.Send("Hello! Tell me about Go programming.") + for chunk, err := range conn.Responses() { + if err != nil { + panic(err) + } + fmt.Print(chunk) + } + // Loop exits when agent calls resp.EndTurn() + + // Second turn + conn.Send("What are channels used for?") + for chunk, err := range conn.Responses() { + if err != nil { + panic(err) + } + fmt.Print(chunk) + } + + conn.Close() + + // Get session ID from final output to resume later + output, _ := conn.Output() + sessionID := output.SessionID + + // Resume session later with the saved ID + conn2, _ := chatAgent.StreamBidi(ctx, corex.WithSessionID[ChatState](sessionID)) + conn2.Send("Continue our discussion") + // ... +} +``` + +### 5.2 Agent with Artifacts + +```go +type CodeState struct { + History []*ai.Message `json:"history"` + GeneratedCode []string `json:"generatedCode"` +} + +codeAgent := genkit.DefineAgent(g, "codeAgent", + func(ctx context.Context, sess *session.Session[CodeState], inCh <-chan string, resp *aix.Responder[string]) (aix.AgentResult[string], error) { + state := sess.State() + var artifacts []aix.Artifact + + for input := range inCh { + // Generate response with code... + generatedCode := "func main() { ... }" + + resp.Send("Here's the code you requested:\n") + resp.Send("```go\n" + generatedCode + "\n```") + resp.EndTurn() + + // Track generated code in state + state.GeneratedCode = append(state.GeneratedCode, generatedCode) + sess.UpdateState(ctx, state) + + // Add as artifact + artifacts = append(artifacts, aix.Artifact{ + Name: fmt.Sprintf("code_%d.go", len(artifacts)), + Parts: []*ai.Part{ai.NewTextPart(generatedCode)}, + }) + } + + return aix.AgentResult[string]{ + Output: "code generation complete", + Artifacts: artifacts, + }, nil + }, + aix.WithSessionStore(store), +) +``` + +--- + +## 6. Files to Create/Modify + +### New Files + +| File | Description | +|------|-------------| +| `go/ai/x/agent.go` | Agent, AgentFunc, AgentResult, AgentOutput, Artifact, Responder | +| `go/ai/x/agent_options.go` | AgentOption, WithSessionStore | +| `go/ai/x/agent_test.go` | Tests | + +### Modified Files + +| File | Change | +|------|--------| +| `go/genkit/bidi.go` | Add DefineAgent wrapper | + +--- + +## 7. Implementation Notes + +### Agent Internal Wrapping + +The user's `AgentFunc` returns `AgentResult[Out]`, but `Agent.StreamBidi()` returns `AgentOutput[State, Out]`. Internally, Agent wraps the user function: + +```go +// Simplified internal logic +result, err := userFunc(ctx, wrappedInCh, outCh, sess) +if err != nil { + return AgentOutput[State, Out]{}, err +} +return AgentOutput[State, Out]{ + SessionID: sess.ID(), + Output: result.Output, + State: sess.State(), + Artifacts: result.Artifacts, +}, nil +``` + +### Thread Safety + +- BidiConnection uses mutex for state (closed flag) +- Send is safe to call from multiple goroutines +- Session operations are thread-safe (from existing session package) + +### Tracing + +- Agent inherits tracing from BidiFlow +- Each turn can create nested spans for LLM calls +- Session ID is recorded in trace metadata From b43eb9e523bfd54102e1dad7a2dc0f05d4413834 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Wed, 21 Jan 2026 12:09:39 -0800 Subject: [PATCH 14/19] Update go-bidi-design.md --- docs/go-bidi-design.md | 587 +++++++++++++++++++---------------------- 1 file changed, 267 insertions(+), 320 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 40259cb6a3..050cff49f2 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -4,24 +4,29 @@ This document describes the design for bidirectional streaming features in Genkit Go. The implementation introduces three new primitives: -1. **BidiAction** - Core primitive for bidirectional operations -2. **BidiFlow** - BidiAction with observability, intended for user definition -3. **Agent** - Stateful, multi-turn agent interactions with automatic persistence and turn semantics +1. **BidiAction** - Core primitive for bidirectional operations (`go/core/x`) +2. **BidiFlow** - BidiAction with observability, intended for user definition (`go/core/x`) +3. **BidiModel** - Specialized bidi action for real-time LLM APIs (`go/ai/x`) -## Package Location +For stateful multi-turn agents with session persistence, see [go-agent-design.md](go-agent-design.md). -All bidi types go in `go/core/x/` (experimental), which will move to `go/core/` when stabilized: +## Package Location ``` go/core/x/ ├── bidi.go # BidiAction, BidiFunc, BidiConnection ├── bidi_flow.go # BidiFlow -├── agent.go # Agent implementation -├── option.go # Options +├── bidi_options.go # Options ├── bidi_test.go # Tests + +go/ai/x/ +├── bidi_model.go # BidiModel, BidiModelFunc +├── bidi_model_test.go ``` -Import as `corex "github.com/firebase/genkit/go/core/x"`. +Import as: +- `corex "github.com/firebase/genkit/go/core/x"` +- `aix "github.com/firebase/genkit/go/ai/x"` --- @@ -76,9 +81,7 @@ func (c *BidiConnection[In, Out, Stream]) Send(input In) error func (c *BidiConnection[In, Out, Stream]) Close() error // Responses returns an iterator for receiving streamed response chunks. -// For Agents, the iterator exits when the agent calls resp.EndTurn(), allowing -// multi-turn conversations. Call Responses() again after Send() for the next turn. -// The iterator completes permanently when the action finishes. +// The iterator completes when the action finishes or signals end of turn. func (c *BidiConnection[In, Out, Stream]) Responses() iter.Seq2[Stream, error] // Output returns the final output after the action completes. @@ -97,71 +100,232 @@ type BidiFlow[Init, In, Out, Stream any] struct { } ``` -### 1.4 Responder +--- + +## 2. BidiModel + +### 2.1 Overview + +`BidiModel` is a specialized bidi action for real-time LLM APIs like Gemini Live and OpenAI Realtime. These APIs establish a persistent connection where configuration (temperature, system prompt, tools) must be provided upfront, and then the conversation streams bidirectionally. -`Responder` wraps the output channel for agents, providing methods to send data and signal turn boundaries. +### 2.2 The Role of `init` + +For real-time sessions, the connection to the model API often requires configuration to be established *before* the first user message is received. The `init` payload fulfills this requirement: + +- **`init`**: `GenerateRequest` (contains config, tools, system prompt) +- **`inputStream`**: Stream of `GenerateRequest` (contains user messages/turns) +- **`stream`**: Stream of `GenerateResponseChunk` + +### 2.3 Type Definitions ```go -// Responder wraps the output channel with turn signaling for multi-turn agents. -type Responder[T any] struct { - ch chan<- streamChunk[T] // internal, unexported +// In go/ai/x/bidi_model.go + +// BidiModel represents a bidirectional streaming model for real-time LLM APIs. +type BidiModel struct { + *corex.BidiAction[*ai.GenerateRequest, *ai.GenerateRequest, *ai.GenerateResponse, *ai.GenerateResponseChunk] } -// Send sends a streamed chunk to the consumer. -func (r *Responder[T]) Send(data T) +// BidiModelFunc is the function signature for bidi model implementations. +type BidiModelFunc func( + ctx context.Context, + init *ai.GenerateRequest, + inCh <-chan *ai.GenerateRequest, + outCh chan<- *ai.GenerateResponseChunk, +) (*ai.GenerateResponse, error) +``` + +### 2.4 Defining a BidiModel -// EndTurn signals that the agent has finished responding to the current input. -// The consumer's Stream() iterator will exit, allowing them to send the next input. -func (r *Responder[T]) EndTurn() +```go +// DefineBidiModel creates and registers a BidiModel for real-time LLM interactions. +func DefineBidiModel( + r api.Registry, + name string, + fn BidiModelFunc, +) *BidiModel ``` -### 1.5 Agent +**Example Plugin Implementation:** + +```go +// In a plugin like googlegenai + +func (g *GoogleAI) defineBidiModel(r api.Registry) *aix.BidiModel { + return aix.DefineBidiModel(r, "googleai/gemini-2.0-flash-live", + func(ctx context.Context, init *ai.GenerateRequest, inCh <-chan *ai.GenerateRequest, outCh chan<- *ai.GenerateResponseChunk) (*ai.GenerateResponse, error) { + // 1. Establish session using configuration from init + session, err := g.client.ConnectLive(ctx, &genai.LiveConnectConfig{ + Model: "gemini-2.0-flash-live", + SystemPrompt: extractSystemPrompt(init), + Tools: convertTools(init.Tools), + Config: convertConfig(init.Config), + }) + if err != nil { + return nil, err + } + defer session.Close() -Agent adds session state management on top of BidiFlow with turn semantics. + var totalUsage ai.GenerationUsage + + // 2. Handle conversation stream + for request := range inCh { + // Send new user input to the upstream session + if err := session.SendContent(convertMessages(request.Messages)); err != nil { + return nil, err + } + + // Yield responses from the upstream session + for chunk := range session.Receive() { + outCh <- &ai.GenerateResponseChunk{ + Content: []*ai.Part{ai.NewTextPart(chunk.Text)}, + } + totalUsage.Add(chunk.Usage) + } + } + + // 3. Return final result (usage stats, etc.) + return &ai.GenerateResponse{ + Usage: &totalUsage, + }, nil + }, + ) +} +``` + +### 2.5 Using BidiModel (`GenerateBidi`) + +`GenerateBidi` is the high-level API for interacting with bidi models. It provides a session-like interface for real-time conversations. ```go -// Artifact represents a named collection of parts produced during a session. -// Examples: generated files, images, code snippets, etc. -type Artifact struct { - Name string `json:"name"` - Parts []*ai.Part `json:"parts"` +// In go/genkit/generate.go or go/ai/x/generate_bidi.go + +// GenerateBidiSession wraps BidiConnection with model-specific convenience methods. +type GenerateBidiSession struct { + conn *corex.BidiConnection[*ai.GenerateRequest, *ai.GenerateResponse, *ai.GenerateResponseChunk] } -// AgentOutput wraps the output with session info for persistence. -type AgentOutput[State, Out any] struct { - SessionID string `json:"sessionId"` - Output Out `json:"output"` - State State `json:"state"` - Artifacts []Artifact `json:"artifacts,omitempty"` +// Send sends a user message to the model. +func (s *GenerateBidiSession) Send(messages ...*ai.Message) error { + return s.conn.Send(&ai.GenerateRequest{Messages: messages}) } -// Agent is a bidi flow with automatic session state management. -// Init = State: the initial state for new sessions (ignored when resuming an existing session). -type Agent[State, In, Out, Stream any] struct { - *BidiFlow[State, In, AgentOutput[State, Out], Stream] - store session.Store[State] +// SendText is a convenience method for sending a text message. +func (s *GenerateBidiSession) SendText(text string) error { + return s.Send(ai.NewUserTextMessage(text)) } -// AgentResult is the return type for agent functions. -type AgentResult[Out any] struct { - Output Out - Artifacts []Artifact +// Stream returns an iterator for receiving response chunks. +func (s *GenerateBidiSession) Stream() iter.Seq2[*ai.GenerateResponseChunk, error] { + return s.conn.Responses() } -// AgentFunc is the function signature for agents. -type AgentFunc[State, In, Out, Stream any] func( - ctx context.Context, - sess *session.Session[State], - inCh <-chan In, - resp *Responder[Stream], -) (AgentResult[Out], error) +// Close signals that the conversation is complete. +func (s *GenerateBidiSession) Close() error { + return s.conn.Close() +} + +// Output returns the final response after the session completes. +func (s *GenerateBidiSession) Output() (*ai.GenerateResponse, error) { + return s.conn.Output() +} +``` + +**Usage:** + +```go +// GenerateBidi starts a bidirectional streaming session with a model. +func GenerateBidi(ctx context.Context, g *Genkit, opts ...GenerateBidiOption) (*GenerateBidiSession, error) + +// GenerateBidiOption configures a bidi generation session. +type GenerateBidiOption interface { + applyGenerateBidi(*generateBidiOptions) error +} + +// WithBidiModel specifies the model to use. +func WithBidiModel(model *aix.BidiModel) GenerateBidiOption + +// WithBidiConfig provides generation config (temperature, etc.) passed via init. +func WithBidiConfig(config any) GenerateBidiOption + +// WithBidiSystem provides the system prompt passed via init. +func WithBidiSystem(system string) GenerateBidiOption + +// WithBidiTools provides tools for the model passed via init. +func WithBidiTools(tools ...ai.Tool) GenerateBidiOption +``` + +**Example:** + +```go +// Start a real-time session +session, err := genkit.GenerateBidi(ctx, g, + genkit.WithBidiModel(geminiLive), + genkit.WithBidiConfig(&googlegenai.GenerationConfig{Temperature: ptr(0.7)}), + genkit.WithBidiSystem("You are a helpful voice assistant"), +) +if err != nil { + return err +} +defer session.Close() + +// Send a message +session.SendText("Hello!") + +// Listen for responses (can happen simultaneously with sends) +for chunk, err := range session.Stream() { + if err != nil { + return err + } + fmt.Print(chunk.Text()) +} + +// Continue the conversation +session.SendText("Tell me more about that.") +for chunk, err := range session.Stream() { + // ... +} + +// Get final usage stats +response, _ := session.Output() +fmt.Printf("Total tokens: %d\n", response.Usage.TotalTokens) +``` + +### 2.6 Tool Calling in BidiModel + +Real-time models may support tool calling. The pattern follows the standard generate flow but within the streaming context: + +```go +session, _ := genkit.GenerateBidi(ctx, g, + genkit.WithBidiModel(geminiLive), + genkit.WithBidiTools(weatherTool, calculatorTool), +) + +session.SendText("What's the weather in NYC?") + +for chunk, err := range session.Stream() { + if err != nil { + return err + } + + // Check for tool calls + if toolCall := chunk.ToolCall(); toolCall != nil { + // Execute the tool + result, _ := toolCall.Tool.Execute(ctx, toolCall.Input) + + // Send tool result back to the model + session.Send(ai.NewToolResultMessage(toolCall.ID, result)) + } else { + fmt.Print(chunk.Text()) + } +} ``` --- -## 2. API Surface +## 3. API Surface -### 2.1 Defining Bidi Actions +### 3.1 Defining Bidi Actions ```go // In go/core/x/bidi.go @@ -182,7 +346,7 @@ func DefineBidiAction[Init, In, Out, Stream any]( Schemas for `In`, `Out`, `Init`, and `Stream` types are automatically inferred from the type parameters using the existing JSON schema inference in `go/internal/base/json.go`. -### 2.2 Defining Bidi Flows +### 3.2 Defining Bidi Flows ```go // In go/core/x/bidi_flow.go @@ -196,33 +360,9 @@ func DefineBidiFlow[Init, In, Out, Stream any]( ) *BidiFlow[Init, In, Out, Stream] ``` -### 2.3 Defining Agents - -```go -// In go/core/x/agent.go - -// DefineAgent creates an Agent with automatic session management and registers it. -// Use this for multi-turn conversational agents that need to persist state across turns. -func DefineAgent[State, In, Out, Stream any]( - r api.Registry, - name string, - fn AgentFunc[State, In, Out, Stream], - opts ...AgentOption[State], -) *Agent[State, In, Out, Stream] - -// AgentOption configures an Agent. -type AgentOption[State any] interface { - applyAgent(*agentOptions[State]) error -} - -// WithSessionStore sets the session store for persisting session state. -// If not provided, sessions exist only in memory for the connection lifetime. -func WithSessionStore[State any](store session.Store[State]) AgentOption[State] -``` - -### 2.4 Starting Connections +### 3.3 Starting Connections -All bidi types (BidiAction, BidiFlow, Agent) use the same `StreamBidi` method to start connections: +All bidi types (BidiAction, BidiFlow, BidiModel) use the same `StreamBidi` method to start connections: ```go // BidiAction/BidiFlow @@ -237,22 +377,10 @@ type BidiOption[Init any] interface { } // WithInit provides initialization data for the bidi action. -// For Agent, this sets the initial state for new sessions. func WithInit[Init any](init Init) BidiOption[Init] - -// WithSessionID specifies an existing session ID to resume. -// If the session exists in the store, it is loaded (WithInit is ignored). -// If the session doesn't exist, a new session is created with this ID. -// If not provided, a new UUID is generated for new sessions. -func WithSessionID[Init any](id string) BidiOption[Init] - -func (a *Agent[State, In, Out, Stream]) StreamBidi( - ctx context.Context, - opts ...BidiOption[State], -) (*BidiConnection[In, AgentOutput[State, Out], Stream], error) ``` -### 2.5 High-Level Genkit API +### 3.4 High-Level Genkit API ```go // In go/genkit/bidi.go @@ -263,61 +391,13 @@ func DefineBidiFlow[Init, In, Out, Stream any]( fn corex.BidiFunc[Init, In, Out, Stream], ) *corex.BidiFlow[Init, In, Out, Stream] -func DefineAgent[State, In, Out, Stream any]( +func GenerateBidi( + ctx context.Context, g *Genkit, - name string, - fn corex.AgentFunc[State, In, Out, Stream], - opts ...corex.AgentOption[State], -) *corex.Agent[State, In, Out, Stream] -``` - ---- - -## 3. Agent Details - -### 3.1 Using StreamBidi with Agent - -Agent uses the same `StreamBidi` method as BidiAction and BidiFlow. Session ID is a connection option, and initial state is passed via `WithInit`: - -```go -// Define once at startup -chatAgent := genkit.DefineAgent(g, "chatAgent", - myAgentFunc, - corex.WithSessionStore(store), -) - -// NEW USER: Start fresh session (generates new ID, zero state) -conn1, _ := chatAgent.StreamBidi(ctx) - -// RETURNING USER: Resume existing session by ID -conn2, _ := chatAgent.StreamBidi(ctx, corex.WithSessionID[ChatState]("user-123-session")) - -// NEW USER WITH INITIAL STATE: Start with pre-populated state -conn3, _ := chatAgent.StreamBidi(ctx, corex.WithInit(ChatState{Messages: preloadedHistory})) - -// NEW USER WITH SPECIFIC ID AND INITIAL STATE -conn4, _ := chatAgent.StreamBidi(ctx, - corex.WithSessionID[ChatState]("custom-session-id"), - corex.WithInit(ChatState{Messages: preloadedHistory}), -) + opts ...GenerateBidiOption, +) (*GenerateBidiSession, error) ``` -The Agent internally handles session creation/loading: -- If `WithSessionID` is provided and session exists in store → load existing session (WithInit ignored) -- If `WithSessionID` is provided but session doesn't exist → create new session with that ID and initial state from WithInit -- If no `WithSessionID` → generate new UUID and create session with initial state from WithInit - -The session ID is returned in `AgentOutput.SessionID`, so callers can retrieve it from the final output: - -```go -output, _ := conn.Output() -sessionID := output.SessionID // Save this to resume later -``` - -### 3.2 State Persistence - -State is persisted automatically when `sess.UpdateState()` is called - the existing `session.Session` implementation already persists to the configured store. No special persistence mode is needed; the user controls when to persist by calling `UpdateState()`. - --- ## 4. Integration with Existing Infrastructure @@ -339,12 +419,13 @@ BidiFlows create spans that remain open for the lifetime of the connection, enab ### 4.2 Action Registration -Add new action type and schema fields: +Add new action types and schema fields: ```go // In go/core/api/action.go const ( - ActionTypeBidiFlow ActionType = "bidi-flow" + ActionTypeBidiFlow ActionType = "bidi-flow" + ActionTypeBidiModel ActionType = "bidi-model" ) // ActionDesc gets two new optional fields @@ -355,20 +436,6 @@ type ActionDesc struct { } ``` -### 4.3 Session Integration - -Use existing `Session` and `Store` types from `go/core/x/session` (remains a separate subpackage): - -```go -import "github.com/firebase/genkit/go/core/x/session" - -// Agent holds reference to session store -type Agent[State, In, Out, Stream any] struct { - store session.Store[State] - // ... -} -``` - --- ## 5. Example Usage @@ -389,7 +456,7 @@ func main() { ctx := context.Background() g := genkit.Init(ctx) - // Define echo bidi flow (low-level, no turn semantics) + // Define echo bidi flow echoFlow := genkit.DefineBidiFlow(g, "echo", func(ctx context.Context, init struct{}, inCh <-chan string, outCh chan<- string) (string, error) { var count int @@ -426,111 +493,7 @@ func main() { } ``` -### 5.2 Chat Agent with Session Persistence - -```go -package main - -import ( - "context" - "fmt" - - "github.com/firebase/genkit/go/ai" - corex "github.com/firebase/genkit/go/core/x" - "github.com/firebase/genkit/go/core/x/session" - "github.com/firebase/genkit/go/genkit" - "github.com/firebase/genkit/go/plugins/googlegenai" -) - -type ChatState struct { - Messages []*ai.Message `json:"messages"` -} - -func main() { - ctx := context.Background() - store := session.NewInMemoryStore[ChatState]() - - g := genkit.Init(ctx, - genkit.WithPlugins(&googlegenai.GoogleAI{}), - genkit.WithDefaultModel("googleai/gemini-2.5-flash"), - ) - - // Define an agent for multi-turn chat - chatAgent := genkit.DefineAgent(g, "chatAgent", - func(ctx context.Context, sess *session.Session[ChatState], inCh <-chan string, resp *corex.Responder[string]) (corex.AgentResult[string], error) { - state := sess.State() - messages := state.Messages - - for input := range inCh { - messages = append(messages, ai.NewUserTextMessage(input)) - - var respText string - for result, err := range genkit.GenerateStream(ctx, g, - ai.WithMessages(messages...), - ) { - if err != nil { - return corex.AgentResult[string]{}, err - } - if result.Done { - respText = result.Response.Text() - } - resp.Send(result.Chunk.Text()) - } - resp.EndTurn() // Signal turn complete, consumer's Stream() exits - - messages = append(messages, ai.NewModelTextMessage(respText)) - sess.UpdateState(ctx, ChatState{Messages: messages}) - } - - return corex.AgentResult[string]{ - Output: "conversation ended", - Artifacts: []corex.Artifact{ - { - Name: "summary", - Parts: []*ai.Part{ai.NewTextPart("...")}, - }, - }, - }, nil - }, - corex.WithSessionStore(store), - ) - - // Start new session (generates new session ID) - conn, _ := chatAgent.StreamBidi(ctx) - - // First turn - conn.Send("Hello! Tell me about Go programming.") - for chunk, err := range conn.Responses() { - if err != nil { - panic(err) - } - fmt.Print(chunk) - } - // Loop exits when agent calls resp.EndTurn() - - // Second turn - conn.Send("What are channels used for?") - for chunk, err := range conn.Responses() { - if err != nil { - panic(err) - } - fmt.Print(chunk) - } - - conn.Close() - - // Get session ID from final output to resume later - output, _ := conn.Output() - sessionID := output.SessionID - - // Resume session later with the saved ID - conn2, _ := chatAgent.StreamBidi(ctx, corex.WithSessionID[ChatState](sessionID)) - conn2.Send("Continue our discussion") - // ... -} -``` - -### 5.3 Bidi Flow with Initialization Data +### 5.2 Bidi Flow with Initialization Data ```go type ChatInit struct { @@ -561,6 +524,33 @@ conn, _ := configuredChat.StreamBidi(ctx, ) ``` +### 5.3 Real-Time Voice Model Session + +```go +// Using a bidi model for voice-like interactions +session, _ := genkit.GenerateBidi(ctx, g, + genkit.WithBidiModel(geminiLive), + genkit.WithBidiSystem("You are a voice assistant. Keep responses brief."), +) +defer session.Close() + +// Simulate real-time voice input/output +go func() { + // In a real app, this would be audio transcription + session.SendText("What time is it in Tokyo?") +}() + +// Stream responses as they arrive +for chunk, err := range session.Stream() { + if err != nil { + log.Printf("Error: %v", err) + break + } + // In a real app, this would go to text-to-speech + fmt.Print(chunk.Text()) +} +``` + --- ## 6. Files to Create/Modify @@ -572,22 +562,23 @@ conn, _ := configuredChat.StreamBidi(ctx, | `go/core/x/bidi.go` | BidiAction, BidiFunc, BidiConnection | | `go/core/x/bidi_flow.go` | BidiFlow with tracing | | `go/core/x/bidi_options.go` | BidiOption types | -| `go/core/x/agent.go` | Agent implementation | | `go/core/x/bidi_test.go` | Tests | +| `go/ai/x/bidi_model.go` | BidiModel, BidiModelFunc, GenerateBidiSession | +| `go/ai/x/bidi_model_test.go` | Tests | | `go/genkit/bidi.go` | High-level API wrappers | ### Modified Files | File | Change | |------|--------| -| `go/core/api/action.go` | Add `ActionTypeBidiFlow` constant | +| `go/core/api/action.go` | Add `ActionTypeBidiFlow`, `ActionTypeBidiModel` constants | --- ## 7. Implementation Notes ### Error Handling -- Errors from the bidi function propagate to both `Stream()` iterator and `Output()` +- Errors from the bidi function propagate to both `Responses()` iterator and `Output()` - Context cancellation closes all channels and terminates the action - Send after Close returns an error - Errors are yielded as the second value in the `iter.Seq2[Stream, error]` iterator @@ -601,47 +592,12 @@ conn, _ := configuredChat.StreamBidi(ctx, ### Thread Safety - BidiConnection uses mutex for state (closed flag) - Send is safe to call from multiple goroutines -- Session operations are thread-safe (from existing session package) ### Channels and Backpressure - Both input and output channels are **unbuffered** by default (size 0) -- This provides natural backpressure: `Send()` blocks until agent reads, `resp.Send()` blocks until consumer reads +- This provides natural backpressure: `Send()` blocks until the action reads, output blocks until consumer reads - If needed, `WithInputBufferSize` / `WithOutputBufferSize` options could be added later for specific use cases -### Turn Signaling (Agents) - -For multi-turn conversations, the consumer needs to know when the agent has finished responding to one input and is ready for the next. - -**How it works internally:** - -1. `BidiConnection.streamCh` is actually `chan streamChunk[Stream]` (internal type) -2. `streamChunk` has `data T` and `endTurn bool` fields (unexported) -3. `resp.Send(data)` sends `streamChunk{data: data}` -4. `resp.EndTurn()` sends `streamChunk{endTurn: true}` -5. `conn.Responses()` unwraps chunks, yielding only the data -6. When `Stream()` sees `endTurn: true`, it exits the iterator without yielding - -**From the agent's perspective:** -```go -for input := range inCh { - resp.Send("partial...") - resp.Send("more...") - resp.EndTurn() // Consumer's for loop exits here -} -``` - -**From the consumer's perspective:** -```go -conn.Send("question") -for chunk, err := range conn.Responses() { - fmt.Print(chunk) // Just gets string, not streamChunk -} -// Loop exited because agent called EndTurn() - -conn.Send("follow-up") -for chunk, err := range conn.Responses() { ... } -``` - ### Tracing - Span is started when connection is created, ended when action completes - Nested spans work normally within the bidi function @@ -663,20 +619,11 @@ On context cancellation: 2. All channels are closed 3. `Output()` returns the context error -### Agent Internal Wrapping -The user's `AgentFunc` returns `AgentResult[Out]`, but `Agent.StreamBidi()` returns `AgentOutput[State, Out]`. Internally, Agent wraps the user function: +--- -```go -// Simplified internal logic -result, err := userFunc(ctx, wrappedInCh, outCh, sess) -if err != nil { - return AgentOutput[State, Out]{}, err -} -return AgentOutput[State, Out]{ - SessionID: sess.ID(), - Output: result.Output, - State: sess.State(), - Artifacts: result.Artifacts, -}, nil -``` +## 8. Integration with Reflection API + +These features align with **Reflection API V2**, which uses WebSockets to support bidirectional streaming between the Runtime and the CLI/Manager. +- `runAction` now supports an `input` stream +- `streamChunk` notifications are bidirectional (Manager <-> Runtime) From 2af14e75e6fa1262cee22129dec726e6e8540692 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Wed, 21 Jan 2026 12:23:51 -0800 Subject: [PATCH 15/19] Update go-agent-design.md --- docs/go-agent-design.md | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/docs/go-agent-design.md b/docs/go-agent-design.md index 7553bbc8c4..61ea3c4c9a 100644 --- a/docs/go-agent-design.md +++ b/docs/go-agent-design.md @@ -37,12 +37,14 @@ type Responder[T any] struct { func (r *Responder[T]) Send(data T) // EndTurn signals that the agent has finished responding to the current input. -// The consumer's Responses() iterator will exit, allowing them to send the next input. +// The consumer's Receive() iterator will exit, allowing them to send the next input. func (r *Responder[T]) EndTurn() ``` ### 1.2 Agent Types +> **Note:** The `AgentOutput`, `AgentResult`, and `Artifact` types are generated from the source of truth Zod schemas in `genkit-tools` shared between runtimes. + ```go // Artifact represents a named collection of parts produced during a session. // Examples: generated files, images, code snippets, etc. @@ -215,8 +217,8 @@ For multi-turn conversations, the consumer needs to know when the agent has fini 2. `streamChunk` has `data T` and `endTurn bool` fields (unexported) 3. `resp.Send(data)` sends `streamChunk{data: data}` 4. `resp.EndTurn()` sends `streamChunk{endTurn: true}` -5. `conn.Responses()` unwraps chunks, yielding only the data -6. When `Responses()` sees `endTurn: true`, it exits the iterator without yielding +5. `conn.Receive()` unwraps chunks, yielding only the data +6. When `Receive()` sees `endTurn: true`, it exits the iterator without yielding ### 4.2 From the Agent's Perspective @@ -232,13 +234,13 @@ for input := range inCh { ```go conn.Send("question") -for chunk, err := range conn.Responses() { +for chunk, err := range conn.Receive() { fmt.Print(chunk) // Just gets string, not streamChunk } // Loop exited because agent called EndTurn() conn.Send("follow-up") -for chunk, err := range conn.Responses() { ... } +for chunk, err := range conn.Receive() { ... } ``` --- @@ -296,7 +298,7 @@ func main() { } resp.Send(result.Chunk.Text()) } - resp.EndTurn() // Signal turn complete, consumer's Responses() exits + resp.EndTurn() // Signal turn complete, consumer's Receive() exits messages = append(messages, ai.NewModelTextMessage(respText)) sess.UpdateState(ctx, ChatState{Messages: messages}) @@ -320,7 +322,7 @@ func main() { // First turn conn.Send("Hello! Tell me about Go programming.") - for chunk, err := range conn.Responses() { + for chunk, err := range conn.Receive() { if err != nil { panic(err) } @@ -330,7 +332,7 @@ func main() { // Second turn conn.Send("What are channels used for?") - for chunk, err := range conn.Responses() { + for chunk, err := range conn.Receive() { if err != nil { panic(err) } From aff85a00c71bed37c003389932bffa7dec174e8d Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Wed, 21 Jan 2026 12:23:56 -0800 Subject: [PATCH 16/19] Update go-bidi-design.md --- docs/go-bidi-design.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 050cff49f2..6443dacd00 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -80,9 +80,9 @@ func (c *BidiConnection[In, Out, Stream]) Send(input In) error // Close signals that no more inputs will be sent. func (c *BidiConnection[In, Out, Stream]) Close() error -// Responses returns an iterator for receiving streamed response chunks. +// Receive returns an iterator for receiving streamed response chunks. // The iterator completes when the action finishes or signals end of turn. -func (c *BidiConnection[In, Out, Stream]) Responses() iter.Seq2[Stream, error] +func (c *BidiConnection[In, Out, Stream]) Receive() iter.Seq2[Stream, error] // Output returns the final output after the action completes. // Blocks until done or context cancelled. @@ -217,7 +217,7 @@ func (s *GenerateBidiSession) SendText(text string) error { // Stream returns an iterator for receiving response chunks. func (s *GenerateBidiSession) Stream() iter.Seq2[*ai.GenerateResponseChunk, error] { - return s.conn.Responses() + return s.conn.Receive() } // Close signals that the conversation is complete. @@ -480,7 +480,7 @@ func main() { conn.Close() // Consume stream via iterator - for chunk, err := range conn.Responses() { + for chunk, err := range conn.Receive() { if err != nil { panic(err) } From 39f49abef2b6f4d546f262d28e61b7443cf3f419 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Wed, 21 Jan 2026 12:47:13 -0800 Subject: [PATCH 17/19] Update go-bidi-design.md --- docs/go-bidi-design.md | 185 ++++++++++++++++++++--------------------- 1 file changed, 90 insertions(+), 95 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 6443dacd00..20fc595be6 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -112,9 +112,9 @@ type BidiFlow[Init, In, Out, Stream any] struct { For real-time sessions, the connection to the model API often requires configuration to be established *before* the first user message is received. The `init` payload fulfills this requirement: -- **`init`**: `GenerateRequest` (contains config, tools, system prompt) -- **`inputStream`**: Stream of `GenerateRequest` (contains user messages/turns) -- **`stream`**: Stream of `GenerateResponseChunk` +- **`init`**: `ModelRequest` (contains config, tools, system prompt) +- **`inputStream`**: Stream of `ModelRequest` (contains user messages/turns) +- **`stream`**: Stream of `ModelResponseChunk` ### 2.3 Type Definitions @@ -123,43 +123,46 @@ For real-time sessions, the connection to the model API often requires configura // BidiModel represents a bidirectional streaming model for real-time LLM APIs. type BidiModel struct { - *corex.BidiAction[*ai.GenerateRequest, *ai.GenerateRequest, *ai.GenerateResponse, *ai.GenerateResponseChunk] + *corex.BidiAction[*ai.ModelRequest, *ai.ModelRequest, *ai.ModelResponse, *ai.ModelResponseChunk] } // BidiModelFunc is the function signature for bidi model implementations. type BidiModelFunc func( ctx context.Context, - init *ai.GenerateRequest, - inCh <-chan *ai.GenerateRequest, - outCh chan<- *ai.GenerateResponseChunk, -) (*ai.GenerateResponse, error) + init *ai.ModelRequest, + inCh <-chan *ai.ModelRequest, + outCh chan<- *ai.ModelResponseChunk, +) (*ai.ModelResponse, error) ``` ### 2.4 Defining a BidiModel ```go // DefineBidiModel creates and registers a BidiModel for real-time LLM interactions. -func DefineBidiModel( - r api.Registry, - name string, - fn BidiModelFunc, -) *BidiModel +// The opts parameter follows the same pattern as DefineModel for consistency. +func DefineBidiModel(r api.Registry, name string, opts *ai.ModelOptions, fn BidiModelFunc) *BidiModel ``` **Example Plugin Implementation:** ```go -// In a plugin like googlegenai - func (g *GoogleAI) defineBidiModel(r api.Registry) *aix.BidiModel { return aix.DefineBidiModel(r, "googleai/gemini-2.0-flash-live", - func(ctx context.Context, init *ai.GenerateRequest, inCh <-chan *ai.GenerateRequest, outCh chan<- *ai.GenerateResponseChunk) (*ai.GenerateResponse, error) { - // 1. Establish session using configuration from init - session, err := g.client.ConnectLive(ctx, &genai.LiveConnectConfig{ - Model: "gemini-2.0-flash-live", - SystemPrompt: extractSystemPrompt(init), - Tools: convertTools(init.Tools), - Config: convertConfig(init.Config), + &ai.ModelOptions{ + Label: "Gemini 2.0 Flash Live", + Supports: &ai.ModelSupports{ + Multiturn: true, + Tools: true, + SystemRole: true, + Media: true, + }, + }, + func(ctx context.Context, init *ai.ModelRequest, inCh <-chan *ai.ModelRequest, outCh chan<- *ai.ModelResponseChunk) (*ai.ModelResponse, error) { + session, err := g.client.Live.Connect(ctx, "gemini-2.0-flash-live", &genai.LiveConnectConfig{ + SystemInstruction: toContent(init.Messages), + Tools: toTools(init.Tools), + Temperature: toFloat32Ptr(init.Config), + ResponseModalities: []genai.Modality{genai.ModalityText}, }) if err != nil { return nil, err @@ -168,24 +171,47 @@ func (g *GoogleAI) defineBidiModel(r api.Registry) *aix.BidiModel { var totalUsage ai.GenerationUsage - // 2. Handle conversation stream for request := range inCh { - // Send new user input to the upstream session - if err := session.SendContent(convertMessages(request.Messages)); err != nil { + err := session.SendClientContent(genai.LiveClientContentInput{ + Turns: toContents(request.Messages), + TurnComplete: true, + }) + if err != nil { return nil, err } - // Yield responses from the upstream session - for chunk := range session.Receive() { - outCh <- &ai.GenerateResponseChunk{ - Content: []*ai.Part{ai.NewTextPart(chunk.Text)}, + for { + msg, err := session.Receive() + if err != nil { + return nil, err + } + + if msg.ToolCall != nil { + outCh <- &ai.ModelResponseChunk{ + Content: toToolCallParts(msg.ToolCall), + } + continue + } + + if msg.ServerContent != nil { + if msg.ServerContent.ModelTurn != nil { + outCh <- &ai.ModelResponseChunk{ + Content: fromParts(msg.ServerContent.ModelTurn.Parts), + } + } + if msg.ServerContent.TurnComplete { + break + } + } + + if msg.UsageMetadata != nil { + totalUsage.InputTokens += int(msg.UsageMetadata.PromptTokenCount) + totalUsage.OutputTokens += int(msg.UsageMetadata.CandidatesTokenCount) } - totalUsage.Add(chunk.Usage) } } - // 3. Return final result (usage stats, etc.) - return &ai.GenerateResponse{ + return &ai.ModelResponse{ Usage: &totalUsage, }, nil }, @@ -200,93 +226,75 @@ func (g *GoogleAI) defineBidiModel(r api.Registry) *aix.BidiModel { ```go // In go/genkit/generate.go or go/ai/x/generate_bidi.go -// GenerateBidiSession wraps BidiConnection with model-specific convenience methods. -type GenerateBidiSession struct { - conn *corex.BidiConnection[*ai.GenerateRequest, *ai.GenerateResponse, *ai.GenerateResponseChunk] +// ModelBidiConnection wraps BidiConnection with model-specific convenience methods. +type ModelBidiConnection struct { + conn *corex.BidiConnection[*ai.ModelRequest, *ai.ModelResponse, *ai.ModelResponseChunk] } // Send sends a user message to the model. -func (s *GenerateBidiSession) Send(messages ...*ai.Message) error { - return s.conn.Send(&ai.GenerateRequest{Messages: messages}) +func (s *ModelBidiConnection) Send(messages ...*ai.Message) error { + return s.conn.Send(&ai.ModelRequest{Messages: messages}) } // SendText is a convenience method for sending a text message. -func (s *GenerateBidiSession) SendText(text string) error { +func (s *ModelBidiConnection) SendText(text string) error { return s.Send(ai.NewUserTextMessage(text)) } // Stream returns an iterator for receiving response chunks. -func (s *GenerateBidiSession) Stream() iter.Seq2[*ai.GenerateResponseChunk, error] { +func (s *ModelBidiConnection) Receive() iter.Seq2[*ai.ModelResponseChunk, error] { return s.conn.Receive() } // Close signals that the conversation is complete. -func (s *GenerateBidiSession) Close() error { +func (s *ModelBidiConnection) Close() error { return s.conn.Close() } // Output returns the final response after the session completes. -func (s *GenerateBidiSession) Output() (*ai.GenerateResponse, error) { +func (s *ModelBidiConnection) Output() (*ai.ModelResponse, error) { return s.conn.Output() } ``` **Usage:** +`GenerateBidi` uses the same shared option types as regular `Generate` calls. Options like `WithModel`, `WithConfig`, `WithSystem`, and `WithTools` work the same way - they configure the initial session setup. + ```go // GenerateBidi starts a bidirectional streaming session with a model. -func GenerateBidi(ctx context.Context, g *Genkit, opts ...GenerateBidiOption) (*GenerateBidiSession, error) - -// GenerateBidiOption configures a bidi generation session. -type GenerateBidiOption interface { - applyGenerateBidi(*generateBidiOptions) error -} - -// WithBidiModel specifies the model to use. -func WithBidiModel(model *aix.BidiModel) GenerateBidiOption - -// WithBidiConfig provides generation config (temperature, etc.) passed via init. -func WithBidiConfig(config any) GenerateBidiOption - -// WithBidiSystem provides the system prompt passed via init. -func WithBidiSystem(system string) GenerateBidiOption - -// WithBidiTools provides tools for the model passed via init. -func WithBidiTools(tools ...ai.Tool) GenerateBidiOption +// Uses the existing shared option types from ai/option.go. +func GenerateBidi(ctx context.Context, g *Genkit, opts ...ai.GenerateBidiOption) (*ModelBidiConnection, error) ``` **Example:** ```go -// Start a real-time session session, err := genkit.GenerateBidi(ctx, g, - genkit.WithBidiModel(geminiLive), - genkit.WithBidiConfig(&googlegenai.GenerationConfig{Temperature: ptr(0.7)}), - genkit.WithBidiSystem("You are a helpful voice assistant"), + ai.WithModel(geminiLive), + ai.WithConfig(&googlegenai.GenerationConfig{Temperature: ptr(0.7)}), + ai.WithSystem("You are a helpful voice assistant"), + ai.WithTools(weatherTool), ) if err != nil { return err } defer session.Close() -// Send a message session.SendText("Hello!") -// Listen for responses (can happen simultaneously with sends) -for chunk, err := range session.Stream() { +for chunk, err := range session.Receive() { if err != nil { return err } fmt.Print(chunk.Text()) } -// Continue the conversation session.SendText("Tell me more about that.") -for chunk, err := range session.Stream() { +for chunk, err := range session.Receive() { // ... } -// Get final usage stats response, _ := session.Output() fmt.Printf("Total tokens: %d\n", response.Usage.TotalTokens) ``` @@ -297,23 +305,19 @@ Real-time models may support tool calling. The pattern follows the standard gene ```go session, _ := genkit.GenerateBidi(ctx, g, - genkit.WithBidiModel(geminiLive), - genkit.WithBidiTools(weatherTool, calculatorTool), + ai.WithModel(geminiLive), + ai.WithTools(weatherTool, calculatorTool), ) session.SendText("What's the weather in NYC?") -for chunk, err := range session.Stream() { +for chunk, err := range session.Receive() { if err != nil { return err } - // Check for tool calls if toolCall := chunk.ToolCall(); toolCall != nil { - // Execute the tool result, _ := toolCall.Tool.Execute(ctx, toolCall.Input) - - // Send tool result back to the model session.Send(ai.NewToolResultMessage(toolCall.ID, result)) } else { fmt.Print(chunk.Text()) @@ -391,11 +395,13 @@ func DefineBidiFlow[Init, In, Out, Stream any]( fn corex.BidiFunc[Init, In, Out, Stream], ) *corex.BidiFlow[Init, In, Out, Stream] +// GenerateBidi uses shared options from ai/option.go +// Options like WithModel, WithConfig, WithSystem, WithTools configure the session init. func GenerateBidi( ctx context.Context, g *Genkit, - opts ...GenerateBidiOption, -) (*GenerateBidiSession, error) + opts ...ai.GenerateBidiOption, +) (*ModelBidiConnection, error) ``` --- @@ -456,7 +462,6 @@ func main() { ctx := context.Background() g := genkit.Init(ctx) - // Define echo bidi flow echoFlow := genkit.DefineBidiFlow(g, "echo", func(ctx context.Context, init struct{}, inCh <-chan string, outCh chan<- string) (string, error) { var count int @@ -468,28 +473,24 @@ func main() { }, ) - // Start streaming connection conn, err := echoFlow.StreamBidi(ctx) if err != nil { panic(err) } - // Send messages conn.Send("hello") conn.Send("world") conn.Close() - // Consume stream via iterator for chunk, err := range conn.Receive() { if err != nil { panic(err) } - fmt.Println(chunk) // "echo: hello", "echo: world" + fmt.Println(chunk) } - // Get final output output, _ := conn.Output() - fmt.Println(output) // "processed 2 messages" + fmt.Println(output) } ``` @@ -503,7 +504,6 @@ type ChatInit struct { configuredChat := genkit.DefineBidiFlow(g, "configuredChat", func(ctx context.Context, init ChatInit, inCh <-chan string, outCh chan<- string) (string, error) { - // Use init.SystemPrompt and init.Temperature for input := range inCh { resp, _ := genkit.GenerateText(ctx, g, ai.WithSystem(init.SystemPrompt), @@ -527,26 +527,21 @@ conn, _ := configuredChat.StreamBidi(ctx, ### 5.3 Real-Time Voice Model Session ```go -// Using a bidi model for voice-like interactions session, _ := genkit.GenerateBidi(ctx, g, - genkit.WithBidiModel(geminiLive), - genkit.WithBidiSystem("You are a voice assistant. Keep responses brief."), + ai.WithModel(geminiLive), + ai.WithSystem("You are a voice assistant. Keep responses brief."), ) defer session.Close() -// Simulate real-time voice input/output go func() { - // In a real app, this would be audio transcription session.SendText("What time is it in Tokyo?") }() -// Stream responses as they arrive -for chunk, err := range session.Stream() { +for chunk, err := range session.Receive() { if err != nil { log.Printf("Error: %v", err) break } - // In a real app, this would go to text-to-speech fmt.Print(chunk.Text()) } ``` @@ -563,7 +558,7 @@ for chunk, err := range session.Stream() { | `go/core/x/bidi_flow.go` | BidiFlow with tracing | | `go/core/x/bidi_options.go` | BidiOption types | | `go/core/x/bidi_test.go` | Tests | -| `go/ai/x/bidi_model.go` | BidiModel, BidiModelFunc, GenerateBidiSession | +| `go/ai/x/bidi_model.go` | BidiModel, BidiModelFunc, ModelBidiConnection | | `go/ai/x/bidi_model_test.go` | Tests | | `go/genkit/bidi.go` | High-level API wrappers | From f12f57c63566632b52a422618ffb87ac0408ab07 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Wed, 21 Jan 2026 13:03:37 -0800 Subject: [PATCH 18/19] Update go-bidi-design.md --- docs/go-bidi-design.md | 70 ++++++++++-------------------------------- 1 file changed, 17 insertions(+), 53 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 20fc595be6..c1551c4c13 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -270,32 +270,32 @@ func GenerateBidi(ctx context.Context, g *Genkit, opts ...ai.GenerateBidiOption) **Example:** ```go -session, err := genkit.GenerateBidi(ctx, g, +conn, err := genkit.GenerateBidi(ctx, g, ai.WithModel(geminiLive), - ai.WithConfig(&googlegenai.GenerationConfig{Temperature: ptr(0.7)}), + ai.WithConfig(&genai.LiveConnectConfig{Temperature: genai.Ptr[float32](0.7)}), ai.WithSystem("You are a helpful voice assistant"), ai.WithTools(weatherTool), ) if err != nil { return err } -defer session.Close() +defer conn.Close() -session.SendText("Hello!") +conn.SendText("Hello!") -for chunk, err := range session.Receive() { +for chunk, err := range conn.Receive() { if err != nil { return err } fmt.Print(chunk.Text()) } -session.SendText("Tell me more about that.") -for chunk, err := range session.Receive() { +conn.SendText("Tell me more about that.") +for chunk, err := range conn.Receive() { // ... } -response, _ := session.Output() +response, _ := conn.Output() fmt.Printf("Total tokens: %d\n", response.Usage.TotalTokens) ``` @@ -304,21 +304,21 @@ fmt.Printf("Total tokens: %d\n", response.Usage.TotalTokens) Real-time models may support tool calling. The pattern follows the standard generate flow but within the streaming context: ```go -session, _ := genkit.GenerateBidi(ctx, g, +conn, _ := genkit.GenerateBidi(ctx, g, ai.WithModel(geminiLive), ai.WithTools(weatherTool, calculatorTool), ) -session.SendText("What's the weather in NYC?") +conn.SendText("What's the weather in NYC?") -for chunk, err := range session.Receive() { +for chunk, err := range conn.Receive() { if err != nil { return err } if toolCall := chunk.ToolCall(); toolCall != nil { result, _ := toolCall.Tool.Execute(ctx, toolCall.Input) - session.Send(ai.NewToolResultMessage(toolCall.ID, result)) + conn.Send(ai.NewToolResponseMessage(toolCall.ID, result)) } else { fmt.Print(chunk.Text()) } @@ -369,19 +369,7 @@ func DefineBidiFlow[Init, In, Out, Stream any]( All bidi types (BidiAction, BidiFlow, BidiModel) use the same `StreamBidi` method to start connections: ```go -// BidiAction/BidiFlow -func (a *BidiAction[Init, In, Out, Stream]) StreamBidi( - ctx context.Context, - opts ...BidiOption[Init], -) (*BidiConnection[In, Out, Stream], error) - -// BidiOption configures a bidi connection. -type BidiOption[Init any] interface { - applyBidi(*bidiOptions[Init]) error -} - -// WithInit provides initialization data for the bidi action. -func WithInit[Init any](init Init) BidiOption[Init] +func (ba *BidiAction[Init, In, Out, Stream]) StreamBidi(ctx context.Context, init Init) (*BidiConnection[In, Out, Stream], error) ``` ### 3.4 High-Level Genkit API @@ -516,34 +504,10 @@ configuredChat := genkit.DefineBidiFlow(g, "configuredChat", }, ) -conn, _ := configuredChat.StreamBidi(ctx, - corex.WithInit(ChatInit{ - SystemPrompt: "You are a helpful assistant.", - Temperature: 0.7, - }), -) -``` - -### 5.3 Real-Time Voice Model Session - -```go -session, _ := genkit.GenerateBidi(ctx, g, - ai.WithModel(geminiLive), - ai.WithSystem("You are a voice assistant. Keep responses brief."), -) -defer session.Close() - -go func() { - session.SendText("What time is it in Tokyo?") -}() - -for chunk, err := range session.Receive() { - if err != nil { - log.Printf("Error: %v", err) - break - } - fmt.Print(chunk.Text()) -} +conn, _ := configuredChat.StreamBidi(ctx, ChatInit{ + SystemPrompt: "You are a helpful assistant.", + Temperature: 0.7, +}) ``` --- From 60cacc76539cf6aa34dc385812a82f492cac76be Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Wed, 21 Jan 2026 13:05:59 -0800 Subject: [PATCH 19/19] Update go-agent-design.md --- docs/go-agent-design.md | 64 ++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/docs/go-agent-design.md b/docs/go-agent-design.md index 61ea3c4c9a..e1d6e2c0b1 100644 --- a/docs/go-agent-design.md +++ b/docs/go-agent-design.md @@ -113,24 +113,29 @@ func WithSessionStore[State any](store session.Store[State]) AgentOption[State] ### 2.2 Starting Connections -Agent uses the same `StreamBidi` method as BidiAction and BidiFlow. Session ID is a connection option, and initial state is passed via `WithInit`: +Agent has its own `StreamBidi` signature with session-specific options: ```go +// StreamBidiOption configures a StreamBidi call. +type StreamBidiOption[State any] interface { + applyStreamBidi(*streamBidiOptions[State]) error +} + +// WithSessionID sets the session ID for resuming or creating a session. +// If the session exists in store, resumes that session (init ignored). +// If the session doesn't exist, creates new session with this ID. +// If not provided, generates a new UUID for the session. +func WithSessionID[State any](id string) StreamBidiOption[State] + +// WithInit sets the initial state for new sessions. +// Ignored when resuming an existing session. +func WithInit[State any](init State) StreamBidiOption[State] + +// StreamBidi starts a new agent session or resumes an existing one. func (a *Agent[State, In, Out, Stream]) StreamBidi( ctx context.Context, - opts ...corex.BidiOption[State], + opts ...StreamBidiOption[State], ) (*corex.BidiConnection[In, AgentOutput[State, Out], Stream], error) - -// BidiOption from go/core/x - also used by Agent -// WithInit provides initialization data for the bidi action. -// For Agent, this sets the initial state for new sessions. -func WithInit[Init any](init Init) BidiOption[Init] - -// WithSessionID specifies an existing session ID to resume. -// If the session exists in the store, it is loaded (WithInit is ignored). -// If the session doesn't exist, a new session is created with this ID. -// If not provided, a new UUID is generated for new sessions. -func WithSessionID[Init any](id string) BidiOption[Init] ``` ### 2.3 High-Level Genkit API @@ -153,32 +158,31 @@ func DefineAgent[State, In, Out, Stream any]( ### 3.1 Using StreamBidi with Agent ```go -// Define once at startup chatAgent := genkit.DefineAgent(g, "chatAgent", myAgentFunc, aix.WithSessionStore(store), ) -// NEW USER: Start fresh session (generates new ID, zero state) +// New user: fresh session with generated ID and zero state conn1, _ := chatAgent.StreamBidi(ctx) -// RETURNING USER: Resume existing session by ID -conn2, _ := chatAgent.StreamBidi(ctx, corex.WithSessionID[ChatState]("user-123-session")) +// Returning user: resume existing session by ID +conn2, _ := chatAgent.StreamBidi(ctx, aix.WithSessionID[ChatState]("user-123-session")) -// NEW USER WITH INITIAL STATE: Start with pre-populated state -conn3, _ := chatAgent.StreamBidi(ctx, corex.WithInit(ChatState{Messages: preloadedHistory})) +// New user with pre-populated state +conn3, _ := chatAgent.StreamBidi(ctx, aix.WithInit(ChatState{Messages: preloadedHistory})) -// NEW USER WITH SPECIFIC ID AND INITIAL STATE +// New user with specific ID and initial state conn4, _ := chatAgent.StreamBidi(ctx, - corex.WithSessionID[ChatState]("custom-session-id"), - corex.WithInit(ChatState{Messages: preloadedHistory}), + aix.WithSessionID[ChatState]("custom-session-id"), + aix.WithInit(ChatState{Messages: preloadedHistory}), ) ``` The Agent internally handles session creation/loading: -- If `WithSessionID` is provided and session exists in store → load existing session (WithInit ignored) -- If `WithSessionID` is provided but session doesn't exist → create new session with that ID and initial state from WithInit -- If no `WithSessionID` → generate new UUID and create session with initial state from WithInit +- If `WithSessionID` is provided and session exists in store → load existing session (init ignored) +- If `WithSessionID` is provided but session doesn't exist → create new session with that ID and init +- If `WithSessionID` is not provided → generate new UUID and create session with init The session ID is returned in `AgentOutput.SessionID`, so callers can retrieve it from the final output: @@ -317,10 +321,8 @@ func main() { aix.WithSessionStore(store), ) - // Start new session (generates new session ID) conn, _ := chatAgent.StreamBidi(ctx) - // First turn conn.Send("Hello! Tell me about Go programming.") for chunk, err := range conn.Receive() { if err != nil { @@ -328,9 +330,7 @@ func main() { } fmt.Print(chunk) } - // Loop exits when agent calls resp.EndTurn() - // Second turn conn.Send("What are channels used for?") for chunk, err := range conn.Receive() { if err != nil { @@ -341,12 +341,10 @@ func main() { conn.Close() - // Get session ID from final output to resume later output, _ := conn.Output() sessionID := output.SessionID - // Resume session later with the saved ID - conn2, _ := chatAgent.StreamBidi(ctx, corex.WithSessionID[ChatState](sessionID)) + conn2, _ := chatAgent.StreamBidi(ctx, aix.WithSessionID[ChatState](sessionID)) conn2.Send("Continue our discussion") // ... } @@ -402,7 +400,7 @@ codeAgent := genkit.DefineAgent(g, "codeAgent", | File | Description | |------|-------------| | `go/ai/x/agent.go` | Agent, AgentFunc, AgentResult, AgentOutput, Artifact, Responder | -| `go/ai/x/agent_options.go` | AgentOption, WithSessionStore | +| `go/ai/x/agent_options.go` | AgentOption, WithSessionStore, StreamBidiOption, WithSessionID, WithInit | | `go/ai/x/agent_test.go` | Tests | ### Modified Files