Skip to content

Commit

Permalink
Write dozer.lock (#1945)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse-Bakker authored Aug 31, 2023
1 parent 7865281 commit 18d6eab
Show file tree
Hide file tree
Showing 13 changed files with 111 additions and 50 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dozer-config.test.*
log4rs.yaml
.DS_Store
queries
dozer.lock

.dozer/
logs/
Expand Down
8 changes: 7 additions & 1 deletion dozer-cli/src/cli/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use crate::errors::CliError::{ConfigurationFilePathNotProvided, FailedToFindConf
use crate::errors::ConfigCombineError::CannotReadConfig;
use crate::errors::OrchestrationError;
use crate::simple::SimpleOrchestrator as Dozer;

use atty::Stream;
use dozer_cache::dozer_log::camino::Utf8PathBuf;
use dozer_tracing::LabelsAndProgress;
use dozer_types::models::config::default_cache_max_map_size;
use dozer_types::prettytable::{row, Table};
Expand Down Expand Up @@ -35,7 +37,11 @@ pub async fn init_dozer(
let page_size = page_size::get() as u64;
config.cache_max_map_size = Some(cache_max_map_size / page_size * page_size);

Ok(Dozer::new(config, runtime, labels))
let base_directory = std::env::current_dir().map_err(CliError::Io)?;
let base_directory =
Utf8PathBuf::try_from(base_directory).map_err(|e| CliError::Io(e.into_io_error()))?;

Ok(Dozer::new(base_directory, config, runtime, labels))
}

pub async fn list_sources(
Expand Down
3 changes: 3 additions & 0 deletions dozer-cli/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ pub enum CliError {
MissingConfigOverride(String),
#[error("Failed to deserialize config from json: {0}")]
DeserializeConfigFromJson(#[source] serde_json::Error),
// Generic IO error
#[error(transparent)]
Io(#[from] std::io::Error),
}

#[derive(Error, Debug)]
Expand Down
11 changes: 6 additions & 5 deletions dozer-cli/src/simple/build/contract/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
path::Path,
};

use dozer_cache::dozer_log::{home_dir::BuildPath, schemas::EndpointSchema};
use dozer_cache::dozer_log::schemas::EndpointSchema;
use dozer_core::{
dag_schemas::DagSchemas,
daggy::{self, NodeIndex},
Expand Down Expand Up @@ -162,13 +162,13 @@ impl Contract {
})
}

pub fn serialize(&self, build_path: &BuildPath) -> Result<(), BuildError> {
serde_json_to_path(&build_path.dag_path, &self)?;
pub fn serialize(&self, path: &Path) -> Result<(), BuildError> {
serde_json_to_path(path, &self)?;
Ok(())
}

pub fn deserialize(build_path: &BuildPath) -> Result<Self, BuildError> {
serde_json_from_path(&build_path.dag_path)
pub fn deserialize(path: &Path) -> Result<Self, BuildError> {
serde_json_from_path(path)
}
}

Expand Down Expand Up @@ -222,6 +222,7 @@ fn serde_json_to_path(path: impl AsRef<Path>, value: &impl Serialize) -> Result<
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(path.as_ref())
.map_err(|e| BuildError::FileSystem(path.as_ref().into(), e))?;
serde_json::to_writer_pretty(file, value).map_err(BuildError::SerdeJson)
Expand Down
24 changes: 14 additions & 10 deletions dozer-cli/src/simple/build/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,25 @@ pub use contract::{Contract, PipelineContract};
pub async fn build(
home_dir: &HomeDir,
contract: &Contract,
existing_contract: Option<&Contract>,
storage_config: &DataStorage,
) -> Result<(), BuildError> {
if let Some(build_id) = needs_build(home_dir, contract, storage_config).await? {
if let Some(build_id) =
new_build_id(home_dir, contract, existing_contract, storage_config).await?
{
let build_name = build_id.name().to_string();
create_build(home_dir, build_id, contract)?;
build_endpoint_protos(home_dir, build_id, contract)?;
info!("Created new build {build_name}");
} else {
info!("Building not needed");
}
Ok(())
}

async fn needs_build(
async fn new_build_id(
home_dir: &HomeDir,
contract: &Contract,
existing_contract: Option<&Contract>,
storage_config: &DataStorage,
) -> Result<Option<BuildId>, BuildError> {
let build_path = home_dir
Expand All @@ -40,6 +44,10 @@ async fn needs_build(
return Ok(Some(BuildId::first()));
};

let Some(existing_contract) = existing_contract else {
return Ok(Some(build_path.id.next()));
};

let mut futures = vec![];
for endpoint in contract.endpoints.keys() {
let endpoint_path = build_path.get_endpoint_path(endpoint);
Expand All @@ -52,20 +60,18 @@ async fn needs_build(
if !try_join_all(futures)
.await?
.into_iter()
.all(|is_empty| is_empty)
.all(std::convert::identity)
{
return Ok(Some(build_path.id.next()));
}

let existing_contract = Contract::deserialize(&build_path)?;
for (endpoint, schema) in &contract.endpoints {
if let Some(existing_schema) = existing_contract.endpoints.get(endpoint) {
if schema == existing_schema {
continue;
}
} else {
return Ok(Some(build_path.id.next()));
}
return Ok(Some(build_path.id.next()));
}
Ok(None)
}
Expand All @@ -75,7 +81,7 @@ async fn is_empty(storage: Box<dyn Storage>, prefix: String) -> Result<bool, Bui
Ok(objects.objects.is_empty())
}

fn create_build(
fn build_endpoint_protos(
home_dir: &HomeDir,
build_id: BuildId,
contract: &Contract,
Expand Down Expand Up @@ -104,7 +110,5 @@ fn create_build(
&resources,
)?;

contract.serialize(&build_path)?;

Ok(())
}
9 changes: 7 additions & 2 deletions dozer-cli/src/simple/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@ pub struct Executor<'a> {
}

impl<'a> Executor<'a> {
// TODO: Refactor this to not require both `contract` and all of
// connections, sources and sql
#[allow(clippy::too_many_arguments)]
pub async fn new(
home_dir: &'a HomeDir,
contract: &Contract,
connections: &'a [Connection],
sources: &'a [Source],
sql: Option<&'a str>,
Expand All @@ -63,7 +67,8 @@ impl<'a> Executor<'a> {
let mut endpoint_and_logs = vec![];
for endpoint in api_endpoints {
let log_endpoint =
create_log_endpoint(&build_path, &endpoint.name, &checkpoint_factory).await?;
create_log_endpoint(contract, &build_path, &endpoint.name, &checkpoint_factory)
.await?;
endpoint_and_logs.push((endpoint.clone(), log_endpoint));
}

Expand Down Expand Up @@ -119,13 +124,13 @@ pub fn run_dag_executor(
}

async fn create_log_endpoint(
contract: &Contract,
build_path: &BuildPath,
endpoint_name: &str,
checkpoint_factory: &CheckpointFactory,
) -> Result<LogEndpoint, OrchestrationError> {
let endpoint_path = build_path.get_endpoint_path(endpoint_name);

let contract = Contract::deserialize(build_path)?;
let schema = contract
.endpoints
.get(endpoint_name)
Expand Down
54 changes: 44 additions & 10 deletions dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::executor::{run_dag_executor, Executor};
use super::Contract;
use crate::errors::OrchestrationError;
use crate::pipeline::PipelineBuilder;
use crate::shutdown::ShutdownReceiver;
Expand All @@ -15,10 +16,12 @@ use dozer_api::auth::{Access, Authorizer};
use dozer_api::grpc::internal::internal_pipeline_server::start_internal_pipeline_server;
use dozer_api::{grpc, rest, CacheEndpoint};
use dozer_cache::cache::LmdbRwCacheManager;
use dozer_cache::dozer_log::camino::Utf8PathBuf;
use dozer_cache::dozer_log::home_dir::HomeDir;
use dozer_core::app::AppPipeline;
use dozer_core::dag_schemas::DagSchemas;
use dozer_tracing::LabelsAndProgress;
use dozer_types::constants::LOCK_FILE;
use dozer_types::models::flags::default_push_events;
use tokio::select;

Expand All @@ -39,7 +42,6 @@ use futures::{FutureExt, StreamExt, TryFutureExt};
use metrics::{describe_counter, describe_histogram};
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;

use std::sync::Arc;
use std::thread;
Expand All @@ -48,14 +50,21 @@ use tokio::sync::broadcast;

/// Orchestrator state: a base directory plus the configuration, runtime and
/// labels it was constructed with.
#[derive(Clone)]
pub struct SimpleOrchestrator {
/// Directory onto which `config.home_dir`, `config.cache_dir` and the lock
/// file name are joined to form the paths this orchestrator works with.
pub base_directory: Utf8PathBuf,
/// Application configuration (home/cache directories, connections, sources, sql, ...).
pub config: Config,
/// Shared runtime handle; async work is driven on it via `block_on`.
pub runtime: Arc<Runtime>,
/// NOTE(review): presumably tracing labels and progress reporting state from
/// `dozer_tracing` — confirm against that crate.
pub labels: LabelsAndProgress,
}

impl SimpleOrchestrator {
pub fn new(config: Config, runtime: Arc<Runtime>, labels: LabelsAndProgress) -> Self {
pub fn new(
base_directory: Utf8PathBuf,
config: Config,
runtime: Arc<Runtime>,
labels: LabelsAndProgress,
) -> Self {
Self {
base_directory,
config,
runtime,
labels,
Expand Down Expand Up @@ -173,14 +182,28 @@ impl SimpleOrchestrator {
Ok(())
}

/// Returns the home directory path: `config.home_dir` joined onto
/// `base_directory`.
///
/// NOTE(review): if `config.home_dir` is absolute, `join` presumably returns it
/// unchanged (as `std::path::Path::join` does) — confirm against camino.
pub fn home_dir(&self) -> Utf8PathBuf {
    let configured = &self.config.home_dir;
    self.base_directory.join(configured)
}

/// Returns the cache directory path: `config.cache_dir` joined onto
/// `base_directory`.
///
/// NOTE(review): if `config.cache_dir` is absolute, `join` presumably returns it
/// unchanged (as `std::path::Path::join` does) — confirm against camino.
pub fn cache_dir(&self) -> Utf8PathBuf {
    let configured = &self.config.cache_dir;
    self.base_directory.join(configured)
}

/// Returns the path of the lock file: the `LOCK_FILE` name joined onto
/// `base_directory`.
pub fn lockfile_path(&self) -> Utf8PathBuf {
    let root = &self.base_directory;
    root.join(LOCK_FILE)
}

pub fn run_apps(
&mut self,
shutdown: ShutdownReceiver,
api_notifier: Option<Sender<bool>>,
) -> Result<(), OrchestrationError> {
let home_dir = HomeDir::new(self.config.home_dir.clone(), self.config.cache_dir.clone());
let home_dir = HomeDir::new(self.home_dir(), self.cache_dir());
let contract = Contract::deserialize(self.lockfile_path().as_std_path())?;
let executor = self.runtime.block_on(Executor::new(
&home_dir,
&contract,
&self.config.connections,
&self.config.sources,
self.config.sql.as_deref(),
Expand Down Expand Up @@ -270,7 +293,9 @@ impl SimpleOrchestrator {
force: bool,
shutdown: ShutdownReceiver,
) -> Result<(), OrchestrationError> {
let home_dir = HomeDir::new(self.config.home_dir.clone(), self.config.cache_dir.clone());
let home_dir = self.home_dir();
let cache_dir = self.cache_dir();
let home_dir = HomeDir::new(home_dir, cache_dir);

info!(
"Initiating app: {}",
Expand Down Expand Up @@ -327,27 +352,36 @@ impl SimpleOrchestrator {
enable_on_event,
)?;

let contract_path = self.lockfile_path();
let existing_contract = Contract::deserialize(contract_path.as_std_path()).ok();

// Run build
let storage_config = get_storage_config(&self.config);
self.runtime
.block_on(build::build(&home_dir, &contract, &storage_config))?;
self.runtime.block_on(build::build(
&home_dir,
&contract,
existing_contract.as_ref(),
&storage_config,
))?;

contract.serialize(contract_path.as_std_path())?;

Ok(())
}

// Cleaning the entire folder as there will be inconsistencies
// between pipeline, cache and generated proto files.
pub fn clean(&mut self) -> Result<(), OrchestrationError> {
let cache_dir = PathBuf::from(self.config.cache_dir.clone());
let cache_dir = self.cache_dir();
if cache_dir.exists() {
fs::remove_dir_all(&cache_dir)
.map_err(|e| ExecutionError::FileSystemError(cache_dir, e))?;
.map_err(|e| ExecutionError::FileSystemError(cache_dir.into_std_path_buf(), e))?;
};

let home_dir = PathBuf::from(self.config.home_dir.clone());
let home_dir = self.home_dir();
if home_dir.exists() {
fs::remove_dir_all(&home_dir)
.map_err(|e| ExecutionError::FileSystemError(home_dir, e))?;
.map_err(|e| ExecutionError::FileSystemError(home_dir.into_std_path_buf(), e))?;
};

Ok(())
Expand Down
24 changes: 9 additions & 15 deletions dozer-ingestion/tests/test_suite/connectors/dozer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ use dozer_cli::shutdown::{self, ShutdownSender};
use dozer_cli::simple::SimpleOrchestrator;
use dozer_ingestion::connectors::dozer::NestedDozerConnector;
use dozer_ingestion::connectors::{CdcType, SourceSchema};
use dozer_types::constants::LOCK_FILE;
use dozer_types::grpc_types::conversions::field_to_grpc;
use dozer_types::grpc_types::ingest::ingest_service_client::IngestServiceClient;
use dozer_types::grpc_types::ingest::{IngestRequest, OperationType};
use dozer_types::grpc_types::types::Record;
use dozer_types::ingestion_types::GrpcConfig;
use dozer_types::log::info;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::config::{default_cache_dir, default_home_dir};
use dozer_types::models::source::Source;
use dozer_types::types::{Field, FieldDefinition, FieldType};
use dozer_types::{
Expand Down Expand Up @@ -196,14 +198,12 @@ async fn create_nested_dozer_server(
)),
adapter: "default".to_owned(),
};
let dozer_dir = temp_dir.path().join(".dozer");
let cache_dir = dozer_dir.join("cache");
std::fs::create_dir_all(&cache_dir).unwrap();

let config = dozer_types::models::config::Config {
version: 1,
app_name: "nested-dozer-connector-test".to_owned(),
home_dir: dozer_dir.to_str().unwrap().to_owned(),
cache_dir: cache_dir.to_str().unwrap().to_owned(),
home_dir: default_home_dir(),
cache_dir: default_cache_dir(),
connections: vec![dozer_types::models::connection::Connection {
config: Some(dozer_types::models::connection::ConnectionConfig::Grpc(
grpc_config,
Expand All @@ -227,19 +227,13 @@ async fn create_nested_dozer_server(
version: None,
log_reader_options: None,
}],
api: None,
sql: None,
flags: None,
cache_max_map_size: None,
app: None,
telemetry: None,
cloud: None,
udfs: vec![],
..Default::default()
};

let dozer_runtime = Runtime::new().expect("Failed to start tokio runtime for nested dozer");
let runtime = Arc::new(dozer_runtime);
let mut dozer = SimpleOrchestrator::new(config, runtime.clone(), Default::default());
let directory = temp_dir.path().to_owned().try_into().unwrap();
let mut dozer = SimpleOrchestrator::new(directory, config, runtime.clone(), Default::default());
let (shutdown_sender, shutdown_receiver) = shutdown::new(&dozer.runtime);
let dozer_thread = std::thread::spawn(move || dozer.run_all(shutdown_receiver).unwrap());

Expand Down Expand Up @@ -281,7 +275,7 @@ impl Drop for DozerConnectorTest {
if let Some((join_handle, shutdown)) = self.shutdown.take() {
shutdown.shutdown();
info!("Sent shutdown signal");
join_handle.join().unwrap();
let _ = join_handle.join();
info!("Joined dozer thread");
}
}
Expand Down
Loading

0 comments on commit 18d6eab

Please sign in to comment.