Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 100 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,101 @@
# CozeWebSocket
Coze 平台 websocket quecpython 接入

## Table of Contents

- [Introduction](#Introduction)
- [Features](#Features)
- [Quick_Start](#Quick_Start)
- [Prerequisites](#Prerequisites)
- [Installation](#Installation)
- [Running_Application](#Running_Application)
- [Contributing](#Contributing)
- [License](#License)
- [Support](#Support)

## Introduction

QuecPython introduces the Coze Platform Websocket Access solution. This solution is based on the WebSocket protocol, offering cross-platform compatibility and supporting most QuecPython modules.

This demo uses an AI development board equipped with the EC800MCNLE module.

## Features

- Supports voice interruption/barge-in.
- Supports keyword-based voice wake-up.
- Uses Python for easy secondary development.

## Quick_Start

### Prerequisites

Before getting started, ensure you have the following prerequisites:

- **Hardware:**
- Contact Quectel to obtain the AI development board and accessories.
- Computer (Windows 7, Windows 10, or Windows 11)
- Speaker
- Any 2-5W speaker will work
- [Purchase link from Quectel Mall](https://www.quecmall.com/goods-detail/2c90800c94028da201948249e9f4012d)
- **Software:**
- Debugging tool [QPYcom](https://images.quectel.com/python/2022/12/QPYcom_V3.6.0.zip)
- QuecPython firmware (beta firmware is available in the `fw` directory of the repository)
- Python text editor (e.g., [VSCode](https://code.visualstudio.com/), [PyCharm](https://www.jetbrains.com/pycharm/download/))

### Installation

1. **Clone the Repository:**

```bash
git clone https://github.com/QuecPython/CozeWebSocket.git
```

2. **Flash the Firmware:**
Follow the [instructions](https://python.quectel.com/doc/Application_guide/zh/dev-tools/QPYcom/qpycom-dw.html#%E4%B8%8B%E8%BD%BD%E5%9B%BA%E4%BB%B6) to flash the firmware onto the development board.

### Running_Application

1. **Hardware Connection:**
This demo uses the Quectel AI development board. Contact Quectel if needed. Connect the hardware as shown below:

<img src="./media/20250425131903.jpg" style="zoom:80%;" />

1. Connect the speaker
2. Connect the antenna
3. Insert the battery

2. Connect to the host computer via Type-C.

3. **Download the Code to the Device:**

- Launch the QPYcom debugging tool.
- Connect the data cable to the computer.
- Press the **PWRKEY** button on the development board to power on the device.
- Follow the [instructions](https://developer.quectel.com/doc/quecpython/Getting_started/en/4G/first_python.html#PC与模组间的文件传输) to import all files from the `code` folder into the module's file system, preserving the directory structure.

4. **Run the Application:**

- Select the `File` tab.
- Choose the `coze_main.py` script.
- Right-click and select `Run` or use the `Run` shortcut button to execute the script.

5. **After keyword wake-up, start a conversation. Refer to the runtime log:**

![](./media/20260202.png)

## Contributing

We welcome contributions to improve this project! Follow these steps to contribute:

1. Fork this repository.
2. Create a new branch (`git checkout -b feature/your-feature`).
3. Commit your changes (`git commit -m 'Add your feature'`).
4. Push to the branch (`git push origin feature/your-feature`).
5. Open a Pull Request.

## License

This project is licensed under the Apache License. See the [LICENSE](https://license/) file for details.

## Support

If you have any questions or need support, refer to the [QuecPython Documentation](https://python.quectel.com/doc) or open an issue in this repository.
103 changes: 103 additions & 0 deletions README_zh.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# CozeWebSocket
Coze 平台 websocket quecpython 接入

## 目录

- [介绍](#介绍)
- [功能特性](#功能特性)
- [快速开始](#快速开始)
- [先决条件](#先决条件)
- [安装](#安装)
- [运行应用程序](#运行应用程序)
- [贡献](#贡献)
- [许可证](#许可证)
- [支持](#支持)

## 介绍

QuecPython 推出了Coze 平台 websocket quecpython 接入解决方案。该方案基于 websocket 协议,具有跨平台特性,可以适用于大部分 QuecPython 模组。

本案例采用搭载 EC800MCNLE 模组的 AI 开发板。

## 功能特性

- 支持语音中断/打断。
- 支持关键词语音唤醒。
- 使用 Python 语言,便于二次开发。

## 快速开始

### 先决条件

在开始之前,请确保您具备以下先决条件:

- **硬件:**
- 联系移远官方获取 AI 开发板及配件。
- 电脑(Windows 7、Windows 10 或 Windows 11)
- 喇叭
- 任意 2-5W 功率的喇叭即可
- [移远商城购买链接](https://www.quecmall.com/goods-detail/2c90800c94028da201948249e9f4012d)

- **软件:**
- 调试工具 [QPYcom](https://images.quectel.com/python/2022/12/QPYcom_V3.6.0.zip)
- QuecPython 固件(仓库 fw 目录下有 beta 固件)
- Python 文本编辑器(例如,[VSCode](https://code.visualstudio.com/)、[Pycharm](https://www.jetbrains.com/pycharm/download/))

### 安装

1. **克隆仓库**:

```bash
git clone https://github.com/QuecPython/CozeWebSocket.git
```

2. **烧录固件:**
按照[说明](https://python.quectel.com/doc/Application_guide/zh/dev-tools/QPYcom/qpycom-dw.html#%E4%B8%8B%E8%BD%BD%E5%9B%BA%E4%BB%B6)将固件烧录到开发板上。

### 运行应用程序

1. **连接硬件:**
本案例采用移远 AI 开发板,如有需要请联系官方获取。按照下图进行硬件连接:

<img src="./media/20250425131903.jpg" style="zoom:80%;" />

1. 连接喇叭
2. 连接天线
3. 接入电池

2. **通过 Tpye-C 连接上位机**

3. **将代码下载到设备:**

- 启动 QPYcom 调试工具。
- 将数据线连接到计算机。
- 按下开发板上的 **PWRKEY** 按钮启动设备。
- 按照[说明](https://developer.quectel.com/doc/quecpython/Getting_started/zh/4G/first_python.html#PC与模组间的文件传输)将 `code` 文件夹中的所有文件导入到模块的文件系统中,保留目录结构。

4. **运行应用程序:**

- 选择 `File` 选项卡。
- 选择 `coze_main.py` 脚本。
- 右键单击并选择 `Run` 或使用`运行`快捷按钮执行脚本。

5. **关键词唤醒后,即可对话, 参考运行日志:**

![](./media/20260202.png)

## 贡献

我们欢迎对本项目的改进做出贡献!请按照以下步骤进行贡献:

1. Fork 此仓库。
2. 创建一个新分支(`git checkout -b feature/your-feature`)。
3. 提交您的更改(`git commit -m 'Add your feature'`)。
4. 推送到分支(`git push origin feature/your-feature`)。
5. 打开一个 Pull Request。

## 许可证

本项目使用 Apache 许可证。详细信息请参阅 [LICENSE](LICENSE) 文件。

## 支持

如果您有任何问题或需要支持,请参阅 [QuecPython 文档](https://python.quectel.com/doc) 或在本仓库中打开一个 issue。
Binary file not shown.
Binary file added media/20250425131903.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added media/20260202.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
154 changes: 154 additions & 0 deletions src/coze.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
from usr import uwebsocket
import _thread
from usr import packet #update, append
import ujson
import ubinascii
from usr.media import singleton_media
import utime
from queue import Queue

class cozews():
def __init__(self, url, auth, callback=None):

self.media = singleton_media('pcma', 4)
if self.media is None:
print('media is busy, please stop it first')
return
self.audio_queue = Queue()

self.url = url
self.headers = {"Authorization": "Bearer " + auth}

self.ws_recv_task_id = None
self.ws_audio_uplink_handler_id = None
self.ws_audio_downlink_handler_id = None
self.isactive = False
self.volume = 8
self.callback = callback

if self.callback:
self.event_queue = Queue()
self.ws_callback_event_id = _thread.start_new_thread(self.ws_server_event_handler, ())

def start(self):
if self.media.is_idle() is False:
print('media is busy, please stop it first')
return
self.client = uwebsocket.Client.connect(self.url, self.headers)
msg = ujson.dumps(packet.update)
self.client.send(msg)

# ws recv task
self.ws_recv_task_id = _thread.start_new_thread(self.ws_recv_task, ())

def stop(self):
self.stop_audio_stream()

if self.ws_recv_task_id:
_thread.stop_thread(self.ws_recv_task_id)
self.ws_recv_task_id = None

self.client.close()
self.isactive = False

def ws_audio_uplink_handler(self):
msg = packet.append

while True:
try:
t1 = utime.ticks_ms()
data = b"".join([self.media.pcma_read() for _ in range(5)])
t2 = utime.ticks_ms()
if len(data) > 0:
msg['data']['delta'] = ubinascii.b2a_base64(data).strip()
payload = ujson.dumps(msg)
#print('up {}ms/{}'.format(t2 - t1, len(payload)))
self.client.send(payload)
utime.sleep_ms(1)
except Exception as e:
print("Error in ws_audio_uplink_handler: {}".format(e))

def ws_audio_downlink_handler(self):
while True:
recv_data = self.audio_queue.get()
start,end = ujson.search(recv_data, 'content')
data = ubinascii.a2b_base64(recv_data[start:end])
self.media.pcma_write(data)
utime.sleep_ms(1)

def ws_server_event_handler(self):
while True:
recv_data = self.event_queue.get()
self.callback(self, recv_data)
utime.sleep_ms(1)

def start_audio_stream(self):
self.media.start()
self.media.set_volume(self.volume)

self.ws_audio_uplink_handler_id = _thread.start_new_thread(self.ws_audio_uplink_handler, ())
self.ws_audio_downlink_handler_id = _thread.start_new_thread(self.ws_audio_downlink_handler, ())
self.isactive = True

def stop_audio_stream(self):
if self.ws_audio_uplink_handler_id:
_thread.stop_thread(self.ws_audio_uplink_handler_id)
self.ws_audio_uplink_handler_id = None
if self.ws_audio_downlink_handler_id:
_thread.stop_thread(self.ws_audio_downlink_handler_id)
self.ws_audio_downlink_handler_id = None
self.media.stop()

def ws_recv_task(self):
while True:
try:
recv_data = self.client.recv(4096)
#print('recv_data_{}: {}'.format(len(recv_data), recv_data))
if recv_data is None or len(recv_data) <= 1:
print('illegal data {}'.format(recv_data))
continue
if packet.EventType.CONVERSATION_AUDIO_DELTA in recv_data:
self.audio_queue.put(recv_data)
else:
if self.callback:
self.event_queue.put(recv_data)
except Exception as e:
if "EIO" in str(e):
if self.isactive:
self.stop_audio_stream()
self.client.close()
self.isactive = False
msg = '{"event_type": "client.disconnected"}'
if self.callback:
#self.callback(self, msg)
self.event_queue.put(msg)
break
else:
if recv_data is not None:
print('recv error[{}] |{}|'.format(len(recv_data), recv_data))
print('ws error |{}|'.format(e))
utime.sleep_ms(1)

def active(self):
return self.isactive

def config(self, arg = None, **kwargs):
if arg != None:
if arg == 'volume':
if self.isactive is False:
return self.volume
return self.media.get_volume()

for key, value in kwargs.items():
if key == 'volume':
self.volume = value
if self.isactive is False:
continue
self.media.set_volume(value)

def interrupted(self):
if self.isactive is False:
return
# 打断对话
msg = ujson.dumps(packet.cancel)
self.client.send(msg)
Loading