From cd343a48dd8aebff52a088c1a797d0231374f23e Mon Sep 17 00:00:00 2001 From: Hexagon Date: Thu, 19 Sep 2024 23:10:27 +0200 Subject: [PATCH] Fix file locking --- CHANGELOG.md | 5 ++ deno.json | 2 +- src/lib/constants.ts | 3 +- src/lib/kv.ts | 18 ++++--- src/lib/ledger.ts | 101 ++++++++++++++++++++++++++++---------- src/lib/utils/randomts.ts | 16 ++++++ 6 files changed, 111 insertions(+), 34 deletions(-) create mode 100644 src/lib/utils/randomts.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index ee01600..66ca7a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.17.1 + +- Fix missig awaits when closing file descriptor +- Ensure file is locked by the current process before writing anything + ## 0.17.0 - Add parameter `ignoreTransactionErrors` to `open()` and `sync()`. diff --git a/deno.json b/deno.json index 1c65483..df27828 100644 --- a/deno.json +++ b/deno.json @@ -1,6 +1,6 @@ { "name": "@cross/kv", - "version": "0.17.0", + "version": "0.17.1", "exports": { ".": "./mod.ts", "./cli": "./src/cli/mod.ts" diff --git a/src/lib/constants.ts b/src/lib/constants.ts index b8c05bd..8949d31 100644 --- a/src/lib/constants.ts +++ b/src/lib/constants.ts @@ -1,5 +1,5 @@ // Configurable -export const LOCK_DEFAULT_MAX_RETRIES = 32; +export const LOCK_DEFAULT_MAX_RETRIES = 40; export const LOCK_DEFAULT_INITIAL_RETRY_INTERVAL_MS = 30; // Increased with itself on each retry, so the actual retry interval is 20, 40, 60 etc. 32 and 20 become about 10 seconds total. export const LOCK_STALE_TIMEOUT_MS = 6 * 60 * 60 * 1000; // Automatically unlock a ledger that has been locked for more than 2*60*60*1000 = 2 hours. export const LEDGER_CURRENT_VERSION: string = "B017"; @@ -23,3 +23,4 @@ export const ENCODED_TRANSACTION_SIGNATURE: Uint8Array = new TextEncoder() .encode("T;"); // Cross/Kv Transaction export const UNLOCKED_BYTES = new Uint8Array(LOCKED_BYTES_LENGTH); export const LOCKED_BYTES = new Uint8Array(LOCKED_BYTES_LENGTH); +export const FORCE_UNLOCK_SIGNAL = 1; diff --git a/src/lib/kv.ts b/src/lib/kv.ts index bfd7397..0f9fb74 100644 --- a/src/lib/kv.ts +++ b/src/lib/kv.ts @@ -10,7 +10,11 @@ import { type KVTransactionResult, } from "./transaction.ts"; import { KVLedger } from "./ledger.ts"; -import { LEDGER_CACHE_MB, SYNC_INTERVAL_MS } from "./constants.ts"; +import { + FORCE_UNLOCK_SIGNAL, + LEDGER_CACHE_MB, + SYNC_INTERVAL_MS, +} from "./constants.ts"; // External dependencies import { EventEmitter } from "node:events"; @@ -295,8 +299,7 @@ export class KV extends EventEmitter { if (!this.ledger) { throw new Error("No ledger is currently open."); } - - await this.ledger.unlock(); + await this.ledger.unlock(BigInt(FORCE_UNLOCK_SIGNAL)); } /** @@ -744,6 +747,7 @@ export class KV extends EventEmitter { public async endTransaction(): Promise { this.ensureOpen(); if (!this.isInTransaction) throw new Error("Not in a transaction"); + if (!this.ledger) throw new Error("No ledger open"); const bufferedTransactions: { transaction: KVTransaction; @@ -764,7 +768,7 @@ export class KV extends EventEmitter { currentOffset += transactionData.length; } - await this.ledger?.lock(); + const lockId = await this.ledger.lock(); let unlocked = false; try { // Sync before writing the transactions @@ -774,7 +778,7 @@ export class KV extends EventEmitter { } // Write all buffered transactions at once and get the base offset - const baseOffset = await this.ledger?.add(bufferedTransactions); + const baseOffset = await this.ledger?.add(bufferedTransactions, lockId); if (baseOffset === undefined) { throw new Error( @@ -783,7 +787,7 @@ export class KV extends EventEmitter { } // Unlock early if everying successed - await this.ledger?.unlock(); + await this.ledger?.unlock(lockId); unlocked = true; // Update the index and check for errors @@ -817,7 +821,7 @@ export class KV extends EventEmitter { } } finally { // Back-up unlock - if (!unlocked) await this.ledger?.unlock(); + if (!unlocked) await this.ledger?.unlock(lockId); this.pendingTransactions = []; // Clear pending transactions this.isInTransaction = false; } diff --git a/src/lib/ledger.ts b/src/lib/ledger.ts index b19e532..a4c7afd 100644 --- a/src/lib/ledger.ts +++ b/src/lib/ledger.ts @@ -7,6 +7,7 @@ import { } from "./utils/file.ts"; import { ENCODED_TRANSACTION_SIGNATURE, + FORCE_UNLOCK_SIGNAL, LEDGER_BASE_OFFSET, LEDGER_CURRENT_VERSION, LEDGER_FILE_ID, @@ -27,6 +28,7 @@ import type { FileHandle } from "node:fs/promises"; import type { KVQuery } from "./key.ts"; import { KVLedgerCache } from "./cache.ts"; import { KVPrefetcher } from "./prefetcher.ts"; +import { pseudoRandomTimestamp } from "./utils/randomts.ts"; /** * This file handles the ledger file, which is where all persisted data of an cross/kv instance is stored. @@ -157,7 +159,7 @@ export class KVLedger { } } } - if (reusableFd) reusableFd.close(); + if (reusableFd) await reusableFd.close(); return newTransactions; } @@ -198,7 +200,7 @@ export class KVLedger { this.header = decoded; } finally { - if (fd) fd.close(); + if (fd) await fd.close(); } } @@ -262,7 +264,7 @@ export class KVLedger { } } - if (reusableFd) reusableFd.close(); + if (reusableFd) await reusableFd.close(); } public async writeHeader() { @@ -295,7 +297,7 @@ export class KVLedger { // Write the header data await writeAtPosition(fd, new Uint8Array(headerBuffer), 0); } finally { - if (fd) fd.close(); + if (fd) await fd.close(); } } @@ -310,7 +312,7 @@ export class KVLedger { */ public async add(transactionsData: { transactionData: Uint8Array; - }[]): Promise { + }[], lockId: bigint): Promise { this.ensureOpen(); // Used to return the first offset of the series @@ -323,6 +325,12 @@ export class KVLedger { let fd; try { fd = await rawOpen(this.dataPath, true); + + // Verify the lock just before writing + if (!await this.verifyLock(lockId)) { + throw new Error("Invalid lock"); + } + for (const { transactionData } of transactionsData) { // Append each transaction data await writeAtPosition(fd, transactionData, currentOffset); @@ -334,7 +342,7 @@ export class KVLedger { } await this.writeHeader(); // Update the on disk header with the new offset } finally { - if (fd) fd.close(); + if (fd) await fd.close(); } return baseOffset; } @@ -437,7 +445,7 @@ export class KVLedger { baseOffset + errorCorrectionOffset, result, ); - if (fd && !externalFd) fd.close(); + if (fd && !externalFd) await fd.close(); return result; } } catch (error) { @@ -449,7 +457,7 @@ export class KVLedger { errorCorrectionOffset += 1; } } - if (fd && !externalFd) fd.close(); + if (fd && !externalFd) await fd.close(); return null; } @@ -458,6 +466,7 @@ export class KVLedger { */ public async vacuum(ignoreReadErrors: boolean = false): Promise { let ledgerIsReplaced = false; + let lockId: bigint | undefined; try { // 1. Gather All Transaction Offsets const allOffsets: number[] = []; @@ -522,7 +531,7 @@ export class KVLedger { // Lock the temporary ledger to prevent multiple vacuums against the same tempfile // - Will be unlocked in the finally clause - await tempLedger.lock(); + lockId = await tempLedger.lock(); // 5. Append valid transactions to the new file. for (const validTransaction of validTransactions) { @@ -536,7 +545,7 @@ export class KVLedger { if (transaction) { await tempLedger.add([{ transactionData: transaction.transaction.toUint8Array(), - }]); + }], lockId); } else if (!ignoreReadErrors) { throw new Error("Unexpected end of file"); } @@ -553,7 +562,7 @@ export class KVLedger { ledgerIsReplaced = true; } finally { // 9. Unlock - if (ledgerIsReplaced) await this.unlock(); + if (ledgerIsReplaced && lockId) await this.unlock(lockId); } return ledgerIsReplaced; @@ -563,34 +572,58 @@ export class KVLedger { if (!this.open) throw new Error("Ledger is not opened yet."); } - public async lock(): Promise { + public async verifyLock(existingLockId: bigint): Promise { this.ensureOpen(); let fd; + + try { + fd = await rawOpen(this.dataPath, true); + + // 1. Check if already locked + const lockData = await readAtPosition( + fd, + LOCKED_BYTES_LENGTH, + LOCK_BYTE_OFFSET, + ); + const existingTimestamp = new DataView(lockData.buffer).getBigUint64( + 0, + false, + ); + return existingTimestamp === existingLockId; + } catch (_e) { + throw new Error("Error verifying lock"); + } finally { + if (fd) await fd.close(); + } + } + + public async lock(): Promise { + this.ensureOpen(); + + const fd = await rawOpen(this.dataPath, true); const retryInterval = LOCK_DEFAULT_INITIAL_RETRY_INTERVAL_MS; // Use provided retry interval for (let attempt = 0; attempt < LOCK_DEFAULT_MAX_RETRIES; attempt++) { try { - fd = await rawOpen(this.dataPath, true); - // 1. Check if already locked const lockData = await readAtPosition( fd, LOCKED_BYTES_LENGTH, LOCK_BYTE_OFFSET, ); - const existingTimestamp = new DataView(lockData.buffer).getBigUint64( + const timestamp = new DataView(lockData.buffer).getBigUint64( 0, false, ); // Check for stale lock if ( - existingTimestamp !== BigInt(0) && - Date.now() - Number(existingTimestamp) > LOCK_STALE_TIMEOUT_MS + timestamp > BigInt(0) && + BigInt(Date.now()) - BigInt(timestamp) > LOCK_STALE_TIMEOUT_MS ) { - await writeAtPosition(fd, UNLOCKED_BYTES, LOCK_BYTE_OFFSET); // Remove stale lock - } else if (existingTimestamp !== BigInt(0)) { + await this.unlock(timestamp); + } else if (timestamp > BigInt(0)) { // File is locked, wait and retry await new Promise((resolve) => setTimeout(resolve, retryInterval + attempt * retryInterval) @@ -601,15 +634,28 @@ export class KVLedger { // 2. Prepare lock data const lockBytes = LOCKED_BYTES; const lockView = new DataView(lockBytes.buffer); - lockView.setBigUint64(0, BigInt(Date.now()), false); + const lockId = pseudoRandomTimestamp(BigInt(Date.now()), 11); // A lock id is a regular timestamp with the last 11 bits scrambled + lockView.setBigUint64(0, lockId, false); // 3. Write lock data await writeAtPosition(fd, lockBytes, LOCK_BYTE_OFFSET); + // Wait for the next iteration of the event loop, and verify the lock + await new Promise((resolve) => setTimeout(resolve, 0)); + if (!await this.verifyLock(lockId)) { + // File has been locked by another process, wait and retry + await new Promise((resolve) => + setTimeout(resolve, retryInterval + attempt * retryInterval) + ); + continue; + } + // Lock acquired! - return; - } finally { - if (fd) fd.close(); + if (fd) await fd.close(); + + return lockId; + } catch (_e) { + /* No op */ } } @@ -617,15 +663,20 @@ export class KVLedger { throw new Error("Could not acquire database lock"); } - public async unlock(): Promise { + public async unlock(lockId: bigint): Promise { let fd; try { fd = await rawOpen(this.dataPath, true); + // Only unlock if the lock is unchanged + if (lockId !== BigInt(FORCE_UNLOCK_SIGNAL)) { + await this.verifyLock(lockId); + } + // Write all zeros to the lock bytes await writeAtPosition(fd, UNLOCKED_BYTES, LOCK_BYTE_OFFSET); } finally { - if (fd) fd.close(); + if (fd) await fd.close(); } } diff --git a/src/lib/utils/randomts.ts b/src/lib/utils/randomts.ts new file mode 100644 index 0000000..65a1ab5 --- /dev/null +++ b/src/lib/utils/randomts.ts @@ -0,0 +1,16 @@ +/** + * Modifies a timestamp by replacing the specified number of bits with a random value. + * + * @param {number} timestamp - The original timestamp. + * @param {number} [numBits=10] - The number of bits to replace with randomness. + * @returns {number} - The modified timestamp. + */ +export function pseudoRandomTimestamp( + timestamp: bigint, + numBits: number = 11, +): bigint { + const randomValue = BigInt(Math.floor(Math.random() * (1 << numBits))); + const mask = BigInt(~((1 << numBits) - 1)); + const modifiedTimestamp = (timestamp & mask) | randomValue; + return modifiedTimestamp; +}