From 8679dca9145afe8a6398a7f2de35b8c196e3325f Mon Sep 17 00:00:00 2001 From: Andrey Kononov Date: Sun, 30 Jul 2023 14:52:05 +0400 Subject: [PATCH] Space API. Removed index_base from all requests --- examples/schema.rs | 13 ++- examples/transactions.rs | 3 +- src/client/connection.rs | 18 ++++ src/client/connection_like.rs | 16 +--- src/client/executor.rs | 8 +- src/client/mod.rs | 6 +- src/client/schema/space.rs | 150 +++++++++++++++++++++++++++++++--- src/client/stream.rs | 15 +++- src/client/transaction.rs | 11 ++- src/codec/request/mod.rs | 1 + src/codec/request/update.rs | 16 +--- src/codec/request/upsert.rs | 8 +- tests/integration.rs | 9 +- 13 files changed, 217 insertions(+), 57 deletions(-) diff --git a/examples/schema.rs b/examples/schema.rs index 4e7317f..77411e4 100644 --- a/examples/schema.rs +++ b/examples/schema.rs @@ -1,4 +1,4 @@ -use tarantool_rs::{schema::SpaceMetadata, Connection}; +use tarantool_rs::{schema::SpaceMetadata, Connection, IteratorType}; use tracing::info; #[tokio::main] @@ -7,8 +7,15 @@ async fn main() -> Result<(), anyhow::Error> { let conn = Connection::builder().build("127.0.0.1:3301").await?; - let data = SpaceMetadata::load_by_name(conn, "clients").await?; + let data = conn.find_space_by_name("clients").await?; info!("{:?}", data); - + let space = data.unwrap(); + space.upsert(vec!["=".into()], vec![2.into()]).await?; + info!( + "{:?}", + space + .select::<(i64,)>(0, None, None, Some(IteratorType::All), vec![]) + .await? + ); Ok(()) } diff --git a/examples/transactions.rs b/examples/transactions.rs index e54af80..3776573 100644 --- a/examples/transactions.rs +++ b/examples/transactions.rs @@ -1,5 +1,4 @@ -use tarantool_rs::Executor; -use tarantool_rs::{Connection, ConnectionLike, Value}; +use tarantool_rs::{Connection, ConnectionLike, Executor, Value}; #[tokio::main] async fn main() -> Result<(), anyhow::Error> { diff --git a/src/client/connection.rs b/src/client/connection.rs index dd2c78a..abd1739 100644 --- a/src/client/connection.rs +++ b/src/client/connection.rs @@ -1,3 +1,4 @@ +use std::fmt; use std::{ sync::{ atomic::{AtomicU32, Ordering}, @@ -19,6 +20,7 @@ use crate::{ request::{EncodedRequest, Id, Request}, response::ResponseBody, }, + schema::Space, transport::DispatcherSender, Result, }; @@ -118,6 +120,16 @@ impl Connection { pub(crate) async fn transaction(&self) -> Result { self.transaction_builder().begin().await } + + /// Find and load space by its id. + pub async fn find_space_by_id(&self, id: u32) -> Result>> { + Space::load_by_id(self.clone(), id).await + } + + /// Find and load space by its name. + pub async fn find_space_by_name(&self, name: &str) -> Result>> { + Space::load_by_name(self.clone(), name).await + } } #[async_trait] @@ -142,3 +154,9 @@ impl Executor for Connection { self.transaction().await } } + +impl fmt::Debug for Connection { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Connection") + } +} diff --git a/src/client/connection_like.rs b/src/client/connection_like.rs index bb9d9c5..d5ef454 100644 --- a/src/client/connection_like.rs +++ b/src/client/connection_like.rs @@ -14,6 +14,7 @@ use crate::{ }, utils::deserialize_non_sql_response, }, + schema::Space, IteratorType, Result, }; @@ -86,12 +87,11 @@ pub trait ConnectionLike: Executor { &self, space_id: u32, index_id: u32, - index_base: Option, keys: Vec, tuple: Vec, ) -> Result<()> { let _ = self - .send_request(Update::new(space_id, index_id, index_base, keys, tuple)) + .send_request(Update::new(space_id, index_id, keys, tuple)) .await?; Ok(()) } @@ -99,16 +99,8 @@ pub trait ConnectionLike: Executor { // TODO: structured tuple // TODO: decode response // TODO: maybe set index base to 1 always? - async fn upsert( - &self, - space_id: u32, - index_base: u32, - ops: Vec, - tuple: Vec, - ) -> Result<()> { - let _ = self - .send_request(Upsert::new(space_id, index_base, ops, tuple)) - .await?; + async fn upsert(&self, space_id: u32, ops: Vec, tuple: Vec) -> Result<()> { + let _ = self.send_request(Upsert::new(space_id, ops, tuple)).await?; Ok(()) } diff --git a/src/client/executor.rs b/src/client/executor.rs index af33f2c..3f24776 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -1,3 +1,5 @@ +use std::fmt::Debug; + use async_trait::async_trait; use rmpv::Value; @@ -6,7 +8,7 @@ use crate::{codec::request::EncodedRequest, Result, Stream, Transaction, Transac // TODO: docs #[async_trait] -pub trait Executor: Sealed + Send + Sync { +pub trait Executor: Sealed + Send + Sync + Debug { /// Send encoded request. async fn send_encoded_request(&self, request: EncodedRequest) -> Result; @@ -28,7 +30,7 @@ pub trait Executor: Sealed + Send + Sync { } #[async_trait] -impl Executor for &E { +impl Executor for &E { async fn send_encoded_request(&self, request: EncodedRequest) -> Result { (**self).send_encoded_request(request).await } @@ -47,7 +49,7 @@ impl Executor for &E { } #[async_trait] -impl Executor for &mut E { +impl Executor for &mut E { async fn send_encoded_request(&self, request: EncodedRequest) -> Result { (**self).send_encoded_request(request).await } diff --git a/src/client/mod.rs b/src/client/mod.rs index d0eaa9c..f2b3e1d 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -15,7 +15,10 @@ mod stream; mod transaction; mod private { - use crate::client::{Connection, Stream, Transaction}; + use crate::{ + client::{Connection, Stream, Transaction}, + schema::Space, + }; #[doc(hidden)] pub trait Sealed {} @@ -23,6 +26,7 @@ mod private { impl Sealed for Connection {} impl Sealed for Stream {} impl Sealed for Transaction {} + impl Sealed for Space {} impl Sealed for &S {} impl Sealed for &mut S {} } diff --git a/src/client/schema/space.rs b/src/client/schema/space.rs index a355a0a..4336903 100644 --- a/src/client/schema/space.rs +++ b/src/client/schema/space.rs @@ -1,10 +1,16 @@ -use std::fmt; +use std::fmt::{self, Debug}; +use async_trait::async_trait; use rmpv::Value; -use serde::Deserialize; +use serde::{de::DeserializeOwned, Deserialize}; use super::{IndexMetadata, SystemSpacesId}; -use crate::{client::ConnectionLike, utils::UniqueIdNameMap, Error}; +use crate::{ + client::ConnectionLike, + codec::request::{Insert, Select}, + utils::UniqueIdNameMap, + Error, Executor, IteratorType, Result, +}; /// Space metadata from with its indices metadata from [system views](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/system_views/). #[derive(Clone, Deserialize)] @@ -39,16 +45,13 @@ impl fmt::Debug for SpaceMetadata { impl SpaceMetadata { /// Load metadata of single space by its id. - pub async fn load_by_id(conn: impl ConnectionLike, id: u32) -> Result, Error> { + pub async fn load_by_id(conn: impl ConnectionLike, id: u32) -> Result> { // 0 - primary id index Self::load(conn, 0, id).await } /// Load metadata of single space by its name. - pub async fn load_by_name( - conn: impl ConnectionLike, - name: &str, - ) -> Result, Error> { + pub async fn load_by_name(conn: impl ConnectionLike, name: &str) -> Result> { // 2 - index on 'name' field Self::load(conn, 2, name).await } @@ -58,7 +61,7 @@ impl SpaceMetadata { conn: impl ConnectionLike, index_id: u32, key: impl Into, - ) -> Result, Error> { + ) -> Result> { let Some(mut this): Option = conn .select( SystemSpacesId::VSpace as u32, @@ -78,7 +81,7 @@ impl SpaceMetadata { } /// Load indices metadata into current space metadata. - async fn load_indices(&mut self, conn: impl ConnectionLike) -> Result<(), Error> { + async fn load_indices(&mut self, conn: impl ConnectionLike) -> Result<()> { self.indices = IndexMetadata::load_by_space_id(conn, self.id) .await .and_then(|x| { @@ -109,3 +112,130 @@ impl SpaceMetadata { &self.indices } } + +pub struct Space { + executor: E, + metadata: SpaceMetadata, +} + +impl Clone for Space { + fn clone(&self) -> Self { + Self { + executor: self.executor.clone(), + metadata: self.metadata.clone(), + } + } +} + +impl Space { + pub fn new(executor: E, metadata: SpaceMetadata) -> Self { + Self { executor, metadata } + } + + pub fn executor(&self) -> &E { + &self.executor + } + + pub fn metadata(&self) -> &SpaceMetadata { + &self.metadata + } +} + +impl Space { + /// Load metadata of single space by its id. + pub async fn load_by_id(executor: E, id: u32) -> Result> { + let Some(metadata) = SpaceMetadata::load_by_id(&executor, id).await? else { + return Ok(None); + }; + Ok(Some(Self::new(executor, metadata))) + } + + /// Load metadata of single space by its name. + pub async fn load_by_name(executor: E, name: &str) -> Result> { + let Some(metadata) = SpaceMetadata::load_by_name(&executor, name).await? else { + return Ok(None); + }; + Ok(Some(Self::new(executor, metadata))) + } + + // TODO: docs + pub async fn select( + &self, + index_id: u32, + limit: Option, + offset: Option, + iterator: Option, + keys: Vec, + ) -> Result> + where + T: DeserializeOwned, + { + self.executor + .select(self.metadata.id, index_id, limit, offset, iterator, keys) + .await + } + + // TODO: docs + // TODO: decode response + pub async fn insert(&self, tuple: Vec) -> Result<()> { + self.executor.insert(self.metadata.id, tuple).await + } + + // TODO: structured tuple + // TODO: decode response + pub async fn update(&self, index_id: u32, keys: Vec, tuple: Vec) -> Result<()> { + self.executor + .update(self.metadata.id, index_id, keys, tuple) + .await + } + + // TODO: structured tuple + // TODO: decode response + // TODO: maybe set index base to 1 always? + pub async fn upsert(&self, ops: Vec, tuple: Vec) -> Result<()> { + self.executor.upsert(self.metadata.id, ops, tuple).await + } + + // TODO: structured tuple + // TODO: decode response + pub async fn replace(&self, keys: Vec) -> Result<()> { + self.executor.replace(self.metadata.id, keys).await + } + + // TODO: structured tuple + // TODO: decode response + pub async fn delete(&self, index_id: u32, keys: Vec) -> Result<()> { + self.executor.delete(self.metadata.id, index_id, keys).await + } +} + +#[async_trait] +impl Executor for Space { + async fn send_encoded_request( + &self, + request: crate::codec::request::EncodedRequest, + ) -> crate::Result { + self.executor.send_encoded_request(request).await + } + + fn stream(&self) -> crate::Stream { + self.executor.stream() + } + + fn transaction_builder(&self) -> crate::TransactionBuilder { + self.executor.transaction_builder() + } + + async fn transaction(&self) -> crate::Result { + self.executor.transaction().await + } +} + +impl fmt::Debug for Space { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SpaceMetadata") + .field("executor", &self.executor) + .field("metadata", &self.metadata) + .finish() + } +} diff --git a/src/client/stream.rs b/src/client/stream.rs index ac40334..fda4c6b 100644 --- a/src/client/stream.rs +++ b/src/client/stream.rs @@ -1,12 +1,11 @@ +use std::fmt; + use async_trait::async_trait; use rmpv::Value; use super::{Connection, Transaction, TransactionBuilder}; -use crate::{ - codec::request::{EncodedRequest}, - Executor, Result, -}; +use crate::{codec::request::EncodedRequest, Executor, Result}; /// Abstraction, providing sequential processing of requests. /// @@ -76,3 +75,11 @@ impl Executor for Stream { self.conn.transaction().await } } + +impl fmt::Debug for Stream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Stream") + .field("stream_id", &self.stream_id) + .finish() + } +} diff --git a/src/client/transaction.rs b/src/client/transaction.rs index 02d880b..a3097a4 100644 --- a/src/client/transaction.rs +++ b/src/client/transaction.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{fmt, time::Duration}; use async_trait::async_trait; @@ -109,6 +109,15 @@ impl Executor for Transaction { } } +impl fmt::Debug for Transaction { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Transaction") + .field("stream_id", &self.stream_id) + .field("finished", &self.finished) + .finish() + } +} + /// Build transaction. pub struct TransactionBuilder { connection: Connection, diff --git a/src/codec/request/mod.rs b/src/codec/request/mod.rs index aeb374c..0ab480d 100644 --- a/src/codec/request/mod.rs +++ b/src/codec/request/mod.rs @@ -31,6 +31,7 @@ mod upsert; pub const PROTOCOL_VERSION: u8 = 3; const DEFAULT_ENCODE_BUFFER_SIZE: usize = 128; +const INDEX_BASE_VALUE: u32 = 0; // TODO: docs pub trait Request { diff --git a/src/codec/request/update.rs b/src/codec/request/update.rs index d6685c6..bd238f9 100644 --- a/src/codec/request/update.rs +++ b/src/codec/request/update.rs @@ -18,23 +18,15 @@ use super::Request; pub(crate) struct Update { pub space_id: u32, pub index_id: u32, - pub index_base: Option, pub keys: Vec, pub tuple: Vec, } impl Update { - pub(crate) fn new( - space_id: u32, - index_id: u32, - index_base: Option, - keys: Vec, - tuple: Vec, - ) -> Self { + pub(crate) fn new(space_id: u32, index_id: u32, keys: Vec, tuple: Vec) -> Self { Self { space_id, index_id, - index_base, keys, tuple, } @@ -51,13 +43,9 @@ impl Request for Update { // NOTE: `&mut buf: mut` is required since I don't get why compiler complain fn encode(&self, mut buf: &mut dyn Write) -> Result<(), EncodingError> { - let map_len = if self.index_base.is_some() { 5 } else { 4 }; - rmp::encode::write_map_len(&mut buf, map_len)?; + rmp::encode::write_map_len(&mut buf, 4)?; write_kv_u32(buf, keys::SPACE_ID, self.space_id)?; write_kv_u32(buf, keys::INDEX_ID, self.index_id)?; - if let Some(value) = self.index_base { - write_kv_u32(buf, keys::INDEX_BASE, value)?; - } write_kv_array(buf, keys::KEY, &self.keys)?; write_kv_array(buf, keys::TUPLE, &self.tuple)?; Ok(()) diff --git a/src/codec/request/upsert.rs b/src/codec/request/upsert.rs index e068993..ee6f658 100644 --- a/src/codec/request/upsert.rs +++ b/src/codec/request/upsert.rs @@ -12,21 +12,19 @@ use crate::{ errors::EncodingError, }; -use super::Request; +use super::{Request, INDEX_BASE_VALUE}; #[derive(Clone, Debug)] pub(crate) struct Upsert { pub space_id: u32, - pub index_base: u32, pub ops: Vec, pub tuple: Vec, } impl Upsert { - pub(crate) fn new(space_id: u32, index_base: u32, ops: Vec, tuple: Vec) -> Self { + pub(crate) fn new(space_id: u32, ops: Vec, tuple: Vec) -> Self { Self { space_id, - index_base, ops, tuple, } @@ -46,7 +44,7 @@ impl Request for Upsert { fn encode(&self, mut buf: &mut dyn Write) -> Result<(), EncodingError> { rmp::encode::write_map_len(&mut buf, 4)?; write_kv_u32(buf, keys::SPACE_ID, self.space_id)?; - write_kv_u32(buf, keys::INDEX_BASE, self.index_base)?; + write_kv_u32(buf, keys::INDEX_BASE, INDEX_BASE_VALUE)?; write_kv_array(buf, keys::OPS, &self.ops)?; write_kv_array(buf, keys::TUPLE, &self.tuple)?; Ok(()) diff --git a/tests/integration.rs b/tests/integration.rs index fc3dbac..b11849a 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -82,14 +82,19 @@ async fn retrieve_schema() -> Result<(), anyhow::Error> { let container = TarantoolTestContainer::default(); let conn = container.create_conn().await?; - let space = SpaceMetadata::load_by_name(&conn, "ds9_crew") + let space = conn + .load_by_name("ds9_crew") .await? .expect("Space 'ds9_crew' found"); assert_eq!(space.id(), 512, "First user space expected to have id 512"); assert_eq!(space.name(), "ds9_crew"); assert_eq!(space.indices().len(), 3); - let primary_idx = space.indices().get_by_id(0).expect("Primary index present"); + let primary_idx = space + .metadata() + .indices() + .get_by_id(0) + .expect("Primary index present"); assert_eq!(primary_idx.name(), "idx_id"); assert_eq!(primary_idx.space_id(), 512); assert_eq!(primary_idx.id(), 0);