Skip to content

Commit

Permalink
Fix file locking
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexagon committed Sep 19, 2024
1 parent b606343 commit cd343a4
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 34 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.17.1

- Fix missig awaits when closing file descriptor
- Ensure file is locked by the current process before writing anything

## 0.17.0

- Add parameter `ignoreTransactionErrors` to `open()` and `sync()`.
Expand Down
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cross/kv",
"version": "0.17.0",
"version": "0.17.1",
"exports": {
".": "./mod.ts",
"./cli": "./src/cli/mod.ts"
Expand Down
3 changes: 2 additions & 1 deletion src/lib/constants.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Configurable
export const LOCK_DEFAULT_MAX_RETRIES = 32;
export const LOCK_DEFAULT_MAX_RETRIES = 40;
export const LOCK_DEFAULT_INITIAL_RETRY_INTERVAL_MS = 30; // Increased with itself on each retry, so the actual retry interval is 20, 40, 60 etc. 32 and 20 become about 10 seconds total.
export const LOCK_STALE_TIMEOUT_MS = 6 * 60 * 60 * 1000; // Automatically unlock a ledger that has been locked for more than 2*60*60*1000 = 2 hours.
export const LEDGER_CURRENT_VERSION: string = "B017";
Expand All @@ -23,3 +23,4 @@ export const ENCODED_TRANSACTION_SIGNATURE: Uint8Array = new TextEncoder()
.encode("T;"); // Cross/Kv Transaction
export const UNLOCKED_BYTES = new Uint8Array(LOCKED_BYTES_LENGTH);
export const LOCKED_BYTES = new Uint8Array(LOCKED_BYTES_LENGTH);
export const FORCE_UNLOCK_SIGNAL = 1;
18 changes: 11 additions & 7 deletions src/lib/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ import {
type KVTransactionResult,
} from "./transaction.ts";
import { KVLedger } from "./ledger.ts";
import { LEDGER_CACHE_MB, SYNC_INTERVAL_MS } from "./constants.ts";
import {
FORCE_UNLOCK_SIGNAL,
LEDGER_CACHE_MB,
SYNC_INTERVAL_MS,
} from "./constants.ts";

// External dependencies
import { EventEmitter } from "node:events";
Expand Down Expand Up @@ -295,8 +299,7 @@ export class KV extends EventEmitter {
if (!this.ledger) {
throw new Error("No ledger is currently open.");
}

await this.ledger.unlock();
await this.ledger.unlock(BigInt(FORCE_UNLOCK_SIGNAL));
}

/**
Expand Down Expand Up @@ -744,6 +747,7 @@ export class KV extends EventEmitter {
public async endTransaction(): Promise<Error[]> {
this.ensureOpen();
if (!this.isInTransaction) throw new Error("Not in a transaction");
if (!this.ledger) throw new Error("No ledger open");

const bufferedTransactions: {
transaction: KVTransaction;
Expand All @@ -764,7 +768,7 @@ export class KV extends EventEmitter {
currentOffset += transactionData.length;
}

await this.ledger?.lock();
const lockId = await this.ledger.lock();
let unlocked = false;
try {
// Sync before writing the transactions
Expand All @@ -774,7 +778,7 @@ export class KV extends EventEmitter {
}

// Write all buffered transactions at once and get the base offset
const baseOffset = await this.ledger?.add(bufferedTransactions);
const baseOffset = await this.ledger?.add(bufferedTransactions, lockId);

if (baseOffset === undefined) {
throw new Error(
Expand All @@ -783,7 +787,7 @@ export class KV extends EventEmitter {
}

// Unlock early if everying successed
await this.ledger?.unlock();
await this.ledger?.unlock(lockId);
unlocked = true;

// Update the index and check for errors
Expand Down Expand Up @@ -817,7 +821,7 @@ export class KV extends EventEmitter {
}
} finally {
// Back-up unlock
if (!unlocked) await this.ledger?.unlock();
if (!unlocked) await this.ledger?.unlock(lockId);
this.pendingTransactions = []; // Clear pending transactions
this.isInTransaction = false;
}
Expand Down
101 changes: 76 additions & 25 deletions src/lib/ledger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
} from "./utils/file.ts";
import {
ENCODED_TRANSACTION_SIGNATURE,
FORCE_UNLOCK_SIGNAL,
LEDGER_BASE_OFFSET,
LEDGER_CURRENT_VERSION,
LEDGER_FILE_ID,
Expand All @@ -27,6 +28,7 @@ import type { FileHandle } from "node:fs/promises";
import type { KVQuery } from "./key.ts";
import { KVLedgerCache } from "./cache.ts";
import { KVPrefetcher } from "./prefetcher.ts";
import { pseudoRandomTimestamp } from "./utils/randomts.ts";

/**
* This file handles the ledger file, which is where all persisted data of an cross/kv instance is stored.
Expand Down Expand Up @@ -157,7 +159,7 @@ export class KVLedger {
}
}
}
if (reusableFd) reusableFd.close();
if (reusableFd) await reusableFd.close();
return newTransactions;
}

Expand Down Expand Up @@ -198,7 +200,7 @@ export class KVLedger {

this.header = decoded;
} finally {
if (fd) fd.close();
if (fd) await fd.close();
}
}

Expand Down Expand Up @@ -262,7 +264,7 @@ export class KVLedger {
}
}

if (reusableFd) reusableFd.close();
if (reusableFd) await reusableFd.close();
}

public async writeHeader() {
Expand Down Expand Up @@ -295,7 +297,7 @@ export class KVLedger {
// Write the header data
await writeAtPosition(fd, new Uint8Array(headerBuffer), 0);
} finally {
if (fd) fd.close();
if (fd) await fd.close();
}
}

Expand All @@ -310,7 +312,7 @@ export class KVLedger {
*/
public async add(transactionsData: {
transactionData: Uint8Array;
}[]): Promise<number> {
}[], lockId: bigint): Promise<number> {
this.ensureOpen();

// Used to return the first offset of the series
Expand All @@ -323,6 +325,12 @@ export class KVLedger {
let fd;
try {
fd = await rawOpen(this.dataPath, true);

// Verify the lock just before writing
if (!await this.verifyLock(lockId)) {
throw new Error("Invalid lock");
}

for (const { transactionData } of transactionsData) {
// Append each transaction data
await writeAtPosition(fd, transactionData, currentOffset);
Expand All @@ -334,7 +342,7 @@ export class KVLedger {
}
await this.writeHeader(); // Update the on disk header with the new offset
} finally {
if (fd) fd.close();
if (fd) await fd.close();
}
return baseOffset;
}
Expand Down Expand Up @@ -437,7 +445,7 @@ export class KVLedger {
baseOffset + errorCorrectionOffset,
result,
);
if (fd && !externalFd) fd.close();
if (fd && !externalFd) await fd.close();
return result;
}
} catch (error) {
Expand All @@ -449,7 +457,7 @@ export class KVLedger {
errorCorrectionOffset += 1;
}
}
if (fd && !externalFd) fd.close();
if (fd && !externalFd) await fd.close();
return null;
}

Expand All @@ -458,6 +466,7 @@ export class KVLedger {
*/
public async vacuum(ignoreReadErrors: boolean = false): Promise<boolean> {
let ledgerIsReplaced = false;
let lockId: bigint | undefined;
try {
// 1. Gather All Transaction Offsets
const allOffsets: number[] = [];
Expand Down Expand Up @@ -522,7 +531,7 @@ export class KVLedger {

// Lock the temporary ledger to prevent multiple vacuums against the same tempfile
// - Will be unlocked in the finally clause
await tempLedger.lock();
lockId = await tempLedger.lock();

// 5. Append valid transactions to the new file.
for (const validTransaction of validTransactions) {
Expand All @@ -536,7 +545,7 @@ export class KVLedger {
if (transaction) {
await tempLedger.add([{
transactionData: transaction.transaction.toUint8Array(),
}]);
}], lockId);
} else if (!ignoreReadErrors) {
throw new Error("Unexpected end of file");
}
Expand All @@ -553,7 +562,7 @@ export class KVLedger {
ledgerIsReplaced = true;
} finally {
// 9. Unlock
if (ledgerIsReplaced) await this.unlock();
if (ledgerIsReplaced && lockId) await this.unlock(lockId);
}

return ledgerIsReplaced;
Expand All @@ -563,34 +572,58 @@ export class KVLedger {
if (!this.open) throw new Error("Ledger is not opened yet.");
}

public async lock(): Promise<void> {
public async verifyLock(existingLockId: bigint): Promise<boolean> {
this.ensureOpen();

let fd;

try {
fd = await rawOpen(this.dataPath, true);

// 1. Check if already locked
const lockData = await readAtPosition(
fd,
LOCKED_BYTES_LENGTH,
LOCK_BYTE_OFFSET,
);
const existingTimestamp = new DataView(lockData.buffer).getBigUint64(
0,
false,
);
return existingTimestamp === existingLockId;
} catch (_e) {
throw new Error("Error verifying lock");
} finally {
if (fd) await fd.close();
}
}

public async lock(): Promise<bigint> {
this.ensureOpen();

const fd = await rawOpen(this.dataPath, true);
const retryInterval = LOCK_DEFAULT_INITIAL_RETRY_INTERVAL_MS; // Use provided retry interval

for (let attempt = 0; attempt < LOCK_DEFAULT_MAX_RETRIES; attempt++) {
try {
fd = await rawOpen(this.dataPath, true);

// 1. Check if already locked
const lockData = await readAtPosition(
fd,
LOCKED_BYTES_LENGTH,
LOCK_BYTE_OFFSET,
);
const existingTimestamp = new DataView(lockData.buffer).getBigUint64(
const timestamp = new DataView(lockData.buffer).getBigUint64(
0,
false,
);

// Check for stale lock
if (
existingTimestamp !== BigInt(0) &&
Date.now() - Number(existingTimestamp) > LOCK_STALE_TIMEOUT_MS
timestamp > BigInt(0) &&
BigInt(Date.now()) - BigInt(timestamp) > LOCK_STALE_TIMEOUT_MS
) {
await writeAtPosition(fd, UNLOCKED_BYTES, LOCK_BYTE_OFFSET); // Remove stale lock
} else if (existingTimestamp !== BigInt(0)) {
await this.unlock(timestamp);
} else if (timestamp > BigInt(0)) {
// File is locked, wait and retry
await new Promise((resolve) =>
setTimeout(resolve, retryInterval + attempt * retryInterval)
Expand All @@ -601,31 +634,49 @@ export class KVLedger {
// 2. Prepare lock data
const lockBytes = LOCKED_BYTES;
const lockView = new DataView(lockBytes.buffer);
lockView.setBigUint64(0, BigInt(Date.now()), false);
const lockId = pseudoRandomTimestamp(BigInt(Date.now()), 11); // A lock id is a regular timestamp with the last 11 bits scrambled
lockView.setBigUint64(0, lockId, false);

// 3. Write lock data
await writeAtPosition(fd, lockBytes, LOCK_BYTE_OFFSET);

// Wait for the next iteration of the event loop, and verify the lock
await new Promise((resolve) => setTimeout(resolve, 0));
if (!await this.verifyLock(lockId)) {
// File has been locked by another process, wait and retry
await new Promise((resolve) =>
setTimeout(resolve, retryInterval + attempt * retryInterval)
);
continue;
}

// Lock acquired!
return;
} finally {
if (fd) fd.close();
if (fd) await fd.close();

return lockId;
} catch (_e) {
/* No op */
}
}

// Could not acquire the lock after retries
throw new Error("Could not acquire database lock");
}

public async unlock(): Promise<void> {
public async unlock(lockId: bigint): Promise<void> {
let fd;
try {
fd = await rawOpen(this.dataPath, true);

// Only unlock if the lock is unchanged
if (lockId !== BigInt(FORCE_UNLOCK_SIGNAL)) {
await this.verifyLock(lockId);
}

// Write all zeros to the lock bytes
await writeAtPosition(fd, UNLOCKED_BYTES, LOCK_BYTE_OFFSET);
} finally {
if (fd) fd.close();
if (fd) await fd.close();
}
}

Expand Down
16 changes: 16 additions & 0 deletions src/lib/utils/randomts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/**
* Modifies a timestamp by replacing the specified number of bits with a random value.
*
* @param {number} timestamp - The original timestamp.
* @param {number} [numBits=10] - The number of bits to replace with randomness.
* @returns {number} - The modified timestamp.
*/
export function pseudoRandomTimestamp(
timestamp: bigint,
numBits: number = 11,
): bigint {
const randomValue = BigInt(Math.floor(Math.random() * (1 << numBits)));
const mask = BigInt(~((1 << numBits) - 1));
const modifiedTimestamp = (timestamp & mask) | randomValue;
return modifiedTimestamp;
}

0 comments on commit cd343a4

Please sign in to comment.