From be3aa64148d8a60666756200825ca6ea7ec9e129 Mon Sep 17 00:00:00 2001 From: Hexagon Date: Tue, 2 Jul 2024 21:44:51 +0200 Subject: [PATCH] Add .defer --- CHANGELOG.md | 15 ++++- README.md | 2 + deno.json | 2 +- src/cli/commands/open.ts | 1 + src/lib/cache.ts | 2 +- src/lib/kv.ts | 116 ++++++++++++++++++++++++++++++++++----- src/lib/ledger.ts | 3 +- src/lib/prefetcher.ts | 32 ++++++++++- src/lib/utils/file.ts | 15 ++--- test/kv.test.ts | 21 +++++++ 10 files changed, 177 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5bf98f5..391139e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,18 @@ +## 0.16.3 + +- Added `KV.defer(promiseToHandle, [errorHandler], [timeoutMs])` method to allow + non-awaited promises to be tracked and settled during `KV.close()`. + - `errorHandler` (optional): A function to handle errors that occur during + promise resolution/rejection. If not provided, errors will silently ignored. + - `timeoutMs` (optional): A timeout (in milliseconds) for promise resolution. + If the promise doesn't settle within this time during `KV.close()`, a + warning will be logged. Defaults to 5000ms. +- Fix cli tool not being able to open any database after a failed open +- Code refactors + ## 0.16.2 -- Fix for Node.js; use `readline` instead of prompt. -- +- Fix for Node.js; use `readline` instead of prompt ## 0.16.1 diff --git a/README.md b/README.md index e9e8924..3f18763 100644 --- a/README.md +++ b/README.md @@ -136,6 +136,8 @@ deno install -frA --name ckv jsr:@cross/kv/cli - `on(eventName, eventData)` - Subscribes to events like `sync`, `watchdogError`, or `closing` to get notified of specific occurrences. - `isOpen()` - Returns true if the database is open and ready for operations. + - `defer(promiseToHandle, [errorHandler], [timeoutMs])` - Defers the + resolution or rejection of a Promise until `.close()` - `async close()` - Closes the KV store, ensuring resources are released. ### Keys diff --git a/deno.json b/deno.json index 53ba6e7..a8496ef 100644 --- a/deno.json +++ b/deno.json @@ -1,6 +1,6 @@ { "name": "@cross/kv", - "version": "0.16.2", + "version": "0.16.3", "exports": { ".": "./mod.ts", "./cli": "./src/cli/mod.ts" diff --git a/src/cli/commands/open.ts b/src/cli/commands/open.ts index 2a55cea..ecdd852 100644 --- a/src/cli/commands/open.ts +++ b/src/cli/commands/open.ts @@ -36,6 +36,7 @@ export async function open( await container.db.open(dbPath, true); return true; } catch (e) { + container.db = undefined; console.error(`Could not open database: ${e.message}`); return false; } diff --git a/src/lib/cache.ts b/src/lib/cache.ts index dda210a..ac2c132 100644 --- a/src/lib/cache.ts +++ b/src/lib/cache.ts @@ -6,7 +6,7 @@ import type { KVLedgerResult } from "./ledger.ts"; * * This cache stores transaction results (`KVLedgerResult`) associated with their offsets within the ledger. * It maintains a fixed maximum size and evicts the oldest entries (Least Recently Used - LRU) - * when the cache becomes full. Since the ledger is append-only, expiration is not necessary. + * when the cache becomes full. * * Note: The `cacheSizeBytes` property is an approximation of the cache's size and represents * the encoded size of the transaction data on disk, not the actual memory usage of the cached objects. diff --git a/src/lib/kv.ts b/src/lib/kv.ts index bc37b26..a039d56 100644 --- a/src/lib/kv.ts +++ b/src/lib/kv.ts @@ -131,6 +131,8 @@ export class KV extends EventEmitter { private isInTransaction: boolean = false; private watchdogTimer?: number; // Undefined if not scheduled or currently running private watchdogPromise?: Promise; + /** Used through .deferCompletion to make .close await the action */ + private promiseWatchlist: Promise[]; /** * Initializes a new instance of the cross/kv main class `KV`. @@ -142,6 +144,8 @@ export class KV extends EventEmitter { constructor(options: KVOptions = {}) { super(); + this.promiseWatchlist = []; + // Validate and set options // - autoSync if ( @@ -184,6 +188,54 @@ export class KV extends EventEmitter { this.watchdogPromise = this.watchdog(); } } + + /** + * Defers the resolution or rejection of a Promise until the `.close()` method is called. + * + * This function adds the provided promise to a `promiseWatchlist`. During the `close()` method, the database + * will wait for all promises in the watchlist to settle (resolve or reject) before finalizing the closure. + * If an `errorHandler` function is provided, it will be called with any errors that occur during the promise's + * execution. Otherwise, errors will be silently ignored. + * + * @param promiseToHandle - The Promise whose resolution or rejection is to be deferred. + * @param errorHandler - An optional function to handle errors that occur during the promise's execution. + * @returns The original promise, allowing for chaining. + */ + public defer( + promiseToHandle: Promise, + errorHandler?: (error: unknown) => void, + ): Promise { + this.promiseWatchlist.push(promiseToHandle); + + promiseToHandle.finally(() => { + this.removePromiseFromWatchlist(promiseToHandle); + }).catch((error) => { + if (errorHandler) { + errorHandler(error); // Call the custom error handler + } else { + /** Silently ignore */ + } + this.removePromiseFromWatchlist(promiseToHandle); + }); + + return promiseToHandle; + } + + /** + * Removes a Promise from the `promiseWatchlist`. + * + * This function is used internally to clean up the watchlist after a promise has been settled (resolved or rejected). + * It ensures that only pending promises remain in the watchlist. + * + * @param promiseToRemove - The Promise to remove from the watchlist. + */ + private removePromiseFromWatchlist(promiseToRemove: Promise) { + const index = this.promiseWatchlist.indexOf(promiseToRemove); + if (index > -1) { + this.promiseWatchlist.splice(index, 1); + } + } + /** * Opens the Key-Value store based on a provided file path. * Initializes the index and data files. @@ -795,31 +847,65 @@ export class KV extends EventEmitter { } /** - * Closes the database gracefully. + * Closes the database gracefully, awaiting pending promises and optionally applying a timeout. + * + * 1. Awaits all deferred promises in the `promiseWatchlist`. + * 2. Waits for any ongoing watchdog task to complete. + * 3. Emits a 'closing' event to notify listeners. + * 4. Closes the associated ledger. * - * 1. Waits for any ongoing watchdog task to complete. - * 2. Emits a 'closing' event to notify listeners. - * 3. Closes the associated ledger. + * @param timeoutMs (optional) - The maximum time in milliseconds to wait for promises to resolve before closing. Defaults to 5000ms. */ - public async close() { + public async close(timeoutMs = 5000) { // Default timeout of 5 seconds // @ts-ignore emit exists this.emit("closing"); // Used to stop any pending watchdog runs this.aborted = true; - // Await running watchdog - await this.watchdogPromise; + try { + // Create a timeout promise + let promiseTimeout; + const timeoutPromise = new Promise((_, reject) => { + promiseTimeout = setTimeout( + () => reject(new Error("Database close timeout")), + timeoutMs, + ); + }); - // Abort any watchdog timer - clearTimeout(this.watchdogTimer!); + // Race to see if promises settle before the timeout + await Promise.race([ + Promise.allSettled(this.promiseWatchlist), + timeoutPromise, + ]); - // Clear all local variables to avoid problems with unexpected usage after closing - this.ledgerPath = undefined; - this.ledger = undefined; - this.index = new KVIndex(); - this.pendingTransactions = []; - this.watchHandlers = []; + // Clear the promise timeout on success + clearTimeout(promiseTimeout); + + // Await running watchdog if it hasn't been aborted + if (this.watchdogPromise) { + await this.watchdogPromise; + } + } catch (error) { + if (error.message === "Database close timeout") { + console.warn( + "Database close timed out. Some promises may not have resolved:", + this.promiseWatchlist, + ); + } else { + console.error("Error during database close:", error); + } + } finally { + // Clear watchdog timer regardless of errors + clearTimeout(this.watchdogTimer!); + + // Reset internal state + this.ledgerPath = undefined; + this.ledger = undefined; + this.index = new KVIndex(); + this.pendingTransactions = []; + this.watchHandlers = []; + } } /** diff --git a/src/lib/ledger.ts b/src/lib/ledger.ts index 86d8252..12d0463 100644 --- a/src/lib/ledger.ts +++ b/src/lib/ledger.ts @@ -11,6 +11,7 @@ import { LEDGER_CURRENT_VERSION, LEDGER_FILE_ID, LEDGER_MAX_READ_FAILURES, + LEDGER_PREFETCH_BYTES, LOCK_BYTE_OFFSET, LOCK_DEFAULT_INITIAL_RETRY_INTERVAL_MS, LOCK_DEFAULT_MAX_RETRIES, @@ -78,7 +79,7 @@ export class KVLedger { constructor(filePath: string, maxCacheSizeMBytes: number) { this.dataPath = toNormalizedAbsolutePath(filePath); this.cache = new KVLedgerCache(maxCacheSizeMBytes * 1024 * 1024); - this.prefetch = new KVPrefetcher(); + this.prefetch = new KVPrefetcher(LEDGER_PREFETCH_BYTES); } /** diff --git a/src/lib/prefetcher.ts b/src/lib/prefetcher.ts index ef6ade7..4b92203 100644 --- a/src/lib/prefetcher.ts +++ b/src/lib/prefetcher.ts @@ -1,17 +1,31 @@ import { readAtPosition } from "./utils/file.ts"; import type { FileHandle } from "node:fs/promises"; -import { LEDGER_PREFETCH_BYTES } from "./constants.ts"; +/** + * Manages prefetching data from files for efficient sequential reading. + * + * This class optimizes reading by fetching chunks of data larger than the requested amount, + * reducing the number of file reads needed for sequential access. + */ export class KVPrefetcher { private cache?: Uint8Array; private currentChunkStart: number; private currentChunkEnd: number; + private prefetchBytes: number; - constructor() { + constructor(prefetchBytes: number) { this.currentChunkStart = 0; this.currentChunkEnd = 0; + this.prefetchBytes = prefetchBytes; } + /** + * Fetches a chunk of data from the file. + * + * @param fd The file descriptor or handle. + * @param startPosition The position to start reading from. + * @param length The desired length of the chunk. + */ private async fetchChunk( fd: Deno.FsFile | FileHandle, startPosition: number, @@ -19,7 +33,7 @@ export class KVPrefetcher { ): Promise { const chunk = await readAtPosition( fd, - length > LEDGER_PREFETCH_BYTES ? length : LEDGER_PREFETCH_BYTES, + length > this.prefetchBytes ? length : this.prefetchBytes, startPosition, ); this.cache = chunk; @@ -27,6 +41,15 @@ export class KVPrefetcher { this.currentChunkEnd = startPosition + chunk.length; } + /** + * Reads data from the file, using the cache if possible. + * + * @param fd The file descriptor or handle. + * @param length The amount of data to read. + * @param position The position to start reading from. + * @returns The requested data. + * @throws {Error} If data fetching fails. + */ public async read( fd: Deno.FsFile | FileHandle, length: number, @@ -52,6 +75,9 @@ export class KVPrefetcher { ); } + /** + * Clears the cached data. + */ public clear() { this.cache = undefined; } diff --git a/src/lib/utils/file.ts b/src/lib/utils/file.ts index efb065f..d016f06 100644 --- a/src/lib/utils/file.ts +++ b/src/lib/utils/file.ts @@ -48,8 +48,7 @@ export async function rawOpen( if (CurrentRuntime === Runtime.Deno) { return await Deno.open(filename, { read: true, write: write }); } else { - const mode = write ? "r+" : "r"; - return await open(filename, mode); + return await open(filename, write ? "r+" : "r"); } } @@ -58,25 +57,23 @@ export async function readAtPosition( length: number, position: number, ): Promise { + const buffer = new Uint8Array(length); + let bytesRead: number | null; // Deno if (CurrentRuntime === Runtime.Deno) { await (fd as Deno.FsFile).seek(position, Deno.SeekMode.Start); - const buffer = new Uint8Array(length); - const bytesRead = await (fd as Deno.FsFile).read(buffer); - return buffer.subarray(0, bytesRead ?? 0); + bytesRead = await (fd as Deno.FsFile).read(buffer); // Node or Bun } else { - // @ts-ignore cross-runtime - const buffer = Buffer.alloc(length); const readResult = await fd.read( buffer, 0, length, position, ) as FileReadResult; - const bytesRead = readResult.bytesRead as number; - return new Uint8Array(buffer.buffer, 0, bytesRead); + bytesRead = readResult.bytesRead; } + return new Uint8Array(buffer.buffer, 0, bytesRead ?? 0); } /** diff --git a/test/kv.test.ts b/test/kv.test.ts index b608fda..dda726f 100644 --- a/test/kv.test.ts +++ b/test/kv.test.ts @@ -871,3 +871,24 @@ test("KV Options: throws on invalid disableIndex type", () => { "Invalid option: disableIndex must be a boolean", ); }); + +test("KV: defer function - promise rejection is handled", async () => { + const tempFilePrefix = await tempfile(); + const kvStore = new KV(); + + const deferredPromise = new Promise((_, reject) => { + setTimeout(() => { + reject(new Error("Test error")); + }, 500); + }); + + const then = Date.now(); + + await kvStore.open(tempFilePrefix); + + kvStore.defer(deferredPromise); + + await kvStore.close(); + + assertEquals(Date.now() - then >= 500, true); +});