diff --git a/app/src/main/java/com/amazon/connect/chat/androidchatexample/viewmodel/ChatViewModel.kt b/app/src/main/java/com/amazon/connect/chat/androidchatexample/viewmodel/ChatViewModel.kt index 6a26152..450132b 100644 --- a/app/src/main/java/com/amazon/connect/chat/androidchatexample/viewmodel/ChatViewModel.kt +++ b/app/src/main/java/com/amazon/connect/chat/androidchatexample/viewmodel/ChatViewModel.kt @@ -94,6 +94,30 @@ class ChatViewModel @Inject constructor( private fun configureChatSession() { val globalConfig = GlobalConfig(region = chatConfiguration.region) chatSession.configure(globalConfig) + setupChatHandlers(chatSession) + } + + private fun setupChatHandlers(chatSession: ChatSession) { + chatSession.onConnectionEstablished = { + Log.d("ChatViewModel", "Connection established.") + } + + chatSession.onMessageReceived = { transcriptItem -> + // Handle received message + Log.d("ChatViewModel", "Received transcript item: $transcriptItem") + } + + chatSession.onChatEnded = { + Log.d("ChatViewModel", "Chat ended.") + } + + chatSession.onConnectionBroken = { + Log.d("ChatViewModel", "Connection broken.") + } + + chatSession.onConnectionReEstablished = { + Log.d("ChatViewModel", "Connection re-established.") + } } fun initiateChat() { diff --git a/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/ChatSession.kt b/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/ChatSession.kt index 019991e..f29c176 100644 --- a/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/ChatSession.kt +++ b/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/ChatSession.kt @@ -1,9 +1,14 @@ package com.amazon.connect.chat.sdk import com.amazon.connect.chat.sdk.model.ChatDetails +import com.amazon.connect.chat.sdk.model.ChatEvent import com.amazon.connect.chat.sdk.model.GlobalConfig +import com.amazon.connect.chat.sdk.model.TranscriptItem import com.amazon.connect.chat.sdk.repository.ChatService +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import javax.inject.Inject import javax.inject.Singleton @@ -22,11 +27,46 @@ interface ChatSession { * @return A Result indicating whether the disconnection was successful. */ suspend fun disconnect(): Result + + var onConnectionEstablished: (() -> Unit)? + var onConnectionReEstablished: (() -> Unit)? + var onConnectionBroken: (() -> Unit)? + var onMessageReceived: ((TranscriptItem) -> Unit)? + var onChatEnded: (() -> Unit)? } @Singleton class ChatSessionImpl @Inject constructor(private val chatService: ChatService) : ChatSession { + override var onConnectionEstablished: (() -> Unit)? = null + override var onConnectionReEstablished: (() -> Unit)? = null + override var onConnectionBroken: (() -> Unit)? = null + override var onMessageReceived: ((TranscriptItem) -> Unit)? = null + override var onChatEnded: (() -> Unit)? = null + private val coroutineScope = CoroutineScope(Dispatchers.Main + Job()) + + init { + setupEventSubscriptions() + } + + private fun setupEventSubscriptions() { + coroutineScope.launch { + chatService.eventPublisher.collect { event -> + when (event) { + ChatEvent.ConnectionEstablished -> onConnectionEstablished?.invoke() + ChatEvent.ConnectionReEstablished -> onConnectionReEstablished?.invoke() + ChatEvent.ChatEnded -> onChatEnded?.invoke() + ChatEvent.ConnectionBroken -> onConnectionBroken?.invoke() + } + } + } + coroutineScope.launch { + chatService.transcriptPublisher.collect { transcriptItem -> + onMessageReceived?.invoke(transcriptItem) + } + } + } + override fun configure(config: GlobalConfig) { chatService.configure(config) } diff --git a/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/di/ChatModule.kt b/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/di/ChatModule.kt index ce96cb9..29ce304 100644 --- a/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/di/ChatModule.kt +++ b/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/di/ChatModule.kt @@ -34,9 +34,10 @@ object ChatModule { fun provideChatService( apiClient: APIClient, awsClient: AWSClient, - connectionDetailsProvider: ConnectionDetailsProvider + connectionDetailsProvider: ConnectionDetailsProvider, + webSocketManager: WebSocketManager ): ChatService { - return ChatServiceImpl(apiClient, awsClient, connectionDetailsProvider) + return ChatServiceImpl(apiClient, awsClient, connectionDetailsProvider, webSocketManager) } /** diff --git a/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/model/ContentType.kt b/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/model/ContentType.kt index 5615c6b..f8bb457 100644 --- a/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/model/ContentType.kt +++ b/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/model/ContentType.kt @@ -18,4 +18,11 @@ enum class ContentType(val type: String){ return values().find { it.type.equals(type, ignoreCase = true) } } } -} \ No newline at end of file +} + +enum class ChatEvent { + ConnectionEstablished, + ConnectionReEstablished, + ChatEnded, + ConnectionBroken, +} diff --git a/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/network/WebSocketManager.kt b/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/network/WebSocketManager.kt index d522a2f..e7f8e4e 100644 --- a/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/network/WebSocketManager.kt +++ b/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/network/WebSocketManager.kt @@ -8,6 +8,7 @@ import androidx.lifecycle.Lifecycle import androidx.lifecycle.LifecycleEventObserver import androidx.lifecycle.ProcessLifecycleOwner import com.amazon.connect.chat.sdk.Config +import com.amazon.connect.chat.sdk.model.ChatEvent import com.amazon.connect.chat.sdk.model.Event import com.amazon.connect.chat.sdk.model.Message import com.amazon.connect.chat.sdk.model.MessageDirection @@ -18,6 +19,8 @@ import com.amazon.connect.chat.sdk.model.WebSocketMessageType import com.amazon.connect.chat.sdk.model.ContentType import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.launch import okhttp3.OkHttpClient import okhttp3.Request @@ -37,6 +40,17 @@ object EventTypes { const val deepHeartbeat = "{\"topic\": \"aws/ping\"}" } +interface WebSocketManagerInterface { + // TODO - Align it with IOS + /* + * var eventPublisher: PassthroughSubject { get } + var transcriptPublisher: PassthroughSubject { get } + func connect(wsUrl: URL?, isReconnect: Bool?) + func disconnect() + func formatAndProcessTranscriptItems(_ transcriptItems: [AWSConnectParticipantItem]) -> [TranscriptItem] + * */ +} + class WebSocketManager @Inject constructor( private val context: Context, var requestNewWsUrl: () -> Unit @@ -60,6 +74,19 @@ class WebSocketManager @Inject constructor( private var isChatActive: Boolean = false private var isReconnecting: Boolean = false + private val _eventPublisher = MutableSharedFlow( +// Keeping these here for future reference if we encounter issue with SharedFlow memory - Will update or remove later +// replay = 1, // Replay the last event to new subscribers +// extraBufferCapacity = 10 // Buffer up to 10 events in case of bursts or slow consumption + ) + val eventPublisher: SharedFlow get() = _eventPublisher + + private val _transcriptPublisher = MutableSharedFlow( +// replay = 1, // Replay the last transcript item to new subscribers +// extraBufferCapacity = 10 // Buffer up to 10 transcript items + ) + val transcriptPublisher: SharedFlow get() = _transcriptPublisher + init { registerObservers() } @@ -108,35 +135,41 @@ class WebSocketManager @Inject constructor( this.messageCallBack = onMessageReceived closeWebSocket() - webSocket = client.newWebSocket(request, object : WebSocketListener() { - override fun onOpen(ws: WebSocket, response: Response) { - handleWebSocketOpen() - } + webSocket = client.newWebSocket(request, object : WebSocketListener() { + override fun onOpen(ws: WebSocket, response: Response) { + CoroutineScope(Dispatchers.IO).launch { + handleWebSocketOpen() + } + } - override fun onMessage(ws: WebSocket, text: String) { - Log.i("WebSocket", "Received message: $text") - processJsonContent(text) - } + override fun onMessage(ws: WebSocket, text: String) { + Log.i("WebSocket", "Received message: $text") + CoroutineScope(Dispatchers.IO).launch { + processJsonContent(text) + } + } - override fun onClosing(ws: WebSocket, code: Int, reason: String) { - Log.i("WebSocket", "WebSocket is closing with code: $code, reason: $reason") - } + override fun onClosing(ws: WebSocket, code: Int, reason: String) { + Log.i("WebSocket", "WebSocket is closing with code: $code, reason: $reason") + } - override fun onClosed(ws: WebSocket, code: Int, reason: String) { - handleWebSocketClosed(code, reason) - } + override fun onClosed(ws: WebSocket, code: Int, reason: String) { + handleWebSocketClosed(code, reason) + } + + override fun onFailure(ws: WebSocket, t: Throwable, response: Response?) { + handleWebSocketFailure(t, onConnectionFailed) + } + }) - override fun onFailure(ws: WebSocket, t: Throwable, response: Response?) { - handleWebSocketFailure(t, onConnectionFailed) - } - }) } - private fun handleWebSocketOpen() { + private suspend fun handleWebSocketOpen() { sendMessage(EventTypes.subscribe) startHeartbeats() isReconnecting = false isChatActive = true + this._eventPublisher.emit(ChatEvent.ConnectionEstablished) } private fun handleWebSocketClosed(code: Int, reason: String) { @@ -157,7 +190,7 @@ class WebSocketManager @Inject constructor( } } - private fun processJsonContent(text: String) { + private suspend fun processJsonContent(text: String) { val json = try { JSONObject(text) } catch (e: JSONException) { @@ -185,14 +218,15 @@ class WebSocketManager @Inject constructor( } private fun handleHeartbeat() { - heartbeatManager?.heartbeatReceived() + heartbeatManager.heartbeatReceived() } - private fun websocketDidReceiveMessage(content: String?) { + private suspend fun websocketDidReceiveMessage(content: String?) { content?.let { val transcriptItem = parseTranscriptItemFromJson(it) if (transcriptItem != null) { this.messageCallBack(transcriptItem) + this._transcriptPublisher.emit(transcriptItem) } else { Log.i("WebSocket", "Received unrecognized or unsupported content.") } @@ -201,7 +235,7 @@ class WebSocketManager @Inject constructor( } } - private fun parseTranscriptItemFromJson(jsonString: String): TranscriptItem? { + private suspend fun parseTranscriptItemFromJson(jsonString: String): TranscriptItem? { val json = try { JSONObject(jsonString) } catch (e: JSONException) { @@ -228,7 +262,7 @@ class WebSocketManager @Inject constructor( } } } - // WebSocketMessageType.ATTACHMENT -> handleAttachment(jsonObject) + // TODO- WebSocketMessageType.ATTACHMENT -> handleAttachment(jsonObject) WebSocketMessageType.MESSAGE_METADATA -> handleMetadata(jsonObject) else -> { Log.w("WebSocket", "Unknown websocket message type: $type") @@ -271,6 +305,10 @@ class WebSocketManager @Inject constructor( if (isConnectedToNetwork) { reestablishConnection() } + val success = this._eventPublisher.tryEmit(ChatEvent.ConnectionBroken) + if (!success) { + println("Emission failed for ${ChatEvent.ConnectionBroken}, no subscribers and no buffer capacity") + } } // Message Handling Logic @@ -327,9 +365,10 @@ class WebSocketManager @Inject constructor( return event } - private fun handleChatEnded(innerJson: JSONObject): TranscriptItem { + private suspend fun handleChatEnded(innerJson: JSONObject): TranscriptItem { closeWebSocket(); isChatActive = false; + this._eventPublisher.emit(ChatEvent.ChatEnded) val time = innerJson.getString("AbsoluteTime") val eventId = innerJson.getString("Id") val event = Event( diff --git a/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/repository/ChatService.kt b/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/repository/ChatService.kt index e4d6240..ecf4d15 100644 --- a/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/repository/ChatService.kt +++ b/chat-sdk/src/main/java/com/amazon/connect/chat/sdk/repository/ChatService.kt @@ -2,9 +2,18 @@ package com.amazon.connect.chat.sdk.repository import android.util.Log import com.amazon.connect.chat.sdk.model.ChatDetails +import com.amazon.connect.chat.sdk.model.ChatEvent import com.amazon.connect.chat.sdk.model.GlobalConfig +import com.amazon.connect.chat.sdk.model.TranscriptItem import com.amazon.connect.chat.sdk.network.APIClient import com.amazon.connect.chat.sdk.network.AWSClient +import com.amazon.connect.chat.sdk.network.WebSocketManager +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.launch +import java.net.URL import javax.inject.Inject interface ChatService { @@ -21,22 +30,44 @@ interface ChatService { * @return A Result indicating whether the disconnection was successful. */ suspend fun disconnectChatSession(): Result + + val eventPublisher: SharedFlow + val transcriptPublisher: SharedFlow } class ChatServiceImpl @Inject constructor( private val apiClient: APIClient, private val awsClient: AWSClient, - private val connectionDetailsProvider: ConnectionDetailsProvider) : ChatService { + private val connectionDetailsProvider: ConnectionDetailsProvider, + private val webSocketManager: WebSocketManager +) : ChatService { + + private val coroutineScope = CoroutineScope(Dispatchers.Main) + + private val _eventPublisher= MutableSharedFlow() + override val eventPublisher: SharedFlow get() = _eventPublisher + + private val _transcriptPublisher = MutableSharedFlow() + override val transcriptPublisher: SharedFlow get() = _transcriptPublisher override fun configure(config: GlobalConfig) { awsClient.configure(config) } + init { + setupWebSocket() + } + override suspend fun createChatSession(chatDetails: ChatDetails): Result { return runCatching { connectionDetailsProvider.updateChatDetails(chatDetails) val connectionDetails = awsClient.createParticipantConnection(chatDetails.participantToken).getOrThrow() connectionDetailsProvider.updateConnectionDetails(connectionDetails) + val wsUrl = connectionDetails.websocketUrl?.let { URL(it) } + wsUrl?.let { + // TODO +// setupWebSocket() + } Log.d("ChatServiceImpl", "Participant Connected") true }.onFailure { exception -> @@ -44,6 +75,31 @@ class ChatServiceImpl @Inject constructor( } } + private fun setupWebSocket(){ + coroutineScope.launch { + webSocketManager.eventPublisher.collect{ event -> + when(event) { + ChatEvent.ConnectionEstablished -> { + Log.d("ChatServiceImpl", "Connection Established") + } + ChatEvent.ConnectionReEstablished -> { + Log.d("ChatServiceImpl", "Connection Re-Established") + } + ChatEvent.ChatEnded -> Log.d("ChatServiceImpl", "Chat Ended") + ChatEvent.ConnectionBroken -> Log.d("ChatServiceImpl", "Connection Broken") + } + _eventPublisher.emit(event) + } + } + + coroutineScope.launch { + webSocketManager.transcriptPublisher.collect{ transcriptItem -> + _transcriptPublisher.emit(transcriptItem) + } + } + + } + override suspend fun disconnectChatSession(): Result { return runCatching { val connectionDetails = connectionDetailsProvider.getConnectionDetails()