From 34b5e76b03b3b845834223be50772fb91ca86417 Mon Sep 17 00:00:00 2001 From: Hexagon Date: Sat, 11 May 2024 01:45:15 +0200 Subject: [PATCH] Separate ledger. Refactor. Optimize locking. Add vacuum. --- README.md | 3 +- src/constants.ts | 4 + src/index.ts | 20 +-- src/kv.test.ts | 36 +++-- src/kv.ts | 322 ++++++++++++++------------------------------ src/ledger.ts | 324 +++++++++++++++++++++++++++++++++++++++++++++ src/transaction.ts | 130 +++++++++++++----- src/utils/file.ts | 99 +++++++------- 8 files changed, 609 insertions(+), 329 deletions(-) create mode 100644 src/ledger.ts diff --git a/README.md b/README.md index 355e4b3..4254184 100644 --- a/README.md +++ b/README.md @@ -111,12 +111,13 @@ await kvStore.close(); - `KV` class - `open(filepath)` - - `set(key, value, overwrite?)` + - `set(key, value)` - `get(key)` - `getMany(key)` - `delete(key)` - `beginTransaction()` - `endTransaction()` + - `vacuum()` - `close()` - `KVKey` class (Detail the constructor and methods) - `KVKeyRange` interface diff --git a/src/constants.ts b/src/constants.ts index 1725385..fb7b0ba 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -1,3 +1,7 @@ export const LOCK_DEFAULT_MAX_RETRIES = 32; export const LOCK_DEFAULT_INITIAL_RETRY_INTERVAL_MS = 20; // Increased with itself on each retry, so the actual retry interval is 20, 40, 60 etc. 32 and 20 become about 10 seconds total. export const LOCK_STALE_TIMEOUT_S = 60_000; + +export const SUPPORTED_LEDGER_VERSIONS = ["ALPH"]; + +export const SYNC_INTERVAL_MS = 1_000; diff --git a/src/index.ts b/src/index.ts index 70c513b..babb032 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,4 @@ import type { KVKey, KVKeyRange } from "./key.ts"; -import { type KVFinishedTransaction, KVOperation } from "./transaction.ts"; /** * Represents content of a node within the KVIndex tree. @@ -34,14 +33,11 @@ export class KVIndex { /** * Adds an entry to the index. - * @param transaction - The transaction to add * @throws {Error} If 'overwrite' is false and a duplicate key is found. */ - add(transaction: KVFinishedTransaction) { + add(key: KVKey, offset: number) { let current = this.index; - let lastPart; - for (const part of transaction.key.get()) { - lastPart = part; + for (const part of key.get()) { const currentPart = current.children?.get(part as string | number); if (currentPart) { current = currentPart; @@ -53,13 +49,7 @@ export class KVIndex { current = newObj; } } - if (current!.reference === undefined) { - current!.reference = transaction.offset; - } else if (transaction.oper === KVOperation.UPSERT) { - current!.reference = transaction.offset; - } else { - throw new Error(`Duplicate key: ${lastPart}`); - } + current!.reference = offset; } /** @@ -67,9 +57,9 @@ export class KVIndex { * @param transaction - The transaction to remove. * @returns The removed data row reference, or undefined if the key was not found. */ - delete(transaction: KVFinishedTransaction): number | undefined { + delete(key: KVKey): number | undefined { let current = this.index; - for (const part of transaction.key.get()) { + for (const part of key.get()) { const currentPart = current.children.get(part as (string | number)); if (!currentPart || !currentPart.children) { // Key path not found return undefined; diff --git a/src/kv.test.ts b/src/kv.test.ts index 518f656..e9462f2 100644 --- a/src/kv.test.ts +++ b/src/kv.test.ts @@ -17,6 +17,8 @@ test("KV: set, get and delete (numbers and strings)", async () => { assertEquals(await kvStore.get(["name"]), null); assertEquals((await kvStore.get(["age"]))?.data, 30); + + await kvStore.close(); }); test("KV: set, get and delete (big numbers)", async () => { @@ -33,12 +35,16 @@ test("KV: set, get and delete (big numbers)", async () => { 54645645646546345634523452345234545464, ); + kvStore.close(); + const kvStore2 = new KV(); await kvStore2.open(tempFilePrefix); assertEquals( (await kvStore2.get(["num", 54645645646546345634523452345234545464]))?.data, 54645645646546345634523452345234545464, ); + + kvStore2.close(); }); test("KV: set, get and delete (objects)", async () => { @@ -55,6 +61,8 @@ test("KV: set, get and delete (objects)", async () => { assertEquals(await kvStore.get(["name"]), null); assertEquals((await kvStore.get(["age"]))?.data, { data: 30 }); + + await kvStore.close(); }); test("KV: set, get and delete (dates)", async () => { @@ -73,20 +81,8 @@ test("KV: set, get and delete (dates)", async () => { ); await kvStore.delete(["pointintime"]); assertEquals(await kvStore.get(["pointintime"]), null); -}); - -test("KV: throws on duplicate key insertion", async () => { - const tempFilePrefix = await tempfile(); - const kvStore = new KV(); - await kvStore.open(tempFilePrefix); - - await kvStore.set(["name"], "Alice"); - assertRejects( - async () => await kvStore.set(["name"], "Bob"), - Error, - "Duplicate key: Key already exists", - ); + await kvStore.close(); }); test("KV: throws when trying to delete a non-existing key", async () => { @@ -98,6 +94,8 @@ test("KV: throws when trying to delete a non-existing key", async () => { async () => await kvStore.delete(["unknownKey"]), Error, ); // We don't have a specific error type for this yet + + await kvStore.close(); }); test("KV: supports multi-level nested keys", async () => { @@ -110,6 +108,8 @@ test("KV: supports multi-level nested keys", async () => { assertEquals((await kvStore.get(["data", "user", "name"]))?.data, "Alice"); assertEquals((await kvStore.get(["data", "system", "version"]))?.data, 1.2); + + await kvStore.close(); }); test("KV: supports multi-level nested keys with numbers", async () => { @@ -123,6 +123,8 @@ test("KV: supports multi-level nested keys with numbers", async () => { assertEquals((await kvStore.get(["data", "user", 4]))?.data, "Alice"); assertEquals((await kvStore.get(["data", "system", 4]))?.data, 1.2); assertEquals(await kvStore.get(["data", "system", 5]), null); + + await kvStore.close(); }); test("KV: supports numeric key ranges", async () => { @@ -141,6 +143,8 @@ test("KV: supports numeric key ranges", async () => { assertEquals(rangeResults[0].data, "Value 7"); assertEquals(rangeResults[1].data, "Value 8"); assertEquals(rangeResults[2].data, "Value 9"); + + await kvStore.close(); }); test("KV: supports additional levels after numeric key ranges", async () => { @@ -164,6 +168,8 @@ test("KV: supports additional levels after numeric key ranges", async () => { assertEquals(rangeResults[0].data, "Value 7 in doc1"); assertEquals(rangeResults[1].data, "Value 8 in doc1"); assertEquals(rangeResults[2].data, "Value 9 in doc1"); + + await kvStore.close(); }); test("KV: supports empty numeric key ranges to get all", async () => { @@ -186,6 +192,8 @@ test("KV: supports empty numeric key ranges to get all", async () => { assertEquals(rangeResults3.length, 12); const rangeResults4 = await kvStore.getMany(["data"]); assertEquals(rangeResults4.length, 12); + + await kvStore.close(); }); test("KV: supports string key ranges", async () => { @@ -205,4 +213,6 @@ test("KV: supports string key ranges", async () => { }]); assertEquals(rangeResults.length, 2); assertEquals(rangeResults[0].data, "Document A"); + + await kvStore.close(); }); diff --git a/src/kv.ts b/src/kv.ts index e60d53c..a4456a4 100644 --- a/src/kv.ts +++ b/src/kv.ts @@ -1,21 +1,9 @@ // deno-lint-ignore-file no-explicit-any import { KVIndex } from "./index.ts"; import { KVKey, type KVKeyRepresentation } from "./key.ts"; -import { - ensureFile, - lock, - readAtPosition, - toAbsolutePath, - unlock, - writeAtPosition, -} from "./utils/file.ts"; -import { readFile, stat } from "@cross/fs"; -import { - type KVFinishedTransaction, - KVOperation, - type KVPendingTransaction, -} from "./transaction.ts"; -import { extDecoder, extEncoder } from "./cbor.ts"; +import { KVOperation, KVTransaction } from "./transaction.ts"; +import { KVLedger } from "./ledger.ts"; +import { SYNC_INTERVAL_MS } from "./constants.ts"; /** * Represents a single data entry within the Key-Value store. @@ -38,13 +26,16 @@ export interface KVDataEntry { export class KV { private index: KVIndex = new KVIndex(); - private pendingTransactions: KVPendingTransaction[] = []; + private pendingTransactions: KVTransaction[] = []; private isInTransaction: boolean = false; - private dataPath?: string; - private transactionsPath?: string; + private ledger?: KVLedger; + private watchdogTimer?: number; - constructor() {} + private aborted: boolean = false; + + constructor() { + } /** * Opens the Key-Value store based on a provided file path. @@ -53,18 +44,48 @@ export class KV { * @param filePath - Path to the base file for the KV store. * Index and data files will be derived from this path. */ - public async open(filePath: string) { - const transactionsPath = toAbsolutePath(filePath + ".tlog"); - await ensureFile(transactionsPath); - this.transactionsPath = transactionsPath; + public async open(filePath: string, createIfMissing: boolean = true) { + this.ledger = new KVLedger(filePath); + await this.ledger.open(createIfMissing); + await this.watchdog(); + } - this.dataPath = toAbsolutePath(filePath + ".data"); - await ensureFile(this.dataPath); + /** + * Starts a watchdog function that periodically syncs the ledger with disk + */ + private async watchdog() { + if (this.aborted) return; + try { + const newTransactions = await this.ledger?.sync(); + if (newTransactions) { + for (const entry of newTransactions) { + try { + // Apply transaction to the index + switch (entry.operation) { + case KVOperation.SET: + this.index.add(entry.key, entry.offset); + break; + case KVOperation.DELETE: + this.index.delete(entry.key); + break; + } + } catch (_e) { + console.error(_e); + throw new Error("Error while encoding data"); + } + } + } + } catch (error) { + console.error("Error in watchdog sync:", error); + } - // Initial load of the transaction log into the index - await this.restoreTransactionLog(); + // Reschedule + this.watchdogTimer = setTimeout(() => this.watchdog(), SYNC_INTERVAL_MS); } + public async vacuum(): Promise { + await this.ledger?.vacuum(); + } /** * Begins a new transaction. * @throws {Error} If already in a transaction. @@ -101,46 +122,6 @@ export class KV { return errors; } - /** - * Loads all KVFinishedTransaction entries from the transaction log and - * replays them to rebuild the index state. - * - * @private - */ - private async restoreTransactionLog() { - this.ensureOpen(); - await lock(this.transactionsPath!); - const transactionLog = await readFile(this.transactionsPath!); - await unlock(this.transactionsPath!); - let position = 0; - while (position < transactionLog.byteLength) { - const dataLength = new DataView(transactionLog.buffer).getUint16( - position, - false, - ); - try { - const transaction: KVFinishedTransaction = extDecoder.decode( - transactionLog.slice(position + 2, position + 2 + dataLength), - ); - // Apply transaction to the index - switch (transaction.oper) { - case KVOperation.INSERT: - case KVOperation.UPSERT: - this.index.add(transaction); - break; - case KVOperation.DELETE: - this.index.delete(transaction); - break; - } - } catch (_e) { - console.error(_e); - throw new Error("Error while encoding data"); - } - - position += 2 + dataLength; // Move to the next transaction - } - } - /** * Ensures the database is open, throwing an error if it's not. * @@ -148,7 +129,7 @@ export class KV { * @throws {Error} If the database is not open. */ private ensureOpen(): void { - if (!this.index || !this.dataPath) { + if (!this.ledger) { throw new Error("Database not open"); } } @@ -187,94 +168,20 @@ export class KV { } const results: any[] = []; let count = 0; - // Add a setting to enable locks during reads - // await lock(this.dataPath!); for (const offset of offsets) { - count++; - const lengthPrefixBuffer = await readAtPosition( - this.dataPath!, - 2, - offset, - ); - const dataLength = new DataView(lengthPrefixBuffer.buffer).getUint16( - 0, - false, - ); - const dataBuffer = await readAtPosition( - this.dataPath!, - dataLength, - offset + 2, - ); - results.push(extDecoder.decode(dataBuffer)); + const result = await this.ledger?.rawGetTransaction(offset); + if (result?.transaction) { + results.push({ + ts: result?.transaction.timestamp, + data: result?.transaction.value, + }); + count++; + } if (limit && count >= limit) break; } - //await unlock(this.dataPath!); return results; } - /** - * Encodes a value and adds it to the list of pending transactions - * @param encodedData - The CBOR-encoded value to write. - * @returns The offset at which the data was written. - */ - private async writeData( - pendingTransaction: KVPendingTransaction, - ): Promise { - this.ensureOpen(); - - // Create a data entry - const dataEntry: KVDataEntry = { - ts: pendingTransaction.ts, - data: pendingTransaction.data, - }; - - const encodedDataEntry = extEncoder.encode(dataEntry); - - // Get current offset (since we're appending) - - await lock(this.dataPath!); - - const stats = await stat(this.dataPath!); // Use fs.fstat instead - const originalPosition = stats.size; - - // Create array - const fullData = new Uint8Array(2 + encodedDataEntry.length); - - // Add length prefix (2 bytes) - new DataView(fullData.buffer).setUint16(0, encodedDataEntry.length, false); - - // Add data - fullData.set(encodedDataEntry, 2); - - await writeAtPosition(this.dataPath!, fullData, originalPosition); - - await unlock(this.dataPath!); - - return originalPosition; // Return the offset (where the write started) - } - - /** - * Encodes a value and adds it to the list of pending transactions - * @param encodedData - The CBOR-encoded value to write. - * @returns The offset at which the data was written. - */ - private async writeTransaction(encodedData: Uint8Array): Promise { - this.ensureOpen(); - - // Get current offset (since we're appending) - await lock(this.transactionsPath!); - const stats = await stat(this.transactionsPath!); // Use fs.fstat instead - const originalPosition = stats.size; - - // Add length prefix (2 bytes) - const fullData = new Uint8Array(2 + encodedData.length); - new DataView(fullData.buffer).setUint16(0, encodedData.length); - fullData.set(encodedData, 2); - await writeAtPosition(this.transactionsPath!, fullData, originalPosition); - await unlock(this.transactionsPath!); - return originalPosition; // Return the offset (where the write started) - } - /** * Stores a value associated with the given key, optionally overwriting existing values. * @param key - Representation of the key. @@ -284,7 +191,6 @@ export class KV { public async set( key: KVKeyRepresentation, value: any, - overwrite: boolean = false, ): Promise { // Throw if database isn't open this.ensureOpen(); @@ -292,20 +198,19 @@ export class KV { // Ensure the key is ok const validatedKey = new KVKey(key); - // Create transaction - const pendingTransaction: KVPendingTransaction = { - key: validatedKey, - oper: overwrite ? KVOperation.UPSERT : KVOperation.INSERT, - ts: new Date().getTime(), - data: value, - }; + const transaction = new KVTransaction(); + transaction.create( + validatedKey, + KVOperation.SET, + Date.now(), + value, + ); // Enqueue transaction if (!this.isInTransaction) { - await this.runTransaction(pendingTransaction); + await this.runTransaction(transaction); } else { - this.checkTransaction(pendingTransaction); - this.pendingTransactions.push(pendingTransaction); + this.pendingTransactions.push(transaction); } } @@ -314,7 +219,7 @@ export class KV { * @param key - Representation of the key. * @throws {Error} If the key is not found. */ - async delete(key: KVKeyRepresentation): Promise { + async delete(key: KVKeyRepresentation): Promise { // Throw if database isn't open this.ensureOpen(); @@ -322,37 +227,21 @@ export class KV { const validatedKey = new KVKey(key); // Create transaction - const pendingTransaction: KVPendingTransaction = { - key: validatedKey, - oper: KVOperation.DELETE, - ts: new Date().getTime(), - }; + + const pendingTransaction = new KVTransaction(); + pendingTransaction.create( + validatedKey, + KVOperation.DELETE, + Date.now(), + ); if (!this.isInTransaction) { - return await this.runTransaction(pendingTransaction); + await this.runTransaction(pendingTransaction); } else { - this.checkTransaction(pendingTransaction); this.pendingTransactions.push(pendingTransaction); } } - /** - * Checks the prerequisites of a single transaction - * - * @param pendingTransaction - The transaction to execute. - */ - checkTransaction(pendingTransaction: KVPendingTransaction): void { - this.ensureOpen(); - - // Check that the key doesn't exist - if ( - pendingTransaction.oper === KVOperation.INSERT && - this.index.get(pendingTransaction.key).length > 0 - ) { - throw new Error("Duplicate key: Key already exists"); - } - } - /** * Processes a single transaction and makes necessary updates to the index and * data files. @@ -362,49 +251,38 @@ export class KV { * @param pendingTransaction - The transaction to execute. */ async runTransaction( - pendingTransaction: KVPendingTransaction, - ): Promise { + pendingTransaction: KVTransaction, + ): Promise { this.ensureOpen(); - // 1. Check that the transaction can be carried out - // - Will throw on error - this.checkTransaction(pendingTransaction); - - // 12. Write Data if Needed - let offset; - if (pendingTransaction.data !== undefined) { - offset = await this.writeData(pendingTransaction); - } - - // 3. Create the finished transaction - const finishedTransaction: KVFinishedTransaction = { - key: pendingTransaction.key, - oper: pendingTransaction.oper, - ts: pendingTransaction.ts, - offset: offset, - }; - - // 4. Update the Index - switch (pendingTransaction.oper) { - case KVOperation.UPSERT: - case KVOperation.INSERT: - this.index.add(finishedTransaction); - break; - case KVOperation.DELETE: { - const deletedReference = this.index.delete(finishedTransaction); - if (deletedReference === undefined) { - throw new Error("Could not delete entry, key not found."); + // 2. Write Data if Needed + const offset = await this.ledger?.add(pendingTransaction); + + if (offset) { + // 3. Update the Index + switch (pendingTransaction.operation) { + case KVOperation.SET: + this.index.add( + pendingTransaction.key!, + offset, + ); + break; + case KVOperation.DELETE: { + const deletedReference = this.index.delete(pendingTransaction.key!); + if (deletedReference === undefined) { + throw new Error("Could not delete entry, key not found."); + } + break; } - break; } + } else { + throw new Error("Transaction failed, no data written."); } - - // 5. Persist the Transaction Log - const encodedTransaction = extEncoder.encode(finishedTransaction); - return await this.writeTransaction(encodedTransaction); // Append } public close() { - /* No-op for now */ + this.aborted = true; + if (this.watchdogTimer) clearTimeout(this.watchdogTimer); + this.ledger?.close(); } } diff --git a/src/ledger.ts b/src/ledger.ts new file mode 100644 index 0000000..45277b0 --- /dev/null +++ b/src/ledger.ts @@ -0,0 +1,324 @@ +import { + ensureFile, + rawOpen, + readAtPosition, + toAbsolutePath, + writeAtPosition, +} from "./utils/file.ts"; +import { lock, unlock } from "./utils/file.ts"; +import { SUPPORTED_LEDGER_VERSIONS } from "./constants.ts"; +import { KVOperation, KVTransaction } from "./transaction.ts"; +import type { KVKey } from "./key.ts"; +import { rename, unlink } from "@cross/fs"; + +export interface KVTransactionMeta { + key: KVKey; + operation: KVOperation; + offset: number; +} + +interface LedgerHeader { + fileId: string; // "CKVD", 4 bytes + ledgerVersion: string; // 4 bytes + created: number; + baseOffset: number; + currentOffset: number; +} + +export class KVLedger { + private aborted: boolean = false; + private dataPath: string; + public header: LedgerHeader = { + fileId: "CKVD", + ledgerVersion: "ALPH", + created: 0, + baseOffset: 1024, + currentOffset: 1024, + }; + constructor(filePath: string) { + this.dataPath = toAbsolutePath(filePath + ".data"); + } + + /** + * Opens the Ledger based on a provided file path. + * + * @param filePath - Path to the base file for the KV store. + */ + public async open(createIfMissing: boolean = true) { + // Make sure there is a file + const alreadyExists = await ensureFile(this.dataPath); + + // Read or create the file header + if (alreadyExists) { + /* No-op */ + } else if (createIfMissing) { + this.header.created = Date.now(); + await this.writeHeader(); + } else { + throw new Error("Database not found."); + } + } + + close() { + this.aborted = true; + } + + /** + * Synchronizes the ledger with the underlying file, retrieving any new + * transactions that have been added since the last sync. + * + * @returns A Promise resolving to an array of the newly retrieved KVTransaction objects. + */ + public async sync(): Promise { + if (this.aborted) return []; + const newTransactions = [] as KVTransactionMeta[]; + + let currentOffset = this.header.currentOffset; // Get from the cached header + + // Update offset + await lock(this.dataPath); + await this.readHeader(false); + + while (currentOffset < this.header.currentOffset) { + const result = await this.rawGetTransaction(currentOffset, false, false); + newTransactions.push({ + key: result.transaction.key!, + operation: result.transaction.operation!, + offset: currentOffset, + }); // Add the transaction + currentOffset += result.length; // Advance the offset + } + + // Update the cached header's currentOffset + this.header.currentOffset = currentOffset; + + await unlock(this.dataPath); + + return newTransactions; + } + + /** + * Reads the header from the ledger file. + * @throws If the header is invalid or cannot be read. + */ + private async readHeader(doLock: boolean = true) { + if (doLock) await lock(this.dataPath); + let fd; + try { + fd = await rawOpen(this.dataPath, false); + const headerData = await readAtPosition(fd, 1024, 0); + const decoded: LedgerHeader = { + fileId: new TextDecoder().decode(headerData.slice(0, 4)), + ledgerVersion: new TextDecoder().decode(headerData.slice(4, 8)), + created: new DataView(headerData.buffer).getUint32(8, false), + baseOffset: new DataView(headerData.buffer).getUint32(12, false), + currentOffset: new DataView(headerData.buffer).getUint32(16, false), + }; + + if (decoded.fileId !== "CKVD") { + throw new Error("Invalid database file format"); + } + + if (!SUPPORTED_LEDGER_VERSIONS.includes(decoded.ledgerVersion)) { + throw new Error("Invalid database version"); + } + + if (decoded.baseOffset < 1024) { + throw new Error("Invalid base offset"); + } + + if ( + decoded.currentOffset < 1024 || + decoded.currentOffset < decoded.baseOffset + ) { + throw new Error("Invalid offset"); + } + + this.header = decoded; + } finally { + if (fd) fd.close(); + if (doLock) await unlock(this.dataPath); + } + } + + private async writeHeader(doLock: boolean = true) { + if (doLock) await lock(this.dataPath); + let fd; + try { + fd = await rawOpen(this.dataPath, true); + // Assuming the same header structure as before + const headerDataSize = 4 + 4 + 12; // 4 bytes for fileId, 4 for version, 3x4 for numbers + const headerBuffer = new ArrayBuffer(headerDataSize); + const headerView = new DataView(headerBuffer); + + // Encode fileId + new TextEncoder().encodeInto( + this.header.fileId, + new Uint8Array(headerBuffer, 0, 4), + ); + + // Encode ledgerVersion + new TextEncoder().encodeInto( + this.header.ledgerVersion, + new Uint8Array(headerBuffer, 4, 4), + ); + + // Set numeric fields + headerView.setUint32(8, this.header.created, false); // false for little-endian + headerView.setUint32(12, this.header.baseOffset, false); + headerView.setUint32(16, this.header.currentOffset, false); + + // Write the header data + await writeAtPosition(fd, new Uint8Array(headerBuffer), 0); + } finally { + if (fd) fd.close(); + if (doLock) await unlock(this.dataPath); + } + } + + public async add( + transaction: KVTransaction, + doLock: boolean = true, + ): Promise { + const offset = this.header.currentOffset; + if (doLock) await lock(this.dataPath); + let fd; + try { + fd = await rawOpen(this.dataPath, true); + const transactionData = await transaction.toUint8Array(); + + // Append the transaction data + await writeAtPosition( + fd, + transactionData, + this.header.currentOffset, + ); + + // Update the current offset in the header + this.header.currentOffset += transactionData.length; + + await this.writeHeader(false); // Update header with new offset + } finally { + if (fd) fd.close(); + if (doLock) await unlock(this.dataPath); + } + return offset; + } + + public async rawGetTransaction( + offset: number, + doLock: boolean = false, + decodeData: boolean = true, + ): Promise<{ offset: number; length: number; transaction: KVTransaction }> { + if (doLock) await lock(this.dataPath); + let fd; + try { + fd = await rawOpen(this.dataPath, false); + const transactionLengthData = await readAtPosition(fd, 8, offset); + const headerLength = new DataView(transactionLengthData.buffer).getUint32( + 0, + false, + ); + const dataLength = new DataView(transactionLengthData.buffer).getUint32( + 4, + false, + ); + const transaction = new KVTransaction(); + + // Read transaction header + const transactionHeaderData = await readAtPosition( + fd, + headerLength, + offset + 8, + ); + transaction.headerFromUint8Array(transactionHeaderData); + + // Read transaction data (optional) + if (decodeData) { + const transactionHeaderData = await readAtPosition( + fd, + dataLength, + offset + 8 + headerLength, + ); + transaction.dataFromUint8Array(transactionHeaderData); + } + return { + offset: offset, + length: 4 + 4 + dataLength + headerLength, + transaction, + }; + } finally { + if (fd) fd.close(); + if (doLock) await unlock(this.dataPath); + } + } + + public async vacuum() { + // 1. Lock for Exclusive Access + await lock(this.dataPath); + + try { + // 2. Gather All Transaction Offsets + const allOffsets: number[] = []; + let currentOffset = this.header.baseOffset; + while (currentOffset < this.header.currentOffset) { + const result = await this.rawGetTransaction( + currentOffset, + false, + false, + ); + allOffsets.push(currentOffset); + currentOffset += result.length; + } + + // 3. Gather Valid Transactions (in Reverse Order) + const validTransactions: KVTransactionMeta[] = []; + const removedKeys: Set = new Set(); + const addedKeys: Set = new Set(); + for (let i = allOffsets.length - 1; i >= 0; i--) { + const offset = allOffsets[i]; + const result = await this.rawGetTransaction(offset, false, false); + if (result.transaction.operation === KVOperation.DELETE) { + removedKeys.add(result.transaction.key!.getKeyRepresentation()); + } else if ( + !(removedKeys.has(result.transaction.key?.getKeyRepresentation()!)) && + !(addedKeys.has(result.transaction.key?.getKeyRepresentation()!)) + ) { + addedKeys.add(result.transaction.key!.getKeyRepresentation()); + validTransactions.push({ + key: result.transaction.key!, + operation: result.transaction.operation!, + offset: offset, + }); + } + } + + // 4. Compact the Data File + const tempFilePath = this.dataPath + ".tmp"; + const tempLedger = new KVLedger(tempFilePath); + await tempLedger.open(true); + + // Append valid transactions to the new file. + for (const validTransaction of validTransactions) { + const transaction = await this.rawGetTransaction( + validTransaction.offset, + false, + true, + ); + await tempLedger.add(transaction.transaction, false); + } + this.header.currentOffset = tempLedger.header.currentOffset; + tempLedger.close(); + + // 5. Replace Original File + await unlink(this.dataPath); + await rename(tempFilePath + ".data", this.dataPath); + + // 6. Update the Cached Header + await this.readHeader(false); + } finally { + // 7. Unlock + await unlock(this.dataPath); + } + } +} diff --git a/src/transaction.ts b/src/transaction.ts index cd9d612..16a0318 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -1,57 +1,123 @@ -import type { KVKey } from "./key.ts"; +import { extDecoder, extEncoder } from "./cbor.ts"; +import { KVKey, type KVKeyRepresentation } from "./key.ts"; export enum KVOperation { - INSERT = 1, - UPSERT = 2, - DELETE = 3, + SET = 1, + DELETE = 2, } /** - * Represents content of a finished transaction + * Represents content of a transaction */ -export interface KVFinishedTransaction { +export interface KVTransactionHeader { /** * Holds the key */ - key: KVKey; + k: KVKey; /** * Holds the operation */ - oper: KVOperation; + o: KVOperation; /** * Operation timestamp */ - ts: number; - - /** - * Offset data row, added once the entry has been written to a data file - */ - offset?: number; + t: number; } -/** - * Represents content of a transaction entry outside the KVIndex tree. - */ -export interface KVPendingTransaction { - /** - * Holds the key - */ - key: KVKey; +export type KVTransactionData = Uint8Array; - /** - * Holds the operation - */ - oper: KVOperation; +// Concrete implementation of the KVTransaction interface +export class KVTransaction { + public key?: KVKey; + public operation?: KVOperation; + public timestamp?: number; + public value?: unknown; - /** - * Operation timestamp - */ - ts: number; + constructor() { + } + + public create( + key: KVKey | KVKeyRepresentation, + operation: KVOperation, + timestamp: number, + value?: unknown, + ) { + if (!(key instanceof KVKey)) { + this.key = new KVKey(key, false); + } else { + this.key = key; + } + this.operation = operation; + this.timestamp = timestamp; + this.value = value; + } + + public headerFromUint8Array(data: Uint8Array) { + // ToDo: Optimise + const decoded: KVTransactionHeader = extDecoder.decode(data); + + this.key = decoded.k; + this.operation = decoded.o; + this.timestamp = decoded.t; + } + + public dataFromUint8Array(data: Uint8Array) { + this.value = extDecoder.decode(data); + } /** - * Actual data for this transaction, ready to be written + * Return a Uint8Array consisting of data length (uint32) plus the actual data */ - data?: unknown; + public toUint8Array(): Uint8Array { + // Create transaction data + const pendingTransactionHeader: KVTransactionHeader = { + k: this.key!, + o: this.operation!, + t: this.timestamp!, + }; + + // Encode header + const encodedTransactionHeader = extEncoder.encode( + pendingTransactionHeader, + ); + + // Encode data + const pendingTransactionData = this.value + ? extEncoder.encode(this.value) + : undefined; + const pendingTransactionDataLength = pendingTransactionData + ? pendingTransactionData.length + : 0; + + // Create array + const fullData = new Uint8Array( + 4 + 4 + encodedTransactionHeader.length + pendingTransactionDataLength, + ); + + // Add header length + new DataView(fullData.buffer).setUint32( + 0, + encodedTransactionHeader.length, + false, + ); + + // Add data length + new DataView(fullData.buffer).setUint32( + 4, + pendingTransactionDataLength, + false, + ); + + fullData.set(encodedTransactionHeader, 4 + 4); + if (pendingTransactionData) { + fullData.set( + pendingTransactionData, + 4 + 4 + encodedTransactionHeader.length, + ); + } + + return fullData; + } } diff --git a/src/utils/file.ts b/src/utils/file.ts index 4f8dfee..11c1784 100644 --- a/src/utils/file.ts +++ b/src/utils/file.ts @@ -1,4 +1,4 @@ -import { open, writeFile } from "node:fs/promises"; +import { type FileHandle, open, writeFile } from "node:fs/promises"; import { CurrentRuntime, Runtime } from "@cross/runtime"; import { cwd, isDir, isFile, mkdir, stat, unlink } from "@cross/fs"; import { dirname, isAbsolute, join, resolve } from "@std/path"; @@ -19,80 +19,55 @@ export function toAbsolutePath(filename: string): string { } export async function writeAtPosition( - filename: string, + fd: Deno.FsFile | FileHandle, data: Uint8Array, position: number, ) { // Deno if (CurrentRuntime === Runtime.Deno) { - const file = await Deno.open(filename, { read: true, write: true }); - await file.seek(position, Deno.SeekMode.Start); - await file.write(data); - file.close(); + await (fd as Deno.FsFile).seek(position, Deno.SeekMode.Start); + await fd.write(data); // Node or Bun } else if (CurrentRuntime) { // Node or Bun - const fd = await open(filename, "r+"); await fd.write(data, 0, data.length, position); - await fd.close(); } } -export async function readAtPosition( +export async function rawOpen( filename: string, + write: boolean = true, +): Promise { + // Deno + if (CurrentRuntime === Runtime.Deno) { + return await Deno.open(filename, { read: true, write: write }); + } else { + const mode = write ? "r+" : "r"; + return await open(filename, mode); + } +} + +export async function readAtPosition( + fd: Deno.FsFile | FileHandle, length: number, position: number, ): Promise { // Deno if (CurrentRuntime === Runtime.Deno) { - const file = await Deno.open(filename, { read: true }); - await file.seek(position, Deno.SeekMode.Start); + await (fd as Deno.FsFile).seek(position, Deno.SeekMode.Start); const buffer = new Uint8Array(length); - await file.read(buffer); - file.close(); + await fd.read(buffer); return buffer; // Node or Bun } else { - const fd = await open(filename, "r"); // @ts-ignore cross-runtime const buffer = Buffer.alloc(length); await fd.read(buffer, 0, length, position); - await fd.close(); return buffer; } } -/** - * Creates a file if it doesn't already exist - * @param filename The file to create if it doesnt already exist - * @returns True if created, False if it already existed - * @throws If the file can not be accessed or created - */ -export async function ensureFile(filePath: string): Promise { - const dirPath = dirname(filePath); - - // First ensure dir - if (!await isDir(filePath)) { - await mkdir(dirPath, { recursive: true }); - } - - // Then ensure file - if (await isFile(filePath)) { - // Existed since before - return false; - } else { - if (CurrentRuntime === Runtime.Deno) { - const file = await Deno.create(filePath); - file.close(); - } else { // Runtime.Node - await writeFile(filePath, ""); - } - // Created - return true; - } -} - /** * Locks a file * @param filename The file to create if it doesnt already exist @@ -112,7 +87,7 @@ export async function lock(filePath: string): Promise { ) { await unlink(lockFile); } - } catch (_e) { /* Ignore */ } + } catch (_e) { /* */ } for (let attempt = 0; attempt < LOCK_DEFAULT_MAX_RETRIES; attempt++) { try { @@ -169,3 +144,35 @@ export async function unlock(filePath: string): Promise { // Could not acquire the lock after retries return true; } + +/** + * Creates a file if it doesn't already exist + * @param filename The file to create if it doesnt already exist + * @returns True if created, False if it already existed + * @throws If the file can not be accessed or created + */ +export async function ensureFile(filePath: string): Promise { + const dirPath = dirname(filePath); + + // First ensure dir + if (!await isDir(dirPath)) { + await mkdir(dirPath, { recursive: true }); + } + + // Then ensure file + if (await isFile(filePath)) { + // Existed since before + return true; + } else { + // Create new file + if (CurrentRuntime === Runtime.Deno) { + const file = await Deno.create(filePath); + file.close(); + } else { // Runtime.Node + await writeFile(filePath, ""); + } + + // Created + return false; + } +}