Skip to content

Commit

Permalink
feat: CommonDao.deleteByQuery stream option
Browse files Browse the repository at this point in the history
  • Loading branch information
kirillgroshkov committed Oct 30, 2021
1 parent b155085 commit c1226a2
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 216 deletions.
5 changes: 0 additions & 5 deletions .github/workflows/default.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,3 @@ jobs:
./cc-test-reporter before-build
yarn test-ci
./cc-test-reporter after-build -t lcov
- uses: actions/upload-artifact@v2
with: { name: 'unit.xml', path: 'tmp/jest/unit.xml' }
- uses: actions/upload-artifact@v2
with: { name: 'coverage-unit', path: 'coverage/lcov-report' }
2 changes: 2 additions & 0 deletions src/commondao/common.dao.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ test('common', async () => {

expect(await dao.deleteById(undefined)).toBe(0)
expect(await dao.deleteById('123')).toBe(0)
expect(await dao.deleteByQuery(dao.query())).toBe(0)
expect(await dao.deleteByQuery(dao.query(), { stream: true })).toBe(0)

expect(dao.anyToDBM(undefined)).toBeUndefined()
expect(dao.anyToDBM({}, { skipValidation: true })).toMatchObject({})
Expand Down
46 changes: 43 additions & 3 deletions src/commondao/common.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
transformTap,
writableVoid,
_pipeline,
transformBuffer,
} from '@naturalcycles/nodejs-lib'
import { DBLibError } from '../cnst'
import { DBModelType, RunQueryResult } from '../db.model'
Expand Down Expand Up @@ -719,14 +720,53 @@ export class CommonDao<
return deletedIds
}

async deleteByQuery(q: DBQuery<DBM>, opt: CommonDaoOptions = {}): Promise<number> {
/**
* Pass the `stream: true` option to use Streaming: it will Stream the query, batch by 500, and execute
* `deleteByIds` for each batch concurrently (infinite concurrency).
* This is expected to be a more memory-efficient way of deleting large numbers of rows.
*/
async deleteByQuery(
q: DBQuery<DBM>,
opt: CommonDaoStreamForEachOptions<DBM> & { stream?: boolean } = {},
): Promise<number> {
this.requireWriteAccess()
q.table = opt.table || q.table
const op = `deleteByQuery(${q.pretty()})`
const started = this.logStarted(op, q.table)
const ids = await this.cfg.db.deleteByQuery(q, opt)
let deleted = 0

if (opt.stream) {
const batchSize = 500

await _pipeline([
this.cfg.db.streamQuery<DBM>(q.select(['id']), opt),
transformMapSimple<ObjectWithId, string>(objectWithId => objectWithId.id, {
errorMode: ErrorMode.SUPPRESS,
}),
transformBuffer<string>({ batchSize }),
transformMap<string[], void>(
async ids => {
deleted += await this.cfg.db.deleteByIds(q.table, ids, opt)
},
{
predicate: _passthroughPredicate,
},
),
// LogProgress should be AFTER the mapper, to be able to report correct stats
transformLogProgress({
metric: q.table,
logEvery: 2, // 500 * 2 === 1000
batchSize,
...opt,
}),
writableVoid(),
])
} else {
deleted = await this.cfg.db.deleteByQuery(q, opt)
}

this.logSaveResult(started, op, q.table)
return ids
return deleted
}

// CONVERSIONS
Expand Down
4 changes: 3 additions & 1 deletion src/query/dbQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,9 @@ export class RunnableDBQuery<
await this.dao.streamQueryIdsForEach(this, mapper, opt)
}

async deleteByQuery(opt?: CommonDaoOptions): Promise<number> {
async deleteByQuery(
opt?: CommonDaoStreamForEachOptions<DBM> & { stream?: boolean },
): Promise<number> {
return await this.dao.deleteByQuery(this, opt)
}
}
Loading

0 comments on commit c1226a2

Please sign in to comment.