refactor(remote_wal): entry id usage (#2986)
* chore: update comments for log store stuff

* refactor: entry id usage

* tmp update

* Revert "tmp update"

This reverts commit fcfcda2.

* fix: resolve review conversations

* fix: resolve review conversations

* chore: remove entry offset
niebayes authored Dec 25, 2023
1 parent 4664cc6 commit d4ac873
Showing 8 changed files with 101 additions and 83 deletions.
5 changes: 2 additions & 3 deletions src/log-store/src/noop.rs
@@ -66,14 +66,13 @@ impl LogStore for NoopLogStore {

async fn append(&self, mut _e: Self::Entry) -> Result<AppendResponse> {
Ok(AppendResponse {
- entry_id: 0,
- offset: None,
+ last_entry_id: Default::default(),
})
}

async fn append_batch(&self, _e: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
Ok(AppendBatchResponse {
- offsets: HashMap::new(),
+ last_entry_ids: HashMap::new(),
})
}

92 changes: 52 additions & 40 deletions src/log-store/src/raft_engine/log_store.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

@@ -179,8 +180,7 @@ impl LogStore for RaftEngineLogStore {
.write(&mut batch, self.config.sync_write)
.context(RaftEngineSnafu)?;
Ok(AppendResponse {
- entry_id,
- offset: None,
+ last_entry_id: entry_id,
})
}

@@ -192,11 +192,19 @@ impl LogStore for RaftEngineLogStore {
return Ok(AppendBatchResponse::default());
}

// Records the last entry id for each region's entries.
let mut last_entry_ids: HashMap<NamespaceId, EntryId> =
HashMap::with_capacity(entries.len());
let mut batch = LogBatch::with_capacity(entries.len());

for e in entries {
self.check_entry(&e)?;
// For raft-engine log store, the namespace id is the region id.
let ns_id = e.namespace_id;
last_entry_ids
.entry(ns_id)
.and_modify(|x| *x = (*x).max(e.id))
.or_insert(e.id);
batch
.add_entries::<MessageType>(ns_id, &[e])
.context(AddEntryLogBatchSnafu)?;
@@ -207,8 +215,7 @@ impl LogStore for RaftEngineLogStore {
.write(&mut batch, self.config.sync_write)
.context(RaftEngineSnafu)?;

- // The user of raft-engine log store does not care about the response.
- Ok(AppendBatchResponse::default())
+ Ok(AppendBatchResponse { last_entry_ids })
}

/// Create a stream of entries from logstore in the given namespace. The end of stream is
@@ -381,7 +388,7 @@ mod tests {

use common_base::readable_size::ReadableSize;
use common_telemetry::debug;
- use common_test_util::temp_dir::create_temp_dir;
+ use common_test_util::temp_dir::{create_temp_dir, TempDir};
use futures_util::StreamExt;
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::Namespace as NamespaceTrait;
@@ -452,7 +459,7 @@ mod tests {
))
.await
.unwrap();
- assert_eq!(i, response.entry_id);
+ assert_eq!(i, response.last_entry_id);
}
let mut entries = HashSet::with_capacity(1024);
let mut s = logstore.read(&Namespace::with_id(1), 0).await.unwrap();
@@ -526,10 +533,7 @@ mod tests {
size
}

- #[tokio::test]
- async fn test_compaction() {
- common_telemetry::init_default_ut_logging();
- let dir = create_temp_dir("raft-engine-logstore-test");
+ async fn new_test_log_store(dir: &TempDir) -> RaftEngineLogStore {
let path = dir.path().to_str().unwrap().to_string();

let config = RaftEngineConfig {
@@ -539,7 +543,15 @@ mod tests {
..Default::default()
};

- let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap();
+ RaftEngineLogStore::try_new(path, config).await.unwrap()
+ }

+ #[tokio::test]
+ async fn test_compaction() {
+ common_telemetry::init_default_ut_logging();
+ let dir = create_temp_dir("raft-engine-logstore-test");
+ let logstore = new_test_log_store(&dir).await;

let namespace = Namespace::with_id(42);
for id in 0..4096 {
let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec());
@@ -562,16 +574,8 @@ mod tests {
async fn test_obsolete() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("raft-engine-logstore-test");
- let path = dir.path().to_str().unwrap().to_string();
+ let logstore = new_test_log_store(&dir).await;

- let config = RaftEngineConfig {
- file_size: ReadableSize::mb(2),
- purge_threshold: ReadableSize::mb(4),
- purge_interval: Duration::from_secs(5),
- ..Default::default()
- };

- let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap();
let namespace = Namespace::with_id(42);
for id in 0..1024 {
let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec());
@@ -591,16 +595,7 @@ mod tests {
async fn test_append_batch() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("logstore-append-batch-test");
- let path = dir.path().to_str().unwrap().to_string();

- let config = RaftEngineConfig {
- file_size: ReadableSize::mb(2),
- purge_threshold: ReadableSize::mb(4),
- purge_interval: Duration::from_secs(5),
- ..Default::default()
- };

- let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap();
+ let logstore = new_test_log_store(&dir).await;

let entries = (0..8)
.flat_map(|ns_id| {
@@ -622,16 +617,7 @@ mod tests {
async fn test_append_batch_interleaved() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("logstore-append-batch-test");

- let path = dir.path().to_str().unwrap().to_string();
- let config = RaftEngineConfig {
- file_size: ReadableSize::mb(2),
- purge_threshold: ReadableSize::mb(4),
- purge_interval: Duration::from_secs(5),
- ..Default::default()
- };

- let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap();
+ let logstore = new_test_log_store(&dir).await;

let entries = vec![
Entry::create(0, 0, [b'0'; 4096].to_vec()),
@@ -646,4 +632,30 @@ mod tests {
assert_eq!((Some(0), Some(2)), logstore.span(&Namespace::with_id(0)));
assert_eq!((Some(0), Some(1)), logstore.span(&Namespace::with_id(1)));
}

#[tokio::test]
async fn test_append_batch_response() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("logstore-append-batch-test");
let logstore = new_test_log_store(&dir).await;

let entries = vec![
// Entry[0] from region 0.
Entry::create(0, 0, [b'0'; 4096].to_vec()),
// Entry[0] from region 1.
Entry::create(0, 1, [b'1'; 4096].to_vec()),
// Entry[1] from region 1.
Entry::create(1, 1, [b'1'; 4096].to_vec()),
// Entry[1] from region 0.
Entry::create(1, 0, [b'0'; 4096].to_vec()),
// Entry[2] from region 2.
Entry::create(2, 2, [b'2'; 4096].to_vec()),
];

// Ensure the last entry id returned for each region is the expected one.
let last_entry_ids = logstore.append_batch(entries).await.unwrap().last_entry_ids;
assert_eq!(last_entry_ids[&0], 1);
assert_eq!(last_entry_ids[&1], 1);
assert_eq!(last_entry_ids[&2], 2);
}
}
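
Aside (not part of the diff): the batching loop above records, for every namespace in a batch, the largest entry id it has seen. A standalone sketch of that bookkeeping idiom, using plain u64 ids and illustrative names rather than the raft-engine types:

use std::collections::HashMap;

/// Returns the largest entry id seen per region for a batch of
/// (region_id, entry_id) pairs, mirroring the
/// `entry().and_modify().or_insert()` bookkeeping in `append_batch`.
fn last_entry_ids(batch: &[(u64, u64)]) -> HashMap<u64, u64> {
    let mut last = HashMap::with_capacity(batch.len());
    for &(region_id, entry_id) in batch {
        last.entry(region_id)
            .and_modify(|max: &mut u64| *max = (*max).max(entry_id))
            .or_insert(entry_id);
    }
    last
}

fn main() {
    // Same shape as the test above: two entries each for regions 0 and 1, one for region 2.
    let ids = last_entry_ids(&[(0, 0), (1, 0), (0, 1), (1, 1), (2, 2)]);
    assert_eq!(ids[&0], 1);
    assert_eq!(ids[&1], 1);
    assert_eq!(ids[&2], 2);
}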
2 changes: 0 additions & 2 deletions src/mito2/src/region_write_ctx.rs
@@ -167,8 +167,6 @@ impl RegionWriteCtx {
&self.wal_entry,
&self.wal_options,
)?;
- // We only call this method one time, but we still bump next entry id for consistency.
- self.next_entry_id += 1;
Ok(())
}

6 changes: 2 additions & 4 deletions src/mito2/src/wal.rs
@@ -27,7 +27,7 @@ use futures::StreamExt;
use prost::Message;
use snafu::ResultExt;
use store_api::logstore::entry::Entry;
- use store_api::logstore::LogStore;
+ use store_api::logstore::{AppendBatchResponse, LogStore};
use store_api::storage::RegionId;

use crate::error::{
@@ -165,8 +165,7 @@ impl<S: LogStore> WalWriter<S> {
}

/// Write all buffered entries to the WAL.
- // TODO(niebayes): returns an `AppendBatchResponse` and handle it properly.
- pub async fn write_to_wal(&mut self) -> Result<()> {
+ pub async fn write_to_wal(&mut self) -> Result<AppendBatchResponse> {
// TODO(yingwen): metrics.

let entries = mem::take(&mut self.entries);
Expand All @@ -175,7 +174,6 @@ impl<S: LogStore> WalWriter<S> {
.await
.map_err(BoxedError::new)
.context(WriteWalSnafu)
- .map(|_| ())
}
}

21 changes: 16 additions & 5 deletions src/mito2/src/worker/handle_write.rs
@@ -73,12 +73,23 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_ctx.set_error(e);
}
}
-        if let Err(e) = wal_writer.write_to_wal().await.map_err(Arc::new) {
-            // Failed to write wal.
-            for mut region_ctx in region_ctxs.into_values() {
-                region_ctx.set_error(e.clone());
-            }
-            return;
-        }
+        match wal_writer.write_to_wal().await.map_err(Arc::new) {
+            Ok(response) => {
+                for (region_id, region_ctx) in region_ctxs.iter_mut() {
+                    // Safety: the log store implementation ensures that either `write_to_wal` fails and
+                    // no response is returned, or the last entry id of every written region is present.
+                    let last_entry_id =
+                        response.last_entry_ids.get(&region_id.as_u64()).unwrap();
+                    region_ctx.set_next_entry_id(last_entry_id + 1);
+                }
+            }
+            Err(e) => {
+                // Failed to write wal.
+                for mut region_ctx in region_ctxs.into_values() {
+                    region_ctx.set_error(e.clone());
+                }
+                return;
+            }
+        }
}

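Aside (not part of the diff): the Ok arm above advances each region's next entry id to one past the id the log store reports for it. A self-contained toy version of that bookkeeping, with illustrative names rather than the real mito2 types:

use std::collections::HashMap;

/// Illustrative stand-in for the per-region write context.
struct ToyRegionCtx {
    next_entry_id: u64,
}

/// After a successful batch write, each written region's next entry id
/// becomes the last reported entry id plus one.
fn bump_next_entry_ids(
    region_ctxs: &mut HashMap<u64, ToyRegionCtx>,
    last_entry_ids: &HashMap<u64, u64>,
) {
    for (region_id, ctx) in region_ctxs.iter_mut() {
        // Every region that staged entries in the batch is expected to have a key here;
        // a missing key would indicate a bug in the log store implementation.
        let last = last_entry_ids
            .get(region_id)
            .expect("log store must report a last entry id for every written region");
        ctx.next_entry_id = last + 1;
    }
}

fn main() {
    let mut ctxs = HashMap::from([(0, ToyRegionCtx { next_entry_id: 0 })]);
    let last = HashMap::from([(0, 41)]);
    bump_next_entry_ids(&mut ctxs, &last);
    assert_eq!(ctxs[&0].next_entry_id, 42);
}
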
41 changes: 18 additions & 23 deletions src/store-api/src/logstore.rs
@@ -19,7 +19,7 @@ use std::collections::HashMap;
use common_config::wal::WalOptions;
use common_error::ext::ErrorExt;

- use crate::logstore::entry::{Entry, Id as EntryId, Offset as EntryOffset};
+ use crate::logstore::entry::{Entry, Id as EntryId};
use crate::logstore::entry_stream::SendableEntryStream;
use crate::logstore::namespace::{Id as NamespaceId, Namespace};

@@ -34,65 +34,60 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
type Namespace: Namespace;
type Entry: Entry;

- /// Stop components of logstore.
+ /// Stops components of the logstore.
async fn stop(&self) -> Result<(), Self::Error>;

- /// Append an `Entry` to WAL with given namespace and return append response containing
- /// the entry id.
+ /// Appends an entry to the log store and returns a response containing the id of the appended entry.
async fn append(&self, entry: Self::Entry) -> Result<AppendResponse, Self::Error>;

- /// Append a batch of entries and return an append batch response containing the start entry ids of
- /// log entries written to each region.
+ /// Appends a batch of entries and returns a response containing a map where the key is a region id
+ /// while the value is the id of the last successfully written entry of the region.
async fn append_batch(
&self,
entries: Vec<Self::Entry>,
) -> Result<AppendBatchResponse, Self::Error>;

- /// Create a new `EntryStream` to asynchronously generates `Entry` with ids
+ /// Creates a new `EntryStream` to asynchronously generate `Entry` with ids
/// starting from `id`.
async fn read(
&self,
ns: &Self::Namespace,
id: EntryId,
) -> Result<SendableEntryStream<Self::Entry, Self::Error>, Self::Error>;

- /// Create a new `Namespace`.
+ /// Creates a new `Namespace` from the given ref.
async fn create_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error>;

- /// Delete an existing `Namespace` with given ref.
+ /// Deletes an existing `Namespace` specified by the given ref.
async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error>;

- /// List all existing namespaces.
+ /// Lists all existing namespaces.
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>, Self::Error>;

- /// Create an entry of the associate Entry type
+ /// Creates an entry of the associated Entry type
fn entry<D: AsRef<[u8]>>(&self, data: D, entry_id: EntryId, ns: Self::Namespace)
-> Self::Entry;

- /// Create a namespace of the associate Namespace type
+ /// Creates a namespace of the associated Namespace type
// TODO(sunng87): confusion with `create_namespace`
fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace;

- /// Mark all entry ids `<=id` of given `namespace` as obsolete so that logstore can safely delete
- /// the log files if all entries inside are obsolete. This method may not delete log
- /// files immediately.
+ /// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete,
+ /// so that the log store can safely delete those entries. This method does not guarantee
+ /// that the obsolete entries are deleted immediately.
async fn obsolete(&self, ns: Self::Namespace, entry_id: EntryId) -> Result<(), Self::Error>;
}

/// The response of an `append` operation.
#[derive(Debug)]
pub struct AppendResponse {
- /// The entry id of the appended log entry.
- pub entry_id: EntryId,
- /// The start entry offset of the appended log entry.
- /// Depends on the `LogStore` implementation, the entry offset may be missing.
- pub offset: Option<EntryOffset>,
+ /// The id of the entry appended to the log store.
+ pub last_entry_id: EntryId,
}

/// The response of an `append_batch` operation.
#[derive(Debug, Default)]
pub struct AppendBatchResponse {
- /// Key: region id (as u64). Value: the known minimum start offset of the appended log entries belonging to the region.
- /// Depends on the `LogStore` implementation, the entry offsets may be missing.
- pub offsets: HashMap<u64, EntryOffset>,
+ /// Key: region id (as u64). Value: the id of the last successfully written entry of the region.
+ pub last_entry_ids: HashMap<u64, EntryId>,
}
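
A hypothetical caller of the revised trait (not part of this PR) may help illustrate the new response shape: append one batch for a single region, then read back the id the store reports for that region. The generic helper below is only a sketch; `WalOptions::default()` and the helper's name and signature are assumptions, and the real callers live in mito2's WAL code.

use common_config::wal::WalOptions;
use store_api::logstore::LogStore;

/// Appends `payloads` as consecutive entries for `region_id` (which doubles as the
/// namespace id) and returns the last entry id the store reports for that region,
/// or `None` if the batch wrote nothing for it.
async fn append_for_region<S: LogStore>(
    store: &S,
    region_id: u64,
    next_entry_id: u64,
    payloads: Vec<Vec<u8>>,
) -> Result<Option<u64>, S::Error> {
    // Assumption: the default `WalOptions` is acceptable for this store.
    let ns = store.namespace(region_id, &WalOptions::default());
    let entries: Vec<_> = payloads
        .into_iter()
        .enumerate()
        .map(|(i, data)| store.entry(data, next_entry_id + i as u64, ns.clone()))
        .collect();
    let response = store.append_batch(entries).await?;
    Ok(response.last_entry_ids.get(&region_id).copied())
}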
14 changes: 8 additions & 6 deletions src/store-api/src/logstore/entry.rs
@@ -16,21 +16,23 @@ use common_error::ext::ErrorExt;

use crate::logstore::namespace::Namespace;

- /// An entry's logical id, allocated by log store users.
+ /// An entry's id.
+ /// Different log store implementations may assign different meanings to the id.
pub type Id = u64;
- /// An entry's physical offset in the underlying log store.
- pub type Offset = usize;

- /// Entry is the minimal data storage unit in `LogStore`.
+ /// Entry is the minimal data storage unit through which users interact with the log store.
+ /// The log store implementation may use a larger or smaller data storage unit than an entry.
pub trait Entry: Send + Sync {
type Error: ErrorExt + Send + Sync;
type Namespace: Namespace;

- /// Return contained data of entry.
+ /// Returns the contained data of the entry.
fn data(&self) -> &[u8];

- /// Return entry id that monotonically increments.
+ /// Returns the id of the entry.
+ /// Usually the namespace id is identical with the region id.
fn id(&self) -> Id;

/// Returns the namespace of the entry.
fn namespace(&self) -> Self::Namespace;
}
3 changes: 3 additions & 0 deletions src/store-api/src/logstore/namespace.rs
@@ -14,8 +14,11 @@

use std::hash::Hash;

/// The namespace id.
/// Usually the namespace id is identical with the region id.
pub type Id = u64;

pub trait Namespace: Send + Sync + Clone + std::fmt::Debug + Hash + PartialEq + Eq {
/// Returns the namespace id.
fn id(&self) -> Id;
}
