Skip to content

Commit

Permalink
Add pulsar iceberg table reader
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Oct 18, 2023
1 parent 52f4c63 commit 5d5e496
Show file tree
Hide file tree
Showing 8 changed files with 379 additions and 10 deletions.
24 changes: 23 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,15 @@ criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.4.0" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.0" }
prost = { version = "0.12" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "16dab0e36ab337e58ee8002d828def2d212fa116" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "186fde7663545d1d6a5856ce9fbbc541224eadfb" }
arrow-array = "47"
arrow-cast = "47"
arrow-schema = "47"
arrow-buffer = "47"
arrow-flight = "47"
arrow-select = "47"
arrow-ord = "47"
arrow-json = "47"
tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" }
tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [
"profiling",
Expand Down
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ apache-avro = { git = "https://github.com/risingwavelabs/avro", branch = "idx0de
"xz",
] }
arrow-array = { workspace = true }
arrow-json = { workspace = true }
arrow-schema = { workspace = true }
async-nats = "0.32"
async-trait = "0.1"
Expand Down
10 changes: 7 additions & 3 deletions src/connector/src/aws_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ use url::Url;

use crate::aws_auth::AwsAuthProps;

/// String constant `"region"`; listed as an entry of `AWS_DEFAULT_CONFIG`.
/// NOTE(review): presumably the option key for the AWS region — confirm against callers.
pub const REGION: &str = "region";
/// String constant `"access_key"`; listed as an entry of `AWS_DEFAULT_CONFIG`.
/// NOTE(review): presumably the option key for the AWS access key — confirm against callers.
pub const ACCESS_KEY: &str = "access_key";
/// String constant `"secret_access"`; listed as an entry of `AWS_DEFAULT_CONFIG`.
/// NOTE(review): presumably the option key for the AWS secret access key — confirm against callers.
pub const SECRET_ACCESS: &str = "secret_access";

pub const AWS_DEFAULT_CONFIG: [&str; 7] = [
"region",
REGION,
"arn",
"profile",
"access_key",
"secret_access",
ACCESS_KEY,
SECRET_ACCESS,
"session_token",
"endpoint_url",
];
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub enum ConnectorError {
#[error("MySQL error: {0}")]
MySql(#[from] mysql_async::Error),

#[error("Pulsar error: {0}")]
Pulsar(anyhow::Error),

#[error(transparent)]
Internal(#[from] anyhow::Error),
}
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,9 @@ pub struct SourceMessage {
pub meta: SourceMeta,
}

/// Newtype wrapper around a `Vec<SourceMessage>`; the inner vector is public.
/// NOTE(review): presumably carries a batch of messages produced together — confirm with usage.
#[derive(Debug, Clone)]
pub struct SourceMessages(pub Vec<SourceMessage>);

#[derive(Debug, Clone)]
pub enum SourceMeta {
Kafka(KafkaMeta),
Expand Down
8 changes: 7 additions & 1 deletion src/connector/src/source/pulsar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ pub use enumerator::*;
use serde::Deserialize;
pub use split::*;

use self::source::reader::PulsarSplitReader;
use crate::common::PulsarCommon;
use crate::source::pulsar::source::reader::PulsarSplitReader;
use crate::source::SourceProperties;

pub const PULSAR_CONNECTOR: &str = "pulsar";
Expand All @@ -45,4 +45,10 @@ pub struct PulsarProperties {

#[serde(flatten)]
pub common: PulsarCommon,

#[serde(rename = "iceberg.enabled", default)]
pub iceberg_loader_enabled: Option<String>,

#[serde(rename = "iceberg.bucket", default)]
pub iceberg_bucket: Option<String>,
}
Loading

0 comments on commit 5d5e496

Please sign in to comment.