Skip to content

Commit

Permalink
remove dead code
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jun 20, 2024
1 parent 7f5056c commit e1e5cac
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 59 deletions.
4 changes: 2 additions & 2 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ rustls-native-certs = "0.7"
rustls-pemfile = "2"
rustls-pki-types = "1"
rw_futures_util = { workspace = true }
sea-schema = { version = "0.14", features = [
"default",
sea-schema = { version = "0.14", default-features = false, features = [
"discovery",
"sqlx-postgres",
"sqlx-mysql",
] }
Expand Down
62 changes: 5 additions & 57 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,9 @@ impl<'a> MessageMeta<'a> {
}

trait OpAction {
type Output<'a>;
fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> DatumCow<'a>;

fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a>;

fn apply(builder: &mut ArrayBuilderImpl, output: Self::Output<'_>);
fn apply(builder: &mut ArrayBuilderImpl, output: DatumCow<'_>);

fn rollback(builder: &mut ArrayBuilderImpl);

Expand All @@ -259,10 +257,8 @@ trait OpAction {
struct OpActionInsert;

impl OpAction for OpActionInsert {
type Output<'a> = DatumCow<'a>;

#[inline(always)]
fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> DatumCow<'a> {
datum.into()
}

Expand All @@ -285,10 +281,8 @@ impl OpAction for OpActionInsert {
struct OpActionDelete;

impl OpAction for OpActionDelete {
type Output<'a> = DatumCow<'a>;

#[inline(always)]
fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> DatumCow<'a> {
datum.into()
}

Expand All @@ -308,36 +302,6 @@ impl OpAction for OpActionDelete {
}
}

struct OpActionUpdate;

impl OpAction for OpActionUpdate {
type Output<'a> = (DatumCow<'a>, DatumCow<'a>);

#[inline(always)]
fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
let datum = datum.into();
(datum.clone(), datum)
}

#[inline(always)]
fn apply(builder: &mut ArrayBuilderImpl, output: (DatumCow<'_>, DatumCow<'_>)) {
builder.append(output.0);
builder.append(output.1);
}

#[inline(always)]
fn rollback(builder: &mut ArrayBuilderImpl) {
builder.pop().unwrap();
builder.pop().unwrap();
}

#[inline(always)]
fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) {
writer.append_op(Op::UpdateDelete);
writer.append_op(Op::UpdateInsert);
}
}

impl SourceStreamChunkRowWriter<'_> {
fn append_op(&mut self, op: Op) {
self.op_builder.push(op);
Expand All @@ -346,7 +310,7 @@ impl SourceStreamChunkRowWriter<'_> {

fn do_action<'a, A: OpAction>(
&'a mut self,
mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<A::Output<'a>>,
mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<DatumCow<'a>>,
) -> AccessResult<()> {
let mut parse_field = |desc: &SourceColumnDesc| {
match f(desc) {
Expand Down Expand Up @@ -543,22 +507,6 @@ impl SourceStreamChunkRowWriter<'_> {
{
self.do_action::<OpActionDelete>(|desc| f(desc).map(Into::into))
}

/// Write a `Update` record to the [`StreamChunk`], with the given fallible closure that
/// produces two [`Datum`]s as old and new value by corresponding [`SourceColumnDesc`].
///
/// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
#[inline(always)]
pub fn do_update<'a, D1, D2>(
&mut self,
mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<(D1, D2)>,
) -> AccessResult<()>
where
D1: Into<DatumCow<'a>>,
D2: Into<DatumCow<'a>>,
{
self.do_action::<OpActionUpdate>(|desc| f(desc).map(|(old, new)| (old.into(), new.into())))
}
}

/// Transaction control message. Currently only used by Debezium messages.
Expand Down

0 comments on commit e1e5cac

Please sign in to comment.