Skip to content

Commit

Permalink
Merge branch 'main' of github.com:risingwavelabs/risingwave into bakj…
Browse files Browse the repository at this point in the history
…os/deno_udf
  • Loading branch information
bakjos committed Apr 2, 2024
2 parents 4d75e6a + 2f6d635 commit 94b2550
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 17 deletions.
4 changes: 2 additions & 2 deletions src/stream/src/executor/aggregation/agg_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use risingwave_storage::StateStore;

use super::agg_state::{AggState, AggStateStorage};
use crate::common::table::state_table::StateTable;
use crate::consistency::inconsistency_panic;
use crate::consistency::consistency_panic;
use crate::executor::error::StreamExecutorResult;
use crate::executor::PkIndices;

Expand Down Expand Up @@ -317,7 +317,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
.expect("row count state should not be NULL")
.as_int64();
if row_count < 0 {
inconsistency_panic!(group = ?self.group_key_row(), row_count, "row count should be non-negative");
consistency_panic!(group = ?self.group_key_row(), row_count, "row count should be non-negative");

// NOTE: Here is the case that an inconsistent `DELETE` arrives at HashAgg executor, and there's no
// corresponding group existing before (or has been deleted). In this case, `prev_row_count()` will
Expand Down
9 changes: 3 additions & 6 deletions src/stream/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use super::row::{DegreeType, EncodedJoinRow};
use crate::cache::{new_with_hasher_in, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTable;
use crate::consistency::enable_strict_consistency;
use crate::consistency::{consistency_error, enable_strict_consistency};
use crate::executor::error::StreamExecutorResult;
use crate::executor::join::row::JoinRow;
use crate::executor::monitor::StreamingMetrics;
Expand Down Expand Up @@ -685,7 +685,7 @@ impl JoinEntryState {
assert!(ret.is_ok(), "we have removed existing entry, if any");
if removed {
// if not silent, we should log the error
tracing::error!(?key, "double inserting a join state entry");
consistency_error!(?key, "double inserting a join state entry");
}
}

Expand All @@ -700,10 +700,7 @@ impl JoinEntryState {
} else if enable_strict_consistency() {
Err(JoinEntryError::RemoveError)
} else {
tracing::error!(
?pk,
"removing a join state entry but it is not in the cache"
);
consistency_error!(?pk, "removing a join state entry but it's not in the cache");
Ok(())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use risingwave_common_estimate_size::KvSize;
use thiserror::Error;

use super::*;
use crate::consistency::enable_strict_consistency;
use crate::consistency::{consistency_error, enable_strict_consistency};

/// We manages a `HashMap` in memory for all entries belonging to a join key.
/// When evicted, `cached` does not hold any entries.
Expand Down Expand Up @@ -70,7 +70,7 @@ impl JoinEntryState {
assert!(ret.is_ok(), "we have removed existing entry, if any");
if removed {
// if not silent, we should log the error
tracing::error!(?key, "double inserting a join state entry");
consistency_error!(?key, "double inserting a join state entry");
}
}

Expand All @@ -85,10 +85,7 @@ impl JoinEntryState {
} else if enable_strict_consistency() {
Err(JoinEntryError::RemoveError)
} else {
tracing::error!(
?pk,
"removing a join state entry but it is not in the cache"
);
consistency_error!(?pk, "removing a join state entry but it's not in the cache");
Ok(())
}
}
Expand Down
29 changes: 26 additions & 3 deletions src/stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ mod consistency {
static INSANE_MODE: LazyLock<bool> =
LazyLock::new(|| env_var_is_true("RW_UNSAFE_ENABLE_INSANE_MODE"));

/// Check if the insane mode is enabled.
pub(crate) fn insane() -> bool {
*INSANE_MODE
}

/// Check if strict consistency is required.
pub(crate) fn enable_strict_consistency() -> bool {
let res = crate::CONFIG.try_with(|config| config.unsafe_enable_strict_consistency);
if cfg!(test) {
Expand All @@ -88,13 +90,34 @@ mod consistency {
}
}

macro_rules! inconsistency_panic {
/// Log an error message for breaking consistency. Must only be called in non-strict mode.
/// The log message will be suppressed if it is called too frequently.
macro_rules! consistency_error {
($($arg:tt)*) => {
debug_assert!(!crate::consistency::enable_strict_consistency());

use std::sync::LazyLock;
use risingwave_common::log::LogSuppresser;

static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(suppressed_count, $($arg)*);
}
};
}
pub(crate) use consistency_error;

/// Log an error message for breaking consistency, then panic if strict consistency is required.
/// The log message will be suppressed if it is called too frequently.
macro_rules! consistency_panic {
($($arg:tt)*) => {
tracing::error!($($arg)*);
if crate::consistency::enable_strict_consistency() {
tracing::error!($($arg)*);
panic!("inconsistency happened, see error log for details");
} else {
crate::consistency::consistency_error!($($arg)*);
}
};
}
pub(crate) use inconsistency_panic;
pub(crate) use consistency_panic;
}

0 comments on commit 94b2550

Please sign in to comment.