Skip to content

Commit

Permalink
extra compatibility check in frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Sep 29, 2023
1 parent c219b4a commit e6942fc
Showing 1 changed file with 59 additions and 3 deletions.
62 changes: 59 additions & 3 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::rc::Rc;
use std::sync::LazyLock;

use itertools::Itertools;
use maplit::{convert_args, hashmap};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, UserId};
use risingwave_common::error::{ErrorCode, Result};
Expand All @@ -24,8 +27,8 @@ use risingwave_connector::sink::{
};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_sqlparser::ast::{
CreateSink, CreateSinkStatement, EmitMode, ObjectName, Query, Select, SelectItem, SetExpr,
SinkSchema, TableFactor, TableWithJoins,
CreateSink, CreateSinkStatement, EmitMode, Encode, Format, ObjectName, Query, Select,
SelectItem, SetExpr, SinkSchema, TableFactor, TableWithJoins,
};

use super::create_mv::get_column_names;
Expand Down Expand Up @@ -123,7 +126,10 @@ pub fn gen_sink_plan(
.ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;
let format_desc = match stmt.sink_schema {
// Case A: new syntax `format ... encode ...`
Some(f) => Some(bind_sink_format_desc(f)?),
Some(f) => {
validate_compatibility(connector, &f)?;
Some(bind_sink_format_desc(f)?)
},
None => match with_options.get(SINK_TYPE_OPTION) {
// Case B: old syntax `type = '...'`
Some(t) => SinkFormatDesc::from_legacy_type(connector, t)?.map(|mut f| {
Expand Down Expand Up @@ -251,6 +257,56 @@ fn bind_sink_format_desc(value: SinkSchema) -> Result<SinkFormatDesc> {
})
}

static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
LazyLock::new(|| {
use risingwave_connector::sink::kafka::KafkaSink;
use risingwave_connector::sink::kinesis::KinesisSink;
use risingwave_connector::sink::pulsar::PulsarSink;
use risingwave_connector::sink::Sink as _;

convert_args!(hashmap!(
KafkaSink::SINK_NAME => hashmap!(
Format::Plain => vec![Encode::Json],
Format::Upsert => vec![Encode::Json],
Format::Debezium => vec![Encode::Json],
),
KinesisSink::SINK_NAME => hashmap!(
Format::Plain => vec![Encode::Json],
Format::Upsert => vec![Encode::Json],
Format::Debezium => vec![Encode::Json],
),
PulsarSink::SINK_NAME => hashmap!(
Format::Plain => vec![Encode::Json],
Format::Upsert => vec![Encode::Json],
Format::Debezium => vec![Encode::Json],
),
))
});
pub fn validate_compatibility(connector: &str, format_desc: &SinkSchema) -> Result<()> {
let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
.get(connector)
.ok_or_else(|| {
ErrorCode::BindError(format!(
"connector {} is not supported by FORMAT ... ENCODE ... syntax",
connector
))
})?;
let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
ErrorCode::BindError(format!(
"connector {} does not support format {:?}",
connector, format_desc.format
))
})?;
if !compatible_encodes.contains(&format_desc.row_encode) {
return Err(ErrorCode::BindError(format!(
"connector {} does not support format {:?} with encode {:?}",
connector, format_desc.format, format_desc.row_encode
))
.into());
}
Ok(())
}

/// For `planner_test` crate so that it does not depend directly on `connector` crate just for `SinkFormatDesc`.
impl TryFrom<&WithOptions> for Option<SinkFormatDesc> {
type Error = risingwave_connector::sink::SinkError;
Expand Down

0 comments on commit e6942fc

Please sign in to comment.