Skip to content

Commit

Permalink
feat: add bound check to append_batch API
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r committed Jan 3, 2024
1 parent 0a63a72 commit 86119f8
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 29 deletions.
3 changes: 3 additions & 0 deletions src/log-store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ pub enum Error {
last_index: u64,
attempt_index: u64,
},

#[snafu(display("Duplicate log entry, region: {}, attempt index: {}", region_id, index,))]
DuplicateLogIndex { region_id: RegionId, index: u64 },
}

impl ErrorExt for Error {
Expand Down
91 changes: 62 additions & 29 deletions src/log-store/src/raft_engine/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

Expand All @@ -28,9 +28,9 @@ use store_api::logstore::namespace::{Id as NamespaceId, Namespace as NamespaceTr
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};

use crate::error::{
AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, Error, FetchEntrySnafu,
IllegalNamespaceSnafu, IllegalStateSnafu, OverrideCompactedEntrySnafu, RaftEngineSnafu, Result,
StartGcTaskSnafu, StopGcTaskSnafu,
AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, DuplicateLogIndexSnafu, Error,
FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu, OverrideCompactedEntrySnafu,
RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu,
};
use crate::raft_engine::backend::SYSTEM_NAMESPACE;
use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl as Namespace};
Expand Down Expand Up @@ -116,22 +116,71 @@ impl RaftEngineLogStore {
)
}

/// Checks if entry does not override the min index of namespace.
fn check_entry(&self, e: &EntryImpl) -> Result<()> {
if cfg!(debug_assertions) {
/// Converts entries to `LogBatch` and checks if entry ids are valid.
/// Returns the `LogBatch` converted along with the last entry id
/// to append in each namespace(region).
fn entries_to_batch(
&self,
entries: Vec<EntryImpl>,
) -> Result<(LogBatch, HashMap<NamespaceId, EntryId>)> {
// Records the last entry id for each region's entries.
let mut entry_ids: HashMap<NamespaceId, BTreeSet<EntryId>> =
HashMap::with_capacity(entries.len());

let mut batch = LogBatch::with_capacity(entries.len());

for e in entries {
let ns_id = e.namespace_id;
if let Some(first_index) = self.engine.first_index(ns_id) {
if !entry_ids.entry(ns_id).or_default().insert(e.id) {
return DuplicateLogIndexSnafu {
region_id: ns_id,
index: e.id,
}
.fail();
}
batch
.add_entries::<MessageType>(ns_id, &[e])
.context(AddEntryLogBatchSnafu)?;
}

let mut last_entry_ids = HashMap::with_capacity(entry_ids.len());
for (region, ids) in entry_ids {
let first_in_batch = *ids.first().unwrap();
let last_in_batch = *ids.last().unwrap();
ensure!(
(last_in_batch - first_in_batch) == ids.len() as u64 - 1,
DiscontinuousLogIndexSnafu {
region_id: region,
last_index: first_in_batch,
attempt_index: last_in_batch
}
);

if let Some(first_index) = self.engine.first_index(region) {
ensure!(
e.id() >= first_index,
first_in_batch >= first_index,
OverrideCompactedEntrySnafu {
namespace: ns_id,
namespace: region,
first_index,
attempt_index: e.id(),
attempt_index: first_in_batch,
}
);
}

if let Some(last_index) = self.engine.last_index(region) {
ensure!(
first_in_batch == last_index + 1,
DiscontinuousLogIndexSnafu {
region_id: region,
last_index,
attempt_index: first_in_batch
}
);
}
last_entry_ids.insert(region, last_in_batch);
}
Ok(())

Ok((batch, last_entry_ids))
}
}

Expand Down Expand Up @@ -203,23 +252,7 @@ impl LogStore for RaftEngineLogStore {
return Ok(AppendBatchResponse::default());
}

// Records the last entry id for each region's entries.
let mut last_entry_ids: HashMap<NamespaceId, EntryId> =
HashMap::with_capacity(entries.len());
let mut batch = LogBatch::with_capacity(entries.len());

for e in entries {
self.check_entry(&e)?;
// For raft-engine log store, the namespace id is the region id.
let ns_id = e.namespace_id;
last_entry_ids
.entry(ns_id)
.and_modify(|x| *x = (*x).max(e.id))
.or_insert(e.id);
batch
.add_entries::<MessageType>(ns_id, &[e])
.context(AddEntryLogBatchSnafu)?;
}
let (mut batch, last_entry_ids) = self.entries_to_batch(entries)?;

let _ = self
.engine
Expand Down

0 comments on commit 86119f8

Please sign in to comment.