diff --git a/Cargo.lock b/Cargo.lock index 09dd7ff1c62a..74692cdbb5ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4114,7 +4114,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ec4b84931378004db60d168e2604bc3fb9735e9c#ec4b84931378004db60d168e2604bc3fb9735e9c" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=10c349c033dded29097d0dc933fbc2f89f658032#10c349c033dded29097d0dc933fbc2f89f658032" dependencies = [ "prost", "serde", @@ -5510,12 +5510,13 @@ dependencies = [ "datafusion-common", "datatypes", "futures", - "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ec4b84931378004db60d168e2604bc3fb9735e9c)", + "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=10c349c033dded29097d0dc933fbc2f89f658032)", "lazy_static", "log-store", "metrics", "object-store", "parquet", + "prost", "regex", "serde", "serde_json", diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 34fa492d09f2..b3375075c1e1 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -33,12 +33,13 @@ datafusion.workspace = true datatypes = { path = "../datatypes" } futures.workspace = true # TODO(yingwen): Update and use api crate once https://github.com/GreptimeTeam/greptime-proto/pull/75 is merged. -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec4b84931378004db60d168e2604bc3fb9735e9c" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "10c349c033dded29097d0dc933fbc2f89f658032" } lazy_static = "1.4" log-store = { path = "../log-store" } metrics.workspace = true object-store = { path = "../object-store" } parquet = { workspace = true, features = ["async"] } +prost.workspace = true regex = "1.5" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index e4a74527cb7c..f772795afa53 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -52,6 +52,9 @@ impl MitoEngine { } /// Stop the engine. + /// + /// Stopping the engine doesn't stop the underlying log store as other components might + /// still use it. pub async fn stop(&self) -> Result<()> { self.inner.stop().await } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 4b92522118d3..e5778a9c2bfa 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -15,9 +15,10 @@ use std::any::Any; use common_datasource::compression::CompressionType; -use common_error::ext::ErrorExt; +use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use datatypes::arrow::error::ArrowError; +use prost::{DecodeError, EncodeError}; use snafu::{Location, Snafu}; use store_api::manifest::ManifestVersion; use store_api::storage::RegionId; @@ -205,6 +206,60 @@ pub enum Error { column: String, source: datatypes::Error, }, + + #[snafu(display( + "Failed to encode WAL entry, region_id: {}, location: {}, source: {}", + region_id, + location, + source + ))] + EncodeWal { + region_id: RegionId, + location: Location, + source: EncodeError, + }, + + #[snafu(display("Failed to write WAL, location: {}, source: {}", location, source))] + WriteWal { + location: Location, + source: BoxedError, + }, + + #[snafu(display( + "Failed to read WAL, region_id: {}, location: {}, source: {}", + region_id, + location, + source + ))] + ReadWal { + region_id: RegionId, + location: Location, + source: BoxedError, + }, + + #[snafu(display( + "Failed to decode WAL entry, region_id: {}, location: {}, source: {}", + region_id, + location, + source + ))] + DecodeWal { + region_id: RegionId, + location: Location, + source: DecodeError, + }, + + #[snafu(display( + "Failed to delete WAL, region_id: {}, location: {}, source: {}", + region_id, + location, + source + ))] + DeleteWal { + region_id: RegionId, + location: Location, + source: BoxedError, + }, } pub type Result = std::result::Result; @@ -214,9 +269,12 @@ impl ErrorExt for Error { use Error::*; match self { - OpenDal { .. } | WriteParquet { .. } | ReadParquet { .. } => { - StatusCode::StorageUnavailable - } + OpenDal { .. } + | WriteParquet { .. } + | ReadParquet { .. } + | WriteWal { .. } + | ReadWal { .. } + | DeleteWal { .. } => StatusCode::StorageUnavailable, CompressObject { .. } | DecompressObject { .. } | SerdeJson { .. } @@ -231,9 +289,12 @@ impl ErrorExt for Error { | InvalidSchema { .. } | InvalidRequest { .. } | FillDefault { .. } => StatusCode::InvalidArguments, - RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => { - StatusCode::Internal - } + RegionMetadataNotFound { .. } + | Join { .. } + | WorkerStopped { .. } + | Recv { .. } + | EncodeWal { .. } + | DecodeWal { .. } => StatusCode::Internal, WriteBuffer { source, .. } => source.status_code(), } } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 1aeacc9270d1..015c96c5c1f6 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -39,6 +39,7 @@ mod region; pub mod request; #[allow(dead_code)] pub mod sst; +pub mod wal; #[allow(dead_code)] mod worker; diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 837041edc7ba..3c56c50a9fe3 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -40,6 +40,7 @@ use crate::worker::WorkerGroup; pub struct TestEnv { /// Path to store data. data_home: TempDir, + // TODO(yingwen): Maybe provide a way to close the log store. } impl Default for TestEnv { diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs new file mode 100644 index 000000000000..ff0ece386f60 --- /dev/null +++ b/src/mito2/src/wal.rs @@ -0,0 +1,351 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Write ahead log of the engine. + +use std::mem; +use std::sync::Arc; + +use async_stream::try_stream; +use common_error::ext::BoxedError; +use futures::stream::BoxStream; +use futures::StreamExt; +use greptime_proto::v1::mito::WalEntry; +use prost::Message; +use snafu::ResultExt; +use store_api::logstore::entry::Entry; +use store_api::logstore::LogStore; +use store_api::storage::RegionId; + +use crate::error::{ + DecodeWalSnafu, DeleteWalSnafu, EncodeWalSnafu, ReadWalSnafu, Result, WriteWalSnafu, +}; + +/// WAL entry id. +pub type EntryId = store_api::logstore::entry::Id; +/// A stream that yields tuple of WAL entry id and corresponding entry. +pub type WalEntryStream<'a> = BoxStream<'a, Result<(EntryId, WalEntry)>>; + +/// Write ahead log. +/// +/// All regions in the engine shares the same WAL instance. +#[derive(Debug)] +pub struct Wal { + /// The underlying log store. + store: Arc, +} + +impl Wal { + /// Creates a new [Wal] from the log store. + pub fn new(store: Arc) -> Self { + Self { store } + } +} + +impl Wal { + /// Returns a writer to write to the WAL. + pub fn writer(&self) -> WalWriter { + WalWriter { + store: self.store.clone(), + entries: Vec::new(), + entry_encode_buf: Vec::new(), + } + } + + /// Scan entries of specific region starting from `start_id` (inclusive). + pub fn scan(&self, region_id: RegionId, start_id: EntryId) -> Result { + let stream = try_stream!({ + let namespace = self.store.namespace(region_id.into()); + let mut stream = self + .store + .read(&namespace, start_id) + .await + .map_err(BoxedError::new) + .context(ReadWalSnafu { region_id })?; + + while let Some(entries) = stream.next().await { + let entries = entries + .map_err(BoxedError::new) + .context(ReadWalSnafu { region_id })?; + + for entry in entries { + yield decode_entry(region_id, entry)?; + } + } + }); + + Ok(Box::pin(stream)) + } + + /// Mark entries whose ids `<= last_id` as deleted. + pub async fn obsolete(&self, region_id: RegionId, last_id: EntryId) -> Result<()> { + let namespace = self.store.namespace(region_id.into()); + self.store + .obsolete(namespace, last_id) + .await + .map_err(BoxedError::new) + .context(DeleteWalSnafu { region_id }) + } +} + +/// Decode Wal entry from log store. +fn decode_entry(region_id: RegionId, entry: E) -> Result<(EntryId, WalEntry)> { + let entry_id = entry.id(); + let data = entry.data(); + + let wal_entry = WalEntry::decode(data).context(DecodeWalSnafu { region_id })?; + + Ok((entry_id, wal_entry)) +} + +/// WAL batch writer. +pub struct WalWriter { + /// Log store of the WAL. + store: Arc, + /// Entries to write. + entries: Vec, + /// Buffer to encode WAL entry. + entry_encode_buf: Vec, +} + +impl WalWriter { + /// Add an wal entry for specific region to the writer's buffer. + pub fn add_entry( + &mut self, + region_id: RegionId, + entry_id: EntryId, + wal_entry: &WalEntry, + ) -> Result<()> { + let namespace = self.store.namespace(region_id.into()); + // Encode wal entry to log store entry. + self.entry_encode_buf.clear(); + wal_entry + .encode(&mut self.entry_encode_buf) + .context(EncodeWalSnafu { region_id })?; + let entry = self + .store + .entry(&self.entry_encode_buf, entry_id, namespace); + + self.entries.push(entry); + + Ok(()) + } + + /// Write all buffered entries to the WAL. + pub async fn write_to_wal(&mut self) -> Result<()> { + // TODO(yingwen): metrics. + + let entries = mem::take(&mut self.entries); + self.store + .append_batch(entries) + .await + .map_err(BoxedError::new) + .context(WriteWalSnafu) + } +} + +#[cfg(test)] +mod tests { + use common_test_util::temp_dir::{create_temp_dir, TempDir}; + use futures::TryStreamExt; + use greptime_proto::v1::mito::{Mutation, OpType}; + use greptime_proto::v1::{value, ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; + use log_store::raft_engine::log_store::RaftEngineLogStore; + use log_store::test_util::log_store_util; + use store_api::storage::SequenceNumber; + + use super::*; + + struct WalEnv { + _wal_dir: TempDir, + log_store: Option>, + } + + impl WalEnv { + async fn new() -> WalEnv { + let wal_dir = create_temp_dir(""); + let log_store = + log_store_util::create_tmp_local_file_log_store(wal_dir.path().to_str().unwrap()) + .await; + WalEnv { + _wal_dir: wal_dir, + log_store: Some(Arc::new(log_store)), + } + } + + fn new_wal(&self) -> Wal { + let log_store = self.log_store.clone().unwrap(); + Wal::new(log_store) + } + } + + /// Create a new mutation from rows. + /// + /// The row format is (string, i64). + fn new_mutation(op_type: OpType, sequence: SequenceNumber, rows: &[(&str, i64)]) -> Mutation { + let rows = rows + .iter() + .map(|(str_col, int_col)| { + let values = vec![ + Value { + value: Some(value::Value::StringValue(str_col.to_string())), + }, + Value { + value: Some(value::Value::TsMillisecondValue(*int_col)), + }, + ]; + Row { values } + }) + .collect(); + let schema = vec![ + ColumnSchema { + column_name: "tag".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + }, + ColumnSchema { + column_name: "ts".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as i32, + }, + ]; + + Mutation { + op_type: op_type as i32, + sequence, + rows: Some(Rows { schema, rows }), + } + } + + #[tokio::test] + async fn test_write_wal() { + let env = WalEnv::new().await; + let wal = env.new_wal(); + + let entry = WalEntry { + mutations: vec![ + new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]), + new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]), + ], + }; + let mut writer = wal.writer(); + // Region 1 entry 1. + writer.add_entry(RegionId::new(1, 1), 1, &entry).unwrap(); + // Region 2 entry 1. + writer.add_entry(RegionId::new(1, 2), 1, &entry).unwrap(); + // Region 1 entry 2. + writer.add_entry(RegionId::new(1, 1), 2, &entry).unwrap(); + + // Test writing multiple region to wal. + writer.write_to_wal().await.unwrap(); + } + + fn sample_entries() -> Vec { + vec![ + WalEntry { + mutations: vec![ + new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]), + new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]), + ], + }, + WalEntry { + mutations: vec![new_mutation(OpType::Put, 3, &[("k1", 1), ("k2", 2)])], + }, + WalEntry { + mutations: vec![ + new_mutation(OpType::Put, 4, &[("k1", 1), ("k2", 2)]), + new_mutation(OpType::Put, 5, &[("k3", 3), ("k4", 4)]), + ], + }, + WalEntry { + mutations: vec![new_mutation(OpType::Put, 6, &[("k1", 1), ("k2", 2)])], + }, + ] + } + + fn check_entries( + expect: &[WalEntry], + expect_start_id: EntryId, + actual: &[(EntryId, WalEntry)], + ) { + for (idx, (expect_entry, (actual_id, actual_entry))) in + expect.iter().zip(actual.iter()).enumerate() + { + let expect_id_entry = (expect_start_id + idx as u64, expect_entry); + assert_eq!(expect_id_entry, (*actual_id, actual_entry)); + } + assert_eq!(expect.len(), actual.len()); + } + + #[tokio::test] + async fn test_scan_wal() { + let env = WalEnv::new().await; + let wal = env.new_wal(); + + let entries = sample_entries(); + let (id1, id2) = (RegionId::new(1, 1), RegionId::new(1, 2)); + let mut writer = wal.writer(); + writer.add_entry(id1, 1, &entries[0]).unwrap(); + // Insert one entry into region2. Scan should not return this entry. + writer.add_entry(id2, 1, &entries[0]).unwrap(); + writer.add_entry(id1, 2, &entries[1]).unwrap(); + writer.add_entry(id1, 3, &entries[2]).unwrap(); + writer.add_entry(id1, 4, &entries[3]).unwrap(); + + writer.write_to_wal().await.unwrap(); + + // Scan all contents region1 + let stream = wal.scan(id1, 1).unwrap(); + let actual: Vec<_> = stream.try_collect().await.unwrap(); + check_entries(&entries, 1, &actual); + + // Scan parts of contents + let stream = wal.scan(id1, 2).unwrap(); + let actual: Vec<_> = stream.try_collect().await.unwrap(); + check_entries(&entries[1..], 2, &actual); + + // Scan out of range + let stream = wal.scan(id1, 5).unwrap(); + let actual: Vec<_> = stream.try_collect().await.unwrap(); + assert!(actual.is_empty()); + } + + #[tokio::test] + async fn test_obsolete_wal() { + let env = WalEnv::new().await; + let wal = env.new_wal(); + + let entries = sample_entries(); + let mut writer = wal.writer(); + let region_id = RegionId::new(1, 1); + writer.add_entry(region_id, 1, &entries[0]).unwrap(); + writer.add_entry(region_id, 2, &entries[1]).unwrap(); + writer.add_entry(region_id, 3, &entries[2]).unwrap(); + + writer.write_to_wal().await.unwrap(); + + // Delete 1, 2. + wal.obsolete(region_id, 2).await.unwrap(); + + // Put 4. + let mut writer = wal.writer(); + writer.add_entry(region_id, 4, &entries[3]).unwrap(); + writer.write_to_wal().await.unwrap(); + + // Scan all + let stream = wal.scan(region_id, 1).unwrap(); + let actual: Vec<_> = stream.try_collect().await.unwrap(); + check_entries(&entries[2..], 3, &actual); + } +} diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index dd26f9ac3960..2ba836666006 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -39,6 +39,7 @@ use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu}; use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef}; use crate::region::{RegionMap, RegionMapRef}; use crate::request::{RegionRequest, RequestBody, SenderWriteRequest, WorkerRequest}; +use crate::wal::Wal; /// Identifier for a worker. pub(crate) type WorkerId = u32; @@ -179,7 +180,7 @@ impl RegionWorker { config, regions: regions.clone(), receiver, - log_store, + wal: Wal::new(log_store), object_store, running: running.clone(), memtable_builder: Arc::new(DefaultMemtableBuilder::default()), @@ -274,8 +275,8 @@ struct RegionWorkerLoop { regions: RegionMapRef, /// Request receiver. receiver: Receiver, - // TODO(yingwen): Replaced by Wal. - log_store: Arc, + /// WAL of the engine. + wal: Wal, /// Object store for manifest and SSTs. object_store: ObjectStore, /// Whether the worker thread is still running.