
Commit

Code complete
liurenjie1024 committed Oct 18, 2023
1 parent 152ca70 commit c76c8cb
Showing 7 changed files with 303 additions and 81 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -108,7 +108,7 @@ criterion = { version = "0.5", features = ["async_futures"] }
 tonic = { package = "madsim-tonic", version = "0.4.0" }
 tonic-build = { package = "madsim-tonic-build", version = "0.4.0" }
 prost = { version = "0.12" }
-icelake = { git = "https://github.com/icelake-io/icelake", rev = "1812a39a4701612470f59fbc81741b15b72d936e" }
+icelake = { git = "https://github.com/icelake-io/icelake", rev = "186fde7663545d1d6a5856ce9fbbc541224eadfb" }
 arrow-array = "47"
 arrow-cast = "47"
 arrow-schema = "47"
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
@@ -22,8 +22,8 @@ apache-avro = { git = "https://github.com/risingwavelabs/avro", branch = "idx0de
     "xz",
 ] }
 arrow-array = { workspace = true }
-arrow-schema = { workspace = true }
 arrow-json = { workspace = true }
+arrow-schema = { workspace = true }
 async-nats = "0.32"
 async-trait = "0.1"
 auto_enums = { version = "0.8", features = ["futures03"] }
10 changes: 7 additions & 3 deletions src/connector/src/aws_utils.rs
@@ -24,12 +24,16 @@ use url::Url;

 use crate::aws_auth::AwsAuthProps;

+pub const REGION: &str = "region";
+pub const ACCESS_KEY: &str = "access_key";
+pub const SECRET_ACCESS: &str = "secret_access";
+
 pub const AWS_DEFAULT_CONFIG: [&str; 7] = [
-    "region",
+    REGION,
     "arn",
     "profile",
-    "access_key",
-    "secret_access",
+    ACCESS_KEY,
+    SECRET_ACCESS,
     "session_token",
     "endpoint_url",
 ];
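For illustration only, a sketch of how the new named keys might be used when reading a connector property map instead of repeating raw string literals; the `extract_aws_keys` helper and the sample map below are invented for this example and are not part of the commit.

```rust
// Hypothetical usage of the constants introduced above; not code from this commit.
use std::collections::HashMap;

pub const REGION: &str = "region";
pub const ACCESS_KEY: &str = "access_key";
pub const SECRET_ACCESS: &str = "secret_access";

/// Pull the three credential-related keys out of a generic property map.
fn extract_aws_keys(props: &HashMap<String, String>) -> Option<(&str, &str, &str)> {
    Some((
        props.get(REGION)?.as_str(),
        props.get(ACCESS_KEY)?.as_str(),
        props.get(SECRET_ACCESS)?.as_str(),
    ))
}

fn main() {
    let props: HashMap<String, String> = [
        (REGION.to_string(), "us-east-1".to_string()),
        (ACCESS_KEY.to_string(), "example-access-key".to_string()),
        (SECRET_ACCESS.to_string(), "example-secret".to_string()),
    ]
    .into_iter()
    .collect();

    assert_eq!(
        extract_aws_keys(&props),
        Some(("us-east-1", "example-access-key", "example-secret"))
    );
}
```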
10 changes: 8 additions & 2 deletions src/connector/src/source/pulsar/mod.rs
@@ -21,16 +21,16 @@ pub use enumerator::*;
 use serde::Deserialize;
 pub use split::*;

+use self::source::reader::PulsarSplitReader;
 use crate::common::PulsarCommon;
-use crate::source::pulsar::source::reader::PulsarBrokerReader;
 use crate::source::SourceProperties;

 pub const PULSAR_CONNECTOR: &str = "pulsar";

 impl SourceProperties for PulsarProperties {
     type Split = PulsarSplit;
     type SplitEnumerator = PulsarSplitEnumerator;
-    type SplitReader = PulsarBrokerReader;
+    type SplitReader = PulsarSplitReader;

     const SOURCE_NAME: &'static str = PULSAR_CONNECTOR;
 }
@@ -45,4 +45,10 @@ pub struct PulsarProperties {

     #[serde(flatten)]
     pub common: PulsarCommon,
+
+    #[serde(rename = "iceberg.enabled", default)]
+    pub iceberg_loader_enabled: Option<String>,
+
+    #[serde(rename = "iceberg.bucket", default)]
+    pub iceberg_bucket: Option<String>,
 }
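A self-contained sketch (not the connector's actual code path) of how the two new options deserialize from a source's string property map via serde's `rename`; the simplified struct below omits the flattened `PulsarCommon`, and the sample values are invented.

```rust
// Simplified stand-in for PulsarProperties, showing only the two new options.
use std::collections::HashMap;

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct PulsarIcebergOptions {
    #[serde(rename = "iceberg.enabled", default)]
    iceberg_loader_enabled: Option<String>,

    #[serde(rename = "iceberg.bucket", default)]
    iceberg_bucket: Option<String>,
}

fn main() {
    // WITH-clause style options arrive as a flat string-to-string map.
    let props: HashMap<String, String> = [
        ("iceberg.enabled".to_string(), "true".to_string()),
        ("iceberg.bucket".to_string(), "s3://example-bucket".to_string()),
    ]
    .into_iter()
    .collect();

    // Round-trip through serde_json, reusing the same Deserialize derive.
    let opts: PulsarIcebergOptions =
        serde_json::from_value(serde_json::to_value(&props).unwrap()).unwrap();

    assert_eq!(opts.iceberg_loader_enabled.as_deref(), Some("true"));
    assert_eq!(opts.iceberg_bucket.as_deref(), Some("s3://example-bucket"));
}
```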
66 changes: 1 addition & 65 deletions src/connector/src/source/pulsar/source/message.rs
@@ -12,13 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use anyhow::anyhow;
-use arrow_array::RecordBatch;
 use pulsar::consumer::Message;

-use crate::error::ConnectorError;
-use crate::source::pulsar::topic::Topic;
-use crate::source::{SourceMessage, SourceMessages, SourceMeta};
+use crate::source::{SourceMessage, SourceMeta};

 impl From<Message<Vec<u8>>> for SourceMessage {
     fn from(msg: Message<Vec<u8>>) -> Self {
@@ -39,63 +35,3 @@ impl From<Message<Vec<u8>>> for SourceMessage {
         }
     }
 }
-
-/// Meta columns from pulsar's iceberg table
-const META_COLUMN_TOPIC: &str = "_topic";
-const META_COLUMN_KEY: &str = "_key";
-const META_COLUMN_LEDGER_ID: &str = "_ledgerId";
-const META_COLUMN_ENTRY_ID: &str = "_entryId";
-const META_COLUMN_BATCH_INDEX: &str = "_batchIndex";
-const META_COLUMN_PARTITION: &str = "_partition";
-
-impl TryFrom<(&RecordBatch, &Topic)> for SourceMessages {
-    type Error = ConnectorError;
-
-    fn try_from(value: (&RecordBatch, &Topic)) -> Result<Self, ConnectorError> {
-        let (batch, topic) = value;
-
-        let mut ret = Vec::with_capacity(batch.num_rows());
-        let jsons = arrow_json::writer::record_batches_to_json_rows(&[batch]).map_err(|e| {
-            ConnectorError::Pulsar(anyhow!("Failed to convert record batch to json: {}", e))
-        })?;
-        for json in jsons {
-            let source_message = SourceMessage {
-                key: json
-                    .get(META_COLUMN_KEY)
-                    .and_then(|v| v.as_str())
-                    .map(|v| v.as_bytes().to_vec()),
-                payload: Some(
-                    serde_json::to_string(&json)
-                        .map_err(|e| {
-                            ConnectorError::Pulsar(anyhow!("Failed to serialize json: {}", e))
-                        })?
-                        .into_bytes(),
-                ),
-                offset: format!(
-                    "{}:{}:{}:{}",
-                    json.get(META_COLUMN_LEDGER_ID)
-                        .and_then(|v| v.as_i64())
-                        .ok_or_else(|| ConnectorError::Pulsar(anyhow!(
-                            "Ledger id not found in iceberg table"
-                        )))?,
-                    json.get(META_COLUMN_ENTRY_ID)
-                        .and_then(|v| v.as_i64())
-                        .ok_or_else(|| ConnectorError::Pulsar(anyhow!(
-                            "Entry id not found in iceberg table"
-                        )))?,
-                    json.get(META_COLUMN_PARTITION)
-                        .and_then(|v| v.as_i64())
-                        .unwrap_or(-1),
-                    json.get(META_COLUMN_BATCH_INDEX)
-                        .and_then(|v| v.as_i64())
-                        .unwrap_or(-1)
-                ),
-                split_id: topic.to_string().into(),
-                meta: SourceMeta::Empty,
-            };
-            ret.push(source_message);
-        }
-
-        Ok(SourceMessages(ret))
-    }
-}
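The block removed above flattened each iceberg `RecordBatch` row into JSON and encoded the source offset as `ledgerId:entryId:partition:batchIndex`. Below is a standalone sketch of just that batch-to-JSON step, assuming the arrow 47 crates pinned in the workspace Cargo.toml; the toy schema and values are invented and nothing here is taken from the connector itself.

```rust
// Standalone sketch (not the connector's code): build a tiny RecordBatch with
// some of the iceberg meta columns named above and turn each row into a JSON map
// plus a "ledgerId:entryId:partition:batchIndex" offset string.
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("_ledgerId", DataType::Int64, false),
        Field::new("_entryId", DataType::Int64, false),
        Field::new("_key", DataType::Utf8, true),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int64Array::from(vec![12, 12])) as ArrayRef,
            Arc::new(Int64Array::from(vec![0, 1])) as ArrayRef,
            Arc::new(StringArray::from(vec![Some("k0"), None])) as ArrayRef,
        ],
    )?;

    // Same call the removed TryFrom impl used to flatten a batch into JSON rows.
    let rows = arrow_json::writer::record_batches_to_json_rows(&[&batch])?;
    for row in rows {
        let offset = format!(
            "{}:{}:{}:{}",
            row["_ledgerId"].as_i64().unwrap(),
            row["_entryId"].as_i64().unwrap(),
            -1, // no _partition column in this toy schema
            -1, // no _batchIndex column either
        );
        println!("offset = {offset}, payload = {}", serde_json::Value::Object(row));
    }
    Ok(())
}
```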
