diff --git a/infrastructure/modules/eventpub/README.md b/infrastructure/modules/eventpub/README.md index c19e86b..4cea1a8 100644 --- a/infrastructure/modules/eventpub/README.md +++ b/infrastructure/modules/eventpub/README.md @@ -40,7 +40,7 @@ | Name | Description | |------|-------------| | [s3\_bucket\_event\_cache](#output\_s3\_bucket\_event\_cache) | S3 Bucket ARN and Name for event cache | -| [sns\_topic](#output\_sns\_topic) | SNS Topic ARN and Name | +| [sns\_topic](#output\_sns\_topic) | SNS Topic ARNs and Names | diff --git a/infrastructure/modules/eventpub/cloudwatch_log_group_sns_delivery_logging_failure.tf b/infrastructure/modules/eventpub/cloudwatch_log_group_sns_delivery_logging_failure.tf index 28a7ecf..c0fa0b5 100644 --- a/infrastructure/modules/eventpub/cloudwatch_log_group_sns_delivery_logging_failure.tf +++ b/infrastructure/modules/eventpub/cloudwatch_log_group_sns_delivery_logging_failure.tf @@ -1,9 +1,9 @@ resource "aws_cloudwatch_log_group" "sns_delivery_logging_failure" { - count = var.enable_sns_delivery_logging ? 1 : 0 + for_each = var.enable_event_cache ? local.sns_topics : {} # SNS doesn't allow specifying a log group and is derived as: sns/${region}/${account_id}/${name_of_sns_topic}/Failure # (for failure logs) - name = "sns/${var.region}/${var.aws_account_id}/${local.csi}/Failure" + name = "sns/${var.region}/${var.aws_account_id}/${local.csi}${each.key == "data" ? "-data" : "-control"}/Failure" kms_key_id = var.kms_key_arn retention_in_days = var.log_retention_in_days } diff --git a/infrastructure/modules/eventpub/cloudwatch_log_group_sns_delivery_logging_success.tf b/infrastructure/modules/eventpub/cloudwatch_log_group_sns_delivery_logging_success.tf index 338dabe..05ed919 100644 --- a/infrastructure/modules/eventpub/cloudwatch_log_group_sns_delivery_logging_success.tf +++ b/infrastructure/modules/eventpub/cloudwatch_log_group_sns_delivery_logging_success.tf @@ -1,9 +1,9 @@ resource "aws_cloudwatch_log_group" "sns_delivery_logging_success" { - count = var.enable_sns_delivery_logging ? 1 : 0 + for_each = var.enable_event_cache ? local.sns_topics : {} # SNS doesn't allow specifying a log group and is derived as: sns/${region}/${account_id}/${name_of_sns_topic}/Failure # (for failure logs) - name = "sns/${var.region}/${var.aws_account_id}/${local.csi}" + name = "sns/${var.region}/${var.aws_account_id}/${local.csi}${each.key == "data" ? "-data" : "-control"}" kms_key_id = var.kms_key_arn retention_in_days = var.log_retention_in_days } diff --git a/infrastructure/modules/eventpub/cloudwatch_metric_alarm_sns_delivery_failures.tf b/infrastructure/modules/eventpub/cloudwatch_metric_alarm_sns_delivery_failures.tf deleted file mode 100644 index e8ef124..0000000 --- a/infrastructure/modules/eventpub/cloudwatch_metric_alarm_sns_delivery_failures.tf +++ /dev/null @@ -1,16 +0,0 @@ -resource "aws_cloudwatch_metric_alarm" "sns_delivery_failures" { - alarm_name = "${local.csi}-sns-delivery-failures" - alarm_description = "RELIABILITY: Alarm for SNS topic delivery failures" - comparison_operator = "GreaterThanThreshold" - evaluation_periods = 1 - metric_name = "NumberOfNotificationsFailed" - namespace = "AWS/SNS" - period = 300 - statistic = "Sum" - threshold = 0 - treat_missing_data = "notBreaching" - - dimensions = { - TopicName = aws_sns_topic.main.name - } -} diff --git a/infrastructure/modules/eventpub/iam_policy_sns_delivery_logging_cloudwatch.tf b/infrastructure/modules/eventpub/iam_policy_sns_delivery_logging_cloudwatch.tf index d296da2..434aae0 100644 --- a/infrastructure/modules/eventpub/iam_policy_sns_delivery_logging_cloudwatch.tf +++ b/infrastructure/modules/eventpub/iam_policy_sns_delivery_logging_cloudwatch.tf @@ -34,11 +34,11 @@ data "aws_iam_policy_document" "sns_delivery_logging_cloudwatch" { "logs:PutRetentionPolicy", ] - resources = [ - aws_cloudwatch_log_group.sns_delivery_logging_success[0].arn, - "${aws_cloudwatch_log_group.sns_delivery_logging_success[0].arn}:log-stream:*", - aws_cloudwatch_log_group.sns_delivery_logging_failure[0].arn, - "${aws_cloudwatch_log_group.sns_delivery_logging_failure[0].arn}:log-stream:*", - ] + resources = concat( + [for arn in values(aws_cloudwatch_log_group.sns_delivery_logging_success) : arn.arn], + [for arn in values(aws_cloudwatch_log_group.sns_delivery_logging_success) : "${arn.arn}:log-stream:*"], + [for arn in values(aws_cloudwatch_log_group.sns_delivery_logging_failure) : arn.arn], + [for arn in values(aws_cloudwatch_log_group.sns_delivery_logging_failure) : "${arn.arn}:log-stream:*"] + ) } } diff --git a/infrastructure/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js b/infrastructure/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js index e11f735..4b10ed5 100644 --- a/infrastructure/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js +++ b/infrastructure/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js @@ -33,15 +33,21 @@ const invalidCloudEvent = { data: {} }; +const DATA_TOPIC_ARN = 'arn:aws:sns:eu-west-2:123456789012:data-topic'; +const CONTROL_TOPIC_ARN = 'arn:aws:sns:eu-west-2:123456789012:control-topic'; +const DATA_PLANE_EVENT_BUS_ARN = 'arn:aws:events:eu-west-2:123456789012:event-bus/data'; +const CONTROL_PLANE_EVENT_BUS_ARN = 'arn:aws:events:eu-west-2:123456789012:event-bus/control'; +const DLQ_URL = 'https://sqs.eu-west-2.amazonaws.com/123456789012/dlq'; + const snsEvent = { Records: [ - { Sns: { Message: JSON.stringify(validCloudEvent) } } + { Sns: { Message: JSON.stringify(validCloudEvent), TopicArn: DATA_TOPIC_ARN } } ] }; const snsEventInvalid = { Records: [ - { Sns: { Message: JSON.stringify(invalidCloudEvent) } } + { Sns: { Message: JSON.stringify(invalidCloudEvent), TopicArn: DATA_TOPIC_ARN } } ] }; @@ -49,6 +55,12 @@ describe('SNS to EventBridge Lambda', () => { beforeEach(() => { eventBridgeMock.reset(); sqsMock.reset(); + process.env.DATA_TOPIC_ARN = DATA_TOPIC_ARN; + process.env.CONTROL_TOPIC_ARN = CONTROL_TOPIC_ARN; + process.env.DATA_PLANE_EVENT_BUS_ARN = DATA_PLANE_EVENT_BUS_ARN; + process.env.CONTROL_PLANE_EVENT_BUS_ARN = CONTROL_PLANE_EVENT_BUS_ARN; + process.env.DLQ_URL = DLQ_URL; + process.env.THROTTLE_DELAY_MS = '0'; }); test('Valid event is sent to the correct EventBridge bus', async () => { @@ -57,6 +69,9 @@ describe('SNS to EventBridge Lambda', () => { await handler(snsEvent); expect(eventBridgeMock.calls()).toHaveLength(1); + // Check correct bus + const callInput = eventBridgeMock.calls()[0].args[0].input; + expect(callInput.Entries[0].EventBusName).toBe(DATA_PLANE_EVENT_BUS_ARN); }); test('Invalid event is sent to DLQ', async () => { @@ -65,9 +80,10 @@ describe('SNS to EventBridge Lambda', () => { await handler(snsEventInvalid); expect(sqsMock.calls()).toHaveLength(1); + const callInput = sqsMock.calls()[0].args[0].input; + expect(callInput.QueueUrl).toBe(DLQ_URL); }); - test('Retries on EventBridge failure and sends failed events to DLQ', async () => { eventBridgeMock .on(PutEventsCommand) @@ -85,10 +101,32 @@ describe('SNS to EventBridge Lambda', () => { process.env.THROTTLE_DELAY_MS = '500'; jest.useFakeTimers(); + eventBridgeMock.on(PutEventsCommand).resolves({ FailedEntryCount: 0, Entries: [{}] }); + const handlerPromise = handler(snsEvent); + expect(setTimeout).toHaveBeenCalledWith(expect.any(Function), 500); jest.advanceTimersByTime(500); await handlerPromise; jest.useRealTimers(); }); + + test('Routes control events to control event bus', async () => { + const controlEvent = { + ...validCloudEvent, + type: "control" + }; + const snsEventControl = { + Records: [ + { Sns: { Message: JSON.stringify(controlEvent), TopicArn: CONTROL_TOPIC_ARN } } + ] + }; + eventBridgeMock.on(PutEventsCommand).resolves({ FailedEntryCount: 0, Entries: [{}] }); + + await handler(snsEventControl); + + expect(eventBridgeMock.calls()).toHaveLength(1); + const callInput = eventBridgeMock.calls()[0].args[0].input; + expect(callInput.Entries[0].EventBusName).toBe(CONTROL_PLANE_EVENT_BUS_ARN); + }); }); diff --git a/infrastructure/modules/eventpub/lambda/eventpub/src/index.js b/infrastructure/modules/eventpub/lambda/eventpub/src/index.js index aa62531..02aa7f2 100644 --- a/infrastructure/modules/eventpub/lambda/eventpub/src/index.js +++ b/infrastructure/modules/eventpub/lambda/eventpub/src/index.js @@ -11,6 +11,9 @@ const THROTTLE_DELAY_MS = parseInt(process.env.THROTTLE_DELAY_MS || '0', 10); const MAX_RETRIES = 3; const EVENTBRIDGE_MAX_BATCH_SIZE = 10; +const DATA_TOPIC_ARN = process.env.DATA_TOPIC_ARN; +const CONTROL_TOPIC_ARN = process.env.CONTROL_TOPIC_ARN; + function validateEvent(event) { // CloudEvents v1.0 schema validation (supplier-status) const requiredFields = [ @@ -107,16 +110,26 @@ exports.handler = async (snsEvent) => { await new Promise(res => setTimeout(res, THROTTLE_DELAY_MS)); } - const records = snsEvent.Records.map(record => JSON.parse(record.Sns.Message)); - const validEvents = records.filter(validateEvent); - const invalidEvents = records.filter(event => !validateEvent(event)); + // Map each record to its event and topicArn + const parsedRecords = snsEvent.Records.map(record => ({ + event: JSON.parse(record.Sns.Message), + topicArn: record.Sns.TopicArn + })); + + const validRecords = parsedRecords.filter(({ event }) => validateEvent(event)); + const invalidEvents = parsedRecords.filter(({ event }) => !validateEvent(event)).map(({ event }) => event); // console.info(`Valid events: ${validEvents.length}, Invalid events: ${invalidEvents.length}`); if (invalidEvents.length) await sendToDLQ(invalidEvents); - const dataEvents = validEvents.filter(event => event.type === 'data'); - const controlEvents = validEvents.filter(event => event.type === 'control'); + // Classify by topicArn + const dataEvents = validRecords + .filter(({ topicArn }) => topicArn === DATA_TOPIC_ARN) + .map(({ event }) => event); + const controlEvents = validRecords + .filter(({ topicArn }) => topicArn === CONTROL_TOPIC_ARN) + .map(({ event }) => event); // console.info(`Data events: ${dataEvents.length}, Control events: ${controlEvents.length}`); diff --git a/infrastructure/modules/eventpub/lambda_function.tf b/infrastructure/modules/eventpub/lambda_function.tf index 1cadfc4..43c925d 100644 --- a/infrastructure/modules/eventpub/lambda_function.tf +++ b/infrastructure/modules/eventpub/lambda_function.tf @@ -21,6 +21,8 @@ resource "aws_lambda_function" "main" { environment { variables = { + DATA_TOPIC_ARN = aws_sns_topic.main["data"].arn + CONTROL_TOPIC_ARN = aws_sns_topic.main["control"].arn DATA_PLANE_EVENT_BUS_ARN = var.data_plane_bus_arn CONTROL_PLANE_EVENT_BUS_ARN = var.control_plane_bus_arn DLQ_URL = aws_sqs_queue.dlq.url diff --git a/infrastructure/modules/eventpub/lambda_permissions_sns_event_cache.tf b/infrastructure/modules/eventpub/lambda_permissions_sns_lambda.tf similarity index 55% rename from infrastructure/modules/eventpub/lambda_permissions_sns_event_cache.tf rename to infrastructure/modules/eventpub/lambda_permissions_sns_lambda.tf index ad473e9..a7e7afe 100644 --- a/infrastructure/modules/eventpub/lambda_permissions_sns_event_cache.tf +++ b/infrastructure/modules/eventpub/lambda_permissions_sns_lambda.tf @@ -1,7 +1,9 @@ resource "aws_lambda_permission" "sns_lambda" { - statement_id = "AllowExecutionFromSNS" + for_each = local.sns_topics + + statement_id = "AllowExecutionFromSNS${title(each.key)}Topic" action = "lambda:InvokeFunction" function_name = aws_lambda_function.main.function_name principal = "sns.amazonaws.com" - source_arn = aws_sns_topic.main.arn + source_arn = aws_sns_topic.main[each.key].arn } diff --git a/infrastructure/modules/eventpub/locals.tf b/infrastructure/modules/eventpub/locals.tf index 796ad69..94d9036 100644 --- a/infrastructure/modules/eventpub/locals.tf +++ b/infrastructure/modules/eventpub/locals.tf @@ -19,5 +19,8 @@ locals { Name = local.csi }, ) - + sns_topics = { + data = "${local.csi}-data" + control = "${local.csi}-control" + } } diff --git a/infrastructure/modules/eventpub/outputs.tf b/infrastructure/modules/eventpub/outputs.tf index e2ff3b3..4e5b310 100644 --- a/infrastructure/modules/eventpub/outputs.tf +++ b/infrastructure/modules/eventpub/outputs.tf @@ -1,8 +1,11 @@ output "sns_topic" { - description = "SNS Topic ARN and Name" + description = "SNS Topic ARNs and Names" value = { - arn = aws_sns_topic.main.arn - name = aws_sns_topic.main.name + for key, value in aws_sns_topic.main : + key => { + arn = value.arn + name = value.name + } } } diff --git a/infrastructure/modules/eventpub/sns_topic.tf b/infrastructure/modules/eventpub/sns_topic_main.tf similarity index 96% rename from infrastructure/modules/eventpub/sns_topic.tf rename to infrastructure/modules/eventpub/sns_topic_main.tf index cc30db1..a089b4f 100644 --- a/infrastructure/modules/eventpub/sns_topic.tf +++ b/infrastructure/modules/eventpub/sns_topic_main.tf @@ -1,5 +1,6 @@ resource "aws_sns_topic" "main" { - name = local.csi + for_each = local.sns_topics + name = each.value kms_master_key_id = var.kms_key_arn application_failure_feedback_role_arn = var.enable_sns_delivery_logging == true ? aws_iam_role.sns_delivery_logging_role[0].arn : null diff --git a/infrastructure/modules/eventpub/sns_topic_subscription_firehose.tf b/infrastructure/modules/eventpub/sns_topic_subscription_firehose.tf index 9ed83cc..552a177 100644 --- a/infrastructure/modules/eventpub/sns_topic_subscription_firehose.tf +++ b/infrastructure/modules/eventpub/sns_topic_subscription_firehose.tf @@ -1,7 +1,7 @@ resource "aws_sns_topic_subscription" "firehose" { - count = var.enable_event_cache ? 1 : 0 + for_each = var.enable_event_cache ? local.sns_topics : {} - topic_arn = aws_sns_topic.main.arn + topic_arn = aws_sns_topic.main[each.key].arn protocol = "firehose" subscription_role_arn = aws_iam_role.sns_role.arn endpoint = aws_kinesis_firehose_delivery_stream.main[0].arn diff --git a/infrastructure/modules/eventpub/sns_topic_subscription_lambda.tf b/infrastructure/modules/eventpub/sns_topic_subscription_lambda.tf index ffee18a..1b3e09d 100644 --- a/infrastructure/modules/eventpub/sns_topic_subscription_lambda.tf +++ b/infrastructure/modules/eventpub/sns_topic_subscription_lambda.tf @@ -1,5 +1,7 @@ resource "aws_sns_topic_subscription" "lambda" { - topic_arn = aws_sns_topic.main.arn + for_each = local.sns_topics + + topic_arn = aws_sns_topic.main[each.key].arn protocol = "lambda" endpoint = aws_lambda_function.main.arn }