From 0a802f29269b129f96da9fa5be9ee9889c2842dc Mon Sep 17 00:00:00 2001 From: Hexagon Date: Mon, 3 Jun 2024 20:56:42 +0200 Subject: [PATCH] Various fixes --- CHANGELOG.md | 13 +++++++++++++ deno.json | 2 +- src/lib/kv.ts | 41 +++++++---------------------------------- src/lib/ledger.ts | 33 +++++++++++++++++++-------------- src/lib/transaction.ts | 2 +- 5 files changed, 41 insertions(+), 50 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0819f21..24c2f78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,16 @@ +## 0.15.7 + +## Changes + +- Do not freeze the database during vacuum +- Reduce time in locked state during vacuum +- Only unlock old ledger after successful vacuum +- Only re-open ledger after successful vacuum + +## Fixes + +- Fixed Deno panic on decoding data from cache + ## 0.15.6 ## Additions diff --git a/deno.json b/deno.json index ab53666..f139761 100644 --- a/deno.json +++ b/deno.json @@ -1,6 +1,6 @@ { "name": "@cross/kv", - "version": "0.15.6", + "version": "0.15.7", "exports": { ".": "./mod.ts", "./cli": "./src/cli/mod.ts" diff --git a/src/lib/kv.ts b/src/lib/kv.ts index e288ea8..b6ad12b 100644 --- a/src/lib/kv.ts +++ b/src/lib/kv.ts @@ -20,7 +20,6 @@ import { EventEmitter } from "node:events"; export type KVSyncResultStatus = | "noop" /** No operation was performed (e.g., ledger not open). */ | "ready" /** The database is ready, no new data to synchronize. */ - | "blocked" /** Synchronization is temporarily blocked (e.g., during a vacuum). */ | "success" /** Synchronization completed successfully, new data was added. */ | "ledgerInvalidated" /** The ledger was invalidated and needs to be reopened. */ | "error"; /** An error occurred during synchronization. Check the `error` property for details. */ @@ -129,7 +128,6 @@ export class KV extends EventEmitter { private disableIndex = false; // States - private blockSync: boolean = false; // Syncing can be blocked during vacuum private aborted: boolean = false; private isInTransaction: boolean = false; private watchdogTimer?: number; // Undefined if not scheduled or currently running @@ -215,7 +213,7 @@ export class KV extends EventEmitter { // Do the initial synchronization // - If `this.autoSync` is enabled, additional synchronizations will be carried out every `this.syncIntervalMs` - const syncResult = await this.sync(true); + const syncResult = await this.sync(); if (syncResult.error) { throw syncResult.error; } @@ -294,7 +292,6 @@ export class KV extends EventEmitter { * - Automatically run on adding data * - Can be manually triggered for full consistency before data retrieval (iterate(), listAll(), get()) * - * @param force - (Optional) If true, forces synchronization even if currently blocked (e.g., vacuum). For internal use only. * @param doLock - (Optional) Locks the database before synchronization. Defaults to true. Always true unless called internally. * * @emits sync - Emits an event with the synchronization result: @@ -303,16 +300,9 @@ export class KV extends EventEmitter { * * @throws {Error} If an unexpected error occurs during synchronization. */ - public async sync(force = false, doLock = false): Promise { + public async sync(doLock = false): Promise { // Throw if database isn't open - if (force) this.ensureOpen(); - - if (this.blockSync && !force) { - const error = new Error("Store synchronization is blocked"); - // @ts-ignore .emit exists - this.emit("sync", { result: "blocked", error }); - return { result: "blocked", error }; - } + this.ensureOpen(); // Synchronization Logic (with lock if needed) let result: KVSyncResult["result"] = "ready"; @@ -429,7 +419,6 @@ export class KV extends EventEmitter { * It involves rewriting the ledger to remove deleted entries, potentially reducing its size. * * @remarks - * - Vacuuming temporarily blocks regular synchronization (`blockSync` is set to `true`). * - The database is automatically re-opened after the vacuum is complete to ensure consistency. * * @async @@ -439,16 +428,8 @@ export class KV extends EventEmitter { this.ensureOpen(); this.ensureIndex(); - this.blockSync = true; - try { - await this.ledger?.vacuum(); - - // Force re-opening the database - await this.open(this.ledgerPath!, false); - } finally { - // Ensure blockSync is reset even if vacuum throws an error - this.blockSync = false; - } + const ledgerIsReplaced = await this.ledger?.vacuum(); + if (ledgerIsReplaced) await this.open(this.ledgerPath!, false); } /** @@ -496,7 +477,7 @@ export class KV extends EventEmitter { // Throw if database isn't open this.ensureOpen(); this.ensureIndex(); - for await (const entry of this.iterate(key, 1)) { + for await (const entry of this.iterate(key, 1, true)) { return entry; } return null; @@ -610,10 +591,6 @@ export class KV extends EventEmitter { public async set(key: KVKey, value: T): Promise { // Throw if database isn't open this.ensureOpen(); - // Throw if there is an ongoing vacuum - if (this.blockSync) { - throw new Error("Can not add data during vacuuming"); - } // Ensure the key is ok const validatedKey = new KVKeyInstance(key); @@ -646,10 +623,6 @@ export class KV extends EventEmitter { async delete(key: KVKey): Promise { this.ensureOpen(); // Throw if the database isn't open - if (this.blockSync) { - throw new Error("Cannot delete data during vacuuming"); - } - const validatedKey = new KVKeyInstance(key); const pendingTransaction = new KVTransaction(); @@ -701,7 +674,7 @@ export class KV extends EventEmitter { let unlocked = false; try { // Sync before writing the transactions - const syncResult = await this.sync(false, false); + const syncResult = await this.sync(); if (syncResult.error) { throw syncResult.error; } diff --git a/src/lib/ledger.ts b/src/lib/ledger.ts index d8ceb8c..c62026a 100644 --- a/src/lib/ledger.ts +++ b/src/lib/ledger.ts @@ -404,12 +404,10 @@ export class KVLedger { /** * Caution should be taken not to carry out any other operations during a vacuum */ - public async vacuum() { - // 1. Lock for Exclusive Access - await this.lock(); - + public async vacuum(): Promise { + let ledgerIsReplaced = false; try { - // 2. Gather All Transaction Offsets + // 1. Gather All Transaction Offsets const allOffsets: number[] = []; let currentOffset = LEDGER_BASE_OFFSET; while (currentOffset < this.header.currentOffset) { @@ -419,8 +417,14 @@ export class KVLedger { ); allOffsets.push(currentOffset); currentOffset += result.length; + + // Update the header after each read, to make sure we catch any new transactions + this.readHeader(); } + // 2. Now we need to lock the ledger, as a "state" is about to be calculated + await this.lock(); + // 3. Gather Valid Transactions (in Reverse Order) const validTransactions: KVLedgerResult[] = []; const removedKeys: Set = new Set(); @@ -439,10 +443,7 @@ export class KVLedger { } } - // 4. Clear cache - this.cache.clear(); - - // 5. Compact the Data File + // 4. Compact the Data File const tempFilePath = this.dataPath + "-tmp"; const tempLedger = new KVLedger( tempFilePath, @@ -450,7 +451,7 @@ export class KVLedger { ); await tempLedger.open(true); - // 6. Append valid transactions to the new file. + // 5. Append valid transactions to the new file. for (const validTransaction of validTransactions) { const transaction = await this.rawGetTransaction( validTransaction.offset, @@ -462,16 +463,20 @@ export class KVLedger { } this.header.currentOffset = tempLedger.header.currentOffset; + // 6. Clear cache + this.cache.clear(); + // 7. Replace Original File + // - The lock flag is now set independently, no need to unlock from this point on await unlink(this.dataPath); await rename(tempFilePath, this.dataPath); - - // 8. Update the Cached Header - await this.readHeader(); + ledgerIsReplaced = true; } finally { // 9. Unlock - await this.unlock(); + if (ledgerIsReplaced) await this.unlock(); } + + return ledgerIsReplaced; } private ensureOpen(): void { diff --git a/src/lib/transaction.ts b/src/lib/transaction.ts index 730ce5e..ca852de 100644 --- a/src/lib/transaction.ts +++ b/src/lib/transaction.ts @@ -133,7 +133,7 @@ export class KVTransaction { this.operation = operation; this.timestamp = timestamp; if (value) { - this.data = encode(value); + this.data = new Uint8Array(encode(value)); this.hash = await sha1(this.data!); } }