Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dyn-filter): filter left changes according to watermark before writing state table #17816

Merged
merged 9 commits into from
Aug 1, 2024
1 change: 1 addition & 0 deletions src/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ pub trait LocalStateStore: StaticSendSync {
read_options: ReadOptions,
) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_;

/// Get last persisted watermark for a given vnode.
fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;

/// Inserts a key-value entry associated with a given `epoch` into the state store.
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/common/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod state_cleaning;
pub mod state_table;
mod state_table_cache;
mod watermark;

#[cfg(test)]
pub mod test_state_table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,38 @@
// See the License for the specific language governing permissions and
// limitations under the License.

/// Strategy to decide how to buffer the watermarks, used for state cleaning.
pub trait WatermarkBufferStrategy: Default {
/// Strategy to decide when to do state cleaning.
pub trait StateCleanStrategy: Default {
/// Trigger when a epoch is committed.
fn tick(&mut self);

/// Whether to clear the buffer.
/// Whether to apply the state cleaning watermark.
///
/// Returns true to indicate that the buffer should be cleared and the strategy states reset.
/// Returns true to indicate that state cleaning should be applied.
fn apply(&mut self) -> bool;
}

/// No buffer, apply watermark to memory immediately.
/// Use the strategy when you want to apply the watermark immediately.
/// No delay, apply watermark to clean state immediately.
#[derive(Default, Debug)]
pub struct WatermarkNoBuffer;
pub struct EagerClean;

impl WatermarkBufferStrategy for WatermarkNoBuffer {
impl StateCleanStrategy for EagerClean {
fn tick(&mut self) {}

fn apply(&mut self) -> bool {
true
}
}

/// Buffer the watermark by a epoch period.
/// Delay the state cleaning by a specified epoch period.
/// The strategy reduced the delete-range calls to storage.
#[derive(Default, Debug)]
pub struct WatermarkBufferByEpoch<const PERIOD: usize> {
pub struct LazyCleanByEpoch<const PERIOD: usize> {
/// number of epochs since the last time we did state cleaning by watermark.
buffered_epochs_cnt: usize,
}

impl<const PERIOD: usize> WatermarkBufferStrategy for WatermarkBufferByEpoch<PERIOD> {
impl<const PERIOD: usize> StateCleanStrategy for LazyCleanByEpoch<PERIOD> {
fn tick(&mut self) {
self.buffered_epochs_cnt += 1;
}
Expand Down
Loading
Loading