Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "feat: Add option to use rocksdb record store" #2082

Merged
merged 1 commit into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 1 addition & 51 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions dozer-cli/src/simple/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use dozer_api::grpc::internal::internal_pipeline_server::LogEndpoint;
use dozer_cache::dozer_log::camino::Utf8Path;
use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir};
use dozer_cache::dozer_log::replication::Log;
use dozer_core::checkpoint::{CheckpointOptions, OptionCheckpoint};
use dozer_core::checkpoint::OptionCheckpoint;
use dozer_tracing::LabelsAndProgress;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::app_config::DataStorage;
use dozer_types::models::flags::Flags;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -46,7 +47,7 @@ impl<'a> Executor<'a> {
sources: &'a [Source],
sql: Option<&'a str>,
api_endpoints: &'a [ApiEndpoint],
checkpoint_options: CheckpointOptions,
storage_config: DataStorage,
labels: LabelsAndProgress,
udfs: &'a [UdfConfig],
) -> Result<Executor<'a>, OrchestrationError> {
Expand All @@ -58,7 +59,7 @@ impl<'a> Executor<'a> {

// Load pipeline checkpoint.
let checkpoint =
OptionCheckpoint::new(build_path.data_dir.to_string(), checkpoint_options).await?;
OptionCheckpoint::new(build_path.data_dir.to_string(), storage_config).await?;

let mut endpoint_and_logs = vec![];
for endpoint in api_endpoints {
Expand Down
7 changes: 2 additions & 5 deletions dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ use crate::pipeline::PipelineBuilder;
use crate::shutdown::ShutdownReceiver;
use crate::simple::build;
use crate::simple::helper::validate_config;
use crate::utils::{
get_cache_manager_options, get_checkpoint_options, get_default_max_num_records,
get_executor_options,
};
use crate::utils::{get_cache_manager_options, get_default_max_num_records, get_executor_options};

use crate::{flatten_join_handle, join_handle_map_err};
use dozer_api::auth::{Access, Authorizer};
Expand Down Expand Up @@ -229,7 +226,7 @@ impl SimpleOrchestrator {
&self.config.sources,
self.config.sql.as_deref(),
&self.config.endpoints,
get_checkpoint_options(&self.config),
self.config.app.data_storage.clone(),
self.labels.clone(),
&self.config.udfs,
))?;
Expand Down
12 changes: 1 addition & 11 deletions dozer-cli/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use dozer_cache::cache::CacheManagerOptions;
use dozer_core::{
checkpoint::{CheckpointFactoryOptions, CheckpointOptions},
epoch::EpochManagerOptions,
executor::ExecutorOptions,
checkpoint::CheckpointFactoryOptions, epoch::EpochManagerOptions, executor::ExecutorOptions,
};
use dozer_types::{
constants::DEFAULT_DEFAULT_MAX_NUM_RECORDS,
Expand Down Expand Up @@ -64,14 +62,6 @@ fn get_max_interval_before_persist_in_seconds(config: &Config) -> u64 {
.unwrap_or_else(default_max_interval_before_persist_in_seconds)
}

pub fn get_checkpoint_options(config: &Config) -> CheckpointOptions {
let app = &config.app;
CheckpointOptions {
data_storage: app.data_storage.clone(),
record_store: app.record_store,
}
}

fn get_checkpoint_factory_options(config: &Config) -> CheckpointFactoryOptions {
CheckpointFactoryOptions {
persist_queue_capacity: config
Expand Down
24 changes: 8 additions & 16 deletions dozer-core/src/checkpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use dozer_recordstore::{ProcessorRecordStore, ProcessorRecordStoreDeserializer,
use dozer_types::{
bincode,
log::{error, info},
models::app_config::{DataStorage, RecordStore},
models::app_config::DataStorage,
node::{NodeHandle, OpIdentifier, SourceStates, TableState},
parking_lot::Mutex,
serde::{Deserialize, Serialize},
Expand Down Expand Up @@ -58,21 +58,14 @@ pub struct OptionCheckpoint {
checkpoint: Option<Checkpoint>,
}

#[derive(Debug, Clone, Default)]
pub struct CheckpointOptions {
pub data_storage: DataStorage,
pub record_store: RecordStore,
}

impl OptionCheckpoint {
pub async fn new(
checkpoint_dir: String,
options: CheckpointOptions,
storage_config: DataStorage,
) -> Result<Self, ExecutionError> {
let (storage, prefix) =
create_data_storage(options.data_storage, checkpoint_dir.to_string()).await?;
let (record_store, checkpoint) =
read_record_store_slices(&*storage, &prefix, options.record_store).await?;
create_data_storage(storage_config, checkpoint_dir.to_string()).await?;
let (record_store, checkpoint) = read_record_store_slices(&*storage, &prefix).await?;
if let Some(checkpoint) = &checkpoint {
info!(
"Restored record store from {}th checkpoint, last epoch id is {}, processor states are stored in {}",
Expand Down Expand Up @@ -296,9 +289,8 @@ impl Drop for CheckpointWriter {
async fn read_record_store_slices(
storage: &dyn Storage,
factory_prefix: &str,
record_store: RecordStore,
) -> Result<(ProcessorRecordStoreDeserializer, Option<Checkpoint>), ExecutionError> {
let record_store = ProcessorRecordStoreDeserializer::new(record_store)?;
let record_store = ProcessorRecordStoreDeserializer::new()?;
let record_store_prefix = record_store_prefix(factory_prefix);

let mut last_checkpoint: Option<Checkpoint> = None;
Expand Down Expand Up @@ -366,7 +358,7 @@ async fn read_record_store_slices(
pub async fn create_checkpoint_for_test() -> (TempDir, OptionCheckpoint) {
let temp_dir = TempDir::new("create_checkpoint_for_test").unwrap();
let checkpoint_dir = temp_dir.path().to_str().unwrap().to_string();
let checkpoint = OptionCheckpoint::new(checkpoint_dir.clone(), Default::default())
let checkpoint = OptionCheckpoint::new(checkpoint_dir.clone(), DataStorage::Local)
.await
.unwrap();
(temp_dir, checkpoint)
Expand All @@ -379,7 +371,7 @@ pub async fn create_checkpoint_factory_for_test(
// Create empty checkpoint storage.
let temp_dir = TempDir::new("create_checkpoint_factory_for_test").unwrap();
let checkpoint_dir = temp_dir.path().to_str().unwrap().to_string();
let checkpoint = OptionCheckpoint::new(checkpoint_dir.clone(), Default::default())
let checkpoint = OptionCheckpoint::new(checkpoint_dir.clone(), DataStorage::Local)
.await
.unwrap();
let (checkpoint_factory, handle) = CheckpointFactory::new(checkpoint, Default::default())
Expand Down Expand Up @@ -412,7 +404,7 @@ pub async fn create_checkpoint_factory_for_test(
handle.await.unwrap();

// Create a new factory that loads from the checkpoint.
let checkpoint = OptionCheckpoint::new(checkpoint_dir, Default::default())
let checkpoint = OptionCheckpoint::new(checkpoint_dir, DataStorage::Local)
.await
.unwrap();
let last_checkpoint = checkpoint.checkpoint.as_ref().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/executor/receiver_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ mod tests {
#[test]
fn receiver_loop_forwards_op() {
let (mut test_loop, senders) = TestReceiverLoop::new(2);
let record_store = ProcessorRecordStore::new(Default::default()).unwrap();
let record_store = ProcessorRecordStore::new().unwrap();
let record: ProcessorRecord = record_store
.create_record(&Record::new(vec![Field::Int(1)]))
.unwrap();
Expand Down
2 changes: 0 additions & 2 deletions dozer-recordstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,4 @@ fuzz = ["dozer-types/arbitrary"]

[dependencies]
dozer-types = { path = "../dozer-types" }
dozer-storage = { path = "../dozer-storage" }
slice-dst = { version = "1.5.1", default-features = false }
tempdir = "0.3.7"
Loading