Skip to content

Commit

Permalink
refactor(frontend): split create_source into smaller mods (#19803)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Dec 16, 2024
1 parent 1e24ca4 commit 45fcf8f
Show file tree
Hide file tree
Showing 15 changed files with 1,229 additions and 1,059 deletions.
25 changes: 24 additions & 1 deletion src/connector/src/parser/debezium/debezium_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::collections::BTreeMap;

use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
use risingwave_common::types::DataType;

use super::simd_json_parser::DebeziumJsonAccessBuilder;
use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig};
Expand All @@ -28,6 +30,20 @@ use crate::parser::{
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

/// Note: these columns are added in `SourceStreamChunkRowWriter::do_action`.
/// May also look for the usage of `SourceColumnType`.
pub fn debezium_cdc_source_schema() -> Vec<ColumnCatalog> {
let columns = vec![
ColumnCatalog {
column_desc: ColumnDesc::named("payload", ColumnId::placeholder(), DataType::Jsonb),
is_hidden: false,
},
ColumnCatalog::offset_column(),
ColumnCatalog::cdc_table_name_column(),
];
columns
}

#[derive(Debug)]
pub struct DebeziumParser {
key_builder: AccessBuilderImpl,
Expand Down Expand Up @@ -192,7 +208,7 @@ mod tests {
use std::ops::Deref;
use std::sync::Arc;

use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId, CDC_SOURCE_COLUMN_NUM};
use risingwave_common::row::Row;
use risingwave_common::types::Timestamptz;
use risingwave_pb::plan_common::{
Expand Down Expand Up @@ -327,4 +343,11 @@ mod tests {
_ => panic!("unexpected parse result: {:?}", res),
}
}

#[tokio::test]
async fn test_cdc_source_job_schema() {
let columns = debezium_cdc_source_schema();
// make sure it doesn't broken by future PRs
assert_eq!(CDC_SOURCE_COLUMN_NUM, columns.len() as u32);
}
}
5 changes: 3 additions & 2 deletions src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ use risingwave_sqlparser::ast::{
};
use risingwave_sqlparser::parser::Parser;

use super::alter_table_column::schema_has_schema_registry;
use super::create_source::{generate_stream_graph_for_source, validate_compatibility};
use super::create_source::{
generate_stream_graph_for_source, schema_has_schema_registry, validate_compatibility,
};
use super::util::SourceSchemaCompatExt;
use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
Expand Down
19 changes: 4 additions & 15 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
use risingwave_sqlparser::ast::{
AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, Encode,
FormatEncodeOptions, Ident, ObjectName, Statement, StructField, TableConstraint,
AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, Ident, ObjectName,
Statement, StructField, TableConstraint,
};
use risingwave_sqlparser::parser::Parser;

use super::create_source::get_json_schema_location;
use super::create_source::schema_has_schema_registry;
use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator};
use super::util::SourceSchemaCompatExt;
use super::{HandlerArgs, RwPgResponse};
Expand All @@ -45,7 +45,7 @@ use crate::expr::{Expr, ExprImpl, InputRef, Literal};
use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
use crate::handler::create_table::bind_table_constraints;
use crate::session::SessionImpl;
use crate::{Binder, TableCatalog, WithOptions};
use crate::{Binder, TableCatalog};

/// Used in auto schema change process
pub async fn get_new_table_definition_for_cdc_table(
Expand Down Expand Up @@ -475,17 +475,6 @@ pub async fn handle_alter_table_column(
Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
}

pub fn schema_has_schema_registry(schema: &FormatEncodeOptions) -> bool {
match schema.row_encode {
Encode::Avro | Encode::Protobuf => true,
Encode::Json => {
let mut options = WithOptions::try_from(schema.row_options()).unwrap();
matches!(get_json_schema_location(options.inner_mut()), Ok(Some(_)))
}
_ => false,
}
}

pub fn fetch_table_catalog_for_alter(
session: &SessionImpl,
table_name: &ObjectName,
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/alter_table_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport;

use super::alter_source_with_sr::alter_definition_format_encode;
use super::alter_table_column::{fetch_table_catalog_for_alter, schema_has_schema_registry};
use super::alter_table_column::fetch_table_catalog_for_alter;
use super::create_source::schema_has_schema_registry;
use super::util::SourceSchemaCompatExt;
use super::{get_replace_table_plan, HandlerArgs, RwPgResponse};
use crate::error::{ErrorCode, Result};
Expand Down
Loading

0 comments on commit 45fcf8f

Please sign in to comment.