From 2c1b1cecc85802a2a7f2df0cf85f7fff18e1885d Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Tue, 9 Jan 2024 14:42:46 +0800 Subject: [PATCH] chore: add bound check for raft-engine logstore (#3073) * chore: add bound check for raft-engine logstore * feat: add bound check to append_batch API * chore: check entry id during replay * chore: resolve conflicts * feat: add allow_stale_entries options to force obsolete wal entries * chore: resolve some comments --- config/datanode.example.toml | 2 + config/standalone.example.toml | 2 + src/log-store/src/error.rs | 13 +++ src/log-store/src/raft_engine/log_store.rs | 109 ++++++++++++++------- src/mito2/src/config.rs | 3 + src/mito2/src/error.rs | 13 +++ src/mito2/src/region/opener.rs | 27 ++++- src/mito2/src/worker/handle_catchup.rs | 1 + tests-integration/tests/http.rs | 1 + 9 files changed, 134 insertions(+), 37 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 627fb9cccaa8..420c8d77dfa7 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -117,6 +117,8 @@ sst_write_buffer_size = "8MB" scan_parallelism = 0 # Capacity of the channel to send data from parallel scan tasks to the main task (default 32). parallel_scan_channel_size = 32 +# Whether to allow stale WAL entries read during replay. +allow_stale_entries = false # Log options, see `standalone.example.toml` # [logging] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index c9fd320b077d..5757f263737d 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -214,6 +214,8 @@ sst_write_buffer_size = "8MB" scan_parallelism = 0 # Capacity of the channel to send data from parallel scan tasks to the main task (default 32). parallel_scan_channel_size = 32 +# Whether to allow stale WAL entries read during replay. 
+allow_stale_entries = false # Log options # [logging] diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index b3f8b5d08585..be880b188211 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -20,6 +20,7 @@ use common_macro::stack_trace_debug; use common_runtime::error::Error as RuntimeError; use serde_json::error::Error as JsonError; use snafu::{Location, Snafu}; +use store_api::storage::RegionId; use crate::kafka::NamespaceImpl as KafkaNamespace; @@ -183,6 +184,18 @@ pub enum Error { #[snafu(display("The record sequence is not legal, error: {}", error))] IllegalSequence { location: Location, error: String }, + + #[snafu(display( + "Attempt to append discontinuous log entry, region: {}, last index: {}, attempt index: {}", + region_id, + last_index, + attempt_index + ))] + DiscontinuousLogIndex { + region_id: RegionId, + last_index: u64, + attempt_index: u64, + }, } impl ErrorExt for Error { diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 15c81d1e49d6..515ce6645f1b 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::sync::atomic::{AtomicI64, Ordering}; @@ -23,15 +24,15 @@ use common_runtime::{RepeatedTask, TaskFunction}; use common_telemetry::{error, info}; use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode}; use snafu::{ensure, ResultExt}; -use store_api::logstore::entry::{Entry, Id as EntryId}; +use store_api::logstore::entry::Id as EntryId; use store_api::logstore::entry_stream::SendableEntryStream; use store_api::logstore::namespace::{Id as NamespaceId, Namespace as NamespaceTrait}; use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; -use crate::error; use crate::error::{ - AddEntryLogBatchSnafu, Error, FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu, - OverrideCompactedEntrySnafu, RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu, + AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, Error, FetchEntrySnafu, + IllegalNamespaceSnafu, IllegalStateSnafu, OverrideCompactedEntrySnafu, RaftEngineSnafu, Result, + StartGcTaskSnafu, StopGcTaskSnafu, }; use crate::raft_engine::backend::SYSTEM_NAMESPACE; use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl as Namespace}; @@ -121,22 +122,65 @@ impl RaftEngineLogStore { ) } - /// Checks if entry does not override the min index of namespace. - fn check_entry(&self, e: &EntryImpl) -> Result<()> { - if cfg!(debug_assertions) { + /// Converts entries to `LogBatch` and checks if entry ids are valid. + /// Returns the `LogBatch` converted along with the last entry id + /// to append in each namespace(region). + fn entries_to_batch( + &self, + entries: Vec<EntryImpl>, + ) -> Result<(LogBatch, HashMap<NamespaceId, EntryId>)> { + // Records the last entry id for each region's entries. 
+ let mut entry_ids: HashMap<NamespaceId, EntryId> = HashMap::with_capacity(entries.len()); + let mut batch = LogBatch::with_capacity(entries.len()); + + for e in entries { let ns_id = e.namespace_id; - if let Some(first_index) = self.engine.first_index(ns_id) { - ensure!( - e.id() >= first_index, - OverrideCompactedEntrySnafu { - namespace: ns_id, - first_index, - attempt_index: e.id(), + match entry_ids.entry(ns_id) { + Entry::Occupied(mut o) => { + let prev = *o.get(); + ensure!( + e.id == prev + 1, + DiscontinuousLogIndexSnafu { + region_id: ns_id, + last_index: prev, + attempt_index: e.id + } + ); + o.insert(e.id); + } + Entry::Vacant(v) => { + // this entry is the first in batch of given region. + if let Some(first_index) = self.engine.first_index(ns_id) { + // ensure the first in batch does not override compacted entry. + ensure!( + e.id > first_index, + OverrideCompactedEntrySnafu { + namespace: ns_id, + first_index, + attempt_index: e.id, + } + ); } - ); + // ensure the first in batch does not form a hole in raft-engine. 
+ if let Some(last_index) = self.engine.last_index(ns_id) { + ensure!( + e.id == last_index + 1, + DiscontinuousLogIndexSnafu { + region_id: ns_id, + last_index, + attempt_index: e.id + } + ); + } + v.insert(e.id); + } } + batch + .add_entries::<MessageType>(ns_id, &[e]) + .context(AddEntryLogBatchSnafu)?; } - Ok(()) + + Ok((batch, entry_ids)) } } @@ -171,8 +215,8 @@ impl LogStore for RaftEngineLogStore { if let Some(first_index) = self.engine.first_index(namespace_id) { ensure!( - entry_id >= first_index, - error::OverrideCompactedEntrySnafu { + entry_id > first_index, + OverrideCompactedEntrySnafu { namespace: namespace_id, first_index, attempt_index: entry_id, @@ -180,6 +224,17 @@ impl LogStore for RaftEngineLogStore { ); } + if let Some(last_index) = self.engine.last_index(namespace_id) { + ensure!( + entry_id == last_index + 1, + DiscontinuousLogIndexSnafu { + region_id: namespace_id, + last_index, + attempt_index: entry_id + } + ); + } + let _ = self .engine .write(&mut batch, self.config.sync_write) @@ -197,23 +252,7 @@ impl LogStore for RaftEngineLogStore { return Ok(AppendBatchResponse::default()); } - // Records the last entry id for each region's entries. - let mut last_entry_ids: HashMap<NamespaceId, EntryId> = - HashMap::with_capacity(entries.len()); - let mut batch = LogBatch::with_capacity(entries.len()); - - for e in entries { - self.check_entry(&e)?; - // For raft-engine log store, the namespace id is the region id. 
- let ns_id = e.namespace_id; - last_entry_ids - .entry(ns_id) - .and_modify(|x| *x = (*x).max(e.id)) - .or_insert(e.id); - batch - .add_entries::<MessageType>(ns_id, &[e]) - .context(AddEntryLogBatchSnafu)?; - } + let (mut batch, last_entry_ids) = self.entries_to_batch(entries)?; let mut sync = self.config.sync_write; diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index b56c16addf79..0723c702ae70 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -87,6 +87,8 @@ pub struct MitoConfig { pub scan_parallelism: usize, /// Capacity of the channel to send data from parallel scan tasks to the main task (default 32). pub parallel_scan_channel_size: usize, + /// Whether to allow stale entries read during replay. + pub allow_stale_entries: bool, } impl Default for MitoConfig { @@ -110,6 +112,7 @@ impl Default for MitoConfig { sst_write_buffer_size: ReadableSize::mb(8), scan_parallelism: divide_num_cpus(4), parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, + allow_stale_entries: false, } } } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index b63068072883..86ac9abfe79b 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -473,6 +473,18 @@ pub enum Error { #[snafu(display("Invalid config, {reason}"))] InvalidConfig { reason: String, location: Location }, + + #[snafu(display( + "Stale log entry found during replay, region: {}, flushed: {}, replayed: {}", + region_id, + flushed_entry_id, + unexpected_entry_id + ))] + StaleLogEntry { + region_id: RegionId, + flushed_entry_id: u64, + unexpected_entry_id: u64, + }, } pub type Result<T> = std::result::Result<T, Error>; @@ -563,6 +575,7 @@ impl ErrorExt for Error { } CleanDir { .. } => StatusCode::Unexpected, InvalidConfig { .. } => StatusCode::InvalidArguments, + StaleLogEntry { .. 
} => StatusCode::Unexpected, } } diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 9fd6e36dc898..80116ea9fdd8 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -32,7 +32,9 @@ use store_api::storage::{ColumnId, RegionId}; use crate::access_layer::AccessLayer; use crate::cache::CacheManagerRef; use crate::config::MitoConfig; -use crate::error::{EmptyRegionDirSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu, Result}; +use crate::error::{ + EmptyRegionDirSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu, Result, StaleLogEntrySnafu, +}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::manifest::storage::manifest_compress_type; use crate::memtable::MemtableBuilderRef; @@ -267,6 +269,7 @@ impl RegionOpener { region_id, flushed_entry_id, &version_control, + config.allow_stale_entries, ) .await?; } else { @@ -375,6 +378,7 @@ pub(crate) async fn replay_memtable( region_id: RegionId, flushed_entry_id: EntryId, version_control: &VersionControlRef, + allow_stale_entries: bool, ) -> Result { let mut rows_replayed = 0; // Last entry id should start from flushed entry id since there might be no @@ -383,10 +387,23 @@ pub(crate) async fn replay_memtable( let mut region_write_ctx = RegionWriteCtx::new(region_id, version_control, wal_options.clone()); let replay_from_entry_id = flushed_entry_id + 1; + let mut stale_entry_found = false; let mut wal_stream = wal.scan(region_id, replay_from_entry_id, wal_options)?; while let Some(res) = wal_stream.next().await { let (entry_id, entry) = res?; - debug_assert!(entry_id > flushed_entry_id); + if entry_id <= flushed_entry_id { + stale_entry_found = true; + warn!("Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}", region_id, flushed_entry_id, entry_id); + ensure!( + allow_stale_entries, + StaleLogEntrySnafu { + region_id, + flushed_entry_id, + unexpected_entry_id: entry_id, + } + ); + } 
+ last_entry_id = last_entry_id.max(entry_id); for mutation in entry.mutations { rows_replayed += mutation @@ -402,6 +419,12 @@ pub(crate) async fn replay_memtable( region_write_ctx.set_next_entry_id(last_entry_id + 1); region_write_ctx.write_memtable(); + if allow_stale_entries && stale_entry_found { + wal.obsolete(region_id, flushed_entry_id, wal_options) + .await?; + info!("Force obsolete WAL entries, region id: {}, flushed entry id: {}, last entry id read: {}", region_id, flushed_entry_id, last_entry_id); + } + info!( "Replay WAL for region: {}, rows recovered: {}, last entry id: {}", region_id, rows_replayed, last_entry_id diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index c25f5e074da1..9841c4eb43f6 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -78,6 +78,7 @@ impl RegionWorkerLoop { region_id, flushed_entry_id, &region.version_control, + self.config.allow_stale_entries, ) .await?; info!( diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index fda52c3c35ef..351fa7cd0b95 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -737,6 +737,7 @@ experimental_write_cache_path = "" experimental_write_cache_size = "512MiB" sst_write_buffer_size = "8MiB" parallel_scan_channel_size = 32 +allow_stale_entries = false [[datanode.region_engine]]