Skip to content

Commit

Permalink
refactor: remove wal options from region create/open requests
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Dec 15, 2023
1 parent 1ef6e86 commit 37a925b
Show file tree
Hide file tree
Showing 40 changed files with 55 additions and 124 deletions.
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "28725502aa3de91858da16cd4bd23c4066cebc6a" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b1d403088f02136bcebde53d604f491c260ca8e2" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
Expand Down
4 changes: 4 additions & 0 deletions src/common/config/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ use serde::{Deserialize, Serialize};
pub use crate::wal::kafka::KafkaConfig;
pub use crate::wal::raft_engine::RaftEngineConfig;

// TODO(niebayes): redirect to actual types.
pub type WalOptions = String;
pub type EncodedWalOptions = String;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "provider")]
pub enum WalConfig {
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ async-trait.workspace = true
base64.workspace = true
bytes = "1.4"
common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
common-grpc-expr.workspace = true
common-macro.workspace = true
Expand Down
3 changes: 0 additions & 3 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{
CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader,
Expand Down Expand Up @@ -181,7 +179,6 @@ impl CreateTableProcedure {
primary_key,
path: String::new(),
options: create_table_expr.table_options.clone(),
wal_options: HashMap::default(),
})
}

Expand Down
10 changes: 1 addition & 9 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,14 @@ pub struct OpenRegion {
pub region_ident: RegionIdent,
pub region_storage_path: String,
pub options: HashMap<String, String>,
pub wal_options: HashMap<String, String>,
}

impl OpenRegion {
pub fn new(
region_ident: RegionIdent,
path: &str,
options: HashMap<String, String>,
wal_options: HashMap<String, String>,
) -> Self {
pub fn new(region_ident: RegionIdent, path: &str, options: HashMap<String, String>) -> Self {
Self {
region_ident,
region_storage_path: path.to_string(),
options,
wal_options,
}
}
}
Expand Down Expand Up @@ -225,7 +218,6 @@ mod tests {
},
"test/foo",
HashMap::new(),
HashMap::new(),
));

let serialized = serde_json::to_string(&open_region).unwrap();
Expand Down
10 changes: 5 additions & 5 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,23 +345,24 @@ impl DatanodeBuilder {
while let Some(table_value) = table_values.next().await {
let table_value = table_value.context(GetMetadataSnafu)?;
for region_number in table_value.regions {
// TODO(niebayes): fetch region wal options from table value.
let region_wal_options: HashMap<String, String> = HashMap::default();
// TODO(niebayes): fetch wal options from region info.
let wal_options: HashMap<String, String> = HashMap::default();

regions.push((
RegionId::new(table_value.table_id, region_number),
table_value.region_info.engine.clone(),
table_value.region_info.region_storage_path.clone(),
table_value.region_info.region_options.clone(),
region_wal_options,
wal_options,
));
}
}
info!("going to open {} regions", regions.len());
let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM));
let mut tasks = vec![];

for (region_id, engine, store_path, options, wal_options) in regions {
// TODO(niebayes): integrate wal options into options.
for (region_id, engine, store_path, options, _wal_options) in regions {
let region_dir = region_dir(&store_path, region_id);
let semaphore_moved = semaphore.clone();
tasks.push(async move {
Expand All @@ -373,7 +374,6 @@ impl DatanodeBuilder {
engine: engine.clone(),
region_dir,
options,
wal_options,
}),
)
.await?;
Expand Down
3 changes: 0 additions & 3 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,13 @@ impl RegionHeartbeatResponseHandler {
region_ident,
region_storage_path,
options,
wal_options,
}) => Ok(Box::new(|region_server| {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident);
let request = RegionRequest::Open(RegionOpenRequest {
engine: region_ident.engine,
region_dir: region_dir(&region_storage_path, region_id),
options,
wal_options,
});
let result = region_server.handle_request(region_id, request).await;

Expand Down Expand Up @@ -241,7 +239,6 @@ mod tests {
},
path,
HashMap::new(),
HashMap::new(),
))
}

Expand Down
9 changes: 1 addition & 8 deletions src/file-engine/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,7 @@ mod tests {

use super::*;
use crate::error::Error;
use crate::test_util::{
new_test_column_metadata, new_test_object_store, new_test_options, new_test_wal_options,
};
use crate::test_util::{new_test_column_metadata, new_test_object_store, new_test_options};

#[tokio::test]
async fn test_create_region() {
Expand All @@ -115,7 +113,6 @@ mod tests {
column_metadatas: new_test_column_metadata(),
primary_key: vec![1],
options: new_test_options(),
wal_options: new_test_wal_options(),
region_dir: "create_region_dir/".to_string(),
};
let region_id = RegionId::new(1, 0);
Expand Down Expand Up @@ -154,7 +151,6 @@ mod tests {
column_metadatas: new_test_column_metadata(),
primary_key: vec![1],
options: new_test_options(),
wal_options: new_test_wal_options(),
region_dir: region_dir.clone(),
};
let region_id = RegionId::new(1, 0);
Expand All @@ -167,7 +163,6 @@ mod tests {
engine: "file".to_string(),
region_dir,
options: HashMap::default(),
wal_options: HashMap::default(),
};

let region = FileRegion::open(region_id, request, &object_store)
Expand All @@ -193,7 +188,6 @@ mod tests {
column_metadatas: new_test_column_metadata(),
primary_key: vec![1],
options: new_test_options(),
wal_options: new_test_wal_options(),
region_dir: region_dir.clone(),
};
let region_id = RegionId::new(1, 0);
Expand All @@ -217,7 +211,6 @@ mod tests {
engine: "file".to_string(),
region_dir,
options: HashMap::default(),
wal_options: HashMap::default(),
};
let err = FileRegion::open(region_id, request, &object_store)
.await
Expand Down
4 changes: 0 additions & 4 deletions src/file-engine/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,3 @@ pub fn new_test_options() -> HashMap<String, String> {
),
])
}

pub fn new_test_wal_options() -> HashMap<String, String> {
HashMap::default()
}
8 changes: 2 additions & 6 deletions src/log-store/src/kafka/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::HashMap;

use common_config::wal::KafkaConfig;
use common_config::wal::{KafkaConfig, WalOptions};
use store_api::logstore::entry::Id as EntryId;
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::Id as NamespaceId;
Expand Down Expand Up @@ -73,11 +73,7 @@ impl LogStore for KafkaLogStore {
}

/// Create a namespace of the associate Namespace type
fn namespace(
&self,
ns_id: NamespaceId,
wal_options: &HashMap<String, String>,
) -> Result<Self::Namespace> {
fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Result<Self::Namespace> {
todo!()
}

Expand Down
7 changes: 2 additions & 5 deletions src/log-store/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::HashMap;

use common_config::wal::WalOptions;
use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};
Expand Down Expand Up @@ -111,11 +112,7 @@ impl LogStore for NoopLogStore {
EntryImpl
}

fn namespace(
&self,
ns_id: NamespaceId,
wal_options: &HashMap<String, String>,
) -> Result<Self::Namespace> {
fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Result<Self::Namespace> {
let _ = ns_id;
let _ = wal_options;
Ok(NamespaceImpl)
Expand Down
9 changes: 2 additions & 7 deletions src/log-store/src/raft_engine/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use async_stream::stream;
use common_config::wal::RaftEngineConfig;
use common_config::wal::{RaftEngineConfig, WalOptions};
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::{error, info};
use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode};
Expand Down Expand Up @@ -342,11 +341,7 @@ impl LogStore for RaftEngineLogStore {
}
}

fn namespace(
&self,
ns_id: NamespaceId,
wal_options: &HashMap<String, String>,
) -> Result<Self::Namespace> {
fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Result<Self::Namespace> {
let _ = wal_options;
Ok(Namespace {
id: ns_id,
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ catalog.workspace = true
client.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
common-greptimedb-telemetry.workspace = true
common-grpc-expr.workspace = true
Expand Down
1 change: 0 additions & 1 deletion src/meta-srv/src/procedure/region_failover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,6 @@ mod tests {
opening_region,
&path,
HashMap::new(),
HashMap::new(),
)))
.unwrap(),
))
Expand Down
8 changes: 0 additions & 8 deletions src/meta-srv/src/procedure/region_failover/activate_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ pub(super) struct ActivateRegion {
remark_inactive_region: bool,
region_storage_path: Option<String>,
region_options: Option<HashMap<String, String>>,
region_wal_options: Option<HashMap<String, String>>,
}

impl ActivateRegion {
Expand All @@ -53,7 +52,6 @@ impl ActivateRegion {
remark_inactive_region: false,
region_storage_path: None,
region_options: None,
region_wal_options: None,
}
}

Expand Down Expand Up @@ -83,18 +81,14 @@ impl ActivateRegion {
};
info!("Activating region: {candidate_ident:?}");
let region_options: HashMap<String, String> = (&table_info.meta.options).into();
// TODO(niebayes): properly fetch or construct region wal options.
let region_wal_options = HashMap::default();
let instruction = Instruction::OpenRegion(OpenRegion::new(
candidate_ident.clone(),
&region_storage_path,
region_options.clone(),
region_wal_options.clone(),
));

self.region_storage_path = Some(region_storage_path);
self.region_options = Some(region_options);
self.region_wal_options = Some(region_wal_options);

let msg = MailboxMessage::json_message(
"Activate Region",
Expand Down Expand Up @@ -229,7 +223,6 @@ mod tests {
},
&env.path,
HashMap::new(),
HashMap::new(),
)))
.unwrap(),
))
Expand Down Expand Up @@ -300,7 +293,6 @@ mod tests {
},
&env.path,
HashMap::new(),
HashMap::new(),
)))
.unwrap(),
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ impl OpenCandidateRegion {

let engine = table_info.meta.engine.clone();
let region_options: HashMap<String, String> = (&table_info.meta.options).into();
// TODO(niebayes): properly fetch or construct region wal options.
let region_wal_options = HashMap::default();

let open_instruction = Instruction::OpenRegion(OpenRegion::new(
RegionIdent {
Expand All @@ -89,7 +87,6 @@ impl OpenCandidateRegion {
},
&region_storage_path,
region_options,
region_wal_options,
));

Ok(open_instruction)
Expand Down Expand Up @@ -214,7 +211,6 @@ mod tests {
},
region_storage_path: "/bar/foo/region/".to_string(),
options: Default::default(),
wal_options: Default::default(),
})
}

Expand Down
Loading

0 comments on commit 37a925b

Please sign in to comment.