Skip to content

Commit

Permalink
WIP: modify source properties on meta
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Oct 24, 2023
1 parent 2f95249 commit 90406ba
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 112 deletions.
1 change: 1 addition & 0 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.check.slt'

# kill cluster and the connector node
cargo make kill
echo "cluster killed "

# insert new rows
mysql --host=mysql --port=3306 -u root -p123456 < ./e2e_test/source/cdc/mysql_cdc_insert.sql
Expand Down
103 changes: 0 additions & 103 deletions e2e_test/source/cdc_inline/mysql/mysql_create_drop2.slt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.IOUtils;
import org.apache.commons.text.StringSubstitutor;
import org.slf4j.Logger;
Expand Down Expand Up @@ -77,8 +76,6 @@ private static Map<String, String> extractDebeziumProperties(
return dbzProps;
}

private static final AtomicLong serverId = new AtomicLong(20220410);

private final long sourceId;

private final SourceTypeE sourceType;
Expand Down Expand Up @@ -130,8 +127,6 @@ public DbzConnectorConfig(
mysqlProps.setProperty("snapshot.locking.mode", "none");
}

mysqlProps.setProperty("database.server.id", String.valueOf(serverId.getAndAdd(1L)));

dbzProps.putAll(mysqlProps);
} else if (source == SourceTypeE.POSTGRES || source == SourceTypeE.CITUS) {
var postgresProps = initiateDbConfig(POSTGRES_CONFIG_FILE, substitutor);
Expand Down
5 changes: 4 additions & 1 deletion src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,13 @@ macro_rules! impl_cdc_source_type {
$(
$cdc_source_type,
)*
Unspecified,
}

impl From<PbSourceType> for CdcSourceType {
fn from(value: PbSourceType) -> Self {
match value {
PbSourceType::Unspecified => unreachable!(),
PbSourceType::Unspecified => CdcSourceType::Unspecified,
$(
PbSourceType::$cdc_source_type => CdcSourceType::$cdc_source_type,
)*
Expand All @@ -253,8 +254,10 @@ macro_rules! impl_cdc_source_type {
$(
CdcSourceType::$cdc_source_type => PbSourceType::$cdc_source_type,
)*
CdcSourceType::Unspecified => PbSourceType::Unspecified,
}
}
}

}
}
2 changes: 1 addition & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::{

const SPLIT_TYPE_FIELD: &str = "split_type";
const SPLIT_INFO_FIELD: &str = "split_info";
const UPSTREAM_SOURCE_KEY: &str = "connector";
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

pub trait TryFromHashmap: Sized {
fn try_from_hashmap(props: HashMap<String, String>) -> Result<Self>;
Expand Down
11 changes: 11 additions & 0 deletions src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ pub trait CdcSourceTypeTrait: Send + Sync + Clone + 'static {

for_all_classified_sources!(impl_cdc_source_type);

impl<'a> From<&'a str> for CdcSourceType {
fn from(name: &'a str) -> Self {
match name {
MYSQL_CDC_CONNECTOR => CdcSourceType::Mysql,
POSTGRES_CDC_CONNECTOR => CdcSourceType::Postgres,
CITUS_CDC_CONNECTOR => CdcSourceType::Citus,
_ => CdcSourceType::Unspecified,
}
}
}

#[derive(Clone, Debug, Default)]
pub struct CdcProperties<T: CdcSourceTypeTrait> {
/// Properties specified in the WITH clause by user
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
parser_config,
source_ctx,
}),
CdcSourceType::Unspecified => {
unreachable!();
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub mod monitor;
pub mod nats;
pub mod nexmark;
pub mod pulsar;
pub use base::*;
pub use base::{UPSTREAM_SOURCE_KEY, *};
pub(crate) use common::*;
pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR;
pub use kafka::KAFKA_CONNECTOR;
Expand Down
37 changes: 37 additions & 0 deletions src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use anyhow::anyhow;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::visit_fragment;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::source::cdc::CdcSourceType;
use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
use risingwave_pb::catalog::connection::private_link_service::{
PbPrivateLinkProvider, PrivateLinkProvider,
};
Expand Down Expand Up @@ -428,6 +430,17 @@ impl DdlService for DdlServiceImpl {
// Generate source id.
let source_id = self.gen_unique_id::<{ IdCategory::Table }>().await?; // TODO: Use source category
fill_table_source(source, source_id, &mut mview, table_id, &mut fragment_graph);

// Modify properties for cdc sources if needed
if let Some(connector) = source.properties.get(UPSTREAM_SOURCE_KEY) {
if matches!(
CdcSourceType::from(connector.as_str()),
CdcSourceType::Mysql
) {
let server_id = self.gen_unique_id_u64::<{ IdCategory::MySqlCdc }>().await?;
fill_cdc_mysql_server_id(server_id, &mut fragment_graph);
}
}
}

let mut stream_job = StreamingJob::Table(source, mview);
Expand Down Expand Up @@ -740,6 +753,11 @@ impl DdlServiceImpl {
Ok(id)
}

/// Generates a new id for category `C` through the environment's id generator
/// manager and returns it as a `u64`.
///
/// # Errors
///
/// Propagates any error returned by the underlying `generate` call.
async fn gen_unique_id_u64<const C: IdCategoryType>(&self) -> MetaResult<u64> {
    let manager = self.env.id_gen_manager();
    let next_id = manager.generate::<C>().await?;
    Ok(next_id)
}

async fn validate_connection(&self, connection_id: ConnectionId) -> MetaResult<()> {
let connection = self
.catalog_manager
Expand Down Expand Up @@ -798,3 +816,22 @@ fn fill_table_source(
table.optional_associated_source_id =
Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
}

/// Fills in a generated `server.id` property on every source node of `fragment_graph`.
///
/// Each fragment is walked with `visit_fragment`. For every `NodeBody::Source` node,
/// the `server.id` entry of the node's properties is set to `server_id` unless the key
/// is already present, in which case the existing value is left untouched.
///
/// # Panics
///
/// Panics if a visited source node has no `source_inner`.
fn fill_cdc_mysql_server_id(server_id: u64, fragment_graph: &mut PbStreamFragmentGraph) {
    for fragment in fragment_graph.fragments.values_mut() {
        visit_fragment(fragment, |node_body| {
            if let NodeBody::Source(source_node) = node_body {
                // NOTE(review): assumes `source_inner` is always set for the graphs passed
                // here; confirm against the caller, otherwise this unwrap panics.
                let props = &mut source_node.source_inner.as_mut().unwrap().properties;

                // One entry lookup replaces the former `contains_key` + `entry` pair. The
                // closure only runs when the key is absent, so both the log line and the
                // string conversion are skipped when a `server.id` already exists.
                props.entry("server.id".to_string()).or_insert_with(|| {
                    tracing::debug!("server.id no set, generate one {}", server_id);
                    server_id.to_string()
                });
            }
        });
    }
}
6 changes: 6 additions & 0 deletions src/meta/src/manager/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ pub mod IdCategory {
pub const CompactionGroup: IdCategoryType = 15;
pub const Function: IdCategoryType = 16;
pub const Connection: IdCategoryType = 17;
pub const MySqlCdc: IdCategoryType = 18;
}

pub type IdGeneratorManagerRef = Arc<IdGeneratorManager>;
Expand All @@ -159,6 +160,7 @@ pub struct IdGeneratorManager {
parallel_unit: Arc<StoredIdGenerator>,
compaction_group: Arc<StoredIdGenerator>,
connection: Arc<StoredIdGenerator>,
mysql_cdc: Arc<StoredIdGenerator>,
}

impl IdGeneratorManager {
Expand Down Expand Up @@ -215,6 +217,9 @@ impl IdGeneratorManager {
connection: Arc::new(
StoredIdGenerator::new(meta_store.clone(), "connection", None).await,
),
mysql_cdc: Arc::new(
StoredIdGenerator::new(meta_store.clone(), "mysql_cdc", Some(20210401)).await,
),
}
}

Expand All @@ -236,6 +241,7 @@ impl IdGeneratorManager {
IdCategory::HummockCompactionTask => &self.hummock_compaction_task,
IdCategory::CompactionGroup => &self.compaction_group,
IdCategory::Connection => &self.connection,
IdCategory::MySqlCdc => &self.mysql_cdc,
_ => unreachable!(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl ConnectorSource {
let to_reader_splits = splits.into_iter().map(|split| vec![split]);

try_join_all(to_reader_splits.into_iter().map(|splits| {
tracing::debug!("spawning connector split reader for split {:?}", splits);
tracing::debug!(?splits, ?prop, "spawning connector split reader");
let props = prop.clone();
let data_gen_columns = data_gen_columns.clone();
let parser_config = parser_config.clone();
Expand Down

0 comments on commit 90406ba

Please sign in to comment.