diff --git a/iroh-sync/src/actor.rs b/iroh-sync/src/actor.rs index d3aee543..221a70ef 100644 --- a/iroh-sync/src/actor.rs +++ b/iroh-sync/src/actor.rs @@ -15,7 +15,7 @@ use tracing::{debug, error, error_span, trace, warn}; use crate::{ ranger::Message, - store::{self, GetFilter, ImportNamespaceOutcome}, + store::{self, ImportNamespaceOutcome, Query}, Author, AuthorHeads, AuthorId, Capability, CapabilityKind, ContentStatus, ContentStatusCallback, Event, NamespaceId, NamespaceSecret, PeerIdBytes, Replica, SignedEntry, SyncOutcome, @@ -122,13 +122,14 @@ enum ReplicaAction { #[debug("reply")] reply: oneshot::Sender>, }, - GetOne { + GetExact { author: AuthorId, key: Bytes, + include_empty: bool, reply: oneshot::Sender>>, }, GetMany { - filter: GetFilter, + query: Query, reply: flume::Sender>, }, DropReplica { @@ -366,26 +367,31 @@ impl SyncHandle { rx.await? } - // TODO: it would be great if this could be a sync method... pub async fn get_many( &self, namespace: NamespaceId, - filter: GetFilter, + query: Query, reply: flume::Sender>, ) -> Result<()> { - let action = ReplicaAction::GetMany { filter, reply }; + let action = ReplicaAction::GetMany { query, reply }; self.send_replica(namespace, action).await?; Ok(()) } - pub async fn get_one( + pub async fn get_exact( &self, namespace: NamespaceId, author: AuthorId, key: Bytes, + include_empty: bool, ) -> Result> { let (reply, rx) = oneshot::channel(); - let action = ReplicaAction::GetOne { author, key, reply }; + let action = ReplicaAction::GetExact { + author, + key, + include_empty, + reply, + }; self.send_replica(namespace, action).await?; rx.await? } @@ -595,17 +601,20 @@ impl Actor { let res = self.store.register_useful_peer(namespace, peer); send_reply(reply, res) } - ReplicaAction::GetOne { author, key, reply } => { - send_reply_with(reply, self, move |this| { - this.states.ensure_open(&namespace)?; - this.store.get_one(namespace, author, key) - }) - } - ReplicaAction::GetMany { filter, reply } => { + ReplicaAction::GetExact { + author, + key, + include_empty, + reply, + } => send_reply_with(reply, self, move |this| { + this.states.ensure_open(&namespace)?; + this.store.get_exact(namespace, author, key, include_empty) + }), + ReplicaAction::GetMany { query, reply } => { let iter = self .states .ensure_open(&namespace) - .and_then(|_| self.store.get_many(namespace, filter)); + .and_then(|_| self.store.get_many(namespace, query)); iter_to_channel(reply, iter) } ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| { diff --git a/iroh-sync/src/net/codec.rs b/iroh-sync/src/net/codec.rs index 18876153..46f67c0f 100644 --- a/iroh-sync/src/net/codec.rs +++ b/iroh-sync/src/net/codec.rs @@ -295,8 +295,8 @@ impl BobState { mod tests { use crate::{ actor::OpenOpts, - keys::{AuthorId, NamespaceSecret}, - store::{self, GetFilter, Store}, + store::{self, Query, Store}, + AuthorId, NamespaceSecret, }; use anyhow::Result; use iroh_bytes::Hash; @@ -330,7 +330,7 @@ mod tests { assert_eq!( bob_store - .get_many(bob_replica.id(), GetFilter::All) + .get_many(bob_replica.id(), Query::all(),) .unwrap() .collect::>>() .unwrap() @@ -339,7 +339,7 @@ mod tests { ); assert_eq!( alice_store - .get_many(alice_replica.id(), GetFilter::All) + .get_many(alice_replica.id(), Query::all()) .unwrap() .collect::>>() .unwrap() @@ -396,7 +396,7 @@ mod tests { assert_eq!( bob_store - .get_many(namespace.id(), GetFilter::All) + .get_many(namespace.id(), Query::all()) .unwrap() .collect::>>() .unwrap() @@ -405,7 +405,7 @@ mod tests { ); assert_eq!( alice_store - .get_many(namespace.id(), GetFilter::All) + .get_many(namespace.id(), Query::all()) .unwrap() .collect::>>() .unwrap() @@ -461,7 +461,7 @@ mod tests { fn get_messages(store: &S, namespace: NamespaceId) -> Vec { let mut msgs = store - .get_many(namespace, GetFilter::All) + .get_many(namespace, Query::all()) .unwrap() .map(|entry| { entry.map(|entry| { diff --git a/iroh-sync/src/store.rs b/iroh-sync/src/store.rs index 50b70b4e..8121f164 100644 --- a/iroh-sync/src/store.rs +++ b/iroh-sync/src/store.rs @@ -3,6 +3,7 @@ use std::num::{NonZeroU64, NonZeroUsize}; use anyhow::Result; +use bytes::Bytes; use iroh_bytes::Hash; use rand_core::CryptoRngCore; use serde::{Deserialize, Serialize}; @@ -19,6 +20,7 @@ use crate::{ pub mod fs; pub mod memory; mod pubkeys; +mod util; pub use pubkeys::*; /// Number of [`PeerIdBytes`] objects to cache per document. @@ -126,16 +128,19 @@ pub trait Store: std::fmt::Debug + Clone + Send + Sync + 'static { fn get_author(&self, author: &AuthorId) -> Result>; /// Get an iterator over entries of a replica. - /// - /// The [`GetFilter`] has several methods of filtering the returned entries. - fn get_many(&self, namespace: NamespaceId, filter: GetFilter) -> Result>; + fn get_many( + &self, + namespace: NamespaceId, + query: impl Into, + ) -> Result>; /// Get an entry by key and author. - fn get_one( + fn get_exact( &self, namespace: NamespaceId, author: AuthorId, key: impl AsRef<[u8]>, + include_empty: bool, ) -> Result>; /// Get all content hashes of all replicas in the store. @@ -183,35 +188,250 @@ pub enum ImportNamespaceOutcome { NoChange, } -/// Filter a get query onto a namespace -#[derive(Debug, Serialize, Deserialize)] -pub enum GetFilter { - /// No filter, list all entries - All, - /// Filter for exact key match - Key(Vec), - /// Filter for key prefix - Prefix(Vec), - /// Filter by author - Author(AuthorId), - /// Filter by key prefix and author - AuthorAndPrefix(AuthorId, Vec), +/// A query builder for document queries. +#[derive(Debug, Default)] +pub struct QueryBuilder { + kind: K, + filter_author: AuthorFilter, + filter_key: KeyFilter, + limit: Option, + offset: u64, + include_empty: bool, + sort_direction: SortDirection, +} + +impl QueryBuilder { + /// Call to include empty entries (deletion markers). + pub fn include_empty(mut self) -> Self { + self.include_empty = true; + self + } + /// Filter by exact key match. + pub fn key_exact(mut self, key: impl AsRef<[u8]>) -> Self { + self.filter_key = KeyFilter::Exact(key.as_ref().to_vec().into()); + self + } + /// Filter by key prefix. + pub fn key_prefix(mut self, key: impl AsRef<[u8]>) -> Self { + self.filter_key = KeyFilter::Prefix(key.as_ref().to_vec().into()); + self + } + /// Filter by author. + pub fn author(mut self, author: AuthorId) -> Self { + self.filter_author = AuthorFilter::Exact(author); + self + } + /// Set the maximum number of entries to be returned. + pub fn limit(mut self, limit: u64) -> Self { + self.limit = Some(limit); + self + } + /// Set the offset within the result set from where to start returning results. + pub fn offset(mut self, offset: u64) -> Self { + self.offset = offset; + self + } +} + +/// Query on all entries without aggregation. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct FlatQuery { + sort_by: SortBy, +} + +/// Query that only returns the latest entry for a key which has entries from multiple authors. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct SingleLatestPerKeyQuery {} + +impl QueryBuilder { + /// Set the sort for the query. + /// + /// The default is to sort by author, then by key, in ascending order. + pub fn sort_by(mut self, sort_by: SortBy, direction: SortDirection) -> Self { + self.kind.sort_by = sort_by; + self.sort_direction = direction; + self + } + + /// Build the query. + pub fn build(self) -> Query { + Query::from(self) + } +} + +impl QueryBuilder { + /// Set the order direction for the query. + /// + /// Ordering is always by key for this query type. + /// Default direction is ascending. + pub fn sort_direction(mut self, direction: SortDirection) -> Self { + self.sort_direction = direction; + self + } + + /// Build the query. + pub fn build(self) -> Query { + Query::from(self) + } } -impl Default for GetFilter { - fn default() -> Self { - Self::All +impl From> for Query { + fn from(builder: QueryBuilder) -> Query { + Query { + kind: QueryKind::SingleLatestPerKey(builder.kind), + filter_author: builder.filter_author, + filter_key: builder.filter_key, + limit: builder.limit, + offset: builder.offset, + include_empty: builder.include_empty, + sort_direction: builder.sort_direction, + } } } -impl GetFilter { - /// Create a [`GetFilter`] from author and prefix options. - pub fn author_prefix(author: Option, prefix: Option>) -> Self { - match (author, prefix) { - (None, None) => Self::All, - (Some(author), None) => Self::Author(author), - (None, Some(prefix)) => Self::Prefix(prefix.as_ref().to_vec()), - (Some(author), Some(prefix)) => Self::AuthorAndPrefix(author, prefix.as_ref().to_vec()), +impl From> for Query { + fn from(builder: QueryBuilder) -> Query { + Query { + kind: QueryKind::Flat(builder.kind), + filter_author: builder.filter_author, + filter_key: builder.filter_key, + limit: builder.limit, + offset: builder.offset, + include_empty: builder.include_empty, + sort_direction: builder.sort_direction, } } } + +/// Note: When using the `SingleLatestPerKey` query kind, the key filter is applied *before* the +/// grouping, the author filter is applied *after* the grouping. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Query { + kind: QueryKind, + filter_author: AuthorFilter, + filter_key: KeyFilter, + limit: Option, + offset: u64, + include_empty: bool, + sort_direction: SortDirection, +} + +impl Query { + /// Query all records. + pub fn all() -> QueryBuilder { + Default::default() + } + /// Query only the latest entry for each key, omitting older entries if the entry was written + /// to by multiple authors. + pub fn single_latest_per_key() -> QueryBuilder { + Default::default() + } + + /// Create a [`Query::all`] query filtered by a single author. + pub fn author(author: AuthorId) -> QueryBuilder { + Self::all().author(author) + } + + /// Create a [`Query::all`] query filtered by a single key. + pub fn key_exact(key: impl AsRef<[u8]>) -> QueryBuilder { + Self::all().key_exact(key) + } + + /// Create a [`Query::all`] query filtered by a key prefix. + pub fn key_prefix(prefix: impl AsRef<[u8]>) -> QueryBuilder { + Self::all().key_prefix(prefix) + } + + /// Get the limit for this query (max. number of entries to emit). + pub fn limit(&self) -> Option { + self.limit + } + + /// Get the offset for this query (number of entries to skip at the beginning). + pub fn offset(&self) -> u64 { + self.offset + } +} + +/// Sort direction +#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] +pub enum SortDirection { + /// Sort ascending + #[default] + Asc, + /// Sort descending + Desc, +} + +#[derive(derive_more::Debug, Clone, Serialize, Deserialize)] +enum QueryKind { + #[debug("Flat {{ sort_by: {:?}}}", _0)] + Flat(FlatQuery), + #[debug("SingleLatestPerKey")] + SingleLatestPerKey(SingleLatestPerKeyQuery), +} + +/// Fields by which the query can be sorted +#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] +pub enum SortBy { + /// Sort by key, then author. + KeyAuthor, + /// Sort by author, then key. + #[default] + AuthorKey, +} + +/// Key matching. +#[derive(Debug, Serialize, Deserialize, Clone, Default, Eq, PartialEq)] +pub enum KeyFilter { + /// Matches any key. + #[default] + Any, + /// Only keys that are exactly the provided value. + Exact(Bytes), + /// All keys that start with the provided value. + Prefix(Bytes), +} + +impl> From for KeyFilter { + fn from(value: T) -> Self { + KeyFilter::Exact(Bytes::copy_from_slice(value.as_ref())) + } +} + +impl KeyFilter { + /// Test if a key is matched by this [`KeyFilter`]. + pub fn matches(&self, key: &[u8]) -> bool { + match self { + Self::Any => true, + Self::Exact(k) => &k[..] == key, + Self::Prefix(p) => key.starts_with(p), + } + } +} + +/// Author matching. +#[derive(Debug, Serialize, Deserialize, Clone, Default, Eq, PartialEq)] +pub enum AuthorFilter { + /// Matches any author. + #[default] + Any, + /// Matches exactly the provided author. + Exact(AuthorId), +} + +impl AuthorFilter { + /// Test if an author is matched by this [`AuthorFilter`]. + pub fn matches(&self, author: &AuthorId) -> bool { + match self { + Self::Any => true, + Self::Exact(a) => a == author, + } + } +} + +impl From for AuthorFilter { + fn from(value: AuthorId) -> Self { + AuthorFilter::Exact(value) + } +} diff --git a/iroh-sync/src/store/fs.rs b/iroh-sync/src/store/fs.rs index 7943e5ca..7e0b1443 100644 --- a/iroh-sync/src/store/fs.rs +++ b/iroh-sync/src/store/fs.rs @@ -2,20 +2,22 @@ use std::{ cmp::Ordering, - collections::{HashMap, HashSet}, + collections::HashSet, + iter::{Chain, Flatten}, + ops::Bound, path::Path, sync::Arc, }; use anyhow::{anyhow, Result}; +use bytes::Bytes; use derive_more::From; use ed25519_dalek::{SignatureError, VerifyingKey}; use iroh_bytes::Hash; -use ouroboros::self_referencing; use parking_lot::RwLock; use redb::{ - Database, MultimapTableDefinition, Range as TableRange, ReadOnlyTable, ReadTransaction, - ReadableMultimapTable, ReadableTable, StorageError, Table, TableDefinition, TableHandle, + Database, MultimapTableDefinition, ReadOnlyTable, ReadableMultimapTable, ReadableTable, + TableDefinition, }; use crate::{ @@ -23,126 +25,85 @@ use crate::{ ranger::{Fingerprint, Range, RangeEntry}, store::Store as _, sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry}, - AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes, + AuthorId, Capability, CapabilityKind, NamespaceId, PeerIdBytes, }; -use super::{pubkeys::MemPublicKeyStore, ImportNamespaceOutcome, OpenError, PublicKeyStore}; +use super::{pubkeys::MemPublicKeyStore, ImportNamespaceOutcome, OpenError, PublicKeyStore, Query}; -/// Manages the replicas and authors for an instance. -#[derive(Debug, Clone)] -pub struct Store { - db: Arc, - open_replicas: Arc>>, - pubkeys: MemPublicKeyStore, -} +mod bounds; +mod migrations; +mod query; +mod ranges; + +use self::bounds::{ByKeyBounds, RecordsBounds}; +use self::query::QueryIterator; +use self::ranges::{TableRange, TableReader}; + +pub use self::ranges::RecordsRange; // Table Definitions -// Authors -// Table -// Key: [u8; 32] # AuthorId -// Value: #[u8; 32] # Author +/// Table: Authors +/// Key: `[u8; 32]` # AuthorId +/// Value: `[u8; 32]` # Author const AUTHORS_TABLE: TableDefinition<&[u8; 32], &[u8; 32]> = TableDefinition::new("authors-1"); -// Namespaces -// Table -// Key: [u8; 32] # NamespaceId -// Value: #[u8; 32] # Namespace +/// Table: Namespaces v1 (replaced by Namespaces v2 in migration ) +/// Key: `[u8; 32]` # NamespaceId +/// Value: `[u8; 32]` # NamespaceSecret const NAMESPACES_TABLE_V1: TableDefinition<&[u8; 32], &[u8; 32]> = TableDefinition::new("namespaces-1"); -// Namespaces -// Table -// Key: [u8; 32] # NamespaceId -// Value: (u8, [u8; 32]) # (CapabilityKind, Capability) +/// Table: Namespaces v2 +/// Key: `[u8; 32]` # NamespaceId +/// Value: `(u8, [u8; 32])` # (CapabilityKind, Capability) const NAMESPACES_TABLE: TableDefinition<&[u8; 32], (u8, &[u8; 32])> = TableDefinition::new("namespaces-2"); -// Records -// Table -// Key: ([u8; 32], [u8; 32], Vec) # (NamespaceId, AuthorId, Key) -// Value: -// (u64, [u8; 32], [u8; 32], u64, [u8; 32]) -// # (timestamp, signature_namespace, signature_author, len, hash) +/// Table: Records +/// Key: `([u8; 32], [u8; 32], &[u8])` +/// # (NamespaceId, AuthorId, Key) +/// Value: `(u64, [u8; 32], [u8; 32], u64, [u8; 32])` +/// # (timestamp, signature_namespace, signature_author, len, hash) const RECORDS_TABLE: TableDefinition = TableDefinition::new("records-1"); - -// Latest by author -// Table -// Key: ([u8; 32], [u8; 32]) # (NamespaceId, AuthorId) -// Value: (u64, Vec) # (Timestamp, Key) -const LATEST_TABLE: TableDefinition = - TableDefinition::new("latest-by-author-1"); -type LatestKey<'a> = (&'a [u8; 32], &'a [u8; 32]); -type LatestValue<'a> = (u64, &'a [u8]); -type LatestTable<'a> = ReadOnlyTable<'a, LatestKey<'static>, LatestValue<'static>>; -type LatestRange<'a> = TableRange<'a, LatestKey<'static>, LatestValue<'static>>; - type RecordsId<'a> = (&'a [u8; 32], &'a [u8; 32], &'a [u8]); +type RecordsIdOwned = ([u8; 32], [u8; 32], Bytes); type RecordsValue<'a> = (u64, &'a [u8; 64], &'a [u8; 64], u64, &'a [u8; 32]); -type RecordsRange<'a> = TableRange<'a, RecordsId<'static>, RecordsValue<'static>>; type RecordsTable<'a> = ReadOnlyTable<'a, RecordsId<'static>, RecordsValue<'static>>; -type DbResult = Result; +/// Table: Latest per author +/// Key: `([u8; 32], [u8; 32])` # (NamespaceId, AuthorId) +/// Value: `(u64, Vec)` # (Timestamp, Key) +const LATEST_PER_AUTHOR_TABLE: TableDefinition = + TableDefinition::new("latest-by-author-1"); +type LatestPerAuthorKey<'a> = (&'a [u8; 32], &'a [u8; 32]); +type LatestPerAuthorValue<'a> = (u64, &'a [u8]); + +/// Table: Records by key +/// Key: `([u8; 32], Vec, [u8; 32]])` # (NamespaceId, Key, AuthorId) +/// Value: `()` +const RECORDS_BY_KEY_TABLE: TableDefinition = + TableDefinition::new("records-by-key-1"); +type RecordsByKeyId<'a> = (&'a [u8; 32], &'a [u8], &'a [u8; 32]); +type RecordsByKeyIdOwned = ([u8; 32], Bytes, [u8; 32]); + +/// Table: Peers per document. +/// Key: `[u8; 32]` # NamespaceId +/// Value: `(u64, [u8; 32])` # ([`Nanos`], &[`PeerIdBytes`]) representing the last time a peer was used. +const NAMESPACE_PEERS_TABLE: MultimapTableDefinition<&[u8; 32], (Nanos, &PeerIdBytes)> = + MultimapTableDefinition::new("sync-peers-1"); /// Number of seconds elapsed since [`std::time::SystemTime::UNIX_EPOCH`]. Used to register the /// last time a peer was useful in a document. // NOTE: resolution is nanoseconds, stored as a u64 since this covers ~500years from unix epoch, // which should be more than enough type Nanos = u64; -/// Peers stored per document. -/// - Key: [`NamespaceId::as_bytes`] -/// - Value: ([`Nanos`], &[`PeerIdBytes`]) representing the last time a peer was used. -const NAMESPACE_PEERS_TABLE: MultimapTableDefinition<&[u8; 32], (Nanos, &PeerIdBytes)> = - MultimapTableDefinition::new("sync-peers-1"); - -/// migration 001: populate the latest table (which did not exist before) -fn migration_001_populate_latest_table( - records_table: &Table, RecordsValue<'static>>, - latest_table: &mut Table, LatestValue<'static>>, -) -> Result<()> { - tracing::info!("Starting migration: 001_populate_latest_table"); - #[allow(clippy::type_complexity)] - let mut heads: HashMap<([u8; 32], [u8; 32]), (u64, Vec)> = HashMap::new(); - let iter = records_table.iter()?; - - for next in iter { - let next = next?; - let (namespace, author, key) = next.0.value(); - let (timestamp, _namespace_sig, _author_sig, _len, _hash) = next.1.value(); - heads - .entry((*namespace, *author)) - .and_modify(|e| { - if timestamp >= e.0 { - *e = (timestamp, key.to_vec()); - } - }) - .or_insert_with(|| (timestamp, key.to_vec())); - } - let len = heads.len(); - for ((namespace, author), (timestamp, key)) in heads { - latest_table.insert((&namespace, &author), (timestamp, key.as_slice()))?; - } - tracing::info!("Migration finished (inserted {} entries)", len); - Ok(()) -} -/// Migrate the namespaces table from V1 to V2. -fn migration_002_namespaces_v2( - namespaces_v1: Table<&[u8; 32], &[u8; 32]>, - namespaces_v2: &mut Table<&[u8; 32], (u8, &[u8; 32])>, -) -> Result<()> { - tracing::info!("Starting migration: 002_namespaces_v2"); - let mut entries = 0; - for res in namespaces_v1.iter()? { - let db_value = res?.1; - let secret_bytes = db_value.value(); - let capability = Capability::Write(NamespaceSecret::from_bytes(secret_bytes)); - let id = capability.id().to_bytes(); - let (raw_kind, raw_bytes) = capability.raw(); - namespaces_v2.insert(&id, (raw_kind, &raw_bytes))?; - entries += 1; - } - tracing::info!("Migration finished ({entries} entries)"); - Ok(()) +/// Manages the replicas and authors for an instance. +#[derive(Debug, Clone)] +pub struct Store { + db: Arc, + open_replicas: Arc>>, + pubkeys: MemPublicKeyStore, } impl Store { @@ -155,51 +116,27 @@ impl Store { // Setup all tables let write_tx = db.begin_write()?; { - let records_table = write_tx.open_table(RECORDS_TABLE)?; - let namespaces_v1_exists = write_tx - .list_tables()? - .any(|handle| handle.name() == NAMESPACES_TABLE_V1.name()); - let mut namespaces_v2 = write_tx.open_table(NAMESPACES_TABLE)?; - let _table = write_tx.open_table(AUTHORS_TABLE)?; - let mut latest_table = write_tx.open_table(LATEST_TABLE)?; + let _table = write_tx.open_table(RECORDS_TABLE)?; + let _table = write_tx.open_table(NAMESPACES_TABLE)?; + let _table = write_tx.open_table(LATEST_PER_AUTHOR_TABLE)?; let _table = write_tx.open_multimap_table(NAMESPACE_PEERS_TABLE)?; - - // migration 001: populate latest table if it was empty before - if latest_table.is_empty()? && !records_table.is_empty()? { - migration_001_populate_latest_table(&records_table, &mut latest_table)?; - } - - // migration 002: update namespaces from V1 to V2 - if namespaces_v1_exists { - let namespaces_v1 = write_tx.open_table(NAMESPACES_TABLE_V1)?; - migration_002_namespaces_v2(namespaces_v1, &mut namespaces_v2)?; - write_tx.delete_table(NAMESPACES_TABLE_V1)?; - } } write_tx.commit()?; + // Run database migrations + migrations::run_migrations(&db)?; + Ok(Store { db: Arc::new(db), open_replicas: Default::default(), pubkeys: Default::default(), }) } - - fn insert_author(&self, author: Author) -> Result<()> { - let write_tx = self.db.begin_write()?; - { - let mut author_table = write_tx.open_table(AUTHORS_TABLE)?; - author_table.insert(author.id().as_bytes(), &author.to_bytes())?; - } - write_tx.commit()?; - - Ok(()) - } } impl super::Store for Store { type Instance = StoreInstance; - type GetIter<'a> = RangeIterator<'a>; + type GetIter<'a> = QueryIterator<'a>; type ContentHashesIter<'a> = ContentHashesIterator<'a>; type LatestIter<'a> = LatestIterator<'a>; type AuthorsIter<'a> = std::vec::IntoIter>; @@ -262,7 +199,12 @@ impl super::Store for Store { } fn import_author(&self, author: Author) -> Result<()> { - self.insert_author(author)?; + let write_tx = self.db.begin_write()?; + { + let mut author_table = write_tx.open_table(AUTHORS_TABLE)?; + author_table.insert(author.id().as_bytes(), &author.to_bytes())?; + } + write_tx.commit()?; Ok(()) } @@ -312,12 +254,18 @@ impl super::Store for Store { if self.open_replicas.read().contains(namespace) { return Err(anyhow!("replica is not closed")); } - let start = range_start(namespace); - let end = range_end(namespace); let write_tx = self.db.begin_write()?; { let mut record_table = write_tx.open_table(RECORDS_TABLE)?; - record_table.drain(start..=end)?; + let bounds = RecordsBounds::namespace(*namespace); + record_table.drain(bounds.as_ref())?; + } + { + let mut table = write_tx.open_table(RECORDS_BY_KEY_TABLE)?; + let bounds = ByKeyBounds::namespace(*namespace); + let _ = table.drain(bounds.as_ref()); + } + { let mut namespace_table = write_tx.open_table(NAMESPACES_TABLE)?; namespace_table.remove(namespace.as_bytes())?; } @@ -328,36 +276,29 @@ impl super::Store for Store { fn get_many( &self, namespace: NamespaceId, - filter: super::GetFilter, + query: impl Into, ) -> Result> { - match filter { - super::GetFilter::All => self.get_all(namespace), - super::GetFilter::Key(key) => self.get_by_key(namespace, key), - super::GetFilter::Prefix(prefix) => self.get_by_prefix(namespace, prefix), - super::GetFilter::Author(author) => self.get_by_author(namespace, author), - super::GetFilter::AuthorAndPrefix(author, prefix) => { - self.get_by_author_and_prefix(namespace, author, prefix) - } - } + QueryIterator::new(&self.db, namespace, query.into()) } - fn get_one( + fn get_exact( &self, namespace: NamespaceId, author: AuthorId, key: impl AsRef<[u8]>, + include_empty: bool, ) -> Result> { let read_tx = self.db.begin_read()?; let record_table = read_tx.open_table(RECORDS_TABLE)?; - get_one(&record_table, namespace, author, key) + get_exact(&record_table, namespace, author, key, include_empty) } fn content_hashes(&self) -> Result> { - ContentHashesIterator::create(&self.db) + ContentHashesIterator::new(&self.db) } fn get_latest_for_each_author(&self, namespace: NamespaceId) -> Result> { - LatestIterator::create(&self.db, namespace) + LatestIterator::new(&self.db, namespace) } fn register_useful_peer(&self, namespace: NamespaceId, peer: crate::PeerIdBytes) -> Result<()> { @@ -454,129 +395,21 @@ fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result, + include_empty: bool, ) -> Result> { - let db_key = (namespace.as_ref(), author.as_ref(), key.as_ref()); - let record = record_table.get(db_key)?; - let Some(record) = record else { - return Ok(None); - }; - let (timestamp, namespace_sig, author_sig, len, hash) = record.value(); - // return early if the hash equals the hash of the empty byte range, which we treat as - // delete marker (tombstone). - if hash == Hash::EMPTY.as_bytes() { - return Ok(None); - } - - let record = Record::new(hash.into(), len, timestamp); - let id = RecordIdentifier::new(namespace, author, key); - let entry = Entry::new(id, record); - let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig); - let signed_entry = SignedEntry::new(entry_signature, entry); - - Ok(Some(signed_entry)) -} - -impl Store { - fn get_by_key( - &self, - namespace: NamespaceId, - key: impl AsRef<[u8]>, - ) -> Result> { - RangeIterator::namespace( - &self.db, - &namespace, - RangeFilter::Key(key.as_ref().to_vec()), - ) - } - fn get_by_author(&self, namespace: NamespaceId, author: AuthorId) -> Result> { - let author = author.as_bytes(); - let start = (namespace.as_bytes(), author, &[][..]); - let end = prefix_range_end(&start); - RangeIterator::with_range( - &self.db, - |table| match end { - Some(end) => table.range(start..(&end.0, &end.1, &end.2)), - None => table.range(start..), - }, - RangeFilter::None, - ) - } - - fn get_by_author_and_prefix( - &self, - namespace: NamespaceId, - author: AuthorId, - prefix: impl AsRef<[u8]>, - ) -> Result> { - let author = author.as_bytes(); - let start = (namespace.as_bytes(), author, prefix.as_ref()); - let end = prefix_range_end(&start); - RangeIterator::with_range( - &self.db, - |table| match end { - Some(end) => table.range(start..(&end.0, &end.1, &end.2)), - None => table.range(start..), - }, - RangeFilter::None, - ) - } - - fn get_by_prefix( - &self, - namespace: NamespaceId, - prefix: impl AsRef<[u8]>, - ) -> Result> { - RangeIterator::namespace( - &self.db, - &namespace, - RangeFilter::Prefix(prefix.as_ref().to_vec()), - ) - } - - fn get_all(&self, namespace: NamespaceId) -> Result> { - RangeIterator::namespace(&self.db, &namespace, RangeFilter::None) - } + let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref()); + let record = record_table.get(id)?; + Ok(record + .map(|r| into_entry(id, r.value())) + .filter(|entry| include_empty || !entry.is_empty())) } -/// Increment a byte string by one, by incrementing the last byte that is not 255 by one. -/// -/// Returns false if all bytes are 255. -fn increment_by_one(value: &mut [u8]) -> bool { - for char in value.iter_mut().rev() { - if *char != 255 { - *char += 1; - return true; - } else { - *char = 0; - } - } - false -} - -// Get the end point of a prefix range -// -// Increments the last byte of the byte represenation of `prefix` and returns it as an owned tuple -// with the parts of the new [`RecordsId`]. -// Returns `None` if all bytes are equal to 255. -fn prefix_range_end<'a>(prefix: &'a RecordsId<'a>) -> Option<([u8; 32], [u8; 32], Vec)> { - let (mut namespace, mut author, mut prefix) = (*prefix.0, *prefix.1, prefix.2.to_vec()); - if !increment_by_one(&mut prefix) - && !increment_by_one(&mut author) - && !increment_by_one(&mut namespace) - { - // we have all-255 keys, so open-ended range - None - } else { - Some((namespace, author, prefix)) - } -} - -/// [`NamespaceSecret`] specific wrapper around the [`Store`]. +/// A wrapper around [`Store`] for a specific [`NamespaceId`] #[derive(Debug, Clone)] pub struct StoreInstance { namespace: NamespaceId, @@ -589,13 +422,6 @@ impl StoreInstance { } } -fn range_start(namespace: &NamespaceId) -> RecordsId { - (namespace.as_bytes(), &[u8::MIN; 32], &[][..]) -} -fn range_end(namespace: &NamespaceId) -> RecordsId { - (namespace.as_bytes(), &[u8::MAX; 32], &[][..]) -} - impl PublicKeyStore for StoreInstance { fn public_key(&self, id: &[u8; 32]) -> std::result::Result { self.store.pubkeys.public_key(id) @@ -604,7 +430,9 @@ impl PublicKeyStore for StoreInstance { impl crate::ranger::Store for StoreInstance { type Error = anyhow::Error; - type RangeIterator<'a> = std::iter::Chain, RangeIterator<'a>>; + type RangeIterator<'a> = + Chain, Flatten>>>; + type ParentIterator<'a> = ParentIterator<'a>; /// Get a the first key (or the default if none is available). fn get_first(&self) -> Result { @@ -612,9 +440,8 @@ impl crate::ranger::Store for StoreInstance { let record_table = read_tx.open_table(RECORDS_TABLE)?; // TODO: verify this fetches all keys with this namespace - let start = range_start(&self.namespace); - let end = range_end(&self.namespace); - let mut records = record_table.range(start..=end)?; + let bounds = RecordsBounds::namespace(self.namespace); + let mut records = record_table.range(bounds.as_ref())?; let Some(record) = records.next() else { return Ok(RecordIdentifier::default()); @@ -626,22 +453,23 @@ impl crate::ranger::Store for StoreInstance { } fn get(&self, id: &RecordIdentifier) -> Result> { - self.store.get_one(id.namespace(), id.author(), id.key()) + self.store + .get_exact(id.namespace(), id.author(), id.key(), true) } fn len(&self) -> Result { let read_tx = self.store.db.begin_read()?; let record_table = read_tx.open_table(RECORDS_TABLE)?; - // TODO: verify this fetches all keys with this namespace - let start = range_start(&self.namespace); - let end = range_end(&self.namespace); - let records = record_table.range(start..=end)?; + let bounds = RecordsBounds::namespace(self.namespace); + let records = record_table.range(bounds.as_ref())?; Ok(records.count()) } fn is_empty(&self) -> Result { - Ok(self.len()? == 0) + let read_tx = self.store.db.begin_read()?; + let record_table = read_tx.open_table(RECORDS_TABLE)?; + Ok(record_table.is_empty()?) } fn get_fingerprint(&self, range: &Range) -> Result { @@ -658,27 +486,37 @@ impl crate::ranger::Store for StoreInstance { } fn put(&mut self, e: SignedEntry) -> Result<()> { + let id = e.id(); let write_tx = self.store.db.begin_write()?; { // insert into record table let mut record_table = write_tx.open_table(RECORDS_TABLE)?; let key = ( - &e.id().namespace().to_bytes(), - &e.id().author().to_bytes(), - e.id().key(), + &id.namespace().to_bytes(), + &id.author().to_bytes(), + id.key(), ); - let hash = e.content_hash(); + let hash = e.content_hash(); // let binding is needed let value = ( e.timestamp(), - &e.signature().namespace_signature().to_bytes(), - &e.signature().author_signature().to_bytes(), + &e.signature().namespace().to_bytes(), + &e.signature().author().to_bytes(), e.content_len(), hash.as_bytes(), ); record_table.insert(key, value)?; + // insert into by key index table + let mut idx_by_key = write_tx.open_table(RECORDS_BY_KEY_TABLE)?; + let key = ( + &id.namespace().to_bytes(), + id.key(), + &id.author().to_bytes(), + ); + idx_by_key.insert(key, ())?; + // insert into latest table - let mut latest_table = write_tx.open_table(LATEST_TABLE)?; + let mut latest_table = write_tx.open_table(LATEST_PER_AUTHOR_TABLE)?; let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes()); let value = (e.timestamp(), e.id().key()); latest_table.insert(key, value)?; @@ -691,83 +529,64 @@ impl crate::ranger::Store for StoreInstance { let iter = match range.x().cmp(range.y()) { // identity range: iter1 = all, iter2 = none Ordering::Equal => { - let start = range_start(&self.namespace); - let end = range_end(&self.namespace); // iterator for all entries in replica - let iter = RangeIterator::with_range( - &self.store.db, - |table| table.range(start..=end), - RangeFilter::None, - )?; - // empty iterator, returns nothing - let iter2 = RangeIterator::empty(&self.store.db)?; - iter.chain(iter2) + let bounds = RecordsBounds::namespace(self.namespace); + let iter = RecordsRange::with_bounds(&self.store.db, bounds)?; + chain_none(iter) } // regular range: iter1 = x <= t < y, iter2 = none Ordering::Less => { - let start = range.x().as_byte_tuple(); - let end = range.y().as_byte_tuple(); // iterator for entries from range.x to range.y - let iter = RangeIterator::with_range( - &self.store.db, - |table| table.range(start..end), - RangeFilter::None, - )?; - // empty iterator - let iter2 = RangeIterator::empty(&self.store.db)?; - iter.chain(iter2) - // wrap-around range: iter1 = y <= t, iter2 = x >= t + let start = Bound::Included(range.x().to_byte_tuple()); + let end = Bound::Excluded(range.y().to_byte_tuple()); + let bounds = RecordsBounds::new(start, end); + let iter = RecordsRange::with_bounds(&self.store.db, bounds)?; + chain_none(iter) } + // split range: iter1 = start <= t < y, iter2 = x <= t <= end Ordering::Greater => { - let start = range_start(&self.namespace); - let end = range.y().as_byte_tuple(); - // iterator for entries start to from range.y - let iter = RangeIterator::with_range( - &self.store.db, - |table| table.range(start..end), - RangeFilter::None, - )?; - let start = range.x().as_byte_tuple(); - let end = range_end(&self.namespace); + // iterator for entries from start to range.y + let end = Bound::Excluded(range.y().to_byte_tuple()); + let bounds = RecordsBounds::from_start(&self.namespace, end); + let iter = RecordsRange::with_bounds(&self.store.db, bounds)?; + // iterator for entries from range.x to end - let iter2 = RangeIterator::with_range( - &self.store.db, - |table| table.range(start..=end), - RangeFilter::None, - )?; - iter.chain(iter2) + let start = Bound::Included(range.x().to_byte_tuple()); + let bounds = RecordsBounds::to_end(&self.namespace, start); + let iter2 = RecordsRange::with_bounds(&self.store.db, bounds)?; + + iter.chain(Some(iter2).into_iter().flatten()) } }; Ok(iter) } - fn remove(&mut self, k: &RecordIdentifier) -> Result> { + fn remove(&mut self, id: &RecordIdentifier) -> Result> { let write_tx = self.store.db.begin_write()?; - let res = { - let mut records_table = write_tx.open_table(RECORDS_TABLE)?; - let key = (&k.namespace().to_bytes(), &k.author().to_bytes(), k.key()); - let record = records_table.remove(key)?; - record.map(|record| { - let (timestamp, namespace_sig, author_sig, len, hash) = record.value(); - let record = Record::new(hash.into(), len, timestamp); - let entry = Entry::new(k.clone(), record); - let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig); - SignedEntry::new(entry_signature, entry) - }) + let (namespace, author, key) = id.as_byte_tuple(); + { + let mut table = write_tx.open_table(RECORDS_BY_KEY_TABLE)?; + let id = (namespace, key, author); + table.remove(id)?; + } + let entry = { + let mut table = write_tx.open_table(RECORDS_TABLE)?; + let id = (namespace, author, key); + let value = table.remove(id)?; + value.map(|value| into_entry(id, value.value())) }; write_tx.commit()?; - Ok(res) + Ok(entry) } fn all(&self) -> Result> { - let iter = RangeIterator::namespace(&self.store.db, &self.namespace, RangeFilter::None)?; - let iter2 = RangeIterator::empty(&self.store.db)?; - Ok(iter.chain(iter2)) + let bounds = RecordsBounds::namespace(self.namespace); + let iter = RecordsRange::with_bounds(&self.store.db, bounds)?; + Ok(chain_none(iter)) } - type ParentIterator<'a> = ParentIterator<'a>; fn prefixes_of(&self, id: &RecordIdentifier) -> Result, Self::Error> { - ParentIterator::create( + ParentIterator::new( &self.store.db, id.namespace(), id.author(), @@ -775,29 +594,18 @@ impl crate::ranger::Store for StoreInstance { ) } - fn prefixed_by(&self, prefix: &RecordIdentifier) -> Result> { - let start = prefix.as_byte_tuple(); - let end = prefix_range_end(&start); - let iter = RangeIterator::with_range( - &self.store.db, - |table| match end { - Some(end) => table.range(start..(&end.0, &end.1, &end.2)), - None => table.range(start..), - }, - RangeFilter::None, - )?; - let iter2 = RangeIterator::empty(&self.store.db)?; - Ok(iter.chain(iter2)) + fn prefixed_by(&self, id: &RecordIdentifier) -> Result> { + let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes()); + let iter = RecordsRange::with_bounds(&self.store.db, bounds)?; + Ok(chain_none(iter)) } fn remove_prefix_filtered( &mut self, - prefix: &RecordIdentifier, + id: &RecordIdentifier, predicate: impl Fn(&Record) -> bool, ) -> Result { - let start = prefix.as_byte_tuple(); - let end = prefix_range_end(&start); - + let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes()); let write_tx = self.store.db.begin_write()?; let count = { let mut table = write_tx.open_table(RECORDS_TABLE)?; @@ -807,10 +615,7 @@ impl crate::ranger::Store for StoreInstance { predicate(&record) }; - let iter = match end { - Some(end) => table.drain_filter(start..(&end.0, &end.1, &end.2), cb)?, - None => table.drain_filter(start.., cb)?, - }; + let iter = table.drain_filter(bounds.as_ref(), cb)?; iter.count() }; write_tx.commit()?; @@ -818,38 +623,36 @@ impl crate::ranger::Store for StoreInstance { } } +fn chain_none<'a, I: Iterator + 'a, T>( + iter: I, +) -> Chain>> { + iter.chain(None.into_iter().flatten()) +} + /// Iterator over parent entries, i.e. entries with the same namespace and author, and a key which /// is a prefix of the key passed to the iterator. -#[self_referencing] +#[derive(Debug)] pub struct ParentIterator<'a> { - read_tx: ReadTransaction<'a>, - #[borrows(read_tx)] - #[covariant] - record_table: RecordsTable<'this>, + reader: TableReader<'a, RecordsId<'static>, RecordsValue<'static>>, namespace: NamespaceId, author: AuthorId, key: Vec, } impl<'a> ParentIterator<'a> { - fn create( + fn new( db: &'a Arc, namespace: NamespaceId, author: AuthorId, key: Vec, ) -> anyhow::Result { - let iter = Self::try_new( - db.begin_read()?, - |read_tx| { - read_tx - .open_table(RECORDS_TABLE) - .map_err(anyhow::Error::from) - }, + let reader = TableReader::new(db, |tx| tx.open_table(RECORDS_TABLE))?; + Ok(Self { + reader, namespace, author, key, - )?; - Ok(iter) + }) } } @@ -857,49 +660,28 @@ impl Iterator for ParentIterator<'_> { type Item = Result; fn next(&mut self) -> Option { - self.with_mut(|fields| { - while !fields.key.is_empty() { - let entry = get_one( - fields.record_table, - *fields.namespace, - *fields.author, - &fields.key, - ); - fields.key.pop(); - match entry { - Err(err) => return Some(Err(err)), - Ok(Some(entry)) => return Some(Ok(entry)), - Ok(None) => continue, - } + let records_table = self.reader.table(); + while !self.key.is_empty() { + let entry = get_exact(records_table, self.namespace, self.author, &self.key, false); + self.key.pop(); + match entry { + Err(err) => return Some(Err(err)), + Ok(Some(entry)) => return Some(Ok(entry)), + Ok(None) => continue, } - None - }) + } + None } } /// Iterator over all content hashes for the fs store. -#[self_referencing] -pub struct ContentHashesIterator<'a> { - read_tx: ReadTransaction<'a>, - #[borrows(read_tx)] - #[covariant] - record_table: RecordsTable<'this>, - #[covariant] - #[borrows(record_table)] - records: RecordsRange<'this>, -} +#[derive(Debug)] +pub struct ContentHashesIterator<'a>(RecordsRange<'a>); + impl<'a> ContentHashesIterator<'a> { - fn create(db: &'a Arc) -> anyhow::Result { - let iter = Self::try_new( - db.begin_read()?, - |read_tx| { - read_tx - .open_table(RECORDS_TABLE) - .map_err(anyhow::Error::from) - }, - |table| table.iter().map_err(anyhow::Error::from), - )?; - Ok(iter) + fn new(db: &'a Arc) -> anyhow::Result { + let range = RecordsRange::new(db, |table| table.iter())?; + Ok(Self(range)) } } @@ -907,44 +689,30 @@ impl Iterator for ContentHashesIterator<'_> { type Item = Result; fn next(&mut self) -> Option { - self.with_mut(|fields| match fields.records.next() { - None => None, - Some(Err(err)) => Some(Err(err.into())), - Some(Ok((_key, value))) => { - let (_timestamp, _namespace_sig, _author_sig, _len, hash) = value.value(); - Some(Ok(Hash::from(hash))) - } + self.0.next_mapped(|_key, value| { + let (_timestamp, _namespace_sig, _author_sig, _len, hash) = value; + Hash::from(hash) }) } } /// Iterator over the latest entry per author. -#[self_referencing] -pub struct LatestIterator<'a> { - read_tx: ReadTransaction<'a>, - #[borrows(read_tx)] - #[covariant] - record_table: LatestTable<'this>, - #[covariant] - #[borrows(record_table)] - records: LatestRange<'this>, -} +#[derive(Debug)] +pub struct LatestIterator<'a>( + TableRange<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>, +); + impl<'a> LatestIterator<'a> { - fn create(db: &'a Arc, namespace: NamespaceId) -> anyhow::Result { - let iter = Self::try_new( - db.begin_read()?, - |read_tx| { - read_tx - .open_table(LATEST_TABLE) - .map_err(anyhow::Error::from) - }, + fn new(db: &'a Arc, namespace: NamespaceId) -> anyhow::Result { + Ok(Self(TableRange::new( + db, + |tx| tx.open_table(LATEST_PER_AUTHOR_TABLE), |table| { let start = (namespace.as_bytes(), &[u8::MIN; 32]); let end = (namespace.as_bytes(), &[u8::MAX; 32]); - table.range(start..=end).map_err(anyhow::Error::from) + table.range(start..=end) }, - )?; - Ok(iter) + )?)) } } @@ -952,124 +720,29 @@ impl Iterator for LatestIterator<'_> { type Item = Result<(AuthorId, u64, Vec)>; fn next(&mut self) -> Option { - self.with_mut(|fields| match fields.records.next() { - None => None, - Some(Err(err)) => Some(Err(err.into())), - Some(Ok((key, value))) => { - let (_namespace, author) = key.value(); - let (timestamp, key) = value.value(); - Some(Ok((author.into(), timestamp, key.to_vec()))) - } + self.0.next_mapped(|key, value| { + let (_namespace, author) = key; + let (timestamp, key) = value; + (author.into(), timestamp, key.to_vec()) }) } } -#[self_referencing] -pub struct RangeIterator<'a> { - read_tx: ReadTransaction<'a>, - #[borrows(read_tx)] - #[covariant] - record_table: RecordsTable<'this>, - #[covariant] - #[borrows(record_table)] - records: RecordsRange<'this>, - filter: RangeFilter, -} - -impl<'a> RangeIterator<'a> { - fn with_range( - db: &'a Arc, - range: impl for<'this> FnOnce(&'this RecordsTable<'this>) -> DbResult>, - filter: RangeFilter, - ) -> anyhow::Result { - let iter = RangeIterator::try_new( - db.begin_read()?, - |read_tx| { - read_tx - .open_table(RECORDS_TABLE) - .map_err(anyhow::Error::from) - }, - |record_table| range(record_table).map_err(anyhow::Error::from), - filter, - )?; - Ok(iter) - } - - fn namespace( - db: &'a Arc, - namespace: &NamespaceId, - filter: RangeFilter, - ) -> anyhow::Result { - let start = range_start(namespace); - let end = range_end(namespace); - Self::with_range(db, |table| table.range(start..=end), filter) - } - - fn empty(db: &'a Arc) -> anyhow::Result { - let start = (&[0u8; 32], &[0u8; 32], &[0u8][..]); - let end = (&[0u8; 32], &[0u8; 32], &[0u8][..]); - Self::with_range(db, |table| table.range(start..end), RangeFilter::None) - } -} - -#[derive(Debug)] -enum RangeFilter { - None, - Prefix(Vec), - Key(Vec), -} - -impl RangeFilter { - fn matches(&self, id: &RecordIdentifier) -> bool { - match self { - RangeFilter::None => true, - RangeFilter::Prefix(ref prefix) => id.key().starts_with(prefix), - RangeFilter::Key(ref key) => id.key() == key, - } - } -} - -impl std::fmt::Debug for RangeIterator<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RangeIterator").finish_non_exhaustive() - } -} - -impl Iterator for RangeIterator<'_> { - type Item = Result; - - fn next(&mut self) -> Option { - self.with_mut(|fields| { - for next in fields.records.by_ref() { - let next = match next { - Ok(next) => next, - Err(err) => return Some(Err(err.into())), - }; - - let (namespace, author, key) = next.0.value(); - let (timestamp, namespace_sig, author_sig, len, hash) = next.1.value(); - if hash == Hash::EMPTY.as_bytes() { - continue; - } - let id = RecordIdentifier::new(namespace, author, key); - if fields.filter.matches(&id) { - let record = Record::new(hash.into(), len, timestamp); - let entry = Entry::new(id, record); - let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig); - let signed_entry = SignedEntry::new(entry_signature, entry); - - return Some(Ok(signed_entry)); - } - } - None - }) - } +fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry { + let (namespace, author, key) = key; + let (timestamp, namespace_sig, author_sig, len, hash) = value; + let id = RecordIdentifier::new(namespace, author, key); + let record = Record::new(hash.into(), len, timestamp); + let entry = Entry::new(id, record); + let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig); + SignedEntry::new(entry_signature, entry) } #[cfg(test)] mod tests { use crate::ranger::Store as _; - use crate::store::{GetFilter, Store as _}; + use crate::store::Store as _; + use crate::NamespaceSecret; use super::*; @@ -1088,10 +761,7 @@ mod tests { replica.hash_and_insert(&key1, &author, b"v1")?; replica.hash_and_insert(&key2, &author, b"v2")?; let res = store - .get_many( - replica.id(), - GetFilter::AuthorAndPrefix(author.id(), vec![255]), - )? + .get_many(replica.id(), Query::author(author.id()).key_prefix([255]))? .collect::>>()?; assert_eq!(res.len(), 2); assert_eq!( @@ -1144,12 +814,14 @@ mod tests { } // get all - let entries = store.get_all(namespace.id())?.collect::>>()?; + let entries = store + .get_many(namespace.id(), Query::all())? + .collect::>>()?; assert_eq!(entries.len(), 5); // get all prefix let entries = store - .get_by_prefix(namespace.id(), "hello-")? + .get_many(namespace.id(), Query::key_prefix("hello-"))? .collect::>>()?; assert_eq!(entries.len(), 5); @@ -1164,7 +836,9 @@ mod tests { } // get latest - let entries = store.get_all(namespace.id())?.collect::>>()?; + let entries = store + .get_many(namespace.id(), Query::all())? + .collect::>>()?; assert_eq!(entries.len(), 0); Ok(()) @@ -1211,7 +885,7 @@ mod tests { // create a copy of our db file with the latest table deleted. let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| { - tx.delete_table(LATEST_TABLE)?; + tx.delete_table(LATEST_PER_AUTHOR_TABLE)?; Ok(()) })?; diff --git a/iroh-sync/src/store/fs/bounds.rs b/iroh-sync/src/store/fs/bounds.rs new file mode 100644 index 00000000..677929f3 --- /dev/null +++ b/iroh-sync/src/store/fs/bounds.rs @@ -0,0 +1,296 @@ +use std::ops::{Bound, RangeBounds}; + +use bytes::Bytes; + +use crate::{store::KeyFilter, AuthorId, NamespaceId}; + +use super::{RecordsByKeyId, RecordsByKeyIdOwned, RecordsId, RecordsIdOwned}; + +/// Bounds on the records table. +/// +/// Supports bounds by author, key +pub struct RecordsBounds(Bound, Bound); + +impl RecordsBounds { + pub fn new(start: Bound, end: Bound) -> Self { + Self(start, end) + } + + pub fn author_key(ns: NamespaceId, author: AuthorId, key_matcher: KeyFilter) -> Self { + let key_is_exact = matches!(key_matcher, KeyFilter::Exact(_)); + let key = match key_matcher { + KeyFilter::Any => Bytes::new(), + KeyFilter::Exact(key) => key, + KeyFilter::Prefix(prefix) => prefix, + }; + let author = author.to_bytes(); + let ns = ns.to_bytes(); + let mut author_end = author; + let mut ns_end = ns; + let mut key_end = key.to_vec(); + + let start = (ns, author, key); + + let end = if key_is_exact { + Bound::Included(start.clone()) + } else if increment_by_one(&mut key_end) { + Bound::Excluded((ns, author, key_end.into())) + } else if increment_by_one(&mut author_end) { + Bound::Excluded((ns, author_end, Bytes::new())) + } else if increment_by_one(&mut ns_end) { + Bound::Excluded((ns_end, [0u8; 32], Bytes::new())) + } else { + Bound::Unbounded + }; + + Self(Bound::Included(start), end) + } + + pub fn author_prefix(ns: NamespaceId, author: AuthorId, prefix: Bytes) -> Self { + RecordsBounds::author_key(ns, author, KeyFilter::Prefix(prefix)) + } + + pub fn namespace(ns: NamespaceId) -> Self { + Self::new(Self::namespace_start(&ns), Self::namespace_end(&ns)) + } + + pub fn from_start(ns: &NamespaceId, end: Bound) -> Self { + Self::new(Self::namespace_start(ns), end) + } + + pub fn to_end(ns: &NamespaceId, start: Bound) -> Self { + Self::new(start, Self::namespace_end(ns)) + } + + pub fn as_ref(&self) -> (Bound, Bound) { + fn map(id: &RecordsIdOwned) -> RecordsId { + (&id.0, &id.1, &id.2[..]) + } + (map_bound(&self.0, map), map_bound(&self.1, map)) + } + + fn namespace_start(namespace: &NamespaceId) -> Bound { + Bound::Included((namespace.to_bytes(), [0u8; 32], Bytes::new())) + } + + fn namespace_end(namespace: &NamespaceId) -> Bound { + let mut ns_end = namespace.to_bytes(); + if increment_by_one(&mut ns_end) { + Bound::Excluded((ns_end, [0u8; 32], Bytes::new())) + } else { + Bound::Unbounded + } + } +} + +impl RangeBounds for RecordsBounds { + fn start_bound(&self) -> Bound<&RecordsIdOwned> { + map_bound(&self.0, |s| s) + } + + fn end_bound(&self) -> Bound<&RecordsIdOwned> { + map_bound(&self.1, |s| s) + } +} + +impl From<(Bound, Bound)> for RecordsBounds { + fn from(value: (Bound, Bound)) -> Self { + Self::new(value.0, value.1) + } +} + +/// Bounds for the by-key index table. +/// +/// Supports bounds by key. +pub struct ByKeyBounds(Bound, Bound); +impl ByKeyBounds { + pub fn new(ns: NamespaceId, matcher: &KeyFilter) -> Self { + match matcher { + KeyFilter::Any => Self::namespace(ns), + KeyFilter::Exact(key) => { + let start = (ns.to_bytes(), key.clone(), [0u8; 32]); + let end = (ns.to_bytes(), key.clone(), [255u8; 32]); + Self(Bound::Included(start), Bound::Included(end)) + } + KeyFilter::Prefix(ref prefix) => { + let start = Bound::Included((ns.to_bytes(), prefix.clone(), [0u8; 32])); + + let mut ns_end = ns.to_bytes(); + let mut key_end = prefix.to_vec(); + let end = if increment_by_one(&mut key_end) { + Bound::Excluded((ns.to_bytes(), key_end.into(), [0u8; 32])) + } else if increment_by_one(&mut ns_end) { + Bound::Excluded((ns_end, Bytes::new(), [0u8; 32])) + } else { + Bound::Unbounded + }; + Self(start, end) + } + } + } + + pub fn namespace(ns: NamespaceId) -> Self { + let start = Bound::Included((ns.to_bytes(), Bytes::new(), [0u8; 32])); + let mut ns_end = ns.to_bytes(); + let end = if increment_by_one(&mut ns_end) { + Bound::Excluded((ns_end, Bytes::new(), [0u8; 32])) + } else { + Bound::Unbounded + }; + Self(start, end) + } + + pub fn as_ref(&self) -> (Bound, Bound) { + fn map(id: &RecordsByKeyIdOwned) -> RecordsByKeyId { + (&id.0, &id.1[..], &id.2) + } + (map_bound(&self.0, map), map_bound(&self.1, map)) + } +} + +impl RangeBounds for ByKeyBounds { + fn start_bound(&self) -> Bound<&RecordsByKeyIdOwned> { + map_bound(&self.0, |s| s) + } + + fn end_bound(&self) -> Bound<&RecordsByKeyIdOwned> { + map_bound(&self.1, |s| s) + } +} + +/// Increment a byte string by one, by incrementing the last byte that is not 255 by one. +/// +/// Returns false if all bytes are 255. +fn increment_by_one(value: &mut [u8]) -> bool { + for char in value.iter_mut().rev() { + if *char != 255 { + *char += 1; + return true; + } else { + *char = 0; + } + } + false +} + +fn map_bound<'a, T, U: 'a>(bound: &'a Bound, f: impl Fn(&'a T) -> U) -> Bound { + match bound { + Bound::Unbounded => Bound::Unbounded, + Bound::Included(t) => Bound::Included(f(t)), + Bound::Excluded(t) => Bound::Excluded(f(t)), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn records_bounds() { + let ns = NamespaceId::from(&[255u8; 32]); + + let bounds = RecordsBounds::namespace(ns); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), [0u8; 32], Bytes::new())) + ); + assert_eq!(bounds.end_bound(), Bound::Unbounded); + + let a = AuthorId::from(&[255u8; 32]); + + let bounds = RecordsBounds::author_key(ns, a, KeyFilter::Any); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), a.to_bytes(), Bytes::new())) + ); + assert_eq!(bounds.end_bound(), Bound::Unbounded); + + let a = AuthorId::from(&[0u8; 32]); + let mut a_end = a.to_bytes(); + a_end[31] = 1; + let bounds = RecordsBounds::author_key(ns, a, KeyFilter::Any); + assert_eq!( + bounds.end_bound(), + Bound::Excluded(&(ns.to_bytes(), a_end, Default::default())) + ); + + let bounds = RecordsBounds::author_key(ns, a, KeyFilter::Prefix(vec![1u8].into())); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), a.to_bytes(), vec![1u8].into())) + ); + assert_eq!( + bounds.end_bound(), + Bound::Excluded(&(ns.to_bytes(), a.to_bytes(), vec![2u8].into())) + ); + + let bounds = RecordsBounds::author_key(ns, a, KeyFilter::Exact(vec![1u8].into())); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), a.to_bytes(), vec![1u8].into())) + ); + assert_eq!( + bounds.end_bound(), + Bound::Included(&(ns.to_bytes(), a.to_bytes(), vec![1u8].into())) + ); + } + + #[test] + fn by_key_bounds() { + let ns = NamespaceId::from(&[255u8; 32]); + + let bounds = ByKeyBounds::namespace(ns); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), Bytes::new(), [0u8; 32])) + ); + assert_eq!(bounds.end_bound(), Bound::Unbounded); + + let bounds = ByKeyBounds::new(ns, &KeyFilter::Any); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), Bytes::new(), [0u8; 32])) + ); + assert_eq!(bounds.end_bound(), Bound::Unbounded); + + let bounds = ByKeyBounds::new(ns, &KeyFilter::Prefix(vec![1u8].into())); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), vec![1u8].into(), [0u8; 32])) + ); + assert_eq!( + bounds.end_bound(), + Bound::Excluded(&(ns.to_bytes(), vec![2u8].into(), [0u8; 32])) + ); + + let bounds = ByKeyBounds::new(ns, &KeyFilter::Prefix(vec![255u8].into())); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), vec![255u8].into(), [0u8; 32])) + ); + assert_eq!(bounds.end_bound(), Bound::Unbounded); + + let ns = NamespaceId::from(&[2u8; 32]); + let mut ns_end = ns.to_bytes(); + ns_end[31] = 3u8; + let bounds = ByKeyBounds::new(ns, &KeyFilter::Prefix(vec![255u8].into())); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), vec![255u8].into(), [0u8; 32])) + ); + assert_eq!( + bounds.end_bound(), + Bound::Excluded(&(ns_end, Bytes::new(), [0u8; 32])) + ); + + let bounds = ByKeyBounds::new(ns, &KeyFilter::Exact(vec![1u8].into())); + assert_eq!( + bounds.start_bound(), + Bound::Included(&(ns.to_bytes(), vec![1u8].into(), [0u8; 32])) + ); + assert_eq!( + bounds.end_bound(), + Bound::Included(&(ns.to_bytes(), vec![1u8].into(), [255u8; 32])) + ); + } +} diff --git a/iroh-sync/src/store/fs/migrations.rs b/iroh-sync/src/store/fs/migrations.rs new file mode 100644 index 00000000..7e6901a7 --- /dev/null +++ b/iroh-sync/src/store/fs/migrations.rs @@ -0,0 +1,120 @@ +use std::collections::HashMap; + +use anyhow::Result; +use redb::{Database, ReadableTable, TableHandle, WriteTransaction}; +use tracing::{debug, info}; + +use crate::{Capability, NamespaceSecret}; + +use super::{ + LATEST_PER_AUTHOR_TABLE, NAMESPACES_TABLE, NAMESPACES_TABLE_V1, RECORDS_BY_KEY_TABLE, + RECORDS_TABLE, +}; + +/// Run all database migrations, if needed. +pub fn run_migrations(db: &Database) -> Result<()> { + run_migration(db, migration_001_populate_latest_table)?; + run_migration(db, migration_002_namespaces_v2)?; + run_migration(db, migration_003_populate_by_key_index)?; + Ok(()) +} + +fn run_migration(db: &Database, f: F) -> Result<()> +where + F: Fn(&WriteTransaction) -> Result, +{ + let name = std::any::type_name::(); + let name = name.split("::").last().unwrap(); + let tx = db.begin_write()?; + debug!("Start migration {name}"); + match f(&tx)? { + MigrateOutcome::Execute(len) => { + tx.commit()?; + info!("Executed migration {name} ({len} rows affected)"); + } + MigrateOutcome::Skip => debug!("Skip migration {name}: Not needed"), + } + Ok(()) +} + +enum MigrateOutcome { + Skip, + Execute(usize), +} + +/// migration 001: populate the latest table (which did not exist before) +fn migration_001_populate_latest_table(tx: &WriteTransaction) -> Result { + let mut latest_table = tx.open_table(LATEST_PER_AUTHOR_TABLE)?; + let records_table = tx.open_table(RECORDS_TABLE)?; + if !latest_table.is_empty()? || records_table.is_empty()? { + return Ok(MigrateOutcome::Skip); + } + + #[allow(clippy::type_complexity)] + let mut heads: HashMap<([u8; 32], [u8; 32]), (u64, Vec)> = HashMap::new(); + let iter = records_table.iter()?; + + for next in iter { + let next = next?; + let (namespace, author, key) = next.0.value(); + let (timestamp, _namespace_sig, _author_sig, _len, _hash) = next.1.value(); + heads + .entry((*namespace, *author)) + .and_modify(|e| { + if timestamp >= e.0 { + *e = (timestamp, key.to_vec()); + } + }) + .or_insert_with(|| (timestamp, key.to_vec())); + } + let len = heads.len(); + for ((namespace, author), (timestamp, key)) in heads { + latest_table.insert((&namespace, &author), (timestamp, key.as_slice()))?; + } + Ok(MigrateOutcome::Execute(len)) +} + +/// Migrate the namespaces table from V1 to V2. +fn migration_002_namespaces_v2(tx: &WriteTransaction) -> Result { + let namespaces_v1_exists = tx + .list_tables()? + .any(|handle| handle.name() == NAMESPACES_TABLE_V1.name()); + // migration 002: update namespaces from V1 to V2 + if !namespaces_v1_exists { + return Ok(MigrateOutcome::Skip); + } + let namespaces_v1 = tx.open_table(NAMESPACES_TABLE_V1)?; + let mut namespaces_v2 = tx.open_table(NAMESPACES_TABLE)?; + let mut entries = 0; + for res in namespaces_v1.iter()? { + let db_value = res?.1; + let secret_bytes = db_value.value(); + let capability = Capability::Write(NamespaceSecret::from_bytes(secret_bytes)); + let id = capability.id().to_bytes(); + let (raw_kind, raw_bytes) = capability.raw(); + namespaces_v2.insert(&id, (raw_kind, &raw_bytes))?; + entries += 1; + } + tx.delete_table(NAMESPACES_TABLE_V1)?; + Ok(MigrateOutcome::Execute(entries)) +} + +/// migration 003: populate the by_key index table(which did not exist before) +fn migration_003_populate_by_key_index(tx: &WriteTransaction) -> Result { + let mut by_key_table = tx.open_table(RECORDS_BY_KEY_TABLE)?; + let records_table = tx.open_table(RECORDS_TABLE)?; + if !by_key_table.is_empty()? || records_table.is_empty()? { + return Ok(MigrateOutcome::Skip); + } + + let iter = records_table.iter()?; + let mut len = 0; + for next in iter { + let next = next?; + let (namespace, author, key) = next.0.value(); + let id = (namespace, key, author); + by_key_table.insert(id, ())?; + len += 1; + } + Ok(MigrateOutcome::Execute(len)) +} diff --git a/iroh-sync/src/store/fs/query.rs b/iroh-sync/src/store/fs/query.rs new file mode 100644 index 00000000..d21033c7 --- /dev/null +++ b/iroh-sync/src/store/fs/query.rs @@ -0,0 +1,161 @@ +use std::sync::Arc; + +use anyhow::Result; +use iroh_bytes::Hash; +use redb::Database; + +use crate::{ + store::{ + util::{IndexKind, LatestPerKeySelector, SelectorRes}, + AuthorFilter, KeyFilter, Query, + }, + AuthorId, NamespaceId, SignedEntry, +}; + +use super::{ + bounds::{ByKeyBounds, RecordsBounds}, + ranges::{RecordsByKeyRange, RecordsRange}, + RecordsValue, +}; + +/// A query iterator for entry queries. +#[derive(Debug)] +pub struct QueryIterator<'a> { + range: QueryRange<'a>, + query: Query, + offset: u64, + count: u64, +} + +#[derive(Debug)] +enum QueryRange<'a> { + AuthorKey { + range: RecordsRange<'a>, + key_filter: KeyFilter, + }, + KeyAuthor { + range: RecordsByKeyRange<'a>, + author_filter: AuthorFilter, + selector: Option, + }, +} + +impl<'a> QueryIterator<'a> { + pub fn new(db: &'a Arc, namespace: NamespaceId, query: Query) -> Result { + let index_kind = IndexKind::from(&query); + let range = match index_kind { + IndexKind::AuthorKey { range, key_filter } => { + let (bounds, filter) = match range { + // single author: both author and key are selected via the range. therefore + // set `filter` to `Any`. + AuthorFilter::Exact(author) => ( + RecordsBounds::author_key(namespace, author, key_filter), + KeyFilter::Any, + ), + // no author set => full table scan with the provided key filter + AuthorFilter::Any => (RecordsBounds::namespace(namespace), key_filter), + }; + let range = RecordsRange::with_bounds(db, bounds)?; + QueryRange::AuthorKey { + range, + key_filter: filter, + } + } + IndexKind::KeyAuthor { + range, + author_filter, + latest_per_key, + } => { + let bounds = ByKeyBounds::new(namespace, &range); + let range = RecordsByKeyRange::with_bounds(db, bounds)?; + let selector = latest_per_key.then(LatestPerKeySelector::default); + QueryRange::KeyAuthor { + author_filter, + range, + selector, + } + } + }; + + Ok(QueryIterator { + range, + query, + offset: 0, + count: 0, + }) + } +} + +impl<'a> Iterator for QueryIterator<'a> { + type Item = Result; + + fn next(&mut self) -> Option> { + // early-return if we reached the query limit. + if let Some(limit) = self.query.limit() { + if self.count >= limit { + return None; + } + } + loop { + let next = match &mut self.range { + QueryRange::AuthorKey { range, key_filter } => { + // get the next entry from the query range, filtered by the key and empty filters + range.next_filtered(&self.query.sort_direction, |(_ns, _author, key), value| { + key_filter.matches(key) + && (self.query.include_empty || !value_is_empty(&value)) + }) + } + + QueryRange::KeyAuthor { + range, + author_filter, + selector, + } => loop { + // get the next entry from the query range, filtered by the author filter + let next = range + .next_filtered(&self.query.sort_direction, |(_ns, _key, author)| { + author_filter.matches(&(AuthorId::from(author))) + }); + + // early-break if next contains Err + let next = match next.transpose() { + Err(err) => break Some(Err(err)), + Ok(next) => next, + }; + + // push the entry into the selector. if active, only the latest entry + // for each key will be emitted. + let next = match selector { + None => next, + Some(selector) => match selector.push(next) { + SelectorRes::Continue => continue, + SelectorRes::Finished => None, + SelectorRes::Some(res) => Some(res), + }, + }; + + // skip the entry if empty and no empty entries requested + if !self.query.include_empty && matches!(&next, Some(e) if e.is_empty()) { + continue; + } + + break next.map(Result::Ok); + }, + }; + + // skip the entry if we didn't get past the requested offset yet. + if self.offset < self.query.offset() && matches!(next, Some(Ok(_))) { + self.offset += 1; + continue; + } + + self.count += 1; + return next; + } + } +} + +fn value_is_empty(value: &RecordsValue) -> bool { + let (_timestamp, _namespace_sig, _author_sig, _len, hash) = value; + *hash == Hash::EMPTY.as_bytes() +} diff --git a/iroh-sync/src/store/fs/ranges.rs b/iroh-sync/src/store/fs/ranges.rs new file mode 100644 index 00000000..48eeefe4 --- /dev/null +++ b/iroh-sync/src/store/fs/ranges.rs @@ -0,0 +1,278 @@ +//! Ranges on [`redb`] tables +//! +//! Because the [`redb`] types all contain references, this uses [`ouroboros`] to create +//! self-referential structs so that we can embed the [`Range`] iterator together with the +//! [`ReadableTable`] and the [`ReadTransaction`] in a struct for our iterators returned from the +//! store. + +use std::{fmt, sync::Arc}; + +use ouroboros::self_referencing; +use redb::{ + Database, Range, ReadOnlyTable, ReadTransaction, ReadableTable, RedbKey, RedbValue, + StorageError, TableError, +}; + +use crate::{store::SortDirection, SignedEntry}; + +use super::{ + bounds::{ByKeyBounds, RecordsBounds}, + into_entry, RecordsByKeyId, RecordsId, RecordsValue, RECORDS_BY_KEY_TABLE, RECORDS_TABLE, +}; + +/// A [`ReadTransaction`] with a [`ReadOnlyTable`] that can be stored in a struct. +/// +/// This uses [`ouroboros::self_referencing`] to store a [`ReadTransaction`] and a [`ReadOnlyTable`] +/// with self-referencing. +pub struct TableReader<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static>( + TableReaderInner<'a, K, V>, +); + +#[self_referencing] +struct TableReaderInner<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static> { + #[debug("ReadTransaction")] + read_tx: ReadTransaction<'a>, + #[borrows(read_tx)] + #[covariant] + table: ReadOnlyTable<'this, K, V>, +} + +impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableReader<'a, K, V> { + /// Create a new [`TableReader`] + pub fn new( + db: &'a Arc, + table_fn: impl for<'this> FnOnce( + &'this ReadTransaction<'this>, + ) -> Result, TableError>, + ) -> anyhow::Result { + let reader = TableReaderInner::try_new(db.begin_read()?, |read_tx| { + table_fn(read_tx).map_err(anyhow::Error::from) + })?; + Ok(Self(reader)) + } + + /// Get a reference to the [`ReadOnlyTable`]; + pub fn table(&self) -> &ReadOnlyTable { + self.0.borrow_table() + } +} + +impl<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static> fmt::Debug for TableReader<'a, K, V> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "TableReader({:?})", self.table()) + } +} + +/// A range reader for a [`ReadOnlyTable`] that can be stored in a struct. +/// +/// This uses [`ouroboros::self_referencing`] to store a [`ReadTransaction`], a [`ReadOnlyTable`] +/// and a [`Range`] together. Useful to build iterators with. +pub struct TableRange<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static>( + TableRangeReaderInner<'a, K, V>, +); + +#[self_referencing] +struct TableRangeReaderInner<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static> { + #[debug("ReadTransaction")] + read_tx: ReadTransaction<'a>, + #[borrows(read_tx)] + #[covariant] + table: ReadOnlyTable<'this, K, V>, + #[covariant] + #[borrows(table)] + range: Range<'this, K, V>, +} + +impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableRange<'a, K, V> { + /// Create a new [`TableReader`] + pub fn new(db: &'a Arc, table_fn: TF, range_fn: RF) -> anyhow::Result + where + TF: for<'s> FnOnce(&'s ReadTransaction<'s>) -> Result, TableError>, + RF: for<'s> FnOnce(&'s ReadOnlyTable<'s, K, V>) -> Result, StorageError>, + { + let reader = TableRangeReaderInner::try_new( + db.begin_read()?, + |tx| table_fn(tx).map_err(anyhow_err), + |table| range_fn(table).map_err(anyhow_err), + )?; + Ok(Self(reader)) + } + + /// Get a reference to the [`ReadOnlyTable`]; + pub fn table(&self) -> &ReadOnlyTable { + self.0.borrow_table() + } + + pub fn next_mapped( + &mut self, + map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> T, + ) -> Option> { + self.0.with_range_mut(|records| { + records + .next() + .map(|r| r.map_err(Into::into).map(|r| map(r.0.value(), r.1.value()))) + }) + } + + pub fn next_filtered( + &mut self, + direction: &SortDirection, + filter: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> bool, + map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> T, + ) -> Option> { + self.0.with_range_mut(|records| loop { + let next = match direction { + SortDirection::Asc => records.next(), + SortDirection::Desc => records.next_back(), + }; + match next { + None => break None, + Some(Err(err)) => break Some(Err(err.into())), + Some(Ok(res)) => match filter(res.0.value(), res.1.value()) { + false => continue, + true => break Some(Ok(map(res.0.value(), res.1.value()))), + }, + } + }) + } +} + +impl<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static> fmt::Debug for TableRange<'a, K, V> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "TableRangeReader({:?})", self.table()) + } +} + +/// An iterator over a range of entries from the records table. +#[derive(Debug)] +pub struct RecordsRange<'a>(TableRange<'a, RecordsId<'static>, RecordsValue<'static>>); +impl<'a> RecordsRange<'a> { + pub(super) fn new(db: &'a Arc, range_fn: RF) -> anyhow::Result + where + RF: for<'s> FnOnce( + &'s ReadOnlyTable<'s, RecordsId<'static>, RecordsValue<'static>>, + ) -> Result< + Range<'s, RecordsId<'static>, RecordsValue<'static>>, + StorageError, + >, + { + Ok(Self(TableRange::new( + db, + |tx| tx.open_table(RECORDS_TABLE), + range_fn, + )?)) + } + + pub(super) fn with_bounds( + db: &'a Arc, + bounds: RecordsBounds, + ) -> anyhow::Result { + Self::new(db, |table| table.range(bounds.as_ref())) + } + + /// Get the next item in the range. + /// + /// Omit items for which the `matcher` function returns false. + pub(super) fn next_filtered( + &mut self, + direction: &SortDirection, + filter: impl for<'x> Fn(RecordsId<'x>, RecordsValue<'x>) -> bool, + ) -> Option> { + self.0.next_filtered(direction, filter, into_entry) + } + + pub(super) fn next_mapped( + &mut self, + map: impl for<'x> Fn(RecordsId<'x>, RecordsValue<'x>) -> T, + ) -> Option> { + self.0.next_mapped(map) + } +} + +impl<'a> Iterator for RecordsRange<'a> { + type Item = anyhow::Result; + fn next(&mut self) -> Option { + self.0.next_mapped(into_entry) + } +} + +#[derive(derive_more::Debug)] +#[debug("RecordsByKeyRange")] +pub struct RecordsByKeyRange<'a>(RecordsByKeyRangeInner<'a>); + +#[self_referencing] +struct RecordsByKeyRangeInner<'a> { + read_tx: ReadTransaction<'a>, + + #[covariant] + #[borrows(read_tx)] + records_table: ReadOnlyTable<'this, RecordsId<'static>, RecordsValue<'static>>, + + #[covariant] + #[borrows(read_tx)] + by_key_table: ReadOnlyTable<'this, RecordsByKeyId<'static>, ()>, + + #[borrows(by_key_table)] + #[covariant] + by_key_range: Range<'this, RecordsByKeyId<'static>, ()>, +} + +impl<'a> RecordsByKeyRange<'a> { + pub fn new(db: &'a Arc, range_fn: RF) -> anyhow::Result + where + RF: for<'s> FnOnce( + &'s ReadOnlyTable<'s, RecordsByKeyId<'static>, ()>, + ) -> Result, ()>, StorageError>, + { + let inner = RecordsByKeyRangeInner::try_new( + db.begin_read()?, + |tx| tx.open_table(RECORDS_TABLE).map_err(anyhow_err), + |tx| tx.open_table(RECORDS_BY_KEY_TABLE).map_err(anyhow_err), + |table| range_fn(table).map_err(Into::into), + )?; + Ok(Self(inner)) + } + + pub fn with_bounds(db: &'a Arc, bounds: ByKeyBounds) -> anyhow::Result { + Self::new(db, |table| table.range(bounds.as_ref())) + } + + /// Get the next item in the range. + /// + /// Omit items for which the `matcher` function returns false. + pub fn next_filtered( + &mut self, + direction: &SortDirection, + filter: impl for<'x> Fn(RecordsByKeyId<'x>) -> bool, + ) -> Option> { + self.0.with_mut(|fields| { + let by_key_id = loop { + let next = match direction { + SortDirection::Asc => fields.by_key_range.next(), + SortDirection::Desc => fields.by_key_range.next_back(), + }; + match next { + Some(Ok(res)) => match filter(res.0.value()) { + false => continue, + true => break res.0, + }, + Some(Err(err)) => return Some(Err(err.into())), + None => return None, + } + }; + + let (namespace, key, author) = by_key_id.value(); + let records_id = (namespace, author, key); + let entry = fields.records_table.get(&records_id); + match entry { + Ok(Some(entry)) => Some(Ok(into_entry(records_id, entry.value()))), + Ok(None) => None, + Err(err) => Some(Err(err.into())), + } + }) + } +} + +fn anyhow_err(err: impl Into) -> anyhow::Error { + err.into() +} diff --git a/iroh-sync/src/store/memory.rs b/iroh-sync/src/store/memory.rs index 9a69e6d9..aa29fde1 100644 --- a/iroh-sync/src/store/memory.rs +++ b/iroh-sync/src/store/memory.rs @@ -7,6 +7,7 @@ use std::{ }; use anyhow::{anyhow, Result}; +use bytes::Bytes; use ed25519_dalek::{SignatureError, VerifyingKey}; use iroh_bytes::Hash; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard}; @@ -18,7 +19,11 @@ use crate::{ AuthorId, Capability, CapabilityKind, NamespaceId, PeerIdBytes, Record, }; -use super::{pubkeys::MemPublicKeyStore, ImportNamespaceOutcome, OpenError, PublicKeyStore}; +use super::{ + pubkeys::MemPublicKeyStore, + util::{IndexKind, LatestPerKeySelector, SelectorRes}, + ImportNamespaceOutcome, OpenError, PublicKeyStore, Query, SortDirection, +}; type SyncPeersCache = Arc>>>; @@ -37,18 +42,48 @@ pub struct Store { peers_per_doc: SyncPeersCache, } -type Rid = (AuthorId, Vec); -type Rvalue = SignedEntry; -type RecordMap = BTreeMap; +type Key = Bytes; type ReplicaRecordsOwned = BTreeMap; +#[derive(Debug, Default)] +struct RecordMap { + by_author: BTreeMap<(AuthorId, Key), SignedEntry>, + by_key: BTreeMap<(Key, AuthorId), ()>, +} + +impl RecordMap { + fn insert(&mut self, entry: SignedEntry) { + self.by_key + .insert((entry.id().key_bytes(), entry.author()), ()); + self.by_author + .insert((entry.author(), entry.id().key_bytes()), entry); + } + fn remove(&mut self, id: &RecordIdentifier) -> Option { + let entry = self.by_author.remove(&(id.author(), id.key_bytes())); + self.by_key.remove(&(id.key_bytes(), id.author())); + entry + } + fn len(&self) -> usize { + self.by_author.len() + } + fn retain(&mut self, f: impl Fn(&(AuthorId, Key), &mut SignedEntry) -> bool) { + self.by_author.retain(|key, value| { + let retain = f(key, value); + if !retain { + self.by_key.remove(&(key.1.clone(), key.0)); + } + retain + }) + } +} + type LatestByAuthorMapOwned = BTreeMap)>; type LatestMapOwned = HashMap; type LatestByAuthorMap<'a> = MappedRwLockReadGuard<'a, LatestByAuthorMapOwned>; impl super::Store for Store { type Instance = ReplicaStoreInstance; - type GetIter<'a> = RangeIterator<'a>; + type GetIter<'a> = QueryIterator<'a>; type ContentHashesIter<'a> = ContentHashesIterator<'a>; type AuthorsIter<'a> = std::vec::IntoIter>; type NamespaceIter<'a> = std::vec::IntoIter>; @@ -134,34 +169,31 @@ impl super::Store for Store { fn get_many( &self, namespace: NamespaceId, - filter: super::GetFilter, + query: impl Into, ) -> Result> { - match filter { - super::GetFilter::All => self.get_all(namespace), - super::GetFilter::Key(key) => self.get_by_key(namespace, key), - super::GetFilter::Prefix(prefix) => self.get_by_prefix(namespace, prefix), - super::GetFilter::Author(author) => self.get_by_author(namespace, author), - super::GetFilter::AuthorAndPrefix(author, prefix) => { - self.get_by_author_and_prefix(namespace, author, prefix) - } - } + let query = query.into(); + let records = self.replica_records.read(); + Ok(QueryIterator::new(records, namespace, query)) } - fn get_one( + fn get_exact( &self, namespace: NamespaceId, author: AuthorId, key: impl AsRef<[u8]>, + include_empty: bool, ) -> Result> { let inner = self.replica_records.read(); - - let value = inner - .get(&namespace) - .and_then(|records| records.get(&(author, key.as_ref().to_vec()))); - Ok(match value { + let Some(records) = inner.get(&namespace) else { + return Ok(None); + }; + let entry = records + .by_author + .get(&(author, key.as_ref().to_vec().into())); + Ok(match entry { + Some(entry) if !include_empty && entry.is_empty() => None, + Some(entry) => Some(entry.clone()), None => None, - Some(value) if value.is_empty() => None, - Some(value) => Some(value.clone()), }) } @@ -205,121 +237,6 @@ impl super::Store for Store { } } -impl Store { - fn get_by_key( - &self, - namespace: NamespaceId, - key: impl AsRef<[u8]>, - ) -> Result> { - let records = self.replica_records.read(); - let key = key.as_ref().to_vec(); - let filter = GetFilter::Key { namespace, key }; - - Ok(RangeIterator { - records, - filter, - index: 0, - }) - } - - fn get_by_prefix( - &self, - namespace: NamespaceId, - prefix: impl AsRef<[u8]>, - ) -> Result> { - let records = self.replica_records.read(); - let prefix = prefix.as_ref().to_vec(); - let filter = GetFilter::Prefix { namespace, prefix }; - - Ok(RangeIterator { - records, - filter, - index: 0, - }) - } - - fn get_by_author(&self, namespace: NamespaceId, author: AuthorId) -> Result> { - let records = self.replica_records.read(); - let filter = GetFilter::Author { namespace, author }; - - Ok(RangeIterator { - records, - filter, - index: 0, - }) - } - - fn get_by_author_and_prefix( - &self, - namespace: NamespaceId, - author: AuthorId, - prefix: Vec, - ) -> Result> { - let records = self.replica_records.read(); - let filter = GetFilter::AuthorAndPrefix { - namespace, - author, - prefix, - }; - - Ok(RangeIterator { - records, - filter, - index: 0, - }) - } - - fn get_all(&self, namespace: NamespaceId) -> Result> { - let records = self.replica_records.read(); - let filter = GetFilter::All { namespace }; - - Ok(RangeIterator { - records, - filter, - index: 0, - }) - } -} - -#[derive(Debug)] -enum GetFilter { - /// All entries. - All { namespace: NamespaceId }, - /// Filter by author. - Author { - namespace: NamespaceId, - author: AuthorId, - }, - /// Filter by key only. - Key { - namespace: NamespaceId, - key: Vec, - }, - /// Filter by prefix only. - Prefix { - namespace: NamespaceId, - prefix: Vec, - }, - /// Filter by author and prefix. - AuthorAndPrefix { - namespace: NamespaceId, - prefix: Vec, - author: AuthorId, - }, -} - -impl GetFilter { - fn namespace(&self) -> NamespaceId { - match self { - GetFilter::All { namespace } => *namespace, - GetFilter::Key { namespace, .. } => *namespace, - GetFilter::Prefix { namespace, .. } => *namespace, - GetFilter::Author { namespace, .. } => *namespace, - GetFilter::AuthorAndPrefix { namespace, .. } => *namespace, - } - } -} - /// Iterator over all content hashes in the memory store. #[derive(Debug)] pub struct ContentHashesIterator<'a> { @@ -333,7 +250,7 @@ impl<'a> Iterator for ContentHashesIterator<'a> { fn next(&mut self) -> Option { loop { let records = self.records.values().nth(self.namespace_i)?; - match records.values().nth(self.record_i) { + match records.by_author.values().nth(self.record_i) { None => { self.namespace_i += 1; self.record_i = 0; @@ -370,48 +287,126 @@ impl<'a> Iterator for LatestIterator<'a> { /// Iterator over entries in the memory store #[derive(Debug)] -pub struct RangeIterator<'a> { +pub struct QueryIterator<'a> { records: ReplicaRecords<'a>, - filter: GetFilter, - /// Current iteration index. - index: usize, + namespace: NamespaceId, + query: Query, + index: IndexKind, + selector: Option, + // current iterator index + position: usize, + // number of entries returned from the iterator + count: u64, + // number of entries skipped at the beginning + offset: u64, } -impl<'a> Iterator for RangeIterator<'a> { +impl<'a> QueryIterator<'a> { + fn new(records: ReplicaRecords<'a>, namespace: NamespaceId, query: Query) -> Self { + let index = IndexKind::from(&query); + let selector = match index { + IndexKind::KeyAuthor { latest_per_key, .. } if latest_per_key => { + Some(LatestPerKeySelector::default()) + } + _ => None, + }; + + Self { + records, + namespace, + query, + index, + selector, + position: 0, + offset: 0, + count: 0, + } + } +} + +impl<'a> Iterator for QueryIterator<'a> { type Item = Result; fn next(&mut self) -> Option { loop { - let records = self.records.get(&self.filter.namespace())?; - let entry = match self.filter { - GetFilter::All { .. } => records.iter().nth(self.index)?, - GetFilter::Key { ref key, .. } => records - .iter() - .filter(|((_, k), _)| k == key) - .nth(self.index)?, - GetFilter::Prefix { ref prefix, .. } => records - .iter() - .filter(|((_, k), _)| k.starts_with(prefix)) - .nth(self.index)?, - GetFilter::Author { ref author, .. } => records - .iter() - .filter(|((a, _), _)| a == author) - .nth(self.index)?, - GetFilter::AuthorAndPrefix { - ref prefix, - ref author, + if let Some(limit) = self.query.limit() { + if self.count >= limit { + return None; + } + } + + let records = self.records.get(&self.namespace)?; + + let entry = match &self.index { + IndexKind::AuthorKey { range, key_filter } => { + let mut iter = records + .by_author + .iter() + .filter(|(_key, entry)| { + range.matches(&entry.author()) + && key_filter.matches(entry.key()) + && (self.query.include_empty || !entry.is_empty()) + }) + .map(|(_key, entry)| entry); + + let next = match self.query.sort_direction { + SortDirection::Asc => iter.nth(self.position), + SortDirection::Desc => iter.nth_back(self.position), + }; + next.cloned() + } + IndexKind::KeyAuthor { + range, + author_filter, .. - } => records - .iter() - .filter(|((a, k), _)| a == author && k.starts_with(prefix)) - .nth(self.index)?, + } => loop { + let mut iter = records + .by_key + .keys() + .flat_map(|k| records.by_author.get(&(k.1, k.0.clone())).into_iter()) + .filter(|entry| { + range.matches(entry.key()) && author_filter.matches(&entry.author()) + }); + let next = match self.query.sort_direction { + SortDirection::Asc => iter.nth(self.position), + SortDirection::Desc => iter.nth_back(self.position), + }; + let next = next.cloned(); + + let next = match self.selector.as_mut() { + None => next, + Some(selector) => match selector.push(next) { + SelectorRes::Continue => { + self.position += 1; + continue; + } + SelectorRes::Finished => None, + SelectorRes::Some(res) => Some(res), + }, + }; + let Some(entry) = next else { + break None; + }; + + // final check for empty entries: if the selector is active, the latest + // entry for a key might be empty, so skip it if no empty entries were + // requested + if !self.query.include_empty && entry.is_empty() { + self.position += 1; + continue; + } else { + break Some(entry); + } + }, }; - self.index += 1; - if entry.1.is_empty() { + + self.position += 1; + self.offset += 1; + if self.offset <= self.query.offset() { continue; - } else { - return Some(Ok(entry.1.clone())); } + self.count += 1; + return entry.map(Result::Ok); } } } @@ -493,7 +488,7 @@ impl Iterator for RecordsIter<'_> { fn next(&mut self) -> Option { let records = self.replica_records.get(&self.namespace)?; - let ((author, key), value) = records.iter().nth(self.i)?; + let ((author, key), value) = records.by_author.iter().nth(self.i)?; let id = RecordIdentifier::new(self.namespace, *author, key); self.i += 1; Some((id, value.clone())) @@ -508,25 +503,27 @@ impl crate::ranger::Store for ReplicaStoreInstance { Ok(self.with_records(|records| { records .and_then(|r| { - r.first_key_value().map(|((author, key), _value)| { - RecordIdentifier::new(self.namespace, *author, key.clone()) - }) + r.by_author + .first_key_value() + .map(|((author, key), _value)| { + RecordIdentifier::new(self.namespace, *author, key.clone()) + }) }) .unwrap_or_default() })) } - fn get(&self, key: &RecordIdentifier) -> Result, Self::Error> { + fn get(&self, id: &RecordIdentifier) -> Result, Self::Error> { Ok(self.with_records(|records| { records.and_then(|r| { - let v = r.get(&(key.author(), key.key().to_vec()))?; + let v = r.by_author.get(&(id.author(), id.key_bytes()))?; Some(v.clone()) }) })) } fn len(&self) -> Result { - Ok(self.with_records(|records| records.map(|v| v.len()).unwrap_or_default())) + Ok(self.with_records(|records| records.map(|v| v.by_author.len()).unwrap_or_default())) } fn is_empty(&self) -> Result { @@ -548,7 +545,7 @@ impl crate::ranger::Store for ReplicaStoreInstance { records.insert(e.author_bytes(), (e.timestamp(), e.key().to_vec())); }); self.with_records_mut_with_default(|records| { - records.insert((e.author_bytes(), e.key().to_vec()), e); + records.insert(e); }); Ok(()) } @@ -567,9 +564,7 @@ impl crate::ranger::Store for ReplicaStoreInstance { fn remove(&mut self, key: &RecordIdentifier) -> Result, Self::Error> { // TODO: what if we are trying to remove with the wrong timestamp? - let res = self.with_records_mut(|records| { - records.and_then(|records| records.remove(&(key.author(), key.key().to_vec()))) - }); + let res = self.with_records_mut(|records| records.and_then(|records| records.remove(key))); Ok(res) } @@ -616,7 +611,7 @@ impl crate::ranger::Store for ReplicaStoreInstance { let Some(records) = records else { return Ok(0); }; - let old_len = records.len(); + let old_len = records.by_author.len(); records.retain(|(a, k), v| { !(a == &prefix.author() && k.starts_with(prefix.key()) && predicate(v.entry())) }); diff --git a/iroh-sync/src/store/util.rs b/iroh-sync/src/store/util.rs new file mode 100644 index 00000000..e967e04b --- /dev/null +++ b/iroh-sync/src/store/util.rs @@ -0,0 +1,89 @@ +//! Utilities useful across different store impls. + +use crate::SignedEntry; + +use super::{AuthorFilter, KeyFilter, Query, QueryKind, SortBy}; + +/// A helper for stores that have by-author and by-key indexes for records. +#[derive(Debug)] +pub enum IndexKind { + AuthorKey { + range: AuthorFilter, + key_filter: KeyFilter, + }, + KeyAuthor { + range: KeyFilter, + author_filter: AuthorFilter, + latest_per_key: bool, + }, +} + +impl From<&Query> for IndexKind { + fn from(query: &Query) -> Self { + match &query.kind { + QueryKind::Flat(details) => match (&query.filter_author, details.sort_by) { + (AuthorFilter::Any, SortBy::KeyAuthor) => IndexKind::KeyAuthor { + range: query.filter_key.clone(), + author_filter: AuthorFilter::Any, + latest_per_key: false, + }, + _ => IndexKind::AuthorKey { + range: query.filter_author.clone(), + key_filter: query.filter_key.clone(), + }, + }, + QueryKind::SingleLatestPerKey(_) => IndexKind::KeyAuthor { + range: query.filter_key.clone(), + author_filter: query.filter_author.clone(), + latest_per_key: true, + }, + } + } +} + +/// Helper to extract the latest entry per key from an iterator that yields [`SignedEntry`] items. +/// +/// Items must be pushed in key-sorted order. +#[derive(Debug, Default)] +pub struct LatestPerKeySelector(Option); + +pub enum SelectorRes { + /// The iterator is finished. + Finished, + /// The selection is not yet finished, keep pushing more items. + Continue, + /// The selection yielded an entry. + Some(SignedEntry), +} + +impl LatestPerKeySelector { + /// Push an entry into the selector. + /// + /// Entries must be sorted by key beforehand. + pub fn push(&mut self, entry: Option) -> SelectorRes { + let Some(entry) = entry else { + return match self.0.take() { + Some(entry) => SelectorRes::Some(entry), + None => SelectorRes::Finished, + }; + }; + match self.0.take() { + None => { + self.0 = Some(entry); + SelectorRes::Continue + } + Some(last) if last.key() == entry.key() => { + if entry.timestamp() > last.timestamp() { + self.0 = Some(entry); + } else { + self.0 = Some(last); + } + SelectorRes::Continue + } + Some(last) => { + self.0 = Some(entry); + SelectorRes::Some(last) + } + } + } +} diff --git a/iroh-sync/src/sync.rs b/iroh-sync/src/sync.rs index 46ab8438..935c12c3 100644 --- a/iroh-sync/src/sync.rs +++ b/iroh-sync/src/sync.rs @@ -824,12 +824,12 @@ impl EntrySignature { } #[cfg(feature = "fs-store")] - pub(crate) fn author_signature(&self) -> &Signature { + pub(crate) fn author(&self) -> &Signature { &self.author_signature } #[cfg(feature = "fs-store")] - pub(crate) fn namespace_signature(&self) -> &Signature { + pub(crate) fn namespace(&self) -> &Signature { &self.namespace_signature } } @@ -939,7 +939,7 @@ impl Debug for RecordIdentifier { impl RangeKey for RecordIdentifier { fn is_prefix_of(&self, other: &Self) -> bool { - other.as_bytes().starts_with(self.as_bytes()) + other.as_ref().starts_with(self.as_ref()) } } @@ -969,9 +969,9 @@ impl RecordIdentifier { out.extend_from_slice(&self.0); } - /// Get this [`RecordIdentifier`] as a byte slices. - pub fn as_bytes(&self) -> &[u8] { - &self.0 + /// Get this [`RecordIdentifier`] as [Bytes]. + pub fn as_bytes(&self) -> Bytes { + self.0.clone() } /// Get this [`RecordIdentifier`] as a tuple of byte slices. @@ -983,11 +983,25 @@ impl RecordIdentifier { ) } + /// Get this [`RecordIdentifier`] as a tuple of bytes. + pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) { + ( + self.0[NAMESPACE_BYTES].try_into().unwrap(), + self.0[AUTHOR_BYTES].try_into().unwrap(), + self.0.slice(KEY_BYTES), + ) + } + /// Get the key of this record. pub fn key(&self) -> &[u8] { &self.0[KEY_BYTES] } + /// Get the key of this record as [`Bytes`]. + pub fn key_bytes(&self) -> Bytes { + self.0.slice(KEY_BYTES) + } + /// Get the [`NamespaceId`] of this record as byte array. pub fn namespace(&self) -> NamespaceId { let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap(); @@ -1001,6 +1015,12 @@ impl RecordIdentifier { } } +impl AsRef<[u8]> for RecordIdentifier { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + impl Deref for SignedEntry { type Target = Entry; fn deref(&self) -> &Self::Target { @@ -1127,7 +1147,7 @@ mod tests { use crate::{ actor::SyncHandle, ranger::{Range, Store as _}, - store::{self, GetFilter, OpenError, Store}, + store::{self, OpenError, Query, SortBy, SortDirection, Store}, }; use super::*; @@ -1172,7 +1192,7 @@ mod tests { for i in 0..10 { let res = store - .get_one(my_replica.id(), alice.id(), format!("/{i}"))? + .get_exact(my_replica.id(), alice.id(), format!("/{i}"), false)? .unwrap(); let len = format!("{i}: hello from alice").as_bytes().len() as u64; assert_eq!(res.entry().record().content_len(), len); @@ -1182,35 +1202,35 @@ mod tests { // Test multiple records for the same key my_replica.hash_and_insert("/cool/path", &alice, "round 1")?; let _entry = store - .get_one(my_replica.id(), alice.id(), "/cool/path")? + .get_exact(my_replica.id(), alice.id(), "/cool/path", false)? .unwrap(); // Second my_replica.hash_and_insert("/cool/path", &alice, "round 2")?; let _entry = store - .get_one(my_replica.id(), alice.id(), "/cool/path")? + .get_exact(my_replica.id(), alice.id(), "/cool/path", false)? .unwrap(); // Get All by author let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::Author(alice.id()))? + .get_many(my_replica.id(), Query::author(alice.id()))? .collect::>()?; assert_eq!(entries.len(), 11); // Get All by author let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::Author(bob.id()))? + .get_many(my_replica.id(), Query::author(bob.id()))? .collect::>()?; assert_eq!(entries.len(), 0); // Get All by key let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::Key(b"/cool/path".to_vec()))? + .get_many(my_replica.id(), Query::key_exact(b"/cool/path"))? .collect::>()?; assert_eq!(entries.len(), 1); // Get All let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::All)? + .get_many(my_replica.id(), Query::all())? .collect::>()?; assert_eq!(entries.len(), 11); @@ -1219,24 +1239,24 @@ mod tests { // Get All by author let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::Author(alice.id()))? + .get_many(my_replica.id(), Query::author(alice.id()))? .collect::>()?; assert_eq!(entries.len(), 11); let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::Author(bob.id()))? + .get_many(my_replica.id(), Query::author(bob.id()))? .collect::>()?; assert_eq!(entries.len(), 1); // Get All by key let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::Key(b"/cool/path".to_vec()))? + .get_many(my_replica.id(), Query::key_exact(b"/cool/path"))? .collect::>()?; assert_eq!(entries.len(), 2); // Get all by prefix let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::Prefix(b"/cool".to_vec()))? + .get_many(my_replica.id(), Query::key_prefix(b"/cool"))? .collect::>()?; assert_eq!(entries.len(), 2); @@ -1244,7 +1264,7 @@ mod tests { let entries: Vec<_> = store .get_many( my_replica.id(), - GetFilter::AuthorAndPrefix(alice.id(), b"/cool".to_vec()), + Query::author(alice.id()).key_prefix(b"/cool"), )? .collect::>()?; assert_eq!(entries.len(), 1); @@ -1252,14 +1272,14 @@ mod tests { let entries: Vec<_> = store .get_many( my_replica.id(), - GetFilter::AuthorAndPrefix(bob.id(), b"/cool".to_vec()), + Query::author(bob.id()).key_prefix(b"/cool"), )? .collect::>()?; assert_eq!(entries.len(), 1); // Get All let entries: Vec<_> = store - .get_many(my_replica.id(), GetFilter::All)? + .get_many(my_replica.id(), Query::all())? .collect::>()?; assert_eq!(entries.len(), 12); @@ -1491,7 +1511,9 @@ mod tests { replica .insert_entry(entry.clone(), InsertOrigin::Local) .unwrap(); - let res = store.get_one(namespace.id(), author.id(), key)?.unwrap(); + let res = store + .get_exact(namespace.id(), author.id(), key, false)? + .unwrap(); assert_eq!(res, entry); let entry2 = { @@ -1503,7 +1525,9 @@ mod tests { let res = replica.insert_entry(entry2, InsertOrigin::Local); assert!(matches!(res, Err(InsertError::NewerEntryExists))); - let res = store.get_one(namespace.id(), author.id(), key)?.unwrap(); + let res = store + .get_exact(namespace.id(), author.id(), key, false)? + .unwrap(); assert_eq!(res, entry); Ok(()) @@ -1716,9 +1740,18 @@ mod tests { // delete let deleted = replica.delete_prefix(b"foo", &alice)?; assert_eq!(deleted, 2); - assert_eq!(store.get_one(myspace.id(), alice.id(), b"foobar")?, None); - assert_eq!(store.get_one(myspace.id(), alice.id(), b"fooboo")?, None); - assert_eq!(store.get_one(myspace.id(), alice.id(), b"foo")?, None); + assert_eq!( + store.get_exact(myspace.id(), alice.id(), b"foobar", false)?, + None + ); + assert_eq!( + store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?, + None + ); + assert_eq!( + store.get_exact(myspace.id(), alice.id(), b"foo", false)?, + None + ); Ok(()) } @@ -1797,7 +1830,7 @@ mod tests { // insert entry let hash = replica.hash_and_insert(b"foo", &author, b"bar")?; let res = store - .get_many(namespace.id(), GetFilter::All)? + .get_many(namespace.id(), Query::all())? .collect::>(); assert_eq!(res.len(), 1); @@ -1808,7 +1841,7 @@ mod tests { store.close_replica(replica); store.remove_replica(&namespace.id())?; let res = store - .get_many(namespace.id(), GetFilter::All)? + .get_many(namespace.id(), Query::all())? .collect::>(); assert_eq!(res.len(), 0); @@ -1820,7 +1853,7 @@ mod tests { let mut replica = store.new_replica(namespace.clone())?; replica.insert(b"foo", &author, hash, 3)?; let res = store - .get_many(namespace.id(), GetFilter::All)? + .get_many(namespace.id(), Query::all())? .collect::>(); assert_eq!(res.len(), 1); Ok(()) @@ -2134,6 +2167,192 @@ mod tests { Ok(()) } + #[test] + fn test_replica_queries_mem() -> Result<()> { + let store = store::memory::Store::default(); + + test_replica_queries(store)?; + Ok(()) + } + + #[cfg(feature = "fs-store")] + #[test] + fn test_replica_queries_fs() -> Result<()> { + let dbfile = tempfile::NamedTempFile::new()?; + let store = store::fs::Store::new(dbfile.path())?; + test_replica_queries(store)?; + + Ok(()) + } + + fn test_replica_queries(store: S) -> Result<()> { + let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1); + let namespace = NamespaceSecret::new(&mut rng); + let mut replica = store.new_replica(namespace)?; + let namespace = replica.id(); + + let a1 = store.new_author(&mut rng)?; + let a2 = store.new_author(&mut rng)?; + let a3 = store.new_author(&mut rng)?; + println!( + "a1 {} a2 {} a3 {}", + a1.id().fmt_short(), + a2.id().fmt_short(), + a3.id().fmt_short() + ); + + replica.hash_and_insert("hi/world", &a2, "a2")?; + replica.hash_and_insert("hi/world", &a1, "a1")?; + replica.hash_and_insert("hi/moon", &a2, "a1")?; + replica.hash_and_insert("hi", &a3, "a3")?; + + struct QueryTester<'a, S: store::Store> { + store: &'a S, + namespace: NamespaceId, + } + impl<'a, S: store::Store> QueryTester<'a, S> { + fn assert(&self, query: impl Into, expected: Vec<(&'static str, &Author)>) { + let query = query.into(); + let actual = self + .store + .get_many(self.namespace, query.clone()) + .unwrap() + .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author()))) + .collect::>>() + .unwrap(); + let expected = expected + .into_iter() + .map(|(key, author)| (key.to_string(), author.id())) + .collect::>(); + assert_eq!(actual, expected, "query: {query:#?}") + } + } + + let qt = QueryTester { + store: &store, + namespace, + }; + + qt.assert( + Query::all(), + vec![ + ("hi/world", &a1), + ("hi/moon", &a2), + ("hi/world", &a2), + ("hi", &a3), + ], + ); + + qt.assert( + Query::single_latest_per_key(), + vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)], + ); + + qt.assert( + Query::single_latest_per_key().sort_direction(SortDirection::Desc), + vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)], + ); + + qt.assert( + Query::single_latest_per_key().key_prefix("hi/"), + vec![("hi/moon", &a2), ("hi/world", &a1)], + ); + + qt.assert( + Query::single_latest_per_key() + .key_prefix("hi/") + .sort_direction(SortDirection::Desc), + vec![("hi/world", &a1), ("hi/moon", &a2)], + ); + + qt.assert( + Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc), + vec![ + ("hi", &a3), + ("hi/moon", &a2), + ("hi/world", &a1), + ("hi/world", &a2), + ], + ); + + qt.assert( + Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc), + vec![ + ("hi/world", &a2), + ("hi/world", &a1), + ("hi/moon", &a2), + ("hi", &a3), + ], + ); + + qt.assert( + Query::all().key_prefix("hi/"), + vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)], + ); + + qt.assert( + Query::all().key_prefix("hi/").offset(1).limit(1), + vec![("hi/moon", &a2)], + ); + + qt.assert( + Query::all() + .key_prefix("hi/") + .sort_by(SortBy::KeyAuthor, SortDirection::Desc), + vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)], + ); + + qt.assert( + Query::all() + .key_prefix("hi/") + .sort_by(SortBy::KeyAuthor, SortDirection::Desc) + .offset(1) + .limit(1), + vec![("hi/world", &a1)], + ); + + qt.assert( + Query::all() + .key_prefix("hi/") + .sort_by(SortBy::AuthorKey, SortDirection::Asc), + vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)], + ); + + qt.assert( + Query::all() + .key_prefix("hi/") + .sort_by(SortBy::AuthorKey, SortDirection::Desc), + vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)], + ); + + qt.assert( + Query::all() + .sort_by(SortBy::KeyAuthor, SortDirection::Asc) + .limit(2) + .offset(1), + vec![("hi/moon", &a2), ("hi/world", &a1)], + ); + + replica.delete_prefix("hi/world", &a2)?; + + qt.assert( + Query::all(), + vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)], + ); + + qt.assert( + Query::all().include_empty(), + vec![ + ("hi/world", &a1), + ("hi/moon", &a2), + ("hi/world", &a2), + ("hi", &a3), + ], + ); + + Ok(()) + } + fn assert_keys(store: &S, namespace: NamespaceId, mut expected: Vec>) { expected.sort(); assert_eq!(expected, get_keys_sorted(store, namespace)); @@ -2141,7 +2360,7 @@ mod tests { fn get_keys_sorted(store: &S, namespace: NamespaceId) -> Vec> { let mut res = store - .get_many(namespace, GetFilter::All) + .get_many(namespace, Query::all()) .unwrap() .map(|e| e.map(|e| e.key().to_vec())) .collect::>>() @@ -2157,7 +2376,7 @@ mod tests { key: &[u8], ) -> anyhow::Result { let entry = store - .get_one(namespace, author, key)? + .get_exact(namespace, author, key, true)? .ok_or_else(|| anyhow::anyhow!("not found"))?; Ok(entry) } @@ -2169,7 +2388,7 @@ mod tests { key: &[u8], ) -> anyhow::Result> { let hash = store - .get_one(namespace, author, key)? + .get_exact(namespace, author, key, false)? .map(|e| e.content_hash()); Ok(hash) } @@ -2205,7 +2424,7 @@ mod tests { set: &[&str], ) -> Result<()> { for el in set { - store.get_one(*namespace, author.id(), el)?; + store.get_exact(*namespace, author.id(), el, false)?; } Ok(()) } diff --git a/iroh/examples/client.rs b/iroh/examples/client.rs index 45618ed2..6a332ebe 100644 --- a/iroh/examples/client.rs +++ b/iroh/examples/client.rs @@ -8,7 +8,7 @@ use indicatif::HumanBytes; use iroh::node::Node; use iroh_bytes::util::runtime; -use iroh_sync::{store::GetFilter, Entry}; +use iroh_sync::{store::Query, Entry}; use tokio_stream::StreamExt; #[tokio::main] @@ -26,7 +26,7 @@ async fn main() -> anyhow::Result<()> { let key = b"hello".to_vec(); let value = b"world".to_vec(); doc.set_bytes(author, key.clone(), value).await?; - let mut stream = doc.get_many(GetFilter::All).await?; + let mut stream = doc.get_many(Query::all()).await?; while let Some(entry) = stream.try_next().await? { println!("entry {}", fmt_entry(&entry)); let content = doc.read_to_bytes(&entry).await?; diff --git a/iroh/src/client.rs b/iroh/src/client.rs index 684f3cab..47b75c1f 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -22,8 +22,7 @@ use iroh_bytes::Hash; use iroh_bytes::{BlobFormat, Tag}; use iroh_net::{key::PublicKey, magic_endpoint::ConnectionInfo, NodeAddr}; use iroh_sync::actor::OpenState; -use iroh_sync::CapabilityKind; -use iroh_sync::{store::GetFilter, AuthorId, Entry, NamespaceId}; +use iroh_sync::{store::Query, AuthorId, CapabilityKind, Entry, NamespaceId}; use quic_rpc::message::RpcMsg; use quic_rpc::{RpcClient, ServiceConnection}; use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; @@ -36,7 +35,7 @@ use crate::rpc_protocol::{ BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse, BlobListRequest, BlobListResponse, BlobReadRequest, BlobReadResponse, BlobValidateRequest, CounterStats, DeleteTagRequest, DocCloseRequest, DocCreateRequest, DocDelRequest, - DocDelResponse, DocDropRequest, DocGetManyRequest, DocGetOneRequest, DocImportRequest, + DocDelResponse, DocDropRequest, DocGetExactRequest, DocGetManyRequest, DocImportRequest, DocLeaveRequest, DocListRequest, DocOpenRequest, DocSetHashRequest, DocSetRequest, DocShareRequest, DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket, DownloadProgress, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest, @@ -659,33 +658,49 @@ where Ok(removed) } - /// Get the latest entry for a key and author. - pub async fn get_one(&self, author: AuthorId, key: impl Into) -> Result> { + /// Get an entry for a key and author. + /// + /// Optionally also get the entry if it is empty (i.e. a deletion marker). + pub async fn get_exact( + &self, + author: AuthorId, + key: impl AsRef<[u8]>, + include_empty: bool, + ) -> Result> { self.ensure_open()?; let res = self - .rpc(DocGetOneRequest { + .rpc(DocGetExactRequest { author, - key: key.into(), + key: key.as_ref().to_vec().into(), doc_id: self.id(), + include_empty, }) .await??; Ok(res.entry.map(|entry| entry.into())) } /// Get entries. - pub async fn get_many(&self, filter: GetFilter) -> Result>> { + pub async fn get_many( + &self, + query: impl Into, + ) -> Result>> { self.ensure_open()?; let stream = self .0 .rpc .server_streaming(DocGetManyRequest { doc_id: self.id(), - filter, + query: query.into(), }) .await?; Ok(flatten(stream).map_ok(|res| res.entry.into())) } + /// Get a single entry. + pub async fn get_one(&self, query: impl Into) -> Result> { + self.get_many(query).await?.next().await.transpose() + } + /// Share this document with peers over a ticket. pub async fn share(&self, mode: ShareMode) -> anyhow::Result { self.ensure_open()?; diff --git a/iroh/src/commands/sync.rs b/iroh/src/commands/sync.rs index 022f8e1b..c4eb69c0 100644 --- a/iroh/src/commands/sync.rs +++ b/iroh/src/commands/sync.rs @@ -21,7 +21,10 @@ use iroh::{ util::fs::{path_content_info, PathContent}, }; use iroh_bytes::{provider::AddProgress, Hash, Tag}; -use iroh_sync::{store::GetFilter, AuthorId, Entry, NamespaceId}; +use iroh_sync::{ + store::{Query, SortDirection}, + AuthorId, Entry, NamespaceId, +}; use crate::config::ConsoleEnv; @@ -140,6 +143,12 @@ pub enum DocCommands { author: Option, /// Optional key prefix (parsed as UTF-8 string) prefix: Option, + /// How to sort the entries + #[clap(long, default_value_t=Sorting::Author)] + sort: Sorting, + /// Sort in descending order + #[clap(long)] + desc: bool, /// How to show the contents of the keys. #[clap(short, long, default_value_t=DisplayContentMode::Hash)] mode: DisplayContentMode, @@ -218,6 +227,24 @@ pub enum DocCommands { }, } +#[derive(clap::ValueEnum, Clone, Debug, Default, strum::Display)] +#[strum(serialize_all = "kebab-case")] +pub enum Sorting { + /// Sort by author, then key + #[default] + Author, + /// Sort by key, then author + Key, +} +impl From for iroh_sync::store::SortBy { + fn from(value: Sorting) -> Self { + match value { + Sorting::Author => Self::AuthorKey, + Sorting::Key => Self::KeyAuthor, + } + } +} + #[derive(Debug, Clone, Parser)] pub enum AuthorCommands { /// Set the active author (only works within the Iroh console). @@ -324,22 +351,15 @@ impl DocCommands { } => { let doc = get_doc(iroh, env, doc).await?; let key = key.as_bytes().to_vec(); - let filter = match (author, prefix) { - (None, false) => GetFilter::Key(key), - (None, true) => GetFilter::Prefix(key), - (Some(author), true) => GetFilter::AuthorAndPrefix(author, key), - (Some(author), false) => { - // Special case: Author and key, this means single entry. - let entry = doc - .get_one(author, key) - .await? - .ok_or_else(|| anyhow!("Entry not found"))?; - println!("{}", fmt_entry(&doc, &entry, mode).await); - return Ok(()); - } + let query = Query::all(); + let query = match (author, prefix) { + (None, false) => query.key_exact(key), + (None, true) => query.key_prefix(key), + (Some(author), true) => query.author(author).key_prefix(key), + (Some(author), false) => query.author(author).key_exact(key), }; - let mut stream = doc.get_many(filter).await?; + let mut stream = doc.get_many(query).await?; while let Some(entry) = stream.try_next().await? { println!("{}", fmt_entry(&doc, &entry, mode).await); } @@ -349,11 +369,23 @@ impl DocCommands { prefix, author, mode, + sort, + desc, } => { let doc = get_doc(iroh, env, doc).await?; - let filter = GetFilter::author_prefix(author, prefix); - - let mut stream = doc.get_many(filter).await?; + let mut query = Query::all(); + if let Some(author) = author { + query = query.author(author); + } + if let Some(prefix) = prefix { + query = query.key_prefix(prefix); + } + let direction = match desc { + true => SortDirection::Desc, + false => SortDirection::Asc, + }; + query = query.sort_by(sort.into(), direction); + let mut stream = doc.get_many(query).await?; while let Some(entry) = stream.try_next().await? { println!("{}", fmt_entry(&doc, &entry, mode).await); } @@ -420,8 +452,8 @@ impl DocCommands { let key_str = key.clone(); let key = key.as_bytes().to_vec(); let path: PathBuf = canonicalize_path(&out)?; - let stream = doc.get_many(GetFilter::Key(key)).await?; - let entry = match get_latest(stream).await? { + let mut stream = doc.get_many(Query::key_exact(key)).await?; + let entry = match stream.try_next().await? { None => { println!(""); return Ok(()); @@ -843,23 +875,3 @@ impl ImportProgressBar { self.mp.clear().ok(); } } - -/// Get the latest entry for a key. If `None`, then an entry of the given key -/// could not be found. -async fn get_latest(stream: impl Stream>) -> Result> { - let entry = stream - .try_fold(None, |acc: Option, cur: Entry| async move { - match acc { - None => Ok(Some(cur)), - Some(prev) => { - if cur.timestamp() > prev.timestamp() { - Ok(Some(cur)) - } else { - Ok(Some(prev)) - } - } - } - }) - .await?; - Ok(entry) -} diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 64709011..a4e95ab4 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -1561,9 +1561,9 @@ fn handle_rpc_request>( }) .await } - DocGetOne(msg) => { + DocGetExact(msg) => { chan.rpc(msg, handler, |handler, req| async move { - handler.inner.sync.doc_get_one(req).await + handler.inner.sync.doc_get_exact(req).await }) .await } diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 797e2a35..49cae325 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -20,9 +20,8 @@ use iroh_net::{ use iroh_sync::{ actor::OpenState, - store::GetFilter, - sync::SignedEntry, - CapabilityKind, {AuthorId, NamespaceId}, + store::Query, + {AuthorId, CapabilityKind, NamespaceId, SignedEntry}, }; use quic_rpc::{ message::{BidiStreaming, BidiStreamingMsg, Msg, RpcMsg, ServerStreaming, ServerStreamingMsg}, @@ -701,8 +700,8 @@ pub struct DocSetHashResponse {} pub struct DocGetManyRequest { /// The document id pub doc_id: NamespaceId, - /// Filter entries by this [`GetFilter`] - pub filter: GetFilter, + /// Query to run + pub query: Query, } impl Msg for DocGetManyRequest { @@ -722,22 +721,24 @@ pub struct DocGetManyResponse { /// Get entries from a document #[derive(Serialize, Deserialize, Debug)] -pub struct DocGetOneRequest { +pub struct DocGetExactRequest { /// The document id pub doc_id: NamespaceId, - /// Key + /// Key matcher pub key: Bytes, - /// Author + /// Author matcher pub author: AuthorId, + /// Whether to include empty entries (prefix deletion markers) + pub include_empty: bool, } -impl RpcMsg for DocGetOneRequest { - type Response = RpcResult; +impl RpcMsg for DocGetExactRequest { + type Response = RpcResult; } -/// Response to [`DocGetOneRequest`] +/// Response to [`DocGetExactRequest`] #[derive(Serialize, Deserialize, Debug)] -pub struct DocGetOneResponse { +pub struct DocGetExactResponse { /// The document entry pub entry: Option, } @@ -866,7 +867,7 @@ pub enum ProviderRequest { DocSet(DocSetRequest), DocSetHash(DocSetHashRequest), DocGet(DocGetManyRequest), - DocGetOne(DocGetOneRequest), + DocGetExact(DocGetExactRequest), DocDel(DocDelRequest), DocStartSync(DocStartSyncRequest), DocLeave(DocLeaveRequest), @@ -911,7 +912,7 @@ pub enum ProviderResponse { DocSet(RpcResult), DocSetHash(RpcResult), DocGet(RpcResult), - DocGetOne(RpcResult), + DocGetExact(RpcResult), DocDel(RpcResult), DocShare(RpcResult), DocStartSync(RpcResult), diff --git a/iroh/src/sync_engine/rpc.rs b/iroh/src/sync_engine/rpc.rs index 78e6f838..1ea194e6 100644 --- a/iroh/src/sync_engine/rpc.rs +++ b/iroh/src/sync_engine/rpc.rs @@ -10,13 +10,13 @@ use crate::{ rpc_protocol::{ AuthorCreateRequest, AuthorCreateResponse, AuthorListRequest, AuthorListResponse, DocCloseRequest, DocCloseResponse, DocCreateRequest, DocCreateResponse, DocDelRequest, - DocDelResponse, DocDropRequest, DocDropResponse, DocGetManyRequest, DocGetManyResponse, - DocGetOneRequest, DocGetOneResponse, DocImportRequest, DocImportResponse, DocLeaveRequest, - DocLeaveResponse, DocListRequest, DocListResponse, DocOpenRequest, DocOpenResponse, - DocSetHashRequest, DocSetHashResponse, DocSetRequest, DocSetResponse, DocShareRequest, - DocShareResponse, DocStartSyncRequest, DocStartSyncResponse, DocStatusRequest, - DocStatusResponse, DocSubscribeRequest, DocSubscribeResponse, DocTicket, RpcResult, - ShareMode, + DocDelResponse, DocDropRequest, DocDropResponse, DocGetExactRequest, DocGetExactResponse, + DocGetManyRequest, DocGetManyResponse, DocImportRequest, DocImportResponse, + DocLeaveRequest, DocLeaveResponse, DocListRequest, DocListResponse, DocOpenRequest, + DocOpenResponse, DocSetHashRequest, DocSetHashResponse, DocSetRequest, DocSetResponse, + DocShareRequest, DocShareResponse, DocStartSyncRequest, DocStartSyncResponse, + DocStatusRequest, DocStatusResponse, DocSubscribeRequest, DocSubscribeResponse, DocTicket, + RpcResult, ShareMode, }, sync_engine::SyncEngine, }; @@ -176,7 +176,7 @@ impl SyncEngine { .await?; let entry = self .sync - .get_one(doc_id, author_id, key) + .get_exact(doc_id, author_id, key, false) .await? .ok_or_else(|| anyhow!("failed to get entry after insertion"))?; Ok(DocSetResponse { entry }) @@ -210,14 +210,14 @@ impl SyncEngine { &self, req: DocGetManyRequest, ) -> impl Stream> { - let DocGetManyRequest { doc_id, filter } = req; + let DocGetManyRequest { doc_id, query } = req; let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); let sync = self.sync.clone(); // we need to spawn a task to send our request to the sync handle, because the method // itself must be sync. self.rt.main().spawn(async move { let tx2 = tx.clone(); - if let Err(err) = sync.get_many(doc_id, filter, tx).await { + if let Err(err) = sync.get_many(doc_id, query, tx).await { tx2.send_async(Err(err)).await.ok(); } }); @@ -227,13 +227,17 @@ impl SyncEngine { }) } - pub async fn doc_get_one(&self, req: DocGetOneRequest) -> RpcResult { - let DocGetOneRequest { + pub async fn doc_get_exact(&self, req: DocGetExactRequest) -> RpcResult { + let DocGetExactRequest { doc_id, author, key, + include_empty, } = req; - let entry = self.sync.get_one(doc_id, author, key).await?; - Ok(DocGetOneResponse { entry }) + let entry = self + .sync + .get_exact(doc_id, author, key, include_empty) + .await?; + Ok(DocGetExactResponse { entry }) } } diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs index f9cdf584..cb7bf3a1 100644 --- a/iroh/tests/sync.rs +++ b/iroh/tests/sync.rs @@ -23,7 +23,7 @@ use tracing_subscriber::{prelude::*, EnvFilter}; use iroh_bytes::{util::runtime, Hash}; use iroh_net::derp::DerpMode; use iroh_sync::{ - store::{self, GetFilter}, + store::{self, Query}, AuthorId, ContentStatus, Entry, }; @@ -755,7 +755,7 @@ async fn doc_delete() -> Result<()> { let deleted = doc.del(author, b"foo".to_vec()).await?; assert_eq!(deleted, 1); - let entry = doc.get_one(author, b"foo".to_vec()).await?; + let entry = doc.get_exact(author, b"foo".to_vec(), false).await?; assert!(entry.is_none()); // wait for gc @@ -785,7 +785,7 @@ async fn sync_drop_doc() -> Result<()> { assert!(matches!(ev, Some(Ok(LiveEvent::InsertLocal { .. })))); client.docs.drop_doc(doc.id()).await?; - let res = doc.get_one(author, b"foo".to_vec()).await; + let res = doc.get_exact(author, b"foo".to_vec(), true).await; assert!(res.is_err()); let res = doc .set_bytes(author, b"foo".to_vec(), b"bar".to_vec()) @@ -805,9 +805,9 @@ async fn assert_latest(doc: &Doc, key: &[u8], value: &[u8]) { } async fn get_latest(doc: &Doc, key: &[u8]) -> anyhow::Result> { - let filter = GetFilter::Key(key.to_vec()); + let query = Query::single_latest_per_key().key_exact(key); let entry = doc - .get_many(filter) + .get_many(query) .await? .next() .await