Skip to content

Commit

Permalink
fix: avoid logging guard drop
Browse files Browse the repository at this point in the history
  • Loading branch information
zyy17 committed May 21, 2024
1 parent 4778575 commit 1ff5666
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 35 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ substrait.workspace = true
table.workspace = true
tokio.workspace = true
toml.workspace = true
tracing-appender = "0.2"

[target.'cfg(not(windows))'.dependencies]
tikv-jemallocator = "0.5"
Expand Down
21 changes: 13 additions & 8 deletions src/cmd/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use async_trait::async_trait;
use bench::BenchTableMetadataCommand;
use clap::Parser;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use tracing_appender::non_blocking::WorkerGuard;
// pub use repl::Repl;
use upgrade::UpgradeCommand;

Expand All @@ -48,11 +49,15 @@ pub trait Tool: Send + Sync {

pub struct Instance {
tool: Box<dyn Tool>,

// Keep the logging guard to prevent the worker from being dropped.
#[allow(dead_code)]
guard: Vec<WorkerGuard>,
}

impl Instance {
fn new(tool: Box<dyn Tool>) -> Self {
Self { tool }
fn new(tool: Box<dyn Tool>, guard: Vec<WorkerGuard>) -> Self {
Self { tool, guard }
}
}

Expand Down Expand Up @@ -83,14 +88,14 @@ pub struct Command {

impl Command {
pub async fn build(&self, opts: LoggingOptions) -> Result<Instance> {
let _guard = common_telemetry::init_global_logging(
let guard = common_telemetry::init_global_logging(
APP_NAME,
&opts,
&TracingOptions::default(),
None,
);

self.cmd.build().await
self.cmd.build(guard).await
}

pub fn load_options(&self, global_options: &GlobalOptions) -> Result<LoggingOptions> {
Expand All @@ -115,12 +120,12 @@ enum SubCommand {
}

impl SubCommand {
async fn build(&self) -> Result<Instance> {
async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
match self {
// SubCommand::Attach(cmd) => cmd.build().await,
SubCommand::Upgrade(cmd) => cmd.build().await,
SubCommand::Bench(cmd) => cmd.build().await,
SubCommand::Export(cmd) => cmd.build().await,
SubCommand::Upgrade(cmd) => cmd.build(guard).await,
SubCommand::Bench(cmd) => cmd.build(guard).await,
SubCommand::Export(cmd) => cmd.build(guard).await,
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/cmd/src/cli/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use datatypes::schema::{ColumnSchema, RawSchema};
use rand::Rng;
use store_api::storage::RegionNumber;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType};
use tracing_appender::non_blocking::WorkerGuard;

use self::metadata::TableMetadataBencher;
use crate::cli::{Instance, Tool};
Expand Down Expand Up @@ -61,7 +62,7 @@ pub struct BenchTableMetadataCommand {
}

impl BenchTableMetadataCommand {
pub async fn build(&self) -> Result<Instance> {
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr], 128)
.await
.unwrap();
Expand All @@ -72,7 +73,7 @@ impl BenchTableMetadataCommand {
table_metadata_manager,
count: self.count,
};
Ok(Instance::new(Box::new(tool)))
Ok(Instance::new(Box::new(tool), guard))
}
}

Expand Down
24 changes: 14 additions & 10 deletions src/cmd/src/cli/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use tracing_appender::non_blocking::WorkerGuard;

use crate::cli::{Instance, Tool};
use crate::error::{
Expand Down Expand Up @@ -80,7 +81,7 @@ pub struct ExportCommand {
}

impl ExportCommand {
pub async fn build(&self) -> Result<Instance> {
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
let (catalog, schema) = split_database(&self.database)?;

let auth_header = if let Some(basic) = &self.auth_basic {
Expand All @@ -90,15 +91,18 @@ impl ExportCommand {
None
};

Ok(Instance::new(Box::new(Export {
addr: self.addr.clone(),
catalog,
schema,
output_dir: self.output_dir.clone(),
parallelism: self.export_jobs,
target: self.target.clone(),
auth_header,
})))
Ok(Instance::new(
Box::new(Export {
addr: self.addr.clone(),
catalog,
schema,
output_dir: self.output_dir.clone(),
parallelism: self.export_jobs,
target: self.target.clone(),
auth_header,
}),
guard,
))
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/cmd/src/cli/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use etcd_client::Client;
use futures::TryStreamExt;
use prost::Message;
use snafu::ResultExt;
use tracing_appender::non_blocking::WorkerGuard;
use v1_helper::{CatalogKey as v1CatalogKey, SchemaKey as v1SchemaKey, TableGlobalValue};

use crate::cli::{Instance, Tool};
Expand All @@ -63,7 +64,7 @@ pub struct UpgradeCommand {
}

impl UpgradeCommand {
pub async fn build(&self) -> Result<Instance> {
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
let client = Client::connect([&self.etcd_addr], None)
.await
.context(ConnectEtcdSnafu {
Expand All @@ -77,7 +78,7 @@ impl UpgradeCommand {
skip_schema_keys: self.skip_schema_keys,
skip_table_route_keys: self.skip_table_route_keys,
};
Ok(Instance::new(Box::new(tool)))
Ok(Instance::new(Box::new(tool), guard))
}
}

Expand Down
13 changes: 9 additions & 4 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use datanode::service::DatanodeServiceBuilder;
use meta_client::MetaClientOptions;
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{
LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
Expand All @@ -40,11 +41,15 @@ pub const APP_NAME: &str = "greptime-datanode";

pub struct Instance {
datanode: Datanode,

// Keep the logging guard to prevent the worker from being dropped.
#[allow(dead_code)]
guard: Vec<WorkerGuard>,
}

impl Instance {
pub fn new(datanode: Datanode) -> Self {
Self { datanode }
pub fn new(datanode: Datanode, guard: Vec<WorkerGuard>) -> Self {
Self { datanode, guard }
}

pub fn datanode_mut(&mut self) -> &mut Datanode {
Expand Down Expand Up @@ -228,7 +233,7 @@ impl StartCommand {
}

async fn build(&self, mut opts: DatanodeOptions) -> Result<Instance> {
let _guard = common_telemetry::init_global_logging(
let guard = common_telemetry::init_global_logging(
APP_NAME,
&opts.logging,
&opts.tracing,
Expand Down Expand Up @@ -274,7 +279,7 @@ impl StartCommand {
.context(StartDatanodeSnafu)?;
datanode.setup_services(services);

Ok(Instance::new(datanode))
Ok(Instance::new(datanode, guard))
}
}

Expand Down
13 changes: 9 additions & 4 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use meta_client::MetaClientOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{
self, InitTimezoneSnafu, LoadLayeredConfigSnafu, MissingConfigSnafu, Result, StartFrontendSnafu,
Expand All @@ -51,13 +52,17 @@ use crate::{log_versions, App};

pub struct Instance {
frontend: FeInstance,

// Keep the logging guard to prevent the worker from being dropped.
#[allow(dead_code)]
guard: Vec<WorkerGuard>,
}

pub const APP_NAME: &str = "greptime-frontend";

impl Instance {
pub fn new(frontend: FeInstance) -> Self {
Self { frontend }
pub fn new(frontend: FeInstance, guard: Vec<WorkerGuard>) -> Self {
Self { frontend, guard }
}

pub fn mut_inner(&mut self) -> &mut FeInstance {
Expand Down Expand Up @@ -241,7 +246,7 @@ impl StartCommand {
}

async fn build(&self, mut opts: FrontendOptions) -> Result<Instance> {
let _guard = common_telemetry::init_global_logging(
let guard = common_telemetry::init_global_logging(
APP_NAME,
&opts.logging,
&opts.tracing,
Expand Down Expand Up @@ -358,7 +363,7 @@ impl StartCommand {
.build_servers(opts, servers)
.context(StartFrontendSnafu)?;

Ok(Instance::new(instance))
Ok(Instance::new(instance, guard))
}
}

Expand Down
13 changes: 9 additions & 4 deletions src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common_version::{short_version, version};
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::metasrv::MetasrvOptions;
use snafu::ResultExt;
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
use crate::options::GlobalOptions;
Expand All @@ -32,11 +33,15 @@ pub const APP_NAME: &str = "greptime-metasrv";

pub struct Instance {
instance: MetasrvInstance,

// Keep the logging guard to prevent the worker from being dropped.
#[allow(dead_code)]
guard: Vec<WorkerGuard>,
}

impl Instance {
fn new(instance: MetasrvInstance) -> Self {
Self { instance }
fn new(instance: MetasrvInstance, guard: Vec<WorkerGuard>) -> Self {
Self { instance, guard }
}
}

Expand Down Expand Up @@ -214,7 +219,7 @@ impl StartCommand {
}

async fn build(&self, mut opts: MetasrvOptions) -> Result<Instance> {
let _guard =
let guard =
common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None);
log_versions(version!(), short_version!());

Expand All @@ -234,7 +239,7 @@ impl StartCommand {
.await
.context(error::BuildMetaServerSnafu)?;

Ok(Instance::new(instance))
Ok(Instance::new(instance, guard))
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{
BuildCacheRegistrySnafu, CacheRequiredSnafu, CreateDirSnafu, IllegalConfigSnafu,
Expand Down Expand Up @@ -210,6 +211,10 @@ pub struct Instance {
frontend: FeInstance,
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,

// Keep the logging guard to prevent the worker from being dropped.
#[allow(dead_code)]
guard: Vec<WorkerGuard>,
}

#[async_trait]
Expand Down Expand Up @@ -375,7 +380,7 @@ impl StartCommand {
#[allow(unused_variables)]
#[allow(clippy::diverging_sub_expression)]
async fn build(&self, opts: StandaloneOptions) -> Result<Instance> {
let _guard =
let guard =
common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None);
log_versions(version!(), short_version!());

Expand Down Expand Up @@ -521,6 +526,7 @@ impl StartCommand {
frontend,
procedure_manager,
wal_options_allocator,
guard,
})
}

Expand Down

0 comments on commit 1ff5666

Please sign in to comment.