Skip to content

Commit

Permalink
Various fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexagon committed Jun 3, 2024
1 parent 0b1c24c commit 0a802f2
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 50 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
## 0.15.7

## Changes

- Do not freeze the database during vacuum
- Reduce time in locked state during vacuum
- Only unlock old ledger after successful vacuum
- Only re-open ledger after successful vacuum

## Fixes

- Fixed Deno panic on decoding data from cache

## 0.15.6

## Additions
Expand Down
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cross/kv",
"version": "0.15.6",
"version": "0.15.7",
"exports": {
".": "./mod.ts",
"./cli": "./src/cli/mod.ts"
Expand Down
41 changes: 7 additions & 34 deletions src/lib/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import { EventEmitter } from "node:events";
export type KVSyncResultStatus =
| "noop" /** No operation was performed (e.g., ledger not open). */
| "ready" /** The database is ready, no new data to synchronize. */
| "blocked" /** Synchronization is temporarily blocked (e.g., during a vacuum). */
| "success" /** Synchronization completed successfully, new data was added. */
| "ledgerInvalidated" /** The ledger was invalidated and needs to be reopened. */
| "error"; /** An error occurred during synchronization. Check the `error` property for details. */
Expand Down Expand Up @@ -129,7 +128,6 @@ export class KV extends EventEmitter {
private disableIndex = false;

// States
private blockSync: boolean = false; // Syncing can be blocked during vacuum
private aborted: boolean = false;
private isInTransaction: boolean = false;
private watchdogTimer?: number; // Undefined if not scheduled or currently running
Expand Down Expand Up @@ -215,7 +213,7 @@ export class KV extends EventEmitter {

// Do the initial synchronization
// - If `this.autoSync` is enabled, additional synchronizations will be carried out every `this.syncIntervalMs`
const syncResult = await this.sync(true);
const syncResult = await this.sync();
if (syncResult.error) {
throw syncResult.error;
}
Expand Down Expand Up @@ -294,7 +292,6 @@ export class KV extends EventEmitter {
* - Automatically run on adding data
* - Can be manually triggered for full consistency before data retrieval (iterate(), listAll(), get())
*
* @param force - (Optional) If true, forces synchronization even if currently blocked (e.g., vacuum). For internal use only.
* @param doLock - (Optional) Locks the database before synchronization. Defaults to true. Always true unless called internally.
*
* @emits sync - Emits an event with the synchronization result:
Expand All @@ -303,16 +300,9 @@ export class KV extends EventEmitter {
*
* @throws {Error} If an unexpected error occurs during synchronization.
*/
public async sync(force = false, doLock = false): Promise<KVSyncResult> {
public async sync(doLock = false): Promise<KVSyncResult> {
// Throw if database isn't open
if (force) this.ensureOpen();

if (this.blockSync && !force) {
const error = new Error("Store synchronization is blocked");
// @ts-ignore .emit exists
this.emit("sync", { result: "blocked", error });
return { result: "blocked", error };
}
this.ensureOpen();

// Synchronization Logic (with lock if needed)
let result: KVSyncResult["result"] = "ready";
Expand Down Expand Up @@ -429,7 +419,6 @@ export class KV extends EventEmitter {
* It involves rewriting the ledger to remove deleted entries, potentially reducing its size.
*
* @remarks
* - Vacuuming temporarily blocks regular synchronization (`blockSync` is set to `true`).
* - The database is automatically re-opened after the vacuum is complete to ensure consistency.
*
* @async
Expand All @@ -439,16 +428,8 @@ export class KV extends EventEmitter {
this.ensureOpen();
this.ensureIndex();

this.blockSync = true;
try {
await this.ledger?.vacuum();

// Force re-opening the database
await this.open(this.ledgerPath!, false);
} finally {
// Ensure blockSync is reset even if vacuum throws an error
this.blockSync = false;
}
const ledgerIsReplaced = await this.ledger?.vacuum();
if (ledgerIsReplaced) await this.open(this.ledgerPath!, false);
}

/**
Expand Down Expand Up @@ -496,7 +477,7 @@ export class KV extends EventEmitter {
// Throw if database isn't open
this.ensureOpen();
this.ensureIndex();
for await (const entry of this.iterate<T>(key, 1)) {
for await (const entry of this.iterate<T>(key, 1, true)) {
return entry;
}
return null;
Expand Down Expand Up @@ -610,10 +591,6 @@ export class KV extends EventEmitter {
public async set<T = unknown>(key: KVKey, value: T): Promise<void> {
// Throw if database isn't open
this.ensureOpen();
// Throw if there is an ongoing vacuum
if (this.blockSync) {
throw new Error("Can not add data during vacuuming");
}

// Ensure the key is ok
const validatedKey = new KVKeyInstance(key);
Expand Down Expand Up @@ -646,10 +623,6 @@ export class KV extends EventEmitter {
async delete(key: KVKey): Promise<void> {
this.ensureOpen(); // Throw if the database isn't open

if (this.blockSync) {
throw new Error("Cannot delete data during vacuuming");
}

const validatedKey = new KVKeyInstance(key);

const pendingTransaction = new KVTransaction();
Expand Down Expand Up @@ -701,7 +674,7 @@ export class KV extends EventEmitter {
let unlocked = false;
try {
// Sync before writing the transactions
const syncResult = await this.sync(false, false);
const syncResult = await this.sync();
if (syncResult.error) {
throw syncResult.error;
}
Expand Down
33 changes: 19 additions & 14 deletions src/lib/ledger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -404,12 +404,10 @@ export class KVLedger {
/**
* Caution should be taken not to carry out any other operations during a vacuum
*/
public async vacuum() {
// 1. Lock for Exclusive Access
await this.lock();

public async vacuum(): Promise<boolean> {
let ledgerIsReplaced = false;
try {
// 2. Gather All Transaction Offsets
// 1. Gather All Transaction Offsets
const allOffsets: number[] = [];
let currentOffset = LEDGER_BASE_OFFSET;
while (currentOffset < this.header.currentOffset) {
Expand All @@ -419,8 +417,14 @@ export class KVLedger {
);
allOffsets.push(currentOffset);
currentOffset += result.length;

// Update the header after each read, to make sure we catch any new transactions
this.readHeader();
}

// 2. Now we need to lock the ledger, as a "state" is about to be calculated
await this.lock();

// 3. Gather Valid Transactions (in Reverse Order)
const validTransactions: KVLedgerResult[] = [];
const removedKeys: Set<string> = new Set();
Expand All @@ -439,18 +443,15 @@ export class KVLedger {
}
}

// 4. Clear cache
this.cache.clear();

// 5. Compact the Data File
// 4. Compact the Data File
const tempFilePath = this.dataPath + "-tmp";
const tempLedger = new KVLedger(
tempFilePath,
this.cache.maxCacheSizeBytes,
);
await tempLedger.open(true);

// 6. Append valid transactions to the new file.
// 5. Append valid transactions to the new file.
for (const validTransaction of validTransactions) {
const transaction = await this.rawGetTransaction(
validTransaction.offset,
Expand All @@ -462,16 +463,20 @@ export class KVLedger {
}
this.header.currentOffset = tempLedger.header.currentOffset;

// 6. Clear cache
this.cache.clear();

// 7. Replace Original File
// - The lock flag is now set independently, no need to unlock from this point on
await unlink(this.dataPath);
await rename(tempFilePath, this.dataPath);

// 8. Update the Cached Header
await this.readHeader();
ledgerIsReplaced = true;
} finally {
// 9. Unlock
await this.unlock();
if (ledgerIsReplaced) await this.unlock();
}

return ledgerIsReplaced;
}

private ensureOpen(): void {
Expand Down
2 changes: 1 addition & 1 deletion src/lib/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ export class KVTransaction {
this.operation = operation;
this.timestamp = timestamp;
if (value) {
this.data = encode(value);
this.data = new Uint8Array(encode(value));
this.hash = await sha1(this.data!);
}
}
Expand Down

0 comments on commit 0a802f2

Please sign in to comment.