Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make test-integration able to compile #2384

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::time::Duration;

use clap::Parser;
use common_telemetry::logging;
use datanode::datanode::builder::DatanodeBuilder;
use datanode::datanode::{Datanode, DatanodeOptions};
use meta_client::MetaClientOptions;
use servers::Mode;
Expand Down Expand Up @@ -162,7 +163,8 @@ impl StartCommand {
logging::info!("Datanode start command: {:#?}", self);
logging::info!("Datanode options: {:#?}", opts);

let datanode = Datanode::new(opts, Default::default())
let datanode = DatanodeBuilder::new(opts, Default::default())
.build()
.await
.context(StartDatanodeSnafu)?;

Expand Down
4 changes: 3 additions & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
use datanode::datanode::builder::DatanodeBuilder;
use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig};
use datanode::region_server::RegionServer;
use frontend::catalog::FrontendCatalogManager;
Expand Down Expand Up @@ -306,7 +307,8 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;

let datanode = Datanode::new(dn_opts.clone(), plugins.clone())
let datanode = DatanodeBuilder::new(dn_opts.clone(), plugins.clone())
.build()
.await
.context(StartDatanodeSnafu)?;
let region_server = datanode.region_server();
Expand Down
3 changes: 3 additions & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true

[features]
testing = []

[dependencies]
api = { workspace = true }
arrow-flight.workspace = true
Expand Down
5 changes: 5 additions & 0 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ impl TableMetadataManager {
&self.table_route_manager
}

#[cfg(feature = "testing")]
pub fn kv_backend(&self) -> &KvBackendRef {
&self.kv_backend
}

pub async fn get_full_table_info(
&self,
table_id: TableId,
Expand Down
57 changes: 20 additions & 37 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

//! Datanode configurations

pub mod builder;

use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -33,7 +35,7 @@ use meta_client::MetaClientOptions;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use object_store::util::normalize_dir;
use query::{QueryEngineFactory, QueryEngineRef};
use query::QueryEngineFactory;
use secrecy::SecretString;
use serde::{Deserialize, Serialize};
use servers::heartbeat_options::HeartbeatOptions;
Expand All @@ -53,7 +55,6 @@ use tokio::fs;
use crate::error::{
CreateDirSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::region_server::RegionServer;
use crate::server::Services;
Expand Down Expand Up @@ -401,12 +402,14 @@ pub struct Datanode {
services: Option<Services>,
heartbeat_task: Option<HeartbeatTask>,
region_server: RegionServer,
query_engine: QueryEngineRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
}

impl Datanode {
pub async fn new(opts: DatanodeOptions, plugins: Arc<Plugins>) -> Result<Datanode> {
async fn new_region_server(
opts: &DatanodeOptions,
plugins: Arc<Plugins>,
) -> Result<RegionServer> {
let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in datanode only executes plan with resolved table source.
MemoryCatalogManager::with_default_setup(),
Expand All @@ -425,46 +428,30 @@ impl Datanode {
);

let mut region_server = RegionServer::new(query_engine.clone(), runtime.clone());
let log_store = Self::build_log_store(&opts).await?;
let object_store = store::new_object_store(&opts).await?;
let engines = Self::build_store_engines(&opts, log_store, object_store).await?;
let log_store = Self::build_log_store(opts).await?;
let object_store = store::new_object_store(opts).await?;
let engines = Self::build_store_engines(opts, log_store, object_store).await?;
for engine in engines {
region_server.register_engine(engine);
}

// build optional things with different modes
let services = match opts.mode {
Mode::Distributed => Some(Services::try_new(region_server.clone(), &opts).await?),
Mode::Standalone => None,
};
let heartbeat_task = match opts.mode {
Mode::Distributed => Some(HeartbeatTask::try_new(&opts, region_server.clone()).await?),
Mode::Standalone => None,
};
let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
Some(opts.storage.data_home.clone()),
&opts.mode,
opts.enable_telemetry,
)
.await;

Ok(Self {
opts,
services,
heartbeat_task,
region_server,
query_engine,
greptimedb_telemetry_task,
})
Ok(region_server)
}

pub async fn start(&mut self) -> Result<()> {
info!("Starting datanode instance...");

self.start_heartbeat().await?;

let _ = self.greptimedb_telemetry_task.start();
self.start_services().await
}

pub async fn start_heartbeat(&self) -> Result<()> {
if let Some(task) = &self.heartbeat_task {
task.start().await?;
}
let _ = self.greptimedb_telemetry_task.start();
self.start_services().await
Ok(())
}

/// Start services of datanode. This method call will block until services are shutdown.
Expand Down Expand Up @@ -503,10 +490,6 @@ impl Datanode {
self.region_server.clone()
}

pub fn query_engine(&self) -> QueryEngineRef {
self.query_engine.clone()
}

// internal utils

/// Build [RaftEngineLogStore]
Expand Down
98 changes: 98 additions & 0 deletions src/datanode/src/datanode/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_base::Plugins;
use meta_client::client::MetaClient;
use servers::Mode;
use snafu::OptionExt;

use crate::datanode::{Datanode, DatanodeOptions};
use crate::error::{MissingMetasrvOptsSnafu, MissingNodeIdSnafu, Result};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::{new_metasrv_client, HeartbeatTask};
use crate::server::Services;

pub struct DatanodeBuilder {
MichaelScofield marked this conversation as resolved.
Show resolved Hide resolved
opts: DatanodeOptions,
plugins: Arc<Plugins>,
meta_client: Option<MetaClient>,
}

impl DatanodeBuilder {
pub fn new(opts: DatanodeOptions, plugins: Arc<Plugins>) -> Self {
Self {
opts,
plugins,
meta_client: None,
}
}

pub fn with_meta_client(self, meta_client: MetaClient) -> Self {
Self {
meta_client: Some(meta_client),
..self
}
}

pub async fn build(mut self) -> Result<Datanode> {
let region_server = Datanode::new_region_server(&self.opts, self.plugins.clone()).await?;

let mode = &self.opts.mode;

let heartbeat_task = match mode {
Mode::Distributed => {
let meta_client = if let Some(meta_client) = self.meta_client.take() {
meta_client
} else {
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

let meta_config = self
.opts
.meta_client_options
.as_ref()
.context(MissingMetasrvOptsSnafu)?;

new_metasrv_client(node_id, meta_config).await?
};

let heartbeat_task =
HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?;
Some(heartbeat_task)
}
Mode::Standalone => None,
};

let services = match mode {
Mode::Distributed => Some(Services::try_new(region_server.clone(), &self.opts).await?),
Mode::Standalone => None,
};

let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
Some(self.opts.storage.data_home.clone()),
mode,
self.opts.enable_telemetry,
)
.await;

Ok(Datanode {
opts: self.opts,
services,
heartbeat_task,
region_server,
greptimedb_telemetry_task,
})
}
}
20 changes: 7 additions & 13 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,14 @@ use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info, trace, warn};
use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder};
use meta_client::MetaClientOptions;
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use tokio::sync::mpsc;
use tokio::time::Instant;

use self::handler::RegionHeartbeatResponseHandler;
use crate::alive_keeper::RegionAliveKeeper;
use crate::datanode::DatanodeOptions;
use crate::error::{
self, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, Result,
};
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::region_server::RegionServer;

pub(crate) mod handler;
Expand All @@ -62,15 +60,11 @@ impl Drop for HeartbeatTask {

impl HeartbeatTask {
/// Create a new heartbeat task instance.
pub async fn try_new(opts: &DatanodeOptions, region_server: RegionServer) -> Result<Self> {
let meta_client = new_metasrv_client(
opts.node_id.context(MissingNodeIdSnafu)?,
opts.meta_client_options
.as_ref()
.context(MissingMetasrvOptsSnafu)?,
)
.await?;

pub async fn try_new(
opts: &DatanodeOptions,
region_server: RegionServer,
meta_client: MetaClient,
) -> Result<Self> {
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
opts.heartbeat.interval_millis,
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,11 @@ impl StatementExecutor {
}
}

async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
pub async fn plan(
&self,
stmt: QueryStatement,
query_ctx: QueryContextRef,
) -> Result<LogicalPlan> {
self.query_engine
.planner()
.plan(stmt, query_ctx)
Expand Down
10 changes: 5 additions & 5 deletions src/servers/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ pub mod region_server;
use std::net::SocketAddr;
use std::sync::Arc;

#[cfg(feature = "testing")]
use api::v1::greptime_database_server::GreptimeDatabase;
use api::v1::greptime_database_server::GreptimeDatabaseServer;
use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
use api::v1::prometheus_gateway_server::{PrometheusGateway, PrometheusGatewayServer};
#[cfg(feature = "testing")]
use api::v1::region::region_server::Region;
use api::v1::region::region_server::RegionServer;
use api::v1::{HealthCheckRequest, HealthCheckResponse};
#[cfg(feature = "testing")]
Expand Down Expand Up @@ -123,12 +123,12 @@ impl GrpcServer {

#[cfg(feature = "testing")]
pub fn create_flight_service(&self) -> FlightServiceServer<impl FlightService> {
FlightServiceServer::new(FlightCraftWrapper(self.database_handler.clone().unwrap()))
FlightServiceServer::new(FlightCraftWrapper(self.flight_handler.clone().unwrap()))
}

#[cfg(feature = "testing")]
pub fn create_database_service(&self) -> GreptimeDatabaseServer<impl GreptimeDatabase> {
GreptimeDatabaseServer::new(DatabaseService::new(self.database_handler.clone().unwrap()))
pub fn create_region_service(&self) -> RegionServer<impl Region> {
RegionServer::new(self.region_server_handler.clone().unwrap())
}

pub fn create_healthcheck_service(&self) -> HealthCheckServer<impl HealthCheck> {
Expand Down
6 changes: 4 additions & 2 deletions tests-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ chrono.workspace = true
client = { workspace = true, features = ["testing"] }
common-base = { workspace = true }
common-catalog = { workspace = true }
common-config = { workspace = true }
common-error = { workspace = true }
common-grpc = { workspace = true }
common-meta = { workspace = true }
common-meta = { workspace = true, features = ["testing"] }
common-procedure = { workspace = true }
common-query = { workspace = true }
common-recordbatch = { workspace = true }
common-runtime = { workspace = true }
Expand Down Expand Up @@ -53,6 +55,7 @@ sqlx = { version = "0.6", features = [
"postgres",
"chrono",
] }
substrait = { workspace = true }
table = { workspace = true }
tempfile.workspace = true
tokio.workspace = true
Expand All @@ -61,7 +64,6 @@ tower = "0.4"
uuid.workspace = true

[dev-dependencies]
common-procedure = { workspace = true }
datafusion-expr.workspace = true
datafusion.workspace = true
itertools.workspace = true
Expand Down
Loading
Loading