Skip to content

Commit

Permalink
perf: Reference counting record store garbage collection
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei committed Sep 20, 2023
1 parent db5f21c commit 8d092b6
Show file tree
Hide file tree
Showing 45 changed files with 487 additions and 486 deletions.
32 changes: 12 additions & 20 deletions dozer-cli/src/simple/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use dozer_api::grpc::internal::internal_pipeline_server::LogEndpoint;
use dozer_cache::dozer_log::camino::Utf8Path;
use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir};
use dozer_cache::dozer_log::replication::Log;
-use dozer_core::checkpoint::{CheckpointFactory, CheckpointFactoryOptions, OptionCheckpoint};
+use dozer_core::checkpoint::OptionCheckpoint;
use dozer_tracing::LabelsAndProgress;
use dozer_types::models::api_endpoint::ApiEndpoint;
+use dozer_types::models::app_config::DataStorage;
use dozer_types::models::flags::Flags;
use dozer_types::parking_lot::Mutex;
use tokio::runtime::Runtime;
Expand All @@ -28,7 +29,6 @@ pub struct Executor<'a> {
connections: &'a [Connection],
sources: &'a [Source],
sql: Option<&'a str>,
-checkpoint_factory: Arc<CheckpointFactory>,
checkpoint: OptionCheckpoint,
/// `ApiEndpoint` and its log.
endpoint_and_logs: Vec<(ApiEndpoint, LogEndpoint)>,
Expand All @@ -47,7 +47,7 @@ impl<'a> Executor<'a> {
sources: &'a [Source],
sql: Option<&'a str>,
api_endpoints: &'a [ApiEndpoint],
-checkpoint_factory_options: CheckpointFactoryOptions,
+storage_config: DataStorage,
labels: LabelsAndProgress,
udfs: &'a [UdfConfig],
) -> Result<Executor<'a>, OrchestrationError> {
Expand All @@ -58,18 +58,17 @@ impl<'a> Executor<'a> {
.ok_or(OrchestrationError::NoBuildFound)?;

// Load pipeline checkpoint.
-let (checkpoint_factory, last_checkpoint, _) =
-CheckpointFactory::new(build_path.data_dir.to_string(), checkpoint_factory_options)
-.await?;
+let checkpoint =
+OptionCheckpoint::new(build_path.data_dir.to_string(), storage_config).await?;

let mut endpoint_and_logs = vec![];
for endpoint in api_endpoints {
let log_endpoint = create_log_endpoint(
contract,
&build_path,
&endpoint.name,
-&checkpoint_factory,
-last_checkpoint.num_slices(),
+&checkpoint,
+checkpoint.num_slices(),
)
.await?;
endpoint_and_logs.push((endpoint.clone(), log_endpoint));
Expand All @@ -79,8 +78,7 @@ impl<'a> Executor<'a> {
connections,
sources,
sql,
-checkpoint_factory: Arc::new(checkpoint_factory),
-checkpoint: last_checkpoint,
+checkpoint,
endpoint_and_logs,
labels,
udfs,
Expand Down Expand Up @@ -112,13 +110,7 @@ impl<'a> Executor<'a> {
);

let dag = builder.build(runtime, shutdown).await?;
-let exec = DagExecutor::new(
-dag,
-self.checkpoint_factory,
-self.checkpoint,
-executor_options,
-)
-.await?;
+let exec = DagExecutor::new(dag, self.checkpoint, executor_options).await?;

Ok(exec)
}
Expand All @@ -139,7 +131,7 @@ async fn create_log_endpoint(
contract: &Contract,
build_path: &BuildPath,
endpoint_name: &str,
-checkpoint_factory: &CheckpointFactory,
+checkpoint: &OptionCheckpoint,
num_persisted_entries_to_keep: usize,
) -> Result<LogEndpoint, OrchestrationError> {
let endpoint_path = build_path.get_endpoint_path(endpoint_name);
Expand All @@ -157,10 +149,10 @@ async fn create_log_endpoint(
OrchestrationError::FileSystem(build_path.descriptor_path.clone().into(), e)
})?;

-let log_prefix = AsRef::<Utf8Path>::as_ref(checkpoint_factory.prefix())
+let log_prefix = AsRef::<Utf8Path>::as_ref(checkpoint.prefix())
 .join(&endpoint_path.log_dir_relative_to_data_dir);
 let log = Log::new(
-checkpoint_factory.storage(),
+checkpoint.storage(),
log_prefix.into(),
num_persisted_entries_to_keep,
)
Expand Down
6 changes: 3 additions & 3 deletions dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::simple::build;
use crate::simple::helper::validate_config;
use crate::utils::{
get_api_security_config, get_app_grpc_config, get_cache_manager_options,
-get_checkpoint_factory_options, get_default_max_num_records, get_executor_options,
-get_grpc_config, get_rest_config,
+get_default_max_num_records, get_executor_options, get_grpc_config, get_rest_config,
+get_storage_config,
};

use crate::{flatten_join_handle, join_handle_map_err};
Expand Down Expand Up @@ -212,7 +212,7 @@ impl SimpleOrchestrator {
&self.config.sources,
self.config.sql.as_deref(),
&self.config.endpoints,
-get_checkpoint_factory_options(&self.config),
+get_storage_config(&self.config),
self.labels.clone(),
&self.config.udfs,
))?;
Expand Down
4 changes: 2 additions & 2 deletions dozer-cli/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,14 @@ pub fn get_api_security_config(config: &Config) -> Option<&ApiSecurity> {
.and_then(|api| api.api_security.as_ref())
}

-pub fn get_checkpoint_factory_options(config: &Config) -> CheckpointFactoryOptions {
+fn get_checkpoint_factory_options(config: &Config) -> CheckpointFactoryOptions {
CheckpointFactoryOptions {
persist_queue_capacity: config
.app
.as_ref()
.and_then(|app| app.persist_queue_capacity)
.unwrap_or_else(default_persist_queue_capacity)
as usize,
-storage_config: get_storage_config(config),
}
}

Expand All @@ -137,6 +136,7 @@ pub fn get_executor_options(config: &Config) -> ExecutorOptions {
config,
),
},
+checkpoint_factory_options: get_checkpoint_factory_options(config),
}
}

Expand Down
17 changes: 9 additions & 8 deletions dozer-core/src/builder_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use daggy::petgraph::visit::{IntoNodeIdentifiers, IntoNodeReferences};
use dozer_types::node::NodeHandle;

use crate::{
-checkpoint::{CheckpointFactory, OptionCheckpoint},
+checkpoint::{CheckpointFactory, CheckpointFactoryOptions, OptionCheckpoint},
dag_schemas::{DagHaveSchemas, DagSchemas, EdgeType},
errors::ExecutionError,
node::{PortHandle, Processor, Sink, Source, SourceState},
Expand Down Expand Up @@ -44,8 +44,8 @@ pub struct BuilderDag {

impl BuilderDag {
pub async fn new(
-checkpoint_factory: Arc<CheckpointFactory>,
 checkpoint: OptionCheckpoint,
+options: CheckpointFactoryOptions,
dag_schemas: DagSchemas,
) -> Result<Self, ExecutionError> {
// Collect input output schemas.
Expand All @@ -60,9 +60,7 @@ impl BuilderDag {
let mut checkpoint_data = HashMap::new();
for (node_index, node) in dag_schemas.graph().node_references() {
if let DagNodeKind::Processor(_) = &node.kind {
-let processor_data = checkpoint
-.load_processor_data(&checkpoint_factory, &node.handle)
-.await?;
+let processor_data = checkpoint.load_processor_data(&node.handle).await?;
checkpoint_data.insert(node_index, processor_data);
}
}
Expand Down Expand Up @@ -116,7 +114,7 @@ impl BuilderDag {
output_schemas
.remove(&node_index)
.expect("we collected all output schemas"),
-checkpoint_factory.record_store(),
+checkpoint.record_store(),
checkpoint_data
.remove(&node_index)
.expect("we collected all processor checkpoint data"),
Expand Down Expand Up @@ -144,10 +142,13 @@ impl BuilderDag {
|_, edge| Ok(edge),
)?;

+let initial_epoch_id = checkpoint.next_epoch_id();
+let (checkpoint_factory, _) = CheckpointFactory::new(checkpoint, options).await?;
+
 Ok(BuilderDag {
 graph,
-initial_epoch_id: checkpoint.next_epoch_id(),
-checkpoint_factory,
+initial_epoch_id,
+checkpoint_factory: Arc::new(checkpoint_factory),
})
}

Expand Down
Loading

0 comments on commit 8d092b6

Please sign in to comment.