From 5d5e4964dcc3db843422a018707b1cb7f8bac35b Mon Sep 17 00:00:00 2001
From: Renjie Liu
Date: Tue, 10 Oct 2023 18:20:16 +0800
Subject: [PATCH] Add pulsar iceberg table reader

---
 Cargo.lock                                     |  24 +-
 Cargo.toml                                     |   3 +-
 src/connector/Cargo.toml                       |   1 +
 src/connector/src/aws_utils.rs                 |  10 +-
 src/connector/src/error.rs                     |   3 +
 src/connector/src/source/base.rs               |   3 +
 src/connector/src/source/pulsar/mod.rs         |   8 +-
 .../src/source/pulsar/source/reader.rs         | 337 +++++++++++++++++-
 8 files changed, 379 insertions(+), 10 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 247852213c422..e251c6b8dce14 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -346,6 +346,26 @@ dependencies = [
  "flatbuffers",
 ]
 
+[[package]]
+name = "arrow-json"
+version = "47.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f03d7e3b04dd688ccec354fe449aed56b831679f03e44ee2c1cfc4045067b69c"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-cast",
+ "arrow-data",
+ "arrow-schema",
+ "chrono",
+ "half 2.3.1",
+ "indexmap 2.0.0",
+ "lexical-core",
+ "num",
+ "serde",
+ "serde_json",
+]
+
 [[package]]
 name = "arrow-ord"
 version = "47.0.0"
@@ -3850,7 +3870,7 @@
 [[package]]
 name = "icelake"
 version = "0.0.10"
-source = "git+https://github.com/icelake-io/icelake?rev=16dab0e36ab337e58ee8002d828def2d212fa116#16dab0e36ab337e58ee8002d828def2d212fa116"
+source = "git+https://github.com/icelake-io/icelake?rev=186fde7663545d1d6a5856ce9fbbc541224eadfb#186fde7663545d1d6a5856ce9fbbc541224eadfb"
 dependencies = [
  "anyhow",
  "apache-avro 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -3867,6 +3887,7 @@ dependencies = [
  "bytes",
  "chrono",
  "csv",
+ "derive_builder",
  "enum-display",
  "faster-hex",
  "futures",
@@ -7244,6 +7265,7 @@ dependencies = [
  "anyhow",
  "apache-avro 0.15.0 (git+https://github.com/risingwavelabs/avro?branch=idx0dev/resolved_schema)",
  "arrow-array",
+ "arrow-json",
  "arrow-schema",
  "async-nats",
  "async-trait",
diff --git a/Cargo.toml b/Cargo.toml
index 74a097d4eb9d7..080786482dd9b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -111,7 +111,7 @@ criterion = { version = "0.5", features = ["async_futures"] }
 tonic = { package = "madsim-tonic", version = "0.4.0" }
 tonic-build = { package = "madsim-tonic-build", version = "0.4.0" }
 prost = { version = "0.12" }
-icelake = { git = "https://github.com/icelake-io/icelake", rev = "16dab0e36ab337e58ee8002d828def2d212fa116" }
+icelake = { git = "https://github.com/icelake-io/icelake", rev = "186fde7663545d1d6a5856ce9fbbc541224eadfb" }
 arrow-array = "47"
 arrow-cast = "47"
 arrow-schema = "47"
@@ -119,6 +119,7 @@ arrow-buffer = "47"
 arrow-flight = "47"
 arrow-select = "47"
 arrow-ord = "47"
+arrow-json = "47"
 tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" }
 tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [
     "profiling",
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index abb7486de3091..51ab7ad12a5dc 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -22,6 +22,7 @@ apache-avro = { git = "https://github.com/risingwavelabs/avro", branch = "idx0de
     "xz",
 ] }
 arrow-array = { workspace = true }
+arrow-json = { workspace = true }
 arrow-schema = { workspace = true }
 async-nats = "0.32"
 async-trait = "0.1"
diff --git a/src/connector/src/aws_utils.rs b/src/connector/src/aws_utils.rs
index 785a4396bacba..c62d9b9d6d14d 100644
--- a/src/connector/src/aws_utils.rs
+++ b/src/connector/src/aws_utils.rs
@@ -24,12 +24,16 @@
 use url::Url;
 
 use crate::aws_auth::AwsAuthProps;
 
+pub const REGION: &str = "region";
+pub const ACCESS_KEY: &str = "access_key";
+pub const SECRET_ACCESS: &str = "secret_access";
+
 pub const AWS_DEFAULT_CONFIG: [&str; 7] = [
-    "region",
+    REGION,
     "arn",
     "profile",
-    "access_key",
-    "secret_access",
+    ACCESS_KEY,
+    SECRET_ACCESS,
     "session_token",
     "endpoint_url",
 ];
diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs
index 73cd27d5801e5..2cdfdd99ecfe0 100644
--- a/src/connector/src/error.rs
+++ b/src/connector/src/error.rs
@@ -35,6 +35,9 @@ pub enum ConnectorError {
     #[error("MySQL error: {0}")]
     MySql(#[from] mysql_async::Error),
 
+    #[error("Pulsar error: {0}")]
+    Pulsar(anyhow::Error),
+
     #[error(transparent)]
     Internal(#[from] anyhow::Error),
 }
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index a0c0aee86592e..04b9d538d961e 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -505,6 +505,9 @@ pub struct SourceMessage {
     pub meta: SourceMeta,
 }
 
+#[derive(Debug, Clone)]
+pub struct SourceMessages(pub Vec<SourceMessage>);
+
 #[derive(Debug, Clone)]
 pub enum SourceMeta {
     Kafka(KafkaMeta),
diff --git a/src/connector/src/source/pulsar/mod.rs b/src/connector/src/source/pulsar/mod.rs
index 544d1b7fb3ed3..4e6e7ccdefb23 100644
--- a/src/connector/src/source/pulsar/mod.rs
+++ b/src/connector/src/source/pulsar/mod.rs
@@ -21,8 +21,8 @@ pub use enumerator::*;
 use serde::Deserialize;
 pub use split::*;
 
+use self::source::reader::PulsarSplitReader;
 use crate::common::PulsarCommon;
-use crate::source::pulsar::source::reader::PulsarSplitReader;
 use crate::source::SourceProperties;
 
 pub const PULSAR_CONNECTOR: &str = "pulsar";
@@ -45,4 +45,10 @@ pub struct PulsarProperties {
 
     #[serde(flatten)]
     pub common: PulsarCommon,
+
+    #[serde(rename = "iceberg.enabled", default)]
+    pub iceberg_loader_enabled: Option<String>,
+
+    #[serde(rename = "iceberg.bucket", default)]
+    pub iceberg_bucket: Option<String>,
 }
diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs
index 85d85a8d18714..54ede1d4ffb8f 100644
--- a/src/connector/src/source/pulsar/source/reader.rs
+++ b/src/connector/src/source/pulsar/source/reader.rs
@@ -12,26 +12,95 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use anyhow::{anyhow, ensure, Result};
+use arrow_array::{Int32Array, Int64Array, RecordBatch};
 use async_trait::async_trait;
 use futures::StreamExt;
 use futures_async_stream::try_stream;
+use icelake::catalog::{load_catalog, CATALOG_NAME, CATALOG_TYPE};
+use icelake::io::FileScanStream;
+use icelake::{Table, TableIdentifier};
 use itertools::Itertools;
 use pulsar::consumer::InitialPosition;
 use pulsar::message::proto::MessageIdData;
 use pulsar::{Consumer, ConsumerBuilder, ConsumerOptions, Pulsar, SubType, TokioExecutor};
+use risingwave_common::array::{DataChunk, StreamChunk};
+use risingwave_common::catalog::ROWID_PREFIX;
+use risingwave_common::error::RwError;
 
+use crate::aws_utils::{ACCESS_KEY, REGION, SECRET_ACCESS};
+use crate::error::ConnectorError;
 use crate::parser::ParserConfig;
 use crate::source::pulsar::split::PulsarSplit;
 use crate::source::pulsar::{PulsarEnumeratorOffset, PulsarProperties};
 use crate::source::{
     into_chunk_stream, BoxSourceWithStateStream, Column, CommonSplitReader, SourceContextRef,
-    SourceMessage, SplitId, SplitMetaData, SplitReader,
+    SourceMessage, SplitId, SplitMetaData, SplitReader, StreamChunkWithState,
 };
 
-pub struct PulsarSplitReader {
+pub enum PulsarSplitReader {
+    Broker(PulsarBrokerReader),
+    Iceberg(PulsarIcebergReader),
+}
+
+#[async_trait]
+impl SplitReader for PulsarSplitReader {
+    type Properties = PulsarProperties;
+    type Split = PulsarSplit;
+
+    async fn new(
+        props: PulsarProperties,
+        splits: Vec<PulsarSplit>,
+        parser_config: ParserConfig,
+        source_ctx: SourceContextRef,
+        _columns: Option<Vec<Column>>,
+    ) -> Result<Self> {
+        ensure!(splits.len() == 1, "only support single split");
+        let split = splits.into_iter().next().unwrap();
+        let topic = split.topic.to_string();
+
+        tracing::debug!("creating consumer for pulsar split topic {}", topic,);
+
+        if props
+            .iceberg_loader_enabled
+            .as_ref()
+            .map(|s| s.parse::<bool>().unwrap())
+            .unwrap_or(false)
+            && matches!(split.start_offset, PulsarEnumeratorOffset::Earliest)
+            && !topic.starts_with("non-persistent://")
+        {
+            tracing::debug!("Creating iceberg reader for pulsar split topic {}", topic);
+            Ok(Self::Iceberg(PulsarIcebergReader::new(
+                props,
+                split,
+                source_ctx,
+                parser_config,
+            )))
+        } else {
+            Ok(Self::Broker(
+                PulsarBrokerReader::new(props, vec![split], parser_config, source_ctx, None)
+                    .await?,
+            ))
+        }
+    }
+
+    fn into_stream(self) -> BoxSourceWithStateStream {
+        match self {
+            Self::Broker(reader) => {
+                let (parser_config, source_context) =
+                    (reader.parser_config.clone(), reader.source_ctx.clone());
+                Box::pin(into_chunk_stream(reader, parser_config, source_context))
+            }
+            Self::Iceberg(reader) => Box::pin(reader.into_stream()),
+        }
+    }
+}
+
+/// This reader reads from the pulsar broker.
+pub struct PulsarBrokerReader {
     pulsar: Pulsar<TokioExecutor>,
     consumer: Consumer<Vec<u8>, TokioExecutor>,
     split: PulsarSplit,
@@ -84,7 +153,7 @@ fn parse_message_id(id: &str) -> Result<MessageIdData> {
 }
 
 #[async_trait]
-impl SplitReader for PulsarSplitReader {
+impl SplitReader for PulsarBrokerReader {
     type Properties = PulsarProperties;
     type Split = PulsarSplit;
 
@@ -173,7 +242,7 @@ impl SplitReader for PulsarSplitReader {
     }
 }
 
-impl CommonSplitReader for PulsarSplitReader {
+impl CommonSplitReader for PulsarBrokerReader {
     #[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
     async fn into_data_stream(self) {
         let max_chunk_size = self.source_ctx.source_ctrl_opts.chunk_size;
@@ -188,3 +257,263 @@ impl CommonSplitReader for PulsarSplitReader {
         }
     }
 }
+
+const META_COLUMN_TOPIC: &str = "__topic";
+const META_COLUMN_KEY: &str = "__key";
+const META_COLUMN_LEDGER_ID: &str = "__ledgerId";
+const META_COLUMN_ENTRY_ID: &str = "__entryId";
+const META_COLUMN_BATCH_INDEX: &str = "__batchIndex";
+const META_COLUMN_PARTITION: &str = "__partition";
+
+/// Read historical data from the iceberg table.
+pub struct PulsarIcebergReader {
+    props: PulsarProperties,
+    split: PulsarSplit,
+    source_ctx: SourceContextRef,
+    parser_config: ParserConfig,
+}
+
+impl PulsarIcebergReader {
+    fn new(
+        props: PulsarProperties,
+        split: PulsarSplit,
+        source_ctx: SourceContextRef,
+        parser_config: ParserConfig,
+    ) -> Self {
+        Self {
+            props,
+            split,
+            source_ctx,
+            parser_config,
+        }
+    }
+
+    async fn scan(&self) -> Result<FileScanStream> {
+        let table = self.create_iceberg_table().await?;
+        tracing::debug!("Created iceberg pulsar table.");
+
+        let max_chunk_size = self.source_ctx.source_ctrl_opts.chunk_size;
+
+        // TODO: Add partition
+        Ok(table
+            .new_scan_builder()
+            .with_batch_size(max_chunk_size)
+            .build()?
+            .scan(&table)
+            .await?)
+    }
+
+    async fn create_iceberg_table(&self) -> Result<Table> {
+        let catalog = load_catalog(&self.build_iceberg_configs()?)
+            .await
+            .map_err(|e| ConnectorError::Pulsar(anyhow!("Unable to load iceberg catalog: {e}")))?;
+
+        let table_id = TableIdentifier::new(vec!["testing-iceberg-v2"])
+            .map_err(|e| ConnectorError::Pulsar(anyhow!("Unable to parse table name: {e}")))?;
+
+        let table = catalog
+            .load_table(&table_id)
+            .await
+            .map_err(|err| ConnectorError::Pulsar(anyhow!(err)))?;
+
+        Ok(table)
+    }
+
+    #[try_stream(ok = StreamChunkWithState, error = anyhow::Error)]
+    async fn as_stream_chunk_stream(&self) {
+        #[for_await]
+        for file_scan in self.scan().await? {
+            let file_scan = file_scan?;
+
+            #[for_await]
+            for record_batch in file_scan.scan().await? {
+                let batch = record_batch?;
+                let msgs = self.convert_record_batch_to_source_with_state(&batch)?;
+                yield msgs;
+            }
+        }
+    }
+
+    #[try_stream(ok = StreamChunkWithState, error = RwError)]
+    async fn into_stream(self) {
+        let (props, mut split, parser_config, source_ctx) = (
+            self.props.clone(),
+            self.split.clone(),
+            self.parser_config.clone(),
+            self.source_ctx.clone(),
+        );
+        tracing::info!("Starting to read pulsar message from iceberg");
+        let mut last_msg_id = None;
+
+        #[for_await]
+        for msg in self.as_stream_chunk_stream() {
+            let msg =
+                msg.inspect_err(|e| tracing::error!("Failed to read message from iceberg: {}", e))?;
+            last_msg_id = msg
+                .split_offset_mapping
+                .as_ref()
+                .and_then(|m| m.get(self.split.topic.to_string().as_str()))
+                .cloned();
+        }
+
+        tracing::info!("Finished reading pulsar message from iceberg");
+        // We have finished reading all data from the iceberg table, now switch to the broker.
+        if let Some(msg_id) = last_msg_id {
+            tracing::info!("Last iceberg message id is {}", msg_id);
+            split.start_offset = PulsarEnumeratorOffset::MessageId(msg_id);
+        }
+
+        tracing::info!(
+            "Switching from pulsar iceberg reader to broker reader with offset: {:?}",
+            split.start_offset
+        );
+        let broker_reader = PulsarSplitReader::Broker(
+            PulsarBrokerReader::new(props, vec![split], parser_config, source_ctx, None).await?,
+        );
+
+        #[for_await]
+        for msg in broker_reader.into_stream() {
+            yield msg?;
+        }
+    }
+
+    fn build_iceberg_configs(&self) -> Result<HashMap<String, String>> {
+        let mut iceberg_configs = HashMap::new();
+
+        let bucket =
+            self.props.iceberg_bucket.as_ref().ok_or_else(|| {
+                ConnectorError::Pulsar(anyhow!("Iceberg bucket is not configured"))
+            })?;
+
+        iceberg_configs.insert(CATALOG_TYPE.to_string(), "storage".to_string());
+        iceberg_configs.insert(CATALOG_NAME.to_string(), "pulsar".to_string());
+        iceberg_configs.insert(
+            "iceberg.catalog.pulsar.warehouse".to_string(),
+            format!(
+                "s3://{}/{}/{}",
+                bucket, self.split.topic.tenant, self.split.topic.namespace,
+            ),
+        );
+
+        if let Some(s3_configs) = self.props.common.oauth.as_ref().map(|s| &s.s3_credentials) {
+            if let Some(region) = s3_configs.get(REGION) {
+                iceberg_configs.insert("iceberg.table.io.region".to_string(), region.to_string());
+            }
+
+            if let Some(access_key) = s3_configs.get(ACCESS_KEY) {
+                iceberg_configs.insert(
+                    "iceberg.table.io.access_key_id".to_string(),
+                    access_key.to_string(),
+                );
+            }
+
+            if let Some(secret_key) = s3_configs.get(SECRET_ACCESS) {
+                iceberg_configs.insert(
+                    "iceberg.table.io.secret_access_key".to_string(),
+                    secret_key.to_string(),
+                );
+            }
+        }
+
+        iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket.to_string());
+        iceberg_configs.insert(
+            "iceberg.table.io.root".to_string(),
+            format!(
+                "/{}/{}",
+                self.split.topic.tenant, self.split.topic.namespace
+            ),
+        );
+        // TODO: Support loading a config file.
+        iceberg_configs.insert(
+            "iceberg.table.io.disable_config_load".to_string(),
+            "true".to_string(),
+        );
+
+        Ok(iceberg_configs)
+    }
+
+    // Converts an arrow record batch to a stream chunk.
+    fn convert_record_batch_to_source_with_state(
+        &self,
+        record_batch: &RecordBatch,
+    ) -> Result<StreamChunkWithState> {
+        let mut offsets = Vec::with_capacity(record_batch.num_rows());
+
+        let ledger_id_array = record_batch
+            .column_by_name(META_COLUMN_LEDGER_ID)
+            .ok_or_else(|| ConnectorError::Pulsar(anyhow!("Ledger id not found in iceberg table")))?
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .ok_or_else(|| {
+                ConnectorError::Pulsar(anyhow!("Ledger id is not i64 in iceberg table"))
+            })?;
+
+        let entry_id_array = record_batch
+            .column_by_name(META_COLUMN_ENTRY_ID)
+            .ok_or_else(|| ConnectorError::Pulsar(anyhow!("Entry id not found in iceberg table")))?
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .ok_or_else(|| {
+                ConnectorError::Pulsar(anyhow!("Entry id is not i64 in iceberg table"))
+            })?;
+
+        let partition_array = record_batch
+            .column_by_name(META_COLUMN_PARTITION)
+            .map(|arr| {
+                arr.as_any().downcast_ref::<Int32Array>().ok_or_else(|| {
+                    ConnectorError::Pulsar(anyhow!("Partition is not i32 in iceberg table"))
+                })
+            })
+            .transpose()?;
+
+        let batch_index_array = record_batch
+            .column_by_name(META_COLUMN_BATCH_INDEX)
+            .map(|arr| {
+                arr.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
+                    ConnectorError::Pulsar(anyhow!("Batch index is not i64 in iceberg table"))
+                })
+            })
+            .transpose()?;
+
+        let field_indices = self
+            .parser_config
+            .common
+            .rw_columns
+            .iter()
+            .filter(|col| col.name != ROWID_PREFIX)
+            .map(|col| {
+                record_batch
+                    .schema()
+                    .index_of(col.name.as_str())
+                    .map_err(|e| anyhow!(e))
+            })
+            .collect::<Result<Vec<usize>>>()?;
+
+        for row in 0..record_batch.num_rows() {
+            let offset = format!(
+                "{}:{}:{}:{}",
+                ledger_id_array.value(row),
+                entry_id_array.value(row),
+                partition_array.map(|arr| arr.value(row)).unwrap_or(-1),
+                batch_index_array.map(|arr| arr.value(row)).unwrap_or(-1)
+            );
+
+            offsets.push(offset);
+        }
+
+        let data_chunk = DataChunk::try_from(&record_batch.project(&field_indices)?)?;
+
+        let stream_chunk = StreamChunk::from(data_chunk);
+
+        let state = Some(HashMap::from([(
+            self.split.topic.to_string().into(),
+            offsets.last().unwrap().clone(),
+        )]));
+
+        Ok(StreamChunkWithState {
+            chunk: stream_chunk,
+            split_offset_mapping: state,
+        })
+    }
+}
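
Note on the offset format used above: for every row read back from the iceberg table, the reader builds a message-id style offset string "ledger_id:entry_id:partition:batch_index", substituting -1 when the partition or batch index column is absent; the last offset of each record batch becomes the split state, which later seeds PulsarEnumeratorOffset::MessageId when the reader switches over to the broker. Below is a minimal, self-contained sketch of that formatting; the format_offset helper and the example values are illustrative only and are not part of this patch.

// Hypothetical helper mirroring the per-row offset string built by the iceberg reader.
fn format_offset(
    ledger_id: i64,
    entry_id: i64,
    partition: Option<i32>,
    batch_index: Option<i64>,
) -> String {
    // A missing partition or batch index falls back to -1, matching the reader's unwrap_or(-1).
    format!(
        "{}:{}:{}:{}",
        ledger_id,
        entry_id,
        partition.unwrap_or(-1),
        batch_index.unwrap_or(-1)
    )
}

fn main() {
    // The last such offset of a batch is stored as split state and used to resume from the broker.
    assert_eq!(format_offset(42, 7, None, None), "42:7:-1:-1");
    assert_eq!(format_offset(42, 7, Some(3), Some(0)), "42:7:3:0");
}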