diff --git a/AGENTS.md b/AGENTS.md
index eb5febf64..db29c7682 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -46,6 +46,12 @@ These commands should pass before committing. Warnings or noise in test is not a
 - For fast feedback, run `mix test --exclude unboxed`. This skips synchronous tests. Then run `mix test` at the end to validate your work.
 - New tests that you create MUST NOT generate extra logs, even if the tests pass.
 
+## Testing
+
+### No `Process.sleep`
+
+`Process.sleep` is not allowed in tests. You may use it temporarily to get things working, but then find a way to use signal passing or telemetry handlers to get rid of it.
+
 ## Using jj workspaces for isolated work
 
 When working on a feature or fix, you can create an isolated workspace using jj:
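The "signal passing or telemetry handlers" guidance above maps to a common ExUnit pattern: attach a telemetry handler that forwards events to the test process, then `assert_receive` instead of sleeping. A minimal sketch; the event name `[:sequin, :benchmark, :ack]` is an assumption for illustration, not an event this codebase necessarily emits:

```elixir
test "work completes without Process.sleep" do
  test_pid = self()
  ref = make_ref()

  :telemetry.attach(
    "test-handler-#{inspect(ref)}",
    # Hypothetical event name - use whatever the code under test emits
    [:sequin, :benchmark, :ack],
    fn _event, measurements, _metadata, _config ->
      send(test_pid, {ref, measurements})
    end,
    nil
  )

  on_exit(fn -> :telemetry.detach("test-handler-#{inspect(ref)}") end)

  # ... trigger the work under test ...

  # Blocks until the event arrives instead of sleeping a fixed duration
  assert_receive {^ref, _measurements}, 5_000
end
```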
diff --git a/lib/sequin/consumers/benchmark_sink.ex b/lib/sequin/consumers/benchmark_sink.ex
new file mode 100644
index 000000000..23df0fd19
--- /dev/null
+++ b/lib/sequin/consumers/benchmark_sink.ex
@@ -0,0 +1,26 @@
+defmodule Sequin.Consumers.BenchmarkSink do
+  @moduledoc """
+  Sink configuration for benchmark consumers.
+
+  Benchmark consumers are only allowed in dev/test environments and are used
+  for end-to-end pipeline testing and benchmarking.
+  """
+  use Ecto.Schema
+  use TypedEctoSchema
+
+  import Ecto.Changeset
+
+  @derive {Jason.Encoder, only: [:type, :partition_count]}
+  @primary_key false
+  typed_embedded_schema do
+    field :type, Ecto.Enum, values: [:benchmark], default: :benchmark
+    # Number of partitions for checksum tracking
+    field :partition_count, :integer, default: System.schedulers_online()
+  end
+
+  def changeset(struct, params) do
+    struct
+    |> cast(params, [:partition_count])
+    |> validate_number(:partition_count, greater_than: 0)
+  end
+end
diff --git a/lib/sequin/consumers/sink_consumer.ex b/lib/sequin/consumers/sink_consumer.ex
index 92e1e31b0..8e0e0aea0 100644
--- a/lib/sequin/consumers/sink_consumer.ex
+++ b/lib/sequin/consumers/sink_consumer.ex
@@ -11,6 +11,7 @@ defmodule Sequin.Consumers.SinkConsumer do
   alias Sequin.Consumers
   alias Sequin.Consumers.AzureEventHubSink
   alias Sequin.Consumers.Backfill
+  alias Sequin.Consumers.BenchmarkSink
   alias Sequin.Consumers.ElasticsearchSink
   alias Sequin.Consumers.Function
   alias Sequin.Consumers.GcpPubsubSink
@@ -51,7 +52,8 @@ defmodule Sequin.Consumers.SinkConsumer do
     :typesense,
     :meilisearch,
     :sns,
-    :elasticsearch
+    :elasticsearch,
+    :benchmark
   ]
 
   # This is a module attribute to compile the types into the schema
@@ -134,7 +136,8 @@ defmodule Sequin.Consumers.SinkConsumer do
       azure_event_hub: AzureEventHubSink,
       typesense: TypesenseSink,
       meilisearch: MeilisearchSink,
-      elasticsearch: ElasticsearchSink
+      elasticsearch: ElasticsearchSink,
+      benchmark: BenchmarkSink
     ],
     on_replace: :update,
     type_field_name: :type
@@ -204,6 +207,7 @@ defmodule Sequin.Consumers.SinkConsumer do
     |> cast_embed(:source, required: true)
     |> cast_embed(:source_tables)
     |> put_defaults()
+    |> validate_type()
     |> validate_message_grouping()
     |> validate_enrichment()
     |> validate_required([:name, :status, :replication_slot_id, :batch_size])
@@ -223,6 +227,16 @@ defmodule Sequin.Consumers.SinkConsumer do
     |> Sequin.Changeset.annotations_check_constraint()
   end
 
+  defp validate_type(changeset) do
+    sink = get_field(changeset, :sink)
+
+    if sink && sink.type == :benchmark && Application.get_env(:sequin, :env) == :prod do
+      add_error(changeset, :type, "invalid type: #{inspect(sink.type)}")
+    else
+      changeset
+    end
+  end
+
   defp validate_message_grouping(changeset) do
     message_grouping = get_field(changeset, :message_grouping)
     source_tables = get_field(changeset, :source_tables) || []
diff --git a/lib/sequin/postgres/benchmark_source.ex b/lib/sequin/postgres/benchmark_source.ex
index dfce96e8c..09995d9cc 100644
--- a/lib/sequin/postgres/benchmark_source.ex
+++ b/lib/sequin/postgres/benchmark_source.ex
@@ -349,7 +349,8 @@ defmodule Sequin.Postgres.BenchmarkSource do
     {pk, partition_key, state} = maybe_repeat_pk(state, pk, partition_key)
 
     # Compute partition for checksum
-    partition = :erlang.phash2(partition_key, state.config.partition_count)
+    # Use pk (as string) to match the pipeline's group_id partitioning
+    partition = :erlang.phash2(to_string(pk), state.config.partition_count)
 
     # Compute commit_idx (0-based index within transaction)
     txn_size = pick_from_distribution(state.config.transaction_sizes)
@@ -408,10 +409,19 @@ defmodule Sequin.Postgres.BenchmarkSource do
   end
 
   defp update_checksum(state, partition, lsn, commit_idx) do
-    {prev_checksum, count} = Map.fetch!(state.checksums, partition)
-    new_checksum = :erlang.crc32(<<prev_checksum::32, lsn::64, commit_idx::32>>)
+    checksums =
+      Map.update(
+        state.checksums,
+        partition,
+        # First event for an unseeded partition: seed the rolling checksum the
+        # same way BenchmarkPipeline does, so this event is still counted
+        {:erlang.crc32(<<0::32, lsn::64, commit_idx::32>>), 1},
+        fn {prev_checksum, count} ->
+          {:erlang.crc32(<<prev_checksum::32, lsn::64, commit_idx::32>>), count + 1}
+        end
+      )
 
-    %{state | checksums: Map.put(state.checksums, partition, {new_checksum, count + 1})}
+    %{state | checksums: checksums}
   end
 
   defp pick_from_distribution([{_fraction, value}]) do
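Since both sides of the benchmark fold `crc32(<<prev_checksum::32, lsn::64, idx::32>>)` over events, the checksum is order-sensitive, which is what lets it detect reordering as well as message loss. A standalone sketch (not part of the diff) illustrating the property:

```elixir
# Fold the rolling CRC32 over {lsn, idx} pairs, as the source and pipeline do
roll = fn events ->
  Enum.reduce(events, {0, 0}, fn {lsn, idx}, {checksum, count} ->
    {:erlang.crc32(<<checksum::32, lsn::64, idx::32>>), count + 1}
  end)
end

events = [{100, 0}, {200, 0}, {300, 1}]

{sum_a, 3} = roll.(events)
{sum_b, 3} = roll.(Enum.reverse(events))
# Same events, same count, but sum_a != sum_b: order changes the checksum
```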
diff --git a/lib/sequin/runtime/benchmark_pipeline.ex b/lib/sequin/runtime/benchmark_pipeline.ex
new file mode 100644
index 000000000..b1ce3ced1
--- /dev/null
+++ b/lib/sequin/runtime/benchmark_pipeline.ex
@@ -0,0 +1,154 @@
+defmodule Sequin.Runtime.BenchmarkPipeline do
+  @moduledoc """
+  A sink pipeline for benchmarking that tracks message checksums per partition.
+
+  This pipeline acts as a no-op destination but maintains rolling checksums
+  that can be compared against BenchmarkSource checksums to verify:
+  1. All messages were delivered (count matches)
+  2. Messages were delivered in the correct order per partition (checksum matches)
+
+  ## Checksum Strategy
+
+  Uses the same CRC32-based rolling checksum as BenchmarkSource:
+  - Partition computed from `group_id` using `:erlang.phash2/2`
+  - Checksum: `crc32(<<prev_checksum::32, commit_lsn::64, commit_idx::32>>)`
+
+  ## ETS Table
+
+  Checksums are stored in a public ETS table named `:benchmark_pipeline_checksums`.
+
+  Keys: `{consumer_id, partition}`
+  Values: `{checksum, count}`
+  """
+  @behaviour Sequin.Runtime.SinkPipeline
+
+  alias Sequin.Consumers.BenchmarkSink
+  alias Sequin.Consumers.SinkConsumer
+  alias Sequin.Runtime.SinkPipeline
+
+  require Logger
+
+  @ets_table :benchmark_pipeline_checksums
+
+  # ============================================================================
+  # SinkPipeline Callbacks
+  # ============================================================================
+
+  @impl SinkPipeline
+  def init(context, _opts) do
+    consumer = context.consumer
+    %SinkConsumer{sink: %BenchmarkSink{partition_count: partition_count}} = consumer
+
+    ensure_ets_table()
+    init_checksums(consumer.id, partition_count)
+
+    context
+  end
+
+  @impl SinkPipeline
+  def handle_batch(:default, messages, _batch_info, context) do
+    %{consumer: %SinkConsumer{sink: %BenchmarkSink{partition_count: partition_count}} = consumer} = context
+
+    # All messages in a batch should belong to the same partition
+    [{partition, messages}] =
+      messages |> Enum.group_by(&:erlang.phash2(&1.data.group_id, partition_count)) |> Enum.to_list()
+
+    Enum.each(messages, fn msg ->
+      update_checksum(consumer.id, partition, msg.data.commit_lsn, msg.data.commit_idx)
+    end)
+
+    {:ok, messages, context}
+  end
+
+  # ============================================================================
+  # Public API
+  # ============================================================================
+
+  @doc """
+  Returns the current checksums for a consumer.
+
+  Returns a map of `%{partition => {checksum, count}}`.
+  """
+  @spec checksums(String.t()) :: %{non_neg_integer() => {non_neg_integer(), non_neg_integer()}}
+  def checksums(consumer_id) do
+    ensure_ets_table()
+
+    @ets_table
+    |> :ets.match({{consumer_id, :"$1"}, :"$2"})
+    |> Map.new(fn [partition, {checksum, count}] ->
+      {partition, {checksum, count}}
+    end)
+  end
+
+  @doc """
+  Resets all checksums for a consumer to `{0, 0}`.
+  """
+  @spec reset_checksums(String.t()) :: :ok
+  def reset_checksums(consumer_id) do
+    ensure_ets_table()
+
+    @ets_table
+    |> :ets.match({{consumer_id, :"$1"}, :_})
+    |> Enum.each(fn [partition] ->
+      :ets.insert(@ets_table, {{consumer_id, partition}, {0, 0}})
+    end)
+
+    :ok
+  end
+
+  @doc """
+  Deletes all checksums for a consumer.
+  """
+  @spec delete_checksums(String.t()) :: :ok
+  def delete_checksums(consumer_id) do
+    ensure_ets_table()
+
+    @ets_table
+    |> :ets.match({{consumer_id, :"$1"}, :_})
+    |> Enum.each(fn [partition] ->
+      :ets.delete(@ets_table, {consumer_id, partition})
+    end)
+
+    :ok
+  end
+
+  # ============================================================================
+  # Private Functions
+  # ============================================================================
+
+  defp ensure_ets_table do
+    case :ets.whereis(@ets_table) do
+      :undefined ->
+        # Create a public table that any process can write to.
+        # Use :set for O(1) lookups and updates.
+        :ets.new(@ets_table, [:set, :public, :named_table, {:write_concurrency, true}])
+
+      _tid ->
+        :ok
+    end
+  end
+
+  defp init_checksums(consumer_id, partition_count) do
+    Enum.each(0..(partition_count - 1), fn partition ->
+      # Use insert_new to avoid overwriting existing checksums on restart
+      :ets.insert_new(@ets_table, {{consumer_id, partition}, {0, 0}})
+    end)
+  end
+
+  defp update_checksum(consumer_id, partition, commit_lsn, commit_idx) do
+    key = {consumer_id, partition}
+
+    # We cannot use :ets.update_counter here because the new value depends on
+    # a CRC32 computation, so we use a lookup-then-insert pattern instead.
+    case :ets.lookup(@ets_table, key) do
+      [{^key, {prev_checksum, count}}] ->
+        new_checksum = :erlang.crc32(<<prev_checksum::32, commit_lsn::64, commit_idx::32>>)
+        :ets.insert(@ets_table, {key, {new_checksum, count + 1}})
+
+      [] ->
+        # Partition not initialized, start fresh
+        new_checksum = :erlang.crc32(<<0::32, commit_lsn::64, commit_idx::32>>)
+        :ets.insert(@ets_table, {key, {new_checksum, 1}})
+    end
+  end
+end
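With both checksum sets in hand, a benchmark harness can compare them partition by partition. A sketch of that verification step, assuming the harness has already collected the source-side map (this diff does not define a collection API, so `source_checksums` is simply an input here):

```elixir
defmodule BenchmarkVerifier do
  @moduledoc "Hypothetical harness helper, not part of the diff."

  alias Sequin.Runtime.BenchmarkPipeline

  @spec verify(String.t(), %{non_neg_integer() => {non_neg_integer(), non_neg_integer()}}) ::
          :ok | {:error, mismatched_partitions :: [non_neg_integer()]}
  def verify(consumer_id, source_checksums) do
    pipeline_checksums = BenchmarkPipeline.checksums(consumer_id)

    # A partition mismatches if its {checksum, count} differs or is missing
    mismatched =
      for {partition, expected} <- source_checksums,
          Map.get(pipeline_checksums, partition) != expected,
          do: partition

    if mismatched == [], do: :ok, else: {:error, Enum.sort(mismatched)}
  end
end
```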
+ """ + @spec delete_checksums(String.t()) :: :ok + def delete_checksums(consumer_id) do + ensure_ets_table() + + @ets_table + |> :ets.match({{consumer_id, :"$1"}, :_}) + |> Enum.each(fn [partition] -> + :ets.delete(@ets_table, {consumer_id, partition}) + end) + + :ok + end + + # ============================================================================ + # Private Functions + # ============================================================================ + + defp ensure_ets_table do + case :ets.whereis(@ets_table) do + :undefined -> + # Create a public table that any process can write to + # Use :set for O(1) lookups and updates + :ets.new(@ets_table, [:set, :public, :named_table, {:write_concurrency, true}]) + + _tid -> + :ok + end + end + + defp init_checksums(consumer_id, partition_count) do + Enum.each(0..(partition_count - 1), fn partition -> + # Use insert_new to avoid overwriting existing checksums on restart + :ets.insert_new(@ets_table, {{consumer_id, partition}, {0, 0}}) + end) + end + + defp update_checksum(consumer_id, partition, commit_lsn, commit_idx) do + key = {consumer_id, partition} + + # Use update_counter for atomic updates + # Since we need to compute CRC32, we use a match-and-update pattern + case :ets.lookup(@ets_table, key) do + [{^key, {prev_checksum, count}}] -> + new_checksum = :erlang.crc32(<>) + :ets.insert(@ets_table, {key, {new_checksum, count + 1}}) + + [] -> + # Partition not initialized, start fresh + new_checksum = :erlang.crc32(<<0::32, commit_lsn::64, commit_idx::32>>) + :ets.insert(@ets_table, {key, {new_checksum, 1}}) + end + end +end diff --git a/lib/sequin/runtime/sink_pipeline.ex b/lib/sequin/runtime/sink_pipeline.ex index 822df1b3f..a55577a15 100644 --- a/lib/sequin/runtime/sink_pipeline.ex +++ b/lib/sequin/runtime/sink_pipeline.ex @@ -114,6 +114,11 @@ defmodule Sequin.Runtime.SinkPipeline do # Ensure db is not on there |> Ecto.reset_fields([:postgres_database]) + # Guard: benchmark consumers are not allowed in production + if consumer.type == :benchmark and Application.get_env(:sequin, :env) == :prod do + raise "Benchmark consumers are not allowed in production" + end + slot_message_store_mod = Keyword.get(opts, :slot_message_store_mod, Sequin.Runtime.SlotMessageStore) producer = Keyword.get(opts, :producer, Sequin.Runtime.ConsumerProducer) pipeline_mod = Keyword.get(opts, :pipeline_mod, pipeline_mod_for_consumer(consumer)) @@ -411,6 +416,7 @@ defmodule Sequin.Runtime.SinkPipeline do def pipeline_mod_for_consumer(%SinkConsumer{} = consumer) do case consumer.type do :azure_event_hub -> Sequin.Runtime.AzureEventHubPipeline + :benchmark -> Sequin.Runtime.BenchmarkPipeline :elasticsearch -> Sequin.Runtime.ElasticsearchPipeline :gcp_pubsub -> Sequin.Runtime.GcpPubsubPipeline :http_push -> Sequin.Runtime.HttpPushPipeline diff --git a/lib/sequin/runtime/sqs_pipeline.ex b/lib/sequin/runtime/sqs_pipeline.ex index 17287793c..64945df33 100644 --- a/lib/sequin/runtime/sqs_pipeline.ex +++ b/lib/sequin/runtime/sqs_pipeline.ex @@ -54,18 +54,20 @@ defmodule Sequin.Runtime.SqsPipeline do def handle_batch(:default, messages, %{batch_key: queue_url}, context) do %{ consumer: %SinkConsumer{} = consumer, - sqs_client: sqs_client, test_pid: test_pid } = context setup_allowances(test_pid) + # Credentials may have expired if we are using task role + context = maybe_refresh_client(context) + sqs_messages = Enum.map(messages, fn %{data: data} -> build_sqs_message(consumer, data) end) - case SQS.send_messages(sqs_client, queue_url, sqs_messages) do + case 
diff --git a/lib/sequin/runtime/sqs_pipeline.ex b/lib/sequin/runtime/sqs_pipeline.ex
index 17287793c..64945df33 100644
--- a/lib/sequin/runtime/sqs_pipeline.ex
+++ b/lib/sequin/runtime/sqs_pipeline.ex
@@ -54,18 +54,20 @@ defmodule Sequin.Runtime.SqsPipeline do
   def handle_batch(:default, messages, %{batch_key: queue_url}, context) do
     %{
       consumer: %SinkConsumer{} = consumer,
-      sqs_client: sqs_client,
       test_pid: test_pid
     } = context
 
     setup_allowances(test_pid)
 
+    # Credentials may have expired if we are using a task role
+    context = maybe_refresh_client(context)
+
     sqs_messages =
       Enum.map(messages, fn %{data: data} ->
         build_sqs_message(consumer, data)
       end)
 
-    case SQS.send_messages(sqs_client, queue_url, sqs_messages) do
+    case SQS.send_messages(context.sqs_client, queue_url, sqs_messages) do
       :ok ->
         {:ok, messages, context}
 
@@ -94,6 +96,25 @@ defmodule Sequin.Runtime.SqsPipeline do
     end
   end
 
+  defp maybe_refresh_client(%{consumer: %SinkConsumer{} = consumer} = context) do
+    # Only refresh for task-role credentials, since those expire.
+    # Explicit credentials keep using the same client.
+    if consumer.sink.use_task_role do
+      case SqsSink.aws_client(consumer.sink) do
+        {:ok, fresh_client} ->
+          Map.put(context, :sqs_client, fresh_client)
+
+        {:error, reason} ->
+          # Log but continue with the existing client (the error may be transient)
+          Logger.warning("Failed to refresh AWS client for task role: #{inspect(reason)}")
+          context
+      end
+    else
+      # Not using task roles, no refresh needed
+      context
+    end
+  end
+
   defp setup_allowances(nil), do: :ok
 
   defp setup_allowances(test_pid) do
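Refreshing the client on every batch is the simplest correct behavior, but it costs a credential lookup per batch. If that proved expensive, one refinement (hypothetical, not in the diff) would be to remember when the client was last built and only rebuild after a deadline; the `:sqs_client_refreshed_at` key is invented here for illustration:

```elixir
# Sketch: throttle refreshes to at most once every 5 minutes
defp maybe_refresh_client(%{consumer: %SinkConsumer{} = consumer} = context) do
  now = System.monotonic_time(:second)
  refreshed_at = context[:sqs_client_refreshed_at]

  cond do
    not consumer.sink.use_task_role ->
      context

    refreshed_at != nil and now - refreshed_at < 300 ->
      # Client is young enough; task-role credentials outlive this window
      context

    true ->
      case SqsSink.aws_client(consumer.sink) do
        {:ok, fresh_client} ->
          context
          |> Map.put(:sqs_client, fresh_client)
          |> Map.put(:sqs_client_refreshed_at, now)

        {:error, _reason} ->
          # Keep the existing client and retry on the next batch
          context
      end
  end
end
```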
diff --git a/test/sequin/runtime/benchmark_pipeline_test.exs b/test/sequin/runtime/benchmark_pipeline_test.exs
new file mode 100644
index 000000000..8c120c8d6
--- /dev/null
+++ b/test/sequin/runtime/benchmark_pipeline_test.exs
@@ -0,0 +1,223 @@
+defmodule Sequin.Runtime.BenchmarkPipelineTest do
+  @moduledoc """
+  Tests for the BenchmarkPipeline sink.
+
+  These tests verify that BenchmarkPipeline correctly tracks checksums and that
+  messages flow through the full pipeline correctly.
+  """
+  use Sequin.DataCase, async: true
+
+  alias Sequin.Factory.AccountsFactory
+  alias Sequin.Factory.ConsumersFactory
+  alias Sequin.Factory.DatabasesFactory
+  alias Sequin.Factory.ReplicationFactory
+  alias Sequin.Runtime.BenchmarkPipeline
+  alias Sequin.Runtime.SinkPipeline
+  alias Sequin.Runtime.SlotMessageStore
+  alias Sequin.Runtime.SlotMessageStoreSupervisor
+
+  describe "checksums/1 and reset_checksums/1" do
+    test "returns empty map when no checksums exist" do
+      checksums = BenchmarkPipeline.checksums("nonexistent-consumer")
+      assert checksums == %{}
+    end
+
+    test "reset_checksums/1 resets checksums to {0, 0}" do
+      consumer_id = "test-consumer-#{System.unique_integer()}"
+
+      # Manually insert some checksums
+      :ets.new(:benchmark_pipeline_checksums, [:set, :public, :named_table, {:write_concurrency, true}])
+      :ets.insert(:benchmark_pipeline_checksums, {{consumer_id, 0}, {123, 5}})
+      :ets.insert(:benchmark_pipeline_checksums, {{consumer_id, 1}, {456, 10}})
+
+      BenchmarkPipeline.reset_checksums(consumer_id)
+
+      checksums = BenchmarkPipeline.checksums(consumer_id)
+      assert checksums[0] == {0, 0}
+      assert checksums[1] == {0, 0}
+    end
+
+    test "delete_checksums/1 removes all checksums for a consumer" do
+      consumer_id = "test-consumer-#{System.unique_integer()}"
+
+      # Manually insert some checksums
+      :ets.new(:benchmark_pipeline_checksums, [:set, :public, :named_table, {:write_concurrency, true}])
+      :ets.insert(:benchmark_pipeline_checksums, {{consumer_id, 0}, {123, 5}})
+      :ets.insert(:benchmark_pipeline_checksums, {{consumer_id, 1}, {456, 10}})
+
+      BenchmarkPipeline.delete_checksums(consumer_id)
+
+      checksums = BenchmarkPipeline.checksums(consumer_id)
+      assert checksums == %{}
+    end
+  end
+
+  describe "messages flow through SlotMessageStore to benchmark pipeline" do
+    setup do
+      account = AccountsFactory.insert_account!()
+      database = DatabasesFactory.insert_postgres_database!(account_id: account.id)
+
+      replication =
+        ReplicationFactory.insert_postgres_replication!(
+          account_id: account.id,
+          postgres_database_id: database.id
+        )
+
+      # Create a benchmark consumer
+      consumer =
+        ConsumersFactory.insert_sink_consumer!(
+          account_id: account.id,
+          type: :benchmark,
+          sink: %{type: :benchmark, partition_count: 4},
+          replication_slot_id: replication.id
+        )
+
+      {:ok, %{consumer: consumer, database: database}}
+    end
+
+    test "messages are processed and checksums are tracked", %{consumer: consumer} do
+      test_pid = self()
+
+      # Start the SlotMessageStoreSupervisor
+      start_supervised!({SlotMessageStoreSupervisor, [consumer_id: consumer.id, test_pid: test_pid]})
+
+      # Create multiple consumer events with known commit_lsn and commit_idx
+      events =
+        for i <- 1..10 do
+          ConsumersFactory.consumer_event(
+            consumer_id: consumer.id,
+            commit_lsn: 1000 + i,
+            commit_idx: i,
+            group_id: "group-#{rem(i, 4)}"
+          )
+        end
+
+      # Put messages into the store
+      SlotMessageStore.put_messages(consumer, events)
+
+      # Start the pipeline
+      start_supervised!({SinkPipeline, [consumer_id: consumer.id, test_pid: test_pid]})
+
+      # Wait for all messages to be processed
+      await_acks(10)
+
+      # Verify checksums were tracked
+      checksums = BenchmarkPipeline.checksums(consumer.id)
+      assert map_size(checksums) == 4
+
+      # Verify the total count matches
+      total_count =
+        Enum.reduce(checksums, 0, fn {_partition, {_checksum, count}}, acc -> acc + count end)
+
+      assert total_count == 10
+    end
+
+    test "checksum computation matches expected formula", %{consumer: consumer} do
+      test_pid = self()
+
+      # Start the SlotMessageStoreSupervisor
+      start_supervised!({SlotMessageStoreSupervisor, [consumer_id: consumer.id, test_pid: test_pid]})
+
+      # Create a single event with known values
+      group_id = "test-group"
+      commit_lsn = 12_345
+      commit_idx = 0
+
+      event =
+        ConsumersFactory.consumer_event(
+          consumer_id: consumer.id,
+          commit_lsn: commit_lsn,
+          commit_idx: commit_idx,
+          group_id: group_id
+        )
+
+      # Put the message into the store
+      SlotMessageStore.put_messages(consumer, [event])
+
+      # Start the pipeline
+      start_supervised!({SinkPipeline, [consumer_id: consumer.id, test_pid: test_pid]})
+
+      # Wait for the message to be processed
+      await_acks(1)
+
+      # Compute the expected checksum manually
+      partition = :erlang.phash2(group_id, 4)
+      expected_checksum = :erlang.crc32(<<0::32, commit_lsn::64, commit_idx::32>>)
+
+      checksums = BenchmarkPipeline.checksums(consumer.id)
+      {actual_checksum, count} = checksums[partition]
+
+      assert count == 1
+      assert actual_checksum == expected_checksum
+    end
+
+    test "checksums are order-sensitive (rolling checksum)", %{consumer: consumer} do
+      test_pid = self()
+
+      # Start the SlotMessageStoreSupervisor
+      start_supervised!({SlotMessageStoreSupervisor, [consumer_id: consumer.id, test_pid: test_pid]})
+
+      # Create two events for the same partition
+      group_id = "same-partition"
+      partition = :erlang.phash2(group_id, 4)
+
+      events = [
+        ConsumersFactory.consumer_event(
+          consumer_id: consumer.id,
+          commit_lsn: 100,
+          commit_idx: 0,
+          group_id: group_id
+        ),
+        ConsumersFactory.consumer_event(
+          consumer_id: consumer.id,
+          commit_lsn: 200,
+          commit_idx: 0,
+          group_id: group_id
+        )
+      ]
+
+      # Put messages into the store
+      SlotMessageStore.put_messages(consumer, events)
+
+      # Start the pipeline
+      start_supervised!({SinkPipeline, [consumer_id: consumer.id, test_pid: test_pid]})
+
+      # Wait for messages to be processed
+      await_acks(2)
+
+      checksums = BenchmarkPipeline.checksums(consumer.id)
+      {actual_checksum, count} = checksums[partition]
+
+      assert count == 2
+
+      # Verify the checksum is a rolling checksum (not just a single value).
+      # The order of processing within a batch may vary, so we accept either order.
+      # Order A: 100 then 200
+      checksum_100_first = :erlang.crc32(<<0::32, 100::64, 0::32>>)
+      expected_order_a = :erlang.crc32(<<checksum_100_first::32, 200::64, 0::32>>)
+
+      # Order B: 200 then 100
+      checksum_200_first = :erlang.crc32(<<0::32, 200::64, 0::32>>)
+      expected_order_b = :erlang.crc32(<<checksum_200_first::32, 100::64, 0::32>>)
+
+      assert actual_checksum in [expected_order_a, expected_order_b],
+             "Checksum #{actual_checksum} doesn't match expected order A (#{expected_order_a}) or B (#{expected_order_b})"
+    end
+  end
+
+  defp await_acks(count, acc \\ [])
+
+  defp await_acks(count, acc) when length(acc) >= count do
+    acc
+  end
+
+  defp await_acks(count, acc) do
+    receive do
+      {SinkPipeline, :ack_finished, successful_ack_ids, []} ->
+        await_acks(count, acc ++ successful_ack_ids)
+    after
+      5_000 ->
+        raise "Timed out waiting for acks. Expected #{count}, got #{length(acc)}"
+    end
+  end
+end
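The first flow test asserts only counts. Because the benchmark's premise is in-order delivery per partition, the same test could assert the full checksum map by replaying the generator's fold. A sketch building on that test (it assumes per-partition delivery order matches insertion order, which is exactly the invariant the checksum exists to verify):

```elixir
# Recompute expected {checksum, count} per partition for the ten events above
expected =
  1..10
  |> Enum.map(fn i -> {"group-#{rem(i, 4)}", 1000 + i, i} end)
  |> Enum.group_by(fn {group_id, _lsn, _idx} -> :erlang.phash2(group_id, 4) end)
  |> Map.new(fn {partition, events} ->
    {partition,
     Enum.reduce(events, {0, 0}, fn {_group, lsn, idx}, {checksum, count} ->
       {:erlang.crc32(<<checksum::32, lsn::64, idx::32>>), count + 1}
     end)}
  end)

assert BenchmarkPipeline.checksums(consumer.id) == expected
```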
diff --git a/test/support/factory/consumers_factory.ex b/test/support/factory/consumers_factory.ex
index 51ef60b31..e0f6876e9 100644
--- a/test/support/factory/consumers_factory.ex
+++ b/test/support/factory/consumers_factory.ex
@@ -5,6 +5,7 @@ defmodule Sequin.Factory.ConsumersFactory do
   alias Sequin.Consumers
   alias Sequin.Consumers.AzureEventHubSink
   alias Sequin.Consumers.Backfill
+  alias Sequin.Consumers.BenchmarkSink
   alias Sequin.Consumers.ConsumerEvent
   alias Sequin.Consumers.ConsumerEventData
   alias Sequin.Consumers.ElasticsearchSink
@@ -301,6 +302,16 @@ defmodule Sequin.Factory.ConsumersFactory do
     merge_attributes(%SequinStreamSink{type: :sequin_stream}, attrs)
   end
 
+  defp sink(:benchmark, _account_id, attrs) do
+    merge_attributes(
+      %BenchmarkSink{
+        type: :benchmark,
+        partition_count: Map.get(attrs, :partition_count, System.schedulers_online())
+      },
+      attrs
+    )
+  end
+
   defp sink(:gcp_pubsub, _account_id, attrs) do
     merge_attributes(
       %GcpPubsubSink{