Skip to content

Commit

Permalink
use SinkError::Config
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Sep 29, 2023
1 parent 0610481 commit c1291f1
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
12 changes: 9 additions & 3 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl SinkFormatDesc {
}

impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
type Error = String;
type Error = SinkError;

fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};
Expand All @@ -190,15 +190,21 @@ impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
F::Upsert => SinkFormat::Upsert,
F::Debezium => SinkFormat::Debezium,
f @ (F::Unspecified | F::Native | F::DebeziumMongo | F::Maxwell | F::Canal) => {
return Err(format!("sink format unsupported: {}", f.as_str_name()))
return Err(SinkError::Config(anyhow!(
"sink format unsupported: {}",
f.as_str_name()
)))
}
};
let encode = match value.encode() {
E::Json => SinkEncode::Json,
E::Protobuf => SinkEncode::Protobuf,
E::Avro => SinkEncode::Avro,
e @ (E::Unspecified | E::Native | E::Csv | E::Bytes) => {
return Err(format!("sink encode unsupported: {}", e.as_str_name()))
return Err(SinkError::Config(anyhow!(
"sink encode unsupported: {}",
e.as_str_name()
)))
}
};
let options = value.options.into_iter().collect();
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/from_proto/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl ExecutorBuilder for SinkExecutorBuilder {
}?;
let format_desc = match &sink_desc.format_desc {
// Case A: new syntax `format ... encode ...`
Some(f) => Some(f.clone().try_into().map_err(|e| anyhow!("{e}"))?),
Some(f) => Some(f.clone().try_into()?),
None => match sink_desc.properties.get(SINK_TYPE_OPTION) {
// Case B: old syntax `type = '...'`
Some(t) => SinkFormatDesc::from_legacy_type(connector, t)?,
Expand Down

0 comments on commit c1291f1

Please sign in to comment.