Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use hash fields as table namespace #18

Merged
merged 4 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/redisClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ export class RedisClient implements CommonClient {
Convenient type-safe wrapper.
Returns BATCHES of keys in each iteration (as-is).
*/
scanStream(opt: ScanStreamOptions): ReadableTyped<string[]> {
scanStream(opt?: ScanStreamOptions): ReadableTyped<string[]> {
return this.redis().scanStream(opt)
}

Expand All @@ -294,11 +294,11 @@ export class RedisClient implements CommonClient {
return count
}

hscanStream(key: string, opt: ScanStreamOptions): ReadableTyped<string[]> {
hscanStream(key: string, opt?: ScanStreamOptions): ReadableTyped<string[]> {
return this.redis().hscanStream(key, opt)
}

async hscanCount(key: string, opt: ScanStreamOptions): Promise<number> {
async hscanCount(key: string, opt?: ScanStreamOptions): Promise<number> {
let count = 0

const stream = this.redis().hscanStream(key, opt)
Expand Down
119 changes: 119 additions & 0 deletions src/redisKeyValueDB2.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import {
CommonDBCreateOptions,
CommonKeyValueDB,
commonKeyValueDBFullSupport,
CommonKeyValueDBSaveBatchOptions,
IncrementTuple,
KeyValueDBTuple,
} from '@naturalcycles/db-lib'
import { _chunk } from '@naturalcycles/js-lib'
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import { RedisClient } from './redisClient'

export interface RedisKeyValueDBCfg {
client: RedisClient
}

export class RedisKeyValueDB2 implements CommonKeyValueDB, AsyncDisposable {
constructor(cfg: RedisKeyValueDBCfg) {
mrnagydavid marked this conversation as resolved.
Show resolved Hide resolved
this.client = cfg.client
}

client: RedisClient

support = {
...commonKeyValueDBFullSupport,
}

async ping(): Promise<void> {
await this.client.ping()
}

async [Symbol.asyncDispose](): Promise<void> {
await this.client.disconnect()
}

async getByIds(table: string, ids: string[]): Promise<KeyValueDBTuple[]> {
if (!ids.length) return []
// we assume that the order of returned values is the same as order of input ids
const bufs = await this.client.hmgetBuffer(table, ids)
return bufs.map((buf, i) => [ids[i], buf] as KeyValueDBTuple).filter(([_k, v]) => v !== null)
}

async deleteByIds(table: string, ids: string[]): Promise<void> {
if (!ids.length) return
await this.client.hdel(table, ids)
}

async saveBatch(
table: string,
entries: KeyValueDBTuple[],
opt?: CommonKeyValueDBSaveBatchOptions,
): Promise<void> {
if (!entries.length) return

const record = Object.fromEntries(entries)

if (opt?.expireAt) {
await this.client.hsetWithTTL(table, record, opt.expireAt)
} else {
await this.client.hset(table, record)
}
}

streamIds(table: string, limit?: number): ReadableTyped<string> {
const stream = this.client
.hscanStream(table)
.flatMap(keyValueList => {
const keys: string[] = []
keyValueList.forEach((keyOrValue, index) => {
mrnagydavid marked this conversation as resolved.
Show resolved Hide resolved
if (index % 2 !== 0) return
keys.push(keyOrValue)
})
return keys
})
.take(limit || Infinity)

return stream
}

streamValues(table: string, limit?: number): ReadableTyped<Buffer> {
return this.client
.hscanStream(table)
.flatMap(keyValueList => {
const values: string[] = []
keyValueList.forEach((keyOrValue, index) => {
if (index % 2 !== 1) return
values.push(keyOrValue)
})
return values.map(v => Buffer.from(v))
})
.take(limit || Infinity)
}

streamEntries(table: string, limit?: number): ReadableTyped<KeyValueDBTuple> {
return this.client
.hscanStream(table)
.flatMap(keyValueList => {
const entries = _chunk(keyValueList, 2) as [string, string][]
mrnagydavid marked this conversation as resolved.
Show resolved Hide resolved
return entries.map(([k, v]) => {
return [k, Buffer.from(String(v))] satisfies KeyValueDBTuple
})
})
.take(limit || Infinity)
}

async count(table: string): Promise<number> {
return await this.client.hscanCount(table)
}

async incrementBatch(table: string, increments: IncrementTuple[]): Promise<IncrementTuple[]> {
return await this.client.hincrBatch(table, increments)
}

async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
if (!opt?.dropIfExists) return

await this.client.del([table])
}
}
72 changes: 72 additions & 0 deletions src/test/redis2.manual.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { CommonKeyValueDao, CommonKeyValueDaoMemoCache } from '@naturalcycles/db-lib'
import { runCommonKeyValueDBTest, TEST_TABLE } from '@naturalcycles/db-lib/dist/testing'
import { runCommonKeyValueDaoTest } from '@naturalcycles/db-lib/dist/testing/keyValueDaoTest'
import { KeyValueDBTuple } from '@naturalcycles/db-lib/src/kv/commonKeyValueDB'
import { _AsyncMemo, _range, localTime, pDelay } from '@naturalcycles/js-lib'
import { RedisClient } from '../redisClient'
import { RedisKeyValueDB2 } from '../redisKeyValueDB2'

const client = new RedisClient()
const db = new RedisKeyValueDB2({ client })

const dao = new CommonKeyValueDao<string, Buffer>({
db,
table: TEST_TABLE,
})

afterAll(async () => {
await client.disconnect()
})

test('connect', async () => {
await db.ping()
})

describe('runCommonKeyValueDBTest', () => runCommonKeyValueDBTest(db))
describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(db))

// Saving expiring hash fields is not supported until Redis 7.4.0
test.skip('saveBatch with EXAT', async () => {
const testIds = _range(1, 4).map(n => `id${n}`)
const testEntries: KeyValueDBTuple[] = testIds.map(id => [id, Buffer.from(`${id}value`)])

await db.saveBatch(TEST_TABLE, testEntries, {
expireAt: localTime.now().plus(1, 'second').unix,
})
let loaded = await db.getByIds(TEST_TABLE, testIds)
expect(loaded.length).toBe(3)
await pDelay(2000)
loaded = await db.getByIds(TEST_TABLE, testIds)
expect(loaded.length).toBe(0)
})

class C {
@_AsyncMemo({
cacheFactory: () =>
new CommonKeyValueDaoMemoCache({
dao,
ttl: 1,
}),
})
async get(k: string): Promise<Buffer | null> {
console.log(`get ${k}`)
return Buffer.from(k)
}
}

const c = new C()

test('CommonKeyValueDaoMemoCache serial', async () => {
for (const _ of _range(10)) {
console.log(await c.get('key'))
await pDelay(100)
}
})

test('CommonKeyValueDaoMemoCache async swarm', async () => {
await Promise.all(
_range(30).map(async () => {
console.log(await c.get('key'))
}),
)
})