Skip to content

Commit

Permalink
TupleResponse wrapper for eval and call
Browse files Browse the repository at this point in the history
  • Loading branch information
Flowneee committed Aug 20, 2023
1 parent ee57274 commit c385cdb
Show file tree
Hide file tree
Showing 13 changed files with 224 additions and 75 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## [Unreleased] - XXXX-XX-XX
### Added
- `TupleResponse` type for decoding `eval` and `call` responses.


## [0.0.5] - 2023-08-05
### Added
- `into_space` method to `ExecutorExt` trait, wich return `Space` with underlying `Executor`;
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "tarantool-rs"
description = "Asyncronous tokio-based client for Tarantool"
version = "0.0.5"
version = "0.0.6"
edition = "2021"
authors = ["Andrey Kononov [email protected]"]
license = "MIT"
Expand Down
19 changes: 12 additions & 7 deletions examples/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,28 @@ async fn main() -> Result<(), anyhow::Error> {

let conn = Connection::builder().build("127.0.0.1:3301").await?;

let _: Value = conn
let _ = conn
.eval("function f(arg) return 42, nil end; return", ())
.await?;

let resp: Value = conn.call("f", ()).await?;
println!("{:?}", resp);
// Drop call result
let _ = conn.call("f", ()).await?;

let resp: (Value, Value) = conn.call("f", (false,)).await?;
// Decode returned tuple entirely
let resp: (u64, Value) = conn.call("f", ()).await?.decode_full()?;
println!("{:?}", resp);

let resp: Vec<Value> = conn.call("f", ()).await?;
// Decode first element
let resp: u64 = conn.call("f", ()).await?.decode_first()?;
println!("{:?}", resp);

let resp: (u64, Option<String>) = conn.call("f", ()).await?;
// Decode returned tuple as result. Since second element is null,
// decode to Ok(u64)
let resp: u64 = conn.call("f", ()).await?.decode_result()?;
println!("{:?}", resp);

let resp: Response = conn.call("f", ()).await?;
// Decode returned tuple entirely into type
let resp: Response = conn.call("f", ()).await?.decode_full()?;
println!("{:?}", resp);

Ok(())
Expand Down
6 changes: 5 additions & 1 deletion examples/cli_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ async fn main() -> std::result::Result<(), anyhow::Error> {

async fn process_input(conn: &Connection, line: String) {
let query = format!("return ({})", line);
match conn.eval::<serde_json::Value, _, _>(query, ()).await {
match conn
.eval::<_, _>(query, ())
.await
.and_then(|resp| Ok(resp.decode_full()?))
{
Ok(x) => println!(
"Result: {}",
serde_json::to_string(&x).expect("All MessagePack values should be valid for JSON")
Expand Down
8 changes: 4 additions & 4 deletions examples/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use tarantool_rs::{Connection, Executor, ExecutorExt, Value};
use tarantool_rs::{Connection, Executor, ExecutorExt};

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
Expand All @@ -8,15 +8,15 @@ async fn main() -> Result<(), anyhow::Error> {
connection.clone().ping().await?;

let tx = connection.transaction().await?;
let _: Value = tx.eval("box.space.clients:insert{2}", ()).await?;
let _ = tx.eval("box.space.clients:insert{2}", ()).await?;
tx.rollback().await?;

let tx = connection.transaction().await?;
let _: Value = tx.eval("box.space.clients:insert{3}", ()).await?;
let _ = tx.eval("box.space.clients:insert{3}", ()).await?;
drop(tx);

let tx = connection.transaction().await?;
let _: Value = tx.eval("box.space.clients:insert{4}", ()).await?;
let _ = tx.eval("box.space.clients:insert{4}", ()).await?;
tx.commit().await?;

let _: Vec<u32> = connection
Expand Down
18 changes: 9 additions & 9 deletions src/client/executor_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
schema::{SchemaEntityKey, Space},
tuple::Tuple,
utils::extract_and_deserialize_iproto_data,
IteratorType, Result,
IteratorType, Result, TupleResponse,
};

/// Helper trait around [`Executor`] trait, which allows to send specific requests
Expand All @@ -38,27 +38,27 @@ pub trait ExecutorExt: Executor {
/// Evaluate Lua expression.
///
/// Check [docs][crate#deserializing-lua-responses-in-call-and-eval] on how to deserialize response.
async fn eval<T, A, I>(&self, expr: I, args: A) -> Result<T>
async fn eval<A, I>(&self, expr: I, args: A) -> Result<TupleResponse>
where
T: DeserializeOwned,
A: Tuple + Send,
I: Into<Cow<'static, str>> + Send,
{
let body = self.send_request(Eval::new(expr, args)).await?;
extract_and_deserialize_iproto_data(body).map_err(Into::into)
Ok(TupleResponse(
self.send_request(Eval::new(expr, args)).await?,
))
}

/// Remotely call function in Tarantool.
///
/// Check [docs][crate#deserializing-lua-responses-in-call-and-eval] on how to deserialize response.
async fn call<T, A, I>(&self, function_name: I, args: A) -> Result<T>
async fn call<A, I>(&self, function_name: I, args: A) -> Result<TupleResponse>
where
T: DeserializeOwned,
A: Tuple + Send,
I: Into<Cow<'static, str>> + Send,
{
let body = self.send_request(Call::new(function_name, args)).await?;
extract_and_deserialize_iproto_data(body).map_err(Into::into)
Ok(TupleResponse(
self.send_request(Call::new(function_name, args)).await?,
))
}

/// Select tuples from space.
Expand Down
2 changes: 2 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub use self::{
executor_ext::ExecutorExt,
stream::Stream,
transaction::{Transaction, TransactionBuilder},
tuple_response::TupleResponse,
};

pub mod schema;
Expand All @@ -13,6 +14,7 @@ mod executor;
mod executor_ext;
mod stream;
mod transaction;
mod tuple_response;

mod private {
use crate::client::{Connection, Stream, Transaction};
Expand Down
8 changes: 4 additions & 4 deletions src/client/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ use crate::{codec::request::EncodedRequest, Executor, Result};
///
/// // This will print 'fast' and then 'slow'
/// let eval_slow_fut = connection
/// .eval::<Value, _, _>("fiber = require('fiber'); fiber.sleep(0.5); return ...;", ("slow", ))
/// .eval("fiber = require('fiber'); fiber.sleep(0.5); return ...;", ("slow", ))
/// .inspect(|res| println!("{:?}", res));
/// let eval_fast_fut = connection
/// .eval::<Value, _, _>("return ...;", ("fast", ))
/// .eval("return ...;", ("fast", ))
/// .inspect(|res| println!("{:?}", res));
/// let _ = tokio::join!(eval_slow_fut, eval_fast_fut);
///
/// // This will print 'slow' and then 'fast', since slow request was created first and have smaller sync
/// let stream = connection.stream();
/// let eval_slow_fut = stream
/// .eval::<Value, _, _>("fiber = require('fiber'); fiber.sleep(0.5); return ...;", ("slow", ))
/// .eval("fiber = require('fiber'); fiber.sleep(0.5); return ...;", ("slow", ))
/// .inspect(|res| println!("{:?}", res));
/// let eval_fast_fut = stream
/// .eval::<Value, _, _>("return ...;", ("fast", ))
/// .eval("return ...;", ("fast", ))
/// .inspect(|res| println!("{:?}", res));
/// let _ = tokio::join!(eval_slow_fut, eval_fast_fut);
/// # }
Expand Down
165 changes: 165 additions & 0 deletions src/client/tuple_response.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use rmpv::Value;
use serde::de::DeserializeOwned;

use crate::{errors::DecodingError, utils::extract_iproto_data, Error};

/// Tuple, returned from `call` and `eval` requests.
#[derive(Clone, Debug, PartialEq)]
pub struct TupleResponse(pub(crate) rmpv::Value);

impl TupleResponse {
/// Decode first element of the tuple, dropping everything else.
///
/// This is useful if function doesn't return an error.
pub fn decode_first<T>(self) -> Result<T, DecodingError>
where
T: DeserializeOwned,
{
let first = self
.into_data_tuple()?
.into_iter()
.next()
.ok_or_else(|| DecodingError::invalid_tuple_length(1, 0))?;
Ok(rmpv::ext::from_value(first)?)
}

/// Decode first 2 elements of the tuple, dropping everything else.
pub fn decode_two<T1, T2>(self) -> Result<(T1, T2), DecodingError>
where
T1: DeserializeOwned,
T2: DeserializeOwned,
{
let mut tuple_iter = self.into_data_tuple()?.into_iter();
if tuple_iter.len() < 2 {
return Err(DecodingError::invalid_tuple_length(2, tuple_iter.len()));
}
// SAFETY: this should be safe since we just checked tuple length
let first = tuple_iter
.next()
.expect("tuple_iter should have length >= 2");
let second = tuple_iter
.next()
.expect("tuple_iter should have length >= 2");
Ok((
rmpv::ext::from_value(first)?,
rmpv::ext::from_value(second)?,
))
}

/// Decode first two elements of the tuple into result, where
/// either first element deserialized into `T` and returned as `Ok(T)`
/// or second element returned as `Err(Error::CallEval)`.
///
/// If second element is `nil` or not present, first element will be returned,
/// otherwise second element will be returned as error.
pub fn decode_result<T>(self) -> Result<T, Error>
where
T: DeserializeOwned,
{
let mut tuple_iter = self.into_data_tuple()?.into_iter();
let first = tuple_iter
.next()
.ok_or_else(|| DecodingError::invalid_tuple_length(1, 0))?;
let second = tuple_iter.next();
match second {
Some(Value::Nil) | None => {
Ok(rmpv::ext::from_value(first).map_err(DecodingError::from)?)
}
Some(err) => Err(Error::CallEval(err)),
}
}

/// Decode entire response into type.
///
/// Note that currently every response would be a tuple, so be careful what type
/// you are specifying.
pub fn decode_full<T>(self) -> Result<T, DecodingError>
where
T: DeserializeOwned,
{
Ok(rmpv::ext::from_value(extract_iproto_data(self.0)?)?)
}

fn into_data_tuple(self) -> Result<Vec<Value>, DecodingError> {
match extract_iproto_data(self.0)? {
Value::Array(x) => Ok(x),
rest => Err(DecodingError::type_mismatch("array", rest.to_string())),
}
}
}

#[cfg(test)]
mod tests {
use assert_matches::assert_matches;

use crate::codec::consts::keys::DATA;

use super::*;

fn build_tuple_response(data: Vec<Value>) -> Value {
Value::Map(vec![(DATA.into(), Value::Array(data))])
}

#[test]
fn decode_first() {
let resp = build_tuple_response(vec![Value::Boolean(true)]);
assert_matches!(TupleResponse(resp).decode_first(), Ok(true));
}

#[test]
fn decode_first_err_len() {
let resp = build_tuple_response(vec![]);
assert_matches!(TupleResponse(resp).decode_first::<()>(), Err(_));
}

#[test]
fn decode_first_err_wrong_type() {
let resp = build_tuple_response(vec![Value::Boolean(true)]);
assert_matches!(TupleResponse(resp).decode_first::<String>(), Err(_));
}

#[test]
fn decode_two() {
let resp = build_tuple_response(vec![Value::Boolean(true), Value::Boolean(false)]);
assert_matches!(TupleResponse(resp).decode_two(), Ok((true, false)));
}

#[test]
fn decode_two_err_len() {
let resp = build_tuple_response(vec![]);
assert_matches!(TupleResponse(resp).decode_two::<(), ()>(), Err(_));

let resp = build_tuple_response(vec![Value::Boolean(true)]);
assert_matches!(TupleResponse(resp).decode_two::<(), ()>(), Err(_));
}

#[test]
fn decode_result_ok() {
let resp = build_tuple_response(vec![Value::Boolean(true)]);
assert_matches!(TupleResponse(resp).decode_result(), Ok(true));

let resp = build_tuple_response(vec![Value::Boolean(true), Value::Nil]);
assert_matches!(TupleResponse(resp).decode_result(), Ok(true));
}

#[test]
fn decode_result_err_present() {
let resp = build_tuple_response(vec![Value::Boolean(true), Value::Boolean(false)]);
assert_matches!(
TupleResponse(resp).decode_result::<bool>(),
Err(Error::CallEval(Value::Boolean(false)))
);
}

#[test]
fn decode_result_err_wrong_type() {
let resp = build_tuple_response(vec![Value::Boolean(true), Value::Nil]);
assert_matches!(TupleResponse(resp).decode_result::<String>(), Err(_));
}

#[test]
fn decode_full() {
let resp = build_tuple_response(vec![Value::Boolean(true), Value::Boolean(false)]);
assert_matches!(TupleResponse(resp).decode_full(), Ok((true, Some(false))));
}
}
11 changes: 11 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use rmp::{
decode::{MarkerReadError, NumValueReadError, ValueReadError},
encode::{RmpWriteErr, ValueWriteError},
};
use rmpv::Value;
use tokio::time::error::Elapsed;

/// Error returned by Tarantool in response to a request.
Expand Down Expand Up @@ -34,6 +35,9 @@ pub enum Error {
/// Error, returned in response from Tarantool instance.
#[error("Error response: {0}")]
Response(#[from] ErrorResponse),
/// Error, returned in response on `call` or `eval`.
#[error("Call or eval error: {0}")]
CallEval(Value),

/// Timeout.
#[error("Timeout")]
Expand Down Expand Up @@ -174,6 +178,10 @@ impl DecodingError {
DecodingErrorDetails::UnknownResponseCode(code).into()
}

pub(crate) fn invalid_tuple_length(expected: usize, actual: usize) -> Self {
DecodingErrorDetails::InvalidTupleLength { expected, actual }.into()
}

pub(crate) fn with_location(mut self, location: DecodingErrorLocation) -> Self {
self.location = Some(location);
self
Expand Down Expand Up @@ -218,6 +226,9 @@ pub enum DecodingErrorDetails {
expected: &'static str,
actual: Cow<'static, str>,
},
/// Tuple have invalid length
#[error("Invalid tuple length {actual}, expected {expected}")]
InvalidTupleLength { expected: usize, actual: usize },

/// Error while deserializing [`rmpv::Value`] into concrete type.
#[error("Failed to deserialize rmpv::Value")]
Expand Down
Loading

0 comments on commit c385cdb

Please sign in to comment.