diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 00000000..0c597397 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,132 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## What Is Eventuous + +Eventuous is a production-grade Event Sourcing library for .NET implementing DDD tactical patterns. It provides abstractions and implementations for aggregates, command services, event stores, subscriptions, producers, and projections. + +## Build & Test Commands + +```bash +# Build the entire solution +dotnet build Eventuous.slnx + +# Run all tests (all target frameworks: net8.0, net9.0, net10.0) +dotnet test Eventuous.slnx + +# Run tests for a specific framework +dotnet test Eventuous.slnx -f net10.0 + +# Run a single test project +dotnet test src/Core/test/Eventuous.Tests/Eventuous.Tests.csproj + +# Run a single test by name filter +dotnet test src/Core/test/Eventuous.Tests/Eventuous.Tests.csproj --filter "FullyQualifiedName~TestClassName" + +# CI configuration (used in pull requests) +dotnet test -c "Debug CI" -f net10.0 +``` + +The solution file is `Eventuous.slnx` (new .slnx format, not .sln). The test runner is Microsoft.Testing.Platform with TUnit as the test framework (not xUnit or NUnit). Test results output as TRX to `test-results/`. + +Integration tests require infrastructure services. Start them with: +```bash +docker compose up -d +``` +Services: EventStoreDB (:2113), PostgreSQL (:5432), MongoDB (:27017), RabbitMQ (:5672), Kafka (:9092), SQL Server (:1433). + +## Architecture + +### Core Domain Model + +**Aggregates** (`src/Core/src/Eventuous.Domain/`): `Aggregate where T : State, new()` — tracks pending `Changes` and `Original` events, enforces business invariants via `Apply()`, uses optimistic concurrency through version tracking. + +**State** (`src/Core/src/Eventuous.Domain/`): `State` is an abstract record reconstructed from events using the `When(object @event)` fold pattern. States are immutable. + +**Id** (`src/Core/src/Eventuous.Domain/`): `Id` is an abstract record for string-based identity values with validation. + +### Command Services (Two Approaches) + +**Aggregate-based** (`src/Core/src/Eventuous.Application/AggregateService/`): `CommandService` — loads aggregate, executes domain logic, persists events. Handlers registered via `On().InState(...).GetId(...).Act(...)`. + +**Functional** (`src/Core/src/Eventuous.Application/FunctionalService/`): `CommandService` — no aggregate instances, pure functions that take state + command and return events. Uses `On().InState(...).GetStream(...).Act(...)`. + +### Event Store Layer + +`IEventStore` (combined), `IEventReader`, `IEventWriter` in `src/Core/src/Eventuous.Persistence/`. Implementations: EventStoreDB (`src/EventStore/`), PostgreSQL (`src/Postgres/`), SQL Server (`src/SqlServer/`). + +`IAggregateStore` is **deprecated** — use `IEventReader.LoadAggregate<>()` and `IEventWriter.StoreAggregate<>()` extension methods instead. + +### Subscriptions & Producers + +**Subscriptions** (`src/Core/src/Eventuous.Subscriptions/`): `IEventHandler` processes events, with consume filters/pipes, checkpoint management, and partitioning support. + +**Producers** (`src/Core/src/Eventuous.Producers/`): `IProducer`/`BaseProducer` for publishing to RabbitMQ, Kafka, Google Pub/Sub, Azure Service Bus. + +**Gateway** (`src/Gateway/`): Connects subscriptions to producers for cross-context event routing. + +### Key Conventions + +- **Stream naming**: Default pattern is `{AggregateType}-{AggregateId}` via `StreamNameMap`. +- **Type mapping**: Events must be registered in `TypeMap` for serialization. +- **Async everywhere**: All I/O is async; use `.NoContext()` for `ConfigureAwait(false)`. +- **Diagnostics**: Built-in OpenTelemetry tracing/metrics in `src/Diagnostics/`. + +## Project Layout + +``` +src/Core/src/ Core packages (Domain, Persistence, Application, Subscriptions, Producers, Serialization, Shared) +src/Core/gen/ Source generators +src/Core/test/ Core test projects +src/EventStore/ EventStoreDB integration (src/ + test/) +src/Postgres/ PostgreSQL integration (src/ + test/) +src/SqlServer/ SQL Server integration (src/ + test/) +src/Mongo/ MongoDB projections +src/RabbitMq/ RabbitMQ integration +src/Kafka/ Kafka integration +src/GooglePubSub/ Google Pub/Sub integration +src/Azure/ Azure Service Bus integration +src/Extensions/ ASP.NET Core, DI extensions +src/Diagnostics/ OpenTelemetry, Logging +src/Gateway/ Event gateway +src/Testing/ Test utilities +test/ Shared test helpers (Eventuous.Sut.App, Eventuous.Sut.Domain, Eventuous.TestHelpers, Eventuous.TestHelpers.TUnit) +samples/ Sample apps (esdb, postgres, kurrentdb, banking) +``` + +## Documentation Site + +The `docs/` directory is a Docusaurus v3 site (https://eventuous.dev). Requires Node >=18.19.0 and pnpm. + +```bash +cd docs + +# Install dependencies +pnpm install + +# Local dev server with hot reload +pnpm start + +# Production build (output to docs/build/) +pnpm build + +# Serve the production build locally +pnpm serve + +# TypeScript validation +pnpm typecheck +``` + +Docs content lives in `docs/docs/` as `.md` and `.mdx` files organized by topic: `domain/`, `persistence/`, `application/`, `subscriptions/`, `read-models/`, `producers/`, `gateway/`, `diagnostics/`, and `infra/` (per-provider: esdb, postgres, mongodb, mssql, kafka, rabbitmq, pubsub, elastic). MDX files can embed React components. Mermaid diagrams are supported in markdown code blocks. Versioned docs are in `versioned_docs/` (current version: 0.15). The build enforces no broken links. + +## Code Style + +- Targets .NET 10/9/8 (`TargetFrameworks: net10.0;net9.0;net8.0`) +- C# preview language features (`LangVersion: preview`) +- Nullable reference types enabled +- Implicit usings enabled +- Follow `.editorconfig` formatting rules +- Root namespace is `Eventuous` for most projects; integration-specific projects use `Eventuous.PostgreSQL`, `Eventuous.EventStore`, etc. +- Centralized package versions in `Directory.Packages.props` +- Versioning via MinVer from Git tags diff --git a/skills/eventuous-azure-servicebus.md b/skills/eventuous-azure-servicebus.md new file mode 100644 index 00000000..1765d3ec --- /dev/null +++ b/skills/eventuous-azure-servicebus.md @@ -0,0 +1,180 @@ +# Eventuous Azure Service Bus Integration + +NuGet package: `Eventuous.Azure.ServiceBus` +Namespace: `Eventuous.Azure.ServiceBus.Producers`, `Eventuous.Azure.ServiceBus.Subscriptions` +Source: `src/Azure/src/Eventuous.Azure.ServiceBus/` + +## Producer + +`ServiceBusProducer` extends `BaseProducer` and implements `IHostedProducer`, `IAsyncDisposable`. + +### Constructor + +```csharp +public ServiceBusProducer( + ServiceBusClient client, // Azure SDK ServiceBusClient + ServiceBusProducerOptions options, + IEventSerializer? serializer = null, + ILogger? log = null +) +``` + +The producer creates a `ServiceBusSender` from the client for the configured queue/topic. + +### ServiceBusProducerOptions + +```csharp +public class ServiceBusProducerOptions { + public required string QueueOrTopicName { get; init; } + public ServiceBusSenderOptions? SenderOptions { get; init; } + public ServiceBusMessageAttributeNames AttributeNames { get; init; } = new(); +} +``` + +### ServiceBusProduceOptions (per-message) + +```csharp +public class ServiceBusProduceOptions { + public string? Subject { get; set; } + public string? To { get; set; } + public string? ReplyTo { get; set; } + public string? SessionId { get; init; } // for session-enabled entities + public string? ReplyToSessionId { get; init; } + public TimeSpan TimeToLive { get; set; } = TimeSpan.MaxValue; +} +``` + +### How it works + +- Single messages sent via `SendMessageAsync`; multiple messages batched automatically via `ServiceBusMessageBatch` +- Event type stored in `ApplicationProperties["MessageType"]`; content type in `ServiceBusMessage.ContentType` +- Metadata and additional headers added as application properties (filtered by Service Bus-compatible types) +- Session ID on produce options enables ordered message processing +- Supports delivery acknowledgement callbacks + +### DI Registration + +```csharp +services.AddSingleton(new ServiceBusClient("your-connection-string")); +services.AddProducer(sp => + new ServiceBusProducer( + sp.GetRequiredService(), + new ServiceBusProducerOptions { QueueOrTopicName = "my-topic" } + ) +); +``` + +## Subscription + +`ServiceBusSubscription` extends `EventSubscription`. + +### Constructor + +```csharp +public ServiceBusSubscription( + ServiceBusClient client, + ServiceBusSubscriptionOptions options, + ConsumePipe consumePipe, + ILoggerFactory? loggerFactory, + IEventSerializer? eventSerializer +) +``` + +### ServiceBusSubscriptionOptions + +```csharp +public record ServiceBusSubscriptionOptions : SubscriptionOptions { + public required IQueueOrTopic QueueOrTopic { get; set; } + public ServiceBusProcessorOptions ProcessorOptions { get; set; } = new(); + public ServiceBusSessionProcessorOptions? SessionProcessorOptions { get; set; } // enables session mode + public ServiceBusMessageAttributeNames AttributeNames { get; init; } = new(); + public Func? ErrorHandler { get; init; } +} +``` + +### Queue or Topic targets + +Three implementations of `IQueueOrTopic`: + +```csharp +// Subscribe to a queue +new Queue("my-queue") + +// Subscribe to a topic (uses SubscriptionId from options as the subscription name) +new Topic("my-topic") + +// Subscribe to a topic with an explicit subscription name +new TopicAndSubscription("my-topic", "my-subscription") +``` + +### Session support + +When `SessionProcessorOptions` is set, the subscription uses `ServiceBusSessionProcessor` instead of `ServiceBusProcessor`, enabling ordered processing per session: + +```csharp +services.AddSubscription( + "SessionSub", + builder => builder + .Configure(o => { + o.QueueOrTopic = new Queue("session-queue"); + o.SessionProcessorOptions = new ServiceBusSessionProcessorOptions(); + }) + .AddEventHandler() +); +``` + +### Message attribute names + +`ServiceBusMessageAttributeNames` controls how metadata maps to Service Bus properties: + +```csharp +public class ServiceBusMessageAttributeNames { + public string MessageType { get; set; } = "MessageType"; + public string StreamName { get; set; } = "StreamName"; + public string CorrelationId { get; set; } = "correlation-id"; + public string CausationId { get; set; } = "causation-id"; + public string ReplyTo { get; set; } = "ReplyTo"; + public string Subject { get; set; } = "Subject"; + public string To { get; set; } = "To"; + public string MessageId { get; set; } = "message-id"; +} +``` + +## Complete Example + +```csharp +// Register the Azure SDK client +services.AddSingleton(new ServiceBusClient("Endpoint=sb://...")); + +// Register producer +services.AddProducer(sp => + new ServiceBusProducer( + sp.GetRequiredService(), + new ServiceBusProducerOptions { QueueOrTopicName = "events-topic" } + ) +); + +// Register subscription from a queue +services.AddSubscription( + "MyQueueSubscription", + builder => builder + .Configure(o => { + o.QueueOrTopic = new Queue("events-queue"); + }) + .AddEventHandler() +); + +// Register subscription from a topic with explicit subscription +services.AddSubscription( + "MyTopicSubscription", + builder => builder + .Configure(o => { + o.QueueOrTopic = new TopicAndSubscription("events-topic", "my-sub"); + }) + .AddEventHandler() +); +``` + +## Tracing + +Built-in OpenTelemetry tracing with `MessagingSystem = "azure-service-bus"`. diff --git a/skills/eventuous-gateway.md b/skills/eventuous-gateway.md new file mode 100644 index 00000000..4568f73f --- /dev/null +++ b/skills/eventuous-gateway.md @@ -0,0 +1,213 @@ +# Eventuous Gateway + +NuGet package: `Eventuous.Gateway` +Namespace: `Eventuous.Gateway` +Source: `src/Gateway/src/Eventuous.Gateway/` + +The Gateway bridges a subscription to a producer, enabling cross-context event routing. It subscribes to events from one source and produces transformed messages to another target. + +## Core Concepts + +### RouteAndTransform delegate + +The central abstraction is a function that transforms incoming events into outgoing messages: + +```csharp +public delegate ValueTask[]> RouteAndTransform( + IMessageConsumeContext message +); +``` + +Return an empty array to skip/filter a message. Return one or more `GatewayMessage` instances to produce them. + +### GatewayMessage + +```csharp +public record GatewayMessage( + StreamName TargetStream, + object Message, + Metadata? Metadata, + TProduceOptions ProduceOptions +); +``` + +### IGatewayTransform interface + +For class-based transforms (registered in DI): + +```csharp +public interface IGatewayTransform { + ValueTask[]> RouteAndTransform(IMessageConsumeContext context); +} +``` + +## Registration + +### AddGateway with inline transform + +```csharp +services.AddGateway( + subscriptionId: "MyGateway", + routeAndTransform: MyTransform.Transform, + configureSubscription: options => { ... }, // optional + configureBuilder: builder => { ... }, // optional + awaitProduce: true // default: true +); +``` + +### AddGateway with DI-resolved RouteAndTransform delegate + +```csharp +// Register the delegate in DI +services.AddSingleton>(MyTransform.Transform); + +// Register gateway without passing the delegate +services.AddGateway( + subscriptionId: "MyGateway" +); +``` + +### AddGateway with IGatewayTransform class + +```csharp +services.AddGateway( + subscriptionId: "MyGateway" +); +``` + +The `TTransform` class is registered as a singleton and must implement `IGatewayTransform`. + +## Transform Function Examples + +### Simple event forwarding with transformation + +```csharp +public static class PaymentsGateway { + static readonly StreamName Stream = new("PaymentsIntegration"); + + public static ValueTask[]> Transform( + IMessageConsumeContext original + ) { + var result = original.Message is PaymentEvents.PaymentRecorded evt + ? new GatewayMessage( + Stream, + new BookingPaymentRecorded( + original.Stream.GetId(), + evt.BookingId, + evt.Amount, + evt.Currency + ), + new Metadata(), + new KurrentDBProduceOptions() + ) + : null; + + return ValueTask.FromResult[]>( + result != null ? [result] : [] + ); + } +} +``` + +### Filtering: return empty array to skip + +```csharp +public static ValueTask[]> Transform(IMessageConsumeContext ctx) { + // Only forward specific event types + if (ctx.Message is not ImportantEvent evt) + return ValueTask.FromResult(Array.Empty>()); + + return ValueTask.FromResult[]>([ + new(new StreamName("target"), evt, null, new MyOptions()) + ]); +} +``` + +### Producing to RabbitMQ via gateway + +```csharp +public static class PaymentsGateway { + static readonly StreamName Stream = new("PaymentsIntegration"); + static readonly RabbitMqProduceOptions ProduceOptions = new(); + + public static ValueTask[]> Transform( + IMessageConsumeContext original + ) { + var result = original.Message is PaymentEvents.PaymentRecorded evt + ? new GatewayMessage( + Stream, + new BookingPaymentRecorded( + original.Stream.GetId(), evt.BookingId, evt.Amount, evt.Currency + ), + new Metadata(), + ProduceOptions + ) + : null; + + return ValueTask.FromResult[]>( + result != null ? [result] : [] + ); + } +} +``` + +## Complete Example (EventStoreDB to EventStoreDB) + +```csharp +// In the Payments bounded context +services.AddKurrentDBClient(connectionString); +services.AddEventStore(); +services.AddCheckpointStore(); +services.AddProducer(); + +services.AddGateway< + AllStreamSubscription, + AllStreamSubscriptionOptions, + KurrentDBProducer, + KurrentDBProduceOptions +>( + "IntegrationSubscription", + PaymentsGateway.Transform +); +``` + +## Complete Example (Postgres to RabbitMQ) + +```csharp +services.AddEventuousPostgres(configuration.GetSection("Postgres")); +services.AddEventStore(); +services.AddCheckpointStore(); +services.AddProducer(); + +services.AddGateway< + PostgresAllStreamSubscription, + PostgresAllStreamSubscriptionOptions, + RabbitMqProducer, + RabbitMqProduceOptions +>( + "IntegrationSubscription", + PaymentsGateway.Transform +); +``` + +## Metadata Propagation + +The gateway automatically: +- Sets causation ID from the original message's ID on produced messages +- Passes original context metadata (message, stream, positions) as additional headers on produced messages + +Access original context in a custom producer via `ProducedMessageExtensions`: +```csharp +message.GetOriginalStream() +message.GetOriginalMessage() +message.GetOriginalMetadata() +message.GetOriginalStreamPosition() +message.GetOriginalGlobalPosition() +message.GetOriginalMessageId() +message.GetOriginalMessageType() +``` + +## awaitProduce Option + +- `true` (default): The handler awaits produce completion and returns `EventHandlingStatus.Success`. Checkpoint advances only after successful produce. +- `false`: Returns `EventHandlingStatus.Pending` immediately. Uses async acknowledgement via `AsyncConsumeContext`. Useful for high-throughput scenarios but requires the subscription to support async acks. diff --git a/skills/eventuous-google-pubsub.md b/skills/eventuous-google-pubsub.md new file mode 100644 index 00000000..b2c79153 --- /dev/null +++ b/skills/eventuous-google-pubsub.md @@ -0,0 +1,171 @@ +# Eventuous Google Pub/Sub Integration + +NuGet package: `Eventuous.GooglePubSub` +Namespace: `Eventuous.GooglePubSub.Producers`, `Eventuous.GooglePubSub.Subscriptions` +Source: `src/GooglePubSub/src/Eventuous.GooglePubSub/` + +## Producer + +`GooglePubSubProducer` extends `BaseProducer` and implements `IHostedProducer`. + +### Constructor overloads + +```csharp +// Simple: project ID only +new GooglePubSubProducer(projectId: "my-gcp-project"); + +// Full options +new GooglePubSubProducer(new PubSubProducerOptions { + ProjectId = "my-gcp-project", + ConfigureClientBuilder = builder => { + builder.EmulatorDetection = EmulatorDetection.EmulatorOrProduction; + }, + CreateTopic = true, // default: true, auto-creates topic + Attributes = new PubSubAttributes { + EventType = "eventType", // defaults + ContentType = "contentType", + MessageId = "messageId" + } +}); + +// IOptions overload for DI +new GooglePubSubProducer(IOptions options); +``` + +### PubSubProducerOptions + +```csharp +public class PubSubProducerOptions { + public string ProjectId { get; init; } = null!; + public Action? ConfigureClientBuilder { get; init; } + public PubSubAttributes Attributes { get; init; } = new(); + public bool CreateTopic { get; set; } = true; // set false if pre-created +} +``` + +### PubSubProduceOptions (per-message) + +```csharp +public class PubSubProduceOptions { + public Func>? AddAttributes { get; init; } + public string? OrderingKey { get; init; } // requires client configured for ordering +} +``` + +### How it works + +- Caches `PublisherClient` instances per topic via `ClientCache` +- Auto-creates topics if `CreateTopic = true` (requires Pub/Sub admin permissions) +- Event type, content type, and message ID stored as Pub/Sub message attributes +- Metadata entries added as additional message attributes +- `StopAsync` shuts down all cached publisher clients + +### DI Registration + +```csharp +services.AddProducer(sp => + new GooglePubSubProducer(new PubSubProducerOptions { + ProjectId = "my-gcp-project" + }) +); + +// Or with IOptions pattern +services.Configure(config.GetSection("PubSub")); +services.AddProducer(); +``` + +## Subscription + +`GooglePubSubSubscription` extends `EventSubscription`. + +### Constructor overloads + +```csharp +// Simple constructor +new GooglePubSubSubscription( + projectId: "my-gcp-project", + topicId: "my-topic", + subscriptionId: "my-subscription", + consumePipe: pipe, + loggerFactory: loggerFactory, + eventSerializer: serializer, + configureClient: builder => { ... } +); + +// Options-based constructor +new GooglePubSubSubscription(options, consumePipe, loggerFactory, eventSerializer); +``` + +### PubSubSubscriptionOptions + +```csharp +public record PubSubSubscriptionOptions : SubscriptionOptions { + public string ProjectId { get; set; } = null!; + public string TopicId { get; set; } = null!; + public bool CreateSubscription { get; set; } = true; // auto-create + public Action? ConfigureClientBuilder { get; set; } + public HandleEventProcessingFailure? FailureHandler { get; set; } // default: NACK + public Action? ConfigureSubscription { get; set; } + public PubSubAttributes Attributes { get; set; } = new(); +} +``` + +### Failure handling + +```csharp +// Custom failure handler delegate +public delegate ValueTask HandleEventProcessingFailure( + SubscriberClient client, + PubsubMessage pubsubMessage, + Exception exception +); +``` + +Default behavior is `Reply.Nack`. You can override to implement dead-letter logic or `Reply.Ack` to skip poison messages. + +### DI Registration + +```csharp +services.AddSubscription( + "MyPubSubSubscription", + builder => builder + .Configure(o => { + o.ProjectId = "my-gcp-project"; + o.TopicId = "my-topic"; + }) + .AddEventHandler() +); +``` + +## Emulator Support + +Configure via `EmulatorDetection` on the client builder: + +```csharp +services.AddProducer(sp => + new GooglePubSubProducer(new PubSubProducerOptions { + ProjectId = "my-project", + ConfigureClientBuilder = builder => { + builder.EmulatorDetection = EmulatorDetection.EmulatorOrProduction; + } + }) +); +``` + +Set the `PUBSUB_EMULATOR_HOST` environment variable to point to the emulator. + +## Message Attributes + +`PubSubAttributes` controls the Pub/Sub message attribute names for system values: + +```csharp +public class PubSubAttributes { + public string EventType { get; set; } = "eventType"; + public string ContentType { get; set; } = "contentType"; + public string MessageId { get; set; } = "messageId"; +} +``` + +## Tracing + +Built-in OpenTelemetry tracing with `MessagingSystem = "google-pubsub"`. diff --git a/skills/eventuous-kafka.md b/skills/eventuous-kafka.md new file mode 100644 index 00000000..e7ad457c --- /dev/null +++ b/skills/eventuous-kafka.md @@ -0,0 +1,85 @@ +# Eventuous Kafka Integration + +NuGet package: `Eventuous.Kafka` +Namespace: `Eventuous.Kafka.Producers`, `Eventuous.Kafka.Subscriptions` +Source: `src/Kafka/src/Eventuous.Kafka/` + +Uses Confluent.Kafka under the hood. Produces byte[] payloads with type info in Kafka headers (no schema registry). + +## Producer + +`KafkaBasicProducer` extends `BaseProducer` and implements `IHostedProducer`, `IAsyncDisposable`. + +```csharp +// Constructor takes KafkaProducerOptions (wraps Confluent ProducerConfig) +public record KafkaProducerOptions(ProducerConfig ProducerConfig); + +// Per-message produce options (partition key) +public record KafkaProduceOptions(string PartitionKey); +``` + +### Instantiation + +```csharp +var options = new KafkaProducerOptions(new ProducerConfig { + BootstrapServers = "localhost:9092" +}); +await using var producer = new KafkaBasicProducer(options); +await producer.StartAsync(cancellationToken); + +// Produce with partition key +await producer.Produce( + new StreamName("my-topic"), + events, + new Metadata(), + new KafkaProduceOptions("my-partition-key"), + cancellationToken: ct +); +``` + +### DI Registration + +```csharp +services.AddProducer(sp => + new KafkaBasicProducer( + new KafkaProducerOptions(new ProducerConfig { + BootstrapServers = "localhost:9092" + }) + ) +); +``` + +### How it works + +- Serializes events using `IEventSerializer`, sends as `byte[]` values +- Stores event type in `message-type` header and content type in `content-type` header (configurable via `KafkaHeaderKeys`) +- When `PartitionKey` is provided, uses a keyed producer (`IProducer`); otherwise uses `IProducer` +- Metadata entries are converted to Kafka headers via `MetadataExtensions.AsKafkaHeaders()` +- Supports delivery acknowledgement callbacks (`OnAck`/`OnFail`) +- `StopAsync` flushes pending messages before stopping + +### Header Keys + +```csharp +public static class KafkaHeaderKeys { + public static string MessageTypeHeader { get; set; } = "message-type"; + public static string ContentTypeHeader { get; set; } = "content-type"; +} +``` + +## Subscription + +`KafkaBasicSubscription` extends `EventSubscription`. Note: the subscription is currently a stub (throws `NotImplementedException`). + +```csharp +public record KafkaSubscriptionOptions : SubscriptionOptions { + public ConsumerConfig ConsumerConfig { get; init; } = null!; +} +``` + +## Tracing + +Built-in OpenTelemetry tracing with: +- `MessagingSystem = "kafka"` +- `DestinationKind = "topic"` +- `ProduceOperation = "produce"` diff --git a/skills/eventuous-kurrentdb.md b/skills/eventuous-kurrentdb.md new file mode 100644 index 00000000..7a579afe --- /dev/null +++ b/skills/eventuous-kurrentdb.md @@ -0,0 +1,269 @@ +# Eventuous KurrentDB (EventStoreDB) Integration + +Infrastructure-specific guidance for using Eventuous with KurrentDB (EventStoreDB) as the event store, subscription source, and producer target. + +## NuGet Packages + +```xml + + +``` + +The `Eventuous.KurrentDB` package provides: `KurrentDBEventStore`, `AllStreamSubscription`, `StreamSubscription`, `StreamPersistentSubscription`, `AllPersistentSubscription`, and `KurrentDBProducer`. It depends on `KurrentDB.Client`. + +## Namespaces + +```csharp +using Eventuous.KurrentDB; // KurrentDBEventStore +using Eventuous.KurrentDB.Subscriptions; // AllStreamSubscription, StreamSubscription, StreamPersistentSubscription, AllPersistentSubscription, options +using Eventuous.KurrentDB.Producers; // KurrentDBProducer, KurrentDBProduceOptions +``` + +## Client Registration + +Register the KurrentDB gRPC client using `AddKurrentDBClient` (provided by the `KurrentDB.Client` package): + +```csharp +services.AddKurrentDBClient("esdb://localhost:2113?tls=false"); +``` + +Or from configuration: + +```csharp +services.AddKurrentDBClient(configuration["EventStore:ConnectionString"]!); +``` + +This registers `KurrentDBClient` as a singleton in the DI container. All Eventuous KurrentDB types (`KurrentDBEventStore`, subscriptions, producer) resolve this client automatically. + +## Event Store Registration + +Register `KurrentDBEventStore` as `IEventStore`, `IEventReader`, and `IEventWriter`: + +```csharp +services.AddKurrentDBClient("esdb://localhost:2113?tls=false"); +services.AddEventStore(); +``` + +`KurrentDBEventStore` implements `IEventStore` (which combines `IEventReader` and `IEventWriter`). `AddEventStore()` registers all three interfaces, with tracing wrappers when diagnostics are enabled. + +The legacy class `EsdbEventStore` is obsolete -- use `KurrentDBEventStore` instead. + +## Subscriptions + +### AllStreamSubscription (Catch-Up, $all Stream) + +Subscribes to the `$all` global stream. Requires a checkpoint store. Use this for cross-aggregate projections and integrations. + +```csharp +services.AddSubscription( + "BookingsProjections", + builder => builder + .UseCheckpointStore() + .AddEventHandler() + .AddEventHandler() + .WithPartitioningByStream(2) +); +``` + +`AllStreamSubscriptionOptions` properties: + +| Property | Type | Default | Description | +|---|---|---|---| +| `EventFilter` | IEventFilter? | null | Server-side event filter (defaults to excluding system events) | +| `CheckpointInterval` | uint | 10 | How often to persist checkpoint when filtering skips events | +| `ConcurrencyLimit` | int | 1 | Number of concurrent message consumers | +| `ResolveLinkTos` | bool | false | Resolve link events to their targets | +| `Credentials` | UserCredentials? | null | Optional user credentials | + +Inherited from `SubscriptionWithCheckpointOptions`: + +| Property | Type | Default | Description | +|---|---|---|---| +| `CheckpointCommitBatchSize` | int | 100 | Commit checkpoint after this many events | +| `CheckpointCommitDelayMs` | int | 5000 | Commit checkpoint after this delay (ms) | +| `StartFrom` | InitialPosition | Earliest | Where to start if no checkpoint exists (`Earliest` or `Latest`) | + +### StreamSubscription (Catch-Up, Specific Stream) + +Subscribes to a single named stream. Requires a checkpoint store. + +```csharp +services.AddSubscription( + "MyStreamSub", + builder => builder + .Configure(o => o.StreamName = "MyStream-123") + .UseCheckpointStore() + .AddEventHandler() +); +``` + +`StreamSubscriptionOptions` adds: + +| Property | Type | Default | Description | +|---|---|---|---| +| `StreamName` | StreamName | (required) | The stream to subscribe to | +| `IgnoreSystemEvents` | bool | true | Skip events whose type starts with `$` | + +Plus all properties from `AllStreamSubscriptionOptions` above (except `EventFilter` and `CheckpointInterval`). + +### StreamPersistentSubscription (Persistent, Specific Stream) + +Server-managed persistent subscription for a specific stream. KurrentDB manages the checkpoint -- no checkpoint store needed. The subscription is auto-created if it does not exist. + +```csharp +services.AddSubscription( + "PaymentIntegration", + builder => builder + .Configure(x => x.StreamName = "PaymentEvents") + .AddEventHandler() +); +``` + +`StreamPersistentSubscriptionOptions` properties: + +| Property | Type | Default | Description | +|---|---|---|---| +| `StreamName` | StreamName | (required) | The stream to subscribe to | +| `SubscriptionSettings` | PersistentSubscriptionSettings? | null | Native KurrentDB persistent subscription settings | +| `BufferSize` | int | 10 | Subscription buffer size | +| `Deadline` | TimeSpan? | null | gRPC call deadline | +| `FailureHandler` | HandleEventProcessingFailure? | null | Custom failure handling (default: retry then NACK) | +| `ResolveLinkTos` | bool | false | Resolve link events | +| `Credentials` | UserCredentials? | null | Optional user credentials | + +### AllPersistentSubscription (Persistent, $all Stream) + +Server-managed persistent subscription for the `$all` stream. + +```csharp +services.AddSubscription( + "AllPersistent", + builder => builder + .Configure(x => x.EventFilter = EventTypeFilter.ExcludeSystemEvents()) + .AddEventHandler() +); +``` + +`AllPersistentSubscriptionOptions` adds: + +| Property | Type | Default | Description | +|---|---|---|---| +| `EventFilter` | IEventFilter? | null | Server-side event filter (set at subscription creation time, not updated afterward) | + +Plus all properties from `PersistentSubscriptionOptions` above. + +## Producer + +`KurrentDBProducer` appends events to KurrentDB streams. Register it with `AddProducer`: + +```csharp +services.AddProducer(); +``` + +Use the producer to publish events: + +```csharp +await producer.Produce("target-stream", message, cancellationToken: ct); +``` + +`KurrentDBProduceOptions` controls produce behavior: + +| Property | Type | Default | Description | +|---|---|---|---| +| `ExpectedState` | StreamState | Any | Expected stream state for optimistic concurrency | +| `MaxAppendEventsCount` | int | 500 | Max events per batch append | +| `Deadline` | TimeSpan? | null | Timeout for the produce operation | +| `Credentials` | UserCredentials? | null | Optional user credentials | + +## Checkpoint Stores + +KurrentDB itself does not provide a checkpoint store. For catch-up subscriptions (`AllStreamSubscription`, `StreamSubscription`), you need an external checkpoint store. Common choices: + +- **MongoDB**: `MongoCheckpointStore` from `Eventuous.Projections.MongoDB` +- **PostgreSQL**: `PostgresCheckpointStore` from `Eventuous.Postgresql` +- **SQL Server**: `SqlServerCheckpointStore` from `Eventuous.SqlServer` +- **Redis**: `RedisCheckpointStore` from `Eventuous.Redis` + +Register globally or per-subscription: + +```csharp +// Global default +services.AddCheckpointStore(); + +// Per-subscription override +services.AddSubscription( + "MySub", + builder => builder + .UseCheckpointStore() + .AddEventHandler() +); +``` + +Persistent subscriptions (`StreamPersistentSubscription`, `AllPersistentSubscription`) do not need a checkpoint store -- KurrentDB manages the position server-side. + +## Gateway (Subscription-to-Producer Bridge) + +Use `AddGateway` to route events from a subscription through a producer to another stream or system: + +```csharp +services.AddCheckpointStore(); +services.AddProducer(); + +services.AddGateway( + "IntegrationSubscription", + PaymentsGateway.Transform +); +``` + +## Complete Registration Example + +```csharp +using Eventuous.KurrentDB; +using Eventuous.KurrentDB.Subscriptions; +using Eventuous.KurrentDB.Producers; +using Eventuous.Projections.MongoDB; + +public static class EventuousRegistrations { + public static void AddEventuous(this IServiceCollection services, IConfiguration configuration) { + // 1. KurrentDB client + services.AddKurrentDBClient(configuration["EventStore:ConnectionString"]!); + + // 2. Event store (IEventStore, IEventReader, IEventWriter) + services.AddEventStore(); + + // 3. Command service + services.AddCommandService(); + + // 4. All-stream catch-up subscription with MongoDB checkpoint store + services.AddSubscription( + "BookingsProjections", + builder => builder + .UseCheckpointStore() + .AddEventHandler() + .AddEventHandler() + .WithPartitioningByStream(2) + ); + + // 5. Persistent subscription (no checkpoint store needed) + services.AddSubscription( + "PaymentIntegration", + builder => builder + .Configure(x => x.StreamName = PaymentsIntegrationHandler.Stream) + .AddEventHandler() + ); + + // 6. Producer for publishing to KurrentDB streams + services.AddProducer(); + } +} +``` + +## Source Code Locations + +- Event store: `src/KurrentDB/src/Eventuous.KurrentDB/KurrentDBEventStore.cs` +- Subscriptions: `src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/` +- Subscription options: `src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/Options/` +- Producer: `src/KurrentDB/src/Eventuous.KurrentDB/Producers/KurrentDBProducer.cs` +- Produce options: `src/KurrentDB/src/Eventuous.KurrentDB/Producers/KurrentDBProduceOptions.cs` +- Store registration extensions: `src/Extensions/src/Eventuous.Extensions.DependencyInjection/Registrations/Stores.cs` +- Sample app: `samples/kurrentdb/Bookings/Registrations.cs` diff --git a/skills/eventuous-mongodb.md b/skills/eventuous-mongodb.md new file mode 100644 index 00000000..24ca69b0 --- /dev/null +++ b/skills/eventuous-mongodb.md @@ -0,0 +1,357 @@ +# Eventuous MongoDB Projections + +## NuGet Package + +``` +Eventuous.Projections.MongoDB +``` + +Depends on `MongoDB.Driver` and `Eventuous.Subscriptions`. + +## MongoDB Client Setup + +Register `IMongoDatabase` as a singleton. The projectors and checkpoint store both depend on it. + +```csharp +var settings = MongoClientSettings.FromConnectionString("mongodb://localhost:27017"); +var database = new MongoClient(settings).GetDatabase("mydb"); + +services.AddSingleton(database); +``` + +## Document Base Types + +All projected documents must inherit from `ProjectedDocument` (namespace `Eventuous.Projections.MongoDB.Tools`): + +```csharp +public abstract record Document(string Id); + +public abstract record ProjectedDocument(string Id) : Document(Id) { + public ulong StreamPosition { get; init; } + public ulong Position { get; init; } +} +``` + +`StreamPosition` and `Position` are set automatically by the projector framework on every update/insert. + +Define your document as a record inheriting `ProjectedDocument`: + +```csharp +public record BookingDocument(string Id) : ProjectedDocument(Id) { + public string GuestId { get; init; } = null!; + public string RoomId { get; init; } = null!; + public float BookingPrice { get; init; } + public float Outstanding { get; init; } + public bool Paid { get; init; } +} +``` + +## Collection Naming Convention + +`MongoCollectionName.For()` derives the collection name by stripping these suffixes from the type name: `"Document"`, `"Entity"`, `"View"`, `"Projection"`, `"ProjectionDocument"`, `"ProjectionEntity"`. The result is used as the MongoDB collection name. + +- `BookingDocument` -> collection `"Booking"` +- `MyBookings` -> collection `"MyBookings"` + +Override by passing `MongoProjectionOptions` to the base constructor. + +## MongoProjector Base Class + +Inherit from `MongoProjector` and register event handlers in the constructor: + +```csharp +public class BookingStateProjection : MongoProjector { + public BookingStateProjection(IMongoDatabase database) : base(database) { + // Register handlers here using On(...) methods + } +} +``` + +Constructor overloads: +- `MongoProjector(IMongoDatabase database, MongoProjectionOptions? options = null, ITypeMapper? typeMap = null)` + +The projector exposes `protected IMongoCollection Collection` for direct access if needed. + +## Handler Registration Patterns + +### 1. Operation Builder (preferred) + +The fluent `On(b => b.{Operation}.{IdOrFilter}.{Update/Document})` pattern: + +```csharp +On(b => b + .UpdateOne + .DefaultId() // ID from stream name + .Update((evt, update) => // evt is the event message + update.Set(x => x.RoomId, evt.RoomId) + ) +); +``` + +### 2. Shorthand: ID from stream + update delegate + +```csharp +On( + stream => stream.GetId(), // GetDocumentIdFromStream + (ctx, update) => // BuildUpdate + update.Set(x => x.RoomId, ctx.Message.RoomId) +); +``` + +### 3. Shorthand: ID from event + update delegate + +```csharp +On( + evt => evt.GuestId, // GetDocumentIdFromEvent + (ctx, update) => + update.Set(x => x.GuestId, ctx.Message.GuestId) +); +``` + +### 4. Shorthand: Custom filter + update delegate + +```csharp +On( + (ctx, filter) => // BuildFilter + filter.Eq(x => x.GuestId, ctx.Message.GuestId), + (ctx, update) => + update.Set(x => x.Cancelled, true) +); +``` + +### 5. Async variants + +Use `OnAsync(...)` for async update builders (same overload shapes as sync but returning `ValueTask>`). + +### 6. Raw handler + +```csharp +On(handler); +// where handler is: ProjectTypedEvent +// signature: MessageConsumeContext -> ValueTask> +``` + +## Operation Builders in Detail + +Access via the fluent builder lambda `b => b.{Operation}`: + +| Builder | MongoDB Operation | +|---------------|---------------------| +| `b.UpdateOne` | `UpdateOneAsync` | +| `b.UpdateMany` | `UpdateManyAsync` | +| `b.InsertOne` | `InsertOneAsync` | +| `b.InsertMany` | `InsertManyAsync` | +| `b.DeleteOne` | `DeleteOneAsync` | +| `b.DeleteMany` | `DeleteManyAsync` | +| `b.Bulk` | `BulkWriteAsync` | + +### UpdateOne / UpdateMany + +```csharp +On(b => b + .UpdateOne + .DefaultId() // ID = stream.GetId() + .Update((evt, update) => update.Set(...)) // from event message +); + +On(b => b + .UpdateOne + .Id(ctx => ctx.Message.SomeId) // custom ID from context + .UpdateFromContext((ctx, update) => update.Set(...)) // from full context +); + +On(b => b + .UpdateOne + .IdFromStream(stream => stream.GetId()) // ID from StreamName + .Update((evt, update) => update.Set(...)) +); + +On(b => b + .UpdateOne + .Filter((ctx, filter) => filter.Eq(...)) // custom filter + .UpdateFromContext((ctx, update) => update.Set(...)) +); +``` + +Update defaults: `IsUpsert = true`. Override with `.Configure(opts => opts.IsUpsert = false)`. + +### InsertOne + +```csharp +On(b => b + .InsertOne + .Document((stream, evt) => new MyDocument(stream.GetId()) { + Name = evt.Name + }) +); + +// Or from full context: +On(b => b + .InsertOne + .Document(ctx => new MyDocument(ctx.Stream.GetId()) { + Name = ctx.Message.Name + }) +); +``` + +### DeleteOne + +```csharp +On(b => b + .DeleteOne + .DefaultId() // ID = stream.GetId() +); + +On(b => b + .DeleteOne + .Id(ctx => ctx.Message.ItemId) // custom ID +); + +On(b => b + .DeleteOne + .Filter((ctx, filter) => filter.Eq(x => x.SomeField, ctx.Message.Value)) +); +``` + +### Bulk + +Combine multiple operations in a single `BulkWriteAsync`: + +```csharp +On(b => b + .Bulk + .AddOperation(ops => ops + .UpdateOne + .Id(ctx => ctx.Message.Id1) + .Update((evt, update) => update.Set(x => x.Field1, evt.Value1)) + ) + .AddOperation(ops => ops + .DeleteOne + .Id(ctx => ctx.Message.Id2) + ) +); +``` + +## MongoCheckpointStore + +Stores subscription checkpoint positions in a MongoDB collection (default: `"checkpoint"`). + +### Registration + +```csharp +services.AddCheckpointStore(); +``` + +Or as part of a subscription builder: + +```csharp +services.AddSubscription( + "MyProjections", + builder => builder + .UseCheckpointStore() + .AddEventHandler() +); +``` + +### Options + +Configure via `MongoCheckpointStoreOptions`: + +```csharp +services.Configure(opts => { + opts.CollectionName = "checkpoint"; // default: "checkpoint" + opts.BatchSize = 10; // default: 1 + opts.BatchIntervalSec = 5; // default: 5 +}); +``` + +Batching reduces write frequency. When both `BatchSize` and `BatchIntervalSec` are set, checkpoints are flushed when either threshold is reached. + +## Querying Projected Documents + +`IMongoDatabase` extension methods (from `Eventuous.Projections.MongoDB.Tools`) for reading documents: + +```csharp +var doc = await database.LoadDocument(id, cancellationToken); +var exists = await database.DocumentExists(id, cancellationToken); +var docs = await database.LoadDocuments(ids, cancellationToken); +var queryable = database.AsQueryable(); +``` + +## Complete Example + +```csharp +// --- Document --- +public record OrderDocument(string Id) : ProjectedDocument(Id) { + public string CustomerId { get; init; } = null!; + public decimal Total { get; init; } + public string Status { get; init; } = null!; + public List Lines { get; init; } = []; + + public record OrderLine(string ProductId, int Quantity, decimal Price); +} + +// --- Projector --- +public class OrderProjection : MongoProjector { + public OrderProjection(IMongoDatabase database) : base(database) { + + // Insert a new document when an order is created + On(b => b + .InsertOne + .Document((stream, evt) => new OrderDocument(stream.GetId()) { + CustomerId = evt.CustomerId, + Total = 0, + Status = "Created" + }) + ); + + // Update using DefaultId (from stream name) + On(b => b + .UpdateOne + .DefaultId() + .UpdateFromContext((ctx, update) => { + var evt = ctx.Message; + return update + .Push(x => x.Lines, new OrderDocument.OrderLine(evt.ProductId, evt.Quantity, evt.Price)) + .Inc(x => x.Total, evt.Quantity * evt.Price); + }) + ); + + // Simple update with shorthand + On(b => b + .UpdateOne + .DefaultId() + .Update((_, update) => update.Set(x => x.Status, "Confirmed")) + ); + + // Delete a document + On(b => b + .DeleteOne + .DefaultId() + ); + } +} + +// --- Registration --- +services.AddSingleton(database); + +services.AddSubscription( + "OrderProjections", + builder => builder + .UseCheckpointStore() + .AddEventHandler() + .WithPartitioningByStream(2) +); +``` + +## Key Source Files + +- Projector base class: `src/Mongo/src/Eventuous.Projections.MongoDB/MongoProjector.cs` +- Operation builder: `src/Mongo/src/Eventuous.Projections.MongoDB/MongoOperationBuilder.cs` +- Update/Insert/Delete builders: `src/Mongo/src/Eventuous.Projections.MongoDB/Operations/` +- Checkpoint store: `src/Mongo/src/Eventuous.Projections.MongoDB/MongoCheckpointStore.cs` +- Document base types: `src/Mongo/src/Eventuous.Projections.MongoDB/Tools/Document.cs` +- Collection extensions: `src/Mongo/src/Eventuous.Projections.MongoDB/Tools/MongoCollectionExtensions.cs` +- Database extensions: `src/Mongo/src/Eventuous.Projections.MongoDB/Tools/MongoDatabaseExtensions.cs` +- Delegate types: `src/Mongo/src/Eventuous.Projections.MongoDB/Functional.cs` +- Sample projections: `samples/kurrentdb/Bookings/Application/Queries/` diff --git a/skills/eventuous-postgres.md b/skills/eventuous-postgres.md new file mode 100644 index 00000000..cfa52cfe --- /dev/null +++ b/skills/eventuous-postgres.md @@ -0,0 +1,250 @@ +# Eventuous PostgreSQL Integration + +Infrastructure-specific guidance for using Eventuous with PostgreSQL as the event store, subscription source, checkpoint store, and projection target. + +## NuGet Packages + +```xml + + +``` + +The `Eventuous.Postgresql` package provides: `PostgresStore`, `PostgresAllStreamSubscription`, `PostgresStreamSubscription`, `PostgresCheckpointStore`, and `PostgresProjector`. + +## Namespaces + +```csharp +using Eventuous.Postgresql; // PostgresStore, PostgresStoreOptions, Schema, SchemaInitializer +using Eventuous.Postgresql.Subscriptions; // Subscriptions, checkpoint store, options +using Eventuous.Postgresql.Projections; // PostgresProjector +``` + +## Database Setup + +Register PostgreSQL infrastructure with `AddEventuousPostgres`. This configures `NpgsqlDataSource`, `NpgsqlConnection`, `PostgresStoreOptions`, `PostgresStore` (singleton), and a `SchemaInitializer` hosted service. + +**Option 1: Connection string directly** + +```csharp +services.AddEventuousPostgres( + connectionString: "Host=localhost;Username=postgres;Password=secret;Database=eventuous;", + schema: "eventuous", // default: "eventuous" + initializeDatabase: true, // default: false; creates schema tables on startup + configureBuilder: null, // Action? + connectionLifetime: ServiceLifetime.Transient, // default + dataSourceLifetime: ServiceLifetime.Singleton // default +); +``` + +**Option 2: From IConfiguration section** + +```csharp +services.AddEventuousPostgres(configuration.GetSection("Postgres")); +``` + +The configuration section binds to `PostgresStoreOptions`: + +```json +{ + "Postgres": { + "ConnectionString": "Host=localhost;Username=postgres;Password=secret;Database=eventuous;", + "Schema": "eventuous", + "InitializeDatabase": true + } +} +``` + +## Event Store Registration + +After `AddEventuousPostgres`, register `PostgresStore` as the `IEventStore`, `IEventReader`, and `IEventWriter`: + +```csharp +services.AddEventuousPostgres(connectionString, initializeDatabase: true); +services.AddEventStore(); +``` + +`AddEventStore()` registers the store as `IEventStore`, `IEventReader`, and `IEventWriter` (with tracing wrappers when diagnostics are enabled). + +## Schema Initialization + +When `initializeDatabase: true` (or `InitializeDatabase` in config), the `SchemaInitializer` hosted service runs embedded SQL scripts on startup to create the schema, tables, and functions. The default schema name is `"eventuous"`. All database objects (tables, functions, types) are created under that schema. + +`PostgresStoreOptions` properties: +- `Schema` (string) -- database schema name, default `"eventuous"` +- `ConnectionString` (string) -- PostgreSQL connection string +- `InitializeDatabase` (bool) -- create schema on startup, default `false` + +## Subscriptions + +### PostgresAllStreamSubscription + +Polls all events across all streams (the global ordered log). Use this for cross-aggregate projections and integrations. + +```csharp +services.AddSubscription( + "MySubscription", + builder => builder + .AddEventHandler() + .WithPartitioningByStream(2) // optional: parallel processing partitioned by stream +); +``` + +`PostgresAllStreamSubscriptionOptions` inherits from `PostgresSubscriptionBaseOptions` (no additional properties). + +### PostgresStreamSubscription + +Polls events from a single named stream. Use this for stream-specific projections. + +```csharp +services.AddSubscription( + "MyStreamSub", + builder => builder + .Configure(options => options.Stream = new StreamName("MyStream-123")) + .AddEventHandler() +); +``` + +`PostgresStreamSubscriptionOptions` adds: +- `Stream` (StreamName) -- the stream name to subscribe to + +### Common Subscription Options (PostgresSubscriptionBaseOptions) + +Inherited from `SqlSubscriptionOptionsBase`: + +| Property | Type | Default | Description | +|---|---|---|---| +| `Schema` | string | `"eventuous"` | Database schema name | +| `ConcurrencyLimit` | int | 1 | Number of concurrent message consumers | +| `MaxPageSize` | int | 1024 | Messages fetched per poll | +| `Polling` | PollingOptions | see below | Polling interval configuration | +| `Retry` | RetryOptions | see below | Retry configuration | +| `GapAgeThresholdMs` | int? | 3600000 (1h) | Gaps older than this are skipped | +| `GapSkipTimeoutMs` | int? | 5000 | Max time a gap holds back the subscription | +| `GapHandlingTimeoutMs` | int? | null | When set, creates tombstones for persistent gaps | + +**PollingOptions**: `MinIntervalMs` (5), `MaxIntervalMs` (1000), `GrowFactor` (1.5) +**RetryOptions**: `InitialDelayMs` (50) + +Configure options inline: + +```csharp +services.AddSubscription( + "FastSub", + builder => builder + .Configure(o => { + o.ConcurrencyLimit = 4; + o.MaxPageSize = 512; + o.Polling = new() { MinIntervalMs = 10, MaxIntervalMs = 500 }; + }) + .AddEventHandler() +); +``` + +## Checkpoint Store + +`PostgresCheckpointStore` stores subscription checkpoints in the `{schema}.checkpoints` table (created by schema initialization). + +Register it using the extension method: + +```csharp +services.AddPostgresCheckpointStore(); +``` + +This resolves `NpgsqlDataSource` and `PostgresStoreOptions` from DI. It uses the same schema as the event store by default. To override the schema, configure `PostgresCheckpointStoreOptions`: + +```csharp +services.Configure(o => o.Schema = "my_schema"); +services.AddPostgresCheckpointStore(); +``` + +## Projections with PostgresProjector + +`PostgresProjector` is an abstract base class for projecting events into PostgreSQL read model tables. It extends `EventHandler` and provides helper methods for building `NpgsqlCommand` instances. + +```csharp +public class BookingProjection : PostgresProjector { + public BookingProjection(NpgsqlDataSource dataSource) : base(dataSource) { + // Synchronous handler: returns NpgsqlCommand + On((connection, ctx) => + Project(connection, + "insert into bookings (id, guest) values (@id, @guest) on conflict (id) do update set guest = @guest", + ("id", ctx.Stream.GetId()), + ("guest", ctx.Message.GuestId) + ) + ); + + // Async handler: returns ValueTask + On(async (connection, ctx) => { + // Can do async work before returning the command + return Project(connection, + "update bookings set confirmed = true where id = @id", + ("id", ctx.Stream.GetId()) + ); + }); + } +} +``` + +**Key API:** +- `On(ProjectToPostgres handler)` -- register sync handler (returns `NpgsqlCommand`) +- `On(ProjectToPostgresAsync handler)` -- register async handler (returns `ValueTask`) +- `Project(NpgsqlConnection, string commandText, params (string Name, object Value)[] parameters)` -- static helper that creates a parameterized `NpgsqlCommand` +- `Project(NpgsqlConnection, string commandText, params NpgsqlParameter[] parameters)` -- overload accepting `NpgsqlParameter` array + +Register the projector as an event handler on a subscription: + +```csharp +services.AddSubscription( + "ReadModelProjection", + builder => builder.AddEventHandler() +); +``` + +## Complete Registration Example + +```csharp +using Eventuous.Postgresql; +using Eventuous.Postgresql.Subscriptions; + +public static class EventuousRegistrations { + public static void AddEventuous(this IServiceCollection services, IConfiguration configuration) { + // 1. PostgreSQL infrastructure (data source, store options, schema initializer) + services.AddEventuousPostgres(configuration.GetSection("Postgres")); + + // 2. Event store (IEventStore, IEventReader, IEventWriter) + services.AddEventStore(); + + // 3. Command service + services.AddCommandService(); + + // 4. Checkpoint store in PostgreSQL + services.AddPostgresCheckpointStore(); + + // 5. All-stream subscription with projections + services.AddSubscription( + "BookingsProjections", + builder => builder + .AddEventHandler() + .WithPartitioningByStream(2) + ); + + // 6. Single-stream subscription + services.AddSubscription( + "PaymentStream", + builder => builder + .Configure(o => o.Stream = new StreamName("Payment-integration")) + .AddEventHandler() + ); + } +} +``` + +## Source Code Locations + +- Store and options: `src/Postgres/src/Eventuous.Postgresql/PostgresStore.cs` +- Registration extensions: `src/Postgres/src/Eventuous.Postgresql/Extensions/RegistrationExtensions.cs` +- Schema and initialization: `src/Postgres/src/Eventuous.Postgresql/Schema.cs`, `SchemaInitializer.cs` +- Subscriptions: `src/Postgres/src/Eventuous.Postgresql/Subscriptions/` +- Projections: `src/Postgres/src/Eventuous.Postgresql/Projections/PostgresProjector.cs` +- SQL scripts: `src/Postgres/src/Eventuous.Postgresql/Scripts/` +- Sample app: `samples/postgres/Bookings/Registrations.cs` diff --git a/skills/eventuous-rabbitmq.md b/skills/eventuous-rabbitmq.md new file mode 100644 index 00000000..5b7e0a9b --- /dev/null +++ b/skills/eventuous-rabbitmq.md @@ -0,0 +1,282 @@ +# Eventuous RabbitMQ Integration + +## NuGet Package + +``` +Eventuous.RabbitMq +``` + +This package provides both producer and subscription implementations for RabbitMQ. It depends on the `RabbitMQ.Client` library. + +## ConnectionFactory Setup + +Both producer and subscription require a `RabbitMQ.Client.ConnectionFactory` registered as a singleton. You must set `DispatchConsumersAsync = true` for the subscription to work correctly. + +```csharp +using RabbitMQ.Client; + +var connectionFactory = new ConnectionFactory { + Uri = new("amqp://guest:guest@localhost:5672"), + DispatchConsumersAsync = true +}; +services.AddSingleton(connectionFactory); +``` + +## RabbitMqProducer + +**Namespace:** `Eventuous.RabbitMq.Producers` + +`RabbitMqProducer` extends `BaseProducer` and implements `IHostedProducer`. It manages its own connection and channel, uses publisher confirms, and auto-declares exchanges on first publish. + +### Constructor Parameters + +| Parameter | Type | Required | Description | +|-----------|------|----------|-------------| +| `connectionFactory` | `ConnectionFactory` | Yes | RabbitMQ connection factory | +| `serializer` | `IEventSerializer?` | No | Falls back to `DefaultEventSerializer.Instance` | +| `log` | `ILogger?` | No | Logger | +| `options` | `RabbitMqExchangeOptions?` | No | Exchange configuration for auto-declared exchanges | + +### RabbitMqExchangeOptions + +**Namespace:** `Eventuous.RabbitMq.Shared` + +Controls how the producer declares exchanges on first use. + +| Property | Type | Default | Description | +|----------|------|---------|-------------| +| `Type` | `string` | `ExchangeType.Fanout` | Exchange type (fanout, direct, topic, headers) | +| `Durable` | `bool` | `true` | Survive broker restart | +| `AutoDelete` | `bool` | `false` | Delete when last consumer disconnects | +| `Arguments` | `IDictionary?` | `null` | Additional exchange arguments | + +### RabbitMqProduceOptions + +Per-message options passed when producing. + +| Property | Type | Default | Description | +|----------|------|---------|-------------| +| `RoutingKey` | `string?` | `null` | Routing key (for direct/topic exchanges) | +| `AppId` | `string?` | `null` | Application name for RabbitMQ management UI | +| `Expiration` | `int?` | `null` | Message TTL in milliseconds | +| `Priority` | `byte` | `0` | Message priority (0-9) | +| `ReplyTo` | `string?` | `null` | Reply address | +| `Persisted` | `bool` | `true` | Whether the message is persistent | + +### Registration + +```csharp +// Simple registration (resolves ConnectionFactory from DI) +services.AddProducer(); + +// Factory registration with exchange options +services.AddProducer(sp => new RabbitMqProducer( + sp.GetRequiredService(), + options: new RabbitMqExchangeOptions { + Type = ExchangeType.Topic, + Durable = true + } +)); +``` + +The producer implements `IHostedService` (via `IHostedProducer`), so `AddProducer` automatically registers it as a hosted service for connection lifecycle management. + +### Producing Messages + +The stream name passed to `Produce` is used as the RabbitMQ exchange name. The exchange is auto-declared on first publish. + +```csharp +await producer.Produce( + new StreamName("PaymentsIntegration"), + myEvent, + new Metadata(), + new RabbitMqProduceOptions { RoutingKey = "payments.recorded" } +); +``` + +## RabbitMqSubscription + +**Namespace:** `Eventuous.RabbitMq.Subscriptions` + +`RabbitMqSubscription` extends `EventSubscription`. It declares the exchange, queue, and binding on subscribe, then consumes with async event handling. Messages are acknowledged individually on success and rejected (with redelivery) on failure by default. + +### RabbitMqSubscriptionOptions + +A record extending `SubscriptionOptions`. + +| Property | Type | Default | Description | +|----------|------|---------|-------------| +| `SubscriptionId` | `string` | (required) | Unique subscription ID; also used as default queue name | +| `Exchange` | `string` | (required) | Exchange to consume from | +| `ThrowOnError` | `bool` | `false` | Stop subscription on processing error | +| `ConcurrencyLimit` | `uint` | `1` | Number of concurrent consumers | +| `PrefetchCount` | `ushort` | `0` | Prefetch count; if 0, defaults to `ConcurrencyLimit * 2` | +| `FailureHandler` | `HandleEventProcessingFailure?` | `null` | Custom failure handler delegate | +| `ExchangeOptions` | `RabbitMqExchangeOptions` | fanout, durable | Exchange declaration settings | +| `QueueOptions` | `RabbitMqQueueOptions` | durable | Queue declaration settings | +| `BindingOptions` | `RabbitMqBindingOptions` | `""` routing key | Queue-to-exchange binding settings | + +### RabbitMqQueueOptions + +Nested record inside `RabbitMqSubscriptionOptions`. + +| Property | Type | Default | Description | +|----------|------|---------|-------------| +| `Queue` | `string?` | `null` | Queue name; defaults to `SubscriptionId` if null | +| `Durable` | `bool` | `true` | Survive broker restart | +| `Exclusive` | `bool` | `false` | Exclusive to this connection | +| `AutoDelete` | `bool` | `false` | Delete when last consumer disconnects | +| `Arguments` | `IDictionary?` | `null` | Additional queue arguments (e.g., dead-letter exchange) | + +### RabbitMqBindingOptions + +Nested record inside `RabbitMqSubscriptionOptions`. + +| Property | Type | Default | Description | +|----------|------|---------|-------------| +| `RoutingKey` | `string` | `""` | Binding routing key | +| `Arguments` | `IDictionary?` | `null` | Additional binding arguments | + +### Registration + +```csharp +services.AddSubscription( + "PaymentIntegration", + builder => builder + .Configure(x => { + x.Exchange = "PaymentsIntegration"; + x.ConcurrencyLimit = 2; + x.ExchangeOptions = new() { Type = ExchangeType.Fanout, Durable = true }; + x.QueueOptions = new() { Durable = true }; + x.BindingOptions = new() { RoutingKey = "" }; + }) + .AddEventHandler() +); +``` + +No checkpoint store is needed for RabbitMQ subscriptions since RabbitMQ manages delivery tracking through its own acknowledgment mechanism. + +## Gateway: Forwarding Events to RabbitMQ + +The gateway pattern connects an event store subscription to a RabbitMQ producer for cross-context integration. Requires the `Eventuous.Gateway` package. + +```csharp +using Eventuous.Gateway; +using Eventuous.RabbitMq.Producers; +using Eventuous.Postgresql.Subscriptions; + +// Register the producer +services.AddProducer(); + +// Register the gateway (subscription + producer + transform) +services.AddGateway< + PostgresAllStreamSubscription, + PostgresAllStreamSubscriptionOptions, + RabbitMqProducer, + RabbitMqProduceOptions>( + "IntegrationSubscription", + PaymentsGateway.Transform +); +``` + +The transform function maps consumed events to `GatewayMessage`: + +```csharp +public static class PaymentsGateway { + static readonly StreamName Stream = new("PaymentsIntegration"); + static readonly RabbitMqProduceOptions ProduceOptions = new(); + + public static ValueTask[]> Transform( + IMessageConsumeContext original + ) { + var result = original.Message is PaymentRecorded evt + ? new GatewayMessage( + Stream, + new BookingPaymentRecorded(evt.BookingId, evt.Amount), + new Metadata(), + ProduceOptions + ) + : null; + + return ValueTask.FromResult(result != null ? [result] : Array.Empty>()); + } +} +``` + +## Complete Example + +Producer side (publishes integration events from an event store subscription to RabbitMQ): + +```csharp +using RabbitMQ.Client; +using Eventuous.RabbitMq.Producers; +using Eventuous.Postgresql; +using Eventuous.Postgresql.Subscriptions; +using Eventuous.Projections.MongoDB; + +// ConnectionFactory +var connectionFactory = new ConnectionFactory { + Uri = new(configuration["RabbitMq:ConnectionString"]!), + DispatchConsumersAsync = true +}; +services.AddSingleton(connectionFactory); + +// Event store +services.AddEventuousPostgres(configuration.GetSection("Postgres")); +services.AddEventStore(); + +// Checkpoint store (needed for the gateway's source subscription) +services.AddCheckpointStore(); + +// Producer + Gateway +services.AddProducer(); +services.AddGateway< + PostgresAllStreamSubscription, + PostgresAllStreamSubscriptionOptions, + RabbitMqProducer, + RabbitMqProduceOptions>( + "IntegrationSubscription", + PaymentsGateway.Transform +); +``` + +Consumer side (receives integration events from RabbitMQ): + +```csharp +using RabbitMQ.Client; +using Eventuous.RabbitMq.Subscriptions; + +// ConnectionFactory +var connectionFactory = new ConnectionFactory { + Uri = new(configuration["RabbitMq:ConnectionString"]!), + DispatchConsumersAsync = true +}; +services.AddSingleton(connectionFactory); + +// Subscribe to RabbitMQ exchange +services.AddSubscription( + "PaymentIntegration", + builder => builder + .Configure(x => x.Exchange = "PaymentsIntegration") + .AddEventHandler() +); +``` + +## Key Behaviors + +- The producer uses publisher confirms (`ConfirmSelect`) and waits for broker acknowledgment. +- The stream name used in `Produce` becomes the RabbitMQ exchange name; exchanges are auto-declared. +- The subscription declares its exchange, queue, and binding automatically on startup. +- Default failure handling rejects and requeues the message (`BasicReject` with `requeue: true`). +- If `FailureHandler` is set and `ThrowOnError` is `false`, a warning is logged about incompatibility. +- Concurrency is managed via `AsyncHandlingFilter`; prefetch defaults to `ConcurrencyLimit * 2`. + +## Source Files + +- Producer: `src/RabbitMq/src/Eventuous.RabbitMq/Producers/RabbitMqProducer.cs` +- Produce options: `src/RabbitMq/src/Eventuous.RabbitMq/Producers/RabbitMqProduceOptions.cs` +- Exchange options: `src/RabbitMq/src/Eventuous.RabbitMq/Shared/RabbitMqExchangeOptions.cs` +- Subscription: `src/RabbitMq/src/Eventuous.RabbitMq/Subscriptions/RabbitMqSubscription.cs` +- Subscription options: `src/RabbitMq/src/Eventuous.RabbitMq/Subscriptions/RabbitMqSubscriptionOptions.cs` +- Sample (producer): `samples/postgres/Bookings.Payments/Registrations.cs` +- Sample (consumer): `samples/postgres/Bookings/Registrations.cs` diff --git a/skills/eventuous-sqlserver.md b/skills/eventuous-sqlserver.md new file mode 100644 index 00000000..bcf44d15 --- /dev/null +++ b/skills/eventuous-sqlserver.md @@ -0,0 +1,162 @@ +# Eventuous SQL Server Integration + +NuGet package: `Eventuous.SqlServer` +Namespace: `Eventuous.SqlServer` +Source: `src/SqlServer/src/Eventuous.SqlServer/` + +Provides event store, subscriptions, checkpoint store, and projections for SQL Server. + +## Setup + +### AddEventuousSqlServer + +Register the SQL Server event store and schema initializer: + +```csharp +// Option 1: Connection string directly +services.AddEventuousSqlServer( + connectionString: "Server=localhost;Database=mydb;...", + schema: "eventuous", // default: "eventuous" + initializeDatabase: true // creates schema on startup +); + +// Option 2: From IConfiguration +services.AddEventuousSqlServer(configuration.GetSection("SqlServer")); +``` + +This registers: +- `SqlServerStoreOptions` as singleton +- `SqlServerStore` as singleton (the event store) +- `SchemaInitializer` as hosted service (creates tables/stored procs if `InitializeDatabase = true`) +- `SqlServerConnectionOptions` for shared connection info + +### SqlServerStoreOptions + +```csharp +public record SqlServerStoreOptions { + public string? ConnectionString { get; init; } + public string Schema { get; init; } = "eventuous"; + public bool InitializeDatabase { get; init; } +} +``` + +## Event Store + +`SqlServerStore` extends `SqlEventStoreBase` and implements `IEventStore`. Default schema is `"eventuous"`. + +```csharp +services.AddEventuousSqlServer(connectionString, initializeDatabase: true); +services.AddEventStore(); +``` + +## Schema + +The `Schema` class defines stored procedure names and SQL queries scoped to the configured schema name: +- `append_events`, `read_stream_forwards`, `read_stream_backwards` +- `read_all_forwards`, `check_stream`, `truncate_stream` +- Checkpoint queries for the `Checkpoints` table + +`SchemaInitializer` is an `IHostedService` that runs embedded SQL scripts to create the schema. It retries up to 10 times with 5-second delays on `SqlException`. + +## Subscriptions + +### SqlServerAllStreamSubscription + +Subscribes to all events across all streams (uses `read_all_forwards` stored procedure). + +```csharp +services.AddSubscription( + "MyAllStreamSub", + builder => builder + .Configure(o => { + o.Schema = "eventuous"; + o.ConnectionString = connectionString; // optional if AddEventuousSqlServer was called + }) + .AddEventHandler() +); +``` + +### SqlServerStreamSubscription + +Subscribes to events in a single named stream. + +```csharp +services.AddSubscription( + "MyStreamSub", + builder => builder + .Configure(o => { + o.Stream = new StreamName("MyStream-123"); + o.ConnectionString = connectionString; + }) + .AddEventHandler() +); +``` + +### Options hierarchy + +``` +SqlSubscriptionOptionsBase (Schema, MaxPageSize, PollingInterval) + -> SqlServerSubscriptionBaseOptions (+ ConnectionString) + -> SqlServerAllStreamSubscriptionOptions + -> SqlServerStreamSubscriptionOptions (+ Stream) +``` + +Connection string and schema can come from either the subscription options or `SqlServerConnectionOptions` (registered by `AddEventuousSqlServer`). + +## Checkpoint Store + +`SqlServerCheckpointStore` implements `ICheckpointStore`. Stores checkpoints in `{schema}.Checkpoints` table. + +```csharp +services.AddSqlServerCheckpointStore(); +``` + +This uses the connection string and schema from `SqlServerConnectionOptions` (falls back to `SqlServerCheckpointStoreOptions` if configured separately). + +```csharp +public record SqlServerCheckpointStoreOptions { + public string? Schema { get; init; } + public string? ConnectionString { get; init; } +} +``` + +## Projections + +`SqlServerProjector` is the base class for SQL Server read model projections. + +```csharp +public class MyProjection : SqlServerProjector { + public MyProjection(SqlServerConnectionOptions options) : base(options) { + On((connection, ctx) => + Project(connection, + "INSERT INTO MyReadModel (Id, Name) VALUES (@id, @name)", + new SqlParameter("@id", ctx.Message.Id), + new SqlParameter("@name", ctx.Message.Name) + ) + ); + } +} +``` + +Register with a subscription: +```csharp +services.AddSubscription( + "MyProjectionSub", + builder => builder + .AddEventHandler() +); +``` + +## Complete Example + +```csharp +services.AddEventuousSqlServer(connectionString, initializeDatabase: true); +services.AddEventStore(); +services.AddSqlServerCheckpointStore(); + +services.AddSubscription( + "AllEvents", + builder => builder + .AddEventHandler() +); +``` diff --git a/skills/eventuous.md b/skills/eventuous.md new file mode 100644 index 00000000..12c94b59 --- /dev/null +++ b/skills/eventuous.md @@ -0,0 +1,427 @@ +# Eventuous - Event Sourcing for .NET + +Eventuous is a production-grade Event Sourcing library for .NET that implements DDD tactical patterns. It provides aggregates, command services, event stores, subscriptions, producers, projections, and gateway components. + +**Target frameworks:** .NET 10/9/8. C# preview language features, nullable reference types, and implicit usings enabled. + +## Infrastructure-Specific Guides + +When working with specific infrastructure, also include the relevant skill file for full registration and configuration details: + +- `eventuous-kurrentdb.md` - KurrentDB (EventStoreDB) event store, subscriptions, producer +- `eventuous-postgres.md` - PostgreSQL event store, subscriptions, projections +- `eventuous-sqlserver.md` - SQL Server event store, subscriptions +- `eventuous-mongodb.md` - MongoDB projections and checkpoint store +- `eventuous-rabbitmq.md` - RabbitMQ producer and subscription +- `eventuous-kafka.md` - Kafka producer and subscription +- `eventuous-google-pubsub.md` - Google Pub/Sub producer and subscription +- `eventuous-azure-servicebus.md` - Azure Service Bus producer and subscription +- `eventuous-gateway.md` - Event gateway for cross-context routing + +--- + +## Domain Model + +### Identity + +Strongly-typed aggregate IDs extend the abstract `Id` record. The base class validates non-empty strings and provides implicit string conversion. + +```csharp +public record BookingId(string Value) : Id(Value); +``` + +### Domain Events + +Events are immutable records decorated with `[EventType]` for serialization mapping. Group events in a static class with versioned nested classes for schema evolution. Use primitive types in events, not value objects. + +```csharp +public static class BookingEvents { + public static class V1 { + [EventType("V1.RoomBooked")] + public record RoomBooked( + string GuestId, + string RoomId, + LocalDate CheckInDate, + LocalDate CheckOutDate, + float BookingPrice, + string Currency + ); + + [EventType("V1.BookingCancelled")] + public record BookingCancelled(string Reason); + } +} +``` + +### State + +State is an abstract record reconstructed from events. Register event handlers in the parameterless constructor using `On()`. Handlers are static pure functions that return new state via `with` expressions. + +```csharp +public record BookingState : State { + public string GuestId { get; init; } = null!; + public RoomId RoomId { get; init; } = null!; + public Money Price { get; init; } = null!; + public bool Paid { get; init; } + + public BookingState() { + On(HandleBooked); + On((state, _) => state with { Cancelled = true }); + } + + static BookingState HandleBooked(BookingState state, V1.RoomBooked e) + => state with { + GuestId = e.GuestId, + RoomId = new(e.RoomId), + Price = new(e.BookingPrice, e.Currency) + }; +} +``` + +For identity-aware state, use `State` which adds an `Id` property set automatically on load: + +```csharp +public record BookingState : State { ... } +``` + +### Aggregate + +Aggregates contain business logic and invariant enforcement. They extend `Aggregate` and use `Apply()` to record events and update state. + +Key members: +- `State` - current aggregate state after all applied events +- `Changes` - pending events not yet persisted +- `Apply(evt)` - apply event, update state, add to pending changes +- `EnsureExists()` / `EnsureDoesntExist()` - invariant guards +- `OriginalVersion` / `CurrentVersion` - optimistic concurrency tracking + +```csharp +public class Booking : Aggregate { + public void BookRoom(string guestId, RoomId roomId, StayPeriod period, Money price) { + EnsureDoesntExist(); + Apply(new V1.RoomBooked(guestId, roomId, period.CheckIn, period.CheckOut, price.Amount, price.Currency)); + } + + public void Cancel(string reason) { + EnsureExists(); + Apply(new V1.BookingCancelled(reason)); + } +} +``` + +### Value Objects + +Value objects are records with validation in constructors and internal parameterless constructors for serialization: + +```csharp +public record Money { + public float Amount { get; internal init; } + public string Currency { get; internal init; } = null!; + + internal Money() { } + + public Money(float amount, string currency) { + if (amount < 0) throw new DomainException("Amount cannot be negative"); + Amount = amount; + Currency = currency; + } +} +``` + +### Domain Services + +Define external service contracts as delegates in the domain layer: + +```csharp +public static class Services { + public delegate ValueTask IsRoomAvailable(RoomId roomId, StayPeriod period); +} +``` + +--- + +## Command Services + +### Commands + +Commands are record types, optionally grouped in a static class: + +```csharp +public static class BookingCommands { + public record BookRoom(string BookingId, string GuestId, string RoomId, DateTime CheckIn, DateTime CheckOut, float Price, string Currency); + public record CancelBooking(string BookingId, string Reason); +} +``` + +### Aggregate-Based Command Service + +For rich domain models with aggregate classes. Extends `CommandService`. Register handlers in the constructor using the fluent builder chain: `On().InState(...).GetId(...).Act(...)`. + +```csharp +public class BookingsCommandService : CommandService { + public BookingsCommandService(IEventStore store, Services.IsRoomAvailable isRoomAvailable) + : base(store) { + On() + .InState(ExpectedState.New) + .GetId(cmd => new BookingId(cmd.BookingId)) + .Act((booking, cmd) => booking.BookRoom( + cmd.GuestId, + new RoomId(cmd.RoomId), + new StayPeriod(cmd.CheckIn, cmd.CheckOut), + new Money(cmd.Price, cmd.Currency) + )); + + On() + .InState(ExpectedState.Existing) + .GetId(cmd => new BookingId(cmd.BookingId)) + .Act((booking, cmd) => booking.Cancel(cmd.Reason)); + } +} +``` + +**Fluent builder chain (aggregate-based):** +1. `On()` - register handler for command type +2. `.InState(ExpectedState)` - `New`, `Existing`, or `Any` +3. `.GetId(cmd => ...)` - extract aggregate ID from command (or `.GetIdAsync(...)`) +4. Optional: `.AmendEvent(...)` - modify events before storage +5. Optional: `.ResolveStore(...)` / `.ResolveReader(...)` / `.ResolveWriter(...)` - per-command store resolution +6. `.Act((aggregate, cmd) => ...)` - sync action, or `.ActAsync((aggregate, cmd, ct) => ...)` for async + +### Functional Command Service + +For pure-function style without aggregate instances. Extends `CommandService`. Uses `GetStream` instead of `GetId`, and `Act` returns events instead of calling aggregate methods. + +```csharp +public class PaymentsService : CommandService { + public PaymentsService(IEventStore store) : base(store) { + On() + .InState(ExpectedState.New) + .GetStream(cmd => GetStream(cmd.PaymentId)) + .Act(cmd => [new PaymentRecorded(cmd.BookingId, cmd.Amount, cmd.Currency)]); + + On() + .InState(ExpectedState.Existing) + .GetStream(cmd => GetStream(cmd.PaymentId)) + .Act((state, events, cmd) => [new PaymentRefunded(cmd.PaymentId, cmd.Reason)]); + } +} +``` + +**Fluent builder chain (functional):** +1. `On()` - register handler +2. `.InState(ExpectedState)` - `New`, `Existing`, or `Any` +3. `.GetStream(cmd => ...)` - get stream name (use `GetStream(id)` helper for default naming) +4. `.Act(cmd => events)` - for new streams (returns `NewEvents` / `IEnumerable`) +5. `.Act((state, originalEvents, cmd) => events)` - for existing streams, receives current state + +### Result Type + +Both command services return `Result`: + +```csharp +var result = await service.Handle(command, cancellationToken); + +// Pattern match +result.Match( + ok => /* ok.State, ok.Changes, ok.GlobalPosition */, + error => /* error.Exception, error.ErrorMessage */ +); + +// Or check directly +if (result.Success) { + var ok = result.Get(); + // ok.State, ok.Changes +} +``` + +--- + +## Event Serialization & Type Mapping + +Events must be registered in `TypeMap` for serialization. The `[EventType]` attribute provides automatic registration. + +```csharp +// Option 1: Auto-discover all [EventType]-decorated types in loaded assemblies +TypeMap.RegisterKnownEventTypes(); + +// Option 2: Auto-discover from specific assemblies +TypeMap.RegisterKnownEventTypes(typeof(BookingEvents).Assembly); + +// Option 3: Manual registration +TypeMap.Instance.AddType("V1.RoomBooked"); +``` + +Default serializer uses `System.Text.Json`. Configure custom options: + +```csharp +DefaultEventSerializer.SetDefaultSerializer( + new DefaultEventSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)) +); +``` + +--- + +## Stream Naming + +Default pattern: `{AggregateType}-{AggregateId}` (e.g., `Booking-booking-123`). + +For functional services, `GetStream(id)` uses `{StateNameWithoutSuffix}-{id}`. + +Custom mapping: + +```csharp +var streamNameMap = new StreamNameMap(); +streamNameMap.Register(id => new StreamName($"bookings:{id.Value}")); +// Pass to command service or register in DI +``` + +Extracting ID from stream name (useful in projections): `ctx.Stream.GetId()`. + +--- + +## HTTP API + +### Controller-Based + +Extend `CommandHttpApiBase` and call `Handle()`: + +```csharp +[Route("/booking")] +public class BookingCommandApi(ICommandService service) + : CommandHttpApiBase(service) { + + [HttpPost("book")] + public Task.Ok>> BookRoom( + [FromBody] BookRoom cmd, CancellationToken ct) => Handle(cmd, ct); +} +``` + +### Minimal API with Auto-Discovery + +Annotate commands with `[HttpCommand]` and map them: + +```csharp +// On command records: +[HttpCommand(Route = "book")] +public record BookRoom(string BookingId, string GuestId, ...); + +// Or group commands under a static class: +[HttpCommands] +public static class BookingCommands { + [HttpCommand(Route = "book")] + public record BookRoom(...); +} + +// In Program.cs: +app.MapDiscoveredCommands(); +// Or map individual commands: +app.MapCommand(); +``` + +--- + +## Subscriptions + +Subscriptions deliver events to handlers in real-time. Use them for projections, integration, and event transformation. + +### Event Handler + +Extend `EventHandler` and register typed handlers: + +```csharp +public class PaymentsIntegrationHandler : EventHandler { + public PaymentsIntegrationHandler(ICommandService service) { + On(async ctx => { + await service.Handle( + new RecordPayment(ctx.Message.BookingId, ctx.Message.Amount, ctx.Message.Currency), + ctx.CancellationToken + ); + }); + } +} +``` + +The `IMessageConsumeContext` provides: `Message`, `Stream`, `GlobalPosition`, `Metadata`, `CancellationToken`. + +### Subscription Registration + +```csharp +services.AddSubscription( + "SubscriptionName", + builder => builder + .Configure(opts => { /* configure options */ }) + .UseCheckpointStore() + .AddEventHandler() + .WithPartitioningByStream(2) // optional parallel processing +); +``` + +Subscription types and checkpoint stores are infrastructure-specific (see infrastructure skill files). + +--- + +## Producers + +Producers publish messages to brokers or event stores: + +```csharp +// Registration +services.AddProducer(); + +// Usage (injected) +await producer.Produce( + new StreamName("target-stream"), + new[] { new ProducedMessage(eventObject, metadata) }, + cancellationToken +); +``` + +--- + +## DI Registration Pattern + +Standard setup in `Program.cs` or extension methods: + +```csharp +// 1. Configure serialization +DefaultEventSerializer.SetDefaultSerializer( + new DefaultEventSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)) +); + +// 2. Register event store (infrastructure-specific, see infra skill files) +services.AddEventStore(); + +// 3. Register command services +services.AddCommandService(); +// Or for functional: +services.AddCommandService(); + +// 4. Register subscriptions (infrastructure-specific) +services.AddSubscription("name", builder => ...); + +// 5. Register producers (infrastructure-specific) +services.AddProducer(); +``` + +--- + +## Diagnostics + +Built-in OpenTelemetry integration: + +```csharp +builder.Services.AddOpenTelemetry() + .WithTracing(b => b.AddEventuousTracing()) + .WithMetrics(b => b + .AddEventuous() // app + persistence metrics + .AddEventuousSubscriptions() // subscription metrics + ); +``` + +Disable diagnostics via `EVENTUOUS_DISABLE_DIAGS` environment variable. + +Spyglass diagnostic endpoint: + +```csharp +app.MapEventuousSpyglass(); +```