diff --git a/Cargo.lock b/Cargo.lock index 20ee05110f01e..495aa7930fb99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -241,7 +241,7 @@ dependencies = [ [[package]] name = "apache-avro" version = "0.16.0" -source = "git+https://github.com/risingwavelabs/avro?rev=5349b0c7b35940d117397edbd314ca9087cdb892#5349b0c7b35940d117397edbd314ca9087cdb892" +source = "git+https://github.com/risingwavelabs/avro?rev=25113ba88234a9ae23296e981d8302c290fdaa4b#25113ba88234a9ae23296e981d8302c290fdaa4b" dependencies = [ "bzip2", "crc32fast", diff --git a/Cargo.toml b/Cargo.toml index d5462dd40cd7d..dc07bb6b9fffb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,7 +77,7 @@ repository = "https://github.com/risingwavelabs/risingwave" [workspace.dependencies] foyer = { version = "0.9.4", features = ["nightly"] } -apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "5349b0c7b35940d117397edbd314ca9087cdb892", features = [ +apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [ "snappy", "zstandard", "bzip", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 029b68c9c619c..2ba8965f3af26 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -127,8 +127,8 @@ rustls-native-certs = "0.7" rustls-pemfile = "2" rustls-pki-types = "1" rw_futures_util = { workspace = true } -sea-schema = { version = "0.14", features = [ - "default", +sea-schema = { version = "0.14", default-features = false, features = [ + "discovery", "sqlx-postgres", "sqlx-mysql", ] } diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 26cf746b535dc..10858aefa92a3 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -245,11 +245,9 @@ impl<'a> MessageMeta<'a> { } trait OpAction { - type Output<'a>; + fn output_for<'a>(datum: impl Into>) -> DatumCow<'a>; - fn output_for<'a>(datum: impl Into>) -> Self::Output<'a>; - - fn apply(builder: &mut ArrayBuilderImpl, output: Self::Output<'_>); + fn apply(builder: &mut ArrayBuilderImpl, output: DatumCow<'_>); fn rollback(builder: &mut ArrayBuilderImpl); @@ -259,10 +257,8 @@ trait OpAction { struct OpActionInsert; impl OpAction for OpActionInsert { - type Output<'a> = DatumCow<'a>; - #[inline(always)] - fn output_for<'a>(datum: impl Into>) -> Self::Output<'a> { + fn output_for<'a>(datum: impl Into>) -> DatumCow<'a> { datum.into() } @@ -285,10 +281,8 @@ impl OpAction for OpActionInsert { struct OpActionDelete; impl OpAction for OpActionDelete { - type Output<'a> = DatumCow<'a>; - #[inline(always)] - fn output_for<'a>(datum: impl Into>) -> Self::Output<'a> { + fn output_for<'a>(datum: impl Into>) -> DatumCow<'a> { datum.into() } @@ -308,36 +302,6 @@ impl OpAction for OpActionDelete { } } -struct OpActionUpdate; - -impl OpAction for OpActionUpdate { - type Output<'a> = (DatumCow<'a>, DatumCow<'a>); - - #[inline(always)] - fn output_for<'a>(datum: impl Into>) -> Self::Output<'a> { - let datum = datum.into(); - (datum.clone(), datum) - } - - #[inline(always)] - fn apply(builder: &mut ArrayBuilderImpl, output: (DatumCow<'_>, DatumCow<'_>)) { - builder.append(output.0); - builder.append(output.1); - } - - #[inline(always)] - fn rollback(builder: &mut ArrayBuilderImpl) { - builder.pop().unwrap(); - builder.pop().unwrap(); - } - - #[inline(always)] - fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) { - writer.append_op(Op::UpdateDelete); - writer.append_op(Op::UpdateInsert); - } -} - impl SourceStreamChunkRowWriter<'_> { fn append_op(&mut self, op: Op) { self.op_builder.push(op); @@ -346,7 +310,7 @@ impl SourceStreamChunkRowWriter<'_> { fn do_action<'a, A: OpAction>( &'a mut self, - mut f: impl FnMut(&SourceColumnDesc) -> AccessResult>, + mut f: impl FnMut(&SourceColumnDesc) -> AccessResult>, ) -> AccessResult<()> { let mut parse_field = |desc: &SourceColumnDesc| { match f(desc) { @@ -543,22 +507,6 @@ impl SourceStreamChunkRowWriter<'_> { { self.do_action::(|desc| f(desc).map(Into::into)) } - - /// Write a `Update` record to the [`StreamChunk`], with the given fallible closure that - /// produces two [`Datum`]s as old and new value by corresponding [`SourceColumnDesc`]. - /// - /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details. - #[inline(always)] - pub fn do_update<'a, D1, D2>( - &mut self, - mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<(D1, D2)>, - ) -> AccessResult<()> - where - D1: Into>, - D2: Into>, - { - self.do_action::(|desc| f(desc).map(|(old, new)| (old.into(), new.into()))) - } } /// Transaction control message. Currently only used by Debezium messages.