Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve reconnection events, add RoomAttemptReconnectEvent. #439

Merged
merged 8 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions example/lib/pages/room.dart
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ class _RoomPageState extends State<RoomPage> {
..on<RoomRecordingStatusChanged>((event) {
context.showRecordingStatusChangedDialog(event.activeRecording);
})
..on<RoomAttemptReconnectEvent>((event) {
print(
'Attempting to reconnect ${event.attempt}/${event.maxAttemptsRetry}, '
'(${event.nextRetryDelaysInMs}ms delay until next attempt)');
})
..on<LocalTrackPublishedEvent>((_) => _sortParticipants())
..on<LocalTrackUnpublishedEvent>((_) => _sortParticipants())
..on<TrackSubscribedEvent>((_) => _sortParticipants())
Expand Down
46 changes: 27 additions & 19 deletions lib/src/core/engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await signalClient.cleanUp();

fullReconnectOnNext = false;
attemptingReconnect = false;

clearPendingReconnect();
}
Expand Down Expand Up @@ -625,8 +626,25 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
reconnectStart = DateTime.now();
}

if (reconnectAttempts! >= _reconnectCount) {
logger.fine('reconnectAttempts exceeded, disconnecting...');
_isClosed = true;
await cleanUp();

events.emit(EngineDisconnectedEvent(
reason: DisconnectReason.reconnectAttemptsExceeded,
));
return;
}

var delay = defaultRetryDelaysInMs[reconnectAttempts!];

events.emit(EngineAttemptReconnectEvent(
attempt: reconnectAttempts! + 1,
maxAttempts: _reconnectCount,
nextRetryDelaysInMs: delay,
));

clearReconnectTimeout();
logger.fine(
'WebSocket reconnecting in $delay ms, retry times $reconnectAttempts');
Expand Down Expand Up @@ -656,19 +674,10 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
fullReconnectOnNext = true;
}

if (reconnectAttempts! >= _reconnectCount) {
logger.fine('reconnectAttempts exceeded, disconnecting...');
events.emit(EngineDisconnectedEvent(
reason: DisconnectReason.connectionClosed,
));
await cleanUp();
return;
}

try {
attemptingReconnect = true;

if (await signalClient.checkInternetConnection() == false) {
if (await signalClient.networkIsAvailable() == false) {
logger.fine('no internet connection, waiting...');
await signalClient.events.waitFor<SignalConnectivityChangedEvent>(
duration: connectOptions.timeouts.connection * 10,
Expand All @@ -688,14 +697,12 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
} catch (e) {
reconnectAttempts = reconnectAttempts! + 1;
bool recoverable = true;
if (e is WebSocketException ||
e is ConnectException ||
e is MediaConnectException) {
if (e is WebSocketException || e is MediaConnectException) {
// cannot resume connection, need to do full reconnect
fullReconnectOnNext = true;
} else if (e is TimeoutException) {
fullReconnectOnNext = false;
} else {
}

if (e is UnexpectedConnectionState) {
recoverable = false;
}

Expand All @@ -704,7 +711,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
} else {
logger.fine('attemptReconnect: disconnecting...');
events.emit(EngineDisconnectedEvent(
reason: DisconnectReason.connectionClosed,
reason: DisconnectReason.disconnected,
));
await cleanUp();
}
Expand Down Expand Up @@ -835,7 +842,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
}

void _setUpEngineListeners() =>
events.on<EngineReconnectingEvent>((event) async {
events.on<SignalReconnectedEvent>((event) async {
// send queued requests if engine re-connected
signalClient.sendQueuedRequests();
});
Expand Down Expand Up @@ -907,14 +914,15 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
})
..on<SignalConnectingEvent>((event) async {
logger.fine('Signal connecting');
events.emit(const EngineConnectingEvent());
})
..on<SignalReconnectingEvent>((event) async {
logger.fine('Signal reconnecting');
events.emit(const EngineReconnectingEvent());
})
..on<SignalDisconnectedEvent>((event) async {
logger.fine('Signal disconnected ${event.reason}');
if (event.reason == DisconnectReason.connectionClosed && !_isClosed) {
if (event.reason == DisconnectReason.disconnected && !_isClosed) {
await handleDisconnect(ClientDisconnectReason.signal);
} else {
events.emit(EngineDisconnectedEvent(
Expand Down
24 changes: 22 additions & 2 deletions lib/src/core/room.dart
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
events.emit(const RoomReconnectedEvent());
// re-send tracks permissions
localParticipant?.sendTrackSubscriptionPermissions();
notifyListeners();
})
..on<EngineFullRestartingEvent>((event) async {
events.emit(const RoomRestartingEvent());
Expand All @@ -379,6 +380,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
events.emit(ParticipantDisconnectedEvent(participant: participant));
await participant.dispose();
}
notifyListeners();
})
..on<EngineRestartedEvent>((event) async {
events.emit(const RoomRestartedEvent());
Expand All @@ -393,14 +395,32 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
}
}
}
notifyListeners();
})
..on<EngineReconnectingEvent>((event) async {
events.emit(const RoomReconnectingEvent());
await _sendSyncState();
notifyListeners();
})
..on<EngineAttemptReconnectEvent>((event) async {
events.emit(RoomAttemptReconnectEvent(
attempt: event.attempt,
maxAttemptsRetry: event.maxAttempts,
nextRetryDelaysInMs: event.nextRetryDelaysInMs,
));
notifyListeners();
})
..on<EngineDisconnectedEvent>((event) async {
await _cleanUp();
events.emit(RoomDisconnectedEvent(reason: event.reason));
if (!engine.fullReconnectOnNext &&
![
DisconnectReason.signalingConnectionFailure,
DisconnectReason.joinFailure,
DisconnectReason.noInternetConnection
].contains(event.reason)) {
await _cleanUp();
events.emit(RoomDisconnectedEvent(reason: event.reason));
notifyListeners();
}
})
..on<EngineActiveSpeakersUpdateEvent>(
(event) => _onEngineActiveSpeakersUpdateEvent(event.speakers))
Expand Down
28 changes: 17 additions & 11 deletions lib/src/core/signal_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
ConnectivityResult? _connectivityResult;
StreamSubscription<ConnectivityResult>? connectivitySubscription;

Future<bool> checkInternetConnection() async {
if (!kIsWeb && !lkPlatformIsTest()) {
Future<bool> networkIsAvailable() async {
// Skip check for web or flutter test
if (kIsWeb || lkPlatformIsTest()) {
return true;
}
_connectivityResult = await Connectivity().checkConnectivity();
Expand Down Expand Up @@ -98,15 +99,19 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
.onConnectivityChanged
.listen((ConnectivityResult result) {
if (_connectivityResult != result) {
_connectivityResult = result;
if (result == ConnectivityResult.none) {
logger.warning('lost internet connection');
logger.warning('lost connectivity');
} else {
logger.info('internet connection restored');
events.emit(SignalConnectivityChangedEvent(
state: result,
));
logger.info(
'Connectivity changed, ${_connectivityResult!.name} => ${result.name}');
}

events.emit(SignalConnectivityChangedEvent(
oldState: _connectivityResult!,
state: result,
));

_connectivityResult = result;
}
});

Expand Down Expand Up @@ -140,14 +145,16 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
// Clean up existing socket
await cleanUp();
// Attempt to connect
_ws = await _wsConnector(
var future = _wsConnector(
rtcUri,
WebSocketEventHandlers(
onData: _onSocketData,
onDispose: _onSocketDispose,
onError: _onSocketError,
),
);
future = future.timeout(connectOptions.timeouts.connection);
_ws = await future;
// Successful connection
_connectionState = ConnectionState.connected;
events.emit(const SignalConnectedEvent());
Expand Down Expand Up @@ -340,8 +347,7 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
return;
}
_connectionState = ConnectionState.disconnected;
events.emit(
SignalDisconnectedEvent(reason: DisconnectReason.connectionClosed));
events.emit(SignalDisconnectedEvent(reason: DisconnectReason.disconnected));
}

void _sendPing() {
Expand Down
15 changes: 15 additions & 0 deletions lib/src/events.dart
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,21 @@ class RoomReconnectingEvent with RoomEvent {
String toString() => '${runtimeType}()';
}

/// report the number of attempts to reconnect to the room.
class RoomAttemptReconnectEvent with RoomEvent {
final int attempt;
final int maxAttemptsRetry;
final int nextRetryDelaysInMs;
const RoomAttemptReconnectEvent({
required this.attempt,
required this.maxAttemptsRetry,
required this.nextRetryDelaysInMs,
});

@override
String toString() => '${runtimeType}()';
}

/// Connection to room is re-established. All existing state is preserved.
/// Emitted by [Room].
class RoomReconnectedEvent with RoomEvent {
Expand Down
5 changes: 5 additions & 0 deletions lib/src/exceptions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,8 @@ class LiveKitE2EEException extends LiveKitException {
@override
String toString() => 'E2EE Exception: [$runtimeType] $message';
}

class UnexpectedConnectionState extends LiveKitException {
UnexpectedConnectionState([String msg = 'Unexpected connection state'])
: super._(msg);
}
19 changes: 19 additions & 0 deletions lib/src/internal/events.dart
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,19 @@ class SignalReconnectResponseEvent with SignalEvent, InternalEvent {

@internal
class SignalConnectivityChangedEvent with SignalEvent, InternalEvent {
final ConnectivityResult oldState;
final ConnectivityResult state;
const SignalConnectivityChangedEvent({
required this.oldState,
required this.state,
});
}

@internal
class EngineConnectingEvent with InternalEvent, EngineEvent {
const EngineConnectingEvent();
}

@internal
class EngineConnectedEvent with InternalEvent, SignalEvent, EngineEvent {
const EngineConnectedEvent();
Expand All @@ -167,6 +174,18 @@ class EngineFullRestartingEvent with InternalEvent, EngineEvent {
const EngineFullRestartingEvent();
}

@internal
class EngineAttemptReconnectEvent with InternalEvent, EngineEvent {
int attempt;
int maxAttempts;
int nextRetryDelaysInMs;
EngineAttemptReconnectEvent({
required this.attempt,
required this.maxAttempts,
required this.nextRetryDelaysInMs,
});
}

@internal
class EngineRestartedEvent with InternalEvent, EngineEvent {
const EngineRestartedEvent();
Expand Down
3 changes: 2 additions & 1 deletion lib/src/types/other.dart
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ enum DisconnectReason {
roomDeleted,
stateMismatch,
joinFailure,
connectionClosed,
disconnected,
signalingConnectionFailure,
noInternetConnection,
reconnectAttemptsExceeded,
}

/// The reason why a track failed to publish.
Expand Down
Loading