Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(remote_wal): entry id usage #2986

Merged
merged 7 commits into from
Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/log-store/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,13 @@ impl LogStore for NoopLogStore {

async fn append(&self, mut _e: Self::Entry) -> Result<AppendResponse> {
Ok(AppendResponse {
entry_id: 0,
offset: None,
last_entry_id: Default::default(),
})
}

async fn append_batch(&self, _e: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
Ok(AppendBatchResponse {
offsets: HashMap::new(),
last_entry_ids: HashMap::new(),
})
}

Expand Down
91 changes: 51 additions & 40 deletions src/log-store/src/raft_engine/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

Expand Down Expand Up @@ -179,8 +180,7 @@ impl LogStore for RaftEngineLogStore {
.write(&mut batch, self.config.sync_write)
.context(RaftEngineSnafu)?;
Ok(AppendResponse {
entry_id,
offset: None,
last_entry_id: entry_id,
})
}

Expand All @@ -192,11 +192,18 @@ impl LogStore for RaftEngineLogStore {
return Ok(AppendBatchResponse::default());
}

// Records the last entry id for each region's entries.
let mut last_entry_ids = HashMap::with_capacity(entries.len());
let mut batch = LogBatch::with_capacity(entries.len());

for e in entries {
self.check_entry(&e)?;
// For raft-engine log store, the namespace id is the region id.
let ns_id = e.namespace_id;
last_entry_ids
niebayes marked this conversation as resolved.
Show resolved Hide resolved
.entry(ns_id)
.and_modify(|x: &mut u64| *x = (*x).max(e.id))
niebayes marked this conversation as resolved.
Show resolved Hide resolved
.or_insert(e.id);
batch
.add_entries::<MessageType>(ns_id, &[e])
.context(AddEntryLogBatchSnafu)?;
Expand All @@ -207,8 +214,7 @@ impl LogStore for RaftEngineLogStore {
.write(&mut batch, self.config.sync_write)
.context(RaftEngineSnafu)?;

// The user of raft-engine log store does not care about the response.
Ok(AppendBatchResponse::default())
Ok(AppendBatchResponse { last_entry_ids })
}

/// Create a stream of entries from logstore in the given namespace. The end of stream is
Expand Down Expand Up @@ -381,7 +387,7 @@ mod tests {

use common_base::readable_size::ReadableSize;
use common_telemetry::debug;
use common_test_util::temp_dir::create_temp_dir;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use futures_util::StreamExt;
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::Namespace as NamespaceTrait;
Expand Down Expand Up @@ -452,7 +458,7 @@ mod tests {
))
.await
.unwrap();
assert_eq!(i, response.entry_id);
assert_eq!(i, response.last_entry_id);
}
let mut entries = HashSet::with_capacity(1024);
let mut s = logstore.read(&Namespace::with_id(1), 0).await.unwrap();
Expand Down Expand Up @@ -526,10 +532,7 @@ mod tests {
size
}

#[tokio::test]
async fn test_compaction() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("raft-engine-logstore-test");
async fn new_test_log_store(dir: &TempDir) -> RaftEngineLogStore {
let path = dir.path().to_str().unwrap().to_string();

let config = RaftEngineConfig {
Expand All @@ -539,7 +542,15 @@ mod tests {
..Default::default()
};

let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap();
RaftEngineLogStore::try_new(path, config).await.unwrap()
}

#[tokio::test]
async fn test_compaction() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("raft-engine-logstore-test");
let logstore = new_test_log_store(&dir).await;

let namespace = Namespace::with_id(42);
for id in 0..4096 {
let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec());
Expand All @@ -562,16 +573,8 @@ mod tests {
async fn test_obsolete() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("raft-engine-logstore-test");
let path = dir.path().to_str().unwrap().to_string();
let logstore = new_test_log_store(&dir).await;

let config = RaftEngineConfig {
file_size: ReadableSize::mb(2),
purge_threshold: ReadableSize::mb(4),
purge_interval: Duration::from_secs(5),
..Default::default()
};

let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap();
let namespace = Namespace::with_id(42);
for id in 0..1024 {
let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec());
Expand All @@ -591,16 +594,7 @@ mod tests {
async fn test_append_batch() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("logstore-append-batch-test");
let path = dir.path().to_str().unwrap().to_string();

let config = RaftEngineConfig {
file_size: ReadableSize::mb(2),
purge_threshold: ReadableSize::mb(4),
purge_interval: Duration::from_secs(5),
..Default::default()
};

let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap();
let logstore = new_test_log_store(&dir).await;

let entries = (0..8)
.flat_map(|ns_id| {
Expand All @@ -622,16 +616,7 @@ mod tests {
async fn test_append_batch_interleaved() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("logstore-append-batch-test");

let path = dir.path().to_str().unwrap().to_string();
let config = RaftEngineConfig {
file_size: ReadableSize::mb(2),
purge_threshold: ReadableSize::mb(4),
purge_interval: Duration::from_secs(5),
..Default::default()
};

let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap();
let logstore = new_test_log_store(&dir).await;

let entries = vec![
Entry::create(0, 0, [b'0'; 4096].to_vec()),
Expand All @@ -646,4 +631,30 @@ mod tests {
assert_eq!((Some(0), Some(2)), logstore.span(&Namespace::with_id(0)));
assert_eq!((Some(0), Some(1)), logstore.span(&Namespace::with_id(1)));
}

#[tokio::test]
async fn test_append_batch_response() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("logstore-append-batch-test");
let logstore = new_test_log_store(&dir).await;

let entries = vec![
// Entry[0] from region 0.
Entry::create(0, 0, [b'0'; 4096].to_vec()),
// Entry[0] from region 1.
Entry::create(0, 1, [b'1'; 4096].to_vec()),
// Entry[1] from region 1.
Entry::create(1, 0, [b'1'; 4096].to_vec()),
// Entry[1] from region 0.
Entry::create(1, 1, [b'0'; 4096].to_vec()),
// Entry[2] from region 2.
Entry::create(2, 2, [b'2'; 4096].to_vec()),
];

// Ensure the last entry id returned for each region is the expected one.
let last_entry_ids = logstore.append_batch(entries).await.unwrap().last_entry_ids;
assert_eq!(last_entry_ids[&0], 1);
assert_eq!(last_entry_ids[&1], 1);
assert_eq!(last_entry_ids[&2], 2);
}
}
2 changes: 0 additions & 2 deletions src/mito2/src/region_write_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,6 @@ impl RegionWriteCtx {
&self.wal_entry,
&self.wal_options,
)?;
// We only call this method one time, but we still bump next entry id for consistency.
self.next_entry_id += 1;
Ok(())
}

Expand Down
6 changes: 2 additions & 4 deletions src/mito2/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use futures::StreamExt;
use prost::Message;
use snafu::ResultExt;
use store_api::logstore::entry::Entry;
use store_api::logstore::LogStore;
use store_api::logstore::{AppendBatchResponse, LogStore};
use store_api::storage::RegionId;

use crate::error::{
Expand Down Expand Up @@ -165,8 +165,7 @@ impl<S: LogStore> WalWriter<S> {
}

/// Write all buffered entries to the WAL.
// TODO(niebayes): returns an `AppendBatchResponse` and handle it properly.
pub async fn write_to_wal(&mut self) -> Result<()> {
pub async fn write_to_wal(&mut self) -> Result<AppendBatchResponse> {
// TODO(yingwen): metrics.

let entries = mem::take(&mut self.entries);
Expand All @@ -175,7 +174,6 @@ impl<S: LogStore> WalWriter<S> {
.await
.map_err(BoxedError::new)
.context(WriteWalSnafu)
.map(|_| ())
}
}

Expand Down
21 changes: 16 additions & 5 deletions src/mito2/src/worker/handle_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,23 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_ctx.set_error(e);
}
}
if let Err(e) = wal_writer.write_to_wal().await.map_err(Arc::new) {
// Failed to write wal.
for mut region_ctx in region_ctxs.into_values() {
region_ctx.set_error(e.clone());
match wal_writer.write_to_wal().await.map_err(Arc::new) {
Ok(response) => {
for (region_id, region_ctx) in region_ctxs.iter_mut() {
// Safety: the log store implementation ensures that either the `write_to_wal` fails and no
// response is returned or the last entry ids for each region do exist.
let last_entry_id =
response.last_entry_ids.get(&region_id.as_u64()).unwrap();
region_ctx.set_next_entry_id(last_entry_id + 1);
}
}
Err(e) => {
// Failed to write wal.
for mut region_ctx in region_ctxs.into_values() {
region_ctx.set_error(e.clone());
}
return;
}
return;
}
}

Expand Down
41 changes: 18 additions & 23 deletions src/store-api/src/logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::collections::HashMap;
use common_config::wal::WalOptions;
use common_error::ext::ErrorExt;

use crate::logstore::entry::{Entry, Id as EntryId, Offset as EntryOffset};
use crate::logstore::entry::{Entry, Id as EntryId};
use crate::logstore::entry_stream::SendableEntryStream;
use crate::logstore::namespace::{Id as NamespaceId, Namespace};

Expand All @@ -34,65 +34,60 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
type Namespace: Namespace;
type Entry: Entry;

/// Stop components of logstore.
/// Stops components of the logstore.
async fn stop(&self) -> Result<(), Self::Error>;

/// Append an `Entry` to WAL with given namespace and return append response containing
/// the entry id.
/// Appends an entry to the log store and returns a response containing the id of the append entry.
async fn append(&self, entry: Self::Entry) -> Result<AppendResponse, Self::Error>;

/// Append a batch of entries and return an append batch response containing the start entry ids of
/// log entries written to each region.
/// Appends a batch of entries and returns a response containing a map where the key is a region id
/// while the value is the id of the last successfully written entry of the region.
async fn append_batch(
&self,
entries: Vec<Self::Entry>,
) -> Result<AppendBatchResponse, Self::Error>;

/// Create a new `EntryStream` to asynchronously generates `Entry` with ids
/// Creates a new `EntryStream` to asynchronously generates `Entry` with ids
/// starting from `id`.
async fn read(
&self,
ns: &Self::Namespace,
id: EntryId,
) -> Result<SendableEntryStream<Self::Entry, Self::Error>, Self::Error>;

/// Create a new `Namespace`.
/// Creates a new `Namespace` from the given ref.
async fn create_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error>;

/// Delete an existing `Namespace` with given ref.
/// Deletes an existing `Namespace` specified by the given ref.
async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error>;

/// List all existing namespaces.
/// Lists all existing namespaces.
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>, Self::Error>;

/// Create an entry of the associate Entry type
/// Creates an entry of the associated Entry type
fn entry<D: AsRef<[u8]>>(&self, data: D, entry_id: EntryId, ns: Self::Namespace)
-> Self::Entry;

/// Create a namespace of the associate Namespace type
/// Creates a namespace of the associated Namespace type
// TODO(sunng87): confusion with `create_namespace`
fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace;

/// Mark all entry ids `<=id` of given `namespace` as obsolete so that logstore can safely delete
/// the log files if all entries inside are obsolete. This method may not delete log
/// files immediately.
/// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete,
/// so that the log store can safely delete those entries. This method does not guarantee
/// that the obsolete entries are deleted immediately.
async fn obsolete(&self, ns: Self::Namespace, entry_id: EntryId) -> Result<(), Self::Error>;
}

/// The response of an `append` operation.
#[derive(Debug)]
pub struct AppendResponse {
/// The entry id of the appended log entry.
pub entry_id: EntryId,
/// The start entry offset of the appended log entry.
/// Depends on the `LogStore` implementation, the entry offset may be missing.
pub offset: Option<EntryOffset>,
/// The id of the entry appended to the log store.
pub last_entry_id: EntryId,
}

/// The response of an `append_batch` operation.
#[derive(Debug, Default)]
pub struct AppendBatchResponse {
/// Key: region id (as u64). Value: the known minimum start offset of the appended log entries belonging to the region.
/// Depends on the `LogStore` implementation, the entry offsets may be missing.
pub offsets: HashMap<u64, EntryOffset>,
/// Key: region id (as u64). Value: the id of the last successfully written entry of the region.
pub last_entry_ids: HashMap<u64, EntryId>,
}
15 changes: 10 additions & 5 deletions src/store-api/src/logstore/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,26 @@ use common_error::ext::ErrorExt;

use crate::logstore::namespace::Namespace;

/// An entry's logical id, allocated by log store users.
/// An entry's id.
/// Different log store implementations may interpret the id to different meanings.
pub type Id = u64;
/// An entry's physical offset in the underlying log store.
/// An entry's offset.
/// Notice: it's currently not used.
pub type Offset = usize;
niebayes marked this conversation as resolved.
Show resolved Hide resolved

/// Entry is the minimal data storage unit in `LogStore`.
/// Entry is the minimal data storage unit through which users interact with the log store.
/// The log store implementation may have larger or smaller data storage unit than an entry.
pub trait Entry: Send + Sync {
type Error: ErrorExt + Send + Sync;
type Namespace: Namespace;

/// Return contained data of entry.
/// Returns the contained data of the entry.
fn data(&self) -> &[u8];

/// Return entry id that monotonically increments.
/// Returns the id of the entry.
/// Usually the namespace id is identical with the region id.
fn id(&self) -> Id;

/// Returns the namespace of the entry.
fn namespace(&self) -> Self::Namespace;
}
3 changes: 3 additions & 0 deletions src/store-api/src/logstore/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@

use std::hash::Hash;

/// The namespace id.
/// Usually the namespace id is identical with the region id.
pub type Id = u64;

pub trait Namespace: Send + Sync + Clone + std::fmt::Debug + Hash + PartialEq + Eq {
/// Returns the namespace id.
fn id(&self) -> Id;
}
Loading