-
Notifications
You must be signed in to change notification settings - Fork 31
Open
Description
[Bug Report] Stream SDK 偶现消息丢失问题
问题描述
使用钉钉 Stream SDK (dingtalk-stream-sdk-python) 时,偶尔会出现收不到回调消息的情况。经过代码审查,发现 SDK 实现中存在多个可能导致消息丢失的潜在问题。
环境信息
- SDK 版本: dingtalk-stream-sdk-python
- Python 版本: 3.9
- 操作系统: 5.15.0-78-generic #85-Ubuntu SMP Fri Jul 7 15:25:09 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux
问题分析
1. 异步任务未被正确管理和追踪
位置: client.py 第 67-68 行
async for raw_message in websocket:
json_message = json.loads(raw_message)
asyncio.create_task(self.background_task(json_message))问题:
asyncio.create_task()创建的任务没有被保存引用- 任务执行过程中的异常可能被静默忽略
- 在高并发场景下,大量任务同时运行可能导致资源耗尽
- 无法追踪任务执行状态,难以排查消息处理失败的原因
影响: 当 background_task 或 route_message 中出现异常时,消息会丢失且没有明确的错误日志。
2. 消息处理异常捕获范围过大
位置: client.py 第 77-80 行
async def background_task(self, json_message):
try:
route_result = await self.route_message(json_message)
if route_result == DingTalkStreamClient.TAG_DISCONNECT:
await self.websocket.close()
except Exception as e:
self.logger.error(f"error processing message: {e}")问题:
- 捕获所有异常但仅记录简单的错误信息
- 缺少消息内容、堆栈跟踪等关键调试信息
- 异常发生后消息直接丢弃,没有重试机制
影响: 当业务 handler 处理逻辑有问题时,消息会静默丢失,难以排查根因。
3. Keepalive 任务生命周期管理不当
位置: client.py 第 66 行
asyncio.create_task(self.keepalive(websocket))问题:
- keepalive 任务没有被取消或等待
- 当 websocket 连接异常退出时,keepalive 任务可能继续运行
- 可能导致资源泄漏和意外的 websocket 操作
4. 网络异常重连延迟过长
位置: client.py 第 60 行和 74 行
await asyncio.sleep(10) # 连接失败时
await asyncio.sleep(10) # 网络异常时问题:
- 10 秒的重连间隔在网络短暂抖动时过长
- 这段时间内的消息会完全丢失
- 缺少指数退避或更智能的重连策略
5. JSON 解析异常未单独处理
位置: client.py 第 68 行
json_message = json.loads(raw_message)问题:
- JSON 解析失败会导致整个 websocket 连接中断
- 应该只跳过当前消息并记录错误,而不是影响后续消息
复现步骤
由于是偶发问题,较难稳定复现,但以下场景更容易触发:
- 高并发消息推送(如批量发送钉钉消息)
- 网络环境不稳定时
- Handler 处理逻辑耗时较长或存在异常时
- 服务器负载较高时
预期行为
- 所有收到的消息都应该被可靠处理
- 处理失败的消息应该有明确的错误日志和堆栈信息
- 异步任务异常不应该静默丢失
- 网络异常恢复后应该快速重连
建议的修复方案
方案 1: 任务追踪和异常处理
async def start(self):
self.pre_start()
self._running_tasks = set() # 追踪运行中的任务
while True:
keepalive_task = None
try:
connection = self.open_connection()
if not connection:
self.logger.error('open connection failed')
await asyncio.sleep(5)
continue
uri = f'{connection["endpoint"]}?ticket={quote_plus(connection["ticket"])}'
async with websockets.connect(uri) as websocket:
self.websocket = websocket
keepalive_task = asyncio.create_task(self.keepalive(websocket))
async for raw_message in websocket:
try:
json_message = json.loads(raw_message)
task = asyncio.create_task(self.background_task(json_message))
self._running_tasks.add(task)
task.add_done_callback(lambda t: self._task_done_callback(t))
except json.JSONDecodeError as e:
self.logger.error(f'Invalid JSON message: {e}, raw={raw_message[:200]}')
except KeyboardInterrupt:
break
except Exception as e:
self.logger.exception('Connection error: %s', e)
await asyncio.sleep(3)
finally:
if keepalive_task:
keepalive_task.cancel()
try:
await keepalive_task
except asyncio.CancelledError:
pass
def _task_done_callback(self, task):
self._running_tasks.discard(task)
if task.exception():
self.logger.error(f"Task failed with exception: {task.exception()}",
exc_info=task.exception())方案 2: 增强错误日志
async def background_task(self, json_message):
message_id = json_message.get('headers', {}).get('messageId', 'unknown')
try:
self.logger.debug(f"Processing message: {message_id}")
route_result = await self.route_message(json_message)
if route_result == DingTalkStreamClient.TAG_DISCONNECT:
await self.websocket.close()
self.logger.debug(f"Message processed successfully: {message_id}")
except Exception as e:
self.logger.error(
f"Error processing message: {message_id}, "
f"type: {json_message.get('type')}, "
f"topic: {json_message.get('headers', {}).get('topic')}, "
f"error: {e}",
exc_info=True
)方案 3: 添加消息处理监控
建议在 SDK 中添加以下统计指标:
self._metrics = {
'messages_received': 0,
'messages_processed': 0,
'messages_failed': 0,
'reconnect_count': 0,
}临时解决方案
在官方修复之前,用户可以通过以下方式缓解问题:
- 在业务代码的 handler 中增加 try-except 确保不抛出未捕获异常
- 添加应用层的消息去重和重试机制
- 监控 SDK 的错误日志,及时发现问题
- 降低单个 handler 的处理时间,避免阻塞
相关信息
- 相关文件:
dingtalk_stream/client.py - 相关方法:
start(),background_task(),route_message()
感谢钉钉团队的辛勤工作!期待这些问题能够得到重视和修复。
Metadata
Metadata
Assignees
Labels
No labels