diff --git a/src/log-store/src/noop.rs b/src/log-store/src/noop.rs index 1929e59a2365..694641156fa4 100644 --- a/src/log-store/src/noop.rs +++ b/src/log-store/src/noop.rs @@ -66,14 +66,13 @@ impl LogStore for NoopLogStore { async fn append(&self, mut _e: Self::Entry) -> Result { Ok(AppendResponse { - entry_id: 0, - offset: None, + last_entry_id: Default::default(), }) } async fn append_batch(&self, _e: Vec) -> Result { Ok(AppendBatchResponse { - offsets: HashMap::new(), + last_entry_ids: HashMap::new(), }) } diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index eb14bf0cf90a..33d1d6247687 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -179,8 +180,7 @@ impl LogStore for RaftEngineLogStore { .write(&mut batch, self.config.sync_write) .context(RaftEngineSnafu)?; Ok(AppendResponse { - entry_id, - offset: None, + last_entry_id: entry_id, }) } @@ -192,11 +192,19 @@ impl LogStore for RaftEngineLogStore { return Ok(AppendBatchResponse::default()); } + // Records the last entry id for each region's entries. + let mut last_entry_ids: HashMap = + HashMap::with_capacity(entries.len()); let mut batch = LogBatch::with_capacity(entries.len()); for e in entries { self.check_entry(&e)?; + // For raft-engine log store, the namespace id is the region id. let ns_id = e.namespace_id; + last_entry_ids + .entry(ns_id) + .and_modify(|x| *x = (*x).max(e.id)) + .or_insert(e.id); batch .add_entries::(ns_id, &[e]) .context(AddEntryLogBatchSnafu)?; @@ -207,8 +215,7 @@ impl LogStore for RaftEngineLogStore { .write(&mut batch, self.config.sync_write) .context(RaftEngineSnafu)?; - // The user of raft-engine log store does not care about the response. - Ok(AppendBatchResponse::default()) + Ok(AppendBatchResponse { last_entry_ids }) } /// Create a stream of entries from logstore in the given namespace. The end of stream is @@ -381,7 +388,7 @@ mod tests { use common_base::readable_size::ReadableSize; use common_telemetry::debug; - use common_test_util::temp_dir::create_temp_dir; + use common_test_util::temp_dir::{create_temp_dir, TempDir}; use futures_util::StreamExt; use store_api::logstore::entry_stream::SendableEntryStream; use store_api::logstore::namespace::Namespace as NamespaceTrait; @@ -452,7 +459,7 @@ mod tests { )) .await .unwrap(); - assert_eq!(i, response.entry_id); + assert_eq!(i, response.last_entry_id); } let mut entries = HashSet::with_capacity(1024); let mut s = logstore.read(&Namespace::with_id(1), 0).await.unwrap(); @@ -526,10 +533,7 @@ mod tests { size } - #[tokio::test] - async fn test_compaction() { - common_telemetry::init_default_ut_logging(); - let dir = create_temp_dir("raft-engine-logstore-test"); + async fn new_test_log_store(dir: &TempDir) -> RaftEngineLogStore { let path = dir.path().to_str().unwrap().to_string(); let config = RaftEngineConfig { @@ -539,7 +543,15 @@ mod tests { ..Default::default() }; - let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap(); + RaftEngineLogStore::try_new(path, config).await.unwrap() + } + + #[tokio::test] + async fn test_compaction() { + common_telemetry::init_default_ut_logging(); + let dir = create_temp_dir("raft-engine-logstore-test"); + let logstore = new_test_log_store(&dir).await; + let namespace = Namespace::with_id(42); for id in 0..4096 { let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec()); @@ -562,16 +574,8 @@ mod tests { async fn test_obsolete() { common_telemetry::init_default_ut_logging(); let dir = create_temp_dir("raft-engine-logstore-test"); - let path = dir.path().to_str().unwrap().to_string(); + let logstore = new_test_log_store(&dir).await; - let config = RaftEngineConfig { - file_size: ReadableSize::mb(2), - purge_threshold: ReadableSize::mb(4), - purge_interval: Duration::from_secs(5), - ..Default::default() - }; - - let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap(); let namespace = Namespace::with_id(42); for id in 0..1024 { let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec()); @@ -591,16 +595,7 @@ mod tests { async fn test_append_batch() { common_telemetry::init_default_ut_logging(); let dir = create_temp_dir("logstore-append-batch-test"); - let path = dir.path().to_str().unwrap().to_string(); - - let config = RaftEngineConfig { - file_size: ReadableSize::mb(2), - purge_threshold: ReadableSize::mb(4), - purge_interval: Duration::from_secs(5), - ..Default::default() - }; - - let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap(); + let logstore = new_test_log_store(&dir).await; let entries = (0..8) .flat_map(|ns_id| { @@ -622,16 +617,7 @@ mod tests { async fn test_append_batch_interleaved() { common_telemetry::init_default_ut_logging(); let dir = create_temp_dir("logstore-append-batch-test"); - - let path = dir.path().to_str().unwrap().to_string(); - let config = RaftEngineConfig { - file_size: ReadableSize::mb(2), - purge_threshold: ReadableSize::mb(4), - purge_interval: Duration::from_secs(5), - ..Default::default() - }; - - let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap(); + let logstore = new_test_log_store(&dir).await; let entries = vec![ Entry::create(0, 0, [b'0'; 4096].to_vec()), @@ -646,4 +632,30 @@ mod tests { assert_eq!((Some(0), Some(2)), logstore.span(&Namespace::with_id(0))); assert_eq!((Some(0), Some(1)), logstore.span(&Namespace::with_id(1))); } + + #[tokio::test] + async fn test_append_batch_response() { + common_telemetry::init_default_ut_logging(); + let dir = create_temp_dir("logstore-append-batch-test"); + let logstore = new_test_log_store(&dir).await; + + let entries = vec![ + // Entry[0] from region 0. + Entry::create(0, 0, [b'0'; 4096].to_vec()), + // Entry[0] from region 1. + Entry::create(0, 1, [b'1'; 4096].to_vec()), + // Entry[1] from region 1. + Entry::create(1, 0, [b'1'; 4096].to_vec()), + // Entry[1] from region 0. + Entry::create(1, 1, [b'0'; 4096].to_vec()), + // Entry[2] from region 2. + Entry::create(2, 2, [b'2'; 4096].to_vec()), + ]; + + // Ensure the last entry id returned for each region is the expected one. + let last_entry_ids = logstore.append_batch(entries).await.unwrap().last_entry_ids; + assert_eq!(last_entry_ids[&0], 1); + assert_eq!(last_entry_ids[&1], 1); + assert_eq!(last_entry_ids[&2], 2); + } } diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index 8a6decefb4ac..7d27e49eaf73 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -167,8 +167,6 @@ impl RegionWriteCtx { &self.wal_entry, &self.wal_options, )?; - // We only call this method one time, but we still bump next entry id for consistency. - self.next_entry_id += 1; Ok(()) } diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index 3bbfefe96b93..ac17d3df5415 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -27,7 +27,7 @@ use futures::StreamExt; use prost::Message; use snafu::ResultExt; use store_api::logstore::entry::Entry; -use store_api::logstore::LogStore; +use store_api::logstore::{AppendBatchResponse, LogStore}; use store_api::storage::RegionId; use crate::error::{ @@ -165,8 +165,7 @@ impl WalWriter { } /// Write all buffered entries to the WAL. - // TODO(niebayes): returns an `AppendBatchResponse` and handle it properly. - pub async fn write_to_wal(&mut self) -> Result<()> { + pub async fn write_to_wal(&mut self) -> Result { // TODO(yingwen): metrics. let entries = mem::take(&mut self.entries); @@ -175,7 +174,6 @@ impl WalWriter { .await .map_err(BoxedError::new) .context(WriteWalSnafu) - .map(|_| ()) } } diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 97a481d7d4dc..e10012d57447 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -73,12 +73,23 @@ impl RegionWorkerLoop { region_ctx.set_error(e); } } - if let Err(e) = wal_writer.write_to_wal().await.map_err(Arc::new) { - // Failed to write wal. - for mut region_ctx in region_ctxs.into_values() { - region_ctx.set_error(e.clone()); + match wal_writer.write_to_wal().await.map_err(Arc::new) { + Ok(response) => { + for (region_id, region_ctx) in region_ctxs.iter_mut() { + // Safety: the log store implementation ensures that either the `write_to_wal` fails and no + // response is returned or the last entry ids for each region do exist. + let last_entry_id = + response.last_entry_ids.get(®ion_id.as_u64()).unwrap(); + region_ctx.set_next_entry_id(last_entry_id + 1); + } + } + Err(e) => { + // Failed to write wal. + for mut region_ctx in region_ctxs.into_values() { + region_ctx.set_error(e.clone()); + } + return; } - return; } } diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index 3fb81d9a624c..fd08f2d6522b 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -19,7 +19,7 @@ use std::collections::HashMap; use common_config::wal::WalOptions; use common_error::ext::ErrorExt; -use crate::logstore::entry::{Entry, Id as EntryId, Offset as EntryOffset}; +use crate::logstore::entry::{Entry, Id as EntryId}; use crate::logstore::entry_stream::SendableEntryStream; use crate::logstore::namespace::{Id as NamespaceId, Namespace}; @@ -34,21 +34,20 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { type Namespace: Namespace; type Entry: Entry; - /// Stop components of logstore. + /// Stops components of the logstore. async fn stop(&self) -> Result<(), Self::Error>; - /// Append an `Entry` to WAL with given namespace and return append response containing - /// the entry id. + /// Appends an entry to the log store and returns a response containing the id of the append entry. async fn append(&self, entry: Self::Entry) -> Result; - /// Append a batch of entries and return an append batch response containing the start entry ids of - /// log entries written to each region. + /// Appends a batch of entries and returns a response containing a map where the key is a region id + /// while the value is the id of the last successfully written entry of the region. async fn append_batch( &self, entries: Vec, ) -> Result; - /// Create a new `EntryStream` to asynchronously generates `Entry` with ids + /// Creates a new `EntryStream` to asynchronously generates `Entry` with ids /// starting from `id`. async fn read( &self, @@ -56,43 +55,39 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { id: EntryId, ) -> Result, Self::Error>; - /// Create a new `Namespace`. + /// Creates a new `Namespace` from the given ref. async fn create_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error>; - /// Delete an existing `Namespace` with given ref. + /// Deletes an existing `Namespace` specified by the given ref. async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error>; - /// List all existing namespaces. + /// Lists all existing namespaces. async fn list_namespaces(&self) -> Result, Self::Error>; - /// Create an entry of the associate Entry type + /// Creates an entry of the associated Entry type fn entry>(&self, data: D, entry_id: EntryId, ns: Self::Namespace) -> Self::Entry; - /// Create a namespace of the associate Namespace type + /// Creates a namespace of the associated Namespace type // TODO(sunng87): confusion with `create_namespace` fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace; - /// Mark all entry ids `<=id` of given `namespace` as obsolete so that logstore can safely delete - /// the log files if all entries inside are obsolete. This method may not delete log - /// files immediately. + /// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete, + /// so that the log store can safely delete those entries. This method does not guarantee + /// that the obsolete entries are deleted immediately. async fn obsolete(&self, ns: Self::Namespace, entry_id: EntryId) -> Result<(), Self::Error>; } /// The response of an `append` operation. #[derive(Debug)] pub struct AppendResponse { - /// The entry id of the appended log entry. - pub entry_id: EntryId, - /// The start entry offset of the appended log entry. - /// Depends on the `LogStore` implementation, the entry offset may be missing. - pub offset: Option, + /// The id of the entry appended to the log store. + pub last_entry_id: EntryId, } /// The response of an `append_batch` operation. #[derive(Debug, Default)] pub struct AppendBatchResponse { - /// Key: region id (as u64). Value: the known minimum start offset of the appended log entries belonging to the region. - /// Depends on the `LogStore` implementation, the entry offsets may be missing. - pub offsets: HashMap, + /// Key: region id (as u64). Value: the id of the last successfully written entry of the region. + pub last_entry_ids: HashMap, } diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs index cb2538086e6d..1748ff5621be 100644 --- a/src/store-api/src/logstore/entry.rs +++ b/src/store-api/src/logstore/entry.rs @@ -16,21 +16,23 @@ use common_error::ext::ErrorExt; use crate::logstore::namespace::Namespace; -/// An entry's logical id, allocated by log store users. +/// An entry's id. +/// Different log store implementations may interpret the id to different meanings. pub type Id = u64; -/// An entry's physical offset in the underlying log store. -pub type Offset = usize; -/// Entry is the minimal data storage unit in `LogStore`. +/// Entry is the minimal data storage unit through which users interact with the log store. +/// The log store implementation may have larger or smaller data storage unit than an entry. pub trait Entry: Send + Sync { type Error: ErrorExt + Send + Sync; type Namespace: Namespace; - /// Return contained data of entry. + /// Returns the contained data of the entry. fn data(&self) -> &[u8]; - /// Return entry id that monotonically increments. + /// Returns the id of the entry. + /// Usually the namespace id is identical with the region id. fn id(&self) -> Id; + /// Returns the namespace of the entry. fn namespace(&self) -> Self::Namespace; } diff --git a/src/store-api/src/logstore/namespace.rs b/src/store-api/src/logstore/namespace.rs index 35a136d809ac..ac1b62e31bd4 100644 --- a/src/store-api/src/logstore/namespace.rs +++ b/src/store-api/src/logstore/namespace.rs @@ -14,8 +14,11 @@ use std::hash::Hash; +/// The namespace id. +/// Usually the namespace id is identical with the region id. pub type Id = u64; pub trait Namespace: Send + Sync + Clone + std::fmt::Debug + Hash + PartialEq + Eq { + /// Returns the namespace id. fn id(&self) -> Id; }