Skip to content

Commit

Permalink
feat(iroh-sync): Queries and "views" (#1766)
Browse files Browse the repository at this point in the history
## Description

Implementation of #1667, extends #1701

With the following changes after discussions in Discord:

* Remove the notion of views, instead embed the info in the query. If
the high-level concept of views makes more sense for people from an API
perspective, we can restore it in the client api. However under the hood
the query details are different per view, this is why here I expose a
single `Query` struct
* Add a query builder with a typestate to only allow possible
combinations
* Add an index to the redb (fs) document store to make queries that are
sorted by key, or that are filtered by key but not by author, efficient.
This also is what allows to do queries for the "latest only" entry per
key, without allocating the full result set.

In the process I did a refactoring of the redb store to be safer to use.
Especially, I moved the `ouroboros` self-referencing stuff into a
`ranges` module, and encapsulated in an inner type to keep the
self-referencing compilcations scoped. Also did something I had mind for
a while: Add some type-safe abstractions around the range bounds
constructions that are used when selecting on redb tables. All this
turned out quite nice, IMO.

Also contains the changes from #1772 : 
* Renames `get_one` to `get_exact`
* Add flag to `get_exact` whether to include empty entries or not
* Add `get_one` to `iroh::client::Doc` to get a single entry with the
same query mechanisms as `get_many`


Open questions and tasks:
* Naming review of the query builder methods
* Integration of the query parameters in the console


## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [ ] Tests if relevant.

---------

Co-authored-by: dignifiedquire <[email protected]>
  • Loading branch information
Frando and dignifiedquire authored Nov 7, 2023
1 parent e671f12 commit 9a16498
Show file tree
Hide file tree
Showing 18 changed files with 2,006 additions and 913 deletions.
41 changes: 25 additions & 16 deletions iroh-sync/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tracing::{debug, error, error_span, trace, warn};

use crate::{
ranger::Message,
store::{self, GetFilter, ImportNamespaceOutcome},
store::{self, ImportNamespaceOutcome, Query},
Author, AuthorHeads, AuthorId, Capability, CapabilityKind, ContentStatus,
ContentStatusCallback, Event, NamespaceId, NamespaceSecret, PeerIdBytes, Replica, SignedEntry,
SyncOutcome,
Expand Down Expand Up @@ -122,13 +122,14 @@ enum ReplicaAction {
#[debug("reply")]
reply: oneshot::Sender<Result<()>>,
},
GetOne {
GetExact {
author: AuthorId,
key: Bytes,
include_empty: bool,
reply: oneshot::Sender<Result<Option<SignedEntry>>>,
},
GetMany {
filter: GetFilter,
query: Query,
reply: flume::Sender<Result<SignedEntry>>,
},
DropReplica {
Expand Down Expand Up @@ -366,26 +367,31 @@ impl SyncHandle {
rx.await?
}

// TODO: it would be great if this could be a sync method...
pub async fn get_many(
&self,
namespace: NamespaceId,
filter: GetFilter,
query: Query,
reply: flume::Sender<Result<SignedEntry>>,
) -> Result<()> {
let action = ReplicaAction::GetMany { filter, reply };
let action = ReplicaAction::GetMany { query, reply };
self.send_replica(namespace, action).await?;
Ok(())
}

pub async fn get_one(
pub async fn get_exact(
&self,
namespace: NamespaceId,
author: AuthorId,
key: Bytes,
include_empty: bool,
) -> Result<Option<SignedEntry>> {
let (reply, rx) = oneshot::channel();
let action = ReplicaAction::GetOne { author, key, reply };
let action = ReplicaAction::GetExact {
author,
key,
include_empty,
reply,
};
self.send_replica(namespace, action).await?;
rx.await?
}
Expand Down Expand Up @@ -595,17 +601,20 @@ impl<S: store::Store> Actor<S> {
let res = self.store.register_useful_peer(namespace, peer);
send_reply(reply, res)
}
ReplicaAction::GetOne { author, key, reply } => {
send_reply_with(reply, self, move |this| {
this.states.ensure_open(&namespace)?;
this.store.get_one(namespace, author, key)
})
}
ReplicaAction::GetMany { filter, reply } => {
ReplicaAction::GetExact {
author,
key,
include_empty,
reply,
} => send_reply_with(reply, self, move |this| {
this.states.ensure_open(&namespace)?;
this.store.get_exact(namespace, author, key, include_empty)
}),
ReplicaAction::GetMany { query, reply } => {
let iter = self
.states
.ensure_open(&namespace)
.and_then(|_| self.store.get_many(namespace, filter));
.and_then(|_| self.store.get_many(namespace, query));
iter_to_channel(reply, iter)
}
ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| {
Expand Down
14 changes: 7 additions & 7 deletions iroh-sync/src/net/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ impl BobState {
mod tests {
use crate::{
actor::OpenOpts,
keys::{AuthorId, NamespaceSecret},
store::{self, GetFilter, Store},
store::{self, Query, Store},
AuthorId, NamespaceSecret,
};
use anyhow::Result;
use iroh_bytes::Hash;
Expand Down Expand Up @@ -330,7 +330,7 @@ mod tests {

assert_eq!(
bob_store
.get_many(bob_replica.id(), GetFilter::All)
.get_many(bob_replica.id(), Query::all(),)
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap()
Expand All @@ -339,7 +339,7 @@ mod tests {
);
assert_eq!(
alice_store
.get_many(alice_replica.id(), GetFilter::All)
.get_many(alice_replica.id(), Query::all())
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap()
Expand Down Expand Up @@ -396,7 +396,7 @@ mod tests {

assert_eq!(
bob_store
.get_many(namespace.id(), GetFilter::All)
.get_many(namespace.id(), Query::all())
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap()
Expand All @@ -405,7 +405,7 @@ mod tests {
);
assert_eq!(
alice_store
.get_many(namespace.id(), GetFilter::All)
.get_many(namespace.id(), Query::all())
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap()
Expand Down Expand Up @@ -461,7 +461,7 @@ mod tests {

fn get_messages<S: Store>(store: &S, namespace: NamespaceId) -> Vec<Message> {
let mut msgs = store
.get_many(namespace, GetFilter::All)
.get_many(namespace, Query::all())
.unwrap()
.map(|entry| {
entry.map(|entry| {
Expand Down
Loading

0 comments on commit 9a16498

Please sign in to comment.