Skip to content

Commit

Permalink
chore: Merge branch 'develop' into feat/mito2-readonly
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Sep 13, 2023
2 parents b5fb517 + 93f3048 commit c892de7
Show file tree
Hide file tree
Showing 36 changed files with 597 additions and 628 deletions.
27 changes: 8 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::time::Duration;

use clap::Parser;
use common_telemetry::logging;
use datanode::datanode::builder::DatanodeBuilder;
use datanode::datanode::{Datanode, DatanodeOptions};
use meta_client::MetaClientOptions;
use servers::Mode;
Expand Down Expand Up @@ -162,7 +163,8 @@ impl StartCommand {
logging::info!("Datanode start command: {:#?}", self);
logging::info!("Datanode options: {:#?}", opts);

let datanode = Datanode::new(opts, Default::default())
let datanode = DatanodeBuilder::new(opts, Default::default())
.build()
.await
.context(StartDatanodeSnafu)?;

Expand Down
4 changes: 3 additions & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
use datanode::datanode::builder::DatanodeBuilder;
use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig};
use datanode::region_server::RegionServer;
use frontend::catalog::FrontendCatalogManager;
Expand Down Expand Up @@ -306,7 +307,8 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;

let datanode = Datanode::new(dn_opts.clone(), plugins.clone())
let datanode = DatanodeBuilder::new(dn_opts.clone(), plugins.clone())
.build()
.await
.context(StartDatanodeSnafu)?;
let region_server = datanode.region_server();
Expand Down
13 changes: 11 additions & 2 deletions src/common/datasource/src/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl Lister {
Source::Dir => {
let streamer = self
.object_store
.list(&self.path)
.lister_with(&self.path)
.await
.context(error::ListObjectsSnafu { path: &self.path })?;

Expand All @@ -76,7 +76,16 @@ impl Lister {
path: &file_full_path,
},
)?;
Ok(vec![Entry::new(&file_full_path)])

Ok(self
.object_store
.list_with(&self.path)
.await
.context(error::ListObjectsSnafu { path: &self.path })?
.into_iter()
.find(|f| f.name() == filename)
.map(|f| vec![f])
.unwrap_or_default())
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true

[features]
testing = []

[dependencies]
api = { workspace = true }
arrow-flight.workspace = true
Expand Down
5 changes: 5 additions & 0 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ impl TableMetadataManager {
&self.table_route_manager
}

#[cfg(feature = "testing")]
pub fn kv_backend(&self) -> &KvBackendRef {
&self.kv_backend
}

pub async fn get_full_table_info(
&self,
table_id: TableId,
Expand Down
8 changes: 2 additions & 6 deletions src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ mod tests {
use common_error::status_code::StatusCode;
use common_test_util::temp_dir::create_temp_dir;
use futures_util::future::BoxFuture;
use futures_util::{FutureExt, TryStreamExt};
use futures_util::FutureExt;
use object_store::ObjectStore;

use super::*;
Expand Down Expand Up @@ -492,11 +492,7 @@ mod tests {
) {
let dir = proc_path!(procedure_store, "{procedure_id}/");
let lister = object_store.list(&dir).await.unwrap();
let mut files_in_dir: Vec<_> = lister
.map_ok(|de| de.name().to_string())
.try_collect()
.await
.unwrap();
let mut files_in_dir: Vec<_> = lister.into_iter().map(|de| de.name().to_string()).collect();
files_in_dir.sort_unstable();
assert_eq!(files, files_in_dir);
}
Expand Down
18 changes: 5 additions & 13 deletions src/common/procedure/src/store/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use async_trait::async_trait;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use futures::{Stream, StreamExt};
use object_store::{EntryMode, Metakey, ObjectStore};
use object_store::{EntryMode, ObjectStore};
use snafu::ResultExt;

use crate::error::{DeleteStateSnafu, ListStateSnafu, PutStateSnafu, Result};
Expand Down Expand Up @@ -86,7 +86,8 @@ impl StateStore for ObjectStateStore {
async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream> {
let mut lister = self
.store
.scan(path)
.lister_with(path)
.delimiter("")
.await
.map_err(|e| {
BoxedError::new(PlainError::new(
Expand All @@ -110,17 +111,8 @@ impl StateStore for ObjectStateStore {
})
.context(ListStateSnafu { path: &path_string })?;
let key = entry.path();
let metadata = store
.metadata(&entry, Metakey::Mode)
.await
.map_err(|e| {
BoxedError::new(PlainError::new(
e.to_string(),
StatusCode::StorageUnavailable,
))
})
.context(ListStateSnafu { path: key })?;
if let EntryMode::FILE = metadata.mode() {

if let EntryMode::FILE = entry.metadata().mode() {
let value = store
.read(key)
.await
Expand Down
57 changes: 20 additions & 37 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

//! Datanode configurations
pub mod builder;

use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -33,7 +35,7 @@ use meta_client::MetaClientOptions;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use object_store::util::normalize_dir;
use query::{QueryEngineFactory, QueryEngineRef};
use query::QueryEngineFactory;
use secrecy::SecretString;
use serde::{Deserialize, Serialize};
use servers::heartbeat_options::HeartbeatOptions;
Expand All @@ -53,7 +55,6 @@ use tokio::fs;
use crate::error::{
CreateDirSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::region_server::RegionServer;
use crate::server::Services;
Expand Down Expand Up @@ -401,12 +402,14 @@ pub struct Datanode {
services: Option<Services>,
heartbeat_task: Option<HeartbeatTask>,
region_server: RegionServer,
query_engine: QueryEngineRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
}

impl Datanode {
pub async fn new(opts: DatanodeOptions, plugins: Arc<Plugins>) -> Result<Datanode> {
async fn new_region_server(
opts: &DatanodeOptions,
plugins: Arc<Plugins>,
) -> Result<RegionServer> {
let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in datanode only executes plan with resolved table source.
MemoryCatalogManager::with_default_setup(),
Expand All @@ -425,46 +428,30 @@ impl Datanode {
);

let mut region_server = RegionServer::new(query_engine.clone(), runtime.clone());
let log_store = Self::build_log_store(&opts).await?;
let object_store = store::new_object_store(&opts).await?;
let engines = Self::build_store_engines(&opts, log_store, object_store).await?;
let log_store = Self::build_log_store(opts).await?;
let object_store = store::new_object_store(opts).await?;
let engines = Self::build_store_engines(opts, log_store, object_store).await?;
for engine in engines {
region_server.register_engine(engine);
}

// build optional things with different modes
let services = match opts.mode {
Mode::Distributed => Some(Services::try_new(region_server.clone(), &opts).await?),
Mode::Standalone => None,
};
let heartbeat_task = match opts.mode {
Mode::Distributed => Some(HeartbeatTask::try_new(&opts, region_server.clone()).await?),
Mode::Standalone => None,
};
let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
Some(opts.storage.data_home.clone()),
&opts.mode,
opts.enable_telemetry,
)
.await;

Ok(Self {
opts,
services,
heartbeat_task,
region_server,
query_engine,
greptimedb_telemetry_task,
})
Ok(region_server)
}

pub async fn start(&mut self) -> Result<()> {
info!("Starting datanode instance...");

self.start_heartbeat().await?;

let _ = self.greptimedb_telemetry_task.start();
self.start_services().await
}

pub async fn start_heartbeat(&self) -> Result<()> {
if let Some(task) = &self.heartbeat_task {
task.start().await?;
}
let _ = self.greptimedb_telemetry_task.start();
self.start_services().await
Ok(())
}

/// Start services of datanode. This method call will block until services are shutdown.
Expand Down Expand Up @@ -503,10 +490,6 @@ impl Datanode {
self.region_server.clone()
}

pub fn query_engine(&self) -> QueryEngineRef {
self.query_engine.clone()
}

// internal utils

/// Build [RaftEngineLogStore]
Expand Down
Loading

0 comments on commit c892de7

Please sign in to comment.