diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index 29f3ae023a..fa44b13aa2 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -9,6 +9,7 @@
+
@@ -20,6 +21,10 @@
+
+
+
+
@@ -52,6 +57,7 @@
+
@@ -92,4 +98,4 @@
-
+
\ No newline at end of file
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/.editorconfig b/src/ServiceControl.Audit.Persistence.Sql.Core/.editorconfig
new file mode 100644
index 0000000000..fc68ac3228
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/.editorconfig
@@ -0,0 +1,9 @@
+[*.cs]
+
+# Justification: ServiceControl app has no synchronization context
+dotnet_diagnostic.CA2007.severity = none
+
+# Disable style rules for auto-generated EF migrations
+[Migrations/**.cs]
+dotnet_diagnostic.IDE0065.severity = none
+generated_code = true
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/AuditSqlPersisterSettings.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/AuditSqlPersisterSettings.cs
new file mode 100644
index 0000000000..7893f9ecc3
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/AuditSqlPersisterSettings.cs
@@ -0,0 +1,18 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Abstractions;
+
+public abstract class AuditSqlPersisterSettings : PersistenceSettings
+{
+ protected AuditSqlPersisterSettings(
+ TimeSpan auditRetentionPeriod,
+ bool enableFullTextSearchOnBodies,
+ int maxBodySizeToStore)
+ : base(auditRetentionPeriod, enableFullTextSearchOnBodies, maxBodySizeToStore)
+ {
+ }
+
+ public required string ConnectionString { get; set; }
+ public int CommandTimeout { get; set; } = 30;
+ public bool EnableSensitiveDataLogging { get; set; } = false;
+ public int MinBodySizeForCompression { get; set; } = 4096;
+ public bool StoreMessageBodiesOnDisk { get; set; } = true;
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/BaseAuditPersistence.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/BaseAuditPersistence.cs
new file mode 100644
index 0000000000..da04f5f6a4
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/BaseAuditPersistence.cs
@@ -0,0 +1,30 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Abstractions;
+
+using Azure.Storage.Blobs;
+using Implementation;
+using Implementation.UnitOfWork;
+using Infrastructure;
+using Microsoft.Extensions.DependencyInjection;
+using ServiceControl.Audit.Auditing.BodyStorage;
+using ServiceControl.Audit.Persistence.UnitOfWork;
+
+public abstract class BaseAuditPersistence
+{
+ protected static void RegisterDataStores(IServiceCollection services, AuditSqlPersisterSettings settings)
+ {
+ services.AddSingleton();
+ if (!string.IsNullOrEmpty(settings.MessageBodyStoragePath))
+ {
+ services.AddSingleton();
+ }
+ else
+ {
+ services.AddSingleton();
+ }
+ services.AddScoped();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton(TimeProvider.System);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/IAuditDatabaseMigrator.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/IAuditDatabaseMigrator.cs
new file mode 100644
index 0000000000..e987af8f5b
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/IAuditDatabaseMigrator.cs
@@ -0,0 +1,6 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Abstractions;
+
+public interface IAuditDatabaseMigrator
+{
+ Task ApplyMigrations(CancellationToken cancellationToken = default);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/MinimumRequiredStorageState.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/MinimumRequiredStorageState.cs
new file mode 100644
index 0000000000..72fd05b84a
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/MinimumRequiredStorageState.cs
@@ -0,0 +1,6 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Abstractions;
+
+public class MinimumRequiredStorageState
+{
+ public bool CanIngestMore { get; set; } = true;
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/DbContexts/AuditDbContextBase.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/DbContexts/AuditDbContextBase.cs
new file mode 100644
index 0000000000..36211fd95e
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/DbContexts/AuditDbContextBase.cs
@@ -0,0 +1,40 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.DbContexts;
+
+using Entities;
+using EntityConfigurations;
+using Microsoft.EntityFrameworkCore;
+
+public abstract class AuditDbContextBase : DbContext
+{
+ protected AuditDbContextBase(DbContextOptions options) : base(options)
+ {
+ }
+
+ public DbSet ProcessedMessages { get; set; }
+ public DbSet FailedAuditImports { get; set; }
+ public DbSet SagaSnapshots { get; set; }
+ public DbSet KnownEndpoints { get; set; }
+ public DbSet KnownEndpointsInsertOnly { get; set; }
+
+ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
+ {
+ optionsBuilder.EnableDetailedErrors();
+ }
+
+ protected override void OnModelCreating(ModelBuilder modelBuilder)
+ {
+ base.OnModelCreating(modelBuilder);
+
+ modelBuilder.ApplyConfiguration(new ProcessedMessageConfiguration());
+ modelBuilder.ApplyConfiguration(new FailedAuditImportConfiguration());
+ modelBuilder.ApplyConfiguration(new SagaSnapshotConfiguration());
+ modelBuilder.ApplyConfiguration(new KnownEndpointConfiguration());
+ modelBuilder.ApplyConfiguration(new KnownEndpointInsertOnlyConfiguration());
+
+ OnModelCreatingProvider(modelBuilder);
+ }
+
+ protected virtual void OnModelCreatingProvider(ModelBuilder modelBuilder)
+ {
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/FailedAuditImportEntity.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/FailedAuditImportEntity.cs
new file mode 100644
index 0000000000..8292a48cf2
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/FailedAuditImportEntity.cs
@@ -0,0 +1,8 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Entities;
+
+public class FailedAuditImportEntity
+{
+ public Guid Id { get; set; }
+ public string MessageJson { get; set; } = null!;
+ public string? ExceptionInfo { get; set; }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/KnownEndpointEntity.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/KnownEndpointEntity.cs
new file mode 100644
index 0000000000..21ef329ca7
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/KnownEndpointEntity.cs
@@ -0,0 +1,13 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Entities;
+
+public class KnownEndpointEntity
+{
+ public Guid Id { get; set; }
+ public string? Name { get; set; }
+
+ public Guid HostId { get; set; }
+
+ public string? Host { get; set; }
+
+ public DateTime LastSeen { get; set; }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/KnownEndpointInsertOnlyEntity.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/KnownEndpointInsertOnlyEntity.cs
new file mode 100644
index 0000000000..0f6fb46c5c
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/KnownEndpointInsertOnlyEntity.cs
@@ -0,0 +1,15 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Entities;
+
+public class KnownEndpointInsertOnlyEntity
+{
+ public long Id { get; set; }
+ public Guid KnownEndpointId { get; set; }
+
+ public string? Name { get; set; }
+
+ public Guid HostId { get; set; }
+
+ public string? Host { get; set; }
+
+ public DateTime LastSeen { get; set; }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/ProcessedMessageEntity.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/ProcessedMessageEntity.cs
new file mode 100644
index 0000000000..cecc8d6715
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/ProcessedMessageEntity.cs
@@ -0,0 +1,35 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Entities;
+
+public class ProcessedMessageEntity
+{
+ public long Id { get; set; }
+ public string UniqueMessageId { get; set; } = null!;
+
+ // JSON columns for complex nested data
+ public string HeadersJson { get; set; } = null!;
+
+ // Full-text search column (combines headers JSON and message body for indexing)
+ public string? SearchableContent { get; set; }
+
+ // Denormalized fields for efficient querying
+ public string? MessageId { get; set; }
+ public string? MessageType { get; set; }
+ public DateTime? TimeSent { get; set; }
+ public DateTime CreatedOn { get; set; }
+ public bool IsSystemMessage { get; set; }
+ public int Status { get; set; }
+ public string? ConversationId { get; set; }
+
+ // Endpoint details (denormalized from MessageMetadata)
+ public string? ReceivingEndpointName { get; set; }
+
+ // Performance metrics (stored as ticks for precision)
+ public long? CriticalTimeTicks { get; set; }
+ public long? ProcessingTimeTicks { get; set; }
+ public long? DeliveryTimeTicks { get; set; }
+
+ // Body storage info
+ public int BodySize { get; set; }
+ public string? BodyUrl { get; set; }
+ public bool BodyNotStored { get; set; }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/SagaSnapshotEntity.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/SagaSnapshotEntity.cs
new file mode 100644
index 0000000000..4b158572bb
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/SagaSnapshotEntity.cs
@@ -0,0 +1,18 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Entities;
+
+using ServiceControl.SagaAudit;
+
+public class SagaSnapshotEntity
+{
+ public long Id { get; set; }
+ public Guid SagaId { get; set; }
+ public string? SagaType { get; set; }
+ public DateTime StartTime { get; set; }
+ public DateTime FinishTime { get; set; }
+ public SagaStateChangeStatus Status { get; set; }
+ public string? StateAfterChange { get; set; }
+ public string? InitiatingMessageJson { get; set; }
+ public string? OutgoingMessagesJson { get; set; }
+ public string? Endpoint { get; set; }
+ public DateTime CreatedOn { get; set; }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/FailedAuditImportConfiguration.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/FailedAuditImportConfiguration.cs
new file mode 100644
index 0000000000..21f1cca11f
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/FailedAuditImportConfiguration.cs
@@ -0,0 +1,16 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class FailedAuditImportConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("FailedAuditImports");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id).IsRequired();
+ builder.Property(e => e.MessageJson).IsRequired();
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/KnownEndpointConfiguration.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/KnownEndpointConfiguration.cs
new file mode 100644
index 0000000000..de2253de2d
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/KnownEndpointConfiguration.cs
@@ -0,0 +1,21 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class KnownEndpointConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("KnownEndpoints");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id).ValueGeneratedNever();
+ builder.Property(e => e.Name).IsRequired();
+ builder.Property(e => e.HostId).IsRequired();
+ builder.Property(e => e.Host).IsRequired();
+ builder.Property(e => e.LastSeen).IsRequired();
+
+ builder.HasIndex(e => e.LastSeen);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/KnownEndpointInsertOnlyConfiguration.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/KnownEndpointInsertOnlyConfiguration.cs
new file mode 100644
index 0000000000..e2d10ff6ab
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/KnownEndpointInsertOnlyConfiguration.cs
@@ -0,0 +1,23 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class KnownEndpointInsertOnlyConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("KnownEndpointsInsertOnly");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id).ValueGeneratedOnAdd();
+ builder.Property(e => e.KnownEndpointId).IsRequired();
+ builder.Property(e => e.Name).IsRequired();
+ builder.Property(e => e.HostId).IsRequired();
+ builder.Property(e => e.Host).IsRequired();
+ builder.Property(e => e.LastSeen).IsRequired();
+
+ builder.HasIndex(e => e.LastSeen);
+ builder.HasIndex(e => e.KnownEndpointId);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/ProcessedMessageConfiguration.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/ProcessedMessageConfiguration.cs
new file mode 100644
index 0000000000..9ef6a92220
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/ProcessedMessageConfiguration.cs
@@ -0,0 +1,43 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class ProcessedMessageConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("ProcessedMessages");
+ builder.HasKey(e => new { e.Id, e.CreatedOn });
+ builder.Property(e => e.Id).ValueGeneratedOnAdd();
+ builder.Property(e => e.CreatedOn).IsRequired();
+ builder.Property(e => e.UniqueMessageId).HasMaxLength(200).IsRequired();
+
+ // JSON columns
+ builder.Property(e => e.HeadersJson).IsRequired();
+
+ // Full-text search column (combines header values + body text)
+ builder.Property(e => e.SearchableContent);
+
+ // Denormalized query fields
+ builder.Property(e => e.MessageId).HasMaxLength(200);
+ builder.Property(e => e.MessageType).HasMaxLength(500);
+ builder.Property(e => e.ConversationId).HasMaxLength(200);
+ builder.Property(e => e.ReceivingEndpointName).HasMaxLength(500);
+ builder.Property(e => e.BodyUrl).HasMaxLength(500);
+ builder.Property(e => e.TimeSent);
+ builder.Property(e => e.IsSystemMessage).IsRequired();
+ builder.Property(e => e.Status).IsRequired();
+ builder.Property(e => e.BodySize).IsRequired();
+ builder.Property(e => e.BodyNotStored).IsRequired();
+ builder.Property(e => e.CriticalTimeTicks);
+ builder.Property(e => e.ProcessingTimeTicks);
+ builder.Property(e => e.DeliveryTimeTicks);
+
+ builder.HasIndex(e => e.UniqueMessageId);
+ builder.HasIndex(e => e.ConversationId);
+ builder.HasIndex(e => e.MessageId);
+ builder.HasIndex(e => e.TimeSent);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/SagaSnapshotConfiguration.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/SagaSnapshotConfiguration.cs
new file mode 100644
index 0000000000..24f3e249b4
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/SagaSnapshotConfiguration.cs
@@ -0,0 +1,27 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class SagaSnapshotConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("SagaSnapshots");
+ builder.HasKey(e => new { e.Id, e.CreatedOn });
+ builder.Property(e => e.Id).ValueGeneratedOnAdd();
+ builder.Property(e => e.CreatedOn).IsRequired();
+ builder.Property(e => e.SagaId).IsRequired();
+ builder.Property(e => e.SagaType).IsRequired();
+ builder.Property(e => e.StartTime).IsRequired();
+ builder.Property(e => e.FinishTime);
+ builder.Property(e => e.Status).IsRequired();
+ builder.Property(e => e.StateAfterChange).IsRequired();
+ builder.Property(e => e.InitiatingMessageJson).IsRequired();
+ builder.Property(e => e.OutgoingMessagesJson).IsRequired();
+ builder.Property(e => e.Endpoint).IsRequired();
+
+ builder.HasIndex(e => e.SagaId);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/FullTextSearch/IAuditFullTextSearchProvider.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/FullTextSearch/IAuditFullTextSearchProvider.cs
new file mode 100644
index 0000000000..ef303ba399
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/FullTextSearch/IAuditFullTextSearchProvider.cs
@@ -0,0 +1,10 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.FullTextSearch;
+
+using Entities;
+
+public interface IAuditFullTextSearchProvider
+{
+ IQueryable ApplyFullTextSearch(
+ IQueryable query,
+ string searchTerms);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/AzureBlobBodyStoragePersistence.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/AzureBlobBodyStoragePersistence.cs
new file mode 100644
index 0000000000..ca75116a7f
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/AzureBlobBodyStoragePersistence.cs
@@ -0,0 +1,147 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+using System.Buffers;
+using System.IO.Compression;
+using Azure.Storage;
+using Azure.Storage.Blobs;
+using Azure.Storage.Blobs.Models;
+using ServiceControl.Audit.Persistence.Sql.Core.Abstractions;
+
+public class AzureBlobBodyStoragePersistence : IBodyStoragePersistence
+{
+ const string FormatVersion = "1";
+ readonly AuditSqlPersisterSettings settings;
+ readonly BlobContainerClient blobContainerClient;
+
+ public AzureBlobBodyStoragePersistence(AuditSqlPersisterSettings settings)
+ {
+ this.settings = settings;
+
+ var blobClient = new BlobServiceClient(settings.MessageBodyStorageConnectionString);
+ blobContainerClient = blobClient.GetBlobContainerClient("audit-bodies");
+ }
+
+ public async Task WriteBodyAsync(string bodyId, DateTime createdOn, ReadOnlyMemory body, string contentType, CancellationToken cancellationToken = default)
+ {
+ var datePrefix = createdOn.ToString("yyyy-MM-dd-HH");
+ var blob = blobContainerClient.GetBlobClient($"{datePrefix}/{bodyId}");
+ var shouldCompress = body.Length >= settings.MinBodySizeForCompression;
+
+ BinaryData data;
+ byte[]? rentedBuffer = null;
+
+ try
+ {
+ if (shouldCompress)
+ {
+ var maxCompressedSize = BrotliEncoder.GetMaxCompressedLength(body.Length);
+ rentedBuffer = ArrayPool.Shared.Rent(maxCompressedSize);
+
+ if (!BrotliEncoder.TryCompress(body.Span, rentedBuffer, out var bytesWritten, quality: 1, window: 22))
+ {
+ // Compression failed, fall back to uncompressed
+ data = BinaryData.FromBytes(body);
+ shouldCompress = false;
+ }
+ else
+ {
+ data = BinaryData.FromBytes(new ReadOnlyMemory(rentedBuffer, 0, bytesWritten));
+ }
+ }
+ else
+ {
+ data = BinaryData.FromBytes(body);
+ }
+
+ var options = new BlobUploadOptions
+ {
+ TransferValidation = new UploadTransferValidationOptions
+ {
+ ChecksumAlgorithm = StorageChecksumAlgorithm.Auto
+ },
+ Metadata = new Dictionary
+ {
+ { "FormatVersion", FormatVersion },
+ { "ContentType", Uri.EscapeDataString(contentType) },
+ { "BodySize", body.Length.ToString() },
+ { "IsCompressed", shouldCompress.ToString() }
+ }
+ };
+
+ await blob.UploadAsync(data, options, cancellationToken);
+ }
+ finally
+ {
+ if (rentedBuffer != null)
+ {
+ ArrayPool.Shared.Return(rentedBuffer);
+ }
+ }
+ }
+
+ public async Task ReadBodyAsync(string bodyId, DateTime createdOn, CancellationToken cancellationToken = default)
+ {
+ var datePrefix = createdOn.ToString("yyyy-MM-dd-HH");
+ var blob = blobContainerClient.GetBlobClient($"{datePrefix}/{bodyId}");
+
+ try
+ {
+ var response = await blob.DownloadContentAsync(cancellationToken);
+ var properties = response.Value;
+ var metadata = properties.Details.Metadata;
+
+ // Check format version
+ if (metadata.TryGetValue("FormatVersion", out var version) && version != FormatVersion)
+ {
+ throw new InvalidOperationException($"Unsupported blob format version: {version}");
+ }
+
+ var contentType = metadata.TryGetValue("ContentType", out var ct) ? Uri.UnescapeDataString(ct) : "application/octet-stream";
+ var bodySize = metadata.TryGetValue("BodySize", out var sizeStr) && int.TryParse(sizeStr, out var size) ? size : 0;
+ var isCompressed = metadata.TryGetValue("IsCompressed", out var compressedStr) && bool.TryParse(compressedStr, out var compressed) && compressed;
+ var etag = properties.Details.ETag.ToString();
+
+ Stream stream;
+ if (isCompressed)
+ {
+ var compressedData = properties.Content.ToMemory();
+ var decompressedBuffer = new byte[bodySize];
+
+ if (!BrotliDecoder.TryDecompress(compressedData.Span, decompressedBuffer, out var bytesWritten) || bytesWritten != bodySize)
+ {
+ throw new InvalidOperationException($"Failed to decompress body for {bodyId}");
+ }
+
+ stream = new MemoryStream(decompressedBuffer, writable: false);
+ }
+ else
+ {
+ stream = properties.Content.ToStream();
+ }
+
+ return new MessageBodyFileResult
+ {
+ Stream = stream,
+ ContentType = contentType,
+ BodySize = bodySize,
+ Etag = etag
+ };
+ }
+ catch (Azure.RequestFailedException ex) when (ex.Status == 404)
+ {
+ return null;
+ }
+ }
+
+ public Task DeleteBodiesForHour(DateTime hour, CancellationToken cancellationToken = default)
+ {
+ // var hourPrefix = hour.ToString("yyyy-MM-dd-HH") + "/";
+
+ // await foreach (var blobItem in blobContainerClient.GetBlobsAsync(BlobTraits.None, BlobStates.None, hourPrefix, cancellationToken))
+ // {
+ // await blobContainerClient.DeleteBlobIfExistsAsync(blobItem.Name, cancellationToken: cancellationToken);
+ // }
+
+ return Task.CompletedTask;
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/BodyStorageFetcher.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/BodyStorageFetcher.cs
new file mode 100644
index 0000000000..a54b4a6f46
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/BodyStorageFetcher.cs
@@ -0,0 +1,44 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Implementation;
+
+using Microsoft.EntityFrameworkCore;
+using ServiceControl.Audit.Auditing.BodyStorage;
+using ServiceControl.Audit.Persistence.Sql.Core.DbContexts;
+using ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+class BodyStorageFetcher(IBodyStoragePersistence storagePersistence, AuditDbContextBase dbContext) : IBodyStorage
+{
+ public async Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
+ {
+ throw new NotImplementedException();
+ }
+
+ public async Task TryFetch(string bodyId, CancellationToken cancellationToken)
+ {
+ // Look up CreatedOn from the database to locate the correct hourly folder
+ var createdOn = await dbContext.ProcessedMessages
+ .Where(m => m.UniqueMessageId == bodyId)
+ .Select(m => m.CreatedOn)
+ .FirstOrDefaultAsync(cancellationToken);
+
+ if (createdOn == default)
+ {
+ return new StreamResult { HasResult = false };
+ }
+
+ var result = await storagePersistence.ReadBodyAsync(bodyId, createdOn, cancellationToken);
+
+ if (result == null)
+ {
+ return new StreamResult { HasResult = false };
+ }
+
+ return new StreamResult
+ {
+ HasResult = true,
+ Stream = result.Stream,
+ ContentType = result.ContentType,
+ BodySize = result.BodySize,
+ Etag = result.Etag
+ };
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/EFAuditDataStore.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/EFAuditDataStore.cs
new file mode 100644
index 0000000000..8b805955b8
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/EFAuditDataStore.cs
@@ -0,0 +1,39 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Implementation;
+
+using ServiceControl.Audit.Auditing;
+using ServiceControl.Audit.Auditing.MessagesView;
+using ServiceControl.Audit.Infrastructure;
+using ServiceControl.Audit.Monitoring;
+using ServiceControl.SagaAudit;
+
+class EFAuditDataStore : IAuditDataStore
+{
+ static readonly QueryStatsInfo EmptyStats = new(string.Empty, 0);
+
+ public Task>> QueryKnownEndpoints(CancellationToken cancellationToken)
+ => Task.FromResult(new QueryResult>([], EmptyStats));
+
+ public Task> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken)
+ => Task.FromResult(QueryResult.Empty());
+
+ public Task>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default)
+ => Task.FromResult(new QueryResult>([], EmptyStats));
+
+ public Task>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default)
+ => Task.FromResult(new QueryResult>([], EmptyStats));
+
+ public Task>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default)
+ => Task.FromResult(new QueryResult>([], EmptyStats));
+
+ public Task>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default)
+ => Task.FromResult(new QueryResult>([], EmptyStats));
+
+ public Task>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
+ => Task.FromResult(new QueryResult>([], EmptyStats));
+
+ public Task GetMessageBody(string messageId, CancellationToken cancellationToken)
+ => Task.FromResult(MessageBodyView.NoContent());
+
+ public Task>> QueryAuditCounts(string endpointName, CancellationToken cancellationToken)
+ => Task.FromResult(new QueryResult>([], EmptyStats));
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/EFFailedAuditStorage.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/EFFailedAuditStorage.cs
new file mode 100644
index 0000000000..fdc6e2f4b1
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/EFFailedAuditStorage.cs
@@ -0,0 +1,17 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Implementation;
+
+using ServiceControl.Audit.Auditing;
+
+class EFFailedAuditStorage : IFailedAuditStorage
+{
+ public Task SaveFailedAuditImport(FailedAuditImport message)
+ => Task.CompletedTask;
+
+ public Task ProcessFailedMessages(
+ Func, CancellationToken, Task> onMessage,
+ CancellationToken cancellationToken)
+ => Task.CompletedTask;
+
+ public Task GetFailedAuditsCount()
+ => Task.FromResult(0);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/FileSystemBodyStoragePersistence.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/FileSystemBodyStoragePersistence.cs
new file mode 100644
index 0000000000..f72e53a14f
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/FileSystemBodyStoragePersistence.cs
@@ -0,0 +1,181 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+using System.IO.Compression;
+using Abstractions;
+
+public class FileSystemBodyStoragePersistence(AuditSqlPersisterSettings settings) : IBodyStoragePersistence
+{
+ const int FormatVersion = 1;
+
+ public async Task WriteBodyAsync(
+ string bodyId,
+ DateTime createdOn,
+ ReadOnlyMemory body,
+ string contentType,
+ CancellationToken cancellationToken = default)
+ {
+ var dateFolder = createdOn.ToString("yyyy-MM-dd-HH");
+ var filePath = Path.Combine(settings.MessageBodyStoragePath, dateFolder, $"{bodyId}.body");
+
+ // Bodies are immutable - skip if file already exists
+ if (File.Exists(filePath))
+ {
+ return;
+ }
+
+ // Ensure directory exists
+ var directory = Path.GetDirectoryName(filePath);
+ if (!string.IsNullOrEmpty(directory) && !Directory.Exists(directory))
+ {
+ Directory.CreateDirectory(directory);
+ }
+
+ // Write to temp file first for atomic operation
+ var tempFilePath = filePath + ".tmp";
+
+ try
+ {
+ var fileStream = new FileStream(
+ tempFilePath,
+ FileMode.Create,
+ FileAccess.Write,
+ FileShare.None,
+ bufferSize: 4096,
+ useAsync: true);
+
+ await using (fileStream.ConfigureAwait(false))
+ {
+ using var writer = new BinaryWriter(fileStream, System.Text.Encoding.UTF8, leaveOpen: true);
+
+ var shouldCompress = body.Length >= settings.MinBodySizeForCompression;
+
+ // Write header
+ writer.Write(FormatVersion);
+ writer.Write(contentType);
+ writer.Write(body.Length); // Original uncompressed size
+ writer.Write(shouldCompress);
+ writer.Write(Guid.NewGuid().ToString()); // Generate ETag
+
+ // Flush the header before writing body
+ writer.Flush();
+
+ // Write body (compressed or not)
+ if (shouldCompress)
+ {
+ var brotliStream = new BrotliStream(fileStream, CompressionLevel.Fastest, leaveOpen: true);
+ await using (brotliStream.ConfigureAwait(false))
+ {
+ await brotliStream.WriteAsync(body, cancellationToken).ConfigureAwait(false);
+ }
+ }
+ else
+ {
+ await fileStream.WriteAsync(body, cancellationToken).ConfigureAwait(false);
+ }
+
+ await fileStream.FlushAsync(cancellationToken).ConfigureAwait(false);
+ }
+
+ // Atomic rename
+ File.Move(tempFilePath, filePath, overwrite: false);
+ }
+ catch
+ {
+ // Clean up temp file if it exists
+ if (File.Exists(tempFilePath))
+ {
+ try
+ {
+ File.Delete(tempFilePath);
+ }
+ catch
+ {
+ // Ignore cleanup errors
+ }
+ }
+ throw;
+ }
+ }
+
+ public Task ReadBodyAsync(string bodyId, DateTime createdOn, CancellationToken cancellationToken = default)
+ {
+ var dateFolder = createdOn.ToString("yyyy-MM-dd-HH");
+ var filePath = Path.Combine(settings.MessageBodyStoragePath, dateFolder, $"{bodyId}.body");
+
+ if (!File.Exists(filePath))
+ {
+ return Task.FromResult(null);
+ }
+
+ try
+ {
+ var fileStream = new FileStream(
+ filePath,
+ FileMode.Open,
+ FileAccess.Read,
+ FileShare.Read,
+ bufferSize: 4096,
+ useAsync: true);
+
+ var reader = new BinaryReader(fileStream, System.Text.Encoding.UTF8, leaveOpen: true);
+
+ // Read header
+ var formatVersion = reader.ReadInt32();
+ if (formatVersion != FormatVersion)
+ {
+ fileStream.Dispose();
+ throw new InvalidOperationException($"Unsupported body file format version: {formatVersion}");
+ }
+
+ var contentType = reader.ReadString();
+ var bodySize = reader.ReadInt32();
+ var isCompressed = reader.ReadBoolean();
+ var etag = reader.ReadString();
+
+ // Create appropriate stream wrapper for body data
+ Stream bodyStream = fileStream;
+ if (isCompressed)
+ {
+ bodyStream = new BrotliStream(fileStream, CompressionMode.Decompress, leaveOpen: false);
+ }
+
+ var result = new MessageBodyFileResult
+ {
+ Stream = bodyStream,
+ ContentType = contentType,
+ BodySize = bodySize,
+ Etag = etag
+ };
+
+ return Task.FromResult(result);
+ }
+ catch (FileNotFoundException)
+ {
+ return Task.FromResult(null);
+ }
+ catch (IOException ex)
+ {
+ throw new InvalidOperationException($"Failed to read body file for {bodyId}", ex);
+ }
+ }
+
+ public Task DeleteBodiesForHour(DateTime hour, CancellationToken cancellationToken = default)
+ {
+ var dateFolder = hour.ToString("yyyy-MM-dd-HH");
+ var directoryPath = Path.Combine(settings.MessageBodyStoragePath, dateFolder);
+
+ try
+ {
+ if (Directory.Exists(directoryPath))
+ {
+ Directory.Delete(directoryPath, recursive: true);
+ }
+ }
+ catch (DirectoryNotFoundException)
+ {
+ // Already gone
+ }
+
+ return Task.CompletedTask;
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/AuditIngestionUnitOfWork.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/AuditIngestionUnitOfWork.cs
new file mode 100644
index 0000000000..9a600651be
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/AuditIngestionUnitOfWork.cs
@@ -0,0 +1,240 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Implementation.UnitOfWork;
+
+using System.Net.Mime;
+using System.Text;
+using System.Text.Json;
+using Abstractions;
+using DbContexts;
+using Entities;
+using Infrastructure;
+using ServiceControl.Audit.Auditing;
+using ServiceControl.Audit.Monitoring;
+using ServiceControl.Audit.Persistence.Infrastructure;
+using ServiceControl.Audit.Persistence.Monitoring;
+using ServiceControl.Audit.Persistence.UnitOfWork;
+using ServiceControl.SagaAudit;
+
+class AuditIngestionUnitOfWork(
+ AuditDbContextBase dbContext,
+ IBodyStoragePersistence bodyPersistence,
+ AuditSqlPersisterSettings settings
+ )
+ : IAuditIngestionUnitOfWork
+{
+ readonly List bodyStorageTasks = [];
+ // Large object heap starts above 85000 bytes
+ const int LargeObjectHeapThreshold = 85_000;
+ static readonly Encoding Utf8 = new UTF8Encoding(true, true);
+
+ public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory body = default, CancellationToken cancellationToken = default)
+ {
+ var createdOn = TruncateToHour(DateTime.UtcNow);
+
+ var entity = new ProcessedMessageEntity
+ {
+ CreatedOn = createdOn,
+ UniqueMessageId = processedMessage.UniqueMessageId,
+ HeadersJson = JsonSerializer.Serialize(processedMessage.Headers, ProcessedMessageJsonContext.Default.DictionaryStringString),
+
+ // Denormalized fields
+ MessageId = GetMetadata(processedMessage.MessageMetadata, "MessageId"),
+ MessageType = GetMetadata(processedMessage.MessageMetadata, "MessageType"),
+ TimeSent = GetMetadata(processedMessage.MessageMetadata, "TimeSent"),
+ IsSystemMessage = GetMetadata(processedMessage.MessageMetadata, "IsSystemMessage"),
+ Status = (int)(GetMetadata(processedMessage.MessageMetadata, "IsRetried") ? MessageStatus.ResolvedSuccessfully : MessageStatus.Successful),
+ ConversationId = GetMetadata(processedMessage.MessageMetadata, "ConversationId"),
+
+ // Endpoint details
+ ReceivingEndpointName = GetEndpointName(processedMessage.MessageMetadata, "ReceivingEndpoint"),
+
+ // Performance metrics
+ CriticalTimeTicks = GetMetadata(processedMessage.MessageMetadata, "CriticalTime")?.Ticks,
+ ProcessingTimeTicks = GetMetadata(processedMessage.MessageMetadata, "ProcessingTime")?.Ticks,
+ DeliveryTimeTicks = GetMetadata(processedMessage.MessageMetadata, "DeliveryTime")?.Ticks,
+
+ // Full-text search content (header values + body text for single-column FTS indexing)
+ SearchableContent = settings.EnableFullTextSearchOnBodies ? BuildSearchableContent(processedMessage.Headers, body) : null,
+ BodySize = body.Length,
+ BodyUrl = body.IsEmpty ? null : $"/messages/{processedMessage.Id}/body",
+ BodyNotStored = !settings.StoreMessageBodiesOnDisk || body.Length > settings.MaxBodySizeToStore
+ };
+
+ dbContext.ProcessedMessages.Add(entity);
+
+ //Store body if below threshold and storage is enabled
+ if (settings.StoreMessageBodiesOnDisk && !body.IsEmpty && body.Length < settings.MaxBodySizeToStore)
+ {
+ var contentType = GetContentType(processedMessage.Headers, MediaTypeNames.Text.Plain);
+
+ // Queue body storage to run in parallel, awaited in DisposeAsync
+ bodyStorageTasks.Add(bodyPersistence.WriteBodyAsync(processedMessage.UniqueMessageId, createdOn, body, contentType, cancellationToken));
+ }
+ }
+
+ public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken = default)
+ {
+ var entity = new SagaSnapshotEntity
+ {
+ CreatedOn = TruncateToHour(DateTime.UtcNow),
+ SagaId = sagaSnapshot.SagaId,
+ SagaType = sagaSnapshot.SagaType,
+ StartTime = sagaSnapshot.StartTime,
+ FinishTime = sagaSnapshot.FinishTime,
+ Endpoint = sagaSnapshot.Endpoint,
+ Status = sagaSnapshot.Status,
+ InitiatingMessageJson = JsonSerializer.Serialize(sagaSnapshot.InitiatingMessage, SagaSnapshotJsonContext.Default.InitiatingMessage),
+ OutgoingMessagesJson = JsonSerializer.Serialize(sagaSnapshot.OutgoingMessages, SagaSnapshotJsonContext.Default.ListResultingMessage),
+ StateAfterChange = sagaSnapshot.StateAfterChange,
+ };
+
+ dbContext.SagaSnapshots.Add(entity);
+
+ return Task.CompletedTask;
+ }
+
+ public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken = default)
+ {
+ var entity = new KnownEndpointInsertOnlyEntity
+ {
+ KnownEndpointId = DeterministicGuid.MakeId(knownEndpoint.Name, knownEndpoint.HostId.ToString()),
+ Name = knownEndpoint.Name,
+ HostId = knownEndpoint.HostId,
+ Host = knownEndpoint.Host,
+ LastSeen = knownEndpoint.LastSeen
+ };
+
+ dbContext.KnownEndpointsInsertOnly.Add(entity);
+
+ return Task.CompletedTask;
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ try
+ {
+ // Wait for all body storage operations to complete first
+ await Task.WhenAll(bodyStorageTasks);
+ await dbContext.SaveChangesAsync();
+ }
+ finally
+ {
+ await dbContext.DisposeAsync();
+ }
+ }
+
+ static string GetContentType(IReadOnlyDictionary headers, string defaultContentType)
+ => headers.TryGetValue(Headers.ContentType, out var contentType) ? contentType : defaultContentType;
+
+ static T? GetMetadata(Dictionary metadata, string key)
+ {
+ if (metadata.TryGetValue(key, out var value))
+ {
+ if (value is T typedValue)
+ {
+ return typedValue;
+ }
+
+ // Handle JSON deserialized types
+ if (value is JsonElement jsonElement)
+ {
+ return DeserializeJsonElement(jsonElement);
+ }
+ }
+ return default;
+ }
+
+ static T? DeserializeJsonElement(JsonElement element)
+ {
+ try
+ {
+ return element.Deserialize(JsonSerializationOptions.Default);
+ }
+ catch
+ {
+ return default;
+ }
+ }
+
+ static string? GetEndpointName(Dictionary metadata, string key)
+ {
+ if (metadata.TryGetValue(key, out var value))
+ {
+ if (value is EndpointDetails endpoint)
+ {
+ return endpoint.Name;
+ }
+
+ if (value is JsonElement jsonElement)
+ {
+ try
+ {
+ var endpoint2 = jsonElement.Deserialize(JsonSerializationOptions.Default);
+ return endpoint2?.Name;
+ }
+ catch
+ {
+ return null;
+ }
+ }
+ }
+ return null;
+ }
+
+ static string BuildSearchableContent(Dictionary headers, ReadOnlyMemory body)
+ {
+ // Combine header values (not keys) with body text for FTS indexing
+ var headerValues = string.Join(" ", headers.Values);
+
+ var bodyString = GetBodyAsText(headers, body);
+ if (string.IsNullOrWhiteSpace(bodyString))
+ {
+ return headerValues;
+ }
+
+ return headerValues + " " + bodyString;
+ }
+
+ static string? GetBodyAsText(Dictionary headers, ReadOnlyMemory body)
+ {
+ if (body.IsEmpty)
+ {
+ return null;
+ }
+
+ var avoidsLargeObjectHeap = body.Length < LargeObjectHeapThreshold;
+ var isBinary = IsBinaryContent(headers);
+
+ if (avoidsLargeObjectHeap && !isBinary)
+ {
+ try
+ {
+ var bodyString = Utf8.GetString(body.Span);
+ if (!string.IsNullOrWhiteSpace(bodyString))
+ {
+ return bodyString;
+ }
+ }
+ catch
+ {
+ // If it won't decode to text, don't index it
+ }
+ }
+
+ return null;
+ }
+
+ static bool IsBinaryContent(Dictionary headers)
+ {
+ if (headers.TryGetValue(Headers.ContentType, out var contentType))
+ {
+ return contentType.Contains("octet-stream") ||
+ contentType.Contains("application/x-") ||
+ contentType.Contains("image/") ||
+ contentType.Contains("audio/") ||
+ contentType.Contains("video/");
+ }
+ return false;
+ }
+
+ static DateTime TruncateToHour(DateTime dt) => new(dt.Year, dt.Month, dt.Day, dt.Hour, 0, 0, dt.Kind);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/AuditIngestionUnitOfWorkFactory.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/AuditIngestionUnitOfWorkFactory.cs
new file mode 100644
index 0000000000..6a9b8737f4
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/AuditIngestionUnitOfWorkFactory.cs
@@ -0,0 +1,25 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Implementation.UnitOfWork;
+
+using Abstractions;
+using DbContexts;
+using Infrastructure;
+using Microsoft.Extensions.DependencyInjection;
+using ServiceControl.Audit.Persistence.UnitOfWork;
+
+class AuditIngestionUnitOfWorkFactory(
+ IServiceProvider serviceProvider,
+ MinimumRequiredStorageState storageState,
+ IBodyStoragePersistence storagePersistence)
+ : IAuditIngestionUnitOfWorkFactory
+{
+ public ValueTask StartNew(int batchSize, CancellationToken cancellationToken)
+ {
+ var scope = serviceProvider.CreateScope();
+ var dbContext = scope.ServiceProvider.GetRequiredService();
+ var settings = scope.ServiceProvider.GetRequiredService();
+ var unitOfWork = new AuditIngestionUnitOfWork(dbContext, storagePersistence, settings);
+ return ValueTask.FromResult(unitOfWork);
+ }
+
+ public bool CanIngestMore() => storageState.CanIngestMore;
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/ProcessedMessageJsonContext.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/ProcessedMessageJsonContext.cs
new file mode 100644
index 0000000000..e790b614d6
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/ProcessedMessageJsonContext.cs
@@ -0,0 +1,11 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Implementation.UnitOfWork;
+
+using System.Text.Json.Serialization;
+
+[JsonSourceGenerationOptions(
+ PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase,
+ WriteIndented = false)]
+[JsonSerializable(typeof(Dictionary))]
+partial class ProcessedMessageJsonContext : JsonSerializerContext
+{
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/SagaSnapshotJsonContext.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/SagaSnapshotJsonContext.cs
new file mode 100644
index 0000000000..6dbac3c2c2
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/SagaSnapshotJsonContext.cs
@@ -0,0 +1,13 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Implementation.UnitOfWork;
+
+using System.Text.Json.Serialization;
+using ServiceControl.SagaAudit;
+
+[JsonSourceGenerationOptions(
+ PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase,
+ WriteIndented = false)]
+[JsonSerializable(typeof(InitiatingMessage))]
+[JsonSerializable(typeof(List))]
+partial class SagaSnapshotJsonContext : JsonSerializerContext
+{
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/IBodyStoragePersistence.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/IBodyStoragePersistence.cs
new file mode 100644
index 0000000000..f0832e428e
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/IBodyStoragePersistence.cs
@@ -0,0 +1,8 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+public interface IBodyStoragePersistence
+{
+ Task WriteBodyAsync(string bodyId, DateTime createdOn, ReadOnlyMemory body, string contentType, CancellationToken cancellationToken = default);
+ Task ReadBodyAsync(string bodyId, DateTime createdOn, CancellationToken cancellationToken = default);
+ Task DeleteBodiesForHour(DateTime hour, CancellationToken cancellationToken = default);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/IPartitionManager.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/IPartitionManager.cs
new file mode 100644
index 0000000000..6d69c3994e
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/IPartitionManager.cs
@@ -0,0 +1,21 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+using DbContexts;
+
+public interface IPartitionManager
+{
+ ///
+ /// Creates hourly partitions from through + .
+ ///
+ Task EnsurePartitionsExist(AuditDbContextBase dbContext, DateTime currentHour, int hoursAhead, CancellationToken ct);
+
+ ///
+ /// Drops the partition for the specified hour for both ProcessedMessages and SagaSnapshots.
+ ///
+ Task DropPartition(AuditDbContextBase dbContext, DateTime partitionHour, CancellationToken ct);
+
+ ///
+ /// Returns hour-precision timestamps of all partitions older than .
+ ///
+ Task> GetExpiredPartitions(AuditDbContextBase dbContext, DateTime cutoff, CancellationToken ct);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/InsertOnlyTableReconciler.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/InsertOnlyTableReconciler.cs
new file mode 100644
index 0000000000..f226a513d3
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/InsertOnlyTableReconciler.cs
@@ -0,0 +1,67 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+using DbContexts;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+public abstract class InsertOnlyTableReconciler(
+ ILogger logger,
+ TimeProvider timeProvider,
+ IServiceScopeFactory serviceScopeFactory,
+ string serviceName) : BackgroundService
+ where TInsertOnly : class
+ where TTarget : class
+{
+ protected const int BatchSize = 1000;
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ logger.LogInformation("Starting {ServiceName}", serviceName);
+
+ try
+ {
+ await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
+
+ using PeriodicTimer timer = new(TimeSpan.FromSeconds(30), timeProvider);
+
+ do
+ {
+ try
+ {
+ await Reconcile(stoppingToken);
+ }
+ catch (Exception ex) when (ex is not OperationCanceledException)
+ {
+ logger.LogError(ex, "Error during {ServiceName} reconciliation", serviceName);
+ }
+ } while (await timer.WaitForNextTickAsync(stoppingToken));
+ }
+ catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+ {
+ logger.LogInformation("Stopping {ServiceName}", serviceName);
+ }
+ }
+
+ async Task Reconcile(CancellationToken stoppingToken)
+ {
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ using var scope = serviceScopeFactory.CreateScope();
+ var dbContext = scope.ServiceProvider.GetRequiredService();
+
+ await using var transaction = await dbContext.Database.BeginTransactionAsync(stoppingToken);
+ var rowsAffected = await ReconcileBatch(dbContext, stoppingToken);
+ await transaction.CommitAsync(stoppingToken);
+
+ if (rowsAffected < BatchSize)
+ {
+ break;
+ }
+
+ await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken);
+ }
+ }
+
+ protected abstract Task ReconcileBatch(AuditDbContextBase dbContext, CancellationToken stoppingToken);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/JsonSerializationOptions.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/JsonSerializationOptions.cs
new file mode 100644
index 0000000000..5e6269310f
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/JsonSerializationOptions.cs
@@ -0,0 +1,12 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+using System.Text.Json;
+
+static class JsonSerializationOptions
+{
+ public static readonly JsonSerializerOptions Default = new()
+ {
+ PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
+ WriteIndented = false
+ };
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/MessageBodyFileResult.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/MessageBodyFileResult.cs
new file mode 100644
index 0000000000..65cf7b6597
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/MessageBodyFileResult.cs
@@ -0,0 +1,9 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+public class MessageBodyFileResult
+{
+ public Stream Stream { get; set; } = null!;
+ public string ContentType { get; set; } = null!;
+ public int BodySize { get; set; }
+ public string Etag { get; set; } = null!;
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/RetentionCleaner.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/RetentionCleaner.cs
new file mode 100644
index 0000000000..56a5d5b2ab
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/RetentionCleaner.cs
@@ -0,0 +1,110 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+using System.Data.Common;
+using System.Diagnostics;
+using Abstractions;
+using DbContexts;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+public abstract class RetentionCleaner(
+ ILogger logger,
+ TimeProvider timeProvider,
+ IServiceScopeFactory serviceScopeFactory,
+ AuditSqlPersisterSettings settings,
+ IBodyStoragePersistence bodyPersistence,
+ IPartitionManager partitionManager,
+ RetentionMetrics metrics) : BackgroundService
+{
+ const int HoursAhead = 6;
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ logger.LogInformation("Starting {ServiceName}", nameof(RetentionCleaner));
+
+ try
+ {
+ await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken);
+
+ using PeriodicTimer timer = new(TimeSpan.FromHours(1), timeProvider);
+
+ do
+ {
+ try
+ {
+ await Clean(stoppingToken);
+ }
+ catch (Exception ex) when (ex is not OperationCanceledException)
+ {
+ logger.LogError(ex, "Failed to run retention cleaner");
+ }
+ } while (await timer.WaitForNextTickAsync(stoppingToken));
+ }
+ catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+ {
+ logger.LogInformation("Stopping {ServiceName}", nameof(RetentionCleaner));
+ }
+ }
+
+ async Task Clean(CancellationToken stoppingToken)
+ {
+ // Use a dedicated connection for the distributed lock so it is not affected
+ // by connection drops or resets on the main DbContext during cleanup operations
+ await using var lockConnection = CreateConnection();
+ await lockConnection.OpenAsync(stoppingToken);
+
+ using var scope = serviceScopeFactory.CreateScope();
+ var dbContext = scope.ServiceProvider.GetRequiredService();
+
+ var stopwatch = Stopwatch.StartNew();
+ // Round up to whole hours since partitions are hourly
+ var retentionPeriod = TimeSpan.FromHours(Math.Ceiling(settings.AuditRetentionPeriod.TotalHours));
+ var cutoff = timeProvider.GetUtcNow().UtcDateTime - retentionPeriod;
+ var now = timeProvider.GetUtcNow().UtcDateTime;
+
+ using var cycleMetrics = metrics.BeginCleanupCycle();
+
+ if (!await TryAcquireLock(lockConnection, stoppingToken))
+ {
+ logger.LogDebug("Another instance is running retention cleanup, skipping this cycle");
+ metrics.RecordLockSkipped();
+ return;
+ }
+
+ try
+ {
+ // Ensure partitions exist for upcoming hours
+ await partitionManager.EnsurePartitionsExist(dbContext, now, HoursAhead, stoppingToken);
+
+ // Find and drop expired partitions
+ var expiredPartitions = await partitionManager.GetExpiredPartitions(dbContext, cutoff, stoppingToken);
+
+ foreach (var hour in expiredPartitions)
+ {
+ // Delete body storage for this hour first
+ await bodyPersistence.DeleteBodiesForHour(hour, stoppingToken);
+
+ // Drop the database partition
+ await partitionManager.DropPartition(dbContext, hour, stoppingToken);
+
+ metrics.RecordPartitionDropped();
+
+ logger.LogInformation("Dropped partition for {Hour}", hour.ToString("yyyy-MM-dd HH:00"));
+ }
+
+ cycleMetrics.Complete();
+
+ logger.LogInformation("Retention cleanup dropped {Partitions} partition(s) in {Elapsed}",
+ expiredPartitions.Count, stopwatch.Elapsed.ToString(@"hh\:mm\:ss"));
+ }
+ finally
+ {
+ await ReleaseLock(lockConnection, stoppingToken);
+ }
+ }
+
+ protected abstract DbConnection CreateConnection();
+ protected abstract Task TryAcquireLock(DbConnection lockConnection, CancellationToken stoppingToken);
+ protected abstract Task ReleaseLock(DbConnection lockConnection, CancellationToken stoppingToken);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/RetentionMetrics.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/RetentionMetrics.cs
new file mode 100644
index 0000000000..01e0e74f9c
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/RetentionMetrics.cs
@@ -0,0 +1,78 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
+
+public class RetentionMetrics
+{
+ public const string MeterName = "Particular.ServiceControl.Audit";
+
+ public static readonly string CleanupDurationInstrumentName = $"{InstrumentPrefix}.cleanup_duration";
+ public static readonly string PartitionsDroppedInstrumentName = $"{InstrumentPrefix}.partitions_dropped_total";
+
+ public RetentionMetrics(IMeterFactory meterFactory)
+ {
+ var meter = meterFactory.Create(MeterName, MeterVersion);
+
+ cleanupDuration = meter.CreateHistogram(CleanupDurationInstrumentName, unit: "s", description: "Retention cleanup cycle duration");
+ partitionsDropped = meter.CreateCounter(PartitionsDroppedInstrumentName, description: "Total partitions dropped by retention cleanup");
+ consecutiveFailureGauge = meter.CreateObservableGauge($"{InstrumentPrefix}.consecutive_failures_total", () => consecutiveFailures, description: "Consecutive retention cleanup failures");
+ lockSkippedCounter = meter.CreateCounter($"{InstrumentPrefix}.lock_skipped_total", description: "Number of times cleanup was skipped due to another instance holding the lock");
+ }
+
+ public CleanupCycleMetrics BeginCleanupCycle() => new(cleanupDuration, RecordCycleOutcome);
+
+ public void RecordPartitionDropped() => partitionsDropped.Add(1);
+
+ public void RecordLockSkipped() => lockSkippedCounter.Add(1);
+
+ void RecordCycleOutcome(bool success)
+ {
+ if (success)
+ {
+ consecutiveFailures = 0;
+ }
+ else
+ {
+ consecutiveFailures++;
+ }
+ }
+
+ long consecutiveFailures;
+
+ readonly Histogram cleanupDuration;
+ readonly Counter partitionsDropped;
+#pragma warning disable IDE0052
+ readonly ObservableGauge consecutiveFailureGauge;
+#pragma warning restore IDE0052
+ readonly Counter lockSkippedCounter;
+
+ const string MeterVersion = "0.1.0";
+ const string InstrumentPrefix = "sc.audit.retention";
+}
+
+public class CleanupCycleMetrics : IDisposable
+{
+ readonly Histogram cleanupDuration;
+ readonly Action recordOutcome;
+ readonly Stopwatch stopwatch = Stopwatch.StartNew();
+
+ bool completed;
+
+ internal CleanupCycleMetrics(Histogram cleanupDuration, Action recordOutcome)
+ {
+ this.cleanupDuration = cleanupDuration;
+ this.recordOutcome = recordOutcome;
+ }
+
+ public void Complete() => completed = true;
+
+ public void Dispose()
+ {
+ var result = completed ? "success" : "failed";
+ var tags = new TagList { { "result", result } };
+
+ cleanupDuration.Record(stopwatch.Elapsed.TotalSeconds, tags);
+ recordOutcome(completed);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/SequentialGuidGenerator.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/SequentialGuidGenerator.cs
new file mode 100644
index 0000000000..0bfbccfd4a
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/SequentialGuidGenerator.cs
@@ -0,0 +1,51 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+///
+/// Generates sequential GUIDs for database primary keys to minimize page fragmentation
+/// and improve insert performance while maintaining security benefits of GUIDs.
+///
+///
+/// This implementation creates time-ordered GUIDs similar to .NET 9's Guid.CreateVersion7()
+/// but compatible with .NET 8. The GUIDs are ordered by timestamp to reduce B-tree page splits
+/// in clustered indexes, which significantly improves insert performance compared to random GUIDs.
+///
+/// Benefits:
+/// - Database agnostic (works with SQL Server, PostgreSQL, MySQL, SQLite)
+/// - Sequential ordering reduces page fragmentation
+/// - Better insert performance than random GUIDs
+/// - Can easily migrate to Guid.CreateVersion7() when upgrading to .NET 9+
+/// - No external dependencies
+///
+/// Security:
+/// - Still cryptographically secure (uses Guid.NewGuid() as base)
+/// - Not guessable (unlike sequential integers)
+/// - Safe to expose in APIs
+///
+public static class SequentialGuidGenerator
+{
+ ///
+ /// Generate a sequential GUID with timestamp-based ordering for optimal database performance.
+ ///
+ /// A new GUID with sequential characteristics.
+ public static Guid NewSequentialGuid()
+ {
+ var guidBytes = Guid.NewGuid().ToByteArray();
+ var now = DateTime.UtcNow;
+
+ // Get timestamp in milliseconds since Unix epoch (similar to Version 7 GUIDs)
+ var timestamp = (long)(now - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalMilliseconds;
+ var timestampBytes = BitConverter.GetBytes(timestamp);
+
+ // Reverse if little-endian to get big-endian byte order for proper sorting
+ if (BitConverter.IsLittleEndian)
+ {
+ Array.Reverse(timestampBytes);
+ }
+
+ // Replace last 6 bytes with timestamp for sequential ordering
+ // This placement works well with SQL Server's GUID comparison semantics
+ Array.Copy(timestampBytes, 2, guidBytes, 10, 6);
+
+ return new Guid(guidBytes);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/ServiceControl.Audit.Persistence.Sql.Core.csproj b/src/ServiceControl.Audit.Persistence.Sql.Core/ServiceControl.Audit.Persistence.Sql.Core.csproj
new file mode 100644
index 0000000000..573a686553
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/ServiceControl.Audit.Persistence.Sql.Core.csproj
@@ -0,0 +1,21 @@
+
+
+
+ net10.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/.editorconfig b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/.editorconfig
new file mode 100644
index 0000000000..fc68ac3228
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/.editorconfig
@@ -0,0 +1,9 @@
+[*.cs]
+
+# Justification: ServiceControl app has no synchronization context
+dotnet_diagnostic.CA2007.severity = none
+
+# Disable style rules for auto-generated EF migrations
+[Migrations/**.cs]
+dotnet_diagnostic.IDE0065.severity = none
+generated_code = true
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/KnownEndpointsReconciler.cs b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/KnownEndpointsReconciler.cs
new file mode 100644
index 0000000000..542bbd826c
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/KnownEndpointsReconciler.cs
@@ -0,0 +1,46 @@
+namespace ServiceControl.Audit.Persistence.Sql.PostgreSQL.Infrastructure;
+
+using Core.DbContexts;
+using Core.Entities;
+using Core.Infrastructure;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+
+class KnownEndpointsReconciler(
+ ILogger logger,
+ TimeProvider timeProvider,
+ IServiceScopeFactory serviceScopeFactory)
+ : InsertOnlyTableReconciler(
+ logger, timeProvider, serviceScopeFactory, nameof(KnownEndpointsReconciler))
+{
+ protected override async Task ReconcileBatch(AuditDbContextBase dbContext, CancellationToken stoppingToken)
+ {
+ var sql = @"
+ WITH lock_check AS (
+ SELECT pg_try_advisory_xact_lock(hashtext('known_endpoints_sync')) AS acquired
+ ),
+ deleted AS (
+ DELETE FROM known_endpoints_insert_only
+ WHERE (SELECT acquired FROM lock_check)
+ AND ctid IN (
+ SELECT ctid FROM known_endpoints_insert_only LIMIT @batchSize
+ )
+ RETURNING known_endpoint_id, name, host_id, host, last_seen
+ ),
+ aggregated AS (
+ SELECT DISTINCT ON (known_endpoint_id) known_endpoint_id, name, host_id, host, last_seen
+ FROM deleted
+ ORDER BY known_endpoint_id, last_seen DESC
+ )
+ INSERT INTO known_endpoints (id, name, host_id, host, last_seen)
+ SELECT known_endpoint_id, name, host_id, host, last_seen
+ FROM aggregated
+ ON CONFLICT (id) DO UPDATE SET
+ last_seen = GREATEST(known_endpoints.last_seen, EXCLUDED.last_seen);
+ ";
+
+ var rowsAffected = await dbContext.Database.ExecuteSqlRawAsync(sql, [new Npgsql.NpgsqlParameter("@batchSize", BatchSize)], stoppingToken);
+ return rowsAffected;
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/PostgreSqlPartitionManager.cs b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/PostgreSqlPartitionManager.cs
new file mode 100644
index 0000000000..f9107de18b
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/PostgreSqlPartitionManager.cs
@@ -0,0 +1,89 @@
+namespace ServiceControl.Audit.Persistence.Sql.PostgreSQL.Infrastructure;
+
+using Microsoft.EntityFrameworkCore;
+using ServiceControl.Audit.Persistence.Sql.Core.DbContexts;
+using ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+// Partition/table names cannot be parameterized in SQL; all values come from internal constants and date formatting
+#pragma warning disable EF1002, EF1003
+public class PostgreSqlPartitionManager : IPartitionManager
+{
+ static readonly (string ParentTable, string Prefix)[] PartitionedTables =
+ [
+ ("processed_messages", "processed_messages"),
+ ("saga_snapshots", "saga_snapshots")
+ ];
+
+ public async Task EnsurePartitionsExist(AuditDbContextBase dbContext, DateTime currentHour, int hoursAhead, CancellationToken ct)
+ {
+ var truncatedHour = TruncateToHour(currentHour);
+ var targetHour = truncatedHour.AddHours(hoursAhead);
+
+ for (var hour = truncatedHour; hour <= targetHour; hour = hour.AddHours(1))
+ {
+ var nextHour = hour.AddHours(1);
+ var hourSuffix = hour.ToString("yyyyMMddHH");
+ var hourStr = hour.ToString("yyyy-MM-dd HH:00:00");
+ var nextHourStr = nextHour.ToString("yyyy-MM-dd HH:00:00");
+
+ foreach (var (parentTable, prefix) in PartitionedTables)
+ {
+ var partitionName = prefix + "_" + hourSuffix;
+
+ await dbContext.Database.ExecuteSqlRawAsync(
+ "CREATE TABLE IF NOT EXISTS " + partitionName +
+ " PARTITION OF " + parentTable +
+ " FOR VALUES FROM ('" + hourStr + "') TO ('" + nextHourStr + "')", ct);
+ }
+ }
+ }
+
+ public async Task DropPartition(AuditDbContextBase dbContext, DateTime partitionHour, CancellationToken ct)
+ {
+ var hourSuffix = TruncateToHour(partitionHour).ToString("yyyyMMddHH");
+
+ foreach (var (parentTable, prefix) in PartitionedTables)
+ {
+ var partitionName = prefix + "_" + hourSuffix;
+
+ await dbContext.Database.ExecuteSqlRawAsync(
+ "ALTER TABLE " + parentTable + " DETACH PARTITION " + partitionName, ct);
+
+ await dbContext.Database.ExecuteSqlRawAsync(
+ "DROP TABLE " + partitionName, ct);
+ }
+ }
+
+ public async Task> GetExpiredPartitions(AuditDbContextBase dbContext, DateTime cutoff, CancellationToken ct)
+ {
+ var truncatedCutoff = TruncateToHour(cutoff);
+
+ var partitionNames = await dbContext.Database
+ .SqlQueryRaw(
+ "SELECT c.relname AS Value " +
+ "FROM pg_class c " +
+ "INNER JOIN pg_inherits i ON c.oid = i.inhrelid " +
+ "INNER JOIN pg_class parent ON i.inhparent = parent.oid " +
+ "WHERE parent.relname = 'processed_messages' " +
+ "AND c.relkind = 'r' " +
+ "ORDER BY c.relname")
+ .ToListAsync(ct);
+
+ var result = new List();
+
+ foreach (var name in partitionNames)
+ {
+ // Parse hour from partition name: processed_messages_yyyyMMddHH
+ var datePart = name.Replace("processed_messages_", "");
+ if (DateTime.TryParseExact(datePart, "yyyyMMddHH", null, System.Globalization.DateTimeStyles.None, out var hour)
+ && hour < truncatedCutoff)
+ {
+ result.Add(hour);
+ }
+ }
+
+ return result;
+ }
+
+ static DateTime TruncateToHour(DateTime dt) => new(dt.Year, dt.Month, dt.Day, dt.Hour, 0, 0, dt.Kind);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/RetentionCleaner.cs b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/RetentionCleaner.cs
new file mode 100644
index 0000000000..1689afe28a
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/RetentionCleaner.cs
@@ -0,0 +1,40 @@
+namespace ServiceControl.Audit.Persistence.Sql.PostgreSQL.Infrastructure;
+
+using System.Data.Common;
+using Core.Abstractions;
+using Core.Infrastructure;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Npgsql;
+
+class RetentionCleaner(
+ ILogger logger,
+ TimeProvider timeProvider,
+ IServiceScopeFactory serviceScopeFactory,
+ AuditSqlPersisterSettings settings,
+ IBodyStoragePersistence bodyPersistence,
+ IPartitionManager partitionManager,
+ RetentionMetrics metrics)
+ : Core.Infrastructure.RetentionCleaner(logger, timeProvider, serviceScopeFactory, settings, bodyPersistence, partitionManager, metrics)
+{
+ readonly string connectionString = settings.ConnectionString;
+
+ protected override DbConnection CreateConnection() => new NpgsqlConnection(connectionString);
+
+ protected override async Task TryAcquireLock(DbConnection lockConnection, CancellationToken stoppingToken)
+ {
+ await using var cmd = lockConnection.CreateCommand();
+ cmd.CommandText = "SELECT pg_try_advisory_lock(hashtext('retention_cleaner'))";
+
+ var result = await cmd.ExecuteScalarAsync(stoppingToken);
+ return result is true;
+ }
+
+ protected override async Task ReleaseLock(DbConnection lockConnection, CancellationToken stoppingToken)
+ {
+ await using var cmd = lockConnection.CreateCommand();
+ cmd.CommandText = "SELECT pg_advisory_unlock(hashtext('retention_cleaner'))";
+
+ await cmd.ExecuteNonQueryAsync(stoppingToken);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031455_InitialCreate.Designer.cs b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031455_InitialCreate.Designer.cs
new file mode 100644
index 0000000000..d0457f3931
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031455_InitialCreate.Designer.cs
@@ -0,0 +1,281 @@
+//
+using System;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Infrastructure;
+using Microsoft.EntityFrameworkCore.Migrations;
+using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
+using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
+using ServiceControl.Audit.Persistence.Sql.PostgreSQL;
+
+#nullable disable
+
+namespace ServiceControl.Audit.Persistence.Sql.PostgreSQL.Migrations
+{
+ [DbContext(typeof(PostgreSqlAuditDbContext))]
+ [Migration("20260214031455_InitialCreate")]
+ partial class InitialCreate
+ {
+ ///
+ protected override void BuildTargetModel(ModelBuilder modelBuilder)
+ {
+#pragma warning disable 612, 618
+ modelBuilder
+ .HasAnnotation("ProductVersion", "10.0.3")
+ .HasAnnotation("Relational:MaxIdentifierLength", 63);
+
+ NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.FailedAuditImportEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid")
+ .HasColumnName("id");
+
+ b.Property("ExceptionInfo")
+ .HasColumnType("text")
+ .HasColumnName("exception_info");
+
+ b.Property("MessageJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("message_json");
+
+ b.HasKey("Id");
+
+ b.ToTable("failed_audit_imports", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.KnownEndpointEntity", b =>
+ {
+ b.Property("Id")
+ .HasColumnType("uuid")
+ .HasColumnName("id");
+
+ b.Property("Host")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("host");
+
+ b.Property("HostId")
+ .HasColumnType("uuid")
+ .HasColumnName("host_id");
+
+ b.Property("LastSeen")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("last_seen");
+
+ b.Property("Name")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("name");
+
+ b.HasKey("Id");
+
+ b.HasIndex("LastSeen");
+
+ b.ToTable("known_endpoints", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.KnownEndpointInsertOnlyEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("bigint")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("Host")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("host");
+
+ b.Property("HostId")
+ .HasColumnType("uuid")
+ .HasColumnName("host_id");
+
+ b.Property("KnownEndpointId")
+ .HasColumnType("uuid")
+ .HasColumnName("known_endpoint_id");
+
+ b.Property("LastSeen")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("last_seen");
+
+ b.Property("Name")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("name");
+
+ b.HasKey("Id");
+
+ b.HasIndex("KnownEndpointId");
+
+ b.HasIndex("LastSeen");
+
+ b.ToTable("known_endpoints_insert_only", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.ProcessedMessageEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("bigint")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("CreatedOn")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("created_on");
+
+ b.Property("BodyNotStored")
+ .HasColumnType("boolean")
+ .HasColumnName("body_not_stored");
+
+ b.Property("BodySize")
+ .HasColumnType("integer")
+ .HasColumnName("body_size");
+
+ b.Property("BodyUrl")
+ .HasMaxLength(500)
+ .HasColumnType("character varying(500)")
+ .HasColumnName("body_url");
+
+ b.Property("ConversationId")
+ .HasMaxLength(200)
+ .HasColumnType("character varying(200)")
+ .HasColumnName("conversation_id");
+
+ b.Property("CriticalTimeTicks")
+ .HasColumnType("bigint")
+ .HasColumnName("critical_time_ticks");
+
+ b.Property("DeliveryTimeTicks")
+ .HasColumnType("bigint")
+ .HasColumnName("delivery_time_ticks");
+
+ b.Property("HeadersJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("headers_json");
+
+ b.Property("IsSystemMessage")
+ .HasColumnType("boolean")
+ .HasColumnName("is_system_message");
+
+ b.Property("MessageId")
+ .HasMaxLength(200)
+ .HasColumnType("character varying(200)")
+ .HasColumnName("message_id");
+
+ b.Property("MessageType")
+ .HasMaxLength(500)
+ .HasColumnType("character varying(500)")
+ .HasColumnName("message_type");
+
+ b.Property("ProcessingTimeTicks")
+ .HasColumnType("bigint")
+ .HasColumnName("processing_time_ticks");
+
+ b.Property("ReceivingEndpointName")
+ .HasMaxLength(500)
+ .HasColumnType("character varying(500)")
+ .HasColumnName("receiving_endpoint_name");
+
+ b.Property("SearchableContent")
+ .HasColumnType("text")
+ .HasColumnName("searchable_content");
+
+ b.Property("Status")
+ .HasColumnType("integer")
+ .HasColumnName("status");
+
+ b.Property("TimeSent")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("time_sent");
+
+ b.Property("UniqueMessageId")
+ .IsRequired()
+ .HasMaxLength(200)
+ .HasColumnType("character varying(200)")
+ .HasColumnName("unique_message_id");
+
+ b.HasKey("Id", "CreatedOn");
+
+ b.HasIndex("ConversationId");
+
+ b.HasIndex("MessageId");
+
+ b.HasIndex("TimeSent");
+
+ b.HasIndex("UniqueMessageId");
+
+ b.ToTable("processed_messages", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.SagaSnapshotEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("bigint")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("CreatedOn")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("created_on");
+
+ b.Property("Endpoint")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("endpoint");
+
+ b.Property("FinishTime")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("finish_time");
+
+ b.Property("InitiatingMessageJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("initiating_message_json");
+
+ b.Property("OutgoingMessagesJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("outgoing_messages_json");
+
+ b.Property("SagaId")
+ .HasColumnType("uuid")
+ .HasColumnName("saga_id");
+
+ b.Property("SagaType")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("saga_type");
+
+ b.Property("StartTime")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("start_time");
+
+ b.Property("StateAfterChange")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("state_after_change");
+
+ b.Property("Status")
+ .HasColumnType("integer")
+ .HasColumnName("status");
+
+ b.HasKey("Id", "CreatedOn");
+
+ b.HasIndex("SagaId");
+
+ b.ToTable("saga_snapshots", (string)null);
+ });
+#pragma warning restore 612, 618
+ }
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031455_InitialCreate.cs b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031455_InitialCreate.cs
new file mode 100644
index 0000000000..3780d85378
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031455_InitialCreate.cs
@@ -0,0 +1,171 @@
+using System;
+using Microsoft.EntityFrameworkCore.Migrations;
+using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
+
+#nullable disable
+
+namespace ServiceControl.Audit.Persistence.Sql.PostgreSQL.Migrations
+{
+ ///
+ public partial class InitialCreate : Migration
+ {
+ ///
+ protected override void Up(MigrationBuilder migrationBuilder)
+ {
+ migrationBuilder.CreateTable(
+ name: "failed_audit_imports",
+ columns: table => new
+ {
+ id = table.Column(type: "uuid", nullable: false),
+ message_json = table.Column(type: "text", nullable: false),
+ exception_info = table.Column(type: "text", nullable: true)
+ },
+ constraints: table =>
+ {
+ table.PrimaryKey("PK_failed_audit_imports", x => x.id);
+ });
+
+ migrationBuilder.CreateTable(
+ name: "known_endpoints",
+ columns: table => new
+ {
+ id = table.Column(type: "uuid", nullable: false),
+ name = table.Column(type: "text", nullable: false),
+ host_id = table.Column(type: "uuid", nullable: false),
+ host = table.Column(type: "text", nullable: false),
+ last_seen = table.Column(type: "timestamp with time zone", nullable: false)
+ },
+ constraints: table =>
+ {
+ table.PrimaryKey("PK_known_endpoints", x => x.id);
+ });
+
+ migrationBuilder.CreateTable(
+ name: "known_endpoints_insert_only",
+ columns: table => new
+ {
+ id = table.Column(type: "bigint", nullable: false)
+ .Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn),
+ known_endpoint_id = table.Column(type: "uuid", nullable: false),
+ name = table.Column(type: "text", nullable: false),
+ host_id = table.Column(type: "uuid", nullable: false),
+ host = table.Column(type: "text", nullable: false),
+ last_seen = table.Column(type: "timestamp with time zone", nullable: false)
+ },
+ constraints: table =>
+ {
+ table.PrimaryKey("PK_known_endpoints_insert_only", x => x.id);
+ });
+
+ migrationBuilder.CreateTable(
+ name: "processed_messages",
+ columns: table => new
+ {
+ id = table.Column(type: "bigint", nullable: false)
+ .Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn),
+ created_on = table.Column(type: "timestamp with time zone", nullable: false),
+ unique_message_id = table.Column(type: "character varying(200)", maxLength: 200, nullable: false),
+ headers_json = table.Column(type: "text", nullable: false),
+ searchable_content = table.Column(type: "text", nullable: true),
+ message_id = table.Column(type: "character varying(200)", maxLength: 200, nullable: true),
+ message_type = table.Column(type: "character varying(500)", maxLength: 500, nullable: true),
+ time_sent = table.Column(type: "timestamp with time zone", nullable: true),
+ is_system_message = table.Column(type: "boolean", nullable: false),
+ status = table.Column(type: "integer", nullable: false),
+ conversation_id = table.Column(type: "character varying(200)", maxLength: 200, nullable: true),
+ receiving_endpoint_name = table.Column(type: "character varying(500)", maxLength: 500, nullable: true),
+ critical_time_ticks = table.Column(type: "bigint", nullable: true),
+ processing_time_ticks = table.Column(type: "bigint", nullable: true),
+ delivery_time_ticks = table.Column(type: "bigint", nullable: true),
+ body_size = table.Column(type: "integer", nullable: false),
+ body_url = table.Column(type: "character varying(500)", maxLength: 500, nullable: true),
+ body_not_stored = table.Column(type: "boolean", nullable: false)
+ },
+ constraints: table =>
+ {
+ table.PrimaryKey("PK_processed_messages", x => new { x.id, x.created_on });
+ });
+
+ migrationBuilder.CreateTable(
+ name: "saga_snapshots",
+ columns: table => new
+ {
+ id = table.Column(type: "bigint", nullable: false)
+ .Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn),
+ created_on = table.Column(type: "timestamp with time zone", nullable: false),
+ saga_id = table.Column(type: "uuid", nullable: false),
+ saga_type = table.Column(type: "text", nullable: false),
+ start_time = table.Column(type: "timestamp with time zone", nullable: false),
+ finish_time = table.Column(type: "timestamp with time zone", nullable: false),
+ status = table.Column(type: "integer", nullable: false),
+ state_after_change = table.Column(type: "text", nullable: false),
+ initiating_message_json = table.Column(type: "text", nullable: false),
+ outgoing_messages_json = table.Column(type: "text", nullable: false),
+ endpoint = table.Column(type: "text", nullable: false)
+ },
+ constraints: table =>
+ {
+ table.PrimaryKey("PK_saga_snapshots", x => new { x.id, x.created_on });
+ });
+
+ migrationBuilder.CreateIndex(
+ name: "IX_known_endpoints_last_seen",
+ table: "known_endpoints",
+ column: "last_seen");
+
+ migrationBuilder.CreateIndex(
+ name: "IX_known_endpoints_insert_only_known_endpoint_id",
+ table: "known_endpoints_insert_only",
+ column: "known_endpoint_id");
+
+ migrationBuilder.CreateIndex(
+ name: "IX_known_endpoints_insert_only_last_seen",
+ table: "known_endpoints_insert_only",
+ column: "last_seen");
+
+ migrationBuilder.CreateIndex(
+ name: "IX_processed_messages_conversation_id",
+ table: "processed_messages",
+ column: "conversation_id");
+
+ migrationBuilder.CreateIndex(
+ name: "IX_processed_messages_message_id",
+ table: "processed_messages",
+ column: "message_id");
+
+ migrationBuilder.CreateIndex(
+ name: "IX_processed_messages_time_sent",
+ table: "processed_messages",
+ column: "time_sent");
+
+ migrationBuilder.CreateIndex(
+ name: "IX_processed_messages_unique_message_id",
+ table: "processed_messages",
+ column: "unique_message_id");
+
+ migrationBuilder.CreateIndex(
+ name: "IX_saga_snapshots_saga_id",
+ table: "saga_snapshots",
+ column: "saga_id");
+ }
+
+ ///
+ protected override void Down(MigrationBuilder migrationBuilder)
+ {
+ migrationBuilder.DropTable(
+ name: "failed_audit_imports");
+
+ migrationBuilder.DropTable(
+ name: "known_endpoints");
+
+ migrationBuilder.DropTable(
+ name: "known_endpoints_insert_only");
+
+ migrationBuilder.DropTable(
+ name: "processed_messages");
+
+ migrationBuilder.DropTable(
+ name: "saga_snapshots");
+ }
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031511_AddPartitioning.Designer.cs b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031511_AddPartitioning.Designer.cs
new file mode 100644
index 0000000000..cd7c4bcb6a
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031511_AddPartitioning.Designer.cs
@@ -0,0 +1,281 @@
+//
+using System;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Infrastructure;
+using Microsoft.EntityFrameworkCore.Migrations;
+using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
+using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
+using ServiceControl.Audit.Persistence.Sql.PostgreSQL;
+
+#nullable disable
+
+namespace ServiceControl.Audit.Persistence.Sql.PostgreSQL.Migrations
+{
+ [DbContext(typeof(PostgreSqlAuditDbContext))]
+ [Migration("20260214031511_AddPartitioning")]
+ partial class AddPartitioning
+ {
+ ///
+ protected override void BuildTargetModel(ModelBuilder modelBuilder)
+ {
+#pragma warning disable 612, 618
+ modelBuilder
+ .HasAnnotation("ProductVersion", "10.0.3")
+ .HasAnnotation("Relational:MaxIdentifierLength", 63);
+
+ NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.FailedAuditImportEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid")
+ .HasColumnName("id");
+
+ b.Property("ExceptionInfo")
+ .HasColumnType("text")
+ .HasColumnName("exception_info");
+
+ b.Property("MessageJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("message_json");
+
+ b.HasKey("Id");
+
+ b.ToTable("failed_audit_imports", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.KnownEndpointEntity", b =>
+ {
+ b.Property("Id")
+ .HasColumnType("uuid")
+ .HasColumnName("id");
+
+ b.Property("Host")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("host");
+
+ b.Property("HostId")
+ .HasColumnType("uuid")
+ .HasColumnName("host_id");
+
+ b.Property("LastSeen")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("last_seen");
+
+ b.Property("Name")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("name");
+
+ b.HasKey("Id");
+
+ b.HasIndex("LastSeen");
+
+ b.ToTable("known_endpoints", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.KnownEndpointInsertOnlyEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("bigint")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("Host")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("host");
+
+ b.Property("HostId")
+ .HasColumnType("uuid")
+ .HasColumnName("host_id");
+
+ b.Property("KnownEndpointId")
+ .HasColumnType("uuid")
+ .HasColumnName("known_endpoint_id");
+
+ b.Property("LastSeen")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("last_seen");
+
+ b.Property("Name")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("name");
+
+ b.HasKey("Id");
+
+ b.HasIndex("KnownEndpointId");
+
+ b.HasIndex("LastSeen");
+
+ b.ToTable("known_endpoints_insert_only", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.ProcessedMessageEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("bigint")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("CreatedOn")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("created_on");
+
+ b.Property("BodyNotStored")
+ .HasColumnType("boolean")
+ .HasColumnName("body_not_stored");
+
+ b.Property("BodySize")
+ .HasColumnType("integer")
+ .HasColumnName("body_size");
+
+ b.Property("BodyUrl")
+ .HasMaxLength(500)
+ .HasColumnType("character varying(500)")
+ .HasColumnName("body_url");
+
+ b.Property("ConversationId")
+ .HasMaxLength(200)
+ .HasColumnType("character varying(200)")
+ .HasColumnName("conversation_id");
+
+ b.Property("CriticalTimeTicks")
+ .HasColumnType("bigint")
+ .HasColumnName("critical_time_ticks");
+
+ b.Property("DeliveryTimeTicks")
+ .HasColumnType("bigint")
+ .HasColumnName("delivery_time_ticks");
+
+ b.Property("HeadersJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("headers_json");
+
+ b.Property("IsSystemMessage")
+ .HasColumnType("boolean")
+ .HasColumnName("is_system_message");
+
+ b.Property("MessageId")
+ .HasMaxLength(200)
+ .HasColumnType("character varying(200)")
+ .HasColumnName("message_id");
+
+ b.Property("MessageType")
+ .HasMaxLength(500)
+ .HasColumnType("character varying(500)")
+ .HasColumnName("message_type");
+
+ b.Property("ProcessingTimeTicks")
+ .HasColumnType("bigint")
+ .HasColumnName("processing_time_ticks");
+
+ b.Property("ReceivingEndpointName")
+ .HasMaxLength(500)
+ .HasColumnType("character varying(500)")
+ .HasColumnName("receiving_endpoint_name");
+
+ b.Property("SearchableContent")
+ .HasColumnType("text")
+ .HasColumnName("searchable_content");
+
+ b.Property("Status")
+ .HasColumnType("integer")
+ .HasColumnName("status");
+
+ b.Property("TimeSent")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("time_sent");
+
+ b.Property("UniqueMessageId")
+ .IsRequired()
+ .HasMaxLength(200)
+ .HasColumnType("character varying(200)")
+ .HasColumnName("unique_message_id");
+
+ b.HasKey("Id", "CreatedOn");
+
+ b.HasIndex("ConversationId");
+
+ b.HasIndex("MessageId");
+
+ b.HasIndex("TimeSent");
+
+ b.HasIndex("UniqueMessageId");
+
+ b.ToTable("processed_messages", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.SagaSnapshotEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("bigint")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("CreatedOn")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("created_on");
+
+ b.Property("Endpoint")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("endpoint");
+
+ b.Property("FinishTime")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("finish_time");
+
+ b.Property("InitiatingMessageJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("initiating_message_json");
+
+ b.Property("OutgoingMessagesJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("outgoing_messages_json");
+
+ b.Property("SagaId")
+ .HasColumnType("uuid")
+ .HasColumnName("saga_id");
+
+ b.Property("SagaType")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("saga_type");
+
+ b.Property("StartTime")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("start_time");
+
+ b.Property("StateAfterChange")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("state_after_change");
+
+ b.Property("Status")
+ .HasColumnType("integer")
+ .HasColumnName("status");
+
+ b.HasKey("Id", "CreatedOn");
+
+ b.HasIndex("SagaId");
+
+ b.ToTable("saga_snapshots", (string)null);
+ });
+#pragma warning restore 612, 618
+ }
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031511_AddPartitioning.cs b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031511_AddPartitioning.cs
new file mode 100644
index 0000000000..d30c186b5b
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031511_AddPartitioning.cs
@@ -0,0 +1,40 @@
+using Microsoft.EntityFrameworkCore.Migrations;
+
+#nullable disable
+
+namespace ServiceControl.Audit.Persistence.Sql.PostgreSQL.Migrations
+{
+ ///
+ public partial class AddPartitioning : Migration
+ {
+ static readonly string[] Tables = ["processed_messages", "saga_snapshots"];
+
+ ///
+ protected override void Up(MigrationBuilder migrationBuilder)
+ {
+ foreach (var table in Tables)
+ {
+ migrationBuilder.Sql($"""
+ CREATE TABLE {table}_tmp (LIKE {table} INCLUDING ALL);
+ DROP TABLE {table};
+ CREATE TABLE {table} (LIKE {table}_tmp INCLUDING ALL) PARTITION BY RANGE (created_on);
+ DROP TABLE {table}_tmp;
+ """);
+ }
+ }
+
+ ///
+ protected override void Down(MigrationBuilder migrationBuilder)
+ {
+ foreach (var table in Tables)
+ {
+ migrationBuilder.Sql($"""
+ CREATE TABLE {table}_tmp (LIKE {table} INCLUDING ALL);
+ DROP TABLE {table};
+ CREATE TABLE {table} (LIKE {table}_tmp INCLUDING ALL);
+ DROP TABLE {table}_tmp;
+ """);
+ }
+ }
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031534_AddFullTextSearch.Designer.cs b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031534_AddFullTextSearch.Designer.cs
new file mode 100644
index 0000000000..6f6a3a343d
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031534_AddFullTextSearch.Designer.cs
@@ -0,0 +1,281 @@
+//
+using System;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Infrastructure;
+using Microsoft.EntityFrameworkCore.Migrations;
+using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
+using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
+using ServiceControl.Audit.Persistence.Sql.PostgreSQL;
+
+#nullable disable
+
+namespace ServiceControl.Audit.Persistence.Sql.PostgreSQL.Migrations
+{
+ [DbContext(typeof(PostgreSqlAuditDbContext))]
+ [Migration("20260214031534_AddFullTextSearch")]
+ partial class AddFullTextSearch
+ {
+ ///
+ protected override void BuildTargetModel(ModelBuilder modelBuilder)
+ {
+#pragma warning disable 612, 618
+ modelBuilder
+ .HasAnnotation("ProductVersion", "10.0.3")
+ .HasAnnotation("Relational:MaxIdentifierLength", 63);
+
+ NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.FailedAuditImportEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid")
+ .HasColumnName("id");
+
+ b.Property("ExceptionInfo")
+ .HasColumnType("text")
+ .HasColumnName("exception_info");
+
+ b.Property("MessageJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("message_json");
+
+ b.HasKey("Id");
+
+ b.ToTable("failed_audit_imports", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.KnownEndpointEntity", b =>
+ {
+ b.Property("Id")
+ .HasColumnType("uuid")
+ .HasColumnName("id");
+
+ b.Property("Host")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("host");
+
+ b.Property("HostId")
+ .HasColumnType("uuid")
+ .HasColumnName("host_id");
+
+ b.Property("LastSeen")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("last_seen");
+
+ b.Property