Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/fix-publisher-connection-negotiation
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Fix publisher connection causing redundant renegotiations on lower-end devices"
2 changes: 1 addition & 1 deletion lib/src/constants.dart
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class Timeouts {

static const Timeouts defaultTimeouts = Timeouts(
connection: Duration(seconds: 10),
debounce: Duration(milliseconds: 100),
debounce: Duration(milliseconds: 20),
publish: Duration(seconds: 10),
subscribe: Duration(seconds: 10),
peerConnection: Duration(seconds: 10),
Expand Down
52 changes: 40 additions & 12 deletions lib/src/core/engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
final TTLMap<String, int> _reliableReceivedState = TTLMap<String, int>(30000);
bool _isReconnecting = false;

Future<void>? _publisherConnectionFuture;
Completer<void>? _publisherConnectionCompleter;

String? _reliableParticipantKey(lk_models.DataPacket packet) {
if (packet.hasParticipantSid() && packet.participantSid.isNotEmpty) {
Expand Down Expand Up @@ -487,11 +487,12 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
}

Future<void> _publisherEnsureConnected() async {
if ((await publisher?.pc.getConnectionState())?.isConnected() != true) {
final state = await publisher?.pc.getConnectionState();
if (state?.isConnected() != true) {
logger.fine('Publisher is not connected...');

// start negotiation
if (await publisher?.pc.getConnectionState() != rtc.RTCPeerConnectionState.RTCPeerConnectionStateConnecting) {
if (state != rtc.RTCPeerConnectionState.RTCPeerConnectionStateConnecting) {
await negotiate();
}
if (!lkPlatformIsTest()) {
Expand All @@ -505,14 +506,41 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
}

@internal
Future<void> ensurePublisherConnected() async {
_publisherConnectionFuture ??= _publisherEnsureConnected();
try {
await _publisherConnectionFuture;
} catch (_) {
_publisherConnectionFuture = null;
rethrow;
Future<void> ensurePublisherConnected() {
final existing = _publisherConnectionCompleter;
if (existing != null && !existing.isCompleted) {
return existing.future;
}

final completer = Completer<void>();
_publisherConnectionCompleter = completer;

unawaited(
_publisherEnsureConnected().then((_) {
if (!completer.isCompleted) {
completer.complete();
}
}, onError: (Object error, StackTrace stackTrace) {
if (!completer.isCompleted) {
completer.completeError(error, stackTrace);
}
}).whenComplete(() {
if (identical(_publisherConnectionCompleter, completer)) {
_publisherConnectionCompleter = null;
}
}),
);

return completer.future;
}

void _resetPublisherConnection() {
final completer = _publisherConnectionCompleter;
if (completer != null && !completer.isCompleted) {
completer
.completeError(ConnectException('Publisher connection reset', reason: ConnectionErrorReason.InternalError));
}
_publisherConnectionCompleter = null;
}

lk_models.EncryptedPacketPayload? asEncryptablePacket(lk_models.DataPacket packet) {
Expand Down Expand Up @@ -652,7 +680,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
rtc.RTCPeerConnectionState.RTCPeerConnectionStateFailed,
rtc.RTCPeerConnectionState.RTCPeerConnectionStateDisconnected
].contains(state)) {
_publisherConnectionFuture = null;
_resetPublisherConnection();
}
events.emit(EnginePublisherPeerStateUpdatedEvent(
state: state,
Expand Down Expand Up @@ -1130,7 +1158,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await publisher?.dispose();
publisher = null;

_publisherConnectionFuture = null;
_resetPublisherConnection();

await subscriber?.dispose();
subscriber = null;
Expand Down