Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: deno connector #1816

Merged
merged 1 commit into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,396 changes: 3,052 additions & 344 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions dozer-cli/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,14 @@ impl<'a> PipelineBuilder<'a> {
// For not breaking current functionality, current format is to be still supported.
pub async fn get_grouped_tables(
&self,
runtime: &Arc<Runtime>,
original_sources: &[String],
) -> Result<HashMap<Connection, Vec<Source>>, OrchestrationError> {
let mut grouped_connections: HashMap<Connection, Vec<Source>> = HashMap::new();

let mut connector_map = HashMap::new();
for connection in self.connections {
let connector = get_connector(connection.clone())
let connector = get_connector(runtime.clone(), connection.clone())
.map_err(|e| ConnectorSourceFactoryError::Connector(e.into()))?;

if let Some(info_table) = get_connector_info_table(connection) {
Expand Down Expand Up @@ -205,7 +206,7 @@ impl<'a> PipelineBuilder<'a> {

debug!("Used Sources: {:?}", calculated_sources.original_sources);
let grouped_connections = self
.get_grouped_tables(&calculated_sources.original_sources)
.get_grouped_tables(runtime, &calculated_sources.original_sources)
.await?;

let mut pipelines: Vec<AppPipeline> = vec![];
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl ConnectorSourceFactory {
) -> Result<Self, ConnectorSourceFactoryError> {
let connection_name = connection.name.clone();

let connector = get_connector(connection)
let connector = get_connector(runtime.clone(), connection)
.map_err(|e| ConnectorSourceFactoryError::Connector(e.into()))?;

// Fill column names if not provided.
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/tests/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ fn load_multi_sources() {
.unwrap();
let runtime = Arc::new(runtime);
let grouped_connections = runtime
.block_on(builder.get_grouped_tables(&used_sources))
.block_on(builder.get_grouped_tables(&runtime, &used_sources))
.unwrap();

let source_builder = SourceBuilder::new(grouped_connections, Default::default());
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl SimpleOrchestrator {
) -> Result<HashMap<String, (Vec<TableInfo>, Vec<SourceSchema>)>, OrchestrationError> {
let mut schema_map = HashMap::new();
for connection in &self.config.connections {
let connector = get_connector(connection.clone())
let connector = get_connector(self.runtime.clone(), connection.clone())
.map_err(|e| ConnectorSourceFactoryError::Connector(e.into()))?;
let schema_tuples = connector
.list_all_schemas()
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dozer-ingestion-deltalake = { path = "./deltalake" }
dozer-ingestion-dozer = { path = "./dozer" }
dozer-ingestion-ethereum = { path = "./ethereum", optional = true }
dozer-ingestion-grpc = { path = "./grpc" }
dozer-ingestion-javascript = { path = "./javascript" }
dozer-ingestion-kafka = { path = "./kafka", optional = true }
dozer-ingestion-mongodb = { path = "./mongodb", optional = true }
dozer-ingestion-mysql = { path = "./mysql" }
Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/benches/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub fn get_progress() -> ProgressBar {
}

pub fn get_connection_iterator(runtime: Arc<Runtime>, config: TestConfig) -> IngestionIterator {
let connector = dozer_ingestion::get_connector(config.connection).unwrap();
let connector = dozer_ingestion::get_connector(runtime.clone(), config.connection).unwrap();
let tables = runtime.block_on(list_tables(&*connector));
let (ingestor, iterator) = Ingestor::initialize_channel(Default::default());
runtime.clone().spawn_blocking(move || async move {
Expand Down
14 changes: 14 additions & 0 deletions dozer-ingestion/javascript/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "dozer-ingestion-javascript"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
deno_ast = "0.29.5"
deno_runtime = "0.129.0"
dozer-ingestion-connector = { path = "../connector" }

[dev-dependencies]
camino = "1.1.6"
18 changes: 18 additions & 0 deletions dozer-ingestion/javascript/src/js_extension/ingest.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
(async () => {
const url = 'https://api.github.com/repos/getdozer/dozer/commits';
const response = await fetch(url);

const commits = await response.json();

const snapshot_msg = { typ: "SnapshottingDone", old_val: null, new_val: null };
await Deno[Deno.internal].core.ops.ingest(snapshot_msg);

for (const commit of commits) {
const msg = {
typ: "Insert",
old_val: null,
new_val: { commit: commit.sha },
};
await Deno[Deno.internal].core.ops.ingest(msg);
}
})();
164 changes: 164 additions & 0 deletions dozer-ingestion/javascript/src/js_extension/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
use std::{future::Future, sync::Arc};

use deno_runtime::{
deno_core::{self, anyhow::Error, extension, op2, ModuleSpecifier},
permissions::PermissionsContainer,
worker::{MainWorker, WorkerOptions},
};
use dozer_ingestion_connector::{
dozer_types::{
errors::{internal::BoxedError, types::DeserializationError},
json_types::serde_json_to_json_value,
models::ingestion_types::IngestionMessage,
serde::{Deserialize, Serialize},
serde_json,
thiserror::{self, Error},
types::{Field, Operation, Record},
},
tokio::{runtime::Runtime, task::LocalSet},
Ingestor,
};

#[derive(Deserialize, Serialize, Debug)]
#[serde(crate = "dozer_ingestion_connector::dozer_types::serde")]
pub enum MsgType {
SnapshottingStarted,
SnapshottingDone,
Insert,
Delete,
Update,
}

#[derive(Deserialize, Serialize, Debug)]
#[serde(crate = "dozer_ingestion_connector::dozer_types::serde")]
pub struct JsMessage {
typ: MsgType,
old_val: serde_json::Value,
new_val: serde_json::Value,
}

#[op2(async)]
fn ingest(
#[state] ingestor: &Ingestor,
#[serde] val: JsMessage,
) -> impl Future<Output = Result<(), Error>> {
send(ingestor.clone(), val)
}

extension!(
dozer_extension,
ops = [ingest],
options = { ingestor: Ingestor },
state = |state, options| {
state.put(options.ingestor);
},
);

pub struct JsExtension {
runtime: Arc<Runtime>,
ingestor: Ingestor,
module_specifier: ModuleSpecifier,
}

#[derive(Debug, Error)]
pub enum JsExtensionError {
#[error("Failed to canonicalize path {0}: {1}")]
CanonicalizePath(String, #[source] std::io::Error),
}

impl JsExtension {
pub fn new(
runtime: Arc<Runtime>,
ingestor: Ingestor,
js_path: String,
) -> Result<Self, JsExtensionError> {
let path = std::fs::canonicalize(js_path.clone())
.map_err(|e| JsExtensionError::CanonicalizePath(js_path, e))?;
let module_specifier =
ModuleSpecifier::from_file_path(path).expect("we just canonicalized it");
Ok(Self {
runtime,
ingestor,
module_specifier,
})
}

pub async fn run(self) -> Result<(), BoxedError> {
let runtime = self.runtime.clone();
Jesse-Bakker marked this conversation as resolved.
Show resolved Hide resolved
runtime
.spawn_blocking(move || {
let local_set = LocalSet::new();
local_set.block_on(&self.runtime, async move {
let mut worker = MainWorker::bootstrap_from_options(
self.module_specifier.clone(),
PermissionsContainer::allow_all(),
WorkerOptions {
module_loader: std::rc::Rc::new(
ts_module_loader::TypescriptModuleLoader::with_no_source_map(),
),
extensions: vec![dozer_extension::init_ops(self.ingestor)],
..Default::default()
},
);

worker.execute_main_module(&self.module_specifier).await?;
worker.run_event_loop(false).await
})
})
.await
.unwrap() // Propagate panics.
.map_err(Into::into)
}
}

async fn send(ingestor: Ingestor, val: JsMessage) -> Result<(), Error> {
let msg = match val.typ {
MsgType::SnapshottingStarted => IngestionMessage::SnapshottingStarted,
MsgType::SnapshottingDone => IngestionMessage::SnapshottingDone,
MsgType::Insert | MsgType::Delete | MsgType::Update => {
let op = map_operation(val)?;
IngestionMessage::OperationEvent {
table_index: 0,
op,
id: None,
}
}
};

// Ignore if the receiver is closed.
let _ = ingestor.handle_message(msg).await;
Ok(())
}

fn map_operation(msg: JsMessage) -> Result<Operation, DeserializationError> {
Ok(match msg.typ {
MsgType::Insert => Operation::Insert {
new: Record {
values: vec![Field::Json(serde_json_to_json_value(msg.new_val)?)],
lifetime: None,
},
},
MsgType::Delete => Operation::Delete {
old: Record {
values: vec![Field::Json(serde_json_to_json_value(msg.old_val)?)],
lifetime: None,
},
},
MsgType::Update => Operation::Update {
old: Record {
values: vec![Field::Json(serde_json_to_json_value(msg.old_val)?)],
lifetime: None,
},
new: Record {
values: vec![Field::Json(serde_json_to_json_value(msg.new_val)?)],
lifetime: None,
},
},
_ => unreachable!(),
})
}

mod ts_module_loader;

#[cfg(test)]
mod tests;
30 changes: 30 additions & 0 deletions dozer-ingestion/javascript/src/js_extension/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use std::time::Duration;

use camino::Utf8Path;
use dozer_ingestion_connector::{test_util::create_test_runtime, IngestionConfig, Ingestor};

use super::JsExtension;

#[test]
fn test_deno() {
let js_path = Utf8Path::new(env!("CARGO_MANIFEST_DIR")).join("./src/js_extension/ingest.js");

let (ingestor, mut iterator) = Ingestor::initialize_channel(IngestionConfig::default());

let runtime = create_test_runtime();
let ext = JsExtension::new(runtime.clone(), ingestor, js_path.into()).unwrap();

runtime.spawn(ext.run());

runtime.block_on(async move {
let mut count = 0;
loop {
let msg = iterator.next_timeout(Duration::from_secs(5)).await.unwrap();
count += 1;
println!("i: {:?}, msg: {:?}", count, msg);
if count > 3 {
break;
}
}
});
}
Loading
Loading