Skip to content

Commit

Permalink
Atomic transactions. Faster file locking. Update docs.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexagon committed May 20, 2024
1 parent 7dec4dd commit 3dcc90c
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 314 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 0.12.0

- Use a header flag instead of separate file to lock/unlock database
- Update ledger version `B011`->`B012` with backwards compatibility
- Implement atomic transactions

## 0.11.0

- Update ledger version `BETA`->`B011` "Beta 0.11"
Expand Down
147 changes: 44 additions & 103 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,51 @@
[![JSR](https://jsr.io/badges/@cross/kv)](https://jsr.io/@cross/kv)
[![JSR Score](https://jsr.io/badges/@<scope>/@cross/kv)](https://jsr.io/@cross/kv)

A cross-platform, in-memory indexed and file based Key/Value database for
JavaScript and TypeScript, designed for seamless multi-process access and
compatibility across Node.js, Deno, and Bun.
An in-memory indexed and file based Key/Value database for JavaScript and
TypeScript, designed for seamless multi-process access and compatibility across
Node.js, Deno, and Bun.

_Please note that `cross/kv` is currently in **beta**. The API and features are
starting to stabilize, but are still subject to change._
```typescript
import { KV } from "@cross/kv";

// Create an instance
const db = new KV();
await db.open("data/mydatabase.db");

// Listen for new interests
db.watch(["users", {}, "interests"], (data) => {
console.log(data);
});

// Store some values/documents indexed by users.<id>.<category>
await db.set(["users", 1, "contact"], {
name: "Bob",
});
await db.set(["users", 1, "interests"], {
description: "Fishing",
});

// Display all contact information connected to users with id < 10
console.log(await db.listAll(["users", { to: 10 }, "contact"]));

db.close();
```

## Features

- **Indexed Key/Value Storage**: Store and retrieve data easily using
hierarchical keys, with an in-memory index to provide fast lookups of large
datasets.
- **Transaction-Based Storage:** Uses a single append-only transaction ledger to
store data, ensuring durability and recoverability.
- **Vacuuming:** Supports vacuuming the ledger to reclaim disk space used by
deletion transactions and deleted documents.
- **Multi-Process Support**: Multiple processes can safely access and modify the
same database concurrently, index updates are distributed automatically.
- **Cross-Runtime Compatibility:** Works in Node.js, Deno and Bun.
- **Flexible Data Types**: Store any JavaScript object, including complex types
like Maps and Sets.
- **Key Ranges:** Retrieve ranges of data efficiently directly from the index
using key ranges.
- **Efficient Key-Value Storage:** Rapid storage and retrieval using
hierarchical keys and a high-performance in-memory index.
- **Durable Transactions:** Ensure data integrity and recoverability through an
append-only transaction ledger.
- **Atomic Transactions:** Guarantee data consistency by grouping multiple
operations into a single, indivisible unit.
- **Optimized Storage:** Reclaim disk space and maintain performance through
vacuuming operations.
- **Cross-Platform & Multi-Process:** Built in pure TypeScript, working
seamlessly across Node.js, Deno, and Bun, supporting concurrent access by
multiple processes.
- **Flexible & Customizable:** Store any JavaScript object, subscribe to data
changes, and fine-tune synchronization behavior.

## Installation

Expand All @@ -42,87 +64,6 @@ deno add @cross/kv
bunx jsr add @cross/kv
```

## Simple Usage

```typescript
import { KV } from "@cross/kv";

const kvStore = new KV();

// Open the database, path and database is created if it does not exist
await kvStore.open("data/mydatabase.db");

// Set a value
await kvStore.set(["data", "username"], "Alice");

// Get a value
const username = await kvStore.get(["data", "username"]);
console.log(username); // Output: { ts: <numeric timestamp>, data "Alice" }

// Delete a key
await kvStore.delete(["data", "username"]);

// Close the database
await kvStore.close();
```

## Advanced Usage

```typescript
import { KV } from "@cross/kv";

// Create an instance
const kvStore = new KV();

// Open the database
await kvStore.open("data/mydatabase.db");

// Store some values/documents indexed by users.by_id.<id>
await kvStore.set(["users", "by_id", 1], {
name: "Bob",
created: new Date(),
interests: new Set(["fishing", "hunting"]),
});
await kvStore.set(["users", "by_id", 2], {
name: "Alice",
created: new Date(),
interests: new Set(["singing", "hunting"]),
});
await kvStore.set(["users", "by_id", 3], {
name: "Ben",
created: new Date(),
interests: new Set(["singing", "fishing"]),
});
await kvStore.set(["users", "by_id", 4], {
name: "Lisa",
created: new Date(),
interests: new Set(["reading", "fighting"]),
});
await kvStore.set(["users", "by_id", 5], {
name: "Jan",
created: new Date(),
interests: new Set(["cooking", "fighting"]),
});

// Use the index to select users between 2 and 4
const query = ["users", "by_id", { from: 2, to: 4 }];
// ... will display Document count: 3
console.log("Document count: " + kvStore.count(query));
// ... will output the objects of Alice, Ben and Lisa
for await (const entry of kvStore.iterate(query)) {
console.log(entry);
}

// Use a plain JavaScript filter (less performant) to find a user named ben
const ben = (await kvStore.listAll(["users"])).filter((user) =>
user.data.name === "Ben"
);
console.log("Ben: ", ben); // Outputs the object of Ben

// Make sure the in-memory database is in sync with storage
await kvStore.close();
```

## API Documentation

### Methods
Expand All @@ -143,9 +84,9 @@ await kvStore.close();
database.
- `unwatch(query, callback): void` - Unregisters a previously registered watch
handler.
- `beginTransaction()` - Starts a transaction.
- `async endTransaction()` - Ends a transaction, returns a list of `Errors` if
any occurred.
- `beginTransaction()` - Starts an atomic transaction.
- `async endTransaction()` - Ends an atomic transaction, returns a list of
`Errors` if any occurred.
- `async vacuum()` - Reclaims storage space.
- `on(eventName, eventData)` - Listen for events such as `sync`,
`watchdogError` or `closing`.
Expand Down
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cross/kv",
"version": "0.11.0",
"version": "0.12.0",
"exports": {
".": "./mod.ts"
},
Expand Down
6 changes: 5 additions & 1 deletion src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@
export const LOCK_DEFAULT_MAX_RETRIES = 32;
export const LOCK_DEFAULT_INITIAL_RETRY_INTERVAL_MS = 30; // Increased with itself on each retry, so the actual retry interval is 20, 40, 60 etc. 32 and 20 become about 10 seconds total.
export const LOCK_STALE_TIMEOUT_MS = 60_000;
export const LEDGER_CURRENT_VERSION: string = "B011";
export const LEDGER_CURRENT_VERSION: string = "B012";
export const SUPPORTED_LEDGER_VERSIONS: string[] = [
LEDGER_CURRENT_VERSION,
"B011",
];
export const LEDGER_MAX_READ_FAILURES = 10;
export const LEDGER_PREFETCH_BYTES = 2_048;
export const SYNC_INTERVAL_MS = 2_500; // Overridable with instance configuration

// Extremely constant
export const LEDGER_BASE_OFFSET = 256; // DO NOT CHANGE!
export const LOCKED_BYTES_LENGTH = 8; // Length of timestamp
export const LOCK_BYTE_OFFSET = LEDGER_BASE_OFFSET - LOCKED_BYTES_LENGTH; // Last 8 bytes of the header
export const KV_KEY_ALLOWED_CHARS = /^[@\p{L}\p{N}_-]+$/u; // Unicode letters and numbers, undescore, hyphen and at
export const LEDGER_FILE_ID: string = "CKVD"; // Cross/KV Database
export const TRANSACTION_SIGNATURE: string = "T;"; // Cross/Kv Transaction
export const UNLOCKED_BYTES = new Uint8Array(LOCKED_BYTES_LENGTH);
131 changes: 72 additions & 59 deletions src/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -372,36 +372,6 @@ export class KV extends EventEmitter {
this.isInTransaction = true;
}

/**
* Ends the current transaction, executing all pending operations.
*
* @returns {Promise<Error[]>} A promise resolving to an array of errors
* encountered during transaction execution (empty if successful).
*/
public async endTransaction(): Promise<Error[]> {
// Throw if database isn't open
this.ensureOpen();

if (!this.isInTransaction) throw new Error("Not in a transaction");

// Run transactions
let p = this.pendingTransactions.pop();
const errors: Error[] = [];
while (p) {
try {
await this.runTransaction(p);
} catch (e) {
errors.push(e);
}
p = this.pendingTransactions.pop();
}

// Done
this.isInTransaction = false;

return errors;
}

/**
* Ensures the database is open, throwing an error if it's not.
*
Expand Down Expand Up @@ -549,7 +519,12 @@ export class KV extends EventEmitter {

// Enqueue transaction
if (!this.isInTransaction) {
await this.runTransaction(transaction);
this.beginTransaction();
this.pendingTransactions.push(transaction);
const result = await this.endTransaction();
if (result.length) {
throw result[0];
}
} else {
this.pendingTransactions.push(transaction);
}
Expand All @@ -569,56 +544,94 @@ export class KV extends EventEmitter {

const validatedKey = new KVKeyInstance(key);

// Ensure the key exists in the index by performing a sync
await this.sync();

// Check if the key exists in the index after the sync
const keyExistsInIndex = this.index.get(validatedKey, 1);

if (!keyExistsInIndex.length) {
throw new Error("Key not found");
}

const pendingTransaction = new KVTransaction();
pendingTransaction.create(validatedKey, KVOperation.DELETE, Date.now());

if (!this.isInTransaction) {
await this.runTransaction(pendingTransaction);
this.beginTransaction();
this.pendingTransactions.push(pendingTransaction);
const result = await this.endTransaction();
if (result.length) {
throw result[0];
}
} else {
this.pendingTransactions.push(pendingTransaction);
}
}

/**
* Processes a single transaction and updates the index and data files.
* Ends the current transaction, executing all pending operations in a batched write.
*
* @param pendingTransaction - The transaction to execute.
* @returns {Promise<Error[]>} A promise resolving to an array of errors encountered during transaction execution (empty if successful).
*
* @throws {Error} If the transaction fails or if there's an issue updating the index or data files.
* @throws {Error} If not in a transaction or if the database is not open.
*/
private async runTransaction(
pendingTransaction: KVTransaction,
): Promise<void> {
public async endTransaction(): Promise<Error[]> {
this.ensureOpen();
if (!this.isInTransaction) throw new Error("Not in a transaction");

const bufferedTransactions: {
transaction: KVTransaction;
relativeOffset: number;
}[] = [];
const errors: Error[] = [];

// Prepare transaction data and offsets
let currentOffset = 0;
for (const transaction of this.pendingTransactions) {
const transactionData = await transaction.toUint8Array();
bufferedTransactions.push({ transaction, relativeOffset: currentOffset });
currentOffset += transactionData.length;
}

await this.ledger!.lock();
try {
// Always do a complete sync before a transaction
// - This will ensure that the index is is up to date, and that new
// transactions are added reflected to listeners.
// - Throw on any error
// Sync before writing the transactions
const syncResult = await this.sync(false, false);
if (syncResult.error) throw syncResult.error;
if (syncResult.error) {
throw syncResult.error;
}

// Add the transaction to the ledger
const offset = await this.ledger!.add(pendingTransaction);
if (offset) {
this.applyTransactionToIndex(pendingTransaction, offset);
} else {
throw new Error("Transaction failed, no data written.");
// Convert buffered transactions to Uint8Array[]
const transactionsData = bufferedTransactions.map(({ transaction }) =>
transaction.toUint8Array()
);

// Write all buffered transactions at once and get the base offset
const baseOffset = await this.ledger!.add(transactionsData);

// Update the index and check for errors
for (const { transaction, relativeOffset } of bufferedTransactions) {
try {
this.applyTransactionToIndex(
transaction,
baseOffset + relativeOffset,
);
} catch (error) {
errors.push(error as Error);
}
}
} finally {
await this.ledger!.unlock();
this.pendingTransactions = []; // Clear pending transactions
this.isInTransaction = false;
}

return errors;
}

/**
* Aborts the current transaction, discarding all pending operations.
*
* @throws {Error} If not in a transaction or if the database is not open.
*/
public abortTransaction(): void {
this.ensureOpen();
if (!this.isInTransaction) throw new Error("Not in a transaction");

// Clear pending transactions
this.pendingTransactions = [];
this.isInTransaction = false;
}

/**
Expand Down
Loading

0 comments on commit 3dcc90c

Please sign in to comment.