Skip to content

Commit

Permalink
feat(connector): SQL Server sink connection redirect support
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisolszewski committed Jul 29, 2024
1 parent 657f85e commit 442ed8f
Showing 1 changed file with 25 additions and 3 deletions.
28 changes: 25 additions & 3 deletions src/connector/src/sink/sqlserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ use risingwave_common::types::{DataType, Decimal};
use serde_derive::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use simd_json::prelude::ArrayTrait;
use tiberius::error::Error as TiberiusError;
use tiberius::numeric::Numeric;
use tiberius::{AuthMethod, Client, ColumnData, Config, Query};
use tokio::net::TcpStream;
use tokio_util::compat::Compat;
use tokio_util::compat::TokioAsyncWriteCompatExt;
use with_options::WithOptions;

Expand Down Expand Up @@ -504,15 +506,35 @@ impl SqlClient {
config.database(&msconfig.database);
config.trust_cert();

let client = match Self::connect_with_config(&config).await {
Ok(client) => client,
Err(SinkError::SqlServer(e)) if matches!(e.downcast_ref::<TiberiusError>(), Some(TiberiusError::Routing { .. })) => {
if let Some(TiberiusError::Routing { host, port }) = e.downcast_ref::<TiberiusError>() {
let mut redirect_config = config.clone();
redirect_config.host(host);
redirect_config.port(*port);
Self::connect_with_config(&redirect_config).await?
} else {
return Err(SinkError::SqlServer(e));
}
}
Err(e) => return Err(e),
};

Ok(Self { client })
}

async fn connect_with_config(config: &Config) -> Result<Client<Compat<TcpStream>>> {
let tcp = TcpStream::connect(config.get_addr())
.await
.context("failed to connect to sql server")
.map_err(SinkError::SqlServer)?;
tcp.set_nodelay(true)
.context("failed to setting nodelay when connecting to sql server")
.context("failed to set nodelay when connecting to sql server")
.map_err(SinkError::SqlServer)?;
let client = Client::connect(config, tcp.compat_write()).await?;
Ok(Self { client })
Client::connect(config.clone(), tcp.compat_write())
.await
.map_err(|e| SinkError::SqlServer(e.into()))
}
}

Expand Down

0 comments on commit 442ed8f

Please sign in to comment.