diff --git a/Cargo.lock b/Cargo.lock
index 82d08160842e7..2bf600de9a074 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3871,7 +3871,7 @@ dependencies = [
 [[package]]
 name = "icelake"
 version = "0.0.10"
-source = "git+https://github.com/icelake-io/icelake?rev=1812a39a4701612470f59fbc81741b15b72d936e#1812a39a4701612470f59fbc81741b15b72d936e"
+source = "git+https://github.com/icelake-io/icelake?rev=186fde7663545d1d6a5856ce9fbbc541224eadfb#186fde7663545d1d6a5856ce9fbbc541224eadfb"
 dependencies = [
  "anyhow",
  "apache-avro 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/Cargo.toml b/Cargo.toml
index cb4f705fc88a8..4d2aef35a7a31 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -108,7 +108,7 @@ criterion = { version = "0.5", features = ["async_futures"] }
 tonic = { package = "madsim-tonic", version = "0.4.0" }
 tonic-build = { package = "madsim-tonic-build", version = "0.4.0" }
 prost = { version = "0.12" }
-icelake = { git = "https://github.com/icelake-io/icelake", rev = "1812a39a4701612470f59fbc81741b15b72d936e" }
+icelake = { git = "https://github.com/icelake-io/icelake", rev = "186fde7663545d1d6a5856ce9fbbc541224eadfb" }
 arrow-array = "47"
 arrow-cast = "47"
 arrow-schema = "47"
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 7d398f5e9d372..a4e31af836191 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -22,8 +22,8 @@ apache-avro = { git = "https://github.com/risingwavelabs/avro", branch = "idx0de
     "xz",
 ] }
 arrow-array = { workspace = true }
-arrow-schema = { workspace = true }
 arrow-json = { workspace = true }
+arrow-schema = { workspace = true }
 async-nats = "0.32"
 async-trait = "0.1"
 auto_enums = { version = "0.8", features = ["futures03"] }
diff --git a/src/connector/src/aws_utils.rs b/src/connector/src/aws_utils.rs
index 785a4396bacba..c62d9b9d6d14d 100644
--- a/src/connector/src/aws_utils.rs
+++ b/src/connector/src/aws_utils.rs
@@ -24,12 +24,16 @@ use url::Url;
 
 use crate::aws_auth::AwsAuthProps;
 
+pub const REGION: &str = "region";
+pub const ACCESS_KEY: &str = "access_key";
+pub const SECRET_ACCESS: &str = "secret_access";
+
 pub const AWS_DEFAULT_CONFIG: [&str; 7] = [
-    "region",
+    REGION,
     "arn",
     "profile",
-    "access_key",
-    "secret_access",
+    ACCESS_KEY,
+    SECRET_ACCESS,
     "session_token",
     "endpoint_url",
 ];
diff --git a/src/connector/src/source/pulsar/mod.rs b/src/connector/src/source/pulsar/mod.rs
index 28bb40caed57d..4e6e7ccdefb23 100644
--- a/src/connector/src/source/pulsar/mod.rs
+++ b/src/connector/src/source/pulsar/mod.rs
@@ -21,8 +21,8 @@ pub use enumerator::*;
 use serde::Deserialize;
 pub use split::*;
 
+use self::source::reader::PulsarSplitReader;
 use crate::common::PulsarCommon;
-use crate::source::pulsar::source::reader::PulsarBrokerReader;
 use crate::source::SourceProperties;
 
 pub const PULSAR_CONNECTOR: &str = "pulsar";
@@ -30,7 +30,7 @@ pub const PULSAR_CONNECTOR: &str = "pulsar";
 impl SourceProperties for PulsarProperties {
     type Split = PulsarSplit;
     type SplitEnumerator = PulsarSplitEnumerator;
-    type SplitReader = PulsarBrokerReader;
+    type SplitReader = PulsarSplitReader;
 
     const SOURCE_NAME: &'static str = PULSAR_CONNECTOR;
 }
@@ -45,4 +45,10 @@ pub struct PulsarProperties {
 
     #[serde(flatten)]
     pub common: PulsarCommon,
+
+    #[serde(rename = "iceberg.enabled", default)]
+    pub iceberg_loader_enabled: Option<String>,
+
+    #[serde(rename = "iceberg.bucket", default)]
+    pub iceberg_bucket: Option<String>,
 }
diff --git a/src/connector/src/source/pulsar/source/message.rs b/src/connector/src/source/pulsar/source/message.rs
index 9c411b0f57e79..8bf0ccdc34516 100644
--- a/src/connector/src/source/pulsar/source/message.rs
+++ b/src/connector/src/source/pulsar/source/message.rs
@@ -12,13 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use anyhow::anyhow;
-use arrow_array::RecordBatch;
 use pulsar::consumer::Message;
 
-use crate::error::ConnectorError;
-use crate::source::pulsar::topic::Topic;
-use crate::source::{SourceMessage, SourceMessages, SourceMeta};
+use crate::source::{SourceMessage, SourceMeta};
 
 impl From<Message<Vec<u8>>> for SourceMessage {
     fn from(msg: Message<Vec<u8>>) -> Self {
@@ -39,63 +35,3 @@ impl From<Message<Vec<u8>>> for SourceMessage {
         }
     }
 }
-
-/// Meta columns from pulsar's iceberg table
-const META_COLUMN_TOPIC: &str = "_topic";
-const META_COLUMN_KEY: &str = "_key";
-const META_COLUMN_LEDGER_ID: &str = "_ledgerId";
-const META_COLUMN_ENTRY_ID: &str = "_entryId";
-const META_COLUMN_BATCH_INDEX: &str = "_batchIndex";
-const META_COLUMN_PARTITION: &str = "_partition";
-
-impl TryFrom<(&RecordBatch, &Topic)> for SourceMessages {
-    type Error = ConnectorError;
-
-    fn try_from(value: (&RecordBatch, &Topic)) -> Result<Self, Self::Error> {
-        let (batch, topic) = value;
-
-        let mut ret = Vec::with_capacity(batch.num_rows());
-        let jsons = arrow_json::writer::record_batches_to_json_rows(&[batch]).map_err(|e| {
-            ConnectorError::Pulsar(anyhow!("Failed to convert record batch to json: {}", e))
-        })?;
-        for json in jsons {
-            let source_message = SourceMessage {
-                key: json
-                    .get(META_COLUMN_KEY)
-                    .and_then(|v| v.as_str())
-                    .map(|v| v.as_bytes().to_vec()),
-                payload: Some(
-                    serde_json::to_string(&json)
-                        .map_err(|e| {
-                            ConnectorError::Pulsar(anyhow!("Failed to serialize json: {}", e))
-                        })?
-                        .into_bytes(),
-                ),
-                offset: format!(
-                    "{}:{}:{}:{}",
-                    json.get(META_COLUMN_LEDGER_ID)
-                        .and_then(|v| v.as_i64())
-                        .ok_or_else(|| ConnectorError::Pulsar(anyhow!(
-                            "Ledger id not found in iceberg table"
-                        )))?,
-                    json.get(META_COLUMN_ENTRY_ID)
-                        .and_then(|v| v.as_i64())
-                        .ok_or_else(|| ConnectorError::Pulsar(anyhow!(
-                            "Entry id not found in iceberg table"
-                        )))?,
-                    json.get(META_COLUMN_PARTITION)
-                        .and_then(|v| v.as_i64())
-                        .unwrap_or(-1),
-                    json.get(META_COLUMN_BATCH_INDEX)
-                        .and_then(|v| v.as_i64())
-                        .unwrap_or(-1)
-                ),
-                split_id: topic.to_string().into(),
-                meta: SourceMeta::Empty,
-            };
-            ret.push(source_message);
-        }
-
-        Ok(SourceMessages(ret))
-    }
-}
diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs
index 6b28c6ad8a0dc..54ede1d4ffb8f 100644
--- a/src/connector/src/source/pulsar/source/reader.rs
+++ b/src/connector/src/source/pulsar/source/reader.rs
@@ -12,25 +12,33 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use anyhow::{anyhow, ensure, Result};
+use arrow_array::{Int32Array, Int64Array, RecordBatch};
 use async_trait::async_trait;
 use futures::StreamExt;
 use futures_async_stream::try_stream;
+use icelake::catalog::{load_catalog, CATALOG_NAME, CATALOG_TYPE};
 use icelake::io::FileScanStream;
-use icelake::Table;
+use icelake::{Table, TableIdentifier};
 use itertools::Itertools;
 use pulsar::consumer::InitialPosition;
 use pulsar::message::proto::MessageIdData;
 use pulsar::{Consumer, ConsumerBuilder, ConsumerOptions, Pulsar, SubType, TokioExecutor};
+use risingwave_common::array::{DataChunk, StreamChunk};
+use risingwave_common::catalog::ROWID_PREFIX;
+use risingwave_common::error::RwError;
 
+use crate::aws_utils::{ACCESS_KEY, REGION, SECRET_ACCESS};
+use crate::error::ConnectorError;
 use crate::parser::ParserConfig;
 use crate::source::pulsar::split::PulsarSplit;
 use crate::source::pulsar::{PulsarEnumeratorOffset, PulsarProperties};
 use crate::source::{
     into_chunk_stream, BoxSourceWithStateStream, Column, CommonSplitReader, SourceContextRef,
-    SourceMessage, SourceMessages, SplitId, SplitMetaData, SplitReader,
+    SourceMessage, SplitId, SplitMetaData, SplitReader, StreamChunkWithState,
 };
 
 pub enum PulsarSplitReader {
@@ -38,6 +46,59 @@
     Broker(PulsarBrokerReader),
     Iceberg(PulsarIcebergReader),
 }
+#[async_trait]
+impl SplitReader for PulsarSplitReader {
+    type Properties = PulsarProperties;
+    type Split = PulsarSplit;
+
+    async fn new(
+        props: PulsarProperties,
+        splits: Vec<PulsarSplit>,
+        parser_config: ParserConfig,
+        source_ctx: SourceContextRef,
+        _columns: Option<Vec<Column>>,
+    ) -> Result<Self> {
+        ensure!(splits.len() == 1, "only support single split");
+        let split = splits.into_iter().next().unwrap();
+        let topic = split.topic.to_string();
+
+        tracing::debug!("creating consumer for pulsar split topic {}", topic,);
+
+        if props
+            .iceberg_loader_enabled
+            .as_ref()
+            .map(|s| s.parse::<bool>().unwrap())
+            .unwrap_or(false)
+            && matches!(split.start_offset, PulsarEnumeratorOffset::Earliest)
+            && !topic.starts_with("non-persistent://")
+        {
+            tracing::debug!("Creating iceberg reader for pulsar split topic {}", topic);
+            Ok(Self::Iceberg(PulsarIcebergReader::new(
+                props,
+                split,
+                source_ctx,
+                parser_config,
+            )))
+        } else {
+            Ok(Self::Broker(
+                PulsarBrokerReader::new(props, vec![split], parser_config, source_ctx, None)
+                    .await?,
+            ))
+        }
+    }
+
+    fn into_stream(self) -> BoxSourceWithStateStream {
+        match self {
+            Self::Broker(reader) => {
+                let (parser_config, source_context) =
+                    (reader.parser_config.clone(), reader.source_ctx.clone());
+                Box::pin(into_chunk_stream(reader, parser_config, source_context))
+            }
+            Self::Iceberg(reader) => Box::pin(reader.into_stream()),
+        }
+    }
+}
+
 /// This reader reads from pulsar broker
 pub struct PulsarBrokerReader {
     pulsar: Pulsar<TokioExecutor>,
@@ -197,19 +258,39 @@ impl CommonSplitReader for PulsarBrokerReader {
     }
 }
 
+const META_COLUMN_TOPIC: &str = "__topic";
+const META_COLUMN_KEY: &str = "__key";
+const META_COLUMN_LEDGER_ID: &str = "__ledgerId";
+const META_COLUMN_ENTRY_ID: &str = "__entryId";
+const META_COLUMN_BATCH_INDEX: &str = "__batchIndex";
+const META_COLUMN_PARTITION: &str = "__partition";
+
 /// Read history data from iceberg table
 pub struct PulsarIcebergReader {
+    props: PulsarProperties,
     split: PulsarSplit,
     source_ctx: SourceContextRef,
+    parser_config: ParserConfig,
 }
 
 impl PulsarIcebergReader {
-    fn new(split: PulsarSplit, source_ctx: SourceContextRef) -> Self {
-        Self { split, source_ctx }
+    fn new(
+        props: PulsarProperties,
+        split: PulsarSplit,
+        source_ctx: SourceContextRef,
+        parser_config: ParserConfig,
+    ) -> Self {
+        Self {
+            props,
+            split,
+            source_ctx,
+            parser_config,
+        }
     }
 
     async fn scan(&self) -> Result<FileScanStream> {
         let table = self.create_iceberg_table().await?;
+        tracing::debug!("Created iceberg pulsar table.");
 
         let max_chunk_size = self.source_ctx.source_ctrl_opts.chunk_size;
 
@@ -223,11 +304,23 @@ impl PulsarIcebergReader {
     }
 
     async fn create_iceberg_table(&self) -> Result<Table> {
-        todo!()
+        let catalog = load_catalog(&self.build_iceberg_configs()?)
+            .await
+            .map_err(|e| ConnectorError::Pulsar(anyhow!("Unable to load iceberg catalog: {e}")))?;
+
+        let table_id = TableIdentifier::new(vec!["testing-iceberg-v2"])
+            .map_err(|e| ConnectorError::Pulsar(anyhow!("Unable to parse table name: {e}")))?;
+
+        let table = catalog
+            .load_table(&table_id)
+            .await
+            .map_err(|err| ConnectorError::Pulsar(anyhow!(err)))?;
+
+        Ok(table)
     }
 
-    #[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
-    async fn into_data_stream(self) {
+    #[try_stream(ok = StreamChunkWithState, error = anyhow::Error)]
+    async fn as_stream_chunk_stream(&self) {
         #[for_await]
         for file_scan in self.scan().await? {
             let file_scan = file_scan?;
@@ -235,9 +328,192 @@ impl PulsarIcebergReader {
             #[for_await]
             for record_batch in file_scan.scan().await? {
                 let batch = record_batch?;
-                let msgs = SourceMessages::try_from((&batch, &self.split.topic))?.0;
+                let msgs = self.convert_record_batch_to_source_with_state(&batch)?;
                 yield msgs;
             }
         }
     }
+
+    #[try_stream(ok = StreamChunkWithState, error = RwError)]
+    async fn into_stream(self) {
+        let (props, mut split, parser_config, source_ctx) = (
+            self.props.clone(),
+            self.split.clone(),
+            self.parser_config.clone(),
+            self.source_ctx.clone(),
+        );
+        tracing::info!("Starting to read pulsar message from iceberg");
+        let mut last_msg_id = None;
+
+        #[for_await]
+        for msg in self.as_stream_chunk_stream() {
+            let msg =
+                msg.inspect_err(|e| tracing::error!("Failed to read message from iceberg: {}", e))?;
+            last_msg_id = msg
+                .split_offset_mapping
+                .as_ref()
+                .and_then(|m| m.get(self.split.topic.to_string().as_str()))
+                .cloned();
+        }
+
+        tracing::info!("Finished reading pulsar message from iceberg");
+        // We finished reading all the data from iceberg table, now we need to start from broker.
+        if let Some(msg_id) = last_msg_id {
+            tracing::info!("Last iceberg message id is {}", msg_id);
+            split.start_offset = PulsarEnumeratorOffset::MessageId(msg_id);
+        }
+
+        tracing::info!(
+            "Switching from pulsar iceberg reader to broker reader with offset: {:?}",
+            split.start_offset
+        );
+        let broker_reader = PulsarSplitReader::Broker(
+            PulsarBrokerReader::new(props, vec![split], parser_config, source_ctx, None).await?,
+        );
+
+        #[for_await]
+        for msg in broker_reader.into_stream() {
+            yield msg?;
+        }
+    }
+
+    fn build_iceberg_configs(&self) -> Result<HashMap<String, String>> {
+        let mut iceberg_configs = HashMap::new();
+
+        let bucket =
+            self.props.iceberg_bucket.as_ref().ok_or_else(|| {
+                ConnectorError::Pulsar(anyhow!("Iceberg bucket is not configured"))
+            })?;
+
+        iceberg_configs.insert(CATALOG_TYPE.to_string(), "storage".to_string());
+        iceberg_configs.insert(CATALOG_NAME.to_string(), "pulsar".to_string());
+        iceberg_configs.insert(
+            "iceberg.catalog.pulsar.warehouse".to_string(),
+            format!(
+                "s3://{}/{}/{}",
+                bucket, self.split.topic.tenant, self.split.topic.namespace,
+            ),
+        );
+
+        if let Some(s3_configs) = self.props.common.oauth.as_ref().map(|s| &s.s3_credentials) {
+            if let Some(region) = s3_configs.get(REGION) {
+                iceberg_configs.insert("iceberg.table.io.region".to_string(), region.to_string());
+            }
+
+            if let Some(access_key) = s3_configs.get(ACCESS_KEY) {
+                iceberg_configs.insert(
+                    "iceberg.table.io.access_key_id".to_string(),
+                    access_key.to_string(),
+                );
+            }
+
+            if let Some(secret_key) = s3_configs.get(SECRET_ACCESS) {
+                iceberg_configs.insert(
+                    "iceberg.table.io.secret_access_key".to_string(),
+                    secret_key.to_string(),
+                );
+            }
+        }
+
+        iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket.to_string());
+        iceberg_configs.insert(
+            "iceberg.table.io.root".to_string(),
+            format!(
+                "/{}/{}",
+                self.split.topic.tenant, self.split.topic.namespace
+            ),
+        );
+        // #TODO
+        // Support load config file
+        iceberg_configs.insert(
+            "iceberg.table.io.disable_config_load".to_string(),
+            "true".to_string(),
+        );
+
+        Ok(iceberg_configs)
+    }
+
+    // Converts arrow record batch to stream chunk.
+    fn convert_record_batch_to_source_with_state(
+        &self,
+        record_batch: &RecordBatch,
+    ) -> Result<StreamChunkWithState> {
+        let mut offsets = Vec::with_capacity(record_batch.num_rows());
+
+        let ledger_id_array = record_batch
+            .column_by_name(META_COLUMN_LEDGER_ID)
+            .ok_or_else(|| ConnectorError::Pulsar(anyhow!("Ledger id not found in iceberg table")))?
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .ok_or_else(|| {
+                ConnectorError::Pulsar(anyhow!("Ledger id is not i64 in iceberg table"))
+            })?;
+
+        let entry_id_array = record_batch
+            .column_by_name(META_COLUMN_ENTRY_ID)
+            .ok_or_else(|| ConnectorError::Pulsar(anyhow!("Entry id not found in iceberg table")))?
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .ok_or_else(|| {
+                ConnectorError::Pulsar(anyhow!("Entry id is not i64 in iceberg table"))
+            })?;
+
+        let partition_array = record_batch
+            .column_by_name(META_COLUMN_PARTITION)
+            .map(|arr| {
+                arr.as_any().downcast_ref::<Int32Array>().ok_or_else(|| {
+                    ConnectorError::Pulsar(anyhow!("Partition is not i64 in iceberg table"))
+                })
+            })
+            .transpose()?;
+
+        let batch_index_array = record_batch
+            .column_by_name(META_COLUMN_BATCH_INDEX)
+            .map(|arr| {
+                arr.as_any().downcast_ref::<Int32Array>().ok_or_else(|| {
+                    ConnectorError::Pulsar(anyhow!("Batch index is not i64 in iceberg table"))
+                })
+            })
+            .transpose()?;
+
+        let field_indices = self
+            .parser_config
+            .common
+            .rw_columns
+            .iter()
+            .filter(|col| col.name != ROWID_PREFIX)
+            .map(|col| {
+                record_batch
+                    .schema()
+                    .index_of(col.name.as_str())
+                    .map_err(|e| anyhow!(e))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        for row in 0..record_batch.num_rows() {
+            let offset = format!(
+                "{}:{}:{}:{}",
+                ledger_id_array.value(row),
+                entry_id_array.value(row),
+                partition_array.map(|arr| arr.value(row)).unwrap_or(-1),
+                batch_index_array.map(|arr| arr.value(row)).unwrap_or(-1)
+            );
+
+            offsets.push(offset);
+        }
+
+        let data_chunk = DataChunk::try_from(&record_batch.project(&field_indices)?)?;
+
+        let stream_chunk = StreamChunk::from(data_chunk);
+
+        let state = Some(HashMap::from([(
+            self.split.topic.to_string().into(),
+            offsets.last().unwrap().clone(),
+        )]));
+
+        Ok(StreamChunkWithState {
+            chunk: stream_chunk,
+            split_offset_mapping: state,
+        })
+    }
 }
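
A minimal, hypothetical sketch (not part of the diff) of how the two new options added to PulsarProperties above, "iceberg.enabled" and "iceberg.bucket", could land in the struct through serde and then be interpreted the way the new PulsarSplitReader::new does when choosing between the iceberg and broker paths. DemoPulsarProperties, the "my-bucket" value, and the main function are illustration-only stand-ins; the real struct also flattens PulsarCommon and is deserialized from the source's WITH options.

    // Sketch only: simplified stand-in for PulsarProperties showing the serde
    // renames introduced in this diff.
    use std::collections::HashMap;

    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    struct DemoPulsarProperties {
        #[serde(rename = "iceberg.enabled", default)]
        iceberg_loader_enabled: Option<String>,
        #[serde(rename = "iceberg.bucket", default)]
        iceberg_bucket: Option<String>,
    }

    fn main() {
        // Options as they might appear in a source's WITH clause.
        let with_options = HashMap::from([
            ("iceberg.enabled".to_string(), "true".to_string()),
            ("iceberg.bucket".to_string(), "my-bucket".to_string()),
        ]);

        let props: DemoPulsarProperties =
            serde_json::from_value(serde_json::to_value(with_options).unwrap()).unwrap();

        // Roughly the check PulsarSplitReader::new performs before picking the
        // iceberg reader: the flag is carried as a string and parsed into a bool.
        let iceberg_enabled = props
            .iceberg_loader_enabled
            .as_deref()
            .map(|s| s.parse::<bool>().unwrap_or(false))
            .unwrap_or(false);

        println!(
            "iceberg enabled: {iceberg_enabled}, bucket: {:?}",
            props.iceberg_bucket
        );
    }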