Skip to content

Commit

Permalink
fix(cdc): exclude additional columns from the table schema sent during source validation
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed May 23, 2024
1 parent 41c3a49 commit d98c1f7
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@
import com.risingwave.proto.Data.DataType.TypeName;
import com.risingwave.proto.PlanCommon;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TableSchema {

static final Logger LOG = LoggerFactory.getLogger(TableSchema.class);

private final List<String> columnNames;
private final Map<String, TypeName> columns;
private final Map<String, Integer> columnIndices;
Expand Down Expand Up @@ -81,18 +87,19 @@ public Object getFromRow(String columnName, SinkRow row) {

/**
 * Builds a {@link TableSchema} from its protobuf representation.
 *
 * <p>All columns from the proto are taken as-is: additional (generated/metadata)
 * columns are no longer filtered here — they are excluded on the Rust side before
 * the proto is sent (see {@code table_schema_exclude_additional_columns} in
 * {@code src/connector/src/source/cdc/mod.rs}), so the indices in
 * {@code pk_indices} remain valid against the full column list.
 *
 * @param tableSchema the proto table schema received from the connector service
 * @return the constructed {@link TableSchema}
 */
public static TableSchema fromProto(ConnectorServiceProto.TableSchema tableSchema) {
    var instance =
            new TableSchema(
                    tableSchema.getColumnsList().stream()
                            .map(PlanCommon.ColumnDesc::getName)
                            .collect(Collectors.toList()),
                    tableSchema.getColumnsList().stream()
                            .map(PlanCommon.ColumnDesc::getColumnType)
                            .collect(Collectors.toList()),
                    // pk indices reference positions in the (unfiltered) column list
                    tableSchema.getPkIndicesList().stream()
                            .map(i -> tableSchema.getColumns(i).getName())
                            .collect(Collectors.toList()));
    // Log the resolved column names to aid debugging of schema/validation mismatches.
    LOG.info("table column names: {}", Arrays.toString(instance.getColumnNames()));
    return instance;
}

public List<String> getPrimaryKeys() {
Expand Down
5 changes: 3 additions & 2 deletions src/connector/src/source/cdc/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use risingwave_pb::connector_service::{SourceType, ValidateSourceRequest, Valida

use crate::error::ConnectorResult;
use crate::source::cdc::{
CdcProperties, CdcSourceTypeTrait, Citus, DebeziumCdcSplit, Mongodb, Mysql, Postgres,
table_schema_exclude_additional_columns, CdcProperties, CdcSourceTypeTrait, Citus,
DebeziumCdcSplit, Mongodb, Mysql, Postgres,
};
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};

Expand Down Expand Up @@ -77,7 +78,7 @@ where
source_id: source_id as u64,
source_type: props.get_source_type_pb() as _,
properties: props.properties,
table_schema: Some(props.table_schema),
table_schema: Some(table_schema_exclude_additional_columns(&props.table_schema)),
is_source_job: props.is_cdc_source_job,
is_backfill_table: props.is_backfill_table,
};
Expand Down
16 changes: 16 additions & 0 deletions src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,22 @@ pub struct CdcProperties<T: CdcSourceTypeTrait> {
pub _phantom: PhantomData<T>,
}

/// Returns a copy of `table_schema` with additional (generated/metadata) columns
/// removed, for use when validating a CDC source against the external database.
///
/// A column is kept only when its `additional_column` descriptor is present but
/// carries no concrete `column_type` — i.e. it is an ordinary user column.
/// NOTE(review): a column whose `additional_column` is `None` is dropped as well;
/// presumably the proto always populates this field for user columns — confirm.
pub fn table_schema_exclude_additional_columns(table_schema: &TableSchema) -> TableSchema {
    let retained_columns = table_schema
        .columns
        .iter()
        .filter(|col| match col.additional_column.as_ref() {
            // Ordinary column: the descriptor exists but names no additional type.
            Some(desc) => desc.column_type.is_none(),
            // No descriptor at all: treated as an additional column and dropped.
            None => false,
        })
        .cloned()
        .collect();
    TableSchema {
        columns: retained_columns,
        // Primary-key indices are copied through unchanged.
        pk_indices: table_schema.pk_indices.clone(),
    }
}

impl<T: CdcSourceTypeTrait> TryFromHashmap for CdcProperties<T> {
fn try_from_hashmap(
properties: HashMap<String, String>,
Expand Down

0 comments on commit d98c1f7

Please sign in to comment.