Skip to content

Commit

Permalink
Space API. Removed index_base from all requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Flowneee committed Jul 30, 2023
1 parent 95f5067 commit 8679dca
Show file tree
Hide file tree
Showing 13 changed files with 217 additions and 57 deletions.
13 changes: 10 additions & 3 deletions examples/schema.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use tarantool_rs::{schema::SpaceMetadata, Connection};
use tarantool_rs::{schema::SpaceMetadata, Connection, IteratorType};
use tracing::info;

#[tokio::main]
Expand All @@ -7,8 +7,15 @@ async fn main() -> Result<(), anyhow::Error> {

let conn = Connection::builder().build("127.0.0.1:3301").await?;

let data = SpaceMetadata::load_by_name(conn, "clients").await?;
let data = conn.find_space_by_name("clients").await?;
info!("{:?}", data);

let space = data.unwrap();
space.upsert(vec!["=".into()], vec![2.into()]).await?;
info!(
"{:?}",
space
.select::<(i64,)>(0, None, None, Some(IteratorType::All), vec![])
.await?
);
Ok(())
}
3 changes: 1 addition & 2 deletions examples/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use tarantool_rs::Executor;
use tarantool_rs::{Connection, ConnectionLike, Value};
use tarantool_rs::{Connection, ConnectionLike, Executor, Value};

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
Expand Down
18 changes: 18 additions & 0 deletions src/client/connection.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt;
use std::{
sync::{
atomic::{AtomicU32, Ordering},
Expand All @@ -19,6 +20,7 @@ use crate::{
request::{EncodedRequest, Id, Request},
response::ResponseBody,
},
schema::Space,
transport::DispatcherSender,
Result,
};
Expand Down Expand Up @@ -118,6 +120,16 @@ impl Connection {
pub(crate) async fn transaction(&self) -> Result<Transaction> {
self.transaction_builder().begin().await
}

/// Find and load space by its id.
pub async fn find_space_by_id(&self, id: u32) -> Result<Option<Space<Self>>> {
Space::load_by_id(self.clone(), id).await
}

/// Find and load space by its name.
pub async fn find_space_by_name(&self, name: &str) -> Result<Option<Space<Self>>> {
Space::load_by_name(self.clone(), name).await
}
}

#[async_trait]
Expand All @@ -142,3 +154,9 @@ impl Executor for Connection {
self.transaction().await
}
}

impl fmt::Debug for Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Connection")
}
}
16 changes: 4 additions & 12 deletions src/client/connection_like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
},
utils::deserialize_non_sql_response,
},
schema::Space,
IteratorType, Result,
};

Expand Down Expand Up @@ -86,29 +87,20 @@ pub trait ConnectionLike: Executor {
&self,
space_id: u32,
index_id: u32,
index_base: Option<u32>,
keys: Vec<Value>,
tuple: Vec<Value>,
) -> Result<()> {
let _ = self
.send_request(Update::new(space_id, index_id, index_base, keys, tuple))
.send_request(Update::new(space_id, index_id, keys, tuple))
.await?;
Ok(())
}

// TODO: structured tuple
// TODO: decode response
// TODO: maybe set index base to 1 always?
async fn upsert(
&self,
space_id: u32,
index_base: u32,
ops: Vec<Value>,
tuple: Vec<Value>,
) -> Result<()> {
let _ = self
.send_request(Upsert::new(space_id, index_base, ops, tuple))
.await?;
async fn upsert(&self, space_id: u32, ops: Vec<Value>, tuple: Vec<Value>) -> Result<()> {
let _ = self.send_request(Upsert::new(space_id, ops, tuple)).await?;
Ok(())
}

Expand Down
8 changes: 5 additions & 3 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt::Debug;

use async_trait::async_trait;
use rmpv::Value;

Expand All @@ -6,7 +8,7 @@ use crate::{codec::request::EncodedRequest, Result, Stream, Transaction, Transac

// TODO: docs
#[async_trait]
pub trait Executor: Sealed + Send + Sync {
pub trait Executor: Sealed + Send + Sync + Debug {
/// Send encoded request.
async fn send_encoded_request(&self, request: EncodedRequest) -> Result<Value>;

Expand All @@ -28,7 +30,7 @@ pub trait Executor: Sealed + Send + Sync {
}

#[async_trait]
impl<E: Executor + Sealed + Sync> Executor for &E {
impl<E: Executor + Sealed + Sync + Debug> Executor for &E {
async fn send_encoded_request(&self, request: EncodedRequest) -> Result<Value> {
(**self).send_encoded_request(request).await
}
Expand All @@ -47,7 +49,7 @@ impl<E: Executor + Sealed + Sync> Executor for &E {
}

#[async_trait]
impl<E: Executor + Sealed + Sync> Executor for &mut E {
impl<E: Executor + Sealed + Sync + Debug> Executor for &mut E {
async fn send_encoded_request(&self, request: EncodedRequest) -> Result<Value> {
(**self).send_encoded_request(request).await
}
Expand Down
6 changes: 5 additions & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@ mod stream;
mod transaction;

mod private {
use crate::client::{Connection, Stream, Transaction};
use crate::{
client::{Connection, Stream, Transaction},
schema::Space,
};

#[doc(hidden)]
pub trait Sealed {}

impl Sealed for Connection {}
impl Sealed for Stream {}
impl Sealed for Transaction {}
impl<E> Sealed for Space<E> {}
impl<S: Sealed + ?Sized> Sealed for &S {}
impl<S: Sealed + ?Sized> Sealed for &mut S {}
}
150 changes: 140 additions & 10 deletions src/client/schema/space.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
use std::fmt;
use std::fmt::{self, Debug};

use async_trait::async_trait;
use rmpv::Value;
use serde::Deserialize;
use serde::{de::DeserializeOwned, Deserialize};

use super::{IndexMetadata, SystemSpacesId};
use crate::{client::ConnectionLike, utils::UniqueIdNameMap, Error};
use crate::{
client::ConnectionLike,
codec::request::{Insert, Select},
utils::UniqueIdNameMap,
Error, Executor, IteratorType, Result,
};

/// Space metadata from with its indices metadata from [system views](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/system_views/).
#[derive(Clone, Deserialize)]
Expand Down Expand Up @@ -39,16 +45,13 @@ impl fmt::Debug for SpaceMetadata {

impl SpaceMetadata {
/// Load metadata of single space by its id.
pub async fn load_by_id(conn: impl ConnectionLike, id: u32) -> Result<Option<Self>, Error> {
pub async fn load_by_id(conn: impl ConnectionLike, id: u32) -> Result<Option<Self>> {
// 0 - primary id index
Self::load(conn, 0, id).await
}

/// Load metadata of single space by its name.
pub async fn load_by_name(
conn: impl ConnectionLike,
name: &str,
) -> Result<Option<Self>, Error> {
pub async fn load_by_name(conn: impl ConnectionLike, name: &str) -> Result<Option<Self>> {
// 2 - index on 'name' field
Self::load(conn, 2, name).await
}
Expand All @@ -58,7 +61,7 @@ impl SpaceMetadata {
conn: impl ConnectionLike,
index_id: u32,
key: impl Into<Value>,
) -> Result<Option<Self>, Error> {
) -> Result<Option<Self>> {
let Some(mut this): Option<Self> = conn
.select(
SystemSpacesId::VSpace as u32,
Expand All @@ -78,7 +81,7 @@ impl SpaceMetadata {
}

/// Load indices metadata into current space metadata.
async fn load_indices(&mut self, conn: impl ConnectionLike) -> Result<(), Error> {
async fn load_indices(&mut self, conn: impl ConnectionLike) -> Result<()> {
self.indices = IndexMetadata::load_by_space_id(conn, self.id)
.await
.and_then(|x| {
Expand Down Expand Up @@ -109,3 +112,130 @@ impl SpaceMetadata {
&self.indices
}
}

pub struct Space<E> {
executor: E,
metadata: SpaceMetadata,
}

impl<E: Clone> Clone for Space<E> {
fn clone(&self) -> Self {
Self {
executor: self.executor.clone(),
metadata: self.metadata.clone(),
}
}
}

impl<E> Space<E> {
pub fn new(executor: E, metadata: SpaceMetadata) -> Self {
Self { executor, metadata }
}

pub fn executor(&self) -> &E {
&self.executor
}

pub fn metadata(&self) -> &SpaceMetadata {
&self.metadata
}
}

impl<E: Executor> Space<E> {
/// Load metadata of single space by its id.
pub async fn load_by_id(executor: E, id: u32) -> Result<Option<Self>> {
let Some(metadata) = SpaceMetadata::load_by_id(&executor, id).await? else {
return Ok(None);
};
Ok(Some(Self::new(executor, metadata)))
}

/// Load metadata of single space by its name.
pub async fn load_by_name(executor: E, name: &str) -> Result<Option<Self>> {
let Some(metadata) = SpaceMetadata::load_by_name(&executor, name).await? else {
return Ok(None);
};
Ok(Some(Self::new(executor, metadata)))
}

// TODO: docs
pub async fn select<T>(
&self,
index_id: u32,
limit: Option<u32>,
offset: Option<u32>,
iterator: Option<IteratorType>,
keys: Vec<Value>,
) -> Result<Vec<T>>
where
T: DeserializeOwned,
{
self.executor
.select(self.metadata.id, index_id, limit, offset, iterator, keys)
.await
}

// TODO: docs
// TODO: decode response
pub async fn insert(&self, tuple: Vec<Value>) -> Result<()> {
self.executor.insert(self.metadata.id, tuple).await
}

// TODO: structured tuple
// TODO: decode response
pub async fn update(&self, index_id: u32, keys: Vec<Value>, tuple: Vec<Value>) -> Result<()> {
self.executor
.update(self.metadata.id, index_id, keys, tuple)
.await
}

// TODO: structured tuple
// TODO: decode response
// TODO: maybe set index base to 1 always?
pub async fn upsert(&self, ops: Vec<Value>, tuple: Vec<Value>) -> Result<()> {
self.executor.upsert(self.metadata.id, ops, tuple).await
}

// TODO: structured tuple
// TODO: decode response
pub async fn replace(&self, keys: Vec<Value>) -> Result<()> {
self.executor.replace(self.metadata.id, keys).await
}

// TODO: structured tuple
// TODO: decode response
pub async fn delete(&self, index_id: u32, keys: Vec<Value>) -> Result<()> {
self.executor.delete(self.metadata.id, index_id, keys).await
}
}

#[async_trait]
impl<E: Executor> Executor for Space<E> {
async fn send_encoded_request(
&self,
request: crate::codec::request::EncodedRequest,
) -> crate::Result<Value> {
self.executor.send_encoded_request(request).await
}

fn stream(&self) -> crate::Stream {
self.executor.stream()
}

fn transaction_builder(&self) -> crate::TransactionBuilder {
self.executor.transaction_builder()
}

async fn transaction(&self) -> crate::Result<crate::Transaction> {
self.executor.transaction().await
}
}

impl<E: Debug> fmt::Debug for Space<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SpaceMetadata")
.field("executor", &self.executor)
.field("metadata", &self.metadata)
.finish()
}
}
15 changes: 11 additions & 4 deletions src/client/stream.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::fmt;

use async_trait::async_trait;

use rmpv::Value;

use super::{Connection, Transaction, TransactionBuilder};
use crate::{
codec::request::{EncodedRequest},
Executor, Result,
};
use crate::{codec::request::EncodedRequest, Executor, Result};

/// Abstraction, providing sequential processing of requests.
///
Expand Down Expand Up @@ -76,3 +75,11 @@ impl Executor for Stream {
self.conn.transaction().await
}
}

impl fmt::Debug for Stream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Stream")
.field("stream_id", &self.stream_id)
.finish()
}
}
Loading

0 comments on commit 8679dca

Please sign in to comment.