Commit

reuse trad_source
xxchan committed Mar 6, 2024
1 parent 5f403fd commit de217fd
Showing 3 changed files with 113 additions and 152 deletions.
2 changes: 1 addition & 1 deletion src/stream/src/from_proto/source/mod.rs
@@ -13,7 +13,7 @@
// limitations under the License.

mod trad_source;
pub use trad_source::SourceExecutorBuilder;
pub use trad_source::{create_source_desc_builder, SourceExecutorBuilder};
mod fs_fetch;
pub use fs_fetch::FsFetchExecutorBuilder;

192 changes: 101 additions & 91 deletions src/stream/src/from_proto/source/trad_source.rs
@@ -12,19 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use risingwave_common::catalog::{
default_key_column_name_version_mapping, TableId, KAFKA_TIMESTAMP_COLUMN_NAME,
};
use risingwave_connector::source::reader::desc::SourceDescBuilder;
use risingwave_connector::source::{
should_copy_to_format_encode_options, ConnectorProperties, SourceCtrlOpts, UPSTREAM_SOURCE_KEY,
};
use risingwave_pb::catalog::PbStreamSourceInfo;
use risingwave_pb::data::data_type::TypeName as PbTypeName;
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use risingwave_pb::plan_common::{
AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp,
AdditionalColumnType as LegacyAdditionalColumnType, ColumnDescVersion, FormatType,
PbEncodeType,
PbColumnCatalog, PbEncodeType,
};
use risingwave_pb::stream_plan::SourceNode;
use risingwave_storage::panic_store::PanicStateStore;
@@ -39,6 +42,98 @@ use crate::executor::FlowControlExecutor;
const FS_CONNECTORS: &[&str] = &["s3"];
pub struct SourceExecutorBuilder;

pub fn create_source_desc_builder(
mut source_columns: Vec<PbColumnCatalog>,
params: &ExecutorParams,
source_info: PbStreamSourceInfo,
row_id_index: Option<u32>,
with_properties: HashMap<String, String>,
) -> SourceDescBuilder {
{
// compatibility code: introduced in https://github.com/risingwavelabs/risingwave/pull/13707
// for UPSERT sources with Avro, Protobuf, or JSON encoding, overwrite the `_rw_key` column's `ColumnDesc.additional_column_type` to `Key`
if source_info.format() == FormatType::Upsert
&& (source_info.row_encode() == PbEncodeType::Avro
|| source_info.row_encode() == PbEncodeType::Protobuf
|| source_info.row_encode() == PbEncodeType::Json)
{
for c in &mut source_columns {
if let Some(desc) = c.column_desc.as_mut() {
let is_bytea = desc
.get_column_type()
.map(|col_type| col_type.type_name == PbTypeName::Bytea as i32)
.unwrap();
if desc.name == default_key_column_name_version_mapping(
&desc.version()
)
&& is_bytea
// the column is from a legacy version (before v1.5.x)
&& desc.version == ColumnDescVersion::Unspecified as i32
{
desc.additional_column = Some(AdditionalColumn {
column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
});
}

// the column is from a legacy version (v1.6.x)
// introduced in https://github.com/risingwavelabs/risingwave/pull/15226
if desc.additional_column_type == LegacyAdditionalColumnType::Key as i32 {
desc.additional_column = Some(AdditionalColumn {
column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
});
}
}
}
}
}

{
// compatibility code: handle the legacy column `_rw_kafka_timestamp`
// the column is auto-added to every Kafka source to support batch queries on the source
// solution: rewrite the column's `additional_column` to Timestamp

// NOTE: use a `for` loop here; a bare `let _ = iter.map(..)` is lazy and
// would never run the closure.
for c in &mut source_columns {
if let Some(desc) = c.column_desc.as_mut() {
let is_timestamp = desc
.get_column_type()
.map(|col_type| col_type.type_name == PbTypeName::Timestamptz as i32)
.unwrap();
if desc.name == KAFKA_TIMESTAMP_COLUMN_NAME
&& is_timestamp
// the column is from a legacy version
&& desc.version == ColumnDescVersion::Unspecified as i32
{
desc.additional_column = Some(AdditionalColumn {
column_type: Some(AdditionalColumnType::Timestamp(
AdditionalColumnTimestamp {},
)),
});
}
}
}
}

SourceDescBuilder::new(
source_columns,
params.env.source_metrics(),
row_id_index.map(|x| x as _),
with_properties,
source_info,
params.env.connector_params(),
params.env.config().developer.connector_message_buffer_size,
// `pk_indices` is used to ensure that a message will be skipped instead of parsed
// with a null pk when the pk column is missing.
//
// Currently `pk_indices` for a source is always empty, since pk information is not
// passed via `StreamSource`, so a null pk may be emitted downstream.
//
// TODO: use the correct information to fill in `pk_indices`.
// We should consider adding back the `pk_column_ids` field removed by #8841 in
// `StreamSource`.
params.info.pk_indices.clone(),
)
}

impl ExecutorBuilder for SourceExecutorBuilder {
type Node = SourceNode;

@@ -75,97 +170,12 @@ impl ExecutorBuilder for SourceExecutorBuilder {
);
}

let mut source_columns = source.columns.clone();
{
// compatibility code: introduced in https://github.com/risingwavelabs/risingwave/pull/13707
// for UPSERT sources with Avro, Protobuf, or JSON encoding, overwrite the `_rw_key` column's `ColumnDesc.additional_column_type` to `Key`
if source_info.format() == FormatType::Upsert
&& (source_info.row_encode() == PbEncodeType::Avro
|| source_info.row_encode() == PbEncodeType::Protobuf
|| source_info.row_encode() == PbEncodeType::Json)
{
for c in &mut source_columns {
if let Some(desc) = c.column_desc.as_mut() {
let is_bytea = desc
.get_column_type()
.map(|col_type| col_type.type_name == PbTypeName::Bytea as i32)
.unwrap();
if desc.name == default_key_column_name_version_mapping(
&desc.version()
)
&& is_bytea
// the column is from a legacy version (before v1.5.x)
&& desc.version == ColumnDescVersion::Unspecified as i32
{
desc.additional_column = Some(AdditionalColumn {
column_type: Some(AdditionalColumnType::Key(
AdditionalColumnKey {},
)),
});
}

// the column is from a legacy version (v1.6.x)
// introduced in https://github.com/risingwavelabs/risingwave/pull/15226
if desc.additional_column_type
== LegacyAdditionalColumnType::Key as i32
{
desc.additional_column = Some(AdditionalColumn {
column_type: Some(AdditionalColumnType::Key(
AdditionalColumnKey {},
)),
});
}
}
}
}
}

{
// compatibility code: handle the legacy column `_rw_kafka_timestamp`
// the column is auto-added to every Kafka source to support batch queries on the source
// solution: rewrite the column's `additional_column` to Timestamp

let _ = source_columns.iter_mut().map(|c| {
let _ = c.column_desc.as_mut().map(|desc| {
let is_timestamp = desc
.get_column_type()
.map(|col_type| {
col_type.type_name == PbTypeName::Timestamptz as i32
})
.unwrap();
if desc.name == KAFKA_TIMESTAMP_COLUMN_NAME
&& is_timestamp
// the column is from a legacy version
&& desc.version == ColumnDescVersion::Unspecified as i32
{
desc.additional_column = Some(AdditionalColumn {
column_type: Some(AdditionalColumnType::Timestamp(
AdditionalColumnTimestamp {},
)),
});
}
});
});
}

let source_desc_builder = SourceDescBuilder::new(
source_columns.clone(),
params.env.source_metrics(),
source.row_id_index.map(|x| x as _),
source.with_properties.clone(),
let source_desc_builder = create_source_desc_builder(
source.columns.clone(),
&params,
source_info,
params.env.connector_params(),
params.env.config().developer.connector_message_buffer_size,
// `pk_indices` is used to ensure that a message will be skipped instead of parsed
// with null pk when the pk column is missing.
//
// Currently pk_indices for source is always empty since pk information is not
// passed via `StreamSource` so null pk may be emitted to downstream.
//
// TODO: use the correct information to fill in `pk_indices`.
// We should consider adding back the `pk_column_ids` field removed by #8841 in
// StreamSource
params.info.pk_indices.clone(),
source.row_id_index,
source.with_properties.clone(),
);

let source_ctrl_opts = SourceCtrlOpts {
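A note on the compatibility blocks above: Rust's `Iterator::map` is lazy, so the `let _ = source_columns.iter_mut().map(|c| ...)` pattern in the removed `_rw_kafka_timestamp` block never ran its closure; only consuming the iterator (or using a plain `for` loop, as the extracted function does) actually applies the rewrite. A minimal, self-contained sketch of the pitfall, using plain integers rather than the real column types:

```rust
fn main() {
    let mut cols = vec![1, 2, 3];

    // Lazy: `map` only builds an adapter. Nothing consumes it, so the
    // closure never runs and `cols` is left untouched.
    let _ = cols.iter_mut().map(|c| *c += 10);
    assert_eq!(cols, vec![1, 2, 3]);

    // Eager: a `for` loop (or `for_each`) actually applies the mutation.
    for c in cols.iter_mut() {
        *c += 10;
    }
    assert_eq!(cols, vec![11, 12, 13]);
}
```

Binding the unused adapter to `_` also silences the compiler's "iterators are lazy and do nothing unless consumed" warning, which is presumably why the no-op went unnoticed.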
71 changes: 11 additions & 60 deletions src/stream/src/from_proto/source_backfill.rs
@@ -38,77 +38,27 @@ impl ExecutorBuilder for KafkaBackfillExecutorBuilder {
node: &Self::Node,
store: impl StateStore,
) -> StreamResult<Executor> {
let [input]: [_; 1] = params.input.try_into().unwrap();

// let (sender, barrier_receiver) = unbounded_channel();
// stream
// .context
// .barrier_manager()
// .register_sender(params.actor_context.id, sender);
let system_params = params.env.system_params_manager_ref().get_params();

let source_id = TableId::new(node.source_id);
let source_name = node.source_name.clone();
let source_info = node.get_info()?;

let mut source_columns = node.columns.clone();

{
// compatibility code: introduced in https://github.com/risingwavelabs/risingwave/pull/13707
// for UPSERT sources with Avro or Protobuf encoding, overwrite the `_rw_key` column's `ColumnDesc.additional_column_type` to `Key`
if source_info.format() == FormatType::Upsert
&& (source_info.row_encode() == PbEncodeType::Avro
|| source_info.row_encode() == PbEncodeType::Protobuf)
{
let _ = source_columns.iter_mut().map(|c| {
let _ = c.column_desc.as_mut().map(|desc| {
let is_bytea = desc
.get_column_type()
.map(|col_type| col_type.type_name == PbTypeName::Bytea as i32)
.unwrap();
if desc.name == default_key_column_name_version_mapping(
&desc.version()
)
&& is_bytea
// the column is from a legacy version
&& desc.version == ColumnDescVersion::Unspecified as i32
{
desc.additional_column = Some(AdditionalColumn {
column_type: Some(ColumnType::Key(AdditionalColumnKey {})),
});
}
});
});
}
}
let source_desc_builder = SourceDescBuilder::new(
source_columns.clone(),
params.env.source_metrics(),
node.row_id_index.map(|x| x as _),
node.with_properties.clone(),
let source_desc_builder = super::source::create_source_desc_builder(
node.columns.clone(),
&params,
source_info.clone(),
params.env.connector_params(),
params.env.config().developer.connector_message_buffer_size,
// `pk_indices` is used to ensure that a message will be skipped instead of parsed
// with null pk when the pk column is missing.
//
// Currently pk_indices for source is always empty since pk information is not
// passed via `StreamSource` so null pk may be emitted to downstream.
//
// TODO: use the correct information to fill in `pk_indices`.
// We should consider adding back the `pk_column_ids` field removed by #8841 in
// StreamSource
params.info.pk_indices.clone(),
node.row_id_index,
node.with_properties.clone(),
);

let source_ctrl_opts = SourceCtrlOpts {
chunk_size: params.env.config().developer.chunk_size,
rate_limit: None,
};

let source_column_ids: Vec<_> = source_columns
let source_column_ids: Vec<_> = source_desc_builder
.column_catalogs_to_source_column_descs()
.iter()
.map(|column| ColumnId::from(column.get_column_desc().unwrap().column_id))
.map(|column| column.column_id)
.collect();

// FIXME: remove this. It's wrong
@@ -135,12 +85,13 @@ impl ExecutorBuilder for KafkaBackfillExecutorBuilder {
params.info.clone(),
stream_source_core,
params.executor_stats.clone(),
// barrier_receiver,
system_params,
params.env.system_params_manager_ref().get_params(),
source_ctrl_opts.clone(),
params.env.connector_params(),
backfill_state_table,
);
let [input]: [_; 1] = params.input.try_into().unwrap();

Ok((
params.info,
KafkaBackfillExecutor { inner: exec, input }.boxed(),
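The relocated `let [input]: [_; 1] = params.input.try_into().unwrap();` is a compact arity check: `TryFrom<Vec<T>>` for `[T; N]` succeeds only when the vector's length is exactly `N`, so the line simultaneously asserts that the backfill executor has a single upstream input and moves that input out by value. A standalone sketch of the idiom:

```rust
fn main() {
    let inputs: Vec<String> = vec!["upstream".to_string()];

    // Converting Vec<T> -> [T; 1] fails (returning the Vec as the error)
    // unless there is exactly one element, so `unwrap` doubles as an
    // "exactly one input" assertion.
    let [input]: [String; 1] = inputs.try_into().unwrap();
    assert_eq!(input, "upstream");
}
```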

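Stripped of the RisingWave-specific plumbing, the logic that `create_source_desc_builder` now centralizes for both executors is a single pass that patches legacy catalog columns. A toy model (the types below are illustrative stand-ins, not the real `risingwave_pb` definitions):

```rust
// Illustrative stand-ins for the prost-generated catalog types.
#[derive(Debug, PartialEq)]
enum AdditionalColumn {
    None,
    Key,
    Timestamp,
}

struct ColumnDesc {
    name: &'static str,
    is_bytea: bool,
    is_timestamptz: bool,
    legacy_version: bool, // written by a release before the versioned catalog
    additional_column: AdditionalColumn,
}

/// Mirrors the two compatibility blocks in `create_source_desc_builder`:
/// legacy `_rw_key` and `_rw_kafka_timestamp` columns get their
/// `additional_column` rewritten so newer code can recognize them.
fn patch_legacy_columns(columns: &mut [ColumnDesc], is_upsert: bool) {
    for desc in columns.iter_mut() {
        if is_upsert && desc.name == "_rw_key" && desc.is_bytea && desc.legacy_version {
            desc.additional_column = AdditionalColumn::Key;
        }
        if desc.name == "_rw_kafka_timestamp" && desc.is_timestamptz && desc.legacy_version {
            desc.additional_column = AdditionalColumn::Timestamp;
        }
    }
}

fn main() {
    let mut columns = vec![ColumnDesc {
        name: "_rw_kafka_timestamp",
        is_bytea: false,
        is_timestamptz: true,
        legacy_version: true,
        additional_column: AdditionalColumn::None,
    }];
    patch_legacy_columns(&mut columns, false);
    assert_eq!(columns[0].additional_column, AdditionalColumn::Timestamp);
}
```

Keeping this patching in one function means `SourceExecutorBuilder` and the backfill builder can no longer drift apart, which is the point of reusing `trad_source`.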