Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(air): embedded Starlark script support #853

Open
wants to merge 36 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
a5e551d
Implement starlark traits for `JValue`
monoid Jul 19, 2024
38cd002
Implement interpreter invocation
monoid Jul 19, 2024
6d209ca
TODO catchable
monoid Jul 22, 2024
2727fe5
Add Starlark tetraplet and value tests
monoid Jul 23, 2024
7d788e7
Arrange Starlark error handling
monoid Jul 23, 2024
5fb4846
Simplify returning error
monoid Jul 24, 2024
3221016
Add an embedded script support to AIR lexer
monoid Jul 25, 2024
224c5ee
Add `embed` syntax to AIR parser
monoid Jul 25, 2024
d6650f9
More starlark tests
monoid Jul 25, 2024
804ba8c
Add `embed` parser tests
monoid Jul 26, 2024
e06aaa2
starlark's get_tetraplet returns list
monoid Jul 30, 2024
0e4c9f5
Implement validator for `embed`
monoid Jul 30, 2024
f916166
Remove dbg output
monoid Jul 30, 2024
9239cb6
Execute `embed` statement
monoid Jul 30, 2024
2fe198f
Test `embed` errors
monoid Jul 31, 2024
5aa104d
Revert "Simplify returning error"
monoid Jul 31, 2024
fbfe9ff
Handle Starlark's `fail` in AquaVM
monoid Jul 31, 2024
d601a48
Some comments
monoid Jul 31, 2024
5a71674
`embed` args test
monoid Jul 31, 2024
9628304
Update [email protected] to 0.30.36
monoid Aug 1, 2024
646796b
Make clippy happy
monoid Aug 1, 2024
8a4bb18
Make fmt happy
monoid Aug 1, 2024
97fe1ce
Use raw strings `#".."#` for embed
monoid Aug 1, 2024
c8f8297
add embed join behavior test
monoid Aug 7, 2024
9725bf9
add `embed_zip_reverse` test
monoid Aug 8, 2024
7df9263
fix last-minute change in tests
monoid Aug 8, 2024
39dede1
make fmt happy
monoid Aug 8, 2024
aba0423
Document `embed` instruction
monoid Aug 8, 2024
9e2f123
Fix typo
monoid Aug 8, 2024
721e61e
Support raw strings in testing-framework
monoid Aug 19, 2024
7b746e3
Support embed in testing-framework
monoid Aug 19, 2024
c38a1b9
Implement RawString in testing-framework
monoid Aug 20, 2024
38f51b8
Add Starlark benchmarks
monoid Aug 20, 2024
2bbac79
Add tracing to Starlark `execute`
monoid Aug 20, 2024
aa23ca4
Run starlark benchmarks on MacOS M1
monoid Aug 20, 2024
9b02be1
Run starlark benchmarks on Linux x64
monoid Aug 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
478 changes: 461 additions & 17 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ members = [
"crates/air-lib/interpreter-interface",
"crates/air-lib/interpreter-sede",
"crates/air-lib/interpreter-signatures",
"crates/air-lib/interpreter-starlark",
"crates/air-lib/interpreter-value",
"crates/air-lib/lambda/ast",
"crates/air-lib/lambda/parser",
Expand Down
1 change: 1 addition & 0 deletions air/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ air-interpreter-sede = { version = "0.1.0", path = "../crates/air-lib/interprete
air-interpreter-signatures = { version = "0.1.7", path = "../crates/air-lib/interpreter-signatures", features = ["rkyv"] }
air-interpreter-value = { version = "0.1.0", path = "../crates/air-lib/interpreter-value" }
air-interpreter-interface = { version = "0.19.0", path = "../crates/air-lib/interpreter-interface", default-features = false }
air-interpreter-starlark = { version = "0.1.0", path = "../crates/air-lib/interpreter-starlark", default-features = false }
air-log-targets = { version = "0.1.0", path = "../crates/air-lib/log-targets" }
air-lambda-ast = { version = "0.1.0", path = "../crates/air-lib/lambda/ast" }
air-lambda-parser = { version = "0.1.0", path = "../crates/air-lib/lambda/parser" }
Expand Down
3 changes: 3 additions & 0 deletions air/src/execution_step/errors/catchable_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ pub enum CatchableError {
/// Stream map related errors.
#[error(transparent)]
StreamMapError(#[from] StreamMapError),

#[error("Starlark error: {0}")]
StarlarkError(air_interpreter_starlark::ExecutionError),
}

impl From<LambdaError> for Rc<CatchableError> {
Expand Down
3 changes: 3 additions & 0 deletions air/src/execution_step/errors/uncatchable_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ pub enum UncatchableError {

#[error("failed to serialize call arguments {0}")]
CallArgumentsSerializationFailed(<CallArgumentsRepr as Representation>::SerializeError),

#[error("Starlark error: {0}")]
StarlarkError(air_interpreter_starlark::ExecutionError),
}

impl ToErrorCode for UncatchableError {
Expand Down
113 changes: 113 additions & 0 deletions air/src/execution_step/instructions/embed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* AquaVM Workflow Engine
*
* Copyright (C) 2024 Fluence DAO
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation version 3 of the
* License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

use std::rc::Rc;

use air_interpreter_data::Provenance;
use air_interpreter_starlark::execute as starlark_execute;
use air_parser::ast::Embed;
use air_parser::ast::ImmutableValue;
use polyplets::SecurityTetraplet;

use super::fail::fail_with_error_object;
use super::ExecutableInstruction;
use super::ExecutionCtx;
use super::ExecutionResult;
use super::TraceHandler;
use crate::execution_step::errors::Joinable as _;
use crate::execution_step::resolver::Resolvable as _;
use crate::execution_step::LiteralAggregate;
use crate::execution_step::RcSecurityTetraplets;
use crate::execution_step::ValueAggregate;
use crate::joinable;
use crate::CatchableError;
use crate::ExecutionError;
use crate::JValue;
use crate::UncatchableError;

impl<'i> ExecutableInstruction<'i> for Embed<'i> {
    /// Executes an `embed` instruction.
    ///
    /// The instruction arguments are resolved, the embedded Starlark script is run on
    /// them, and then either the produced value is handed to `maybe_set_output_value`
    /// or, when the script reports an `(error_code, error_message)` pair, an error
    /// object is built from that pair and raised through `fail_with_error_object`.
    ///
    /// Errors returned by the Starlark interpreter itself are converted with
    /// `classify_starlark_error` and propagated to the caller.
    fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, _trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
        // `joinable!` is defined elsewhere in the crate; `()` is the value it is given
        // for its joinable-error case.
        let script_args = joinable!(collect_args(&self.args, exec_ctx), exec_ctx, ())?;

        let script_outcome = starlark_execute(self.script, script_args).map_err(classify_starlark_error)?;

        // Success path: store the value (if an output is declared) and finish.
        let (error_code, error_message) = match script_outcome {
            Ok(value) => return maybe_set_output_value(&self.output, value, exec_ctx),
            Err(failure) => failure,
        };

        // Failure path: the script asked to fail with a code and a message.
        let instruction = self.to_string();
        let error_object = crate::execution_step::execution_context::error_from_raw_fields_w_peerid(
            error_code.into(),
            &error_message,
            &instruction,
            exec_ctx.run_parameters.init_peer_id.as_ref(),
        );

        // As in `(fail x y)`, where x and y are always literals, the error is raised
        // with a literal tetraplet and a literal provenance.
        let literal_tetraplet = Rc::new(SecurityTetraplet::literal_tetraplet(
            exec_ctx.run_parameters.init_peer_id.as_ref(),
        ));

        fail_with_error_object(exec_ctx, error_object, Some(literal_tetraplet), Provenance::literal())
    }
}

/// Resolves every `embed` argument against the execution context.
///
/// Returns, in argument order, each resolved value paired with its security
/// tetraplets; the third component produced by `resolve` is discarded. The first
/// resolution error stops the collection and is returned.
fn collect_args(
    args: &[ImmutableValue<'_>],
    exec_ctx: &ExecutionCtx<'_>,
) -> ExecutionResult<Vec<(JValue, RcSecurityTetraplets)>> {
    args.iter()
        .map(|instruction_value| -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
            let (value, tetraplets, _) = instruction_value.resolve(exec_ctx)?;
            Ok((value, tetraplets))
        })
        .collect()
}

fn classify_starlark_error(err: air_interpreter_starlark::ExecutionError) -> ExecutionError {
use air_interpreter_starlark::ExecutionError::*;

match err {
// TODO perhaps, Other should be uncatchable
Value(_) | Function(_) | Other(_) => ExecutionError::Catchable(CatchableError::StarlarkError(err).into()),
Scope(_) | Lexer(_) | Internal(_) => ExecutionError::Uncatchable(UncatchableError::StarlarkError(err)),
}
}

/// Stores the value produced by the embedded script, if the instruction declares an
/// output.
///
/// For a scalar output the value is wrapped as a literal aggregate attributed to the
/// init peer and assigned to the scalar; when no output is declared the value is
/// dropped. Errors from setting the scalar are propagated.
fn maybe_set_output_value(
    embed_output_value: &air_parser::ast::EmbedOutputValue<'_>,
    result_value: air_interpreter_value::JValue,
    exec_ctx: &mut ExecutionCtx<'_>,
) -> Result<(), ExecutionError> {
    use air_parser::ast::EmbedOutputValue;

    let scalar = match embed_output_value {
        EmbedOutputValue::None => return Ok(()),
        EmbedOutputValue::Scalar(scalar) => scalar,
    };

    // TODO for now, we treat value produced by Starlark as a literal, as it has to be
    // same on every peer.
    let literal = LiteralAggregate::new(
        result_value,
        exec_ctx.run_parameters.init_peer_id.clone(),
        0.into(), // TODO is it correct?
    );
    let aggregate = ValueAggregate::from_literal_result(literal);

    exec_ctx.scalars.set_scalar_value(scalar.name, aggregate)?;
    Ok(())
}
2 changes: 1 addition & 1 deletion air/src/execution_step/instructions/fail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ fn fail_with_error(exec_ctx: &mut ExecutionCtx<'_>) -> ExecutionResult<()> {
result
}

fn fail_with_error_object(
pub(crate) fn fail_with_error_object(
exec_ctx: &mut ExecutionCtx<'_>,
error: JValue,
tetraplet: Option<RcSecurityTetraplet>,
Expand Down
2 changes: 2 additions & 0 deletions air/src/execution_step/instructions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod canon_map;
mod canon_stream_map_scalar;
mod canon_utils;
mod compare_matchable;
mod embed;
mod fail;
mod fold;
mod fold_scalar;
Expand Down Expand Up @@ -95,6 +96,7 @@ impl<'i> ExecutableInstruction<'i> for Instruction<'i> {
Instruction::Xor(xor) => execute!(self, xor, exec_ctx, trace_ctx),
Instruction::Match(match_) => execute!(self, match_, exec_ctx, trace_ctx),
Instruction::MisMatch(mismatch) => execute!(self, mismatch, exec_ctx, trace_ctx),
Instruction::Embed(embed) => execute!(self, embed, exec_ctx, trace_ctx),

Instruction::Error => unreachable!("should not execute if parsing succeeded. QED."),
}
Expand Down
217 changes: 217 additions & 0 deletions air/tests/test_module/instructions/embed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
* AquaVM Workflow Engine
*
* Copyright (C) 2024 Fluence DAO
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation version 3 of the
* License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

use air::{CatchableError, UncatchableError};
use air_interpreter_starlark::ExecutionError as StarlarkExecutionError;
use air_test_utils::prelude::*;

/// Smoke test for `embed`: a script without arguments whose value is a single
/// Starlark string literal is stored in `var` and then passed to a call.
#[tokio::test]
async fn embed_basic() {
let mut vm = create_avm(echo_call_service(), "").await;

// The Starlark source is wrapped in an AIR raw string (`#"…"#`), which in turn lives
// inside a Rust raw string, so the `\n` below reaches Starlark as an escape sequence.
let script = r##"
(seq
(embed []
#"
"a string\nwith escape"
"#
var)
(call %init_peer_id% ("" "") [var] result_name))"##;

let result = checked_call_vm!(vm, <_>::default(), script, "", "");
assert!(result.next_peer_pks.is_empty());

// The expected strings are ordinary Rust literals, i.e. they contain a real newline:
// the Starlark escape is expected to have been interpreted.
let expected_trace = vec![scalar!(
json!("a string\nwith escape"),
peer = "",
args = ["a string\nwith escape"]
)];

// Only the call is expected in the trace, carrying the script's value both as its
// argument and as its result.
let trace = trace_from_result(&result);
assert_eq!(&*trace, expected_trace);
}

/// Checks that an `embed` script can read both the value and the tetraplet of an
/// argument through `get_value` and `get_tetraplet`.
#[tokio::test]
async fn embed_args() {
let init_peer_id = "my_id";
let mut vm = create_avm(echo_call_service(), init_peer_id).await;

// `arg` is produced by a call to ("myservice" "myfunc") with the argument 42; the
// script formats argument 0 together with the fields of its first tetraplet.
let script = r##"
(seq
(call %init_peer_id% ("myservice" "myfunc") [42] arg)
(seq
(embed [arg]
#"
t = get_tetraplet(0)[0]
"{}: {}/{}:{}".format(get_value(0), t.peer_pk, t.service_id, t.function_name)
"#
var)
(call %init_peer_id% ("" "") [var] result_name)))"##;

let run_params = TestRunParameters::from_init_peer_id(init_peer_id);
let result = checked_call_vm!(vm, run_params, script, "", "");
assert!(result.next_peer_pks.is_empty());

// Expected script output: "<value>: <peer_pk>/<service_id>:<function_name>".
let expected_val = format!("42: {init_peer_id}/myservice:myfunc");
// Two call results are expected: the one that produced `arg` and the one that
// received the formatted string.
let expected_trace = vec![
scalar!(
json!(42),
peer = init_peer_id,
service = "myservice",
function = "myfunc",
args = [42]
),
scalar!(json!(expected_val), peer = init_peer_id, args = [expected_val]),
];

let trace = trace_from_result(&result);
assert_eq!(&*trace, expected_trace);
}

/// Checks that Starlark's `fail(code, message)` inside `embed` surfaces as an AIR error
/// that `xor` can handle and that is visible through `%last_error%`.
#[tokio::test]
async fn embed_error_fail() {
let mut vm = create_avm(echo_call_service(), "").await;

// The right branch of `xor` passes the error code and message of `%last_error%`
// to a call.
let script = r##"
(xor
(embed []
#"
fail(42, "my message")
"#
var)
(call %init_peer_id% ("" "") [%last_error%.$.error_code %last_error%.$.message] result_name))"##;

let result = checked_call_vm!(vm, <_>::default(), script, "", "");
assert!(result.next_peer_pks.is_empty());

// The only expected trace entry is the call from the right branch, which received
// exactly the code and the message given to `fail`.
let expected_trace = vec![scalar!(json!(42), peer = "", args = [json!(42), json!("my message")])];

let trace = trace_from_result(&result);
assert_eq!(&*trace, expected_trace);
}

/// Checks that a Starlark evaluation error (adding an int and a string) is reported as
/// a catchable `StarlarkError` wrapping the `Value` variant.
#[tokio::test]
async fn embed_error_value() {
let mut vm = create_avm(echo_call_service(), "").await;

let script = r##"
(embed []
#"
42 + "string"
"#
var)"##;

// `call_vm!` is used here instead of `checked_call_vm!` because the returned error is
// what gets inspected.
let result = call_vm!(vm, <_>::default(), script, "", "");
// The expected payload is the full diagnostic text, including the `dummy.star` file
// name and the source excerpt.
let expected_error = CatchableError::StarlarkError(StarlarkExecutionError::Value(
"error: Operation `+` not supported for types `int` and `string`\n --> dummy.star:2:1\n |\n2 | 42 + \"string\"\n | ^^^^^^^^^^^^^\n |\n".to_owned(),
));
assert_error_eq!(&result, expected_error);
}

// TODO 42.length gives Other, and it is a problem
/// Checks that a script that cannot be lexed (an unterminated string literal) is
/// reported as an uncatchable `StarlarkError` wrapping the `Lexer` variant.
#[tokio::test]
async fn embed_error_lexer() {
let mut vm = create_avm(echo_call_service(), "").await;

let script = r##"
(embed []
#"
"an unterminated string
"#
var)"##;

// `call_vm!` is used here instead of `checked_call_vm!` because the returned error is
// what gets inspected.
let result = call_vm!(vm, <_>::default(), script, "", "");
let expected_error = UncatchableError::StarlarkError(StarlarkExecutionError::Lexer(
"Parse error: unfinished string literal".to_owned(),
));
assert_error_eq!(&result, expected_error);
}

/// Checks that an `embed` whose argument is produced by a call addressed to another
/// peer does not turn the run into an error.
#[tokio::test]
async fn embed_with_join_behavior() {
let mut vm = create_avm(echo_call_service(), "").await;

// NOTE(review): presumably `var` is still unset on this peer because the call that
// produces it targets "other_peer", so `embed` has to wait (join) — confirm.
let script = r##"
(par
(call "other_peer" ("" "") [] var)
(seq
(embed [var] #"var + var"# var2)
(call %init_peer_id% ("" "") [var2])))"##;

let result = call_vm!(vm, <_>::default(), script, "", "");

// The run must report success: no error message and a zero return code.
assert_eq!(result.error_message, "");
assert_eq!(result.ret_code, 0);

// Only the length of the trace is checked, not its contents.
let trace = trace_from_result(&result);
assert_eq!(trace.len(), 2);
}

/// Runs a multi-statement Starlark script (with a function definition) over two
/// canonicalized-stream arguments: the first is zipped with the reverse of the second.
#[tokio::test]
async fn embed_zip_reverse() {
let mut vm = create_avm(echo_call_service(), "").await;

// The stream gets the values 1 and 2 and is canonicalized into `#canon`, which is
// passed to `embed` twice. The script fails with code 42 if the first tetraplets of
// the two arguments disagree on `peer_pk`.
let script = r##"
(seq
(seq
(seq
(ap 1 $stream)
(ap 2 $stream))
(canon %init_peer_id% $stream #canon))
(seq
(embed [#canon #canon]
#"
def main():
v1 = get_value(0)
v2 = get_value(1)

if get_tetraplet(0)[0].peer_pk != get_tetraplet(1)[0].peer_pk:
fail(42, 'tetraplet peer_pk mismatch')

return list(zip(v1, reversed(v2)))

main()
"#
var2)
(call %init_peer_id% ("" "") [var2] var3)))"##;

let result = call_vm!(vm, <_>::default(), script, "", "");

// The error message is also used as the assertion message so that it is printed
// on failure.
assert_eq!(result.error_message, "", "{}", result.error_message);
assert_eq!(result.ret_code, 0);

// Expected trace: the two `ap`s, the canon result, and the final call that received
// the zipped pairs (1, 2) and (2, 1).
let expected_trace = vec![
ap(0),
ap(0),
canon(json!({
"tetraplet": {"function_name": "", "lens": "", "peer_pk": "", "service_id": ""},
"values": [{
"tetraplet": {"function_name": "", "lens": "", "peer_pk": "", "service_id": ""},
"result": 1
}, {
"tetraplet": {"function_name": "", "lens": "", "peer_pk": "", "service_id": ""},
"result": 2
}]
})),
scalar!(json!([(1, 2), (2, 1)]), args = [json!([(1, 2), (2, 1)])]),
];
// `cid_info` is dumped on a mismatch to help with debugging.
let data = data_from_result(&result);
let trace = trace_from_result(&result);
assert_eq!(&*trace, expected_trace, "{:#?}", data.cid_info);
}
Loading
Loading