Skip to content

Commit

Permalink
feat(block-store): implement KvBlockStore and use it with lmdb im…
Browse files Browse the repository at this point in the history
…plementation

Note: there is currently a bug fetching block data by height. Something is going wrong with encoding the block hash against the height causing for prefetching to fail for certain blocks (e.g. 800079). Original assumption it was an encodign issue converting strings to buffers, but that did not solve the problem. Continuing to investigate but pushing this commit to get eyes from others.
  • Loading branch information
dtfiedler committed Nov 3, 2023
1 parent 8eaac30 commit 26380c3
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 46 deletions.
74 changes: 37 additions & 37 deletions src/store/kv-block-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@
*/
import winston from 'winston';

import { jsonBlockToMsgpack, msgpackToJsonBlock } from '../lib/encoding.js';
import {
fromB64Url,
jsonBlockToMsgpack,
msgpackToJsonBlock,
toB64Url,
} from '../lib/encoding.js';
import { KVBufferStore, PartialJsonBlockStore } from '../types.js';
import { PartialJsonBlock } from '../types.js';

export class KvTransactionStore implements PartialJsonBlockStore {
export class KvBlockStore implements PartialJsonBlockStore {
private log: winston.Logger;
private kvBufferStore: KVBufferStore;

Expand All @@ -37,11 +42,11 @@ export class KvTransactionStore implements PartialJsonBlockStore {
}

private blockHash(hash: string) {
return `${hash}-hash`;
return `#|${hash}`;
}

private blockHeight(height: number) {
return `${height}-height`;
return `H|${height}`;
}

async hasHash(hash: string): Promise<boolean> {
Expand Down Expand Up @@ -77,14 +82,18 @@ export class KvTransactionStore implements PartialJsonBlockStore {
}
return false;
}

async getByHash(hash: string): Promise<PartialJsonBlock | undefined> {
try {
if (await this.hasHash(hash)) {
const blockData = await this.kvBufferStore.get(this.blockHash(hash));
if (blockData === undefined) {
const blockDataBuffer = await this.kvBufferStore.get(
this.blockHash(hash),
);
if (blockDataBuffer === undefined) {
throw new Error('Missing block data in key/value store');
}
return msgpackToJsonBlock(blockData);
const blockData = msgpackToJsonBlock(blockDataBuffer);
return blockData;
}
} catch (error: any) {
this.log.error('Failed to get block by hash', {
Expand All @@ -96,27 +105,21 @@ export class KvTransactionStore implements PartialJsonBlockStore {
return undefined;
}

async getByHeight(number: number): Promise<PartialJsonBlock | undefined> {
async getByHeight(height: number): Promise<PartialJsonBlock | undefined> {
try {
if (await this.hasHeight(number)) {
if (await this.hasHeight(height)) {
const blockHashBuffer = await this.kvBufferStore.get(
this.blockHeight(number),
this.blockHeight(height),
);
if (blockHashBuffer === undefined) {
throw new Error('Missing block hash in key/value store for height.');
}
const blockHash = blockHashBuffer.toString();
const blockData = await this.kvBufferStore.get(
this.blockHash(blockHash),
);
if (blockData === undefined) {
throw new Error('Missing block data in key/value store for hash.');
}
return msgpackToJsonBlock(blockData);
const blockHash = toB64Url(blockHashBuffer);
return this.getByHash(blockHash);
}
} catch (error: any) {
this.log.error('Failed to get block by height', {
height: number,
height,
message: error.message,
stack: error.stack,
});
Expand Down Expand Up @@ -149,27 +152,27 @@ export class KvTransactionStore implements PartialJsonBlockStore {
}
}

async delByHeight(number: number): Promise<void> {
async delByHeight(height: number): Promise<void> {
try {
if (await this.hasHeight(number)) {
if (await this.hasHeight(height)) {
const blockHashBuffer = await this.kvBufferStore.get(
this.blockHeight(number),
this.blockHeight(height),
);
// remove height to block hash reference
await this.kvBufferStore.del(this.blockHeight(number));
await this.kvBufferStore.del(this.blockHeight(height));

if (
blockHashBuffer !== undefined &&
(await this.hasHash(blockHashBuffer.toString()))
) {
// remove the block hash
const blockHash = blockHashBuffer.toString();
await this.kvBufferStore.del(this.blockHash(blockHash));
// remove the block hash to block data reference
if (blockHashBuffer !== undefined) {
const blockHash = toB64Url(blockHashBuffer);
if (await this.hasHash(blockHash)) {
const blockHash = toB64Url(blockHashBuffer);
await this.kvBufferStore.del(this.blockHash(blockHash));
}
}
}
} catch (error: any) {
this.log.error('Failed to delete block by height', {
height: number,
height,
message: error.message,
stack: error.stack,
});
Expand All @@ -183,13 +186,10 @@ export class KvTransactionStore implements PartialJsonBlockStore {
const blockData = jsonBlockToMsgpack(block);
await this.kvBufferStore.set(this.blockHash(hash), blockData);

// store the hash by height separately, instead of also storing the height against the block data
// store the height against the hash value to avoid duplication of block data in the KV store
if (height !== undefined && !(await this.hasHeight(height))) {
const blockHashBuffer = Buffer.from(hash);
await this.kvBufferStore.set(
this.blockHeight(height),
blockHashBuffer,
);
const encodedHash = fromB64Url(hash);
await this.kvBufferStore.set(this.blockHeight(height), encodedHash);
}
}
} catch (error: any) {
Expand Down
2 changes: 1 addition & 1 deletion src/store/lmdb-kv-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class LmdbKVStore implements KVBufferStore {
}

async get(key: string): Promise<Buffer | undefined> {
return this.db.getBinary(key);
return this.db.get(key);
}

async has(key: string): Promise<boolean> {
Expand Down
33 changes: 27 additions & 6 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ import * as metrics from './metrics.js';
import { MemoryCacheArNSResolver } from './resolution/memory-cache-arns-resolver.js';
import { StreamingManifestPathResolver } from './resolution/streaming-manifest-path-resolver.js';
import { TrustedGatewayArNSResolver } from './resolution/trusted-gateway-arns-resolver.js';
import { FsBlockStore } from './store/fs-block-store.js';
import { FsChunkDataStore } from './store/fs-chunk-data-store.js';
import { FsDataStore } from './store/fs-data-store.js';
import { FsKVStore } from './store/fs-kv-store.js';
import { KvBlockStore } from './store/kv-block-store.js';
import { KvTransactionStore } from './store/kv-transaction-store.js';
import { LmdbKVStore } from './store/lmdb-kv-store.js';
import {
Expand Down Expand Up @@ -73,7 +73,9 @@ const arweave = Arweave.init({});
const txStore = new KvTransactionStore({
log,
kvBufferStore: (() => {
log.info('Creating chain cache key/value store', {
// TODO: move this to a util function that accepts path and type as args

log.info('Creating transaction cache key/value store', {
type: config.CHAIN_CACHE_TYPE,
});
switch (config.CHAIN_CACHE_TYPE) {
Expand All @@ -95,11 +97,30 @@ const txStore = new KvTransactionStore({
})(),
});

// TODO: replace with KvBlockStore
const blockStore = new FsBlockStore({
const blockStore = new KvBlockStore({
log,
baseDir: 'data/headers/partial-blocks',
tmpDir: 'data/tmp/partial-blocks',
kvBufferStore: (() => {
// TODO: move this to a util function that accepts path and type as args
log.info('Creating block cache key/value store', {
type: config.CHAIN_CACHE_TYPE,
});
switch (config.CHAIN_CACHE_TYPE) {
case 'lmdb': {
return new LmdbKVStore({
dbPath: 'data/lmdb/partial-blocks',
});
}
case 'fs': {
return new FsKVStore({
baseDir: 'data/headers/partial-blocks',
tmpDir: 'data/tmp/partial-blocks',
});
}
default: {
throw new Error(`Invalid chain cache type: ${config.CHAIN_CACHE_TYPE}`);
}
}
})(),
});

export const arweaveClient = new ArweaveCompositeClient({
Expand Down
4 changes: 2 additions & 2 deletions src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ export interface PartialJsonBlockStore {
hasHash(hash: string): Promise<boolean>;
hasHeight(height: number): Promise<boolean>;
getByHash(hash: string): Promise<PartialJsonBlock | undefined>;
getByHeight(number: number): Promise<PartialJsonBlock | undefined>;
getByHeight(height: number): Promise<PartialJsonBlock | undefined>;
delByHash(hash: string): Promise<void>;
delByHeight(number: number): Promise<void>;
delByHeight(height: number): Promise<void>;
set(block: PartialJsonBlock, height?: number): Promise<void>;
}

Expand Down

0 comments on commit 26380c3

Please sign in to comment.