Testing #1146 (Merged)

Commits (24)
ffa609a
fix: include message_id in error responses for chat function
husainhackerrank-afk Dec 29, 2025
333553e
Big Solved: Testcase Score not working
adityajunwal Dec 29, 2025
d354ee1
feat: integrate Hippocampus API for saving chatbot conversations
Husainbw786 Dec 30, 2025
eb7d18e
Merge pull request #1131 from Walkover-Web-Solution/message_id_in_err…
harshhsahu Dec 30, 2025
08c11fc
batch api service
adityajunwal Dec 30, 2025
4998f9a
Merge pull request #1125 from Walkover-Web-Solution/new_batch_services
harshhsahu Dec 30, 2025
b791593
refactor: update conversation saving to Hippocampus API with JSON for…
husainhackerrank-afk Dec 31, 2025
61cc015
Merge pull request #1132 from Walkover-Web-Solution/testcase_score_bug
harshhsahu Dec 31, 2025
e05011c
feat: Introduce `chatbot_auto_answers` configuration to conditionally…
husainhackerrank-afk Dec 31, 2025
a0e4316
fix: Validate `HIPPOCAMPUS_API_URL` configuration and send stringifie…
husainhackerrank-afk Dec 31, 2025
28378bc
Merge pull request #1134 from Walkover-Web-Solution/rag_similarity_fo…
harshhsahu Dec 31, 2025
8ad71b8
refactor: remove environment variable for Hippocampus API URL and use…
husainhackerrank-afk Dec 31, 2025
63e1987
Merge pull request #1136 from Walkover-Web-Solution/fix_env_name
harshhsahu Dec 31, 2025
2401505
add gtwy web search inbuild tool
Dec 31, 2025
23aba70
Merge pull request #1104 from Walkover-Web-Solution/add_web_crowler
harshhsahu Dec 31, 2025
e848593
Fixed a Bug in Anthropic Service
adityajunwal Jan 2, 2026
24db30f
Merge pull request #1137 from Walkover-Web-Solution/new_batch_services
harshhsahu Jan 2, 2026
4c01b43
fix: Correctly retrieve 'chatbot_auto_answers' from bridge_data inste…
husainhackerrank-afk Jan 2, 2026
4980018
Merge pull request #1138 from Walkover-Web-Solution/falg_for_chatbot_…
harshhsahu Jan 2, 2026
9851643
openai-image generation support (#1139)
ViaSocket-Git Jan 3, 2026
e88ac60
Removed untested services (#1142)
ViaSocket-Git Jan 3, 2026
5232cb2
feat: Enhance logging for saving conversations to Hippocampus in Queu…
Husainbw786 Jan 5, 2026
b70bbf4
Logger added dev (#1144)
Husainbw786 Jan 5, 2026
3c66937
refactor: Remove redundant logging statements in Queue2 service for s…
Husainbw786 Jan 5, 2026
config.py: 5 changes (4 additions & 1 deletion)
@@ -56,4 +56,7 @@ class Config:
DOCSTAR_COLLECTION_ID = os.getenv('DOCSTAR_COLLECTION_ID')
AI_ML_APIKEY = os.getenv('AI_ML_APIKEY')
AI_MIDDLEWARE_PAUTH_KEY = os.getenv('AI_MIDDLEWARE_PAUTH_KEY')
OPENAI_API_KEY_GPT_5_NANO = os.getenv('OPENAI_API_KEY_GPT_5_NANO')
OPENAI_API_KEY_GPT_5_NANO = os.getenv('OPENAI_API_KEY_GPT_5_NANO')
HIPPOCAMPUS_API_KEY = os.getenv('HIPPOCAMPUS_API_KEY')
HIPPOCAMPUS_COLLECTION_ID = os.getenv('HIPPOCAMPUS_COLLECTION_ID')
FIRECRAWL_API_KEY = os.getenv('FIRECRAWL_API_KEY')
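
A small hypothetical guard (not part of this diff) showing how the new keys might be checked before calling the Hippocampus API:

from config import Config

def hippocampus_configured() -> bool:
    # Hypothetical helper: returns False if either new key is missing from the environment.
    return bool(Config.HIPPOCAMPUS_API_KEY and Config.HIPPOCAMPUS_COLLECTION_ID)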
docs/CHAT_COMPLETION_FLOW.md: 2 changes (1 addition & 1 deletion)
@@ -197,7 +197,7 @@ The `getConfiguration` function assembles:
**Service Mapping:**
- `openai` → `UnifiedOpenAICase`
- `gemini` → `GeminiHandler`
- `anthropic` → `Antrophic`
- `anthropic` → `Anthropic`
- `groq` → `Groq`
- `openai_response` → `OpenaiResponse`
- `open_router` → `OpenRouter`
src/configs/constant.py: 4 changes (4 additions & 0 deletions)
@@ -71,4 +71,8 @@
"gemini":"gemini-2.5-flash",
"ai_ml": "gpt-oss-20b",
"grok": "grok-4-fast"
}

inbuild_tools = {
"Gtwy_Web_Search":"Gtwy_Web_Search"
}
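
A hedged sketch (not part of this diff) of how the new inbuild_tools map could serve as a membership check for built-in tools:

from src.configs.constant import inbuild_tools

def is_inbuilt_tool(tool_name: str) -> bool:
    # Hypothetical helper: the real lookup site for Gtwy_Web_Search lives elsewhere in the PR.
    return tool_name in inbuild_tools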
src/services/cache_service.py: 40 changes (39 additions & 1 deletion)
@@ -117,4 +117,42 @@ def make_json_serializable(data):
except (TypeError, OverflowError):
return str(data)

__all__ = ['delete_in_cache', 'store_in_cache', 'find_in_cache', 'find_in_cache_and_expire', 'store_in_cache_permanent_until_read', 'verify_ttl', 'clear_cache']
async def acquire_lock(lock_key: str, ttl: int = 600) -> bool:
"""
Acquire a distributed lock using Redis SET NX EX pattern.

Args:
lock_key: Unique identifier for the lock
ttl: Time-to-live in seconds (default: 600 seconds = 10 minutes)

Returns:
True if lock was acquired, False otherwise
"""
try:
full_key = f"{REDIS_PREFIX}lock_{lock_key}"
# SET NX EX: Set if Not eXists with EXpiration
result = await client.set(full_key, "locked", nx=True, ex=ttl)
return result is not None
except Exception as e:
logger.error(f"Error acquiring lock for {lock_key}: {str(e)}")
return False

async def release_lock(lock_key: str) -> bool:
"""
Release a distributed lock.

Args:
lock_key: Unique identifier for the lock

Returns:
True if lock was released, False otherwise
"""
try:
full_key = f"{REDIS_PREFIX}lock_{lock_key}"
result = await client.delete(full_key)
return result > 0
except Exception as e:
logger.error(f"Error releasing lock for {lock_key}: {str(e)}")
return False

__all__ = ['delete_in_cache', 'store_in_cache', 'find_in_cache', 'find_in_cache_and_expire', 'store_in_cache_permanent_until_read', 'verify_ttl', 'clear_cache', 'acquire_lock', 'release_lock']
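
A minimal usage sketch for the new lock helpers (not part of this diff; the caller and lock key shown are hypothetical, assuming the helpers are importable from this module):

from src.services.cache_service import acquire_lock, release_lock

async def poll_batch_once(batch_id: str):
    # Hypothetical caller: ensure only one worker polls a given batch at a time.
    lock_key = f"batch_poll_{batch_id}"
    if not await acquire_lock(lock_key, ttl=600):
        return  # another worker holds the lock; skip this cycle
    try:
        pass  # work that must not run concurrently goes here
    finally:
        await release_lock(lock_key)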
src/services/commonServices/Google/gemini_batch.py: 107 changes (107 additions & 0 deletions)
@@ -0,0 +1,107 @@
import pydash as _
from ..baseService.baseService import BaseService
from ..createConversations import ConversationService
from src.configs.constant import service_name
import json
import uuid
from ...cache_service import store_in_cache
from src.configs.constant import redis_keys
from src.services.commonServices.Google.gemini_run_batch import create_batch_file, process_batch_file


class GeminiBatch(BaseService):
async def batch_execute(self):
system_prompt = self.configuration.get('prompt', '')
batch_requests = []
message_mappings = []

# Validate batch_variables if provided
batch_variables = self.batch_variables if hasattr(self, 'batch_variables') and self.batch_variables else None
if batch_variables is not None:
if not isinstance(batch_variables, list):
return {
"success": False,
"message": "batch_variables must be an array"
}
if len(batch_variables) != len(self.batch):
return {
"success": False,
"message": f"batch_variables array length ({len(batch_variables)}) must match batch array length ({len(self.batch)})"
}

# Construct batch requests in Gemini JSONL format
for idx, message in enumerate(self.batch, start=1):
# Generate a unique key for each request
custom_id = str(uuid.uuid4())

# Construct Gemini native format request
request_content = {
'contents': [
{
'parts': [
{'text': message}
]
}
]
}

# Add system instruction
request_content['config'] = {
'system_instruction': {
'parts': [{'text': system_prompt}]
}
}

# Add other config from customConfig (like temperature, max_tokens, etc.)
if self.customConfig:
if 'config' not in request_content:
request_content['config'] = {}
# Merge customConfig into config, excluding any messages/prompt fields
for key, value in self.customConfig.items():
if key not in ['messages', 'prompt', 'model']:
request_content['config'][key] = value

# Create JSONL entry with key and request
batch_entry = {
"key": custom_id,
"request": request_content
}
batch_requests.append(json.dumps(batch_entry))

# Store message mapping for response
mapping_item = {
"message": message,
"custom_id": custom_id
}

# Add batch_variables to mapping if provided (idx-1 because enumerate starts at 1)
if batch_variables is not None:
mapping_item["variables"] = batch_variables[idx - 1]

message_mappings.append(mapping_item)

# Upload batch file and create batch job
uploaded_file = await create_batch_file(batch_requests, self.apikey)
batch_job = await process_batch_file(uploaded_file, self.apikey, self.model)

batch_id = batch_job.name
batch_json = {
"id": batch_job.name,
"state": batch_job.state,
"create_time": batch_job.create_time,
"model": batch_job.model or self.model,
"apikey": self.apikey,
"webhook": self.webhook,
"batch_variables": batch_variables,
"custom_id_mapping": {item["custom_id"]: idx for idx, item in enumerate(message_mappings)},
"service": self.service,
"uploaded_file": uploaded_file.name
}
cache_key = f"{redis_keys['batch_']}{batch_job.name}"
await store_in_cache(cache_key, batch_json, ttl = 86400)
return {
"success": True,
"message": "Response will be successfully sent to the webhook wihtin 24 hrs.",
"batch_id": batch_id,
"messages": message_mappings
}
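
For reference (not part of this diff), a single JSONL entry produced by the loop above would look roughly like the following, with illustrative values:

example_entry = {
    "key": "a3f1c2d4-9b7e-4f3a-8c2d-1e5f6a7b8c9d",  # custom_id from uuid.uuid4()
    "request": {
        "contents": [
            {"parts": [{"text": "What is the capital of France?"}]}
        ],
        "config": {
            "system_instruction": {"parts": [{"text": "You are a helpful assistant."}]},
            "temperature": 0.2  # merged in from customConfig, illustrative
        }
    }
}
# Each entry is serialized with json.dumps, and the entries are joined with newlines to form the JSONL upload.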
src/services/commonServices/Google/gemini_run_batch.py: 215 changes (215 additions & 0 deletions)
@@ -0,0 +1,215 @@
import json
import uuid
import tempfile
import os
from google import genai
from google.genai import types

async def create_batch_file(batch_requests, apiKey):
"""
Creates a JSONL file and uploads it to Gemini File API.

Args:
batch_requests: List of JSON strings (JSONL entries)
apiKey: Gemini API key

Returns:
Uploaded file object from Gemini File API
"""
try:
# Initialize Gemini client
client = genai.Client(api_key=apiKey)

# Create JSONL file content
jsonl_content = "\n".join(batch_requests)

# Create a temporary file to upload
with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False, encoding='utf-8') as temp_file:
temp_file.write(jsonl_content)
temp_file_path = temp_file.name

try:
# Upload the JSONL file to Gemini File API
uploaded_file = client.files.upload(
file=temp_file_path,
config=types.UploadFileConfig(
display_name=f'batch-{uuid.uuid4()}',
mime_type='application/jsonl'
)
)
return uploaded_file
finally:
# Clean up temporary file
if os.path.exists(temp_file_path):
os.remove(temp_file_path)
except Exception as e:
print("Error in Gemini create_batch_file:", repr(e))
print("Cause:", repr(getattr(e, "__cause__", None)))
raise

async def process_batch_file(uploaded_file, apiKey, model):
"""
Creates a batch job using the uploaded file.

Args:
uploaded_file: File object from create_batch_file
apiKey: Gemini API key
model: Model name to use for batch processing

Returns:
Batch job object
"""
try:
# Initialize Gemini client
client = genai.Client(api_key=apiKey)

# Create batch job with the uploaded file
batch_job = client.batches.create(
model=model,
src=uploaded_file.name,
config={
'display_name': f'batch-job-{uuid.uuid4()}',
}
)
print(f"Created batch job: {batch_job.name}")
return batch_job
except Exception as e:
print(f"Error in Gemini process_batch_file: {e}")
raise

async def retrieve_batch_status(batch_id, apiKey):
"""
Retrieves the status of a batch job.

Args:
batch_id: Batch job name
apiKey: Gemini API key

Returns:
Batch job object with current status
"""
try:
# Initialize Gemini client
client = genai.Client(api_key=apiKey)

# Get batch job status
batch = client.batches.get(name=batch_id)
print(f"Batch status: {batch.state}")
return batch
except Exception as e:
print(f"Error in Gemini retrieve_batch_status: {e}")
raise


async def download_gemini_file(file_uri, apikey):
"""
Helper function to download and parse a Gemini batch result file.

Args:
file_uri: The file URI to download
apikey: Gemini API key

Returns:
List of parsed JSON lines, or empty list if file doesn't exist or fails to parse
"""
if not file_uri:
return []

try:
client = genai.Client(api_key=apikey)
file_content = client.files.get(name=file_uri).read()

try:
results = [json.loads(line) for line in file_content.decode('utf-8').splitlines() if line.strip()]
return results
except json.JSONDecodeError as e:
print(f"JSON decoding error for file {file_uri}: {e}")
return []
except Exception as e:
print(f"Error downloading file {file_uri}: {e}")
return []


async def handle_batch_results(batch_id, apikey):
"""
Handle Gemini batch processing - retrieve status and process results.

Args:
batch_id: Batch ID
apikey: Gemini API key

Returns:
Tuple of (results, is_completed)
- For succeeded batches: (results_list, True)
- For failed/expired/cancelled with partial results: (combined_results, True)
- For failed/expired/cancelled without results: (error_info, True)
- For in-progress: (None, False) - continues polling
"""
batch = await retrieve_batch_status(batch_id, apikey)
state = batch.state

# In-progress states - continue polling
if state in [types.BatchState.STATE_PENDING, types.BatchState.STATE_RUNNING]:
return None, False

# Terminal states - download results
output_uri = batch.output_uri

# For Gemini, there's no separate error file like OpenAI
# All results (success and errors) are in the output file
output_results = await download_gemini_file(output_uri, apikey)

if output_results:
# We have some results (partial or complete)
return output_results, True

# No results available - return error based on state
if state == types.BatchState.STATE_SUCCEEDED:
# Succeeded but no output - unusual case
error_info = [{
"error": {
"message": "Batch succeeded but no result file was generated",
"type": "no_results",
"batch_status": state.name
},
"status_code": 400
}]
elif state == types.BatchState.STATE_FAILED:
error_info = [{
"error": {
"message": f"Batch failed. Error: {getattr(batch, 'error', 'No error details available')}",
"type": "batch_failed",
"batch_status": state.name
},
"status_code": 400
}]
elif state == types.BatchState.STATE_EXPIRED:
error_info = [{
"error": {
"message": "Batch expired - running or pending for more than 48 hours and no partial results available",
"type": "batch_expired",
"batch_status": state.name
},
"status_code": 400
}]
elif state == types.BatchState.STATE_CANCELLED:
error_info = [{
"error": {
"message": "Batch was cancelled and no partial results available",
"type": "batch_cancelled",
"batch_status": state.name
},
"status_code": 400
}]
else:
# Unknown terminal state
error_info = [{
"error": {
"message": f"Batch reached unknown terminal state: {state.name}",
"type": "unknown_status",
"batch_status": state.name
},
"status_code": 400
}]

return error_info, True
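
A hedged end-to-end sketch of how these helpers fit together (not part of this diff; the polling interval and webhook delivery are handled elsewhere, e.g. in the queue service, and are only hinted at here):

import asyncio
from src.services.commonServices.Google.gemini_run_batch import (
    create_batch_file,
    process_batch_file,
    handle_batch_results,
)

async def run_gemini_batch(batch_requests, apikey, model):
    # batch_requests: list of JSONL strings as built by GeminiBatch.batch_execute
    uploaded_file = await create_batch_file(batch_requests, apikey)
    batch_job = await process_batch_file(uploaded_file, apikey, model)

    # Poll until the batch reaches a terminal state.
    while True:
        results, is_completed = await handle_batch_results(batch_job.name, apikey)
        if is_completed:
            return results  # either success results or error_info entries
        await asyncio.sleep(60)  # illustrative polling interval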