2 changes: 1 addition & 1 deletion application/common/dos.py
@@ -254,7 +254,7 @@ def db_rows_to_spec_open_times(db_rows: Iterable[dict]) -> list[SpecifiedOpening
for date, rows in groupby(date_sorted_rows, lambda row: row["date"]):
is_open = True
open_periods = []
for row in list(rows):
for row in rows:
if row["isclosed"] is True:
is_open = False
else:
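For context on the dos.py change: itertools.groupby yields each group as a lazy iterator that stays valid until the next group is requested, so wrapping it in list() is only needed when a group is stored or iterated more than once. The loop here consumes each group exactly once, immediately, which is why the wrapper can go. A standalone sketch of the idiom (sample data is illustrative, not the project's):

```python
from itertools import groupby

# groupby assumes the input is already sorted by the grouping key
rows = [
    {"date": "2024-01-01", "isclosed": False},
    {"date": "2024-01-01", "isclosed": True},
    {"date": "2024-01-02", "isclosed": False},
]

for date, group in groupby(rows, key=lambda row: row["date"]):
    # 'group' is a lazy iterator tied to the underlying sequence;
    # iterating it once, right here, needs no list() copy.
    closed_flags = [row["isclosed"] for row in group]
    print(date, closed_flags)
```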
4 changes: 4 additions & 0 deletions application/common/errors.py
@@ -4,3 +4,7 @@ class ValidationError(Exception):

class DynamoDBError(Exception):
"""Exception raised for all DynamoDB errors."""


class SecretsManagerError(Exception):
"""Exception raised for AWS Secrets Manager errors."""
2 changes: 1 addition & 1 deletion application/common/nhs.py
@@ -165,7 +165,7 @@ def _get_specified_opening_times(self: Self) -> list[SpecifiedOpeningTime]:
date = datetime.strptime(date_str, "%b %d %Y").date()
is_open = True

for item in list(op_dict_list):
for item in op_dict_list:
if item["IsOpen"]:
open_periods.append(OpenPeriod.from_string_times(item["OpeningTime"], item["ClosingTime"]))
else:
Expand Down
6 changes: 4 additions & 2 deletions application/common/secretsmanager.py
@@ -4,6 +4,8 @@
from boto3 import client
from botocore.exceptions import ClientError

from .errors import SecretsManagerError

logger = Logger()

secrets_manager = client(service_name="secretsmanager")
@@ -16,7 +18,7 @@ def get_secret(secret_name: str) -> dict[str, str]:
secret_name (str): Secret name to get

Raises:
e: ClientError caused by secrets manager
SecretsManagerError: When unable to retrieve secret from AWS Secrets Manager

Returns:
Dict[str, str]: Secrets as a dictionary
@@ -25,6 +27,6 @@ def get_secret(secret_name: str) -> dict[str, str]:
secret_value_response = secrets_manager.get_secret_value(SecretId=secret_name)
except ClientError as err:
msg = f"Failed getting secret '{secret_name}' from secrets manager"
raise Exception(msg) from err # noqa: TRY002
raise SecretsManagerError(msg) from err
secrets_json_str = secret_value_response["SecretString"]
return loads(secrets_json_str)
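With get_secret now raising SecretsManagerError instead of a bare Exception, callers can catch the specific failure mode. A hedged sketch of a consumer (the helper and its fallback behaviour are illustrative, not part of this change):

```python
from aws_lambda_powertools.logging import Logger

from application.common.errors import SecretsManagerError
from application.common.secretsmanager import get_secret

logger = Logger()


def load_credentials(secret_name: str) -> dict[str, str] | None:
    """Hypothetical caller: return the secret, or None if it cannot be retrieved."""
    try:
        return get_secret(secret_name)
    except SecretsManagerError:
        # get_secret chains the underlying botocore ClientError ('raise ... from err'),
        # so the original AWS error remains visible in the traceback.
        logger.exception("Secret unavailable", secret_name=secret_name)
        return None
```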
4 changes: 3 additions & 1 deletion application/common/tests/test_secretsmanager.py
@@ -4,6 +4,8 @@
import pytest
from moto import mock_aws

from application.common.errors import SecretsManagerError

FILE_PATH = "application.common.secretsmanager"


@@ -26,5 +28,5 @@ def test_get_secret() -> None:
def test_get_secret_resource_not_found() -> None:
from application.common.secretsmanager import get_secret

with pytest.raises(Exception, match="Failed getting secret 'fake_secret_name' from secrets manager"):
with pytest.raises(SecretsManagerError, match="Failed getting secret 'fake_secret_name' from secrets manager"):
get_secret("fake_secret_name")
14 changes: 11 additions & 3 deletions application/event_replay/event_replay.py
@@ -8,10 +8,19 @@
from aws_lambda_powertools.utilities.typing import LambdaContext
from boto3 import client
from boto3.dynamodb.types import TypeDeserializer
from botocore.config import Config
from simplejson import dumps

from common.middlewares import unhandled_exception_logging

# Configure boto3 client with explicit timeout to prevent hanging in Lambda
# Timeouts tuned for typical DynamoDB/SQS operations while preventing indefinite hangs
boto_config = Config(connect_timeout=10, read_timeout=15)

# Create clients at module level for connection reuse across Lambda invocations
dynamodb_client = client("dynamodb", config=boto_config)
sqs_client = client("sqs", config=boto_config)

tracer = Tracer()
logger = Logger()

@@ -75,7 +84,7 @@ def get_change_event(odscode: str, sequence_number: Decimal) -> dict[str, Any]:
Returns:
dict[str, Any]: The change event
"""
response = client("dynamodb").query(
response = dynamodb_client.query(
TableName=getenv("CHANGE_EVENTS_TABLE_NAME"),
IndexName="gsi_ods_sequence",
ProjectionExpression="Event",
@@ -112,11 +121,10 @@ def send_change_event(change_event: dict[str, Any], odscode: str, sequence_numbe
sequence_number (int): The sequence number of the change event
correlation_id (str): The correlation id of the event replay
"""
sqs = client("sqs")
queue_url = getenv("CHANGE_EVENT_SQS_URL")
logger.info("Sending change event to SQS", queue_url=queue_url)
change_event_str = dumps(change_event)
response = sqs.send_message(
response = sqs_client.send_message(
QueueUrl=queue_url,
MessageBody=change_event_str,
MessageGroupId=odscode,
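The pattern introduced here, clients built once at import time with explicit botocore timeouts, leans on Lambda reusing the execution environment between warm invocations, so the HTTP connection pool is only set up on cold start. A standalone sketch of the shape (handler and environment variable are illustrative):

```python
from os import getenv

from boto3 import client
from botocore.config import Config

# Built once per execution environment (cold start); warm invocations reuse
# the same client and its underlying connection pool.
boto_config = Config(connect_timeout=10, read_timeout=15)
sqs_client = client("sqs", config=boto_config)


def lambda_handler(event: dict, context: object) -> None:
    # No client construction per invocation; only the API call happens here,
    # and it fails fast instead of hanging if SQS is unreachable.
    sqs_client.send_message(
        QueueUrl=getenv("EXAMPLE_QUEUE_URL", ""),  # illustrative env var
        MessageBody="example",
    )
```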
19 changes: 8 additions & 11 deletions application/event_replay/tests/test_event_replay.py
@@ -102,21 +102,20 @@ def test_build_correlation_id(mock_time_ns: MagicMock) -> None:
assert response == f"{time}-local-replayed-event"


@patch(f"{FILE_PATH}.client")
@patch(f"{FILE_PATH}.dynamodb_client")
def test_get_change_event(mock_client: MagicMock, change_event: dict[str, str], event: dict[str, str]) -> None:
# Arrange
table_name = "my-table"
environ["CHANGE_EVENTS_TABLE_NAME"] = table_name
environ["AWS_REGION"] = "eu-west-1"
serializer = TypeSerializer()

mock_client.return_value.query.return_value = {"Items": [{"Event": serializer.serialize(change_event)}]}
mock_client.query.return_value = {"Items": [{"Event": serializer.serialize(change_event)}]}
# Act
response = get_change_event(event["odscode"], Decimal(event["sequence_number"]))
# Assert
assert response == change_event
mock_client.assert_called_with("dynamodb")
mock_client().query.assert_called_with(
mock_client.query.assert_called_with(
TableName=table_name,
IndexName="gsi_ods_sequence",
ProjectionExpression="Event",
@@ -130,21 +129,20 @@ def test_get_change_event_no_change_event_in_dynamodb(
del environ["AWS_REGION"]


@patch(f"{FILE_PATH}.client")
@patch(f"{FILE_PATH}.dynamodb_client")
def test_get_change_event_no_change_event_in_dynamodb(
mock_client: MagicMock, change_event: dict[str, str], event: dict[str, str]
) -> None:
# Arrange
table_name = "my-table"
environ["CHANGE_EVENTS_TABLE_NAME"] = table_name
environ["AWS_REGION"] = "eu-west-1"
mock_client.return_value.query.return_value = {"Items": []}
mock_client.query.return_value = {"Items": []}
# Act
with pytest.raises(ValueError, match="No change event found for ods code FXXX1 and sequence number 1"):
get_change_event(event["odscode"], Decimal(event["sequence_number"]))
# Assert
mock_client.assert_called_with("dynamodb")
mock_client().query.assert_called_with(
mock_client.query.assert_called_with(
TableName=table_name,
IndexName="gsi_ods_sequence",
ProjectionExpression="Event",
@@ -158,16 +156,15 @@ def test_get_change_event_no_change_event_in_dynamodb(
del environ["AWS_REGION"]


@patch(f"{FILE_PATH}.client")
@patch(f"{FILE_PATH}.sqs_client")
def test_send_change_event(mock_client: MagicMock, change_event: dict[str, str], event: dict[str, str]) -> None:
# Arrange
correlation_id = "CORRELATION_ID"
environ["CHANGE_EVENT_SQS_URL"] = queue_url = "https://sqs.eu-west-1.amazonaws.com/123456789/my-queue"
# Act
send_change_event(change_event, event["odscode"], int(event["sequence_number"]), correlation_id)
# Assert
mock_client.assert_called_with("sqs")
mock_client().send_message.assert_called_with(
mock_client.send_message.assert_called_with(
QueueUrl=queue_url,
MessageBody=dumps(change_event),
MessageGroupId=event["odscode"],
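Because these tests now patch the module-level client object rather than the boto3 client factory, the mock is the client itself: expectations move from mock_client().query to mock_client.query, and the mock_client.assert_called_with("dynamodb") / ("sqs") assertions disappear. A minimal illustration of the two mock shapes (no project code involved):

```python
from unittest.mock import MagicMock

# Old style: the boto3 factory is patched, so the client is factory.return_value.
mock_factory = MagicMock()
dynamodb = mock_factory("dynamodb")          # code under test called client("dynamodb")
dynamodb.query(TableName="my-table")
mock_factory.assert_called_with("dynamodb")  # extra assertion on the factory itself
mock_factory.return_value.query.assert_called_once()

# New style: the module-level client is patched, so the mock *is* the client.
mock_client = MagicMock()
mock_client.query(TableName="my-table")      # code under test uses the module attribute
mock_client.query.assert_called_with(TableName="my-table")
```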
26 changes: 20 additions & 6 deletions application/ingest_change_event/ingest_change_event.py
@@ -7,16 +7,22 @@
from aws_lambda_powertools.utilities.data_classes import SQSEvent, event_source
from aws_lambda_powertools.utilities.typing.lambda_context import LambdaContext
from boto3 import client
from botocore.config import Config
from botocore.exceptions import ClientError

from .change_event_validation import validate_change_event
from common.dynamodb import add_change_event_to_dynamodb, get_latest_sequence_id_for_a_given_odscode_from_dynamodb
from common.middlewares import redact_staff_key_from_event, unhandled_exception_logging
from common.types import HoldingQueueChangeEventItem
from common.utilities import extract_body, get_sequence_number

# Configure boto3 client with explicit timeout to prevent hanging in Lambda
boto_config = Config(connect_timeout=10, read_timeout=15)

logger = Logger()
tracer = Tracer()
sqs = client("sqs")
# Create SQS client at module level for connection reuse across Lambda invocations
sqs_client = client("sqs", config=boto_config)


@redact_staff_key_from_event()
@@ -82,8 +88,16 @@ def lambda_handler(event: SQSEvent, context: LambdaContext) -> None: # noqa: AR
correlation_id=logger.get_correlation_id(),
)
logger.debug("Change event validated", holding_queue_change_event_item=holding_queue_change_event_item)
sqs.send_message(
QueueUrl=getenv("HOLDING_QUEUE_URL"),
MessageBody=dumps(holding_queue_change_event_item),
MessageGroupId=ods_code,
)
try:
sqs_client.send_message(
QueueUrl=getenv("HOLDING_QUEUE_URL"),
MessageBody=dumps(holding_queue_change_event_item),
MessageGroupId=ods_code,
)
except ClientError as err:
logger.exception(
"Failed to send message to holding queue",
error_code=err.response["Error"]["Code"],
queue_url=getenv("HOLDING_QUEUE_URL"),
)
raise
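The new except branch logs the AWS error code and re-raises, so the invocation still fails and SQS redelivers the record according to the queue's retry and dead-letter policy. For reference, botocore's ClientError carries that code in its response dict; a small standalone sketch (the error code shown is illustrative):

```python
from botocore.exceptions import ClientError

# ClientError is constructed from an error_response dict plus the operation name,
# which is exactly what the handler reads back via err.response["Error"]["Code"].
err = ClientError(
    {"Error": {"Code": "AWS.SimpleQueueService.NonExistentQueue", "Message": "No such queue"}},
    "SendMessage",
)

try:
    raise err
except ClientError as caught:
    assert caught.response["Error"]["Code"] == "AWS.SimpleQueueService.NonExistentQueue"
    # In the handler this is logged with logger.exception(...) and then re-raised.
```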
@@ -12,7 +12,7 @@
FILE_PATH = "application.ingest_change_event.ingest_change_event"


@patch(f"{FILE_PATH}.sqs")
@patch(f"{FILE_PATH}.sqs_client")
@patch(f"{FILE_PATH}.HoldingQueueChangeEventItem")
@patch(f"{FILE_PATH}.add_change_event_to_dynamodb")
@patch(f"{FILE_PATH}.get_latest_sequence_id_for_a_given_odscode_from_dynamodb")
@@ -72,7 +72,7 @@ def test_lambda_handler(
del environ["HOLDING_QUEUE_URL"]


@patch(f"{FILE_PATH}.sqs")
@patch(f"{FILE_PATH}.sqs_client")
@patch(f"{FILE_PATH}.HoldingQueueChangeEventItem")
@patch(f"{FILE_PATH}.add_change_event_to_dynamodb")
@patch(f"{FILE_PATH}.get_latest_sequence_id_for_a_given_odscode_from_dynamodb")
@@ -130,7 +130,7 @@ def test_lambda_handler_with_sensitive_staff_key(


@patch.object(Logger, "error")
@patch(f"{FILE_PATH}.sqs")
@patch(f"{FILE_PATH}.sqs_client")
@patch(f"{FILE_PATH}.HoldingQueueChangeEventItem")
@patch(f"{FILE_PATH}.add_change_event_to_dynamodb")
@patch(f"{FILE_PATH}.get_latest_sequence_id_for_a_given_odscode_from_dynamodb")
@@ -183,7 +183,7 @@ def test_lambda_handler_no_sequence_number(


@patch.object(Logger, "error")
@patch(f"{FILE_PATH}.sqs")
@patch(f"{FILE_PATH}.sqs_client")
@patch(f"{FILE_PATH}.HoldingQueueChangeEventItem")
@patch(f"{FILE_PATH}.add_change_event_to_dynamodb")
@patch(f"{FILE_PATH}.get_latest_sequence_id_for_a_given_odscode_from_dynamodb")
@@ -239,7 +239,7 @@ def test_lambda_handler_less_than_latest_sequence_number(
del environ["HOLDING_QUEUE_URL"]


@patch(f"{FILE_PATH}.sqs")
@patch(f"{FILE_PATH}.sqs_client")
@patch(f"{FILE_PATH}.HoldingQueueChangeEventItem")
@patch(f"{FILE_PATH}.add_change_event_to_dynamodb")
@patch(f"{FILE_PATH}.get_latest_sequence_id_for_a_given_odscode_from_dynamodb")
1 change: 0 additions & 1 deletion application/service_matcher/requirements.txt
@@ -1,3 +1,2 @@
aws-lambda-powertools[tracer] ~= 3.20.0
psycopg[binary]
pytz
9 changes: 7 additions & 2 deletions application/service_matcher/service_matcher.py
@@ -8,6 +8,7 @@
from aws_lambda_powertools.utilities.data_classes import SQSEvent, event_source
from aws_lambda_powertools.utilities.typing.lambda_context import LambdaContext
from boto3 import client
from botocore.config import Config

from .matching import get_matching_services
from .review_matches import review_matches
@@ -16,9 +17,13 @@
from common.types import HoldingQueueChangeEventItem, UpdateRequest
from common.utilities import extract_body

# Configure boto3 client with explicit timeout to prevent hanging in Lambda
boto_config = Config(connect_timeout=10, read_timeout=15)

logger = Logger()
tracer = Tracer()
sqs = client("sqs")
# Create SQS client at module level for connection reuse across Lambda invocations
sqs_client = client("sqs", config=boto_config)


@unhandled_exception_logging()
@@ -115,7 +120,7 @@ def send_update_requests(
for i, chunk in enumerate(chunks):
# TODO: Handle errors?
logger.debug(f"Sending off message chunk {i+1}/{len(chunks)}")
response = sqs.send_message_batch(QueueUrl=environ["UPDATE_REQUEST_QUEUE_URL"], Entries=chunk)
response = sqs_client.send_message_batch(QueueUrl=environ["UPDATE_REQUEST_QUEUE_URL"], Entries=chunk)
logger.debug("Sent off message chunk", response=response)
logger.warning(
"Sent Off Update Request",
2 changes: 1 addition & 1 deletion application/service_matcher/tests/test_service_matcher.py
@@ -119,7 +119,7 @@ def test_lambda_handler_should_throw_exception_if_event_records_len_not_eq_one(l
del environ["ENV"]


@patch(f"{FILE_PATH}.sqs")
@patch(f"{FILE_PATH}.sqs_client")
@patch.object(Logger, "get_correlation_id", return_value="1")
@patch.object(Logger, "warning")
def test_send_update_requests(
4 changes: 2 additions & 2 deletions application/service_sync/data_processing/service_histories.py
@@ -3,11 +3,11 @@
from json import dumps, loads
from time import time
from typing import Any, Self
from zoneinfo import ZoneInfo

from aws_lambda_powertools.logging import Logger
from psycopg import Connection
from psycopg.rows import dict_row
from pytz import timezone

from .service_histories_change import ServiceHistoriesChange
from common.constants import (
@@ -209,7 +209,7 @@ def save_service_histories(self: Self, connection: Connection) -> None:
# Generate the epoch time in seconds rounded down to the nearest second at the time of saving
current_epoch_time = str(int(time()))
# Get local datetime and format it to DoS date/time format
current_date_time = datetime.now(timezone("Europe/London")).strftime("%Y-%m-%d %H:%M:%S")
current_date_time = datetime.now(ZoneInfo("Europe/London")).strftime("%Y-%m-%d %H:%M:%S")
# Rename the new_change key to the current epoch time
self.service_history[current_epoch_time] = self.service_history.pop("new_change")
# Add the current time to the service_histories json
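Swapping pytz for the standard-library zoneinfo is behaviour-preserving for this call: datetime.now(ZoneInfo("Europe/London")) yields an aware datetime just as datetime.now(pytz.timezone("Europe/London")) did, and ZoneInfo avoids pytz's localize()/normalize() requirements for naive datetimes, which is consistent with pytz being dropped from the requirements earlier in this diff. A small sketch of the replacement (dates are illustrative):

```python
from datetime import datetime
from zoneinfo import ZoneInfo

london = ZoneInfo("Europe/London")

# Aware "now" in London time, formatted to the DoS date/time format used above.
print(datetime.now(london).strftime("%Y-%m-%d %H:%M:%S"))

# Unlike pytz, ZoneInfo can be attached directly to a naive datetime;
# DST ambiguity is handled via the fold attribute rather than localize().
naive = datetime(2024, 6, 1, 12, 0)
aware = naive.replace(tzinfo=london)
print(aware.utcoffset())  # 1:00:00 during British Summer Time
```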
@@ -19,19 +19,19 @@
class ServiceHistoriesChange:
"""A change to be added to the servicehistories table."""

data: str
data: str | dict
previous_value: str
change_key: str
change_action: str
area: str

def __init__(
self: Self, data: str, previous_value: str, change_key: str, area: str = DOS_DEMOGRAPHICS_AREA_TYPE
self: Self, data: str | dict, previous_value: str, change_key: str, area: str = DOS_DEMOGRAPHICS_AREA_TYPE
) -> None:
"""Initialises the ServiceHistoriesChange object.

Args:
data (str): The data to be added to the servicehistories table.
data (str | dict): The data to be added to the servicehistories table.
previous_value (str): The previous value of the data to be added to the servicehistories table.
change_key (str): The change key for the data to be added to the servicehistories table.
area (str): The area of the data to be added to the servicehistories table.
@@ -76,8 +76,7 @@ def get_sgsd_change_action(self: Self) -> str:
Returns:
str: Change action - add, delete
"""
new_value: dict[list[str]] = self.data
value = next(iter(new_value.keys()))
value = next(iter(self.data.keys()))
return "add" if value == "add" else "delete"

def get_opening_times_change_action(self: Self) -> str:
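The simplified get_sgsd_change_action drops the intermediate variable (whose dict[list[str]] annotation was malformed anyway) and reads the first key of self.data directly; next(iter(d.keys())) returns the dict's first key in insertion order. A tiny sketch of the idiom (sample payloads are illustrative):

```python
# Dicts preserve insertion order, so next(iter(d.keys())) is the first key added.
change = {"add": ["cid1", "cid2"]}
action = next(iter(change.keys()))
print("add" if action == "add" else "delete")  # -> add

removal = {"remove": ["cid3"]}
action = next(iter(removal.keys()))
print("add" if action == "add" else "delete")  # -> delete
```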
@@ -4,12 +4,12 @@
from os import environ
from time import time_ns
from typing import Self
from zoneinfo import ZoneInfo

from aws_lambda_powertools.logging import Logger
from boto3 import client
from psycopg import Connection
from psycopg.rows import DictRow
from pytz import timezone

from ..service_update_logger import ServiceUpdateLogger
from .s3 import put_content_to_s3
@@ -153,7 +153,7 @@ def reject_pending_changes(connection: Connection, pending_changes: list[Pending
)
query_vars = {
"USER_NAME": DOS_INTEGRATION_USER_NAME,
"TIMESTAMP": datetime.now(timezone("Europe/London")),
"TIMESTAMP": datetime.now(ZoneInfo("Europe/London")),
}
cursor = query_dos_db(connection=connection, query=sql_query, query_vars=query_vars)
cursor.close()