Skip to content

Commit

Permalink
chore: add bound check for raft-engine logstore
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r committed Jan 2, 2024
1 parent b526d15 commit 0a63a72
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
13 changes: 13 additions & 0 deletions src/log-store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common_error::ext::ErrorExt;
use common_macro::stack_trace_debug;
use common_runtime::error::Error as RuntimeError;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;

use crate::kafka::NamespaceImpl as KafkaNamespace;

Expand Down Expand Up @@ -172,6 +173,18 @@ pub enum Error {

#[snafu(display("Failed to do a cast"))]
Cast { location: Location },

#[snafu(display(
"Attempt to append discontinuous log entry, region: {}, last index: {}, attempt index: {}",
region_id,
last_index,
attempt_index
))]
DiscontinuousLogIndex {
region_id: RegionId,
last_index: u64,
attempt_index: u64,
},
}

impl ErrorExt for Error {
Expand Down
19 changes: 15 additions & 4 deletions src/log-store/src/raft_engine/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::{Id as NamespaceId, Namespace as NamespaceTrait};
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};

use crate::error;
use crate::error::{
AddEntryLogBatchSnafu, Error, FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu,
OverrideCompactedEntrySnafu, RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu,
AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, Error, FetchEntrySnafu,
IllegalNamespaceSnafu, IllegalStateSnafu, OverrideCompactedEntrySnafu, RaftEngineSnafu, Result,
StartGcTaskSnafu, StopGcTaskSnafu,
};
use crate::raft_engine::backend::SYSTEM_NAMESPACE;
use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl as Namespace};
Expand Down Expand Up @@ -167,14 +167,25 @@ impl LogStore for RaftEngineLogStore {
if let Some(first_index) = self.engine.first_index(namespace_id) {
ensure!(
entry_id >= first_index,
error::OverrideCompactedEntrySnafu {
OverrideCompactedEntrySnafu {
namespace: namespace_id,
first_index,
attempt_index: entry_id,
}
);
}

if let Some(last_index) = self.engine.last_index(namespace_id) {
ensure!(
entry_id == last_index + 1,
DiscontinuousLogIndexSnafu {
region_id: namespace_id,
last_index,
attempt_index: entry_id
}
);
}

let _ = self
.engine
.write(&mut batch, self.config.sync_write)
Expand Down

0 comments on commit 0a63a72

Please sign in to comment.