diff --git a/src/client.rs b/src/client.rs index 405d72be..6b7e968a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -885,6 +885,7 @@ where // Our custom protocol loop. // We expect the client to either start a transaction with regular queries // or issue commands for our sharding and server selection protocol. + let mut shutdown_clone = self.shutdown.resubscribe(); loop { trace!( "Client idle, waiting for message, transaction mode: {}", @@ -1155,39 +1156,38 @@ where // This is not an initial message so discard the initial_parsed_ast initial_parsed_ast.take(); + tokio::select! { + result = tokio::time::timeout(idle_client_timeout_duration, read_message(&mut self.read)) => match result { + Ok(Ok(message)) => message, + Ok(Err(err)) => { + // Client disconnected inside a transaction. + // Clean up the server and re-use it. + self.stats.disconnect(); + server.checkin_cleanup().await?; + + return Err(err); + } + Err(_) => { + // Client idle in transaction timeout + error_response(&mut self.write, "idle transaction timeout").await?; + error!( + "Client idle in transaction timeout: \ + {{ \ + pool_name: {}, \ + username: {}, \ + shard: {:?}, \ + role: \"{:?}\" \ + }}", + self.pool_name, + self.username, + query_router.shard(), + query_router.role() + ); - match tokio::time::timeout( - idle_client_timeout_duration, - read_message(&mut self.read), - ) - .await - { - Ok(Ok(message)) => message, - Ok(Err(err)) => { - // Client disconnected inside a transaction. - // Clean up the server and re-use it. - self.stats.disconnect(); - server.checkin_cleanup().await?; - - return Err(err); - } - Err(_) => { - // Client idle in transaction timeout - error_response(&mut self.write, "idle transaction timeout").await?; - error!( - "Client idle in transaction timeout: \ - {{ \ - pool_name: {}, \ - username: {}, \ - shard: {:?}, \ - role: \"{:?}\" \ - }}", - self.pool_name, - self.username, - query_router.shard(), - query_router.role() - ); - + break; + } + }, + _ = shutdown_clone.recv() => { break; } } @@ -1198,6 +1198,7 @@ where message } }; + self.stats.active(); // The message will be forwarded to the server intact. We still would like to // parse it below to figure out what to do with it. @@ -1586,6 +1587,7 @@ where error!("Unexpected code: {}", code); } } + self.stats.idle(); } // The server is no longer bound to us, we can't cancel it's queries anymore. @@ -1652,15 +1654,15 @@ where query_router.set_shard(current_shard); error_response( - &mut self.write, - &format!( - "shard {} is not configured {}, staying on shard {:?} (shard numbers start at 0)", - selected_shard, - pool.shards(), - current_shard, - ), - ) - .await?; + &mut self.write, + &format!( + "shard {} is not configured {}, staying on shard {:?} (shard numbers start at 0)", + selected_shard, + pool.shards(), + current_shard, + ), + ) + .await?; } else { custom_protocol_response_ok(&mut self.write, "SET SHARD") .await?;