Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion infrastructure/modules/eventpub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
| Name | Description |
|------|-------------|
| <a name="output_s3_bucket_event_cache"></a> [s3\_bucket\_event\_cache](#output\_s3\_bucket\_event\_cache) | S3 Bucket ARN and Name for event cache |
| <a name="output_sns_topic"></a> [sns\_topic](#output\_sns\_topic) | SNS Topic ARN and Name |
| <a name="output_sns_topic"></a> [sns\_topic](#output\_sns\_topic) | SNS Topic ARNs and Names |
<!-- vale on -->
<!-- markdownlint-enable -->
<!-- END_TF_DOCS -->
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
resource "aws_cloudwatch_log_group" "sns_delivery_logging_failure" {
count = var.enable_sns_delivery_logging ? 1 : 0
for_each = var.enable_event_cache ? local.sns_topics : {}

# SNS doesn't allow specifying a log group and is derived as: sns/${region}/${account_id}/${name_of_sns_topic}/Failure
# (for failure logs)
name = "sns/${var.region}/${var.aws_account_id}/${local.csi}/Failure"
name = "sns/${var.region}/${var.aws_account_id}/${local.csi}${each.key == "data" ? "-data" : "-control"}/Failure"
kms_key_id = var.kms_key_arn
retention_in_days = var.log_retention_in_days
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
resource "aws_cloudwatch_log_group" "sns_delivery_logging_success" {
count = var.enable_sns_delivery_logging ? 1 : 0
for_each = var.enable_event_cache ? local.sns_topics : {}

# SNS doesn't allow specifying a log group and is derived as: sns/${region}/${account_id}/${name_of_sns_topic}/Failure
# (for failure logs)
name = "sns/${var.region}/${var.aws_account_id}/${local.csi}"
name = "sns/${var.region}/${var.aws_account_id}/${local.csi}${each.key == "data" ? "-data" : "-control"}"
kms_key_id = var.kms_key_arn
retention_in_days = var.log_retention_in_days
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ data "aws_iam_policy_document" "sns_delivery_logging_cloudwatch" {
"logs:PutRetentionPolicy",
]

resources = [
aws_cloudwatch_log_group.sns_delivery_logging_success[0].arn,
"${aws_cloudwatch_log_group.sns_delivery_logging_success[0].arn}:log-stream:*",
aws_cloudwatch_log_group.sns_delivery_logging_failure[0].arn,
"${aws_cloudwatch_log_group.sns_delivery_logging_failure[0].arn}:log-stream:*",
]
resources = concat(
[for arn in values(aws_cloudwatch_log_group.sns_delivery_logging_success) : arn.arn],
[for arn in values(aws_cloudwatch_log_group.sns_delivery_logging_success) : "${arn.arn}:log-stream:*"],
[for arn in values(aws_cloudwatch_log_group.sns_delivery_logging_failure) : arn.arn],
[for arn in values(aws_cloudwatch_log_group.sns_delivery_logging_failure) : "${arn.arn}:log-stream:*"]
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,34 @@ const invalidCloudEvent = {
data: {}
};

const DATA_TOPIC_ARN = 'arn:aws:sns:eu-west-2:123456789012:data-topic';
const CONTROL_TOPIC_ARN = 'arn:aws:sns:eu-west-2:123456789012:control-topic';
const DATA_PLANE_EVENT_BUS_ARN = 'arn:aws:events:eu-west-2:123456789012:event-bus/data';
const CONTROL_PLANE_EVENT_BUS_ARN = 'arn:aws:events:eu-west-2:123456789012:event-bus/control';
const DLQ_URL = 'https://sqs.eu-west-2.amazonaws.com/123456789012/dlq';

const snsEvent = {
Records: [
{ Sns: { Message: JSON.stringify(validCloudEvent) } }
{ Sns: { Message: JSON.stringify(validCloudEvent), TopicArn: DATA_TOPIC_ARN } }
]
};

const snsEventInvalid = {
Records: [
{ Sns: { Message: JSON.stringify(invalidCloudEvent) } }
{ Sns: { Message: JSON.stringify(invalidCloudEvent), TopicArn: DATA_TOPIC_ARN } }
]
};

describe('SNS to EventBridge Lambda', () => {
beforeEach(() => {
eventBridgeMock.reset();
sqsMock.reset();
process.env.DATA_TOPIC_ARN = DATA_TOPIC_ARN;
process.env.CONTROL_TOPIC_ARN = CONTROL_TOPIC_ARN;
process.env.DATA_PLANE_EVENT_BUS_ARN = DATA_PLANE_EVENT_BUS_ARN;
process.env.CONTROL_PLANE_EVENT_BUS_ARN = CONTROL_PLANE_EVENT_BUS_ARN;
process.env.DLQ_URL = DLQ_URL;
process.env.THROTTLE_DELAY_MS = '0';
});

test('Valid event is sent to the correct EventBridge bus', async () => {
Expand All @@ -57,6 +69,9 @@ describe('SNS to EventBridge Lambda', () => {
await handler(snsEvent);

expect(eventBridgeMock.calls()).toHaveLength(1);
// Check correct bus
const callInput = eventBridgeMock.calls()[0].args[0].input;
expect(callInput.Entries[0].EventBusName).toBe(DATA_PLANE_EVENT_BUS_ARN);
});

test('Invalid event is sent to DLQ', async () => {
Expand All @@ -65,9 +80,10 @@ describe('SNS to EventBridge Lambda', () => {
await handler(snsEventInvalid);

expect(sqsMock.calls()).toHaveLength(1);
const callInput = sqsMock.calls()[0].args[0].input;
expect(callInput.QueueUrl).toBe(DLQ_URL);
});


test('Retries on EventBridge failure and sends failed events to DLQ', async () => {
eventBridgeMock
.on(PutEventsCommand)
Expand All @@ -85,10 +101,32 @@ describe('SNS to EventBridge Lambda', () => {
process.env.THROTTLE_DELAY_MS = '500';
jest.useFakeTimers();

eventBridgeMock.on(PutEventsCommand).resolves({ FailedEntryCount: 0, Entries: [{}] });

const handlerPromise = handler(snsEvent);
expect(setTimeout).toHaveBeenCalledWith(expect.any(Function), 500);
jest.advanceTimersByTime(500);
await handlerPromise;

jest.useRealTimers();
});

test('Routes control events to control event bus', async () => {
const controlEvent = {
...validCloudEvent,
type: "control"
};
const snsEventControl = {
Records: [
{ Sns: { Message: JSON.stringify(controlEvent), TopicArn: CONTROL_TOPIC_ARN } }
]
};
eventBridgeMock.on(PutEventsCommand).resolves({ FailedEntryCount: 0, Entries: [{}] });

await handler(snsEventControl);

expect(eventBridgeMock.calls()).toHaveLength(1);
const callInput = eventBridgeMock.calls()[0].args[0].input;
expect(callInput.Entries[0].EventBusName).toBe(CONTROL_PLANE_EVENT_BUS_ARN);
});
});
23 changes: 18 additions & 5 deletions infrastructure/modules/eventpub/lambda/eventpub/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ const THROTTLE_DELAY_MS = parseInt(process.env.THROTTLE_DELAY_MS || '0', 10);
const MAX_RETRIES = 3;
const EVENTBRIDGE_MAX_BATCH_SIZE = 10;

const DATA_TOPIC_ARN = process.env.DATA_TOPIC_ARN;
const CONTROL_TOPIC_ARN = process.env.CONTROL_TOPIC_ARN;

function validateEvent(event) {
// CloudEvents v1.0 schema validation (supplier-status)
const requiredFields = [
Expand Down Expand Up @@ -107,16 +110,26 @@ exports.handler = async (snsEvent) => {
await new Promise(res => setTimeout(res, THROTTLE_DELAY_MS));
}

const records = snsEvent.Records.map(record => JSON.parse(record.Sns.Message));
const validEvents = records.filter(validateEvent);
const invalidEvents = records.filter(event => !validateEvent(event));
// Map each record to its event and topicArn
const parsedRecords = snsEvent.Records.map(record => ({
event: JSON.parse(record.Sns.Message),
topicArn: record.Sns.TopicArn
}));

const validRecords = parsedRecords.filter(({ event }) => validateEvent(event));
const invalidEvents = parsedRecords.filter(({ event }) => !validateEvent(event)).map(({ event }) => event);

// console.info(`Valid events: ${validEvents.length}, Invalid events: ${invalidEvents.length}`);

if (invalidEvents.length) await sendToDLQ(invalidEvents);

const dataEvents = validEvents.filter(event => event.type === 'data');
const controlEvents = validEvents.filter(event => event.type === 'control');
// Classify by topicArn
const dataEvents = validRecords
.filter(({ topicArn }) => topicArn === DATA_TOPIC_ARN)
.map(({ event }) => event);
const controlEvents = validRecords
.filter(({ topicArn }) => topicArn === CONTROL_TOPIC_ARN)
.map(({ event }) => event);

// console.info(`Data events: ${dataEvents.length}, Control events: ${controlEvents.length}`);

Expand Down
2 changes: 2 additions & 0 deletions infrastructure/modules/eventpub/lambda_function.tf
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ resource "aws_lambda_function" "main" {

environment {
variables = {
DATA_TOPIC_ARN = aws_sns_topic.main["data"].arn
CONTROL_TOPIC_ARN = aws_sns_topic.main["control"].arn
DATA_PLANE_EVENT_BUS_ARN = var.data_plane_bus_arn
CONTROL_PLANE_EVENT_BUS_ARN = var.control_plane_bus_arn
DLQ_URL = aws_sqs_queue.dlq.url
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
resource "aws_lambda_permission" "sns_lambda" {
statement_id = "AllowExecutionFromSNS"
for_each = local.sns_topics

statement_id = "AllowExecutionFromSNS${title(each.key)}Topic"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.main.function_name
principal = "sns.amazonaws.com"
source_arn = aws_sns_topic.main.arn
source_arn = aws_sns_topic.main[each.key].arn
}
5 changes: 4 additions & 1 deletion infrastructure/modules/eventpub/locals.tf
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,8 @@ locals {
Name = local.csi
},
)

sns_topics = {
data = "${local.csi}-data"
control = "${local.csi}-control"
}
}
9 changes: 6 additions & 3 deletions infrastructure/modules/eventpub/outputs.tf
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
output "sns_topic" {
description = "SNS Topic ARN and Name"
description = "SNS Topic ARNs and Names"
value = {
arn = aws_sns_topic.main.arn
name = aws_sns_topic.main.name
for key, value in aws_sns_topic.main :
key => {
arn = value.arn
name = value.name
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
resource "aws_sns_topic" "main" {
name = local.csi
for_each = local.sns_topics
name = each.value
kms_master_key_id = var.kms_key_arn

application_failure_feedback_role_arn = var.enable_sns_delivery_logging == true ? aws_iam_role.sns_delivery_logging_role[0].arn : null
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
resource "aws_sns_topic_subscription" "firehose" {
count = var.enable_event_cache ? 1 : 0
for_each = var.enable_event_cache ? local.sns_topics : {}

topic_arn = aws_sns_topic.main.arn
topic_arn = aws_sns_topic.main[each.key].arn
protocol = "firehose"
subscription_role_arn = aws_iam_role.sns_role.arn
endpoint = aws_kinesis_firehose_delivery_stream.main[0].arn
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
resource "aws_sns_topic_subscription" "lambda" {
topic_arn = aws_sns_topic.main.arn
for_each = local.sns_topics

topic_arn = aws_sns_topic.main[each.key].arn
protocol = "lambda"
endpoint = aws_lambda_function.main.arn
}
Loading