feat: Implement dozer run lambda (#2202)
* feat: Implement `dozer run lambda`

* feat: Use module's default export as lambda function

* refactor: Stop using `deno_runtime::MainWorker`
chubei authored Nov 10, 2023
1 parent a12f3d0 commit 53fe6f2
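The second commit bullet implies that each configured lambda module exposes its handler as the module's default export. A minimal sketch of such a JavaScript module, assuming the handler is invoked once per newly output operation and receives a single argument describing it (the file name and payload shape are assumptions, not taken from this commit):

    // lambdas/on_operation.js (hypothetical path)
    // The default export is what the lambda runtime is assumed to invoke
    // for every new operation emitted by the pipeline.
    export default function (operation) {
      // `operation` is assumed to describe one output record change.
      console.log("lambda received operation:", operation);
    }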
Showing 33 changed files with 1,578 additions and 69 deletions.
17 changes: 15 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -15,6 +15,7 @@ members = [
"dozer-log-python",
"dozer-utils",
"dozer-recordstore",
"dozer-lambda",
]
resolver = "2"

14 changes: 2 additions & 12 deletions dozer-api/src/cache_builder/endpoint_meta.rs
@@ -1,10 +1,7 @@
use dozer_cache::dozer_log::{reader::LogClient, schemas::EndpointSchema};
use dozer_tracing::Labels;
use dozer_types::{
grpc_types::internal::{
internal_pipeline_service_client::InternalPipelineServiceClient, BuildRequest,
},
serde_json,
grpc_types::internal::internal_pipeline_service_client::InternalPipelineServiceClient,
tonic::transport::Channel,
};

@@ -23,15 +20,8 @@ impl EndpointMeta {
endpoint: String,
) -> Result<(Self, LogClient), ApiInitError> {
// We establish the log stream first to avoid tonic auto-reconnecting without us knowing.
let log_client = LogClient::new(client, endpoint.clone()).await?;
let (log_client, schema) = LogClient::new(client, endpoint.clone()).await?;
let log_id = client.get_id(()).await?.into_inner().id;
let build = client
.describe_build(BuildRequest {
endpoint: endpoint.clone(),
})
.await?
.into_inner();
let schema = serde_json::from_str(&build.schema_string)?;

Ok((
Self {
5 changes: 3 additions & 2 deletions dozer-api/src/cache_builder/mod.rs
@@ -33,6 +33,7 @@ const READ_LOG_RETRY_INTERVAL: Duration = Duration::from_secs(1);
#[derive(Debug)]
pub struct CacheBuilder {
client: InternalPipelineServiceClient<Channel>,
endpoint: String,
cache_manager: Arc<dyn RwCacheManager>,
serving: Arc<ArcSwap<CacheReader>>,
labels: Labels,
@@ -80,6 +81,7 @@ impl CacheBuilder {
Ok((
Self {
client,
endpoint: endpoint.name.clone(),
cache_manager,
serving: Arc::new(ArcSwap::from_pointee(serving)),
labels: labels.labels().clone(),
@@ -104,7 +106,7 @@ impl CacheBuilder {
loop {
// Connect to the endpoint's log.
let Some(connect_result) = runtime.block_on(with_cancel(
connect_until_success(&mut self.client, &self.log_reader_options.endpoint),
connect_until_success(&mut self.client, &self.endpoint),
cancel,
)) else {
return Ok(());
@@ -280,7 +282,6 @@ fn check_cache_schema(cache: &dyn RoCache, given: SchemaWithIndex) -> Result<(),

fn get_log_reader_options(endpoint: &ApiEndpoint) -> LogReaderOptions {
LogReaderOptions {
endpoint: endpoint.name.clone(),
batch_size: endpoint
.log_reader_options
.batch_size
1 change: 1 addition & 0 deletions dozer-cli/Cargo.toml
@@ -19,6 +19,7 @@ dozer-types = { path = "../dozer-types" }
dozer-tracing = { path = "../dozer-tracing" }
dozer-storage = { path = "../dozer-storage" }
dozer-recordstore = { path = "../dozer-recordstore" }
dozer-lambda = { path = "../dozer-lambda" }

uuid = { version = "1.3.0", features = ["v4", "serde"] }
tokio = { version = "1", features = ["full"] }
5 changes: 5 additions & 0 deletions dozer-cli/src/cli/types.rs
@@ -116,6 +116,11 @@ pub enum RunCommands {
API endpoints through REST and GRPC (depends on configuration)"
)]
Api,
#[command(
about = "Run lambda functions",
long_about = "Run lambda functions. Lambda functions are JavaScript or Python functions that are called when a new operation is output."
)]
Lambda,
}

#[derive(Debug, Args)]
2 changes: 2 additions & 0 deletions dozer-cli/src/errors.rs
@@ -103,6 +103,8 @@ pub enum OrchestrationError {
LockedNoLockFile,
#[error("Command was aborted")]
Aborted,
#[error("Failed to create lambda runtime: {0}")]
CreateLambdaRuntime(#[from] dozer_lambda::Error),
}

#[derive(Error, Debug)]
6 changes: 4 additions & 2 deletions dozer-cli/src/main.rs
@@ -172,14 +172,16 @@ fn run() -> Result<(), OrchestrationError> {
Commands::Run(run) => match run.command {
Some(RunCommands::Api) => {
render_logo();

dozer.run_api(shutdown_receiver)
}
Some(RunCommands::App) => {
render_logo();

dozer.run_apps(shutdown_receiver, None)
}
Some(RunCommands::Lambda) => {
render_logo();
dozer.run_lambda(shutdown_receiver)
}
None => {
render_logo();
dozer.run_all(shutdown_receiver, run.locked)
57 changes: 45 additions & 12 deletions dozer-cli/src/simple/orchestrator.rs
@@ -22,8 +22,11 @@ use dozer_core::app::AppPipeline;
use dozer_core::dag_schemas::DagSchemas;
use dozer_tracing::LabelsAndProgress;
use dozer_types::constants::LOCK_FILE;
use dozer_types::models::api_config::{default_app_grpc_host, default_app_grpc_port};
use dozer_types::models::api_config::{
default_app_grpc_host, default_app_grpc_port, AppGrpcOptions,
};
use dozer_types::models::flags::{default_dynamic, default_push_events};
use dozer_types::models::lambda_config::LambdaConfig;
use tokio::select;

use crate::console_helper::get_colored_text;
@@ -94,17 +97,7 @@ impl SimpleOrchestrator {
(None, None)
};

let internal_grpc_config = &self.config.api.app_grpc;
let app_server_url = format!(
"http://{}:{}",
internal_grpc_config
.host
.clone()
.unwrap_or_else(default_app_grpc_host),
internal_grpc_config
.port
.unwrap_or_else(default_app_grpc_port)
);
let app_server_url = app_url(&self.config.api.app_grpc);
let cache_manager = Arc::new(
LmdbRwCacheManager::new(get_cache_manager_options(&self.config))
.map_err(OrchestrationError::CacheInitFailed)?,
@@ -283,6 +276,38 @@ impl SimpleOrchestrator {
})
}

pub fn run_lambda(&mut self, shutdown: ShutdownReceiver) -> Result<(), OrchestrationError> {
let runtime = self.runtime.clone();
let result = self.run_lambda_impl();
let shutdown = shutdown.create_shutdown_future();
runtime.block_on(async move {
select! {
() = shutdown => Ok(()),
result = result => result,
}
})
}

async fn run_lambda_impl(&mut self) -> Result<(), OrchestrationError> {
let lambda_modules = self
.config
.lambdas
.iter()
.map(|lambda| match lambda {
LambdaConfig::JavaScript(module) => module.clone(),
})
.collect();
let runtime = dozer_lambda::JsRuntime::new(
self.runtime.clone(),
app_url(&self.config.api.app_grpc),
lambda_modules,
Default::default(),
)
.await?;
runtime.run().await;
Ok(())
}

#[allow(clippy::type_complexity)]
pub async fn list_connectors(
&self,
Expand Down Expand Up @@ -479,3 +504,11 @@ pub fn validate_sql(sql: String) -> Result<(), PipelineError> {
pub fn lockfile_path(base_directory: Utf8PathBuf) -> Utf8PathBuf {
base_directory.join(LOCK_FILE)
}

fn app_url(config: &AppGrpcOptions) -> String {
format!(
"http://{}:{}",
config.host.clone().unwrap_or_else(default_app_grpc_host),
config.port.unwrap_or_else(default_app_grpc_port)
)
}
2 changes: 2 additions & 0 deletions dozer-deno/Cargo.toml
@@ -6,5 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dozer-types = { path = "../dozer-types" }
deno_runtime = "0.129.0"
deno_ast = "0.29.5"
tokio = "1.33.0"
94 changes: 94 additions & 0 deletions dozer-deno/js/06_util.js
@@ -0,0 +1,94 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.

const primordials = globalThis.__bootstrap.primordials;
const {
Promise,
SafeArrayIterator,
} = primordials;

// WARNING: Keep this in sync with Rust (search for LogLevel)
const LogLevel = {
Error: 1,
Warn: 2,
Info: 3,
Debug: 4,
};

let logLevel = 3;
let logSource = "JS";

function setLogLevel(level, source) {
logLevel = level;
if (source) {
logSource = source;
}
}

function log(...args) {
if (logLevel >= LogLevel.Debug) {
// if we destructure `console` off `globalThis` too early, we don't bind to
// the right console, therefore we don't log anything out.
globalThis.console.error(
`DEBUG ${logSource} -`,
...new SafeArrayIterator(args),
);
}
}

function createResolvable() {
let resolve;
let reject;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
promise.resolve = resolve;
promise.reject = reject;
return promise;
}

function writable(value) {
return {
value,
writable: true,
enumerable: true,
configurable: true,
};
}

function nonEnumerable(value) {
return {
value,
writable: true,
enumerable: false,
configurable: true,
};
}

function readOnly(value) {
return {
value,
enumerable: true,
writable: false,
configurable: true,
};
}

function getterOnly(getter) {
return {
get: getter,
set() { },
enumerable: true,
configurable: true,
};
}

export {
createResolvable,
getterOnly,
log,
nonEnumerable,
readOnly,
setLogLevel,
writable,
};
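The helpers above mirror Deno's internal 06_util.js. As a rough standalone illustration of the two main patterns they provide, resolvable promises and reusable property descriptors, written without the bootstrap primordials (the `ns` object and its values are hypothetical, not part of this commit):

    // Resolvable promise: resolve/reject are exposed on the promise itself.
    function createResolvablePlain() {
      let resolve, reject;
      const promise = new Promise((res, rej) => {
        resolve = res;
        reject = rej;
      });
      promise.resolve = resolve;
      promise.reject = reject;
      return promise;
    }

    const ready = createResolvablePlain();
    setTimeout(() => ready.resolve("initialized"), 10);
    ready.then((value) => console.log(value)); // prints "initialized"

    // Descriptor helpers in use: one read-only and one writable property
    // defined on a hypothetical namespace object.
    const ns = {};
    Object.defineProperties(ns, {
      version: { value: "0.1.0", enumerable: true, writable: false, configurable: true },
      state: { value: new Map(), enumerable: true, writable: true, configurable: true },
    });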