Skip to content

Commit

Permalink
Implementation of ConnectionLike over Executor
Browse files Browse the repository at this point in the history
  • Loading branch information
Flowneee committed Jul 30, 2023
1 parent bda44e1 commit 95f5067
Show file tree
Hide file tree
Showing 27 changed files with 159 additions and 173 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased
### Added
- `Executor` trait, which sends encoded request.
- `Executor` trait, which sends encoded request;
- `.stream()`, `.transaction()` and `.transaction_builder()` methods moved to `Executor` trait;
- `Request` struct renamed to `EncodedRequest`;
- `RequestBody` trait renamed to `Request`.

### Changed
- `ConnectionLike` now `Send` and `Sync`.
Expand Down
1 change: 1 addition & 0 deletions examples/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use tarantool_rs::Executor;
use tarantool_rs::{Connection, ConnectionLike, Value};

#[tokio::main]
Expand Down
1 change: 1 addition & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
imports_granularity = "Crate"
35 changes: 10 additions & 25 deletions src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,18 @@ use std::{
};

use async_trait::async_trait;
use futures::{future::BoxFuture, Future, TryFutureExt};
use futures::{future::BoxFuture, TryFutureExt};
use rmpv::Value;
use tracing::debug;

use super::{connection_like::ConnectionLike, Executor, Stream, Transaction, TransactionBuilder};
use super::{Executor, Stream, Transaction, TransactionBuilder};
use crate::{
builder::ConnectionBuilder,
codec::{
consts::TransactionIsolationLevel,
request::{Id, Request, RequestBody},
request::{EncodedRequest, Id, Request},
response::ResponseBody,
},
errors::Error,
transport::DispatcherSender,
Result,
};
Expand Down Expand Up @@ -65,26 +64,22 @@ impl Connection {

pub(crate) fn encode_and_send_request(
&self,
body: impl RequestBody,
body: impl Request,
stream_id: Option<u32>,
) -> BoxFuture<Result<Value>> {
let req = Request::new(body, stream_id);
Box::pin(async move { self.send_request(req?).await })
let req = EncodedRequest::new(body, stream_id);
Box::pin(async move { self.send_encoded_request(req?).await })
}

/// Synchronously send request to channel and drop response.
#[allow(clippy::let_underscore_future)]
pub(crate) fn send_request_sync_and_forget(
&self,
body: impl RequestBody,
stream_id: Option<u32>,
) {
pub(crate) fn send_request_sync_and_forget(&self, body: impl Request, stream_id: Option<u32>) {
let this = self.clone();
let req = Request::new(body, stream_id);
let req = EncodedRequest::new(body, stream_id);
let _ = self.inner.async_rt_handle.spawn(async move {
let res = futures::future::ready(req)
.err_into()
.and_then(|x| this.send_request(x))
.and_then(|x| this.send_encoded_request(x))
.await;
debug!("Response for background request: {:?}", res);
});
Expand Down Expand Up @@ -127,23 +122,13 @@ impl Connection {

#[async_trait]
impl Executor for Connection {
async fn send_request(&self, request: Request) -> Result<Value> {
async fn send_encoded_request(&self, request: EncodedRequest) -> Result<Value> {
let resp = self.inner.dispatcher_sender.send(request).await?;
match resp.body {
ResponseBody::Ok(x) => Ok(x),
ResponseBody::Error(x) => Err(x.into()),
}
}
}

#[async_trait]
impl ConnectionLike for Connection {
fn send_any_request<R>(&self, body: R) -> BoxFuture<Result<Value>>
where
R: RequestBody,
{
self.encode_and_send_request(body, None)
}

fn stream(&self) -> Stream {
self.stream()
Expand Down
72 changes: 22 additions & 50 deletions src/client/connection_like.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use std::borrow::Cow;

use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::{future::BoxFuture, FutureExt};
use rmpv::Value;
use serde::de::DeserializeOwned;

use super::{Executor, Stream, Transaction, TransactionBuilder};
use super::Executor;
use crate::{
codec::{
request::{Call, Delete, Eval, Insert, Ping, Replace, RequestBody, Select, Update, Upsert},
request::{
Call, Delete, EncodedRequest, Eval, Insert, Ping, Replace, Request, Select, Update,
Upsert,
},
utils::deserialize_non_sql_response,
},
errors::Error,
IteratorType, Result,
};

Expand All @@ -21,30 +23,13 @@ pub trait ConnectionLike: Executor {
///
/// It is not recommended to use this method directly, since some requests
/// should be only sent in specific situations and might break connection.
#[deprecated = "This method will be removed in future"]
fn send_any_request<R>(&self, body: R) -> BoxFuture<Result<Value>>
fn send_request<R>(&self, body: R) -> BoxFuture<Result<Value>>
where
R: RequestBody;

/// Get new [`Stream`].
///
/// It is safe to create `Stream` from any type, implementing current trait.
fn stream(&self) -> Stream;

/// Prepare [`TransactionBuilder`], which can be used to override parameters and create
/// [`Transaction`].
///
/// It is safe to create `TransactionBuilder` from any type.
fn transaction_builder(&self) -> TransactionBuilder;

/// Create [`Transaction`] with parameters from builder.
///
/// It is safe to create `Transaction` from any type, implementing current trait.
async fn transaction(&self) -> Result<Transaction>;
R: Request;

/// Send PING request ([docs](https://www.tarantool.io/en/doc/latest/dev_guide/internals/box_protocol/#iproto-ping-0x40)).
async fn ping(&self) -> Result<()> {
self.send_any_request(Ping {}).await.map(drop)
self.send_request(Ping {}).await.map(drop)
}

// TODO: docs
Expand All @@ -53,7 +38,7 @@ pub trait ConnectionLike: Executor {
I: Into<Cow<'static, str>> + Send,
T: DeserializeOwned,
{
let body = self.send_any_request(Eval::new(expr, args)).await?;
let body = self.send_request(Eval::new(expr, args)).await?;
deserialize_non_sql_response(body).map_err(Into::into)
}

Expand All @@ -63,9 +48,7 @@ pub trait ConnectionLike: Executor {
I: Into<Cow<'static, str>> + Send,
T: DeserializeOwned,
{
let body = self
.send_any_request(Call::new(function_name, args))
.await?;
let body = self.send_request(Call::new(function_name, args)).await?;
deserialize_non_sql_response(body).map_err(Into::into)
}

Expand All @@ -83,7 +66,7 @@ pub trait ConnectionLike: Executor {
T: DeserializeOwned,
{
let body = self
.send_any_request(Select::new(
.send_request(Select::new(
space_id, index_id, limit, offset, iterator, keys,
))
.await?;
Expand All @@ -93,7 +76,7 @@ pub trait ConnectionLike: Executor {
// TODO: docs
// TODO: decode response
async fn insert(&self, space_id: u32, tuple: Vec<Value>) -> Result<()> {
let _ = self.send_any_request(Insert::new(space_id, tuple)).await?;
let _ = self.send_request(Insert::new(space_id, tuple)).await?;
Ok(())
}

Expand All @@ -108,7 +91,7 @@ pub trait ConnectionLike: Executor {
tuple: Vec<Value>,
) -> Result<()> {
let _ = self
.send_any_request(Update::new(space_id, index_id, index_base, keys, tuple))
.send_request(Update::new(space_id, index_id, index_base, keys, tuple))
.await?;
Ok(())
}
Expand All @@ -124,46 +107,35 @@ pub trait ConnectionLike: Executor {
tuple: Vec<Value>,
) -> Result<()> {
let _ = self
.send_any_request(Upsert::new(space_id, index_base, ops, tuple))
.send_request(Upsert::new(space_id, index_base, ops, tuple))
.await?;
Ok(())
}

// TODO: structured tuple
// TODO: decode response
async fn replace(&self, space_id: u32, keys: Vec<Value>) -> Result<()> {
let _ = self.send_any_request(Replace::new(space_id, keys)).await?;
let _ = self.send_request(Replace::new(space_id, keys)).await?;
Ok(())
}

// TODO: structured tuple
// TODO: decode response
async fn delete(&self, space_id: u32, index_id: u32, keys: Vec<Value>) -> Result<()> {
let _ = self
.send_any_request(Delete::new(space_id, index_id, keys))
.send_request(Delete::new(space_id, index_id, keys))
.await?;
Ok(())
}
}

#[async_trait]
impl<C: ConnectionLike + super::private::Sealed + Sync> ConnectionLike for &C {
fn send_any_request<R>(&self, body: R) -> BoxFuture<Result<Value>>
impl<E: Executor + ?Sized> ConnectionLike for E {
fn send_request<R>(&self, body: R) -> BoxFuture<Result<Value>>
where
R: RequestBody,
R: Request,
{
(*self).send_any_request(body)
}

fn stream(&self) -> Stream {
(*self).stream()
}

fn transaction_builder(&self) -> TransactionBuilder {
(*self).transaction_builder()
}

async fn transaction(&self) -> Result<Transaction> {
(*self).transaction().await
let req = EncodedRequest::new(body, None);
async move { (*self).send_encoded_request(req?).await }.boxed()
}
}
76 changes: 62 additions & 14 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,92 @@ use async_trait::async_trait;
use rmpv::Value;

use super::private::Sealed;
use crate::{codec::request::Request, Result};
use crate::{codec::request::EncodedRequest, Result, Stream, Transaction, TransactionBuilder};

// TODO: docs
#[async_trait]
pub trait Executor: Sealed + Send + Sync {
async fn send_request(&self, request: Request) -> Result<Value>;
/// Send encoded request.
async fn send_encoded_request(&self, request: EncodedRequest) -> Result<Value>;

/// Get new [`Stream`].
///
/// It is safe to create `Stream` from any type, implementing current trait.
fn stream(&self) -> Stream;

/// Prepare [`TransactionBuilder`], which can be used to override parameters and create
/// [`Transaction`].
///
/// It is safe to create `TransactionBuilder` from any type.
fn transaction_builder(&self) -> TransactionBuilder;

/// Create [`Transaction`] with parameters from builder.
///
/// It is safe to create `Transaction` from any type, implementing current trait.
async fn transaction(&self) -> Result<Transaction>;
}

#[async_trait]
impl<E: Executor + Sealed + Sync> Executor for &E {
async fn send_request(&self, request: Request) -> Result<Value> {
(&*self).send_request(request).await
async fn send_encoded_request(&self, request: EncodedRequest) -> Result<Value> {
(**self).send_encoded_request(request).await
}

fn stream(&self) -> Stream {
(**self).stream()
}

fn transaction_builder(&self) -> TransactionBuilder {
(**self).transaction_builder()
}

async fn transaction(&self) -> Result<Transaction> {
(**self).transaction().await
}
}

#[async_trait]
impl<E: Executor + Sealed + Sync> Executor for &mut E {
async fn send_request(&self, request: Request) -> Result<Value> {
(&*self).send_request(request).await
async fn send_encoded_request(&self, request: EncodedRequest) -> Result<Value> {
(**self).send_encoded_request(request).await
}

fn stream(&self) -> Stream {
(**self).stream()
}

fn transaction_builder(&self) -> TransactionBuilder {
(**self).transaction_builder()
}

async fn transaction(&self) -> Result<Transaction> {
(**self).transaction().await
}
}

#[cfg(test)]
mod ui {
use super::*;
use crate::ConnectionLike;

#[test]
fn executor_trait_object_safety() {
fn f(executor: impl Executor) {
fn _f(executor: impl Executor) {
let _: Box<dyn Executor> = Box::new(executor);
}
}

// TODO: uncomment or remove
// #[test]
// fn calling_conn_like_on_dyn_executor() {
// async fn f(conn: &dyn Executor) -> Result<Value> {
// conn.ping().await
// }
// }
#[test]
fn calling_conn_like_on_dyn_executor() {
async fn _f(conn: &dyn Executor) -> Result<()> {
conn.ping().await
}
}

#[test]
fn calling_conn_like_on_boxed_dyn_executor() {
async fn _f(conn: &Box<dyn Executor>) -> Result<()> {
conn.ping().await
}
}
}
4 changes: 2 additions & 2 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ mod private {
impl Sealed for Connection {}
impl Sealed for Stream {}
impl Sealed for Transaction {}
impl<S: Sealed> Sealed for &S {}
impl<S: Sealed> Sealed for &mut S {}
impl<S: Sealed + ?Sized> Sealed for &S {}
impl<S: Sealed + ?Sized> Sealed for &mut S {}
}
Loading

0 comments on commit 95f5067

Please sign in to comment.