chore: add bound check for raft-engine logstore (#3073)
* chore: add bound check for raft-engine logstore

* feat: add bound check to append_batch API

* chore: check entry id during replay

* chore: resolve conflicts

* feat: add allow_stale_entries option to force-obsolete stale WAL entries

* chore: resolve some comments
v0y4g3r authored Jan 9, 2024
1 parent 62db28b commit 2c1b1ce
Showing 9 changed files with 134 additions and 37 deletions.
2 changes: 2 additions & 0 deletions config/datanode.example.toml
@@ -117,6 +117,8 @@ sst_write_buffer_size = "8MB"
scan_parallelism = 0
# Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
parallel_scan_channel_size = 32
# Whether to allow stale WAL entries read during replay.
allow_stale_entries = false

# Log options, see `standalone.example.toml`
# [logging]
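To illustrate how a flag like this is typically consumed (a rough sketch, not GreptimeDB's actual config loader; `EngineOptions` is a hypothetical name), the snippet below reads the option from TOML with serde and falls back to `false` when the key is absent, so older config files keep working:

```rust
// Hypothetical sketch, not GreptimeDB's real config loader: reading a
// boolean engine option from TOML with serde, defaulting to `false`
// when the key is missing.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct EngineOptions {
    #[serde(default)]
    allow_stale_entries: bool,
}

fn main() {
    // Key present: the value from the file wins.
    let with_key: EngineOptions =
        toml::from_str("allow_stale_entries = true").unwrap();
    assert!(with_key.allow_stale_entries);

    // Key absent: falls back to the default, i.e. stale entries are rejected.
    let without_key: EngineOptions = toml::from_str("").unwrap();
    assert!(!without_key.allow_stale_entries);
}
```

Both example files ship the key explicitly, so the default only matters for configs written before this change.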
2 changes: 2 additions & 0 deletions config/standalone.example.toml
@@ -214,6 +214,8 @@ sst_write_buffer_size = "8MB"
scan_parallelism = 0
# Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
parallel_scan_channel_size = 32
# Whether to allow stale WAL entries read during replay.
allow_stale_entries = false

# Log options
# [logging]
13 changes: 13 additions & 0 deletions src/log-store/src/error.rs
@@ -20,6 +20,7 @@ use common_macro::stack_trace_debug;
use common_runtime::error::Error as RuntimeError;
use serde_json::error::Error as JsonError;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;

use crate::kafka::NamespaceImpl as KafkaNamespace;

@@ -183,6 +184,18 @@

#[snafu(display("The record sequence is not legal, error: {}", error))]
IllegalSequence { location: Location, error: String },

#[snafu(display(
"Attempt to append discontinuous log entry, region: {}, last index: {}, attempt index: {}",
region_id,
last_index,
attempt_index
))]
DiscontinuousLogIndex {
region_id: RegionId,
last_index: u64,
attempt_index: u64,
},
}

impl ErrorExt for Error {
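A minimal, self-contained sketch of the snafu pattern this new variant plugs into; `RegionId` is simplified to a `u64` and `check_continuity` is a hypothetical helper, not part of the crate:

```rust
// Minimal sketch of the snafu context-selector pattern used by the new
// variant; `RegionId` is simplified to a u64 and `check_continuity` is a
// hypothetical helper.
use snafu::{ensure, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display(
        "Attempt to append discontinuous log entry, region: {}, last index: {}, attempt index: {}",
        region_id,
        last_index,
        attempt_index
    ))]
    DiscontinuousLogIndex {
        region_id: u64,
        last_index: u64,
        attempt_index: u64,
    },
}

fn check_continuity(region_id: u64, last_index: u64, attempt_index: u64) -> Result<(), Error> {
    // `ensure!` returns early with the generated `DiscontinuousLogIndexSnafu`
    // context selector when the appended index does not directly follow the
    // last known one.
    ensure!(
        attempt_index == last_index + 1,
        DiscontinuousLogIndexSnafu {
            region_id,
            last_index,
            attempt_index
        }
    );
    Ok(())
}

fn main() {
    // Index 7 does not follow 5, so the check fails with the new error.
    let err = check_continuity(1, 5, 7).unwrap_err();
    println!("{err}");
}
```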
109 changes: 74 additions & 35 deletions src/log-store/src/raft_engine/log_store.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicI64, Ordering};
@@ -23,15 +24,15 @@ use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::{error, info};
use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode};
use snafu::{ensure, ResultExt};
use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::entry::Id as EntryId;
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::{Id as NamespaceId, Namespace as NamespaceTrait};
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};

use crate::error;
use crate::error::{
AddEntryLogBatchSnafu, Error, FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu,
OverrideCompactedEntrySnafu, RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu,
AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, Error, FetchEntrySnafu,
IllegalNamespaceSnafu, IllegalStateSnafu, OverrideCompactedEntrySnafu, RaftEngineSnafu, Result,
StartGcTaskSnafu, StopGcTaskSnafu,
};
use crate::raft_engine::backend::SYSTEM_NAMESPACE;
use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl as Namespace};
@@ -121,22 +122,65 @@ impl RaftEngineLogStore {
)
}

/// Checks if entry does not override the min index of namespace.
fn check_entry(&self, e: &EntryImpl) -> Result<()> {
if cfg!(debug_assertions) {
/// Converts entries to `LogBatch` and checks if entry ids are valid.
/// Returns the `LogBatch` converted along with the last entry id
/// to append in each namespace(region).
fn entries_to_batch(
&self,
entries: Vec<EntryImpl>,
) -> Result<(LogBatch, HashMap<NamespaceId, EntryId>)> {
// Records the last entry id for each region's entries.
let mut entry_ids: HashMap<NamespaceId, EntryId> = HashMap::with_capacity(entries.len());
let mut batch = LogBatch::with_capacity(entries.len());

for e in entries {
let ns_id = e.namespace_id;
if let Some(first_index) = self.engine.first_index(ns_id) {
ensure!(
e.id() >= first_index,
OverrideCompactedEntrySnafu {
namespace: ns_id,
first_index,
attempt_index: e.id(),
match entry_ids.entry(ns_id) {
Entry::Occupied(mut o) => {
let prev = *o.get();
ensure!(
e.id == prev + 1,
DiscontinuousLogIndexSnafu {
region_id: ns_id,
last_index: prev,
attempt_index: e.id
}
);
o.insert(e.id);
}
Entry::Vacant(v) => {
// this entry is the first in batch of given region.
if let Some(first_index) = self.engine.first_index(ns_id) {
// ensure the first in batch does not override compacted entry.
ensure!(
e.id > first_index,
OverrideCompactedEntrySnafu {
namespace: ns_id,
first_index,
attempt_index: e.id,
}
);
}
);
// ensure the first in batch does not form a hole in raft-engine.
if let Some(last_index) = self.engine.last_index(ns_id) {
ensure!(
e.id == last_index + 1,
DiscontinuousLogIndexSnafu {
region_id: ns_id,
last_index,
attempt_index: e.id
}
);
}
v.insert(e.id);
}
}
batch
.add_entries::<MessageType>(ns_id, &[e])
.context(AddEntryLogBatchSnafu)?;
}
Ok(())

Ok((batch, entry_ids))
}
}

@@ -171,15 +215,26 @@ impl LogStore for RaftEngineLogStore {

if let Some(first_index) = self.engine.first_index(namespace_id) {
ensure!(
entry_id >= first_index,
error::OverrideCompactedEntrySnafu {
entry_id > first_index,
OverrideCompactedEntrySnafu {
namespace: namespace_id,
first_index,
attempt_index: entry_id,
}
);
}

if let Some(last_index) = self.engine.last_index(namespace_id) {
ensure!(
entry_id == last_index + 1,
DiscontinuousLogIndexSnafu {
region_id: namespace_id,
last_index,
attempt_index: entry_id
}
);
}

let _ = self
.engine
.write(&mut batch, self.config.sync_write)
@@ -197,23 +252,7 @@
return Ok(AppendBatchResponse::default());
}

// Records the last entry id for each region's entries.
let mut last_entry_ids: HashMap<NamespaceId, EntryId> =
HashMap::with_capacity(entries.len());
let mut batch = LogBatch::with_capacity(entries.len());

for e in entries {
self.check_entry(&e)?;
// For raft-engine log store, the namespace id is the region id.
let ns_id = e.namespace_id;
last_entry_ids
.entry(ns_id)
.and_modify(|x| *x = (*x).max(e.id))
.or_insert(e.id);
batch
.add_entries::<MessageType>(ns_id, &[e])
.context(AddEntryLogBatchSnafu)?;
}
let (mut batch, last_entry_ids) = self.entries_to_batch(entries)?;

let mut sync = self.config.sync_write;

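Stripped of raft-engine and snafu details, the new bound checks enforce three rules: within one batch, later entries of a region must be strictly consecutive; the first entry of a region in the batch must not override already-compacted entries; and it must land directly after that region's last persisted index. A plain-Rust sketch under those assumptions, with `u64` ids and hypothetical names standing in for the real types:

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Hypothetical stand-in for the persisted state of one region in the log store.
#[derive(Clone, Copy)]
struct RegionBounds {
    first_index: u64,
    last_index: u64,
}

/// Sketch of the batch validation: returns an error string instead of the
/// crate's snafu variants, but mirrors the same three rules.
fn validate_batch(
    persisted: &HashMap<u64, RegionBounds>,
    entries: &[(u64, u64)], // (region_id, entry_id) in append order
) -> Result<HashMap<u64, u64>, String> {
    let mut last_ids: HashMap<u64, u64> = HashMap::with_capacity(entries.len());
    for &(region_id, entry_id) in entries {
        match last_ids.entry(region_id) {
            Entry::Occupied(mut o) => {
                // Later entries of the same region must be consecutive.
                let prev = *o.get();
                if entry_id != prev + 1 {
                    return Err(format!(
                        "discontinuous index in batch, region {region_id}: {prev} -> {entry_id}"
                    ));
                }
                o.insert(entry_id);
            }
            Entry::Vacant(v) => {
                if let Some(bounds) = persisted.get(&region_id) {
                    // The first entry must not override compacted data...
                    if entry_id <= bounds.first_index {
                        return Err(format!(
                            "overrides compacted entry, region {region_id}: {entry_id} <= {}",
                            bounds.first_index
                        ));
                    }
                    // ...and must not leave a hole after the last persisted index.
                    if entry_id != bounds.last_index + 1 {
                        return Err(format!(
                            "discontinuous index, region {region_id}: last {} vs {entry_id}",
                            bounds.last_index
                        ));
                    }
                }
                v.insert(entry_id);
            }
        }
    }
    Ok(last_ids)
}

fn main() {
    let persisted = HashMap::from([(1, RegionBounds { first_index: 3, last_index: 10 })]);
    // 11, 12 continue region 1 correctly; jumping to 14 is rejected.
    assert!(validate_batch(&persisted, &[(1, 11), (1, 12)]).is_ok());
    assert!(validate_batch(&persisted, &[(1, 11), (1, 14)]).is_err());
}
```

Because the conversion and the checks happen together in `entries_to_batch`, an invalid batch is rejected with an error before it reaches the engine's write path.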
3 changes: 3 additions & 0 deletions src/mito2/src/config.rs
@@ -87,6 +87,8 @@ pub struct MitoConfig {
pub scan_parallelism: usize,
/// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
pub parallel_scan_channel_size: usize,
/// Whether to allow stale entries read during replay.
pub allow_stale_entries: bool,
}

impl Default for MitoConfig {
@@ -110,6 +112,7 @@
sst_write_buffer_size: ReadableSize::mb(8),
scan_parallelism: divide_num_cpus(4),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
}
}
}
13 changes: 13 additions & 0 deletions src/mito2/src/error.rs
@@ -473,6 +473,18 @@ pub enum Error {

#[snafu(display("Invalid config, {reason}"))]
InvalidConfig { reason: String, location: Location },

#[snafu(display(
"Stale log entry found during replay, region: {}, flushed: {}, replayed: {}",
region_id,
flushed_entry_id,
unexpected_entry_id
))]
StaleLogEntry {
region_id: RegionId,
flushed_entry_id: u64,
unexpected_entry_id: u64,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -563,6 +575,7 @@ impl ErrorExt for Error {
}
CleanDir { .. } => StatusCode::Unexpected,
InvalidConfig { .. } => StatusCode::InvalidArguments,
StaleLogEntry { .. } => StatusCode::Unexpected,
}
}

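As a rough illustration of the `ErrorExt` mapping extended above (with a simplified, hypothetical `StatusCode` enum standing in for the real one in `common_error`):

```rust
// Hypothetical sketch of the status-code mapping pattern; the real
// `StatusCode` and `ErrorExt` live in GreptimeDB's common crates.
#[derive(Debug, PartialEq)]
enum StatusCode {
    Unexpected,
    InvalidArguments,
}

#[derive(Debug)]
enum Error {
    InvalidConfig { reason: String },
    StaleLogEntry { region_id: u64, flushed_entry_id: u64, unexpected_entry_id: u64 },
}

impl Error {
    // A stale WAL entry indicates inconsistent on-disk state that the caller
    // cannot fix by changing the request, so it maps to `Unexpected` rather
    // than `InvalidArguments`.
    fn status_code(&self) -> StatusCode {
        match self {
            Error::InvalidConfig { .. } => StatusCode::InvalidArguments,
            Error::StaleLogEntry { .. } => StatusCode::Unexpected,
        }
    }
}

fn main() {
    let err = Error::StaleLogEntry { region_id: 1, flushed_entry_id: 10, unexpected_entry_id: 8 };
    assert_eq!(err.status_code(), StatusCode::Unexpected);
}
```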
27 changes: 25 additions & 2 deletions src/mito2/src/region/opener.rs
@@ -32,7 +32,9 @@ use store_api::storage::{ColumnId, RegionId};
use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{EmptyRegionDirSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu, Result};
use crate::error::{
EmptyRegionDirSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu, Result, StaleLogEntrySnafu,
};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::MemtableBuilderRef;
@@ -267,6 +269,7 @@ impl RegionOpener {
region_id,
flushed_entry_id,
&version_control,
config.allow_stale_entries,
)
.await?;
} else {
@@ -375,6 +378,7 @@ pub(crate) async fn replay_memtable<S: LogStore>(
region_id: RegionId,
flushed_entry_id: EntryId,
version_control: &VersionControlRef,
allow_stale_entries: bool,
) -> Result<EntryId> {
let mut rows_replayed = 0;
// Last entry id should start from flushed entry id since there might be no
@@ -383,10 +387,23 @@ pub(crate) async fn replay_memtable<S: LogStore>(
let mut region_write_ctx = RegionWriteCtx::new(region_id, version_control, wal_options.clone());

let replay_from_entry_id = flushed_entry_id + 1;
let mut stale_entry_found = false;
let mut wal_stream = wal.scan(region_id, replay_from_entry_id, wal_options)?;
while let Some(res) = wal_stream.next().await {
let (entry_id, entry) = res?;
debug_assert!(entry_id > flushed_entry_id);
if entry_id <= flushed_entry_id {
stale_entry_found = true;
warn!("Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}", region_id, flushed_entry_id, entry_id);
ensure!(
allow_stale_entries,
StaleLogEntrySnafu {
region_id,
flushed_entry_id,
unexpected_entry_id: entry_id,
}
);
}

last_entry_id = last_entry_id.max(entry_id);
for mutation in entry.mutations {
rows_replayed += mutation
@@ -402,6 +419,12 @@
region_write_ctx.set_next_entry_id(last_entry_id + 1);
region_write_ctx.write_memtable();

if allow_stale_entries && stale_entry_found {
wal.obsolete(region_id, flushed_entry_id, wal_options)
.await?;
info!("Force obsolete WAL entries, region id: {}, flushed entry id: {}, last entry id read: {}", region_id, flushed_entry_id, last_entry_id);
}

info!(
"Replay WAL for region: {}, rows recovered: {}, last entry id: {}",
region_id, rows_replayed, last_entry_id
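Reduced to its control flow, the replay change detects entries at or below the flushed entry id, fails fast unless `allow_stale_entries` is set, and, when stale entries were tolerated, obsoletes the WAL up to the flushed id after replay. A hypothetical, WAL-free sketch of that logic:

```rust
/// Hypothetical, self-contained sketch of the replay bound check; entry ids
/// come from a plain iterator instead of the real WAL stream, and "obsolete"
/// is just a printed marker here.
fn replay(
    entry_ids: impl IntoIterator<Item = u64>,
    flushed_entry_id: u64,
    allow_stale_entries: bool,
) -> Result<u64, String> {
    let mut last_entry_id = flushed_entry_id;
    let mut stale_entry_found = false;

    for entry_id in entry_ids {
        if entry_id <= flushed_entry_id {
            stale_entry_found = true;
            // With the flag off, a stale entry aborts the replay (the
            // `StaleLogEntry` error in the real code); with it on, the entry
            // is noted and replay continues.
            if !allow_stale_entries {
                return Err(format!(
                    "stale log entry: flushed {flushed_entry_id}, read {entry_id}"
                ));
            }
        }
        last_entry_id = last_entry_id.max(entry_id);
        // ...apply the entry's mutations to the memtable here...
    }

    if allow_stale_entries && stale_entry_found {
        // The real code calls `wal.obsolete(region_id, flushed_entry_id, ...)`
        // so the stale range is not read again on the next open.
        println!("force obsolete WAL entries up to {flushed_entry_id}");
    }
    Ok(last_entry_id)
}

fn main() {
    // Entry 9 is at or below the flushed id 10, so it is stale.
    assert!(replay([9, 11, 12], 10, false).is_err());
    assert_eq!(replay([9, 11, 12], 10, true), Ok(12));
}
```

With the flag off, replay stops with `StaleLogEntry`; turning it on tolerates the stale range and then obsoletes it so the next open does not read it again.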
1 change: 1 addition & 0 deletions src/mito2/src/worker/handle_catchup.rs
@@ -78,6 +78,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id,
flushed_entry_id,
&region.version_control,
self.config.allow_stale_entries,
)
.await?;
info!(
1 change: 1 addition & 0 deletions tests-integration/tests/http.rs
@@ -737,6 +737,7 @@ experimental_write_cache_path = ""
experimental_write_cache_size = "512MiB"
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
allow_stale_entries = false
[[datanode.region_engine]]
