From e3978d3a465a8d35a1f8578446eefe99ca6b19f9 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Wed, 11 Feb 2026 21:52:59 +0900 Subject: [PATCH 1/3] fix --- lib/src/core/room.dart | 9 ++++----- lib/src/types/data_stream.dart | 17 ++++++++++++++--- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index 3fbf92a4e..53b8f1125 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -1396,9 +1396,7 @@ extension DataStreamRoomMethods on Room { ); _byteStreamControllers.remove(chunk.streamId); - } - - if (chunk.content.isNotEmpty) { + } else if (chunk.content.isNotEmpty) { fileBuffer.write(chunk); } } @@ -1415,8 +1413,7 @@ extension DataStreamRoomMethods on Room { logger.warning('encryption type mismatch for text stream ${chunk.streamId}'); _textStreamControllers.remove(chunk.streamId); - } - if (chunk.content.isNotEmpty) { + } else if (chunk.content.isNotEmpty) { textBuffer.write(chunk); } } @@ -1484,10 +1481,12 @@ extension DataStreamRoomMethods on Room { ); for (var controller in byteStreamsBeingSentByDisconnectingParticipant) { controller.error(abnormalEndError); + controller.close(); _byteStreamControllers.remove(controller.info.id); } for (var controller in textStreamsBeingSentByDisconnectingParticipant) { controller.error(abnormalEndError); + controller.close(); _textStreamControllers.remove(controller.info.id); } } diff --git a/lib/src/types/data_stream.dart b/lib/src/types/data_stream.dart index a5963ff54..4066a2a7e 100644 --- a/lib/src/types/data_stream.dart +++ b/lib/src/types/data_stream.dart @@ -159,11 +159,22 @@ class DataStreamController { this.endTime, }); - Future close() => streamController.close(); + bool get isClosed => streamController.isClosed; - void write(T chunk) => streamController.add(chunk); + Future close() { + if (isClosed) return Future.value(); + return streamController.close(); + } + + void write(T chunk) { + if (isClosed) return; + streamController.add(chunk); + } - void error(DataStreamError error) => streamController.addError(error); + void error(DataStreamError error) { + if (isClosed) return; + streamController.addError(error); + } } class ByteStreamInfo extends BaseStreamInfo { From ca1f7e164793bc28ddff43097021c24f73d5414c Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Wed, 11 Feb 2026 21:54:07 +0900 Subject: [PATCH 2/3] changes --- .changes/fix-data-stream-closed-state | 1 + 1 file changed, 1 insertion(+) create mode 100644 .changes/fix-data-stream-closed-state diff --git a/.changes/fix-data-stream-closed-state b/.changes/fix-data-stream-closed-state new file mode 100644 index 000000000..9ac8ac382 --- /dev/null +++ b/.changes/fix-data-stream-closed-state @@ -0,0 +1 @@ +patch type="fixed" "Fix crash when writing to closed data stream controllers" From 9d939bc65a5db90f5381f9249fbad5fe5df8d676 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Wed, 11 Feb 2026 22:02:55 +0900 Subject: [PATCH 3/3] fix async --- lib/src/core/room.dart | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index 53b8f1125..0064713d1 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -922,7 +922,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { final participant = _remoteParticipants.removeByIdentity(identity); if (participant == null) return false; - validateParticipantHasNoActiveDataStreams(identity); + await validateParticipantHasNoActiveDataStreams(identity); await participant.removeAllPublishedTracks(notify: true); @@ -1464,7 +1464,7 @@ extension DataStreamRoomMethods on Room { } } - void validateParticipantHasNoActiveDataStreams(String participantIdentity) { + Future validateParticipantHasNoActiveDataStreams(String participantIdentity) async { // Terminate any in flight data stream receives from the given participant final textStreamsBeingSentByDisconnectingParticipant = _textStreamControllers.values .where((controller) => controller.info.sendingParticipantIdentity == participantIdentity) @@ -1481,12 +1481,12 @@ extension DataStreamRoomMethods on Room { ); for (var controller in byteStreamsBeingSentByDisconnectingParticipant) { controller.error(abnormalEndError); - controller.close(); + await controller.close(); _byteStreamControllers.remove(controller.info.id); } for (var controller in textStreamsBeingSentByDisconnectingParticipant) { controller.error(abnormalEndError); - controller.close(); + await controller.close(); _textStreamControllers.remove(controller.info.id); } }