diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index a0ca2d0d3326..51d8290ba34d 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -185,6 +185,9 @@ pub enum Error { last_index: u64, attempt_index: u64, }, + + #[snafu(display("Duplicate log entry, region: {}, attempt index: {}", region_id, index,))] + DuplicateLogIndex { region_id: RegionId, index: u64 }, } impl ErrorExt for Error { diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index dd7dfc8194f9..66893d097f1e 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -28,9 +28,9 @@ use store_api::logstore::namespace::{Id as NamespaceId, Namespace as NamespaceTr use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; use crate::error::{ - AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, Error, FetchEntrySnafu, - IllegalNamespaceSnafu, IllegalStateSnafu, OverrideCompactedEntrySnafu, RaftEngineSnafu, Result, - StartGcTaskSnafu, StopGcTaskSnafu, + AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, DuplicateLogIndexSnafu, Error, + FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu, OverrideCompactedEntrySnafu, + RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu, }; use crate::raft_engine::backend::SYSTEM_NAMESPACE; use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl as Namespace}; @@ -116,22 +116,71 @@ impl RaftEngineLogStore { ) } - /// Checks if entry does not override the min index of namespace. - fn check_entry(&self, e: &EntryImpl) -> Result<()> { - if cfg!(debug_assertions) { + /// Converts entries to `LogBatch` and checks if entry ids are valid. + /// Returns the `LogBatch` converted along with the last entry id + /// to append in each namespace(region). + fn entries_to_batch( + &self, + entries: Vec, + ) -> Result<(LogBatch, HashMap)> { + // Records the last entry id for each region's entries. + let mut entry_ids: HashMap> = + HashMap::with_capacity(entries.len()); + + let mut batch = LogBatch::with_capacity(entries.len()); + + for e in entries { let ns_id = e.namespace_id; - if let Some(first_index) = self.engine.first_index(ns_id) { + if !entry_ids.entry(ns_id).or_default().insert(e.id) { + return DuplicateLogIndexSnafu { + region_id: ns_id, + index: e.id, + } + .fail(); + } + batch + .add_entries::(ns_id, &[e]) + .context(AddEntryLogBatchSnafu)?; + } + + let mut last_entry_ids = HashMap::with_capacity(entry_ids.len()); + for (region, ids) in entry_ids { + let first_in_batch = *ids.first().unwrap(); + let last_in_batch = *ids.last().unwrap(); + ensure!( + (last_in_batch - first_in_batch) == ids.len() as u64 - 1, + DiscontinuousLogIndexSnafu { + region_id: region, + last_index: first_in_batch, + attempt_index: last_in_batch + } + ); + + if let Some(first_index) = self.engine.first_index(region) { ensure!( - e.id() >= first_index, + first_in_batch >= first_index, OverrideCompactedEntrySnafu { - namespace: ns_id, + namespace: region, first_index, - attempt_index: e.id(), + attempt_index: first_in_batch, } ); } + + if let Some(last_index) = self.engine.last_index(region) { + ensure!( + first_in_batch == last_index + 1, + DiscontinuousLogIndexSnafu { + region_id: region, + last_index, + attempt_index: first_in_batch + } + ); + } + last_entry_ids.insert(region, last_in_batch); } - Ok(()) + + Ok((batch, last_entry_ids)) } } @@ -203,23 +252,7 @@ impl LogStore for RaftEngineLogStore { return Ok(AppendBatchResponse::default()); } - // Records the last entry id for each region's entries. - let mut last_entry_ids: HashMap = - HashMap::with_capacity(entries.len()); - let mut batch = LogBatch::with_capacity(entries.len()); - - for e in entries { - self.check_entry(&e)?; - // For raft-engine log store, the namespace id is the region id. - let ns_id = e.namespace_id; - last_entry_ids - .entry(ns_id) - .and_modify(|x| *x = (*x).max(e.id)) - .or_insert(e.id); - batch - .add_entries::(ns_id, &[e]) - .context(AddEntryLogBatchSnafu)?; - } + let (mut batch, last_entry_ids) = self.entries_to_batch(entries)?; let _ = self .engine