diff --git a/AGENTS.md b/AGENTS.md index ceb8ccc5a..eb5febf64 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -41,6 +41,11 @@ cd .. These commands should pass before committing. Warnings or noise in test is not acceptable. +## `mix test` + +- For fast feedback, run `mix test --exclude unboxed`. This skips synchronous tests. Then run `mix test` at the end to validate your work. +- New tests that you create MUST NOT generate extra logs, even if the tests pass. + ## Using jj workspaces for isolated work When working on a feature or fix, you can create an isolated workspace using jj: diff --git a/lib/sequin/circular_buffer.ex b/lib/sequin/circular_buffer.ex new file mode 100644 index 000000000..aba1daae2 --- /dev/null +++ b/lib/sequin/circular_buffer.ex @@ -0,0 +1,159 @@ +# From https://github.com/elixir-toniq/circular_buffer/blob/main/lib/circular_buffer.ex + +# SPDX-FileCopyrightText: 2019 Chris Keathley +# SPDX-FileCopyrightText: 2020 Frank Hunleth +# SPDX-FileCopyrightText: 2022 Milton Mazzarri +# +# SPDX-License-Identifier: MIT +# +defmodule Sequin.CircularBuffer do + @moduledoc """ + Circular Buffer + + When creating a circular buffer you must specify the max size: + + ``` + cb = CircularBuffer.new(10) + ``` + + CircularBuffers are implemented as Okasaki queues like Erlang's `:queue` + module, but with additional optimizations thanks to the reduced set + of operations. + + CircularBuffer implements both the + [`Enumerable`](https://hexdocs.pm/elixir/Enumerable.html) and + [`Collectable`](https://hexdocs.pm/elixir/Collectable.html) protocols, so code + like the following works: + + iex> cb = Enum.into([1, 2, 3, 4], CircularBuffer.new(3)) + #CircularBuffer<[2, 3, 4]> + iex> Enum.map(cb, fn x -> x * 2 end) + [4, 6, 8] + """ + + alias __MODULE__, as: CB + + defstruct [:a, :b, :max_size, :count] + @typedoc "A circular buffer" + @opaque t() :: %__MODULE__{ + a: list(), + b: list(), + max_size: pos_integer(), + count: non_neg_integer() + } + + @doc """ + Creates a new circular buffer with a given size. + """ + @spec new(pos_integer()) :: t() + def new(size) when is_integer(size) and size > 0 do + %CB{a: [], b: [], max_size: size, count: 0} + end + + @doc """ + Inserts a new item into the next location of the circular buffer + """ + @spec insert(t(), any()) :: t() + def insert(%CB{b: b} = cb, item) when b != [] do + %{cb | a: [item | cb.a], b: tl(b)} + end + + def insert(%CB{count: count, max_size: max_size} = cb, item) when count < max_size do + %{cb | a: [item | cb.a], count: cb.count + 1} + end + + def insert(%CB{b: []} = cb, item) do + new_b = cb.a |> Enum.reverse() |> tl() + %{cb | a: [item], b: new_b} + end + + @doc """ + Converts a circular buffer to a list. The list is ordered from oldest to newest + elements based on their insertion order. + """ + @spec to_list(t()) :: list() + def to_list(%CB{} = cb) do + cb.b ++ Enum.reverse(cb.a) + end + + @doc """ + Returns the newest element in the buffer + + ## Examples + + iex> cb = CircularBuffer.new(3) + iex> CircularBuffer.newest(cb) + nil + iex> cb = Enum.reduce(1..4, cb, fn n, cb -> CircularBuffer.insert(cb, n) end) + iex> CircularBuffer.newest(cb) + 4 + + """ + @spec newest(t()) :: any() + def newest(%CB{a: [newest | _rest]}), do: newest + def newest(%CB{b: []}), do: nil + + @doc """ + Returns the oldest element in the buffer + """ + @spec oldest(t()) :: any() + def oldest(%CB{b: [oldest | _rest]}), do: oldest + def oldest(%CB{a: a}), do: List.last(a) + + @doc """ + Checks the buffer to see if its empty + + Returns `true` if the given circular buffer is empty, otherwise `false`. + + ## Examples + + iex> cb = CircularBuffer.new(1) + iex> CircularBuffer.empty?(cb) + true + iex> cb |> CircularBuffer.insert(1) |> CircularBuffer.empty?() + false + + """ + @spec empty?(t()) :: boolean() + def empty?(%CB{} = cb) do + cb.count == 0 + end + + defimpl Enumerable do + def count(cb) do + {:ok, cb.count} + end + + def member?(cb, element) do + {:ok, Enum.member?(cb.a, element) or Enum.member?(cb.b, element)} + end + + def reduce(cb, acc, fun) do + Enumerable.List.reduce(CB.to_list(cb), acc, fun) + end + + def slice(_cb) do + {:error, __MODULE__} + end + end + + defimpl Collectable do + def into(original) do + collector_fn = fn + cb, {:cont, elem} -> CB.insert(cb, elem) + cb, :done -> cb + _cb, :halt -> :ok + end + + {original, collector_fn} + end + end + + defimpl Inspect do + import Inspect.Algebra + + def inspect(cb, opts) do + concat(["#CircularBuffer<", to_doc(CB.to_list(cb), opts), ">"]) + end + end +end diff --git a/lib/sequin/postgres/backend.ex b/lib/sequin/postgres/backend.ex new file mode 100644 index 000000000..de4cd6dc2 --- /dev/null +++ b/lib/sequin/postgres/backend.ex @@ -0,0 +1,30 @@ +defmodule Sequin.Postgres.Backend do + @moduledoc """ + Behaviour for Postgres replication protocol backends. + + Allows SlotProducer to work with different backends: + - PostgrexBackend: Real Postgrex.Protocol wrapper + - FakegresBackend: Fake backend for benchmarking + """ + + @type state :: Postgrex.Protocol.state() | Sequin.Postgres.FakegresBackend.State.t() + @type copies :: [binary()] + + @callback connect(opts :: keyword()) :: {:ok, state()} | {:error, term()} + + @callback handle_streaming(query :: String.t(), state()) :: + {:ok, state()} | {:error, term(), state()} + + @callback checkin(state()) :: {:ok, state()} | {:error, term()} + + @callback handle_copy_recv(socket_msg :: term(), max :: pos_integer(), state()) :: + {:ok, copies(), state()} | {:error | :disconnect, term(), state()} + + @callback handle_copy_send(msg :: iodata(), state()) :: + :ok | {:error | :disconnect, term(), state()} + + @callback handle_simple(query :: String.t(), params :: list(), state()) :: + {:ok, list(), state()} | {:error, term()} + + @callback disconnect(reason :: term(), state()) :: :ok +end diff --git a/lib/sequin/postgres/benchmark_source.ex b/lib/sequin/postgres/benchmark_source.ex new file mode 100644 index 000000000..dfce96e8c --- /dev/null +++ b/lib/sequin/postgres/benchmark_source.ex @@ -0,0 +1,504 @@ +defmodule Sequin.Postgres.BenchmarkSource do + @moduledoc """ + A virtual WAL source for benchmarking the replication pipeline. + + Generates endless WAL protocol messages on demand with configurable behavior patterns: + - Variable row sizes (including occasional large rows) + - Variable transaction sizes + - PK collision simulation + - Back-to-back same-PK messages + + ## Usage + + {:ok, pid} = BenchmarkSource.start_link( + id: some_id, + row_sizes: [{0.99, 200}, {0.01, 1_000_000}], + transaction_sizes: [{1.0, 10}], + pk_collision_rate: 0.1, + repeat_frequency: 0.05 + ) + + # Get checksums for verification + checksums = BenchmarkSource.checksums(id) + """ + + @behaviour Sequin.Postgres.Source + + use GenServer + use TypedStruct + + alias Sequin.CircularBuffer + alias Sequin.Postgres.Source + + require Logger + + # Postgres epoch: 2000-01-01 00:00:00 UTC + @pg_epoch 946_684_800_000_000 + + # Hardcoded tables - 3 tables with partition column + @tables [ + %{oid: 16_384, name: "benchmark_events_1", schema: "public"}, + %{oid: 16_385, name: "benchmark_events_2", schema: "public"}, + %{oid: 16_386, name: "benchmark_events_3", schema: "public"} + ] + + @columns [ + %{name: "id", type: :bigint, position: 1}, + %{name: "partition_key", type: :text, position: 2}, + %{name: "payload", type: :bytea, position: 3}, + %{name: "created_at", type: :timestamptz, position: 4} + ] + + @num_columns length(@columns) + + defmodule Config do + @moduledoc "Configuration for BenchmarkSource" + use TypedStruct + + typedstruct do + # [{fraction, size_bytes}] - distribution of payload sizes + # e.g., [{0.99, 200}, {0.01, 1_000_000}] = 99% small, 1% 1MB + field :row_sizes, [{float(), pos_integer()}], default: [{1.0, 200}] + + # [{fraction, count}] - messages per transaction + # e.g., [{0.9, 10}, {0.1, 100}] = 90% have 10 msgs, 10% have 100 + field :transaction_sizes, [{float(), pos_integer()}], default: [{1.0, 10}] + + # 0.0-1.0 - probability of reusing a PK from recent pool + field :pk_collision_rate, float(), default: 0.005 + + # 0.0-1.0 - probability of emitting back-to-back same-PK messages + field :repeat_frequency, float(), default: 0.01 + + # Number of partitions for checksum tracking (matches consumer partitions) + field :partition_count, pos_integer(), default: System.schedulers_online() + + # Size of circular buffer for recent PKs (for collision simulation) + field :pk_pool_size, pos_integer(), default: 100_000 + end + end + + defmodule State do + @moduledoc false + use TypedStruct + + typedstruct do + # Unique identifier for this instance + field :id, term(), enforce: true + + # The producer to send :tcp messages to + field :producer, pid() + + field :config, Config.t(), enforce: true + + # Current LSN (monotonically increasing) + field :current_lsn, pos_integer(), default: 1 + + # Current transaction state + field :current_xid, pos_integer(), default: 1 + field :transaction_commit_lsn, pos_integer() + field :transaction_timestamp, pos_integer() + field :transaction_messages_remaining, non_neg_integer(), default: 0 + field :in_transaction, boolean(), default: false + + # Checksums per partition: %{partition => {checksum, count}} + field :checksums, %{non_neg_integer() => {non_neg_integer(), non_neg_integer()}}, enforce: true + + # Circular buffer of recent PKs for collision simulation + field :recent_pks, CircularBuffer.t(), enforce: true + + # Stats + field :total_messages, non_neg_integer(), default: 0 + field :total_bytes, non_neg_integer(), default: 0 + field :total_transactions, non_neg_integer(), default: 0 + end + end + + # ============================================================================ + # Public API + # ============================================================================ + + def via_tuple(id) do + {:via, :syn, {:replication, {__MODULE__, id}}} + end + + @doc """ + Starts the BenchmarkSource server. + + ## Options + + - `:id` - Required. Unique identifier for this instance. + + See `Sequin.Postgres.BenchmarkSource.Config` for other available options. + """ + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts) do + id = Keyword.fetch!(opts, :id) + GenServer.start_link(__MODULE__, opts, name: via_tuple(id)) + end + + @doc """ + Registers a producer to receive :tcp messages from this source. + + Once set, will immediately send the first :tcp message to kick off + the replication flow. + """ + @impl Source + def set_producer(id, producer) do + GenServer.call(via_tuple(id), {:set_producer, producer}) + end + + @doc """ + Receives up to `max_count` WAL copy messages. + + Returns a list of binary messages ready to be processed by SlotProducer. + Note: The actual count may exceed max_count slightly to complete transactions. + + After returning, will send another :tcp message to the producer + to indicate more data is available. + """ + @impl Source + def recv_copies(id, max_count) do + GenServer.call(via_tuple(id), {:recv_copies, max_count}) + end + + @doc """ + Handles an ack message from the replication protocol. + + Parses the LSN from the ack and tracks it. + """ + @impl Source + def handle_ack(id, ack_msg) do + GenServer.cast(via_tuple(id), {:handle_ack, ack_msg}) + :ok + end + + @doc """ + Returns the current checksums per partition. + + Format: %{partition => {checksum, count}} + """ + @spec checksums(term()) :: %{non_neg_integer() => {non_neg_integer(), non_neg_integer()}} + def checksums(id) do + GenServer.call(via_tuple(id), :checksums) + end + + @doc """ + Returns statistics about messages generated. + """ + @spec stats(term()) :: map() + def stats(id) do + GenServer.call(via_tuple(id), :stats) + end + + @doc """ + Returns the table definitions (for Relation messages). + """ + @spec tables() :: [map()] + def tables, do: @tables + + @doc """ + Returns the column definitions. + """ + @spec columns() :: [map()] + def columns, do: @columns + + # ============================================================================ + # GenServer Callbacks + # ============================================================================ + + @impl GenServer + def init(opts) do + id = Keyword.fetch!(opts, :id) + config_opts = Keyword.delete(opts, :id) + config = struct!(Config, config_opts) + + checksums = + Map.new(0..(config.partition_count - 1), fn p -> {p, {0, 0}} end) + + state = %State{ + id: id, + config: config, + checksums: checksums, + recent_pks: CircularBuffer.new(config.pk_pool_size) + } + + {:ok, state} + end + + @impl GenServer + def handle_call({:set_producer, producer}, _from, state) do + {:reply, :ok, %{state | producer: producer}, {:continue, :send_next_tcp}} + end + + def handle_call({:recv_copies, max_count}, _from, state) do + {copies, state} = generate_copies(state, max_count, []) + {:reply, copies, state, {:continue, :send_next_tcp}} + end + + def handle_call(:checksums, _from, state) do + {:reply, state.checksums, state} + end + + def handle_call(:stats, _from, state) do + stats = %{ + total_messages: state.total_messages, + total_bytes: state.total_bytes, + total_transactions: state.total_transactions, + current_lsn: state.current_lsn + } + + {:reply, stats, state} + end + + @impl GenServer + def handle_cast({:handle_ack, _ack_msg}, state) do + # TODO: Parse and track acked LSN + {:noreply, state} + end + + @impl GenServer + def handle_continue(:send_next_tcp, %State{producer: nil} = state) do + # No producer registered yet, nothing to do + {:noreply, state} + end + + def handle_continue(:send_next_tcp, %State{producer: producer} = state) do + send(producer, {:tcp, :benchmark_source, :data_ready}) + {:noreply, state} + end + + # ============================================================================ + # Message Generation + # ============================================================================ + + defp generate_copies(state, 0, acc) do + {Enum.reverse(acc), state} + end + + defp generate_copies(state, remaining, acc) do + {copies, state} = generate_copy(state) + # copies is a list (may include BEGIN, UPDATE, COMMIT) + generate_copies(state, remaining - 1, Enum.reverse(copies, acc)) + end + + defp generate_copy(state) do + {begin_copy, state} = maybe_start_transaction(state) + {update_copy, state} = generate_change_message(state) + {commit_copy, state} = maybe_end_transaction(state) + + copies = Enum.reject([begin_copy, update_copy, commit_copy], &is_nil/1) + + {copies, state} + end + + defp maybe_start_transaction(%State{in_transaction: false} = state) do + txn_size = pick_from_distribution(state.config.transaction_sizes) + + # The commit LSN will be after all the change messages + commit_lsn = state.current_lsn + txn_size + + # Capture timestamp once for the entire transaction (BEGIN and COMMIT must match) + timestamp = pg_timestamp() + + state = %{ + state + | in_transaction: true, + transaction_messages_remaining: txn_size, + transaction_commit_lsn: commit_lsn, + transaction_timestamp: timestamp, + current_xid: state.current_xid + 1 + } + + begin_copy = encode_begin(commit_lsn, state.current_xid, timestamp) + {begin_copy, state} + end + + defp maybe_start_transaction(state), do: {nil, state} + + defp maybe_end_transaction(%State{transaction_messages_remaining: 1} = state) do + commit_copy = encode_commit(state.transaction_commit_lsn, state.transaction_timestamp) + + state = %{ + state + | in_transaction: false, + transaction_messages_remaining: 0, + transaction_commit_lsn: nil, + transaction_timestamp: nil, + total_transactions: state.total_transactions + 1 + } + + {commit_copy, state} + end + + defp maybe_end_transaction(%State{transaction_messages_remaining: n} = state) when n > 1 do + {nil, %{state | transaction_messages_remaining: n - 1}} + end + + defp generate_change_message(state) do + # Pick table (round-robin for now) + table = Enum.at(@tables, rem(state.total_messages, length(@tables))) + + # Generate or pick PK + {pk, partition_key, state} = generate_pk(state) + + # Pick row size + row_size = pick_from_distribution(state.config.row_sizes) + + # Maybe repeat (back-to-back same PK) + {pk, partition_key, state} = maybe_repeat_pk(state, pk, partition_key) + + # Compute partition for checksum + partition = :erlang.phash2(partition_key, state.config.partition_count) + + # Compute commit_idx (0-based index within transaction) + txn_size = pick_from_distribution(state.config.transaction_sizes) + commit_idx = txn_size - state.transaction_messages_remaining + + # Update checksum for this partition + state = update_checksum(state, partition, state.transaction_commit_lsn, commit_idx) + + # Generate payload + payload = :binary.copy(<<0>>, row_size) + timestamp = format_timestamp() + + # Encode the WAL UPDATE message + copy = encode_update(table.oid, pk, partition_key, payload, timestamp, state.current_lsn) + + state = %{ + state + | current_lsn: state.current_lsn + 1, + total_messages: state.total_messages + 1, + total_bytes: state.total_bytes + byte_size(copy), + recent_pks: CircularBuffer.insert(state.recent_pks, {pk, partition_key}) + } + + {copy, state} + end + + defp generate_pk(state) do + if should_collide?(state) and not CircularBuffer.empty?(state.recent_pks) do + # Pick a random PK from the recent pool + recent_list = CircularBuffer.to_list(state.recent_pks) + {pk, partition_key} = Enum.random(recent_list) + {pk, partition_key, state} + else + # Generate new PK using fast unique integer + pk = System.unique_integer([:positive]) + partition_key = "partition_#{rem(pk, 1000)}" + {pk, partition_key, state} + end + end + + defp maybe_repeat_pk(state, pk, partition_key) do + if should_repeat?(state) do + # Use same PK again (will generate another message with same PK) + {pk, partition_key, state} + else + {pk, partition_key, state} + end + end + + defp should_collide?(state) do + state.config.pk_collision_rate > 0 and :rand.uniform() < state.config.pk_collision_rate + end + + defp should_repeat?(state) do + state.config.repeat_frequency > 0 and :rand.uniform() < state.config.repeat_frequency + end + + defp update_checksum(state, partition, lsn, commit_idx) do + {prev_checksum, count} = Map.fetch!(state.checksums, partition) + new_checksum = :erlang.crc32(<>) + + %{state | checksums: Map.put(state.checksums, partition, {new_checksum, count + 1})} + end + + defp pick_from_distribution([{_fraction, value}]) do + value + end + + defp pick_from_distribution(distribution) do + random = :rand.uniform() + + distribution + |> Enum.reduce_while({0.0, nil}, fn {fraction, value}, {cumulative, _} -> + new_cumulative = cumulative + fraction + + if random <= new_cumulative do + {:halt, {:found, value}} + else + {:cont, {new_cumulative, value}} + end + end) + |> case do + {:found, value} -> value + {_, value} -> value + end + end + + # ============================================================================ + # WAL Protocol Encoding + # ============================================================================ + + # Wrap a message in the copy data format + # Format: ?w + wal_start(64) + wal_end(64) + send_time(64) + message + defp wrap_copy(msg, lsn) do + send_time = pg_timestamp() + <> + end + + # BEGIN message + # Format: ?B + final_lsn(64) + timestamp(64) + xid(32) + defp encode_begin(commit_lsn, xid, timestamp) do + lsn_binary = <<0::32, commit_lsn::32>> + msg = <<"B", lsn_binary::binary, timestamp::64, xid::32>> + wrap_copy(msg, commit_lsn) + end + + # COMMIT message + # Format: ?C + flags(8) + lsn(64) + end_lsn(64) + timestamp(64) + defp encode_commit(commit_lsn, timestamp) do + lsn_binary = <<0::32, commit_lsn::32>> + msg = <<"C", 0::8, lsn_binary::binary, lsn_binary::binary, timestamp::64>> + wrap_copy(msg, commit_lsn) + end + + # UPDATE message (new tuple only, no old tuple) + # Format: ?U + relation_id(32) + ?N + num_columns(16) + tuple_data + defp encode_update(relation_id, pk, partition_key, payload, timestamp, lsn) do + tuple_data = encode_tuple_data(pk, partition_key, payload, timestamp) + msg = <<"U", relation_id::32, "N", @num_columns::16, tuple_data::binary>> + wrap_copy(msg, lsn) + end + + # Encode tuple data for our 4 columns: id, partition_key, payload, created_at + # Each column: ?t + length(32) + value + defp encode_tuple_data(pk, partition_key, payload, timestamp) do + # Column 1: id (bigint as text) + id_text = Integer.to_string(pk) + id_col = <<"t", byte_size(id_text)::32, id_text::binary>> + + # Column 2: partition_key (text) + pk_col = <<"t", byte_size(partition_key)::32, partition_key::binary>> + + # Column 3: payload (bytea as hex-escaped text, but we'll use raw for simplicity) + # In real PG, bytea is sent as hex-escaped, but the decoder handles raw bytes too + payload_col = <<"t", byte_size(payload)::32, payload::binary>> + + # Column 4: created_at (timestamptz as text) + ts_col = <<"t", byte_size(timestamp)::32, timestamp::binary>> + + <> + end + + # Get current time as Postgres timestamp (microseconds since 2000-01-01) + defp pg_timestamp do + System.os_time(:microsecond) - @pg_epoch + end + + # Format timestamp as ISO8601 string + defp format_timestamp do + DateTime.to_iso8601(DateTime.utc_now()) + end +end diff --git a/lib/sequin/postgres/postgrex_backend.ex b/lib/sequin/postgres/postgrex_backend.ex new file mode 100644 index 000000000..c7d82a17f --- /dev/null +++ b/lib/sequin/postgres/postgrex_backend.ex @@ -0,0 +1,31 @@ +defmodule Sequin.Postgres.PostgrexBackend do + @moduledoc """ + Real Postgres backend using Postgrex.Protocol. + + This is the default backend used by SlotProducer for actual + Postgres replication connections. + """ + + @behaviour Sequin.Postgres.Backend + + @impl true + defdelegate connect(opts), to: Postgrex.Protocol + + @impl true + defdelegate handle_streaming(query, state), to: Postgrex.Protocol + + @impl true + defdelegate checkin(state), to: Postgrex.Protocol + + @impl true + defdelegate handle_copy_recv(msg, max, state), to: Postgrex.Protocol + + @impl true + defdelegate handle_copy_send(msg, state), to: Postgrex.Protocol + + @impl true + defdelegate handle_simple(query, params, state), to: Postgrex.Protocol + + @impl true + defdelegate disconnect(reason, state), to: Postgrex.Protocol +end diff --git a/lib/sequin/postgres/source.ex b/lib/sequin/postgres/source.ex new file mode 100644 index 000000000..b9de5a093 --- /dev/null +++ b/lib/sequin/postgres/source.ex @@ -0,0 +1,38 @@ +defmodule Sequin.Postgres.Source do + @moduledoc """ + Behaviour for virtual WAL sources used with `VirtualBackend`. + + Implementations generate or provide WAL messages that flow through + the replication pipeline without a real Postgres connection. + + ## Implementations + + - `Sequin.Postgres.BenchmarkSource` - Generates endless synthetic WAL + messages for benchmarking throughput + - `Sequin.Postgres.MockSource` (future) - Controllable source for tests + where you can push specific messages + """ + + @type id :: term() + + @doc """ + Registers a producer to receive `:tcp` messages from this source. + + The source should send `{:tcp, source_name, :data_ready}` to the producer + when data is available. + """ + @callback set_producer(id(), producer :: pid()) :: :ok + + @doc """ + Receives up to `max_count` WAL copy messages. + + Returns a list of binary WAL messages. After returning, the source + should send another `:tcp` message to keep the producer saturated. + """ + @callback recv_copies(id(), max_count :: pos_integer()) :: [binary()] + + @doc """ + Handles an ack message from the replication protocol. + """ + @callback handle_ack(id(), msg :: iodata()) :: :ok +end diff --git a/lib/sequin/postgres/virtual_backend.ex b/lib/sequin/postgres/virtual_backend.ex new file mode 100644 index 000000000..2e8ba9422 --- /dev/null +++ b/lib/sequin/postgres/virtual_backend.ex @@ -0,0 +1,84 @@ +defmodule Sequin.Postgres.VirtualBackend do + @moduledoc """ + Virtual Postgres backend that delegates to a configurable source. + + This backend adapts any `Sequin.Postgres.Source` implementation to work + with SlotProducer, enabling benchmarking and testing without a real + Postgres connection. + + ## Usage + + # Start the source server first + {:ok, _pid} = BenchmarkSource.start_link(id: some_id, ...) + + # Pass to SlotProducer with source_mod in connect_opts + SlotProducer.start_link( + backend_mod: Sequin.Postgres.VirtualBackend, + connect_opts: [id: some_id, source_mod: BenchmarkSource], + ... + ) + """ + + @behaviour Sequin.Postgres.Backend + + defmodule State do + @moduledoc false + use TypedStruct + + typedstruct do + field :id, term(), enforce: true + field :source_mod, module(), enforce: true + end + end + + @impl true + def connect(opts) do + id = Keyword.fetch!(opts, :id) + source_mod = Keyword.fetch!(opts, :source_mod) + # Register the producer with the source - this will trigger the first :tcp message + # We pass self() since we're running in the SlotProducer process context + source_mod.set_producer(id, self()) + {:ok, %State{id: id, source_mod: source_mod}} + end + + @impl true + def handle_streaming(_query, state) do + # No-op for virtual backend - we're always "streaming" + {:ok, state} + end + + @impl true + def checkin(state) do + # No-op - the source handles sending :tcp messages + {:ok, state} + end + + @impl true + def handle_copy_recv(_socket_msg, max, %State{} = state) do + copies = state.source_mod.recv_copies(state.id, max) + {:ok, copies, state} + end + + @impl true + def handle_copy_send(msg, %State{} = state) do + state.source_mod.handle_ack(state.id, msg) + :ok + end + + @impl true + def handle_simple(_query, _params, state) do + # Return a fake restart_lsn for init_restart_wal_cursor + fake_result = %Postgrex.Result{ + rows: [["0/1"]], + columns: ["restart_lsn"], + num_rows: 1 + } + + {:ok, [fake_result], state} + end + + @impl true + def disconnect(_reason, _state) do + :ok + end +end diff --git a/lib/sequin/runtime/slot_producer/slot_producer.ex b/lib/sequin/runtime/slot_producer/slot_producer.ex index b2280f689..4e33b8769 100644 --- a/lib/sequin/runtime/slot_producer/slot_producer.ex +++ b/lib/sequin/runtime/slot_producer/slot_producer.ex @@ -13,12 +13,12 @@ defmodule Sequin.Runtime.SlotProducer do use Sequin.GenerateBehaviour - alias Postgrex.Protocol alias Sequin.Error alias Sequin.Error.NotFoundError alias Sequin.Health alias Sequin.Health.Event alias Sequin.Postgres + alias Sequin.Postgres.Backend alias Sequin.ProcessMetrics alias Sequin.Replication alias Sequin.Runtime.PostgresAdapter.Decoder @@ -107,8 +107,9 @@ defmodule Sequin.Runtime.SlotProducer do field :publication_name, String.t() field :pg_major_version, integer() field :conn, (-> Postgres.db_conn()) - # Postgres replication connection - field :protocol, Postgrex.Protocol.state() + # Postgres replication backend + field :backend_mod, module() + field :backend_state, Backend.state() field :connect_opts, keyword() field :on_connect_fail, (any() -> any()) field :on_disconnect, (-> :ok) @@ -206,7 +207,8 @@ defmodule Sequin.Runtime.SlotProducer do Keyword.get(opts, :restart_wal_cursor_update_interval, to_timeout(second: 10)), consumer_mod: Keyword.get_lazy(opts, :consumer_mod, fn -> PipelineDefaults.processor_mod() end), conn: Keyword.fetch!(opts, :conn), - setting_batch_flush_interval: Keyword.get(opts, :batch_flush_interval) + setting_batch_flush_interval: Keyword.get(opts, :batch_flush_interval), + backend_mod: Keyword.get(opts, :backend_mod, Sequin.Postgres.PostgrexBackend) } if test_pid = opts[:test_pid] do @@ -241,13 +243,13 @@ defmodule Sequin.Runtime.SlotProducer do @impl GenStage def handle_info(:connect, %State{} = state) do - with {:ok, protocol} <- Protocol.connect(state.connect_opts), + with {:ok, backend_state} <- state.backend_mod.connect(state.connect_opts), Logger.info("[SlotProducer] Connected"), :ok <- put_connected_health(state.id), - {:ok, %State{} = state, protocol} <- init_restart_wal_cursor(state, protocol), - {:ok, protocol} <- Protocol.handle_streaming(start_replication_query(state), protocol), - {:ok, protocol} <- Protocol.checkin(protocol) do - state = %{state | protocol: protocol, status: :active} + {:ok, %State{} = state, backend_state} <- init_restart_wal_cursor(state, backend_state), + {:ok, backend_state} <- state.backend_mod.handle_streaming(start_replication_query(state), backend_state), + {:ok, backend_state} <- state.backend_mod.checkin(backend_state) do + state = %{state | backend_state: backend_state, status: :active} state = schedule_timers(state) {:noreply, [], state} else @@ -255,7 +257,7 @@ defmodule Sequin.Runtime.SlotProducer do reason = case error do {:error, msg} -> msg - {:error, msg, %Protocol{}} -> msg + {:error, msg, _backend_state} -> msg end error_msg = if is_exception(reason), do: Exception.message(reason), else: inspect(reason) @@ -274,17 +276,18 @@ defmodule Sequin.Runtime.SlotProducer do raise "Unexpectedly received a second socket message while buffering sock messages" end - def handle_info(msg, %State{protocol: protocol} = state) when is_socket_message(msg) do + def handle_info(msg, %State{backend_state: backend_state} = state) when is_socket_message(msg) do maybe_log_message(state) - with {:ok, copies, protocol} <- Protocol.handle_copy_recv(msg, @max_messages_per_protocol_read, protocol), - {:ok, state} <- handle_copies(copies, %{state | protocol: protocol}) do + with {:ok, copies, backend_state} <- + state.backend_mod.handle_copy_recv(msg, @max_messages_per_protocol_read, backend_state), + {:ok, state} <- handle_copies(copies, %{state | backend_state: backend_state}) do {messages, state} = maybe_produce_and_flush(state) state = maybe_toggle_buffering(state) {:noreply, messages, state} else - {error, reason, protocol} -> - handle_disconnect(error, reason, %{state | protocol: protocol}) + {error, reason, backend_state} -> + handle_disconnect(error, reason, %{state | backend_state: backend_state}) end end @@ -622,15 +625,15 @@ defmodule Sequin.Runtime.SlotProducer do defp maybe_toggle_buffering(%State{status: :active} = state), do: state - defp init_restart_wal_cursor(%State{} = state, protocol) do + defp init_restart_wal_cursor(%State{} = state, backend_state) do query = "select restart_lsn from pg_replication_slots where slot_name = '#{state.slot_name}'" case Replication.restart_wal_cursor(state.id) do {:error, %NotFoundError{}} -> - case Protocol.handle_simple(query, [], protocol) do - {:ok, [%Postgrex.Result{rows: [[lsn]]}], protocol} when not is_nil(lsn) -> + case state.backend_mod.handle_simple(query, [], backend_state) do + {:ok, [%Postgrex.Result{rows: [[lsn]]}], backend_state} when not is_nil(lsn) -> cursor = %{commit_lsn: Postgres.lsn_to_int(lsn), commit_idx: 0} - {:ok, %{state | restart_wal_cursor: cursor}, protocol} + {:ok, %{state | restart_wal_cursor: cursor}, backend_state} {:ok, _res} -> {:error, @@ -648,7 +651,7 @@ defmodule Sequin.Runtime.SlotProducer do end {:ok, cursor} -> - {:ok, %{state | restart_wal_cursor: cursor}, protocol} + {:ok, %{state | restart_wal_cursor: cursor}, backend_state} end end @@ -656,7 +659,7 @@ defmodule Sequin.Runtime.SlotProducer do defp handle_disconnect(error, reason, %State{} = state) when error in [:error, :disconnect] do Logger.error("[SlotProducer] Replication disconnected: #{inspect(reason)}") - Protocol.disconnect(%RuntimeError{}, state.protocol) + state.backend_mod.disconnect(%RuntimeError{}, state.backend_state) Process.send_after(self(), :connect, state.setting_reconnect_interval) if is_function(state.on_disconnect) do @@ -748,12 +751,12 @@ defmodule Sequin.Runtime.SlotProducer do msg = ack_message(state.restart_wal_cursor.commit_lsn) - case Protocol.handle_copy_send(msg, state.protocol) do + case state.backend_mod.handle_copy_send(msg, state.backend_state) do :ok -> {:noreply, [], %{state | last_sent_restart_wal_cursor: state.restart_wal_cursor}} - {error, reason, protocol} -> - handle_disconnect(error, reason, %{state | protocol: protocol}) + {error, reason, backend_state} -> + handle_disconnect(error, reason, %{state | backend_state: backend_state}) end end diff --git a/mix.exs b/mix.exs index c8512f1da..219cbbff3 100644 --- a/mix.exs +++ b/mix.exs @@ -13,6 +13,10 @@ defmodule Sequin.MixProject do ] end + def cli do + [preferred_envs: ["test.unboxed": :test]] + end + # Configuration for the OTP application. # # Type `mix help compile.app` for more information. @@ -159,6 +163,7 @@ defmodule Sequin.MixProject do &remove_consumer_messages_log/1 ], test: ["ecto.create --quiet", "ecto.migrate --quiet", "test"], + "test.unboxed": ["ecto.create --quiet", "ecto.migrate --quiet", "test --exclude unboxed"], "assets.setup": ["cmd --cd assets npm install"], "assets.build": ["tailwind sequin", "esbuild sequin"], "assets.deploy": [ diff --git a/test/sequin/postgres/benchmark_source_test.exs b/test/sequin/postgres/benchmark_source_test.exs new file mode 100644 index 000000000..b2940f35d --- /dev/null +++ b/test/sequin/postgres/benchmark_source_test.exs @@ -0,0 +1,179 @@ +defmodule Sequin.Postgres.BenchmarkSourceTest do + @moduledoc """ + Tests for the BenchmarkSource WAL message generator. + + These tests verify that BenchmarkSource generates valid WAL messages that flow + through SlotProducer correctly, without requiring a real Postgres connection. + """ + use Sequin.Case, async: true + + alias Sequin.Postgres.BenchmarkSource + alias Sequin.Postgres.VirtualBackend + alias Sequin.Runtime.SlotProducer + alias Sequin.Runtime.SlotProducer.ProcessorBehaviour + + defmodule TestProcessor do + @moduledoc false + @behaviour ProcessorBehaviour + + use GenStage + + def start_link(opts), do: GenStage.start_link(__MODULE__, opts) + + def get_messages(pid), do: GenStage.call(pid, :get_messages) + + @impl ProcessorBehaviour + def handle_relation(server, relation), do: GenStage.call(server, {:handle_relation, relation}) + + @impl ProcessorBehaviour + def handle_batch_marker(server, batch_marker), do: GenStage.sync_info(server, {:handle_batch_marker, batch_marker}) + + @impl GenStage + def init(opts) do + %{producer: producer, test_pid: test_pid, max_demand: max_demand, min_demand: min_demand} = opts + + state = %{ + test_pid: test_pid, + messages: [], + producer: nil + } + + {:consumer, state, subscribe_to: [{producer, max_demand: max_demand, min_demand: min_demand}]} + end + + @impl GenStage + def handle_events(events, _from, state) do + send(state.test_pid, {:messages_received, events}) + new_messages = state.messages ++ events + + {:noreply, [], %{state | messages: new_messages}} + end + + @impl GenStage + def handle_call(:get_messages, _from, state), do: {:reply, state.messages, [], state} + + def handle_call({:handle_relation, relation}, _from, state) do + send(state.test_pid, {:relation_received, relation}) + {:reply, :ok, [], state} + end + + @impl GenStage + def handle_subscribe(:producer, _opts, producer, state) do + {:automatic, %{state | producer: producer}} + end + + @impl GenStage + def handle_info({:handle_batch_marker, batch_marker}, state) do + send(state.test_pid, {:batch_marker_received, batch_marker}) + {:noreply, [], state} + end + end + + setup do + %{id: System.unique_integer([:positive])} + end + + describe "WAL message generation" do + test "generates valid WAL messages that decode through SlotProducer", %{id: id} do + start_benchmark_source!(id, transaction_sizes: [{1.0, 5}]) + producer = start_producer!(id) + start_processor!(producer) + + messages = receive_messages(50) + + first_msg = List.first(messages) + assert %SlotProducer.Message{} = first_msg + assert is_integer(first_msg.commit_lsn) + assert is_integer(first_msg.commit_idx) + assert first_msg.kind in [:insert, :update, :delete] + end + + test "computes checksums per partition", %{id: id} do + partition_count = 4 + start_benchmark_source!(id, partition_count: partition_count, transaction_sizes: [{1.0, 3}]) + producer = start_producer!(id) + start_processor!(producer) + + receive_messages(100) + + checksums = BenchmarkSource.checksums(id) + assert map_size(checksums) == partition_count + + total_count = + Enum.reduce(checksums, 0, fn {_partition, {_checksum, count}}, acc -> acc + count end) + + assert total_count >= 100 + end + + test "tracks stats correctly", %{id: id} do + start_benchmark_source!(id, transaction_sizes: [{1.0, 5}]) + producer = start_producer!(id) + start_processor!(producer) + + receive_messages(50) + + stats = BenchmarkSource.stats(id) + assert stats.total_messages >= 50 + assert stats.total_transactions >= 10 + assert stats.total_bytes > 0 + end + end + + # Helpers + + defp start_benchmark_source!(id, opts) do + start_supervised!({BenchmarkSource, Keyword.put(opts, :id, id)}) + end + + defp start_producer!(id, opts \\ []) do + producer_opts = + Keyword.merge( + [ + id: id, + database_id: id, + account_id: id, + slot_name: "test_slot", + publication_name: "test_publication", + connect_opts: [id: id, source_mod: BenchmarkSource], + pg_major_version: 17, + backend_mod: VirtualBackend, + conn: nil, + ack_interval: 1000, + restart_wal_cursor_update_interval: 5000, + restart_wal_cursor_fn: fn _id, cursor -> cursor end, + on_connect_fail_fn: fn _state, _reason -> :ok end, + consumer_mod: TestProcessor, + test_pid: self() + ], + opts + ) + + start_supervised!({SlotProducer, producer_opts}) + end + + defp start_processor!(producer, opts \\ []) do + processor_opts = %{ + producer: producer, + test_pid: self(), + max_demand: Keyword.get(opts, :max_demand, 10), + min_demand: Keyword.get(opts, :min_demand, 5), + target_count: Keyword.get(opts, :target_count, 100) + } + + start_supervised!({TestProcessor, processor_opts}) + end + + defp receive_messages(count, acc \\ []) + + defp receive_messages(count, acc) when length(acc) >= count do + Enum.take(acc, count) + end + + defp receive_messages(count, acc) do + receive do + {:messages_received, messages} -> receive_messages(count, acc ++ messages) + after + 100 -> acc + end + end +end