From 53fe6f2b294d3d40323d44a702e045cf38822732 Mon Sep 17 00:00:00 2001 From: Bei Chu <914745487@qq.com> Date: Fri, 10 Nov 2023 11:38:33 +0800 Subject: [PATCH] feat: Implement `dozer run lambda` (#2202) * feat: Implement `dozer run lambda` * feat: Use module's default export as lambda function * refactor: Stop using `deno_runtime::MainWorker` --- Cargo.lock | 17 +- Cargo.toml | 1 + dozer-api/src/cache_builder/endpoint_meta.rs | 14 +- dozer-api/src/cache_builder/mod.rs | 5 +- dozer-cli/Cargo.toml | 1 + dozer-cli/src/cli/types.rs | 5 + dozer-cli/src/errors.rs | 2 + dozer-cli/src/main.rs | 6 +- dozer-cli/src/simple/orchestrator.rs | 57 +++- dozer-deno/Cargo.toml | 2 + dozer-deno/js/06_util.js | 94 ++++++ dozer-deno/js/98_global_scope.js | 285 +++++++++++++++++++ dozer-deno/js/99_main.js | 167 +++++++++++ dozer-deno/js/README.md | 7 + dozer-deno/src/lib.rs | 3 + dozer-deno/src/runtime/js_runtime.rs | 118 ++++++++ dozer-deno/src/runtime/mod.rs | 267 +++++++++++++++++ dozer-ingestion/dozer/src/connector.rs | 12 +- dozer-lambda/Cargo.toml | 14 + dozer-lambda/src/js/mod.rs | 72 +++++ dozer-lambda/src/js/test_lambda.js | 1 + dozer-lambda/src/js/tests.rs | 207 ++++++++++++++ dozer-lambda/src/js/trigger/mod.rs | 92 ++++++ dozer-lambda/src/js/worker/mod.rs | 65 +++++ dozer-lambda/src/lib.rs | 3 + dozer-log-js/src/lib.rs | 4 +- dozer-log-python/src/lib.rs | 4 +- dozer-log/examples/reader.rs | 11 +- dozer-log/src/reader.rs | 48 ++-- dozer-types/src/models/config.rs | 8 +- dozer-types/src/models/lambda_config.rs | 15 + dozer-types/src/models/mod.rs | 1 + json_schemas/dozer.json | 39 +++ 33 files changed, 1578 insertions(+), 69 deletions(-) create mode 100644 dozer-deno/js/06_util.js create mode 100644 dozer-deno/js/98_global_scope.js create mode 100644 dozer-deno/js/99_main.js create mode 100644 dozer-deno/js/README.md create mode 100644 dozer-deno/src/runtime/js_runtime.rs create mode 100644 dozer-deno/src/runtime/mod.rs create mode 100644 dozer-lambda/Cargo.toml create mode 100644 dozer-lambda/src/js/mod.rs create mode 100644 dozer-lambda/src/js/test_lambda.js create mode 100644 dozer-lambda/src/js/tests.rs create mode 100644 dozer-lambda/src/js/trigger/mod.rs create mode 100644 dozer-lambda/src/js/worker/mod.rs create mode 100644 dozer-lambda/src/lib.rs create mode 100644 dozer-types/src/models/lambda_config.rs diff --git a/Cargo.lock b/Cargo.lock index c9c32175bc..65ca87980a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3638,6 +3638,7 @@ dependencies = [ "dozer-cache", "dozer-core", "dozer-ingestion", + "dozer-lambda", "dozer-recordstore", "dozer-sql", "dozer-storage", @@ -3689,6 +3690,8 @@ version = "0.3.0" dependencies = [ "deno_ast", "deno_runtime", + "dozer-types", + "tokio", ] [[package]] @@ -3851,6 +3854,16 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "dozer-lambda" +version = "0.1.0" +dependencies = [ + "dozer-deno", + "dozer-log", + "dozer-types", + "env_logger", +] + [[package]] name = "dozer-log" version = "0.3.0" @@ -10741,9 +10754,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.32.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" dependencies = [ "backtrace", "bytes", diff --git a/Cargo.toml b/Cargo.toml index b826dbdc5c..d49bbf7ee3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ members = [ 
"dozer-log-python", "dozer-utils", "dozer-recordstore", + "dozer-lambda", ] resolver = "2" diff --git a/dozer-api/src/cache_builder/endpoint_meta.rs b/dozer-api/src/cache_builder/endpoint_meta.rs index 2722c6759b..b976af94e3 100644 --- a/dozer-api/src/cache_builder/endpoint_meta.rs +++ b/dozer-api/src/cache_builder/endpoint_meta.rs @@ -1,10 +1,7 @@ use dozer_cache::dozer_log::{reader::LogClient, schemas::EndpointSchema}; use dozer_tracing::Labels; use dozer_types::{ - grpc_types::internal::{ - internal_pipeline_service_client::InternalPipelineServiceClient, BuildRequest, - }, - serde_json, + grpc_types::internal::internal_pipeline_service_client::InternalPipelineServiceClient, tonic::transport::Channel, }; @@ -23,15 +20,8 @@ impl EndpointMeta { endpoint: String, ) -> Result<(Self, LogClient), ApiInitError> { // We establish the log stream first to avoid tonic auto-reconnecting without us knowing. - let log_client = LogClient::new(client, endpoint.clone()).await?; + let (log_client, schema) = LogClient::new(client, endpoint.clone()).await?; let log_id = client.get_id(()).await?.into_inner().id; - let build = client - .describe_build(BuildRequest { - endpoint: endpoint.clone(), - }) - .await? - .into_inner(); - let schema = serde_json::from_str(&build.schema_string)?; Ok(( Self { diff --git a/dozer-api/src/cache_builder/mod.rs b/dozer-api/src/cache_builder/mod.rs index 63eb3ece8e..4d2087b2d7 100644 --- a/dozer-api/src/cache_builder/mod.rs +++ b/dozer-api/src/cache_builder/mod.rs @@ -33,6 +33,7 @@ const READ_LOG_RETRY_INTERVAL: Duration = Duration::from_secs(1); #[derive(Debug)] pub struct CacheBuilder { client: InternalPipelineServiceClient, + endpoint: String, cache_manager: Arc, serving: Arc>, labels: Labels, @@ -80,6 +81,7 @@ impl CacheBuilder { Ok(( Self { client, + endpoint: endpoint.name.clone(), cache_manager, serving: Arc::new(ArcSwap::from_pointee(serving)), labels: labels.labels().clone(), @@ -104,7 +106,7 @@ impl CacheBuilder { loop { // Connect to the endpoint's log. let Some(connect_result) = runtime.block_on(with_cancel( - connect_until_success(&mut self.client, &self.log_reader_options.endpoint), + connect_until_success(&mut self.client, &self.endpoint), cancel, )) else { return Ok(()); @@ -280,7 +282,6 @@ fn check_cache_schema(cache: &dyn RoCache, given: SchemaWithIndex) -> Result<(), fn get_log_reader_options(endpoint: &ApiEndpoint) -> LogReaderOptions { LogReaderOptions { - endpoint: endpoint.name.clone(), batch_size: endpoint .log_reader_options .batch_size diff --git a/dozer-cli/Cargo.toml b/dozer-cli/Cargo.toml index fd8a8b7525..c43d1520ba 100644 --- a/dozer-cli/Cargo.toml +++ b/dozer-cli/Cargo.toml @@ -19,6 +19,7 @@ dozer-types = { path = "../dozer-types" } dozer-tracing = { path = "../dozer-tracing" } dozer-storage = { path = "../dozer-storage" } dozer-recordstore = { path = "../dozer-recordstore" } +dozer-lambda = { path = "../dozer-lambda" } uuid = { version = "1.3.0", features = ["v4", "serde"] } tokio = { version = "1", features = ["full"] } diff --git a/dozer-cli/src/cli/types.rs b/dozer-cli/src/cli/types.rs index 8e43458457..ace98927ce 100644 --- a/dozer-cli/src/cli/types.rs +++ b/dozer-cli/src/cli/types.rs @@ -116,6 +116,11 @@ pub enum RunCommands { API endpoints through REST and GRPC (depends on configuration)" )] Api, + #[command( + about = "Run lambda functions", + long_about = "Run lambda functions. Lambda functions are JavaScript or Python functions that are called when a new operation is output." 
+ )] + Lambda, } #[derive(Debug, Args)] diff --git a/dozer-cli/src/errors.rs b/dozer-cli/src/errors.rs index 3e0bfbee46..7a96e733a4 100644 --- a/dozer-cli/src/errors.rs +++ b/dozer-cli/src/errors.rs @@ -103,6 +103,8 @@ pub enum OrchestrationError { LockedNoLockFile, #[error("Command was aborted")] Aborted, + #[error("Failed to create lambda runtime: {0}")] + CreateLambdaRuntime(#[from] dozer_lambda::Error), } #[derive(Error, Debug)] diff --git a/dozer-cli/src/main.rs b/dozer-cli/src/main.rs index 0c67943d86..36c8995a0a 100644 --- a/dozer-cli/src/main.rs +++ b/dozer-cli/src/main.rs @@ -172,14 +172,16 @@ fn run() -> Result<(), OrchestrationError> { Commands::Run(run) => match run.command { Some(RunCommands::Api) => { render_logo(); - dozer.run_api(shutdown_receiver) } Some(RunCommands::App) => { render_logo(); - dozer.run_apps(shutdown_receiver, None) } + Some(RunCommands::Lambda) => { + render_logo(); + dozer.run_lambda(shutdown_receiver) + } None => { render_logo(); dozer.run_all(shutdown_receiver, run.locked) diff --git a/dozer-cli/src/simple/orchestrator.rs b/dozer-cli/src/simple/orchestrator.rs index 2bdd2c5c4a..8ae0ee608a 100644 --- a/dozer-cli/src/simple/orchestrator.rs +++ b/dozer-cli/src/simple/orchestrator.rs @@ -22,8 +22,11 @@ use dozer_core::app::AppPipeline; use dozer_core::dag_schemas::DagSchemas; use dozer_tracing::LabelsAndProgress; use dozer_types::constants::LOCK_FILE; -use dozer_types::models::api_config::{default_app_grpc_host, default_app_grpc_port}; +use dozer_types::models::api_config::{ + default_app_grpc_host, default_app_grpc_port, AppGrpcOptions, +}; use dozer_types::models::flags::{default_dynamic, default_push_events}; +use dozer_types::models::lambda_config::LambdaConfig; use tokio::select; use crate::console_helper::get_colored_text; @@ -94,17 +97,7 @@ impl SimpleOrchestrator { (None, None) }; - let internal_grpc_config = &self.config.api.app_grpc; - let app_server_url = format!( - "http://{}:{}", - internal_grpc_config - .host - .clone() - .unwrap_or_else(default_app_grpc_host), - internal_grpc_config - .port - .unwrap_or_else(default_app_grpc_port) - ); + let app_server_url = app_url(&self.config.api.app_grpc); let cache_manager = Arc::new( LmdbRwCacheManager::new(get_cache_manager_options(&self.config)) .map_err(OrchestrationError::CacheInitFailed)?, @@ -283,6 +276,38 @@ impl SimpleOrchestrator { }) } + pub fn run_lambda(&mut self, shutdown: ShutdownReceiver) -> Result<(), OrchestrationError> { + let runtime = self.runtime.clone(); + let result = self.run_lambda_impl(); + let shutdown = shutdown.create_shutdown_future(); + runtime.block_on(async move { + select! 
{ + () = shutdown => Ok(()), + result = result => result, + } + }) + } + + async fn run_lambda_impl(&mut self) -> Result<(), OrchestrationError> { + let lambda_modules = self + .config + .lambdas + .iter() + .map(|lambda| match lambda { + LambdaConfig::JavaScript(module) => module.clone(), + }) + .collect(); + let runtime = dozer_lambda::JsRuntime::new( + self.runtime.clone(), + app_url(&self.config.api.app_grpc), + lambda_modules, + Default::default(), + ) + .await?; + runtime.run().await; + Ok(()) + } + #[allow(clippy::type_complexity)] pub async fn list_connectors( &self, @@ -479,3 +504,11 @@ pub fn validate_sql(sql: String) -> Result<(), PipelineError> { pub fn lockfile_path(base_directory: Utf8PathBuf) -> Utf8PathBuf { base_directory.join(LOCK_FILE) } + +fn app_url(config: &AppGrpcOptions) -> String { + format!( + "http://{}:{}", + config.host.clone().unwrap_or_else(default_app_grpc_host), + config.port.unwrap_or_else(default_app_grpc_port) + ) +} diff --git a/dozer-deno/Cargo.toml b/dozer-deno/Cargo.toml index 387db7a648..1a27e8e240 100644 --- a/dozer-deno/Cargo.toml +++ b/dozer-deno/Cargo.toml @@ -6,5 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +dozer-types = { path = "../dozer-types" } deno_runtime = "0.129.0" deno_ast = "0.29.5" +tokio = "1.33.0" diff --git a/dozer-deno/js/06_util.js b/dozer-deno/js/06_util.js new file mode 100644 index 0000000000..6118d16805 --- /dev/null +++ b/dozer-deno/js/06_util.js @@ -0,0 +1,94 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +const primordials = globalThis.__bootstrap.primordials; +const { + Promise, + SafeArrayIterator, +} = primordials; + +// WARNING: Keep this in sync with Rust (search for LogLevel) +const LogLevel = { + Error: 1, + Warn: 2, + Info: 3, + Debug: 4, +}; + +let logLevel = 3; +let logSource = "JS"; + +function setLogLevel(level, source) { + logLevel = level; + if (source) { + logSource = source; + } +} + +function log(...args) { + if (logLevel >= LogLevel.Debug) { + // if we destructure `console` off `globalThis` too early, we don't bind to + // the right console, therefore we don't log anything out. + globalThis.console.error( + `DEBUG ${logSource} -`, + ...new SafeArrayIterator(args), + ); + } +} + +function createResolvable() { + let resolve; + let reject; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + promise.resolve = resolve; + promise.reject = reject; + return promise; +} + +function writable(value) { + return { + value, + writable: true, + enumerable: true, + configurable: true, + }; +} + +function nonEnumerable(value) { + return { + value, + writable: true, + enumerable: false, + configurable: true, + }; +} + +function readOnly(value) { + return { + value, + enumerable: true, + writable: false, + configurable: true, + }; +} + +function getterOnly(getter) { + return { + get: getter, + set() { }, + enumerable: true, + configurable: true, + }; +} + +export { + createResolvable, + getterOnly, + log, + nonEnumerable, + readOnly, + setLogLevel, + writable, +}; diff --git a/dozer-deno/js/98_global_scope.js b/dozer-deno/js/98_global_scope.js new file mode 100644 index 0000000000..1747f4b51f --- /dev/null +++ b/dozer-deno/js/98_global_scope.js @@ -0,0 +1,285 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. 
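+// NOTE: adapted from `deno_runtime/js/98_global_scope.js`; see js/README.md for
+// how these bootstrap files are kept in sync with upstream `deno_runtime`.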
+ +const core = globalThis.Deno.core; +const primordials = globalThis.__bootstrap.primordials; +const { + ObjectDefineProperties, + SymbolFor, +} = primordials; + +import * as util from "ext:runtime/06_util.js"; +import * as location from "ext:deno_web/12_location.js"; +import * as event from "ext:deno_web/02_event.js"; +import * as timers from "ext:deno_web/02_timers.js"; +import * as base64 from "ext:deno_web/05_base64.js"; +import * as encoding from "ext:deno_web/08_text_encoding.js"; +import * as console from "ext:deno_console/01_console.js"; +import * as caches from "ext:deno_cache/01_cache.js"; +import * as compression from "ext:deno_web/14_compression.js"; +import * as performance from "ext:deno_web/15_performance.js"; +import * as crypto from "ext:deno_crypto/00_crypto.js"; +import * as url from "ext:deno_url/00_url.js"; +import * as urlPattern from "ext:deno_url/01_urlpattern.js"; +import * as headers from "ext:deno_fetch/20_headers.js"; +import * as streams from "ext:deno_web/06_streams.js"; +import * as fileReader from "ext:deno_web/10_filereader.js"; +import * as webSocket from "ext:deno_websocket/01_websocket.js"; +import * as webSocketStream from "ext:deno_websocket/02_websocketstream.js"; +import * as broadcastChannel from "ext:deno_broadcast_channel/01_broadcast_channel.js"; +import * as file from "ext:deno_web/09_file.js"; +import * as formData from "ext:deno_fetch/21_formdata.js"; +import * as request from "ext:deno_fetch/23_request.js"; +import * as response from "ext:deno_fetch/23_response.js"; +import * as fetch from "ext:deno_fetch/26_fetch.js"; +import * as messagePort from "ext:deno_web/13_message_port.js"; +import * as webidl from "ext:deno_webidl/00_webidl.js"; +import DOMException from "ext:deno_web/01_dom_exception.js"; +import * as abortSignal from "ext:deno_web/03_abort_signal.js"; +import * as globalInterfaces from "ext:deno_web/04_global_interfaces.js"; +import * as webStorage from "ext:deno_webstorage/01_webstorage.js"; + +// https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope +const windowOrWorkerGlobalScope = { + AbortController: util.nonEnumerable(abortSignal.AbortController), + AbortSignal: util.nonEnumerable(abortSignal.AbortSignal), + Blob: util.nonEnumerable(file.Blob), + ByteLengthQueuingStrategy: util.nonEnumerable( + streams.ByteLengthQueuingStrategy, + ), + CloseEvent: util.nonEnumerable(event.CloseEvent), + CompressionStream: util.nonEnumerable(compression.CompressionStream), + CountQueuingStrategy: util.nonEnumerable( + streams.CountQueuingStrategy, + ), + CryptoKey: util.nonEnumerable(crypto.CryptoKey), + CustomEvent: util.nonEnumerable(event.CustomEvent), + DecompressionStream: util.nonEnumerable(compression.DecompressionStream), + DOMException: util.nonEnumerable(DOMException), + ErrorEvent: util.nonEnumerable(event.ErrorEvent), + Event: util.nonEnumerable(event.Event), + EventTarget: util.nonEnumerable(event.EventTarget), + File: util.nonEnumerable(file.File), + FileReader: util.nonEnumerable(fileReader.FileReader), + FormData: util.nonEnumerable(formData.FormData), + Headers: util.nonEnumerable(headers.Headers), + MessageEvent: util.nonEnumerable(event.MessageEvent), + Performance: util.nonEnumerable(performance.Performance), + PerformanceEntry: util.nonEnumerable(performance.PerformanceEntry), + PerformanceMark: util.nonEnumerable(performance.PerformanceMark), + PerformanceMeasure: util.nonEnumerable(performance.PerformanceMeasure), + PromiseRejectionEvent: util.nonEnumerable(event.PromiseRejectionEvent), + 
ProgressEvent: util.nonEnumerable(event.ProgressEvent), + ReadableStream: util.nonEnumerable(streams.ReadableStream), + ReadableStreamDefaultReader: util.nonEnumerable( + streams.ReadableStreamDefaultReader, + ), + Request: util.nonEnumerable(request.Request), + Response: util.nonEnumerable(response.Response), + TextDecoder: util.nonEnumerable(encoding.TextDecoder), + TextEncoder: util.nonEnumerable(encoding.TextEncoder), + TextDecoderStream: util.nonEnumerable(encoding.TextDecoderStream), + TextEncoderStream: util.nonEnumerable(encoding.TextEncoderStream), + TransformStream: util.nonEnumerable(streams.TransformStream), + URL: util.nonEnumerable(url.URL), + URLPattern: util.nonEnumerable(urlPattern.URLPattern), + URLSearchParams: util.nonEnumerable(url.URLSearchParams), + WebSocket: util.nonEnumerable(webSocket.WebSocket), + MessageChannel: util.nonEnumerable(messagePort.MessageChannel), + MessagePort: util.nonEnumerable(messagePort.MessagePort), + WritableStream: util.nonEnumerable(streams.WritableStream), + WritableStreamDefaultWriter: util.nonEnumerable( + streams.WritableStreamDefaultWriter, + ), + WritableStreamDefaultController: util.nonEnumerable( + streams.WritableStreamDefaultController, + ), + ReadableByteStreamController: util.nonEnumerable( + streams.ReadableByteStreamController, + ), + ReadableStreamBYOBReader: util.nonEnumerable( + streams.ReadableStreamBYOBReader, + ), + ReadableStreamBYOBRequest: util.nonEnumerable( + streams.ReadableStreamBYOBRequest, + ), + ReadableStreamDefaultController: util.nonEnumerable( + streams.ReadableStreamDefaultController, + ), + TransformStreamDefaultController: util.nonEnumerable( + streams.TransformStreamDefaultController, + ), + atob: util.writable(base64.atob), + btoa: util.writable(base64.btoa), + clearInterval: util.writable(timers.clearInterval), + clearTimeout: util.writable(timers.clearTimeout), + caches: { + enumerable: true, + configurable: true, + get: caches.cacheStorage, + }, + CacheStorage: util.nonEnumerable(caches.CacheStorage), + Cache: util.nonEnumerable(caches.Cache), + console: util.nonEnumerable( + new console.Console((msg, level) => core.print(msg, level > 1)), + ), + crypto: util.readOnly(crypto.crypto), + Crypto: util.nonEnumerable(crypto.Crypto), + SubtleCrypto: util.nonEnumerable(crypto.SubtleCrypto), + fetch: util.writable(fetch.fetch), + performance: util.writable(performance.performance), + reportError: util.writable(event.reportError), + setInterval: util.writable(timers.setInterval), + setTimeout: util.writable(timers.setTimeout), + structuredClone: util.writable(messagePort.structuredClone), + // Branding as a WebIDL object + [webidl.brand]: util.nonEnumerable(webidl.brand), +}; + +const unstableWindowOrWorkerGlobalScope = { + BroadcastChannel: util.nonEnumerable(broadcastChannel.BroadcastChannel), + WebSocketStream: util.nonEnumerable(webSocketStream.WebSocketStream), +}; + +class Navigator { + constructor() { + webidl.illegalConstructor(); + } + + [SymbolFor("Deno.privateCustomInspect")](inspect) { + return `${this.constructor.name} ${inspect({})}`; + } +} + +const navigator = webidl.createBranded(Navigator); + +let numCpus, userAgent, language; + +function setNumCpus(val) { + numCpus = val; +} + +function setUserAgent(val) { + userAgent = val; +} + +function setLanguage(val) { + language = val; +} + +ObjectDefineProperties(Navigator.prototype, { + hardwareConcurrency: { + configurable: true, + enumerable: true, + get() { + webidl.assertBranded(this, NavigatorPrototype); + return numCpus; + }, + }, + 
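+  // These getters surface the values injected from Rust via the
+  // `setNumCpus`/`setUserAgent`/`setLanguage` helpers defined below.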
userAgent: { + configurable: true, + enumerable: true, + get() { + webidl.assertBranded(this, NavigatorPrototype); + return userAgent; + }, + }, + language: { + configurable: true, + enumerable: true, + get() { + webidl.assertBranded(this, NavigatorPrototype); + return language; + }, + }, + languages: { + configurable: true, + enumerable: true, + get() { + webidl.assertBranded(this, NavigatorPrototype); + return [language]; + }, + }, +}); +const NavigatorPrototype = Navigator.prototype; + +class WorkerNavigator { + constructor() { + webidl.illegalConstructor(); + } + + [SymbolFor("Deno.privateCustomInspect")](inspect) { + return `${this.constructor.name} ${inspect({})}`; + } +} + +const workerNavigator = webidl.createBranded(WorkerNavigator); + +ObjectDefineProperties(WorkerNavigator.prototype, { + hardwareConcurrency: { + configurable: true, + enumerable: true, + get() { + webidl.assertBranded(this, WorkerNavigatorPrototype); + return numCpus; + }, + }, + userAgent: { + configurable: true, + enumerable: true, + get() { + webidl.assertBranded(this, WorkerNavigatorPrototype); + return userAgent; + }, + }, + language: { + configurable: true, + enumerable: true, + get() { + webidl.assertBranded(this, WorkerNavigatorPrototype); + return language; + }, + }, + languages: { + configurable: true, + enumerable: true, + get() { + webidl.assertBranded(this, WorkerNavigatorPrototype); + return [language]; + }, + }, +}); +const WorkerNavigatorPrototype = WorkerNavigator.prototype; + +const mainRuntimeGlobalProperties = { + Location: location.locationConstructorDescriptor, + location: location.locationDescriptor, + Window: globalInterfaces.windowConstructorDescriptor, + window: util.getterOnly(() => globalThis), + self: util.getterOnly(() => globalThis), + Navigator: util.nonEnumerable(Navigator), + navigator: util.getterOnly(() => navigator), + localStorage: util.getterOnly(webStorage.localStorage), + sessionStorage: util.getterOnly(webStorage.sessionStorage), + Storage: util.nonEnumerable(webStorage.Storage), +}; + +const workerRuntimeGlobalProperties = { + WorkerLocation: location.workerLocationConstructorDescriptor, + location: location.workerLocationDescriptor, + WorkerGlobalScope: globalInterfaces.workerGlobalScopeConstructorDescriptor, + DedicatedWorkerGlobalScope: + globalInterfaces.dedicatedWorkerGlobalScopeConstructorDescriptor, + WorkerNavigator: util.nonEnumerable(WorkerNavigator), + navigator: util.getterOnly(() => workerNavigator), + self: util.getterOnly(() => globalThis), +}; + +export { + mainRuntimeGlobalProperties, + setLanguage, + setNumCpus, + setUserAgent, + unstableWindowOrWorkerGlobalScope, + windowOrWorkerGlobalScope, + workerRuntimeGlobalProperties, +}; diff --git a/dozer-deno/js/99_main.js b/dozer-deno/js/99_main.js new file mode 100644 index 0000000000..25fdae80d2 --- /dev/null +++ b/dozer-deno/js/99_main.js @@ -0,0 +1,167 @@ +const core = globalThis.Deno.core; +const ops = core.ops; +const internals = globalThis.__bootstrap.internals; +const primordials = globalThis.__bootstrap.primordials; +const { + ArrayPrototypeIndexOf, + ArrayPrototypePush, + ArrayPrototypeShift, + ArrayPrototypeSplice, + DateNow, + ErrorPrototype, + ObjectDefineProperties, + ObjectPrototypeIsPrototypeOf, + ObjectSetPrototypeOf, + SafeWeakMap, + WeakMapPrototypeDelete, + WeakMapPrototypeGet, + WeakMapPrototypeSet, +} = primordials; +import * as event from "ext:deno_web/02_event.js"; +import * as timers from "ext:deno_web/02_timers.js"; +import { + getDefaultInspectOptions, + getNoColor, + inspectArgs, + 
quoteString, +} from "ext:deno_console/01_console.js"; +import * as performance from "ext:deno_web/15_performance.js"; +import * as fetch from "ext:deno_fetch/26_fetch.js"; +import { + mainRuntimeGlobalProperties, + windowOrWorkerGlobalScope, +} from "ext:runtime/98_global_scope.js"; + +let globalThis_; + +function formatException(error) { + if (ObjectPrototypeIsPrototypeOf(ErrorPrototype, error)) { + return null; + } else if (typeof error == "string") { + return `Uncaught ${inspectArgs([quoteString(error, getDefaultInspectOptions())], { + colors: !getNoColor(), + }) + }`; + } else { + return `Uncaught ${inspectArgs([error], { colors: !getNoColor() })}`; + } +} + +function runtimeStart() { + core.setMacrotaskCallback(timers.handleTimerMacrotask); + core.setMacrotaskCallback(promiseRejectMacrotaskCallback); + core.setWasmStreamingCallback(fetch.handleWasmStreaming); + core.setReportExceptionCallback(event.reportException); + ops.op_set_format_exception_callback(formatException); +} + +const pendingRejections = []; +const pendingRejectionsReasons = new SafeWeakMap(); + +function promiseRejectCallback(type, promise, reason) { + switch (type) { + case 0: { + ops.op_store_pending_promise_rejection(promise, reason); + ArrayPrototypePush(pendingRejections, promise); + WeakMapPrototypeSet(pendingRejectionsReasons, promise, reason); + break; + } + case 1: { + ops.op_remove_pending_promise_rejection(promise); + const index = ArrayPrototypeIndexOf(pendingRejections, promise); + if (index > -1) { + ArrayPrototypeSplice(pendingRejections, index, 1); + WeakMapPrototypeDelete(pendingRejectionsReasons, promise); + } + break; + } + default: + return false; + } + + return !!globalThis_.onunhandledrejection || + event.listenerCount(globalThis_, "unhandledrejection") > 0 || + typeof internals.nodeProcessUnhandledRejectionCallback !== "undefined"; +} + +function promiseRejectMacrotaskCallback() { + // We have no work to do, tell the runtime that we don't + // need to perform microtask checkpoint. + if (pendingRejections.length === 0) { + return undefined; + } + + while (pendingRejections.length > 0) { + const promise = ArrayPrototypeShift(pendingRejections); + const hasPendingException = ops.op_has_pending_promise_rejection( + promise, + ); + const reason = WeakMapPrototypeGet(pendingRejectionsReasons, promise); + WeakMapPrototypeDelete(pendingRejectionsReasons, promise); + + if (!hasPendingException) { + continue; + } + + const rejectionEvent = new event.PromiseRejectionEvent( + "unhandledrejection", + { + cancelable: true, + promise, + reason, + }, + ); + + const errorEventCb = (event) => { + if (event.error === reason) { + ops.op_remove_pending_promise_rejection(promise); + } + }; + // Add a callback for "error" event - it will be dispatched + // if error is thrown during dispatch of "unhandledrejection" + // event. + globalThis_.addEventListener("error", errorEventCb); + globalThis_.dispatchEvent(rejectionEvent); + globalThis_.removeEventListener("error", errorEventCb); + + // If event was not yet prevented, try handing it off to Node compat layer + // (if it was initialized) + if ( + !rejectionEvent.defaultPrevented && + typeof internals.nodeProcessUnhandledRejectionCallback !== "undefined" + ) { + internals.nodeProcessUnhandledRejectionCallback(rejectionEvent); + } + + // If event was not prevented (or "unhandledrejection" listeners didn't + // throw) we will let Rust side handle it. 
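+    // Conversely, a rejection whose event was `preventDefault()`-ed counts as
+    // handled, so it is dropped from the pending set and never reaches Rust.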
+ if (rejectionEvent.defaultPrevented) { + ops.op_remove_pending_promise_rejection(promise); + } + } + return true; +} + +delete globalThis.console; + +ObjectDefineProperties(globalThis, windowOrWorkerGlobalScope); + +ObjectDefineProperties(globalThis, mainRuntimeGlobalProperties); + +performance.setTimeOrigin(DateNow()); +globalThis_ = globalThis; + +ObjectSetPrototypeOf(globalThis, Window.prototype); + +event.setEventTargetData(globalThis); +event.saveGlobalThisReference(globalThis); + +event.defineEventHandler(globalThis, "error"); +event.defineEventHandler(globalThis, "load"); +event.defineEventHandler(globalThis, "beforeunload"); +event.defineEventHandler(globalThis, "unload"); +event.defineEventHandler(globalThis, "unhandledrejection"); + +core.setPromiseRejectCallback(promiseRejectCallback); + +runtimeStart(); diff --git a/dozer-deno/js/README.md b/dozer-deno/js/README.md new file mode 100644 index 0000000000..4055e6e0f5 --- /dev/null +++ b/dozer-deno/js/README.md @@ -0,0 +1,7 @@ +# Bootstrap development notes + +Files in this directory are adapted from `deno_runtime/js/`. The best way to view the original files is to compile this project and jump to`deno_runtime` crate source. + +To expose additional APIs to the JavaScript runtime, `dozer-deno/src/runtime/js_runtime.rs` and these files should be updated together. + +When updating to a new `deno_runtime` version, these files should be updated accordingly. The functions in `dozer-deno/src/runtime/js_runtime.rs` should also be revised to keep in sync with original code in `deno_runtime`. diff --git a/dozer-deno/src/lib.rs b/dozer-deno/src/lib.rs index e617303055..f9c5211221 100644 --- a/dozer-deno/src/lib.rs +++ b/dozer-deno/src/lib.rs @@ -2,3 +2,6 @@ pub use deno_runtime; mod ts_module_loader; pub use ts_module_loader::TypescriptModuleLoader; + +mod runtime; +pub use runtime::{Error as RuntimeError, Runtime}; diff --git a/dozer-deno/src/runtime/js_runtime.rs b/dozer-deno/src/runtime/js_runtime.rs new file mode 100644 index 0000000000..4486f7192b --- /dev/null +++ b/dozer-deno/src/runtime/js_runtime.rs @@ -0,0 +1,118 @@ +use std::rc::Rc; + +use deno_runtime::{ + deno_broadcast_channel::{deno_broadcast_channel, InMemoryBroadcastChannel}, + deno_cache::{deno_cache, SqliteBackedCache}, + deno_console::deno_console, + deno_core::{error::AnyError, extension, JsRuntime, ModuleId, RuntimeOptions}, + deno_crypto::deno_crypto, + deno_fetch::deno_fetch, + deno_napi::deno_napi, + deno_tls::deno_tls, + deno_url::deno_url, + deno_web::deno_web, + deno_webidl::deno_webidl, + deno_websocket::deno_websocket, + deno_webstorage::deno_webstorage, + permissions::PermissionsContainer, + BootstrapOptions, +}; +use tokio::select; + +use crate::TypescriptModuleLoader; + +extension!( + dozer_permissions_worker, + options = { + permissions: PermissionsContainer, + }, + state = |state, options| { + state.put(options.permissions) + } +); + +extension!( + runtime, + deps = [ + deno_webidl, + deno_console, + deno_url, + deno_tls, + deno_web, + deno_fetch, + deno_cache, + deno_websocket, + deno_webstorage, + deno_crypto, + deno_broadcast_channel, + deno_napi + ], + esm_entry_point = "ext:runtime/99_main.js", + esm = [ + dir "js", + "06_util.js", + "98_global_scope.js", + "99_main.js", + ], +); + +/// This is `MainWorker::from_options` with selected list of extensions. 
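+///
+/// A minimal usage sketch (hypothetical `module_specifier`; `JsRuntime` is
+/// `!Send`, so both calls must stay on the thread that created the runtime):
+///
+/// ```ignore
+/// let mut runtime = new();
+/// let id = runtime.load_side_module(&module_specifier, None).await?;
+/// evaluate_module(&mut runtime, id).await?;
+/// ```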
+pub fn new() -> JsRuntime {
+    let extensions = {
+        let user_agent = {
+            let version: String = env!("CARGO_PKG_VERSION").into();
+            format!(
+                "Dozer/{} {}",
+                version,
+                BootstrapOptions::default().user_agent
+            )
+        };
+        vec![
+            deno_webidl::init_ops_and_esm(),
+            deno_console::init_ops_and_esm(),
+            deno_url::init_ops_and_esm(),
+            deno_web::init_ops_and_esm::<PermissionsContainer>(
+                Default::default(),
+                Default::default(),
+            ),
+            deno_fetch::init_ops_and_esm::<PermissionsContainer>(Default::default()),
+            deno_cache::init_ops_and_esm::<SqliteBackedCache>(Default::default()),
+            deno_websocket::init_ops_and_esm::<PermissionsContainer>(
+                user_agent,
+                Default::default(),
+                Default::default(),
+            ),
+            deno_webstorage::init_ops_and_esm(Default::default()),
+            deno_crypto::init_ops_and_esm(Default::default()),
+            deno_broadcast_channel::init_ops_and_esm::<InMemoryBroadcastChannel>(Default::default()),
+            deno_tls::init_ops_and_esm(),
+            deno_napi::init_ops_and_esm::<PermissionsContainer>(),
+            dozer_permissions_worker::init_ops_and_esm(PermissionsContainer::allow_all()),
+            runtime::init_ops_and_esm(),
+        ]
+    };
+
+    JsRuntime::new(RuntimeOptions {
+        module_loader: Some(Rc::new(TypescriptModuleLoader::with_no_source_map())),
+        extensions,
+        ..Default::default()
+    })
+}
+
+/// `MainWorker::evaluate_module`.
+pub async fn evaluate_module(runtime: &mut JsRuntime, id: ModuleId) -> Result<(), AnyError> {
+    let mut receiver = runtime.mod_evaluate(id);
+    select! {
+        biased;
+
+        maybe_result = &mut receiver => {
+            maybe_result.expect("Module evaluation result not provided.")
+        }
+
+        event_loop_result = runtime.run_event_loop(false) => {
+            event_loop_result?;
+            let maybe_result = receiver.await;
+            maybe_result.expect("Module evaluation result not provided.")
+        }
+    }
+}
diff --git a/dozer-deno/src/runtime/mod.rs b/dozer-deno/src/runtime/mod.rs
new file mode 100644
index 0000000000..1ee9c210ea
--- /dev/null
+++ b/dozer-deno/src/runtime/mod.rs
@@ -0,0 +1,267 @@
+//! `JsRuntime` is `!Send + !Sync`, making it difficult to use.
+//! Here we implement a `Runtime` struct that runs `JsRuntime` in a dedicated thread.
+//! By sending work to the worker thread, `Runtime` is `Send + Sync`.
+
+use std::{
+    collections::HashMap,
+    fs::canonicalize,
+    future::poll_fn,
+    num::NonZeroI32,
+    ops::ControlFlow,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use deno_runtime::{
+    deno_core::{
+        anyhow::Context as _,
+        error::AnyError,
+        serde_v8::{from_v8, to_v8},
+        JsRuntime, ModuleSpecifier,
+    },
+    deno_napi::v8::{self, undefined, Function, Global, Local},
+};
+use dozer_types::{
+    log::{error, info},
+    serde_json::Value,
+    thiserror,
+};
+use tokio::{
+    sync::{
+        mpsc::{
+            self,
+            error::{TryRecvError, TrySendError},
+        },
+        oneshot,
+    },
+    task::{JoinHandle, LocalSet},
+};
+
+#[derive(Debug)]
+pub struct Runtime {
+    work_sender: mpsc::Sender<Work>,
+    return_receiver: mpsc::Receiver<Return>,
+    handle: JoinHandle<()>,
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("failed to canonicalize path {0}: {1}")]
+    CanonicalizePath(String, #[source] std::io::Error),
+    #[error("failed to load module {0}: {1}")]
+    LoadModule(String, #[source] AnyError),
+    #[error("failed to evaluate module {0}: {1}")]
+    EvaluateModule(String, #[source] AnyError),
+    #[error("failed to get namespace of module {0}: {1}")]
+    GetModuleNamespace(String, #[source] AnyError),
+    #[error("module {0} has no default export")]
+    ModuleNoDefaultExport(String),
+    #[error("module {0} default export is not a function: {1}")]
+    ModuleDefaultExportNotFunction(String, #[source] v8::DataError),
+}
+
+impl Runtime {
+    /// Returns `Runtime` and the ids of the exported functions.
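+    ///
+    /// A minimal usage sketch (hypothetical module path; `call_function`
+    /// consumes the handle and hands it back together with the result):
+    ///
+    /// ```ignore
+    /// let (runtime, functions) =
+    ///     Runtime::new(tokio_runtime, vec!["lambda.js".to_string()]).await?;
+    /// let (runtime, result) = runtime.call_function(functions[0], vec![]).await;
+    /// ```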
+    pub async fn new(
+        tokio_runtime: Arc<tokio::runtime::Runtime>,
+        modules: Vec<String>,
+    ) -> Result<(Self, Vec<NonZeroI32>), Error> {
+        let (init_sender, init_receiver) = oneshot::channel();
+        let (work_sender, work_receiver) = mpsc::channel(1);
+        let (return_sender, return_receiver) = mpsc::channel(1);
+        let handle = tokio_runtime.clone().spawn_blocking(move || {
+            let mut js_runtime = js_runtime::new();
+            let local_set = LocalSet::new();
+            let functions = match local_set
+                .block_on(&tokio_runtime, load_functions(&mut js_runtime, modules))
+            {
+                Ok(functions) => {
+                    if init_sender
+                        .send(Ok(functions.iter().map(|(id, _)| *id).collect::<Vec<_>>()))
+                        .is_err()
+                    {
+                        return;
+                    }
+                    functions
+                }
+                Err(e) => {
+                    let _ = init_sender.send(Err(e));
+                    return;
+                }
+            };
+            let functions = functions.into_iter().collect();
+            local_set.block_on(
+                &tokio_runtime,
+                worker_loop(js_runtime, work_receiver, return_sender, functions),
+            );
+        });
+
+        let functions = match init_receiver.await {
+            Ok(Ok(functions)) => functions,
+            Ok(Err(e)) => return Err(e),
+            Err(_) => {
+                // Propagate the panic.
+                handle.await.unwrap();
+                unreachable!("we should have panicked");
+            }
+        };
+
+        Ok((
+            Self {
+                work_sender,
+                return_receiver,
+                handle,
+            },
+            functions,
+        ))
+    }
+
+    pub async fn call_function(mut self, id: NonZeroI32, args: Vec<Value>) -> (Self, Value) {
+        if self
+            .work_sender
+            .send(Work::CallFunction { id, args })
+            .await
+            .is_err()
+        {
+            // Propagate the panic.
+            self.handle.await.unwrap();
+            unreachable!("we should have panicked");
+        }
+        let Some(result) = self.return_receiver.recv().await else {
+            // Propagate the panic.
+            self.handle.await.unwrap();
+            unreachable!("we should have panicked");
+        };
+        let Return::CallFunction(result) = result;
+        (self, result)
+    }
+}
+
+async fn load_functions(
+    runtime: &mut JsRuntime,
+    modules: Vec<String>,
+) -> Result<Vec<(NonZeroI32, Global<Function>)>, Error> {
+    let mut result = vec![];
+    for module in modules {
+        let path = canonicalize(&module).map_err(|e| Error::CanonicalizePath(module.clone(), e))?;
+        let module_specifier =
+            ModuleSpecifier::from_file_path(path).expect("we just canonicalized it");
+        info!("loading module {}", module_specifier);
+        let module_id = runtime
+            .load_side_module(&module_specifier, None)
+            .await
+            .map_err(|e| Error::LoadModule(module.clone(), e))?;
+        js_runtime::evaluate_module(runtime, module_id)
+            .await
+            .map_err(|e| Error::EvaluateModule(module.clone(), e))?;
+        let namespace = runtime
+            .get_module_namespace(module_id)
+            .map_err(|e| Error::GetModuleNamespace(module.clone(), e))?;
+        let scope = &mut runtime.handle_scope();
+        let namespace = v8::Local::new(scope, namespace);
+        let default_key = v8::String::new_external_onebyte_static(scope, b"default")
+            .unwrap()
+            .into();
+        let default_export = namespace
+            .get(scope, default_key)
+            .ok_or_else(|| Error::ModuleNoDefaultExport(module.clone()))?;
+        let function: Local<Function> = default_export
+            .try_into()
+            .map_err(|e| Error::ModuleDefaultExportNotFunction(module.clone(), e))?;
+        let id = function.get_identity_hash();
+        result.push((id, Global::new(scope, function)));
+    }
+    Ok(result)
+}
+
+#[derive(Debug)]
+enum Work {
+    CallFunction { id: NonZeroI32, args: Vec<Value> },
+}
+
+#[derive(Debug)]
+enum Return {
+    CallFunction(Value),
+}
+
+async fn worker_loop(
+    mut runtime: JsRuntime,
+    mut work_receiver: mpsc::Receiver<Work>,
+    return_sender: mpsc::Sender<Return>,
+    functions: HashMap<NonZeroI32, Global<Function>>,
+) {
+    loop {
+        match poll_fn(|cx| {
+            poll_work_and_event_loop(
+                &mut runtime,
+                &mut work_receiver,
+                &return_sender,
+                &functions,
+                cx,
+            )
+        })
+        .await
+        {
+            ControlFlow::Continue(Ok(())) => {}
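+            // A lambda call that returns an error is only logged; the worker
+            // thread keeps polling, and only a closed channel (`Break`) exits.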
+            ControlFlow::Continue(Err(e)) => {
+                error!("JavaScript runtime error: {}", e);
+            }
+            ControlFlow::Break(()) => {
+                break;
+            }
+        }
+    }
+}
+
+fn poll_work_and_event_loop(
+    runtime: &mut JsRuntime,
+    work_receiver: &mut mpsc::Receiver<Work>,
+    return_sender: &mpsc::Sender<Return>,
+    functions: &HashMap<NonZeroI32, Global<Function>>,
+    cx: &mut Context,
+) -> Poll<ControlFlow<(), Result<(), AnyError>>> {
+    match work_receiver.try_recv() {
+        Ok(work) => {
+            return match do_work(runtime, work, functions) {
+                Ok(value) => match return_sender.try_send(Return::CallFunction(value)) {
+                    Ok(()) => Poll::Ready(ControlFlow::Continue(Ok(()))),
+                    Err(TrySendError::Full(_)) => unreachable!("work can only be sent serially"),
+                    Err(TrySendError::Closed(_)) => Poll::Ready(ControlFlow::Break(())),
+                },
+                Err(e) => Poll::Ready(ControlFlow::Continue(Err(e))),
+            }
+        }
+        Err(TryRecvError::Empty) => (),
+        Err(TryRecvError::Disconnected) => return Poll::Ready(ControlFlow::Break(())),
+    }
+
+    runtime
+        .poll_event_loop(cx, false)
+        .map(ControlFlow::Continue)
+}
+
+fn do_work(
+    runtime: &mut JsRuntime,
+    work: Work,
+    functions: &HashMap<NonZeroI32, Global<Function>>,
+) -> Result<Value, AnyError> {
+    match work {
+        Work::CallFunction { id, args } => {
+            let function = functions
+                .get(&id)
+                .context(format!("function {} not found", id))?;
+            let scope = &mut runtime.handle_scope();
+            let recv = undefined(scope);
+            let args = args
+                .into_iter()
+                .map(|arg| to_v8(scope, arg))
+                .collect::<Result<Vec<_>, _>>()?;
+            let result = Local::new(scope, function).call(scope, recv.into(), &args);
+            result
+                .map(|value| from_v8(scope, value).map_err(Into::into))
+                .unwrap_or(Ok(Value::Null))
+        }
+    }
+}
+
+mod js_runtime;
diff --git a/dozer-ingestion/dozer/src/connector.rs b/dozer-ingestion/dozer/src/connector.rs
index 56bed4bd1d..7a4678c68a 100644
--- a/dozer-ingestion/dozer/src/connector.rs
+++ b/dozer-ingestion/dozer/src/connector.rs
@@ -181,9 +181,8 @@ impl NestedDozerConnector {
         Ok(response.into_inner())
     }
 
-    fn get_log_options(endpoint: String, value: NestedDozerLogOptions) -> LogReaderOptions {
+    fn get_log_options(value: NestedDozerLogOptions) -> LogReaderOptions {
         LogReaderOptions {
-            endpoint,
             batch_size: value.batch_size.unwrap_or_else(default_log_batch_size),
             timeout_in_millis: value.timeout_in_millis.unwrap_or_else(default_timeout),
             buffer_size: value.buffer_size.unwrap_or_else(default_buffer_size),
@@ -194,10 +193,11 @@ impl NestedDozerConnector {
         &self,
         endpoint: String,
     ) -> Result<LogReaderBuilder, NestedDozerConnectorError> {
-        let log_options = Self::get_log_options(endpoint, self.config.log_options.clone());
-        let log_reader_builder = LogReaderBuilder::new(self.config.url.clone(), log_options)
-            .await
-            .map_err(NestedDozerConnectorError::ReaderBuilderError)?;
+        let log_options = Self::get_log_options(self.config.log_options.clone());
+        let log_reader_builder =
+            LogReaderBuilder::new(self.config.url.clone(), endpoint, log_options)
+                .await
+                .map_err(NestedDozerConnectorError::ReaderBuilderError)?;
         Ok(log_reader_builder)
     }
 }
diff --git a/dozer-lambda/Cargo.toml b/dozer-lambda/Cargo.toml
new file mode 100644
index 0000000000..3aed05117e
--- /dev/null
+++ b/dozer-lambda/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "dozer-lambda"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+dozer-types = { path = "../dozer-types" }
+dozer-log = { path = "../dozer-log" }
+dozer-deno = { path = "../dozer-deno" }
+
+[dev-dependencies]
+env_logger = "0.10.0"
diff --git a/dozer-lambda/src/js/mod.rs b/dozer-lambda/src/js/mod.rs
new file mode 100644
index 0000000000..f54ee5de6b
--- /dev/null
+++ 
b/dozer-lambda/src/js/mod.rs @@ -0,0 +1,72 @@ +use std::sync::Arc; + +use dozer_log::{ + errors::ReaderBuilderError, + reader::LogReaderOptions, + tokio::{self, sync::Mutex}, +}; +use dozer_types::{ + grpc_types::internal::internal_pipeline_service_client::InternalPipelineServiceClient, + models::lambda_config::JavaScriptLambda, thiserror, tonic, +}; + +use self::{trigger::Trigger, worker::Worker}; + +#[derive(Debug)] +pub struct Runtime { + trigger: Trigger, + worker: Arc>, +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("failed to connect to app server at {0}: {1}")] + Connect(String, #[source] tonic::transport::Error), + #[error("failed to create worker: {0}")] + CreateWorker(#[from] dozer_deno::RuntimeError), + #[error("failed to connect to log endpoint {0}: {1}")] + ConnectLog(String, #[source] ReaderBuilderError), +} + +impl Runtime { + pub async fn new( + runtime: Arc, + app_url: String, + lambda_modules: Vec, + options: LogReaderOptions, + ) -> Result { + // Create worker. + let modules = lambda_modules + .iter() + .map(|module| module.module.clone()) + .collect(); + let (worker, lambdas) = Worker::new(runtime, modules).await?; + + // Create trigger. + let client = InternalPipelineServiceClient::connect(app_url.clone()) + .await + .map_err(|e| Error::Connect(app_url, e))?; + let mut trigger = Trigger::new(client, options); + + // Add lambdas to trigger. + for (module, lambda) in lambda_modules.into_iter().zip(lambdas) { + trigger + .add_lambda(module.endpoint.clone(), lambda) + .await + .map_err(|e| Error::ConnectLog(module.endpoint, e))?; + } + Ok(Self { + trigger, + worker: Arc::new(Mutex::new(worker)), + }) + } + + pub async fn run(mut self) { + self.trigger.run(&self.worker).await; + } +} + +#[cfg(test)] +mod tests; +mod trigger; +mod worker; diff --git a/dozer-lambda/src/js/test_lambda.js b/dozer-lambda/src/js/test_lambda.js new file mode 100644 index 0000000000..ca847d5e04 --- /dev/null +++ b/dozer-lambda/src/js/test_lambda.js @@ -0,0 +1 @@ +export default console.log; diff --git a/dozer-lambda/src/js/tests.rs b/dozer-lambda/src/js/tests.rs new file mode 100644 index 0000000000..c03d339154 --- /dev/null +++ b/dozer-lambda/src/js/tests.rs @@ -0,0 +1,207 @@ +use std::{pin::pin, time::Duration}; + +use dozer_deno::deno_runtime::deno_core::futures::future::{join, select, Either}; + +use super::*; + +#[test] +fn test_lambda_runtime() { + // env_logger::init(); + let tokio_runtime = Arc::new( + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(), + ); + tokio_runtime.block_on(test_lambda_runtime_impl(tokio_runtime.clone())); +} + +async fn test_lambda_runtime_impl(tokio_runtime: Arc) { + let (app_url, app_server) = mock::start_mock_internal_pipeline_server().await; + let lambda_modules = vec![JavaScriptLambda { + endpoint: mock::mock_endpoint(), + module: "src/js/test_lambda.js".to_string(), + }]; + let lambda_runtime = Runtime::new(tokio_runtime, app_url, lambda_modules, Default::default()); + let (lambda_runtime, app_server) = match select(pin!(lambda_runtime), app_server).await { + Either::Left((lambda_runtime, app_server)) => (lambda_runtime.unwrap(), app_server), + Either::Right((app_server, _)) => { + panic!("unexpected app server error: {:?}", app_server); + } + }; + tokio::time::timeout( + Duration::from_millis(10), + join(lambda_runtime.run(), app_server), + ) + .await + .unwrap_err(); +} + +mod mock { + use std::{ + future::Future, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + }; + + use 
dozer_deno::deno_runtime::deno_core::futures::{stream::BoxStream, FutureExt, StreamExt}; + use dozer_log::{ + replication::{self, LogOperation}, + schemas::EndpointSchema, + tokio::net::TcpListener, + }; + use dozer_types::{ + bincode, + grpc_types::internal::{ + internal_pipeline_service_server::{ + InternalPipelineService, InternalPipelineServiceServer, + }, + storage_response::Storage, + BuildRequest, BuildResponse, DescribeApplicationResponse, GetIdResponse, LocalStorage, + LogRequest, LogResponse, StorageRequest, StorageResponse, + }, + serde_json, + tonic::{ + self, async_trait, + transport::{server::TcpIncoming, Server}, + Request, Response, Status, Streaming, + }, + types::{Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition}, + }; + + pub async fn start_mock_internal_pipeline_server() -> ( + String, + impl Future> + Unpin, + ) { + let listener = TcpListener::bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0))) + .await + .unwrap(); + let url = format!("http://{}", listener.local_addr().unwrap()); + let incoming = TcpIncoming::from_listener(listener, true, None).unwrap(); + ( + url, + Server::builder() + .add_service(InternalPipelineServiceServer::new( + MockInternalPipelineServer, + )) + .serve_with_incoming(incoming) + .boxed(), + ) + } + + struct MockInternalPipelineServer; + + pub fn mock_endpoint() -> String { + "mock".to_string() + } + + fn mock_build_response() -> BuildResponse { + let schema = EndpointSchema { + path: "mock".to_string(), + schema: Schema { + fields: vec![FieldDefinition { + name: "mock_field".to_string(), + typ: FieldType::UInt, + nullable: false, + source: SourceDefinition::Dynamic, + }], + primary_index: vec![0], + }, + secondary_indexes: vec![], + enable_token: false, + enable_on_event: true, + connections: Default::default(), + }; + BuildResponse { + schema_string: serde_json::to_string(&schema).unwrap(), + } + } + + fn err_not_found(endpoint: &str) -> Result { + Err(Status::not_found(format!( + "Endpoint {} not found", + endpoint + ))) + } + + #[async_trait] + impl InternalPipelineService for MockInternalPipelineServer { + async fn get_id(&self, _: Request<()>) -> Result, Status> { + Ok(Response::new(GetIdResponse { + id: "mock".to_string(), + })) + } + + async fn describe_storage( + &self, + request: Request, + ) -> Result, Status> { + let endpoint = request.into_inner().endpoint; + if endpoint == mock_endpoint() { + Ok(Response::new(StorageResponse { + storage: Some(Storage::Local(LocalStorage { + root: "mock".to_string(), + })), + })) + } else { + err_not_found(&endpoint) + } + } + + async fn describe_application( + &self, + _: Request<()>, + ) -> Result, Status> { + Ok(Response::new(DescribeApplicationResponse { + endpoints: [(mock_endpoint(), mock_build_response())] + .into_iter() + .collect(), + })) + } + + async fn describe_build( + &self, + request: Request, + ) -> Result, Status> { + let endpoint = request.into_inner().endpoint; + if endpoint == mock_endpoint() { + Ok(Response::new(mock_build_response())) + } else { + err_not_found(&endpoint) + } + } + + type GetLogStream = BoxStream<'static, Result>; + + async fn get_log( + &self, + requests: Request>, + ) -> Result, Status> { + let response = requests.into_inner().map(|request| { + request.and_then(|request| { + if request.endpoint == mock_endpoint() { + let operations = (request.start..request.end) + .map(|index| LogOperation::Op { + op: Operation::Insert { + new: Record { + values: vec![Field::UInt(index)], + lifetime: None, + }, + }, + }) + 
.collect(); + Ok(LogResponse { + data: bincode::encode_to_vec( + &replication::LogResponse::Operations(operations), + bincode::config::legacy(), + ) + .unwrap(), + }) + } else { + err_not_found(&request.endpoint) + } + }) + }); + Ok(Response::new(response.boxed())) + } + } +} diff --git a/dozer-lambda/src/js/trigger/mod.rs b/dozer-lambda/src/js/trigger/mod.rs new file mode 100644 index 0000000000..7da916c504 --- /dev/null +++ b/dozer-lambda/src/js/trigger/mod.rs @@ -0,0 +1,92 @@ +use std::{num::NonZeroI32, sync::Arc, time::Duration}; + +use dozer_deno::deno_runtime::deno_core::futures::future::join_all; +use dozer_log::{ + errors::{ReaderBuilderError, ReaderError}, + reader::{LogClient, LogReader, LogReaderOptions}, + replication::LogOperation, + tokio::{self, sync::Mutex}, +}; +use dozer_types::{ + grpc_types::internal::internal_pipeline_service_client::InternalPipelineServiceClient, + log::{error, trace}, + tonic::transport::Channel, +}; + +use super::worker::Worker; + +#[derive(Debug)] +pub struct Trigger { + client: InternalPipelineServiceClient, + options: LogReaderOptions, + lambdas: Vec<(LogReader, NonZeroI32)>, +} + +impl Trigger { + pub fn new(client: InternalPipelineServiceClient, options: LogReaderOptions) -> Self { + Self { + client, + options, + lambdas: vec![], + } + } + + pub async fn add_lambda( + &mut self, + endpoint: String, + lambda: NonZeroI32, + ) -> Result<(), ReaderBuilderError> { + let (client, schema) = LogClient::new(&mut self.client, endpoint).await?; + let reader = LogReader::new(schema, client, self.options.clone(), 0); + self.lambdas.push((reader, lambda)); + Ok(()) + } + + pub async fn run(&mut self, worker: &Arc>) { + let lambdas = std::mem::take(&mut self.lambdas); + let handles = lambdas + .into_iter() + .map(|(reader, lambda)| trigger_loop(reader, worker.clone(), lambda)) + .collect::>(); + join_all(handles).await; + } +} + +async fn trigger_loop(mut log_reader: LogReader, worker: Arc>, func: NonZeroI32) { + loop { + if let Err(e) = trigger_once(&mut log_reader, &worker, func).await { + const RETRY_INTERVAL: Duration = Duration::from_secs(5); + error!("error reading log: {}. Retrying in {:?}", e, RETRY_INTERVAL); + tokio::time::sleep(RETRY_INTERVAL).await; + } + } +} + +async fn trigger_once( + log_reader: &mut LogReader, + worker: &Mutex, + func: NonZeroI32, +) -> Result<(), ReaderError> { + let op_and_pos = log_reader.read_one().await?; + if let LogOperation::Op { op } = op_and_pos.op { + let field_names = log_reader + .schema + .schema + .fields + .iter() + .map(|field| field.name.clone()) + .collect::>(); + trace!( + "triggering lambda {} with op position {} from endpoint {}", + func, + op_and_pos.pos, + log_reader.schema.path + ); + worker + .lock() + .await + .call_lambda(func, op_and_pos.pos, op, field_names) + .await + } + Ok(()) +} diff --git a/dozer-lambda/src/js/worker/mod.rs b/dozer-lambda/src/js/worker/mod.rs new file mode 100644 index 0000000000..4301b3017b --- /dev/null +++ b/dozer-lambda/src/js/worker/mod.rs @@ -0,0 +1,65 @@ +use std::{num::NonZeroI32, sync::Arc}; + +use dozer_log::tokio::runtime::Runtime; +use dozer_types::{ + json_types::field_to_json_value, + serde_json::{json, Value}, + types::{Field, Operation}, +}; + +#[derive(Debug)] +pub struct Worker { + /// Always `Some`. 
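+    /// The `Option` exists so the runtime can be moved out temporarily:
+    /// `call_lambda` `take`s it, awaits the consuming `call_function`, and
+    /// puts the returned runtime back.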
+    runtime: Option<dozer_deno::Runtime>,
+}
+
+impl Worker {
+    pub async fn new(
+        runtime: Arc<Runtime>,
+        modules: Vec<String>,
+    ) -> Result<(Self, Vec<NonZeroI32>), dozer_deno::RuntimeError> {
+        let (runtime, lambdas) = dozer_deno::Runtime::new(runtime, modules).await?;
+        Ok((
+            Self {
+                runtime: Some(runtime),
+            },
+            lambdas,
+        ))
+    }
+
+    pub async fn call_lambda(
+        &mut self,
+        func: NonZeroI32,
+        operation_index: u64,
+        operation: Operation,
+        field_names: Vec<String>,
+    ) {
+        let (operation_type, new_values, old_values) = match operation {
+            Operation::Insert { new } => ("insert", new.values, None),
+            Operation::Update { new, old } => ("update", new.values, Some(old.values)),
+            Operation::Delete { old } => ("delete", old.values, None),
+        };
+        let arg = json!({
+            "type": operation_type,
+            "index": operation_index,
+            "new": create_record_json_value(field_names.clone(), new_values),
+            "old": old_values.map(|old_values| create_record_json_value(field_names, old_values)),
+        });
+        self.runtime = Some(
+            self.runtime
+                .take()
+                .unwrap()
+                .call_function(func, vec![arg])
+                .await
+                .0,
+        );
+    }
+}
+
+fn create_record_json_value(field_names: Vec<String>, values: Vec<Field>) -> Value {
+    let mut record = Value::Object(Default::default());
+    for (field_name, value) in field_names.into_iter().zip(values.into_iter()) {
+        record[field_name] = field_to_json_value(value);
+    }
+    record
+}
diff --git a/dozer-lambda/src/lib.rs b/dozer-lambda/src/lib.rs
new file mode 100644
index 0000000000..aa5463e26b
--- /dev/null
+++ b/dozer-lambda/src/lib.rs
@@ -0,0 +1,3 @@
+mod js;
+
+pub use js::{Error, Runtime as JsRuntime};
diff --git a/dozer-log-js/src/lib.rs b/dozer-log-js/src/lib.rs
index 8738f19ff9..c84d35a742 100644
--- a/dozer-log-js/src/lib.rs
+++ b/dozer-log-js/src/lib.rs
@@ -1,7 +1,7 @@
 use std::sync::Arc;
 
 use dozer_log::{
-    reader::{LogReader as RustLogReader, LogReaderBuilder, LogReaderOptions},
+    reader::{LogReader as RustLogReader, LogReaderBuilder},
     tokio::{runtime::Runtime as TokioRuntime, sync::Mutex},
 };
 use neon::prelude::*;
@@ -64,7 +64,7 @@ fn runtime_create_reader(mut cx: FunctionContext) -> JsResult<JsPromise> {
     runtime.runtime.spawn(async move {
         // Create the builder.
         let reader_builder =
-            LogReaderBuilder::new(server_addr, LogReaderOptions::new(endpoint_name)).await;
+            LogReaderBuilder::new(server_addr, endpoint_name, Default::default()).await;
         match reader_builder {
             Ok(reader) => {
                 // Create the reader and resolve the promise.
diff --git a/dozer-log-python/src/lib.rs b/dozer-log-python/src/lib.rs
index 68292fedbe..a243dade83 100644
--- a/dozer-log-python/src/lib.rs
+++ b/dozer-log-python/src/lib.rs
@@ -3,7 +3,7 @@
 use std::sync::Arc;
 
 use dozer_log::{
-    reader::{LogReader as DozerLogReader, LogReaderBuilder, LogReaderOptions},
+    reader::{LogReader as DozerLogReader, LogReaderBuilder},
     tokio::sync::Mutex,
 };
 use dozer_types::pyo3::{exceptions::PyException, prelude::*};
@@ -20,7 +20,7 @@ impl LogReader {
     fn new(py: Python, server_addr: String, endpoint_name: String) -> PyResult<&PyAny> {
         pyo3_asyncio::tokio::future_into_py(py, async move {
             let reader_result =
-                LogReaderBuilder::new(server_addr, LogReaderOptions::new(endpoint_name)).await;
+                LogReaderBuilder::new(server_addr, endpoint_name, Default::default()).await;
             let reader = reader_result
                 .map_err(|e| PyException::new_err(e.to_string()))?
.build(0); diff --git a/dozer-log/examples/reader.rs b/dozer-log/examples/reader.rs index 707286245c..83d151d855 100644 --- a/dozer-log/examples/reader.rs +++ b/dozer-log/examples/reader.rs @@ -1,5 +1,5 @@ use clap::Parser; -use dozer_log::reader::{LogReaderBuilder, LogReaderOptions}; +use dozer_log::reader::LogReaderBuilder; #[derive(Parser)] struct Cli { @@ -11,11 +11,10 @@ struct Cli { async fn main() { let cli = Cli::parse(); - let mut log_reader = - LogReaderBuilder::new(cli.server_addr, LogReaderOptions::new(cli.endpoint)) - .await - .unwrap() - .build(0); + let mut log_reader = LogReaderBuilder::new(cli.server_addr, cli.endpoint, Default::default()) + .await + .unwrap() + .build(0); let mut counter = 0; loop { diff --git a/dozer-log/src/reader.rs b/dozer-log/src/reader.rs index 7c8a0d54b7..9d40fa0fc5 100644 --- a/dozer-log/src/reader.rs +++ b/dozer-log/src/reader.rs @@ -24,16 +24,14 @@ use tokio_stream::StreamExt; #[derive(Debug, Clone)] pub struct LogReaderOptions { - pub endpoint: String, pub batch_size: u32, pub timeout_in_millis: u32, pub buffer_size: u32, } -impl LogReaderOptions { - pub fn new(endpoint: String) -> Self { +impl Default for LogReaderOptions { + fn default() -> Self { Self { - endpoint, batch_size: default_log_reader_batch_size(), timeout_in_millis: default_log_reader_timeout_in_millis(), buffer_size: default_log_reader_buffer_size(), @@ -60,18 +58,11 @@ pub struct LogReader { impl LogReaderBuilder { pub async fn new( server_addr: String, + endpoint: String, options: LogReaderOptions, ) -> Result { let mut client = InternalPipelineServiceClient::connect(server_addr).await?; - let build = client - .describe_build(BuildRequest { - endpoint: options.endpoint.clone(), - }) - .await? - .into_inner(); - let schema = serde_json::from_str(&build.schema_string)?; - - let client = LogClient::new(&mut client, options.endpoint.clone()).await?; + let (client, schema) = LogClient::new(&mut client, endpoint).await?; Ok(Self { schema, @@ -135,6 +126,7 @@ impl LogReader { pub struct LogClient { request_sender: Sender, response_stream: Streaming, + endpoint: String, storage: Box, } @@ -142,11 +134,21 @@ impl LogClient { pub async fn new( client: &mut InternalPipelineServiceClient, endpoint: String, - ) -> Result { + ) -> Result<(Self, EndpointSchema), ReaderBuilderError> { + let build = client + .describe_build(BuildRequest { + endpoint: endpoint.clone(), + }) + .await? + .into_inner(); + let schema = serde_json::from_str(&build.schema_string)?; + let (request_sender, response_stream) = create_get_log_stream(client).await?; let storage = client - .describe_storage(StorageRequest { endpoint }) + .describe_storage(StorageRequest { + endpoint: endpoint.clone(), + }) .await? .into_inner(); let storage: Box = match storage.storage.expect("Must not be None") { @@ -158,11 +160,15 @@ impl LogClient { } }; - Ok(Self { - request_sender, - response_stream, - storage, - }) + Ok(( + Self { + request_sender, + response_stream, + endpoint, + storage, + }, + schema, + )) } async fn get_log(&mut self, request: LogRequest) -> Result, ReaderError> { @@ -255,7 +261,7 @@ async fn log_reader_worker_loop( loop { // Request ops. 
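+        // Each request asks for operations in the range `[pos, pos + batch_size)`
+        // from the endpoint this `LogClient` was created for.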
let request = LogRequest { - endpoint: options.endpoint.clone(), + endpoint: log_client.endpoint.clone(), start: pos, end: pos + options.batch_size as u64, timeout_in_millis: options.timeout_in_millis, diff --git a/dozer-types/src/models/config.rs b/dozer-types/src/models/config.rs index ef7a088ab4..02fc86a091 100644 --- a/dozer-types/src/models/config.rs +++ b/dozer-types/src/models/config.rs @@ -2,8 +2,8 @@ use std::path::Path; use super::{ api_config::ApiConfig, api_endpoint::ApiEndpoint, app_config::AppConfig, cloud::Cloud, - connection::Connection, equal_default, flags::Flags, source::Source, - telemetry::TelemetryConfig, + connection::Connection, equal_default, flags::Flags, lambda_config::LambdaConfig, + source::Source, telemetry::TelemetryConfig, }; use crate::constants::DEFAULT_HOME_DIR; use crate::models::udf_config::UdfConfig; @@ -71,6 +71,10 @@ pub struct Config { /// UDF specific configuration (eg. !Onnx) #[serde(default, skip_serializing_if = "Vec::is_empty")] pub udfs: Vec, + + /// Lambda functions. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub lambdas: Vec, } pub fn default_home_dir() -> String { diff --git a/dozer-types/src/models/lambda_config.rs b/dozer-types/src/models/lambda_config.rs new file mode 100644 index 0000000000..6e0c378443 --- /dev/null +++ b/dozer-types/src/models/lambda_config.rs @@ -0,0 +1,15 @@ +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, JsonSchema)] +#[serde(deny_unknown_fields)] +pub enum LambdaConfig { + JavaScript(JavaScriptLambda), +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct JavaScriptLambda { + pub endpoint: String, + pub module: String, +} diff --git a/dozer-types/src/models/mod.rs b/dozer-types/src/models/mod.rs index efa150af9c..a10d5fc11c 100644 --- a/dozer-types/src/models/mod.rs +++ b/dozer-types/src/models/mod.rs @@ -8,6 +8,7 @@ pub mod connection; pub mod flags; pub mod ingestion_types; mod json_schema_helper; +pub mod lambda_config; pub mod source; pub mod telemetry; pub mod udf_config; diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json index 3971340f03..940b1052d1 100644 --- a/json_schemas/dozer.json +++ b/json_schemas/dozer.json @@ -81,6 +81,13 @@ "null" ] }, + "lambdas": { + "description": "Lambda functions.", + "type": "array", + "items": { + "$ref": "#/definitions/LambdaConfig" + } + }, "sources": { "description": "sources to ingest data related to particular connection", "type": "array", @@ -1066,6 +1073,22 @@ } } }, + "JavaScriptLambda": { + "type": "object", + "required": [ + "endpoint", + "module" + ], + "properties": { + "endpoint": { + "type": "string" + }, + "module": { + "type": "string" + } + }, + "additionalProperties": false + }, "KafkaConfig": { "examples": [ { @@ -1089,6 +1112,22 @@ } } }, + "LambdaConfig": { + "oneOf": [ + { + "type": "object", + "required": [ + "JavaScript" + ], + "properties": { + "JavaScript": { + "$ref": "#/definitions/JavaScriptLambda" + } + }, + "additionalProperties": false + } + ] + }, "LocalDetails": { "type": "object", "required": [