From 7402d64f14bb491dda99b97cf92c6b0cb0492661 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 23 Oct 2023 16:18:17 +0800 Subject: [PATCH] add sink impl to meta dev dependency --- Cargo.lock | 1 + src/frontend/src/handler/create_source.rs | 14 ++++++-------- src/meta/Cargo.toml | 1 + .../sink_coordination/coordinator_worker.rs | 3 +-- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fd254db674006..8ffdd45662e16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7683,6 +7683,7 @@ dependencies = [ "risingwave_object_store", "risingwave_pb", "risingwave_rpc_client", + "risingwave_sink_impl", "risingwave_sqlparser", "risingwave_test_runner", "scopeguard", diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 5ca18e98edcad..6f7de61285cb0 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -26,9 +26,6 @@ use risingwave_common::catalog::{ use risingwave_common::error::ErrorCode::{self, InvalidInputSyntax, ProtocolError}; use risingwave_common::error::{Result, RwError}; use risingwave_common::types::DataType; -use risingwave_connector::common::{ - KAFKA_CONNECTOR_NAME, KINESIS_CONNECTOR_NAME, PULSAR_CONNECTOR_NAME, -}; use risingwave_connector::parser::{ schema_to_columns, AvroParserConfig, DebeziumAvroParserConfig, ProtobufParserConfig, SpecificParserConfig, @@ -41,7 +38,8 @@ use risingwave_connector::source::datagen::DATAGEN_CONNECTOR; use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType}; use risingwave_connector::source::test_source::TEST_CONNECTOR; use risingwave_connector::source::{ - GOOGLE_PUBSUB_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, S3_CONNECTOR, S3_V2_CONNECTOR, + GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, + PULSAR_CONNECTOR, S3_CONNECTOR, S3_V2_CONNECTOR, }; use risingwave_pb::catalog::{ PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc, @@ -838,7 +836,7 @@ pub(super) fn bind_source_watermark( static CONNECTORS_COMPATIBLE_FORMATS: LazyLock>>> = LazyLock::new(|| { convert_args!(hashmap!( - KAFKA_CONNECTOR_NAME => hashmap!( + KAFKA_CONNECTOR => hashmap!( Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv], Format::Upsert => vec![Encode::Json, Encode::Avro], Format::Debezium => vec![Encode::Json, Encode::Avro], @@ -846,14 +844,14 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock vec![Encode::Json], Format::DebeziumMongo => vec![Encode::Json], ), - PULSAR_CONNECTOR_NAME => hashmap!( + PULSAR_CONNECTOR => hashmap!( Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes], Format::Upsert => vec![Encode::Json, Encode::Avro], Format::Debezium => vec![Encode::Json], Format::Maxwell => vec![Encode::Json], Format::Canal => vec![Encode::Json], ), - KINESIS_CONNECTOR_NAME => hashmap!( + KINESIS_CONNECTOR => hashmap!( Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes], Format::Upsert => vec![Encode::Json, Encode::Avro], Format::Debezium => vec![Encode::Json], @@ -917,7 +915,7 @@ pub fn validate_compatibility( CONNECTORS_COMPATIBLE_FORMATS.keys() ))) })?; - if connector != KAFKA_CONNECTOR_NAME { + if connector != KAFKA_CONNECTOR { let res = match (&source_schema.format, &source_schema.row_encode) { (Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => { let mut options = source_schema.gen_options().map_err(|e| anyhow!(e))?; diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 67e9a95026cc7..6977f37ee1096 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -98,6 +98,7 @@ workspace-hack = { path = "../workspace-hack" } assert_matches = "1" maplit = "1.0.2" rand = "0.8" +risingwave_sink_impl = { workspace = true } risingwave_test_runner = { workspace = true } [features] diff --git a/src/meta/src/manager/sink_coordination/coordinator_worker.rs b/src/meta/src/manager/sink_coordination/coordinator_worker.rs index a7e6f8d1bc7ef..921efb307bdd2 100644 --- a/src/meta/src/manager/sink_coordination/coordinator_worker.rs +++ b/src/meta/src/manager/sink_coordination/coordinator_worker.rs @@ -57,8 +57,7 @@ impl CoordinatorWorker { first_writer_request: NewSinkWriterRequest, request_rx: UnboundedReceiver, ) { - let coordinator_result = build_box_coordinator(first_writer_request.param.clone()).await; - let coordinator = match coordinator_result { + let coordinator = match build_box_coordinator(first_writer_request.param.clone()).await { Ok(coordinator) => coordinator, Err(e) => { error!(