Skip to content

Commit

Permalink
Add parameter ignoreTransactionErrors. Make cli open not fail on errors.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexagon committed Sep 10, 2024
1 parent 5c19059 commit f9c7d95
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 22 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## Unreleased

- Add parameter `ignoreTransactionErrors` to `open()` and `sync()`.
- Make cli `open` and `open:noindex` ignore errors by default.

## 0.16.5

- Makes cli tool `open` and `close` more resilient to errors.
Expand Down
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ deno install -frA --name ckv jsr:@cross/kv/cli
### Methods
- `KV(options)` - Main class. Options are optional.
- `async open(filepath, createIfMissing = true)` - OOpens the KV store at the
specified file path, creating it if it doesn't exist (default behavior).
- `async open(filepath, createIfMissing = true, ignoreTransactionErrors = false)` -
Opens the KV store at the specified file path, creating it if it doesn't
exist (default behavior).
- `async set<T>(key, value)` - Stores a value associated with the given key.
- `async delete(key)` - Removes the key-value pair identified by the key.
- `async get<T>(key)` - Retrieves the value associated with the specified key.
Expand All @@ -121,8 +122,8 @@ deno install -frA --name ckv jsr:@cross/kv/cli
transaction history (all set and delete operations) for keys matching the
query. Optionally recurses into subkeys and fetches the associated data.
- `listKeys(query)` - Returns an array of all keys matching the given query.
- `async sync()` - Manually synchronizes the in-memory index with the on-disk
data store.
- `async sync(ignoreTransactionErrors = false)` - Manually synchronizes the
in-memory index with the on-disk data store.
- `watch<T>(query, callback, recursive): void` - Registers a callback to be
invoked whenever a matching transaction (set or delete) is added.
- `unwatch<T>(query, callback): void` - Unregisters a previously registered
Expand Down
1 change: 1 addition & 0 deletions mod.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export {
KV,
type KVOptions,
type KVSyncErrors,
type KVSyncResult,
type KVSyncResultStatus,
} from "./src/lib/kv.ts";
Expand Down
31 changes: 29 additions & 2 deletions src/cli/commands/open.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,26 @@ export async function open(
}
container.db = new KV();
try {
await container.db.open(dbPath, true);
const openResult = await container.db.open(dbPath, true, true);
// Operation succeeded
if (openResult?.errors) {
// Print errors in a user-friendly way
console.error(`Errors occurred during database opening:`);
for (const error of openResult.errors) {
if (error) {
console.error(`\t- ${error.message}`);
if (error.cause) console.error(`\t ${error.cause}`);
} else {
console.error(`\t- An unknown error occurred.`);
}
}
}
return true;
} catch (e) {
await container.db.close();
container.db = undefined;
console.error(`Could not open database: ${e.message}`);
if (e.cause) console.error(`\t${e.cause}`);
return false;
}
}
Expand All @@ -58,7 +72,20 @@ export async function openNoIndex(
}
container.db = new KV({ disableIndex: true });
try {
await container.db.open(dbPath, true);
const openResult = await container.db.open(dbPath, true, true);
// Operation succeeded
if (openResult?.errors) {
// Print errors in a user-friendly way
console.error(`Errors occurred during database opening:`);
for (const error of openResult.errors) {
if (error) {
console.error(`\t- ${error.message}`);
if (error.cause) console.error(`\t ${error.cause}`);
} else {
console.error(`\t- An unknown error occurred.`);
}
}
}
return true;
} catch (e) {
console.error(`Could not open database: ${e.message}`);
Expand Down
51 changes: 35 additions & 16 deletions src/lib/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ export type KVSyncResultStatus =
| "ledgerInvalidated" /** The ledger was invalidated and needs to be reopened. */
| "error"; /** An error occurred during synchronization. Check the `error` property for details. */

/**
* Represents an array of errors that occurred during a synchronizing with the on-disk ledger.
*
* Each element in the array is either an `Error` object or `null`. If an error occurred, the `Error` object will provide more details.
* If no errors occurred, the array will be empty.
*/
export type KVSyncErrors = (Error | null)[];

/**
* The result of a synchronization operation between the in-memory index and the on-disk ledger.
*/
Expand All @@ -34,7 +42,7 @@ export interface KVSyncResult {
/**
* If an error occurred during synchronization, this property will contain the Error object. Otherwise, it will be null.
*/
error: Error | null;
errors: KVSyncErrors;
}

/**
Expand Down Expand Up @@ -242,11 +250,13 @@ export class KV extends EventEmitter {
*
* @param filePath - Path to the base file for the KV store. Index and data files will be derived from this path.
* @param createIfMissing - If true, the KV store files will be created if they do not exist. Default is true.
* @param ignoreTransactionErrors - If true, the open operation keeps going even if encountering errors, collection all of them. Default is false.
*/
public async open(
filePath: string,
createIfMissing: boolean = true,
) {
ignoreTransactioErrors: boolean = false,
): Promise<KVSyncResult> {
// Do not allow re-opening a closed database
if (this.aborted) {
throw new Error("Could not open, database already closed.");
Expand All @@ -264,10 +274,11 @@ export class KV extends EventEmitter {
// Do the initial synchronization
// - If `this.autoSync` is enabled, additional synchronizations will be carried out every `this.syncIntervalMs`

const syncResult = await this.sync();
if (syncResult.error) {
throw syncResult.error;
const syncResult = await this.sync(ignoreTransactioErrors);
if (syncResult.errors?.length > 0 && !ignoreTransactioErrors) {
throw syncResult.errors[0];
}
return syncResult;
}

/**
Expand Down Expand Up @@ -343,19 +354,23 @@ export class KV extends EventEmitter {
* - Automatically run on adding data
* - Can be manually triggered for full consistency before data retrieval (iterate(), listAll(), get())
*
* @param ignoreTransactionErrors - If true, the sync operation keeps going even if encountering errors, collection all of them. Default is false.
*
* @emits sync - Emits an event with the synchronization result:
* - `result`: "ready" | "blocked" | "success" | "ledgerInvalidated" | "error"
* - `error`: Error object (if an error occurred) or null
*
* @throws {Error} If an unexpected error occurs during synchronization.
*/
public async sync(): Promise<KVSyncResult> {
public async sync(
ignoreTransactioErrors: boolean = false,
): Promise<KVSyncResult> {
// Throw if database isn't open
this.ensureOpen();

// Synchronization Logic (with lock if needed)
let result: KVSyncResult["result"] = "ready";
let error: Error | null = null;
const errors: KVSyncErrors = [];
try {
const newTransactions = await this.ledger?.sync(this.disableIndex);

Expand All @@ -370,23 +385,27 @@ export class KV extends EventEmitter {
this.applyTransactionToIndex(entry.transaction, entry.offset); // Refactored for clarity
} catch (transactionError) {
result = "error";
error = new Error("Error processing transaction", {
cause: transactionError,
});
break; // Stop processing on transaction error
errors.push(
new Error("Error processing transaction", {
cause: transactionError,
}),
);
if (!ignoreTransactioErrors) {
break; // Stop processing on transaction error
}
}
}
}
}
} catch (syncError) {
result = "error";
error = new Error("Error during ledger sync", { cause: syncError });
errors.push(new Error("Error during ledger sync", { cause: syncError }));
} finally {
// @ts-ignore .emit exists
this.emit("sync", { result, error });
this.emit("sync", { result, errors });
}

return { result, error };
return { result, errors };
}

/**
Expand Down Expand Up @@ -718,8 +737,8 @@ export class KV extends EventEmitter {
try {
// Sync before writing the transactions
const syncResult = await this.sync();
if (syncResult.error) {
throw syncResult.error;
if (syncResult.errors.length > 0) {
throw syncResult.errors[0];
}

// Write all buffered transactions at once and get the base offset
Expand Down

0 comments on commit f9c7d95

Please sign in to comment.