Skip to content

Commit

Permalink
feat: deno connector (#1816)
Browse files Browse the repository at this point in the history
Co-authored-by: chubei <[email protected]>
  • Loading branch information
v3g42 and chubei authored Nov 1, 2023
1 parent 07dbeb3 commit 6db6917
Show file tree
Hide file tree
Showing 19 changed files with 3,620 additions and 403 deletions.
3,396 changes: 3,052 additions & 344 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions dozer-cli/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,14 @@ impl<'a> PipelineBuilder<'a> {
// For not breaking current functionality, current format is to be still supported.
pub async fn get_grouped_tables(
&self,
runtime: &Arc<Runtime>,
original_sources: &[String],
) -> Result<HashMap<Connection, Vec<Source>>, OrchestrationError> {
let mut grouped_connections: HashMap<Connection, Vec<Source>> = HashMap::new();

let mut connector_map = HashMap::new();
for connection in self.connections {
let connector = get_connector(connection.clone())
let connector = get_connector(runtime.clone(), connection.clone())
.map_err(|e| ConnectorSourceFactoryError::Connector(e.into()))?;

if let Some(info_table) = get_connector_info_table(connection) {
Expand Down Expand Up @@ -205,7 +206,7 @@ impl<'a> PipelineBuilder<'a> {

debug!("Used Sources: {:?}", calculated_sources.original_sources);
let grouped_connections = self
.get_grouped_tables(&calculated_sources.original_sources)
.get_grouped_tables(runtime, &calculated_sources.original_sources)
.await?;

let mut pipelines: Vec<AppPipeline> = vec![];
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl ConnectorSourceFactory {
) -> Result<Self, ConnectorSourceFactoryError> {
let connection_name = connection.name.clone();

let connector = get_connector(connection)
let connector = get_connector(runtime.clone(), connection)
.map_err(|e| ConnectorSourceFactoryError::Connector(e.into()))?;

// Fill column names if not provided.
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/tests/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ fn load_multi_sources() {
.unwrap();
let runtime = Arc::new(runtime);
let grouped_connections = runtime
.block_on(builder.get_grouped_tables(&used_sources))
.block_on(builder.get_grouped_tables(&runtime, &used_sources))
.unwrap();

let source_builder = SourceBuilder::new(grouped_connections, Default::default());
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl SimpleOrchestrator {
) -> Result<HashMap<String, (Vec<TableInfo>, Vec<SourceSchema>)>, OrchestrationError> {
let mut schema_map = HashMap::new();
for connection in &self.config.connections {
let connector = get_connector(connection.clone())
let connector = get_connector(self.runtime.clone(), connection.clone())
.map_err(|e| ConnectorSourceFactoryError::Connector(e.into()))?;
let schema_tuples = connector
.list_all_schemas()
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dozer-ingestion-deltalake = { path = "./deltalake" }
dozer-ingestion-dozer = { path = "./dozer" }
dozer-ingestion-ethereum = { path = "./ethereum", optional = true }
dozer-ingestion-grpc = { path = "./grpc" }
dozer-ingestion-javascript = { path = "./javascript" }
dozer-ingestion-kafka = { path = "./kafka", optional = true }
dozer-ingestion-mongodb = { path = "./mongodb", optional = true }
dozer-ingestion-mysql = { path = "./mysql" }
Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/benches/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub fn get_progress() -> ProgressBar {
}

pub fn get_connection_iterator(runtime: Arc<Runtime>, config: TestConfig) -> IngestionIterator {
let connector = dozer_ingestion::get_connector(config.connection).unwrap();
let connector = dozer_ingestion::get_connector(runtime.clone(), config.connection).unwrap();
let tables = runtime.block_on(list_tables(&*connector));
let (ingestor, iterator) = Ingestor::initialize_channel(Default::default());
runtime.clone().spawn_blocking(move || async move {
Expand Down
14 changes: 14 additions & 0 deletions dozer-ingestion/javascript/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "dozer-ingestion-javascript"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
deno_ast = "0.29.5"
deno_runtime = "0.129.0"
dozer-ingestion-connector = { path = "../connector" }

[dev-dependencies]
camino = "1.1.6"
18 changes: 18 additions & 0 deletions dozer-ingestion/javascript/src/js_extension/ingest.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
(async () => {
const url = 'https://api.github.com/repos/getdozer/dozer/commits';
const response = await fetch(url);

const commits = await response.json();

const snapshot_msg = { typ: "SnapshottingDone", old_val: null, new_val: null };
await Deno[Deno.internal].core.ops.ingest(snapshot_msg);

for (const commit of commits) {
const msg = {
typ: "Insert",
old_val: null,
new_val: { commit: commit.sha },
};
await Deno[Deno.internal].core.ops.ingest(msg);
}
})();
164 changes: 164 additions & 0 deletions dozer-ingestion/javascript/src/js_extension/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
use std::{future::Future, sync::Arc};

use deno_runtime::{
deno_core::{self, anyhow::Error, extension, op2, ModuleSpecifier},
permissions::PermissionsContainer,
worker::{MainWorker, WorkerOptions},
};
use dozer_ingestion_connector::{
dozer_types::{
errors::{internal::BoxedError, types::DeserializationError},
json_types::serde_json_to_json_value,
models::ingestion_types::IngestionMessage,
serde::{Deserialize, Serialize},
serde_json,
thiserror::{self, Error},
types::{Field, Operation, Record},
},
tokio::{runtime::Runtime, task::LocalSet},
Ingestor,
};

#[derive(Deserialize, Serialize, Debug)]
#[serde(crate = "dozer_ingestion_connector::dozer_types::serde")]
pub enum MsgType {
SnapshottingStarted,
SnapshottingDone,
Insert,
Delete,
Update,
}

#[derive(Deserialize, Serialize, Debug)]
#[serde(crate = "dozer_ingestion_connector::dozer_types::serde")]
pub struct JsMessage {
typ: MsgType,
old_val: serde_json::Value,
new_val: serde_json::Value,
}

#[op2(async)]
fn ingest(
#[state] ingestor: &Ingestor,
#[serde] val: JsMessage,
) -> impl Future<Output = Result<(), Error>> {
send(ingestor.clone(), val)
}

extension!(
dozer_extension,
ops = [ingest],
options = { ingestor: Ingestor },
state = |state, options| {
state.put(options.ingestor);
},
);

pub struct JsExtension {
runtime: Arc<Runtime>,
ingestor: Ingestor,
module_specifier: ModuleSpecifier,
}

#[derive(Debug, Error)]
pub enum JsExtensionError {
#[error("Failed to canonicalize path {0}: {1}")]
CanonicalizePath(String, #[source] std::io::Error),
}

impl JsExtension {
pub fn new(
runtime: Arc<Runtime>,
ingestor: Ingestor,
js_path: String,
) -> Result<Self, JsExtensionError> {
let path = std::fs::canonicalize(js_path.clone())
.map_err(|e| JsExtensionError::CanonicalizePath(js_path, e))?;
let module_specifier =
ModuleSpecifier::from_file_path(path).expect("we just canonicalized it");
Ok(Self {
runtime,
ingestor,
module_specifier,
})
}

pub async fn run(self) -> Result<(), BoxedError> {
let runtime = self.runtime.clone();
runtime
.spawn_blocking(move || {
let local_set = LocalSet::new();
local_set.block_on(&self.runtime, async move {
let mut worker = MainWorker::bootstrap_from_options(
self.module_specifier.clone(),
PermissionsContainer::allow_all(),
WorkerOptions {
module_loader: std::rc::Rc::new(
ts_module_loader::TypescriptModuleLoader::with_no_source_map(),
),
extensions: vec![dozer_extension::init_ops(self.ingestor)],
..Default::default()
},
);

worker.execute_main_module(&self.module_specifier).await?;
worker.run_event_loop(false).await
})
})
.await
.unwrap() // Propagate panics.
.map_err(Into::into)
}
}

async fn send(ingestor: Ingestor, val: JsMessage) -> Result<(), Error> {
let msg = match val.typ {
MsgType::SnapshottingStarted => IngestionMessage::SnapshottingStarted,
MsgType::SnapshottingDone => IngestionMessage::SnapshottingDone,
MsgType::Insert | MsgType::Delete | MsgType::Update => {
let op = map_operation(val)?;
IngestionMessage::OperationEvent {
table_index: 0,
op,
id: None,
}
}
};

// Ignore if the receiver is closed.
let _ = ingestor.handle_message(msg).await;
Ok(())
}

fn map_operation(msg: JsMessage) -> Result<Operation, DeserializationError> {
Ok(match msg.typ {
MsgType::Insert => Operation::Insert {
new: Record {
values: vec![Field::Json(serde_json_to_json_value(msg.new_val)?)],
lifetime: None,
},
},
MsgType::Delete => Operation::Delete {
old: Record {
values: vec![Field::Json(serde_json_to_json_value(msg.old_val)?)],
lifetime: None,
},
},
MsgType::Update => Operation::Update {
old: Record {
values: vec![Field::Json(serde_json_to_json_value(msg.old_val)?)],
lifetime: None,
},
new: Record {
values: vec![Field::Json(serde_json_to_json_value(msg.new_val)?)],
lifetime: None,
},
},
_ => unreachable!(),
})
}

mod ts_module_loader;

#[cfg(test)]
mod tests;
30 changes: 30 additions & 0 deletions dozer-ingestion/javascript/src/js_extension/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use std::time::Duration;

use camino::Utf8Path;
use dozer_ingestion_connector::{test_util::create_test_runtime, IngestionConfig, Ingestor};

use super::JsExtension;

#[test]
fn test_deno() {
let js_path = Utf8Path::new(env!("CARGO_MANIFEST_DIR")).join("./src/js_extension/ingest.js");

let (ingestor, mut iterator) = Ingestor::initialize_channel(IngestionConfig::default());

let runtime = create_test_runtime();
let ext = JsExtension::new(runtime.clone(), ingestor, js_path.into()).unwrap();

runtime.spawn(ext.run());

runtime.block_on(async move {
let mut count = 0;
loop {
let msg = iterator.next_timeout(Duration::from_secs(5)).await.unwrap();
count += 1;
println!("i: {:?}, msg: {:?}", count, msg);
if count > 3 {
break;
}
}
});
}
Loading

0 comments on commit 6db6917

Please sign in to comment.