diff --git a/src/redisClient.ts b/src/redisClient.ts
index 9b4f157..e4fd6b6 100644
--- a/src/redisClient.ts
+++ b/src/redisClient.ts
@@ -271,7 +271,7 @@ export class RedisClient implements CommonClient {
   Convenient type-safe wrapper.
   Returns BATCHES of keys in each iteration (as-is).
    */
-  scanStream(opt: ScanStreamOptions): ReadableTyped<string[]> {
+  scanStream(opt?: ScanStreamOptions): ReadableTyped<string[]> {
     return this.redis().scanStream(opt)
   }
 
@@ -294,11 +294,11 @@ export class RedisClient implements CommonClient {
     return count
   }
 
-  hscanStream(key: string, opt: ScanStreamOptions): ReadableTyped<string[]> {
+  hscanStream(key: string, opt?: ScanStreamOptions): ReadableTyped<string[]> {
     return this.redis().hscanStream(key, opt)
   }
 
-  async hscanCount(key: string, opt: ScanStreamOptions): Promise<number> {
+  async hscanCount(key: string, opt?: ScanStreamOptions): Promise<number> {
     let count = 0
 
     const stream = this.redis().hscanStream(key, opt)
diff --git a/src/redisHashKeyValueDB.ts b/src/redisHashKeyValueDB.ts
index e19bfb2..55f56cb 100644
--- a/src/redisHashKeyValueDB.ts
+++ b/src/redisHashKeyValueDB.ts
@@ -6,46 +6,46 @@ import {
   IncrementTuple,
   KeyValueDBTuple,
 } from '@naturalcycles/db-lib'
-import { _chunk, StringMap } from '@naturalcycles/js-lib'
 import { ReadableTyped } from '@naturalcycles/nodejs-lib'
-import { RedisClient } from './redisClient'
 import { RedisKeyValueDBCfg } from './redisKeyValueDB'
 
-export interface RedisHashKeyValueDBCfg extends RedisKeyValueDBCfg {
-  hashKey: string
-}
-
-export class RedisHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
-  client: RedisClient
-  keyOfHashField: string
-
-  constructor(cfg: RedisHashKeyValueDBCfg) {
-    this.client = cfg.client
-    this.keyOfHashField = cfg.hashKey
-  }
+/**
+ * RedisHashKeyValueDB is a KeyValueDB implementation that uses Redis hashes to simulate tables.
+ * The value of the `table` argument is used as the key of a Redis hash;
+ * row ids become fields of that hash.
+ *
+ * The reason for having this approach alongside the traditional RedisKeyValueDB is that
+ * currently available Redis versions (in Memorystore, or on macOS) do not support
+ * expiring hash fields.
+ * The expiring-fields feature is important, and is only available via RedisKeyValueDB.
+ *
+ * Once Redis 7.4.0+ is widely available,
+ * this implementation can take over for RedisKeyValueDB.
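+ *
+ * Illustrative layout (inferred from the code below, not part of the public API):
+ * a table `users` with ids `u1`, `u2` becomes a single Redis hash `users`,
+ * accessed via HSET/HMGET/HDEL with `u1`, `u2` as hash fields.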
+ */
+export class RedisHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
+  constructor(public cfg: RedisKeyValueDBCfg) {}
 
   support = {
     ...commonKeyValueDBFullSupport,
   }
 
   async ping(): Promise<void> {
-    await this.client.ping()
+    await this.cfg.client.ping()
   }
 
   async [Symbol.asyncDispose](): Promise<void> {
-    await this.client.disconnect()
+    await this.cfg.client.disconnect()
   }
 
   async getByIds(table: string, ids: string[]): Promise<KeyValueDBTuple[]> {
     if (!ids.length) return []
     // we assume that the order of returned values is the same as order of input ids
-    const bufs = await this.client.hmgetBuffer(this.keyOfHashField, this.idsToKeys(table, ids))
+    const bufs = await this.cfg.client.hmgetBuffer(table, ids)
     return bufs.map((buf, i) => [ids[i], buf] as KeyValueDBTuple).filter(([_k, v]) => v !== null)
   }
 
   async deleteByIds(table: string, ids: string[]): Promise<void> {
     if (!ids.length) return
-    await this.client.hdel(this.keyOfHashField, this.idsToKeys(table, ids))
+    await this.cfg.client.hdel(table, ids)
   }
 
   async saveBatch(
@@ -55,106 +55,70 @@ export class RedisHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
   ): Promise<void> {
     if (!entries.length) return
 
-    const entriesWithKey = entries.map(([k, v]) => [this.idToKey(table, k), v])
-    const map: StringMap = Object.fromEntries(entriesWithKey)
+    const record = Object.fromEntries(entries)
 
     if (opt?.expireAt) {
-      await this.client.hsetWithTTL(this.keyOfHashField, map, opt.expireAt)
+      await this.cfg.client.hsetWithTTL(table, record, opt.expireAt)
     } else {
-      await this.client.hset(this.keyOfHashField, map)
+      await this.cfg.client.hset(table, record)
     }
   }
 
   streamIds(table: string, limit?: number): ReadableTyped<string> {
-    let stream = this.client
-      .hscanStream(this.keyOfHashField, {
-        match: `${table}:*`,
-      })
+    const stream = this.cfg.client
+      .hscanStream(table)
       .flatMap(keyValueList => {
         const keys: string[] = []
-        keyValueList.forEach((keyOrValue, index) => {
-          if (index % 2 !== 0) return
-          keys.push(keyOrValue)
-        })
-        return this.keysToIds(table, keys)
+        for (let i = 0; i < keyValueList.length; i += 2) {
+          keys.push(keyValueList[i]!)
+        }
+        return keys
       })
-
-    if (limit) {
-      stream = stream.take(limit)
-    }
+      .take(limit || Infinity)
 
     return stream
   }
 
   streamValues(table: string, limit?: number): ReadableTyped<Buffer> {
-    return this.client
-      .hscanStream(this.keyOfHashField, {
-        match: `${table}:*`,
-      })
+    return this.cfg.client
+      .hscanStream(table)
       .flatMap(keyValueList => {
-        const values: string[] = []
-        keyValueList.forEach((keyOrValue, index) => {
-          if (index % 2 !== 1) return
-          values.push(keyOrValue)
-        })
-        return values.map(v => Buffer.from(v))
+        const values: Buffer[] = []
+        for (let i = 0; i < keyValueList.length; i += 2) {
+          const value = Buffer.from(keyValueList[i + 1]!)
+          values.push(value)
+        }
+        return values
       })
       .take(limit || Infinity)
   }
 
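+  // Note: hscanStream emits each batch as a flat array of alternating
+  // [field, value, field, value, ...] entries, hence the step-by-2 loops
+  // in these stream methods.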
   streamEntries(table: string, limit?: number): ReadableTyped<KeyValueDBTuple> {
-    return this.client
-      .hscanStream(this.keyOfHashField, {
-        match: `${table}:*`,
-      })
+    return this.cfg.client
+      .hscanStream(table)
       .flatMap(keyValueList => {
-        const entries = _chunk(keyValueList, 2)
-        return entries.map(([k, v]) => {
-          return [this.keyToId(table, String(k)), Buffer.from(String(v))] satisfies KeyValueDBTuple
-        })
+        const entries: [string, Buffer][] = []
+        for (let i = 0; i < keyValueList.length; i += 2) {
+          const key = keyValueList[i]!
+          const value = Buffer.from(keyValueList[i + 1]!)
+          entries.push([key, value])
+        }
+        return entries
       })
       .take(limit || Infinity)
   }
 
   async count(table: string): Promise<number> {
-    return await this.client.hscanCount(this.keyOfHashField, {
-      match: `${table}:*`,
-    })
+    return await this.cfg.client.hscanCount(table)
   }
 
   async incrementBatch(table: string, increments: IncrementTuple[]): Promise<IncrementTuple[]> {
-    const incrementTuplesWithInternalKeys = increments.map(
-      ([id, v]) => [this.idToKey(table, id), v] as [string, number],
-    )
-    const resultsWithInternalKeys = await this.client.hincrBatch(
-      this.keyOfHashField,
-      incrementTuplesWithInternalKeys,
-    )
-    const results = resultsWithInternalKeys.map(
-      ([k, v]) => [this.keyToId(table, k), v] as IncrementTuple,
-    )
-    return results
+    return await this.cfg.client.hincrBatch(table, increments)
   }
 
   async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
     if (!opt?.dropIfExists) return
 
-    await this.client.dropTable(table)
-  }
-
-  private idsToKeys(table: string, ids: string[]): string[] {
-    return ids.map(id => this.idToKey(table, id))
-  }
-
-  private idToKey(table: string, id: string): string {
-    return `${table}:${id}`
-  }
-
-  private keysToIds(table: string, keys: string[]): string[] {
-    return keys.map(key => this.keyToId(table, key))
-  }
-
-  private keyToId(table: string, key: string): string {
-    return key.slice(table.length + 1)
+    await this.cfg.client.del([table])
   }
 }
diff --git a/src/redisKeyValueDB.ts b/src/redisKeyValueDB.ts
index 41a5365..908cbbc 100644
--- a/src/redisKeyValueDB.ts
+++ b/src/redisKeyValueDB.ts
@@ -15,34 +15,30 @@ export interface RedisKeyValueDBCfg {
 }
 
 export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
-  constructor(cfg: RedisKeyValueDBCfg) {
-    this.client = cfg.client
-  }
-
-  client: RedisClient
+  constructor(public cfg: RedisKeyValueDBCfg) {}
 
   support = {
     ...commonKeyValueDBFullSupport,
   }
 
   async ping(): Promise<void> {
-    await this.client.ping()
+    await this.cfg.client.ping()
   }
 
   async [Symbol.asyncDispose](): Promise<void> {
-    await this.client.disconnect()
+    await this.cfg.client.disconnect()
   }
 
   async getByIds(table: string, ids: string[]): Promise<KeyValueDBTuple[]> {
     if (!ids.length) return []
     // we assume that the order of returned values is the same as order of input ids
-    const bufs = await this.client.mgetBuffer(this.idsToKeys(table, ids))
+    const bufs = await this.cfg.client.mgetBuffer(this.idsToKeys(table, ids))
     return bufs.map((buf, i) => [ids[i], buf] as KeyValueDBTuple).filter(([_k, v]) => v !== null)
   }
 
   async deleteByIds(table: string, ids: string[]): Promise<void> {
     if (!ids.length) return
-    await this.client.del(this.idsToKeys(table, ids))
+    await this.cfg.client.del(this.idsToKeys(table, ids))
   }
 
   async saveBatch(
@@ -55,7 +51,7 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
     if (opt?.expireAt) {
       // There's no supported mset with TTL: https://stackoverflow.com/questions/16423342/redis-multi-set-with-a-ttl
       // so we gonna use a pipeline instead
-      await this.client.withPipeline(pipeline => {
+      await this.cfg.client.withPipeline(pipeline => {
         for (const [k, v] of entries) {
           pipeline.set(this.idToKey(table, k), v, 'EXAT', opt.expireAt!)
         }
@@ -64,12 +60,12 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
       const obj: Record<string, Buffer> = Object.fromEntries(
         entries.map(([k, v]) => [this.idToKey(table, k), v]) as KeyValueDBTuple[],
       )
-      await this.client.msetBuffer(obj)
+      await this.cfg.client.msetBuffer(obj)
     }
   }
 
   streamIds(table: string, limit?: number): ReadableTyped<string> {
-    let stream = this.client
+    let stream = this.cfg.client
       .scanStream({
         match: `${table}:*`,
         // count: limit, // count is actually a "batchSize", not a limit
@@ -84,13 +80,13 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
   }
 
   streamValues(table: string, limit?: number): ReadableTyped<Buffer> {
-    return this.client
+    return this.cfg.client
      .scanStream({
        match: `${table}:*`,
      })
      .flatMap(
        async keys => {
-          return (await this.client.mgetBuffer(keys)).filter(_isTruthy)
+          return (await this.cfg.client.mgetBuffer(keys)).filter(_isTruthy)
        },
        {
          concurrency: 16,
@@ -100,14 +96,14 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
   }
 
   streamEntries(table: string, limit?: number): ReadableTyped<KeyValueDBTuple> {
-    return this.client
+    return this.cfg.client
      .scanStream({
        match: `${table}:*`,
      })
      .flatMap(
        async keys => {
          // casting as Buffer[], because values are expected to exist for given keys
-          const bufs = (await this.client.mgetBuffer(keys)) as Buffer[]
+          const bufs = (await this.cfg.client.mgetBuffer(keys)) as Buffer[]
          return _zip(this.keysToIds(table, keys), bufs)
        },
        {
@@ -119,7 +115,7 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
 
   async count(table: string): Promise<number> {
     // todo: implement more efficiently, e.g via LUA?
-    return await this.client.scanCount({
+    return await this.cfg.client.scanCount({
       match: `${table}:*`,
     })
   }
@@ -128,7 +124,7 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
     const incrementTuplesWithInternalKeys = increments.map(
       ([id, v]) => [this.idToKey(table, id), v] as [string, number],
     )
-    const resultsWithInternalKeys = await this.client.incrBatch(incrementTuplesWithInternalKeys)
+    const resultsWithInternalKeys = await this.cfg.client.incrBatch(incrementTuplesWithInternalKeys)
     const results = resultsWithInternalKeys.map(
       ([k, v]) => [this.keyToId(table, k), v] as IncrementTuple,
     )
@@ -138,7 +134,7 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
   async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
     if (!opt?.dropIfExists) return
 
-    await this.client.dropTable(table)
+    await this.cfg.client.dropTable(table)
   }
 
   private idsToKeys(table: string, ids: string[]): string[] {
diff --git a/src/test/redis.hash.manual.test.ts b/src/test/redis.hash.manual.test.ts
index 297a255..46b8e55 100644
--- a/src/test/redis.hash.manual.test.ts
+++ b/src/test/redis.hash.manual.test.ts
@@ -1,16 +1,18 @@
-import {
-  runCommonKeyValueDaoTest,
-  runCommonKeyValueDBTest,
-  TEST_TABLE,
-} from '@naturalcycles/db-lib/dist/testing'
+import { CommonKeyValueDao, CommonKeyValueDaoMemoCache } from '@naturalcycles/db-lib'
+import { runCommonKeyValueDBTest, TEST_TABLE } from '@naturalcycles/db-lib/dist/testing'
+import { runCommonKeyValueDaoTest } from '@naturalcycles/db-lib/dist/testing/keyValueDaoTest'
 import { KeyValueDBTuple } from '@naturalcycles/db-lib/src/kv/commonKeyValueDB'
-import { _range, localTime, pDelay } from '@naturalcycles/js-lib'
+import { _AsyncMemo, _range, localTime, pDelay } from '@naturalcycles/js-lib'
 import { RedisClient } from '../redisClient'
 import { RedisHashKeyValueDB } from '../redisHashKeyValueDB'
 
 const client = new RedisClient()
-const hashKey = 'hashField'
-const db = new RedisHashKeyValueDB({ client, hashKey })
+const db = new RedisHashKeyValueDB({ client })
+
+const dao = new CommonKeyValueDao({
+  db,
+  table: TEST_TABLE,
+})
 
 afterAll(async () => {
   await client.disconnect()
 })
@@ -20,9 +22,10 @@ test('connect', async () => {
   await db.ping()
 })
 
-describe('runCommonHashKeyValueDBTest', () => runCommonKeyValueDBTest(db))
+describe('runCommonKeyValueDBTest', () => runCommonKeyValueDBTest(db))
 
 describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(db))
 
+// Saving expiring hash fields is not supported until Redis 7.4.0
 test.skip('saveBatch with EXAT', async () => {
   const testIds = _range(1, 4).map(n => `id${n}`)
 
   const testEntries: KeyValueDBTuple[] = testIds.map(id => [id, Buffer.from(`${id}value`)])
@@ -36,3 +39,34 @@ test.skip('saveBatch with EXAT', async () => {
   loaded = await db.getByIds(TEST_TABLE, testIds)
   expect(loaded.length).toBe(0)
 })
+
+class C {
+  @_AsyncMemo({
+    cacheFactory: () =>
+      new CommonKeyValueDaoMemoCache({
+        dao,
+        ttl: 1,
+      }),
+  })
+  async get(k: string): Promise<Buffer> {
+    console.log(`get ${k}`)
+    return Buffer.from(k)
+  }
+}
+
+const c = new C()
+
+test('CommonKeyValueDaoMemoCache serial', async () => {
+  for (const _ of _range(10)) {
+    console.log(await c.get('key'))
+    await pDelay(100)
+  }
+})
+
+test('CommonKeyValueDaoMemoCache async swarm', async () => {
+  await Promise.all(
+    _range(30).map(async () => {
+      console.log(await c.get('key'))
+    }),
+  )
+})
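+
+// Manual observation: the `get <key>` log lines show how often the real loader runs,
+// i.e. how well the memo cache deduplicates the serial (ttl-based) and
+// concurrent (swarm) reads above.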