diff --git a/src/adapter/file/file.db.ts b/src/adapter/file/file.db.ts index eb86c6c..7e525c5 100644 --- a/src/adapter/file/file.db.ts +++ b/src/adapter/file/file.db.ts @@ -1,20 +1,15 @@ import { generateJsonSchemaFromData, - pMap, - StringMap, _by, _deepEquals, _since, _sortBy, _sortObjectDeep, _stringMapValues, - _uniq, JsonSchemaRootObject, _filterUndefinedValues, ObjectWithId, _assert, - _deepCopy, - _stringMapEntries, Saved, } from '@naturalcycles/js-lib' import { readableCreate, ReadableTyped, dimGrey } from '@naturalcycles/nodejs-lib' @@ -22,9 +17,7 @@ import { BaseCommonDB, commonDBFullSupport, CommonDBSupport, - DBOperation, DBSaveBatchOperation, - DBTransaction, queryInMemory, } from '../..' import { CommonDB } from '../../common.db' @@ -55,7 +48,7 @@ export class FileDB extends BaseCommonDB implements CommonDB { updateSaveMethod: false, updateByQuery: false, createTable: false, - transactions: false, + transactions: false, // todo } constructor(cfg: FileDBCfg) { @@ -228,9 +221,9 @@ export class FileDB extends BaseCommonDB implements CommonDB { this.logFinished(started, op) } - override async createTransaction(): Promise { - return new FileDBTransaction(this) - } + // override async createTransaction(): Promise { + // return new FileDBTransaction(this) + // } sortRows(rows: ROW[]): ROW[] { rows = rows.map(r => _filterUndefinedValues(r)) @@ -260,14 +253,14 @@ export class FileDB extends BaseCommonDB implements CommonDB { } } +// todo: get back and fix it +// Implementation is optimized for loading/saving _whole files_. +/* export class FileDBTransaction implements DBTransaction { constructor(private db: FileDB) {} ops: DBOperation[] = [] - /** - * Implementation is optimized for loading/saving _whole files_. - */ async commit(): Promise { // data[table][id] => row const data: StringMap> = {} @@ -335,3 +328,4 @@ export class FileDBTransaction implements DBTransaction { this.ops = [] } } +*/ diff --git a/src/adapter/inmemory/inMemory.db.ts b/src/adapter/inmemory/inMemory.db.ts index f180ab4..93a829e 100644 --- a/src/adapter/inmemory/inMemory.db.ts +++ b/src/adapter/inmemory/inMemory.db.ts @@ -16,7 +16,6 @@ import { CommonLogger, _deepCopy, _assert, - _omit, } from '@naturalcycles/js-lib' import { bufferReviver, @@ -37,6 +36,7 @@ import { DBIncrement, DBOperation, DBPatch, + DBTransactionFn, queryInMemory, } from '../..' import { @@ -177,17 +177,6 @@ export class InMemoryDB implements CommonDB { rows: ROW[], opt: CommonDBSaveOptions = {}, ): Promise { - const { tx } = opt - if (tx) { - ;(tx as InMemoryDBTransaction).ops.push({ - type: 'saveBatch', - table: _table, - rows, - opt: _omit(opt, ['tx']), - }) - return - } - const table = this.cfg.tablesPrefix + _table this.data[table] ||= {} @@ -216,41 +205,18 @@ export class InMemoryDB implements CommonDB { async deleteByQuery( q: DBQuery, - opt: CommonDBOptions = {}, + _opt?: CommonDBOptions, ): Promise { const table = this.cfg.tablesPrefix + q.table if (!this.data[table]) return 0 const ids = queryInMemory(q, Object.values(this.data[table]!) as ROW[]).map(r => r.id) - - const { tx } = opt - if (tx) { - ;(tx as InMemoryDBTransaction).ops.push({ - type: 'deleteByIds', - table: q.table, - ids, - opt: _omit(opt, ['tx']), - }) - return ids.length - } - return await this.deleteByIds(q.table, ids) } - async deleteByIds(_table: string, ids: string[], opt: CommonDBOptions = {}): Promise { + async deleteByIds(_table: string, ids: string[], _opt?: CommonDBOptions): Promise { const table = this.cfg.tablesPrefix + _table if (!this.data[table]) return 0 - const { tx } = opt - if (tx) { - ;(tx as InMemoryDBTransaction).ops.push({ - type: 'deleteByIds', - table: _table, - ids, - opt: _omit(opt, ['tx']), - }) - return ids.length - } - let count = 0 ids.forEach(id => { if (!this.data[table]![id]) return @@ -268,8 +234,6 @@ export class InMemoryDB implements CommonDB { const patchEntries = Object.entries(patch) if (!patchEntries.length) return 0 - // todo: can we support tx here? :thinking: - const table = this.cfg.tablesPrefix + q.table const rows = queryInMemory(q, Object.values(this.data[table] || {}) as ROW[]) rows.forEach((row: any) => { @@ -309,8 +273,15 @@ export class InMemoryDB implements CommonDB { return Readable.from(queryInMemory(q, Object.values(this.data[table] || {}) as ROW[])) } - async createTransaction(): Promise { - return new InMemoryDBTransaction(this) + async runInTransaction(fn: DBTransactionFn): Promise { + const tx = new InMemoryDBTransaction(this) + try { + await fn(tx) + await tx.commit() + } catch (err) { + await tx.rollback() + throw err + } } /** @@ -394,6 +365,37 @@ export class InMemoryDBTransaction implements DBTransaction { ops: DBOperation[] = [] + async getByIds( + table: string, + ids: string[], + opt?: CommonDBOptions, + ): Promise { + return await this.db.getByIds(table, ids, opt) + } + + async saveBatch>( + table: string, + rows: ROW[], + opt?: CommonDBSaveOptions, + ): Promise { + this.ops.push({ + type: 'saveBatch', + table, + rows, + opt, + }) + } + + async deleteByIds(table: string, ids: string[], opt?: CommonDBOptions): Promise { + this.ops.push({ + type: 'deleteByIds', + table, + ids, + opt, + }) + return ids.length + } + async commit(): Promise { const backup = _deepCopy(this.db.data) @@ -411,6 +413,7 @@ export class InMemoryDBTransaction implements DBTransaction { this.ops = [] } catch (err) { // rollback + this.ops = [] this.db.data = backup this.db.cfg.logger!.log('InMemoryDB transaction rolled back') diff --git a/src/base.common.db.ts b/src/base.common.db.ts index 1b0c0d0..f839c78 100644 --- a/src/base.common.db.ts +++ b/src/base.common.db.ts @@ -5,7 +5,7 @@ import { CommonDBOptions, CommonDBSaveOptions, DBPatch, - DBTransaction, + DBTransactionFn, RunQueryResult, } from './db.model' import { DBQuery } from './query/dbQuery' @@ -83,7 +83,9 @@ export class BaseCommonDB implements CommonDB { throw new Error('deleteByIds is not implemented') } - async createTransaction(): Promise { - return new FakeDBTransaction(this) + async runInTransaction(fn: DBTransactionFn): Promise { + const tx = new FakeDBTransaction(this) + await fn(tx) + // there's no try/catch and rollback, as there's nothing to rollback } } diff --git a/src/common.db.ts b/src/common.db.ts index c6b0ef2..242bb37 100644 --- a/src/common.db.ts +++ b/src/common.db.ts @@ -1,12 +1,12 @@ import { JsonSchemaObject, JsonSchemaRootObject, ObjectWithId } from '@naturalcycles/js-lib' -import { ReadableTyped } from '@naturalcycles/nodejs-lib' +import type { ReadableTyped } from '@naturalcycles/nodejs-lib' import { CommonDBCreateOptions, CommonDBOptions, CommonDBSaveOptions, CommonDBStreamOptions, DBPatch, - DBTransaction, + DBTransactionFn, RunQueryResult, } from './db.model' import { DBQuery } from './query/dbQuery' @@ -159,8 +159,12 @@ export interface CommonDB { /** * Should be implemented as a Transaction (best effort), which means that * either ALL or NONE of the operations should be applied. + * + * Transaction is automatically committed if fn resolves normally. + * Transaction is rolled back if fn throws, the error is re-thrown in that case. + * Graceful rollback is allowed on tx.rollback() */ - createTransaction: () => Promise + runInTransaction: (fn: DBTransactionFn) => Promise } /** diff --git a/src/commondao/common.dao.test.ts b/src/commondao/common.dao.test.ts index c6ecd1b..83c97df 100644 --- a/src/commondao/common.dao.test.ts +++ b/src/commondao/common.dao.test.ts @@ -467,7 +467,7 @@ async function getEven(): Promise { test('runInTransaction', async () => { const items = createTestItemsBM(4) - await dao.useTransaction(async tx => { + await dao.runInTransaction(async tx => { await tx.save(dao, items[0]!) await tx.save(dao, items[1]!) await tx.save(dao, items[3]!) diff --git a/src/commondao/common.dao.ts b/src/commondao/common.dao.ts index 5fc77f3..399d240 100644 --- a/src/commondao/common.dao.ts +++ b/src/commondao/common.dao.ts @@ -13,6 +13,7 @@ import { AnyObject, AppError, AsyncMapper, + CommonLogger, ErrorMode, JsonSchemaObject, JsonSchemaRootObject, @@ -201,7 +202,7 @@ export class CommonDao< const op = `getByIds ${ids.length} id(s) (${_truncate(ids.slice(0, 10).join(', '), 50)})` const table = opt.table || this.cfg.table const started = this.logStarted(op, table) - let dbms = await this.cfg.db.getByIds(table, ids) + let dbms = await (opt.tx || this.cfg.db).getByIds(table, ids) if (!opt.raw && this.cfg.hooks!.afterLoad && dbms.length) { dbms = (await pMap(dbms, async dbm => await this.cfg.hooks!.afterLoad!(dbm))).filter( _isTruthy, @@ -900,7 +901,7 @@ export class CommonDao< const { excludeFromIndexes } = this.cfg const assignGeneratedIds = opt.assignGeneratedIds || this.cfg.assignGeneratedIds - await this.cfg.db.saveBatch(table, dbms, { + await (opt.tx || this.cfg.db).saveBatch(table, dbms, { excludeFromIndexes, assignGeneratedIds, ...opt, @@ -1050,7 +1051,7 @@ export class CommonDao< const op = `deleteByIds(${ids.join(', ')})` const table = opt.table || this.cfg.table const started = this.logStarted(op, table) - const count = await this.cfg.db.deleteByIds(table, ids, opt) + const count = await (opt.tx || this.cfg.db).deleteByIds(table, ids, opt) this.logSaveResult(started, op, table) return count } @@ -1330,22 +1331,17 @@ export class CommonDao< await this.cfg.db.ping() } - async useTransaction(fn: (tx: CommonDaoTransaction) => Promise): Promise { - const tx = await this.cfg.db.createTransaction() - const daoTx = new CommonDaoTransaction(tx) + async runInTransaction(fn: CommonDaoTransactionFn): Promise { + await this.cfg.db.runInTransaction(async tx => { + const daoTx = new CommonDaoTransaction(tx, this.cfg.logger!) - try { - await fn(daoTx) - await daoTx.commit() - } catch (err) { - await daoTx.rollback() - throw err - } - } - - async createTransaction(): Promise { - const tx = await this.cfg.db.createTransaction() - return new CommonDaoTransaction(tx) + try { + await fn(daoTx) + } catch (err) { + await daoTx.rollback() + throw err + } + }) } protected logResult(started: number, op: string, res: any, table: string): void { @@ -1405,17 +1401,32 @@ export class CommonDao< } } +/** + * Transaction is committed when the function returns resolved Promise (aka "returns normally"). + * + * Transaction is rolled back when the function returns rejected Promise (aka "throws"). + */ +export type CommonDaoTransactionFn = (tx: CommonDaoTransaction) => Promise + +/** + * Transaction context. + * Has similar API than CommonDao, but all operations are performed in the context of the transaction. + */ export class CommonDaoTransaction { - constructor(private tx: DBTransaction) {} + constructor( + private tx: DBTransaction, + private logger: CommonLogger, + ) {} - async commit(): Promise { - await this.tx.commit() - } + /** + * Perform a graceful rollback without throwing/re-throwing any error. + */ async rollback(): Promise { try { await this.tx.rollback() } catch (err) { - console.log(err) + // graceful rollback without re-throw + this.logger.error(err) } } @@ -1432,26 +1443,22 @@ export class CommonDaoTransaction { ids: string[], opt?: CommonDaoOptions, ): Promise[]> { - try { - return await dao.getByIds(ids, { ...opt, tx: this.tx }) - } catch (err) { - await this.rollback() - throw err - } - } - - async runQuery, DBM extends ObjectWithId>( - dao: CommonDao, - q: DBQuery, - opt?: CommonDaoOptions, - ): Promise[]> { - try { - return await dao.runQuery(q, { ...opt, tx: this.tx }) - } catch (err) { - await this.rollback() - throw err - } - } + return await dao.getByIds(ids, { ...opt, tx: this.tx }) + } + + // todo: Queries inside Transaction are not supported yet + // async runQuery, DBM extends ObjectWithId>( + // dao: CommonDao, + // q: DBQuery, + // opt?: CommonDaoOptions, + // ): Promise[]> { + // try { + // return await dao.runQuery(q, { ...opt, tx: this.tx }) + // } catch (err) { + // await this.rollback() + // throw err + // } + // } async save, DBM extends ObjectWithId>( dao: CommonDao, @@ -1466,12 +1473,7 @@ export class CommonDaoTransaction { bms: Unsaved[], opt?: CommonDaoSaveBatchOptions, ): Promise[]> { - try { - return await dao.saveBatch(bms, { ...opt, tx: this.tx }) - } catch (err) { - await this.rollback() - throw err - } + return await dao.saveBatch(bms, { ...opt, tx: this.tx }) } async deleteById(dao: CommonDao, id: string, opt?: CommonDaoOptions): Promise { @@ -1479,11 +1481,6 @@ export class CommonDaoTransaction { } async deleteByIds(dao: CommonDao, ids: string[], opt?: CommonDaoOptions): Promise { - try { - return await dao.deleteByIds(ids, { ...opt, tx: this.tx }) - } catch (err) { - await this.rollback() - throw err - } + return await dao.deleteByIds(ids, { ...opt, tx: this.tx }) } } diff --git a/src/db.model.ts b/src/db.model.ts index 9d740db..625ebe6 100644 --- a/src/db.model.ts +++ b/src/db.model.ts @@ -1,4 +1,5 @@ import type { ObjectWithId } from '@naturalcycles/js-lib' +import { CommonDB } from './common.db' /** * Similar to SQL INSERT, UPDATE. @@ -10,8 +11,26 @@ import type { ObjectWithId } from '@naturalcycles/js-lib' */ export type CommonDBSaveMethod = 'upsert' | 'insert' | 'update' +/** + * Transaction is committed when the function returns resolved Promise (aka "returns normally"). + * + * Transaction is rolled back when the function returns rejected Promise (aka "throws"). + */ +export type DBTransactionFn = (tx: DBTransaction) => Promise + +/** + * Transaction context. + * Has similar API than CommonDB, but all operations are performed in the context of the transaction. + */ export interface DBTransaction { - commit: () => Promise + getByIds: CommonDB['getByIds'] + saveBatch: CommonDB['saveBatch'] + deleteByIds: CommonDB['deleteByIds'] + + /** + * Perform a graceful rollback. + * It'll rollback the transaction and won't throw/re-throw any errors. + */ rollback: () => Promise } diff --git a/src/testing/daoTest.ts b/src/testing/daoTest.ts index 6a6b5f9..a2ec481 100644 --- a/src/testing/daoTest.ts +++ b/src/testing/daoTest.ts @@ -265,7 +265,7 @@ export function runCommonDaoTest(db: CommonDB, quirks: CommonDBImplementationQui // Test that id, created, updated are created const now = localTimeNow().unix() - await dao.useTransaction(async tx => { + await dao.runInTransaction(async tx => { const row = _omit(item1, ['id', 'created', 'updated']) await tx.save(dao, row) }) @@ -276,7 +276,7 @@ export function runCommonDaoTest(db: CommonDB, quirks: CommonDBImplementationQui expect(loaded[0]!.created).toBeGreaterThanOrEqual(now) expect(loaded[0]!.updated).toBe(loaded[0]!.created) - await dao.useTransaction(async tx => { + await dao.runInTransaction(async tx => { await tx.deleteById(dao, loaded[0]!.id) }) @@ -284,7 +284,7 @@ export function runCommonDaoTest(db: CommonDB, quirks: CommonDBImplementationQui // save item3 with k1: k1_mod // delete item2 // remaining: item1, item3_with_k1_mod - await dao.useTransaction(async tx => { + await dao.runInTransaction(async tx => { await tx.saveBatch(dao, items) await tx.save(dao, { ...items[2]!, k1: 'k1_mod' }) await tx.deleteById(dao, items[1]!.id) @@ -297,7 +297,7 @@ export function runCommonDaoTest(db: CommonDB, quirks: CommonDBImplementationQui test('transaction rollback', async () => { await expect( - dao.useTransaction(async tx => { + dao.runInTransaction(async tx => { await tx.deleteById(dao, items[2]!.id) await tx.save(dao, { ...items[0]!, k1: 5 as any }) // it should fail here }), diff --git a/src/testing/dbTest.ts b/src/testing/dbTest.ts index 603d3b6..5b215ea 100644 --- a/src/testing/dbTest.ts +++ b/src/testing/dbTest.ts @@ -287,11 +287,11 @@ export function runCommonDBTest(db: CommonDB, quirks: CommonDBImplementationQuir // save item3 with k1: k1_mod // delete item2 // remaining: item1, item3_with_k1_mod - const tx = await db.createTransaction() - await db.saveBatch(TEST_TABLE, items, { tx }) - await db.saveBatch(TEST_TABLE, [{ ...items[2]!, k1: 'k1_mod' }], { tx }) - await db.deleteByIds(TEST_TABLE, [items[1]!.id], { tx }) - await tx.commit() + await db.runInTransaction(async tx => { + await tx.saveBatch(TEST_TABLE, items) + await tx.saveBatch(TEST_TABLE, [{ ...items[2]!, k1: 'k1_mod' }]) + await tx.deleteByIds(TEST_TABLE, [items[1]!.id]) + }) const { rows } = await db.runQuery(queryAll()) const expected = [items[0], { ...items[2]!, k1: 'k1_mod' }] @@ -299,14 +299,14 @@ export function runCommonDBTest(db: CommonDB, quirks: CommonDBImplementationQuir }) test('transaction rollback', async () => { - // It should fail on id == null let err: any try { - const tx = await db.createTransaction() - await db.deleteByIds(TEST_TABLE, [items[2]!.id], { tx }) - await db.saveBatch(TEST_TABLE, [{ ...items[0]!, k1: 5, id: null as any }], { tx }) - await tx.commit() + await db.runInTransaction(async tx => { + await tx.deleteByIds(TEST_TABLE, [items[2]!.id]) + // It should fail on id == null + await tx.saveBatch(TEST_TABLE, [{ ...items[0]!, k1: 5, id: null as any }]) + }) } catch (err_) { err = err_ } diff --git a/src/timeseries/commonTimeSeriesDao.ts b/src/timeseries/commonTimeSeriesDao.ts index 649e8db..17ea6cd 100644 --- a/src/timeseries/commonTimeSeriesDao.ts +++ b/src/timeseries/commonTimeSeriesDao.ts @@ -52,19 +52,17 @@ export class CommonTimeSeriesDao { async commitTransaction(ops: TimeSeriesSaveBatchOp[]): Promise { if (!ops.length) return - const tx = await this.cfg.db.createTransaction() - - for (const op of ops) { - const rows: ObjectWithId[] = op.dataPoints.map(([ts, v]) => ({ - id: String(ts), // Convert Number id into String id, as per CommonDB - ts, // to allow querying by ts, since querying by id is not always available (Datastore is one example) - v, - })) - - await this.cfg.db.saveBatch(`${op.series}${_TIMESERIES_RAW}`, rows, { tx }) - } - - await tx.commit() + await this.cfg.db.runInTransaction(async tx => { + for (const op of ops) { + const rows: ObjectWithId[] = op.dataPoints.map(([ts, v]) => ({ + id: String(ts), // Convert Number id into String id, as per CommonDB + ts, // to allow querying by ts, since querying by id is not always available (Datastore is one example) + v, + })) + + await tx.saveBatch(`${op.series}${_TIMESERIES_RAW}`, rows) + } + }) } async deleteById(series: string, tsMillis: number): Promise { diff --git a/src/transaction/dbTransaction.util.ts b/src/transaction/dbTransaction.util.ts index c9ce0d2..9d38918 100644 --- a/src/transaction/dbTransaction.util.ts +++ b/src/transaction/dbTransaction.util.ts @@ -1,7 +1,6 @@ import { ObjectWithId } from '@naturalcycles/js-lib' import type { CommonDB } from '../common.db' -import { CommonDBOptions, CommonDBSaveOptions, DBTransaction, RunQueryResult } from '../db.model' -import { DBQuery } from '../query/dbQuery' +import { CommonDBOptions, CommonDBSaveOptions, DBTransaction } from '../db.model' /** * Optimizes the Transaction (list of DBOperations) to do less operations. @@ -98,7 +97,7 @@ export function mergeDBOperations(ops: DBOperation[]): DBOperation[] { export class FakeDBTransaction implements DBTransaction { constructor(protected db: CommonDB) {} - async commit(): Promise {} + // no-op async rollback(): Promise {} async getByIds( @@ -108,18 +107,18 @@ export class FakeDBTransaction implements DBTransaction { ): Promise { return await this.db.getByIds(table, ids, opt) } - async runQuery( - q: DBQuery, - opt?: CommonDBOptions, - ): Promise> { - return await this.db.runQuery(q, opt) - } + // async runQuery( + // q: DBQuery, + // opt?: CommonDBOptions, + // ): Promise> { + // return await this.db.runQuery(q, opt) + // } async saveBatch>( table: string, rows: ROW[], opt?: CommonDBSaveOptions, ): Promise { - return await this.db.saveBatch(table, rows, opt) + await this.db.saveBatch(table, rows, opt) } async deleteByIds( table: string,