From 379f581780b0759106f9c6b63a0f1e650cf506b1 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Mon, 6 Mar 2023 19:20:59 +0800 Subject: [PATCH] test: add Integrated Test for Coprocessor& fix minor bugs (#1122) * feat: cache `Runtime` * fix: coprstream schema not set * test: integrated tests for Coprocessor * fix: UDF fixed * style: remove unused import * chore: remove more unused import * feat: `filter`, (r)floordiv for Vector * chore: CR advices * feat: auto convert to `lit` * chore: fix typo * feat: from&to `pyarrow.array` * feat: allow `pyarrow.array` as args to builtins * chore: cargo fmt * test: CI add `pyarrow` * test: install Python&PyArrow in CI * test: not cache depend for now * chore: CR advices * test: fix name * style: rename --- .github/workflows/develop.yml | 6 + Cargo.lock | 3 + Cargo.toml | 2 +- docker/Dockerfile | 4 +- src/common/function/src/scalars/udf.rs | 16 +- src/script/Cargo.toml | 1 + src/script/src/python/engine.rs | 92 ++++++--- src/script/src/python/ffi_types/copr.rs | 28 ++- src/script/src/python/ffi_types/pair_tests.rs | 79 +++++++- .../ffi_types/pair_tests/sample_testcases.rs | 182 ++++++++++++++---- .../src/python/ffi_types/vector/tests.rs | 25 ++- src/script/src/python/pyo3/builtins.rs | 43 +++-- src/script/src/python/pyo3/copr_impl.rs | 20 +- src/script/src/python/pyo3/dataframe_impl.rs | 28 ++- src/script/src/python/pyo3/utils.rs | 14 +- src/script/src/python/pyo3/vector_impl.rs | 89 ++++++++- src/script/src/python/rspython/builtins.rs | 25 ++- src/script/src/python/rspython/copr_impl.rs | 13 +- .../src/python/rspython/dataframe_impl.rs | 28 ++- src/script/src/python/rspython/testcases.ron | 4 +- src/script/src/python/rspython/utils.rs | 44 +++-- src/script/src/python/utils.rs | 12 +- src/servers/tests/py_script/mod.rs | 2 +- tests-integration/tests/http.rs | 2 +- 24 files changed, 576 insertions(+), 186 deletions(-) diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 3f011d33acc8..909a46cc1ad7 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -207,6 +207,12 @@ jobs: uses: Swatinem/rust-cache@v2 - name: Install latest nextest release uses: taiki-e/install-action@nextest + - name: Install Python + uses: actions/setup-python@v4 + with: + python-version: '3.10' + - name: Install PyArrow Package + run: pip install pyarrow - name: Install cargo-llvm-cov uses: taiki-e/install-action@cargo-llvm-cov - name: Collect coverage data diff --git a/Cargo.lock b/Cargo.lock index a0a2f96fe5c3..bb751e53b13d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -209,6 +209,7 @@ dependencies = [ "arrow-select", "arrow-string", "comfy-table", + "pyo3", ] [[package]] @@ -390,6 +391,7 @@ version = "33.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb327717d87eb94be5eff3b0cb8987f54059d343ee5235abf7f143c85f54cfc8" dependencies = [ + "bitflags", "serde", ] @@ -6657,6 +6659,7 @@ checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2" name = "script" version = "0.1.0" dependencies = [ + "arrow", "async-trait", "catalog", "common-catalog", diff --git a/Cargo.toml b/Cargo.toml index 660677898ec5..abc9f39dd2ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,7 @@ edition = "2021" license = "Apache-2.0" [workspace.dependencies] -arrow = "33.0" +arrow = { version = "33.0", features = ["pyarrow"] } arrow-array = "33.0" arrow-flight = "33.0" arrow-schema = { version = "33.0", features = ["serde"] } diff --git a/docker/Dockerfile 
b/docker/Dockerfile index 5a8bfd1f26b4..622c9cc1ff57 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -10,7 +10,9 @@ RUN apt-get update && apt-get install -y \ curl \ build-essential \ pkg-config \ - python3-dev + python3 \ + python3-dev \ + && pip install pyarrow # Install Rust. SHELL ["/bin/bash", "-c"] diff --git a/src/common/function/src/scalars/udf.rs b/src/common/function/src/scalars/udf.rs index 75a61975db61..cd0ac48935dd 100644 --- a/src/common/function/src/scalars/udf.rs +++ b/src/common/function/src/scalars/udf.rs @@ -14,9 +14,9 @@ use std::sync::Arc; -use common_query::error::{ExecuteFunctionSnafu, FromScalarValueSnafu}; +use common_query::error::FromScalarValueSnafu; use common_query::prelude::{ - ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUdf, ScalarValue, + ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUdf, }; use datatypes::error::Error as DataTypeError; use datatypes::prelude::*; @@ -54,16 +54,8 @@ pub fn create_udf(func: FunctionRef) -> ScalarUdf { .collect(); let result = func_cloned.eval(func_ctx, &args.context(FromScalarValueSnafu)?); - - let udf = if len.is_some() { - result.map(ColumnarValue::Vector)? - } else { - ScalarValue::try_from_array(&result?.to_arrow_array(), 0) - .map(ColumnarValue::Scalar) - .context(ExecuteFunctionSnafu)? - }; - - Ok(udf) + let udf_result = result.map(ColumnarValue::Vector)?; + Ok(udf_result) }); ScalarUdf::new(func.name(), &func.signature(), &return_type, &fun) diff --git a/src/script/Cargo.toml b/src/script/Cargo.toml index 5b74f8e30d1b..4f48214669ad 100644 --- a/src/script/Cargo.toml +++ b/src/script/Cargo.toml @@ -24,6 +24,7 @@ python = [ ] [dependencies] +arrow.workspace = true async-trait.workspace = true catalog = { path = "../catalog" } common-catalog = { path = "../common/catalog" } diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs index f92e87b1fce5..0f3a01be6be9 100644 --- a/src/script/src/python/engine.rs +++ b/src/script/src/python/engine.rs @@ -30,7 +30,7 @@ use common_recordbatch::{ RecordBatch, RecordBatchStream, RecordBatches, SendableRecordBatchStream, }; use datafusion_expr::Volatility; -use datatypes::schema::{ColumnSchema, SchemaRef}; +use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::VectorRef; use futures::Stream; use query::parser::{QueryLanguageParser, QueryStatement}; @@ -40,9 +40,8 @@ use snafu::{ensure, ResultExt}; use sql::statements::statement::Statement; use crate::engine::{CompileContext, EvalContext, Script, ScriptEngine}; -use crate::python::error::{self, Result}; +use crate::python::error::{self, PyRuntimeSnafu, Result}; use crate::python::ffi_types::copr::{exec_parsed, parse, AnnotationInfo, CoprocessorRef}; - const PY_ENGINE: &str = "python"; #[derive(Debug)] @@ -81,17 +80,21 @@ impl PyUDF { /// Fake a schema, should only be used with dynamically eval a Python Udf fn fake_schema(&self, columns: &[VectorRef]) -> SchemaRef { - let empty_args = vec![]; - let arg_names = self - .copr - .deco_args - .arg_names - .as_ref() - .unwrap_or(&empty_args); + // try to give schema right names in args so script can run as UDF without modify + // because when running as PyUDF, the incoming columns should have matching names to make sense + // for Coprocessor + let args = self.copr.deco_args.arg_names.clone(); + let try_get_name = |i: usize| { + if let Some(arg_name) = args.as_ref().and_then(|args| args.get(i)) { + arg_name.clone() + } else { + format!("name_{i}") + } + }; let col_sch: Vec<_> = 
columns .iter() .enumerate() - .map(|(i, col)| ColumnSchema::new(arg_names[i].clone(), col.data_type(), true)) + .map(|(i, col)| ColumnSchema::new(try_get_name(i), col.data_type(), true)) .collect(); let schema = datatypes::schema::Schema::new(col_sch); Arc::new(schema) @@ -172,7 +175,7 @@ impl Function for PyUDF { pub struct PyScript { query_engine: QueryEngineRef, - copr: CoprocessorRef, + pub(crate) copr: CoprocessorRef, } impl PyScript { @@ -188,12 +191,48 @@ impl PyScript { pub struct CoprStream { stream: SendableRecordBatchStream, copr: CoprocessorRef, + ret_schema: SchemaRef, params: HashMap, } +impl CoprStream { + fn try_new( + stream: SendableRecordBatchStream, + copr: CoprocessorRef, + params: HashMap, + ) -> Result { + let mut schema = vec![]; + for (ty, name) in copr.return_types.iter().zip(&copr.deco_args.ret_names) { + let ty = ty.clone().ok_or( + PyRuntimeSnafu { + msg: "return type not annotated, can't generate schema", + } + .build(), + )?; + let is_nullable = ty.is_nullable; + let ty = ty.datatype.ok_or( + PyRuntimeSnafu { + msg: "return type not annotated, can't generate schema", + } + .build(), + )?; + let col_schema = ColumnSchema::new(name, ty, is_nullable); + schema.push(col_schema); + } + let ret_schema = Arc::new(Schema::new(schema)); + Ok(Self { + stream, + copr, + ret_schema, + params, + }) + } +} + impl RecordBatchStream for CoprStream { fn schema(&self) -> SchemaRef { - self.stream.schema() + // FIXME(discord9): use copr returns for schema + self.ret_schema.clone() } } @@ -207,7 +246,6 @@ impl Stream for CoprStream { let batch = exec_parsed(&self.copr, &Some(recordbatch), &self.params) .map_err(BoxedError::new) .context(ExternalSnafu)?; - Poll::Ready(Some(Ok(batch))) } Poll::Ready(other) => Poll::Ready(other), @@ -246,11 +284,9 @@ impl Script for PyScript { let res = self.query_engine.execute(&plan).await?; let copr = self.copr.clone(); match res { - Output::Stream(stream) => Ok(Output::Stream(Box::pin(CoprStream { - params, - copr, - stream, - }))), + Output::Stream(stream) => Ok(Output::Stream(Box::pin(CoprStream::try_new( + stream, copr, params, + )?))), _ => unreachable!(), } } else { @@ -296,7 +332,8 @@ impl ScriptEngine for PyEngine { }) } } - +#[cfg(test)] +pub(crate) use tests::sample_script_engine; #[cfg(test)] mod tests { use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider}; @@ -311,7 +348,7 @@ mod tests { use super::*; - fn sample_script_engine() -> PyEngine { + pub(crate) fn sample_script_engine() -> PyEngine { let catalog_list = catalog::local::new_memory_catalog_list().unwrap(); let default_schema = Arc::new(MemorySchemaProvider::new()); @@ -340,7 +377,7 @@ mod tests { import greptime as gt @copr(args=["number"], returns = ["number"], sql = "select * from numbers") -def test(number)->vector[u32]: +def test(number) -> vector[u32]: return query.sql("select * from numbers")[0][0] "#; let script = script_engine @@ -367,7 +404,7 @@ def test(number)->vector[u32]: let script = r#" @copr(returns = ["number"]) -def test(**params)->vector[i64]: +def test(**params) -> vector[i64]: return int(params['a']) + int(params['b']) "#; let script = script_engine @@ -396,11 +433,10 @@ def test(**params)->vector[i64]: let script_engine = sample_script_engine(); let script = r#" -import greptime as gt -from data_frame import col +from greptime import col @copr(args=["number"], returns = ["number"], sql = "select * from numbers") -def test(number)->vector[u32]: +def test(number) -> vector[u32]: return 
dataframe.filter(col("number")==col("number")).collect()[0][0] "#; let script = script_engine @@ -432,7 +468,7 @@ def add(a, b): return a + b; @copr(args=["a", "b", "c"], returns = ["r"], sql="select number as a,number as b,number as c from numbers limit 100") -def test(a, b, c): +def test(a, b, c) -> vector[f64]: return add(a, b) / g.sqrt(c + 1) "#; let script = script_engine @@ -470,7 +506,7 @@ def test(a, b, c): import greptime as gt @copr(args=["number"], returns = ["r"], sql="select number from numbers limit 100") -def test(a): +def test(a) -> vector[i64]: return gt.vector([x for x in a if x % 2 == 0]) "#; let script = script_engine diff --git a/src/script/src/python/ffi_types/copr.rs b/src/script/src/python/ffi_types/copr.rs index 8af21e6b37b7..2ec5b4d9062f 100644 --- a/src/script/src/python/ffi_types/copr.rs +++ b/src/script/src/python/ffi_types/copr.rs @@ -219,7 +219,7 @@ pub(crate) fn select_from_rb(rb: &RecordBatch, fetch_names: &[String]) -> Result .iter() .map(|name| { let vector = rb.column_by_name(name).with_context(|| OtherSnafu { - reason: format!("Can't find field name {name}"), + reason: format!("Can't find field name {name} in all columns in {rb:?}"), })?; Ok(PyVector::from(vector.clone())) }) @@ -229,15 +229,29 @@ pub(crate) fn select_from_rb(rb: &RecordBatch, fetch_names: &[String]) -> Result /// match between arguments' real type and annotation types /// if type anno is `vector[_]` then use real type(from RecordBatch's schema) pub(crate) fn check_args_anno_real_type( + arg_names: &[String], args: &[PyVector], copr: &Coprocessor, rb: &RecordBatch, ) -> Result<()> { + ensure!( + arg_names.len() == args.len(), + OtherSnafu { + reason: format!("arg_names:{arg_names:?} and args{args:?}'s length is different") + } + ); for (idx, arg) in args.iter().enumerate() { let anno_ty = copr.arg_types[idx].clone(); let real_ty = arg.to_arrow_array().data_type().clone(); let real_ty = ConcreteDataType::from_arrow_type(&real_ty); - let is_nullable: bool = rb.schema.column_schemas()[idx].is_nullable(); + let arg_name = arg_names[idx].clone(); + let col_idx = rb.schema.column_index_by_name(&arg_name).ok_or( + OtherSnafu { + reason: format!("Can't find column by name {arg_name}"), + } + .build(), + )?; + let is_nullable: bool = rb.schema.column_schemas()[col_idx].is_nullable(); ensure!( anno_ty .clone() @@ -424,11 +438,13 @@ pub fn exec_parsed( pyo3_exec_parsed(copr, rb, params) } #[cfg(not(feature = "pyo3_backend"))] - OtherSnafu { - reason: "`pyo3` feature is disabled, therefore can't run scripts in cpython" - .to_string(), + { + OtherSnafu { + reason: "`pyo3` feature is disabled, therefore can't run scripts in cpython" + .to_string(), + } + .fail() } - .fail() } } } diff --git a/src/script/src/python/ffi_types/pair_tests.rs b/src/script/src/python/ffi_types/pair_tests.rs index 64e1cc685dc6..c322ba651407 100644 --- a/src/script/src/python/ffi_types/pair_tests.rs +++ b/src/script/src/python/ffi_types/pair_tests.rs @@ -15,29 +15,100 @@ mod sample_testcases; use std::collections::HashMap; +use std::sync::Arc; +use common_query::Output; +use common_recordbatch::RecordBatch; use datafusion::arrow::array::Float64Array; use datafusion::arrow::compute; use datatypes::arrow::datatypes::DataType as ArrowDataType; +use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::VectorRef; #[cfg(feature = "pyo3_backend")] use pyo3::{types::PyDict, Python}; use rustpython_compiler::Mode; -use crate::python::ffi_types::pair_tests::sample_testcases::sample_test_case; +use 
crate::engine::{CompileContext, EvalContext, Script, ScriptEngine}; +use crate::python::engine::sample_script_engine; +use crate::python::ffi_types::pair_tests::sample_testcases::{ + generate_copr_intgrate_tests, sample_test_case, +}; use crate::python::ffi_types::PyVector; #[cfg(feature = "pyo3_backend")] use crate::python::pyo3::{init_cpython_interpreter, vector_impl::into_pyo3_cell}; use crate::python::rspython::init_interpreter; +// TODO(discord9): paired test for slicing Vector +// & slice tests & lit() function for dataframe & test with full coprocessor&query engine ability /// generate testcases that should be tested in paired both in RustPython and CPython #[derive(Debug, Clone)] -struct TestCase { +struct CodeBlockTestCase { input: HashMap, script: String, expect: VectorRef, } +/// TODO(discord9): input a simple recordbatch, set a query engine, and such, +/// so that for a full Coprocessor it will work +#[derive(Debug, Clone, Default)] +struct CoprTestCase { + // will be build to a RecordBatch and feed to coprocessor + script: String, + expect: Option>, +} + +#[allow(unused)] +fn into_recordbatch(input: HashMap) -> RecordBatch { + let mut schema = Vec::new(); + let mut columns = Vec::new(); + for (name, v) in input { + schema.push(ColumnSchema::new(name, v.data_type(), false)); + columns.push(v); + } + let schema = Arc::new(Schema::new(schema)); + + RecordBatch::new(schema, columns).unwrap() +} + +#[tokio::test] +#[allow(clippy::print_stdout)] +async fn integrated_py_copr_test() { + let testcases = generate_copr_intgrate_tests(); + let script_engine = sample_script_engine(); + for (idx, case) in testcases.into_iter().enumerate() { + println!("Testcase {idx}:\n script: {}", case.script); + let script = case.script; + let script = script_engine + .compile(&script, CompileContext::default()) + .await + .unwrap(); + let output = script + .execute(HashMap::default(), EvalContext::default()) + .await + .unwrap(); + let res = match output { + Output::Stream(s) => common_recordbatch::util::collect_batches(s).await.unwrap(), + Output::RecordBatches(rbs) => rbs, + _ => unreachable!(), + }; + let rb = res.iter().next().expect("One and only one recordbatch"); + if let Some(expect_result) = case.expect { + let mut actual_result = HashMap::new(); + for col_sch in rb.schema.column_schemas() { + let col = rb.column_by_name(&col_sch.name).unwrap(); + actual_result.insert(col_sch.name.clone(), col.clone()); + } + for (name, col) in expect_result { + let actual_col = actual_result.get(&name).expect("Column with this name"); + if !check_equal(col.clone(), actual_col.clone()) { + panic!("Column {name} doesn't match, expect {col:?}, found {actual_col:?}") + } + } + } + println!(".. 
Ok"); + } +} + #[test] fn pyo3_rspy_test_in_pairs() { let testcases = sample_test_case(); @@ -76,7 +147,7 @@ fn check_equal(v0: VectorRef, v1: VectorRef) -> bool { } /// will panic if something is wrong, used in tests only -fn eval_rspy(case: TestCase) { +fn eval_rspy(case: CodeBlockTestCase) { let interpreter = init_interpreter(); interpreter.enter(|vm| { let scope = vm.new_scope_with_builtins(); @@ -112,7 +183,7 @@ fn eval_rspy(case: TestCase) { } #[cfg(feature = "pyo3_backend")] -fn eval_pyo3(case: TestCase) { +fn eval_pyo3(case: CodeBlockTestCase) { init_cpython_interpreter(); Python::with_gil(|py| { let locals = { diff --git a/src/script/src/python/ffi_types/pair_tests/sample_testcases.rs b/src/script/src/python/ffi_types/pair_tests/sample_testcases.rs index 4a46d23adabf..6f219f21ac0a 100644 --- a/src/script/src/python/ffi_types/pair_tests/sample_testcases.rs +++ b/src/script/src/python/ffi_types/pair_tests/sample_testcases.rs @@ -17,10 +17,10 @@ use std::f64::consts; use std::sync::Arc; use datatypes::prelude::ScalarVector; -use datatypes::vectors::{BooleanVector, Float64Vector, Int64Vector, VectorRef}; - -use crate::python::ffi_types::pair_tests::TestCase; +use datatypes::vectors::{BooleanVector, Float64Vector, Int32Vector, Int64Vector, VectorRef}; +use super::CoprTestCase; +use crate::python::ffi_types::pair_tests::CodeBlockTestCase; macro_rules! vector { ($ty: ident, $slice: expr) => { Arc::new($ty::from_slice($slice)) as VectorRef @@ -34,12 +34,110 @@ macro_rules! ronish { ]) }; } +pub(super) fn generate_copr_intgrate_tests() -> Vec { + vec![ + CoprTestCase { + script: r#" +@copr(args=["number", "number"], + returns=["value"], + sql="select number from numbers limit 5", backend="rspy") +def add_vecs(n1, n2) -> vector[i32]: + return n1 + n2 +"# + .to_string(), + expect: Some(ronish!("value": vector!(Int32Vector, [0, 2, 4, 6, 8]))), + }, + #[cfg(feature = "pyo3_backend")] + CoprTestCase { + script: r#" +@copr(args=["number", "number"], + returns=["value"], + sql="select number from numbers limit 5", backend="pyo3") +def add_vecs(n1, n2) -> vector[i32]: + return n1 + n2 +"# + .to_string(), + expect: Some(ronish!("value": vector!(Int32Vector, [0, 2, 4, 6, 8]))), + }, + CoprTestCase { + script: r#" +@copr(returns=["value"]) +def answer() -> vector[i64]: + from greptime import vector + return vector([42, 43, 44]) +"# + .to_string(), + expect: Some(ronish!("value": vector!(Int64Vector, [42, 43, 44]))), + }, + #[cfg(feature = "pyo3_backend")] + CoprTestCase { + script: r#" +@copr(returns=["value"], backend="pyo3") +def answer() -> vector[i64]: + from greptime import vector + return vector([42, 43, 44]) +"# + .to_string(), + expect: Some(ronish!("value": vector!(Int64Vector, [42, 43, 44]))), + }, + #[cfg(feature = "pyo3_backend")] + CoprTestCase { + script: r#" +@copr(returns=["value"], backend="pyo3") +def answer() -> vector[i64]: + from greptime import vector + return vector.from_pyarrow(vector([42, 43, 44]).to_pyarrow()) +"# + .to_string(), + expect: Some(ronish!("value": vector!(Int64Vector, [42, 43, 44]))), + }, + #[cfg(feature = "pyo3_backend")] + CoprTestCase { + script: r#" +@copr(returns=["value"], backend="pyo3") +def answer() -> vector[i64]: + from greptime import vector + import pyarrow as pa + return vector.from_pyarrow(pa.array([42, 43, 44])) +"# + .to_string(), + expect: Some(ronish!("value": vector!(Int64Vector, [42, 43, 44]))), + }, + CoprTestCase { + script: r#" +@copr(args=[], returns = ["number"], sql = "select * from numbers", backend="rspy") +def answer() -> 
vector[i64]: + from greptime import vector, col, lit + expr_0 = (col("number")0) + ret = dataframe.select([col("number")]).filter(expr_0).collect()[0][0] + return ret +"# + .to_string(), + expect: Some(ronish!("number": vector!(Int64Vector, [1, 2]))), + }, + #[cfg(feature = "pyo3_backend")] + CoprTestCase { + script: r#" +@copr(args=[], returns = ["number"], sql = "select * from numbers", backend="pyo3") +def answer() -> vector[i64]: + from greptime import vector, col, lit + # Bitwise Operator pred comparison operator + expr_0 = (col("number")0) + ret = dataframe.select([col("number")]).filter(expr_0).collect()[0][0] + return ret +"# + .to_string(), + expect: Some(ronish!("number": vector!(Int64Vector, [1, 2]))), + }, + ] +} +/// Generate tests for basic vector operations and basic builtin functions /// Using a function to generate testcase instead of `.ron` configure file because it's more flexible and we are in #[cfg(test)] so no binary bloat worrying #[allow(clippy::approx_constant)] -pub(super) fn sample_test_case() -> Vec { +pub(super) fn sample_test_case() -> Vec { vec![ - TestCase { + CodeBlockTestCase { input: ronish! { "a": vector!(Float64Vector, [1.0f64, 2.0, 3.0]) }, @@ -53,7 +151,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [1.0f64, 2.0, 3.0]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "a": vector!(Float64Vector, [1.0f64, 2.0, 3.0]), "b": vector!(Float64Vector, [3.0f64, 2.0, 1.0]) @@ -65,7 +163,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [4.0f64, 4.0, 4.0]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "a": vector!(Float64Vector, [1.0f64, 2.0, 3.0]), "b": vector!(Float64Vector, [3.0f64, 2.0, 1.0]) @@ -77,7 +175,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [-2.0f64, 0.0, 2.0]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "a": vector!(Float64Vector, [1.0f64, 2.0, 3.0]), "b": vector!(Float64Vector, [3.0f64, 2.0, 1.0]) @@ -89,7 +187,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [3.0f64, 4.0, 3.0]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "a": vector!(Float64Vector, [1.0f64, 2.0, 3.0]), "b": vector!(Float64Vector, [3.0f64, 2.0, 1.0]) @@ -101,7 +199,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [1. / 3., 1.0, 3.0]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [1.0f64, 2.0, 3.0]) }, @@ -115,7 +213,7 @@ ret"# [1.0f64, std::f64::consts::SQRT_2, 1.7320508075688772,] ), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [1.0, 2.0, 3.0]) }, @@ -129,7 +227,7 @@ ret"# [0.8414709848078965, 0.9092974268256817, 0.1411200080598672,] ), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [1.0, 2.0, 3.0]) }, @@ -143,7 +241,7 @@ ret"# [0.5403023058681398, -0.4161468365471424, -0.9899924966004454,] ), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [1.0, 2.0, 3.0]) }, @@ -157,7 +255,7 @@ ret"# [1.5574077246549023, -2.185039863261519, -0.1425465430742778,] ), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [0.3, 0.5, 1.0]) }, @@ -171,7 +269,7 @@ ret"# [0.3046926540153975, 0.5235987755982989, 1.5707963267948966,] ), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [0.3, 0.5, 1.0]) }, @@ -185,7 +283,7 @@ ret"# [1.2661036727794992, 1.0471975511965979, 0.0,] ), }, - TestCase { + CodeBlockTestCase { input: ronish! 
{ "values": vector!(Float64Vector, [0.3, 0.5, 1.1]) }, @@ -199,7 +297,7 @@ ret"# [0.2914567944778671, 0.4636476090008061, 0.8329812666744317,] ), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [0.3, 0.5, 1.1]) }, @@ -210,7 +308,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [0.0, 0.0, 1.0,]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [0.3, 0.5, 1.1]) }, @@ -221,7 +319,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [1.0, 1.0, 2.0,]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [0.3, 0.5, 1.1]) }, @@ -232,7 +330,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [0.0, 1.0, 1.0,]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [0.3, 0.5, 1.1]) }, @@ -243,7 +341,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [0.0, 0.0, 1.0,]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [-0.3, 0.5, -1.1]) }, @@ -254,7 +352,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [0.3, 0.5, 1.1,]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [-0.3, 0.5, -1.1]) }, @@ -265,7 +363,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [-1.0, 1.0, -1.0,]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [0., 1.0, 2.0]) }, @@ -276,7 +374,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [1.0, consts::E, 7.38905609893065,]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [1.0, 2.0, 3.0]) }, @@ -287,7 +385,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [0.0, consts::LN_2, 1.0986122886681098,]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [1.0, 2.0, 3.0]) }, @@ -298,7 +396,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [0.0, 1.0, 1.584962500721156,]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [1.0, 2.0, 3.0]) }, @@ -309,7 +407,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [0.0, consts::LOG10_2, 0.47712125471966244,]), }, - TestCase { + CodeBlockTestCase { input: ronish! {}, script: r#" from greptime import * @@ -318,7 +416,7 @@ ret"# .to_string(), expect: vector!(BooleanVector, &[true, true, true]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Int64Vector, [1, 2, 2, 3]) }, @@ -329,7 +427,7 @@ ret"# .to_string(), expect: vector!(Int64Vector, [3]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Int64Vector, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) }, @@ -340,7 +438,7 @@ ret"# .to_string(), expect: vector!(Int64Vector, [6]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [1.0, 2.0, 3.0]) }, @@ -351,7 +449,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [1.0, 2.0, 3.0]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [1.0, 2.0, 3.0]) }, @@ -362,7 +460,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [2.0]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "a": vector!(Float64Vector, [1.0, 2.0, 3.0]), "b": vector!(Float64Vector, [1.0, 0.0, -1.0]) @@ -374,7 +472,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [-1.0]), }, - TestCase { + CodeBlockTestCase { input: ronish! 
{ "values": vector!(Int64Vector, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), }, @@ -385,7 +483,7 @@ ret"# .to_string(), expect: vector!(Int64Vector, [10]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "a": vector!(Float64Vector, [1.0, 2.0, 3.0]), "b": vector!(Float64Vector, [1.0, 0.0, -1.0]) @@ -397,7 +495,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [-1.0]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "a": vector!(Float64Vector, [1.0, 2.0, 3.0]), "b": vector!(Float64Vector, [1.0, 0.0, -1.0]) @@ -409,7 +507,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [-0.6666666666666666]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "a": vector!(Float64Vector, [1.0, 2.0, 3.0]), }, @@ -420,7 +518,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [3.0]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "a": vector!(Float64Vector, [1.0, 2.0, 3.0]), }, @@ -431,7 +529,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [1.0]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [1., 2., 3., 4., 5., 6., 7., 8., 9., 10.]), }, @@ -442,7 +540,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [3.0276503540974917]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [1., 2., 3., 4., 5., 6., 7., 8., 9., 10.]), }, @@ -453,7 +551,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [2.8722813232690143]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [1., 2., 3., 4., 5., 6., 7., 8., 9., 10.]), }, @@ -464,7 +562,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [55.0]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [1., 2., 3., 4., 5., 6., 7., 8., 9., 10.]), }, @@ -475,7 +573,7 @@ ret"# .to_string(), expect: vector!(Float64Vector, [9.166666666666666]), }, - TestCase { + CodeBlockTestCase { input: ronish! { "values": vector!(Float64Vector, [1., 2., 3., 4., 5., 6., 7., 8., 9., 10.]), }, diff --git a/src/script/src/python/ffi_types/vector/tests.rs b/src/script/src/python/ffi_types/vector/tests.rs index e43380e04ffe..0a08c6e4219e 100644 --- a/src/script/src/python/ffi_types/vector/tests.rs +++ b/src/script/src/python/ffi_types/vector/tests.rs @@ -21,7 +21,7 @@ use std::collections::HashMap; use std::sync::Arc; use datatypes::scalars::ScalarVector; -use datatypes::vectors::{BooleanVector, Float64Vector, VectorRef}; +use datatypes::vectors::{BooleanVector, Float64Vector, Int64Vector, VectorRef}; #[cfg(feature = "pyo3_backend")] use pyo3::{types::PyDict, Python}; use rustpython_compiler::Mode; @@ -59,11 +59,13 @@ fn sample_py_vector() -> HashMap { let b2 = Arc::new(BooleanVector::from_slice(&[false, true, false, true])) as VectorRef; let f1 = Arc::new(Float64Vector::from_slice([0.0f64, 2.0, 10.0, 42.0])) as VectorRef; let f2 = Arc::new(Float64Vector::from_slice([-0.1f64, -42.0, 2., 7.0])) as VectorRef; + let f3 = Arc::new(Float64Vector::from_slice([1.0f64, -42.0, 2., 7.0])) as VectorRef; HashMap::from([ ("b1".to_owned(), b1), ("b2".to_owned(), b2), ("f1".to_owned(), f1), ("f2".to_owned(), f2), + ("f3".to_owned(), f3), ]) } @@ -105,6 +107,27 @@ fn get_test_cases() -> Vec { 42. / 7., ])) as VectorRef, }, + TestCase { + eval: "f2.__rtruediv__(f1)".to_string(), + result: Arc::new(Float64Vector::from_slice([ + 0.0 / -0.1f64, + 2. / -42., + 10. / 2., + 42. 
/ 7., + ])) as VectorRef, + }, + TestCase { + eval: "f2.__floordiv__(f3)".to_string(), + result: Arc::new(Int64Vector::from_slice([0, 1, 1, 1])) as VectorRef, + }, + TestCase { + eval: "f3.__rfloordiv__(f2)".to_string(), + result: Arc::new(Int64Vector::from_slice([0, 1, 1, 1])) as VectorRef, + }, + TestCase { + eval: "f3.filter(b1)".to_string(), + result: Arc::new(Float64Vector::from_slice([2.0, 7.0])) as VectorRef, + }, ]; Vec::from(testcases) } diff --git a/src/script/src/python/pyo3/builtins.rs b/src/script/src/python/pyo3/builtins.rs index 89d71a5c41f7..e0921ba84eb3 100644 --- a/src/script/src/python/pyo3/builtins.rs +++ b/src/script/src/python/pyo3/builtins.rs @@ -22,16 +22,32 @@ use datafusion_physical_expr::{math_expressions, AggregateExpr}; use datatypes::vectors::VectorRef; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; -use pyo3::types::PyList; use super::utils::scalar_value_to_py_any; use crate::python::ffi_types::utils::all_to_f64; use crate::python::ffi_types::PyVector; -use crate::python::pyo3::dataframe_impl::col; +use crate::python::pyo3::dataframe_impl::{col, lit}; use crate::python::pyo3::utils::{ columnar_value_to_py_any, try_into_columnar_value, val_to_py_any, }; +/// Try to extract a `PyVector` or convert from a `pyarrow.array` object +#[inline] +fn try_into_py_vector(py: Python, obj: PyObject) -> PyResult { + if let Ok(v) = obj.extract::(py) { + Ok(v) + } else { + PyVector::from_pyarrow(obj.as_ref(py).get_type(), py, obj.clone()) + } +} + +#[inline] +fn to_array_of_py_vec(py: Python, obj: &[&PyObject]) -> PyResult> { + obj.iter() + .map(|v| try_into_py_vector(py, v.to_object(py))) + .collect::>() +} + macro_rules! batch_import { ($m: ident, [$($fn_name: ident),*]) => { $($m.add_function(wrap_pyfunction!($fn_name, $m)?)?;)* @@ -41,11 +57,12 @@ macro_rules! batch_import { #[pymodule] #[pyo3(name = "greptime")] pub(crate) fn greptime_builtins(_py: Python<'_>, m: &PyModule) -> PyResult<()> { + m.add_class::()?; batch_import!( m, [ + lit, col, - vector, pow, clip, diff, @@ -95,7 +112,8 @@ pub(crate) fn greptime_builtins(_py: Python<'_>, m: &PyModule) -> PyResult<()> { Ok(()) } -fn eval_func(py: Python<'_>, name: &str, v: &[&PyVector]) -> PyResult { +fn eval_func(py: Python<'_>, name: &str, v: &[&PyObject]) -> PyResult { + let v = to_array_of_py_vec(py, v)?; py.allow_threads(|| { let v: Vec = v.iter().map(|v| v.as_vector_ref()).collect(); let func: Option = FUNCTION_REGISTRY.get_function(name); @@ -153,7 +171,7 @@ fn eval_aggr_func(py: Python<'_>, name: &str, args: &[&PyVector]) -> PyResult( +fn eval_df_aggr_expr( py: Python<'_>, aggr: T, values: &[ArrayRef], @@ -191,8 +209,8 @@ macro_rules! bind_call_unary_math_function { macro_rules! simple_vector_fn { ($name: ident, $name_str: tt, [$($arg:ident),*]) => { #[pyfunction] - fn $name(py: Python<'_>, $($arg: &PyVector),*) -> PyResult { - eval_func(py, $name_str, &[$($arg),*]) + fn $name(py: Python<'_>, $($arg: PyObject),*) -> PyResult { + eval_func(py, $name_str, &[$(&$arg),*]) } }; ($name: ident, $name_str: tt, AGG[$($arg:ident),*]) => { @@ -203,11 +221,6 @@ macro_rules! simple_vector_fn { }; } -#[pyfunction] -fn vector(iterable: &PyList) -> PyResult { - PyVector::py_new(iterable) -} - // TODO(discord9): More Aggr functions& allow threads simple_vector_fn!(pow, "pow", [v0, v1]); simple_vector_fn!(clip, "clip", [v0, v1, v2]); @@ -255,7 +268,7 @@ macro_rules! 
bind_aggr_expr { fn $FUNC_NAME(py: Python<'_>, $($ARG: &PyVector),*)->PyResult{ // just a place holder, we just want the inner `XXXAccumulator`'s function // so its expr is irrelevant - return eval_aggr_expr( + return eval_df_aggr_expr( py, expressions::$AGGR_FUNC::new( $( @@ -273,7 +286,7 @@ macro_rules! bind_aggr_expr { expand into: ``` fn approx_distinct(py: Python<'_>, v0: &PyVector) -> PyResult { - return eval_aggr_expr( + return eval_df_aggr_expr( py, expressions::ApproxDistinct::new( Arc::new(expressions::Column::new("expr0", 0)) as _, @@ -293,7 +306,7 @@ bind_aggr_expr!(median, Median,[v0], v0, expr0=>0); #[pyfunction] fn approx_percentile_cont(py: Python<'_>, values: &PyVector, percent: f64) -> PyResult { let percent = expressions::Literal::new(datafusion_common::ScalarValue::Float64(Some(percent))); - return eval_aggr_expr( + return eval_df_aggr_expr( py, expressions::ApproxPercentileCont::new( vec![ diff --git a/src/script/src/python/pyo3/copr_impl.rs b/src/script/src/python/pyo3/copr_impl.rs index 135c86496279..456c8f4eaf88 100644 --- a/src/script/src/python/pyo3/copr_impl.rs +++ b/src/script/src/python/pyo3/copr_impl.rs @@ -63,17 +63,11 @@ pub(crate) fn pyo3_exec_parsed( rb: &Option, params: &HashMap, ) -> Result { - let arg_names = if let Some(names) = &copr.deco_args.arg_names { - names - } else { - return OtherSnafu { - reason: "PyO3 Backend doesn't support params yet".to_string(), - } - .fail(); - }; + // i.e params or use `vector(..)` to construct a PyVector + let arg_names = &copr.deco_args.arg_names.clone().unwrap_or(vec![]); let args: Vec = if let Some(rb) = rb { let args = select_from_rb(rb, arg_names)?; - check_args_anno_real_type(&args, copr, rb)?; + check_args_anno_real_type(arg_names, &args, copr, rb)?; args } else { Vec::new() @@ -83,12 +77,16 @@ pub(crate) fn pyo3_exec_parsed( Python::with_gil(|py| -> Result<_> { let mut cols = (|| -> PyResult<_> { let dummy_decorator = " +# Postponed evaluation of annotations(PEP 563) so annotation can be set freely +# This is needed for Python < 3.9 +from __future__ import annotations # A dummy decorator, actual implementation is in Rust code def copr(*dummy, **kwdummy): def inner(func): return func return inner coprocessor = copr +from greptime import vector "; let gen_call = format!("\n_return_from_coprocessor = {}(*_args_for_coprocessor, **_kwargs_for_coprocessor)", copr.name); let script = format!("{}{}{}", dummy_decorator, copr.script, gen_call); @@ -221,10 +219,10 @@ mod copr_test { @copr(args=["cpu", "mem"], returns=["ref"], backend="pyo3") def a(cpu, mem, **kwargs): import greptime as gt - from greptime import vector, log2, sum, pow, col + from greptime import vector, log2, sum, pow, col, lit for k, v in kwargs.items(): print("%s == %s" % (k, v)) - print(dataframe.select([col("cpu")]).collect()) + print(dataframe.select([col("cpu")= 0.75) "#; let cpu_array = Float32Vector::from_slice([0.9f32, 0.8, 0.7, 0.3]); diff --git a/src/script/src/python/pyo3/dataframe_impl.rs b/src/script/src/python/pyo3/dataframe_impl.rs index 172798525736..d07faf06dbbe 100644 --- a/src/script/src/python/pyo3/dataframe_impl.rs +++ b/src/script/src/python/pyo3/dataframe_impl.rs @@ -23,6 +23,7 @@ use snafu::ResultExt; use crate::python::error::DataFusionSnafu; use crate::python::ffi_types::PyVector; +use crate::python::pyo3::utils::pyo3_obj_try_to_typed_scalar_value; use crate::python::utils::block_on_async; type PyExprRef = Py; #[pyclass] @@ -223,6 +224,15 @@ impl PyDataFrame { } } +/// Convert a Python Object into a `Expr` for use in 
constructing literals, e.g. `col("number") < lit(42)`
+#[pyfunction]
+pub(crate) fn lit(py: Python<'_>, value: PyObject) -> PyResult<PyExpr> {
+    let value = pyo3_obj_try_to_typed_scalar_value(value.as_ref(py), None)?;
+    let expr: PyExpr = DfExpr::Literal(value).into();
+    Ok(expr)
+}
+
+#[derive(Clone)]
 #[pyclass]
 pub(crate) struct PyExpr {
     inner: DfExpr,
 }
@@ -242,7 +252,8 @@ pub(crate) fn col(name: String) -> PyExpr {
 
 #[pymethods]
 impl PyExpr {
-    fn __richcmp__(&self, other: &Self, op: CompareOp) -> PyResult<PyExpr> {
+    fn __richcmp__(&self, py: Python<'_>, other: PyObject, op: CompareOp) -> PyResult<PyExpr> {
+        let other = other.extract::<PyExpr>(py).or_else(|_| lit(py, other))?;
         let op = match op {
             CompareOp::Lt => DfExpr::lt,
             CompareOp::Le => DfExpr::lt_eq,
@@ -251,20 +262,18 @@ impl PyExpr {
             CompareOp::Gt => DfExpr::gt,
             CompareOp::Ge => DfExpr::gt_eq,
         };
-        Ok(op(self.inner.clone(), other.inner.clone()).into())
+        py.allow_threads(|| Ok(op(self.inner.clone(), other.inner.clone()).into()))
     }
     fn alias(&self, name: String) -> PyResult<PyExpr> {
         Ok(self.inner.clone().alias(name).into())
     }
     fn __and__(&self, py: Python<'_>, other: PyExprRef) -> PyResult<PyExpr> {
-        Ok(self
-            .inner
-            .clone()
-            .and(other.borrow(py).inner.clone())
-            .into())
+        let other = other.borrow(py).inner.clone();
+        py.allow_threads(|| Ok(self.inner.clone().and(other).into()))
     }
     fn __or__(&self, py: Python<'_>, other: PyExprRef) -> PyResult<PyExpr> {
-        Ok(self.inner.clone().or(other.borrow(py).inner.clone()).into())
+        let other = other.borrow(py).inner.clone();
+        py.allow_threads(|| Ok(self.inner.clone().or(other).into()))
     }
     fn __invert__(&self) -> PyResult<PyExpr> {
         Ok(self.inner.clone().not().into())
@@ -272,4 +281,7 @@ impl PyExpr {
     fn sort(&self, asc: bool, nulls_first: bool) -> PyExpr {
         self.inner.clone().sort(asc, nulls_first).into()
     }
+    fn __repr__(&self) -> String {
+        format!("{:#?}", &self.inner)
+    }
 }
diff --git a/src/script/src/python/pyo3/utils.rs b/src/script/src/python/pyo3/utils.rs
index c943f5be4d03..7b700bd06662 100644
--- a/src/script/src/python/pyo3/utils.rs
+++ b/src/script/src/python/pyo3/utils.rs
@@ -14,6 +14,7 @@
 
 use std::sync::Mutex;
 
+use arrow::pyarrow::PyArrowException;
 use common_telemetry::info;
 use datafusion_common::ScalarValue;
 use datafusion_expr::ColumnarValue;
@@ -32,7 +33,9 @@ use crate::python::pyo3::builtins::greptime_builtins;
 
 /// prevent race condition of init cpython
 static START_PYO3: Lazy<Mutex<bool>> = Lazy::new(|| Mutex::new(false));
-
+pub(crate) fn to_py_err(err: impl ToString) -> PyErr {
+    PyArrowException::new_err(err.to_string())
+}
 pub(crate) fn init_cpython_interpreter() {
     let mut start = START_PYO3.lock().unwrap();
     if !*start {
@@ -100,6 +103,15 @@ macro_rules! to_con_type {
     };
 }
 
+/// Convert a `PyAny` to a [`ScalarValue`]
+pub(crate) fn pyo3_obj_try_to_typed_scalar_value(
+    obj: &PyAny,
+    dtype: Option<ConcreteDataType>,
+) -> PyResult<ScalarValue> {
+    let val = pyo3_obj_try_to_typed_val(obj, dtype)?;
+    val.try_to_scalar_value(&val.data_type())
+        .map_err(|e| PyValueError::new_err(e.to_string()))
+}
 /// to int/float/boolean, if dtype is None, then convert to highest prec type
 pub(crate) fn pyo3_obj_try_to_typed_val(
     obj: &PyAny,
diff --git a/src/script/src/python/pyo3/vector_impl.rs b/src/script/src/python/pyo3/vector_impl.rs
index f2fcc3a5a5a5..cd9c69010a89 100644
--- a/src/script/src/python/pyo3/vector_impl.rs
+++ b/src/script/src/python/pyo3/vector_impl.rs
@@ -12,17 +12,22 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+use arrow::array::{make_array, ArrayData}; +use arrow::pyarrow::PyArrowConvert; +use datafusion::arrow::array::BooleanArray; +use datafusion::arrow::compute; use datafusion::arrow::compute::kernels::{arithmetic, comparison}; use datatypes::arrow::array::{Array, ArrayRef}; use datatypes::arrow::datatypes::DataType as ArrowDataType; use datatypes::prelude::{ConcreteDataType, DataType}; -use pyo3::exceptions::{PyNotImplementedError, PyValueError}; +use datatypes::vectors::Helper; +use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::pyclass::CompareOp; -use pyo3::types::{PyBool, PyFloat, PyInt, PyList, PyString}; +use pyo3::types::{PyBool, PyFloat, PyInt, PyList, PyString, PyType}; -use crate::python::ffi_types::vector::{wrap_bool_result, wrap_result, PyVector}; -use crate::python::pyo3::utils::pyo3_obj_try_to_typed_val; +use crate::python::ffi_types::vector::{arrow_rtruediv, wrap_bool_result, wrap_result, PyVector}; +use crate::python::pyo3::utils::{pyo3_obj_try_to_typed_val, to_py_err}; macro_rules! get_con_type { ($obj:ident, $($pyty:ident => $con_ty:ident),*$(,)?) => { @@ -179,15 +184,47 @@ impl PyVector { } #[allow(unused)] fn __rtruediv__(&self, py: Python<'_>, other: PyObject) -> PyResult { - Err(PyNotImplementedError::new_err(())) + if pyo3_is_obj_scalar(other.as_ref(py)) { + self.pyo3_scalar_arith_op(py, other, Some(ArrowDataType::Float64), arrow_rtruediv) + } else { + self.pyo3_vector_arith_op( + py, + other, + Some(ArrowDataType::Float64), + wrap_result(|a, b| arithmetic::divide_dyn(b, a)), + ) + } } #[allow(unused)] fn __floordiv__(&self, py: Python<'_>, other: PyObject) -> PyResult { - Err(PyNotImplementedError::new_err(())) + if pyo3_is_obj_scalar(other.as_ref(py)) { + self.pyo3_scalar_arith_op( + py, + other, + Some(ArrowDataType::Int64), + wrap_result(arithmetic::divide_dyn), + ) + } else { + self.pyo3_vector_arith_op( + py, + other, + Some(ArrowDataType::Int64), + wrap_result(arithmetic::divide_dyn), + ) + } } #[allow(unused)] fn __rfloordiv__(&self, py: Python<'_>, other: PyObject) -> PyResult { - Err(PyNotImplementedError::new_err(())) + if pyo3_is_obj_scalar(other.as_ref(py)) { + self.pyo3_scalar_arith_op(py, other, Some(ArrowDataType::Int64), arrow_rtruediv) + } else { + self.pyo3_vector_arith_op( + py, + other, + Some(ArrowDataType::Int64), + wrap_result(|a, b| arithmetic::divide_dyn(b, a)), + ) + } } fn __and__(&self, other: &Self) -> PyResult { Self::vector_and(self, other).map_err(PyValueError::new_err) @@ -198,6 +235,29 @@ impl PyVector { fn __invert__(&self) -> PyResult { Self::vector_invert(self).map_err(PyValueError::new_err) } + /// take a boolean array and filters the Array, returning elements matching the filter (i.e. where the values are true). 
+ #[pyo3(name = "filter")] + fn pyo3_filter(&self, py: Python<'_>, other: &Self) -> PyResult { + py.allow_threads(|| { + let left = self.to_arrow_array(); + let right = other.to_arrow_array(); + if let Some(filter) = right.as_any().downcast_ref::() { + let res = compute::filter(left.as_ref(), filter); + let res = + res.map_err(|err| PyValueError::new_err(format!("Arrow Error: {err:#?}")))?; + let ret = Helper::try_into_vector(res.clone()).map_err(|e| { + PyValueError::new_err(format!( + "Can't cast result into vector, result: {res:?}, err: {e:?}", + )) + })?; + Ok(ret.into()) + } else { + Err(PyValueError::new_err(format!( + "Can't cast operand into a Boolean Array, which is {right:#?}" + ))) + } + }) + } fn __len__(&self) -> usize { self.len() } @@ -207,6 +267,17 @@ impl PyVector { fn __repr__(&self) -> PyResult { Ok(format!("{self:#?}")) } + /// Convert to `pyarrow` 's array + pub(crate) fn to_pyarrow(&self, py: Python) -> PyResult { + self.to_arrow_array().data().to_pyarrow(py) + } + /// Convert from `pyarrow`'s array + #[classmethod] + pub(crate) fn from_pyarrow(_cls: &PyType, py: Python, obj: PyObject) -> PyResult { + let array = make_array(ArrayData::from_pyarrow(obj.as_ref(py))?); + let v = Helper::try_into_vector(array).map_err(to_py_err)?; + Ok(v.into()) + } } #[cfg(test)] @@ -236,10 +307,10 @@ mod test { let b: PyVector = (Arc::new(b) as VectorRef).into(); locals.insert("bv2".to_string(), b); - let f = Float64Vector::from_slice(&[0.0f64, 1.0, 42.0, 3.0]); + let f = Float64Vector::from_slice([0.0f64, 1.0, 42.0, 3.0]); let f: PyVector = (Arc::new(f) as VectorRef).into(); locals.insert("fv1".to_string(), f); - let f = Float64Vector::from_slice(&[1919.810f64, 0.114, 51.4, 3.0]); + let f = Float64Vector::from_slice([1919.810f64, 0.114, 51.4, 3.0]); let f: PyVector = (Arc::new(f) as VectorRef).into(); locals.insert("fv2".to_string(), f); locals diff --git a/src/script/src/python/rspython/builtins.rs b/src/script/src/python/rspython/builtins.rs index b46faad29d37..bc09b9f2d187 100644 --- a/src/script/src/python/rspython/builtins.rs +++ b/src/script/src/python/rspython/builtins.rs @@ -291,7 +291,7 @@ pub(crate) mod greptime_builtin { use common_function::scalars::{Function, FunctionRef, FUNCTION_REGISTRY}; use datafusion::arrow::datatypes::DataType as ArrowDataType; use datafusion::physical_plan::expressions; - use datafusion_expr::ColumnarValue as DFColValue; + use datafusion_expr::{ColumnarValue as DFColValue, Expr as DfExpr}; use datafusion_physical_expr::math_expressions; use datatypes::arrow::array::{ArrayRef, Int64Array, NullArray}; use datatypes::arrow::error::ArrowError; @@ -308,13 +308,32 @@ pub(crate) mod greptime_builtin { }; use crate::python::ffi_types::vector::val_to_pyobj; use crate::python::ffi_types::PyVector; - use crate::python::rspython::utils::{is_instance, py_vec_obj_to_array, PyVectorRef}; + use crate::python::rspython::dataframe_impl::data_frame::{PyExpr, PyExprRef}; + use crate::python::rspython::utils::{ + is_instance, py_obj_to_value, py_obj_to_vec, PyVectorRef, + }; #[pyfunction] fn vector(args: OptionalArg, vm: &VirtualMachine) -> PyResult { PyVector::new(args, vm) } + #[pyfunction] + fn col(name: String, vm: &VirtualMachine) -> PyExprRef { + let expr: PyExpr = DfExpr::Column(datafusion_common::Column::from_name(name)).into(); + expr.into_ref(vm) + } + + #[pyfunction] + pub(crate) fn lit(obj: PyObjectRef, vm: &VirtualMachine) -> PyResult { + let val = py_obj_to_value(&obj, vm)?; + let scalar_val = val + .try_to_scalar_value(&val.data_type()) + .map_err(|e| 
vm.new_runtime_error(format!("{e}")))?; + let expr: PyExpr = DfExpr::Literal(scalar_val).into(); + Ok(expr.into_ref(vm)) + } + // the main binding code, due to proc macro things, can't directly use a simpler macro // because pyfunction is not a attr? // ------ @@ -966,7 +985,7 @@ pub(crate) mod greptime_builtin { let args = FuncArgs::new(vec![v.into_pyobject(vm)], KwArgs::default()); let ret = func.invoke(args, vm); match ret{ - Ok(obj) => match py_vec_obj_to_array(&obj, vm, 1){ + Ok(obj) => match py_obj_to_vec(&obj, vm, 1){ Ok(v) => if v.len()==1{ Ok(v) }else{ diff --git a/src/script/src/python/rspython/copr_impl.rs b/src/script/src/python/rspython/copr_impl.rs index 996ac588b5bd..8c02c8e027bd 100644 --- a/src/script/src/python/rspython/copr_impl.rs +++ b/src/script/src/python/rspython/copr_impl.rs @@ -33,7 +33,7 @@ use crate::python::ffi_types::{check_args_anno_real_type, select_from_rb, Coproc use crate::python::rspython::builtins::init_greptime_builtins; use crate::python::rspython::dataframe_impl::data_frame::set_dataframe_in_scope; use crate::python::rspython::dataframe_impl::init_data_frame; -use crate::python::rspython::utils::{format_py_error, is_instance, py_vec_obj_to_array}; +use crate::python::rspython::utils::{format_py_error, is_instance, py_obj_to_vec}; thread_local!(static INTERPRETER: RefCell>> = RefCell::new(None)); @@ -45,8 +45,9 @@ pub(crate) fn rspy_exec_parsed( ) -> Result { // 3. get args from `rb`, and cast them into PyVector let args: Vec = if let Some(rb) = rb { - let args = select_from_rb(rb, copr.deco_args.arg_names.as_ref().unwrap_or(&vec![]))?; - check_args_anno_real_type(&args, copr, rb)?; + let arg_names = copr.deco_args.arg_names.clone().unwrap_or(vec![]); + let args = select_from_rb(rb, &arg_names)?; + check_args_anno_real_type(&arg_names, &args, copr, rb)?; args } else { vec![] @@ -158,7 +159,7 @@ pub(crate) fn exec_with_cached_vm( } /// convert a tuple of `PyVector` or one `PyVector`(wrapped in a Python Object Ref[`PyObjectRef`]) -/// to a `Vec` +/// to a `Vec` /// by default, a constant(int/float/bool) gives the a constant array of same length with input args fn try_into_columns( obj: &PyObjectRef, @@ -171,11 +172,11 @@ fn try_into_columns( .with_context(|| ret_other_error_with(format!("can't cast obj {obj:?} to PyTuple)")))?; let cols = tuple .iter() - .map(|obj| py_vec_obj_to_array(obj, vm, col_len)) + .map(|obj| py_obj_to_vec(obj, vm, col_len)) .collect::>>()?; Ok(cols) } else { - let col = py_vec_obj_to_array(obj, vm, col_len)?; + let col = py_obj_to_vec(obj, vm, col_len)?; Ok(vec![col]) } } diff --git a/src/script/src/python/rspython/dataframe_impl.rs b/src/script/src/python/rspython/dataframe_impl.rs index 98c295337a86..3564c2cd2f63 100644 --- a/src/script/src/python/rspython/dataframe_impl.rs +++ b/src/script/src/python/rspython/dataframe_impl.rs @@ -35,6 +35,7 @@ pub(crate) mod data_frame { use crate::python::error::DataFusionSnafu; use crate::python::ffi_types::PyVector; + use crate::python::rspython::builtins::greptime_builtin::lit; use crate::python::utils::block_on_async; #[rspyclass(module = "data_frame", name = "DataFrame")] #[derive(PyPayload, Debug)] @@ -251,21 +252,15 @@ pub(crate) mod data_frame { } } - #[rspyclass(module = "data_frame", name = "Expr")] + #[rspyclass(module = "data_frame", name = "PyExpr")] #[derive(PyPayload, Debug, Clone)] pub struct PyExpr { pub inner: DfExpr, } - #[pyfunction] - fn col(name: String, vm: &VirtualMachine) -> PyExprRef { - let expr: PyExpr = 
DfExpr::Column(datafusion_common::Column::from_name(name)).into(); - expr.into_ref(vm) - } - // TODO(discord9): lit function that take PyObject and turn it into ScalarValue - type PyExprRef = PyRef; + pub(crate) type PyExprRef = PyRef; impl From for PyExpr { fn from(value: DfExpr) -> Self { @@ -280,10 +275,8 @@ pub(crate) mod data_frame { op: PyComparisonOp, vm: &VirtualMachine, ) -> PyResult> { - if let (Some(zelf), Some(other)) = - (zelf.downcast_ref::(), other.downcast_ref::()) - { - let ret = zelf.richcompare((**other).clone(), op, vm)?; + if let Some(zelf) = zelf.downcast_ref::() { + let ret = zelf.richcompare(other.to_owned(), op, vm)?; let ret = ret.into_pyobject(vm); Ok(rustpython_vm::function::Either::A(ret)) } else { @@ -307,10 +300,15 @@ pub(crate) mod data_frame { impl PyExpr { fn richcompare( &self, - other: Self, + other: PyObjectRef, op: PyComparisonOp, - _vm: &VirtualMachine, + vm: &VirtualMachine, ) -> PyResult { + let other = if let Some(other) = other.downcast_ref::() { + other.to_owned() + } else { + lit(other, vm)? + }; let f = match op { PyComparisonOp::Eq => DfExpr::eq, PyComparisonOp::Ne => DfExpr::not_eq, @@ -319,7 +317,7 @@ pub(crate) mod data_frame { PyComparisonOp::Ge => DfExpr::gt_eq, PyComparisonOp::Le => DfExpr::lt_eq, }; - Ok(f(self.inner.clone(), other.inner).into()) + Ok(f(self.inner.clone(), other.inner.clone()).into()) } #[pymethod] fn alias(&self, name: String) -> PyResult { diff --git a/src/script/src/python/rspython/testcases.ron b/src/script/src/python/rspython/testcases.ron index 7c601d2e23df..74a8c42d73a8 100644 --- a/src/script/src/python/rspython/testcases.ron +++ b/src/script/src/python/rspython/testcases.ron @@ -559,7 +559,7 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], // constant column(int) name: "test_data_frame", code: r#" -from data_frame import col +from greptime import col @copr(args=["cpu", "mem"], returns=["perf", "what"]) def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], vector[f32]): @@ -593,7 +593,7 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], // constant column(int) name: "test_data_frame", code: r#" -from data_frame import col +from greptime import col @copr(args=["cpu", "mem"], returns=["perf", "what"]) def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None], vector[f32]): diff --git a/src/script/src/python/rspython/utils.rs b/src/script/src/python/rspython/utils.rs index af5dfd6178ae..7482067f25d4 100644 --- a/src/script/src/python/rspython/utils.rs +++ b/src/script/src/python/rspython/utils.rs @@ -17,12 +17,12 @@ use std::sync::Arc; use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue as DFColValue; use datatypes::prelude::ScalarVector; +use datatypes::value::Value; use datatypes::vectors::{ BooleanVector, Float64Vector, Helper, Int64Vector, NullVector, StringVector, VectorRef, }; -use futures::Future; use rustpython_vm::builtins::{PyBaseExceptionRef, PyBool, PyFloat, PyInt, PyList, PyStr}; -use rustpython_vm::{PyObjectRef, PyPayload, PyRef, VirtualMachine}; +use rustpython_vm::{PyObjectRef, PyPayload, PyRef, PyResult, VirtualMachine}; use snafu::{Backtrace, GenerateImplicitData, OptionExt, ResultExt}; use crate::python::error; @@ -53,8 +53,32 @@ pub fn format_py_error(excep: PyBaseExceptionRef, vm: &VirtualMachine) -> error: } } +pub(crate) fn py_obj_to_value(obj: &PyObjectRef, vm: &VirtualMachine) -> PyResult { + macro_rules! 
obj2val { + ($OBJ: ident, $($PY_TYPE: ident => $RS_TYPE: ident => $VARIANT: ident),*) => { + $( + if is_instance::<$PY_TYPE>($OBJ, vm) { + let val = $OBJ + .to_owned() + .try_into_value::<$RS_TYPE>(vm)?; + Ok(Value::$VARIANT(val.into())) + } + )else* + else { + Err(vm.new_runtime_error(format!("can't convert obj {obj:?} to Value"))) + } + }; + } + obj2val!(obj, + PyBool => bool => Boolean, + PyInt => i64 => Int64, + PyFloat => f64 => Float64, + PyStr => String => String + ) +} + /// convert a single PyVector or a number(a constant)(wrapping in PyObjectRef) into a Array(or a constant array) -pub fn py_vec_obj_to_array( +pub fn py_obj_to_vec( obj: &PyObjectRef, vm: &VirtualMachine, col_len: usize, @@ -115,17 +139,3 @@ pub fn py_vec_obj_to_array( ret_other_error_with(format!("Expect a vector or a constant, found {obj:?}")).fail() } } - -/// a terrible hack to call async from sync by: -/// TODO(discord9): find a better way -/// 1. spawn a new thread -/// 2. create a new runtime in new thread and call `block_on` on it -#[allow(unused)] -pub fn block_on_async(f: F) -> std::thread::Result -where - F: Future + Send + 'static, - T: Send + 'static, -{ - let rt = tokio::runtime::Runtime::new().map_err(|e| Box::new(e) as _)?; - std::thread::spawn(move || rt.block_on(f)).join() -} diff --git a/src/script/src/python/utils.rs b/src/script/src/python/utils.rs index 025d8e65c34e..4ef69d3c0e0f 100644 --- a/src/script/src/python/utils.rs +++ b/src/script/src/python/utils.rs @@ -13,9 +13,11 @@ // limitations under the License. use futures::Future; +use once_cell::sync::OnceCell; use rustpython_vm::builtins::PyBaseExceptionRef; use rustpython_vm::{PyObjectRef, PyPayload, VirtualMachine}; use snafu::{Backtrace, GenerateImplicitData}; +use tokio::runtime::Runtime; use crate::python::error; @@ -39,7 +41,12 @@ pub fn format_py_error(excep: PyBaseExceptionRef, vm: &VirtualMachine) -> error: backtrace: Backtrace::generate(), } } - +static LOCAL_RUNTIME: OnceCell = OnceCell::new(); +fn get_local_runtime() -> std::thread::Result<&'static Runtime> { + let rt = LOCAL_RUNTIME + .get_or_try_init(|| tokio::runtime::Runtime::new().map_err(|e| Box::new(e) as _))?; + Ok(rt) +} /// a terrible hack to call async from sync by: /// TODO(discord9): find a better way /// 1. spawn a new thread @@ -49,6 +56,7 @@ where F: Future + Send + 'static, T: Send + 'static, { - let rt = tokio::runtime::Runtime::new().map_err(|e| Box::new(e) as _)?; + let rt = get_local_runtime()?; + std::thread::spawn(move || rt.block_on(f)).join() } diff --git a/src/servers/tests/py_script/mod.rs b/src/servers/tests/py_script/mod.rs index 14b3b1711687..fcd3317b0f4b 100644 --- a/src/servers/tests/py_script/mod.rs +++ b/src/servers/tests/py_script/mod.rs @@ -30,7 +30,7 @@ async fn test_insert_py_udf_and_query() -> Result<()> { let instance = create_testing_instance(table); let src = r#" @coprocessor(args=["uint32s"], returns = ["ret"]) -def double_that(col)->vector[u32]: +def double_that(col) -> vector[u32]: return col*2 "#; instance diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index ad6b498dc9a9..a8c7b938e99a 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -341,7 +341,7 @@ pub async fn test_scripts_api(store_type: StorageType) { .body( r#" @copr(sql='select number from numbers limit 10', args=['number'], returns=['n']) -def test(n): +def test(n) -> vector[f64]: return n + 1; "#, )