Skip to content

Commit

Permalink
Revert "feat: Add option to use rocksdb record store (#2074)" (#2082)
Browse files Browse the repository at this point in the history
This reverts commit edb21fc.
  • Loading branch information
chubei authored Sep 25, 2023
1 parent edb21fc commit 66b7e67
Show file tree
Hide file tree
Showing 24 changed files with 610 additions and 1,024 deletions.
52 changes: 1 addition & 51 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions dozer-cli/src/simple/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use dozer_api::grpc::internal::internal_pipeline_server::LogEndpoint;
use dozer_cache::dozer_log::camino::Utf8Path;
use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir};
use dozer_cache::dozer_log::replication::Log;
use dozer_core::checkpoint::{CheckpointOptions, OptionCheckpoint};
use dozer_core::checkpoint::OptionCheckpoint;
use dozer_tracing::LabelsAndProgress;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::app_config::DataStorage;
use dozer_types::models::flags::Flags;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -46,7 +47,7 @@ impl<'a> Executor<'a> {
sources: &'a [Source],
sql: Option<&'a str>,
api_endpoints: &'a [ApiEndpoint],
checkpoint_options: CheckpointOptions,
storage_config: DataStorage,
labels: LabelsAndProgress,
udfs: &'a [UdfConfig],
) -> Result<Executor<'a>, OrchestrationError> {
Expand All @@ -58,7 +59,7 @@ impl<'a> Executor<'a> {

// Load pipeline checkpoint.
let checkpoint =
OptionCheckpoint::new(build_path.data_dir.to_string(), checkpoint_options).await?;
OptionCheckpoint::new(build_path.data_dir.to_string(), storage_config).await?;

let mut endpoint_and_logs = vec![];
for endpoint in api_endpoints {
Expand Down
7 changes: 2 additions & 5 deletions dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ use crate::pipeline::PipelineBuilder;
use crate::shutdown::ShutdownReceiver;
use crate::simple::build;
use crate::simple::helper::validate_config;
use crate::utils::{
get_cache_manager_options, get_checkpoint_options, get_default_max_num_records,
get_executor_options,
};
use crate::utils::{get_cache_manager_options, get_default_max_num_records, get_executor_options};

use crate::{flatten_join_handle, join_handle_map_err};
use dozer_api::auth::{Access, Authorizer};
Expand Down Expand Up @@ -229,7 +226,7 @@ impl SimpleOrchestrator {
&self.config.sources,
self.config.sql.as_deref(),
&self.config.endpoints,
get_checkpoint_options(&self.config),
self.config.app.data_storage.clone(),
self.labels.clone(),
&self.config.udfs,
))?;
Expand Down
12 changes: 1 addition & 11 deletions dozer-cli/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use dozer_cache::cache::CacheManagerOptions;
use dozer_core::{
checkpoint::{CheckpointFactoryOptions, CheckpointOptions},
epoch::EpochManagerOptions,
executor::ExecutorOptions,
checkpoint::CheckpointFactoryOptions, epoch::EpochManagerOptions, executor::ExecutorOptions,
};
use dozer_types::{
constants::DEFAULT_DEFAULT_MAX_NUM_RECORDS,
Expand Down Expand Up @@ -64,14 +62,6 @@ fn get_max_interval_before_persist_in_seconds(config: &Config) -> u64 {
.unwrap_or_else(default_max_interval_before_persist_in_seconds)
}

pub fn get_checkpoint_options(config: &Config) -> CheckpointOptions {
let app = &config.app;
CheckpointOptions {
data_storage: app.data_storage.clone(),
record_store: app.record_store,
}
}

fn get_checkpoint_factory_options(config: &Config) -> CheckpointFactoryOptions {
CheckpointFactoryOptions {
persist_queue_capacity: config
Expand Down
24 changes: 8 additions & 16 deletions dozer-core/src/checkpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use dozer_recordstore::{ProcessorRecordStore, ProcessorRecordStoreDeserializer,
use dozer_types::{
bincode,
log::{error, info},
models::app_config::{DataStorage, RecordStore},
models::app_config::DataStorage,
node::{NodeHandle, OpIdentifier, SourceStates, TableState},
parking_lot::Mutex,
serde::{Deserialize, Serialize},
Expand Down Expand Up @@ -58,21 +58,14 @@ pub struct OptionCheckpoint {
checkpoint: Option<Checkpoint>,
}

#[derive(Debug, Clone, Default)]
pub struct CheckpointOptions {
pub data_storage: DataStorage,
pub record_store: RecordStore,
}

impl OptionCheckpoint {
pub async fn new(
checkpoint_dir: String,
options: CheckpointOptions,
storage_config: DataStorage,
) -> Result<Self, ExecutionError> {
let (storage, prefix) =
create_data_storage(options.data_storage, checkpoint_dir.to_string()).await?;
let (record_store, checkpoint) =
read_record_store_slices(&*storage, &prefix, options.record_store).await?;
create_data_storage(storage_config, checkpoint_dir.to_string()).await?;
let (record_store, checkpoint) = read_record_store_slices(&*storage, &prefix).await?;
if let Some(checkpoint) = &checkpoint {
info!(
"Restored record store from {}th checkpoint, last epoch id is {}, processor states are stored in {}",
Expand Down Expand Up @@ -296,9 +289,8 @@ impl Drop for CheckpointWriter {
async fn read_record_store_slices(
storage: &dyn Storage,
factory_prefix: &str,
record_store: RecordStore,
) -> Result<(ProcessorRecordStoreDeserializer, Option<Checkpoint>), ExecutionError> {
let record_store = ProcessorRecordStoreDeserializer::new(record_store)?;
let record_store = ProcessorRecordStoreDeserializer::new()?;
let record_store_prefix = record_store_prefix(factory_prefix);

let mut last_checkpoint: Option<Checkpoint> = None;
Expand Down Expand Up @@ -366,7 +358,7 @@ async fn read_record_store_slices(
pub async fn create_checkpoint_for_test() -> (TempDir, OptionCheckpoint) {
let temp_dir = TempDir::new("create_checkpoint_for_test").unwrap();
let checkpoint_dir = temp_dir.path().to_str().unwrap().to_string();
let checkpoint = OptionCheckpoint::new(checkpoint_dir.clone(), Default::default())
let checkpoint = OptionCheckpoint::new(checkpoint_dir.clone(), DataStorage::Local)
.await
.unwrap();
(temp_dir, checkpoint)
Expand All @@ -379,7 +371,7 @@ pub async fn create_checkpoint_factory_for_test(
// Create empty checkpoint storage.
let temp_dir = TempDir::new("create_checkpoint_factory_for_test").unwrap();
let checkpoint_dir = temp_dir.path().to_str().unwrap().to_string();
let checkpoint = OptionCheckpoint::new(checkpoint_dir.clone(), Default::default())
let checkpoint = OptionCheckpoint::new(checkpoint_dir.clone(), DataStorage::Local)
.await
.unwrap();
let (checkpoint_factory, handle) = CheckpointFactory::new(checkpoint, Default::default())
Expand Down Expand Up @@ -412,7 +404,7 @@ pub async fn create_checkpoint_factory_for_test(
handle.await.unwrap();

// Create a new factory that loads from the checkpoint.
let checkpoint = OptionCheckpoint::new(checkpoint_dir, Default::default())
let checkpoint = OptionCheckpoint::new(checkpoint_dir, DataStorage::Local)
.await
.unwrap();
let last_checkpoint = checkpoint.checkpoint.as_ref().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/executor/receiver_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ mod tests {
#[test]
fn receiver_loop_forwards_op() {
let (mut test_loop, senders) = TestReceiverLoop::new(2);
let record_store = ProcessorRecordStore::new(Default::default()).unwrap();
let record_store = ProcessorRecordStore::new().unwrap();
let record: ProcessorRecord = record_store
.create_record(&Record::new(vec![Field::Int(1)]))
.unwrap();
Expand Down
2 changes: 0 additions & 2 deletions dozer-recordstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,4 @@ fuzz = ["dozer-types/arbitrary"]

[dependencies]
dozer-types = { path = "../dozer-types" }
dozer-storage = { path = "../dozer-storage" }
slice-dst = { version = "1.5.1", default-features = false }
tempdir = "0.3.7"
Loading

0 comments on commit 66b7e67

Please sign in to comment.