Skip to content

Commit

Permalink
Add .defer
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexagon committed Jul 2, 2024
1 parent 3bb6b46 commit be3aa64
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 32 deletions.
15 changes: 13 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
## 0.16.3

- Added `KV.defer(promiseToHandle, [errorHandler], [timeoutMs])` method to allow
non-awaited promises to be tracked and settled during `KV.close()`.
- `errorHandler` (optional): A function to handle errors that occur during
promise resolution/rejection. If not provided, errors will silently ignored.
- `timeoutMs` (optional): A timeout (in milliseconds) for promise resolution.
If the promise doesn't settle within this time during `KV.close()`, a
warning will be logged. Defaults to 5000ms.
- Fix cli tool not being able to open any database after a failed open
- Code refactors

## 0.16.2

- Fix for Node.js; use `readline` instead of prompt.
-
- Fix for Node.js; use `readline` instead of prompt

## 0.16.1

Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ deno install -frA --name ckv jsr:@cross/kv/cli
- `on(eventName, eventData)` - Subscribes to events like `sync`,
`watchdogError`, or `closing` to get notified of specific occurrences.
- `isOpen()` - Returns true if the database is open and ready for operations.
- `defer(promiseToHandle, [errorHandler], [timeoutMs])` - Defers the
resolution or rejection of a Promise until `.close()`
- `async close()` - Closes the KV store, ensuring resources are released.
### Keys
Expand Down
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cross/kv",
"version": "0.16.2",
"version": "0.16.3",
"exports": {
".": "./mod.ts",
"./cli": "./src/cli/mod.ts"
Expand Down
1 change: 1 addition & 0 deletions src/cli/commands/open.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export async function open(
await container.db.open(dbPath, true);
return true;
} catch (e) {
container.db = undefined;
console.error(`Could not open database: ${e.message}`);
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import type { KVLedgerResult } from "./ledger.ts";
*
* This cache stores transaction results (`KVLedgerResult`) associated with their offsets within the ledger.
* It maintains a fixed maximum size and evicts the oldest entries (Least Recently Used - LRU)
* when the cache becomes full. Since the ledger is append-only, expiration is not necessary.
* when the cache becomes full.
*
* Note: The `cacheSizeBytes` property is an approximation of the cache's size and represents
* the encoded size of the transaction data on disk, not the actual memory usage of the cached objects.
Expand Down
116 changes: 101 additions & 15 deletions src/lib/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ export class KV extends EventEmitter {
private isInTransaction: boolean = false;
private watchdogTimer?: number; // Undefined if not scheduled or currently running
private watchdogPromise?: Promise<void>;
/** Used through .deferCompletion to make .close await the action */
private promiseWatchlist: Promise<unknown>[];

/**
* Initializes a new instance of the cross/kv main class `KV`.
Expand All @@ -142,6 +144,8 @@ export class KV extends EventEmitter {
constructor(options: KVOptions = {}) {
super();

this.promiseWatchlist = [];

// Validate and set options
// - autoSync
if (
Expand Down Expand Up @@ -184,6 +188,54 @@ export class KV extends EventEmitter {
this.watchdogPromise = this.watchdog();
}
}

/**
* Defers the resolution or rejection of a Promise until the `.close()` method is called.
*
* This function adds the provided promise to a `promiseWatchlist`. During the `close()` method, the database
* will wait for all promises in the watchlist to settle (resolve or reject) before finalizing the closure.
* If an `errorHandler` function is provided, it will be called with any errors that occur during the promise's
* execution. Otherwise, errors will be silently ignored.
*
* @param promiseToHandle - The Promise whose resolution or rejection is to be deferred.
* @param errorHandler - An optional function to handle errors that occur during the promise's execution.
* @returns The original promise, allowing for chaining.
*/
public defer(
promiseToHandle: Promise<unknown>,
errorHandler?: (error: unknown) => void,
): Promise<unknown> {
this.promiseWatchlist.push(promiseToHandle);

promiseToHandle.finally(() => {
this.removePromiseFromWatchlist(promiseToHandle);
}).catch((error) => {
if (errorHandler) {
errorHandler(error); // Call the custom error handler
} else {
/** Silently ignore */
}
this.removePromiseFromWatchlist(promiseToHandle);
});

return promiseToHandle;
}

/**
* Removes a Promise from the `promiseWatchlist`.
*
* This function is used internally to clean up the watchlist after a promise has been settled (resolved or rejected).
* It ensures that only pending promises remain in the watchlist.
*
* @param promiseToRemove - The Promise to remove from the watchlist.
*/
private removePromiseFromWatchlist(promiseToRemove: Promise<unknown>) {
const index = this.promiseWatchlist.indexOf(promiseToRemove);
if (index > -1) {
this.promiseWatchlist.splice(index, 1);
}
}

/**
* Opens the Key-Value store based on a provided file path.
* Initializes the index and data files.
Expand Down Expand Up @@ -795,31 +847,65 @@ export class KV extends EventEmitter {
}

/**
* Closes the database gracefully.
* Closes the database gracefully, awaiting pending promises and optionally applying a timeout.
*
* 1. Awaits all deferred promises in the `promiseWatchlist`.
* 2. Waits for any ongoing watchdog task to complete.
* 3. Emits a 'closing' event to notify listeners.
* 4. Closes the associated ledger.
*
* 1. Waits for any ongoing watchdog task to complete.
* 2. Emits a 'closing' event to notify listeners.
* 3. Closes the associated ledger.
* @param timeoutMs (optional) - The maximum time in milliseconds to wait for promises to resolve before closing. Defaults to 5000ms.
*/
public async close() {
public async close(timeoutMs = 5000) { // Default timeout of 5 seconds
// @ts-ignore emit exists
this.emit("closing");

// Used to stop any pending watchdog runs
this.aborted = true;

// Await running watchdog
await this.watchdogPromise;
try {
// Create a timeout promise
let promiseTimeout;
const timeoutPromise = new Promise((_, reject) => {
promiseTimeout = setTimeout(
() => reject(new Error("Database close timeout")),
timeoutMs,
);
});

// Abort any watchdog timer
clearTimeout(this.watchdogTimer!);
// Race to see if promises settle before the timeout
await Promise.race([
Promise.allSettled(this.promiseWatchlist),
timeoutPromise,
]);

// Clear all local variables to avoid problems with unexpected usage after closing
this.ledgerPath = undefined;
this.ledger = undefined;
this.index = new KVIndex();
this.pendingTransactions = [];
this.watchHandlers = [];
// Clear the promise timeout on success
clearTimeout(promiseTimeout);

// Await running watchdog if it hasn't been aborted
if (this.watchdogPromise) {
await this.watchdogPromise;
}
} catch (error) {
if (error.message === "Database close timeout") {
console.warn(
"Database close timed out. Some promises may not have resolved:",
this.promiseWatchlist,
);
} else {
console.error("Error during database close:", error);
}
} finally {
// Clear watchdog timer regardless of errors
clearTimeout(this.watchdogTimer!);

// Reset internal state
this.ledgerPath = undefined;
this.ledger = undefined;
this.index = new KVIndex();
this.pendingTransactions = [];
this.watchHandlers = [];
}
}

/**
Expand Down
3 changes: 2 additions & 1 deletion src/lib/ledger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
LEDGER_CURRENT_VERSION,
LEDGER_FILE_ID,
LEDGER_MAX_READ_FAILURES,
LEDGER_PREFETCH_BYTES,
LOCK_BYTE_OFFSET,
LOCK_DEFAULT_INITIAL_RETRY_INTERVAL_MS,
LOCK_DEFAULT_MAX_RETRIES,
Expand Down Expand Up @@ -78,7 +79,7 @@ export class KVLedger {
constructor(filePath: string, maxCacheSizeMBytes: number) {
this.dataPath = toNormalizedAbsolutePath(filePath);
this.cache = new KVLedgerCache(maxCacheSizeMBytes * 1024 * 1024);
this.prefetch = new KVPrefetcher();
this.prefetch = new KVPrefetcher(LEDGER_PREFETCH_BYTES);
}

/**
Expand Down
32 changes: 29 additions & 3 deletions src/lib/prefetcher.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,55 @@
import { readAtPosition } from "./utils/file.ts";
import type { FileHandle } from "node:fs/promises";
import { LEDGER_PREFETCH_BYTES } from "./constants.ts";

/**
* Manages prefetching data from files for efficient sequential reading.
*
* This class optimizes reading by fetching chunks of data larger than the requested amount,
* reducing the number of file reads needed for sequential access.
*/
export class KVPrefetcher {
private cache?: Uint8Array;
private currentChunkStart: number;
private currentChunkEnd: number;
private prefetchBytes: number;

constructor() {
constructor(prefetchBytes: number) {
this.currentChunkStart = 0;
this.currentChunkEnd = 0;
this.prefetchBytes = prefetchBytes;
}

/**
* Fetches a chunk of data from the file.
*
* @param fd The file descriptor or handle.
* @param startPosition The position to start reading from.
* @param length The desired length of the chunk.
*/
private async fetchChunk(
fd: Deno.FsFile | FileHandle,
startPosition: number,
length: number,
): Promise<void> {
const chunk = await readAtPosition(
fd,
length > LEDGER_PREFETCH_BYTES ? length : LEDGER_PREFETCH_BYTES,
length > this.prefetchBytes ? length : this.prefetchBytes,
startPosition,
);
this.cache = chunk;
this.currentChunkStart = startPosition;
this.currentChunkEnd = startPosition + chunk.length;
}

/**
* Reads data from the file, using the cache if possible.
*
* @param fd The file descriptor or handle.
* @param length The amount of data to read.
* @param position The position to start reading from.
* @returns The requested data.
* @throws {Error} If data fetching fails.
*/
public async read(
fd: Deno.FsFile | FileHandle,
length: number,
Expand All @@ -52,6 +75,9 @@ export class KVPrefetcher {
);
}

/**
* Clears the cached data.
*/
public clear() {
this.cache = undefined;
}
Expand Down
15 changes: 6 additions & 9 deletions src/lib/utils/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ export async function rawOpen(
if (CurrentRuntime === Runtime.Deno) {
return await Deno.open(filename, { read: true, write: write });
} else {
const mode = write ? "r+" : "r";
return await open(filename, mode);
return await open(filename, write ? "r+" : "r");
}
}

Expand All @@ -58,25 +57,23 @@ export async function readAtPosition(
length: number,
position: number,
): Promise<Uint8Array> {
const buffer = new Uint8Array(length);
let bytesRead: number | null;
// Deno
if (CurrentRuntime === Runtime.Deno) {
await (fd as Deno.FsFile).seek(position, Deno.SeekMode.Start);
const buffer = new Uint8Array(length);
const bytesRead = await (fd as Deno.FsFile).read(buffer);
return buffer.subarray(0, bytesRead ?? 0);
bytesRead = await (fd as Deno.FsFile).read(buffer);
// Node or Bun
} else {
// @ts-ignore cross-runtime
const buffer = Buffer.alloc(length);
const readResult = await fd.read(
buffer,
0,
length,
position,
) as FileReadResult<Uint8Array>;
const bytesRead = readResult.bytesRead as number;
return new Uint8Array(buffer.buffer, 0, bytesRead);
bytesRead = readResult.bytesRead;
}
return new Uint8Array(buffer.buffer, 0, bytesRead ?? 0);
}

/**
Expand Down
21 changes: 21 additions & 0 deletions test/kv.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -871,3 +871,24 @@ test("KV Options: throws on invalid disableIndex type", () => {
"Invalid option: disableIndex must be a boolean",
);
});

test("KV: defer function - promise rejection is handled", async () => {
const tempFilePrefix = await tempfile();
const kvStore = new KV();

const deferredPromise = new Promise<void>((_, reject) => {
setTimeout(() => {
reject(new Error("Test error"));
}, 500);
});

const then = Date.now();

await kvStore.open(tempFilePrefix);

kvStore.defer(deferredPromise);

await kvStore.close();

assertEquals(Date.now() - then >= 500, true);
});

0 comments on commit be3aa64

Please sign in to comment.