Skip to content

Commit

Permalink
feat: implement deno connector
Browse files Browse the repository at this point in the history
  • Loading branch information
v3g42 committed Aug 15, 2023
1 parent e13b6ed commit 59f0102
Show file tree
Hide file tree
Showing 16 changed files with 3,256 additions and 565 deletions.
3,154 changes: 2,848 additions & 306 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions dozer-ingestion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ mongodb = { version = "2.6.0", optional = true }

# Javascript connector
deno_core = "0.197.0"
deno_runtime = "0.121.0"
deno_ast = "0.27.3"


base64 = "0.21.0"
include_dir = { version = "0.7.3", optional = true }
Expand Down
58 changes: 24 additions & 34 deletions dozer-ingestion/src/connectors/javascript/connector.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,24 @@
use deno_core::{v8, JsRuntime};
use dozer_types::{
serde::Serialize,
ingestion_types::JavascriptConfig,
types::{FieldDefinition, Schema},
};
use tonic::async_trait;

use crate::{
connectors::{
CdcType, Connector, ListOrFilterColumns, SourceSchema, SourceSchemaResult, TableIdentifier,
TableInfo,
CdcType, Connector, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo,
},
errors::ConnectorError,
errors::{ConnectorError, JavascriptError},
ingestion::Ingestor,
};

#[derive(Clone, Debug)]
pub struct JavascriptConfig {
pub contents: String,
}
use super::js_extension::JsExtension;

#[derive(Debug)]
pub struct JavascriptConnector {
name: String,
config: JavascriptConfig,
}

impl JavascriptConnector {}

#[async_trait]
impl Connector for JavascriptConnector {
// We will return one field, named "value", of type Json to
Expand All @@ -48,7 +40,7 @@ impl Connector for JavascriptConnector {
async fn list_tables(&self) -> Result<Vec<TableIdentifier>, ConnectorError> {
Ok(vec![TableIdentifier {
schema: None,
name: "json_reconds".to_string(),
name: "json_records".to_string(),
}])
}

Expand All @@ -62,7 +54,7 @@ impl Connector for JavascriptConnector {
) -> Result<Vec<TableInfo>, ConnectorError> {
Ok(vec![TableInfo {
schema: None,
name: "json_reconds".to_string(),
name: "json_records".to_string(),
column_names: vec!["value".to_string()],
}])
}
Expand All @@ -88,30 +80,28 @@ impl Connector for JavascriptConnector {
async fn start(
&self,
ingestor: &Ingestor,
tables: Vec<TableInfo>,
_tables: Vec<TableInfo>,
) -> Result<(), ConnectorError> {
todo!()
let js_path = self.config.bootstrap_path.clone();
let ingestor = ingestor.clone();
let res = std::thread::spawn(move || -> Result<(), ConnectorError> {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let ext = JsExtension::new(ingestor, js_path);
ext.run().await
})
});
match res.join() {
Ok(res) => res,
Err(e) => Err(ConnectorError::JavascriptError(
JavascriptError::ErrorString(format!("{:?}", e)),
)),
}
}
}

impl JavascriptConnector {
pub fn new(name: String, config: JavascriptConfig) -> Self {
Self { name, config }
}

async fn run_js(js_code: &str) -> Result<(), ConnectorError> {
let mut isolate = JsRuntime::new(Default::default());

let mut js_runtime = deno_core::JsRuntime::new(deno_core::RuntimeOptions {
module_loader: Some(Rc::new(deno_core::FsModuleLoader)),
..Default::default()
});

isolate
.execute_script("js_connector", deno_core::FastString::Static(js_code))
.unwrap();

isolate.poll_value(global, cx);
Ok(())
pub fn new(config: JavascriptConfig) -> Self {
Self { config }
}
}
24 changes: 24 additions & 0 deletions dozer-ingestion/src/connectors/javascript/js_extension/ingest.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@


(async () => {
let url = 'https://api.github.com/repos/getdozer/dozer/commits';
let response = await fetch(url);

let commits = await response.json();

let snapshop_msg = { typ: "SnapshottingDone", seq_no: 0, old_val: null, new_val: null };
Deno[Deno.internal].core.ops.ingest(snapshop_msg);

commits.forEach((commit, i) => {
let msg = {
typ: "Insert",
seq_no: i + 1,
new_val: {
commit: commit.sha,
},
old_val: null,
};

Deno[Deno.internal].core.ops.ingest(msg);
});
})();
150 changes: 150 additions & 0 deletions dozer-ingestion/src/connectors/javascript/js_extension/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
mod ts_module_loader;
use crate::connectors::javascript::js_extension::ts_module_loader::TypescriptModuleLoader;
use crate::errors::ConnectorError;

use crate::ingestion::Ingestor;
use deno_core::anyhow::Error;
use deno_core::op;
use deno_core::serde;
use deno_core::serde::{Deserialize, Serialize};
use deno_core::Extension;
use deno_core::Op;
use deno_core::OpState;
use deno_core::*;
use deno_runtime::permissions::PermissionsContainer;
use deno_runtime::worker::MainWorker;
use deno_runtime::worker::WorkerOptions;
use dozer_types::ingestion_types::IngestionMessage;
use dozer_types::json_types::serde_json_to_json_value;
use dozer_types::types::Field;
use dozer_types::types::Operation;
use dozer_types::types::Record;
use futures::channel::mpsc;
use futures::stream::StreamExt;

#[derive(Deserialize, Serialize, Debug)]
#[serde(crate = "self::serde")]
pub enum MsgType {
SnapshottingStarted,
SnapshottingDone,
Insert,
Delete,
Update,
}

#[derive(Deserialize, Serialize, Debug)]
#[serde(crate = "self::serde")]
pub struct JsMessage {
typ: MsgType,
seq_no: u64,
old_val: serde_json::Value,
new_val: serde_json::Value,
}

#[op]
fn ingest(state: &mut OpState, val: serde_json::Value) -> Result<(), Error> {
let tx = state.borrow_mut::<mpsc::UnboundedSender<JsMessage>>();

let val: JsMessage = serde_json::from_value(val)?;
tx.unbounded_send(val).unwrap();

Ok(())
}

pub struct JsExtension {
ingestor: Ingestor,
js_path: String,
}

impl JsExtension {
pub fn new(ingestor: Ingestor, js_path: String) -> Self {
Self { ingestor, js_path }
}

pub async fn run(&self) -> Result<(), ConnectorError> {
let (tx, rx) = mpsc::unbounded::<JsMessage>();

let ingestor = self.ingestor.clone();
let handle = tokio::spawn(async move {
let mut rx = rx;
while let Some(msg) = rx.next().await {
send(ingestor.clone(), msg).unwrap();
}
});

let dozer_extension: Extension = Extension::builder("dozer_extension")
.ops(vec![ingest::DECL])
.state(|state| {
state.put(tx);
})
.build();

let path = std::fs::canonicalize(self.js_path.clone()).unwrap();
let main_module = ModuleSpecifier::from_file_path(path).unwrap();
let mut worker = MainWorker::bootstrap_from_options(
main_module.clone(),
PermissionsContainer::allow_all(),
WorkerOptions {
// module_loader: std::rc::Rc::new(FsModuleLoader),
module_loader: std::rc::Rc::new(TypescriptModuleLoader),
extensions: vec![dozer_extension],
..Default::default()
},
);
worker.execute_main_module(&main_module).await.unwrap();
let fut = worker.run_event_loop(false);

tokio::select! {
r = fut => r.unwrap(),
r = handle => r.unwrap(),
};
Ok(())
}
}

fn send(ingestor: Ingestor, msg: JsMessage) -> Result<(), ConnectorError> {
let msg = match msg.typ {
MsgType::SnapshottingStarted => IngestionMessage::new_snapshotting_started(0, msg.seq_no),
MsgType::SnapshottingDone => IngestionMessage::new_snapshotting_done(0, msg.seq_no),
MsgType::Insert | MsgType::Delete | MsgType::Update => {
let seq_no = msg.seq_no;
let op = map_operation(msg);
IngestionMessage::new_op(0, seq_no, 0, op)
}
};

ingestor
.handle_message(msg)
.map_err(ConnectorError::IngestorError)
}

fn map_operation(msg: JsMessage) -> Operation {
match msg.typ {
MsgType::Insert => Operation::Insert {
new: Record {
values: vec![Field::Json(serde_json_to_json_value(msg.new_val).unwrap())],
lifetime: None,
},
},
MsgType::Delete => Operation::Delete {
old: Record {
values: vec![Field::Json(serde_json_to_json_value(msg.old_val).unwrap())],
lifetime: None,
},
},
MsgType::Update => Operation::Update {
old: Record {
values: vec![Field::Json(serde_json_to_json_value(msg.old_val).unwrap())],
lifetime: None,
},
new: Record {
values: vec![Field::Json(serde_json_to_json_value(msg.new_val).unwrap())],
lifetime: None,
},
},
_ => unreachable!(),
}
}

#[cfg(test)]
mod tests;
32 changes: 32 additions & 0 deletions dozer-ingestion/src/connectors/javascript/js_extension/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use crate::ingestion::{IngestionConfig, Ingestor};
use std::{panic, path::Path};

use super::JsExtension;

#[tokio::test]
async fn test_deno() {
panic::set_hook(Box::new(move |panic_info| {
println!("{}", panic_info);
std::process::exit(1);
}));

let js_path = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("./src/connectors/javascript/js_extension/ingest.js");

let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());

std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let ext = JsExtension::new(ingestor, js_path.to_str().unwrap().to_string());
ext.run().await.unwrap();
});
});

for (i, msg) in iterator.enumerate() {
if i > 3 {
std::process::exit(0);
}
println!("i: {:?}, msg: {:?}", i, msg);
}
}
Loading

0 comments on commit 59f0102

Please sign in to comment.