From 7d196329dd0f825c1a642cb3380f303b1339ee0d Mon Sep 17 00:00:00 2001 From: Soulter <905617992@qq.com> Date: Thu, 15 Jan 2026 13:10:50 +0800 Subject: [PATCH] fix: ensure message stream order --- .../sources/webchat/webchat_adapter.py | 5 +++-- .../platform/sources/webchat/webchat_event.py | 19 ++++++++++++++++--- astrbot/dashboard/routes/chat.py | 10 ++++++++++ 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/astrbot/core/platform/sources/webchat/webchat_adapter.py b/astrbot/core/platform/sources/webchat/webchat_adapter.py index 1ad68136e..e799e396e 100644 --- a/astrbot/core/platform/sources/webchat/webchat_adapter.py +++ b/astrbot/core/platform/sources/webchat/webchat_adapter.py @@ -93,7 +93,8 @@ async def send_by_session( session: MessageSesion, message_chain: MessageChain, ): - await WebChatMessageEvent._send(message_chain, session.session_id) + message_id = f"active_{str(uuid.uuid4())}" + await WebChatMessageEvent._send(message_id, message_chain, session.session_id) await super().send_by_session(session, message_chain) async def _get_message_history( @@ -196,7 +197,7 @@ async def convert_message(self, data: tuple) -> AstrBotMessage: abm.session_id = f"webchat!{username}!{cid}" - abm.message_id = str(uuid.uuid4()) + abm.message_id = payload.get("message_id") # 处理消息段列表 message_parts = payload.get("message", []) diff --git a/astrbot/core/platform/sources/webchat/webchat_event.py b/astrbot/core/platform/sources/webchat/webchat_event.py index 2e529bb1d..ab1370b3d 100644 --- a/astrbot/core/platform/sources/webchat/webchat_event.py +++ b/astrbot/core/platform/sources/webchat/webchat_event.py @@ -21,7 +21,10 @@ def __init__(self, message_str, message_obj, platform_meta, session_id): @staticmethod async def _send( - message: MessageChain | None, session_id: str, streaming: bool = False + message_id: str, + message: MessageChain | None, + session_id: str, + streaming: bool = False, ) -> str | None: cid = session_id.split("!")[-1] web_chat_back_queue = webchat_queue_mgr.get_or_create_back_queue(cid) @@ -31,6 +34,7 @@ async def _send( "type": "end", "data": "", "streaming": False, + "message_id": message_id, }, # end means this request is finished ) return @@ -45,6 +49,7 @@ async def _send( "data": data, "streaming": streaming, "chain_type": message.type, + "message_id": message_id, }, ) elif isinstance(comp, Json): @@ -54,6 +59,7 @@ async def _send( "data": json.dumps(comp.data, ensure_ascii=False), "streaming": streaming, "chain_type": message.type, + "message_id": message_id, }, ) elif isinstance(comp, Image): @@ -69,6 +75,7 @@ async def _send( "type": "image", "data": data, "streaming": streaming, + "message_id": message_id, }, ) elif isinstance(comp, Record): @@ -84,6 +91,7 @@ async def _send( "type": "record", "data": data, "streaming": streaming, + "message_id": message_id, }, ) elif isinstance(comp, File): @@ -100,6 +108,7 @@ async def _send( "type": "file", "data": data, "streaming": streaming, + "message_id": message_id, }, ) else: @@ -108,7 +117,8 @@ async def _send( return data async def send(self, message: MessageChain | None): - await WebChatMessageEvent._send(message, session_id=self.session_id) + message_id = self.message_obj.message_id + await WebChatMessageEvent._send(message_id, message, session_id=self.session_id) await super().send(MessageChain([])) async def send_streaming(self, generator, use_fallback: bool = False): @@ -116,6 +126,7 @@ async def send_streaming(self, generator, use_fallback: bool = False): reasoning_content = "" cid = self.session_id.split("!")[-1] web_chat_back_queue = webchat_queue_mgr.get_or_create_back_queue(cid) + message_id = self.message_obj.message_id async for chain in generator: # if chain.type == "break" and final_data: # # 分割符 @@ -130,7 +141,8 @@ async def send_streaming(self, generator, use_fallback: bool = False): # continue r = await WebChatMessageEvent._send( - chain, + message_id=message_id, + message=chain, session_id=self.session_id, streaming=True, ) @@ -147,6 +159,7 @@ async def send_streaming(self, generator, use_fallback: bool = False): "data": final_data, "reasoning": reasoning_content, "streaming": True, + "message_id": message_id, }, ) await super().send_streaming(generator, use_fallback) diff --git a/astrbot/dashboard/routes/chat.py b/astrbot/dashboard/routes/chat.py index c42bc4f64..de12daab9 100644 --- a/astrbot/dashboard/routes/chat.py +++ b/astrbot/dashboard/routes/chat.py @@ -296,6 +296,8 @@ async def chat(self): # 构建用户消息段(包含 path 用于传递给 adapter) message_parts = await self._build_user_message_parts(message) + message_id = str(uuid.uuid4()) + async def stream(): client_disconnected = False accumulated_parts = [] @@ -319,6 +321,13 @@ async def stream(): if not result: continue + if ( + "message_id" in result + and result["message_id"] != message_id + ): + logger.warning("webchat stream message_id mismatch") + continue + result_text = result["data"] msg_type = result.get("type") streaming = result.get("streaming", False) @@ -456,6 +465,7 @@ async def stream(): "selected_provider": selected_provider, "selected_model": selected_model, "enable_streaming": enable_streaming, + "message_id": message_id, }, ), )