Skip to content

Commit

Permalink
Add option fetchData to scan function, allow faster retrieval of hist…
Browse files Browse the repository at this point in the history
…ory when full data is not required.
  • Loading branch information
Hexagon committed Jun 3, 2024
1 parent 4f286c4 commit 52e4ca4
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 35 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## Changes

- Remove sync result `noop` as isn't used anywhere anymore
- Adds `fetchData` option to `scan`. Setting this to `false` enables faster
retrieval of transaction metadata.

## Fixes

Expand Down
49 changes: 27 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,31 +107,36 @@ deno install -frA --name ckv jsr:@cross/kv/cli
### Methods
- `KV(options)` - Main class. Options are optional.
- `async open(filepath, createIfMissing)` - Opens the KV store.
`createIfMissing` defaults to true.
- `async set<T>(key, value)` - Stores a value.
- `async get<T>(key)` - Retrieves a value.
- `async *iterate<T>(query, limit, reverse)` - Iterates over entries for a
key. Limit and reverse are optional.
- `async listAll<T>(query, limit, reverse)` - Gets all entries for a key as an
array. Limit and reverse are optional.
- `listKeys(query)` - List all keys under <query>.
- `async delete(key)` - Deletes a key-value pair.
- `async sync()` - Synchronizez the ledger with disk.
- `async open(filepath, createIfMissing = true)` - OOpens the KV store at the
specified file path, creating it if it doesn't exist (default behavior).
- `async set<T>(key, value)` - Stores a value associated with the given key.
- `async delete(key)` - Removes the key-value pair identified by the key.
- `async get<T>(key)` - Retrieves the value associated with the specified key.
Returns null if the key does not exist.
- `async *iterate<T>(query, limit, reverse)` - Asynchronously iterates over
the latest values matching the query
- `async listAll<T>(query, limit, reverse)` - Retrieves all latest values
matching the query as an array.
- `async *scan<T>(query, limit, reverse)` - Asynchronously iterates over the
transaction history (all set and delete operations) for keys matching the
query. Optionally recurses into subkeys and fetches the associated data.
- `listKeys(query)` - Returns an array of all keys matching the given query.
- `async sync()` - Manually synchronizes the in-memory index with the on-disk
data store.
- `watch<T>(query, callback, recursive): void` - Registers a callback to be
called whenever a new transaction matching the given query is added to the
database.
invoked whenever a matching transaction (set or delete) is added.
- `unwatch<T>(query, callback): void` - Unregisters a previously registered
watch handler.
- `beginTransaction()` - Starts an atomic transaction.
- `async endTransaction()` - Ends an atomic transaction, returns a list of
`Errors` if any occurred.
- `async vacuum()` - Reclaims storage space.
- `on(eventName, eventData)` - Listen for events such as `sync`,
`watchdogError` or `closing`.
- `isOpen()` - Returns true if the database is open and ready for
transactions.
- `async close()` - Closes the KV store.
- `beginTransaction()` - Starts an atomic transaction, ensuring data
consistency for multiple operations.
- `async endTransaction()` - Commits all changes made within the transaction,
or rolls back if errors occur.
- `async vacuum()` - Optimizes storage by removing redundant transaction
history, retaining only the latest value for each key.
- `on(eventName, eventData)` - Subscribes to events like `sync`,
`watchdogError`, or `closing` to get notified of specific occurrences.
- `isOpen()` - Returns true if the database is open and ready for operations.
- `async close()` - Closes the KV store, ensuring resources are released.
### Keys
Expand Down
2 changes: 1 addition & 1 deletion src/cli/commands/stats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async function stats(
let ledgerInvalidCount = 0;
if (kvStore) {
try {
for await (const entry of kvStore.scan([], true)) {
for await (const entry of kvStore.scan([], true, false)) {
if (entry.operation === KVOperation.SET) {
ledgerSetCount++;
} else if (entry.operation === KVOperation.DELETE) {
Expand Down
6 changes: 4 additions & 2 deletions src/lib/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,18 +345,20 @@ export class KV extends EventEmitter {
}

/**
* Asynchronously iterates over data entries associated with a given key.
* Asynchronously iterates over transactions associated with a given key.
*
* @param query - Representation of the key to search for, or a query object for complex filters.
* @param recursive - Match all entries matching the given query, and recurse.
* @param fetchData - Return transactions with full data. Setting this to false improves performance, but does only yield transaction metadata.
* @returns An async generator yielding `KVTransactionResult` objects for each matching entry.
*/
public async *scan<T = unknown>(
query: KVKey | KVQuery,
recursive: boolean = false,
fetchData: boolean = true,
): AsyncGenerator<KVTransactionResult<T>> {
this.ensureOpen();
for await (const result of this.ledger!.scan(query, recursive)) {
for await (const result of this.ledger!.scan(query, recursive, fetchData)) {
if (result?.transaction) { // Null check to ensure safety
const processedResult = result.transaction.asResult<T>(); // Apply your processing logic here
yield processedResult;
Expand Down
8 changes: 5 additions & 3 deletions src/lib/ledger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ export class KVLedger {
try {
await this.readHeader();

// If the ledger is re-created (by vacuum or overwriting), there will be a time in the cached header
// If the ledger is re-created (by vacuum or overwriting), there will be one time in the cached header
// and there will be a different time after reading the header
if (currentCreated !== 0 && currentCreated !== this.header.created) {
// Return 0 to invalidate this ledger
Expand Down Expand Up @@ -205,11 +205,13 @@ export class KVLedger {
*
* @param query
* @param recursive
* @param fetchData
* @returns An async generator yielding `KVLedgerResult` objects for each matching transaction.
*/
public async *scan(
query: KVQuery,
recursive: boolean,
fetchData: boolean = true,
): AsyncIterableIterator<KVLedgerResult> {
this.ensureOpen();

Expand All @@ -228,7 +230,7 @@ export class KVLedger {
);
if (result.transaction.key?.matchesQuery(query, recursive)) {
// Check for completeness
if (result.complete) {
if (result.complete || !fetchData) {
yield result;
} else {
const completeResult = await this.rawGetTransaction(
Expand Down Expand Up @@ -335,7 +337,7 @@ export class KVLedger {
try {
if (!externalFd) fd = await rawOpen(this.dataPath, false);

// Fetch 2 + 4 + 4 bytes (signature, header length, data length) + prefetch
// Fetch 2 + 4 + 4 bytes (signature, header length, data length)
const transactionLengthData = await readAtPosition(
fd!,
TRANSACTION_SIGNATURE.length + 4 + 4,
Expand Down
12 changes: 5 additions & 7 deletions src/lib/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ export class KVTransaction {
}

public headerFromUint8Array(
data: Uint8Array | DataView,
data: Uint8Array,
readHash: boolean = true,
) {
const dataView = data instanceof DataView ? data : new DataView(
const dataView = new DataView(
data.buffer,
data.byteOffset,
data.byteLength,
Expand All @@ -167,11 +167,9 @@ export class KVTransaction {

// Decode hash bytes
if (readHash) {
this.hash = new Uint8Array(
dataView.buffer.slice(
dataView.byteOffset + offset,
dataView.byteOffset + offset + hashLength,
),
this.hash = data.slice(
offset,
offset + hashLength,
);
}
offset += hashLength;
Expand Down

0 comments on commit 52e4ca4

Please sign in to comment.