Skip to content

Commit

Permalink
make separate state and data communication
Browse files Browse the repository at this point in the history
  • Loading branch information
avan1235 committed Apr 11, 2023
1 parent 5a08c41 commit 8a81c61
Show file tree
Hide file tree
Showing 18 changed files with 329 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,56 +26,88 @@ private val SNAKE_GAME_HANDLER = GameService(updateDelay = 20) { SnakeGameState.
private val BIRD_GAME_HANDLER = GameService(updateDelay = 20) { BirdGameState.empty() }.let(::GameHandler)

fun Application.gameSockets() = routing {
authJwtWebSocket(SET_GAME_WEBSOCKET("{$SERVER_NAME}"), SET_GAME_HANDLER::handleGame)
authJwtWebSocket(SNAKE_GAME_WEBSOCKET("{$SERVER_NAME}"), SNAKE_GAME_HANDLER::handleGame)
authJwtWebSocket(BIRD_GAME_WEBSOCKET("{$SERVER_NAME}"), BIRD_GAME_HANDLER::handleGame)
authJwtWebSocket(SET_GAME_WEBSOCKET("{$SERVER_NAME}").statePath, SET_GAME_HANDLER::handleState)
authJwtWebSocket(SET_GAME_WEBSOCKET("{$SERVER_NAME}").dataPath, SET_GAME_HANDLER::handleData)

authJwtWebSocket(SNAKE_GAME_WEBSOCKET("{$SERVER_NAME}").statePath, SNAKE_GAME_HANDLER::handleState)
authJwtWebSocket(SNAKE_GAME_WEBSOCKET("{$SERVER_NAME}").dataPath, SNAKE_GAME_HANDLER::handleData)

authJwtWebSocket(BIRD_GAME_WEBSOCKET("{$SERVER_NAME}").statePath, BIRD_GAME_HANDLER::handleState)
authJwtWebSocket(BIRD_GAME_WEBSOCKET("{$SERVER_NAME}").dataPath, BIRD_GAME_HANDLER::handleData)
}

private class GameHandler(
private val service: GameService,
) {
@OptIn(ExperimentalSerializationApi::class)
suspend fun handleGame(
suspend fun handleState(
session: DefaultWebSocketServerSession,
user: Jwt.User,
) {
with(session) {
GameConnection(session, user.username).run {
GameStateConnection(session, user.username).run {
val serverName = call.parameters[SERVER_NAME]?.toServerName() ?: return
val username = user.username
service.addConnection(serverName, username, this)
service.addStateConnection(serverName, username, this)

try {
sendAllUpdatedGameState(serverName, service.state(serverName))

for (frame in incoming) {
frame as? Frame.Binary ?: continue
val clientMessage = GameSerialization.decodeFromByteArray<GameClientMessage>(frame.readBytes())
val bytes = frame.readBytes()
val clientMessage = GameSerialization.decodeFromByteArray<GameStateUpdateClientMessage>(bytes)
updateGameStateWithClientMessage(serverName, username, clientMessage)
}
} catch (e: Exception) {
eprintln(e.localizedMessage)
} finally {
service
.removeConnection(serverName, this)
.removeStateConnection(serverName, this)
?.let { gameState -> sendAllUpdatedGameState(serverName, gameState) }
}
}
}
}

private suspend fun GameConnection.updateGameStateWithClientMessage(
@OptIn(ExperimentalSerializationApi::class)
suspend fun handleData(
session: DefaultWebSocketServerSession,
user: Jwt.User,
) {
with(session) {
GameDataConnection(session, user.username).run {
val serverName = call.parameters[SERVER_NAME]?.toServerName() ?: return
val username = user.username
service.addDataConnection(serverName, username, this)

try {
for (frame in incoming) {
frame as? Frame.Binary ?: continue
val bytes = frame.readBytes()
val clientMessage = GameSerialization.decodeFromByteArray<GameClientMessage>(bytes)
updateGameDataWithClientMessage(serverName, username, clientMessage)
}
} catch (e: Exception) {
eprintln(e.localizedMessage)
} finally {
service.removeDataConnection(serverName, this)
}
}
}
}

private suspend fun GameDataConnection.updateGameDataWithClientMessage(
serverName: ServerName,
username: Username,
msg: GameClientMessage,
): Unit = when (msg) {
is HeartBeatClientMessage -> service.state(serverName)
.let(asGameStateServerMessage(username))
.let { session.sendSerialized(it) }

is GameStateUpdateClientMessage -> service
.updateGameState(serverName, username, msg.update)
?.let { sendGameStateUpdate(serverName, it) }
.let { state ->
service.stateConnections(serverName, username)
.forEach { it.sendStateSerialized(state) }
}

is UserActionClientMessage -> service.updateGameState(
serverName = serverName,
Expand All @@ -84,36 +116,48 @@ private class GameHandler(
action = msg.action
)?.let { gameState ->
val message = UserActionServerMessage(action = msg.action, timestamp = now())
val connections = service.connections(serverName, msg.username)
connections.forEach { it.session.sendSerialized(message) }
val connections = service.dataConnections(serverName, msg.username)
connections.forEach { it.sendDataSerialized(message) }
sendAllUpdatedGameState(serverName, gameState)
}

is SendMessageClientMessage -> sendAllUserMessage(serverName, msg.message)
} ?: Unit

private suspend fun GameStateConnection.updateGameStateWithClientMessage(
serverName: ServerName,
username: Username,
msg: GameStateUpdateClientMessage,
): Unit = service
.updateGameState(serverName, username, msg.update)
?.let { sendGameStateUpdate(serverName, it) }
?: Unit

private suspend fun sendAllUpdatedGameState(
serverName: ServerName,
gameState: GameState,
): Unit = service.connections(serverName).forEach {
): Unit = service.stateConnections(serverName).forEach {
val message = gameState.let(asGameStateServerMessage(it.username))
it.session.sendSerialized(message)
it.sendStateSerialized(message)
}

private suspend fun sendAllUserMessage(
serverName: ServerName,
userMessage: UserMessage,
): Unit = service.connections(serverName).forEach {
): Unit = service.dataConnections(serverName).forEach {
val message = ReceiveMessageServerMessage(userMessage, timestamp = now())
it.session.sendSerialized(message)
it.sendDataSerialized(message)
}

private suspend fun GameConnection.sendGameStateUpdate(
private suspend fun GameStateConnection.sendGameStateUpdate(
serverName: ServerName,
updateResult: GameStateUpdateResult,
): Unit = when (updateResult) {
UnapprovedGameStateUpdate -> UnapprovedGameStateUpdateServerMessage(timestamp = now())
.let { session.sendSerialized(it) }
.let { data ->
service.dataConnections(serverName)
.forEach { it.sendDataSerialized(data) }
}

is UpdatedGameState -> sendAllUpdatedGameState(serverName, updateResult.gameState)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,55 +21,75 @@ class GameService(
private val serverLocks = ComputedConcurrentHashMap<ServerName, Mutex> {
Mutex()
}
private val serverConnections = ComputedConcurrentHashMap<ServerName, MutableSet<GameConnection>> {
private val serverStateConnections = ComputedConcurrentHashMap<ServerName, MutableSet<GameStateConnection>> {
Collections.synchronizedSet(HashSet())
}
private val userServerConnections = ComputedConcurrentHashMap<UserAtServer, MutableSet<GameConnection>> {
private val serverDataConnections = ComputedConcurrentHashMap<ServerName, MutableSet<GameDataConnection>> {
Collections.synchronizedSet(HashSet())
}
private val userServerStateConnections = ComputedConcurrentHashMap<UserAtServer, MutableSet<GameStateConnection>> {
Collections.synchronizedSet(HashSet())
}
private val userServerDataConnections = ComputedConcurrentHashMap<UserAtServer, MutableSet<GameDataConnection>> {
Collections.synchronizedSet(HashSet())
}
private val serverGamesStates = ComputedConcurrentHashMap<ServerName, GameState> {
default()
}
private val serverUpdateJobs = HashMap<ServerName, Job>()

suspend fun addConnection(serverName: ServerName, username: Username, connection: GameConnection): Unit =
suspend fun addStateConnection(serverName: ServerName, username: Username, connection: GameStateConnection): Unit =
lockForGame(serverName) {
val role = if (serverConnections[serverName].isEmpty()) {
resetServerBackgroundUpdate(serverName)
val role = if (serverStateConnections[serverName].isEmpty()) {
safeResetServerBackgroundUpdate(serverName)
UserRole.Admin
} else UserRole.Player
val userAtServer = UserAtServer(serverName, username)
serverConnections[serverName] += connection
userServerConnections[userAtServer] += connection
serverStateConnections[serverName] += connection
userServerStateConnections[userAtServer] += connection
val gameState = serverGamesStates[serverName].addUser(username, role)
updateGameState(serverName, gameState)
}

private fun stopServerBackgroundUpdate(serverName: ServerName) {
suspend fun addDataConnection(serverName: ServerName, username: Username, connection: GameDataConnection): Unit =
lockForGame(serverName) {
val userAtServer = UserAtServer(serverName, username)
serverDataConnections[serverName] += connection
userServerDataConnections[userAtServer] += connection
}

private fun safeStopServerBackgroundUpdate(serverName: ServerName) {
serverUpdateJobs.remove(serverName)?.cancel()
}

private fun resetServerBackgroundUpdate(serverName: ServerName) {
stopServerBackgroundUpdate(serverName)
private fun safeResetServerBackgroundUpdate(serverName: ServerName) {
safeStopServerBackgroundUpdate(serverName)
updateDelay?.let { serverUpdateJobs[serverName] = updateInBackground(it, serverName) }
}

suspend fun removeConnection(serverName: ServerName, connection: GameConnection): GameState? =
suspend fun removeStateConnection(serverName: ServerName, connection: GameStateConnection): GameState? =
lockForGame(serverName) {
val userAtServer = UserAtServer(serverName, connection.username)
userServerConnections[userAtServer] -= connection
serverConnections[serverName] -= connection
userServerStateConnections[userAtServer] -= connection
serverStateConnections[serverName] -= connection

if (serverConnections[serverName].isEmpty()) {
stopServerBackgroundUpdate(serverName)
if (serverStateConnections[serverName].isEmpty()) {
safeStopServerBackgroundUpdate(serverName)
serverGamesStates[serverName] = default()
null
} else if (userServerConnections[userAtServer].isEmpty()) {
} else if (userServerStateConnections[userAtServer].isEmpty()) {
val gameState = serverGamesStates[serverName].removeUser(connection.username)
updateGameState(serverName, gameState)
} else null
}

suspend fun removeDataConnection(serverName: ServerName, connection: GameDataConnection): Unit =
lockForGame(serverName) {
val userAtServer = UserAtServer(serverName, connection.username)
userServerDataConnections[userAtServer] -= connection
serverDataConnections[serverName] -= connection
}

suspend fun updateGameState(
serverName: ServerName,
username: Username,
Expand Down Expand Up @@ -107,10 +127,17 @@ class GameService(

fun state(serverName: ServerName): GameState = serverGamesStates[serverName]

fun connections(serverName: ServerName): Set<GameConnection> = serverConnections[serverName]
fun stateConnections(serverName: ServerName): Set<GameStateConnection> =
serverStateConnections[serverName].toSet()

fun dataConnections(serverName: ServerName): Set<GameDataConnection> =
serverDataConnections[serverName].toSet()

fun stateConnections(serverName: ServerName, username: Username): Set<GameStateConnection> =
userServerStateConnections[UserAtServer(serverName, username)].toSet()

fun connections(serverName: ServerName, username: Username): Set<GameConnection> =
userServerConnections[UserAtServer(serverName, username)]
fun dataConnections(serverName: ServerName, username: Username): Set<GameDataConnection> =
userServerDataConnections[UserAtServer(serverName, username)].toSet()

private suspend inline fun <T> lockForGame(serverName: ServerName, action: () -> T): T =
serverLocks[serverName].withLock(action = action)
Expand All @@ -123,7 +150,7 @@ class GameService(
val updatedGameState = timeUpdateGameState(serverName) ?: return@update
val snapshot = updatedGameState.snapshot()
supervisorScope {
serverConnections[serverName]
stateConnections(serverName)
.map { sendSnapshot(it, snapshot::get) }
.joinAll()
}
Expand All @@ -136,26 +163,38 @@ class GameService(
}
}

class GameConnection(
class GameStateConnection(
val session: WebSocketSession,
val username: Username,
) {
override fun hashCode(): Int = session.hashCode()
override fun equals(other: Any?): Boolean = (other as? GameConnection)?.session == session
override fun equals(other: Any?): Boolean = (other as? GameStateConnection)?.session == session
}

class GameDataConnection(
val session: WebSocketSession,
val username: Username,
) {
override fun hashCode(): Int = session.hashCode()
override fun equals(other: Any?): Boolean = (other as? GameDataConnection)?.session == session
}

fun CoroutineScope.sendSnapshot(
connection: GameConnection,
connection: GameStateConnection,
snapshot: (Username) -> GameSnapshot?,
): Job = launch {
val userSnapshot = snapshot(connection.username) ?: return@launch
val message = GameStateSnapshotServerMessage(userSnapshot, timestamp = now())
connection.session.sendSerialized(message)
connection.sendStateSerialized(message)
}

@OptIn(ExperimentalSerializationApi::class)
suspend inline fun WebSocketSession.sendSerialized(content: GameServerMessage): Unit =
send(Frame.Binary(true, GameSerialization.encodeToByteArray(content)))
suspend inline fun GameStateConnection.sendStateSerialized(content: GameStateSnapshotServerMessage): Unit =
session.send(Frame.Binary(true, GameSerialization.encodeToByteArray(content)))

@OptIn(ExperimentalSerializationApi::class)
suspend inline fun GameDataConnection.sendDataSerialized(content: GameServerMessage): Unit =
session.send(Frame.Binary(true, GameSerialization.encodeToByteArray(content)))

@JvmInline
value class ServerName(val name: String)
Expand Down
Loading

0 comments on commit 8a81c61

Please sign in to comment.