Skip to content
21 changes: 6 additions & 15 deletions backend/app/api/routes/admin/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@
from app.domain.enums.events import EventType
from app.infrastructure.mappers import (
AdminReplayApiMapper,
EventDetailMapper,
EventFilterMapper,
EventMapper,
EventStatisticsMapper,
ReplaySessionMapper,
)
from app.schemas_pydantic.admin_events import (
EventBrowseRequest,
Expand Down Expand Up @@ -50,9 +46,8 @@ async def browse_events(request: EventBrowseRequest, service: FromDishka[AdminEv
sort_order=request.sort_order,
)

event_mapper = EventMapper()
return EventBrowseResponse(
events=[jsonable_encoder(event_mapper.to_dict(event)) for event in result.events],
events=[jsonable_encoder(event) for event in result.events],
total=result.total,
skip=result.skip,
limit=result.limit,
Expand All @@ -69,8 +64,7 @@ async def get_event_stats(
) -> EventStatsResponse:
try:
stats = await service.get_event_stats(hours=hours)
stats_mapper = EventStatisticsMapper()
return EventStatsResponse(**stats_mapper.to_dict(stats))
return EventStatsResponse.model_validate(stats)

except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
Expand Down Expand Up @@ -147,12 +141,10 @@ async def get_event_detail(event_id: str, service: FromDishka[AdminEventsService
if not result:
raise HTTPException(status_code=404, detail="Event not found")

detail_mapper = EventDetailMapper()
serialized_result = jsonable_encoder(detail_mapper.to_dict(result))
return EventDetailResponse(
event=serialized_result["event"],
related_events=serialized_result["related_events"],
timeline=serialized_result["timeline"],
event=jsonable_encoder(result.event),
related_events=[jsonable_encoder(e) for e in result.related_events],
timeline=[jsonable_encoder(e) for e in result.timeline],
)

except HTTPException:
Expand Down Expand Up @@ -209,8 +201,7 @@ async def get_replay_status(session_id: str, service: FromDishka[AdminEventsServ
if not status:
raise HTTPException(status_code=404, detail="Replay session not found")

replay_mapper = ReplaySessionMapper()
return EventReplayStatusResponse(**replay_mapper.status_detail_to_dict(status))
return EventReplayStatusResponse.model_validate(status)

except HTTPException:
raise
Expand Down
41 changes: 25 additions & 16 deletions backend/app/api/routes/admin/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@
from app.domain.enums.user import UserRole
from app.domain.rate_limit import UserRateLimit
from app.domain.user import UserUpdate as DomainUserUpdate
from app.infrastructure.mappers import AdminOverviewApiMapper, UserMapper
from app.schemas_pydantic.admin_user_overview import AdminUserOverview
from app.schemas_pydantic.admin_user_overview import (
AdminUserOverview,
DerivedCounts,
RateLimitSummary,
)
from app.schemas_pydantic.events import EventResponse, EventStatistics
from app.schemas_pydantic.user import (
DeleteUserResponse,
MessageResponse,
Expand Down Expand Up @@ -48,17 +52,20 @@ async def list_users(
role=role,
)

user_mapper = UserMapper()
summaries = await rate_limit_service.get_user_rate_limit_summaries([u.user_id for u in result.users])
user_responses: list[UserResponse] = []
for user in result.users:
user_dict = user_mapper.to_response_dict(user)
user_response = UserResponse.model_validate(user)
summary = summaries.get(user.user_id)
if summary:
user_dict["bypass_rate_limit"] = summary.bypass_rate_limit
user_dict["global_multiplier"] = summary.global_multiplier
user_dict["has_custom_limits"] = summary.has_custom_limits
user_responses.append(UserResponse(**user_dict))
user_response = user_response.model_copy(
update={
"bypass_rate_limit": summary.bypass_rate_limit,
"global_multiplier": summary.global_multiplier,
"has_custom_limits": summary.has_custom_limits,
}
)
user_responses.append(user_response)

return UserListResponse(
users=user_responses,
Expand All @@ -80,8 +87,7 @@ async def create_user(
domain_user = await admin_user_service.create_user(admin_username=admin.username, user_data=user_data)
except ValueError as ve:
raise HTTPException(status_code=400, detail=str(ve))
user_mapper = UserMapper()
return UserResponse(**user_mapper.to_response_dict(domain_user))
return UserResponse.model_validate(domain_user)


@router.get("/{user_id}", response_model=UserResponse)
Expand All @@ -94,8 +100,7 @@ async def get_user(
if not user:
raise HTTPException(status_code=404, detail="User not found")

user_mapper = UserMapper()
return UserResponse(**user_mapper.to_response_dict(user))
return UserResponse.model_validate(user)


@router.get("/{user_id}/overview", response_model=AdminUserOverview)
Expand All @@ -109,8 +114,13 @@ async def get_user_overview(
domain = await admin_user_service.get_user_overview(user_id=user_id, hours=24)
except ValueError:
raise HTTPException(status_code=404, detail="User not found")
mapper = AdminOverviewApiMapper()
return mapper.to_response(domain)
return AdminUserOverview(
user=UserResponse.model_validate(domain.user),
stats=EventStatistics.model_validate(domain.stats),
derived_counts=DerivedCounts.model_validate(domain.derived_counts),
rate_limit_summary=RateLimitSummary.model_validate(domain.rate_limit_summary),
recent_events=[EventResponse.model_validate(e).model_dump() for e in domain.recent_events],
)


@router.put("/{user_id}", response_model=UserResponse)
Expand Down Expand Up @@ -141,8 +151,7 @@ async def update_user(
if not updated_user:
raise HTTPException(status_code=500, detail="Failed to update user")

user_mapper = UserMapper()
return UserResponse(**user_mapper.to_response_dict(updated_user))
return UserResponse.model_validate(updated_user)


@router.delete("/{user_id}", response_model=DeleteUserResponse)
Expand Down
36 changes: 3 additions & 33 deletions backend/app/api/routes/dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,27 +59,8 @@ async def get_dlq_messages(
status=status, topic=topic, event_type=event_type, limit=limit, offset=offset
)

# Convert domain messages to response models
messages = [
DLQMessageResponse(
event_id=msg.event_id or "unknown",
event_type=msg.event_type,
original_topic=msg.original_topic,
error=msg.error,
retry_count=msg.retry_count,
failed_at=msg.failed_at or datetime(1970, 1, 1, tzinfo=timezone.utc),
status=DLQMessageStatus(msg.status),
age_seconds=msg.age_seconds,
details={
"producer_id": msg.producer_id,
"dlq_offset": msg.dlq_offset,
"dlq_partition": msg.dlq_partition,
"last_error": msg.last_error,
"next_retry_at": msg.next_retry_at,
},
)
for msg in result.messages
]
# Convert domain messages to response models using model_validate
messages = [DLQMessageResponse.model_validate(msg) for msg in result.messages]

return DLQMessagesResponse(messages=messages, total=result.total, offset=result.offset, limit=result.limit)

Expand Down Expand Up @@ -163,15 +144,4 @@ async def discard_dlq_message(
@router.get("/topics", response_model=List[DLQTopicSummaryResponse])
async def get_dlq_topics(repository: FromDishka[DLQRepository]) -> List[DLQTopicSummaryResponse]:
topics = await repository.get_topics_summary()
return [
DLQTopicSummaryResponse(
topic=topic.topic,
total_messages=topic.total_messages,
status_breakdown=topic.status_breakdown,
oldest_message=topic.oldest_message,
newest_message=topic.newest_message,
avg_retry_count=topic.avg_retry_count,
max_retry_count=topic.max_retry_count,
)
for topic in topics
]
return [DLQTopicSummaryResponse.model_validate(topic) for topic in topics]
66 changes: 42 additions & 24 deletions backend/app/api/routes/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
from app.core.utils import get_client_ip
from app.domain.enums.common import SortOrder
from app.domain.events.event_models import EventFilter
from app.infrastructure.kafka.events.metadata import EventMetadata
from app.infrastructure.mappers import EventMapper, EventStatisticsMapper
from app.infrastructure.kafka.events.metadata import AvroEventMetadata as EventMetadata
from app.schemas_pydantic.events import (
DeleteEventResponse,
EventAggregationRequest,
Expand All @@ -39,21 +38,30 @@ async def get_execution_events(
current_user: Annotated[UserResponse, Depends(current_user)],
event_service: FromDishka[EventService],
include_system_events: bool = Query(False, description="Include system-generated events"),
limit: int = Query(100, ge=1, le=1000),
skip: int = Query(0, ge=0),
) -> EventListResponse:
mapper = EventMapper()
events = await event_service.get_execution_events(
result = await event_service.get_execution_events(
execution_id=execution_id,
user_id=current_user.user_id,
user_role=current_user.role,
include_system_events=include_system_events,
limit=limit,
skip=skip,
)

if events is None:
if result is None:
raise HTTPException(status_code=403, detail="Access denied")

event_responses = [EventResponse(**mapper.to_dict(event)) for event in events]
event_responses = [EventResponse.model_validate(event) for event in result.events]

return EventListResponse(events=event_responses, total=len(event_responses), limit=1000, skip=0, has_more=False)
return EventListResponse(
events=event_responses,
total=result.total,
limit=limit,
skip=skip,
has_more=result.has_more,
)


@router.get("/user", response_model=EventListResponse)
Expand All @@ -68,7 +76,6 @@ async def get_user_events(
sort_order: SortOrder = Query(SortOrder.DESC),
) -> EventListResponse:
"""Get events for the current user"""
mapper = EventMapper()
result = await event_service.get_user_events_paginated(
user_id=current_user.user_id,
event_types=event_types,
Expand All @@ -79,7 +86,7 @@ async def get_user_events(
sort_order=sort_order,
)

event_responses = [EventResponse(**mapper.to_dict(event)) for event in result.events]
event_responses = [EventResponse.model_validate(event) for event in result.events]

return EventListResponse(
events=event_responses, total=result.total, limit=limit, skip=skip, has_more=result.has_more
Expand All @@ -92,7 +99,6 @@ async def query_events(
filter_request: EventFilterRequest,
event_service: FromDishka[EventService],
) -> EventListResponse:
mapper = EventMapper()
event_filter = EventFilter(
event_types=[str(et) for et in filter_request.event_types] if filter_request.event_types else None,
aggregate_id=filter_request.aggregate_id,
Expand All @@ -116,7 +122,7 @@ async def query_events(
if result is None:
raise HTTPException(status_code=403, detail="Cannot query other users' events")

event_responses = [EventResponse(**mapper.to_dict(event)) for event in result.events]
event_responses = [EventResponse.model_validate(event) for event in result.events]

return EventListResponse(
events=event_responses, total=result.total, limit=result.limit, skip=result.skip, has_more=result.has_more
Expand All @@ -130,43 +136,57 @@ async def get_events_by_correlation(
event_service: FromDishka[EventService],
include_all_users: bool = Query(False, description="Include events from all users (admin only)"),
limit: int = Query(100, ge=1, le=1000),
skip: int = Query(0, ge=0),
) -> EventListResponse:
mapper = EventMapper()
events = await event_service.get_events_by_correlation(
result = await event_service.get_events_by_correlation(
correlation_id=correlation_id,
user_id=current_user.user_id,
user_role=current_user.role,
include_all_users=include_all_users,
limit=limit,
skip=skip,
)

event_responses = [EventResponse(**mapper.to_dict(event)) for event in events]
event_responses = [EventResponse.model_validate(event) for event in result.events]

return EventListResponse(events=event_responses, total=len(event_responses), limit=limit, skip=0, has_more=False)
return EventListResponse(
events=event_responses,
total=result.total,
limit=limit,
skip=skip,
has_more=result.has_more,
)


@router.get("/current-request", response_model=EventListResponse)
async def get_current_request_events(
current_user: Annotated[UserResponse, Depends(current_user)],
event_service: FromDishka[EventService],
limit: int = Query(100, ge=1, le=1000),
skip: int = Query(0, ge=0),
) -> EventListResponse:
mapper = EventMapper()
correlation_id = CorrelationContext.get_correlation_id()
if not correlation_id:
return EventListResponse(events=[], total=0, limit=limit, skip=0, has_more=False)
return EventListResponse(events=[], total=0, limit=limit, skip=skip, has_more=False)

events = await event_service.get_events_by_correlation(
result = await event_service.get_events_by_correlation(
correlation_id=correlation_id,
user_id=current_user.user_id,
user_role=current_user.role,
include_all_users=False,
limit=limit,
skip=skip,
)

event_responses = [EventResponse(**mapper.to_dict(event)) for event in events]
event_responses = [EventResponse.model_validate(event) for event in result.events]

return EventListResponse(events=event_responses, total=len(event_responses), limit=limit, skip=0, has_more=False)
return EventListResponse(
events=event_responses,
total=result.total,
limit=limit,
skip=skip,
has_more=result.has_more,
)


@router.get("/statistics", response_model=EventStatistics)
Expand All @@ -190,20 +210,18 @@ async def get_event_statistics(
include_all_users=include_all_users,
)

stats_mapper = EventStatisticsMapper()
return EventStatistics(**stats_mapper.to_dict(stats))
return EventStatistics.model_validate(stats)


@router.get("/{event_id}", response_model=EventResponse)
async def get_event(
event_id: str, current_user: Annotated[UserResponse, Depends(current_user)], event_service: FromDishka[EventService]
) -> EventResponse:
"""Get a specific event by ID"""
mapper = EventMapper()
event = await event_service.get_event(event_id=event_id, user_id=current_user.user_id, user_role=current_user.role)
if event is None:
raise HTTPException(status_code=404, detail="Event not found")
return EventResponse(**mapper.to_dict(event))
return EventResponse.model_validate(event)


@router.post("/publish", response_model=PublishEventResponse)
Expand Down
Loading
Loading