diff --git a/.changes/fix-publisher-connection-negotiation b/.changes/fix-publisher-connection-negotiation new file mode 100644 index 00000000..b716bec2 --- /dev/null +++ b/.changes/fix-publisher-connection-negotiation @@ -0,0 +1 @@ +patch type="fixed" "Fix publisher connection causing redundant renegotiations on lower-end devices" diff --git a/lib/src/constants.dart b/lib/src/constants.dart index 3f2a2934..196f7518 100644 --- a/lib/src/constants.dart +++ b/lib/src/constants.dart @@ -31,7 +31,7 @@ class Timeouts { static const Timeouts defaultTimeouts = Timeouts( connection: Duration(seconds: 10), - debounce: Duration(milliseconds: 100), + debounce: Duration(milliseconds: 20), publish: Duration(seconds: 10), subscribe: Duration(seconds: 10), peerConnection: Duration(seconds: 10), diff --git a/lib/src/core/engine.dart b/lib/src/core/engine.dart index e667659d..2b1794d6 100644 --- a/lib/src/core/engine.dart +++ b/lib/src/core/engine.dart @@ -166,7 +166,7 @@ class Engine extends Disposable with EventsEmittable { final TTLMap _reliableReceivedState = TTLMap(30000); bool _isReconnecting = false; - Future? _publisherConnectionFuture; + Completer? _publisherConnectionCompleter; String? _reliableParticipantKey(lk_models.DataPacket packet) { if (packet.hasParticipantSid() && packet.participantSid.isNotEmpty) { @@ -487,11 +487,12 @@ class Engine extends Disposable with EventsEmittable { } Future _publisherEnsureConnected() async { - if ((await publisher?.pc.getConnectionState())?.isConnected() != true) { + final state = await publisher?.pc.getConnectionState(); + if (state?.isConnected() != true) { logger.fine('Publisher is not connected...'); // start negotiation - if (await publisher?.pc.getConnectionState() != rtc.RTCPeerConnectionState.RTCPeerConnectionStateConnecting) { + if (state != rtc.RTCPeerConnectionState.RTCPeerConnectionStateConnecting) { await negotiate(); } if (!lkPlatformIsTest()) { @@ -505,14 +506,41 @@ class Engine extends Disposable with EventsEmittable { } @internal - Future ensurePublisherConnected() async { - _publisherConnectionFuture ??= _publisherEnsureConnected(); - try { - await _publisherConnectionFuture; - } catch (_) { - _publisherConnectionFuture = null; - rethrow; + Future ensurePublisherConnected() { + final existing = _publisherConnectionCompleter; + if (existing != null && !existing.isCompleted) { + return existing.future; + } + + final completer = Completer(); + _publisherConnectionCompleter = completer; + + unawaited( + _publisherEnsureConnected().then((_) { + if (!completer.isCompleted) { + completer.complete(); + } + }, onError: (Object error, StackTrace stackTrace) { + if (!completer.isCompleted) { + completer.completeError(error, stackTrace); + } + }).whenComplete(() { + if (identical(_publisherConnectionCompleter, completer)) { + _publisherConnectionCompleter = null; + } + }), + ); + + return completer.future; + } + + void _resetPublisherConnection() { + final completer = _publisherConnectionCompleter; + if (completer != null && !completer.isCompleted) { + completer + .completeError(ConnectException('Publisher connection reset', reason: ConnectionErrorReason.InternalError)); } + _publisherConnectionCompleter = null; } lk_models.EncryptedPacketPayload? asEncryptablePacket(lk_models.DataPacket packet) { @@ -652,7 +680,7 @@ class Engine extends Disposable with EventsEmittable { rtc.RTCPeerConnectionState.RTCPeerConnectionStateFailed, rtc.RTCPeerConnectionState.RTCPeerConnectionStateDisconnected ].contains(state)) { - _publisherConnectionFuture = null; + _resetPublisherConnection(); } events.emit(EnginePublisherPeerStateUpdatedEvent( state: state, @@ -1130,7 +1158,7 @@ class Engine extends Disposable with EventsEmittable { await publisher?.dispose(); publisher = null; - _publisherConnectionFuture = null; + _resetPublisherConnection(); await subscriber?.dispose(); subscriber = null;