Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into xx/block-refill
Browse files Browse the repository at this point in the history
  • Loading branch information
MrCroxx committed Dec 8, 2023
2 parents 10571cc + b87bcc3 commit b4a9b3a
Show file tree
Hide file tree
Showing 17 changed files with 73 additions and 4 deletions.
6 changes: 3 additions & 3 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ RUN apt-get update \

FROM base AS dashboard-builder

RUN apt-get install -y curl gnupg protobuf-compiler && mkdir -p /etc/apt/keyrings \
RUN apt-get update && apt-get install -y curl gnupg protobuf-compiler && mkdir -p /etc/apt/keyrings \
&& curl -fsSL https://deb.nodesource.com/gpgkey/nodesource-repo.gpg.key | gpg --dearmor -o /etc/apt/keyrings/nodesource.gpg \
&& echo "deb [signed-by=/etc/apt/keyrings/nodesource.gpg] https://deb.nodesource.com/node_18.x nodistro main" | tee /etc/apt/sources.list.d/nodesource.list \
&& apt-get update && apt-get install -y nodejs
Expand All @@ -19,7 +19,7 @@ RUN cd /risingwave/dashboard && npm i && npm run build-static && rm -rf node_mod

FROM base AS rust-base

RUN apt-get -y install make cmake protobuf-compiler curl bash lld unzip
RUN apt-get update && apt-get -y install make cmake protobuf-compiler curl bash lld unzip

SHELL ["/bin/bash", "-c"]

Expand Down Expand Up @@ -90,7 +90,7 @@ RUN find . -name pom.xml -exec bash -c 'mkdir -p poms/$(dirname {}); mv {} poms/
# We use rust-maven-plugin to build java-binding. So it's FROM rust-base
FROM rust-base AS java-builder

RUN apt-get -y install maven
RUN apt-get update && apt-get -y install maven

RUN mkdir -p /risingwave
WORKDIR /risingwave/java
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/dedup/append_only_dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ impl<S: StateStore> AppendOnlyDedupExecutor<S> {

yield Message::Chunk(chunk);
}
self.state_table.try_flush().await?;
}

Message::Barrier(barrier) => {
Expand Down
11 changes: 11 additions & 0 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,15 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
Ok(())
}

async fn try_flush_data(this: &mut ExecutorInner<K, S>) -> StreamExecutorResult<()> {
futures::future::try_join_all(
this.all_state_tables_mut()
.map(|table| async { table.try_flush().await }),
)
.await?;
Ok(())
}

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(self) {
let HashAggExecutor {
Expand Down Expand Up @@ -656,6 +665,8 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
yield Message::Chunk(chunk?);
}
}

Self::try_flush_data(&mut this).await?;
}
Message::Barrier(barrier) => {
Self::update_metrics(&this, &mut vars);
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/over_window/eowc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ impl<S: StateStore> EowcOverWindowExecutor<S> {
if let Some(chunk) = output_chunk {
yield Message::Chunk(chunk);
}
this.state_table.try_flush().await?;
}
Message::Barrier(barrier) => {
this.state_table.commit(barrier.epoch).await?;
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/over_window/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,7 @@ impl<S: StateStore> OverWindowExecutor<S> {
for chunk in Self::apply_chunk(&mut this, &mut vars, chunk) {
yield Message::Chunk(chunk?);
}
this.state_table.try_flush().await?;
}
Message::Barrier(barrier) => {
this.state_table.commit(barrier.epoch).await?;
Expand Down
8 changes: 8 additions & 0 deletions src/stream/src/executor/simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,13 @@ impl<S: StateStore> SimpleAggExecutor<S> {
Ok(chunk)
}

async fn try_flush_data(this: &mut ExecutorInner<S>) -> StreamExecutorResult<()> {
futures::future::try_join_all(this.all_state_tables_mut().map(|table| table.try_flush()))
.await?;

Ok(())
}

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(self) {
let Self {
Expand Down Expand Up @@ -296,6 +303,7 @@ impl<S: StateStore> SimpleAggExecutor<S> {
Message::Watermark(_) => {}
Message::Chunk(chunk) => {
Self::apply_chunk(&mut this, &mut vars, chunk).await?;
Self::try_flush_data(&mut this).await?;
}
Message::Barrier(barrier) => {
if let Some(chunk) =
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ impl<S: StateStore> SortExecutor<S> {
Message::Chunk(chunk) => {
vars.buffer.apply_chunk(chunk, &mut this.buffer_table);
vars.buffer_changed = true;
this.buffer_table.try_flush().await?;
}
Message::Barrier(barrier) => {
if vars.buffer_changed {
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/source/fetch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ impl<S: StateStore> FsFetchExecutor<S> {
})
.collect();
state_store_handler.take_snapshot(file_assignment).await?;
state_store_handler.state_store.try_flush().await?;
}
_ => unreachable!(),
}
Expand Down
9 changes: 9 additions & 0 deletions src/stream/src/executor/source/fs_source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,14 @@ impl<S: StateStore> FsSourceExecutor<S> {
Ok(())
}

async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
let core = &mut self.stream_source_core;

core.split_state_store.state_store.try_flush().await?;

Ok(())
}

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn into_stream(mut self) {
let mut barrier_receiver = self.barrier_receiver.take().unwrap();
Expand Down Expand Up @@ -443,6 +451,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
])
.inc_by(chunk.cardinality() as u64);
yield Message::Chunk(chunk);
self.try_flush_data().await?;
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,13 @@ impl<S: StateStore> SourceExecutor<S> {
Ok(())
}

async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
let core = self.stream_source_core.as_mut().unwrap();
core.split_state_store.state_store.try_flush().await?;

Ok(())
}

/// A source executor with a stream source receives:
/// 1. Barrier messages
/// 2. Data from external source
Expand Down Expand Up @@ -598,6 +605,7 @@ impl<S: StateStore> SourceExecutor<S> {
)
.inc_by(chunk.cardinality() as u64);
yield Message::Chunk(chunk);
self.try_flush_data().await?;
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/stream/src/executor/top_n/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ where
self.managed_state.flush(epoch).await
}

async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
self.managed_state.try_flush().await
}

fn info(&self) -> &ExecutorInfo {
&self.info
}
Expand Down
4 changes: 4 additions & 0 deletions src/stream/src/executor/top_n/group_top_n_appendonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ where
self.managed_state.flush(epoch).await
}

async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
self.managed_state.try_flush().await
}

fn info(&self) -> &ExecutorInfo {
&self.info
}
Expand Down
4 changes: 4 additions & 0 deletions src/stream/src/executor/top_n/top_n_appendonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ where
self.managed_state.flush(epoch).await
}

async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
self.managed_state.try_flush().await
}

fn info(&self) -> &ExecutorInfo {
&self.info
}
Expand Down
4 changes: 4 additions & 0 deletions src/stream/src/executor/top_n/top_n_plain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ where
self.managed_state.flush(epoch).await
}

async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
self.managed_state.try_flush().await
}

fn info(&self) -> &ExecutorInfo {
&self.info
}
Expand Down
5 changes: 5 additions & 0 deletions src/stream/src/executor/top_n/top_n_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,11 @@ impl<S: StateStore> ManagedTopNState<S> {
self.state_table.commit(epoch).await?;
Ok(())
}

pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
self.state_table.try_flush().await?;
Ok(())
}
}

#[cfg(test)]
Expand Down
8 changes: 7 additions & 1 deletion src/stream/src/executor/top_n/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub trait TopNExecutorBase: Send + 'static {
epoch: EpochPair,
) -> impl Future<Output = StreamExecutorResult<()>> + Send;

/// Flush the buffered chunk to the storage backend.
fn try_flush_data(&mut self) -> impl Future<Output = StreamExecutorResult<()>> + Send;

fn info(&self) -> &ExecutorInfo;

/// Update the vnode bitmap for the state table and manipulate the cache if necessary, only used
Expand Down Expand Up @@ -125,7 +128,10 @@ where
yield Message::Watermark(output_watermark);
}
}
Message::Chunk(chunk) => yield Message::Chunk(self.inner.apply_chunk(chunk).await?),
Message::Chunk(chunk) => {
yield Message::Chunk(self.inner.apply_chunk(chunk).await?);
self.inner.try_flush_data().await?;
}
Message::Barrier(barrier) => {
self.inner.flush_data(barrier.epoch).await?;

Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/watermark_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
watermark,
));
}
table.try_flush().await?;
}
Message::Watermark(watermark) => {
if watermark.col_idx == event_time_col_idx {
Expand Down

0 comments on commit b4a9b3a

Please sign in to comment.