Skip to content

Commit

Permalink
A few changes / fixes
Browse files Browse the repository at this point in the history
  * Various verb related builtins were returning E_VERBNF for bad objects, instead of E_INVARG.
  * This could be a broader problem with other builtins, but this is what I found for now.
  * Turn down spammy log on :huh
  * A bit of trait refactoring in the db layer.
  • Loading branch information
rdaum committed Sep 30, 2023
1 parent 813e0cb commit 493f1b0
Show file tree
Hide file tree
Showing 12 changed files with 282 additions and 173 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ metrics-macros.workspace = true
# For the DB layer.
rocksdb.workspace = true
crossbeam-channel.workspace = true
bincode.workspace = true
bincode.workspace = true
115 changes: 47 additions & 68 deletions crates/db/src/db_client.rs → crates/db/src/channel_db_tx_client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use async_trait::async_trait;
use crossbeam_channel::Sender;
use tokio::sync::oneshot;
use uuid::Uuid;
Expand All @@ -15,9 +16,10 @@ use moor_values::var::objid::Objid;
use moor_values::var::Var;

use crate::db_message::DbMessage;
use crate::db_tx::DbTransaction;
use moor_values::model::verbdef::VerbDefs;

pub(crate) struct DbTxClient {
pub(crate) struct DbTxChannelClient {
pub(crate) mailbox: Sender<DbMessage>,
}

Expand All @@ -29,8 +31,10 @@ async fn get_reply<R>(
.map_err(|e| WorldStateError::DatabaseError(e.to_string()))?
}

/// Sends messages over crossbeam channel to the Db tx thread and fields replies.
impl DbTxClient {
/// An implementation of DbTransaction which communicates over a crossbeam channel to a separate
/// (per-transaction) thread. For e.g. systems which have ownership patterns that make it difficult
/// to hold transactions in an async context, etc.
impl DbTxChannelClient {
pub fn new(mailbox: Sender<DbMessage>) -> Self {
Self { mailbox }
}
Expand All @@ -40,27 +44,27 @@ impl DbTxClient {
.send(msg)
.map_err(|e| WorldStateError::DatabaseError(e.to_string()))
}
}

pub async fn get_object_owner(&self, obj: Objid) -> Result<Objid, WorldStateError> {
#[async_trait]
impl DbTransaction for DbTxChannelClient {
async fn get_object_owner(&self, obj: Objid) -> Result<Objid, WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::GetObjectOwner(obj, send))?;
get_reply(receive).await
}

pub async fn set_object_owner(&self, obj: Objid, owner: Objid) -> Result<(), WorldStateError> {
async fn set_object_owner(&self, obj: Objid, owner: Objid) -> Result<(), WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::SetObjectOwner(obj, owner, send))?;
get_reply(receive).await?;
Ok(())
}

pub async fn get_object_flags(&self, obj: Objid) -> Result<BitEnum<ObjFlag>, WorldStateError> {
async fn get_object_flags(&self, obj: Objid) -> Result<BitEnum<ObjFlag>, WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::GetObjectFlagsOf(obj, send))?;
get_reply(receive).await
}

pub async fn set_object_flags(
async fn set_object_flags(
&self,
obj: Objid,
flags: BitEnum<ObjFlag>,
Expand All @@ -70,15 +74,13 @@ impl DbTxClient {
get_reply(receive).await?;
Ok(())
}

pub async fn get_object_name(&self, obj: Objid) -> Result<String, WorldStateError> {
async fn get_object_name(&self, obj: Objid) -> Result<String, WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::GetObjectNameOf(obj, send))?;
let name = get_reply(receive).await?;
Ok(name)
}

pub async fn create_object(
async fn create_object(
&self,
id: Option<Objid>,
attrs: ObjAttrs,
Expand All @@ -92,106 +94,82 @@ impl DbTxClient {
let oid = get_reply(receive).await?;
Ok(oid)
}

pub async fn recycle_object(&self, obj: Objid) -> Result<(), WorldStateError> {
async fn recycle_object(&self, obj: Objid) -> Result<(), WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::RecycleObject(obj, send))?;
get_reply(receive).await?;
Ok(())
}

pub async fn set_object_name(&self, obj: Objid, name: String) -> Result<(), WorldStateError> {
async fn set_object_name(&self, obj: Objid, name: String) -> Result<(), WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::SetObjectNameOf(obj, name, send))?;
get_reply(receive).await?;
Ok(())
}

pub async fn get_parent(&self, obj: Objid) -> Result<Objid, WorldStateError> {
async fn get_parent(&self, obj: Objid) -> Result<Objid, WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::GetParentOf(obj, send))?;
let oid = get_reply(receive).await?;
Ok(oid)
}

pub async fn set_parent(&self, obj: Objid, parent: Objid) -> Result<(), WorldStateError> {
async fn set_parent(&self, obj: Objid, parent: Objid) -> Result<(), WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::SetParent(obj, parent, send))?;
get_reply(receive).await?;
Ok(())
}

pub async fn get_children(&self, obj: Objid) -> Result<ObjSet, WorldStateError> {
async fn get_children(&self, obj: Objid) -> Result<ObjSet, WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::GetChildrenOf(obj, send))?;
let children = get_reply(receive).await?;
Ok(children)
}

pub async fn get_location_of(&self, obj: Objid) -> Result<Objid, WorldStateError> {
async fn get_location_of(&self, obj: Objid) -> Result<Objid, WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::GetLocationOf(obj, send))?;
let oid = get_reply(receive).await?;
Ok(oid)
}

pub async fn set_location_of(
&self,
obj: Objid,
location: Objid,
) -> Result<(), WorldStateError> {
async fn set_location_of(&self, obj: Objid, location: Objid) -> Result<(), WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::SetLocationOf(obj, location, send))?;
get_reply(receive).await?;
Ok(())
}

pub async fn get_contents_of(&self, obj: Objid) -> Result<ObjSet, WorldStateError> {
async fn get_contents_of(&self, obj: Objid) -> Result<ObjSet, WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::GetContentsOf(obj, send))?;
let contents = get_reply(receive).await?;
Ok(contents)
}

pub async fn get_max_object(&self) -> Result<Objid, WorldStateError> {
async fn get_max_object(&self) -> Result<Objid, WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::GetMaxObject(send))?;
let oid = get_reply(receive).await?;
Ok(oid)
}

pub async fn get_verbs(&self, obj: Objid) -> Result<VerbDefs, WorldStateError> {
async fn get_verbs(&self, obj: Objid) -> Result<VerbDefs, WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::GetVerbs(obj, send))?;
let verbs = get_reply(receive).await?;
Ok(verbs)
}

// TODO: this could return SliceRef or an Arc<Vec<u8>>, to potentially avoid copying. Though
// for RocksDB I don't think it matters, since I don't think it will let us avoid copying
// anyway.
pub async fn get_verb_binary(
&self,
obj: Objid,
uuid: Uuid,
) -> Result<Vec<u8>, WorldStateError> {
async fn get_verb_binary(&self, obj: Objid, uuid: Uuid) -> Result<Vec<u8>, WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::GetVerbBinary(obj, uuid, send))?;
let verb = get_reply(receive).await?;
Ok(verb)
}

pub async fn get_verb_by_name(
&self,
obj: Objid,
name: String,
) -> Result<VerbDef, WorldStateError> {
async fn get_verb_by_name(&self, obj: Objid, name: String) -> Result<VerbDef, WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::GetVerbByName(obj, name, send))?;
let verb = get_reply(receive).await?;
Ok(verb)
}
pub async fn get_verb_by_index(
async fn get_verb_by_index(
&self,
obj: Objid,
index: usize,
Expand All @@ -201,7 +179,7 @@ impl DbTxClient {
let verb = get_reply(receive).await?;
Ok(verb)
}
pub async fn resolve_verb(
async fn resolve_verb(
&self,
obj: Objid,
name: String,
Expand All @@ -217,7 +195,7 @@ impl DbTxClient {
let verbdef = get_reply(receive).await?;
Ok(verbdef)
}
pub async fn update_verb(
async fn update_verb(
&self,
obj: Objid,
uuid: Uuid,
Expand Down Expand Up @@ -248,7 +226,7 @@ impl DbTxClient {
}
Ok(())
}
pub async fn add_verb(
async fn add_verb(
&self,
location: Objid,
owner: Objid,
Expand All @@ -272,7 +250,7 @@ impl DbTxClient {
get_reply(receive).await?;
Ok(())
}
pub async fn delete_verb(&self, location: Objid, uuid: Uuid) -> Result<(), WorldStateError> {
async fn delete_verb(&self, location: Objid, uuid: Uuid) -> Result<(), WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::DeleteVerb {
location,
Expand All @@ -282,14 +260,13 @@ impl DbTxClient {
get_reply(receive).await?;
Ok(())
}

pub async fn get_properties(&self, obj: Objid) -> Result<PropDefs, WorldStateError> {
async fn get_properties(&self, obj: Objid) -> Result<PropDefs, WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::GetProperties(obj, send))?;
let props = get_reply(receive).await?;
Ok(props)
}
pub async fn set_property(
async fn set_property(
&self,
obj: Objid,
uuid: Uuid,
Expand All @@ -300,7 +277,7 @@ impl DbTxClient {
get_reply(receive).await?;
Ok(())
}
pub async fn define_property(
async fn define_property(
&self,
definer: Objid,
location: Objid,
Expand All @@ -322,7 +299,7 @@ impl DbTxClient {
let uuid = get_reply(receive).await?;
Ok(uuid)
}
pub async fn set_property_info(
async fn set_property_info(
&self,
obj: Objid,
uuid: Uuid,
Expand All @@ -342,25 +319,25 @@ impl DbTxClient {
get_reply(receive).await?;
Ok(())
}
pub async fn clear_property(&self, obj: Objid, uuid: Uuid) -> Result<(), WorldStateError> {
async fn clear_property(&self, obj: Objid, uuid: Uuid) -> Result<(), WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::ClearProperty(obj, uuid, send))?;
get_reply(receive).await?;
Ok(())
}
pub async fn delete_property(&self, obj: Objid, uuid: Uuid) -> Result<(), WorldStateError> {
async fn delete_property(&self, obj: Objid, uuid: Uuid) -> Result<(), WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::DeleteProperty(obj, uuid, send))?;
get_reply(receive).await?;
Ok(())
}
pub async fn retrieve_property(&self, obj: Objid, uuid: Uuid) -> Result<Var, WorldStateError> {
async fn retrieve_property(&self, obj: Objid, uuid: Uuid) -> Result<Var, WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::RetrieveProperty(obj, uuid, send))?;
let value = get_reply(receive).await?;
Ok(value)
}
pub async fn resolve_property(
async fn resolve_property(
&self,
obj: Objid,
name: String,
Expand All @@ -370,23 +347,25 @@ impl DbTxClient {
let (prop, value) = get_reply(receive).await?;
Ok((prop, value))
}
pub async fn valid(&self, obj: Objid) -> Result<bool, WorldStateError> {
async fn valid(&self, obj: Objid) -> Result<bool, WorldStateError> {
if obj.0 < 0 {
return Ok(false);
}
let (send, receive) = oneshot::channel();
self.send(DbMessage::Valid(obj, send))?;
let valid = receive
.await
.map_err(|e| WorldStateError::DatabaseError(e.to_string()))?;
Ok(valid)
}
pub async fn commit(&self) -> Result<CommitResult, WorldStateError> {
async fn commit(&self) -> Result<CommitResult, WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::Commit(send))?;
receive
.await
.map_err(|e| WorldStateError::DatabaseError(e.to_string()))
}

pub async fn rollback(&self) -> Result<(), WorldStateError> {
async fn rollback(&self) -> Result<(), WorldStateError> {
let (send, receive) = oneshot::channel();
self.send(DbMessage::Rollback(send))?;
receive
Expand Down
Loading

0 comments on commit 493f1b0

Please sign in to comment.