diff --git a/infrastructure/modules/eventpub/README.md b/infrastructure/modules/eventpub/README.md
index c19e86b..4cea1a8 100644
--- a/infrastructure/modules/eventpub/README.md
+++ b/infrastructure/modules/eventpub/README.md
@@ -40,7 +40,7 @@
| Name | Description |
|------|-------------|
| [s3\_bucket\_event\_cache](#output\_s3\_bucket\_event\_cache) | S3 Bucket ARN and Name for event cache |
-| [sns\_topic](#output\_sns\_topic) | SNS Topic ARN and Name |
+| [sns\_topic](#output\_sns\_topic) | SNS Topic ARNs and Names |
diff --git a/infrastructure/modules/eventpub/cloudwatch_log_group_sns_delivery_logging_failure.tf b/infrastructure/modules/eventpub/cloudwatch_log_group_sns_delivery_logging_failure.tf
index 28a7ecf..c0fa0b5 100644
--- a/infrastructure/modules/eventpub/cloudwatch_log_group_sns_delivery_logging_failure.tf
+++ b/infrastructure/modules/eventpub/cloudwatch_log_group_sns_delivery_logging_failure.tf
@@ -1,9 +1,9 @@
resource "aws_cloudwatch_log_group" "sns_delivery_logging_failure" {
- count = var.enable_sns_delivery_logging ? 1 : 0
+ for_each = var.enable_event_cache ? local.sns_topics : {}
# SNS doesn't allow specifying a log group and is derived as: sns/${region}/${account_id}/${name_of_sns_topic}/Failure
# (for failure logs)
- name = "sns/${var.region}/${var.aws_account_id}/${local.csi}/Failure"
+ name = "sns/${var.region}/${var.aws_account_id}/${local.csi}${each.key == "data" ? "-data" : "-control"}/Failure"
kms_key_id = var.kms_key_arn
retention_in_days = var.log_retention_in_days
}
diff --git a/infrastructure/modules/eventpub/cloudwatch_log_group_sns_delivery_logging_success.tf b/infrastructure/modules/eventpub/cloudwatch_log_group_sns_delivery_logging_success.tf
index 338dabe..05ed919 100644
--- a/infrastructure/modules/eventpub/cloudwatch_log_group_sns_delivery_logging_success.tf
+++ b/infrastructure/modules/eventpub/cloudwatch_log_group_sns_delivery_logging_success.tf
@@ -1,9 +1,9 @@
resource "aws_cloudwatch_log_group" "sns_delivery_logging_success" {
- count = var.enable_sns_delivery_logging ? 1 : 0
+ for_each = var.enable_event_cache ? local.sns_topics : {}
# SNS doesn't allow specifying a log group and is derived as: sns/${region}/${account_id}/${name_of_sns_topic}/Failure
# (for failure logs)
- name = "sns/${var.region}/${var.aws_account_id}/${local.csi}"
+ name = "sns/${var.region}/${var.aws_account_id}/${local.csi}${each.key == "data" ? "-data" : "-control"}"
kms_key_id = var.kms_key_arn
retention_in_days = var.log_retention_in_days
}
diff --git a/infrastructure/modules/eventpub/cloudwatch_metric_alarm_sns_delivery_failures.tf b/infrastructure/modules/eventpub/cloudwatch_metric_alarm_sns_delivery_failures.tf
deleted file mode 100644
index e8ef124..0000000
--- a/infrastructure/modules/eventpub/cloudwatch_metric_alarm_sns_delivery_failures.tf
+++ /dev/null
@@ -1,16 +0,0 @@
-resource "aws_cloudwatch_metric_alarm" "sns_delivery_failures" {
- alarm_name = "${local.csi}-sns-delivery-failures"
- alarm_description = "RELIABILITY: Alarm for SNS topic delivery failures"
- comparison_operator = "GreaterThanThreshold"
- evaluation_periods = 1
- metric_name = "NumberOfNotificationsFailed"
- namespace = "AWS/SNS"
- period = 300
- statistic = "Sum"
- threshold = 0
- treat_missing_data = "notBreaching"
-
- dimensions = {
- TopicName = aws_sns_topic.main.name
- }
-}
diff --git a/infrastructure/modules/eventpub/iam_policy_sns_delivery_logging_cloudwatch.tf b/infrastructure/modules/eventpub/iam_policy_sns_delivery_logging_cloudwatch.tf
index d296da2..434aae0 100644
--- a/infrastructure/modules/eventpub/iam_policy_sns_delivery_logging_cloudwatch.tf
+++ b/infrastructure/modules/eventpub/iam_policy_sns_delivery_logging_cloudwatch.tf
@@ -34,11 +34,11 @@ data "aws_iam_policy_document" "sns_delivery_logging_cloudwatch" {
"logs:PutRetentionPolicy",
]
- resources = [
- aws_cloudwatch_log_group.sns_delivery_logging_success[0].arn,
- "${aws_cloudwatch_log_group.sns_delivery_logging_success[0].arn}:log-stream:*",
- aws_cloudwatch_log_group.sns_delivery_logging_failure[0].arn,
- "${aws_cloudwatch_log_group.sns_delivery_logging_failure[0].arn}:log-stream:*",
- ]
+ resources = concat(
+ [for arn in values(aws_cloudwatch_log_group.sns_delivery_logging_success) : arn.arn],
+ [for arn in values(aws_cloudwatch_log_group.sns_delivery_logging_success) : "${arn.arn}:log-stream:*"],
+ [for arn in values(aws_cloudwatch_log_group.sns_delivery_logging_failure) : arn.arn],
+ [for arn in values(aws_cloudwatch_log_group.sns_delivery_logging_failure) : "${arn.arn}:log-stream:*"]
+ )
}
}
diff --git a/infrastructure/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js b/infrastructure/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js
index e11f735..4b10ed5 100644
--- a/infrastructure/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js
+++ b/infrastructure/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js
@@ -33,15 +33,21 @@ const invalidCloudEvent = {
data: {}
};
+const DATA_TOPIC_ARN = 'arn:aws:sns:eu-west-2:123456789012:data-topic';
+const CONTROL_TOPIC_ARN = 'arn:aws:sns:eu-west-2:123456789012:control-topic';
+const DATA_PLANE_EVENT_BUS_ARN = 'arn:aws:events:eu-west-2:123456789012:event-bus/data';
+const CONTROL_PLANE_EVENT_BUS_ARN = 'arn:aws:events:eu-west-2:123456789012:event-bus/control';
+const DLQ_URL = 'https://sqs.eu-west-2.amazonaws.com/123456789012/dlq';
+
const snsEvent = {
Records: [
- { Sns: { Message: JSON.stringify(validCloudEvent) } }
+ { Sns: { Message: JSON.stringify(validCloudEvent), TopicArn: DATA_TOPIC_ARN } }
]
};
const snsEventInvalid = {
Records: [
- { Sns: { Message: JSON.stringify(invalidCloudEvent) } }
+ { Sns: { Message: JSON.stringify(invalidCloudEvent), TopicArn: DATA_TOPIC_ARN } }
]
};
@@ -49,6 +55,12 @@ describe('SNS to EventBridge Lambda', () => {
beforeEach(() => {
eventBridgeMock.reset();
sqsMock.reset();
+ process.env.DATA_TOPIC_ARN = DATA_TOPIC_ARN;
+ process.env.CONTROL_TOPIC_ARN = CONTROL_TOPIC_ARN;
+ process.env.DATA_PLANE_EVENT_BUS_ARN = DATA_PLANE_EVENT_BUS_ARN;
+ process.env.CONTROL_PLANE_EVENT_BUS_ARN = CONTROL_PLANE_EVENT_BUS_ARN;
+ process.env.DLQ_URL = DLQ_URL;
+ process.env.THROTTLE_DELAY_MS = '0';
});
test('Valid event is sent to the correct EventBridge bus', async () => {
@@ -57,6 +69,9 @@ describe('SNS to EventBridge Lambda', () => {
await handler(snsEvent);
expect(eventBridgeMock.calls()).toHaveLength(1);
+ // Check correct bus
+ const callInput = eventBridgeMock.calls()[0].args[0].input;
+ expect(callInput.Entries[0].EventBusName).toBe(DATA_PLANE_EVENT_BUS_ARN);
});
test('Invalid event is sent to DLQ', async () => {
@@ -65,9 +80,10 @@ describe('SNS to EventBridge Lambda', () => {
await handler(snsEventInvalid);
expect(sqsMock.calls()).toHaveLength(1);
+ const callInput = sqsMock.calls()[0].args[0].input;
+ expect(callInput.QueueUrl).toBe(DLQ_URL);
});
-
test('Retries on EventBridge failure and sends failed events to DLQ', async () => {
eventBridgeMock
.on(PutEventsCommand)
@@ -85,10 +101,32 @@ describe('SNS to EventBridge Lambda', () => {
process.env.THROTTLE_DELAY_MS = '500';
jest.useFakeTimers();
+ eventBridgeMock.on(PutEventsCommand).resolves({ FailedEntryCount: 0, Entries: [{}] });
+
const handlerPromise = handler(snsEvent);
+ expect(setTimeout).toHaveBeenCalledWith(expect.any(Function), 500);
jest.advanceTimersByTime(500);
await handlerPromise;
jest.useRealTimers();
});
+
+ test('Routes control events to control event bus', async () => {
+ const controlEvent = {
+ ...validCloudEvent,
+ type: "control"
+ };
+ const snsEventControl = {
+ Records: [
+ { Sns: { Message: JSON.stringify(controlEvent), TopicArn: CONTROL_TOPIC_ARN } }
+ ]
+ };
+ eventBridgeMock.on(PutEventsCommand).resolves({ FailedEntryCount: 0, Entries: [{}] });
+
+ await handler(snsEventControl);
+
+ expect(eventBridgeMock.calls()).toHaveLength(1);
+ const callInput = eventBridgeMock.calls()[0].args[0].input;
+ expect(callInput.Entries[0].EventBusName).toBe(CONTROL_PLANE_EVENT_BUS_ARN);
+ });
});
diff --git a/infrastructure/modules/eventpub/lambda/eventpub/src/index.js b/infrastructure/modules/eventpub/lambda/eventpub/src/index.js
index aa62531..02aa7f2 100644
--- a/infrastructure/modules/eventpub/lambda/eventpub/src/index.js
+++ b/infrastructure/modules/eventpub/lambda/eventpub/src/index.js
@@ -11,6 +11,9 @@ const THROTTLE_DELAY_MS = parseInt(process.env.THROTTLE_DELAY_MS || '0', 10);
const MAX_RETRIES = 3;
const EVENTBRIDGE_MAX_BATCH_SIZE = 10;
+const DATA_TOPIC_ARN = process.env.DATA_TOPIC_ARN;
+const CONTROL_TOPIC_ARN = process.env.CONTROL_TOPIC_ARN;
+
function validateEvent(event) {
// CloudEvents v1.0 schema validation (supplier-status)
const requiredFields = [
@@ -107,16 +110,26 @@ exports.handler = async (snsEvent) => {
await new Promise(res => setTimeout(res, THROTTLE_DELAY_MS));
}
- const records = snsEvent.Records.map(record => JSON.parse(record.Sns.Message));
- const validEvents = records.filter(validateEvent);
- const invalidEvents = records.filter(event => !validateEvent(event));
+ // Map each record to its event and topicArn
+ const parsedRecords = snsEvent.Records.map(record => ({
+ event: JSON.parse(record.Sns.Message),
+ topicArn: record.Sns.TopicArn
+ }));
+
+ const validRecords = parsedRecords.filter(({ event }) => validateEvent(event));
+ const invalidEvents = parsedRecords.filter(({ event }) => !validateEvent(event)).map(({ event }) => event);
// console.info(`Valid events: ${validEvents.length}, Invalid events: ${invalidEvents.length}`);
if (invalidEvents.length) await sendToDLQ(invalidEvents);
- const dataEvents = validEvents.filter(event => event.type === 'data');
- const controlEvents = validEvents.filter(event => event.type === 'control');
+ // Classify by topicArn
+ const dataEvents = validRecords
+ .filter(({ topicArn }) => topicArn === DATA_TOPIC_ARN)
+ .map(({ event }) => event);
+ const controlEvents = validRecords
+ .filter(({ topicArn }) => topicArn === CONTROL_TOPIC_ARN)
+ .map(({ event }) => event);
// console.info(`Data events: ${dataEvents.length}, Control events: ${controlEvents.length}`);
diff --git a/infrastructure/modules/eventpub/lambda_function.tf b/infrastructure/modules/eventpub/lambda_function.tf
index 1cadfc4..43c925d 100644
--- a/infrastructure/modules/eventpub/lambda_function.tf
+++ b/infrastructure/modules/eventpub/lambda_function.tf
@@ -21,6 +21,8 @@ resource "aws_lambda_function" "main" {
environment {
variables = {
+ DATA_TOPIC_ARN = aws_sns_topic.main["data"].arn
+ CONTROL_TOPIC_ARN = aws_sns_topic.main["control"].arn
DATA_PLANE_EVENT_BUS_ARN = var.data_plane_bus_arn
CONTROL_PLANE_EVENT_BUS_ARN = var.control_plane_bus_arn
DLQ_URL = aws_sqs_queue.dlq.url
diff --git a/infrastructure/modules/eventpub/lambda_permissions_sns_event_cache.tf b/infrastructure/modules/eventpub/lambda_permissions_sns_lambda.tf
similarity index 55%
rename from infrastructure/modules/eventpub/lambda_permissions_sns_event_cache.tf
rename to infrastructure/modules/eventpub/lambda_permissions_sns_lambda.tf
index ad473e9..a7e7afe 100644
--- a/infrastructure/modules/eventpub/lambda_permissions_sns_event_cache.tf
+++ b/infrastructure/modules/eventpub/lambda_permissions_sns_lambda.tf
@@ -1,7 +1,9 @@
resource "aws_lambda_permission" "sns_lambda" {
- statement_id = "AllowExecutionFromSNS"
+ for_each = local.sns_topics
+
+ statement_id = "AllowExecutionFromSNS${title(each.key)}Topic"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.main.function_name
principal = "sns.amazonaws.com"
- source_arn = aws_sns_topic.main.arn
+ source_arn = aws_sns_topic.main[each.key].arn
}
diff --git a/infrastructure/modules/eventpub/locals.tf b/infrastructure/modules/eventpub/locals.tf
index 796ad69..94d9036 100644
--- a/infrastructure/modules/eventpub/locals.tf
+++ b/infrastructure/modules/eventpub/locals.tf
@@ -19,5 +19,8 @@ locals {
Name = local.csi
},
)
-
+ sns_topics = {
+ data = "${local.csi}-data"
+ control = "${local.csi}-control"
+ }
}
diff --git a/infrastructure/modules/eventpub/outputs.tf b/infrastructure/modules/eventpub/outputs.tf
index e2ff3b3..4e5b310 100644
--- a/infrastructure/modules/eventpub/outputs.tf
+++ b/infrastructure/modules/eventpub/outputs.tf
@@ -1,8 +1,11 @@
output "sns_topic" {
- description = "SNS Topic ARN and Name"
+ description = "SNS Topic ARNs and Names"
value = {
- arn = aws_sns_topic.main.arn
- name = aws_sns_topic.main.name
+ for key, value in aws_sns_topic.main :
+ key => {
+ arn = value.arn
+ name = value.name
+ }
}
}
diff --git a/infrastructure/modules/eventpub/sns_topic.tf b/infrastructure/modules/eventpub/sns_topic_main.tf
similarity index 96%
rename from infrastructure/modules/eventpub/sns_topic.tf
rename to infrastructure/modules/eventpub/sns_topic_main.tf
index cc30db1..a089b4f 100644
--- a/infrastructure/modules/eventpub/sns_topic.tf
+++ b/infrastructure/modules/eventpub/sns_topic_main.tf
@@ -1,5 +1,6 @@
resource "aws_sns_topic" "main" {
- name = local.csi
+ for_each = local.sns_topics
+ name = each.value
kms_master_key_id = var.kms_key_arn
application_failure_feedback_role_arn = var.enable_sns_delivery_logging == true ? aws_iam_role.sns_delivery_logging_role[0].arn : null
diff --git a/infrastructure/modules/eventpub/sns_topic_subscription_firehose.tf b/infrastructure/modules/eventpub/sns_topic_subscription_firehose.tf
index 9ed83cc..552a177 100644
--- a/infrastructure/modules/eventpub/sns_topic_subscription_firehose.tf
+++ b/infrastructure/modules/eventpub/sns_topic_subscription_firehose.tf
@@ -1,7 +1,7 @@
resource "aws_sns_topic_subscription" "firehose" {
- count = var.enable_event_cache ? 1 : 0
+ for_each = var.enable_event_cache ? local.sns_topics : {}
- topic_arn = aws_sns_topic.main.arn
+ topic_arn = aws_sns_topic.main[each.key].arn
protocol = "firehose"
subscription_role_arn = aws_iam_role.sns_role.arn
endpoint = aws_kinesis_firehose_delivery_stream.main[0].arn
diff --git a/infrastructure/modules/eventpub/sns_topic_subscription_lambda.tf b/infrastructure/modules/eventpub/sns_topic_subscription_lambda.tf
index ffee18a..1b3e09d 100644
--- a/infrastructure/modules/eventpub/sns_topic_subscription_lambda.tf
+++ b/infrastructure/modules/eventpub/sns_topic_subscription_lambda.tf
@@ -1,5 +1,7 @@
resource "aws_sns_topic_subscription" "lambda" {
- topic_arn = aws_sns_topic.main.arn
+ for_each = local.sns_topics
+
+ topic_arn = aws_sns_topic.main[each.key].arn
protocol = "lambda"
endpoint = aws_lambda_function.main.arn
}