Commit 09caf0a: resolve comments

Signed-off-by: tabVersion <[email protected]>
tabVersion committed May 8, 2024
1 parent: 3f0697d
Showing 5 changed files with 56 additions and 68 deletions.
3 changes: 0 additions & 3 deletions e2e_test/sink/kafka/create_sink.slt
@@ -248,9 +248,6 @@ insert into t_kafka values
 (6, 'V4y71v4Gip', 4014, 10844, 28842, 5885.368, 11210.458724794062, '2023-04-13 10:42:02.137742', '\xCAFEBABE', '4 hour', '0001-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'),
 (7, 'YIVLnWxHyf', 10324, 12652, 15914, 3946.7434, 10967.182297153104, '2023-04-14 04:41:03.083742', '\xBABEC0DE', '3 day', '0001-01-01', '01:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}');
 
-# wait a second to wait for the sink job to create kafka topic
-sleep 1s
-
 statement error
 create sink si_kafka_without_snapshot as select * from t_kafka with (
   connector = 'kafka',
63 changes: 31 additions & 32 deletions src/connector/src/sink/formatter/mod.rs
@@ -51,7 +51,7 @@ pub trait SinkFormatter {
     fn format_chunk(
         &self,
         chunk: &StreamChunk,
-    ) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>>;
+    ) -> impl Iterator<Item=Result<(Option<Self::K>, Option<Self::V>)>>;
 }
 
 /// `tri!` in generators yield `Err` and return `()`
@@ -160,47 +160,46 @@ impl EncoderBuild for ProtoEncoder {
 
 impl EncoderBuild for TextEncoder {
     async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
-        if let Some(pk_indices) = pk_indices {
+        let Some(pk_indices) = pk_indices else {
+            return Err(SinkError::Config(anyhow!(
+                "TextEncoder requires primary key columns to be specified"
+            )));
+        };
         if pk_indices.len() != 1 {
             return Err(SinkError::Config(anyhow!(
                 "The key encode is TEXT, but the primary key has {} columns. The key encode TEXT requires the primary key to be a single column",
                 pk_indices.len()
             )));
         }
 
         let schema_ref = params.schema.fields().get(pk_indices[0]).ok_or_else(|| {
             SinkError::Config(anyhow!(
                 "The primary key column index {} is out of bounds in schema {:?}",
                 pk_indices[0],
                 params.schema
             ))
         })?;
         match &schema_ref.data_type() {
             DataType::Varchar
             | DataType::Boolean
             | DataType::Int16
             | DataType::Int32
             | DataType::Int64
             | DataType::Int256
             | DataType::Serial => {}
             _ => {
-                // why we don't allow float as text for key encode: https://github.com/risingwavelabs/risingwave/issues/6412
+                // why we don't allow float as text for key encode: https://github.com/risingwavelabs/risingwave/pull/16377#discussion_r1591864960
                 return Err(SinkError::Config(
                     anyhow!(
                         "The key encode is TEXT, but the primary key column {} has type {}. The key encode TEXT requires the primary key column to be of type varchar, bool, small int, int, big int, serial or rw_int256.",
                         schema_ref.name,
                         schema_ref.data_type
                     ),
                 ));
             }
         }
 
         Ok(Self::new(params.schema, pk_indices[0]))
-        } else {
-            Err(SinkError::Config(anyhow!(
-                "TextEncoder requires primary key columns to be specified"
-            )))
-        }
     }
 }
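The core change in this file replaces a nested `if let ... else` with Rust's `let ... else` early-return form: the failure case exits immediately, the happy path stays at one indentation level, and `pk_indices` stays bound for the rest of the function. Below is a minimal standalone sketch of the same pattern; `validate_pk` and `ConfigError` are illustrative stand-ins, not RisingWave's actual API.

    // Hypothetical error type standing in for SinkError::Config.
    #[derive(Debug)]
    struct ConfigError(String);

    // Same shape as TextEncoder::build above: unwrap the Option with
    // let-else, then validate the single-column requirement.
    fn validate_pk(pk_indices: Option<Vec<usize>>) -> Result<usize, ConfigError> {
        let Some(pk_indices) = pk_indices else {
            return Err(ConfigError("primary key columns must be specified".into()));
        };
        if pk_indices.len() != 1 {
            return Err(ConfigError(format!(
                "expected exactly one primary key column, got {}",
                pk_indices.len()
            )));
        }
        Ok(pk_indices[0])
    }

    fn main() {
        assert_eq!(validate_pk(Some(vec![3])).unwrap(), 3);
        assert!(validate_pk(None).is_err());
        assert!(validate_pk(Some(vec![0, 1])).is_err());
    }

Compared with the old `if let Some(pk_indices) = pk_indices { ... } else { ... }`, the let-else version needs rustc 1.65 or later but removes one level of nesting from every line that follows.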
20 changes: 10 additions & 10 deletions src/sqlparser/src/ast/legacy_source.rs
@@ -125,28 +125,28 @@ pub fn parse_source_schema(p: &mut Parser) -> Result<CompatibleSourceSchema, Par
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub enum SourceSchema {
-    Protobuf(ProtobufSchema),
     // Keyword::PROTOBUF ProtobufSchema
-    Json,
+    Protobuf(ProtobufSchema),
     // Keyword::JSON
-    DebeziumJson,
+    Json,
     // Keyword::DEBEZIUM_JSON
+    DebeziumJson,
     DebeziumMongoJson,
-    UpsertJson,
     // Keyword::UPSERT_JSON
-    Avro(AvroSchema),
+    UpsertJson,
     // Keyword::AVRO
-    UpsertAvro(AvroSchema),
+    Avro(AvroSchema),
     // Keyword::UpsertAVRO
-    Maxwell,
+    UpsertAvro(AvroSchema),
     // Keyword::MAXWELL
-    CanalJson,
+    Maxwell,
     // Keyword::CANAL_JSON
-    Csv(CsvInfo),
+    CanalJson,
     // Keyword::CSV
+    Csv(CsvInfo),
     Native,
-    DebeziumAvro(DebeziumAvroSchema),
     // Keyword::DEBEZIUM_AVRO
+    DebeziumAvro(DebeziumAvroSchema),
     Bytes,
 }
14 changes: 7 additions & 7 deletions src/sqlparser/src/ast/statement.rs
@@ -154,20 +154,20 @@ impl Format {
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub enum Encode {
-    Avro,
     // Keyword::Avro
-    Csv,
+    Avro,
     // Keyword::CSV
-    Protobuf,
+    Csv,
     // Keyword::PROTOBUF
-    Json,
+    Protobuf,
     // Keyword::JSON
-    Bytes,
+    Json,
     // Keyword::BYTES
-    None,
+    Bytes,
     // Keyword::None
-    Text,
+    None,
     // Keyword::TEXT
+    Text,
     Native,
     Template,
 }
24 changes: 8 additions & 16 deletions src/sqlparser/src/parser.rs
@@ -155,25 +155,17 @@ type ColumnsDefTuple = (
 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
 pub enum Precedence {
     Zero = 0,
-    LogicalOr,
-    // 5 in upstream
+    LogicalOr, // 5 in upstream
     LogicalXor,
-    LogicalAnd,
-    // 10 in upstream
-    UnaryNot,
-    // 15 in upstream
-    Is,
-    // 17 in upstream
+    LogicalAnd, // 10 in upstream
+    UnaryNot, // 15 in upstream
+    Is, // 17 in upstream
     Cmp,
-    Like,
-    // 19 in upstream
-    Between,
-    // 20 in upstream
+    Like, // 19 in upstream
+    Between, // 20 in upstream
     Other,
-    PlusMinus,
-    // 30 in upstream
-    MulDiv,
-    // 40 in upstream
+    PlusMinus, // 30 in upstream
+    MulDiv, // 40 in upstream
     Exp,
     UnaryPosNeg,
     PostfixFactorial,
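Moving each `// N in upstream` note onto the same line as its variant keeps the annotation visually attached to the value it describes. The numbers appear to track the literal precedence values used by the upstream sqlparser-rs parser; this fork instead leans on the derived `Ord`, which orders C-like enum variants by declaration order. A trimmed, hypothetical sketch of why that works (not the full `Precedence` enum from parser.rs):

    // Derived Ord follows declaration order, so listing variants from
    // loosest- to tightest-binding lets the parser compare precedence
    // levels directly with `<` and `>`.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
    enum Precedence {
        Zero = 0,
        PlusMinus, // binary + and -
        MulDiv,    // * and /, binds tighter than PlusMinus
    }

    // Pratt-style loop condition: keep descending only while the next
    // operator binds tighter than the current context.
    fn should_descend(next: Precedence, current: Precedence) -> bool {
        next > current
    }

    fn main() {
        assert!(Precedence::MulDiv > Precedence::PlusMinus);
        assert!(should_descend(Precedence::MulDiv, Precedence::PlusMinus));
        assert!(!should_descend(Precedence::PlusMinus, Precedence::MulDiv));
    }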
