From b54959ee90940832e4c4dfa71b070950419b8661 Mon Sep 17 00:00:00 2001 From: Husain Baghwala Date: Thu, 31 Jul 2025 12:56:50 +0530 Subject: [PATCH] docs: add comprehensive documentation for database flows and component structures --- docs/CHAT_COMPLETION_FLOW.md | 688 +++++------------- .../controllers/chat-completion-controller.md | 382 ++++++++++ .../components/controllers/data-structures.md | 454 ++++++++++++ docs/components/services/ai-service-layer.md | 587 +++++++++++++++ .../services/configuration-service.md | 572 +++++++++++++++ docs/flows/database-operations-flow.md | 395 ++++++++++ docs/flows/middleware-flow.md | 210 ++++++ docs/flows/response-formatting-flow.md | 431 +++++++++++ docs/flows/service-layer-flow.md | 357 +++++++++ 9 files changed, 3557 insertions(+), 519 deletions(-) create mode 100644 docs/components/controllers/chat-completion-controller.md create mode 100644 docs/components/controllers/data-structures.md create mode 100644 docs/components/services/ai-service-layer.md create mode 100644 docs/components/services/configuration-service.md create mode 100644 docs/flows/database-operations-flow.md create mode 100644 docs/flows/middleware-flow.md create mode 100644 docs/flows/response-formatting-flow.md create mode 100644 docs/flows/service-layer-flow.md diff --git a/docs/CHAT_COMPLETION_FLOW.md b/docs/CHAT_COMPLETION_FLOW.md index fa1254ee..7e7ca34f 100644 --- a/docs/CHAT_COMPLETION_FLOW.md +++ b/docs/CHAT_COMPLETION_FLOW.md @@ -1,540 +1,190 @@ -# AI Middleware - Completion API Flow Documentation - -## Overview -This document provides a comprehensive flow of the Completion API from request initiation to service execution. The flow demonstrates how a chat completion request is processed through various layers of middleware, configuration, and service handlers. - -## API Entry Point - -### Route Definition -- **Endpoint**: `/api/v2/model/chat/completion` -- **Method**: POST -- **File**: `index.py` (Line 167) -- **Router**: `v2_router` with prefix `/api/v2/model` - -### Route Handler -- **File**: `src/routes/v2/modelRouter.py` -- **Function**: `chat_completion` -- **Dependencies**: - - `auth_and_rate_limit` (JWT middleware + rate limiting) - - `add_configuration_data_to_body` (configuration middleware) - -## Flow Breakdown - -### 1. Request Processing & Middleware - -#### Authentication & Rate Limiting -- **JWT Middleware**: Validates authentication tokens -- **Rate Limiting**: - - 100 points per `bridge_id` - - 20 points per `thread_id` - -#### Configuration Middleware -- **File**: `src/middlewares/getDataUsingBridgeId.py` -- **Function**: `add_configuration_data_to_body` -- **Purpose**: Enriches request body with configuration data - -**Key Operations:** -- Extracts `bridge_id`, `org_id`, `version_id` from request -- Calls `getConfiguration()` to fetch complete configuration -- Validates model and service existence -- Validates organization permissions for custom models - -**Required Fields:** -- `user` message (mandatory unless images provided) -- Valid `service` and `model` combination -- Valid `bridge_id` or `agent_id` - -### 2. 
Configuration Retrieval - -#### Main Configuration Function -- **File**: `src/services/utils/getConfiguration.py` -- **Function**: `getConfiguration` - -**Input Parameters:** -- `configuration`: Base configuration to merge -- `service`: AI service name -- `bridge_id`: Bridge identifier -- `apikey`: API key for service -- `template_id`: Optional template ID -- `variables`: Variables for prompt replacement -- `org_id`: Organization ID -- `variables_path`: Path for variables -- `version_id`: Version ID for bridge -- `extra_tools`: Additional tools to include -- `built_in_tools`: Built-in tools to include - -#### Bridge Data Retrieval -- **File**: `src/services/utils/getConfiguration_utils.py` -- **Function**: `get_bridge_data` -- **Database Service**: `src/db_services/ConfigurationServices.py` -- **Function**: `get_bridges_with_tools_and_apikeys` - -**Database Operations:** -- Aggregation pipeline joins multiple collections: - - `bridges` (main configuration) - - `apicalls` (tools/functions) - - `apikeycredentials` (API keys) - - `rag_parent_datas` (RAG documents) -- Redis caching for performance optimization -- Converts ObjectIds to strings for JSON serialization - -**Retrieved Data:** -- Bridge configuration and settings -- Associated tools and API calls -- API keys for different services -- RAG (Retrieval Augmented Generation) data -- Pre-tools configuration -- Memory settings and context - -#### Configuration Assembly -The `getConfiguration` function assembles: - -**Core Configuration:** -- `prompt`: System prompt with tone and response style -- `model`: AI model to use -- `tools`: Available function tools -- `tool_choice`: Tool selection strategy -- `temperature`, `max_tokens`, etc.: Model parameters - -**Metadata:** -- `service`: AI service provider -- `apikey`: Service API key -- `bridge_id`: Bridge identifier -- `org_id`: Organization ID -- `variables`: Prompt variables -- `rag_data`: Document data for RAG -- `gpt_memory`: Memory settings -- `tool_call_count`: Maximum tool calls allowed - -**Output Structure:** -```json -{ - "success": true, - "configuration": { /* merged configuration */ }, - "service": "openai", - "apikey": "sk-...", - "pre_tools": { "name": "tool_name", "args": {} }, - "variables": { /* processed variables */ }, - "rag_data": [ /* document data */ ], - "tools": [ /* available tools */ ], - "tool_id_and_name_mapping": { /* tool mappings */ }, - "gpt_memory": true, - "version_id": "version_123", - "bridge_id": "bridge_456" -} +# AI Middleware - Chat Completion API Flow + +## 📋 Overview +This document provides a **master overview** of the Chat Completion API flow from request initiation to response delivery. It serves as a navigation map for understanding the complete system architecture and cross-component interactions. + +> **📚 Detailed Documentation**: For in-depth component details, see the [flows/](./flows/) and [components/](./components/) directories. + +## 🏗️ System Architecture + +```mermaid +graph TD + A[Client Request] --> B[FastAPI Router] + B --> C[Auth & Rate Limit] + C --> D[Configuration Middleware] + D --> E[Chat Controller] + E --> F[Service Selection] + F --> G[AI Service Execution] + G --> H[Response Processing] + H --> I[Background Tasks] + I --> J[Client Response] + + style A fill:#e1f5fe + style J fill:#e8f5e8 + style G fill:#fff3e0 ``` -### 3. 
Request Routing Decision - -#### Response Format Check -- **Queue Processing**: If `response_format.type != 'default'` - - Publishes message to queue for async processing - - Returns immediate acknowledgment -- **Synchronous Processing**: For default response format - - Continues to chat function - -#### Type-Based Routing -- **Embedding Type**: Routes to `embedding()` function -- **Chat Type**: Routes to `chat()` function (main flow) - -### 4. Chat Function Processing - -#### File: `src/services/commonServices/common.py` -#### Function: `chat(request_body)` - -**Step-by-Step Processing:** - -1. **Request Parsing** (`parse_request_body`) - - Extracts all required fields from request body - - Initializes default values - - Creates structured data object - -2. **Template Enhancement** - - Adds default template with current time reference - - Adds user message to variables as `_user_message` - -3. **Timer Initialization** - - Creates Timer object for performance tracking - - Starts timing for overall API execution - -4. **Model Configuration Loading** - - Loads model configuration from `model_config_document` - - Extracts custom configuration based on user input - - Handles fine-tuned model selection - -5. **Pre-Tools Execution** - - Executes pre-configured tools if specified - - Makes HTTP calls to external functions - - Stores results in variables - -6. **Thread Management** - - Creates or retrieves conversation thread - - Manages sub-thread relationships - - Loads conversation history - -7. **Prompt Preparation** - - Replaces variables in prompt template - - Applies system templates if specified - - Handles memory context for GPT memory - - Identifies missing required variables - -8. **Custom Settings Configuration** - - Applies service-specific configurations - - Handles response type conversions - - Manages JSON schema formatting - -9. **Service Parameter Building** - - Assembles all parameters for service execution - - Includes token calculator for cost tracking - - Prepares execution context - -### 5. 
Service Handler Creation and Execution - -#### Service Handler Factory -- **File**: `src/services/utils/helper.py` -- **Function**: `Helper.create_service_handler` - -**Service Mapping:** -- `openai` → `UnifiedOpenAICase` -- `gemini` → `GeminiHandler` -- `anthropic` → `Antrophic` -- `groq` → `Groq` -- `openai_response` → `OpenaiResponse` -- `open_router` → `OpenRouter` -- `mistral` → `Mistral` - -#### Service Execution -- **Method**: `class_obj.execute()` -- **Purpose**: Executes the AI service call with prepared parameters -- **Returns**: Service response with usage metrics and content - -## Key Data Structures - -### Parsed Request Data -```json -{ - "bridge_id": "string", - "configuration": { /* AI model config */ }, - "thread_id": "string", - "sub_thread_id": "string", - "org_id": "string", - "user": "string", - "service": "string", - "model": "string", - "variables": { /* prompt variables */ }, - "tools": [ /* available tools */ ], - "is_playground": boolean, - "response_format": { /* response formatting */ }, - "files": [ /* uploaded files */ ], - "images": [ /* image inputs */ ] -} +## 🚀 Quick Start + +### API Endpoint +```http +POST /api/v2/model/chat/completion +Content-Type: application/json +Authorization: Bearer ``` -### Service Parameters -```json -{ - "customConfig": { /* model-specific config */ }, - "configuration": { /* full configuration */ }, - "apikey": "string", - "user": "string", - "tools": [ /* function tools */ ], - "org_id": "string", - "bridge_id": "string", - "thread_id": "string", - "model": "string", - "service": "string", - "token_calculator": { /* cost tracking */ }, - "variables": { /* prompt variables */ }, - "memory": "string", - "rag_data": [ /* document data */ ] -} +### Entry Point Chain +``` +index.py:167 → v2_router → modelRouter.py:chat_completion ``` -## Error Handling +**Middleware Stack:** +- 🔐 `auth_and_rate_limit` - JWT validation + rate limiting +- ⚙️ `add_configuration_data_to_body` - Configuration enrichment + +> **📝 Details**: See [components/controllers/chat-completion-controller.md](./components/controllers/chat-completion-controller.md) + +## 🔄 Complete Flow Overview + +### End-to-End Flow Diagram + +```mermaid +sequenceDiagram + participant C as Client + participant R as Router + participant M as Middleware + participant CC as Chat Controller + participant CS as Config Service + participant AS as AI Service + participant DB as Database + participant BG as Background Tasks + + C->>R: POST /api/v2/model/chat/completion + R->>M: auth_and_rate_limit + M->>M: JWT validation + rate check + M->>M: add_configuration_data_to_body + M->>CS: getConfiguration(bridge_id) + CS->>M: configuration + validation + M->>CC: chat_completion(enriched_request) + CC->>CC: parse_request_data + CC->>AS: service.execute(params) + AS->>AS: AI model API call + AS->>CC: service_response + CC->>BG: process_background_tasks + BG->>DB: save metrics + conversation + CC->>C: formatted_response +``` -### Validation Errors -- Missing required fields (user message) -- Invalid model/service combinations -- Organization permission violations -- Invalid bridge_id or configuration +### 📋 Flow Steps Summary -### Processing Errors -- Database connection failures -- External API call failures -- Template processing errors -- Variable replacement failures +| Step | Component | Purpose | Details | +|------|-----------|---------|----------| +| 1️⃣ | **Authentication** | Security & Rate Limiting | [middleware-flow.md](./flows/middleware-flow.md) | +| 2️⃣ | **Configuration** | Request 
Enrichment | [middleware-flow.md](./flows/middleware-flow.md) | +| 3️⃣ | **Chat Controller** | Request Processing | [service-layer-flow.md](./flows/service-layer-flow.md) | +| 4️⃣ | **Service Selection** | AI Provider Routing | [service-layer-flow.md](./flows/service-layer-flow.md) | +| 5️⃣ | **AI Execution** | Model API Calls | [service-layer-flow.md](./flows/service-layer-flow.md) | +| 6️⃣ | **Response Processing** | Format & Validate | [response-formatting-flow.md](./flows/response-formatting-flow.md) | +| 7️⃣ | **Background Tasks** | Metrics & Storage | [database-operations-flow.md](./flows/database-operations-flow.md) | -### Response Format +## 📊 Key Components Overview + +### 🔐 Authentication & Security +- **JWT Middleware**: Token validation and user authentication +- **Rate Limiting**: 100 points per `bridge_id`, 20 points per `thread_id` +- **Organization Access Control**: Bridge-level permission validation + +> **📝 Details**: [flows/middleware-flow.md](./flows/middleware-flow.md) + +### ⚙️ Configuration Management +- **Configuration Service**: `src/services/utils/getConfiguration.py` +- **Redis Caching**: Bridge configurations cached by ID +- **Model Validation**: Service/model compatibility checks + +> **📝 Details**: [components/services/configuration-service.md](./components/services/configuration-service.md) + +**Supported AI Providers:** +- OpenAI (GPT models) +- Anthropic (Claude) +- Google (Gemini) +- Groq, Mistral, OpenRouter + +**Service Selection Logic:** +- Dynamic service routing based on configuration +- Fallback model support for reliability +- Concurrent tool execution capabilities + +> **📝 Details**: [components/services/ai-service-layer.md](./components/services/ai-service-layer.md) + +### 💾 Database & Metrics +- **PostgreSQL**: Main data storage +- **TimescaleDB**: Time-series metrics +- **Redis**: Caching layer +- **MongoDB**: Configuration storage + +> **📝 Details**: [flows/database-operations-flow.md](./flows/database-operations-flow.md) + +## 📈 Performance Features + +### ⚡ Optimization Strategies +- **Redis Caching**: Configuration and usage data +- **Connection Pooling**: Efficient database connections +- **Background Processing**: Non-blocking operations +- **Concurrent Execution**: Parallel tool calls + +### 🔄 Reliability Features +- **Model Fallbacks**: Automatic alternative model execution +- **Retry Mechanisms**: Graceful failure handling +- **Error Recovery**: Comprehensive error management +- **Rate Limiting**: Request throttling protection + +## 📝 Data Structures + +### Request Format ```json { - "success": false, - "error": "Error description" + "bridge_id": "string", + "user": "string", + "service": "openai", + "model": "gpt-4", + "thread_id": "string", + "tools": [], + "variables": {}, + "response_format": { "type": "default" } } ``` -## Performance Optimizations - -### Caching Strategy -- **Redis Cache**: Bridge configurations cached by `bridge_id` or `version_id` -- **File Cache**: Uploaded files cached by thread context -- **Memory Cache**: GPT memory context cached by thread ID - -### Async Processing -- **Queue System**: Non-default response formats processed asynchronously -- **Background Tasks**: Metrics and logging handled in background -- **Thread Pool**: Executor for CPU-intensive operations - -## Security Considerations - -### Authentication -- JWT token validation required -- Organization-level access control -- API key validation per service - -### Rate Limiting -- Per-bridge rate limiting (100 points) -- Per-thread rate limiting (20 
points) -- Configurable rate limit windows - -### Data Isolation -- Organization-level data segregation -- Bridge-level permission validation -- Secure API key storage and retrieval - -## 6. Service Execution (OpenAI Example) - -### Service Handler Execution -- **File**: `src/services/commonServices/openAI/openaiCall.py` -- **Class**: `UnifiedOpenAICase` -- **Method**: `execute()` - -#### Conversation Creation -- **Function**: `ConversationService.createOpenAiConversation` -- **File**: `src/services/commonServices/createConversations.py` -- **Purpose**: Converts conversation history to OpenAI format - -**Conversation Processing:** -- Adds memory context if GPT memory is enabled -- Processes conversation history with role-based formatting -- Handles image URLs and file attachments -- Creates proper message structure for different AI services - -**Service-Specific Conversation Formats:** -- `createOpenAiConversation`: Standard OpenAI chat format -- `createOpenAiResponseConversation`: OpenAI Response API format -- `createAnthropicConversation`: Anthropic Claude format -- `createGroqConversation`: Groq API format -- `createGeminiConversation`: Google Gemini format -- `create_mistral_ai_conversation`: Mistral AI format - -#### Model API Call -- **Function**: `self.chats()` -- **File**: `src/services/commonServices/baseService/baseService.py` -- **Purpose**: Routes to appropriate service model runner - -**Service Routing:** -- `openai` → `runModel` (OpenAI API) -- `openai_response` → `openai_response_model` (OpenAI Response API) -- `anthropic` → `anthropic_runmodel` -- `groq` → `groq_runmodel` -- `gemini` → `gemini_modelrun` -- `mistral` → `mistral_model_run` -- `open_router` → `openrouter_modelrun` - -#### OpenAI Model Execution -- **File**: `src/services/commonServices/openAI/runModel.py` -- **Function**: `runModel` - -**Key Features:** -- Async OpenAI client initialization -- Retry mechanism with alternative model fallback -- Performance timing and logging -- Error handling with status codes -- Support for both standard and response APIs - -**Retry Logic:** -- Primary model: `o3` → Fallback: `gpt-4o-2024-08-06` -- Primary model: `gpt-4o` → Fallback: `o3` -- Default fallback: `gpt-4o` - -## 7. Tool Call Detection and Execution - -### Tool Call Detection -- **Check**: `len(modelResponse.get('choices', [])[0].get('message', {}).get("tool_calls", [])) > 0` -- **Purpose**: Determines if AI model wants to execute functions - -### Function Call Processing -- **Function**: `self.function_call()` -- **File**: `src/services/commonServices/baseService/baseService.py` -- **Recursive**: Supports multiple rounds of tool calling - -#### Tool Call Flow: - -1. **Tool Validation**: Checks if response contains valid tool calls -2. **Tool Execution**: Runs requested tools concurrently -3. **Configuration Update**: Adds tool results to conversation -4. **Model Re-call**: Sends updated conversation back to AI -5. **Recursion**: Repeats until no more tool calls or limit reached - -#### Tool Execution Process -- **Function**: `self.run_tool()` -- **Steps**: - 1. **Code Mapping**: `make_code_mapping_by_service()` - Extracts tool calls by service - 2. **Variable Replacement**: `replace_variables_in_args()` - Processes tool arguments - 3. 
**Tool Processing**: `process_data_and_run_tools()` - Executes tools concurrently - -#### Service-Specific Tool Call Extraction -- **OpenAI/Groq/Mistral**: `tool_calls` array with `function` objects -- **OpenAI Response**: `output` array with `function_call` type -- **Anthropic**: `content` array with `tool_use` type -- **Gemini**: Similar to OpenAI format - -#### Tool Types and Execution -- **Regular Tools**: HTTP calls to external APIs -- **RAG Tools**: Vector database queries -- **Agent Tools**: Calls to other AI agents -- **Built-in Tools**: Internal system functions - -**Concurrent Execution:** -- Uses `asyncio.gather()` for parallel tool execution -- Handles exceptions gracefully -- Returns formatted responses for each tool call - -#### Tool Call Limits -- **Maximum Rounds**: Configurable per bridge (`tool_call_count`) -- **Default**: 3 rounds of tool calling -- **Prevention**: Avoids infinite loops - -## 8. Response Processing and Formatting - -### Response Formatting -- **Function**: `Response_formatter()` -- **File**: `src/services/utils/ai_middleware_format.py` -- **Purpose**: Standardizes responses across all AI services - -#### Unified Response Structure +### Response Format ```json { - "data": { - "id": "response_id", - "content": "AI response text", - "model": "model_name", - "role": "assistant", - "finish_reason": "stop", - "tools_data": { /* tool execution results */ }, - "images": [ /* image URLs */ ], - "annotations": [ /* response annotations */ ], - "fallback": false, - "firstAttemptError": "" - }, - "usage": { - "input_tokens": 100, - "output_tokens": 50, - "total_tokens": 150, - "cached_tokens": 20 - } + "success": true, + "output": [{ + "content": { "text": "AI response" }, + "usage": { "input_tokens": 100, "output_tokens": 50 } + }] } ``` -#### Service-Specific Formatting -- **OpenAI**: Standard chat completion format -- **OpenAI Response**: Special handling for reasoning and function calls -- **Anthropic**: Content array processing -- **Gemini**: Google-specific response structure -- **Groq**: Similar to OpenAI with service-specific fields - -### History Preparation -- **Function**: `prepare_history_params()` -- **Purpose**: Creates data structure for conversation storage - -**History Parameters:** -- Thread and message identification -- User input and AI response -- Tool execution data -- Model configuration -- Usage metrics -- Error information -- File attachments - -## 9. Final Processing and Response - -### Post-Execution Processing -- **File**: `src/services/commonServices/common.py` -- **Function**: `chat()` (continuation) - -#### Success Flow: -1. **Error Checking**: Validates service execution success -2. **Retry Alerts**: Handles fallback model notifications -3. **Chatbot Processing**: Special handling for chatbot responses -4. **Usage Calculation**: Token and cost calculations -5. **Response Formatting**: Final response structure -6. **Background Tasks**: Async database operations - -#### Response Delivery -- **Playground Mode**: Direct JSON response -- **Production Mode**: - - WebSocket/webhook delivery for configured formats - - Database storage - - Usage tracking - - Error notifications - -### Background Data Processing -- **Function**: `process_background_tasks()` -- **File**: `src/services/utils/common_utils.py` - -**Background Operations:** -1. **Metrics Creation**: `create()` from metrics_service -2. **Sub-queue Publishing**: Message queue for downstream processing -3. **Conversation Storage**: Thread and message persistence - -## 10. 
Database Operations and Metrics - -### Metrics Collection -- **File**: `src/db_services/metrics_service.py` -- **Function**: `create()` - -#### Data Storage Flow: -1. **Conversation History**: Saves to conversation database -2. **Raw Data Insertion**: PostgreSQL metrics table -3. **TimescaleDB Metrics**: Time-series data for analytics -4. **Token Caching**: Redis cache for usage tracking - -#### Stored Metrics: -- **Usage Data**: Input/output tokens, costs -- **Performance Data**: Latency, execution times -- **Error Data**: Failure reasons, retry information -- **Configuration Data**: Model settings, tool usage -- **Organizational Data**: Bridge, version, user tracking - -### Error Handling and Alerts -- **Webhook Notifications**: Real-time error alerts -- **Usage Limit Tracking**: Token consumption monitoring -- **Performance Monitoring**: Latency and failure tracking -- **Retry Mechanism Alerts**: Fallback model usage notifications - -## Complete Flow Summary - -### Request Journey: -1. **API Entry** → Authentication & Rate Limiting -2. **Middleware** → Configuration Retrieval & Validation -3. **Chat Function** → Request Processing & Setup -4. **Service Handler** → AI Model Execution -5. **Tool Processing** → Function Call Execution (if needed) -6. **Response Formatting** → Standardized Output -7. **Background Tasks** → Database Storage & Metrics -8. **Response Delivery** → Client Response - -### Key Performance Features: -- **Concurrent Tool Execution**: Parallel function calling -- **Redis Caching**: Configuration and usage caching -- **Retry Mechanisms**: Automatic fallback handling -- **Background Processing**: Non-blocking database operations -- **Connection Pooling**: Efficient database connections +> **📝 Details**: [components/controllers/data-structures.md](./components/controllers/data-structures.md) + +## 🔗 Navigation Guide + +### 📁 Detailed Flow Documentation +- **[middleware-flow.md](./flows/middleware-flow.md)** - Authentication, rate limiting, configuration +- **[service-layer-flow.md](./flows/service-layer-flow.md)** - AI service selection and execution +- **[response-formatting-flow.md](./flows/response-formatting-flow.md)** - Response processing and formatting +- **[database-operations-flow.md](./flows/database-operations-flow.md)** - Metrics collection and storage + +### 📝 Component Documentation +- **[components/controllers/](./components/controllers/)** - API controllers and routing +- **[components/services/](./components/services/)** - Business logic and AI services +- **[components/middleware/](./components/middleware/)** - Request processing middleware + +### 🔍 Quick References +- **Error Handling**: See [components/services/error-handling.md](./components/services/error-handling.md) +- **Security**: See [components/middleware/security.md](./components/middleware/security.md) +- **Performance**: See [flows/performance-optimization.md](./flows/performance-optimization.md) -### Error Recovery: -- **Model Fallbacks**: Alternative model execution -- **Tool Error Handling**: Graceful function failure management -- **Database Resilience**: Error logging and recovery -- **User Notifications**: Real-time error feedback +--- -This complete flow ensures robust, scalable, and reliable AI service execution with comprehensive monitoring and error handling capabilities. 
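As a quick orientation before the detailed component docs, here is a minimal client-side sketch of calling the completion endpoint described in this overview. The host, bearer token, and `bridge_id` are placeholders, and the field names simply mirror the Request/Response formats shown in the Data Structures section above; treat it as an illustration, not a verified client.

```python
# Minimal client sketch for POST /api/v2/model/chat/completion.
# The host, token, and bridge_id are placeholders, not real values.
import requests

API_URL = "https://your-middleware-host/api/v2/model/chat/completion"  # placeholder host

headers = {
    "Authorization": "Bearer <jwt_token>",  # validated by auth_and_rate_limit
    "Content-Type": "application/json",
}

payload = {
    "bridge_id": "bridge_123",                # bridge configuration to load
    "user": "What is the weather in New York?",
    "service": "openai",
    "model": "gpt-4",
    "thread_id": "thread_abc",                # optional conversation thread
    "variables": {},
    "response_format": {"type": "default"},   # non-default formats are queued
}

resp = requests.post(API_URL, json=payload, headers=headers, timeout=60)
resp.raise_for_status()
data = resp.json()

if data.get("success"):
    print(data["output"][0]["content"]["text"])  # AI response text
    print(data.get("usage", {}))                 # token usage and cost
```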
+**📚 For complete implementation details, navigate to the specific component documentation linked above.** diff --git a/docs/components/controllers/chat-completion-controller.md b/docs/components/controllers/chat-completion-controller.md new file mode 100644 index 00000000..f9d2eb80 --- /dev/null +++ b/docs/components/controllers/chat-completion-controller.md @@ -0,0 +1,382 @@ +# Chat Completion Controller + +## 📍 Controller Overview + +### Route Definition +**File**: `src/routes/v2/modelRouter.py` +**Function**: `chat_completion` +**Endpoint**: `POST /api/v2/model/chat/completion` + +## 🔗 Router Configuration + +### FastAPI Router Setup +**File**: `index.py` (Line 167) + +```python +from src.routes.v2.modelRouter import router as v2_router + +app.include_router( + v2_router, + prefix="/api/v2/model", + tags=["AI Models v2"] +) +``` + +### Route Handler +```python +@router.post("/chat/completion") +async def chat_completion( + request: Request, + body: dict = Depends(auth_and_rate_limit), + enriched_body: dict = Depends(add_configuration_data_to_body) +): + """ + Handle chat completion requests with full middleware stack + """ + return await process_chat_completion(enriched_body) +``` + +## 🛡️ Middleware Dependencies + +### 1. Authentication & Rate Limiting +**Dependency**: `auth_and_rate_limit` +**File**: `src/middlewares/auth_and_rate_limit.py` + +#### Functionality: +- JWT token validation +- User authentication and authorization +- Rate limiting enforcement (100 points per bridge_id, 20 per thread_id) +- Request context enrichment + +#### Input/Output: +```python +# Input: Raw request with Authorization header +# Output: Authenticated request body with user context +{ + "user_id": "user_123", + "org_id": "org_456", + "permissions": ["chat_completion"], + "rate_limit_remaining": 95, + "original_request": {...} +} +``` + +### 2. 
Configuration Data Enrichment +**Dependency**: `add_configuration_data_to_body` +**File**: `src/middlewares/getDataUsingBridgeId.py` + +#### Functionality: +- Bridge configuration retrieval +- Model and service validation +- API key injection +- Tool and RAG data loading + +#### Input/Output: +```python +# Input: Authenticated request body +# Output: Fully enriched request with configuration +{ + "bridge_id": "bridge_123", + "configuration": {...}, + "service": "openai", + "model": "gpt-4", + "apikey": "sk-...", + "tools": [...], + "rag_data": [...], + "variables": {...} +} +``` + +## 📝 Request Processing Flow + +### Request Validation +```python +def validate_chat_request(request_body): + """Validate incoming chat completion request""" + + required_fields = ["bridge_id", "user", "service", "model"] + missing_fields = [field for field in required_fields + if not request_body.get(field)] + + if missing_fields: + raise ValidationError(f"Missing required fields: {missing_fields}") + + # Validate message content + if not request_body.get("user") and not request_body.get("images"): + raise ValidationError("Either 'user' message or 'images' must be provided") + + return True +``` + +### Request Data Structure +```python +class ChatCompletionRequest: + """Standard chat completion request structure""" + + # Required fields + bridge_id: str + user: str # User message content + service: str # AI service provider + model: str # Model name + + # Optional fields + thread_id: Optional[str] = None + sub_thread_id: Optional[str] = None + variables: Dict[str, Any] = {} + tools: List[Dict] = [] + response_format: Dict = {"type": "default"} + temperature: Optional[float] = None + max_tokens: Optional[int] = None + files: List[str] = [] + images: List[str] = [] + is_playground: bool = False + + # Enriched by middleware + configuration: Dict = {} + apikey: str = "" + org_id: str = "" + rag_data: List[Dict] = [] +``` + +## 🎯 Controller Logic + +### Main Processing Function +```python +async def process_chat_completion(enriched_request): + """Main chat completion processing logic""" + + try: + # 1. Validate enriched request + validate_enriched_request(enriched_request) + + # 2. Check response format routing + response_format = enriched_request.get("response_format", {}) + + if response_format.get("type") != "default": + # Route to queue for async processing + return await route_to_queue(enriched_request) + + # 3. 
Route based on request type + request_type = enriched_request.get("type", "chat") + + if request_type == "embedding": + return await handle_embedding_request(enriched_request) + else: + return await handle_chat_request(enriched_request) + + except Exception as e: + return handle_controller_error(e, enriched_request) +``` + +### Queue Routing for Non-Default Responses +```python +async def route_to_queue(request_data): + """Route non-default response formats to async queue""" + + queue_message = { + "request_id": generate_request_id(), + "request_data": request_data, + "timestamp": datetime.utcnow().isoformat(), + "priority": determine_priority(request_data) + } + + # Publish to processing queue + await publish_to_queue("chat_completion_queue", queue_message) + + return { + "success": True, + "message": "Request queued for processing", + "request_id": queue_message["request_id"], + "estimated_processing_time": "30-60 seconds" + } +``` + +### Chat Request Handling +```python +async def handle_chat_request(request_data): + """Handle standard chat completion requests""" + + # Import and call main chat function + from src.services.commonServices.common import chat + + try: + # Execute chat processing + response = await chat(request_data) + + # Format response for client + formatted_response = format_controller_response(response) + + return formatted_response + + except Exception as e: + return handle_chat_error(e, request_data) +``` + +### Embedding Request Handling +```python +async def handle_embedding_request(request_data): + """Handle text embedding requests""" + + from src.services.commonServices.embedding import embedding + + try: + # Execute embedding processing + response = await embedding(request_data) + + return format_embedding_response(response) + + except Exception as e: + return handle_embedding_error(e, request_data) +``` + +## 📤 Response Formatting + +### Standard Response Format +```python +def format_controller_response(service_response): + """Format service response for API client""" + + if service_response.get("success"): + return { + "success": True, + "output": service_response.get("output", []), + "usage": service_response.get("usage", {}), + "metadata": { + "service": service_response.get("service"), + "model": service_response.get("model"), + "processing_time_ms": service_response.get("processing_time_ms"), + "timestamp": datetime.utcnow().isoformat() + } + } + else: + return format_error_response(service_response) +``` + +### Error Response Handling +```python +def handle_controller_error(error, request_context): + """Handle controller-level errors""" + + error_type = type(error).__name__ + + error_mappings = { + "ValidationError": { + "status_code": 400, + "error_code": "VALIDATION_ERROR" + }, + "AuthenticationError": { + "status_code": 401, + "error_code": "AUTHENTICATION_ERROR" + }, + "RateLimitError": { + "status_code": 429, + "error_code": "RATE_LIMIT_EXCEEDED" + }, + "ServiceUnavailableError": { + "status_code": 503, + "error_code": "SERVICE_UNAVAILABLE" + } + } + + error_config = error_mappings.get(error_type, { + "status_code": 500, + "error_code": "INTERNAL_ERROR" + }) + + return { + "success": False, + "error": { + "type": error_config["error_code"], + "message": str(error), + "timestamp": datetime.utcnow().isoformat() + } + }, error_config["status_code"] +``` + +## 📊 Request Analytics + +### Request Tracking +```python +def track_request_analytics(request_data, response_data): + """Track request analytics for monitoring""" + + analytics_event = { + 
"event_type": "chat_completion_request", + "bridge_id": request_data.get("bridge_id"), + "org_id": request_data.get("org_id"), + "service": request_data.get("service"), + "model": request_data.get("model"), + "request_size_bytes": len(json.dumps(request_data)), + "response_size_bytes": len(json.dumps(response_data)), + "success": response_data.get("success", False), + "processing_time_ms": response_data.get("metadata", {}).get("processing_time_ms"), + "timestamp": datetime.utcnow().isoformat() + } + + # Send to analytics service + send_analytics_event(analytics_event) +``` + +## 🔒 Security Considerations + +### Input Sanitization +```python +def sanitize_request_input(request_data): + """Sanitize user input to prevent injection attacks""" + + # Sanitize user message + if "user" in request_data: + request_data["user"] = sanitize_text(request_data["user"]) + + # Sanitize variables + if "variables" in request_data: + for key, value in request_data["variables"].items(): + if isinstance(value, str): + request_data["variables"][key] = sanitize_text(value) + + return request_data +``` + +### Rate Limiting Enforcement +```python +def enforce_additional_limits(request_data): + """Enforce additional business logic limits""" + + # Check daily usage limits + daily_usage = get_daily_usage(request_data["bridge_id"]) + daily_limit = get_bridge_daily_limit(request_data["bridge_id"]) + + if daily_usage >= daily_limit: + raise RateLimitError("Daily usage limit exceeded") + + # Check concurrent request limits + concurrent_requests = get_concurrent_requests(request_data["bridge_id"]) + concurrent_limit = get_bridge_concurrent_limit(request_data["bridge_id"]) + + if concurrent_requests >= concurrent_limit: + raise RateLimitError("Concurrent request limit exceeded") +``` + +## 🔄 Controller Flow Summary + +```mermaid +sequenceDiagram + participant C as Client + participant R as Router + participant AM as Auth Middleware + participant CM as Config Middleware + participant CC as Chat Controller + participant CS as Chat Service + + C->>R: POST /api/v2/model/chat/completion + R->>AM: Apply auth_and_rate_limit + AM->>CM: Apply add_configuration_data_to_body + CM->>CC: Enriched request + CC->>CC: Validate & route request + CC->>CS: Process chat completion + CS->>CC: Service response + CC->>CC: Format response + CC->>C: Final response +``` + +This controller provides a robust, secure, and well-structured entry point for chat completion requests with comprehensive middleware integration and error handling. 
diff --git a/docs/components/controllers/data-structures.md b/docs/components/controllers/data-structures.md new file mode 100644 index 00000000..b39d9e6b --- /dev/null +++ b/docs/components/controllers/data-structures.md @@ -0,0 +1,454 @@ +# Data Structures - Request & Response Formats + +## 📥 Request Data Structures + +### Chat Completion Request +```json +{ + "bridge_id": "string", + "user": "string", + "service": "openai", + "model": "gpt-4", + "thread_id": "string", + "sub_thread_id": "string", + "org_id": "string", + "variables": { + "key": "value" + }, + "tools": [ + { + "type": "function", + "function": { + "name": "get_weather", + "description": "Get weather information", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "City name" + } + }, + "required": ["location"] + } + } + } + ], + "response_format": { + "type": "default" + }, + "temperature": 0.7, + "max_tokens": 1000, + "files": ["file_id_1", "file_id_2"], + "images": ["image_url_1", "image_url_2"], + "is_playground": false +} +``` + +### Required Fields +- **bridge_id**: Bridge identifier for configuration lookup +- **user**: User message content (required unless images provided) +- **service**: AI service provider (openai, anthropic, gemini, etc.) +- **model**: Specific model name + +### Optional Fields +- **thread_id**: Conversation thread identifier +- **sub_thread_id**: Sub-conversation identifier +- **org_id**: Organization identifier +- **variables**: Key-value pairs for prompt variable replacement +- **tools**: Available function tools for the AI +- **response_format**: Response formatting configuration +- **temperature**: Model creativity parameter (0.0-2.0) +- **max_tokens**: Maximum response length +- **files**: Array of uploaded file identifiers +- **images**: Array of image URLs or identifiers +- **is_playground**: Flag for playground vs production mode + +## 📤 Response Data Structures + +### Standard Success Response +```json +{ + "success": true, + "output": [ + { + "type": "text", + "content": { + "text": "AI response content here" + }, + "metadata": { + "content_type": "text/plain", + "length": 123 + } + } + ], + "usage": { + "input_tokens": 100, + "output_tokens": 50, + "total_tokens": 150, + "input_cost": 0.003, + "output_cost": 0.003, + "total_cost": 0.006 + }, + "metadata": { + "service": "openai", + "model": "gpt-4", + "processing_time_ms": 1250, + "timestamp": "2024-01-01T12:00:00Z" + } +} +``` + +### Function Call Response +```json +{ + "success": true, + "output": [ + { + "type": "function_call", + "content": { + "name": "get_weather", + "arguments": { + "location": "New York" + }, + "result": { + "temperature": "22°C", + "condition": "Sunny" + } + }, + "metadata": { + "execution_time_ms": 500, + "success": true + } + }, + { + "type": "text", + "content": { + "text": "The weather in New York is 22°C and sunny." 
+ }, + "metadata": { + "content_type": "text/plain", + "length": 45 + } + } + ], + "usage": { + "input_tokens": 120, + "output_tokens": 75, + "total_tokens": 195, + "total_cost": 0.008 + }, + "metadata": { + "service": "openai", + "model": "gpt-4", + "processing_time_ms": 1750, + "timestamp": "2024-01-01T12:00:00Z" + } +} +``` + +### Error Response +```json +{ + "success": false, + "error": { + "type": "VALIDATION_ERROR", + "message": "Missing required field: user", + "code": "MISSING_REQUIRED_FIELD", + "details": { + "field": "user", + "expected_type": "string" + } + }, + "metadata": { + "timestamp": "2024-01-01T12:00:00Z", + "request_id": "req_123456" + } +} +``` + +## 🔧 Internal Data Structures + +### Parsed Request Data (Internal) +```python +class ParsedRequestData: + """Internal representation of parsed request""" + + # Core identifiers + bridge_id: str + org_id: str + thread_id: Optional[str] + sub_thread_id: Optional[str] + + # AI configuration + service: str + model: str + configuration: Dict[str, Any] + apikey: str + + # Content + user: str + variables: Dict[str, Any] + tools: List[Dict] + files: List[str] + images: List[str] + + # Settings + temperature: Optional[float] + max_tokens: Optional[int] + response_format: Dict[str, Any] + is_playground: bool + + # Enriched data (added by middleware) + rag_data: List[Dict] + gpt_memory: bool + tool_id_mapping: Dict[str, str] + pre_tools: Optional[Dict] +``` + +### Service Parameters +```python +class ServiceParameters: + """Parameters passed to AI service handlers""" + + customConfig: Dict[str, Any] + configuration: Dict[str, Any] + apikey: str + user: str + tools: List[Dict] + org_id: str + bridge_id: str + thread_id: str + model: str + service: str + token_calculator: TokenCalculator + variables: Dict[str, Any] + memory: str + rag_data: List[Dict] + conversation: List[Dict] # Formatted conversation history + temperature: Optional[float] + max_tokens: Optional[int] + stream: bool = False +``` + +### Usage Metrics +```python +class UsageMetrics: + """Token usage and cost calculation""" + + input_tokens: int + output_tokens: int + total_tokens: int + input_cost: float + output_cost: float + total_cost: float + model: str + service: str + + # Additional metrics + reasoning_tokens: Optional[int] = None + input_tokens_details: Optional[Dict] = None + output_tokens_details: Optional[Dict] = None +``` + +## 🎯 Service-Specific Formats + +### OpenAI Conversation Format +```python +openai_conversation = [ + { + "role": "system", + "content": "You are a helpful assistant." + }, + { + "role": "user", + "content": "Hello, how are you?" + }, + { + "role": "assistant", + "content": "I'm doing well, thank you!" + } +] +``` + +### Anthropic Conversation Format +```python +anthropic_conversation = [ + { + "role": "user", + "content": "Hello, how are you?" + }, + { + "role": "assistant", + "content": "I'm doing well, thank you!" + } +] +# Note: System prompt passed separately in Anthropic API +``` + +### Gemini Conversation Format +```python +gemini_conversation = [ + { + "role": "user", + "parts": [ + { + "text": "Hello, how are you?" + } + ] + }, + { + "role": "model", + "parts": [ + { + "text": "I'm doing well, thank you!" 
+ } + ] + } +] +``` + +## 📊 Database Schema Structures + +### Raw Data Table +```sql +CREATE TABLE raw_data ( + id SERIAL PRIMARY KEY, + chat_id VARCHAR(255), + bridge_id VARCHAR(255), + org_id VARCHAR(255), + user_input TEXT, + ai_response TEXT, + model VARCHAR(100), + service VARCHAR(100), + input_tokens INTEGER, + output_tokens INTEGER, + total_cost DECIMAL(10,6), + latency_ms INTEGER, + success BOOLEAN, + error_message TEXT, + created_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW() +); +``` + +### Conversation History Table +```sql +CREATE TABLE conversations ( + chat_id VARCHAR(255) PRIMARY KEY, + thread_id VARCHAR(255), + sub_thread_id VARCHAR(255), + bridge_id VARCHAR(255), + org_id VARCHAR(255), + conversation_data JSONB, + metadata JSONB, + created_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW() +); +``` + +### TimescaleDB Metrics Table +```sql +CREATE TABLE metrics_timeseries ( + time TIMESTAMPTZ NOT NULL, + bridge_id VARCHAR(255), + org_id VARCHAR(255), + service VARCHAR(100), + model VARCHAR(100), + input_tokens INTEGER, + output_tokens INTEGER, + total_cost DECIMAL(10,6), + latency_ms INTEGER, + success_rate DECIMAL(5,2), + error_count INTEGER, + request_count INTEGER +); +``` + +## 🔄 Configuration Structures + +### Bridge Configuration +```json +{ + "_id": "ObjectId(...)", + "bridge_id": "bridge_123", + "org_id": "org_456", + "name": "Customer Support Bot", + "configuration": { + "prompt": "You are a helpful customer support assistant...", + "model": "gpt-4", + "temperature": 0.7, + "max_tokens": 1000, + "tool_choice": "auto" + }, + "tools": [ + { + "name": "search_knowledge_base", + "description": "Search the knowledge base", + "parameters": {...} + } + ], + "apikeys": { + "openai": "sk-...", + "anthropic": "sk-ant-..." 
+ }, + "rag_data": [ + { + "document_id": "doc_123", + "content": "Document content...", + "metadata": {...} + } + ], + "settings": { + "gpt_memory": true, + "max_tool_calls": 5, + "enable_streaming": false + } +} +``` + +### Model Configuration +```json +{ + "service": "openai", + "models": [ + { + "name": "gpt-4", + "display_name": "GPT-4", + "max_tokens": 8192, + "supports_tools": true, + "supports_vision": false, + "supports_streaming": true, + "cost_per_1k_input": 0.03, + "cost_per_1k_output": 0.06, + "fallback_models": ["gpt-4-turbo", "gpt-3.5-turbo"] + } + ] +} +``` + +## 🚨 Error Code Definitions + +### Validation Errors (400) +- **MISSING_REQUIRED_FIELD**: Required field is missing +- **INVALID_FIELD_TYPE**: Field has incorrect data type +- **INVALID_FIELD_VALUE**: Field value is outside acceptable range +- **INVALID_MODEL_SERVICE_COMBINATION**: Model not available for service + +### Authentication Errors (401) +- **INVALID_JWT_TOKEN**: JWT token is invalid or expired +- **MISSING_AUTHORIZATION**: Authorization header missing +- **INSUFFICIENT_PERMISSIONS**: User lacks required permissions + +### Rate Limiting Errors (429) +- **RATE_LIMIT_EXCEEDED**: Request rate limit exceeded +- **DAILY_QUOTA_EXCEEDED**: Daily usage quota exceeded +- **CONCURRENT_LIMIT_EXCEEDED**: Too many concurrent requests + +### Service Errors (500) +- **AI_SERVICE_ERROR**: Error from AI service provider +- **DATABASE_ERROR**: Database connection or query error +- **INTERNAL_PROCESSING_ERROR**: Internal processing failure +- **CONFIGURATION_ERROR**: Bridge or model configuration error + +This comprehensive data structure documentation provides clear specifications for all request/response formats and internal data representations used throughout the AI middleware system. diff --git a/docs/components/services/ai-service-layer.md b/docs/components/services/ai-service-layer.md new file mode 100644 index 00000000..09d5e7f7 --- /dev/null +++ b/docs/components/services/ai-service-layer.md @@ -0,0 +1,587 @@ +# AI Service Layer - Provider Integration & Management + +## 🤖 Service Architecture Overview + +The AI Service Layer provides a unified interface for multiple AI providers, handling service selection, conversation formatting, API calls, and response processing. 
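Before looking at individual providers, the following sketch illustrates how a caller is expected to use this unified interface: build a handler through the factory described below, then await its `execute()` method. Parameter names follow this document's examples and are not guaranteed to match the exact signatures in the codebase.

```python
# Illustrative use of the unified service interface; parameter names follow this
# document's examples and the real signatures may differ.
import asyncio

from src.services.utils.helper import Helper  # factory described below

async def run_completion(params: dict) -> dict:
    # Select the provider-specific handler (UnifiedOpenAICase, GeminiHandler, ...).
    handler = Helper.create_service_handler(params["service"], **params)
    # Every handler exposes the same async execute() entry point.
    return await handler.execute()

if __name__ == "__main__":
    demo_params = {
        "service": "openai",
        "model": "gpt-4",
        "apikey": "sk-placeholder",  # placeholder, not a real key
        "user": "Summarize the system architecture.",
        "configuration": {},
        "tools": [],
    }
    print(asyncio.run(run_completion(demo_params)))
```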
+ +### Supported AI Providers +- **OpenAI**: GPT models, function calling, vision +- **Anthropic**: Claude models, tool use +- **Google**: Gemini models, multimodal capabilities +- **Groq**: Fast inference models +- **Mistral**: European AI models +- **OpenRouter**: Multi-provider routing + +## 🏭 Service Factory Pattern + +### Service Handler Creation +**File**: `src/services/utils/helper.py` +**Class**: `Helper` + +```python +class Helper: + """Service handler factory for AI providers""" + + @staticmethod + def create_service_handler(service_name: str, **kwargs): + """Create appropriate service handler based on service name""" + + service_mapping = { + "openai": UnifiedOpenAICase, + "openai_response": OpenaiResponse, + "anthropic": Antrophic, + "gemini": GeminiHandler, + "groq": Groq, + "mistral": Mistral, + "open_router": OpenRouter + } + + handler_class = service_mapping.get(service_name) + if not handler_class: + raise ValueError(f"Unsupported service: {service_name}") + + return handler_class(**kwargs) +``` + +## 🔄 Service Execution Flow + +### Base Service Interface +**File**: `src/services/commonServices/baseService/baseService.py` + +```python +class BaseService: + """Base class for all AI service handlers""" + + def __init__(self, **params): + self.params = params + self.service = params.get("service") + self.model = params.get("model") + self.apikey = params.get("apikey") + + async def execute(self): + """Main execution method - implemented by subclasses""" + raise NotImplementedError + + async def chats(self): + """Route to appropriate model runner based on service""" + routing_map = { + "openai": self.runModel, + "openai_response": self.openai_response_model, + "anthropic": self.anthropic_runmodel, + "groq": self.groq_runmodel, + "gemini": self.gemini_modelrun, + "mistral": self.mistral_model_run, + "open_router": self.openrouter_modelrun + } + + runner = routing_map.get(self.service) + if not runner: + raise ValueError(f"No runner found for service: {self.service}") + + return await runner() +``` + +## 🎯 OpenAI Service Implementation + +### OpenAI Handler +**File**: `src/services/commonServices/openAI/openaiCall.py` +**Class**: `UnifiedOpenAICase` + +```python +class UnifiedOpenAICase(BaseService): + """Unified OpenAI service handler""" + + async def execute(self): + """Execute OpenAI chat completion""" + try: + # Create conversation format + conversation = await self.create_conversation() + + # Execute API call + response = await self.chats() + + # Process function calls if present + if self.has_function_calls(response): + response = await self.process_function_calls(response) + + # Format response + formatted_response = self.format_response(response) + + return formatted_response + + except Exception as e: + return self.handle_error(e) + + async def create_conversation(self): + """Create OpenAI-formatted conversation""" + from src.services.commonServices.createConversations import ConversationService + + return ConversationService.createOpenAiConversation( + history=self.params.get("conversation", []), + memory_context=self.params.get("memory", ""), + tools=self.params.get("tools", []) + ) +``` + +### OpenAI Model Runner +**File**: `src/services/commonServices/openAI/runModel.py` + +```python +async def runModel(params): + """Execute OpenAI API call with retry logic""" + + client = AsyncOpenAI(api_key=params["apikey"]) + + # Prepare API parameters + api_params = { + "model": params["model"], + "messages": params["conversation"], + "temperature": params.get("temperature", 
0.7), + "max_tokens": params.get("max_tokens"), + "tools": params.get("tools"), + "tool_choice": params.get("tool_choice", "auto") + } + + # Remove None values + api_params = {k: v for k, v in api_params.items() if v is not None} + + try: + response = await client.chat.completions.create(**api_params) + return process_openai_response(response) + + except openai.RateLimitError as e: + return handle_rate_limit_error(e, params) + except openai.APIError as e: + return handle_api_error(e, params) + except Exception as e: + return handle_fallback_model(e, params) + +def process_openai_response(response): + """Process OpenAI API response""" + + message = response.choices[0].message + + # Extract content and function calls + content = message.content or "" + function_calls = [] + + if message.tool_calls: + for tool_call in message.tool_calls: + function_calls.append({ + "id": tool_call.id, + "name": tool_call.function.name, + "arguments": json.loads(tool_call.function.arguments) + }) + + return { + "success": True, + "content": content, + "function_calls": function_calls, + "usage": { + "input_tokens": response.usage.prompt_tokens, + "output_tokens": response.usage.completion_tokens, + "total_tokens": response.usage.total_tokens + }, + "model": response.model + } +``` + +## 🧠 Anthropic Service Implementation + +### Anthropic Handler +**File**: `src/services/commonServices/anthropic/anthropicCall.py` + +```python +class Antrophic(BaseService): + """Anthropic Claude service handler""" + + async def execute(self): + """Execute Anthropic chat completion""" + try: + # Create Anthropic conversation format + conversation = await self.create_anthropic_conversation() + + # Execute API call + response = await self.anthropic_runmodel() + + # Process tool use if present + if self.has_tool_use(response): + response = await self.process_tool_use(response) + + return self.format_response(response) + + except Exception as e: + return self.handle_error(e) + + async def create_anthropic_conversation(self): + """Create Anthropic-formatted conversation""" + from src.services.commonServices.createConversations import ConversationService + + return ConversationService.createAnthropicConversation( + history=self.params.get("conversation", []), + system_prompt=self.params.get("configuration", {}).get("prompt", ""), + tools=self.params.get("tools", []) + ) +``` + +## 🌟 Gemini Service Implementation + +### Gemini Handler +**File**: `src/services/commonServices/gemini/geminiCall.py` + +```python +class GeminiHandler(BaseService): + """Google Gemini service handler""" + + async def execute(self): + """Execute Gemini chat completion""" + try: + # Create Gemini conversation format + conversation = await self.create_gemini_conversation() + + # Execute API call + response = await self.gemini_modelrun() + + # Process function calling if present + if self.has_function_calling(response): + response = await self.process_function_calling(response) + + return self.format_response(response) + + except Exception as e: + return self.handle_error(e) + + async def create_gemini_conversation(self): + """Create Gemini-formatted conversation with parts structure""" + from src.services.commonServices.createConversations import ConversationService + + return ConversationService.createGeminiConversation( + history=self.params.get("conversation", []), + system_instruction=self.params.get("configuration", {}).get("prompt", ""), + tools=self.params.get("tools", []) + ) +``` + +## 🔧 Conversation Service + +### Conversation Formatting +**File**: 
`src/services/commonServices/createConversations.py` + +```python +class ConversationService: + """Service for creating provider-specific conversation formats""" + + @staticmethod + def createOpenAiConversation(history, memory_context="", tools=None): + """Create OpenAI chat completion format""" + + messages = [] + + # Add system message if present + if memory_context: + messages.append({ + "role": "system", + "content": memory_context + }) + + # Process conversation history + for item in history: + if item.get("role") in ["user", "assistant", "system"]: + message = { + "role": item["role"], + "content": item.get("content", "") + } + + # Add tool calls if present + if item.get("tool_calls"): + message["tool_calls"] = item["tool_calls"] + + messages.append(message) + + return messages + + @staticmethod + def createAnthropicConversation(history, system_prompt="", tools=None): + """Create Anthropic conversation format""" + + messages = [] + + # Anthropic doesn't include system in messages array + for item in history: + if item.get("role") in ["user", "assistant"]: + message = { + "role": item["role"], + "content": item.get("content", "") + } + + # Add tool use if present + if item.get("tool_use"): + message["content"] = [ + {"type": "text", "text": item.get("content", "")}, + {"type": "tool_use", **item["tool_use"]} + ] + + messages.append(message) + + return messages, system_prompt + + @staticmethod + def createGeminiConversation(history, system_instruction="", tools=None): + """Create Gemini conversation format with parts structure""" + + contents = [] + + for item in history: + role = "user" if item.get("role") == "user" else "model" + + parts = [] + + # Add text content + if item.get("content"): + parts.append({"text": item["content"]}) + + # Add function calls + if item.get("function_calls"): + for func_call in item["function_calls"]: + parts.append({ + "functionCall": { + "name": func_call["name"], + "args": func_call["arguments"] + } + }) + + # Add function responses + if item.get("function_responses"): + for func_resp in item["function_responses"]: + parts.append({ + "functionResponse": { + "name": func_resp["name"], + "response": func_resp["response"] + } + }) + + contents.append({ + "role": role, + "parts": parts + }) + + return contents, system_instruction +``` + +## 🛠️ Function/Tool Processing + +### Function Call Detection +**File**: `src/services/commonServices/openAI/openai_response.py` + +```python +def detect_function_calls(response): + """Enhanced function call detection across multiple formats""" + + output = response.get("output", []) + if not output: + return False + + # Check multiple detection strategies + detection_methods = [ + check_tool_calls_format, + check_function_call_format, + check_embedded_function_calls, + check_reasoning_function_calls + ] + + for method in detection_methods: + if method(output): + return True + + return False + +def check_tool_calls_format(output): + """Check for tool_calls in response""" + for item in output: + if item.get("tool_calls"): + return True + if item.get("content", {}).get("tool_calls"): + return True + return False + +def check_function_call_format(output): + """Check for function_call in response""" + for item in output: + if item.get("function_call"): + return True + if item.get("content", {}).get("function_call"): + return True + return False +``` + +### Concurrent Tool Execution +**File**: `src/services/commonServices/baseService/baseService.py` + +```python +async def execute_tools_concurrently(self, tool_calls): + 
"""Execute multiple tools concurrently""" + + tasks = [] + + for tool_call in tool_calls: + task = asyncio.create_task( + self.execute_single_tool(tool_call) + ) + tasks.append(task) + + # Execute all tools concurrently + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Process results and handle exceptions + processed_results = [] + for i, result in enumerate(results): + if isinstance(result, Exception): + processed_results.append({ + "tool_call_id": tool_calls[i].get("id"), + "success": False, + "error": str(result), + "result": None + }) + else: + processed_results.append({ + "tool_call_id": tool_calls[i].get("id"), + "success": True, + "error": None, + "result": result + }) + + return processed_results + +async def execute_single_tool(self, tool_call): + """Execute a single tool/function call""" + + tool_name = tool_call.get("name") + tool_args = tool_call.get("arguments", {}) + + # Get tool configuration + tool_config = self.get_tool_config(tool_name) + + if not tool_config: + raise ValueError(f"Tool not found: {tool_name}") + + # Execute HTTP call to tool endpoint + try: + response = await self.make_tool_request( + url=tool_config["url"], + method=tool_config.get("method", "POST"), + data=tool_args, + headers=tool_config.get("headers", {}), + timeout=tool_config.get("timeout", 30) + ) + + return response + + except Exception as e: + raise Exception(f"Tool execution failed: {str(e)}") +``` + +## 📊 Response Processing & Formatting + +### Response Standardization +**File**: `src/services/utils/ai_middleware_format.py` + +```python +def format_ai_response(service_response, service_name): + """Format service response to standard AI middleware format""" + + formatted_response = { + "success": service_response.get("success", True), + "output": [], + "usage": extract_usage_metrics(service_response), + "metadata": { + "service": service_name, + "model": service_response.get("model"), + "processing_time_ms": service_response.get("latency_ms"), + "timestamp": datetime.utcnow().isoformat() + } + } + + # Process output items + for item in service_response.get("output", []): + formatted_item = format_output_item(item, service_name) + formatted_response["output"].append(formatted_item) + + return formatted_response + +def extract_usage_metrics(response): + """Extract and standardize usage metrics""" + + usage = response.get("usage", {}) + + return { + "input_tokens": usage.get("input_tokens", 0), + "output_tokens": usage.get("output_tokens", 0), + "total_tokens": usage.get("total_tokens", 0), + "input_cost": usage.get("input_cost", 0.0), + "output_cost": usage.get("output_cost", 0.0), + "total_cost": usage.get("total_cost", 0.0) + } +``` + +## 🔄 Fallback & Retry Logic + +### Model Fallback Strategy +```python +FALLBACK_MODELS = { + "gpt-4": ["gpt-4-turbo", "gpt-3.5-turbo"], + "gpt-4-turbo": ["gpt-3.5-turbo"], + "claude-3-opus": ["claude-3-sonnet", "claude-3-haiku"], + "claude-3-sonnet": ["claude-3-haiku"], + "gemini-pro": ["gemini-pro-vision"], + "llama2-70b-4096": ["mixtral-8x7b-32768"] +} + +async def handle_fallback_model(error, params): + """Handle model fallback on failure""" + + current_model = params.get("model") + fallback_models = FALLBACK_MODELS.get(current_model, []) + + for fallback_model in fallback_models: + try: + # Update params with fallback model + fallback_params = params.copy() + fallback_params["model"] = fallback_model + + # Retry with fallback model + response = await execute_with_fallback_model(fallback_params) + + # Add fallback metadata + 
response["fallback_used"] = True + response["original_model"] = current_model + response["fallback_model"] = fallback_model + response["fallback_reason"] = str(error) + + return response + + except Exception as fallback_error: + continue # Try next fallback model + + # All fallbacks failed + raise Exception(f"All models failed. Original error: {error}") +``` + +## 🔄 Service Layer Summary + +```mermaid +graph TD + A[Service Factory] --> B[Service Handler] + B --> C[Conversation Formatter] + C --> D[API Client] + D --> E[Response Processor] + E --> F[Function Executor] + F --> G[Response Formatter] + G --> H[Fallback Handler] + + style A fill:#e1f5fe + style D fill:#fff3e0 + style F fill:#f3e5f5 + style H fill:#ffebee +``` + +This AI Service Layer provides a robust, extensible architecture for integrating multiple AI providers with consistent interfaces, comprehensive error handling, and advanced features like concurrent tool execution and intelligent fallback strategies. diff --git a/docs/components/services/configuration-service.md b/docs/components/services/configuration-service.md new file mode 100644 index 00000000..49f20721 --- /dev/null +++ b/docs/components/services/configuration-service.md @@ -0,0 +1,572 @@ +# Configuration Service - Bridge & Model Management + +## 🔧 Configuration Architecture + +The Configuration Service manages bridge configurations, model settings, API keys, and associated tools/RAG data. It provides caching, validation, and dynamic configuration assembly. + +## 📋 Core Configuration Service + +### Main Configuration Function +**File**: `src/services/utils/getConfiguration.py` +**Function**: `getConfiguration` + +```python +async def getConfiguration( + configuration: Dict, + service: str, + bridge_id: str, + apikey: str = None, + template_id: str = None, + variables: Dict = None, + org_id: str = None, + variables_path: str = None, + version_id: str = None, + extra_tools: List = None, + built_in_tools: List = None +): + """ + Assemble complete configuration from bridge data and parameters + + Returns: + Dict: Complete configuration with all necessary data + """ + + try: + # Get bridge data from database + bridge_data = await get_bridge_data( + bridge_id=bridge_id, + org_id=org_id, + version_id=version_id + ) + + # Validate bridge data + validate_bridge_configuration(bridge_data, service) + + # Assemble configuration + assembled_config = assemble_configuration( + base_config=configuration, + bridge_data=bridge_data, + service=service, + variables=variables or {}, + extra_tools=extra_tools or [], + built_in_tools=built_in_tools or [] + ) + + return { + "success": True, + "configuration": assembled_config["configuration"], + "service": service, + "apikey": assembled_config["apikey"], + "pre_tools": assembled_config.get("pre_tools"), + "variables": assembled_config["variables"], + "rag_data": assembled_config["rag_data"], + "tools": assembled_config["tools"], + "tool_id_and_name_mapping": assembled_config["tool_mapping"], + "gpt_memory": assembled_config.get("gpt_memory", False), + "version_id": version_id, + "bridge_id": bridge_id + } + + except Exception as e: + return { + "success": False, + "error": str(e), + "error_type": type(e).__name__ + } +``` + +## 🗄️ Bridge Data Retrieval + +### Database Service +**File**: `src/db_services/ConfigurationServices.py` +**Function**: `get_bridges_with_tools_and_apikeys` + +```python +async def get_bridges_with_tools_and_apikeys(bridge_id: str, org_id: str = None): + """ + Retrieve bridge configuration with associated tools and 
API keys + + Uses MongoDB aggregation pipeline to join multiple collections + """ + + pipeline = [ + # Match bridge by ID + { + "$match": { + "_id": ObjectId(bridge_id) + } + }, + + # Join with tools/functions + { + "$lookup": { + "from": "apicalls", + "localField": "_id", + "foreignField": "bridge_id", + "as": "tools" + } + }, + + # Join with API keys + { + "$lookup": { + "from": "apikeycredentials", + "localField": "org_id", + "foreignField": "org_id", + "as": "apikeys" + } + }, + + # Join with RAG documents + { + "$lookup": { + "from": "rag_parent_datas", + "localField": "_id", + "foreignField": "bridge_id", + "as": "rag_data" + } + }, + + # Project required fields + { + "$project": { + "bridge_id": {"$toString": "$_id"}, + "org_id": 1, + "name": 1, + "configuration": 1, + "tools": 1, + "apikeys": 1, + "rag_data": 1, + "settings": 1, + "created_at": 1, + "updated_at": 1 + } + } + ] + + result = await mongodb_collection.aggregate(pipeline).to_list(1) + + if not result: + raise ValueError(f"Bridge not found: {bridge_id}") + + return result[0] +``` + +### Bridge Data Utilities +**File**: `src/services/utils/getConfiguration_utils.py` + +```python +async def get_bridge_data(bridge_id: str, org_id: str = None, version_id: str = None): + """ + Get bridge data with Redis caching + """ + + # Check Redis cache first + cache_key = f"bridge_config:{bridge_id}:{version_id or 'latest'}" + cached_data = await redis_client.get(cache_key) + + if cached_data: + return json.loads(cached_data) + + # Fetch from database + bridge_data = await ConfigurationServices.get_bridges_with_tools_and_apikeys( + bridge_id=bridge_id, + org_id=org_id + ) + + # Process and clean data + processed_data = process_bridge_data(bridge_data) + + # Cache for 1 hour + await redis_client.setex( + cache_key, + 3600, + json.dumps(processed_data, default=str) + ) + + return processed_data + +def process_bridge_data(raw_data): + """Process raw bridge data for use""" + + # Convert ObjectIds to strings + processed_data = convert_objectids_to_strings(raw_data) + + # Organize tools by type + processed_data["tools"] = organize_tools(processed_data.get("tools", [])) + + # Process API keys by service + processed_data["apikeys"] = organize_apikeys(processed_data.get("apikeys", [])) + + # Process RAG data + processed_data["rag_data"] = process_rag_data(processed_data.get("rag_data", [])) + + return processed_data +``` + +## ⚙️ Configuration Assembly + +### Configuration Builder +```python +def assemble_configuration( + base_config: Dict, + bridge_data: Dict, + service: str, + variables: Dict, + extra_tools: List, + built_in_tools: List +): + """ + Assemble complete configuration from all sources + """ + + # Start with bridge configuration + assembled_config = bridge_data.get("configuration", {}).copy() + + # Merge with base configuration (base takes precedence) + assembled_config.update(base_config) + + # Process variables + processed_variables = process_variables( + variables=variables, + bridge_variables=bridge_data.get("variables", {}), + configuration=assembled_config + ) + + # Assemble tools + all_tools = assemble_tools( + bridge_tools=bridge_data.get("tools", []), + extra_tools=extra_tools, + built_in_tools=built_in_tools + ) + + # Get API key for service + service_apikey = get_service_apikey( + bridge_apikeys=bridge_data.get("apikeys", {}), + service=service + ) + + # Process RAG data + processed_rag_data = process_rag_documents( + bridge_data.get("rag_data", []) + ) + + return { + "configuration": assembled_config, + "apikey": 
service_apikey, + "variables": processed_variables, + "tools": all_tools["tools"], + "tool_mapping": all_tools["mapping"], + "rag_data": processed_rag_data, + "pre_tools": bridge_data.get("pre_tools"), + "gpt_memory": bridge_data.get("settings", {}).get("gpt_memory", False) + } +``` + +### Tool Assembly +```python +def assemble_tools(bridge_tools: List, extra_tools: List, built_in_tools: List): + """ + Assemble and organize all available tools + """ + + all_tools = [] + tool_mapping = {} + + # Process bridge tools + for tool in bridge_tools: + processed_tool = process_bridge_tool(tool) + all_tools.append(processed_tool) + tool_mapping[tool.get("_id", "")] = processed_tool.get("name", "") + + # Add extra tools + for tool in extra_tools: + processed_tool = process_extra_tool(tool) + all_tools.append(processed_tool) + + # Add built-in tools + for tool_name in built_in_tools: + built_in_tool = get_built_in_tool(tool_name) + if built_in_tool: + all_tools.append(built_in_tool) + + return { + "tools": all_tools, + "mapping": tool_mapping + } + +def process_bridge_tool(tool_data): + """ + Convert bridge tool data to OpenAI function format + """ + + return { + "type": "function", + "function": { + "name": tool_data.get("name", ""), + "description": tool_data.get("description", ""), + "parameters": tool_data.get("parameters", {}), + "url": tool_data.get("url", ""), + "method": tool_data.get("method", "POST"), + "headers": tool_data.get("headers", {}), + "timeout": tool_data.get("timeout", 30) + } + } +``` + +## 🔑 API Key Management + +### Service API Key Retrieval +```python +def get_service_apikey(bridge_apikeys: Dict, service: str): + """ + Get API key for specific service from bridge configuration + """ + + # Service name mappings + service_mappings = { + "openai": ["openai", "openai_api_key"], + "anthropic": ["anthropic", "anthropic_api_key", "claude"], + "gemini": ["gemini", "google", "google_api_key"], + "groq": ["groq", "groq_api_key"], + "mistral": ["mistral", "mistral_api_key"], + "open_router": ["open_router", "openrouter", "openrouter_api_key"] + } + + possible_keys = service_mappings.get(service, [service]) + + for key_name in possible_keys: + if key_name in bridge_apikeys: + api_key = bridge_apikeys[key_name] + + # Decrypt if encrypted + if is_encrypted(api_key): + api_key = decrypt_api_key(api_key) + + return api_key + + raise ValueError(f"API key not found for service: {service}") + +def decrypt_api_key(encrypted_key: str) -> str: + """ + Decrypt encrypted API key + """ + try: + from cryptography.fernet import Fernet + + # Get encryption key from environment + encryption_key = os.getenv("API_KEY_ENCRYPTION_KEY") + if not encryption_key: + raise ValueError("Encryption key not found") + + f = Fernet(encryption_key.encode()) + decrypted_key = f.decrypt(encrypted_key.encode()).decode() + + return decrypted_key + + except Exception as e: + raise ValueError(f"Failed to decrypt API key: {str(e)}") +``` + +## 📚 RAG Data Processing + +### Document Processing +```python +def process_rag_documents(rag_data: List) -> List: + """ + Process RAG documents for use in AI context + """ + + processed_documents = [] + + for document in rag_data: + processed_doc = { + "document_id": str(document.get("_id", "")), + "title": document.get("title", ""), + "content": document.get("content", ""), + "metadata": { + "source": document.get("source", ""), + "type": document.get("type", "text"), + "created_at": document.get("created_at", ""), + "tags": document.get("tags", []) + }, + "embeddings": 
document.get("embeddings", []), + "chunk_size": len(document.get("content", "")), + "relevance_score": document.get("relevance_score", 0.0) + } + + processed_documents.append(processed_doc) + + # Sort by relevance score + processed_documents.sort( + key=lambda x: x["relevance_score"], + reverse=True + ) + + return processed_documents +``` + +## 🔍 Model Configuration Service + +### Optimized Model Configurations +**File**: `src/db_services/ModelConfigServices.py` + +```python +class ModelConfigServices: + """Optimized model configuration management""" + + @staticmethod + async def get_service_models_optimized(service: str): + """ + Get pre-processed model configurations for a service + """ + + # Check cache first + cache_key = f"model_config:{service}" + cached_config = await redis_client.get(cache_key) + + if cached_config: + return json.loads(cached_config) + + # Fetch from MongoDB processed collection + config = await mongodb_collection.find_one( + {"service": service}, + {"_id": 0} + ) + + if not config: + raise ValueError(f"Model configuration not found for service: {service}") + + # Cache for 2 hours + await redis_client.setex(cache_key, 7200, json.dumps(config)) + + return config + + @staticmethod + async def sync_model_configurations(): + """ + Sync model configurations from raw data to processed collection + """ + + # Load raw model configurations + raw_configs = await load_raw_model_configs() + + # Process configurations + processed_configs = [] + + for service, models in raw_configs.items(): + processed_config = { + "service": service, + "models": [], + "last_updated": datetime.utcnow() + } + + for model_name, model_data in models.items(): + processed_model = { + "name": model_name, + "display_name": model_data.get("display_name", model_name), + "max_tokens": model_data.get("max_tokens", 4096), + "supports_tools": model_data.get("supports_tools", False), + "supports_vision": model_data.get("supports_vision", False), + "supports_streaming": model_data.get("supports_streaming", True), + "cost_per_1k_input": model_data.get("cost_per_1k_input", 0.0), + "cost_per_1k_output": model_data.get("cost_per_1k_output", 0.0), + "fallback_models": model_data.get("fallback_models", []) + } + + processed_config["models"].append(processed_model) + + processed_configs.append(processed_config) + + # Update processed collection + for config in processed_configs: + await mongodb_collection.replace_one( + {"service": config["service"]}, + config, + upsert=True + ) + + # Clear cache + await clear_model_config_cache() + + return {"success": True, "updated_services": len(processed_configs)} +``` + +## 🔄 Configuration Validation + +### Bridge Configuration Validation +```python +def validate_bridge_configuration(bridge_data: Dict, service: str): + """ + Validate bridge configuration for service compatibility + """ + + # Check required fields + required_fields = ["bridge_id", "org_id", "configuration"] + missing_fields = [field for field in required_fields + if not bridge_data.get(field)] + + if missing_fields: + raise ValidationError(f"Missing required fields: {missing_fields}") + + # Validate service compatibility + configuration = bridge_data.get("configuration", {}) + model = configuration.get("model") + + if model: + validate_model_service_compatibility(model, service) + + # Validate API keys + apikeys = bridge_data.get("apikeys", {}) + if service not in apikeys and not get_service_apikey(apikeys, service): + raise ValidationError(f"API key not found for service: {service}") + + # Validate tools + 
tools = bridge_data.get("tools", []) + validate_tools_configuration(tools) + +def validate_model_service_compatibility(model: str, service: str): + """ + Validate that model is compatible with service + """ + + model_service_mapping = { + "gpt-4": ["openai"], + "gpt-3.5-turbo": ["openai"], + "claude-3-opus": ["anthropic"], + "claude-3-sonnet": ["anthropic"], + "gemini-pro": ["gemini"], + "llama2-70b-4096": ["groq"], + "mixtral-8x7b-32768": ["groq", "mistral"] + } + + compatible_services = model_service_mapping.get(model, []) + + if service not in compatible_services: + raise ValidationError( + f"Model '{model}' is not compatible with service '{service}'. " + f"Compatible services: {compatible_services}" + ) +``` + +## 🔄 Configuration Service Summary + +```mermaid +graph TD + A[Configuration Request] --> B[Redis Cache Check] + B -->|Cache Hit| C[Return Cached Config] + B -->|Cache Miss| D[Database Query] + D --> E[MongoDB Aggregation] + E --> F[Process Bridge Data] + F --> G[Assemble Configuration] + G --> H[Validate Configuration] + H --> I[Cache Result] + I --> J[Return Configuration] + + style A fill:#e1f5fe + style C fill:#e8f5e8 + style E fill:#fff3e0 + style H fill:#f3e5f5 +``` + +This Configuration Service provides a comprehensive, cached, and validated approach to managing bridge configurations, model settings, and associated resources for the AI middleware system. diff --git a/docs/flows/database-operations-flow.md b/docs/flows/database-operations-flow.md new file mode 100644 index 00000000..58d37ac5 --- /dev/null +++ b/docs/flows/database-operations-flow.md @@ -0,0 +1,395 @@ +# Database Operations Flow - Metrics & Data Storage + +## 🗄️ Database Architecture + +### Database Stack: +- **PostgreSQL**: Main data storage and conversation history +- **TimescaleDB**: Time-series metrics and analytics +- **Redis**: Caching layer and session storage +- **MongoDB**: Configuration and bridge data + +## 📊 Metrics Collection Flow + +### Background Processing +**File**: `src/services/utils/common_utils.py` +**Function**: `process_background_tasks()` + +#### Background Operations: +1. **Metrics Creation**: `create()` from metrics_service +2. **Sub-queue Publishing**: Message queue for downstream processing +3. **Conversation Storage**: Thread and message persistence + +### Metrics Service +**File**: `src/db_services/metrics_service.py` +**Function**: `create()` + +## 🔄 Data Storage Flow (Reordered) + +### Step 1: Raw Data Insertion +**Function**: `insertRawData()` in `conversationDbService.py` + +#### Process: +```python +def insertRawData(raw_data_list, random_id): + """ + Insert raw data with temporary chat_id + Returns: List of inserted record IDs + """ + inserted_ids = [] + + for data in raw_data_list: + # Use random_id as temporary chat_id + data["chat_id"] = random_id + + # Insert into PostgreSQL + result = db.execute( + "INSERT INTO raw_data (...) VALUES (...) RETURNING id", + data + ) + inserted_ids.append(result.fetchone()[0]) + + return inserted_ids +``` + +### Step 2: Conversation History Storage +**Function**: `saveConversationHistory()` in `conversationDbService.py` + +#### Process: +```python +def saveConversationHistory(conversation_data): + """ + Save conversation and get actual chat_id + Returns: chat_id + """ + # Insert conversation record + chat_id = db.execute( + "INSERT INTO conversations (...) VALUES (...) 
RETURNING chat_id", + conversation_data + ).fetchone()[0] + + return chat_id +``` + +### Step 3: Raw Data Update +**Function**: `updateRawDataChatId()` in `conversationDbService.py` + +#### Process: +```python +def updateRawDataChatId(record_ids, actual_chat_id): + """ + Update raw_data records with actual chat_id + """ + db.execute( + "UPDATE raw_data SET chat_id = %s WHERE id = ANY(%s)", + (actual_chat_id, record_ids) + ) +``` + +### Complete Reordered Flow: +```python +def create(metrics_data): + # Generate temporary ID + random_id = generate_random_id() + + # Step 1: Insert raw data with temporary chat_id + raw_data_ids = insertRawData(metrics_data["raw_data"], random_id) + + # Step 2: Save conversation history and get actual chat_id + actual_chat_id = saveConversationHistory(metrics_data["conversation"]) + + # Step 3: Update raw_data with actual chat_id + updateRawDataChatId(raw_data_ids, actual_chat_id) + + return {"success": True, "chat_id": actual_chat_id} +``` + +## 📈 Stored Metrics Data + +### Raw Data Table Structure: +```sql +CREATE TABLE raw_data ( + id SERIAL PRIMARY KEY, + chat_id VARCHAR(255), + bridge_id VARCHAR(255), + org_id VARCHAR(255), + user_input TEXT, + ai_response TEXT, + model VARCHAR(100), + service VARCHAR(100), + input_tokens INTEGER, + output_tokens INTEGER, + total_cost DECIMAL(10,6), + latency_ms INTEGER, + created_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW() +); +``` + +### Conversation History Table: +```sql +CREATE TABLE conversations ( + chat_id VARCHAR(255) PRIMARY KEY, + thread_id VARCHAR(255), + sub_thread_id VARCHAR(255), + bridge_id VARCHAR(255), + org_id VARCHAR(255), + conversation_data JSONB, + created_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW() +); +``` + +### TimescaleDB Metrics (Time-Series): +```sql +CREATE TABLE metrics_timeseries ( + time TIMESTAMPTZ NOT NULL, + bridge_id VARCHAR(255), + org_id VARCHAR(255), + service VARCHAR(100), + model VARCHAR(100), + input_tokens INTEGER, + output_tokens INTEGER, + total_cost DECIMAL(10,6), + latency_ms INTEGER, + success_rate DECIMAL(5,2), + error_count INTEGER +); + +-- Create hypertable for time-series optimization +SELECT create_hypertable('metrics_timeseries', 'time'); +``` + +## 🔄 Redis Caching Strategy + +### Cache Types: + +#### 1. Configuration Cache +```python +# Cache key pattern: config:{bridge_id}:{version_id} +cache_key = f"config:{bridge_id}:{version_id}" +cache_data = { + "configuration": bridge_config, + "tools": available_tools, + "apikeys": service_apikeys, + "rag_data": document_data, + "cached_at": datetime.utcnow().isoformat(), + "ttl": 3600 # 1 hour +} +redis.setex(cache_key, 3600, json.dumps(cache_data)) +``` + +#### 2. Usage Tracking Cache +```python +# Cache key pattern: usage:{bridge_id}:{date} +usage_key = f"usage:{bridge_id}:{date.today()}" +usage_data = { + "total_requests": 1250, + "total_tokens": 45000, + "total_cost": 12.50, + "last_updated": datetime.utcnow().isoformat() +} +redis.setex(usage_key, 86400, json.dumps(usage_data)) # 24 hours +``` + +#### 3. Rate Limiting Cache +```python +# Cache key pattern: rate_limit:{bridge_id}:{window} +rate_limit_key = f"rate_limit:{bridge_id}:{window_start}" +rate_data = { + "count": 95, + "window_start": window_start.isoformat(), + "remaining": 5 +} +redis.setex(rate_limit_key, window_duration, json.dumps(rate_data)) +``` + +#### 4. 
GPT Memory Cache +```python +# Cache key pattern: memory:{thread_id} +memory_key = f"memory:{thread_id}" +memory_data = { + "context": conversation_context, + "summary": conversation_summary, + "last_updated": datetime.utcnow().isoformat() +} +redis.setex(memory_key, 7200, json.dumps(memory_data)) # 2 hours +``` + +## 📊 MongoDB Configuration Storage + +### Bridge Configuration: +```javascript +// bridges collection +{ + "_id": ObjectId("..."), + "bridge_id": "bridge_123", + "org_id": "org_456", + "name": "Customer Support Bot", + "configuration": { + "prompt": "You are a helpful assistant...", + "model": "gpt-4", + "temperature": 0.7, + "max_tokens": 1000 + }, + "tools": [...], + "apikeys": {...}, + "created_at": ISODate("..."), + "updated_at": ISODate("...") +} +``` + +### Model Configurations (Optimized): +```javascript +// processed_model_configurations collection +{ + "_id": ObjectId("..."), + "service": "openai", + "models": [ + { + "name": "gpt-4", + "display_name": "GPT-4", + "max_tokens": 8192, + "supports_tools": true, + "supports_vision": false, + "cost_per_1k_input": 0.03, + "cost_per_1k_output": 0.06 + } + ], + "last_updated": ISODate("...") +} +``` + +## 🔍 Query Optimization + +### Database Indexes: +```sql +-- PostgreSQL indexes for performance +CREATE INDEX idx_raw_data_bridge_id ON raw_data(bridge_id); +CREATE INDEX idx_raw_data_org_id ON raw_data(org_id); +CREATE INDEX idx_raw_data_created_at ON raw_data(created_at); +CREATE INDEX idx_conversations_thread_id ON conversations(thread_id); + +-- TimescaleDB indexes +CREATE INDEX idx_metrics_bridge_id_time ON metrics_timeseries(bridge_id, time DESC); +CREATE INDEX idx_metrics_org_id_time ON metrics_timeseries(org_id, time DESC); +``` + +### Connection Pooling: +```python +# PostgreSQL connection pool +DATABASE_POOL = { + "min_connections": 5, + "max_connections": 20, + "connection_timeout": 30, + "idle_timeout": 300 +} + +# Redis connection pool +REDIS_POOL = { + "max_connections": 50, + "retry_on_timeout": True, + "socket_timeout": 5 +} +``` + +## 📈 Analytics & Reporting + +### Real-time Metrics: +```python +def get_realtime_metrics(bridge_id, timeframe="1h"): + """Get real-time metrics from TimescaleDB""" + query = """ + SELECT + time_bucket('5m', time) as bucket, + COUNT(*) as requests, + AVG(latency_ms) as avg_latency, + SUM(input_tokens + output_tokens) as total_tokens, + SUM(total_cost) as total_cost + FROM metrics_timeseries + WHERE bridge_id = %s + AND time >= NOW() - INTERVAL %s + GROUP BY bucket + ORDER BY bucket DESC + """ + return execute_query(query, (bridge_id, timeframe)) +``` + +### Usage Analytics: +```python +def get_usage_analytics(org_id, start_date, end_date): + """Get comprehensive usage analytics""" + query = """ + SELECT + bridge_id, + service, + model, + COUNT(*) as total_requests, + SUM(input_tokens) as total_input_tokens, + SUM(output_tokens) as total_output_tokens, + SUM(total_cost) as total_cost, + AVG(latency_ms) as avg_latency + FROM raw_data + WHERE org_id = %s + AND created_at BETWEEN %s AND %s + GROUP BY bridge_id, service, model + ORDER BY total_cost DESC + """ + return execute_query(query, (org_id, start_date, end_date)) +``` + +## 🚨 Error Handling & Recovery + +### Database Resilience: +```python +def execute_with_retry(operation, max_retries=3): + """Execute database operation with retry logic""" + for attempt in range(max_retries): + try: + return operation() + except (ConnectionError, TimeoutError) as e: + if attempt == max_retries - 1: + raise e + time.sleep(2 ** attempt) # Exponential 
backoff +``` + +### Data Consistency: +```python +def ensure_data_consistency(): + """Check and fix data consistency issues""" + # Check for orphaned raw_data records + orphaned_query = """ + SELECT id FROM raw_data + WHERE chat_id NOT IN (SELECT chat_id FROM conversations) + """ + + # Fix orphaned records + fix_query = """ + DELETE FROM raw_data + WHERE chat_id NOT IN (SELECT chat_id FROM conversations) + """ +``` + +## 🔄 Database Flow Summary + +```mermaid +sequenceDiagram + participant BG as Background Tasks + participant MS as Metrics Service + participant PG as PostgreSQL + participant TS as TimescaleDB + participant R as Redis + participant MG as MongoDB + + BG->>MS: process_background_tasks() + MS->>PG: insertRawData(random_id) + PG->>MS: raw_data_ids + MS->>PG: saveConversationHistory() + PG->>MS: actual_chat_id + MS->>PG: updateRawDataChatId() + MS->>TS: insert time-series metrics + MS->>R: update usage cache + MS->>MG: log configuration usage + MS->>BG: completion status +``` + +This database architecture ensures efficient data storage, retrieval, and analytics while maintaining data consistency and performance optimization. diff --git a/docs/flows/middleware-flow.md b/docs/flows/middleware-flow.md new file mode 100644 index 00000000..945ffd58 --- /dev/null +++ b/docs/flows/middleware-flow.md @@ -0,0 +1,210 @@ +# Middleware Flow - Authentication & Configuration + +## 🔐 Authentication & Rate Limiting + +### JWT Middleware +**File**: `src/middlewares/auth_and_rate_limit.py` + +#### Process Flow: +1. **Token Extraction**: Extract JWT from Authorization header +2. **Token Validation**: Verify token signature and expiration +3. **User Context**: Extract user information and permissions +4. **Rate Limit Check**: Validate request limits per bridge/thread + +#### Rate Limiting Rules: +- **Bridge Level**: 100 points per `bridge_id` +- **Thread Level**: 20 points per `thread_id` +- **Time Window**: Configurable sliding window +- **Storage**: Redis-based rate limit tracking + +```python +# Rate limit structure +{ + "bridge_id": { + "count": 95, + "window_start": "2024-01-01T12:00:00Z", + "remaining": 5 + }, + "thread_id": { + "count": 18, + "window_start": "2024-01-01T12:00:00Z", + "remaining": 2 + } +} +``` + +## ⚙️ Configuration Middleware + +### Configuration Enrichment +**File**: `src/middlewares/getDataUsingBridgeId.py` +**Function**: `add_configuration_data_to_body` + +#### Step-by-Step Process: + +### 1. Request Data Extraction +```python +# Extract key identifiers +bridge_id = request.get("bridge_id") +org_id = request.get("org_id") +version_id = request.get("version_id") +service = request.get("service") +model = request.get("model") +``` + +### 2. Configuration Retrieval +**Function**: `getConfiguration()` from `src/services/utils/getConfiguration.py` + +#### Input Parameters: +- `configuration`: Base configuration to merge +- `service`: AI service name (openai, anthropic, etc.) 
+- `bridge_id`: Bridge identifier +- `apikey`: API key for service +- `template_id`: Optional template ID +- `variables`: Variables for prompt replacement +- `org_id`: Organization ID +- `version_id`: Version ID for bridge +- `extra_tools`: Additional tools to include + +#### Database Operations: +**File**: `src/db_services/ConfigurationServices.py` +**Function**: `get_bridges_with_tools_and_apikeys` + +```python +# MongoDB Aggregation Pipeline +[ + {"$match": {"_id": ObjectId(bridge_id)}}, + {"$lookup": { + "from": "apicalls", + "localField": "_id", + "foreignField": "bridge_id", + "as": "tools" + }}, + {"$lookup": { + "from": "apikeycredentials", + "localField": "org_id", + "foreignField": "org_id", + "as": "apikeys" + }}, + {"$lookup": { + "from": "rag_parent_datas", + "localField": "_id", + "foreignField": "bridge_id", + "as": "rag_data" + }} +] +``` + +### 3. Configuration Assembly + +#### Core Configuration: +- **prompt**: System prompt with tone and response style +- **model**: AI model to use +- **tools**: Available function tools +- **tool_choice**: Tool selection strategy +- **temperature**, **max_tokens**: Model parameters + +#### Metadata: +- **service**: AI service provider +- **apikey**: Service API key +- **bridge_id**: Bridge identifier +- **org_id**: Organization ID +- **variables**: Prompt variables +- **rag_data**: Document data for RAG +- **gpt_memory**: Memory settings +- **tool_call_count**: Maximum tool calls allowed + +### 4. Validation Checks + +#### Required Field Validation: +- `user` message (mandatory unless images provided) +- Valid `service` and `model` combination +- Valid `bridge_id` or `agent_id` +- Organization permissions for custom models + +#### Model/Service Compatibility: +```python +# Service-specific model validation +service_models = { + "openai": ["gpt-4", "gpt-3.5-turbo", "gpt-4-turbo"], + "anthropic": ["claude-3-opus", "claude-3-sonnet"], + "gemini": ["gemini-pro", "gemini-pro-vision"], + "groq": ["llama2-70b-4096", "mixtral-8x7b-32768"] +} +``` + +### 5. 
Redis Caching + +#### Cache Strategy: +- **Key Pattern**: `config:{bridge_id}:{version_id}` +- **TTL**: 1 hour for active configurations +- **Invalidation**: On configuration updates + +```python +# Cache structure +{ + "configuration": {...}, + "tools": [...], + "apikeys": {...}, + "rag_data": [...], + "cached_at": "2024-01-01T12:00:00Z", + "ttl": 3600 +} +``` + +## 🚨 Error Handling + +### Validation Errors: +- **401 Unauthorized**: Invalid or expired JWT token +- **403 Forbidden**: Organization permission violations +- **429 Too Many Requests**: Rate limit exceeded +- **400 Bad Request**: Missing required fields or invalid model/service combinations + +### Processing Errors: +- **500 Internal Server Error**: Database connection failures +- **502 Bad Gateway**: External service unavailability +- **504 Gateway Timeout**: Configuration retrieval timeout + +### Error Response Format: +```json +{ + "success": false, + "error": "Error description", + "error_code": "RATE_LIMIT_EXCEEDED", + "details": { + "bridge_id": "bridge_123", + "current_usage": 100, + "limit": 100, + "reset_time": "2024-01-01T13:00:00Z" + } +} +``` + +## 🔄 Flow Summary + +```mermaid +sequenceDiagram + participant C as Client + participant AM as Auth Middleware + participant CM as Config Middleware + participant R as Redis + participant DB as Database + participant CC as Chat Controller + + C->>AM: POST with JWT token + AM->>AM: Validate JWT + AM->>R: Check rate limits + R->>AM: Rate limit status + AM->>CM: Authenticated request + CM->>R: Check config cache + alt Cache Hit + R->>CM: Cached configuration + else Cache Miss + CM->>DB: Fetch configuration + DB->>CM: Configuration data + CM->>R: Cache configuration + end + CM->>CM: Validate & enrich request + CM->>CC: Enriched request body +``` + +This middleware layer ensures secure, validated, and properly configured requests reach the chat controller with all necessary context and permissions. diff --git a/docs/flows/response-formatting-flow.md b/docs/flows/response-formatting-flow.md new file mode 100644 index 00000000..d6fd3dbc --- /dev/null +++ b/docs/flows/response-formatting-flow.md @@ -0,0 +1,431 @@ +# Response Formatting Flow - Processing & Delivery + +## 📤 Response Processing Pipeline + +### Main Response Handler +**File**: `src/services/commonServices/common.py` +**Function**: `chat()` (post-execution processing) + +## 🔄 Post-Execution Processing Flow + +### 1. Success Validation +```python +def validate_service_response(service_response): + """Validate AI service execution success""" + if not service_response.get("success", False): + return handle_service_error(service_response) + + return service_response +``` + +### 2. Retry Alert Handling +```python +def handle_retry_alerts(service_response, original_model): + """Handle fallback model notifications""" + if service_response.get("model_used") != original_model: + alert_data = { + "type": "model_fallback", + "original_model": original_model, + "fallback_model": service_response.get("model_used"), + "reason": service_response.get("fallback_reason") + } + send_webhook_notification(alert_data) + + return service_response +``` + +### 3. 
Chatbot Response Processing +```python +def process_chatbot_response(response, is_chatbot=False): + """Special handling for chatbot responses""" + if is_chatbot: + # Apply chatbot-specific formatting + response = format_chatbot_response(response) + + # Add chatbot metadata + response["chatbot_metadata"] = { + "response_type": "chatbot", + "formatted_at": datetime.utcnow().isoformat() + } + + return response +``` + +### 4. Usage Calculation +**File**: `src/services/utils/token_calculator.py` + +#### Token & Cost Calculation: +```python +def calculate_usage_metrics(response, model_config): + """Calculate comprehensive usage metrics""" + + # Extract token counts + input_tokens = response.get("usage", {}).get("input_tokens", 0) + output_tokens = response.get("usage", {}).get("output_tokens", 0) + + # Calculate costs based on model pricing + input_cost = (input_tokens / 1000) * model_config.get("input_cost_per_1k", 0) + output_cost = (output_tokens / 1000) * model_config.get("output_cost_per_1k", 0) + total_cost = input_cost + output_cost + + return { + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "total_tokens": input_tokens + output_tokens, + "input_cost": round(input_cost, 6), + "output_cost": round(output_cost, 6), + "total_cost": round(total_cost, 6), + "model": model_config.get("name"), + "service": model_config.get("service") + } +``` + +## 📋 Response Format Standardization + +### AI Middleware Format +**File**: `src/services/utils/ai_middleware_format.py` + +#### Standard Response Structure: +```python +def format_ai_response(service_response, usage_metrics): + """Convert service response to standard AI middleware format""" + + formatted_response = { + "success": True, + "output": [], + "usage": usage_metrics, + "metadata": { + "service": service_response.get("service"), + "model": service_response.get("model"), + "processing_time_ms": service_response.get("latency_ms"), + "timestamp": datetime.utcnow().isoformat() + } + } + + # Process output content + for item in service_response.get("output", []): + formatted_item = format_output_item(item) + formatted_response["output"].append(formatted_item) + + return formatted_response +``` + +#### Output Item Formatting: +```python +def format_output_item(item): + """Format individual output items""" + + # Handle different content types + if item.get("type") == "function_call": + return format_function_call_item(item) + elif item.get("type") == "text": + return format_text_item(item) + elif item.get("type") == "image": + return format_image_item(item) + else: + return format_generic_item(item) + +def format_text_item(item): + """Format text content item""" + return { + "type": "text", + "content": { + "text": extract_text_content(item) + }, + "metadata": { + "content_type": "text/plain", + "length": len(extract_text_content(item)) + } + } + +def format_function_call_item(item): + """Format function call item""" + return { + "type": "function_call", + "content": { + "name": item.get("name"), + "arguments": item.get("arguments", {}), + "result": item.get("result") + }, + "metadata": { + "execution_time_ms": item.get("execution_time_ms"), + "success": item.get("success", True) + } + } +``` + +## 🎯 Response Type Handling + +### Default Response Format +```python +def handle_default_response(formatted_response): + """Handle default synchronous response""" + return { + "success": True, + "data": formatted_response, + "response_type": "default" + } +``` + +### Queue-Based Response Processing +```python +def 
handle_queue_response(request_data, response_format): + """Handle non-default response formats via queue""" + + if response_format.get("type") != "default": + # Publish to queue for async processing + queue_message = { + "request_id": generate_request_id(), + "request_data": request_data, + "response_format": response_format, + "timestamp": datetime.utcnow().isoformat() + } + + publish_to_queue("response_processing", queue_message) + + return { + "success": True, + "message": "Request queued for processing", + "request_id": queue_message["request_id"], + "response_type": "queued" + } +``` + +### Webhook Response Delivery +```python +def deliver_webhook_response(response_data, webhook_config): + """Deliver response via webhook""" + + webhook_payload = { + "event": "chat_completion", + "data": response_data, + "timestamp": datetime.utcnow().isoformat(), + "signature": generate_webhook_signature(response_data) + } + + try: + webhook_response = requests.post( + webhook_config["url"], + json=webhook_payload, + headers={ + "Content-Type": "application/json", + "X-Webhook-Signature": webhook_payload["signature"] + }, + timeout=30 + ) + + return { + "success": webhook_response.status_code == 200, + "status_code": webhook_response.status_code, + "delivery_time_ms": webhook_response.elapsed.total_seconds() * 1000 + } + + except Exception as e: + return { + "success": False, + "error": str(e), + "retry_scheduled": True + } +``` + +## 📱 Playground vs Production Mode + +### Playground Mode Response +```python +def format_playground_response(response_data): + """Format response for playground environment""" + + playground_response = { + "success": True, + "output": response_data.get("output", []), + "usage": response_data.get("usage", {}), + "debug_info": { + "processing_steps": response_data.get("debug_steps", []), + "model_config": response_data.get("model_config", {}), + "execution_time": response_data.get("execution_time_ms", 0) + }, + "metadata": response_data.get("metadata", {}) + } + + return playground_response +``` + +### Production Mode Response +```python +def format_production_response(response_data, delivery_config): + """Format response for production environment""" + + production_response = { + "success": True, + "output": response_data.get("output", []), + "usage": response_data.get("usage", {}), + "metadata": { + "service": response_data.get("metadata", {}).get("service"), + "model": response_data.get("metadata", {}).get("model"), + "timestamp": datetime.utcnow().isoformat() + } + } + + # Remove debug information for production + if "debug_info" in production_response: + del production_response["debug_info"] + + return production_response +``` + +## 🔄 Background Task Integration + +### Background Processing Trigger +```python +def trigger_background_processing(response_data, request_context): + """Trigger background tasks after response formatting""" + + background_tasks = [ + save_conversation_history, + update_usage_metrics, + send_analytics_data, + cache_response_data + ] + + for task in background_tasks: + asyncio.create_task( + task(response_data, request_context) + ) +``` + +### Async Background Operations +```python +async def save_conversation_history(response_data, context): + """Save conversation to database""" + conversation_data = { + "thread_id": context.get("thread_id"), + "user_message": context.get("user_input"), + "ai_response": response_data.get("output", []), + "metadata": response_data.get("metadata", {}) + } + + await database.save_conversation(conversation_data) + 
+async def update_usage_metrics(response_data, context): + """Update usage metrics in TimescaleDB""" + metrics_data = { + "bridge_id": context.get("bridge_id"), + "org_id": context.get("org_id"), + "usage": response_data.get("usage", {}), + "timestamp": datetime.utcnow() + } + + await timescale_db.insert_metrics(metrics_data) +``` + +## 🚨 Error Response Formatting + +### Error Response Structure +```python +def format_error_response(error, context=None): + """Format error responses consistently""" + + error_response = { + "success": False, + "error": { + "type": error.get("type", "unknown_error"), + "message": error.get("message", "An error occurred"), + "code": error.get("code", "INTERNAL_ERROR") + }, + "metadata": { + "timestamp": datetime.utcnow().isoformat(), + "request_id": context.get("request_id") if context else None + } + } + + # Add debug info in development + if os.getenv("ENVIRONMENT") == "development": + error_response["debug"] = { + "stack_trace": error.get("stack_trace"), + "context": context + } + + return error_response +``` + +### Service-Specific Error Handling +```python +def handle_service_errors(service_error, service_name): + """Handle service-specific errors""" + + error_mappings = { + "openai": handle_openai_errors, + "anthropic": handle_anthropic_errors, + "gemini": handle_gemini_errors + } + + handler = error_mappings.get(service_name, handle_generic_error) + return handler(service_error) + +def handle_openai_errors(error): + """Handle OpenAI-specific errors""" + if "rate_limit" in str(error).lower(): + return { + "type": "rate_limit_exceeded", + "message": "OpenAI rate limit exceeded", + "code": "OPENAI_RATE_LIMIT", + "retry_after": extract_retry_after(error) + } + elif "insufficient_quota" in str(error).lower(): + return { + "type": "quota_exceeded", + "message": "OpenAI quota exceeded", + "code": "OPENAI_QUOTA_EXCEEDED" + } + else: + return { + "type": "service_error", + "message": str(error), + "code": "OPENAI_ERROR" + } +``` + +## 📊 Response Metrics & Analytics + +### Response Analytics Collection +```python +def collect_response_analytics(response_data, request_context): + """Collect analytics data for response""" + + analytics_data = { + "response_size_bytes": len(json.dumps(response_data)), + "output_items_count": len(response_data.get("output", [])), + "processing_time_ms": response_data.get("metadata", {}).get("processing_time_ms"), + "success_rate": 1 if response_data.get("success") else 0, + "model_used": response_data.get("metadata", {}).get("model"), + "service_used": response_data.get("metadata", {}).get("service") + } + + # Send to analytics service + send_analytics_event("response_processed", analytics_data) +``` + +## 🔄 Response Flow Summary + +```mermaid +sequenceDiagram + participant CC as Chat Controller + participant RP as Response Processor + participant AF as AI Formatter + participant UC as Usage Calculator + participant BT as Background Tasks + participant C as Client + + CC->>RP: Service response + RP->>RP: Validate success + RP->>RP: Handle retry alerts + RP->>UC: Calculate usage metrics + UC->>RP: Usage data + RP->>AF: Format response + AF->>RP: Standardized response + RP->>BT: Trigger background tasks + RP->>C: Final response + BT->>BT: Save metrics & conversation +``` + +This response formatting system ensures consistent, well-structured responses across all AI services while providing comprehensive error handling and analytics collection. 
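+
+The webhook delivery helper earlier in this document calls `generate_webhook_signature()` without defining it. The sketch below is one minimal way such a helper could work, assuming an HMAC-SHA256 scheme keyed by a hypothetical `WEBHOOK_SIGNING_SECRET` environment variable; both the function body and the secret name are illustrative and not taken from the codebase.
+
+```python
+import hashlib
+import hmac
+import json
+import os
+
+def generate_webhook_signature(response_data: dict) -> str:
+    """Sign the webhook payload so receivers can verify its origin.
+
+    Assumes a shared secret in the (hypothetical) WEBHOOK_SIGNING_SECRET
+    environment variable; the payload is serialized with sorted keys so
+    both sender and receiver hash the same byte sequence.
+    """
+    secret = os.getenv("WEBHOOK_SIGNING_SECRET", "")
+    payload_bytes = json.dumps(response_data, sort_keys=True, default=str).encode("utf-8")
+    return hmac.new(secret.encode("utf-8"), payload_bytes, hashlib.sha256).hexdigest()
+```
+
+A receiver would recompute the digest over the raw request body and compare it to the `X-Webhook-Signature` header with `hmac.compare_digest` to avoid timing-based comparison leaks.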
diff --git a/docs/flows/service-layer-flow.md b/docs/flows/service-layer-flow.md new file mode 100644 index 00000000..ecb98572 --- /dev/null +++ b/docs/flows/service-layer-flow.md @@ -0,0 +1,357 @@ +# Service Layer Flow - AI Service Selection & Execution + +## 🎯 Chat Controller Processing + +### Main Chat Function +**File**: `src/services/commonServices/common.py` +**Function**: `chat(request_body)` + +### Step-by-Step Processing: + +#### 1. Request Parsing (`parse_request_body`) +```python +# Extract and structure request data +parsed_data = { + "bridge_id": request.get("bridge_id"), + "configuration": request.get("configuration"), + "thread_id": request.get("thread_id"), + "sub_thread_id": request.get("sub_thread_id"), + "org_id": request.get("org_id"), + "user": request.get("user"), + "service": request.get("service"), + "model": request.get("model"), + "variables": request.get("variables", {}), + "tools": request.get("tools", []), + "is_playground": request.get("is_playground", False), + "response_format": request.get("response_format", {"type": "default"}), + "files": request.get("files", []), + "images": request.get("images", []) +} +``` + +#### 2. Template Enhancement +- Adds default template with current time reference +- Adds user message to variables as `_user_message` +- Processes system templates if specified + +#### 3. Performance Tracking +```python +# Timer initialization for performance monitoring +timer = Timer() +timer.start("overall_execution") +``` + +#### 4. Model Configuration Loading +- Loads model configuration from `model_config_document` +- Extracts custom configuration based on user input +- Handles fine-tuned model selection + +#### 5. Pre-Tools Execution +- Executes pre-configured tools if specified +- Makes HTTP calls to external functions +- Stores results in variables for prompt injection + +#### 6. Thread Management +- Creates or retrieves conversation thread +- Manages sub-thread relationships +- Loads conversation history for context + +#### 7. Prompt Preparation +- Replaces variables in prompt template +- Applies system templates if specified +- Handles memory context for GPT memory +- Identifies missing required variables + +#### 8. Custom Settings Configuration +- Applies service-specific configurations +- Handles response type conversions +- Manages JSON schema formatting + +## 🤖 Service Handler Factory + +### Service Creation +**File**: `src/services/utils/helper.py` +**Function**: `Helper.create_service_handler` + +### Service Mapping: +```python +SERVICE_HANDLERS = { + "openai": "UnifiedOpenAICase", + "gemini": "GeminiHandler", + "anthropic": "Antrophic", + "groq": "Groq", + "openai_response": "OpenaiResponse", + "open_router": "OpenRouter", + "mistral": "Mistral" +} +``` + +### Service Parameter Building: +```python +service_params = { + "customConfig": custom_config, + "configuration": full_configuration, + "apikey": service_api_key, + "user": user_message, + "tools": available_tools, + "org_id": organization_id, + "bridge_id": bridge_identifier, + "thread_id": thread_identifier, + "model": model_name, + "service": service_name, + "token_calculator": cost_tracking_object, + "variables": prompt_variables, + "memory": memory_context, + "rag_data": document_data +} +``` + +## 🔄 Service Execution Flow + +### OpenAI Service Example +**File**: `src/services/commonServices/openAI/openaiCall.py` +**Class**: `UnifiedOpenAICase` + +#### Execution Steps: + +### 1. 
Conversation Creation +**File**: `src/services/commonServices/createConversations.py` +**Function**: `ConversationService.createOpenAiConversation` + +#### Conversation Processing: +- Adds memory context if GPT memory is enabled +- Processes conversation history with role-based formatting +- Handles image URLs and file attachments +- Creates proper message structure for OpenAI API + +```python +# OpenAI conversation format +conversation = [ + { + "role": "system", + "content": system_prompt + }, + { + "role": "user", + "content": user_message, + "images": image_urls # if applicable + } +] +``` + +### 2. Service-Specific Conversation Formats: + +#### OpenAI Standard: +```python +def createOpenAiConversation(history, memory_context): + # Standard OpenAI chat completion format + return formatted_messages +``` + +#### OpenAI Response API: +```python +def createOpenAiResponseConversation(history, memory_context): + # OpenAI Response API specific format + return formatted_messages +``` + +#### Anthropic Claude: +```python +def createAnthropicConversation(history, memory_context): + # Claude-specific message format + return formatted_messages +``` + +#### Google Gemini: +```python +def createGeminiConversation(history, memory_context): + # Gemini-specific format with parts structure + return formatted_messages +``` + +### 3. Model API Call Routing +**File**: `src/services/commonServices/baseService/baseService.py` +**Function**: `self.chats()` + +#### Service Routing Logic: +```python +def route_to_service(service_name): + routing_map = { + "openai": "runModel", + "openai_response": "openai_response_model", + "anthropic": "anthropic_runmodel", + "groq": "groq_runmodel", + "gemini": "gemini_modelrun", + "mistral": "mistral_model_run", + "open_router": "openrouter_modelrun" + } + return routing_map.get(service_name) +``` + +### 4. 
OpenAI Model Execution +**File**: `src/services/commonServices/openAI/runModel.py` +**Function**: `runModel` + +#### Key Features: +- Async OpenAI client initialization +- Retry mechanism with alternative model fallback +- Streaming response handling +- Token usage tracking +- Error handling and logging + +```python +async def runModel(params): + client = AsyncOpenAI(api_key=params["apikey"]) + + try: + response = await client.chat.completions.create( + model=params["model"], + messages=params["conversation"], + tools=params["tools"], + temperature=params.get("temperature", 0.7), + max_tokens=params.get("max_tokens", 1000) + ) + return process_response(response) + except Exception as e: + return handle_fallback(e, params) +``` + +## 🔧 Tool Processing + +### Function Call Detection +**File**: `src/services/commonServices/openAI/openai_response.py` + +#### Enhanced Detection Strategy: +```python +def detect_function_calls(response): + # Check multiple response formats + detection_methods = [ + check_tool_calls, + check_function_call, + check_embedded_calls, + check_reasoning_calls + ] + + for method in detection_methods: + if method(response): + return True + return False +``` + +### Tool Execution +**File**: `src/services/commonServices/baseService/baseService.py` +**Function**: `execute_tools_concurrently` + +#### Concurrent Tool Processing: +```python +async def execute_tools_concurrently(tool_calls): + tasks = [] + for tool_call in tool_calls: + task = asyncio.create_task( + execute_single_tool(tool_call) + ) + tasks.append(task) + + results = await asyncio.gather(*tasks, return_exceptions=True) + return process_tool_results(results) +``` + +### Tool Result Integration +**Function**: `update_model_response` in `baseService.py` + +#### OpenAI Response Integration: +```python +def update_model_response(original_response, tool_results, service): + if service == "openai_response": + # Merge tool results into original response + merged_output = merge_function_call_results( + original_response.get("output", []), + tool_results + ) + + # Combine usage tokens + combined_usage = combine_usage_tokens( + original_response.get("usage", {}), + tool_results.get("usage", {}) + ) + + return { + "output": merged_output, + "usage": combined_usage + } +``` + +## 📊 Response Processing + +### Content Extraction +**File**: `src/services/utils/ai_middleware_format.py` + +#### Function Call Content: +```python +def extract_function_call_content(call): + # Extract function name and arguments + name = call.get("name", "unknown_function") + arguments = call.get("arguments", {}) + + return f"Function call: {name} with arguments: {arguments}" +``` + +#### Fallback Content Mechanism: +```python +def extract_content_with_fallback(item): + # Try multiple content sources + content_sources = [ + lambda: item.get("content", {}).get("text"), + lambda: item.get("message"), + lambda: item.get("output_text"), + lambda: item.get("reasoning") + ] + + for source in content_sources: + content = source() + if content: + return content + + return "No content available" +``` + +## 🔄 Service Flow Summary + +```mermaid +sequenceDiagram + participant CC as Chat Controller + participant SF as Service Factory + participant CS as Conversation Service + participant AS as AI Service + participant TE as Tool Executor + participant RF as Response Formatter + + CC->>CC: Parse request & prepare + CC->>SF: Create service handler + SF->>CS: Create conversation format + CS->>AS: Formatted conversation + AS->>AS: Execute AI model call + AS->>TE: 
Execute function calls (if any) + TE->>AS: Tool execution results + AS->>RF: Process & format response + RF->>CC: Final formatted response +``` + +## 🚨 Error Handling & Fallbacks + +### Model Fallback Strategy: +```python +FALLBACK_MODELS = { + "gpt-4": ["gpt-4-turbo", "gpt-3.5-turbo"], + "claude-3-opus": ["claude-3-sonnet", "claude-3-haiku"], + "gemini-pro": ["gemini-pro-vision"] +} +``` + +### Retry Mechanism: +- **Max Retries**: 3 attempts per request +- **Backoff Strategy**: Exponential backoff +- **Fallback Models**: Automatic alternative model selection +- **Error Logging**: Comprehensive error tracking + +This service layer ensures robust AI service execution with comprehensive error handling, tool processing, and response formatting capabilities.
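+
+As a complement to the retry rules listed above, the sketch below shows one way the retry-with-fallback control flow could be wired together. It assumes the `FALLBACK_MODELS` mapping shown earlier in this document and a generic `call_model()` coroutine standing in for the provider-specific run functions; the wrapper itself is illustrative, not lifted from the codebase.
+
+```python
+import asyncio
+
+MAX_RETRIES = 3  # mirrors the "3 attempts per request" rule above
+
+async def run_with_retries(call_model, params: dict) -> dict:
+    """Retry a model call with exponential backoff, then move to fallback models.
+
+    `call_model` is any coroutine that takes the request params and returns a
+    response dict; `FALLBACK_MODELS` is the mapping defined earlier in this doc.
+    """
+    models_to_try = [params["model"], *FALLBACK_MODELS.get(params["model"], [])]
+
+    last_error = None
+    for model in models_to_try:
+        for attempt in range(MAX_RETRIES):
+            try:
+                return await call_model({**params, "model": model})
+            except Exception as error:  # real code would narrow this to retryable errors
+                last_error = error
+                if attempt < MAX_RETRIES - 1:
+                    await asyncio.sleep(2 ** attempt)  # exponential backoff between attempts
+
+    raise Exception(f"All models failed after retries. Last error: {last_error}")
+```
+
+Each provider handler implements the equivalent logic internally (for example via `handle_fallback_model`); the wrapper above only makes the shared control flow explicit for readers tracing an error through the service layer.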