diff --git a/docs/go-agent-design.md b/docs/go-agent-design.md new file mode 100644 index 0000000000..e1d6e2c0b1 --- /dev/null +++ b/docs/go-agent-design.md @@ -0,0 +1,444 @@ +# Genkit Go Agent - Design Document + +## Overview + +This document describes the design for the `Agent` primitive in Genkit Go. An Agent is a stateful, multi-turn conversational agent with automatic session persistence and turn semantics. + +For the underlying bidirectional streaming primitives (BidiAction, BidiFlow, BidiModel), see [go-bidi-design.md](go-bidi-design.md). + +## Package Location + +Agent is an AI concept and belongs in `go/ai/x/` (experimental): + +``` +go/ai/x/ +├── agent.go # Agent, AgentFunc, AgentOutput, AgentResult +├── agent_options.go # AgentOption types +├── agent_test.go # Tests +``` + +Import as `aix "github.com/firebase/genkit/go/ai/x"`. + +--- + +## 1. Core Type Definitions + +### 1.1 Responder + +`Responder` wraps the output channel for agents, providing methods to send data and signal turn boundaries. + +```go +// Responder wraps the output channel with turn signaling for multi-turn agents. +type Responder[T any] struct { + ch chan<- streamChunk[T] // internal, unexported +} + +// Send sends a streamed chunk to the consumer. +func (r *Responder[T]) Send(data T) + +// EndTurn signals that the agent has finished responding to the current input. +// The consumer's Receive() iterator will exit, allowing them to send the next input. +func (r *Responder[T]) EndTurn() +``` + +### 1.2 Agent Types + +> **Note:** The `AgentOutput`, `AgentResult`, and `Artifact` types are generated from the source of truth Zod schemas in `genkit-tools` shared between runtimes. + +```go +// Artifact represents a named collection of parts produced during a session. +// Examples: generated files, images, code snippets, etc. +type Artifact struct { + Name string `json:"name"` + Parts []*ai.Part `json:"parts"` +} + +// AgentOutput wraps the output with session info for persistence. +type AgentOutput[State, Out any] struct { + SessionID string `json:"sessionId"` + Output Out `json:"output"` + State State `json:"state"` + Artifacts []Artifact `json:"artifacts,omitempty"` +} + +// Agent is a bidi flow with automatic session state management. +// Init = State: the initial state for new sessions (ignored when resuming an existing session). +type Agent[State, In, Out, Stream any] struct { + *corex.BidiFlow[State, In, AgentOutput[State, Out], Stream] + store session.Store[State] +} + +// AgentResult is the return type for agent functions. +type AgentResult[Out any] struct { + Output Out + Artifacts []Artifact +} + +// AgentFunc is the function signature for agents. +type AgentFunc[State, In, Out, Stream any] func( + ctx context.Context, + sess *session.Session[State], + inCh <-chan In, + resp *Responder[Stream], +) (AgentResult[Out], error) +``` + +--- + +## 2. API Surface + +### 2.1 Defining Agents + +```go +// In go/ai/x/agent.go + +// DefineAgent creates an Agent with automatic session management and registers it. +// Use this for multi-turn conversational agents that need to persist state across turns. +func DefineAgent[State, In, Out, Stream any]( + r api.Registry, + name string, + fn AgentFunc[State, In, Out, Stream], + opts ...AgentOption[State], +) *Agent[State, In, Out, Stream] + +// AgentOption configures an Agent. +type AgentOption[State any] interface { + applyAgent(*agentOptions[State]) error +} + +// WithSessionStore sets the session store for persisting session state. +// If not provided, sessions exist only in memory for the connection lifetime. +func WithSessionStore[State any](store session.Store[State]) AgentOption[State] +``` + +### 2.2 Starting Connections + +Agent has its own `StreamBidi` signature with session-specific options: + +```go +// StreamBidiOption configures a StreamBidi call. +type StreamBidiOption[State any] interface { + applyStreamBidi(*streamBidiOptions[State]) error +} + +// WithSessionID sets the session ID for resuming or creating a session. +// If the session exists in store, resumes that session (init ignored). +// If the session doesn't exist, creates new session with this ID. +// If not provided, generates a new UUID for the session. +func WithSessionID[State any](id string) StreamBidiOption[State] + +// WithInit sets the initial state for new sessions. +// Ignored when resuming an existing session. +func WithInit[State any](init State) StreamBidiOption[State] + +// StreamBidi starts a new agent session or resumes an existing one. +func (a *Agent[State, In, Out, Stream]) StreamBidi( + ctx context.Context, + opts ...StreamBidiOption[State], +) (*corex.BidiConnection[In, AgentOutput[State, Out], Stream], error) +``` + +### 2.3 High-Level Genkit API + +```go +// In go/genkit/bidi.go + +func DefineAgent[State, In, Out, Stream any]( + g *Genkit, + name string, + fn aix.AgentFunc[State, In, Out, Stream], + opts ...aix.AgentOption[State], +) *aix.Agent[State, In, Out, Stream] +``` + +--- + +## 3. Session Management + +### 3.1 Using StreamBidi with Agent + +```go +chatAgent := genkit.DefineAgent(g, "chatAgent", + myAgentFunc, + aix.WithSessionStore(store), +) + +// New user: fresh session with generated ID and zero state +conn1, _ := chatAgent.StreamBidi(ctx) + +// Returning user: resume existing session by ID +conn2, _ := chatAgent.StreamBidi(ctx, aix.WithSessionID[ChatState]("user-123-session")) + +// New user with pre-populated state +conn3, _ := chatAgent.StreamBidi(ctx, aix.WithInit(ChatState{Messages: preloadedHistory})) + +// New user with specific ID and initial state +conn4, _ := chatAgent.StreamBidi(ctx, + aix.WithSessionID[ChatState]("custom-session-id"), + aix.WithInit(ChatState{Messages: preloadedHistory}), +) +``` + +The Agent internally handles session creation/loading: +- If `WithSessionID` is provided and session exists in store → load existing session (init ignored) +- If `WithSessionID` is provided but session doesn't exist → create new session with that ID and init +- If `WithSessionID` is not provided → generate new UUID and create session with init + +The session ID is returned in `AgentOutput.SessionID`, so callers can retrieve it from the final output: + +```go +output, _ := conn.Output() +sessionID := output.SessionID // Save this to resume later +``` + +### 3.2 State Persistence + +State is persisted automatically when `sess.UpdateState()` is called - the existing `session.Session` implementation already persists to the configured store. No special persistence mode is needed; the user controls when to persist by calling `UpdateState()`. + +### 3.3 Session Integration + +Use existing `Session` and `Store` types from `go/core/x/session`: + +```go +import "github.com/firebase/genkit/go/core/x/session" + +// Agent holds reference to session store +type Agent[State, In, Out, Stream any] struct { + store session.Store[State] + // ... +} +``` + +--- + +## 4. Turn Signaling + +For multi-turn conversations, the consumer needs to know when the agent has finished responding to one input and is ready for the next. + +### 4.1 How It Works Internally + +1. `BidiConnection.streamCh` is actually `chan streamChunk[Stream]` (internal type) +2. `streamChunk` has `data T` and `endTurn bool` fields (unexported) +3. `resp.Send(data)` sends `streamChunk{data: data}` +4. `resp.EndTurn()` sends `streamChunk{endTurn: true}` +5. `conn.Receive()` unwraps chunks, yielding only the data +6. When `Receive()` sees `endTurn: true`, it exits the iterator without yielding + +### 4.2 From the Agent's Perspective + +```go +for input := range inCh { + resp.Send("partial...") + resp.Send("more...") + resp.EndTurn() // Consumer's for loop exits here +} +``` + +### 4.3 From the Consumer's Perspective + +```go +conn.Send("question") +for chunk, err := range conn.Receive() { + fmt.Print(chunk) // Just gets string, not streamChunk +} +// Loop exited because agent called EndTurn() + +conn.Send("follow-up") +for chunk, err := range conn.Receive() { ... } +``` + +--- + +## 5. Example Usage + +### 5.1 Chat Agent with Session Persistence + +```go +package main + +import ( + "context" + "fmt" + + "github.com/firebase/genkit/go/ai" + aix "github.com/firebase/genkit/go/ai/x" + corex "github.com/firebase/genkit/go/core/x" + "github.com/firebase/genkit/go/core/x/session" + "github.com/firebase/genkit/go/genkit" + "github.com/firebase/genkit/go/plugins/googlegenai" +) + +type ChatState struct { + Messages []*ai.Message `json:"messages"` +} + +func main() { + ctx := context.Background() + store := session.NewInMemoryStore[ChatState]() + + g := genkit.Init(ctx, + genkit.WithPlugins(&googlegenai.GoogleAI{}), + genkit.WithDefaultModel("googleai/gemini-2.5-flash"), + ) + + // Define an agent for multi-turn chat + chatAgent := genkit.DefineAgent(g, "chatAgent", + func(ctx context.Context, sess *session.Session[ChatState], inCh <-chan string, resp *aix.Responder[string]) (aix.AgentResult[string], error) { + state := sess.State() + messages := state.Messages + + for input := range inCh { + messages = append(messages, ai.NewUserTextMessage(input)) + + var respText string + for result, err := range genkit.GenerateStream(ctx, g, + ai.WithMessages(messages...), + ) { + if err != nil { + return aix.AgentResult[string]{}, err + } + if result.Done { + respText = result.Response.Text() + } + resp.Send(result.Chunk.Text()) + } + resp.EndTurn() // Signal turn complete, consumer's Receive() exits + + messages = append(messages, ai.NewModelTextMessage(respText)) + sess.UpdateState(ctx, ChatState{Messages: messages}) + } + + return aix.AgentResult[string]{ + Output: "conversation ended", + Artifacts: []aix.Artifact{ + { + Name: "summary", + Parts: []*ai.Part{ai.NewTextPart("...")}, + }, + }, + }, nil + }, + aix.WithSessionStore(store), + ) + + conn, _ := chatAgent.StreamBidi(ctx) + + conn.Send("Hello! Tell me about Go programming.") + for chunk, err := range conn.Receive() { + if err != nil { + panic(err) + } + fmt.Print(chunk) + } + + conn.Send("What are channels used for?") + for chunk, err := range conn.Receive() { + if err != nil { + panic(err) + } + fmt.Print(chunk) + } + + conn.Close() + + output, _ := conn.Output() + sessionID := output.SessionID + + conn2, _ := chatAgent.StreamBidi(ctx, aix.WithSessionID[ChatState](sessionID)) + conn2.Send("Continue our discussion") + // ... +} +``` + +### 5.2 Agent with Artifacts + +```go +type CodeState struct { + History []*ai.Message `json:"history"` + GeneratedCode []string `json:"generatedCode"` +} + +codeAgent := genkit.DefineAgent(g, "codeAgent", + func(ctx context.Context, sess *session.Session[CodeState], inCh <-chan string, resp *aix.Responder[string]) (aix.AgentResult[string], error) { + state := sess.State() + var artifacts []aix.Artifact + + for input := range inCh { + // Generate response with code... + generatedCode := "func main() { ... }" + + resp.Send("Here's the code you requested:\n") + resp.Send("```go\n" + generatedCode + "\n```") + resp.EndTurn() + + // Track generated code in state + state.GeneratedCode = append(state.GeneratedCode, generatedCode) + sess.UpdateState(ctx, state) + + // Add as artifact + artifacts = append(artifacts, aix.Artifact{ + Name: fmt.Sprintf("code_%d.go", len(artifacts)), + Parts: []*ai.Part{ai.NewTextPart(generatedCode)}, + }) + } + + return aix.AgentResult[string]{ + Output: "code generation complete", + Artifacts: artifacts, + }, nil + }, + aix.WithSessionStore(store), +) +``` + +--- + +## 6. Files to Create/Modify + +### New Files + +| File | Description | +|------|-------------| +| `go/ai/x/agent.go` | Agent, AgentFunc, AgentResult, AgentOutput, Artifact, Responder | +| `go/ai/x/agent_options.go` | AgentOption, WithSessionStore, StreamBidiOption, WithSessionID, WithInit | +| `go/ai/x/agent_test.go` | Tests | + +### Modified Files + +| File | Change | +|------|--------| +| `go/genkit/bidi.go` | Add DefineAgent wrapper | + +--- + +## 7. Implementation Notes + +### Agent Internal Wrapping + +The user's `AgentFunc` returns `AgentResult[Out]`, but `Agent.StreamBidi()` returns `AgentOutput[State, Out]`. Internally, Agent wraps the user function: + +```go +// Simplified internal logic +result, err := userFunc(ctx, wrappedInCh, outCh, sess) +if err != nil { + return AgentOutput[State, Out]{}, err +} +return AgentOutput[State, Out]{ + SessionID: sess.ID(), + Output: result.Output, + State: sess.State(), + Artifacts: result.Artifacts, +}, nil +``` + +### Thread Safety + +- BidiConnection uses mutex for state (closed flag) +- Send is safe to call from multiple goroutines +- Session operations are thread-safe (from existing session package) + +### Tracing + +- Agent inherits tracing from BidiFlow +- Each turn can create nested spans for LLM calls +- Session ID is recorded in trace metadata diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md new file mode 100644 index 0000000000..c1551c4c13 --- /dev/null +++ b/docs/go-bidi-design.md @@ -0,0 +1,588 @@ +# Genkit Go Bidirectional Streaming Features - Design Document + +## Overview + +This document describes the design for bidirectional streaming features in Genkit Go. The implementation introduces three new primitives: + +1. **BidiAction** - Core primitive for bidirectional operations (`go/core/x`) +2. **BidiFlow** - BidiAction with observability, intended for user definition (`go/core/x`) +3. **BidiModel** - Specialized bidi action for real-time LLM APIs (`go/ai/x`) + +For stateful multi-turn agents with session persistence, see [go-agent-design.md](go-agent-design.md). + +## Package Location + +``` +go/core/x/ +├── bidi.go # BidiAction, BidiFunc, BidiConnection +├── bidi_flow.go # BidiFlow +├── bidi_options.go # Options +├── bidi_test.go # Tests + +go/ai/x/ +├── bidi_model.go # BidiModel, BidiModelFunc +├── bidi_model_test.go +``` + +Import as: +- `corex "github.com/firebase/genkit/go/core/x"` +- `aix "github.com/firebase/genkit/go/ai/x"` + +--- + +## 1. Core Type Definitions + +### 1.1 BidiAction + +```go +// BidiAction represents a bidirectional streaming action. +// Type parameters: +// - Init: Type of initialization data (use struct{} if not needed) +// - In: Type of each message sent to the action +// - Out: Type of the final output +// - Stream: Type of each streamed output chunk +type BidiAction[Init, In, Out, Stream any] struct { + name string + fn BidiFunc[Init, In, Out, Stream] + registry api.Registry + desc *api.ActionDesc +} + +// BidiFunc is the function signature for bidi actions. +type BidiFunc[Init, In, Out, Stream any] func( + ctx context.Context, + init Init, + inCh <-chan In, + outCh chan<- Stream, +) (Out, error) +``` + +### 1.2 BidiConnection + +```go +// BidiConnection represents an active bidirectional streaming session. +type BidiConnection[In, Out, Stream any] struct { + inputCh chan In // Internal, accessed via Send() + streamCh chan Stream // Internal output stream channel + doneCh chan struct{} // Closed when action completes + output Out // Final output (valid after done) + err error // Error if any (valid after done) + ctx context.Context + cancel context.CancelFunc + span tracing.Span // Trace span, ended on completion + mu sync.Mutex + closed bool +} + +// Send sends an input message to the bidi action. +func (c *BidiConnection[In, Out, Stream]) Send(input In) error + +// Close signals that no more inputs will be sent. +func (c *BidiConnection[In, Out, Stream]) Close() error + +// Receive returns an iterator for receiving streamed response chunks. +// The iterator completes when the action finishes or signals end of turn. +func (c *BidiConnection[In, Out, Stream]) Receive() iter.Seq2[Stream, error] + +// Output returns the final output after the action completes. +// Blocks until done or context cancelled. +func (c *BidiConnection[In, Out, Stream]) Output() (Out, error) + +// Done returns a channel closed when the connection completes. +func (c *BidiConnection[In, Out, Stream]) Done() <-chan struct{} +``` + +### 1.3 BidiFlow + +```go +type BidiFlow[Init, In, Out, Stream any] struct { + *BidiAction[Init, In, Out, Stream] +} +``` + +--- + +## 2. BidiModel + +### 2.1 Overview + +`BidiModel` is a specialized bidi action for real-time LLM APIs like Gemini Live and OpenAI Realtime. These APIs establish a persistent connection where configuration (temperature, system prompt, tools) must be provided upfront, and then the conversation streams bidirectionally. + +### 2.2 The Role of `init` + +For real-time sessions, the connection to the model API often requires configuration to be established *before* the first user message is received. The `init` payload fulfills this requirement: + +- **`init`**: `ModelRequest` (contains config, tools, system prompt) +- **`inputStream`**: Stream of `ModelRequest` (contains user messages/turns) +- **`stream`**: Stream of `ModelResponseChunk` + +### 2.3 Type Definitions + +```go +// In go/ai/x/bidi_model.go + +// BidiModel represents a bidirectional streaming model for real-time LLM APIs. +type BidiModel struct { + *corex.BidiAction[*ai.ModelRequest, *ai.ModelRequest, *ai.ModelResponse, *ai.ModelResponseChunk] +} + +// BidiModelFunc is the function signature for bidi model implementations. +type BidiModelFunc func( + ctx context.Context, + init *ai.ModelRequest, + inCh <-chan *ai.ModelRequest, + outCh chan<- *ai.ModelResponseChunk, +) (*ai.ModelResponse, error) +``` + +### 2.4 Defining a BidiModel + +```go +// DefineBidiModel creates and registers a BidiModel for real-time LLM interactions. +// The opts parameter follows the same pattern as DefineModel for consistency. +func DefineBidiModel(r api.Registry, name string, opts *ai.ModelOptions, fn BidiModelFunc) *BidiModel +``` + +**Example Plugin Implementation:** + +```go +func (g *GoogleAI) defineBidiModel(r api.Registry) *aix.BidiModel { + return aix.DefineBidiModel(r, "googleai/gemini-2.0-flash-live", + &ai.ModelOptions{ + Label: "Gemini 2.0 Flash Live", + Supports: &ai.ModelSupports{ + Multiturn: true, + Tools: true, + SystemRole: true, + Media: true, + }, + }, + func(ctx context.Context, init *ai.ModelRequest, inCh <-chan *ai.ModelRequest, outCh chan<- *ai.ModelResponseChunk) (*ai.ModelResponse, error) { + session, err := g.client.Live.Connect(ctx, "gemini-2.0-flash-live", &genai.LiveConnectConfig{ + SystemInstruction: toContent(init.Messages), + Tools: toTools(init.Tools), + Temperature: toFloat32Ptr(init.Config), + ResponseModalities: []genai.Modality{genai.ModalityText}, + }) + if err != nil { + return nil, err + } + defer session.Close() + + var totalUsage ai.GenerationUsage + + for request := range inCh { + err := session.SendClientContent(genai.LiveClientContentInput{ + Turns: toContents(request.Messages), + TurnComplete: true, + }) + if err != nil { + return nil, err + } + + for { + msg, err := session.Receive() + if err != nil { + return nil, err + } + + if msg.ToolCall != nil { + outCh <- &ai.ModelResponseChunk{ + Content: toToolCallParts(msg.ToolCall), + } + continue + } + + if msg.ServerContent != nil { + if msg.ServerContent.ModelTurn != nil { + outCh <- &ai.ModelResponseChunk{ + Content: fromParts(msg.ServerContent.ModelTurn.Parts), + } + } + if msg.ServerContent.TurnComplete { + break + } + } + + if msg.UsageMetadata != nil { + totalUsage.InputTokens += int(msg.UsageMetadata.PromptTokenCount) + totalUsage.OutputTokens += int(msg.UsageMetadata.CandidatesTokenCount) + } + } + } + + return &ai.ModelResponse{ + Usage: &totalUsage, + }, nil + }, + ) +} +``` + +### 2.5 Using BidiModel (`GenerateBidi`) + +`GenerateBidi` is the high-level API for interacting with bidi models. It provides a session-like interface for real-time conversations. + +```go +// In go/genkit/generate.go or go/ai/x/generate_bidi.go + +// ModelBidiConnection wraps BidiConnection with model-specific convenience methods. +type ModelBidiConnection struct { + conn *corex.BidiConnection[*ai.ModelRequest, *ai.ModelResponse, *ai.ModelResponseChunk] +} + +// Send sends a user message to the model. +func (s *ModelBidiConnection) Send(messages ...*ai.Message) error { + return s.conn.Send(&ai.ModelRequest{Messages: messages}) +} + +// SendText is a convenience method for sending a text message. +func (s *ModelBidiConnection) SendText(text string) error { + return s.Send(ai.NewUserTextMessage(text)) +} + +// Stream returns an iterator for receiving response chunks. +func (s *ModelBidiConnection) Receive() iter.Seq2[*ai.ModelResponseChunk, error] { + return s.conn.Receive() +} + +// Close signals that the conversation is complete. +func (s *ModelBidiConnection) Close() error { + return s.conn.Close() +} + +// Output returns the final response after the session completes. +func (s *ModelBidiConnection) Output() (*ai.ModelResponse, error) { + return s.conn.Output() +} +``` + +**Usage:** + +`GenerateBidi` uses the same shared option types as regular `Generate` calls. Options like `WithModel`, `WithConfig`, `WithSystem`, and `WithTools` work the same way - they configure the initial session setup. + +```go +// GenerateBidi starts a bidirectional streaming session with a model. +// Uses the existing shared option types from ai/option.go. +func GenerateBidi(ctx context.Context, g *Genkit, opts ...ai.GenerateBidiOption) (*ModelBidiConnection, error) +``` + +**Example:** + +```go +conn, err := genkit.GenerateBidi(ctx, g, + ai.WithModel(geminiLive), + ai.WithConfig(&genai.LiveConnectConfig{Temperature: genai.Ptr[float32](0.7)}), + ai.WithSystem("You are a helpful voice assistant"), + ai.WithTools(weatherTool), +) +if err != nil { + return err +} +defer conn.Close() + +conn.SendText("Hello!") + +for chunk, err := range conn.Receive() { + if err != nil { + return err + } + fmt.Print(chunk.Text()) +} + +conn.SendText("Tell me more about that.") +for chunk, err := range conn.Receive() { + // ... +} + +response, _ := conn.Output() +fmt.Printf("Total tokens: %d\n", response.Usage.TotalTokens) +``` + +### 2.6 Tool Calling in BidiModel + +Real-time models may support tool calling. The pattern follows the standard generate flow but within the streaming context: + +```go +conn, _ := genkit.GenerateBidi(ctx, g, + ai.WithModel(geminiLive), + ai.WithTools(weatherTool, calculatorTool), +) + +conn.SendText("What's the weather in NYC?") + +for chunk, err := range conn.Receive() { + if err != nil { + return err + } + + if toolCall := chunk.ToolCall(); toolCall != nil { + result, _ := toolCall.Tool.Execute(ctx, toolCall.Input) + conn.Send(ai.NewToolResponseMessage(toolCall.ID, result)) + } else { + fmt.Print(chunk.Text()) + } +} +``` + +--- + +## 3. API Surface + +### 3.1 Defining Bidi Actions + +```go +// In go/core/x/bidi.go + +// NewBidiAction creates a BidiAction without registering it. +func NewBidiAction[Init, In, Out, Stream any]( + name string, + fn BidiFunc[Init, In, Out, Stream], +) *BidiAction[Init, In, Out, Stream] + +// DefineBidiAction creates and registers a BidiAction. +func DefineBidiAction[Init, In, Out, Stream any]( + r api.Registry, + name string, + fn BidiFunc[Init, In, Out, Stream], +) *BidiAction[Init, In, Out, Stream] +``` + +Schemas for `In`, `Out`, `Init`, and `Stream` types are automatically inferred from the type parameters using the existing JSON schema inference in `go/internal/base/json.go`. + +### 3.2 Defining Bidi Flows + +```go +// In go/core/x/bidi_flow.go + +// DefineBidiFlow creates a BidiFlow with tracing and registers it. +// Use this for user-defined bidirectional streaming operations. +func DefineBidiFlow[Init, In, Out, Stream any]( + r api.Registry, + name string, + fn BidiFunc[Init, In, Out, Stream], +) *BidiFlow[Init, In, Out, Stream] +``` + +### 3.3 Starting Connections + +All bidi types (BidiAction, BidiFlow, BidiModel) use the same `StreamBidi` method to start connections: + +```go +func (ba *BidiAction[Init, In, Out, Stream]) StreamBidi(ctx context.Context, init Init) (*BidiConnection[In, Out, Stream], error) +``` + +### 3.4 High-Level Genkit API + +```go +// In go/genkit/bidi.go + +func DefineBidiFlow[Init, In, Out, Stream any]( + g *Genkit, + name string, + fn corex.BidiFunc[Init, In, Out, Stream], +) *corex.BidiFlow[Init, In, Out, Stream] + +// GenerateBidi uses shared options from ai/option.go +// Options like WithModel, WithConfig, WithSystem, WithTools configure the session init. +func GenerateBidi( + ctx context.Context, + g *Genkit, + opts ...ai.GenerateBidiOption, +) (*ModelBidiConnection, error) +``` + +--- + +## 4. Integration with Existing Infrastructure + +### 4.1 Tracing Integration + +BidiFlows create spans that remain open for the lifetime of the connection, enabling streaming trace visualization in the Dev UI. + +**Key behaviors:** +- Span starts when `StreamBidi()` is called +- Span ends when the bidi function returns (via `defer` in the connection goroutine) +- Flow context is injected so `core.Run()` works inside the bidi function +- Nested spans for sub-operations (e.g., each LLM call) work normally + +**Important**: The span stays open while the connection is active, allowing: +- Streaming traces to the Dev UI in real-time +- Nested spans for sub-operations (e.g., each LLM call) +- Events recorded as they happen + +### 4.2 Action Registration + +Add new action types and schema fields: + +```go +// In go/core/api/action.go +const ( + ActionTypeBidiFlow ActionType = "bidi-flow" + ActionTypeBidiModel ActionType = "bidi-model" +) + +// ActionDesc gets two new optional fields +type ActionDesc struct { + // ... existing fields ... + StreamSchema map[string]any `json:"streamSchema,omitempty"` // NEW: schema for streamed chunks + InitSchema map[string]any `json:"initSchema,omitempty"` // NEW: schema for initialization data +} +``` + +--- + +## 5. Example Usage + +### 5.1 Basic Echo Bidi Flow + +```go +package main + +import ( + "context" + "fmt" + + "github.com/firebase/genkit/go/genkit" +) + +func main() { + ctx := context.Background() + g := genkit.Init(ctx) + + echoFlow := genkit.DefineBidiFlow(g, "echo", + func(ctx context.Context, init struct{}, inCh <-chan string, outCh chan<- string) (string, error) { + var count int + for input := range inCh { + count++ + outCh <- fmt.Sprintf("echo: %s", input) + } + return fmt.Sprintf("processed %d messages", count), nil + }, + ) + + conn, err := echoFlow.StreamBidi(ctx) + if err != nil { + panic(err) + } + + conn.Send("hello") + conn.Send("world") + conn.Close() + + for chunk, err := range conn.Receive() { + if err != nil { + panic(err) + } + fmt.Println(chunk) + } + + output, _ := conn.Output() + fmt.Println(output) +} +``` + +### 5.2 Bidi Flow with Initialization Data + +```go +type ChatInit struct { + SystemPrompt string `json:"systemPrompt"` + Temperature float64 `json:"temperature"` +} + +configuredChat := genkit.DefineBidiFlow(g, "configuredChat", + func(ctx context.Context, init ChatInit, inCh <-chan string, outCh chan<- string) (string, error) { + for input := range inCh { + resp, _ := genkit.GenerateText(ctx, g, + ai.WithSystem(init.SystemPrompt), + ai.WithConfig(&genai.GenerateContentConfig{Temperature: &init.Temperature}), + ai.WithPrompt(input), + ) + outCh <- resp + } + return "done", nil + }, +) + +conn, _ := configuredChat.StreamBidi(ctx, ChatInit{ + SystemPrompt: "You are a helpful assistant.", + Temperature: 0.7, +}) +``` + +--- + +## 6. Files to Create/Modify + +### New Files + +| File | Description | +|------|-------------| +| `go/core/x/bidi.go` | BidiAction, BidiFunc, BidiConnection | +| `go/core/x/bidi_flow.go` | BidiFlow with tracing | +| `go/core/x/bidi_options.go` | BidiOption types | +| `go/core/x/bidi_test.go` | Tests | +| `go/ai/x/bidi_model.go` | BidiModel, BidiModelFunc, ModelBidiConnection | +| `go/ai/x/bidi_model_test.go` | Tests | +| `go/genkit/bidi.go` | High-level API wrappers | + +### Modified Files + +| File | Change | +|------|--------| +| `go/core/api/action.go` | Add `ActionTypeBidiFlow`, `ActionTypeBidiModel` constants | + +--- + +## 7. Implementation Notes + +### Error Handling +- Errors from the bidi function propagate to both `Responses()` iterator and `Output()` +- Context cancellation closes all channels and terminates the action +- Send after Close returns an error +- Errors are yielded as the second value in the `iter.Seq2[Stream, error]` iterator + +### Goroutine Management +- BidiConnection spawns a goroutine to run the action +- Proper cleanup on context cancellation using `defer` and `sync.Once` +- Channel closure follows Go idioms (sender closes) +- Trace span is ended in the goroutine's defer + +### Thread Safety +- BidiConnection uses mutex for state (closed flag) +- Send is safe to call from multiple goroutines + +### Channels and Backpressure +- Both input and output channels are **unbuffered** by default (size 0) +- This provides natural backpressure: `Send()` blocks until the action reads, output blocks until consumer reads +- If needed, `WithInputBufferSize` / `WithOutputBufferSize` options could be added later for specific use cases + +### Tracing +- Span is started when connection is created, ended when action completes +- Nested spans work normally within the bidi function +- Events can be recorded throughout the connection lifecycle +- Dev UI can show traces in real-time as they stream +- Implementation uses the existing tracer infrastructure (details left to implementation) + +### Shutdown Sequence +When `Close()` is called on a BidiConnection: +1. The input channel is closed, signaling no more inputs +2. The bidi function's `for range inputStream` loop exits +3. The function returns its final output +4. The stream channel is closed +5. The `Done()` channel is closed +6. `Output()` unblocks and returns the result + +On context cancellation: +1. Context error propagates to the bidi function +2. All channels are closed +3. `Output()` returns the context error + +--- + +## 8. Integration with Reflection API + +These features align with **Reflection API V2**, which uses WebSockets to support bidirectional streaming between the Runtime and the CLI/Manager. + +- `runAction` now supports an `input` stream +- `streamChunk` notifications are bidirectional (Manager <-> Runtime)