diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index 24b77af41e0f6..3f0793595c7c5 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -162,13 +162,12 @@ impl ExecutorBuilder for SourceExecutorBuilder { let table_type = ExternalTableType::from_properties(&source.properties); if table_type.can_backfill() && let Some(table_desc) = source_info.upstream_table.clone() { let upstream_table_name = SchemaTableName::from_properties(&source.properties); - let pk_indices = table_desc + let table_pk_indices = table_desc .pk .iter() .map(|k| k.column_index as usize) .collect_vec(); - - let order_types = table_desc + let table_pk_order_types = table_desc .pk .iter() .map(|desc| OrderType::from_protobuf(desc.get_order_type().unwrap())) @@ -180,8 +179,8 @@ impl ExecutorBuilder for SourceExecutorBuilder { upstream_table_name, table_reader, schema.clone(), - order_types, - pk_indices.clone(), + table_pk_order_types, + table_pk_indices, (0..table_desc.columns.len()).collect_vec(), ); @@ -197,7 +196,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { (0..source.columns.len()).collect_vec(), // eliminate the last column (_rw_offset) None, schema.clone(), - pk_indices, + params.pk_indices, params.executor_stats, source_state_handler, source_ctrl_opts.chunk_size diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index a80de8a1c267e..f54eb9921f77c 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -586,7 +586,8 @@ impl LocalStreamManagerCore { assert_eq!( executor.pk_indices(), &pk_indices, - "`pk_indices` of {identity} not consistent with what derived by optimizer" + "`pk_indices` of {} not consistent with what derived by optimizer", + executor.identity() ); // Wrap the executor for debug purpose.