Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/app/core/k8s_clients.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# mypy: disable-error-code="slop-any-check"
# Rationale: kubernetes client 31.0.0 has no type annotations (all Any)
import logging
from dataclasses import dataclass

Expand Down
118 changes: 31 additions & 87 deletions backend/app/core/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,20 @@
import logging
import re
from datetime import datetime, timezone
from typing import Any, Dict
from typing import Any

from opentelemetry import trace

correlation_id_context: contextvars.ContextVar[str | None] = contextvars.ContextVar("correlation_id", default=None)

request_metadata_context: contextvars.ContextVar[Dict[str, Any] | None] = contextvars.ContextVar(
request_metadata_context: contextvars.ContextVar[dict[str, Any] | None] = contextvars.ContextVar(
"request_metadata", default=None
)


class CorrelationFilter(logging.Filter):
    """Logging filter that copies request context onto each log record.

    Reads the correlation id and request metadata from their ContextVars
    and, when present, attaches them as attributes on the record so the
    formatter can emit them.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Annotate *record* in place; always lets the record through."""
        if (cid := correlation_id_context.get()):
            record.correlation_id = cid

        meta = request_metadata_context.get()
        if meta:
            record.request_method = meta.get("method")
            record.request_path = meta.get("path")
            # Client IP is now safely extracted without DNS lookup
            client = meta.get("client")
            if client:
                record.client_host = client.get("host")

        return True


class JSONFormatter(logging.Formatter):
"""JSON formatter that reads context directly from typed sources."""

def _sanitize_sensitive_data(self, data: str) -> str:
"""Remove or mask sensitive information from log data."""
# Mask API keys, tokens, and similar sensitive data
Expand Down Expand Up @@ -59,89 +44,48 @@ def _sanitize_sensitive_data(self, data: str) -> str:
return data

def format(self, record: logging.LogRecord) -> str:
# Sanitize the message
message = self._sanitize_sensitive_data(record.getMessage())

log_data = {
log_data: dict[str, Any] = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": message,
"message": self._sanitize_sensitive_data(record.getMessage()),
}

if hasattr(record, "correlation_id"):
log_data["correlation_id"] = record.correlation_id

if hasattr(record, "request_method"):
log_data["request_method"] = record.request_method

if hasattr(record, "request_path"):
log_data["request_path"] = record.request_path

if hasattr(record, "client_host"):
log_data["client_host"] = record.client_host

# OpenTelemetry trace context (hexadecimal ids)
if hasattr(record, "trace_id"):
log_data["trace_id"] = record.trace_id
if hasattr(record, "span_id"):
log_data["span_id"] = record.span_id

if record.exc_info:
exc_text = self.formatException(record.exc_info)
log_data["exc_info"] = self._sanitize_sensitive_data(exc_text)

if hasattr(record, "stack_info") and record.stack_info:
stack_text = self.formatStack(record.stack_info)
log_data["stack_info"] = self._sanitize_sensitive_data(stack_text)
# Correlation context - read directly from typed ContextVar
(v := correlation_id_context.get()) and log_data.update(correlation_id=v)

# Request metadata - read directly from typed ContextVar
metadata = request_metadata_context.get() or {}
(v := metadata.get("method")) and log_data.update(request_method=v)
(v := metadata.get("path")) and log_data.update(request_path=v)
(v := (metadata.get("client") or {}).get("host")) and log_data.update(client_host=v)

# OpenTelemetry trace context - read directly from typed trace API
span = trace.get_current_span()
if span.is_recording():
span_context = span.get_span_context()
if span_context.is_valid:
log_data["trace_id"] = format(span_context.trace_id, "032x")
log_data["span_id"] = format(span_context.span_id, "016x")

record.exc_info and log_data.update(
exc_info=self._sanitize_sensitive_data(self.formatException(record.exc_info))
)
record.stack_info and log_data.update(
stack_info=self._sanitize_sensitive_data(self.formatStack(record.stack_info))
)

return json.dumps(log_data, ensure_ascii=False)


# Map of accepted level-name strings to the stdlib logging level constants.
LOG_LEVELS: dict[str, int] = {
    name: getattr(logging, name)
    for name in ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
}


def setup_logger(log_level: str) -> logging.Logger:
"""Create and configure the application logger. Called by DI with Settings.LOG_LEVEL."""
new_logger = logging.getLogger("integr8scode")
new_logger.handlers.clear()

console_handler = logging.StreamHandler()
formatter = JSONFormatter()

console_handler.setFormatter(formatter)

correlation_filter = CorrelationFilter()
console_handler.addFilter(correlation_filter)

class TracingFilter(logging.Filter):
    """Attach OpenTelemetry trace/span ids (lowercase hex) to records.

    Only annotates the record when there is a currently-recording span
    with a valid span context; otherwise the record passes unchanged.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        # Inline minimal helpers to avoid circular import on tracing.utils
        span = trace.get_current_span()
        if span and span.is_recording():
            ctx = span.get_span_context()
            if ctx.is_valid:
                # W3C trace-context widths: 128-bit trace id, 64-bit span id.
                record.trace_id = format(ctx.trace_id, "032x")
                record.span_id = format(ctx.span_id, "016x")
        return True

console_handler.addFilter(TracingFilter())

console_handler.setFormatter(JSONFormatter())
new_logger.addHandler(console_handler)

level = LOG_LEVELS.get(log_level.upper(), logging.DEBUG)
new_logger.setLevel(level)
new_logger.setLevel(logging.getLevelNamesMapping().get(log_level.upper(), logging.DEBUG))

return new_logger
94 changes: 63 additions & 31 deletions backend/app/core/metrics/coordinator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from app.core.metrics.base import BaseMetrics
from app.domain.enums import ResourceType


class CoordinatorMetrics(BaseMetrics):
Expand Down Expand Up @@ -68,6 +69,18 @@ def _create_instruments(self) -> None:
name="coordinator.scheduling.decisions.total", description="Total scheduling decisions made", unit="1"
)

# Internal state tracking for gauge-like counters
self._active_executions_current: int = 0
self._exec_request_queue_size: int = 0
self._resource_cpu: float = 0.0
self._resource_memory: float = 0.0
self._resource_gpu: float = 0.0
self._resource_usage_cpu: float = 0.0
self._resource_usage_memory: float = 0.0
self._resource_usage_gpu: float = 0.0
self._rate_limiter_user: int = 0
self._rate_limiter_global: int = 0

def record_coordinator_processing_time(self, duration_seconds: float) -> None:
self.coordinator_processing_time.record(duration_seconds)

Expand All @@ -78,8 +91,7 @@ def update_active_executions_gauge(self, count: int) -> None:
"""Update the count of active executions (absolute value)."""
# Reset to 0 then set to new value (for gauge-like behavior)
# This is a workaround since we're using up_down_counter
current_val = getattr(self, "_active_executions_current", 0)
delta = count - current_val
delta = count - self._active_executions_current
if delta != 0:
self.coordinator_active_executions.add(delta)
self._active_executions_current = count
Expand All @@ -103,12 +115,10 @@ def record_queue_wait_time_by_priority(self, wait_seconds: float, priority: str,

def update_execution_request_queue_size(self, size: int) -> None:
"""Update the execution-only request queue depth (absolute value)."""
key = "_exec_request_queue_size"
current_val = getattr(self, key, 0)
delta = size - current_val
delta = size - self._exec_request_queue_size
if delta != 0:
self.execution_request_queue_depth.add(delta)
setattr(self, key, size)
self._exec_request_queue_size = size

def record_rate_limited(self, limit_type: str, user_id: str) -> None:
self.coordinator_rate_limited.add(1, attributes={"limit_type": limit_type, "user_id": user_id})
Expand All @@ -118,36 +128,55 @@ def update_rate_limit_wait_time(self, limit_type: str, user_id: str, wait_second
wait_seconds, attributes={"limit_type": limit_type, "user_id": user_id}
)

def record_resource_allocation(self, resource_type: str, amount: float, execution_id: str) -> None:
def record_resource_allocation(self, resource_type: ResourceType, amount: float, execution_id: str) -> None:
self.coordinator_resource_allocations.add(
1, attributes={"resource_type": resource_type, "execution_id": execution_id}
)

# Update gauge for current allocation
key = f"_resource_{resource_type}"
current_val = getattr(self, key, 0.0)
new_val = current_val + amount
setattr(self, key, new_val)

def record_resource_release(self, resource_type: str, amount: float, execution_id: str) -> None:
match resource_type:
case ResourceType.CPU:
self._resource_cpu += amount
case ResourceType.MEMORY:
self._resource_memory += amount
case ResourceType.GPU:
self._resource_gpu += amount

def record_resource_release(self, resource_type: ResourceType, amount: float, execution_id: str) -> None:
self.coordinator_resource_allocations.add(
-1, attributes={"resource_type": resource_type, "execution_id": execution_id}
)

# Update gauge for current allocation
key = f"_resource_{resource_type}"
current_val = getattr(self, key, 0.0)
new_val = max(0.0, current_val - amount)
setattr(self, key, new_val)

def update_resource_usage(self, resource_type: str, usage_percent: float) -> None:
match resource_type:
case ResourceType.CPU:
self._resource_cpu = max(0.0, self._resource_cpu - amount)
case ResourceType.MEMORY:
self._resource_memory = max(0.0, self._resource_memory - amount)
case ResourceType.GPU:
self._resource_gpu = max(0.0, self._resource_gpu - amount)

def update_resource_usage(self, resource_type: ResourceType, usage_percent: float) -> None:
# Record as a gauge-like metric
key = f"_resource_usage_{resource_type}"
current_val = getattr(self, key, 0.0)
delta = usage_percent - current_val
if delta != 0:
self.coordinator_resource_utilization.add(delta, attributes={"resource_type": resource_type})
setattr(self, key, usage_percent)
match resource_type:
case ResourceType.CPU:
delta = usage_percent - self._resource_usage_cpu
delta != 0 and self.coordinator_resource_utilization.add(
delta, attributes={"resource_type": resource_type}
)
self._resource_usage_cpu = usage_percent
case ResourceType.MEMORY:
delta = usage_percent - self._resource_usage_memory
delta != 0 and self.coordinator_resource_utilization.add(
delta, attributes={"resource_type": resource_type}
)
self._resource_usage_memory = usage_percent
case ResourceType.GPU:
delta = usage_percent - self._resource_usage_gpu
delta != 0 and self.coordinator_resource_utilization.add(
delta, attributes={"resource_type": resource_type}
)
self._resource_usage_gpu = usage_percent

def record_scheduling_decision(self, decision: str, reason: str) -> None:
self.coordinator_scheduling_decisions.add(1, attributes={"decision": decision, "reason": reason})
Expand All @@ -167,12 +196,15 @@ def record_priority_change(self, execution_id: str, old_priority: str, new_prior

def update_rate_limiter_tokens(self, limit_type: str, tokens: int) -> None:
# Track tokens as gauge-like metric
key = f"_rate_limiter_{limit_type}"
current_val = getattr(self, key, 0)
delta = tokens - current_val
if delta != 0:
self.coordinator_resource_utilization.add(delta, attributes={"resource_type": f"rate_limit_{limit_type}"})
setattr(self, key, tokens)
attrs = {"resource_type": f"rate_limit_{limit_type}"}
if limit_type == "user":
delta = tokens - self._rate_limiter_user
delta != 0 and self.coordinator_resource_utilization.add(delta, attributes=attrs)
self._rate_limiter_user = tokens
elif limit_type == "global":
delta = tokens - self._rate_limiter_global
delta != 0 and self.coordinator_resource_utilization.add(delta, attributes=attrs)
self._rate_limiter_global = tokens

def record_rate_limit_reset(self, limit_type: str, user_id: str) -> None:
self.coordinator_scheduling_decisions.add(
Expand Down
4 changes: 3 additions & 1 deletion backend/app/domain/enums/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from app.domain.enums.common import ErrorType, SortOrder, Theme
from app.domain.enums.common import Environment, ErrorType, ResourceType, SortOrder, Theme
from app.domain.enums.execution import ExecutionStatus
from app.domain.enums.health import AlertSeverity, AlertStatus, ComponentStatus
from app.domain.enums.notification import (
Expand All @@ -12,7 +12,9 @@

__all__ = [
# Common
"Environment",
"ErrorType",
"ResourceType",
"SortOrder",
"Theme",
# Execution
Expand Down
8 changes: 8 additions & 0 deletions backend/app/domain/enums/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,11 @@ class Environment(StringEnum):
STAGING = "staging"
PRODUCTION = "production"
TEST = "test"


class ResourceType(StringEnum):
    """Types of compute resources for metrics and allocation.

    Values are the lowercase labels used as the ``resource_type``
    attribute on coordinator metrics and in resource-pool accounting.
    """

    # Lowercase values double as the metric attribute strings.
    CPU = "cpu"
    MEMORY = "memory"
    GPU = "gpu"
2 changes: 2 additions & 0 deletions backend/app/events/core/consumer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# mypy: disable-error-code="slop-any-check"
# Rationale: aiokafka message headers are untyped (Any)
import asyncio
import logging
from collections.abc import Awaitable, Callable
Expand Down
4 changes: 2 additions & 2 deletions backend/app/services/admin/admin_user_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ async def create_user(self, *, admin_username: str, user_data: UserCreate) -> Us
username=user_data.username,
email=user_data.email,
hashed_password=hashed_password,
role=getattr(user_data, "role", UserRole.USER),
is_active=getattr(user_data, "is_active", True),
role=user_data.role,
is_active=user_data.is_active,
is_superuser=False,
)
created = await self._users.create_user(create_data)
Expand Down
5 changes: 2 additions & 3 deletions backend/app/services/coordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ async def _on_stop(self) -> None:
await self.queue_manager.stop()

# Close idempotency manager
if hasattr(self, "idempotency_manager") and self.idempotency_manager:
if self.idempotency_manager:
await self.idempotency_manager.close()

self.logger.info(f"ExecutionCoordinator service stopped. Active executions: {len(self._active_executions)}")
Expand Down Expand Up @@ -360,8 +360,7 @@ async def _schedule_execution(self, event: ExecutionRequestedEvent) -> None:

# Track metrics
queue_time = start_time - event.timestamp.timestamp()
priority = getattr(event, "priority", QueuePriority.NORMAL.value)
self.metrics.record_coordinator_queue_time(queue_time, QueuePriority(priority).name)
self.metrics.record_coordinator_queue_time(queue_time, QueuePriority(event.priority).name)

scheduling_duration = time.time() - start_time
self.metrics.record_coordinator_scheduling_duration(scheduling_duration)
Expand Down
7 changes: 4 additions & 3 deletions backend/app/services/coordinator/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Dict, List

from app.core.metrics.context import get_coordinator_metrics
from app.domain.enums import ResourceType


@dataclass
Expand Down Expand Up @@ -311,14 +312,14 @@ def _update_metrics(self) -> None:
"""Update metrics"""
cpu_usage = self.pool.total_cpu_cores - self.pool.available_cpu_cores
cpu_percent = cpu_usage / self.pool.total_cpu_cores * 100
self.metrics.update_resource_usage("cpu", cpu_percent)
self.metrics.update_resource_usage(ResourceType.CPU, cpu_percent)

memory_usage = self.pool.total_memory_mb - self.pool.available_memory_mb
memory_percent = memory_usage / self.pool.total_memory_mb * 100
self.metrics.update_resource_usage("memory", memory_percent)
self.metrics.update_resource_usage(ResourceType.MEMORY, memory_percent)

gpu_usage = self.pool.total_gpu_count - self.pool.available_gpu_count
gpu_percent = gpu_usage / max(1, self.pool.total_gpu_count) * 100
self.metrics.update_resource_usage("gpu", gpu_percent)
self.metrics.update_resource_usage(ResourceType.GPU, gpu_percent)

self.metrics.update_coordinator_active_executions(len(self._allocations))
Loading