Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update: add graceful shutdown to session mode #831

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 43 additions & 41 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ where
// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
// or issue commands for our sharding and server selection protocol.
let mut shutdown_clone = self.shutdown.resubscribe();
loop {
trace!(
"Client idle, waiting for message, transaction mode: {}",
Expand Down Expand Up @@ -1155,39 +1156,38 @@ where

// This is not an initial message so discard the initial_parsed_ast
initial_parsed_ast.take();
tokio::select! {
result = tokio::time::timeout(idle_client_timeout_duration, read_message(&mut self.read)) => match result {
Ok(Ok(message)) => message,
Ok(Err(err)) => {
// Client disconnected inside a transaction.
// Clean up the server and re-use it.
self.stats.disconnect();
server.checkin_cleanup().await?;

return Err(err);
}
Err(_) => {
// Client idle in transaction timeout
error_response(&mut self.write, "idle transaction timeout").await?;
error!(
"Client idle in transaction timeout: \
{{ \
pool_name: {}, \
username: {}, \
shard: {:?}, \
role: \"{:?}\" \
}}",
self.pool_name,
self.username,
query_router.shard(),
query_router.role()
);

match tokio::time::timeout(
idle_client_timeout_duration,
read_message(&mut self.read),
)
.await
{
Ok(Ok(message)) => message,
Ok(Err(err)) => {
// Client disconnected inside a transaction.
// Clean up the server and re-use it.
self.stats.disconnect();
server.checkin_cleanup().await?;

return Err(err);
}
Err(_) => {
// Client idle in transaction timeout
error_response(&mut self.write, "idle transaction timeout").await?;
error!(
"Client idle in transaction timeout: \
{{ \
pool_name: {}, \
username: {}, \
shard: {:?}, \
role: \"{:?}\" \
}}",
self.pool_name,
self.username,
query_router.shard(),
query_router.role()
);

break;
}
},
_ = shutdown_clone.recv() => {
break;
}
}
Expand All @@ -1198,6 +1198,7 @@ where
message
}
};
self.stats.active();

// The message will be forwarded to the server intact. We still would like to
// parse it below to figure out what to do with it.
Expand Down Expand Up @@ -1586,6 +1587,7 @@ where
error!("Unexpected code: {}", code);
}
}
self.stats.idle();
}

// The server is no longer bound to us, we can't cancel it's queries anymore.
Expand Down Expand Up @@ -1652,15 +1654,15 @@ where
query_router.set_shard(current_shard);

error_response(
&mut self.write,
&format!(
"shard {} is not configured {}, staying on shard {:?} (shard numbers start at 0)",
selected_shard,
pool.shards(),
current_shard,
),
)
.await?;
&mut self.write,
&format!(
"shard {} is not configured {}, staying on shard {:?} (shard numbers start at 0)",
selected_shard,
pool.shards(),
current_shard,
),
)
.await?;
} else {
custom_protocol_response_ok(&mut self.write, "SET SHARD")
.await?;
Expand Down