4 changes: 3 additions & 1 deletion backend/.env
@@ -24,7 +24,9 @@ EVENT_RETENTION_DAYS=30
KAFKA_CONSUMER_GROUP_ID=integr8scode-backend
KAFKA_AUTO_OFFSET_RESET=earliest
KAFKA_ENABLE_AUTO_COMMIT=true
-KAFKA_SESSION_TIMEOUT_MS=30000
+KAFKA_SESSION_TIMEOUT_MS=45000
+KAFKA_HEARTBEAT_INTERVAL_MS=10000
+KAFKA_REQUEST_TIMEOUT_MS=40000
KAFKA_MAX_POLL_RECORDS=500

# WebSocket Configuration
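A note on how these numbers fit together: the usual Kafka rule of thumb is heartbeat_interval_ms at most one third of session_timeout_ms, and the 10s/45s pair keeps that margin, with request_timeout_ms staying below the session timeout. Below is a minimal sketch of how these values typically reach an aiokafka consumer; the env-reading helper is an assumption for illustration, not the project's actual Settings wiring.

```python
import os

from aiokafka import AIOKafkaConsumer


def consumer_from_env(*topics: str) -> AIOKafkaConsumer:
    # Hypothetical helper: reads the .env values above and passes them
    # straight through; heartbeats fire ~4x per 45s session window.
    return AIOKafkaConsumer(
        *topics,
        bootstrap_servers=os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        group_id=os.environ["KAFKA_CONSUMER_GROUP_ID"],
        auto_offset_reset=os.environ["KAFKA_AUTO_OFFSET_RESET"],
        enable_auto_commit=os.environ["KAFKA_ENABLE_AUTO_COMMIT"] == "true",
        session_timeout_ms=int(os.environ["KAFKA_SESSION_TIMEOUT_MS"]),
        heartbeat_interval_ms=int(os.environ["KAFKA_HEARTBEAT_INTERVAL_MS"]),
        request_timeout_ms=int(os.environ["KAFKA_REQUEST_TIMEOUT_MS"]),
        max_poll_records=int(os.environ["KAFKA_MAX_POLL_RECORDS"]),
    )
```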
11 changes: 10 additions & 1 deletion backend/.env.test
@@ -25,6 +25,13 @@ KAFKA_TOPIC_PREFIX=test.
SCHEMA_SUBJECT_PREFIX=test.
SCHEMA_REGISTRY_URL=http://localhost:8081

+# Reduce consumer pool and timeouts for faster test startup/teardown
+# https://github.com/aio-libs/aiokafka/issues/773
+SSE_CONSUMER_POOL_SIZE=1
+KAFKA_SESSION_TIMEOUT_MS=6000
+KAFKA_HEARTBEAT_INTERVAL_MS=2000
+KAFKA_REQUEST_TIMEOUT_MS=5000
+
# Security
SECURE_COOKIES=true
BCRYPT_ROUNDS=4
@@ -33,7 +40,9 @@ BCRYPT_ROUNDS=4
RATE_LIMIT_ENABLED=true
ENABLE_TRACING=false

-# OpenTelemetry - explicitly disabled for tests
+# OpenTelemetry - disabled for tests
+# Empty endpoint prevents OTLP exporter creation in setup_metrics()
+# OTEL_SDK_DISABLED=true (set via pytest-env) provides additional safety
OTEL_EXPORTER_OTLP_ENDPOINT=

# Development
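The two new comments describe a guard that is easy to get wrong, so here is a hypothetical reconstruction of the check they imply. The project's real setup_metrics() may be structured differently; the import paths below assume the stock OTLP gRPC exporter.

```python
import os

from opentelemetry import metrics
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader


def setup_metrics() -> None:
    # With OTEL_EXPORTER_OTLP_ENDPOINT set to an empty string (as in this
    # file), we return before any exporter or gRPC channel is created.
    endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "")
    if not endpoint:
        return
    reader = PeriodicExportingMetricReader(OTLPMetricExporter(endpoint=endpoint))
    metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))
```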
43 changes: 4 additions & 39 deletions backend/app/api/routes/dlq.py
@@ -1,6 +1,3 @@
-from datetime import datetime, timezone
-from typing import List
-
from dishka import FromDishka
from dishka.integrations.fastapi import DishkaRoute
from fastapi import APIRouter, Depends, HTTPException, Query
@@ -31,19 +28,7 @@
@router.get("/stats", response_model=DLQStats)
async def get_dlq_statistics(repository: FromDishka[DLQRepository]) -> DLQStats:
    stats = await repository.get_dlq_stats()
-    return DLQStats(
-        by_status=stats.by_status,
-        by_topic=[{"topic": t.topic, "count": t.count, "avg_retry_count": t.avg_retry_count} for t in stats.by_topic],
-        by_event_type=[{"event_type": e.event_type, "count": e.count} for e in stats.by_event_type],
-        age_stats={
-            "min_age": stats.age_stats.min_age_seconds,
-            "max_age": stats.age_stats.max_age_seconds,
-            "avg_age": stats.age_stats.avg_age_seconds,
-        }
-        if stats.age_stats
-        else {},
-        timestamp=stats.timestamp,
-    )
+    return DLQStats.model_validate(stats, from_attributes=True)


@router.get("/messages", response_model=DLQMessagesResponse)
@@ -70,27 +55,7 @@ async def get_dlq_message(event_id: str, repository: FromDishka[DLQRepository])
    message = await repository.get_message_by_id(event_id)
    if not message:
        raise HTTPException(status_code=404, detail="Message not found")
-
-    return DLQMessageDetail(
-        event_id=message.event_id,
-        event=message.event.model_dump(),
-        event_type=message.event_type,
-        original_topic=message.original_topic,
-        error=message.error,
-        retry_count=message.retry_count,
-        failed_at=message.failed_at or datetime(1970, 1, 1, tzinfo=timezone.utc),
-        status=DLQMessageStatus(message.status),
-        created_at=message.created_at,
-        last_updated=message.last_updated,
-        next_retry_at=message.next_retry_at,
-        retried_at=message.retried_at,
-        discarded_at=message.discarded_at,
-        discard_reason=message.discard_reason,
-        producer_id=message.producer_id,
-        dlq_offset=message.dlq_offset,
-        dlq_partition=message.dlq_partition,
-        last_error=message.last_error,
-    )
+    return DLQMessageDetail.model_validate(message, from_attributes=True)


@router.post("/retry", response_model=DLQBatchRetryResponse)
@@ -141,7 +106,7 @@ async def discard_dlq_message(
    return MessageResponse(message=f"Message {event_id} discarded")


@router.get("/topics", response_model=List[DLQTopicSummaryResponse])
async def get_dlq_topics(repository: FromDishka[DLQRepository]) -> List[DLQTopicSummaryResponse]:
@router.get("/topics", response_model=list[DLQTopicSummaryResponse])
async def get_dlq_topics(repository: FromDishka[DLQRepository]) -> list[DLQTopicSummaryResponse]:
    topics = await repository.get_topics_summary()
    return [DLQTopicSummaryResponse.model_validate(topic) for topic in topics]
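Both handlers now lean on Pydantic v2's model_validate(..., from_attributes=True), which reads attributes off the source object instead of requiring a dict, making the field-by-field constructors above redundant. A self-contained sketch of the pattern, with illustrative names (StatsRecord and StatsResponse are not the project's classes):

```python
from dataclasses import dataclass

from pydantic import BaseModel


@dataclass
class StatsRecord:  # stand-in for the repository's domain object
    total: int
    topic: str


class StatsResponse(BaseModel):  # stand-in for the API schema
    total: int
    topic: str


record = StatsRecord(total=3, topic="dlq.executions")
# from_attributes=True reads matching attribute names off the object.
response = StatsResponse.model_validate(record, from_attributes=True)
assert response.model_dump() == {"total": 3, "topic": "dlq.executions"}
```

One review-worthy caveat: validation now depends on attribute names matching the schema, so a rename on either side surfaces at request time instead of at construction.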
3 changes: 1 addition & 2 deletions backend/app/api/routes/events.py
@@ -12,8 +12,7 @@
from app.core.utils import get_client_ip
from app.domain.enums.common import SortOrder
from app.domain.events.event_models import EventFilter
-from app.domain.events.typed import BaseEvent
-from app.infrastructure.kafka.events.metadata import AvroEventMetadata as EventMetadata
+from app.domain.events.typed import BaseEvent, EventMetadata
from app.schemas_pydantic.events import (
    DeleteEventResponse,
    EventAggregationRequest,
3 changes: 1 addition & 2 deletions backend/app/api/routes/execution.py
@@ -12,9 +12,8 @@
from app.domain.enums.events import EventType
from app.domain.enums.execution import ExecutionStatus
from app.domain.enums.user import UserRole
+from app.domain.events.typed import BaseEvent, EventMetadata
from app.domain.exceptions import DomainError
-from app.infrastructure.kafka.events.base import BaseEvent
-from app.infrastructure.kafka.events.metadata import AvroEventMetadata as EventMetadata
from app.schemas_pydantic.execution import (
    CancelExecutionRequest,
    CancelResponse,
5 changes: 2 additions & 3 deletions backend/app/core/providers.py
@@ -43,7 +43,7 @@
from app.dlq.manager import DLQManager, create_dlq_manager
from app.domain.enums.kafka import KafkaTopic
from app.domain.saga.models import SagaConfig
-from app.events.core import ProducerConfig, UnifiedProducer
+from app.events.core import UnifiedProducer
from app.events.event_store import EventStore, create_event_store
from app.events.event_store_consumer import EventStoreConsumer, create_event_store_consumer
from app.events.schema.schema_registry import SchemaRegistryManager
@@ -160,8 +160,7 @@ class MessagingProvider(Provider):
    async def get_kafka_producer(
        self, settings: Settings, schema_registry: SchemaRegistryManager, logger: logging.Logger
    ) -> AsyncIterator[UnifiedProducer]:
-        config = ProducerConfig(bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS)
-        async with UnifiedProducer(config, schema_registry, logger, settings=settings) as producer:
+        async with UnifiedProducer(schema_registry, logger, settings) as producer:
            yield producer

    @provide
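The provider keeps its async-context-manager shape, so producer shutdown still runs when the Dishka container closes; only the constructor arguments changed. A minimal sketch of that lifecycle pattern follows, where FakeProducer is a stand-in rather than UnifiedProducer's real interface:

```python
from collections.abc import AsyncIterator

from dishka import Provider, Scope, provide


class FakeProducer:
    """Stand-in with the same enter/exit lifecycle as a Kafka producer."""

    async def __aenter__(self) -> "FakeProducer":
        return self  # a real producer would start() and connect here

    async def __aexit__(self, *exc_info: object) -> None:
        return None  # ...and flush()/stop() here on container close


class SketchMessagingProvider(Provider):
    @provide(scope=Scope.APP)
    async def get_producer(self) -> AsyncIterator[FakeProducer]:
        # Dishka resumes this generator at teardown, running __aexit__.
        async with FakeProducer() as producer:
            yield producer
```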
41 changes: 12 additions & 29 deletions backend/app/db/docs/dlq.py
@@ -1,32 +1,25 @@
from datetime import datetime, timezone
-from typing import Any

from beanie import Document, Indexed
from pydantic import ConfigDict, Field
from pymongo import ASCENDING, DESCENDING, IndexModel

from app.dlq.models import DLQMessageStatus
-from app.domain.enums.events import EventType
+from app.domain.events.typed import DomainEvent


class DLQMessageDocument(Document):
-    """Unified DLQ message document for the entire system.
+    """Unified DLQ message document. Access event_id/event_type via event.event_id, event.event_type."""
-
-    Copied from DLQMessage dataclass.
-    """

-    # Core fields - always required
-    event: dict[str, Any]  # The original event as dict (BaseEvent serialized)
-    event_id: Indexed(str, unique=True)  # type: ignore[valid-type]
-    event_type: EventType  # Indexed via Settings.indexes
-    original_topic: Indexed(str)  # type: ignore[valid-type]
-    error: str  # Error message from the failure
-    retry_count: Indexed(int)  # type: ignore[valid-type]
-    failed_at: Indexed(datetime)  # type: ignore[valid-type]
-    status: DLQMessageStatus  # Indexed via Settings.indexes
-    producer_id: str  # ID of the producer that sent to DLQ
+    model_config = ConfigDict(from_attributes=True)

-    # Optional fields
+    event: DomainEvent  # Discriminated union - contains event_id, event_type
+    original_topic: Indexed(str) = ""  # type: ignore[valid-type]
+    error: str = "Unknown error"
+    retry_count: Indexed(int) = 0  # type: ignore[valid-type]
+    failed_at: Indexed(datetime) = Field(default_factory=lambda: datetime.now(timezone.utc))  # type: ignore[valid-type]
+    status: DLQMessageStatus = DLQMessageStatus.PENDING
+    producer_id: str = "unknown"
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    last_updated: datetime | None = None
    next_retry_at: Indexed(datetime) | None = None  # type: ignore[valid-type]
@@ -36,25 +29,15 @@ class DLQMessageDocument(Document):
    dlq_offset: int | None = None
    dlq_partition: int | None = None
    last_error: str | None = None
-
-    # Kafka message headers (optional)
    headers: dict[str, str] = Field(default_factory=dict)
-
-    model_config = ConfigDict(from_attributes=True)
-
    class Settings:
        name = "dlq_messages"
        use_state_management = True
        indexes = [
-            IndexModel([("event_type", ASCENDING)], name="idx_dlq_event_type"),
+            IndexModel([("event.event_id", ASCENDING)], unique=True, name="idx_dlq_event_id"),
+            IndexModel([("event.event_type", ASCENDING)], name="idx_dlq_event_type"),
            IndexModel([("status", ASCENDING)], name="idx_dlq_status"),
            IndexModel([("failed_at", DESCENDING)], name="idx_dlq_failed_desc"),
            # TTL index - auto-delete after 7 days
            IndexModel([("created_at", ASCENDING)], name="idx_dlq_created_ttl", expireAfterSeconds=7 * 24 * 3600),
        ]
-
-    @property
-    def age_seconds(self) -> float:
-        """Get message age in seconds since failure."""
-        failed_at: datetime = self.failed_at
-        return (datetime.now(timezone.utc) - failed_at).total_seconds()
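With event_id and event_type now nested inside event, readers of this model query the dotted path, which the idx_dlq_event_id unique index above serves. A short sketch, assuming only the import path shown in this diff:

```python
from app.db.docs.dlq import DLQMessageDocument  # path as in this PR


async def find_dlq_message(event_id: str) -> DLQMessageDocument | None:
    # Beanie passes the dotted-path filter through to MongoDB, so this
    # lookup is covered by the unique index on "event.event_id".
    return await DLQMessageDocument.find_one({"event.event_id": event_id})
```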