diff --git a/src/commondao/common.dao.ts b/src/commondao/common.dao.ts index 3b52ed7..eca7462 100644 --- a/src/commondao/common.dao.ts +++ b/src/commondao/common.dao.ts @@ -495,7 +495,7 @@ export class CommonDao { const stream = this.cfg.db.streamQuery(q, opt) const partialQuery = !!q._selectedFieldNames - if (partialQuery) return stream + if (partialQuery) return stream as any // This almost works, but hard to implement `errorMode: THROW_AGGREGATED` in this case // return stream.flatMap(async (dbm: DBM) => { @@ -551,7 +551,7 @@ export class CommonDao { // Experimental: using `.map()` const stream: ReadableTyped = this.cfg.db .streamQuery(q.select(['id']), opt) - .on('error', err => stream.emit('error', err)) + // .on('error', err => stream.emit('error', err)) .map((r: ObjectWithId) => r.id) // const stream: ReadableTyped = this.cfg.db diff --git a/src/kv/commonKeyValueDao.ts b/src/kv/commonKeyValueDao.ts index c3c341e..e6c6bad 100644 --- a/src/kv/commonKeyValueDao.ts +++ b/src/kv/commonKeyValueDao.ts @@ -201,15 +201,15 @@ export class CommonKeyValueDao { const { mapBufferToValue } = this.cfg.hooks if (!mapBufferToValue) { - return this.cfg.db.streamValues(this.cfg.table, limit) + return this.cfg.db.streamValues(this.cfg.table, limit) as ReadableTyped } const stream: ReadableTyped = this.cfg.db .streamValues(this.cfg.table, limit) - .on('error', err => stream.emit('error', err)) - .flatMap(async (buf: Buffer) => { + // .on('error', err => stream.emit('error', err)) + .flatMap(async buf => { try { - return [await mapBufferToValue(buf)] satisfies T[] + return [await mapBufferToValue(buf)] } catch (err) { this.cfg.logger.error(err) return [] // SKIP @@ -223,15 +223,17 @@ export class CommonKeyValueDao { const { mapBufferToValue } = this.cfg.hooks if (!mapBufferToValue) { - return this.cfg.db.streamEntries(this.cfg.table, limit) + return this.cfg.db.streamEntries(this.cfg.table, limit) as ReadableTyped< + KeyValueTuple + > } const stream: ReadableTyped> = this.cfg.db .streamEntries(this.cfg.table, limit) - .on('error', err => stream.emit('error', err)) - .flatMap(async ([id, buf]: KeyValueTuple) => { + // .on('error', err => stream.emit('error', err)) + .flatMap(async ([id, buf]) => { try { - return [[id, await mapBufferToValue(buf)]] satisfies KeyValueTuple[] + return [[id, await mapBufferToValue(buf)]] } catch (err) { this.cfg.logger.error(err) return [] // SKIP diff --git a/src/testing/daoTest.ts b/src/testing/daoTest.ts index fefdd19..5500f77 100644 --- a/src/testing/daoTest.ts +++ b/src/testing/daoTest.ts @@ -1,6 +1,6 @@ import { Readable } from 'node:stream' import { _deepCopy, _pick, _sortBy, _omit, localTimeNow } from '@naturalcycles/js-lib' -import { _pipeline, readableToArray } from '@naturalcycles/nodejs-lib' +import { _pipeline } from '@naturalcycles/nodejs-lib' import { CommonDaoLogLevel, DBQuery } from '..' import { CommonDB } from '../common.db' import { CommonDao } from '../commondao/common.dao' @@ -212,7 +212,7 @@ export function runCommonDaoTest(db: CommonDB, quirks: CommonDBImplementationQui }) test('streamQueryIds all', async () => { - let ids = await readableToArray(dao.query().streamQueryIds()) + let ids = await dao.query().streamQueryIds().toArray() ids = ids.sort() expectMatch( expectedItems.map(i => i.id), diff --git a/src/testing/dbTest.ts b/src/testing/dbTest.ts index b1872c9..693eb28 100644 --- a/src/testing/dbTest.ts +++ b/src/testing/dbTest.ts @@ -1,5 +1,4 @@ import { _filterObject, _pick, _sortBy, pMap } from '@naturalcycles/js-lib' -import { readableToArray } from '@naturalcycles/nodejs-lib' import { CommonDB, CommonDBType } from '../common.db' import { DBIncrement, DBPatch } from '../db.model' import { DBQuery } from '../query/dbQuery' @@ -220,7 +219,7 @@ export function runCommonDBTest(db: CommonDB, quirks: CommonDBImplementationQuir // STREAM if (support.streaming) { test('streamQuery all', async () => { - let rows = await readableToArray(db.streamQuery(queryAll())) + let rows = await db.streamQuery(queryAll()).toArray() rows = _sortBy(rows, r => r.id) // cause order is not specified in DBQuery expectMatch(items, rows, quirks) diff --git a/src/testing/keyValueDBTest.ts b/src/testing/keyValueDBTest.ts index 278e9ee..303934a 100644 --- a/src/testing/keyValueDBTest.ts +++ b/src/testing/keyValueDBTest.ts @@ -1,5 +1,4 @@ import { _range, _sortBy } from '@naturalcycles/js-lib' -import { readableToArray } from '@naturalcycles/nodejs-lib' import { CommonKeyValueDB, KeyValueDBTuple } from '../kv/commonKeyValueDB' import { TEST_TABLE } from './test.model' @@ -42,13 +41,13 @@ export function runCommonKeyValueDBTest(db: CommonKeyValueDB): void { }) test('streamIds', async () => { - const ids = await readableToArray(db.streamIds(TEST_TABLE)) + const ids = await db.streamIds(TEST_TABLE).toArray() ids.sort() expect(ids).toEqual(testIds) }) test('streamIds limited', async () => { - const idsLimited = await readableToArray(db.streamIds(TEST_TABLE, 2)) + const idsLimited = await db.streamIds(TEST_TABLE, 2).toArray() // Order is non-deterministic, so, cannot compare values // idsLimited.sort() // expect(idsLimited).toEqual(testIds.slice(0, 2)) @@ -56,26 +55,26 @@ export function runCommonKeyValueDBTest(db: CommonKeyValueDB): void { }) test('streamValues', async () => { - const values = await readableToArray(db.streamValues(TEST_TABLE)) + const values = await db.streamValues(TEST_TABLE).toArray() values.sort() expect(values).toEqual(testEntries.map(e => e[1])) }) test('streamValues limited', async () => { - const valuesLimited = await readableToArray(db.streamValues(TEST_TABLE, 2)) + const valuesLimited = await db.streamValues(TEST_TABLE, 2).toArray() // valuesLimited.sort() // expect(valuesLimited).toEqual(testEntries.map(e => e[1]).slice(0, 2)) expect(valuesLimited.length).toBe(2) }) test('streamEntries', async () => { - const entries = await readableToArray(db.streamEntries(TEST_TABLE)) + const entries = await db.streamEntries(TEST_TABLE).toArray() entries.sort() expect(entries).toEqual(testEntries) }) test('streamEntries limited', async () => { - const entriesLimited = await readableToArray(db.streamEntries(TEST_TABLE, 2)) + const entriesLimited = await db.streamEntries(TEST_TABLE, 2).toArray() // entriesLimited.sort() // expect(entriesLimited).toEqual(testEntries.slice(0, 2)) expect(entriesLimited.length).toBe(2) diff --git a/src/testing/keyValueDaoTest.ts b/src/testing/keyValueDaoTest.ts index ce38780..2ff1efb 100644 --- a/src/testing/keyValueDaoTest.ts +++ b/src/testing/keyValueDaoTest.ts @@ -1,5 +1,4 @@ import { _range, _sortBy } from '@naturalcycles/js-lib' -import { readableToArray } from '@naturalcycles/nodejs-lib' import { KeyValueDBTuple } from '../kv/commonKeyValueDB' import { CommonKeyValueDao } from '../kv/commonKeyValueDao' @@ -33,13 +32,13 @@ export function runCommonKeyValueDaoTest(dao: CommonKeyValueDao): void { }) test('streamIds', async () => { - const ids = await readableToArray(dao.streamIds()) + const ids = await dao.streamIds().toArray() ids.sort() expect(ids).toEqual(testIds) }) test('streamIds limited', async () => { - const idsLimited = await readableToArray(dao.streamIds(2)) + const idsLimited = await dao.streamIds(2).toArray() // Order is non-deterministic, so, cannot compare values // idsLimited.sort() // expect(idsLimited).toEqual(testIds.slice(0, 2)) @@ -47,26 +46,26 @@ export function runCommonKeyValueDaoTest(dao: CommonKeyValueDao): void { }) test('streamValues', async () => { - const values = await readableToArray(dao.streamValues()) + const values = await dao.streamValues().toArray() values.sort() expect(values).toEqual(testEntries.map(e => e[1])) }) test('streamValues limited', async () => { - const valuesLimited = await readableToArray(dao.streamValues(2)) + const valuesLimited = await dao.streamValues(2).toArray() // valuesLimited.sort() // expect(valuesLimited).toEqual(testEntries.map(e => e[1]).slice(0, 2)) expect(valuesLimited.length).toBe(2) }) test('streamEntries', async () => { - const entries = await readableToArray(dao.streamEntries()) + const entries = await dao.streamEntries().toArray() entries.sort() expect(entries).toEqual(testEntries) }) test('streamEntries limited', async () => { - const entriesLimited = await readableToArray(dao.streamEntries(2)) + const entriesLimited = await dao.streamEntries(2).toArray() // entriesLimited.sort() // expect(entriesLimited).toEqual(testEntries.slice(0, 2)) expect(entriesLimited.length).toBe(2) diff --git a/yarn.lock b/yarn.lock index 64d96bd..b8b3fc1 100644 --- a/yarn.lock +++ b/yarn.lock @@ -879,9 +879,9 @@ zod "^3.20.2" "@naturalcycles/nodejs-lib@^13.0.1", "@naturalcycles/nodejs-lib@^13.0.2", "@naturalcycles/nodejs-lib@^13.1.1": - version "13.10.0" - resolved "https://registry.yarnpkg.com/@naturalcycles/nodejs-lib/-/nodejs-lib-13.10.0.tgz#0914dea4fced163a9642f09def35f242b9daf565" - integrity sha512-KKxpAb6oK250Lk7t2nGTrc+T3YD6R0Ba0c19wbuJNXDZToZ0FRooQc1vgcrjksMpHDebWBPYz0g9IIIbA+gQzA== + version "13.11.0" + resolved "https://registry.yarnpkg.com/@naturalcycles/nodejs-lib/-/nodejs-lib-13.11.0.tgz#b6835d5eb053029cee4645fd8f4c1e0c84ad4590" + integrity sha512-cjkuW9exFUlBxPRhOR8SCweJmn1UP8HmgB98Fsph4vjKaYOuRZUivuPTFIrHGxpvLrOedMXhMh4Ft8SHAnA+aw== dependencies: "@naturalcycles/js-lib" "^14.0.0" "@types/js-yaml" "^4.0.9"