diff --git a/src/connector/src/schema/avro.rs b/src/connector/src/schema/avro.rs
index 22c5fb4acadd1..557953dc5b342 100644
--- a/src/connector/src/schema/avro.rs
+++ b/src/connector/src/schema/avro.rs
@@ -12,42 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::BTreeMap;
-use std::sync::Arc;
-
 use apache_avro::Schema as AvroSchema;
 
-use super::loader::{LoadedSchema, SchemaLoader};
+use super::loader::LoadedSchema;
 use super::schema_registry::Subject;
 use super::SchemaFetchError;
 
-pub struct SchemaWithId {
-    pub schema: Arc<AvroSchema>,
-    pub id: i32,
-}
-
-/// Schema registry only
-pub async fn fetch_schema(
-    format_options: &BTreeMap<String, String>,
-    topic: &str,
-) -> Result<(SchemaWithId, SchemaWithId), SchemaFetchError> {
-    let loader = SchemaLoader::from_format_options(topic, format_options)?;
-
-    let (key_id, key_avro) = loader.load_key_schema().await?;
-    let (val_id, val_avro) = loader.load_val_schema().await?;
-
-    Ok((
-        SchemaWithId {
-            id: key_id,
-            schema: Arc::new(key_avro),
-        },
-        SchemaWithId {
-            id: val_id,
-            schema: Arc::new(val_avro),
-        },
-    ))
-}
-
 impl LoadedSchema for AvroSchema {
     fn compile(primary: Subject, _: Vec<Subject>) -> Result<Self, SchemaFetchError> {
         AvroSchema::parse_str(&primary.schema.content)
diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs
index d923d337a3ffb..8d6f1eb8ab463 100644
--- a/src/connector/src/sink/formatter/mod.rs
+++ b/src/connector/src/sink/formatter/mod.rs
@@ -77,6 +77,163 @@ pub enum SinkFormatterImpl {
     UpsertTemplate(UpsertFormatter<TemplateEncoder, TemplateEncoder>),
 }
 
+#[derive(Debug, Clone)]
+pub struct EncoderParams<'a> {
+    format_desc: &'a SinkFormatDesc,
+    schema: Schema,
+    db_name: String,
+    sink_from_name: String,
+    topic: &'a str,
+}
+
+/// Each encoder shall be able to be built from parameters.
+///
+/// This is not part of `RowEncoder` trait, because that one is about how an encoder completes its
+/// own job as a self-contained unit, with a custom `new` asking for only necessary info; while this
+/// one is about how different encoders can be selected from a common SQL interface.
+pub trait EncoderBuild: Sized {
+    /// Pass `pk_indices: None` for value/payload and `Some` for key. Certain encoder builds
+    /// differently when used as key vs value.
+    async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self>;
+}
+
+impl EncoderBuild for JsonEncoder {
+    async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
+        let timestamptz_mode = TimestamptzHandlingMode::from_options(&b.format_desc.options)?;
+        let encoder = JsonEncoder::new(
+            b.schema,
+            pk_indices,
+            DateHandlingMode::FromCe,
+            TimestampHandlingMode::Milli,
+            timestamptz_mode,
+            TimeHandlingMode::Milli,
+        );
+        let encoder = if let Some(s) = b.format_desc.options.get("schemas.enable") {
+            match s.to_lowercase().parse::<bool>() {
+                Ok(true) => {
+                    let kafka_connect = KafkaConnectParams {
+                        schema_name: format!("{}.{}", b.db_name, b.sink_from_name),
+                    };
+                    encoder.with_kafka_connect(kafka_connect)
+                }
+                Ok(false) => encoder,
+                _ => {
+                    return Err(SinkError::Config(anyhow!(
+                        "schemas.enable is expected to be `true` or `false`, got {s}",
+                    )));
+                }
+            }
+        } else {
+            encoder
+        };
+        Ok(encoder)
+    }
+}
+
+impl EncoderBuild for ProtoEncoder {
+    async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
+        // TODO: better to be a compile-time assert
+        assert!(pk_indices.is_none());
+        // By passing `None` as `aws_auth_props`, reading from `s3://` not supported yet.
+        let (descriptor, sid) =
+            crate::schema::protobuf::fetch_descriptor(&b.format_desc.options, b.topic, None)
+                .await
+                .map_err(|e| SinkError::Config(anyhow!(e)))?;
+        let header = match sid {
+            None => ProtoHeader::None,
+            Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid),
+        };
+        ProtoEncoder::new(b.schema, None, descriptor, header)
+    }
+}
+
+impl EncoderBuild for AvroEncoder {
+    async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
+        let loader =
+            crate::schema::SchemaLoader::from_format_options(b.topic, &b.format_desc.options)
+                .map_err(|e| SinkError::Config(anyhow!(e)))?;
+
+        let (schema_id, avro) = match pk_indices {
+            Some(_) => loader
+                .load_key_schema()
+                .await
+                .map_err(|e| SinkError::Config(anyhow!(e)))?,
+            None => loader
+                .load_val_schema()
+                .await
+                .map_err(|e| SinkError::Config(anyhow!(e)))?,
+        };
+        AvroEncoder::new(
+            b.schema,
+            pk_indices,
+            std::sync::Arc::new(avro),
+            AvroHeader::ConfluentSchemaRegistry(schema_id),
+        )
+    }
+}
+
+impl EncoderBuild for TemplateEncoder {
+    async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
+        let option_name = match pk_indices {
+            Some(_) => KEY_FORMAT,
+            None => VALUE_FORMAT,
+        };
+        let template = b.format_desc.options.get(option_name).ok_or_else(|| {
+            SinkError::Config(anyhow!(
+                "Cannot find '{option_name}',please set it or use JSON"
+            ))
+        })?;
+        Ok(TemplateEncoder::new(b.schema, pk_indices, template.clone()))
+    }
+}
+
+struct FormatterParams<'a> {
+    builder: EncoderParams<'a>,
+    pk_indices: Vec<usize>,
+}
+
+/// Each formatter shall be able to be built from parameters.
+///
+/// This is not part of `SinkFormatter` trait, because that is about how a formatter completes its
+/// own job as a self-contained unit, with a custom `new` asking for only necessary info; while this
+/// one is about how different formatters can be selected from a common SQL interface.
+trait FormatterBuild: Sized {
+    async fn build(b: FormatterParams<'_>) -> Result<Self>;
+}
+
+impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for AppendOnlyFormatter<KE, VE> {
+    async fn build(b: FormatterParams<'_>) -> Result<Self> {
+        let key_encoder = match b.pk_indices.is_empty() {
+            true => None,
+            false => Some(KE::build(b.builder.clone(), Some(b.pk_indices)).await?),
+        };
+        let val_encoder = VE::build(b.builder, None).await?;
+        Ok(AppendOnlyFormatter::new(key_encoder, val_encoder))
+    }
+}
+
+impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for UpsertFormatter<KE, VE> {
+    async fn build(b: FormatterParams<'_>) -> Result<Self> {
+        let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices)).await?;
+        let val_encoder = VE::build(b.builder, None).await?;
+        Ok(UpsertFormatter::new(key_encoder, val_encoder))
+    }
+}
+
+impl FormatterBuild for DebeziumJsonFormatter {
+    async fn build(b: FormatterParams<'_>) -> Result<Self> {
+        assert_eq!(b.builder.format_desc.encode, SinkEncode::Json);
+
+        Ok(DebeziumJsonFormatter::new(
+            b.builder.schema,
+            b.pk_indices,
+            b.builder.db_name,
+            b.builder.sink_from_name,
+            DebeziumAdapterOpts::default(),
+        ))
+    }
+}
+
 impl SinkFormatterImpl {
     pub async fn new(
         format_desc: &SinkFormatDesc,
@@ -86,187 +243,50 @@ impl SinkFormatterImpl {
         sink_from_name: String,
         topic: &str,
     ) -> Result<Self> {
-        let err_unsupported = || {
-            Err(SinkError::Config(anyhow!(
-                "sink format/encode unsupported: {:?} {:?}",
-                format_desc.format,
-                format_desc.encode,
-            )))
+        use {SinkEncode as E, SinkFormat as F, SinkFormatterImpl as Impl};
+        let p = FormatterParams {
+            builder: EncoderParams {
+                format_desc,
+                schema,
+                db_name,
+                sink_from_name,
+                topic,
+            },
+            pk_indices,
         };
-        let timestamptz_mode = TimestamptzHandlingMode::from_options(&format_desc.options)?;
-
-        match format_desc.format {
-            SinkFormat::AppendOnly => {
-                let key_encoder = (!pk_indices.is_empty()).then(|| {
-                    JsonEncoder::new(
-                        schema.clone(),
-                        Some(pk_indices.clone()),
-                        DateHandlingMode::FromCe,
-                        TimestampHandlingMode::Milli,
-                        timestamptz_mode,
-                        TimeHandlingMode::Milli,
-                    )
-                });
-
-                match format_desc.encode {
-                    SinkEncode::Json => {
-                        let val_encoder = JsonEncoder::new(
-                            schema,
-                            None,
-                            DateHandlingMode::FromCe,
-                            TimestampHandlingMode::Milli,
-                            timestamptz_mode,
-                            TimeHandlingMode::Milli,
-                        );
-                        let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder);
-                        Ok(SinkFormatterImpl::AppendOnlyJson(formatter))
-                    }
-                    SinkEncode::Protobuf => {
-                        // By passing `None` as `aws_auth_props`, reading from `s3://` not supported yet.
-                        let (descriptor, sid) = crate::schema::protobuf::fetch_descriptor(
-                            &format_desc.options,
-                            topic,
-                            None,
-                        )
-                        .await
-                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
-                        let header = match sid {
-                            None => ProtoHeader::None,
-                            Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid),
-                        };
-                        let val_encoder = ProtoEncoder::new(schema, None, descriptor, header)?;
-                        let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder);
-                        Ok(SinkFormatterImpl::AppendOnlyProto(formatter))
-                    }
-                    SinkEncode::Avro => err_unsupported(),
-                    SinkEncode::Template => {
-                        let key_format = format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
-                            SinkError::Config(anyhow!(
-                                "Cannot find 'key_format',please set it or use JSON"
-                            ))
-                        })?;
-                        let value_format =
-                            format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
-                                SinkError::Config(anyhow!(
-                                    "Cannot find 'redis_value_format',please set it or use JSON"
-                                ))
-                            })?;
-                        let key_encoder = TemplateEncoder::new(
-                            schema.clone(),
-                            Some(pk_indices),
-                            key_format.clone(),
-                        );
-                        let val_encoder = TemplateEncoder::new(schema, None, value_format.clone());
-                        Ok(SinkFormatterImpl::AppendOnlyTemplate(
-                            AppendOnlyFormatter::new(Some(key_encoder), val_encoder),
-                        ))
-                    }
-                }
-            }
-            SinkFormat::Debezium => {
-                if format_desc.encode != SinkEncode::Json {
-                    return err_unsupported();
-                }
 
-                Ok(SinkFormatterImpl::DebeziumJson(DebeziumJsonFormatter::new(
-                    schema,
-                    pk_indices,
-                    db_name,
-                    sink_from_name,
-                    DebeziumAdapterOpts::default(),
-                )))
-            }
-            SinkFormat::Upsert => {
-                match format_desc.encode {
-                    SinkEncode::Json => {
-                        let mut key_encoder = JsonEncoder::new(
-                            schema.clone(),
-                            Some(pk_indices),
-                            DateHandlingMode::FromCe,
-                            TimestampHandlingMode::Milli,
-                            timestamptz_mode,
-                            TimeHandlingMode::Milli,
-                        );
-                        let mut val_encoder = JsonEncoder::new(
-                            schema,
-                            None,
-                            DateHandlingMode::FromCe,
-                            TimestampHandlingMode::Milli,
-                            timestamptz_mode,
-                            TimeHandlingMode::Milli,
-                        );
+        // When defining `SinkFormatterImpl` we already linked each variant (eg `AppendOnlyJson`) to
+        // an instantiation (eg `AppendOnlyFormatter<JsonEncoder, JsonEncoder>`) that implements the
+        // trait `FormatterBuild`.
+        //
+        // Here we just need to match a `(format, encode)` to a variant, and rustc shall be able to
+        // find the corresponding instantiation.
 
-                        if let Some(s) = format_desc.options.get("schemas.enable") {
-                            match s.to_lowercase().parse::<bool>() {
-                                Ok(true) => {
-                                    let kafka_connect = KafkaConnectParams {
-                                        schema_name: format!("{}.{}", db_name, sink_from_name),
-                                    };
-                                    key_encoder =
-                                        key_encoder.with_kafka_connect(kafka_connect.clone());
-                                    val_encoder = val_encoder.with_kafka_connect(kafka_connect);
-                                }
-                                Ok(false) => {}
-                                _ => {
-                                    return Err(SinkError::Config(anyhow!(
-                                        "schemas.enable is expected to be `true` or `false`, got {}",
-                                        s
-                                    )));
-                                }
-                            }
-                        };
+        // However,
+        //   `Impl::AppendOnlyJson(FormatterBuild::build(p).await?)`
+        // fails to be inferred without the following dummy wrapper.
+        async fn build<T: FormatterBuild>(p: FormatterParams<'_>) -> Result<T> {
+            T::build(p).await
+        }
 
-                        // Initialize the upsert_stream
-                        let formatter = UpsertFormatter::new(key_encoder, val_encoder);
-                        Ok(SinkFormatterImpl::UpsertJson(formatter))
-                    }
-                    SinkEncode::Template => {
-                        let key_format = format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
-                            SinkError::Config(anyhow!(
-                                "Cannot find 'key_format',please set it or use JSON"
-                            ))
-                        })?;
-                        let value_format =
-                            format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
-                                SinkError::Config(anyhow!(
-                                    "Cannot find 'redis_value_format',please set it or use JSON"
-                                ))
-                            })?;
-                        let key_encoder = TemplateEncoder::new(
-                            schema.clone(),
-                            Some(pk_indices),
-                            key_format.clone(),
-                        );
-                        let val_encoder = TemplateEncoder::new(schema, None, value_format.clone());
-                        Ok(SinkFormatterImpl::UpsertTemplate(UpsertFormatter::new(
-                            key_encoder,
-                            val_encoder,
-                        )))
-                    }
-                    SinkEncode::Avro => {
-                        let (key_schema, val_schema) =
-                            crate::schema::avro::fetch_schema(&format_desc.options, topic)
-                                .await
-                                .map_err(|e| SinkError::Config(anyhow!(e)))?;
-                        let key_encoder = AvroEncoder::new(
-                            schema.clone(),
-                            Some(pk_indices),
-                            key_schema.schema,
-                            AvroHeader::ConfluentSchemaRegistry(key_schema.id),
-                        )?;
-                        let val_encoder = AvroEncoder::new(
-                            schema.clone(),
-                            None,
-                            val_schema.schema,
-                            AvroHeader::ConfluentSchemaRegistry(val_schema.id),
-                        )?;
-                        let formatter = UpsertFormatter::new(key_encoder, val_encoder);
-                        Ok(SinkFormatterImpl::UpsertAvro(formatter))
-                    }
-                    SinkEncode::Protobuf => err_unsupported(),
-                }
+        Ok(match (&format_desc.format, &format_desc.encode) {
+            (F::AppendOnly, E::Json) => Impl::AppendOnlyJson(build(p).await?),
+            (F::AppendOnly, E::Protobuf) => Impl::AppendOnlyProto(build(p).await?),
+            (F::AppendOnly, E::Template) => Impl::AppendOnlyTemplate(build(p).await?),
+            (F::Upsert, E::Json) => Impl::UpsertJson(build(p).await?),
+            (F::Upsert, E::Avro) => Impl::UpsertAvro(build(p).await?),
+            (F::Upsert, E::Template) => Impl::UpsertTemplate(build(p).await?),
+            (F::Debezium, E::Json) => Impl::DebeziumJson(build(p).await?),
+            (F::AppendOnly, E::Avro)
+            | (F::Upsert, E::Protobuf)
+            | (F::Debezium, E::Avro | E::Protobuf | E::Template) => {
+                return Err(SinkError::Config(anyhow!(
+                    "sink format/encode unsupported: {:?} {:?}",
+                    format_desc.format,
+                    format_desc.encode,
+                )));
             }
-        }
+        })
     }
 }