Skip to content

Commit

Permalink
Better error handling. Add cli command repair.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexagon committed Sep 12, 2024
1 parent f9c7d95 commit 114ddfc
Show file tree
Hide file tree
Showing 11 changed files with 302 additions and 154 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

- Add parameter `ignoreTransactionErrors` to `open()` and `sync()`.
- Make cli `open` and `open:noindex` ignore errors by default.
- Add cli command `repair`
- Add optional parameter `ignoreReadErrors` to commands `open`, `sync`, `scan`
and `vacuum`

## 0.16.5

Expand Down
17 changes: 9 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ deno install -frA --name ckv jsr:@cross/kv/cli
### Methods
- `KV(options)` - Main class. Options are optional.
- `async open(filepath, createIfMissing = true, ignoreTransactionErrors = false)` -
- `async open(filepath, createIfMissing = true, ignoreReadErrors = false)` -
Opens the KV store at the specified file path, creating it if it doesn't
exist (default behavior).
- `async set<T>(key, value)` - Stores a value associated with the given key.
Expand All @@ -118,12 +118,13 @@ deno install -frA --name ckv jsr:@cross/kv/cli
the latest values matching the query
- `async listAll<T>(query, limit, reverse)` - Retrieves all latest values
matching the query as an array.
- `async *scan<T>(query, limit, reverse)` - Asynchronously iterates over the
transaction history (all set and delete operations) for keys matching the
query. Optionally recurses into subkeys and fetches the associated data.
- `async *scan<T>(query, limit, reverse, ignoreReadErrors = false)` -
Asynchronously iterates over the transaction history (all set and delete
operations) for keys matching the query. Optionally recurses into subkeys
and fetches the associated data.
- `listKeys(query)` - Returns an array of all keys matching the given query.
- `async sync(ignoreTransactionErrors = false)` - Manually synchronizes the
in-memory index with the on-disk data store.
- `async sync(ignoreReadErrors = false)` - Manually synchronizes the in-memory
index with the on-disk data store.
- `watch<T>(query, callback, recursive): void` - Registers a callback to be
invoked whenever a matching transaction (set or delete) is added.
- `unwatch<T>(query, callback): void` - Unregisters a previously registered
Expand All @@ -132,8 +133,8 @@ deno install -frA --name ckv jsr:@cross/kv/cli
consistency for multiple operations.
- `async endTransaction()` - Commits all changes made within the transaction,
or rolls back if errors occur.
- `async vacuum()` - Optimizes storage by removing redundant transaction
history, retaining only the latest value for each key.
- `async vacuum(ignoreReadErrors = false)` - Optimizes storage by removing
redundant transaction history, retaining only the latest value for each key.
- `on(eventName, eventData)` - Subscribes to events like `sync`,
`watchdogError`, or `closing` to get notified of specific occurrences.
- `isOpen()` - Returns true if the database is open and ready for operations.
Expand Down
20 changes: 15 additions & 5 deletions src/cli/commands/open.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,18 @@ export async function open(
try {
const openResult = await container.db.open(dbPath, true, true);
// Operation succeeded
if (openResult?.errors) {
if (openResult?.errors.length > 0) {
// Print errors in a user-friendly way
console.error(`Errors occurred during database opening:`);
for (const error of openResult.errors) {
if (error) {
console.error(`\t- ${error.message}`);
if (error.cause) console.error(`\t ${error.cause}`);
if (error.cause) {
console.error(`\t ${error.cause}`);
} else if (error.message) {
console.error(`\t- ${error.message}`);
} else {
console.error(`\t- ${error}`);
}
} else {
console.error(`\t- An unknown error occurred.`);
}
Expand Down Expand Up @@ -79,8 +84,13 @@ export async function openNoIndex(
console.error(`Errors occurred during database opening:`);
for (const error of openResult.errors) {
if (error) {
console.error(`\t- ${error.message}`);
if (error.cause) console.error(`\t ${error.cause}`);
if (error.cause) {
console.error(`\t ${error.cause}`);
} else if (error.message) {
console.error(`\t- ${error.message}`);
} else {
console.error(`\t- ${error}`);
}
} else {
console.error(`\t- An unknown error occurred.`);
}
Expand Down
27 changes: 27 additions & 0 deletions src/cli/commands/repair.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import {
ensureMaxParameters,
ensureOpen,
type KVDBContainer,
} from "../common.ts";

export async function repair(
container: KVDBContainer,
params: string[],
): Promise<boolean> {
if (!ensureOpen(container)) return false;
if (!ensureMaxParameters(params, 0)) return false;

console.log("");

try {
await container.db?.vacuum(true);
console.log("Repair done.");
console.log("");
return true;
} catch (e) {
console.log(e);
console.error(`Repair failed: ${e.message}`);
console.log("");
return false;
}
}
6 changes: 5 additions & 1 deletion src/cli/commands/stats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,20 @@ async function stats(
let ledgerInvalidCount = 0;
if (kvStore) {
try {
for await (const entry of kvStore.scan([], true, false)) {
for await (const entry of kvStore.scan([], true, false, true)) {
if (entry.operation === KVOperation.SET) {
ledgerSetCount++;
} else if (entry.operation === KVOperation.DELETE) {
ledgerDeleteCount++;
} else {
ledgerInvalidCount++;
}
if (!entry.clean) {
ledgerInvalidCount++;
}
}
} catch (_e) {
console.log(_e);
ledgerInvalidCount++;
}
console.log(
Expand Down
8 changes: 7 additions & 1 deletion src/cli/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ export async function main() {
const cmd = commandSplit[0].toLowerCase();
const startTime = performance.now(); // Start measuring time
if (commands[cmd]) {
const success = await commands[cmd](container, commandSplit.slice(1));
let success = false;
try {
success = await commands[cmd](container, commandSplit.slice(1));
} catch (error) {
console.error(error);
success = false;
}
const endTime = performance.now(); // End measuring time
const elapsedTime = (endTime - startTime).toFixed(2); // Calculate elapsed time (in milliseconds)
console.log(
Expand Down
2 changes: 2 additions & 0 deletions src/cli/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { scan } from "./commands/scan.ts";
import { stats } from "./commands/stats.ts";
import { count } from "./commands/count.ts";
import { vacuum } from "./commands/vacuum.ts";
import { repair } from "./commands/repair.ts";
import { unlock } from "./commands/unlock.ts";
import { sync } from "./commands/sync.ts";
registerCommand("help", help);
Expand All @@ -39,6 +40,7 @@ registerCommand("delete", del);
registerCommand("stats", stats);
registerCommand("count", count);
registerCommand("vacuum", vacuum);
registerCommand("repair", repair);
registerCommand("unlock", unlock);
registerCommand("sync", sync);

Expand Down
2 changes: 1 addition & 1 deletion src/lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export const SUPPORTED_LEDGER_VERSIONS: string[] = [
LEDGER_CURRENT_VERSION,
];
export const LEDGER_PREFETCH_BYTES = 50 * 1024; // Prefetch chunks of 50KB of data while reading the ledger
export const LEDGER_MAX_READ_FAILURES = 10;
export const LEDGER_MAX_READ_FAILURE_BYTES = 10 * 1024 * 1024; // Allow at most 10MB of read failures
export const SYNC_INTERVAL_MS = 2_500; // Overridable with instance configuration
export const LEDGER_CACHE_MB = 100; // Allow 100 MBytes of the ledger to exist in RAM. Not an exact science due to LEDGER_CACHE_MEMORY_FACTOR.
export const LEDGER_CACHE_MEMORY_FACTOR = 3; // Assume that ledger entries take about n times as much space when unwrapped in RAM. Used for ledger cache memory limit, does not need to be exakt.
Expand Down
51 changes: 40 additions & 11 deletions src/lib/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ export class KV extends EventEmitter {
*
* @param filePath - Path to the base file for the KV store. Index and data files will be derived from this path.
* @param createIfMissing - If true, the KV store files will be created if they do not exist. Default is true.
* @param ignoreTransactionErrors - If true, the open operation keeps going even if encountering errors, collection all of them. Default is false.
* @param ignoreReadErrors - If true, the open operation keeps going even if encountering errors, collection all of them. Default is false.
*/
public async open(
filePath: string,
Expand Down Expand Up @@ -354,7 +354,7 @@ export class KV extends EventEmitter {
* - Automatically run on adding data
* - Can be manually triggered for full consistency before data retrieval (iterate(), listAll(), get())
*
* @param ignoreTransactionErrors - If true, the sync operation keeps going even if encountering errors, collection all of them. Default is false.
* @param ignoreReadErrors - If true, the sync operation keeps going even if encountering errors, collection all of them. Default is false.
*
* @emits sync - Emits an event with the synchronization result:
* - `result`: "ready" | "blocked" | "success" | "ledgerInvalidated" | "error"
Expand All @@ -363,7 +363,7 @@ export class KV extends EventEmitter {
* @throws {Error} If an unexpected error occurs during synchronization.
*/
public async sync(
ignoreTransactioErrors: boolean = false,
ignoreReadErrors: boolean = false,
): Promise<KVSyncResult> {
// Throw if database isn't open
this.ensureOpen();
Expand All @@ -372,7 +372,10 @@ export class KV extends EventEmitter {
let result: KVSyncResult["result"] = "ready";
const errors: KVSyncErrors = [];
try {
const newTransactions = await this.ledger?.sync(this.disableIndex);
const newTransactions = await this.ledger?.sync(
this.disableIndex,
ignoreReadErrors,
);

if (newTransactions === null) { // Ledger invalidated
result = "ledgerInvalidated";
Expand All @@ -383,14 +386,25 @@ export class KV extends EventEmitter {
for (const entry of newTransactions) {
try {
this.applyTransactionToIndex(entry.transaction, entry.offset); // Refactored for clarity
if (entry.errorCorrectionOffset !== 0) {
result = "error";
errors.push(
new Error("Error processing transaction", {
cause: "Invalid transaction skipped",
}),
);
if (!ignoreReadErrors) {
break; // Stop processing on transaction error
}
}
} catch (transactionError) {
result = "error";
errors.push(
new Error("Error processing transaction", {
cause: transactionError,
}),
);
if (!ignoreTransactioErrors) {
if (!ignoreReadErrors) {
break; // Stop processing on transaction error
}
}
Expand All @@ -414,20 +428,29 @@ export class KV extends EventEmitter {
* @param query - Representation of the key to search for, or a query object for complex filters.
* @param recursive - Match all entries matching the given query, and recurse.
* @param fetchData - Return transactions with full data. Setting this to false improves performance, but does only yield transaction metadata.
* @param ignoreReadErrors - If true, the operation keeps going even if encountering errors, collection all of them. Default is false.
* @returns An async generator yielding `KVTransactionResult` objects for each matching entry.
*/
public async *scan<T = unknown>(
query: KVKey | KVQuery,
recursive: boolean = false,
fetchData: boolean = true,
ignoreReadErrors = false,
): AsyncGenerator<KVTransactionResult<T>> {
this.ensureOpen();
if (this.ledger) {
for await (
const result of this.ledger?.scan(query, recursive, fetchData)
const result of this.ledger?.scan(
query,
recursive,
fetchData,
ignoreReadErrors,
)
) {
if (result?.transaction) { // Null check to ensure safety
const processedResult = result.transaction.asResult<T>();
const processedResult = result.transaction.asResult<T>(
result.errorCorrectionOffset === 0,
);
yield processedResult;
}
}
Expand Down Expand Up @@ -487,16 +510,17 @@ export class KV extends EventEmitter {
* This operation is essential for maintaining performance as the database grows over time.
* It involves rewriting the ledger to remove deleted entries, potentially reducing its size.
*
* @param ignoreReadErrors - If true, the vacuum operation keeps going even if encountering errors, essentially repairing the ledger. Default is false.
* @remarks
* - The database is automatically re-opened after the vacuum is complete to ensure consistency.
*
* @async
*/
public async vacuum(): Promise<void> {
public async vacuum(ignoreReadErrors: boolean = false): Promise<void> {
this.ensureOpen();
this.ensureIndex();

const ledgerIsReplaced = await this.ledger?.vacuum();
const ledgerIsReplaced = await this.ledger?.vacuum(ignoreReadErrors);
if (ledgerIsReplaced) await this.open(this.ledgerPath!, false);
}

Expand Down Expand Up @@ -589,9 +613,13 @@ export class KV extends EventEmitter {

let count = 0;
for (const offset of offsets) {
const result = await this.ledger?.rawGetTransaction(offset, true);
const result = await this.ledger?.rawGetTransaction(
offset,
this.ledger.header.currentOffset,
true,
);
if (result?.transaction) {
yield result.transaction.asResult();
yield result.transaction.asResult(result.errorCorrectionOffset === 0);
count++;
}
}
Expand Down Expand Up @@ -768,6 +796,7 @@ export class KV extends EventEmitter {
length: transactionData.length,
complete: true,
transaction,
errorCorrectionOffset: 0,
},
);

Expand Down
Loading

0 comments on commit 114ddfc

Please sign in to comment.