Skip to content

Commit

Permalink
feat: integrate wal options allocator to standalone mode
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Dec 19, 2023
1 parent cd1afdd commit 9a3a967
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 44 deletions.
10 changes: 9 additions & 1 deletion src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::any::Any;

use common_error::ext::ErrorExt;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use config::ConfigError;
Expand Down Expand Up @@ -231,6 +231,12 @@ pub enum Error {
#[snafu(source)]
error: std::io::Error,
},

#[snafu(display("Other error"))]
Other {
source: BoxedError,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -276,6 +282,8 @@ impl ErrorExt for Error {
Error::StartCatalogManager { source, .. } => source.status_code(),

Error::SerdeJson { .. } | Error::FileIo { .. } => StatusCode::Unexpected,

Error::Other { source, .. } => source.status_code(),
}
}

Expand Down
17 changes: 14 additions & 3 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use async_trait::async_trait;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::{metadata_store_dir, KvBackendConfig, WalConfig};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef};
Expand All @@ -27,6 +28,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal::build_wal_options_allocator;
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
Expand All @@ -35,7 +37,7 @@ use datanode::datanode::{Datanode, DatanodeBuilder};
use file_engine::config::EngineConfig as FileEngineConfig;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::standalone::StandaloneTableMetadataCreator;
use frontend::instance::standalone::StandaloneTableMetadataAllocator;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
use frontend::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
Expand All @@ -48,7 +50,7 @@ use servers::Mode;
use snafu::ResultExt;

use crate::error::{
CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, Result,
CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, OtherSnafu, Result,
ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu,
StartProcedureManagerSnafu, StopProcedureManagerSnafu,
};
Expand Down Expand Up @@ -372,7 +374,16 @@ impl StartCommand {
.step(10)
.build(),
);
let table_meta_allocator = Arc::new(StandaloneTableMetadataCreator::new(table_id_sequence));
// TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder.
let wal_options_allocator =
build_wal_options_allocator(&common_meta::wal::WalConfig::default(), &kv_backend)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?;
let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator,
));

let ddl_task_executor = Self::create_ddl_task_executor(
kv_backend.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use serde_with::with_prefix;
use crate::error::Result;
use crate::wal::kafka::KafkaConfig;
pub use crate::wal::kafka::{KafkaOptions as KafkaWalOptions, Topic as KafkaWalTopic};
pub use crate::wal::options_allocator::WalOptionsAllocator;
pub use crate::wal::options_allocator::{build_wal_options_allocator, WalOptionsAllocator};

/// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair
/// and inserted into the options of a `RegionCreateRequest`.
Expand Down
35 changes: 32 additions & 3 deletions src/common/meta/src/wal/options_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use crate::error::Result;
use snafu::ResultExt;
use store_api::storage::RegionNumber;

use crate::error::{EncodeWalOptionsToJsonSnafu, Result};
use crate::kv_backend::KvBackendRef;
use crate::wal::kafka::{KafkaOptions, TopicManager as KafkaTopicManager};
use crate::wal::{WalConfig, WalOptions};
Expand All @@ -27,8 +31,6 @@ pub enum WalOptionsAllocator {
Kafka(KafkaTopicManager),
}

pub type WalOptionsAllocatorRef = Arc<WalOptionsAllocator>;

impl WalOptionsAllocator {
/// Creates a WalOptionsAllocator.
pub fn new(config: &WalConfig, kv_backend: KvBackendRef) -> Self {
Expand Down Expand Up @@ -59,3 +61,30 @@ impl WalOptionsAllocator {
}
}
}

/// Allocates a wal options for each region. The allocated wal options is encoded immediately.
pub fn build_region_wal_options(
regions: Vec<RegionNumber>,
wal_options_allocator: &WalOptionsAllocator,
) -> Result<HashMap<RegionNumber, String>> {
let wal_options = wal_options_allocator
.alloc_batch(regions.len())
.into_iter()
.map(|wal_options| {
serde_json::to_string(&wal_options).context(EncodeWalOptionsToJsonSnafu { wal_options })
})
.collect::<Result<Vec<_>>>()?;

Ok(regions.into_iter().zip(wal_options).collect())
}

/// Builds a wal options allocator.
// TODO(niebayes): implement.
pub async fn build_wal_options_allocator(
config: &WalConfig,
kv_backend: &KvBackendRef,
) -> Result<WalOptionsAllocator> {
let _ = config;
let _ = kv_backend;
Ok(WalOptionsAllocator::default())
}
31 changes: 24 additions & 7 deletions src/frontend/src/instance/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::SequenceRef;
use common_meta::wal::options_allocator::build_region_wal_options;
use common_meta::wal::WalOptionsAllocator;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{debug, tracing};
use datanode::region_server::RegionServer;
use servers::grpc::region_server::RegionServerHandler;
use snafu::{OptionExt, ResultExt};
Expand Down Expand Up @@ -105,18 +107,22 @@ impl Datanode for RegionInvoker {
}
}

pub struct StandaloneTableMetadataCreator {
pub struct StandaloneTableMetadataAllocator {
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocator,
}

impl StandaloneTableMetadataCreator {
pub fn new(table_id_sequence: SequenceRef) -> Self {
Self { table_id_sequence }
impl StandaloneTableMetadataAllocator {
pub fn new(table_id_sequence: SequenceRef, wal_options_allocator: WalOptionsAllocator) -> Self {
Self {
table_id_sequence,
wal_options_allocator,
}
}
}

#[async_trait]
impl TableMetadataAllocator for StandaloneTableMetadataCreator {
impl TableMetadataAllocator for StandaloneTableMetadataAllocator {
async fn create(
&self,
_ctx: &TableMetadataAllocatorContext,
Expand Down Expand Up @@ -145,7 +151,18 @@ impl TableMetadataAllocator for StandaloneTableMetadataCreator {
})
.collect::<Vec<_>>();

// There're no wal options involved in standalone mode currently.
let region_numbers = region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
let region_wal_options =
build_region_wal_options(region_numbers, &self.wal_options_allocator)?;

debug!(
"Allocated region wal options {:?} for table {}",
region_wal_options, table_id
);

Ok(TableMetadata {
table_id,
region_routes,
Expand Down
19 changes: 7 additions & 12 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::time::Duration;
use client::client_manager::DatanodeClients;
use common_base::Plugins;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::TableMetadataAllocatorRef;
Expand All @@ -30,14 +31,14 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef};
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_meta::wal::{WalConfig, WalOptionsAllocator};
use common_meta::wal::build_wal_options_allocator;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;

use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::error::{self, Result};
use crate::error::{self, OtherSnafu, Result};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::check_leader_handler::CheckLeaderHandler;
use crate::handler::collect_stats_handler::CollectStatsHandler;
Expand Down Expand Up @@ -205,7 +206,10 @@ impl MetaSrvBuilder {
table_id: None,
};

let wal_options_allocator = build_wal_options_allocator(&options.wal, &kv_backend).await?;
let wal_options_allocator = build_wal_options_allocator(&options.wal, &kv_backend)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?;
let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
let sequence = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
Expand Down Expand Up @@ -358,15 +362,6 @@ fn build_procedure_manager(
Arc::new(LocalManager::new(manager_config, state_store))
}

async fn build_wal_options_allocator(
config: &WalConfig,
kv_backend: &KvBackendRef,
) -> Result<WalOptionsAllocator> {
let _ = config;
let _ = kv_backend;
Ok(WalOptionsAllocator::default())
}

fn build_ddl_manager(
options: &MetaSrvOptions,
datanode_clients: Option<DatanodeManagerRef>,
Expand Down
20 changes: 5 additions & 15 deletions src/meta-srv/src/table_meta_alloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ use api::v1::meta::Partition;
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext};
use common_meta::error::{self as meta_error, EncodeWalOptionsToJsonSnafu, Result as MetaResult};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::SequenceRef;
use common_meta::wal::options_allocator::build_region_wal_options;
use common_meta::wal::WalOptionsAllocator;
use common_telemetry::{debug, warn};
use snafu::{ensure, ResultExt};
Expand Down Expand Up @@ -72,23 +73,12 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator {
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;

// Allocates a wal options for each region. The allocated wal options is encoded immediately.
let num_regions = region_routes.len();
let wal_options = self
.wal_options_allocator
.alloc_batch(num_regions)
.into_iter()
.map(|wal_options| {
serde_json::to_string(&wal_options)
.context(EncodeWalOptionsToJsonSnafu { wal_options })
})
.collect::<MetaResult<Vec<_>>>()?;

let region_wal_options = region_routes
let region_numbers = region_routes
.iter()
.map(|route| route.region.id.region_number())
.zip(wal_options)
.collect();
let region_wal_options =
build_region_wal_options(region_numbers, &self.wal_options_allocator)?;

debug!(
"Allocated region wal options {:?} for table {}",
Expand Down
13 changes: 11 additions & 2 deletions tests-integration/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ use common_meta::ddl_manager::DdlManager;
use common_meta::key::TableMetadataManager;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal::build_wal_options_allocator;
use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::LoggingOptions;
use datanode::config::DatanodeOptions;
use datanode::datanode::DatanodeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::standalone::StandaloneTableMetadataCreator;
use frontend::instance::standalone::StandaloneTableMetadataAllocator;
use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};

use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard};
Expand Down Expand Up @@ -117,7 +118,15 @@ impl GreptimeDbStandaloneBuilder {
.step(10)
.build(),
);
let table_meta_allocator = Arc::new(StandaloneTableMetadataCreator::new(table_id_sequence));
// TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder.
let wal_options_allocator =
build_wal_options_allocator(&common_meta::wal::WalConfig::default(), &kv_backend)
.await
.unwrap();
let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator,
));

let ddl_task_executor = Arc::new(
DdlManager::try_new(
Expand Down

0 comments on commit 9a3a967

Please sign in to comment.