Skip to content

Commit

Permalink
Merge branch 'main' into chore/add-api-urls
Browse files Browse the repository at this point in the history
Signed-off-by: Karolis Gudiškis <[email protected]>
  • Loading branch information
karolisg authored Sep 22, 2023
2 parents 214354e + 24d413a commit fab414e
Show file tree
Hide file tree
Showing 39 changed files with 3,304 additions and 645 deletions.
1 change: 0 additions & 1 deletion .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ name: Dozer E2E Test

on:
workflow_dispatch:
merge_group:
pull_request_target:
branches: [main]

Expand Down
47 changes: 42 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dozer-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ http-body = "0.4.5"
bytes = "1.4.0"
http = "0.2.9"
pin-project = "1.1.3"
async-stream = "0.3.5"

[dev-dependencies]
tempdir = "0.3.7"
2 changes: 1 addition & 1 deletion dozer-api/src/grpc/grpc_web_middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ where
{
if enabled {
let service = GrpcWebLayer::new().layer(service);
let service = CorsLayer::very_permissive().layer(service);
let service = CorsLayer::permissive().layer(service);
MaybeGrpcWebService::GrpcWeb(service)
} else {
MaybeGrpcWebService::NoGrpcWeb(service)
Expand Down
54 changes: 31 additions & 23 deletions dozer-api/src/grpc/internal/internal_pipeline_server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use async_stream::stream;
use dozer_cache::dozer_log::home_dir::BuildId;
use dozer_cache::dozer_log::replication::{Log, LogResponseFuture};
use dozer_types::bincode;
Expand All @@ -11,16 +12,15 @@ use dozer_types::grpc_types::internal::{
use dozer_types::log::info;
use dozer_types::models::api_config::AppGrpcOptions;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::parking_lot::Mutex;
use dozer_types::tonic::transport::server::TcpIncoming;
use dozer_types::tonic::transport::Server;
use dozer_types::tonic::{self, Request, Response, Status, Streaming};
use futures_util::future::Either;
use futures_util::stream::BoxStream;
use futures_util::{Future, StreamExt, TryStreamExt};
use futures_util::{Future, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

use crate::errors::GrpcError;
use crate::grpc::run_server;
Expand Down Expand Up @@ -52,7 +52,7 @@ impl InternalPipelineService for InternalPipelineServer {
) -> Result<Response<StorageResponse>, Status> {
let endpoint = request.into_inner().endpoint;
let log = &find_log_endpoint(&self.endpoints, &endpoint)?.log;
let storage = log.lock().describe_storage();
let storage = log.lock().await.describe_storage();
Ok(Response::new(StorageResponse {
storage: Some(storage),
}))
Expand Down Expand Up @@ -103,26 +103,34 @@ impl InternalPipelineService for InternalPipelineServer {
requests: Request<Streaming<LogRequest>>,
) -> Result<Response<Self::GetLogStream>, Status> {
let endpoints = self.endpoints.clone();
Ok(Response::new(
requests
.into_inner()
.and_then(move |request| {
let log = &match find_log_endpoint(&endpoints, &request.endpoint) {
Ok(log) => log,
Err(e) => return Either::Left(std::future::ready(Err(e))),
let requests = requests.into_inner();
let stream = stream! {
for await request in requests {
let request = match request {
Ok(request) => request,
Err(e) => {
yield Err(e);
continue;
}
.log;

let response = log.lock().read(
request.start as usize..request.end as usize,
Duration::from_millis(request.timeout_in_millis as u64),
log.clone(),
);

Either::Right(serialize_log_response(response))
})
.boxed(),
))
};

let log = &match find_log_endpoint(&endpoints, &request.endpoint) {
Ok(log) => log,
Err(e) => {
yield Err(e);
continue;
}
}.log;

let response = log.lock().await.read(
request.start as usize..request.end as usize,
Duration::from_millis(request.timeout_in_millis as u64),
log.clone(),
).await;
yield serialize_log_response(response).await;
}
};
Ok(Response::new(stream.boxed()))
}
}

Expand Down
3 changes: 3 additions & 0 deletions dozer-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ tempfile = "3.8"
actix-files = "0.6.2"
prometheus-parse = "0.2.4"

[build-dependencies]
dozer-types = { path = "../dozer-types" }

[[bin]]
edition = "2021"
name = "dozer"
Expand Down
22 changes: 22 additions & 0 deletions dozer-cli/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::fs::File;
use std::io::Write;
use std::path::Path;

/// Build-script entry point: regenerates the JSON schema files under
/// `../json_schemas`.
///
/// Two files are written:
/// * `connections.json` from `dozer_types::models::get_connection_schemas()`
/// * `dozer.json` from `dozer_types::models::get_dozer_schema()`
///
/// # Panics
///
/// Panics (failing the build) if the output directory cannot be created, if
/// either schema cannot be generated, or if either file cannot be written.
fn main() {
    // Tell Cargo to re-run this script only when the script itself changes.
    println!("cargo:rerun-if-changed=build.rs");

    let schema_path = Path::new("../json_schemas");
    // The directory may be missing (e.g. when the crate is built outside the
    // workspace checkout), so create it instead of failing on `File::create`.
    std::fs::create_dir_all(schema_path).expect("Failed to create json_schemas directory");

    // Define the paths to the files we want to create or overwrite.
    let connection_path = schema_path.join("connections.json");
    let dozer_path = schema_path.join("dozer.json");

    // Generate each schema *before* opening its file: `File::create` truncates
    // an existing file, so generating first keeps the previous schema intact
    // if generation fails.
    let schemas = dozer_types::models::get_connection_schemas()
        .expect("Failed to generate connection schemas");
    let mut file = File::create(connection_path).expect("Failed to create connections.json");
    write!(file, "{}", schemas).expect("Unable to write file");

    let schema =
        dozer_types::models::get_dozer_schema().expect("Failed to generate dozer schema");
    let mut dozer_schema_file = File::create(dozer_path).expect("Failed to create dozer.json");
    write!(dozer_schema_file, "{}", schema).expect("Unable to write file");

    // Informational only: this line is not a `cargo:` directive.
    println!("Written to {:?}", schema_path.display());
}
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use dozer_types::models::connection::Connection;
use dozer_types::models::flags::Flags;
use dozer_types::models::source::Source;
use dozer_types::models::udf_config::UdfConfig;
use dozer_types::parking_lot::Mutex;
use std::hash::Hash;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;

use crate::pipeline::dummy_sink::DummySinkFactory;
use crate::pipeline::LogSinkFactory;
Expand Down
50 changes: 21 additions & 29 deletions dozer-cli/src/pipeline/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use dozer_core::{
};
use dozer_recordstore::ProcessorRecordStore;
use dozer_tracing::LabelsAndProgress;
use dozer_types::errors::internal::BoxedError;
use dozer_types::indicatif::ProgressBar;
use dozer_types::types::Schema;
use dozer_types::{errors::internal::BoxedError, parking_lot::Mutex};
use tokio::runtime::Runtime;
use tokio::{runtime::Runtime, sync::Mutex};

#[derive(Debug)]
pub struct LogSinkFactory {
Expand Down Expand Up @@ -69,7 +69,6 @@ pub struct LogSink {
runtime: Arc<Runtime>,
log: Arc<Mutex<Log>>,
pb: ProgressBar,
counter: u64,
}

impl LogSink {
Expand All @@ -80,18 +79,7 @@ impl LogSink {
labels: LabelsAndProgress,
) -> Self {
let pb = labels.create_progress_bar(endpoint_name);
let counter = log.lock().end() as u64;
Self {
runtime,
log,
pb,
counter,
}
}

fn update_counter(&mut self) {
self.counter += 1;
self.pb.set_position(self.counter);
Self { runtime, log, pb }
}
}

Expand All @@ -102,35 +90,39 @@ impl Sink for LogSink {
record_store: &ProcessorRecordStore,
op: ProcessorOperation,
) -> Result<(), BoxedError> {
self.log
.lock()
.write(dozer_cache::dozer_log::replication::LogOperation::Op {
let end = self.runtime.block_on(self.log.lock()).write(
dozer_cache::dozer_log::replication::LogOperation::Op {
op: op.load(record_store)?,
});
self.update_counter();
},
);
self.pb.set_position(end as u64);
Ok(())
}

fn commit(&mut self, epoch_details: &Epoch) -> Result<(), BoxedError> {
self.log.lock().write(LogOperation::Commit {
decision_instant: epoch_details.decision_instant,
});
self.update_counter();
let end = self
.runtime
.block_on(self.log.lock())
.write(LogOperation::Commit {
decision_instant: epoch_details.decision_instant,
});
self.pb.set_position(end as u64);
Ok(())
}

fn persist(&mut self, queue: &Queue) -> Result<(), BoxedError> {
self.log
.lock()
self.runtime
.block_on(self.log.lock())
.persist(queue, self.log.clone(), &self.runtime)?;
Ok(())
}

fn on_source_snapshotting_done(&mut self, connection_name: String) -> Result<(), BoxedError> {
self.log
.lock()
let end = self
.runtime
.block_on(self.log.lock())
.write(LogOperation::SnapshottingDone { connection_name });
self.update_counter();
self.pb.set_position(end as u64);
Ok(())
}
}
4 changes: 1 addition & 3 deletions dozer-cli/src/simple/cloud_orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,7 @@ impl CloudOrchestrator for SimpleOrchestrator {
let mut table = table!();

for app in response.apps {
if let Some(app_data) = app.app {
table.add_row(row![app.app_id, app_data.convert_to_table()]);
}
table.add_row(row![app.app_id, app.app_name]);
}

table.printstd();
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/simple/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use dozer_core::checkpoint::{CheckpointFactory, CheckpointFactoryOptions, Option
use dozer_tracing::LabelsAndProgress;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::flags::Flags;
use dozer_types::parking_lot::Mutex;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;

use std::sync::{atomic::AtomicBool, Arc};

Expand Down
8 changes: 6 additions & 2 deletions dozer-core/src/epoch/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,16 @@ impl EpochManager {
let instant = SystemTime::now();
let action = if *should_commit {
let num_records = self.record_store().num_records();
if num_records - state.next_record_index_to_persist
if source_states.values().all(|table_states| {
table_states
.values()
.all(|&state| state != TableState::NonRestartable)
}) && (num_records - state.next_record_index_to_persist
>= self.options.max_num_records_before_persist
|| instant
.duration_since(state.last_persisted_epoch_decision_instant)
.unwrap_or(Duration::from_secs(0))
>= Duration::from_secs(self.options.max_interval_before_persist_in_seconds)
>= Duration::from_secs(self.options.max_interval_before_persist_in_seconds))
{
state.next_record_index_to_persist = num_records;
state.last_persisted_epoch_decision_instant = instant;
Expand Down
Loading

0 comments on commit fab414e

Please sign in to comment.