Skip to content

Commit

Permalink
refactor(iceberg): extract IcebergCommon config (#18600)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Sep 20, 2024
1 parent 5aca914 commit 4bc3b28
Show file tree
Hide file tree
Showing 18 changed files with 737 additions and 636 deletions.
7 changes: 3 additions & 4 deletions src/batch/src/executor/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, ScalarRefImpl};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit};
use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
use risingwave_connector::WithOptionsSecResolved;
Expand All @@ -42,7 +41,7 @@ use crate::task::BatchTaskContext;
static POSITION_DELETE_FILE_FILE_PATH_INDEX: usize = 0;
static POSITION_DELETE_FILE_POS: usize = 1;
pub struct IcebergScanExecutor {
iceberg_config: IcebergConfig,
iceberg_config: IcebergProperties,
#[allow(dead_code)]
snapshot_id: Option<i64>,
table_meta: TableMetadata,
Expand Down Expand Up @@ -70,7 +69,7 @@ impl Executor for IcebergScanExecutor {

impl IcebergScanExecutor {
pub fn new(
iceberg_config: IcebergConfig,
iceberg_config: IcebergProperties,
snapshot_id: Option<i64>,
table_meta: TableMetadata,
data_file_scan_tasks: Vec<FileScanTask>,
Expand Down Expand Up @@ -203,7 +202,7 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
let iceberg_properties: IcebergProperties = *iceberg_properties;
let split: IcebergSplit = split.clone();
Ok(Box::new(IcebergScanExecutor::new(
iceberg_properties.to_iceberg_config(),
iceberg_properties,
Some(split.snapshot_id),
split.table_meta.deserialize(),
split.files.into_iter().map(|x| x.deserialize()).collect(),
Expand Down
10 changes: 0 additions & 10 deletions src/connector/src/connector_common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;
use std::collections::BTreeMap;
use std::io::Write;
use std::time::Duration;
Expand All @@ -38,8 +37,6 @@ use crate::deserialize_duration_from_string;
use crate::error::ConnectorResult;
use crate::sink::SinkError;
use crate::source::nats::source::NatsOffset;
// The file describes the common abstractions for each connector and can be used in both source and
// sink.

pub const PRIVATE_LINK_BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints";
pub const PRIVATE_LINK_TARGETS_KEY: &str = "privatelink.targets";
Expand Down Expand Up @@ -563,13 +560,6 @@ impl KinesisCommon {
Ok(KinesisClient::from_conf(builder.build()))
}
}
#[derive(Debug, Deserialize)]
pub struct UpsertMessage<'a> {
#[serde(borrow)]
pub primary_key: Cow<'a, [u8]>,
#[serde(borrow)]
pub record: Cow<'a, [u8]>,
}

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
Expand Down
Loading

0 comments on commit 4bc3b28

Please sign in to comment.