From 95f506740d0972d51a99152d3d0e979ec57d3606 Mon Sep 17 00:00:00 2001 From: Andrey Kononov Date: Sun, 30 Jul 2023 12:56:46 +0400 Subject: [PATCH] Implementation of ConnectionLike over Executor --- CHANGELOG.md | 5 ++- examples/transactions.rs | 1 + rustfmt.toml | 1 + src/client/connection.rs | 35 +++++----------- src/client/connection_like.rs | 72 ++++++++++----------------------- src/client/executor.rs | 76 ++++++++++++++++++++++++++++------- src/client/mod.rs | 4 +- src/client/stream.rs | 25 ++++-------- src/client/transaction.rs | 27 ++++--------- src/codec/mod.rs | 6 +-- src/codec/request/auth.rs | 4 +- src/codec/request/begin.rs | 4 +- src/codec/request/call.rs | 4 +- src/codec/request/commit.rs | 4 +- src/codec/request/delete.rs | 4 +- src/codec/request/eval.rs | 4 +- src/codec/request/id.rs | 4 +- src/codec/request/insert.rs | 4 +- src/codec/request/mod.rs | 12 +++--- src/codec/request/ping.rs | 4 +- src/codec/request/replace.rs | 4 +- src/codec/request/rollback.rs | 4 +- src/codec/request/select.rs | 4 +- src/codec/request/update.rs | 4 +- src/codec/request/upsert.rs | 4 +- src/transport/connection.rs | 6 +-- src/transport/dispatcher.rs | 6 +-- 27 files changed, 159 insertions(+), 173 deletions(-) create mode 100644 rustfmt.toml diff --git a/CHANGELOG.md b/CHANGELOG.md index 67290e2..ca3669c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased ### Added - - `Executor` trait, which sends encoded request. + - `Executor` trait, which sends encoded request; + - `.stream()`, `.transaction()` and `.transaction_builder()` methods moved to `Executor` trait; + - `Request` struct renamed to `EncodedRequest`; + - `RequestBody` trait renamed to `Request`. ### Changed - `ConnectionLike` now `Send` and `Sync`. diff --git a/examples/transactions.rs b/examples/transactions.rs index 9d6f6cd..e54af80 100644 --- a/examples/transactions.rs +++ b/examples/transactions.rs @@ -1,3 +1,4 @@ +use tarantool_rs::Executor; use tarantool_rs::{Connection, ConnectionLike, Value}; #[tokio::main] diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..c3c8c37 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +imports_granularity = "Crate" diff --git a/src/client/connection.rs b/src/client/connection.rs index a03d740..dd2c78a 100644 --- a/src/client/connection.rs +++ b/src/client/connection.rs @@ -7,19 +7,18 @@ use std::{ }; use async_trait::async_trait; -use futures::{future::BoxFuture, Future, TryFutureExt}; +use futures::{future::BoxFuture, TryFutureExt}; use rmpv::Value; use tracing::debug; -use super::{connection_like::ConnectionLike, Executor, Stream, Transaction, TransactionBuilder}; +use super::{Executor, Stream, Transaction, TransactionBuilder}; use crate::{ builder::ConnectionBuilder, codec::{ consts::TransactionIsolationLevel, - request::{Id, Request, RequestBody}, + request::{EncodedRequest, Id, Request}, response::ResponseBody, }, - errors::Error, transport::DispatcherSender, Result, }; @@ -65,26 +64,22 @@ impl Connection { pub(crate) fn encode_and_send_request( &self, - body: impl RequestBody, + body: impl Request, stream_id: Option, ) -> BoxFuture> { - let req = Request::new(body, stream_id); - Box::pin(async move { self.send_request(req?).await }) + let req = EncodedRequest::new(body, stream_id); + Box::pin(async move { self.send_encoded_request(req?).await }) } /// Synchronously send request to channel and drop response. #[allow(clippy::let_underscore_future)] - pub(crate) fn send_request_sync_and_forget( - &self, - body: impl RequestBody, - stream_id: Option, - ) { + pub(crate) fn send_request_sync_and_forget(&self, body: impl Request, stream_id: Option) { let this = self.clone(); - let req = Request::new(body, stream_id); + let req = EncodedRequest::new(body, stream_id); let _ = self.inner.async_rt_handle.spawn(async move { let res = futures::future::ready(req) .err_into() - .and_then(|x| this.send_request(x)) + .and_then(|x| this.send_encoded_request(x)) .await; debug!("Response for background request: {:?}", res); }); @@ -127,23 +122,13 @@ impl Connection { #[async_trait] impl Executor for Connection { - async fn send_request(&self, request: Request) -> Result { + async fn send_encoded_request(&self, request: EncodedRequest) -> Result { let resp = self.inner.dispatcher_sender.send(request).await?; match resp.body { ResponseBody::Ok(x) => Ok(x), ResponseBody::Error(x) => Err(x.into()), } } -} - -#[async_trait] -impl ConnectionLike for Connection { - fn send_any_request(&self, body: R) -> BoxFuture> - where - R: RequestBody, - { - self.encode_and_send_request(body, None) - } fn stream(&self) -> Stream { self.stream() diff --git a/src/client/connection_like.rs b/src/client/connection_like.rs index 2167b6b..bb9d9c5 100644 --- a/src/client/connection_like.rs +++ b/src/client/connection_like.rs @@ -1,17 +1,19 @@ use std::borrow::Cow; use async_trait::async_trait; -use futures::future::BoxFuture; +use futures::{future::BoxFuture, FutureExt}; use rmpv::Value; use serde::de::DeserializeOwned; -use super::{Executor, Stream, Transaction, TransactionBuilder}; +use super::Executor; use crate::{ codec::{ - request::{Call, Delete, Eval, Insert, Ping, Replace, RequestBody, Select, Update, Upsert}, + request::{ + Call, Delete, EncodedRequest, Eval, Insert, Ping, Replace, Request, Select, Update, + Upsert, + }, utils::deserialize_non_sql_response, }, - errors::Error, IteratorType, Result, }; @@ -21,30 +23,13 @@ pub trait ConnectionLike: Executor { /// /// It is not recommended to use this method directly, since some requests /// should be only sent in specific situations and might break connection. - #[deprecated = "This method will be removed in future"] - fn send_any_request(&self, body: R) -> BoxFuture> + fn send_request(&self, body: R) -> BoxFuture> where - R: RequestBody; - - /// Get new [`Stream`]. - /// - /// It is safe to create `Stream` from any type, implementing current trait. - fn stream(&self) -> Stream; - - /// Prepare [`TransactionBuilder`], which can be used to override parameters and create - /// [`Transaction`]. - /// - /// It is safe to create `TransactionBuilder` from any type. - fn transaction_builder(&self) -> TransactionBuilder; - - /// Create [`Transaction`] with parameters from builder. - /// - /// It is safe to create `Transaction` from any type, implementing current trait. - async fn transaction(&self) -> Result; + R: Request; /// Send PING request ([docs](https://www.tarantool.io/en/doc/latest/dev_guide/internals/box_protocol/#iproto-ping-0x40)). async fn ping(&self) -> Result<()> { - self.send_any_request(Ping {}).await.map(drop) + self.send_request(Ping {}).await.map(drop) } // TODO: docs @@ -53,7 +38,7 @@ pub trait ConnectionLike: Executor { I: Into> + Send, T: DeserializeOwned, { - let body = self.send_any_request(Eval::new(expr, args)).await?; + let body = self.send_request(Eval::new(expr, args)).await?; deserialize_non_sql_response(body).map_err(Into::into) } @@ -63,9 +48,7 @@ pub trait ConnectionLike: Executor { I: Into> + Send, T: DeserializeOwned, { - let body = self - .send_any_request(Call::new(function_name, args)) - .await?; + let body = self.send_request(Call::new(function_name, args)).await?; deserialize_non_sql_response(body).map_err(Into::into) } @@ -83,7 +66,7 @@ pub trait ConnectionLike: Executor { T: DeserializeOwned, { let body = self - .send_any_request(Select::new( + .send_request(Select::new( space_id, index_id, limit, offset, iterator, keys, )) .await?; @@ -93,7 +76,7 @@ pub trait ConnectionLike: Executor { // TODO: docs // TODO: decode response async fn insert(&self, space_id: u32, tuple: Vec) -> Result<()> { - let _ = self.send_any_request(Insert::new(space_id, tuple)).await?; + let _ = self.send_request(Insert::new(space_id, tuple)).await?; Ok(()) } @@ -108,7 +91,7 @@ pub trait ConnectionLike: Executor { tuple: Vec, ) -> Result<()> { let _ = self - .send_any_request(Update::new(space_id, index_id, index_base, keys, tuple)) + .send_request(Update::new(space_id, index_id, index_base, keys, tuple)) .await?; Ok(()) } @@ -124,7 +107,7 @@ pub trait ConnectionLike: Executor { tuple: Vec, ) -> Result<()> { let _ = self - .send_any_request(Upsert::new(space_id, index_base, ops, tuple)) + .send_request(Upsert::new(space_id, index_base, ops, tuple)) .await?; Ok(()) } @@ -132,7 +115,7 @@ pub trait ConnectionLike: Executor { // TODO: structured tuple // TODO: decode response async fn replace(&self, space_id: u32, keys: Vec) -> Result<()> { - let _ = self.send_any_request(Replace::new(space_id, keys)).await?; + let _ = self.send_request(Replace::new(space_id, keys)).await?; Ok(()) } @@ -140,30 +123,19 @@ pub trait ConnectionLike: Executor { // TODO: decode response async fn delete(&self, space_id: u32, index_id: u32, keys: Vec) -> Result<()> { let _ = self - .send_any_request(Delete::new(space_id, index_id, keys)) + .send_request(Delete::new(space_id, index_id, keys)) .await?; Ok(()) } } #[async_trait] -impl ConnectionLike for &C { - fn send_any_request(&self, body: R) -> BoxFuture> +impl ConnectionLike for E { + fn send_request(&self, body: R) -> BoxFuture> where - R: RequestBody, + R: Request, { - (*self).send_any_request(body) - } - - fn stream(&self) -> Stream { - (*self).stream() - } - - fn transaction_builder(&self) -> TransactionBuilder { - (*self).transaction_builder() - } - - async fn transaction(&self) -> Result { - (*self).transaction().await + let req = EncodedRequest::new(body, None); + async move { (*self).send_encoded_request(req?).await }.boxed() } } diff --git a/src/client/executor.rs b/src/client/executor.rs index 8d848b3..af33f2c 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -2,44 +2,92 @@ use async_trait::async_trait; use rmpv::Value; use super::private::Sealed; -use crate::{codec::request::Request, Result}; +use crate::{codec::request::EncodedRequest, Result, Stream, Transaction, TransactionBuilder}; // TODO: docs #[async_trait] pub trait Executor: Sealed + Send + Sync { - async fn send_request(&self, request: Request) -> Result; + /// Send encoded request. + async fn send_encoded_request(&self, request: EncodedRequest) -> Result; + + /// Get new [`Stream`]. + /// + /// It is safe to create `Stream` from any type, implementing current trait. + fn stream(&self) -> Stream; + + /// Prepare [`TransactionBuilder`], which can be used to override parameters and create + /// [`Transaction`]. + /// + /// It is safe to create `TransactionBuilder` from any type. + fn transaction_builder(&self) -> TransactionBuilder; + + /// Create [`Transaction`] with parameters from builder. + /// + /// It is safe to create `Transaction` from any type, implementing current trait. + async fn transaction(&self) -> Result; } #[async_trait] impl Executor for &E { - async fn send_request(&self, request: Request) -> Result { - (&*self).send_request(request).await + async fn send_encoded_request(&self, request: EncodedRequest) -> Result { + (**self).send_encoded_request(request).await + } + + fn stream(&self) -> Stream { + (**self).stream() + } + + fn transaction_builder(&self) -> TransactionBuilder { + (**self).transaction_builder() + } + + async fn transaction(&self) -> Result { + (**self).transaction().await } } #[async_trait] impl Executor for &mut E { - async fn send_request(&self, request: Request) -> Result { - (&*self).send_request(request).await + async fn send_encoded_request(&self, request: EncodedRequest) -> Result { + (**self).send_encoded_request(request).await + } + + fn stream(&self) -> Stream { + (**self).stream() + } + + fn transaction_builder(&self) -> TransactionBuilder { + (**self).transaction_builder() + } + + async fn transaction(&self) -> Result { + (**self).transaction().await } } #[cfg(test)] mod ui { use super::*; + use crate::ConnectionLike; #[test] fn executor_trait_object_safety() { - fn f(executor: impl Executor) { + fn _f(executor: impl Executor) { let _: Box = Box::new(executor); } } - // TODO: uncomment or remove - // #[test] - // fn calling_conn_like_on_dyn_executor() { - // async fn f(conn: &dyn Executor) -> Result { - // conn.ping().await - // } - // } + #[test] + fn calling_conn_like_on_dyn_executor() { + async fn _f(conn: &dyn Executor) -> Result<()> { + conn.ping().await + } + } + + #[test] + fn calling_conn_like_on_boxed_dyn_executor() { + async fn _f(conn: &Box) -> Result<()> { + conn.ping().await + } + } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 07c9949..d0eaa9c 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -23,6 +23,6 @@ mod private { impl Sealed for Connection {} impl Sealed for Stream {} impl Sealed for Transaction {} - impl Sealed for &S {} - impl Sealed for &mut S {} + impl Sealed for &S {} + impl Sealed for &mut S {} } diff --git a/src/client/stream.rs b/src/client/stream.rs index 14ccd50..ac40334 100644 --- a/src/client/stream.rs +++ b/src/client/stream.rs @@ -1,11 +1,10 @@ use async_trait::async_trait; -use futures::future::BoxFuture; + use rmpv::Value; -use super::{Connection, ConnectionLike, Transaction, TransactionBuilder}; +use super::{Connection, Transaction, TransactionBuilder}; use crate::{ - codec::request::{Request, RequestBody}, - errors::Error, + codec::request::{EncodedRequest}, Executor, Result, }; @@ -16,7 +15,7 @@ use crate::{ /// # Example /// /// ```rust,compile -/// use tarantool_rs::{Connection, ConnectionLike}; +/// use tarantool_rs::{Connection, ConnectionLike, Executor}; /// # use futures::FutureExt; /// # use rmpv::Value; /// @@ -60,19 +59,9 @@ impl Stream { #[async_trait] impl Executor for Stream { - async fn send_request(&self, request: Request) -> Result { - self.conn.send_request(request).await - } -} - -#[async_trait] -impl ConnectionLike for Stream { - fn send_any_request(&self, body: R) -> BoxFuture> - where - R: RequestBody, - { - self.conn - .encode_and_send_request(body, Some(self.stream_id)) + async fn send_encoded_request(&self, mut request: EncodedRequest) -> Result { + request.stream_id = Some(self.stream_id); + self.conn.send_encoded_request(request).await } fn stream(&self) -> Stream { diff --git a/src/client/transaction.rs b/src/client/transaction.rs index d9dd298..02d880b 100644 --- a/src/client/transaction.rs +++ b/src/client/transaction.rs @@ -1,7 +1,7 @@ use std::time::Duration; use async_trait::async_trait; -use futures::future::BoxFuture; + use rmpv::Value; use tracing::debug; @@ -9,9 +9,8 @@ use super::{Connection, ConnectionLike, Stream}; use crate::{ codec::{ consts::TransactionIsolationLevel, - request::{Begin, Commit, Request, RequestBody, Rollback}, + request::{Begin, Commit, EncodedRequest, Rollback}, }, - errors::Error, Executor, Result, }; @@ -49,7 +48,7 @@ impl Transaction { timeout_secs: Option, ) -> Result<()> { debug!("Beginning tranasction on stream {}", self.stream_id); - self.send_any_request(Begin::new(timeout_secs, transaction_isolation_level)) + self.send_request(Begin::new(timeout_secs, transaction_isolation_level)) .await .map(drop) } @@ -58,7 +57,7 @@ impl Transaction { pub async fn commit(mut self) -> Result<()> { if !self.finished { debug!("Commiting tranasction on stream {}", self.stream_id); - let _ = self.send_any_request(Commit::default()).await?; + let _ = self.send_request(Commit::default()).await?; self.finished = true; } Ok(()) @@ -68,7 +67,7 @@ impl Transaction { pub async fn rollback(mut self) -> Result<()> { if !self.finished { debug!("Rolling back tranasction on stream {}", self.stream_id); - let _ = self.send_any_request(Rollback::default()).await?; + let _ = self.send_request(Rollback::default()).await?; self.finished = true; } Ok(()) @@ -91,19 +90,9 @@ impl Drop for Transaction { #[async_trait] impl Executor for Transaction { - async fn send_request(&self, request: Request) -> Result { - self.conn.send_request(request).await - } -} - -#[async_trait] -impl ConnectionLike for Transaction { - fn send_any_request(&self, body: R) -> BoxFuture> - where - R: RequestBody, - { - self.conn - .encode_and_send_request(body, Some(self.stream_id)) + async fn send_encoded_request(&self, mut request: EncodedRequest) -> Result { + request.stream_id = Some(self.stream_id); + self.conn.send_encoded_request(request).await } // TODO: do we need to repeat this in all ConnetionLike implementations? diff --git a/src/codec/mod.rs b/src/codec/mod.rs index fc1308f..9bc6349 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -5,7 +5,7 @@ use rmp::Marker; use tokio_util::codec::{Decoder, Encoder}; use tracing::trace; -use self::{request::Request, response::Response}; +use self::{request::EncodedRequest, response::Response}; use crate::{ errors::{CodecDecodeError, CodecEncodeError, DecodingError}, Error, @@ -123,13 +123,13 @@ impl Decoder for ClientCodec { } } -impl Encoder for ClientCodec { +impl Encoder for ClientCodec { type Error = CodecEncodeError; // To omit creating intermediate BytesMut, encode message with 0 as length, // and after encoding calculate size of the encoded messages and overwrite // length field (0) with new data. - fn encode(&mut self, item: Request, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, item: EncodedRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { let begin_idx = dst.len(); // TODO: calculate necessary integer type instead of using u64 always diff --git a/src/codec/request/auth.rs b/src/codec/request/auth.rs index fb41794..abef8dd 100644 --- a/src/codec/request/auth.rs +++ b/src/codec/request/auth.rs @@ -4,7 +4,7 @@ use std::{cmp::min, io::Write}; use sha1::{Digest, Sha1}; -use super::RequestBody; +use super::Request; use crate::{ codec::{ consts::{keys, RequestType}, @@ -28,7 +28,7 @@ impl<'a> Auth<'a> { } } -impl<'a> RequestBody for Auth<'a> { +impl<'a> Request for Auth<'a> { fn request_type() -> RequestType where Self: Sized, diff --git a/src/codec/request/begin.rs b/src/codec/request/begin.rs index b255a8b..611c7bf 100644 --- a/src/codec/request/begin.rs +++ b/src/codec/request/begin.rs @@ -7,7 +7,7 @@ use crate::{ errors::EncodingError, }; -use super::RequestBody; +use super::Request; #[derive(Clone, Debug)] pub(crate) struct Begin { @@ -15,7 +15,7 @@ pub(crate) struct Begin { pub transaction_isolation_level: TransactionIsolationLevel, } -impl RequestBody for Begin { +impl Request for Begin { fn request_type() -> RequestType where Self: Sized, diff --git a/src/codec/request/call.rs b/src/codec/request/call.rs index 08eeb59..5d9a452 100644 --- a/src/codec/request/call.rs +++ b/src/codec/request/call.rs @@ -12,7 +12,7 @@ use crate::{ errors::EncodingError, }; -use super::RequestBody; +use super::Request; #[derive(Clone, Debug)] pub(crate) struct Call { @@ -20,7 +20,7 @@ pub(crate) struct Call { pub tuple: Vec, } -impl RequestBody for Call { +impl Request for Call { fn request_type() -> RequestType where Self: Sized, diff --git a/src/codec/request/commit.rs b/src/codec/request/commit.rs index 9df0812..7cfa26f 100644 --- a/src/codec/request/commit.rs +++ b/src/codec/request/commit.rs @@ -4,12 +4,12 @@ use std::io::Write; use crate::{codec::consts::RequestType, errors::EncodingError}; -use super::RequestBody; +use super::Request; #[derive(Clone, Debug, Default)] pub(crate) struct Commit {} -impl RequestBody for Commit { +impl Request for Commit { fn request_type() -> RequestType where Self: Sized, diff --git a/src/codec/request/delete.rs b/src/codec/request/delete.rs index dcb4c5d..13dd66f 100644 --- a/src/codec/request/delete.rs +++ b/src/codec/request/delete.rs @@ -10,7 +10,7 @@ use crate::{ errors::EncodingError, }; -use super::RequestBody; +use super::Request; #[derive(Clone, Debug)] pub(crate) struct Delete { @@ -29,7 +29,7 @@ impl Delete { } } -impl RequestBody for Delete { +impl Request for Delete { fn request_type() -> RequestType where Self: Sized, diff --git a/src/codec/request/eval.rs b/src/codec/request/eval.rs index c9392be..45fd0bd 100644 --- a/src/codec/request/eval.rs +++ b/src/codec/request/eval.rs @@ -10,7 +10,7 @@ use crate::{ errors::EncodingError, }; -use super::RequestBody; +use super::Request; #[derive(Clone, Debug)] pub(crate) struct Eval { @@ -18,7 +18,7 @@ pub(crate) struct Eval { pub tuple: Vec, } -impl RequestBody for Eval { +impl Request for Eval { fn request_type() -> RequestType where Self: Sized, diff --git a/src/codec/request/id.rs b/src/codec/request/id.rs index 8a223dd..4e1b6ea 100644 --- a/src/codec/request/id.rs +++ b/src/codec/request/id.rs @@ -5,7 +5,7 @@ use crate::{ errors::EncodingError, }; -use super::{RequestBody, PROTOCOL_VERSION}; +use super::{Request, PROTOCOL_VERSION}; #[derive(Clone, Debug)] pub(crate) struct Id { @@ -35,7 +35,7 @@ impl Id { const WATCHERS: u8 = 3; } -impl RequestBody for Id { +impl Request for Id { fn request_type() -> RequestType where Self: Sized, diff --git a/src/codec/request/insert.rs b/src/codec/request/insert.rs index 2d14b88..49c0669 100644 --- a/src/codec/request/insert.rs +++ b/src/codec/request/insert.rs @@ -12,7 +12,7 @@ use crate::{ errors::EncodingError, }; -use super::RequestBody; +use super::Request; #[derive(Clone, Debug)] pub(crate) struct Insert { @@ -26,7 +26,7 @@ impl Insert { } } -impl RequestBody for Insert { +impl Request for Insert { fn request_type() -> RequestType where Self: Sized, diff --git a/src/codec/request/mod.rs b/src/codec/request/mod.rs index 1d64c17..aeb374c 100644 --- a/src/codec/request/mod.rs +++ b/src/codec/request/mod.rs @@ -33,7 +33,7 @@ pub const PROTOCOL_VERSION: u8 = 3; const DEFAULT_ENCODE_BUFFER_SIZE: usize = 128; // TODO: docs -pub trait RequestBody { +pub trait Request { /// Return type of this request. fn request_type() -> RequestType where @@ -49,8 +49,9 @@ pub trait RequestBody { fn encode(&self, buf: &mut dyn Write) -> Result<(), EncodingError>; } +/// Request, encoded into MessagePack, and its meta data. #[doc(hidden)] -pub struct Request { +pub struct EncodedRequest { /// By default `sync` is set to 0 and replaced with /// actual value when reaching [`crate::transport::Connection`]. pub(crate) request_type: RequestType, @@ -60,11 +61,8 @@ pub struct Request { pub(crate) encoded_body: Bytes, } -impl Request { - pub fn new( - body: Body, - stream_id: Option, - ) -> Result { +impl EncodedRequest { + pub fn new(body: Body, stream_id: Option) -> Result { let mut buf = BytesMut::with_capacity(DEFAULT_ENCODE_BUFFER_SIZE).writer(); body.encode(&mut buf)?; Ok(Self { diff --git a/src/codec/request/ping.rs b/src/codec/request/ping.rs index 409f920..f593773 100644 --- a/src/codec/request/ping.rs +++ b/src/codec/request/ping.rs @@ -1,12 +1,12 @@ use std::io::Write; -use super::RequestBody; +use super::Request; use crate::{codec::consts::RequestType, errors::EncodingError}; #[derive(Clone, Debug)] pub(crate) struct Ping {} -impl RequestBody for Ping { +impl Request for Ping { fn request_type() -> RequestType { RequestType::Ping } diff --git a/src/codec/request/replace.rs b/src/codec/request/replace.rs index 9527826..10b960e 100644 --- a/src/codec/request/replace.rs +++ b/src/codec/request/replace.rs @@ -12,7 +12,7 @@ use crate::{ errors::EncodingError, }; -use super::RequestBody; +use super::Request; #[derive(Clone, Debug)] pub(crate) struct Replace { @@ -26,7 +26,7 @@ impl Replace { } } -impl RequestBody for Replace { +impl Request for Replace { fn request_type() -> RequestType where Self: Sized, diff --git a/src/codec/request/rollback.rs b/src/codec/request/rollback.rs index 6142594..0f51ee3 100644 --- a/src/codec/request/rollback.rs +++ b/src/codec/request/rollback.rs @@ -4,12 +4,12 @@ use std::io::Write; use crate::{codec::consts::RequestType, errors::EncodingError}; -use super::RequestBody; +use super::Request; #[derive(Clone, Debug, Default)] pub(crate) struct Rollback {} -impl RequestBody for Rollback { +impl Request for Rollback { fn request_type() -> RequestType where Self: Sized, diff --git a/src/codec/request/select.rs b/src/codec/request/select.rs index 2d2bc8e..bbd257d 100644 --- a/src/codec/request/select.rs +++ b/src/codec/request/select.rs @@ -10,7 +10,7 @@ use crate::{ errors::EncodingError, }; -use super::RequestBody; +use super::Request; #[derive(Clone, Debug)] pub(crate) struct Select { @@ -42,7 +42,7 @@ impl Select { } } -impl RequestBody for Select { +impl Request for Select { fn request_type() -> RequestType where Self: Sized, diff --git a/src/codec/request/update.rs b/src/codec/request/update.rs index e1530dd..d6685c6 100644 --- a/src/codec/request/update.rs +++ b/src/codec/request/update.rs @@ -12,7 +12,7 @@ use crate::{ errors::EncodingError, }; -use super::RequestBody; +use super::Request; #[derive(Clone, Debug)] pub(crate) struct Update { @@ -41,7 +41,7 @@ impl Update { } } -impl RequestBody for Update { +impl Request for Update { fn request_type() -> RequestType where Self: Sized, diff --git a/src/codec/request/upsert.rs b/src/codec/request/upsert.rs index 7c24ad9..e068993 100644 --- a/src/codec/request/upsert.rs +++ b/src/codec/request/upsert.rs @@ -12,7 +12,7 @@ use crate::{ errors::EncodingError, }; -use super::RequestBody; +use super::Request; #[derive(Clone, Debug)] pub(crate) struct Upsert { @@ -33,7 +33,7 @@ impl Upsert { } } -impl RequestBody for Upsert { +impl Request for Upsert { fn request_type() -> RequestType where Self: Sized, diff --git a/src/transport/connection.rs b/src/transport/connection.rs index a836779..c6d6785 100644 --- a/src/transport/connection.rs +++ b/src/transport/connection.rs @@ -17,7 +17,7 @@ use tracing::{debug, trace, warn}; use super::dispatcher::DispatcherResponse; use crate::{ codec::{ - request::{Auth, Request}, + request::{Auth, EncodedRequest}, response::{Response, ResponseBody}, ClientCodec, Greeting, }, @@ -82,7 +82,7 @@ impl Connection { } async fn auth(&mut self, user: &str, password: Option<&str>, salt: &[u8]) -> Result<(), Error> { - let mut request = Request::new(Auth::new(user, password, salt), None).unwrap(); + let mut request = EncodedRequest::new(Auth::new(user, password, salt), None).unwrap(); *request.sync_mut() = self.next_sync(); trace!("Sending auth request"); @@ -102,7 +102,7 @@ impl Connection { pub(super) async fn send_request( &mut self, - mut request: Request, + mut request: EncodedRequest, tx: oneshot::Sender, ) -> Result<(), tokio::io::Error> { let sync = self.next_sync(); diff --git a/src/transport/dispatcher.rs b/src/transport/dispatcher.rs index b634056..be0b1c2 100644 --- a/src/transport/dispatcher.rs +++ b/src/transport/dispatcher.rs @@ -10,20 +10,20 @@ use tracing::{debug, error}; use super::connection::Connection; use crate::{ - codec::{request::Request, response::Response}, + codec::{request::EncodedRequest, response::Response}, Error, ReconnectInterval, }; // Arc here is necessary to send same error to all waiting in-flights pub(crate) type DispatcherResponse = Result; -pub(crate) type DispatcherRequest = (Request, oneshot::Sender); +pub(crate) type DispatcherRequest = (EncodedRequest, oneshot::Sender); pub(crate) struct DispatcherSender { tx: mpsc::Sender, } impl DispatcherSender { - pub(crate) async fn send(&self, request: Request) -> DispatcherResponse { + pub(crate) async fn send(&self, request: EncodedRequest) -> DispatcherResponse { let (tx, rx) = oneshot::channel(); self.tx .send((request, tx))