Commit

fmt
kwannoel committed Jan 12, 2024
1 parent 47d57bc commit d87a089
Showing 3 changed files with 51 additions and 34 deletions.
src/stream/src/executor/backfill/no_shuffle_backfill.rs (66 changes: 39 additions & 27 deletions)
@@ -17,14 +17,13 @@ use std::sync::Arc;
 
 use either::Either;
 use futures::stream::select_with_strategy;
-use futures::{pin_mut, stream, StreamExt};
+use futures::{stream, StreamExt};
 use futures_async_stream::try_stream;
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::catalog::Schema;
 use risingwave_common::hash::VnodeBitmapExt;
 use risingwave_common::row::{OwnedRow, Row};
 use risingwave_common::types::Datum;
-use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
 use risingwave_common::util::epoch::EpochPair;
 use risingwave_common::{bail, row};
 use risingwave_hummock_sdk::HummockReadEpoch;
@@ -35,8 +34,8 @@ use risingwave_storage::StateStore;
 use crate::common::table::state_table::StateTable;
 use crate::executor::backfill::utils;
 use crate::executor::backfill::utils::{
-    compute_bounds, construct_initial_finished_state, create_builder, get_new_pos, iter_chunks,
-    mapping_chunk, mapping_message, mark_chunk, owned_row_iter, METADATA_STATE_LEN,
+    compute_bounds, construct_initial_finished_state, create_builder, get_new_pos, mapping_chunk,
+    mapping_message, mark_chunk, owned_row_iter, METADATA_STATE_LEN,
 };
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{
@@ -239,7 +238,6 @@ where
                         snapshot_read_epoch,
                         current_pos.clone(),
                         true,
-                        &mut builder
                     )
                     .map(Either::Right),);
 
@@ -299,20 +297,27 @@ where
 
                             break 'backfill_loop;
                         }
-                        Some(chunk) => {
-                            // Raise the current position.
-                            // As snapshot read streams are ordered by pk, so we can
-                            // just use the last row to update `current_pos`.
-                            current_pos =
-                                Some(get_new_pos(&chunk, &pk_in_output_indices));
-
-                            let chunk_cardinality = chunk.cardinality() as u64;
-                            cur_barrier_snapshot_processed_rows += chunk_cardinality;
-                            total_snapshot_processed_rows += chunk_cardinality;
-                            yield Message::Chunk(mapping_chunk(
-                                chunk,
-                                &self.output_indices,
-                            ));
+                        Some(record) => {
+                            // Buffer the snapshot read row.
+                            if let Some(data_chunk) = builder.append_one_row(record) {
+                                let ops = vec![Op::Insert; data_chunk.capacity()];
+                                let chunk = StreamChunk::from_parts(ops, data_chunk);
+                                // Raise the current position.
+                                // As snapshot read streams are ordered by pk, so we can
+                                // just use the last row to update `current_pos`.
+                                current_pos =
+                                    Some(get_new_pos(&chunk, &pk_in_output_indices));
+
+                                let chunk_cardinality = chunk.cardinality() as u64;
+                                cur_barrier_snapshot_processed_rows +=
+                                    chunk_cardinality;
+                                total_snapshot_processed_rows += chunk_cardinality;
+
+                                yield Message::Chunk(mapping_chunk(
+                                    chunk,
+                                    &self.output_indices,
+                                ));
+                            }
                         }
                     }
                 }
@@ -337,7 +342,16 @@
                             snapshot_read_complete = true;
                             break;
                         }
-                        Some(chunk) => {
+                        Some(row) => {
+                            let chunk = match builder.append_one_row(row) {
+                                Some(chunk) => chunk,
+                                None => builder.consume_all().expect(
+                                    "we just appended one row, should have data chunk",
+                                ),
+                            };
+                            assert_eq!(chunk.cardinality(), 1);
+                            let ops = vec![Op::Insert; 1];
+                            let chunk = StreamChunk::from_parts(ops, chunk);
                             // Raise the current position.
                             // As snapshot read streams are ordered by pk, so we can
                             // just use the last row to update `current_pos`.
@@ -614,13 +628,12 @@ where
     /// remaining data in `builder` must be flushed manually.
     /// Otherwise when we scan a new snapshot, it is possible the rows in the `builder` would be
     /// present, Then when we flush we contain duplicate rows.
-    #[try_stream(ok = Option<StreamChunk>, error = StreamExecutorError)]
-    async fn snapshot_read<'a>(
-        upstream_table: &'a StorageTable<S>,
+    #[try_stream(ok = Option<OwnedRow>, error = StreamExecutorError)]
+    async fn snapshot_read(
+        upstream_table: &StorageTable<S>,
         epoch: u64,
         current_pos: Option<OwnedRow>,
         ordered: bool,
-        builder: &'a mut DataChunkBuilder,
     ) {
         let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos);
         let range_bounds = match range_bounds {
@@ -644,11 +657,10 @@
         )
         .await?;
         let row_iter = owned_row_iter(iter);
-        pin_mut!(row_iter);
 
         #[for_await]
-        for chunk in iter_chunks(row_iter, builder) {
-            yield Some(chunk?);
+        for row in row_iter {
+            yield Some(row?);
         }
         yield None;
     }
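
Beyond the `fmt` reflow, this file changes the shape of the snapshot stream: `snapshot_read` now yields `Option<OwnedRow>` instead of `Option<StreamChunk>`, and the caller owns the `DataChunkBuilder`, buffering rows with `append_one_row` and flushing any leftovers via `consume_all` when a barrier arrives. Below is a minimal, dependency-free sketch of that buffering contract; `ChunkBuilder` and the `Vec<i64>` row type are simplified stand-ins for risingwave's `DataChunkBuilder` and `OwnedRow`, not the real API.

    // Toy stand-in for DataChunkBuilder: fixed capacity, `append_one_row`
    // hands back a full batch only once capacity is reached.
    struct ChunkBuilder {
        capacity: usize,
        rows: Vec<Vec<i64>>,
    }

    impl ChunkBuilder {
        fn new(capacity: usize) -> Self {
            Self { capacity, rows: Vec::new() }
        }

        /// Buffer one row; return a chunk only when `capacity` rows are buffered.
        fn append_one_row(&mut self, row: Vec<i64>) -> Option<Vec<Vec<i64>>> {
            self.rows.push(row);
            (self.rows.len() == self.capacity).then(|| std::mem::take(&mut self.rows))
        }

        /// Flush whatever is buffered (e.g. at a barrier), if anything.
        fn consume_all(&mut self) -> Option<Vec<Vec<i64>>> {
            (!self.rows.is_empty()).then(|| std::mem::take(&mut self.rows))
        }
    }

    fn main() {
        let mut builder = ChunkBuilder::new(2);
        // Rows arrive one at a time, as in the reworked snapshot_read.
        for row in [vec![1], vec![2], vec![3]] {
            if let Some(chunk) = builder.append_one_row(row) {
                println!("emit full chunk: {chunk:?}"); // [[1], [2]]
            }
        }
        // A barrier arrives: flush the partial chunk so no rows are lost.
        if let Some(chunk) = builder.consume_all() {
            println!("flush remainder: {chunk:?}"); // [[3]]
        }
    }

Moving the builder out of `snapshot_read` is what the `&mut builder` parameter removal above enables: the event loop can flush a partial chunk at a barrier without the snapshot stream having to know about it, which is exactly the hazard the doc comment ("remaining data in `builder` must be flushed manually") warns about.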
src/tests/simulation/src/cluster.rs (5 changes: 2 additions & 3 deletions)
@@ -98,13 +98,12 @@ impl Default for Configuration {
                 let mut file =
                     tempfile::NamedTempFile::new().expect("failed to create temp config file");
 
-                let config_data = format!(
-                    r#"
+                let config_data = r#"
 [server]
 telemetry_enabled = false
 metrics_level = "Disabled"
 "#
-                );
+                .to_string();
                 file.write_all(config_data.as_bytes())
                     .expect("failed to write config file");
                 file.into_temp_path()
src/tests/simulation/tests/integration_tests/backfill_tests.rs (14 changes: 10 additions & 4 deletions)
@@ -199,7 +199,9 @@ async fn test_backfill_backpressure() -> Result<()> {
     session.run("CREATE TABLE dim (v1 int);").await?;
     // Ingest
     // Amplification of 1000 records
-    session.run("INSERT INTO dim SELECT 1 FROM generate_series(1, 200);").await?;
+    session
+        .run("INSERT INTO dim SELECT 1 FROM generate_series(1, 200);")
+        .await?;
     // Create fact table
     session.run("CREATE TABLE fact (v1 int);").await?;
     // Create sink
@@ -210,9 +212,13 @@
 
     // Ingest
     tokio::spawn(async move {
-        session.run("INSERT INTO fact SELECT 1 FROM generate_series(1, 100000);").await.unwrap();
-    }).await?;
-    let mut session = cluster.start_session();
+        session
+            .run("INSERT INTO fact SELECT 1 FROM generate_series(1, 100000);")
+            .await
+            .unwrap();
+    })
+    .await?;
+    let mut session = cluster.start_session();
     session.run("FLUSH").await?;
     // Run flush to check if barrier can go through. It should be able to.
     // There will be some latency for the initial barrier.
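
The backfill_tests.rs hunks are pure rustfmt reflow, but they make the ownership pattern easier to see: `session` is moved into the spawned ingest task, which is awaited to completion, and the test then starts a fresh session for `FLUSH`. A hedged sketch of that pattern under tokio; the `Session` type here is a hypothetical stand-in for the simulation cluster's session handle:

    struct Session;

    impl Session {
        async fn run(&mut self, sql: &str) -> Result<(), String> {
            println!("run: {sql}");
            Ok(())
        }
    }

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let mut session = Session;
        // The async block takes ownership of `session`, so the ingest runs
        // on its own task and is awaited before the test proceeds.
        tokio::spawn(async move {
            session
                .run("INSERT INTO fact SELECT 1 FROM generate_series(1, 100000);")
                .await
                .unwrap();
        })
        .await?;
        // `session` was moved above, so start a new one, mirroring the
        // `let mut session = cluster.start_session();` line in the diff.
        let mut session = Session;
        session.run("FLUSH").await?;
        Ok(())
    }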
