Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/fix-data-stream-closed-state
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Fix crash when writing to closed data stream controllers"
13 changes: 6 additions & 7 deletions lib/src/core/room.dart
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
final participant = _remoteParticipants.removeByIdentity(identity);
if (participant == null) return false;

validateParticipantHasNoActiveDataStreams(identity);
await validateParticipantHasNoActiveDataStreams(identity);

await participant.removeAllPublishedTracks(notify: true);

Expand Down Expand Up @@ -1396,9 +1396,7 @@ extension DataStreamRoomMethods on Room {
);

_byteStreamControllers.remove(chunk.streamId);
}

if (chunk.content.isNotEmpty) {
} else if (chunk.content.isNotEmpty) {
fileBuffer.write(chunk);
}
}
Expand All @@ -1415,8 +1413,7 @@ extension DataStreamRoomMethods on Room {

logger.warning('encryption type mismatch for text stream ${chunk.streamId}');
_textStreamControllers.remove(chunk.streamId);
}
if (chunk.content.isNotEmpty) {
} else if (chunk.content.isNotEmpty) {
textBuffer.write(chunk);
}
}
Expand Down Expand Up @@ -1467,7 +1464,7 @@ extension DataStreamRoomMethods on Room {
}
}

void validateParticipantHasNoActiveDataStreams(String participantIdentity) {
Future<void> validateParticipantHasNoActiveDataStreams(String participantIdentity) async {
// Terminate any in flight data stream receives from the given participant
final textStreamsBeingSentByDisconnectingParticipant = _textStreamControllers.values
.where((controller) => controller.info.sendingParticipantIdentity == participantIdentity)
Expand All @@ -1484,10 +1481,12 @@ extension DataStreamRoomMethods on Room {
);
for (var controller in byteStreamsBeingSentByDisconnectingParticipant) {
controller.error(abnormalEndError);
await controller.close();
_byteStreamControllers.remove(controller.info.id);
}
for (var controller in textStreamsBeingSentByDisconnectingParticipant) {
controller.error(abnormalEndError);
await controller.close();
_textStreamControllers.remove(controller.info.id);
}
}
Expand Down
17 changes: 14 additions & 3 deletions lib/src/types/data_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,22 @@ class DataStreamController<T extends DataStream_Chunk> {
this.endTime,
});

Future<void> close() => streamController.close();
bool get isClosed => streamController.isClosed;

void write(T chunk) => streamController.add(chunk);
Future<void> close() {
if (isClosed) return Future.value();
return streamController.close();
}

void write(T chunk) {
if (isClosed) return;
streamController.add(chunk);
}

void error(DataStreamError error) => streamController.addError(error);
void error(DataStreamError error) {
if (isClosed) return;
streamController.addError(error);
}
}

class ByteStreamInfo extends BaseStreamInfo {
Expand Down