Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,4 @@ src/scaffolding.config
*.sln.iml

# Visual Studio Code
.vscode
.vscode
3 changes: 3 additions & 0 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<PackageVersion Include="Autofac" Version="9.0.0" />
<PackageVersion Include="AWSSDK.CloudWatch" Version="4.0.7" />
<PackageVersion Include="Azure.Identity" Version="1.17.1" />
<PackageVersion Include="Azure.Storage.Blobs" Version="12.27.0" />
<PackageVersion Include="Azure.Monitor.Query.Metrics" Version="1.0.0" />
<PackageVersion Include="Azure.ResourceManager.ServiceBus" Version="1.1.0" />
<PackageVersion Include="ByteSize" Version="2.1.2" />
Expand All @@ -19,6 +20,7 @@
<PackageVersion Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="10.0.3" />
<PackageVersion Include="Microsoft.AspNetCore.Authentication.OpenIdConnect" Version="10.0.3" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="10.0.3" />
<PackageVersion Include="MongoDB.Driver" Version="3.6.0" />
<PackageVersion Include="Microsoft.AspNetCore.SignalR.Client" Version="10.0.3" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="10.0.3" />
<PackageVersion Include="Microsoft.Extensions.DependencyModel" Version="10.0.3" />
Expand Down Expand Up @@ -80,6 +82,7 @@
<PackageVersion Include="System.Reactive" Version="6.1.0" />
<PackageVersion Include="System.Reflection.MetadataLoadContext" Version="10.0.3" />
<PackageVersion Include="System.ServiceProcess.ServiceController" Version="10.0.3" />
<PackageVersion Include="Testcontainers.MongoDb" Version="4.3.0" />
<PackageVersion Include="Validar.Fody" Version="1.9.0" />
<PackageVersion Include="Yarp.ReverseProxy" Version="2.3.0" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Auditing.BodyStorage;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Microsoft.Extensions.Logging;

class AzureBlobBodyStorage(
Channel<BodyWriteItem> channel,
MongoSettings settings,
ILogger<AzureBlobBodyStorage> logger)
: BatchedBodyStorageWriter<BodyWriteItem>(channel, settings, logger), IBodyStorage, IBodyWriter
{
const int MaxRetries = 3;
readonly BlobContainerClient containerClient = new(settings.BlobConnectionString, settings.BlobContainerName);

protected override string WriterName => "Azure Blob body storage writer";

// Initialization

public async Task Initialize(CancellationToken cancellationToken)
{
_ = await containerClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
logger.LogInformation("Azure Blob body storage initialized. Container: {ContainerName}", containerClient.Name);
}

// IBodyWriter

public bool IsEnabled => true;

public async ValueTask WriteAsync(string id, string contentType, ReadOnlyMemory<byte> body, DateTime expiresAt, CancellationToken cancellationToken)
{
await WriteToChannelAsync(new BodyWriteItem
{
Id = id,
ContentType = contentType,
BodySize = body.Length,
Body = body.ToArray(),
ExpiresAt = expiresAt
}, cancellationToken).ConfigureAwait(false);
}

// IBodyStorage

public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
=> Task.CompletedTask;

public async Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken)
{
var blobClient = containerClient.GetBlobClient(bodyId);

try
{
var response = await blobClient.DownloadStreamingAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
var details = response.Value.Details;

var bodySize = 0;
if (details.Metadata.TryGetValue("bodySize", out var bodySizeStr))
{
_ = int.TryParse(bodySizeStr, out bodySize);
}

return new StreamResult
{
HasResult = true,
Stream = response.Value.Content,
ContentType = details.ContentType ?? "text/plain",
BodySize = bodySize,
Etag = details.ETag.ToString()
};
}
catch (RequestFailedException ex) when (ex.Status == 404)
{
return new StreamResult { HasResult = false };
}
}

// BatchedBodyStorageWriter

protected override async Task FlushBatchAsync(List<BodyWriteItem> batch, CancellationToken cancellationToken)
{
var uploadTasks = batch.Select(entry => UploadBlobWithRetry(entry, cancellationToken));
await Task.WhenAll(uploadTasks).ConfigureAwait(false);
}

async Task UploadBlobWithRetry(BodyWriteItem entry, CancellationToken cancellationToken)
{
var blobClient = containerClient.GetBlobClient(entry.Id);

for (var attempt = 1; attempt <= MaxRetries; attempt++)
{
try
{
using var stream = new MemoryStream(entry.Body);
var options = new BlobUploadOptions
{
HttpHeaders = new BlobHttpHeaders { ContentType = entry.ContentType.Trim() },
Metadata = new Dictionary<string, string>
{
["messageId"] = entry.Id.Trim(),
["bodySize"] = entry.BodySize.ToString(),
["mongoExpiresAt"] = entry.ExpiresAt.ToString("O")
}
};
_ = await blobClient.UploadAsync(stream, options, cancellationToken).ConfigureAwait(false);
return;
}
catch (Exception ex) when (attempt < MaxRetries && !cancellationToken.IsCancellationRequested)
{
var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt - 1));
logger.LogWarning(ex, "Failed to upload blob {BlobId} (attempt {Attempt}/{MaxRetries}), retrying in {Delay}s",
entry.Id, attempt, MaxRetries, delay.TotalSeconds);
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to upload blob {BlobId} after {MaxRetries} attempts", entry.Id, MaxRetries);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

abstract class BatchedBodyStorageWriter<TEntry>(
Channel<TEntry> channel,
MongoSettings settings,
ILogger logger)
: BackgroundService
{
readonly int BatchSize = settings.BodyWriterBatchSize;
readonly int ParallelWriters = settings.BodyWriterParallelWriters;
readonly TimeSpan BatchTimeout = settings.BodyWriterBatchTimeout;
const int BacklogWarningThreshold = 5_000;
long totalWritten;
DateTime lastBacklogWarning;
DateTime lastBackpressureWarning;

readonly Channel<List<TEntry>> batchChannel = Channel.CreateBounded<List<TEntry>>(
new BoundedChannelOptions(settings.BodyWriterParallelWriters * 2)
{
SingleReader = false,
SingleWriter = true,
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.Wait
});

protected ChannelWriter<TEntry> WriteChannel => channel.Writer;

protected async ValueTask WriteToChannelAsync(TEntry entry, CancellationToken cancellationToken)
{
if (channel.Writer.TryWrite(entry))
{
return;
}

if (DateTime.UtcNow - lastBackpressureWarning > TimeSpan.FromSeconds(10))
{
lastBackpressureWarning = DateTime.UtcNow;
logger.LogWarning("{WriterName} channel is full (backlog: {Backlog}). Body writes are blocking ingestion until the writer catches up",
WriterName, channel.Reader.Count);
}

await channel.Writer.WriteAsync(entry, cancellationToken).ConfigureAwait(false);
}

protected abstract string WriterName { get; }

protected abstract Task FlushBatchAsync(List<TEntry> batch, CancellationToken cancellationToken);

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
logger.LogInformation("{WriterName} started ({Writers} writers, batch size {BatchSize})", WriterName, ParallelWriters, BatchSize);

var assemblerTask = Task.Run(() => BatchAssemblerLoop(stoppingToken), CancellationToken.None);

var writerTasks = new Task[ParallelWriters];
for (var i = 0; i < ParallelWriters; i++)
{
var writerId = i;
writerTasks[i] = Task.Run(() => WriterLoop(writerId, stoppingToken), CancellationToken.None);
}

try
{
await Task.WhenAll(writerTasks.Append(assemblerTask)).ConfigureAwait(false);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
// Expected during shutdown
}

logger.LogInformation("{WriterName} stopped", WriterName);
}

async Task BatchAssemblerLoop(CancellationToken stoppingToken)
{
var batch = new List<TEntry>(BatchSize);

try
{
while (await channel.Reader.WaitToReadAsync(stoppingToken).ConfigureAwait(false))
{
while (batch.Count < BatchSize && channel.Reader.TryRead(out var entry))
{
batch.Add(entry);
}

if (batch.Count > 0 && batch.Count < BatchSize)
{
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
timeoutCts.CancelAfter(BatchTimeout);
try
{
while (batch.Count < BatchSize)
{
if (!await channel.Reader.WaitToReadAsync(timeoutCts.Token).ConfigureAwait(false))
{
break;
}

while (batch.Count < BatchSize && channel.Reader.TryRead(out var entry))
{
batch.Add(entry);
}
}
}
catch (OperationCanceledException) when (!stoppingToken.IsCancellationRequested)
{
// Timeout expired - dispatch partial batch
}
}

if (batch.Count > 0)
{
await batchChannel.Writer.WriteAsync(batch, stoppingToken).ConfigureAwait(false);
batch = new List<TEntry>(BatchSize);
}
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
// Shutting down - drain channel into remaining batches
while (channel.Reader.TryRead(out var entry))
{
batch.Add(entry);

if (batch.Count >= BatchSize)
{
await batchChannel.Writer.WriteAsync(batch, CancellationToken.None).ConfigureAwait(false);
batch = new List<TEntry>(BatchSize);
}
}

if (batch.Count > 0)
{
await batchChannel.Writer.WriteAsync(batch, CancellationToken.None).ConfigureAwait(false);
}
}
finally
{
batchChannel.Writer.Complete();
}
}

async Task WriterLoop(int writerId, CancellationToken stoppingToken)
{
logger.LogDebug("{WriterName} writer {WriterId} started", WriterName, writerId);

try
{
// Use CancellationToken.None for FlushBatch so in-flight writes complete
// during shutdown. ReadAllAsync(stoppingToken) controls when we stop
// accepting new batches.
await foreach (var batch in batchChannel.Reader.ReadAllAsync(stoppingToken).ConfigureAwait(false))
{
await FlushBatchAsync(batch, CancellationToken.None).ConfigureAwait(false);
ReportBatchWritten(batch.Count);
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
// Expected during shutdown
}

// Drain any remaining batches after the assembler completes the channel
while (batchChannel.Reader.TryRead(out var batch))
{
try
{
await FlushBatchAsync(batch, CancellationToken.None).ConfigureAwait(false);
ReportBatchWritten(batch.Count);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to flush {Count} entries during shutdown", batch.Count);
}
}

logger.LogDebug("{WriterName} writer {WriterId} stopped", WriterName, writerId);
}

void ReportBatchWritten(int batchCount)
{
totalWritten += batchCount;
var backlog = channel.Reader.Count;
logger.LogDebug("{WriterName}: batch={BatchCount}, total={TotalWritten}, backlog={Backlog}",
WriterName, batchCount, totalWritten, backlog);
if (backlog > BacklogWarningThreshold && DateTime.UtcNow - lastBacklogWarning > TimeSpan.FromSeconds(10))
{
lastBacklogWarning = DateTime.UtcNow;
logger.LogWarning("{WriterName} is not keeping up with ingestion. Channel backlog: {Backlog} items", WriterName, backlog);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage
{
using System;

readonly struct BodyWriteItem
{
public required string Id { get; init; }
public required string ContentType { get; init; }
public required int BodySize { get; init; }
public required byte[] Body { get; init; }
public string TextBody { get; init; }
public required DateTime ExpiresAt { get; init; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage
{
using System;
using System.Threading;
using System.Threading.Tasks;

interface IBodyWriter
{
bool IsEnabled { get; }

ValueTask WriteAsync(string id, string contentType, ReadOnlyMemory<byte> body, DateTime expiresAt, CancellationToken cancellationToken);
}
}
Loading
Loading