Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion committed Sep 23, 2024
1 parent 8b0c26b commit 7dfdb50
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 5 deletions.
5 changes: 3 additions & 2 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use risingwave_connector::source::reader::desc::test_utils::create_source_desc_b
use risingwave_dml::dml_manager::DmlManager;
use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::PbRowFormatType;
use risingwave_pb::plan_common::{EncodeType, FormatType, PbRowFormatType};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::panic_store::PanicStateStore;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
Expand Down Expand Up @@ -112,7 +112,8 @@ async fn test_table_materialize() -> StreamResult<()> {
],
};
let source_info = StreamSourceInfo {
row_format: PbRowFormatType::Json as i32,
format: FormatType::Plain as i32,
row_encode: EncodeType::Json as i32,
..Default::default()
};
let properties = convert_args!(btreemap!(
Expand Down
7 changes: 5 additions & 2 deletions src/connector/src/source/reader/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use risingwave_common::bail;
use risingwave_common::catalog::ColumnCatalog;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::catalog::PbStreamSourceInfo;
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::plan_common::{FormatType, PbColumnCatalog};

#[expect(deprecated)]
use super::fs_reader::FsSourceReader;
Expand Down Expand Up @@ -93,7 +93,10 @@ impl SourceDescBuilder {
let (columns_exist, additional_columns) = source_add_partition_offset_cols(
&self.columns,
&connector_name,
self.source_info.get_format().as_ref().unwrap(),
self.source_info
.get_format()
.as_ref()
.unwrap_or(&FormatType::Unspecified),
);

let mut columns: Vec<_> = self
Expand Down
7 changes: 6 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_source_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::OrderType;
use risingwave_connector::parser::additional_columns::source_add_partition_offset_cols;
use risingwave_pb::plan_common::FormatType;
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::PbStreamNode;

Expand Down Expand Up @@ -61,7 +62,11 @@ impl StreamSourceScan {
let (columns_exist, additional_columns) = source_add_partition_offset_cols(
&core.column_catalog,
&source_catalog.connector_name(),
source_catalog.info.get_format().as_ref().unwrap(),
source_catalog
.info
.get_format()
.as_ref()
.unwrap_or(&FormatType::Unspecified),
);
for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
if !existed {
Expand Down

0 comments on commit 7dfdb50

Please sign in to comment.