Skip to content

Commit

Permalink
Added Flow mechanism for publishing websocket data
Browse files Browse the repository at this point in the history
  • Loading branch information
mrajatttt committed Aug 14, 2024
1 parent af1cce9 commit 4bd09c8
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,30 @@ class ChatViewModel @Inject constructor(
private fun configureChatSession() {
val globalConfig = GlobalConfig(region = chatConfiguration.region)
chatSession.configure(globalConfig)
setupChatHandlers(chatSession)
}

private fun setupChatHandlers(chatSession: ChatSession) {
chatSession.onConnectionEstablished = {
Log.d("ChatViewModel", "Connection established.")
}

chatSession.onMessageReceived = { transcriptItem ->
// Handle received message
Log.d("ChatViewModel", "Received transcript item: $transcriptItem")
}

chatSession.onChatEnded = {
Log.d("ChatViewModel", "Chat ended.")
}

chatSession.onConnectionBroken = {
Log.d("ChatViewModel", "Connection broken.")
}

chatSession.onConnectionReEstablished = {
Log.d("ChatViewModel", "Connection re-established.")
}
}

fun initiateChat() {
Expand Down
40 changes: 40 additions & 0 deletions chat-sdk/src/main/java/com/amazon/connect/chat/sdk/ChatSession.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package com.amazon.connect.chat.sdk

import com.amazon.connect.chat.sdk.model.ChatDetails
import com.amazon.connect.chat.sdk.model.ChatEvent
import com.amazon.connect.chat.sdk.model.GlobalConfig
import com.amazon.connect.chat.sdk.model.TranscriptItem
import com.amazon.connect.chat.sdk.repository.ChatService
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import javax.inject.Inject
import javax.inject.Singleton
Expand All @@ -22,11 +27,46 @@ interface ChatSession {
* @return A Result indicating whether the disconnection was successful.
*/
suspend fun disconnect(): Result<Unit>

var onConnectionEstablished: (() -> Unit)?
var onConnectionReEstablished: (() -> Unit)?
var onConnectionBroken: (() -> Unit)?
var onMessageReceived: ((TranscriptItem) -> Unit)?
var onChatEnded: (() -> Unit)?
}

@Singleton
class ChatSessionImpl @Inject constructor(private val chatService: ChatService) : ChatSession {

override var onConnectionEstablished: (() -> Unit)? = null
override var onConnectionReEstablished: (() -> Unit)? = null
override var onConnectionBroken: (() -> Unit)? = null
override var onMessageReceived: ((TranscriptItem) -> Unit)? = null
override var onChatEnded: (() -> Unit)? = null
private val coroutineScope = CoroutineScope(Dispatchers.Main + Job())

init {
setupEventSubscriptions()
}

private fun setupEventSubscriptions() {
coroutineScope.launch {
chatService.eventPublisher.collect { event ->
when (event) {
ChatEvent.ConnectionEstablished -> onConnectionEstablished?.invoke()
ChatEvent.ConnectionReEstablished -> onConnectionReEstablished?.invoke()
ChatEvent.ChatEnded -> onChatEnded?.invoke()
ChatEvent.ConnectionBroken -> onConnectionBroken?.invoke()
}
}
}
coroutineScope.launch {
chatService.transcriptPublisher.collect { transcriptItem ->
onMessageReceived?.invoke(transcriptItem)
}
}
}

override fun configure(config: GlobalConfig) {
chatService.configure(config)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ object ChatModule {
fun provideChatService(
apiClient: APIClient,
awsClient: AWSClient,
connectionDetailsProvider: ConnectionDetailsProvider
connectionDetailsProvider: ConnectionDetailsProvider,
webSocketManager: WebSocketManager
): ChatService {
return ChatServiceImpl(apiClient, awsClient, connectionDetailsProvider)
return ChatServiceImpl(apiClient, awsClient, connectionDetailsProvider, webSocketManager)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,11 @@ enum class ContentType(val type: String){
return values().find { it.type.equals(type, ignoreCase = true) }
}
}
}
}

enum class ChatEvent {
ConnectionEstablished,
ConnectionReEstablished,
ChatEnded,
ConnectionBroken,
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import androidx.lifecycle.Lifecycle
import androidx.lifecycle.LifecycleEventObserver
import androidx.lifecycle.ProcessLifecycleOwner
import com.amazon.connect.chat.sdk.Config
import com.amazon.connect.chat.sdk.model.ChatEvent
import com.amazon.connect.chat.sdk.model.Event
import com.amazon.connect.chat.sdk.model.Message
import com.amazon.connect.chat.sdk.model.MessageDirection
Expand All @@ -18,6 +19,8 @@ import com.amazon.connect.chat.sdk.model.WebSocketMessageType
import com.amazon.connect.chat.sdk.model.ContentType
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch
import okhttp3.OkHttpClient
import okhttp3.Request
Expand All @@ -37,6 +40,17 @@ object EventTypes {
const val deepHeartbeat = "{\"topic\": \"aws/ping\"}"
}

interface WebSocketManagerInterface {
// TODO - Align it with IOS
/*
* var eventPublisher: PassthroughSubject<ChatEvent, Never> { get }
var transcriptPublisher: PassthroughSubject<TranscriptItem, Never> { get }
func connect(wsUrl: URL?, isReconnect: Bool?)
func disconnect()
func formatAndProcessTranscriptItems(_ transcriptItems: [AWSConnectParticipantItem]) -> [TranscriptItem]
* */
}

class WebSocketManager @Inject constructor(
private val context: Context,
var requestNewWsUrl: () -> Unit
Expand All @@ -60,6 +74,19 @@ class WebSocketManager @Inject constructor(
private var isChatActive: Boolean = false
private var isReconnecting: Boolean = false

private val _eventPublisher = MutableSharedFlow<ChatEvent>(
// Keeping these here for future reference if we encounter issue with SharedFlow memory - Will update or remove later
// replay = 1, // Replay the last event to new subscribers
// extraBufferCapacity = 10 // Buffer up to 10 events in case of bursts or slow consumption
)
val eventPublisher: SharedFlow<ChatEvent> get() = _eventPublisher

private val _transcriptPublisher = MutableSharedFlow<TranscriptItem>(
// replay = 1, // Replay the last transcript item to new subscribers
// extraBufferCapacity = 10 // Buffer up to 10 transcript items
)
val transcriptPublisher: SharedFlow<TranscriptItem> get() = _transcriptPublisher

init {
registerObservers()
}
Expand Down Expand Up @@ -108,35 +135,41 @@ class WebSocketManager @Inject constructor(
this.messageCallBack = onMessageReceived
closeWebSocket()

webSocket = client.newWebSocket(request, object : WebSocketListener() {
override fun onOpen(ws: WebSocket, response: Response) {
handleWebSocketOpen()
}
webSocket = client.newWebSocket(request, object : WebSocketListener() {
override fun onOpen(ws: WebSocket, response: Response) {
CoroutineScope(Dispatchers.IO).launch {
handleWebSocketOpen()
}
}

override fun onMessage(ws: WebSocket, text: String) {
Log.i("WebSocket", "Received message: $text")
processJsonContent(text)
}
override fun onMessage(ws: WebSocket, text: String) {
Log.i("WebSocket", "Received message: $text")
CoroutineScope(Dispatchers.IO).launch {
processJsonContent(text)
}
}

override fun onClosing(ws: WebSocket, code: Int, reason: String) {
Log.i("WebSocket", "WebSocket is closing with code: $code, reason: $reason")
}
override fun onClosing(ws: WebSocket, code: Int, reason: String) {
Log.i("WebSocket", "WebSocket is closing with code: $code, reason: $reason")
}

override fun onClosed(ws: WebSocket, code: Int, reason: String) {
handleWebSocketClosed(code, reason)
}
override fun onClosed(ws: WebSocket, code: Int, reason: String) {
handleWebSocketClosed(code, reason)
}

override fun onFailure(ws: WebSocket, t: Throwable, response: Response?) {
handleWebSocketFailure(t, onConnectionFailed)
}
})

override fun onFailure(ws: WebSocket, t: Throwable, response: Response?) {
handleWebSocketFailure(t, onConnectionFailed)
}
})
}

private fun handleWebSocketOpen() {
private suspend fun handleWebSocketOpen() {
sendMessage(EventTypes.subscribe)
startHeartbeats()
isReconnecting = false
isChatActive = true
this._eventPublisher.emit(ChatEvent.ConnectionEstablished)
}

private fun handleWebSocketClosed(code: Int, reason: String) {
Expand All @@ -157,7 +190,7 @@ class WebSocketManager @Inject constructor(
}
}

private fun processJsonContent(text: String) {
private suspend fun processJsonContent(text: String) {
val json = try {
JSONObject(text)
} catch (e: JSONException) {
Expand Down Expand Up @@ -185,14 +218,15 @@ class WebSocketManager @Inject constructor(
}

private fun handleHeartbeat() {
heartbeatManager?.heartbeatReceived()
heartbeatManager.heartbeatReceived()
}

private fun websocketDidReceiveMessage(content: String?) {
private suspend fun websocketDidReceiveMessage(content: String?) {
content?.let {
val transcriptItem = parseTranscriptItemFromJson(it)
if (transcriptItem != null) {
this.messageCallBack(transcriptItem)
this._transcriptPublisher.emit(transcriptItem)
} else {
Log.i("WebSocket", "Received unrecognized or unsupported content.")
}
Expand All @@ -201,7 +235,7 @@ class WebSocketManager @Inject constructor(
}
}

private fun parseTranscriptItemFromJson(jsonString: String): TranscriptItem? {
private suspend fun parseTranscriptItemFromJson(jsonString: String): TranscriptItem? {
val json = try {
JSONObject(jsonString)
} catch (e: JSONException) {
Expand All @@ -228,7 +262,7 @@ class WebSocketManager @Inject constructor(
}
}
}
// WebSocketMessageType.ATTACHMENT -> handleAttachment(jsonObject)
// TODO- WebSocketMessageType.ATTACHMENT -> handleAttachment(jsonObject)
WebSocketMessageType.MESSAGE_METADATA -> handleMetadata(jsonObject)
else -> {
Log.w("WebSocket", "Unknown websocket message type: $type")
Expand Down Expand Up @@ -271,6 +305,10 @@ class WebSocketManager @Inject constructor(
if (isConnectedToNetwork) {
reestablishConnection()
}
val success = this._eventPublisher.tryEmit(ChatEvent.ConnectionBroken)
if (!success) {
println("Emission failed for ${ChatEvent.ConnectionBroken}, no subscribers and no buffer capacity")
}
}

// Message Handling Logic
Expand Down Expand Up @@ -327,9 +365,10 @@ class WebSocketManager @Inject constructor(
return event
}

private fun handleChatEnded(innerJson: JSONObject): TranscriptItem {
private suspend fun handleChatEnded(innerJson: JSONObject): TranscriptItem {
closeWebSocket();
isChatActive = false;
this._eventPublisher.emit(ChatEvent.ChatEnded)
val time = innerJson.getString("AbsoluteTime")
val eventId = innerJson.getString("Id")
val event = Event(
Expand Down
Loading

0 comments on commit 4bd09c8

Please sign in to comment.