Skip to content

Commit

Permalink
add sink impl to meta dev dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Oct 23, 2023
1 parent 69f74fd commit 7402d64
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 6 additions & 8 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ use risingwave_common::catalog::{
use risingwave_common::error::ErrorCode::{self, InvalidInputSyntax, ProtocolError};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_connector::common::{
KAFKA_CONNECTOR_NAME, KINESIS_CONNECTOR_NAME, PULSAR_CONNECTOR_NAME,
};
use risingwave_connector::parser::{
schema_to_columns, AvroParserConfig, DebeziumAvroParserConfig, ProtobufParserConfig,
SpecificParserConfig,
Expand All @@ -41,7 +38,8 @@ use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType};
use risingwave_connector::source::test_source::TEST_CONNECTOR;
use risingwave_connector::source::{
GOOGLE_PUBSUB_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, S3_CONNECTOR, S3_V2_CONNECTOR,
GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR,
PULSAR_CONNECTOR, S3_CONNECTOR, S3_V2_CONNECTOR,
};
use risingwave_pb::catalog::{
PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc,
Expand Down Expand Up @@ -838,22 +836,22 @@ pub(super) fn bind_source_watermark(
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
LazyLock::new(|| {
convert_args!(hashmap!(
KAFKA_CONNECTOR_NAME => hashmap!(
KAFKA_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
Format::Upsert => vec![Encode::Json, Encode::Avro],
Format::Debezium => vec![Encode::Json, Encode::Avro],
Format::Maxwell => vec![Encode::Json],
Format::Canal => vec![Encode::Json],
Format::DebeziumMongo => vec![Encode::Json],
),
PULSAR_CONNECTOR_NAME => hashmap!(
PULSAR_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
Format::Upsert => vec![Encode::Json, Encode::Avro],
Format::Debezium => vec![Encode::Json],
Format::Maxwell => vec![Encode::Json],
Format::Canal => vec![Encode::Json],
),
KINESIS_CONNECTOR_NAME => hashmap!(
KINESIS_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
Format::Upsert => vec![Encode::Json, Encode::Avro],
Format::Debezium => vec![Encode::Json],
Expand Down Expand Up @@ -917,7 +915,7 @@ pub fn validate_compatibility(
CONNECTORS_COMPATIBLE_FORMATS.keys()
)))
})?;
if connector != KAFKA_CONNECTOR_NAME {
if connector != KAFKA_CONNECTOR {
let res = match (&source_schema.format, &source_schema.row_encode) {
(Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => {
let mut options = source_schema.gen_options().map_err(|e| anyhow!(e))?;
Expand Down
1 change: 1 addition & 0 deletions src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ workspace-hack = { path = "../workspace-hack" }
assert_matches = "1"
maplit = "1.0.2"
rand = "0.8"
risingwave_sink_impl = { workspace = true }
risingwave_test_runner = { workspace = true }

[features]
Expand Down
3 changes: 1 addition & 2 deletions src/meta/src/manager/sink_coordination/coordinator_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ impl CoordinatorWorker {
first_writer_request: NewSinkWriterRequest,
request_rx: UnboundedReceiver<NewSinkWriterRequest>,
) {
let coordinator_result = build_box_coordinator(first_writer_request.param.clone()).await;
let coordinator = match coordinator_result {
let coordinator = match build_box_coordinator(first_writer_request.param.clone()).await {
Ok(coordinator) => coordinator,
Err(e) => {
error!(
Expand Down

0 comments on commit 7402d64

Please sign in to comment.