Skip to content

偶尔会有收不到回调消息 #65

@wilac-pv

Description

@wilac-pv

[Bug Report] Stream SDK 偶现消息丢失问题

问题描述

使用钉钉 Stream SDK (dingtalk-stream-sdk-python) 时,偶尔会出现收不到回调消息的情况。经过代码审查,发现 SDK 实现中存在多个可能导致消息丢失的潜在问题。

环境信息

  • SDK 版本: dingtalk-stream-sdk-python
  • Python 版本: 3.9
  • 操作系统: 5.15.0-78-generic #85-Ubuntu SMP Fri Jul 7 15:25:09 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux

问题分析

1. 异步任务未被正确管理和追踪

位置: client.py 第 67-68 行

async for raw_message in websocket:
    json_message = json.loads(raw_message)
    asyncio.create_task(self.background_task(json_message))

问题:

  • asyncio.create_task() 创建的任务没有被保存引用
  • 任务执行过程中的异常可能被静默忽略
  • 在高并发场景下,大量任务同时运行可能导致资源耗尽
  • 无法追踪任务执行状态,难以排查消息处理失败的原因

影响: 当 background_taskroute_message 中出现异常时,消息会丢失且没有明确的错误日志。

2. 消息处理异常捕获范围过大

位置: client.py 第 77-80 行

async def background_task(self, json_message):
    try:
        route_result = await self.route_message(json_message)
        if route_result == DingTalkStreamClient.TAG_DISCONNECT:
            await self.websocket.close()
    except Exception as e:
        self.logger.error(f"error processing message: {e}")

问题:

  • 捕获所有异常但仅记录简单的错误信息
  • 缺少消息内容、堆栈跟踪等关键调试信息
  • 异常发生后消息直接丢弃,没有重试机制

影响: 当业务 handler 处理逻辑有问题时,消息会静默丢失,难以排查根因。

3. Keepalive 任务生命周期管理不当

位置: client.py 第 66 行

asyncio.create_task(self.keepalive(websocket))

问题:

  • keepalive 任务没有被取消或等待
  • 当 websocket 连接异常退出时,keepalive 任务可能继续运行
  • 可能导致资源泄漏和意外的 websocket 操作

4. 网络异常重连延迟过长

位置: client.py 第 60 行和 74 行

await asyncio.sleep(10)  # 连接失败时
await asyncio.sleep(10)  # 网络异常时

问题:

  • 10 秒的重连间隔在网络短暂抖动时过长
  • 这段时间内的消息会完全丢失
  • 缺少指数退避或更智能的重连策略

5. JSON 解析异常未单独处理

位置: client.py 第 68 行

json_message = json.loads(raw_message)

问题:

  • JSON 解析失败会导致整个 websocket 连接中断
  • 应该只跳过当前消息并记录错误,而不是影响后续消息

复现步骤

由于是偶发问题,较难稳定复现,但以下场景更容易触发:

  1. 高并发消息推送(如批量发送钉钉消息)
  2. 网络环境不稳定时
  3. Handler 处理逻辑耗时较长或存在异常时
  4. 服务器负载较高时

预期行为

  1. 所有收到的消息都应该被可靠处理
  2. 处理失败的消息应该有明确的错误日志和堆栈信息
  3. 异步任务异常不应该静默丢失
  4. 网络异常恢复后应该快速重连

建议的修复方案

方案 1: 任务追踪和异常处理

async def start(self):
    self.pre_start()
    self._running_tasks = set()  # 追踪运行中的任务
    
    while True:
        keepalive_task = None
        try:
            connection = self.open_connection()
            if not connection:
                self.logger.error('open connection failed')
                await asyncio.sleep(5)
                continue
            
            uri = f'{connection["endpoint"]}?ticket={quote_plus(connection["ticket"])}'
            async with websockets.connect(uri) as websocket:
                self.websocket = websocket
                keepalive_task = asyncio.create_task(self.keepalive(websocket))
                
                async for raw_message in websocket:
                    try:
                        json_message = json.loads(raw_message)
                        task = asyncio.create_task(self.background_task(json_message))
                        self._running_tasks.add(task)
                        task.add_done_callback(lambda t: self._task_done_callback(t))
                    except json.JSONDecodeError as e:
                        self.logger.error(f'Invalid JSON message: {e}, raw={raw_message[:200]}')
                        
        except KeyboardInterrupt:
            break
        except Exception as e:
            self.logger.exception('Connection error: %s', e)
            await asyncio.sleep(3)
        finally:
            if keepalive_task:
                keepalive_task.cancel()
                try:
                    await keepalive_task
                except asyncio.CancelledError:
                    pass

def _task_done_callback(self, task):
    self._running_tasks.discard(task)
    if task.exception():
        self.logger.error(f"Task failed with exception: {task.exception()}", 
                         exc_info=task.exception())

方案 2: 增强错误日志

async def background_task(self, json_message):
    message_id = json_message.get('headers', {}).get('messageId', 'unknown')
    try:
        self.logger.debug(f"Processing message: {message_id}")
        route_result = await self.route_message(json_message)
        if route_result == DingTalkStreamClient.TAG_DISCONNECT:
            await self.websocket.close()
        self.logger.debug(f"Message processed successfully: {message_id}")
    except Exception as e:
        self.logger.error(
            f"Error processing message: {message_id}, "
            f"type: {json_message.get('type')}, "
            f"topic: {json_message.get('headers', {}).get('topic')}, "
            f"error: {e}",
            exc_info=True
        )

方案 3: 添加消息处理监控

建议在 SDK 中添加以下统计指标:

self._metrics = {
    'messages_received': 0,
    'messages_processed': 0,
    'messages_failed': 0,
    'reconnect_count': 0,
}

临时解决方案

在官方修复之前,用户可以通过以下方式缓解问题:

  1. 在业务代码的 handler 中增加 try-except 确保不抛出未捕获异常
  2. 添加应用层的消息去重和重试机制
  3. 监控 SDK 的错误日志,及时发现问题
  4. 降低单个 handler 的处理时间,避免阻塞

相关信息

  • 相关文件: dingtalk_stream/client.py
  • 相关方法: start(), background_task(), route_message()

感谢钉钉团队的辛勤工作!期待这些问题能够得到重视和修复。

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions