Skip to content

Commit

Permalink
connection_keeper: get rid of unused parts
Browse files Browse the repository at this point in the history
Most of the responsibility was moved from ConnectionKeeper to
NodeConnectionPool. The former is still used for the control connection,
but some of its functionality became unused and was removed:

- Ability to set keyspace
- Sending updated ShardInfo

Moreover, the `many_connections` test was removed because a very similar
version of the test was introduced in `connection_pool.rs`.
  • Loading branch information
piodul committed Oct 20, 2021
1 parent 6e0504d commit a0560cb
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 195 deletions.
195 changes: 10 additions & 185 deletions scylla/src/transport/connection_keeper.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
/// ConnectionKeeper keeps a Connection to some address and works to keep it open
use crate::routing::ShardInfo;
use crate::transport::errors::QueryError;
use crate::transport::{
connection,
connection::{Connection, ConnectionConfig, ErrorReceiver, VerifiedKeyspaceName},
connection::{Connection, ConnectionConfig, ErrorReceiver},
};

use futures::{future::RemoteHandle, FutureExt};
Expand All @@ -14,7 +13,6 @@ use std::sync::Arc;
/// ConnectionKeeper keeps a Connection to some address and works to keep it open
pub struct ConnectionKeeper {
conn_state_receiver: tokio::sync::watch::Receiver<ConnectionState>,
use_keyspace_channel: tokio::sync::mpsc::Sender<UseKeyspaceRequest>,
_worker_handle: RemoteHandle<()>,
}

Expand All @@ -29,62 +27,31 @@ pub enum ConnectionState {
struct ConnectionKeeperWorker {
address: SocketAddr,
config: ConnectionConfig,
shard_info: Option<ShardInfo>,

shard_info_sender: Option<ShardInfoSender>,
conn_state_sender: tokio::sync::watch::Sender<ConnectionState>,

// Channel used to receive use keyspace requests
use_keyspace_channel: tokio::sync::mpsc::Receiver<UseKeyspaceRequest>,

// Keyspace send in "USE <keyspace name>" when opening each connection
used_keyspace: Option<VerifiedKeyspaceName>,
}

pub type ShardInfoSender = Arc<std::sync::Mutex<tokio::sync::watch::Sender<Option<ShardInfo>>>>;

#[derive(Debug)]
struct UseKeyspaceRequest {
keyspace_name: VerifiedKeyspaceName,
response_chan: tokio::sync::oneshot::Sender<Result<(), QueryError>>,
}

impl ConnectionKeeper {
/// Creates new ConnectionKeeper that starts a connection in the background
/// # Arguments
///
/// * `address` - IP address to connect to
/// * `compression` - preferred compression method to use
/// * `shard_info` - ShardInfo to use, will connect to shard number `shard_info.shard`
/// * `shard_info_sender` - channel to send new ShardInfo after each connection creation
pub fn new(
address: SocketAddr,
config: ConnectionConfig,
shard_info: Option<ShardInfo>,
shard_info_sender: Option<ShardInfoSender>,
keyspace_name: Option<VerifiedKeyspaceName>,
) -> Self {
/// * `config` - connection configuration to use
pub fn new(address: SocketAddr, config: ConnectionConfig) -> Self {
let (conn_state_sender, conn_state_receiver) =
tokio::sync::watch::channel(ConnectionState::Initializing);

let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(1);

let worker = ConnectionKeeperWorker {
address,
config,
shard_info,
shard_info_sender,
conn_state_sender,
use_keyspace_channel: use_keyspace_receiver,
used_keyspace: keyspace_name,
};

let (fut, worker_handle) = worker.work().remote_handle();
tokio::spawn(fut);

ConnectionKeeper {
conn_state_receiver,
use_keyspace_channel: use_keyspace_sender,
_worker_handle: worker_handle,
}
}
Expand Down Expand Up @@ -125,24 +92,6 @@ impl ConnectionKeeper {
_ => unreachable!(),
}
}

pub async fn use_keyspace(
&self,
keyspace_name: VerifiedKeyspaceName,
) -> Result<(), QueryError> {
let (response_sender, response_receiver) = tokio::sync::oneshot::channel();

self.use_keyspace_channel
.send(UseKeyspaceRequest {
keyspace_name,
response_chan: response_sender,
})
.await
.expect("Bug in ConnectionKeeper::use_keyspace sending");
// Other end of this channel is in the Worker, can't be dropped while we have &self to _worker_handle

response_receiver.await.unwrap() // ConnectionKeeperWorker always responds
}
}

enum RunConnectionRes {
Expand Down Expand Up @@ -188,7 +137,7 @@ impl ConnectionKeeperWorker {
// Opens a new connection and waits until some fatal error occurs
async fn run_connection(&mut self) -> RunConnectionRes {
// Connect to the node
let (connection, mut error_receiver) = match self.open_new_connection().await {
let (connection, error_receiver) = match self.open_new_connection().await {
Ok(opened) => opened,
Err(e) => return RunConnectionRes::Error(e),
};
Expand All @@ -203,145 +152,21 @@ impl ConnectionKeeperWorker {
return RunConnectionRes::ShouldStop;
}

// Notify about new shard info
if let Some(sender) = &self.shard_info_sender {
let new_shard_info: Option<ShardInfo> = connection.get_shard_info().clone();

// Ignore sending error
// If no one wants to get shard_info that's OK
// If lock is poisoned do nothing
if let Ok(sender_locked) = sender.lock() {
let _ = sender_locked.send(new_shard_info);
}
}

// Use the specified keyspace
if let Some(keyspace_name) = &self.used_keyspace {
let _ = connection.use_keyspace(keyspace_name).await;
// Ignore the error, used_keyspace could be set a long time ago and then deleted
// user gets all errors from session.use_keyspace()
}

let connection_closed_error = QueryError::IoError(Arc::new(std::io::Error::new(
ErrorKind::Other,
"Connection closed",
)));

// Wait for events - a use keyspace request or a fatal error
loop {
tokio::select! {
recv_res = self.use_keyspace_channel.recv() => {
match recv_res {
Some(request) => {
self.used_keyspace = Some(request.keyspace_name.clone());

// Send USE KEYSPACE request, send result if channel wasn't closed
let res = connection.use_keyspace(&request.keyspace_name).await;
let _ = request.response_chan.send(res);
},
None => return RunConnectionRes::ShouldStop, // If the channel was dropped we should stop
}
},
connection_error = &mut error_receiver => {
let error = connection_error.unwrap_or(connection_closed_error);
return RunConnectionRes::Error(error);
}
}
}
// Wait for a fatal error to occur
let connection_error = error_receiver.await;
let error = connection_error.unwrap_or(connection_closed_error);
RunConnectionRes::Error(error)
}

async fn open_new_connection(&self) -> Result<(Arc<Connection>, ErrorReceiver), QueryError> {
let (connection, error_receiver) = match &self.shard_info {
Some(info) => self.open_new_connection_to_shard(info).await?,
None => connection::open_connection(self.address, None, self.config.clone()).await?,
};
let (connection, error_receiver) =
connection::open_connection(self.address, None, self.config.clone()).await?;

Ok((Arc::new(connection), error_receiver))
}

async fn open_new_connection_to_shard(
&self,
shard_info: &ShardInfo,
) -> Result<(Connection, ErrorReceiver), QueryError> {
// Create iterator over all possible source ports for this shard
let source_port_iter = shard_info
.get_sharder()
.iter_source_ports_for_shard(shard_info.shard.into());

for port in source_port_iter {
let connect_result =
connection::open_connection(self.address, Some(port), self.config.clone()).await;

match connect_result {
Err(err) if err.is_address_unavailable_for_use() => continue, // If we can't use this port, try the next one
result => return result,
}
}

// Tried all source ports for that shard, give up
Err(QueryError::IoError(Arc::new(std::io::Error::new(
std::io::ErrorKind::AddrInUse,
"Could not find free source port for shard",
))))
}
}

#[cfg(test)]
mod tests {
use super::ConnectionKeeper;
use crate::transport::connection::ConnectionConfig;
use std::net::{SocketAddr, ToSocketAddrs};

// Open many connections to a node
// Port collision should occur
// If they are not handled this test will most likely fail
#[tokio::test]
async fn many_connections() {
let connections_number = 512;

let connect_address: SocketAddr = std::env::var("SCYLLA_URI")
.unwrap_or_else(|_| "127.0.0.1:9042".to_string())
.to_socket_addrs()
.unwrap()
.next()
.unwrap();

let connection_config = ConnectionConfig {
compression: None,
tcp_nodelay: true,
#[cfg(feature = "ssl")]
ssl_context: None,
..Default::default()
};

// Get shard info from a single connection, all connections will open to this shard
let conn_keeper =
ConnectionKeeper::new(connect_address, connection_config.clone(), None, None, None);
let shard_info = conn_keeper
.get_connection()
.await
.unwrap()
.get_shard_info()
.clone();

// Open the connections
let mut conn_keepers: Vec<ConnectionKeeper> = Vec::new();

for _ in 0..connections_number {
let conn_keeper = ConnectionKeeper::new(
connect_address,
connection_config.clone(),
shard_info.clone(),
None,
None,
);

conn_keepers.push(conn_keeper);
}

// Check that each connection keeper connected succesfully
for conn_keeper in conn_keepers {
conn_keeper.get_connection().await.unwrap();
}
}
}
12 changes: 2 additions & 10 deletions scylla/src/transport/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,8 @@ impl TopologyReader {
// - send received events via server_event_sender
connection_config.event_sender = Some(server_event_sender);

let control_connection = ConnectionKeeper::new(
control_connection_address,
connection_config.clone(),
None,
None,
None,
);
let control_connection =
ConnectionKeeper::new(control_connection_address, connection_config.clone());

TopologyReader {
control_connection_address,
Expand Down Expand Up @@ -125,9 +120,6 @@ impl TopologyReader {
self.control_connection = ConnectionKeeper::new(
self.control_connection_address,
self.connection_config.clone(),
None,
None,
None,
);

result = self.fetch_topology_info().await;
Expand Down

0 comments on commit a0560cb

Please sign in to comment.