perf: Reference counting record store garbage collection (#2059)
* chore: Replace `triomphe::Arc` with `std::sync::Arc`

Also move `Send` and `Sync` `impl`s one level deeper to enable `Weak`

* perf: Reference counting record store garbage collection

---------

Signed-off-by: Solomon <[email protected]>
Co-authored-by: Solomon <[email protected]>
chubei and abcpro1 authored Sep 22, 2023 · 1 parent e9c5b98 · commit 3ff95aa
Showing 48 changed files with 509 additions and 509 deletions.
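
The two bullets in the commit message are connected: `triomphe::Arc` omits weak references by design (that is much of its performance appeal), so switching to `std::sync::Arc` is what makes `Weak` handles available at all, and moving the `Send`/`Sync` `impl`s one level deeper lets `Arc<T>` and `Weak<T>` derive their thread safety from the inner type rather than from an outer wrapper. A minimal sketch of the pattern the switch enables — the `Record` type here is illustrative, not Dozer's actual record type:

```rust
use std::sync::{Arc, Weak};

// Hypothetical shared record: strong handles keep it alive, weak ones do not.
struct Record {
    payload: Vec<u8>,
}

fn main() {
    let strong: Arc<Record> = Arc::new(Record { payload: vec![1, 2, 3] });
    assert_eq!(strong.payload.len(), 3);

    // `Arc::downgrade` is the operation `triomphe::Arc` does not offer.
    let weak: Weak<Record> = Arc::downgrade(&strong);
    assert!(weak.upgrade().is_some()); // alive while any strong handle exists

    // Dropping the last strong handle frees the record immediately:
    // the reference count itself acts as the garbage collector.
    drop(strong);
    assert!(weak.upgrade().is_none());
}
```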
21 changes: 10 additions & 11 deletions Cargo.lock


32 changes: 12 additions & 20 deletions dozer-cli/src/simple/executor.rs
@@ -2,9 +2,10 @@ use dozer_api::grpc::internal::internal_pipeline_server::LogEndpoint;
 use dozer_cache::dozer_log::camino::Utf8Path;
 use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir};
 use dozer_cache::dozer_log::replication::Log;
-use dozer_core::checkpoint::{CheckpointFactory, CheckpointFactoryOptions, OptionCheckpoint};
+use dozer_core::checkpoint::OptionCheckpoint;
 use dozer_tracing::LabelsAndProgress;
 use dozer_types::models::api_endpoint::ApiEndpoint;
+use dozer_types::models::app_config::DataStorage;
 use dozer_types::models::flags::Flags;
 use tokio::runtime::Runtime;
 use tokio::sync::Mutex;
@@ -28,7 +29,6 @@ pub struct Executor<'a> {
     connections: &'a [Connection],
     sources: &'a [Source],
     sql: Option<&'a str>,
-    checkpoint_factory: Arc<CheckpointFactory>,
     checkpoint: OptionCheckpoint,
     /// `ApiEndpoint` and its log.
     endpoint_and_logs: Vec<(ApiEndpoint, LogEndpoint)>,
@@ -47,7 +47,7 @@ impl<'a> Executor<'a> {
         sources: &'a [Source],
         sql: Option<&'a str>,
         api_endpoints: &'a [ApiEndpoint],
-        checkpoint_factory_options: CheckpointFactoryOptions,
+        storage_config: DataStorage,
         labels: LabelsAndProgress,
         udfs: &'a [UdfConfig],
     ) -> Result<Executor<'a>, OrchestrationError> {
@@ -58,18 +58,17 @@ impl<'a> Executor<'a> {
             .ok_or(OrchestrationError::NoBuildFound)?;
 
         // Load pipeline checkpoint.
-        let (checkpoint_factory, last_checkpoint, _) =
-            CheckpointFactory::new(build_path.data_dir.to_string(), checkpoint_factory_options)
-                .await?;
+        let checkpoint =
+            OptionCheckpoint::new(build_path.data_dir.to_string(), storage_config).await?;
 
         let mut endpoint_and_logs = vec![];
         for endpoint in api_endpoints {
             let log_endpoint = create_log_endpoint(
                 contract,
                 &build_path,
                 &endpoint.name,
-                &checkpoint_factory,
-                last_checkpoint.num_slices(),
+                &checkpoint,
+                checkpoint.num_slices(),
             )
             .await?;
             endpoint_and_logs.push((endpoint.clone(), log_endpoint));
@@ -79,8 +78,7 @@ impl<'a> Executor<'a> {
             connections,
             sources,
             sql,
-            checkpoint_factory: Arc::new(checkpoint_factory),
-            checkpoint: last_checkpoint,
+            checkpoint,
             endpoint_and_logs,
             labels,
             udfs,
@@ -112,13 +110,7 @@ impl<'a> Executor<'a> {
         );
 
         let dag = builder.build(runtime, shutdown).await?;
-        let exec = DagExecutor::new(
-            dag,
-            self.checkpoint_factory,
-            self.checkpoint,
-            executor_options,
-        )
-        .await?;
+        let exec = DagExecutor::new(dag, self.checkpoint, executor_options).await?;
 
         Ok(exec)
     }
@@ -139,7 +131,7 @@ async fn create_log_endpoint(
     contract: &Contract,
     build_path: &BuildPath,
     endpoint_name: &str,
-    checkpoint_factory: &CheckpointFactory,
+    checkpoint: &OptionCheckpoint,
     num_persisted_entries_to_keep: usize,
 ) -> Result<LogEndpoint, OrchestrationError> {
     let endpoint_path = build_path.get_endpoint_path(endpoint_name);
@@ -157,10 +149,10 @@ async fn create_log_endpoint(
         OrchestrationError::FileSystem(build_path.descriptor_path.clone().into(), e)
     })?;
 
-    let log_prefix = AsRef::<Utf8Path>::as_ref(checkpoint_factory.prefix())
+    let log_prefix = AsRef::<Utf8Path>::as_ref(checkpoint.prefix())
         .join(&endpoint_path.log_dir_relative_to_data_dir);
     let log = Log::new(
-        checkpoint_factory.storage(),
+        checkpoint.storage(),
         log_prefix.into(),
         num_persisted_entries_to_keep,
     )
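
Taken together, the executor hunks above split startup into two phases: a lightweight `OptionCheckpoint` (checkpoint metadata plus a storage handle) is loaded first to wire up the endpoint logs, and the heavyweight `CheckpointFactory` is only built later, inside `BuilderDag::new` (see the `dozer-core/src/builder_dag.rs` hunks below). A toy model of that handoff, using stand-in types rather than Dozer's real API:

```rust
// Stand-in types for the two-phase startup this commit introduces;
// none of this is Dozer's real API.

struct OptionCheckpoint {
    data_dir: String, // cheap: checkpoint metadata only, no record store
}

struct CheckpointFactory {
    checkpoint: OptionCheckpoint, // heavyweight: would own the record store
}

impl OptionCheckpoint {
    fn new(data_dir: &str) -> Self {
        Self {
            data_dir: data_dir.to_string(),
        }
    }

    // Enough information to wire up endpoint logs before any DAG exists.
    fn num_slices(&self) -> usize {
        0
    }
}

impl CheckpointFactory {
    // Consumes the checkpoint by value, so the factory is built exactly
    // once, after the DAG schemas are known.
    fn new(checkpoint: OptionCheckpoint) -> Self {
        Self { checkpoint }
    }
}

fn main() {
    let checkpoint = OptionCheckpoint::new("data"); // phase 1: executor setup
    println!("slices: {}", checkpoint.num_slices()); // logs wired from metadata
    let factory = CheckpointFactory::new(checkpoint); // phase 2: DAG build time
    println!("data dir: {}", factory.checkpoint.data_dir);
}
```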
7 changes: 2 additions & 5 deletions dozer-cli/src/simple/orchestrator.rs
@@ -5,10 +5,7 @@ use crate::pipeline::PipelineBuilder;
 use crate::shutdown::ShutdownReceiver;
 use crate::simple::build;
 use crate::simple::helper::validate_config;
-use crate::utils::{
-    get_cache_manager_options, get_checkpoint_factory_options, get_default_max_num_records,
-    get_executor_options,
-};
+use crate::utils::{get_cache_manager_options, get_default_max_num_records, get_executor_options};
 
 use crate::{flatten_join_handle, join_handle_map_err};
 use dozer_api::auth::{Access, Authorizer};
@@ -229,7 +226,7 @@ impl SimpleOrchestrator {
             &self.config.sources,
             self.config.sql.as_deref(),
             &self.config.endpoints,
-            get_checkpoint_factory_options(&self.config),
+            self.config.app.data_storage.clone(),
             self.labels.clone(),
             &self.config.udfs,
         ))?;
4 changes: 2 additions & 2 deletions dozer-cli/src/utils.rs
@@ -62,14 +62,13 @@ fn get_max_interval_before_persist_in_seconds(config: &Config) -> u64 {
         .unwrap_or_else(default_max_interval_before_persist_in_seconds)
 }
 
-pub fn get_checkpoint_factory_options(config: &Config) -> CheckpointFactoryOptions {
+fn get_checkpoint_factory_options(config: &Config) -> CheckpointFactoryOptions {
     CheckpointFactoryOptions {
         persist_queue_capacity: config
             .app
             .persist_queue_capacity
             .unwrap_or_else(default_persist_queue_capacity)
             as usize,
-        storage_config: config.app.data_storage.clone(),
     }
 }
 
@@ -85,6 +84,7 @@ pub fn get_executor_options(config: &Config) -> ExecutorOptions {
                 config,
             ),
         },
+        checkpoint_factory_options: get_checkpoint_factory_options(config),
     }
 }
 
17 changes: 9 additions & 8 deletions dozer-core/src/builder_dag.rs
@@ -4,7 +4,7 @@ use daggy::petgraph::visit::{IntoNodeIdentifiers, IntoNodeReferences};
 use dozer_types::node::NodeHandle;
 
 use crate::{
-    checkpoint::{CheckpointFactory, OptionCheckpoint},
+    checkpoint::{CheckpointFactory, CheckpointFactoryOptions, OptionCheckpoint},
     dag_schemas::{DagHaveSchemas, DagSchemas, EdgeType},
     errors::ExecutionError,
     node::{PortHandle, Processor, Sink, Source, SourceState},
@@ -44,8 +44,8 @@ pub struct BuilderDag {
 
 impl BuilderDag {
     pub async fn new(
-        checkpoint_factory: Arc<CheckpointFactory>,
         checkpoint: OptionCheckpoint,
+        options: CheckpointFactoryOptions,
         dag_schemas: DagSchemas,
     ) -> Result<Self, ExecutionError> {
         // Collect input output schemas.
@@ -60,9 +60,7 @@ impl BuilderDag {
         let mut checkpoint_data = HashMap::new();
         for (node_index, node) in dag_schemas.graph().node_references() {
             if let DagNodeKind::Processor(_) = &node.kind {
-                let processor_data = checkpoint
-                    .load_processor_data(&checkpoint_factory, &node.handle)
-                    .await?;
+                let processor_data = checkpoint.load_processor_data(&node.handle).await?;
                 checkpoint_data.insert(node_index, processor_data);
             }
         }
@@ -116,7 +114,7 @@ impl BuilderDag {
                     output_schemas
                         .remove(&node_index)
                         .expect("we collected all output schemas"),
-                    checkpoint_factory.record_store(),
+                    checkpoint.record_store(),
                     checkpoint_data
                         .remove(&node_index)
                         .expect("we collected all processor checkpoint data"),
@@ -144,10 +142,13 @@ impl BuilderDag {
             |_, edge| Ok(edge),
         )?;
 
+        let initial_epoch_id = checkpoint.next_epoch_id();
+        let (checkpoint_factory, _) = CheckpointFactory::new(checkpoint, options).await?;
+
         Ok(BuilderDag {
             graph,
-            initial_epoch_id: checkpoint.next_epoch_id(),
-            checkpoint_factory,
+            initial_epoch_id,
+            checkpoint_factory: Arc::new(checkpoint_factory),
         })
     }
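
The record store itself is not among the hunks shown, but the commit title names the scheme these changes enable: once records are shared through `std::sync::Arc`, the store can hand out strong handles while keeping only bookkeeping that does not extend record lifetimes, so a record is reclaimed the moment its last strong reference is dropped, with no separate garbage-collection pass. A minimal sketch of a reference-counted store along those lines, assuming a `Weak`-pointer dedup index (illustrative only, not the actual Dozer implementation):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};

// Toy reference-counted record store: the index keeps only `Weak` pointers,
// so a record's lifetime is governed entirely by the `Arc` handles that
// processors hold. When the last `Arc` drops, the record is freed; a stale
// index entry is simply overwritten on the next insert for that key.
struct RecordStore {
    index: Mutex<HashMap<u64, Weak<Vec<u8>>>>,
}

impl RecordStore {
    fn new() -> Self {
        Self {
            index: Mutex::new(HashMap::new()),
        }
    }

    // Returns the live record for `key` if one exists, otherwise stores
    // `payload` and returns a fresh strong handle to it.
    fn get_or_insert(&self, key: u64, payload: Vec<u8>) -> Arc<Vec<u8>> {
        let mut index = self.index.lock().unwrap();
        if let Some(weak) = index.get(&key) {
            if let Some(record) = weak.upgrade() {
                return record; // still alive: share the existing allocation
            }
        }
        let record = Arc::new(payload);
        index.insert(key, Arc::downgrade(&record)); // index never keeps it alive
        record
    }
}

fn main() {
    let store = RecordStore::new();
    let a = store.get_or_insert(1, b"hello".to_vec());
    let b = store.get_or_insert(1, b"ignored".to_vec());
    assert!(Arc::ptr_eq(&a, &b)); // deduplicated through the index

    drop((a, b)); // last strong handles gone: the record is collected now
    assert!(store.index.lock().unwrap()[&1].upgrade().is_none());
}
```

The essential property is that the index holds only `Weak` pointers, so dropping the last strong handle is itself the collection step; compare this with an epoch- or scan-based collector that has to walk the store periodically.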