Commit

resolve conflicts
chenzl25 committed Sep 26, 2023
2 parents 6296059 + 09a1dcb commit 88db8a1
Showing 40 changed files with 1,074 additions and 894 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

2 changes: 1 addition & 1 deletion e2e_test/sink/append_only_sink.slt
@@ -22,7 +22,7 @@ create sink invalid_sink_type from t with (connector = 'blackhole', type = 'inva
statement error `force_append_only` must be true or false
create sink invalid_force_append_only from t with (connector = 'blackhole', force_append_only = 'invalid');

-statement error invalid connector type: invalid
+statement error db error: ERROR: QueryError: internal error: Sink error: config error: unsupported sink connector invalid
create sink invalid_connector from t with (connector = 'invalid');

statement ok
6 changes: 2 additions & 4 deletions proto/backup_service.proto
@@ -13,11 +13,8 @@ enum BackupJobStatus {
UNSPECIFIED = 0;
RUNNING = 1;
SUCCEEDED = 2;
-// NOT_FOUND indicates one of these cases:
-// - Invalid job id.
-// - Job has failed.
-// - Job has succeeded, but its resulted backup has been deleted later.
NOT_FOUND = 3;
+FAILED = 4;
}
message BackupMetaRequest {}
message BackupMetaResponse {
@@ -29,6 +26,7 @@ message GetBackupJobStatusRequest {
message GetBackupJobStatusResponse {
uint64 job_id = 1;
BackupJobStatus job_status = 2;
+string message = 3;
}
message DeleteMetaSnapshotRequest {
repeated uint64 snapshot_ids = 1;
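
The proto change above replaces NOT_FOUND's overloaded meaning with an explicit FAILED status, and the new `message` field carries the failure reason. A minimal sketch of client-side handling, assuming the prost-generated types in `risingwave_pb::backup_service` (the `report` function itself is illustrative, not part of this commit):

    use risingwave_pb::backup_service::{BackupJobStatus, GetBackupJobStatusResponse};

    fn report(resp: &GetBackupJobStatusResponse) {
        // `job_status()` is the prost-generated accessor for the enum field.
        match resp.job_status() {
            BackupJobStatus::Running => println!("job {} is running", resp.job_id),
            BackupJobStatus::Succeeded => println!("job {} succeeded", resp.job_id),
            // NOT_FOUND no longer doubles as a failure signal.
            BackupJobStatus::NotFound => println!("job {} not found", resp.job_id),
            // The new FAILED status pairs with the new `message` field.
            BackupJobStatus::Failed => println!("job {} failed: {}", resp.job_id, resp.message),
            BackupJobStatus::Unspecified => println!("job {}: status unknown", resp.job_id),
        }
    }
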
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -46,6 +46,7 @@ clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "6
] }
csv = "1.2"
duration-str = "0.5.1"
+easy-ext = "1"
enum-as-inner = "0.6"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
68 changes: 68 additions & 0 deletions src/connector/src/sink/blackhole.rs
@@ -0,0 +1,68 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset};
use crate::sink::{
DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkError, SinkParam, SinkWriterParam,
};

pub const BLACKHOLE_SINK: &str = "blackhole";

#[derive(Debug)]
pub struct BlackHoleSink;

impl TryFrom<SinkParam> for BlackHoleSink {
type Error = SinkError;

fn try_from(_value: SinkParam) -> std::result::Result<Self, Self::Error> {
Ok(Self)
}
}

impl Sink for BlackHoleSink {
type Coordinator = DummySinkCommitCoordinator;
type LogSinker = Self;

const SINK_NAME: &'static str = BLACKHOLE_SINK;

async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(Self)
}

async fn validate(&self) -> Result<()> {
Ok(())
}
}

impl LogSinker for BlackHoleSink {
async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> {
log_reader.init().await?;
loop {
let (epoch, item) = log_reader.next_item().await?;
match item {
LogStoreReadItem::StreamChunk { chunk_id, .. } => {
log_reader
.truncate(TruncateOffset::Chunk { epoch, chunk_id })
.await?;
}
LogStoreReadItem::Barrier { .. } => {
log_reader
.truncate(TruncateOffset::Barrier { epoch })
.await?;
}
LogStoreReadItem::UpdateVnodeBitmap(_) => {}
}
}
}
}
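
The new blackhole sink is the smallest example of the reworked `Sink` trait: the sink is its own `LogSinker`, and `consume_log_and_sink` acknowledges every item by truncating the log store. A sketch of how such a sink might be driven end to end, assuming the caller already holds the `SinkParam`, `SinkWriterParam`, and `LogReader` (the `drive_blackhole` helper is illustrative only, not part of this commit):

    use crate::sink::blackhole::BlackHoleSink;
    use crate::sink::log_store::LogReader;
    use crate::sink::{LogSinker, Result, Sink, SinkParam, SinkWriterParam};

    async fn drive_blackhole(
        param: SinkParam,
        writer_param: SinkWriterParam,
        log_reader: impl LogReader,
    ) -> Result<()> {
        // Construction now goes through TryFrom<SinkParam> instead of a bespoke `new`.
        let sink = BlackHoleSink::try_from(param)?;
        sink.validate().await?;
        let sinker = sink.new_log_sinker(writer_param).await?;
        // Loops until the log reader yields an error (e.g. on shutdown).
        sinker.consume_log_and_sink(log_reader).await
    }
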
35 changes: 2 additions & 33 deletions src/connector/src/sink/boxed.rs
@@ -12,28 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-use std::fmt::{Debug, Formatter};
-use std::ops::{Deref, DerefMut};
+use std::ops::DerefMut;
use std::sync::Arc;

use async_trait::async_trait;
use risingwave_common::array::StreamChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_pb::connector_service::SinkMetadata;
-use risingwave_rpc_client::ConnectorClient;

-use crate::sink::{Sink, SinkCommitCoordinator, SinkWriter, SinkWriterParam};
+use crate::sink::{SinkCommitCoordinator, SinkWriter};

pub type BoxWriter<CM> = Box<dyn SinkWriter<CommitMetadata = CM> + Send + 'static>;
pub type BoxCoordinator = Box<dyn SinkCommitCoordinator + Send + 'static>;
-pub type BoxSink =
-Box<dyn Sink<Writer = BoxWriter<()>, Coordinator = BoxCoordinator> + Send + Sync + 'static>;
-
-impl Debug for BoxSink {
-fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-f.write_str("BoxSink")
-}
-}

#[async_trait]
impl<CM: 'static + Send> SinkWriter for BoxWriter<CM> {
@@ -70,24 +60,3 @@ impl SinkCommitCoordinator for BoxCoordinator {
self.deref_mut().commit(epoch, metadata).await
}
}
-
-#[async_trait]
-impl Sink for BoxSink {
-type Coordinator = BoxCoordinator;
-type Writer = BoxWriter<()>;
-
-async fn validate(&self) -> crate::sink::Result<()> {
-self.deref().validate().await
-}
-
-async fn new_writer(&self, writer_param: SinkWriterParam) -> crate::sink::Result<Self::Writer> {
-self.deref().new_writer(writer_param).await
-}
-
-async fn new_coordinator(
-&self,
-connector_client: Option<ConnectorClient>,
-) -> crate::sink::Result<Self::Coordinator> {
-self.deref().new_coordinator(connector_client).await
-}
-}
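
With `SINK_NAME` as an associated const and `new_log_sinker` returning an associated type, `Sink` is no longer object-safe, which is why `BoxSink` and its `Debug` and `Sink` impls are deleted here; writers and coordinators remain boxable. A sketch of the surviving pattern, assuming `SinkWriter`'s `begin_epoch`/`write_batch`/`barrier` interface (`write_one` is illustrative only):

    use risingwave_common::array::StreamChunk;

    use crate::sink::boxed::BoxWriter;
    use crate::sink::SinkWriter;

    // Generic code still accepts boxed writers through the blanket
    // `impl SinkWriter for BoxWriter<CM>` kept in this file.
    async fn write_one<W: SinkWriter<CommitMetadata = ()>>(
        mut writer: W,
        epoch: u64,
        chunk: StreamChunk,
    ) -> crate::sink::Result<()> {
        writer.begin_epoch(epoch).await?;
        writer.write_batch(chunk).await?;
        writer.barrier(true).await?; // `true` marks a checkpoint barrier
        Ok(())
    }
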
32 changes: 18 additions & 14 deletions src/connector/src/sink/clickhouse.rs
@@ -29,8 +29,9 @@ use serde_with::serde_as;

use super::{DummySinkCommitCoordinator, SinkWriterParam};
use crate::common::ClickHouseCommon;
+use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::{
-Result, Sink, SinkError, SinkWriter, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
+Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};

pub const CLICKHOUSE_SINK: &str = "clickhouse";
@@ -70,21 +71,22 @@ impl ClickHouseConfig {
}
}

-impl ClickHouseSink {
-pub fn new(
-config: ClickHouseConfig,
-schema: Schema,
-pk_indices: Vec<usize>,
-is_append_only: bool,
-) -> Result<Self> {
+impl TryFrom<SinkParam> for ClickHouseSink {
+type Error = SinkError;
+
+fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
+let schema = param.schema();
+let config = ClickHouseConfig::from_hashmap(param.properties)?;
Ok(Self {
config,
schema,
-pk_indices,
-is_append_only,
+pk_indices: param.downstream_pk,
+is_append_only: param.sink_type.is_append_only(),
})
}
}
+
+impl ClickHouseSink {
/// Check that the column names and types of risingwave and clickhouse are identical
fn check_column_name_and_type(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
let rw_fields_name = build_fields_name_type_from_schema(&self.schema)?;
@@ -205,10 +207,11 @@ impl ClickHouseSink {
Ok(())
}
}
-#[async_trait::async_trait]
impl Sink for ClickHouseSink {
type Coordinator = DummySinkCommitCoordinator;
-type Writer = ClickHouseSinkWriter;
+type LogSinker = LogSinkerOf<ClickHouseSinkWriter>;
+
+const SINK_NAME: &'static str = CLICKHOUSE_SINK;

async fn validate(&self) -> Result<()> {
// For upsert clickhouse sink, the primary key must be defined.
@@ -240,14 +243,15 @@
Ok(())
}

-async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result<Self::Writer> {
+async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(ClickHouseSinkWriter::new(
self.config.clone(),
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
)
-.await?)
+.await?
+.into_log_sinker(writer_param.sink_metrics))
}
}
pub struct ClickHouseSinkWriter {
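
The constructor-to-`TryFrom<SinkParam>` change seen here (and repeated in the files below) lets generic code build any sink uniformly from its parameters. A minimal sketch of the pattern (the `build` helper is illustrative; the commit's actual dispatch lives elsewhere):

    use crate::sink::{Result, Sink, SinkError, SinkParam};

    // Each connector validates its own config inside try_from, e.g.
    // ClickHouseConfig::from_hashmap(param.properties) above.
    fn build<S>(param: SinkParam) -> Result<S>
    where
        S: Sink + TryFrom<SinkParam, Error = SinkError>,
    {
        Ok(S::try_from(param)?)
    }

    // Usage: let sink: ClickHouseSink = build(param)?;
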
3 changes: 2 additions & 1 deletion src/connector/src/sink/coordinate.rs
@@ -21,7 +21,8 @@ use risingwave_pb::connector_service::SinkMetadata;
use risingwave_rpc_client::{CoordinatorStreamHandle, SinkCoordinationRpcClient};
use tracing::warn;

-use crate::sink::{Result, SinkError, SinkParam, SinkWriter};
+use crate::sink::writer::SinkWriter;
+use crate::sink::{Result, SinkError, SinkParam};

pub struct CoordinatedSinkWriter<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> {
epoch: u64,
30 changes: 25 additions & 5 deletions src/connector/src/sink/doris.rs
@@ -29,7 +29,10 @@ use super::doris_connector::{DorisField, DorisInsert, DorisInsertClient, DORIS_D
use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use crate::common::DorisCommon;
use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
-use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam};
+use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
+use crate::sink::{
+DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriter, SinkWriterParam,
+};

pub const DORIS_SINK: &str = "doris";
#[serde_as]
@@ -151,19 +154,21 @@ impl DorisSink {
}
}

-#[async_trait]
impl Sink for DorisSink {
type Coordinator = DummySinkCommitCoordinator;
-type Writer = DorisSinkWriter;
+type LogSinker = LogSinkerOf<DorisSinkWriter>;
+
+const SINK_NAME: &'static str = DORIS_SINK;

-async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result<Self::Writer> {
+async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(DorisSinkWriter::new(
self.config.clone(),
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
)
-.await?)
+.await?
+.into_log_sinker(writer_param.sink_metrics))
}

async fn validate(&self) -> Result<()> {
@@ -195,6 +200,21 @@ pub struct DorisSinkWriter {
row_encoder: JsonEncoder,
}

+impl TryFrom<SinkParam> for DorisSink {
+type Error = SinkError;
+
+fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
+let schema = param.schema();
+let config = DorisConfig::from_hashmap(param.properties)?;
+DorisSink::new(
+config,
+schema,
+param.downstream_pk,
+param.sink_type.is_append_only(),
+)
+}
+}
+
impl DorisSinkWriter {
pub async fn new(
config: DorisConfig,
35 changes: 26 additions & 9 deletions src/connector/src/sink/iceberg.rs
@@ -36,20 +36,26 @@ use serde_json::Value;
use url::Url;

use super::{
-Sink, SinkError, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION,
-SINK_TYPE_UPSERT,
+Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};
use crate::deserialize_bool_from_string;
use crate::sink::coordinate::CoordinatedSinkWriter;
-use crate::sink::remote::{CoordinatedRemoteSink, RemoteConfig};
+use crate::sink::remote::{CoordinatedRemoteSink, RemoteSinkTrait};
+use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::{Result, SinkCommitCoordinator, SinkParam};

/// This iceberg sink is WIP. When it ready, we will change this name to "iceberg".
pub const ICEBERG_SINK: &str = "iceberg";
pub const REMOTE_ICEBERG_SINK: &str = "iceberg_java";

-pub type RemoteIcebergSink = CoordinatedRemoteSink;
-pub type RemoteIcebergConfig = RemoteConfig;
+#[derive(Debug)]
+pub struct RemoteIceberg;
+
+impl RemoteSinkTrait for RemoteIceberg {
+const SINK_NAME: &'static str = REMOTE_ICEBERG_SINK;
+}
+
+pub type RemoteIcebergSink = CoordinatedRemoteSink<RemoteIceberg>;

#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
@@ -192,6 +198,15 @@ pub struct IcebergSink {
param: SinkParam,
}

+impl TryFrom<SinkParam> for IcebergSink {
+type Error = SinkError;
+
+fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
+let config = IcebergConfig::from_hashmap(param.properties.clone())?;
+IcebergSink::new(config, param)
+}
+}
+
impl Debug for IcebergSink {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IcebergSink")
@@ -240,17 +255,18 @@ impl IcebergSink {
}
}

-#[async_trait::async_trait]
impl Sink for IcebergSink {
type Coordinator = IcebergSinkCommitter;
-type Writer = CoordinatedSinkWriter<IcebergWriter>;
+type LogSinker = LogSinkerOf<CoordinatedSinkWriter<IcebergWriter>>;
+
+const SINK_NAME: &'static str = ICEBERG_SINK;

async fn validate(&self) -> Result<()> {
let _ = self.create_table().await?;
Ok(())
}

-async fn new_writer(&self, writer_param: SinkWriterParam) -> Result<Self::Writer> {
+async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
let table = self.create_table().await?;

let inner = IcebergWriter {
@@ -275,7 +291,8 @@ impl Sink for IcebergSink {
})?,
inner,
)
-.await?)
+.await?
+.into_log_sinker(writer_param.sink_metrics))
}

async fn new_coordinator(
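
The `RemoteIceberg` marker type above replaces the old `RemoteConfig` alias: a zero-sized struct names the connector, and `CoordinatedRemoteSink` is now generic over it. Wiring up another Java-backed connector would follow the same shape (`OtherRemote` and its name are hypothetical):

    use crate::sink::remote::{CoordinatedRemoteSink, RemoteSinkTrait};

    #[derive(Debug)]
    pub struct OtherRemote;

    impl RemoteSinkTrait for OtherRemote {
        // Picked up by the generic Sink impl as Self::SINK_NAME.
        const SINK_NAME: &'static str = "other_java";
    }

    pub type OtherRemoteSink = CoordinatedRemoteSink<OtherRemote>;
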