diff --git a/.changes/fix-data-stream-closed-state b/.changes/fix-data-stream-closed-state new file mode 100644 index 00000000..9ac8ac38 --- /dev/null +++ b/.changes/fix-data-stream-closed-state @@ -0,0 +1 @@ +patch type="fixed" "Fix crash when writing to closed data stream controllers" diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index 3fbf92a4..0064713d 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -922,7 +922,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { final participant = _remoteParticipants.removeByIdentity(identity); if (participant == null) return false; - validateParticipantHasNoActiveDataStreams(identity); + await validateParticipantHasNoActiveDataStreams(identity); await participant.removeAllPublishedTracks(notify: true); @@ -1396,9 +1396,7 @@ extension DataStreamRoomMethods on Room { ); _byteStreamControllers.remove(chunk.streamId); - } - - if (chunk.content.isNotEmpty) { + } else if (chunk.content.isNotEmpty) { fileBuffer.write(chunk); } } @@ -1415,8 +1413,7 @@ extension DataStreamRoomMethods on Room { logger.warning('encryption type mismatch for text stream ${chunk.streamId}'); _textStreamControllers.remove(chunk.streamId); - } - if (chunk.content.isNotEmpty) { + } else if (chunk.content.isNotEmpty) { textBuffer.write(chunk); } } @@ -1467,7 +1464,7 @@ extension DataStreamRoomMethods on Room { } } - void validateParticipantHasNoActiveDataStreams(String participantIdentity) { + Future validateParticipantHasNoActiveDataStreams(String participantIdentity) async { // Terminate any in flight data stream receives from the given participant final textStreamsBeingSentByDisconnectingParticipant = _textStreamControllers.values .where((controller) => controller.info.sendingParticipantIdentity == participantIdentity) @@ -1484,10 +1481,12 @@ extension DataStreamRoomMethods on Room { ); for (var controller in byteStreamsBeingSentByDisconnectingParticipant) { controller.error(abnormalEndError); + await controller.close(); _byteStreamControllers.remove(controller.info.id); } for (var controller in textStreamsBeingSentByDisconnectingParticipant) { controller.error(abnormalEndError); + await controller.close(); _textStreamControllers.remove(controller.info.id); } } diff --git a/lib/src/types/data_stream.dart b/lib/src/types/data_stream.dart index a5963ff5..4066a2a7 100644 --- a/lib/src/types/data_stream.dart +++ b/lib/src/types/data_stream.dart @@ -159,11 +159,22 @@ class DataStreamController { this.endTime, }); - Future close() => streamController.close(); + bool get isClosed => streamController.isClosed; - void write(T chunk) => streamController.add(chunk); + Future close() { + if (isClosed) return Future.value(); + return streamController.close(); + } + + void write(T chunk) { + if (isClosed) return; + streamController.add(chunk); + } - void error(DataStreamError error) => streamController.addError(error); + void error(DataStreamError error) { + if (isClosed) return; + streamController.addError(error); + } } class ByteStreamInfo extends BaseStreamInfo {