feat(tests-integration): add a naive region migration integration test #3078

Merged
41 changes: 40 additions & 1 deletion src/datanode/src/heartbeat/handler.rs
@@ -96,6 +96,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
Some((_, Instruction::OpenRegion { .. }))
| Some((_, Instruction::CloseRegion { .. }))
| Some((_, Instruction::DowngradeRegion { .. }))
| Some((_, Instruction::UpgradeRegion { .. }))
)
}

@@ -134,7 +135,7 @@ mod tests {
use common_meta::heartbeat::mailbox::{
HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
};
use common_meta::instruction::{DowngradeRegion, OpenRegion};
use common_meta::instruction::{DowngradeRegion, OpenRegion, UpgradeRegion};
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
@@ -175,6 +176,44 @@ mod tests {
}
}

#[test]
fn test_is_acceptable() {
common_telemetry::init_default_ut_logging();
let region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let heartbeat_env = HeartbeatResponseTestEnv::new();
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");

// Open region
let region_id = RegionId::new(1024, 1);
let storage_path = "test";
let instruction = open_region_instruction(region_id, storage_path);
assert!(heartbeat_handler
.is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));

// Close region
let instruction = close_region_instruction(region_id);
assert!(heartbeat_handler
.is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));

// Downgrade region
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id: RegionId::new(2048, 1),
});
assert!(heartbeat_handler
.is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));

// Upgrade region
let instruction = Instruction::UpgradeRegion(UpgradeRegion {
region_id,
last_entry_id: None,
wait_for_replay_timeout: None,
});
assert!(
heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))
);
}

fn close_region_instruction(region_id: RegionId) -> Instruction {
Instruction::CloseRegion(RegionIdent {
table_id: region_id.table_id(),
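
For context, a minimal sketch of the open_region_instruction helper the new test relies on (imports as in the test module above); the exact OpenRegion::new signature and RegionIdent fields are assumptions based on the surrounding test code, not a verbatim copy:

use std::collections::HashMap;

fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
    // Assumed constructor order: (region_ident, storage_path, region_options,
    // region_wal_options, skip_wal_replay).
    Instruction::OpenRegion(OpenRegion::new(
        RegionIdent {
            cluster_id: 1,
            datanode_id: 2,
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            engine: MITO_ENGINE_NAME.to_string(),
        },
        path,
        HashMap::new(),
        HashMap::new(),
        false,
    ))
}
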
8 changes: 3 additions & 5 deletions src/datanode/src/heartbeat/handler/open_region.rs
@@ -14,6 +14,7 @@

use common_error::ext::ErrorExt;
use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
use common_meta::wal::prepare_wal_option;
use futures_util::future::BoxFuture;
use store_api::path_utils::region_dir;
use store_api::region_request::{RegionOpenRequest, RegionRequest};
@@ -26,26 +27,23 @@ impl HandlerContext {
OpenRegion {
region_ident,
region_storage_path,
region_options,
mut region_options,
region_wal_options,
skip_wal_replay,
}: OpenRegion,
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident);
// TODO(niebayes): extends region options with region_wal_options.
let _ = region_wal_options;
prepare_wal_option(&mut region_options, region_id, &region_wal_options);
let request = RegionRequest::Open(RegionOpenRequest {
engine: region_ident.engine,
region_dir: region_dir(&region_storage_path, region_id),
options: region_options,
skip_wal_replay,
});
let result = self.region_server.handle_request(region_id, request).await;

let success = result.is_ok();
let error = result.as_ref().map_err(|e| e.output_msg()).err();

InstructionReply::OpenRegion(SimpleReply {
result: success,
error,
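
The prepare_wal_option helper itself is not part of this diff. Conceptually it merges the region's serialized WAL options (keyed by region number) into the generic region option map before the open request is built. A rough sketch of that behavior, where the "wal_options" key name and map types are assumptions rather than the actual common_meta::wal implementation:

use std::collections::HashMap;

use store_api::storage::{RegionId, RegionNumber};

// Sketch only: copy this region's WAL options (if present) into the option
// map handed to RegionOpenRequest.
fn prepare_wal_option_sketch(
    options: &mut HashMap<String, String>,
    region_id: RegionId,
    region_wal_options: &HashMap<RegionNumber, String>,
) {
    if let Some(wal_options) = region_wal_options.get(&region_id.region_number()) {
        options.insert("wal_options".to_string(), wal_options.clone());
    }
}
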
1 change: 1 addition & 0 deletions src/meta-srv/src/procedure/region_migration.rs
@@ -42,6 +42,7 @@ use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
pub use manager::RegionMigrationProcedureTask;
use serde::{Deserialize, Serialize};
use snafu::{location, Location, OptionExt, ResultExt};
use store_api::storage::RegionId;
15 changes: 13 additions & 2 deletions src/meta-srv/src/procedure/region_migration/manager.rs
@@ -56,13 +56,24 @@ impl Drop for RegionMigrationProcedureGuard {
}

#[derive(Debug, Clone)]
pub(crate) struct RegionMigrationProcedureTask {
pub struct RegionMigrationProcedureTask {
pub(crate) cluster_id: ClusterId,
pub(crate) region_id: RegionId,
pub(crate) from_peer: Peer,
pub(crate) to_peer: Peer,
}

impl RegionMigrationProcedureTask {
pub fn new(cluster_id: ClusterId, region_id: RegionId, from_peer: Peer, to_peer: Peer) -> Self {
Self {
cluster_id,
region_id,
from_peer,
to_peer,
}
}
}

impl Display for RegionMigrationProcedureTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
@@ -223,7 +234,7 @@ impl RegionMigrationManager {
}

/// Submits a new region migration procedure.
pub(crate) async fn submit_procedure(
pub async fn submit_procedure(
&self,
task: RegionMigrationProcedureTask,
) -> Result<Option<ProcedureId>> {
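
Making the task type and submit_procedure public lets the integration test kick off a migration directly against the metasrv's manager. A hedged sketch of such a call site, taking the manager by reference; the peer ids and addresses are illustrative only:

use common_meta::peer::Peer;
use store_api::storage::RegionId;

async fn submit_migration_sketch(manager: &RegionMigrationManager) {
    let task = RegionMigrationProcedureTask::new(
        0,                          // cluster_id
        RegionId::new(1024, 1),     // region to migrate
        Peer::new(1, "dn-1:4001"),  // from_peer
        Peer::new(2, "dn-2:4001"),  // to_peer
    );
    // Returns Some(procedure_id) when a new migration procedure is scheduled.
    let procedure_id = manager.submit_procedure(task).await.unwrap();
    assert!(procedure_id.is_some());
}
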
57 changes: 41 additions & 16 deletions tests-integration/src/cluster.rs
@@ -43,17 +43,18 @@ use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use meta_client::client::MetaClientBuilder;
use meta_srv::cluster::MetaPeerClientRef;
use meta_srv::metasrv::{MetaSrv, MetaSrvOptions};
use meta_srv::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef};
use meta_srv::mocks::MockInfo;
use servers::grpc::GrpcServer;
use servers::heartbeat_options::HeartbeatOptions;
use servers::Mode;
use tempfile::TempDir;
use tonic::transport::Server;
use tower::service_fn;

use crate::test_util::{
self, create_datanode_opts, create_tmp_dir_and_datanode_opts, FileDirGuard, StorageGuard,
StorageType,
StorageType, PEER_PLACEHOLDER_ADDR,
};

pub struct GreptimeDbCluster {
@@ -75,6 +76,8 @@ pub struct GreptimeDbClusterBuilder {
datanodes: Option<u32>,
wal_config: WalConfig,
meta_wal_config: MetaWalConfig,
shared_home_dir: Option<Arc<TempDir>>,
meta_selector: Option<SelectorRef>,
}

impl GreptimeDbClusterBuilder {
@@ -102,34 +105,53 @@ impl GreptimeDbClusterBuilder {
datanodes: None,
wal_config: WalConfig::default(),
meta_wal_config: MetaWalConfig::default(),
shared_home_dir: None,
meta_selector: None,
}
}

#[must_use]
pub fn with_store_config(mut self, store_config: ObjectStoreConfig) -> Self {
self.store_config = Some(store_config);
self
}

#[must_use]
pub fn with_store_providers(mut self, store_providers: Vec<StorageType>) -> Self {
self.store_providers = Some(store_providers);
self
}

#[must_use]
pub fn with_datanodes(mut self, datanodes: u32) -> Self {
self.datanodes = Some(datanodes);
self
}

#[must_use]
pub fn with_wal_config(mut self, wal_config: WalConfig) -> Self {
self.wal_config = wal_config;
self
}

#[must_use]
pub fn with_meta_wal_config(mut self, wal_meta: MetaWalConfig) -> Self {
self.meta_wal_config = wal_meta;
self
}

#[must_use]
pub fn with_shared_home_dir(mut self, shared_home_dir: Arc<TempDir>) -> Self {
self.shared_home_dir = Some(shared_home_dir);
self
}

#[must_use]
pub fn with_meta_selector(mut self, selector: SelectorRef) -> Self {
self.meta_selector = Some(selector);
self
}

pub async fn build(self) -> GreptimeDbCluster {
let datanodes = self.datanodes.unwrap_or(4);

@@ -147,7 +169,13 @@
..Default::default()
};

let meta_srv = self.build_metasrv(opt, datanode_clients.clone()).await;
let meta_srv = meta_srv::mocks::mock(
opt,
self.kv_backend.clone(),
self.meta_selector.clone(),
Some(datanode_clients.clone()),
)
.await;

let (datanode_instances, storage_guards, dir_guards) =
self.build_datanodes(meta_srv.clone(), datanodes).await;
@@ -175,14 +203,6 @@ impl GreptimeDbClusterBuilder {
}
}

async fn build_metasrv(
&self,
opt: MetaSrvOptions,
datanode_clients: Arc<DatanodeClients>,
) -> MockInfo {
meta_srv::mocks::mock(opt, self.kv_backend.clone(), None, Some(datanode_clients)).await
}

async fn build_datanodes(
&self,
meta_srv: MockInfo,
@@ -200,10 +220,15 @@
let datanode_id = i as u64 + 1;
let mode = Mode::Distributed;
let mut opts = if let Some(store_config) = &self.store_config {
let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name));
let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
let home_dir = if let Some(home_dir) = &self.shared_home_dir {
home_dir.path().to_str().unwrap().to_string()
} else {
let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name));
let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
dir_guards.push(FileDirGuard::new(home_tmp_dir));

dir_guards.push(FileDirGuard::new(home_tmp_dir));
home_dir
};

create_datanode_opts(
mode,
@@ -370,8 +395,8 @@ async fn create_datanode_client(datanode: &Datanode) -> (String, Client) {
// Move client to an option so we can _move_ the inner value
// on the first attempt to connect. All other attempts will fail.
let mut client = Some(client);
// "127.0.0.1:3001" is just a placeholder, does not actually connect to it.
let addr = "127.0.0.1:3001";
// `PEER_PLACEHOLDER_ADDR` is just a placeholder, does not actually connect to it.
let addr = PEER_PLACEHOLDER_ADDR;
let channel_manager = ChannelManager::new();
let _ = channel_manager
.reset_with_connector(
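
Taken together, the new builder knobs let a test pin every datanode to one shared home directory and control where the metasrv places regions. A hedged sketch of wiring them up, assuming the builder is constructed as in the other integration tests:

use std::sync::Arc;

use common_test_util::temp_dir::create_temp_dir;
use meta_srv::metasrv::SelectorRef;

// Sketch: plug the new options into an already-constructed builder.
async fn build_migration_cluster(
    builder: GreptimeDbClusterBuilder,
    selector: SelectorRef,
) -> GreptimeDbCluster {
    // All datanodes share one home dir, so a region handed from one datanode
    // to another keeps pointing at the same on-disk data.
    let shared_home = Arc::new(create_temp_dir("gt_region_migration"));
    builder
        .with_datanodes(3)
        .with_shared_home_dir(shared_home)
        .with_meta_selector(selector)
        .build()
        .await
}
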
5 changes: 5 additions & 0 deletions tests-integration/src/standalone.rs
@@ -65,6 +65,7 @@ impl GreptimeDbStandaloneBuilder {
}
}

#[must_use]
pub fn with_default_store_type(self, store_type: StorageType) -> Self {
Self {
default_store: Some(store_type),
@@ -73,6 +74,7 @@
}

#[cfg(test)]
#[must_use]
pub fn with_store_providers(self, store_providers: Vec<StorageType>) -> Self {
Self {
store_providers: Some(store_providers),
@@ -81,18 +83,21 @@
}

#[cfg(test)]
#[must_use]
pub fn with_plugin(self, plugin: Plugins) -> Self {
Self {
plugin: Some(plugin),
..self
}
}

#[must_use]
pub fn with_wal_config(mut self, wal_config: WalConfig) -> Self {
self.wal_config = wal_config;
self
}

#[must_use]
pub fn with_meta_wal_config(mut self, wal_meta: MetaWalConfig) -> Self {
self.meta_wal_config = wal_meta;
self
23 changes: 23 additions & 0 deletions tests-integration/src/test_util.rs
@@ -27,6 +27,7 @@ use common_meta::key::schema_name::SchemaNameKey;
use common_query::Output;
use common_recordbatch::util;
use common_runtime::Builder as RuntimeBuilder;
use common_telemetry::warn;
use common_test_util::ports;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datanode::config::{
@@ -36,6 +37,7 @@ use datanode::config::{
use frontend::frontend::TomlSerializable;
use frontend::instance::Instance;
use frontend::service_config::{MysqlOptions, PostgresOptions};
use futures::future::BoxFuture;
use object_store::services::{Azblob, Gcs, Oss, S3};
use object_store::test_util::TempFolder;
use object_store::ObjectStore;
Expand All @@ -54,6 +56,8 @@ use session::context::QueryContext;

use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder};

pub const PEER_PLACEHOLDER_ADDR: &str = "127.0.0.1:3001";

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StorageType {
S3,
@@ -662,3 +666,22 @@ pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) {
.await
.unwrap();
}

pub async fn run_test_with_kafka_wal<F>(test: F)
where
F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
{
let _ = dotenv::dotenv();
let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
if endpoints.is_empty() {
warn!("The endpoints is empty, skipping the test");
return;
}

let endpoints = endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<_>>();

test(endpoints).await
}
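
A hedged example of wrapping a test body in run_test_with_kafka_wal; the test name and body are placeholders:

#[tokio::test]
async fn test_with_kafka_wal_sketch() {
    run_test_with_kafka_wal(|endpoints| {
        Box::pin(async move {
            // `endpoints` is parsed from GT_KAFKA_ENDPOINTS, e.g. ["127.0.0.1:9092"].
            // Feed it into the cluster's Kafka WAL config and run the real test here.
            assert!(!endpoints.is_empty());
        })
    })
    .await
}
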