Skip to content

Commit

Permalink
fixup! TF-3157 Implement web socket push
Browse files Browse the repository at this point in the history
  • Loading branch information
tddang-linagora committed Sep 25, 2024
1 parent 575edd9 commit dbf30b0
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 44 deletions.
2 changes: 1 addition & 1 deletion lib/features/base/action/ui_action.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ abstract class Action with EquatableMixin {}

abstract class UIAction extends Action {}

abstract class PushNotificationStateChangeAction extends UIAction {
abstract class PushNotificationStateChangeAction extends Action {
final TypeName typeName;
final jmap.State newState;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import 'dart:convert';
import 'package:core/utils/app_logger.dart';
import 'package:jmap_dart_client/jmap/account_id.dart';
import 'package:jmap_dart_client/jmap/core/capability/capability_identifier.dart';
import 'package:jmap_dart_client/jmap/core/capability/web_socket_ticket_capability.dart';
import 'package:jmap_dart_client/jmap/core/capability/websocket_capability.dart';
import 'package:jmap_dart_client/jmap/core/session/session.dart';
import 'package:model/extensions/session_extension.dart';
Expand All @@ -19,9 +18,7 @@ class RemoteWebSocketDatasourceImpl implements WebSocketDatasource {
final WebSocketApi _webSocketApi;
final ExceptionThrower _exceptionThrower;

RemoteWebSocketDatasourceImpl(this._webSocketApi, this._exceptionThrower);

int _webSocketRetryRemained = 3;
const RemoteWebSocketDatasourceImpl(this._webSocketApi, this._exceptionThrower);

@override
Stream getWebSocketChannel(Session session, AccountId accountId) {
Expand All @@ -30,10 +27,14 @@ class RemoteWebSocketDatasourceImpl implements WebSocketDatasource {
.doOnError(_exceptionThrower.throwException);
}

Stream _getWebSocketChannel(Session session, AccountId accountId) async* {
Stream _getWebSocketChannel(
Session session,
AccountId accountId,
[int retryRemained = 3]
) async* {
try {
_verifyWebSocketCapabilities(session, accountId);
final webSocketTicket = await _generateWebSocketTicket(session, accountId);
final webSocketTicket = await _webSocketApi.getWebSocketTicket(session, accountId);
final webSocketUri = _getWebSocketUri(session, accountId);

final webSocketChannel = WebSocketChannel.connect(
Expand All @@ -43,38 +44,21 @@ class RemoteWebSocketDatasourceImpl implements WebSocketDatasource {

yield* webSocketChannel.stream;
} catch (e) {
log('RemoteWebSocketDatasourceImpl::getWebSocketChannel():error: $e');
if (_webSocketRetryRemained > 0) {
_webSocketRetryRemained--;
yield* _getWebSocketChannel(session, accountId);
logError('RemoteWebSocketDatasourceImpl::getWebSocketChannel():error: $e');
if (retryRemained > 0) {
yield* _getWebSocketChannel(session, accountId, retryRemained - 1);
} else {
rethrow;
}
}
}

void _verifyWebSocketCapabilities(Session session, AccountId accountId) {
requireCapability(
session,
accountId,
[
CapabilityIdentifier.jmapWebSocket,
CapabilityIdentifier.jmapWebSocketTicket
]
);
}

Future<String> _generateWebSocketTicket(Session session, AccountId accountId) async {
final webSocketTicketCapability = session.getCapabilityProperties<WebSocketTicketCapability>(
accountId,
CapabilityIdentifier.jmapWebSocketTicket);

final webSocketTicketGenerationUrl = webSocketTicketCapability?.generationEndpoint;
if (webSocketTicketGenerationUrl == null) throw WebSocketTicketUnavailableException();
final webSocketTicket = await _webSocketApi.getWebSocketTicket('$webSocketTicketGenerationUrl');
if (webSocketTicket.value == null) throw WebSocketTicketUnavailableException();

return webSocketTicket.value!;
if (!CapabilityIdentifier.jmapWebSocket.isSupported(session, accountId)
|| !CapabilityIdentifier.jmapWebSocketTicket.isSupported(session, accountId)
) {
throw WebSocketPushNotSupportedException();
}
}

Uri _getWebSocketUri(Session session, AccountId accountId) {
Expand Down
30 changes: 27 additions & 3 deletions lib/features/push_notification/data/network/web_socket_api.dart
Original file line number Diff line number Diff line change
@@ -1,14 +1,38 @@

import 'package:core/data/network/dio_client.dart';
import 'package:jmap_dart_client/jmap/account_id.dart';
import 'package:jmap_dart_client/jmap/core/capability/capability_identifier.dart';
import 'package:jmap_dart_client/jmap/core/capability/web_socket_ticket_capability.dart';
import 'package:jmap_dart_client/jmap/core/session/session.dart';
import 'package:model/extensions/session_extension.dart';
import 'package:tmail_ui_user/features/push_notification/data/model/web_socket_ticket.dart';
import 'package:tmail_ui_user/features/push_notification/domain/exceptions/web_socket_exceptions.dart';

class WebSocketApi {
final DioClient _dioClient;

WebSocketApi(this._dioClient);

Future<WebSocketTicket> getWebSocketTicket(String url) async {
final response = await _dioClient.post(url);
return WebSocketTicket.fromJson(response);
Future<String> getWebSocketTicket(
Session session,
AccountId accountId
) async {
final webSocketTicketCapability = session.getCapabilityProperties<WebSocketTicketCapability>(
accountId,
CapabilityIdentifier.jmapWebSocketTicket);

final webSocketTicketGenerationUrl = webSocketTicketCapability?.generationEndpoint;
if (webSocketTicketGenerationUrl == null) {
throw WebSocketTicketUnavailableException();
}
final webSocketTicketGenerationResponse = await _dioClient.post(
'$webSocketTicketGenerationUrl');
final webSocketTicket = WebSocketTicket.fromJson(
webSocketTicketGenerationResponse);
if (webSocketTicket.value == null) {
throw WebSocketTicketUnavailableException();
}

return webSocketTicket.value!;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class WebSocketPushStateReceived extends UIState {
WebSocketPushStateReceived(this.stateChange);

@override
List<Object?> get props => [stateChange];
List<Object> get props => [stateChange];
}

class WebSocketConnectionFailed extends FeatureFailure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class ConnectWebSocketInteractor {
.getWebSocketChannel(session, accountId)
.map(_toStateChange);
} catch (e) {
log('ConnectWebSocketInteractor::execute: $e');
logError('ConnectWebSocketInteractor::execute: $e');
yield Left(WebSocketConnectionFailed(exception: e));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ abstract class PushBaseController {
newStateStream.listen(
_handleStateStream,
onError: (error, stackTrace) {
logError('PushNotificationBaseController::consumeState():onError:error: $error | stackTrace: $stackTrace');
logError('PushBaseController::consumeState():onError:error: $error | stackTrace: $stackTrace');
}
);
}
Expand All @@ -46,7 +46,7 @@ abstract class PushBaseController {
bool isForeground = true,
Session? session
}) {
log('PushNotificationBaseController::mappingTypeStateToAction():mapTypeState: $mapTypeState');
log('PushBaseController::mappingTypeStateToAction():mapTypeState: $mapTypeState');
final listTypeName = mapTypeState.keys
.map((value) => TypeName(value))
.toList();
Expand All @@ -57,7 +57,7 @@ abstract class PushBaseController {
.whereNotNull()
.toList();

log('PushNotificationBaseController::mappingTypeStateToAction():listEmailActions: $listEmailActions');
log('PushBaseController::mappingTypeStateToAction():listEmailActions: $listEmailActions');

if (listEmailActions.isNotEmpty) {
EmailChangeListener.instance.dispatchActions(listEmailActions);
Expand All @@ -69,7 +69,7 @@ abstract class PushBaseController {
.whereNotNull()
.toList();

log('PushNotificationBaseController::mappingTypeStateToAction():listMailboxActions: $listEmailActions');
log('PushBaseController::mappingTypeStateToAction():listMailboxActions: $listEmailActions');

if (listMailboxActions.isNotEmpty) {
MailboxChangeListener.instance.dispatchActions(listMailboxActions);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import 'dart:async';

import 'package:core/presentation/state/failure.dart';
import 'package:core/presentation/state/success.dart';
import 'package:core/utils/app_logger.dart';
Expand Down Expand Up @@ -40,7 +38,7 @@ class WebSocketController extends PushBaseController {
_connectWebSocket(accountId, session);
}

Future<void> _connectWebSocket(AccountId? accountId, Session? session) async {
void _connectWebSocket(AccountId? accountId, Session? session) {
_connectWebSocketInteractor = getBinding<ConnectWebSocketInteractor>();
if (_connectWebSocketInteractor == null || accountId == null || session == null) {
return;
Expand Down

0 comments on commit dbf30b0

Please sign in to comment.