Skip to content

Commit

Permalink
refactor(stream): move backfill/upstream_table to `backfill/cdc/ups…
Browse files Browse the repository at this point in the history
…tream_table` (#13909)
  • Loading branch information
xxchan authored Dec 11, 2023
1 parent fb1bf0a commit c8d351b
Show file tree
Hide file tree
Showing 9 changed files with 13 additions and 27 deletions.
5 changes: 2 additions & 3 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@ use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_stream::common::table::state_table::StateTable;
use risingwave_stream::error::StreamResult;
use risingwave_stream::executor::external::ExternalStorageTable;
use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::executor::test_utils::MockSource;
use risingwave_stream::executor::{
expect_first_barrier, ActorContext, AddMutation, Barrier, BoxedExecutor as StreamBoxedExecutor,
BoxedMessageStream, CdcBackfillExecutor, Executor, ExecutorInfo, MaterializeExecutor, Message,
Mutation, PkIndices, PkIndicesRef, StreamExecutorError,
BoxedMessageStream, CdcBackfillExecutor, Executor, ExecutorInfo, ExternalStorageTable,
MaterializeExecutor, Message, Mutation, PkIndices, PkIndicesRef, StreamExecutorError,
};

// mock upstream binlog offset starting from "1.binlog, pos=0"
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use risingwave_storage::StateStore;

use crate::common::table::state_table::StateTable;
use crate::executor::backfill::cdc::state::CdcBackfillState;
use crate::executor::backfill::upstream_table::external::ExternalStorageTable;
use crate::executor::backfill::upstream_table::snapshot::{
use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
use crate::executor::backfill::cdc::upstream_table::snapshot::{
SnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
};
use crate::executor::backfill::utils::{
Expand Down Expand Up @@ -417,7 +417,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {

tracing::info!(
upstream_table_id,
"CdcBackfill has already finished and forward messages directly to the downstream"
"CdcBackfill has already finished and will forward messages directly to the downstream"
);

// Wait for first barrier to come after backfill is finished.
Expand Down
6 changes: 5 additions & 1 deletion src/stream/src/executor/backfill/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod cdc_backfill;
mod cdc_backfill;
mod state;
mod upstream_table;

pub use cdc_backfill::CdcBackfillExecutor;
pub use upstream_table::external::ExternalStorageTable;
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_common::row::OwnedRow;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_connector::source::external::{CdcOffset, ExternalTableReader};

use crate::executor::backfill::upstream_table::external::ExternalStorageTable;
use super::external::ExternalStorageTable;
use crate::executor::backfill::utils::iter_chunks;
use crate::executor::{StreamExecutorError, StreamExecutorResult, INVALID_EPOCH};

Expand All @@ -46,20 +46,6 @@ pub struct SnapshotReadArgs {
}

impl SnapshotReadArgs {
pub fn new(
epoch: u64,
current_pos: Option<OwnedRow>,
ordered: bool,
chunk_size: usize,
) -> Self {
Self {
epoch,
current_pos,
ordered,
chunk_size,
}
}

pub fn new_for_cdc(current_pos: Option<OwnedRow>, chunk_size: usize) -> Self {
Self {
epoch: INVALID_EPOCH,
Expand Down
1 change: 0 additions & 1 deletion src/stream/src/executor/backfill/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,4 @@
pub mod arrangement_backfill;
pub mod cdc;
pub mod no_shuffle_backfill;
pub mod upstream_table;
pub mod utils;
3 changes: 1 addition & 2 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,8 @@ mod utils;

pub use actor::{Actor, ActorContext, ActorContextRef};
use anyhow::Context;
pub use backfill::cdc::cdc_backfill::CdcBackfillExecutor;
pub use backfill::cdc::{CdcBackfillExecutor, ExternalStorageTable};
pub use backfill::no_shuffle_backfill::*;
pub use backfill::upstream_table::*;
pub use barrier_recv::BarrierRecvExecutor;
pub use batch_query::BatchQueryExecutor;
pub use chain::ChainExecutor;
Expand Down
3 changes: 1 addition & 2 deletions src/stream/src/from_proto/stream_cdc_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ use risingwave_pb::stream_plan::StreamCdcScanNode;

use super::*;
use crate::common::table::state_table::StateTable;
use crate::executor::external::ExternalStorageTable;
use crate::executor::CdcBackfillExecutor;
use crate::executor::{CdcBackfillExecutor, ExternalStorageTable};

pub struct StreamCdcScanExecutorBuilder;

Expand Down

0 comments on commit c8d351b

Please sign in to comment.