6 changes: 6 additions & 0 deletions AGENTS.md
@@ -46,6 +46,12 @@ These commands should pass before committing. Warnings or noise in test is not a
- For fast feedback, run `mix test --exclude unboxed`. This skips synchronous tests. Then run `mix test` at the end to validate your work.
- New tests that you create MUST NOT generate extra logs, even if the tests pass.

## Testing

### No `Process.sleep`

`Process.sleep` is not allowed in tests. You may use it temporarily to get things working, but then find a way to use signal passing or telemetry handlers to get rid of it.
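
A minimal sketch of the intended replacement, assuming the code under test emits a telemetry event when it finishes; the event name and `start_worker/0` are hypothetical:

```elixir
test "worker completes without sleeping" do
  test_pid = self()
  ref = make_ref()

  :telemetry.attach(
    {__MODULE__, ref},
    # Hypothetical event name; use whatever the code under test emits
    [:my_app, :worker, :done],
    fn _event, _measurements, _metadata, _config -> send(test_pid, {ref, :done}) end,
    nil
  )

  on_exit(fn -> :telemetry.detach({__MODULE__, ref}) end)

  start_worker()  # hypothetical: kicks off the async work under test

  # Blocks until the event arrives (up to 1s) instead of sleeping
  assert_receive {^ref, :done}, 1_000
end
```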

## Using jj workspaces for isolated work

When working on a feature or fix, you can create an isolated workspace using jj:
26 changes: 26 additions & 0 deletions lib/sequin/consumers/benchmark_sink.ex
@@ -0,0 +1,26 @@
defmodule Sequin.Consumers.BenchmarkSink do
@moduledoc """
Sink configuration for benchmark consumers.

Benchmark consumers are only allowed in dev/test environments and are used
for end-to-end pipeline testing and benchmarking.
"""
use Ecto.Schema
use TypedEctoSchema

import Ecto.Changeset

@derive {Jason.Encoder, only: [:type, :partition_count]}
@primary_key false
typed_embedded_schema do
field :type, Ecto.Enum, values: [:benchmark], default: :benchmark
# Number of partitions for checksum tracking
field :partition_count, :integer, default: System.schedulers_online()
end

def changeset(struct, params) do
struct
|> cast(params, [:partition_count])
|> validate_number(:partition_count, greater_than: 0)
end
end
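
A quick sketch of the changeset's behavior, based only on the validation above:

```elixir
alias Sequin.Consumers.BenchmarkSink

BenchmarkSink.changeset(%BenchmarkSink{}, %{partition_count: 8}).valid?
#=> true

BenchmarkSink.changeset(%BenchmarkSink{}, %{partition_count: 0}).valid?
#=> false (validate_number requires partition_count > 0)
```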
18 changes: 16 additions & 2 deletions lib/sequin/consumers/sink_consumer.ex
@@ -11,6 +11,7 @@ defmodule Sequin.Consumers.SinkConsumer do
alias Sequin.Consumers
alias Sequin.Consumers.AzureEventHubSink
alias Sequin.Consumers.Backfill
alias Sequin.Consumers.BenchmarkSink
alias Sequin.Consumers.ElasticsearchSink
alias Sequin.Consumers.Function
alias Sequin.Consumers.GcpPubsubSink
@@ -51,7 +52,8 @@ defmodule Sequin.Consumers.SinkConsumer do
:typesense,
:meilisearch,
:sns,
:elasticsearch
:elasticsearch,
:benchmark
]

# This is a module attribute to compile the types into the schema
@@ -134,7 +136,8 @@ defmodule Sequin.Consumers.SinkConsumer do
azure_event_hub: AzureEventHubSink,
typesense: TypesenseSink,
meilisearch: MeilisearchSink,
elasticsearch: ElasticsearchSink
elasticsearch: ElasticsearchSink,
benchmark: BenchmarkSink
],
on_replace: :update,
type_field_name: :type
@@ -204,6 +207,7 @@ defmodule Sequin.Consumers.SinkConsumer do
|> cast_embed(:source, required: true)
|> cast_embed(:source_tables)
|> put_defaults()
|> validate_type()
|> validate_message_grouping()
|> validate_enrichment()
|> validate_required([:name, :status, :replication_slot_id, :batch_size])
@@ -223,6 +227,16 @@
|> Sequin.Changeset.annotations_check_constraint()
end

defp validate_type(changeset) do
sink = get_field(changeset, :sink)

if sink && sink.type == :benchmark && Application.get_env(:sequin, :env) == :prod do
add_error(changeset, :type, "invalid type: #{inspect(sink.type)}")
else
changeset
end
end

defp validate_message_grouping(changeset) do
message_grouping = get_field(changeset, :message_grouping)
source_tables = get_field(changeset, :source_tables) || []
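The `:benchmark` guard in `validate_type/1` above assumes the application env is tagged per environment. A typical convention (assumed; the config line itself is not part of this diff):

```elixir
# config/config.exs (assumed convention, not shown in this PR)
import Config

config :sequin, env: config_env()
```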
12 changes: 8 additions & 4 deletions lib/sequin/postgres/benchmark_source.ex
@@ -349,7 +349,8 @@ defmodule Sequin.Postgres.BenchmarkSource do
{pk, partition_key, state} = maybe_repeat_pk(state, pk, partition_key)

# Compute partition for checksum
partition = :erlang.phash2(partition_key, state.config.partition_count)
# Use pk (as string) to match pipeline's group_id partitioning
partition = :erlang.phash2(to_string(pk), state.config.partition_count)

# Compute commit_idx (0-based index within transaction)
txn_size = pick_from_distribution(state.config.transaction_sizes)
@@ -408,10 +409,13 @@
end

defp update_checksum(state, partition, lsn, commit_idx) do
{prev_checksum, count} = Map.fetch!(state.checksums, partition)
new_checksum = :erlang.crc32(<<prev_checksum::32, lsn::64, commit_idx::32>>)
checksums =
Map.update(state.checksums, partition, {0, 0}, fn {prev_checksum, count} ->
new_checksum = :erlang.crc32(<<prev_checksum::32, lsn::64, commit_idx::32>>)
{new_checksum, count + 1}
end)

%{state | checksums: Map.put(state.checksums, partition, {new_checksum, count + 1})}
%{state | checksums: checksums}
end

defp pick_from_distribution([{_fraction, value}]) do
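The source and sink checksums can only agree when the same `{lsn, commit_idx}` pairs are folded in the same order per partition, which makes this an ordering check rather than just a count. A small sketch of that property, mirroring `update_checksum` above:

```elixir
fold = fn events ->
  Enum.reduce(events, 0, fn {lsn, idx}, acc ->
    :erlang.crc32(<<acc::32, lsn::64, idx::32>>)
  end)
end

events = [{100, 0}, {100, 1}, {120, 0}]

fold.(events) == fold.(events)                #=> true: deterministic
fold.(events) == fold.(Enum.reverse(events))  #=> false: order changes the checksum
```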
154 changes: 154 additions & 0 deletions lib/sequin/runtime/benchmark_pipeline.ex
@@ -0,0 +1,154 @@
defmodule Sequin.Runtime.BenchmarkPipeline do
@moduledoc """
A sink pipeline for benchmarking that tracks message checksums per partition.

This pipeline acts as a no-op destination but maintains rolling checksums
that can be compared against BenchmarkSource checksums to verify:
1. All messages were delivered (count matches)
2. Messages were delivered in correct order per partition (checksum matches)

## Checksum Strategy

Uses the same CRC32-based rolling checksum as BenchmarkSource:
- Partition computed from `group_id` using `:erlang.phash2/2`
- Checksum: `crc32(<<prev_checksum::32, commit_lsn::64, commit_idx::32>>)`

## ETS Table

Checksums are stored in a public ETS table named `:benchmark_pipeline_checksums`.

Keys: `{consumer_id, partition}`
Values: `{checksum, count}`
"""
@behaviour Sequin.Runtime.SinkPipeline

alias Sequin.Consumers.BenchmarkSink
alias Sequin.Consumers.SinkConsumer
alias Sequin.Runtime.SinkPipeline

require Logger

@ets_table :benchmark_pipeline_checksums

# ============================================================================
# SinkPipeline Callbacks
# ============================================================================

@impl SinkPipeline
def init(context, _opts) do
consumer = context.consumer
%SinkConsumer{sink: %BenchmarkSink{partition_count: partition_count}} = consumer

ensure_ets_table()
init_checksums(consumer.id, partition_count)

context
end

@impl SinkPipeline
def handle_batch(:default, messages, _batch_info, context) do
%{consumer: %SinkConsumer{sink: %BenchmarkSink{partition_count: partition_count}} = consumer} = context

# All messages in a batch should share one partition; the single-element match below asserts that invariant
[{partition, messages}] =
messages |> Enum.group_by(&:erlang.phash2(&1.data.group_id, partition_count)) |> Enum.to_list()

Enum.each(messages, fn msg ->
update_checksum(consumer.id, partition, msg.data.commit_lsn, msg.data.commit_idx)
end)

{:ok, messages, context}
end

# ============================================================================
# Public API
# ============================================================================

@doc """
Returns the current checksums for a consumer.

Returns a map of `%{partition => {checksum, count}}`.
"""
@spec checksums(String.t()) :: %{non_neg_integer() => {non_neg_integer(), non_neg_integer()}}
def checksums(consumer_id) do
ensure_ets_table()

@ets_table
|> :ets.match({{consumer_id, :"$1"}, :"$2"})
|> Map.new(fn [partition, {checksum, count}] ->
{partition, {checksum, count}}
end)
end

@doc """
Resets all checksums for a consumer to {0, 0}.
"""
@spec reset_checksums(String.t()) :: :ok
def reset_checksums(consumer_id) do
ensure_ets_table()

@ets_table
|> :ets.match({{consumer_id, :"$1"}, :_})
|> Enum.each(fn [partition] ->
:ets.insert(@ets_table, {{consumer_id, partition}, {0, 0}})
end)

:ok
end

@doc """
Deletes all checksums for a consumer.
"""
@spec delete_checksums(String.t()) :: :ok
def delete_checksums(consumer_id) do
ensure_ets_table()

@ets_table
|> :ets.match({{consumer_id, :"$1"}, :_})
|> Enum.each(fn [partition] ->
:ets.delete(@ets_table, {consumer_id, partition})
end)

:ok
end

# ============================================================================
# Private Functions
# ============================================================================

defp ensure_ets_table do
case :ets.whereis(@ets_table) do
:undefined ->
# Create a public table that any process can write to
# Use :set for O(1) lookups and updates
:ets.new(@ets_table, [:set, :public, :named_table, {:write_concurrency, true}])

_tid ->
:ok
end
end

defp init_checksums(consumer_id, partition_count) do
Enum.each(0..(partition_count - 1), fn partition ->
# Use insert_new to avoid overwriting existing checksums on restart
:ets.insert_new(@ets_table, {{consumer_id, partition}, {0, 0}})
end)
end

defp update_checksum(consumer_id, partition, commit_lsn, commit_idx) do
key = {consumer_id, partition}

# :ets.update_counter/3 only supports integer updates, so we cannot fold the
# CRC32 atomically; use a lookup-then-insert read-modify-write instead
case :ets.lookup(@ets_table, key) do
[{^key, {prev_checksum, count}}] ->
new_checksum = :erlang.crc32(<<prev_checksum::32, commit_lsn::64, commit_idx::32>>)
:ets.insert(@ets_table, {key, {new_checksum, count + 1}})

[] ->
# Partition not initialized, start fresh
new_checksum = :erlang.crc32(<<0::32, commit_lsn::64, commit_idx::32>>)
:ets.insert(@ets_table, {key, {new_checksum, 1}})
end
end
end
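
A hypothetical end-of-run verification, comparing the pipeline's checksums against the source's (the source-side accessor is assumed; it is not shown in this diff):

```elixir
# Assumption: BenchmarkSource exposes its %{partition => {checksum, count}}
# map via a source_checksums/1 call; only checksums/1 below is shown above.
source = Sequin.Postgres.BenchmarkSource.source_checksums(source_pid)
sink = Sequin.Runtime.BenchmarkPipeline.checksums(consumer_id)

# Equal checksums and counts per partition mean every message was delivered
# exactly once and in order within its partition.
true = source == sink
```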
6 changes: 6 additions & 0 deletions lib/sequin/runtime/sink_pipeline.ex
@@ -114,6 +114,11 @@ defmodule Sequin.Runtime.SinkPipeline do
# Ensure db is not on there
|> Ecto.reset_fields([:postgres_database])

# Guard: benchmark consumers are not allowed in production
if consumer.type == :benchmark and Application.get_env(:sequin, :env) == :prod do
raise "Benchmark consumers are not allowed in production"
end

slot_message_store_mod = Keyword.get(opts, :slot_message_store_mod, Sequin.Runtime.SlotMessageStore)
producer = Keyword.get(opts, :producer, Sequin.Runtime.ConsumerProducer)
pipeline_mod = Keyword.get(opts, :pipeline_mod, pipeline_mod_for_consumer(consumer))
@@ -411,6 +416,7 @@ defmodule Sequin.Runtime.SinkPipeline do
def pipeline_mod_for_consumer(%SinkConsumer{} = consumer) do
case consumer.type do
:azure_event_hub -> Sequin.Runtime.AzureEventHubPipeline
:benchmark -> Sequin.Runtime.BenchmarkPipeline
:elasticsearch -> Sequin.Runtime.ElasticsearchPipeline
:gcp_pubsub -> Sequin.Runtime.GcpPubsubPipeline
:http_push -> Sequin.Runtime.HttpPushPipeline
25 changes: 23 additions & 2 deletions lib/sequin/runtime/sqs_pipeline.ex
@@ -54,18 +54,20 @@ defmodule Sequin.Runtime.SqsPipeline do
def handle_batch(:default, messages, %{batch_key: queue_url}, context) do
%{
consumer: %SinkConsumer{} = consumer,
sqs_client: sqs_client,
test_pid: test_pid
} = context

setup_allowances(test_pid)

# Credentials may have expired if we are using a task role
context = maybe_refresh_client(context)

sqs_messages =
Enum.map(messages, fn %{data: data} ->
build_sqs_message(consumer, data)
end)

case SQS.send_messages(sqs_client, queue_url, sqs_messages) do
case SQS.send_messages(context.sqs_client, queue_url, sqs_messages) do
:ok ->
{:ok, messages, context}

@@ -94,6 +96,25 @@
end
end

defp maybe_refresh_client(%{consumer: %SinkConsumer{} = consumer} = context) do
# Only refresh for task role credentials, as they expire
# Explicit credentials keep using the same client
if consumer.sink.use_task_role do
case SqsSink.aws_client(consumer.sink) do
{:ok, fresh_client} ->
Map.put(context, :sqs_client, fresh_client)

{:error, reason} ->
# Log but continue (may be transient)
Logger.warning("Failed to refresh AWS client for task role: #{inspect(reason)}")
context
end
else
# Not using task roles, no refresh needed
context
end
end

defp setup_allowances(nil), do: :ok

defp setup_allowances(test_pid) do