Commit 5ca8af7: clean code

StrikeW committed Sep 30, 2023 (1 parent: af9ccb8)
Showing 14 changed files with 39 additions and 59 deletions.
proto/stream_plan.proto (0 additions & 2 deletions)

@@ -805,6 +805,4 @@ message StreamFragmentGraph {
   StreamEnvironment env = 5;
   // If none, default parallelism will be applied.
   Parallelism parallelism = 6;
-
-  repeated uint32 dependent_source_ids = 7;
 }
src/common/src/catalog/column.rs (1 addition & 1 deletion)

@@ -21,7 +21,7 @@ use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc};
 
 use super::row_id_column_desc;
 use crate::catalog::{
-    offset_column_desc, table_id_column_desc, table_name_column_desc, table_name_column_name,
+    offset_column_desc, table_id_column_desc, table_name_column_desc,
     Field, ROW_ID_COLUMN_ID,
 };
 use crate::error::ErrorCode;
src/compute/tests/cdc_tests.rs (1 addition & 0 deletions)

@@ -208,6 +208,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
         vec![0],
         Arc::new(StreamingMetrics::unused()),
         source_state_handler,
+        false,
         4, // 4 rows in a snapshot chunk
     );
src/frontend/src/handler/create_source.rs (6 additions & 6 deletions)

@@ -17,12 +17,12 @@ use std::rc::Rc;
 use std::sync::LazyLock;
 
 use anyhow::anyhow;
-use fixedbitset::FixedBitSet;
+
 use itertools::Itertools;
 use maplit::{convert_args, hashmap};
 use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_common::catalog::{
-    is_column_ids_dedup, offset_column_desc, ColumnCatalog, ColumnDesc, TableId,
+    is_column_ids_dedup, ColumnCatalog, ColumnDesc, TableId,
     DEFAULT_KEY_COLUMN_NAME, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
 };
 use risingwave_common::error::ErrorCode::{self, InvalidInputSyntax, ProtocolError};
@@ -37,7 +37,7 @@ use risingwave_connector::source::cdc::{
     MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
 };
 use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
-use risingwave_connector::source::external::TABLE_NAME_KEY;
+
 use risingwave_connector::source::filesystem::S3_CONNECTOR;
 use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType};
 use risingwave_connector::source::{
@@ -65,9 +65,9 @@ use crate::handler::create_table::{
 };
 use crate::handler::util::{get_connector, is_cdc_connector, is_kafka_connector};
 use crate::handler::HandlerArgs;
-use crate::optimizer::plan_node::{LogicalScan, LogicalSource, ToStream, ToStreamContext};
-use crate::optimizer::property::{Distribution, Order, RequiredDist};
-use crate::optimizer::PlanRoot;
+use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
+
+
 use crate::session::SessionImpl;
 use crate::utils::resolve_connection_in_with_option;
 use crate::{bind_data_type, build_graph, OptimizerContext, WithOptions};
src/frontend/src/handler/create_table.rs (6 additions & 8 deletions)

@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use std::collections::{BTreeMap, HashMap};
-use std::iter::once;
 use std::rc::Rc;
 
 use anyhow::anyhow;
@@ -36,11 +35,9 @@ use risingwave_connector::source::external::{
 use risingwave_pb::catalog::source::OptionalAssociatedTableId;
 use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, WatermarkDesc};
 use risingwave_pb::ddl_service::TableJobType;
-use risingwave_pb::expr::expr_node::Type;
 use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
 use risingwave_pb::plan_common::{DefaultColumnDesc, GeneratedColumnDesc};
 use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
-use risingwave_pb::user::grant_privilege::Object;
 use risingwave_sqlparser::ast::{
     ColumnDef, ColumnOption, DataType as AstDataType, Format, ObjectName, SourceSchemaV2,
     SourceWatermark, TableConstraint,
@@ -51,13 +48,13 @@ use crate::binder::{bind_data_type, bind_struct_field, Clause};
 use crate::catalog::source_catalog::SourceCatalog;
 use crate::catalog::table_catalog::TableVersion;
 use crate::catalog::{check_valid_column_name, CatalogError, ColumnId};
-use crate::expr::{Expr, ExprImpl, FunctionCall};
+use crate::expr::{Expr, ExprImpl};
 use crate::handler::create_source::{
     bind_source_watermark, check_source_schema, try_bind_columns_from_source,
     validate_compatibility, UPSTREAM_SOURCE_KEY,
 };
 use crate::handler::HandlerArgs;
-use crate::optimizer::plan_node::{LogicalFilter, LogicalScan, LogicalSource};
+use crate::optimizer::plan_node::{LogicalScan, LogicalSource};
 use crate::optimizer::property::{Cardinality, Order, RequiredDist};
 use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
 use crate::session::{CheckRelationError, SessionImpl};
@@ -881,7 +878,8 @@ fn derive_connect_properties(
                 .strip_prefix(prefix.as_str())
                 .ok_or_else(|| anyhow!("external table name must contain database prefix"))?
         }
-        _POSTGRES_CDC_CONNECTOR => {
+        #[allow(unused_variables, non_snake_case)]
+        POSTGRES_CDC_CONNECTOR => {
            let schema_name = connect_properties
                .get(SCHEMA_NAME_KEY)
                .ok_or_else(|| anyhow!("{} not found in source properties", SCHEMA_NAME_KEY))?;
@@ -934,8 +932,8 @@ pub async fn handle_create_table(
     let (graph, source, table, job_type) = {
         let context = OptimizerContext::from_handler_args(handler_args);
         let source_schema = check_create_table_with_source(context.with_options(), source_schema)?;
-        let mut col_id_gen = ColumnIdGenerator::new_initial();
-        let mut properties = context.with_options().inner().clone().into_iter().collect();
+        let col_id_gen = ColumnIdGenerator::new_initial();
+        let properties = context.with_options().inner().clone().into_iter().collect();
 
         let ((plan, source, table), job_type) = match (source_schema, cdc_source.as_ref()) {
             (Some(source_schema), None) => (
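A note on the rewritten match arm above: in Rust, a bare identifier in pattern position is a catch-all binding unless it resolves to a constant (or unit variant) in scope, and the added #[allow(unused_variables, non_snake_case)] suggests POSTGRES_CDC_CONNECTOR acts as such a binding here. A minimal standalone sketch of the pitfall, with illustrative names and values rather than the project's actual code:

    fn classify(connector: &str) -> &'static str {
        // In scope as a constant: the first arm compares against "mysql-cdc".
        const MYSQL_CDC_CONNECTOR: &str = "mysql-cdc";
        match connector {
            MYSQL_CDC_CONNECTOR => "matched the const",
            // No constant of this name is in scope, so the identifier is a
            // new binding that matches *anything*; rustc warns that it is
            // unused and not snake_case, hence the allow attribute.
            #[allow(unused_variables, non_snake_case)]
            POSTGRES_CDC_CONNECTOR => "matched the catch-all binding",
        }
    }

    fn main() {
        assert_eq!(classify("mysql-cdc"), "matched the const");
        assert_eq!(classify("postgres-cdc"), "matched the catch-all binding");
    }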
src/frontend/src/scheduler/distributed/query.rs (1 addition & 0 deletions)

@@ -530,6 +530,7 @@ pub(crate) mod tests {
                 value_indices: vec![0, 1, 2],
                 read_prefix_len_hint: 0,
                 watermark_columns: FixedBitSet::with_capacity(3),
+                connect_properties: Default::default(),
                 versioned: false,
             }),
             vec![],
src/frontend/src/stream_fragmenter/graph/fragment_graph.rs (0 additions & 2 deletions)

@@ -110,8 +110,6 @@ impl StreamFragmentGraph {
             dependent_table_ids: vec![],
             table_ids_cnt: 0,
             parallelism: None,
-            // To be filled later
-            dependent_source_ids: vec![],
         }
     }
src/frontend/src/stream_fragmenter/mod.rs (2 additions & 13 deletions)

@@ -25,12 +25,11 @@ use risingwave_common::catalog::TableId;
 use risingwave_common::error::Result;
 use risingwave_pb::plan_common::JoinType;
 use risingwave_pb::stream_plan::{
-    ChainType, DispatchStrategy, DispatcherType, ExchangeNode, FragmentTypeFlag, NoOpNode,
+    DispatchStrategy, DispatcherType, ExchangeNode, FragmentTypeFlag, NoOpNode,
     StreamFragmentGraph as StreamFragmentGraphProto, StreamNode,
 };
 
 use self::rewrite::build_delta_join_without_arrange;
-use crate::catalog::SourceId;
 use crate::optimizer::plan_node::reorganize_elements_id;
 use crate::optimizer::PlanRef;
 
@@ -54,8 +53,6 @@ pub struct BuildFragmentGraphState {
     /// dependent streaming job ids.
     dependent_table_ids: HashSet<TableId>,
 
-    dependent_source_ids: HashSet<SourceId>,
-
     /// operator id to `LocalFragmentId` mapping used by share operator.
     share_mapping: HashMap<u32, LocalFragmentId>,
     /// operator id to `StreamNode` mapping used by share operator.
@@ -127,7 +124,6 @@ pub fn build_graph(plan_node: PlanRef) -> StreamFragmentGraphProto {
         .into_iter()
         .map(|id| id.table_id)
         .collect();
-    fragment_graph.dependent_source_ids = state.dependent_source_ids.into_iter().collect();
     fragment_graph.table_ids_cnt = state.next_table_id;
     fragment_graph
 }
@@ -279,15 +275,8 @@ fn build_fragment(
 
         NodeBody::Chain(node) => {
            current_fragment.fragment_type_mask |= FragmentTypeFlag::ChainNode as u32;
-            // if node.chain_type == ChainType::CdcBackfill as i32 {
-            //     state.dependent_source_ids.insert(node.table_id);
-            // } else {
-            //     // memorize table id for later use
-            //     state
-            //         .dependent_table_ids
-            //         .insert(TableId::new(node.table_id));
-            // }
             // memorize table id for later use
+            // The table id could be a upstream CDC source
             state
                 .dependent_table_ids
                 .insert(TableId::new(node.table_id));
src/meta/src/rpc/ddl_controller.rs (1 addition & 1 deletion)

@@ -37,7 +37,7 @@ use crate::barrier::BarrierManagerRef;
 use crate::manager::{
     CatalogManagerRef, ClusterManagerRef, ConnectionId, DatabaseId, FragmentManagerRef, FunctionId,
     IdCategory, IndexId, LocalNotification, MetaSrvEnv, NotificationVersion, RelationIdEnum,
-    SchemaId, SinkId, SourceId, StreamingClusterInfo, StreamingJob, TableId, TableJobType, ViewId,
+    SchemaId, SinkId, SourceId, StreamingClusterInfo, StreamingJob, TableId, ViewId,
 };
 use crate::model::{StreamEnvironment, TableFragments};
 use crate::rpc::cloud_provider::AwsEc2Client;
src/meta/src/stream/test_fragmenter.rs (5 additions & 2 deletions)

@@ -37,7 +37,7 @@ use risingwave_pb::stream_plan::{
     StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamSource,
 };
 
-use crate::manager::{MetaSrvEnv, StreamingClusterInfo, StreamingJob};
+use crate::manager::{MetaSrvEnv, StreamingClusterInfo, StreamingJob, TableJobType};
 use crate::model::TableFragments;
 use crate::stream::{
     ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, StreamFragmentGraph,
@@ -227,6 +227,7 @@ fn make_stream_fragments() -> Vec<StreamFragment> {
                     r#type: DispatcherType::Hash as i32,
                     dist_key_indices: vec![0],
                     output_indices: vec![0, 1, 2],
+                    ..Default::default()
                 }),
             })),
             fields: vec![
@@ -389,6 +390,7 @@ fn make_fragment_edges() -> Vec<StreamFragmentEdge> {
             r#type: DispatcherType::Simple as i32,
             dist_key_indices: vec![],
             output_indices: vec![],
+            ..Default::default()
         }),
         link_id: 4,
         upstream_id: 1,
@@ -399,6 +401,7 @@ fn make_fragment_edges() -> Vec<StreamFragmentEdge> {
             r#type: DispatcherType::Hash as i32,
             dist_key_indices: vec![0],
             output_indices: vec![],
+            ..Default::default()
         }),
         link_id: 1,
         upstream_id: 2,
@@ -452,7 +455,7 @@ fn make_cluster_info() -> StreamingClusterInfo {
 async fn test_graph_builder() -> MetaResult<()> {
     let env = MetaSrvEnv::for_test().await;
     let parallel_degree = 4;
-    let job = StreamingJob::Table(None, make_materialize_table(888));
+    let job = StreamingJob::Table(None, make_materialize_table(888), TableJobType::Normal);
 
     let graph = make_stream_graph();
     let fragment_graph = StreamFragmentGraph::new(graph, env.id_gen_manager_ref(), &job).await?;
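The three `..Default::default()` lines above use Rust's struct-update syntax: any fields not written out are taken from a default instance, so these test fixtures keep compiling when the prost-generated `DispatchStrategy` message grows a new field. A small self-contained sketch of the idiom; the stand-in struct and its extra field are illustrative, not the real generated type:

    #[derive(Debug, Default)]
    struct DispatchStrategy {
        r#type: i32,
        dist_key_indices: Vec<u32>,
        output_indices: Vec<u32>,
        // Imagine this field was added after the test was written; the
        // initializer below still compiles thanks to `..Default::default()`.
        downstream_table_name: Option<String>,
    }

    fn main() {
        let strategy = DispatchStrategy {
            r#type: 1, // e.g. DispatcherType::Hash as i32
            dist_key_indices: vec![0],
            output_indices: vec![0, 1, 2],
            ..Default::default() // remaining fields from DispatchStrategy::default()
        };
        assert_eq!(strategy.downstream_table_name, None);
        println!("{strategy:?}");
    }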
src/stream/src/executor/backfill/cdc/cdc_backfill.rs (11 additions & 19 deletions)

@@ -12,29 +12,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::default::Default;
 use std::pin::{pin, Pin};
 use std::sync::Arc;
 
 use anyhow::anyhow;
 use either::Either;
 use futures::stream::select_with_strategy;
 use futures::{pin_mut, stream, StreamExt, TryStreamExt};
-use futures_async_stream::{for_await, try_stream};
+use futures_async_stream::try_stream;
 use itertools::Itertools;
-use maplit::hashmap;
-use risingwave_common::array::{DataChunk, StreamChunk};
-use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
-use risingwave_common::row::{OwnedRow, Row, RowExt};
-use risingwave_common::types::{DataType, JsonbVal, ScalarRefImpl};
-use risingwave_common::util::epoch::EpochPair;
-use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_connector::source::external::{CdcOffset, DebeziumOffset, DebeziumSourceOffset};
-use risingwave_connector::source::{
-    SourceColumnDesc, SourceContext, SplitId, SplitImpl, SplitMetaData,
-};
+use risingwave_common::array::StreamChunk;
+use risingwave_common::catalog::Schema;
+use risingwave_common::row::OwnedRow;
+use risingwave_connector::source::external::CdcOffset;
+use risingwave_connector::source::SplitMetaData;
 use risingwave_storage::StateStore;
-use serde_json::Value;
 
 use crate::executor::backfill::cdc::state::{CdcStateManageImpl, EmbededStateManage};
 use crate::executor::backfill::cdc::utils::transform_upstream;
@@ -48,8 +40,8 @@ use crate::executor::backfill::utils::{
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{
     expect_first_barrier, ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor,
-    ExecutorInfo, Message, MessageStream, Mutation, PkIndices, PkIndicesRef,
-    SourceStateTableHandler, StreamExecutorError, StreamExecutorResult,
+    ExecutorInfo, Message, Mutation, PkIndices, PkIndicesRef, SourceStateTableHandler,
+    StreamExecutorError,
 };
 use crate::task::{ActorId, CreateMviewProgress};
@@ -154,7 +146,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
             },
             Some(splits) => {
                 assert!(!splits.is_empty());
-                let split = splits.iter().exactly_one().map_err(|err| {
+                let split = splits.iter().exactly_one().map_err(|_err| {
                     StreamExecutorError::from(anyhow!(
                         "expect only one cdc split for table {}",
                         upstream_table_id
@@ -214,7 +206,7 @@
         // Keep track of rows from the snapshot.
         #[allow(unused_variables)]
         let mut total_snapshot_processed_rows: u64 = 0;
-        let mut snapshot_read_epoch = init_epoch;
+        let mut snapshot_read_epoch;
 
         let mut last_binlog_offset: Option<CdcOffset>;
@@ -506,7 +498,7 @@ mod tests {
         // let payload = r#"{"before": null,"after":{"O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" },"source":{"version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null},"op":"r","ts_ms":1695277757017,"transaction":null}"#.to_string();
         let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;
 
-        let mut datums: Vec<Datum> = vec![
+        let datums: Vec<Datum> = vec![
            Some(JsonbVal::from_str(payload).unwrap().into()),
            Some("file: 1.binlog, pos: 100".to_string().into()),
            Some("mydb.orders".to_string().into()),
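Two small lint cleanups in this file are worth unpacking. `exactly_one()` (from itertools) succeeds only when an iterator yields a single item, and renaming the closure argument to `_err` silences the unused-variable lint; dropping `= init_epoch` turns `snapshot_read_epoch` into a deferred initialization that the compiler verifies is assigned before first use. A hedged sketch of both patterns, with simplified types and assuming the anyhow and itertools crates the file already uses (the real code keeps `mut` because the epoch is reassigned in a loop):

    use anyhow::anyhow;
    use itertools::Itertools;

    fn single_split(splits: &[u32], table_id: u32) -> anyhow::Result<u32> {
        // `exactly_one` errors on zero or many items; the error value is
        // unused, so bind it as `_err` to avoid the unused-variable lint.
        let split = splits
            .iter()
            .exactly_one()
            .map_err(|_err| anyhow!("expect only one cdc split for table {}", table_id))?;
        Ok(*split)
    }

    fn main() -> anyhow::Result<()> {
        assert_eq!(single_split(&[7], 1)?, 7);
        assert!(single_split(&[1, 2], 1).is_err());

        // Deferred initialization: no dummy starting value needed, and the
        // compiler rejects any read that happens before the assignment.
        let snapshot_read_epoch: u64;
        snapshot_read_epoch = 42;
        assert_eq!(snapshot_read_epoch, 42);
        Ok(())
    }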
src/stream/src/executor/backfill/cdc/utils.rs (1 addition & 1 deletion)

@@ -38,7 +38,7 @@ pub async fn transform_upstream(upstream: BoxedMessageStream, schema: &Schema) {
     for msg in upstream {
         let mut msg = msg?;
         if let Message::Chunk(chunk) = &mut msg {
-            let mut parsed_chunk = parse_debezium_chunk(&mut parser, chunk, schema).await?;
+            let parsed_chunk = parse_debezium_chunk(&mut parser, chunk, schema).await?;
             let _ = std::mem::replace(chunk, parsed_chunk);
         }
         yield msg;
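For context on the line following the change: `std::mem::replace(dest, src)` moves `src` into the `&mut` location and returns the old value, which `transform_upstream` discards with `let _ =`. Since the parsed chunk is built once and only moved, its binding never needed `mut`. A minimal sketch with `String` standing in for `StreamChunk`:

    fn main() {
        let mut msg = String::from("raw debezium payload");
        let chunk: &mut String = &mut msg;

        // Built once and then moved, so no `mut` is needed on the binding.
        let parsed_chunk = String::from("parsed chunk");

        // Swap the new value in behind the mutable reference; the returned
        // old value is deliberately discarded.
        let _ = std::mem::replace(chunk, parsed_chunk);

        assert_eq!(msg, "parsed chunk");
    }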
src/stream/src/executor/dispatch.rs (1 addition & 1 deletion)

@@ -25,7 +25,7 @@ use itertools::Itertools;
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::buffer::BitmapBuilder;
 use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
-use risingwave_common::row::{Row, RowExt};
+use risingwave_common::row::Row;
 use risingwave_common::types::ScalarRefImpl;
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
src/stream/src/from_proto/chain.rs (3 additions & 3 deletions)

@@ -12,12 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
+
 use std::sync::Arc;
 
 use anyhow::anyhow;
-use maplit::hashmap;
-use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId, TableOption};
+
+use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
 use risingwave_common::util::sort_util::OrderType;
 use risingwave_connector::error::ConnectorError;
 use risingwave_connector::source::external::{CdcTableType, SchemaTableName};
