diff --git a/flexus_client_kit/ckit_bot_exec.py b/flexus_client_kit/ckit_bot_exec.py index 5ef1f7ef..3671374a 100644 --- a/flexus_client_kit/ckit_bot_exec.py +++ b/flexus_client_kit/ckit_bot_exec.py @@ -77,8 +77,9 @@ def __init__(self, fclient: ckit_client.FlexusClient, p: ckit_bot_query.FPersona self._parked_messages: Dict[str, ckit_ask_model.FThreadMessageOutput] = {} self._parked_threads: Dict[str, ckit_ask_model.FThreadOutput] = {} self._parked_tasks: Dict[str, ckit_kanban.FPersonaKanbanTaskOutput] = {} - self._parked_toolcalls: List[ckit_cloudtool.FCloudtoolCall] = [] - self._parked_erp_changes: List[tuple[str, str, Optional[Dict[str, Any]], Optional[Dict[str, Any]]]] = [] + self._parked_toolcalls: Dict[str, ckit_cloudtool.FCloudtoolCall] = {} + self._processing_toolcalls: set[str] = set() + self._parked_erp_changes: Dict[tuple[str, str], tuple[str, Optional[Dict[str, Any]], Optional[Dict[str, Any]]]] = {} self._parked_anything_new = asyncio.Event() # These fields are designed for direct access: self.fclient = fclient @@ -153,9 +154,9 @@ async def unpark_collected_events(self, sleep_if_no_work: float, turn_tool_calls except Exception as e: logger.error("%s error in on_updated_task handler: %s\n%s", self.persona.persona_id, type(e).__name__, e, exc_info=e) - erp_changes = list(self._parked_erp_changes) + erp_changes = list(self._parked_erp_changes.items()) self._parked_erp_changes.clear() - for table_name, action, new_record_dict, old_record_dict in erp_changes: + for (table_name, record_id), (action, new_record_dict, old_record_dict) in erp_changes: did_anything = True handler = self._handler_per_erp_table_change.get(table_name) if handler: @@ -167,7 +168,10 @@ async def unpark_collected_events(self, sleep_if_no_work: float, turn_tool_calls except Exception as e: logger.error("%s error in on_erp_change(%r) handler: %s\n%s", self.persona.persona_id, table_name, type(e).__name__, e, exc_info=e) - mycalls = list(self._parked_toolcalls) + mycalls = list(self._parked_toolcalls.values()) + for c in mycalls: + if c.fcall_name in self._handler_per_tool: + self._processing_toolcalls.add(c.fcall_id) self._parked_toolcalls.clear() for c in mycalls: did_anything = True @@ -179,8 +183,11 @@ async def unpark_collected_events(self, sleep_if_no_work: float, turn_tool_calls await self._local_tool_call(self.fclient, c) except Exception as e: logger.error("%s error in on_tool_call() handler: %s\n%s", self.persona.persona_id, type(e).__name__, e, exc_info=e) + finally: + self._processing_toolcalls.discard(c.fcall_id) else: task = asyncio.create_task(self._local_tool_call(self.fclient, c)) + task.add_done_callback(lambda _, fcall_id=c.fcall_id: self._processing_toolcalls.discard(fcall_id)) task.add_done_callback(lambda t: self.bg_call_tasks.discard(t)) self.bg_call_tasks.add(task) @@ -469,9 +476,11 @@ async def subscribe_and_produce_callbacks( toolcall = upd.news_payload_toolcall persona_id = toolcall.connected_persona_id if persona_id in bc.bots_running: - logger.info("%s parked tool call %s %s", persona_id, toolcall.fcall_id, toolcall.fcall_name) - bc.bots_running[persona_id].instance_rcx._parked_toolcalls.append(toolcall) - bc.bots_running[persona_id].instance_rcx._parked_anything_new.set() + rcx = bc.bots_running[persona_id].instance_rcx + if toolcall.fcall_id not in rcx._processing_toolcalls and toolcall.fcall_id not in rcx._parked_toolcalls: + logger.info("%s parked tool call %s %s", persona_id, toolcall.fcall_id, toolcall.fcall_name) + rcx._parked_toolcalls[toolcall.fcall_id] = toolcall + rcx._parked_anything_new.set() else: logger.info("%s is about persona=%s which is not running here." % (toolcall.fcall_id, persona_id)) @@ -498,8 +507,9 @@ async def subscribe_and_produce_callbacks( handled = True new_record = upd.news_payload_erp_record_new old_record = upd.news_payload_erp_record_old + record_id = upd.news_payload_id for bot in bc.bots_running.values(): - bot.instance_rcx._parked_erp_changes.append((table_name, upd.news_action, new_record, old_record)) + bot.instance_rcx._parked_erp_changes[(table_name, record_id)] = (upd.news_action, new_record, old_record) bot.instance_rcx._parked_anything_new.set() elif upd.news_action == "INITIAL_UPDATES_OVER":