Skip to content

Commit

Permalink
feat: BigQuery sink
Browse files Browse the repository at this point in the history
  • Loading branch information
abcpro1 committed Jan 16, 2024
1 parent 0d37dab commit 0ac809a
Show file tree
Hide file tree
Showing 12 changed files with 829 additions and 17 deletions.
217 changes: 203 additions & 14 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions dozer-cli/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ pub enum OrchestrationError {
Aborted,
#[error("Failed to create lambda runtime: {0}")]
CreateLambdaRuntime(#[from] dozer_lambda::Error),
#[error("SinkError: {0}")]
SinkError(#[from] BoxedError),
}

#[derive(Error, Debug)]
Expand Down
4 changes: 4 additions & 0 deletions dozer-cli/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ use dozer_core::app::PipelineEntryPoint;
use dozer_core::node::SinkFactory;
use dozer_core::DEFAULT_PORT_HANDLE;
use dozer_ingestion::{get_connector, get_connector_info_table};
use dozer_sinks::bigquery::BigQuerySinkFactory;
use dozer_sql::builder::statement_to_pipeline;
use dozer_sql::builder::{OutputNodeInfo, QueryContext};
use dozer_tracing::LabelsAndProgress;
use dozer_types::log::debug;
use dozer_types::models::connection::Connection;
use dozer_types::models::endpoint::BigQuery;
use dozer_types::models::flags::Flags;
use dozer_types::models::source::Source;
use dozer_types::models::udf_config::UdfConfig;
Expand Down Expand Up @@ -57,6 +59,7 @@ pub struct EndpointLog {
/// Selects which sink factory is built for an endpoint.
///
/// `PipelineBuilder` matches on this value and boxes the corresponding
/// sink factory for each variant.
pub enum EndpointLogKind {
/// Carries a shared, mutex-guarded `Log` for the endpoint.
Api { log: Arc<Mutex<Log>> },
/// No payload; the builder uses `DummySinkFactory` for this variant.
Dummy,
/// Carries the `BigQuery` endpoint configuration, which the builder passes
/// to `BigQuerySinkFactory::new`.
BigQuery(BigQuery),
}

pub struct PipelineBuilder<'a> {
Expand Down Expand Up @@ -284,6 +287,7 @@ impl<'a> PipelineBuilder<'a> {
self.labels.clone(),
)),
EndpointLogKind::Dummy => Box::new(DummySinkFactory),
EndpointLogKind::BigQuery(config) => Box::new(BigQuerySinkFactory::new(config)),
};

match table_info {
Expand Down
5 changes: 4 additions & 1 deletion dozer-cli/src/simple/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir};
use dozer_cache::dozer_log::replication::Log;
use dozer_core::checkpoint::{CheckpointOptions, OptionCheckpoint};
use dozer_tracing::LabelsAndProgress;
use dozer_types::models::endpoint::{Endpoint, EndpointKind};
use dozer_types::models::endpoint::{BigQuery, Endpoint, EndpointKind};
use dozer_types::models::flags::Flags;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -44,6 +44,7 @@ struct ExecutorEndpoint {
/// Executor-side classification of an endpoint.
///
/// Values are created from `EndpointKind` while the executor collects its
/// endpoints, and are later converted into the matching `EndpointLogKind`
/// variant.
enum ExecutorEndpointKind {
/// Carries the `LogEndpoint` for the endpoint.
/// NOTE(review): its `log` field appears to be forwarded into
/// `EndpointLogKind::Api` — confirm against the conversion site.
Api { log_endpoint: LogEndpoint },
/// No payload; created from `EndpointKind::Dummy` and converted to
/// `EndpointLogKind::Dummy`.
Dummy,
/// Holds a clone of the `BigQuery` configuration taken from
/// `EndpointKind::BigQuery`; moved into `EndpointLogKind::BigQuery` later.
BigQuery(BigQuery),
}

impl<'a> Executor<'a> {
Expand Down Expand Up @@ -85,6 +86,7 @@ impl<'a> Executor<'a> {
ExecutorEndpointKind::Api { log_endpoint }
}
EndpointKind::Dummy => ExecutorEndpointKind::Dummy,
EndpointKind::BigQuery(config) => ExecutorEndpointKind::BigQuery(config.clone()),
};

executor_endpoints.push(ExecutorEndpoint {
Expand Down Expand Up @@ -140,6 +142,7 @@ impl<'a> Executor<'a> {
log: log_endpoint.log,
},
ExecutorEndpointKind::Dummy => EndpointLogKind::Dummy,
ExecutorEndpointKind::BigQuery(config) => EndpointLogKind::BigQuery(config),
};
EndpointLog {
table_name: endpoint.table_name,
Expand Down
1 change: 1 addition & 0 deletions dozer-sinks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2021"

[dependencies]
dozer-sinks-snowflake = { path = "./snowflake", optional = true }
dozer-sinks-bigquery = { path = "./bigquery" }

[features]
snowflake = ["dep:dozer-sinks-snowflake"]
22 changes: 22 additions & 0 deletions dozer-sinks/bigquery/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "dozer-sinks-bigquery"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
# Sibling crates, referenced by relative path.
dozer-api = { path = "../../dozer-api" }
dozer-types = { path = "../../dozer-types" }
dozer-log = { path = "../../dozer-log" }
dozer-core = { path = "../../dozer-core" }
dozer-recordstore = { path = "../../dozer-recordstore" }
# External crates, selected by version requirement.
itertools = "0.10.5"
futures-util = "0.3.28"
chrono = "0.4.31"
gcp-bigquery-client = "0.18.0"
# Built with the "gcp" feature enabled.
object_store = { version = "0.9.0", features = ["gcp"] }
parquet = "48.0.1"
yup-oauth2 = "8.3.2"
bytes = "1.5.0"
html-escape = "0.2.13"
Loading

0 comments on commit 0ac809a

Please sign in to comment.