From 6d9aef5255dcc3b9bf93e3d8a89674aa579b4182 Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Wed, 30 Jun 2021 19:39:26 +0200 Subject: [PATCH] connection_keeper: get rid of unused parts Most of the responsibility was moved from ConnectionKeeper to NodeConnectionPool. The former is still used for the control connection, but some of its functionality became unused and was removed: - Ability to set keyspace - Sending updated ShardInfo Moreover, the `many_connections` test was removed because a very similar version of the test was introduced in `connection_pool.rs`. --- scylla/src/transport/connection_keeper.rs | 195 ++-------------------- scylla/src/transport/topology.rs | 12 +- 2 files changed, 12 insertions(+), 195 deletions(-) diff --git a/scylla/src/transport/connection_keeper.rs b/scylla/src/transport/connection_keeper.rs index a8ac239c2e..9fef00bfb5 100644 --- a/scylla/src/transport/connection_keeper.rs +++ b/scylla/src/transport/connection_keeper.rs @@ -1,9 +1,8 @@ /// ConnectionKeeper keeps a Connection to some address and works to keep it open -use crate::routing::ShardInfo; use crate::transport::errors::QueryError; use crate::transport::{ connection, - connection::{Connection, ConnectionConfig, ErrorReceiver, VerifiedKeyspaceName}, + connection::{Connection, ConnectionConfig, ErrorReceiver}, }; use futures::{future::RemoteHandle, FutureExt}; @@ -14,7 +13,6 @@ use std::sync::Arc; /// ConnectionKeeper keeps a Connection to some address and works to keep it open pub struct ConnectionKeeper { conn_state_receiver: tokio::sync::watch::Receiver, - use_keyspace_channel: tokio::sync::mpsc::Sender, _worker_handle: RemoteHandle<()>, } @@ -29,24 +27,8 @@ pub enum ConnectionState { struct ConnectionKeeperWorker { address: SocketAddr, config: ConnectionConfig, - shard_info: Option, - shard_info_sender: Option, conn_state_sender: tokio::sync::watch::Sender, - - // Channel used to receive use keyspace requests - use_keyspace_channel: tokio::sync::mpsc::Receiver, - - // Keyspace send in "USE " when opening each connection - used_keyspace: Option, -} - -pub type ShardInfoSender = Arc>>>; - -#[derive(Debug)] -struct UseKeyspaceRequest { - keyspace_name: VerifiedKeyspaceName, - response_chan: tokio::sync::oneshot::Sender>, } impl ConnectionKeeper { @@ -54,29 +36,15 @@ impl ConnectionKeeper { /// # Arguments /// /// * `address` - IP address to connect to - /// * `compression` - preferred compression method to use - /// * `shard_info` - ShardInfo to use, will connect to shard number `shard_info.shard` - /// * `shard_info_sender` - channel to send new ShardInfo after each connection creation - pub fn new( - address: SocketAddr, - config: ConnectionConfig, - shard_info: Option, - shard_info_sender: Option, - keyspace_name: Option, - ) -> Self { + /// * `config` - connection configuration to use + pub fn new(address: SocketAddr, config: ConnectionConfig) -> Self { let (conn_state_sender, conn_state_receiver) = tokio::sync::watch::channel(ConnectionState::Initializing); - let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(1); - let worker = ConnectionKeeperWorker { address, config, - shard_info, - shard_info_sender, conn_state_sender, - use_keyspace_channel: use_keyspace_receiver, - used_keyspace: keyspace_name, }; let (fut, worker_handle) = worker.work().remote_handle(); @@ -84,7 +52,6 @@ impl ConnectionKeeper { ConnectionKeeper { conn_state_receiver, - use_keyspace_channel: use_keyspace_sender, _worker_handle: worker_handle, } } @@ -125,24 +92,6 @@ impl ConnectionKeeper { _ => unreachable!(), } } - - pub async fn use_keyspace( - &self, - keyspace_name: VerifiedKeyspaceName, - ) -> Result<(), QueryError> { - let (response_sender, response_receiver) = tokio::sync::oneshot::channel(); - - self.use_keyspace_channel - .send(UseKeyspaceRequest { - keyspace_name, - response_chan: response_sender, - }) - .await - .expect("Bug in ConnectionKeeper::use_keyspace sending"); - // Other end of this channel is in the Worker, can't be dropped while we have &self to _worker_handle - - response_receiver.await.unwrap() // ConnectionKeeperWorker always responds - } } enum RunConnectionRes { @@ -188,7 +137,7 @@ impl ConnectionKeeperWorker { // Opens a new connection and waits until some fatal error occurs async fn run_connection(&mut self) -> RunConnectionRes { // Connect to the node - let (connection, mut error_receiver) = match self.open_new_connection().await { + let (connection, error_receiver) = match self.open_new_connection().await { Ok(opened) => opened, Err(e) => return RunConnectionRes::Error(e), }; @@ -203,145 +152,21 @@ impl ConnectionKeeperWorker { return RunConnectionRes::ShouldStop; } - // Notify about new shard info - if let Some(sender) = &self.shard_info_sender { - let new_shard_info: Option = connection.get_shard_info().clone(); - - // Ignore sending error - // If no one wants to get shard_info that's OK - // If lock is poisoned do nothing - if let Ok(sender_locked) = sender.lock() { - let _ = sender_locked.send(new_shard_info); - } - } - - // Use the specified keyspace - if let Some(keyspace_name) = &self.used_keyspace { - let _ = connection.use_keyspace(&keyspace_name).await; - // Ignore the error, used_keyspace could be set a long time ago and then deleted - // user gets all errors from session.use_keyspace() - } - let connection_closed_error = QueryError::IoError(Arc::new(std::io::Error::new( ErrorKind::Other, "Connection closed", ))); - // Wait for events - a use keyspace request or a fatal error - loop { - tokio::select! { - recv_res = self.use_keyspace_channel.recv() => { - match recv_res { - Some(request) => { - self.used_keyspace = Some(request.keyspace_name.clone()); - - // Send USE KEYSPACE request, send result if channel wasn't closed - let res = connection.use_keyspace(&request.keyspace_name).await; - let _ = request.response_chan.send(res); - }, - None => return RunConnectionRes::ShouldStop, // If the channel was dropped we should stop - } - }, - connection_error = &mut error_receiver => { - let error = connection_error.unwrap_or(connection_closed_error); - return RunConnectionRes::Error(error); - } - } - } + // Wait for a fatal error to occur + let connection_error = error_receiver.await; + let error = connection_error.unwrap_or(connection_closed_error); + RunConnectionRes::Error(error) } async fn open_new_connection(&self) -> Result<(Arc, ErrorReceiver), QueryError> { - let (connection, error_receiver) = match &self.shard_info { - Some(info) => self.open_new_connection_to_shard(info).await?, - None => connection::open_connection(self.address, None, self.config.clone()).await?, - }; + let (connection, error_receiver) = + connection::open_connection(self.address, None, self.config.clone()).await?; Ok((Arc::new(connection), error_receiver)) } - - async fn open_new_connection_to_shard( - &self, - shard_info: &ShardInfo, - ) -> Result<(Connection, ErrorReceiver), QueryError> { - // Create iterator over all possible source ports for this shard - let source_port_iter = shard_info - .get_sharder() - .iter_source_ports_for_shard(shard_info.shard.into()); - - for port in source_port_iter { - let connect_result = - connection::open_connection(self.address, Some(port), self.config.clone()).await; - - match connect_result { - Err(err) if err.is_address_unavailable_for_use() => continue, // If we can't use this port, try the next one - result => return result, - } - } - - // Tried all source ports for that shard, give up - Err(QueryError::IoError(Arc::new(std::io::Error::new( - std::io::ErrorKind::AddrInUse, - "Could not find free source port for shard", - )))) - } -} - -#[cfg(test)] -mod tests { - use super::ConnectionKeeper; - use crate::transport::connection::ConnectionConfig; - use std::net::{SocketAddr, ToSocketAddrs}; - - // Open many connections to a node - // Port collision should occur - // If they are not handled this test will most likely fail - #[tokio::test] - async fn many_connections() { - let connections_number = 512; - - let connect_address: SocketAddr = std::env::var("SCYLLA_URI") - .unwrap_or_else(|_| "127.0.0.1:9042".to_string()) - .to_socket_addrs() - .unwrap() - .next() - .unwrap(); - - let connection_config = ConnectionConfig { - compression: None, - tcp_nodelay: true, - #[cfg(feature = "ssl")] - ssl_context: None, - ..Default::default() - }; - - // Get shard info from a single connection, all connections will open to this shard - let conn_keeper = - ConnectionKeeper::new(connect_address, connection_config.clone(), None, None, None); - let shard_info = conn_keeper - .get_connection() - .await - .unwrap() - .get_shard_info() - .clone(); - - // Open the connections - let mut conn_keepers: Vec = Vec::new(); - - for _ in 0..connections_number { - let conn_keeper = ConnectionKeeper::new( - connect_address, - connection_config.clone(), - shard_info.clone(), - None, - None, - ); - - conn_keepers.push(conn_keeper); - } - - // Check that each connection keeper connected succesfully - for conn_keeper in conn_keepers { - conn_keeper.get_connection().await.unwrap(); - } - } } diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs index cf4c7f5809..98b72371ef 100644 --- a/scylla/src/transport/topology.rs +++ b/scylla/src/transport/topology.rs @@ -73,13 +73,8 @@ impl TopologyReader { // - send received events via server_event_sender connection_config.event_sender = Some(server_event_sender); - let control_connection = ConnectionKeeper::new( - control_connection_address, - connection_config.clone(), - None, - None, - None, - ); + let control_connection = + ConnectionKeeper::new(control_connection_address, connection_config.clone()); TopologyReader { control_connection_address, @@ -123,9 +118,6 @@ impl TopologyReader { self.control_connection = ConnectionKeeper::new( self.control_connection_address, self.connection_config.clone(), - None, - None, - None, ); result = self.fetch_topology_info().await;