Skip to content

Commit

Permalink
feat: default stream concurrency to 32 (up from 16)
Browse files Browse the repository at this point in the history
  • Loading branch information
kirillgroshkov committed Apr 22, 2024
1 parent 1ca08c2 commit 82c8b90
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/adapter/file/localFile.persistence.plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export class LocalFilePersistencePlugin implements FileDBPersistencePlugin {
}

async saveFiles(ops: DBSaveBatchOperation<any>[]): Promise<void> {
await pMap(ops, async op => await this.saveFile(op.table, op.rows), { concurrency: 16 })
await pMap(ops, async op => await this.saveFile(op.table, op.rows), { concurrency: 32 })
}

async saveFile<ROW extends ObjectWithId>(table: string, rows: ROW[]): Promise<void> {
Expand Down
2 changes: 1 addition & 1 deletion src/commondao/common.dao.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ export interface CommonDaoStreamOptions<IN>

/**
* When chunkSize is set - this option controls how many chunks to run concurrently.
* Defaults to 16, "the magic number of JavaScript concurrency".
* Defaults to 32.
*/
chunkConcurrency?: number
}
Expand Down
6 changes: 3 additions & 3 deletions src/commondao/common.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM> {
* "Streaming" is implemented by buffering incoming rows into **batches**
* (of size opt.chunkSize, which defaults to 500),
* and then executing db.saveBatch(chunk) with the concurrency
* of opt.chunkConcurrency (which defaults to 16).
* of opt.chunkConcurrency (which defaults to 32).
*/
streamSaveTransform(opt: CommonDaoStreamSaveOptions<DBM> = {}): Transform[] {
this.requireWriteAccess()
Expand All @@ -936,7 +936,7 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM> {
const excludeFromIndexes = opt.excludeFromIndexes || this.cfg.excludeFromIndexes
const { beforeSave } = this.cfg.hooks!

const { chunkSize = 500, chunkConcurrency = 16, errorMode } = opt
const { chunkSize = 500, chunkConcurrency = 32, errorMode } = opt

return [
transformMap<BM, DBM>(
Expand Down Expand Up @@ -1019,7 +1019,7 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM> {
let deleted = 0

if (opt.chunkSize) {
const { chunkSize, chunkConcurrency = 16 } = opt
const { chunkSize, chunkConcurrency = 32 } = opt

await _pipeline([
this.cfg.db.streamQuery<DBM>(q.select(['id']), opt),
Expand Down
4 changes: 2 additions & 2 deletions src/kv/commonKeyValueDao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ export class CommonKeyValueDao<T> {
}
},
{
concurrency: 16,
concurrency: 32,
},
)
}
Expand All @@ -248,7 +248,7 @@ export class CommonKeyValueDao<T> {
}
},
{
concurrency: 16,
concurrency: 32,
},
)
}
Expand Down
6 changes: 3 additions & 3 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -884,9 +884,9 @@
zod "^3.20.2"

"@naturalcycles/nodejs-lib@^13.0.1", "@naturalcycles/nodejs-lib@^13.0.2", "@naturalcycles/nodejs-lib@^13.1.1":
version "13.19.0"
resolved "https://registry.yarnpkg.com/@naturalcycles/nodejs-lib/-/nodejs-lib-13.19.0.tgz#c5705a2636718e85b6aef8cb40fe19500d18d5ea"
integrity sha512-kc7Sx95amibbj6jHTej7UC1z1BVNBrSraEurcCWbKBnJWBAsp2f52yc5stnehOwD9J8cXm7DJtWdVdwwArpkiw==
version "13.20.0"
resolved "https://registry.yarnpkg.com/@naturalcycles/nodejs-lib/-/nodejs-lib-13.20.0.tgz#8d97cef2f685b387559d5be78d91f942d411d92f"
integrity sha512-qUmYEK2m58N2MwMXri+zqL/bJvc58FqBx1aV31Xtu6BiGMyRUZrqls+SuYCUW+/oJj9ULWFcBL08AHSbLnWC9A==
dependencies:
"@naturalcycles/js-lib" "^14.0.0"
"@types/js-yaml" "^4.0.9"
Expand Down

0 comments on commit 82c8b90

Please sign in to comment.