diff --git a/scripts/tsconfig.json b/scripts/tsconfig.json index 4e2b4d0..4672fe3 100644 --- a/scripts/tsconfig.json +++ b/scripts/tsconfig.json @@ -3,5 +3,5 @@ // { "extends": "@naturalcycles/dev-lib/scripts/tsconfig.json", - "exclude": ["**/__exclude"] + "exclude": ["**/__exclude"], } diff --git a/src/adapter/cachedb/cache.db.model.ts b/src/adapter/cachedb/cache.db.model.ts index 57b8e8f..21b71c9 100644 --- a/src/adapter/cachedb/cache.db.model.ts +++ b/src/adapter/cachedb/cache.db.model.ts @@ -1,6 +1,11 @@ import { CommonLogger, ObjectWithId } from '@naturalcycles/js-lib' import { CommonDB } from '../../common.db' -import { CommonDBCreateOptions, CommonDBSaveOptions, CommonDBStreamOptions } from '../../db.model' +import { + CommonDBCreateOptions, + CommonDBOptions, + CommonDBSaveOptions, + CommonDBStreamOptions, +} from '../../db.model' export interface CacheDBCfg { name: string @@ -45,7 +50,7 @@ export interface CacheDBCfg { logger?: CommonLogger } -export interface CacheDBOptions { +export interface CacheDBOptions extends CommonDBOptions { /** * @default false */ diff --git a/src/adapter/cachedb/cache.db.ts b/src/adapter/cachedb/cache.db.ts index 5a31357..0064438 100644 --- a/src/adapter/cachedb/cache.db.ts +++ b/src/adapter/cachedb/cache.db.ts @@ -7,10 +7,9 @@ import { StringMap, } from '@naturalcycles/js-lib' import { BaseCommonDB } from '../../base.common.db' -import { CommonDB } from '../../common.db' -import { CommonDBOptions, DBPatch, RunQueryResult } from '../../db.model' +import { CommonDB, commonDBFullSupport, CommonDBSupport } from '../../common.db' +import { DBPatch, RunQueryResult } from '../../db.model' import { DBQuery } from '../../query/dbQuery' -import { DBTransaction } from '../../transaction/dbTransaction' import { CacheDBCfg, CacheDBCreateOptions, @@ -26,6 +25,11 @@ import { * Queries always hit downstream (unless `onlyCache` is passed) */ export class CacheDB extends BaseCommonDB implements CommonDB { + override support: CommonDBSupport = { + ...commonDBFullSupport, + transactions: false, + } + constructor(cfg: CacheDBCfg) { super() this.cfg = { @@ -284,9 +288,4 @@ export class CacheDB extends BaseCommonDB implements CommonDB { return updated || 0 } - - override async commitTransaction(tx: DBTransaction, opt?: CommonDBOptions): Promise { - await this.cfg.downstreamDB.commitTransaction(tx, opt) - await this.cfg.cacheDB.commitTransaction(tx, opt) - } } diff --git a/src/adapter/file/file.db.test.ts b/src/adapter/file/file.db.test.ts index 586cd02..56dd9d5 100644 --- a/src/adapter/file/file.db.test.ts +++ b/src/adapter/file/file.db.test.ts @@ -6,16 +6,6 @@ const db = new FileDB({ plugin: new InMemoryPersistencePlugin(), }) -describe('runCommonDBTest', () => - runCommonDBTest(db, { - bufferSupport: false, // todo: implement - insert: false, - update: false, - updateByQuery: false, - createTable: false, - })) +describe('runCommonDBTest', () => runCommonDBTest(db)) -describe('runCommonDaoTest', () => - runCommonDaoTest(db, { - createTable: false, - })) +describe('runCommonDaoTest', () => runCommonDaoTest(db)) diff --git a/src/adapter/file/file.db.ts b/src/adapter/file/file.db.ts index 8c9f753..eb86c6c 100644 --- a/src/adapter/file/file.db.ts +++ b/src/adapter/file/file.db.ts @@ -18,7 +18,15 @@ import { Saved, } from '@naturalcycles/js-lib' import { readableCreate, ReadableTyped, dimGrey } from '@naturalcycles/nodejs-lib' -import { BaseCommonDB, DBSaveBatchOperation, queryInMemory } from '../..' +import { + BaseCommonDB, + commonDBFullSupport, + CommonDBSupport, + DBOperation, + DBSaveBatchOperation, + DBTransaction, + queryInMemory, +} from '../..' import { CommonDB } from '../../common.db' import { CommonDBOptions, @@ -27,7 +35,6 @@ import { RunQueryResult, } from '../../db.model' import { DBQuery } from '../../query/dbQuery' -import { DBTransaction } from '../../transaction/dbTransaction' import { FileDBCfg } from './file.db.model' /** @@ -41,6 +48,16 @@ import { FileDBCfg } from './file.db.model' * Each save operation saves *whole* file to the persistence layer. */ export class FileDB extends BaseCommonDB implements CommonDB { + override support: CommonDBSupport = { + ...commonDBFullSupport, + bufferValues: false, // todo: implement + insertSaveMethod: false, + updateSaveMethod: false, + updateByQuery: false, + createTable: false, + transactions: false, + } + constructor(cfg: FileDBCfg) { super() this.cfg = { @@ -101,72 +118,6 @@ export class FileDB extends BaseCommonDB implements CommonDB { } } - /** - * Implementation is optimized for loading/saving _whole files_. - */ - override async commitTransaction(tx: DBTransaction, _opt?: CommonDBOptions): Promise { - // data[table][id] => row - const data: StringMap> = {} - - // 1. Load all tables data (concurrently) - const tables = _uniq(tx.ops.map(o => o.table)) - - await pMap( - tables, - async table => { - const rows = await this.loadFile(table) - data[table] = _by(rows, r => r.id) - }, - { concurrency: 16 }, - ) - - const backup = _deepCopy(data) - - // 2. Apply ops one by one (in order) - tx.ops.forEach(op => { - if (op.type === 'deleteByIds') { - op.ids.forEach(id => delete data[op.table]![id]) - } else if (op.type === 'saveBatch') { - op.rows.forEach(r => { - if (!r.id) { - throw new Error('FileDB: row has an empty id') - } - data[op.table]![r.id] = r - }) - } else { - throw new Error(`DBOperation not supported: ${(op as any).type}`) - } - }) - - // 3. Sort, turn it into ops - // Not filtering empty arrays, cause it's already filtered in this.saveFiles() - const ops: DBSaveBatchOperation[] = _stringMapEntries(data).map(([table, map]) => { - return { - type: 'saveBatch', - table, - rows: this.sortRows(_stringMapValues(map)), - } - }) - - // 4. Save all files - try { - await this.saveFiles(ops) - } catch (err) { - const ops: DBSaveBatchOperation[] = _stringMapEntries(backup).map(([table, map]) => { - return { - type: 'saveBatch', - table, - rows: this.sortRows(_stringMapValues(map)), - } - }) - - // Rollback, ignore rollback error (if any) - await this.saveFiles(ops).catch(_ => {}) - - throw err - } - } - override async runQuery( q: DBQuery, _opt?: CommonDBOptions, @@ -216,6 +167,27 @@ export class FileDB extends BaseCommonDB implements CommonDB { return deleted } + override async deleteByIds( + table: string, + ids: string[], + _opt?: CommonDBOptions, + ): Promise { + const byId = _by(await this.loadFile(table), r => r.id) + + let deleted = 0 + ids.forEach(id => { + if (!byId[id]) return + delete byId[id] + deleted++ + }) + + if (deleted > 0) { + await this.saveFile(table, _stringMapValues(byId)) + } + + return deleted + } + override async getTableSchema( table: string, ): Promise> { @@ -256,7 +228,11 @@ export class FileDB extends BaseCommonDB implements CommonDB { this.logFinished(started, op) } - private sortRows(rows: ROW[]): ROW[] { + override async createTransaction(): Promise { + return new FileDBTransaction(this) + } + + sortRows(rows: ROW[]): ROW[] { rows = rows.map(r => _filterUndefinedValues(r)) if (this.cfg.sortOnSave) { @@ -283,3 +259,79 @@ export class FileDB extends BaseCommonDB implements CommonDB { this.cfg.logger?.log(`<< ${op} ${dimGrey(`in ${_since(started)}`)}`) } } + +export class FileDBTransaction implements DBTransaction { + constructor(private db: FileDB) {} + + ops: DBOperation[] = [] + + /** + * Implementation is optimized for loading/saving _whole files_. + */ + async commit(): Promise { + // data[table][id] => row + const data: StringMap> = {} + + // 1. Load all tables data (concurrently) + const tables = _uniq(this.ops.map(o => o.table)) + + await pMap( + tables, + async table => { + const rows = await this.db.loadFile(table) + data[table] = _by(rows, r => r.id) + }, + { concurrency: 16 }, + ) + + const backup = _deepCopy(data) + + // 2. Apply ops one by one (in order) + this.ops.forEach(op => { + if (op.type === 'deleteByIds') { + op.ids.forEach(id => delete data[op.table]![id]) + } else if (op.type === 'saveBatch') { + op.rows.forEach(r => { + if (!r.id) { + throw new Error('FileDB: row has an empty id') + } + data[op.table]![r.id] = r + }) + } else { + throw new Error(`DBOperation not supported: ${(op as any).type}`) + } + }) + + // 3. Sort, turn it into ops + // Not filtering empty arrays, cause it's already filtered in this.saveFiles() + const ops: DBSaveBatchOperation[] = _stringMapEntries(data).map(([table, map]) => { + return { + type: 'saveBatch', + table, + rows: this.db.sortRows(_stringMapValues(map)), + } + }) + + // 4. Save all files + try { + await this.db.saveFiles(ops) + } catch (err) { + const ops: DBSaveBatchOperation[] = _stringMapEntries(backup).map(([table, map]) => { + return { + type: 'saveBatch', + table, + rows: this.db.sortRows(_stringMapValues(map)), + } + }) + + // Rollback, ignore rollback error (if any) + await this.db.saveFiles(ops).catch(_ => {}) + + throw err + } + } + + async rollback(): Promise { + this.ops = [] + } +} diff --git a/src/adapter/file/localFile.persistence.plugin.test.ts b/src/adapter/file/localFile.persistence.plugin.test.ts index 267eb22..6b88fb3 100644 --- a/src/adapter/file/localFile.persistence.plugin.test.ts +++ b/src/adapter/file/localFile.persistence.plugin.test.ts @@ -8,16 +8,6 @@ const db = new FileDB({ }), }) -describe('runCommonDBTest', () => - runCommonDBTest(db, { - bufferSupport: false, // todo: use bufferReviver - insert: false, - update: false, - updateByQuery: false, - createTable: false, - })) +describe('runCommonDBTest', () => runCommonDBTest(db)) -describe('runCommonDaoTest', () => - runCommonDaoTest(db, { - createTable: false, - })) +describe('runCommonDaoTest', () => runCommonDaoTest(db)) diff --git a/src/adapter/inmemory/inMemory.db.ts b/src/adapter/inmemory/inMemory.db.ts index a3db546..f180ab4 100644 --- a/src/adapter/inmemory/inMemory.db.ts +++ b/src/adapter/inmemory/inMemory.db.ts @@ -16,6 +16,7 @@ import { CommonLogger, _deepCopy, _assert, + _omit, } from '@naturalcycles/js-lib' import { bufferReviver, @@ -29,11 +30,20 @@ import { yellow, fs2, } from '@naturalcycles/nodejs-lib' -import { CommonDB, DBIncrement, DBPatch, DBTransaction, queryInMemory } from '../..' +import { + CommonDB, + commonDBFullSupport, + CommonDBType, + DBIncrement, + DBOperation, + DBPatch, + queryInMemory, +} from '../..' import { CommonDBCreateOptions, CommonDBOptions, CommonDBSaveOptions, + DBTransaction, RunQueryResult, } from '../../db.model' import { DBQuery } from '../../query/dbQuery' @@ -76,6 +86,12 @@ export interface InMemoryDBCfg { } export class InMemoryDB implements CommonDB { + dbType = CommonDBType.document + + support = { + ...commonDBFullSupport, + } + constructor(cfg?: Partial) { this.cfg = { // defaults @@ -161,6 +177,17 @@ export class InMemoryDB implements CommonDB { rows: ROW[], opt: CommonDBSaveOptions = {}, ): Promise { + const { tx } = opt + if (tx) { + ;(tx as InMemoryDBTransaction).ops.push({ + type: 'saveBatch', + table: _table, + rows, + opt: _omit(opt, ['tx']), + }) + return + } + const table = this.cfg.tablesPrefix + _table this.data[table] ||= {} @@ -189,16 +216,48 @@ export class InMemoryDB implements CommonDB { async deleteByQuery( q: DBQuery, - _opt?: CommonDBOptions, + opt: CommonDBOptions = {}, ): Promise { const table = this.cfg.tablesPrefix + q.table - this.data[table] ||= {} + if (!this.data[table]) return 0 + const ids = queryInMemory(q, Object.values(this.data[table]!) as ROW[]).map(r => r.id) + + const { tx } = opt + if (tx) { + ;(tx as InMemoryDBTransaction).ops.push({ + type: 'deleteByIds', + table: q.table, + ids, + opt: _omit(opt, ['tx']), + }) + return ids.length + } + + return await this.deleteByIds(q.table, ids) + } + + async deleteByIds(_table: string, ids: string[], opt: CommonDBOptions = {}): Promise { + const table = this.cfg.tablesPrefix + _table + if (!this.data[table]) return 0 + + const { tx } = opt + if (tx) { + ;(tx as InMemoryDBTransaction).ops.push({ + type: 'deleteByIds', + table: _table, + ids, + opt: _omit(opt, ['tx']), + }) + return ids.length + } + let count = 0 - queryInMemory(q, Object.values(this.data[table] || {}) as ROW[]).forEach(r => { - if (!this.data[table]![r.id]) return - delete this.data[table]![r.id] + ids.forEach(id => { + if (!this.data[table]![id]) return + delete this.data[table]![id] count++ }) + return count } @@ -209,6 +268,8 @@ export class InMemoryDB implements CommonDB { const patchEntries = Object.entries(patch) if (!patchEntries.length) return 0 + // todo: can we support tx here? :thinking: + const table = this.cfg.tablesPrefix + q.table const rows = queryInMemory(q, Object.values(this.data[table] || {}) as ROW[]) rows.forEach((row: any) => { @@ -248,29 +309,8 @@ export class InMemoryDB implements CommonDB { return Readable.from(queryInMemory(q, Object.values(this.data[table] || {}) as ROW[])) } - async commitTransaction(tx: DBTransaction, opt?: CommonDBOptions): Promise { - const backup = _deepCopy(this.data) - - try { - for await (const op of tx.ops) { - if (op.type === 'saveBatch') { - await this.saveBatch(op.table, op.rows, { ...op.opt, ...opt }) - } else if (op.type === 'deleteByIds') { - await this.deleteByQuery(DBQuery.create(op.table).filter('id', 'in', op.ids), { - ...op.opt, - ...opt, - }) - } else { - throw new Error(`DBOperation not supported: ${(op as any).type}`) - } - } - } catch (err) { - // rollback - this.data = backup - this.cfg.logger!.log('InMemoryDB transaction rolled back') - - throw err - } + async createTransaction(): Promise { + return new InMemoryDBTransaction(this) } /** @@ -348,3 +388,37 @@ export class InMemoryDB implements CommonDB { ) } } + +export class InMemoryDBTransaction implements DBTransaction { + constructor(private db: InMemoryDB) {} + + ops: DBOperation[] = [] + + async commit(): Promise { + const backup = _deepCopy(this.db.data) + + try { + for (const op of this.ops) { + if (op.type === 'saveBatch') { + await this.db.saveBatch(op.table, op.rows, op.opt) + } else if (op.type === 'deleteByIds') { + await this.db.deleteByIds(op.table, op.ids, op.opt) + } else { + throw new Error(`DBOperation not supported: ${(op as any).type}`) + } + } + + this.ops = [] + } catch (err) { + // rollback + this.db.data = backup + this.db.cfg.logger!.log('InMemoryDB transaction rolled back') + + throw err + } + } + + async rollback(): Promise { + this.ops = [] + } +} diff --git a/src/base.common.db.ts b/src/base.common.db.ts index 43a126f..1b0c0d0 100644 --- a/src/base.common.db.ts +++ b/src/base.common.db.ts @@ -1,9 +1,15 @@ import { JsonSchemaObject, JsonSchemaRootObject, ObjectWithId } from '@naturalcycles/js-lib' import { ReadableTyped } from '@naturalcycles/nodejs-lib' -import { CommonDB } from './common.db' -import { CommonDBOptions, CommonDBSaveOptions, DBPatch, RunQueryResult } from './db.model' +import { CommonDB, CommonDBSupport, CommonDBType } from './common.db' +import { + CommonDBOptions, + CommonDBSaveOptions, + DBPatch, + DBTransaction, + RunQueryResult, +} from './db.model' import { DBQuery } from './query/dbQuery' -import { DBTransaction } from './transaction/dbTransaction' +import { FakeDBTransaction } from './transaction/dbTransaction.util' /* eslint-disable unused-imports/no-unused-vars */ @@ -12,6 +18,10 @@ import { DBTransaction } from './transaction/dbTransaction' * To be extended by actual implementations. */ export class BaseCommonDB implements CommonDB { + dbType = CommonDBType.document + + support: CommonDBSupport = {} + async ping(): Promise { throw new Error('ping is not implemented') } @@ -33,7 +43,7 @@ export class BaseCommonDB implements CommonDB { // no-op } - async getByIds(table: string, ids: ROW['id'][]): Promise { + async getByIds(table: string, ids: string[]): Promise { throw new Error('getByIds is not implemented') } @@ -69,12 +79,11 @@ export class BaseCommonDB implements CommonDB { throw new Error('streamQuery is not implemented') } - /** - * Naive implementation. - * Doesn't support rollback on error, hence doesn't pass dbTest. - * To be extended. - */ - async commitTransaction(tx: DBTransaction, opt?: CommonDBOptions): Promise { - throw new Error('commitTransaction is not implemented') + async deleteByIds(table: string, ids: string[], opt?: CommonDBOptions): Promise { + throw new Error('deleteByIds is not implemented') + } + + async createTransaction(): Promise { + return new FakeDBTransaction(this) } } diff --git a/src/common.db.ts b/src/common.db.ts index f19b807..c6b0ef2 100644 --- a/src/common.db.ts +++ b/src/common.db.ts @@ -6,12 +6,44 @@ import { CommonDBSaveOptions, CommonDBStreamOptions, DBPatch, + DBTransaction, RunQueryResult, } from './db.model' import { DBQuery } from './query/dbQuery' -import { DBTransaction } from './transaction/dbTransaction' + +export enum CommonDBType { + 'document' = 'document', + 'relational' = 'relational', +} export interface CommonDB { + /** + * Relational databases are expected to return `null` for all missing properties. + */ + dbType: CommonDBType + + /** + * Manifest of supported features. + */ + support: CommonDBSupport + + // Support flags indicate which of the CommonDB features are supported by this implementation. + supportsQueries?: boolean + supportsDBQueryFilter?: boolean + supportsDBQueryFilterIn?: boolean + supportsDBQueryOrder?: boolean + supportsDBQuerySelectFields?: boolean + supportsInsertSaveMethod?: boolean + supportsUpdateSaveMethod?: boolean + supportsUpdateByQuery?: boolean + supportsDBIncrement?: boolean + supportsCreateTable?: boolean + supportsTableSchemas?: boolean + supportsStreaming?: boolean + supportsBufferValues?: boolean + supportsNullValues?: boolean + supportsTransactions?: boolean + /** * Checks that connection/credentials/etc is ok. * Also acts as a "warmup request" for a DB. @@ -84,6 +116,12 @@ export interface CommonDB { ) => Promise // DELETE + /** + * Returns number of deleted items. + * Not supported by all implementations (e.g Datastore will always return same number as number of ids). + */ + deleteByIds: (table: string, ids: string[], opt?: CommonDBOptions) => Promise + /** * Returns number of deleted items. * Not supported by all implementations (e.g Datastore will always return same number as number of ids). @@ -122,5 +160,44 @@ export interface CommonDB { * Should be implemented as a Transaction (best effort), which means that * either ALL or NONE of the operations should be applied. */ - commitTransaction: (tx: DBTransaction, opt?: CommonDBOptions) => Promise + createTransaction: () => Promise +} + +/** + * Manifest of supported features. + */ +export interface CommonDBSupport { + queries?: boolean + dbQueryFilter?: boolean + dbQueryFilterIn?: boolean + dbQueryOrder?: boolean + dbQuerySelectFields?: boolean + insertSaveMethod?: boolean + updateSaveMethod?: boolean + updateByQuery?: boolean + dbIncrement?: boolean + createTable?: boolean + tableSchemas?: boolean + streaming?: boolean + bufferValues?: boolean + nullValues?: boolean + transactions?: boolean +} + +export const commonDBFullSupport: CommonDBSupport = { + queries: true, + dbQueryFilter: true, + dbQueryFilterIn: true, + dbQueryOrder: true, + dbQuerySelectFields: true, + insertSaveMethod: true, + updateSaveMethod: true, + updateByQuery: true, + dbIncrement: true, + createTable: true, + tableSchemas: true, + streaming: true, + bufferValues: true, + nullValues: true, + transactions: true, } diff --git a/src/commondao/common.dao.model.ts b/src/commondao/common.dao.model.ts index f238669..6678352 100644 --- a/src/commondao/common.dao.model.ts +++ b/src/commondao/common.dao.model.ts @@ -274,17 +274,6 @@ export interface CommonDaoOptions extends CommonDBOptions { * Useful e.g in AirtableDB where you can have one Dao to control multiple tables. */ table?: string - - /** - * If passed - operation will not be performed immediately, but instead "added" to the transaction. - * In the end - transaction needs to be committed (by calling `commit`). - * This API is inspired by Datastore API. - * - * Only applicable to save* and delete* operations - * - * @experimental - */ - tx?: boolean } export interface CommonDaoSaveOptions, DBM extends ObjectWithId> diff --git a/src/commondao/common.dao.test.ts b/src/commondao/common.dao.test.ts index 31b12a9..c6ecd1b 100644 --- a/src/commondao/common.dao.test.ts +++ b/src/commondao/common.dao.test.ts @@ -467,12 +467,12 @@ async function getEven(): Promise { test('runInTransaction', async () => { const items = createTestItemsBM(4) - await dao.runInTransaction([ - dao.tx.save(items[0]!), - dao.tx.save(items[1]!), - dao.tx.save(items[3]!), - dao.tx.deleteById(items[1]!.id), - ]) + await dao.useTransaction(async tx => { + await tx.save(dao, items[0]!) + await tx.save(dao, items[1]!) + await tx.save(dao, items[3]!) + await tx.deleteById(dao, items[1]!.id) + }) const items2 = await dao.query().runQuery() expect(items2.map(i => i.id).sort()).toEqual(['id1', 'id4']) diff --git a/src/commondao/common.dao.ts b/src/commondao/common.dao.ts index 1d44e09..5fc77f3 100644 --- a/src/commondao/common.dao.ts +++ b/src/commondao/common.dao.ts @@ -18,7 +18,6 @@ import { JsonSchemaRootObject, ObjectWithId, pMap, - Promisable, Saved, SKIP, UnixTimestampMillisNumber, @@ -43,16 +42,8 @@ import { writableVoid, } from '@naturalcycles/nodejs-lib' import { DBLibError } from '../cnst' -import { - DBDeleteByIdsOperation, - DBModelType, - DBOperation, - DBPatch, - DBSaveBatchOperation, - RunQueryResult, -} from '../db.model' +import { DBModelType, DBPatch, DBTransaction, RunQueryResult } from '../db.model' import { DBQuery, RunnableDBQuery } from '../query/dbQuery' -import { DBTransaction } from '../transaction/dbTransaction' import { CommonDaoCfg, CommonDaoCreateOptions, @@ -698,68 +689,6 @@ export class CommonDao< return obj as any } - tx = { - save: async ( - bm: Unsaved, - opt: CommonDaoSaveBatchOptions = {}, - ): Promise => { - // .save actually returns DBM (not BM) when it detects `opt.tx === true` - const row: DBM | null = (await this.save(bm, { ...opt, tx: true })) as any - if (row === null) return - - return { - type: 'saveBatch', - table: this.cfg.table, - rows: [row], - opt: { - excludeFromIndexes: this.cfg.excludeFromIndexes, - ...opt, - }, - } - }, - saveBatch: async ( - bms: Unsaved[], - opt: CommonDaoSaveBatchOptions = {}, - ): Promise => { - const rows: DBM[] = (await this.saveBatch(bms, { ...opt, tx: true })) as any - if (!rows.length) return - - return { - type: 'saveBatch', - table: this.cfg.table, - rows, - opt: { - excludeFromIndexes: this.cfg.excludeFromIndexes, - ...opt, - }, - } - }, - deleteByIds: async ( - ids: string[], - opt: CommonDaoOptions = {}, - ): Promise => { - if (!ids.length) return - return { - type: 'deleteByIds', - table: this.cfg.table, - ids, - opt, - } - }, - deleteById: async ( - id: string | null | undefined, - opt: CommonDaoOptions = {}, - ): Promise => { - if (!id) return - return { - type: 'deleteByIds', - table: this.cfg.table, - ids: [id], - opt, - } - }, - } - // SAVE /** * Mutates with id, created, updated @@ -778,12 +707,7 @@ export class CommonDao< if (this.cfg.hooks!.beforeSave) { dbm = (await this.cfg.hooks!.beforeSave(dbm))! - if (dbm === null && !opt.tx) return bm as any - } - - if (opt.tx) { - // May return `null`, in which case it'll be skipped - return dbm as any + if (dbm === null) return bm as any } const table = opt.table || this.cfg.table @@ -960,10 +884,6 @@ export class CommonDao< ) } - if (opt.tx) { - return dbms as any - } - if (opt.ensureUniqueId) throw new AppError('ensureUniqueId is not supported in saveBatch') if (this.cfg.immutable && !opt.allowMutability && !opt.saveMethod) { opt = { ...opt, saveMethod: 'insert' } @@ -1071,7 +991,7 @@ export class CommonDao< if (beforeSave) { dbm = (await beforeSave(dbm))! - if (dbm === null && !opt.tx) return SKIP + if (dbm === null) return SKIP } return dbm @@ -1118,7 +1038,7 @@ export class CommonDao< const op = `deleteById(${id})` const table = opt.table || this.cfg.table const started = this.logStarted(op, table) - const count = await this.cfg.db.deleteByQuery(DBQuery.create(table).filterEq('id', id)) + const count = await this.cfg.db.deleteByIds(table, [id], opt) this.logSaveResult(started, op, table) return count } @@ -1130,7 +1050,7 @@ export class CommonDao< const op = `deleteByIds(${ids.join(', ')})` const table = opt.table || this.cfg.table const started = this.logStarted(op, table) - const count = await this.cfg.db.deleteByQuery(DBQuery.create(table).filterIn('id', ids)) + const count = await this.cfg.db.deleteByIds(table, ids, opt) this.logSaveResult(started, op, table) return count } @@ -1410,11 +1330,22 @@ export class CommonDao< await this.cfg.db.ping() } - async runInTransaction(ops: Promisable[]): Promise { - const resolvedOps = (await Promise.all(ops)).filter(_isTruthy) - if (!resolvedOps.length) return + async useTransaction(fn: (tx: CommonDaoTransaction) => Promise): Promise { + const tx = await this.cfg.db.createTransaction() + const daoTx = new CommonDaoTransaction(tx) - await this.cfg.db.commitTransaction(DBTransaction.create(resolvedOps)) + try { + await fn(daoTx) + await daoTx.commit() + } catch (err) { + await daoTx.rollback() + throw err + } + } + + async createTransaction(): Promise { + const tx = await this.cfg.db.createTransaction() + return new CommonDaoTransaction(tx) } protected logResult(started: number, op: string, res: any, table: string): void { @@ -1473,3 +1404,86 @@ export class CommonDao< return Date.now() } } + +export class CommonDaoTransaction { + constructor(private tx: DBTransaction) {} + + async commit(): Promise { + await this.tx.commit() + } + async rollback(): Promise { + try { + await this.tx.rollback() + } catch (err) { + console.log(err) + } + } + + async getById, DBM extends ObjectWithId>( + dao: CommonDao, + id: string, + opt?: CommonDaoOptions, + ): Promise | null> { + return (await this.getByIds(dao, [id], opt))[0] || null + } + + async getByIds, DBM extends ObjectWithId>( + dao: CommonDao, + ids: string[], + opt?: CommonDaoOptions, + ): Promise[]> { + try { + return await dao.getByIds(ids, { ...opt, tx: this.tx }) + } catch (err) { + await this.rollback() + throw err + } + } + + async runQuery, DBM extends ObjectWithId>( + dao: CommonDao, + q: DBQuery, + opt?: CommonDaoOptions, + ): Promise[]> { + try { + return await dao.runQuery(q, { ...opt, tx: this.tx }) + } catch (err) { + await this.rollback() + throw err + } + } + + async save, DBM extends ObjectWithId>( + dao: CommonDao, + bm: Unsaved, + opt?: CommonDaoSaveBatchOptions, + ): Promise> { + return (await this.saveBatch(dao, [bm], opt))[0]! + } + + async saveBatch, DBM extends ObjectWithId>( + dao: CommonDao, + bms: Unsaved[], + opt?: CommonDaoSaveBatchOptions, + ): Promise[]> { + try { + return await dao.saveBatch(bms, { ...opt, tx: this.tx }) + } catch (err) { + await this.rollback() + throw err + } + } + + async deleteById(dao: CommonDao, id: string, opt?: CommonDaoOptions): Promise { + return await this.deleteByIds(dao, [id], opt) + } + + async deleteByIds(dao: CommonDao, ids: string[], opt?: CommonDaoOptions): Promise { + try { + return await dao.deleteByIds(ids, { ...opt, tx: this.tx }) + } catch (err) { + await this.rollback() + throw err + } + } +} diff --git a/src/db.model.ts b/src/db.model.ts index c252cf5..9d740db 100644 --- a/src/db.model.ts +++ b/src/db.model.ts @@ -1,4 +1,4 @@ -import { ObjectWithId } from '@naturalcycles/js-lib' +import type { ObjectWithId } from '@naturalcycles/js-lib' /** * Similar to SQL INSERT, UPDATE. @@ -10,7 +10,20 @@ import { ObjectWithId } from '@naturalcycles/js-lib' */ export type CommonDBSaveMethod = 'upsert' | 'insert' | 'update' -export interface CommonDBOptions {} +export interface DBTransaction { + commit: () => Promise + rollback: () => Promise +} + +export interface CommonDBOptions { + /** + * If passed - the operation will be performed in the context of that DBTransaction. + * Note that not every type of operation supports Transaction + * (e.g in Datastore queries cannot be executed inside a Transaction). + * Also, not every CommonDB implementation supports Transactions. + */ + tx?: DBTransaction +} /** * All properties default to undefined. diff --git a/src/index.ts b/src/index.ts index fee9155..70be2e3 100644 --- a/src/index.ts +++ b/src/index.ts @@ -13,6 +13,5 @@ export * from './pipeline/dbPipelineBackup' export * from './pipeline/dbPipelineCopy' export * from './pipeline/dbPipelineRestore' export * from './query/dbQuery' -export * from './transaction/dbTransaction' export * from './transaction/dbTransaction.util' export * from './kv/commonKeyValueDaoMemoCache' diff --git a/src/testing/daoTest.ts b/src/testing/daoTest.ts index 64520bb..6a6b5f9 100644 --- a/src/testing/daoTest.ts +++ b/src/testing/daoTest.ts @@ -1,10 +1,10 @@ import { Readable } from 'node:stream' -import { pDelay, _deepCopy, _pick, _sortBy, _omit, localTimeNow } from '@naturalcycles/js-lib' +import { _deepCopy, _pick, _sortBy, _omit, localTimeNow } from '@naturalcycles/js-lib' import { _pipeline, readableToArray, transformNoOp } from '@naturalcycles/nodejs-lib' import { CommonDaoLogLevel, DBQuery } from '..' import { CommonDB } from '../common.db' import { CommonDao } from '../commondao/common.dao' -import { CommonDBImplementationFeatures, CommonDBImplementationQuirks, expectMatch } from './dbTest' +import { CommonDBImplementationQuirks, expectMatch } from './dbTest' import { createTestItemsBM, testItemBMSchema, @@ -16,11 +16,8 @@ import { } from './test.model' import { TestItemBM } from '.' -export function runCommonDaoTest( - db: CommonDB, - features: CommonDBImplementationFeatures = {}, - quirks: CommonDBImplementationQuirks = {}, -): void { +export function runCommonDaoTest(db: CommonDB, quirks: CommonDBImplementationQuirks = {}): void { + const { support } = db const dao = new CommonDao({ table: TEST_TABLE, db, @@ -31,26 +28,6 @@ export function runCommonDaoTest( logLevel: CommonDaoLogLevel.DATA_FULL, }) - const { - querying = true, - // tableSchemas = true, - createTable = true, - dbQueryFilter = true, - // dbQueryFilterIn = true, - dbQueryOrder = true, - dbQuerySelectFields = true, - streaming = true, - strongConsistency = true, - nullValues = true, - transactions = true, - } = features - - // const { - // allowExtraPropertiesInResponse, - // allowBooleansAsUndefined, - // } = quirks - const eventualConsistencyDelay = !strongConsistency && quirks.eventualConsistencyDelay - const items = createTestItemsBM(3) const itemsClone = _deepCopy(items) // deepFreeze(items) // mutation of id/created/updated is allowed now! (even expected) @@ -66,13 +43,13 @@ export function runCommonDaoTest( }) // CREATE TABLE, DROP - if (createTable) { + if (support.createTable) { test('createTable, dropIfExists=true', async () => { await dao.createTable(testItemDBMJsonSchema, { dropIfExists: true }) }) } - if (querying) { + if (support.queries) { // DELETE ALL initially test('deleteByIds test items', async () => { const rows = await dao.query().select(['id']).runQuery() @@ -87,7 +64,6 @@ export function runCommonDaoTest( // QUERY empty test('runQuery(all), runQueryCount should return empty', async () => { - if (eventualConsistencyDelay) await pDelay(eventualConsistencyDelay) expect(await dao.query().runQuery()).toEqual([]) expect(await dao.query().runQueryCount()).toBe(0) }) @@ -109,7 +85,7 @@ export function runCommonDaoTest( }) // SAVE - if (nullValues) { + if (support.nullValues) { test('should allow to save and load null values', async () => { const item3 = { ...createTestItemBM(3), @@ -168,15 +144,14 @@ export function runCommonDaoTest( }) // QUERY - if (querying) { + if (support.queries) { test('runQuery(all) should return all items', async () => { - if (eventualConsistencyDelay) await pDelay(eventualConsistencyDelay) let rows = await dao.query().runQuery() rows = _sortBy(rows, r => r.id) expectMatch(expectedItems, rows, quirks) }) - if (dbQueryFilter) { + if (support.dbQueryFilter) { test('query even=true', async () => { let rows = await dao.query().filter('even', '==', true).runQuery() rows = _sortBy(rows, r => r.id) @@ -188,14 +163,14 @@ export function runCommonDaoTest( }) } - if (dbQueryOrder) { + if (support.dbQueryOrder) { test('query order by k1 desc', async () => { const rows = await dao.query().order('k1', true).runQuery() expectMatch([...expectedItems].reverse(), rows, quirks) }) } - if (dbQuerySelectFields) { + if (support.dbQuerySelectFields) { test('projection query with only ids', async () => { let rows = await dao.query().select(['id']).runQuery() rows = _sortBy(rows, r => r.id) @@ -213,7 +188,7 @@ export function runCommonDaoTest( } // STREAM - if (streaming) { + if (support.streaming) { test('streamQueryForEach all', async () => { let rows: TestItemBM[] = [] await dao.query().streamQueryForEach(bm => void rows.push(bm)) @@ -269,11 +244,10 @@ export function runCommonDaoTest( } // DELETE BY - if (querying) { + if (support.queries) { test('deleteByQuery even=false', async () => { const deleted = await dao.query().filter('even', '==', false).deleteByQuery() expect(deleted).toBe(items.filter(item => !item.even).length) - if (eventualConsistencyDelay) await pDelay(eventualConsistencyDelay) expect(await dao.query().runQueryCount()).toBe(1) }) @@ -283,14 +257,18 @@ export function runCommonDaoTest( }) } - if (transactions) { + if (support.transactions) { test('transaction happy path', async () => { // cleanup await dao.query().deleteByQuery() // Test that id, created, updated are created const now = localTimeNow().unix() - await dao.runInTransaction([dao.tx.save(_omit(item1, ['id', 'created', 'updated']))]) + + await dao.useTransaction(async tx => { + const row = _omit(item1, ['id', 'created', 'updated']) + await tx.save(dao, row) + }) const loaded = await dao.query().runQuery() expect(loaded.length).toBe(1) @@ -298,17 +276,19 @@ export function runCommonDaoTest( expect(loaded[0]!.created).toBeGreaterThanOrEqual(now) expect(loaded[0]!.updated).toBe(loaded[0]!.created) - await dao.runInTransaction([dao.tx.deleteById(loaded[0]!.id)]) + await dao.useTransaction(async tx => { + await tx.deleteById(dao, loaded[0]!.id) + }) // saveBatch [item1, 2, 3] // save item3 with k1: k1_mod // delete item2 // remaining: item1, item3_with_k1_mod - await dao.runInTransaction([ - dao.tx.saveBatch(items), - dao.tx.save({ ...items[2]!, k1: 'k1_mod' }), - dao.tx.deleteById(items[1]!.id), - ]) + await dao.useTransaction(async tx => { + await tx.saveBatch(dao, items) + await tx.save(dao, { ...items[2]!, k1: 'k1_mod' }) + await tx.deleteById(dao, items[1]!.id) + }) const rows = await dao.query().runQuery() const expected = [items[0], { ...items[2]!, k1: 'k1_mod' }] @@ -317,10 +297,10 @@ export function runCommonDaoTest( test('transaction rollback', async () => { await expect( - dao.runInTransaction([ - dao.tx.deleteById(items[2]!.id), - dao.tx.save({ ...items[0]!, k1: 5 as any }), // it should fail here - ]), + dao.useTransaction(async tx => { + await tx.deleteById(dao, items[2]!.id) + await tx.save(dao, { ...items[0]!, k1: 5 as any }) // it should fail here + }), ).rejects.toThrow() const rows = await dao.query().runQuery() @@ -328,7 +308,7 @@ export function runCommonDaoTest( expectMatch(expected, rows, quirks) }) - if (querying) { + if (support.queries) { test('transaction cleanup', async () => { await dao.query().deleteByQuery() }) diff --git a/src/testing/dbTest.ts b/src/testing/dbTest.ts index d7fd920..603d3b6 100644 --- a/src/testing/dbTest.ts +++ b/src/testing/dbTest.ts @@ -1,68 +1,21 @@ -import { pDelay, pMap, _filterObject, _pick, _sortBy } from '@naturalcycles/js-lib' +import { _filterObject, _pick, _sortBy, pMap } from '@naturalcycles/js-lib' import { readableToArray } from '@naturalcycles/nodejs-lib' -import { CommonDB } from '../common.db' +import { CommonDB, CommonDBType } from '../common.db' import { DBIncrement, DBPatch } from '../db.model' import { DBQuery } from '../query/dbQuery' -import { DBTransaction } from '../transaction/dbTransaction' import { createTestItemDBM, createTestItemsDBM, - TestItemDBM, TEST_TABLE, + TestItemDBM, testItemDBMJsonSchema, } from './test.model' import { deepFreeze } from './test.util' -export interface CommonDBImplementationFeatures { - /** - * All querying functionality. - */ - querying?: boolean - - dbQueryFilter?: boolean - dbQueryFilterIn?: boolean - dbQueryOrder?: boolean - dbQuerySelectFields?: boolean - insert?: boolean - update?: boolean - - updateByQuery?: boolean - - dbIncrement?: boolean - - createTable?: boolean - tableSchemas?: boolean - - /** - * Queries should return fresh results immediately. - * Datastore is the one known to NOT have strong consistency for queries (not for getById though). - */ - strongConsistency?: boolean - - streaming?: boolean - - bufferSupport?: boolean - nullValues?: boolean - - /** - * Set false for SQL (relational) databases, - * they will return `null` for all missing properties. - */ - documentDB?: boolean - - transactions?: boolean -} - /** * All options default to `false`. */ export interface CommonDBImplementationQuirks { - /** - * Applicable to e.g Datastore. - * Time in milliseconds to wait for eventual consistency to propagate. - */ - eventualConsistencyDelay?: number - /** * Example: airtableId */ @@ -74,40 +27,8 @@ export interface CommonDBImplementationQuirks { allowBooleansAsUndefined?: boolean } -/** - * All unclaimed features will default to 'true' - */ -export function runCommonDBTest( - db: CommonDB, - features: CommonDBImplementationFeatures = {}, - quirks: CommonDBImplementationQuirks = {}, -): void { - const { - querying = true, - tableSchemas = true, - createTable = true, - dbQueryFilter = true, - // dbQueryFilterIn = true, - dbQueryOrder = true, - dbQuerySelectFields = true, - insert = true, - update = true, - updateByQuery = true, - dbIncrement = true, - streaming = true, - strongConsistency = true, - bufferSupport = true, - nullValues = true, - documentDB = true, - transactions = true, - } = features - - // const { - // allowExtraPropertiesInResponse, - // allowBooleansAsUndefined, - // } = quirks - const eventualConsistencyDelay = !strongConsistency && quirks.eventualConsistencyDelay - +export function runCommonDBTest(db: CommonDB, quirks: CommonDBImplementationQuirks = {}): void { + const { support } = db const items = createTestItemsDBM(3) deepFreeze(items) const item1 = items[0]! @@ -119,13 +40,13 @@ export function runCommonDBTest( }) // CREATE TABLE, DROP - if (createTable) { + if (support.createTable) { test('createTable, dropIfExists=true', async () => { await db.createTable(TEST_TABLE, testItemDBMJsonSchema, { dropIfExists: true }) }) } - if (querying) { + if (support.queries) { // DELETE ALL initially test('deleteByIds test items', async () => { const { rows } = await db.runQuery(queryAll().select(['id'])) @@ -139,7 +60,6 @@ export function runCommonDBTest( // QUERY empty test('runQuery(all), runQueryCount should return empty', async () => { - if (eventualConsistencyDelay) await pDelay(eventualConsistencyDelay) expect((await db.runQuery(queryAll())).rows).toEqual([]) expect(await db.runQueryCount(queryAll())).toBe(0) }) @@ -161,7 +81,7 @@ export function runCommonDBTest( }) // SAVE - if (nullValues) { + if (support.nullValues) { test('should allow to save and load null values', async () => { const item3 = { ...createTestItemDBM(3), @@ -175,7 +95,7 @@ export function runCommonDBTest( }) } - if (documentDB) { + if (db.dbType === CommonDBType.document) { test('undefined values should not be saved/loaded', async () => { const item3 = { ...createTestItemDBM(3), @@ -193,7 +113,7 @@ export function runCommonDBTest( }) } - if (update) { + if (support.updateSaveMethod) { test('saveBatch UPDATE method should throw', async () => { await expect(db.saveBatch(TEST_TABLE, items, { saveMethod: 'update' })).rejects.toThrow() }) @@ -207,13 +127,13 @@ export function runCommonDBTest( await expect(db.saveBatch(TEST_TABLE, [{ ...item1, id: null as any }])).rejects.toThrow() }) - if (insert) { + if (support.insertSaveMethod) { test('saveBatch INSERT method should throw', async () => { await expect(db.saveBatch(TEST_TABLE, items, { saveMethod: 'insert' })).rejects.toThrow() }) } - if (update) { + if (support.updateSaveMethod) { test('saveBatch UPDATE method should pass', async () => { await db.saveBatch(TEST_TABLE, items, { saveMethod: 'update' }) }) @@ -230,19 +150,18 @@ export function runCommonDBTest( }) // QUERY - if (querying) { + if (support.queries) { test('runQuery(all) should return all items', async () => { - if (eventualConsistencyDelay) await pDelay(eventualConsistencyDelay) let { rows } = await db.runQuery(queryAll()) rows = _sortBy(rows, r => r.id) // because query doesn't specify order here expectMatch(items, rows, quirks) }) - if (dbQueryFilter) { + if (support.dbQueryFilter) { test('query even=true', async () => { const q = new DBQuery(TEST_TABLE).filter('even', '==', true) let { rows } = await db.runQuery(q) - if (!dbQueryOrder) rows = _sortBy(rows, r => r.id) + if (!support.dbQueryOrder) rows = _sortBy(rows, r => r.id) expectMatch( items.filter(i => i.even), rows, @@ -251,7 +170,7 @@ export function runCommonDBTest( }) } - if (dbQueryOrder) { + if (support.dbQueryOrder) { test('query order by k1 desc', async () => { const q = new DBQuery(TEST_TABLE).order('k1', true) const { rows } = await db.runQuery(q) @@ -259,7 +178,7 @@ export function runCommonDBTest( }) } - if (dbQuerySelectFields) { + if (support.dbQuerySelectFields) { test('projection query with only ids', async () => { const q = new DBQuery(TEST_TABLE).select(['id']) let { rows } = await db.runQuery(q) @@ -299,7 +218,7 @@ export function runCommonDBTest( } // STREAM - if (streaming) { + if (support.streaming) { test('streamQuery all', async () => { let rows = await readableToArray(db.streamQuery(queryAll())) @@ -313,7 +232,7 @@ export function runCommonDBTest( const tables = await db.getTables() // console.log({ tables }) - if (tableSchemas) { + if (support.tableSchemas) { await pMap(tables, async table => { const schema = await db.getTableSchema(table) // console.log(schema) @@ -323,21 +242,19 @@ export function runCommonDBTest( }) // DELETE BY - if (querying && dbQueryFilter) { + if (support.queries && support.dbQueryFilter) { test('deleteByQuery even=false', async () => { const q = new DBQuery(TEST_TABLE).filter('even', '==', false) const deleted = await db.deleteByQuery(q) expect(deleted).toBe(items.filter(item => !item.even).length) - if (eventualConsistencyDelay) await pDelay(eventualConsistencyDelay) - expect(await db.runQueryCount(queryAll())).toBe(1) }) } // BUFFER - if (bufferSupport) { - test('buffer support', async () => { + if (support.bufferValues) { + test('buffer values', async () => { const s = 'helloWorld 1' const b1 = Buffer.from(s) @@ -361,7 +278,7 @@ export function runCommonDBTest( }) } - if (transactions) { + if (support.transactions) { test('transaction happy path', async () => { // cleanup await db.deleteByQuery(queryAll()) @@ -370,12 +287,11 @@ export function runCommonDBTest( // save item3 with k1: k1_mod // delete item2 // remaining: item1, item3_with_k1_mod - const tx = DBTransaction.create() - .saveBatch(TEST_TABLE, items) - .save(TEST_TABLE, { ...items[2]!, k1: 'k1_mod' }) - .deleteById(TEST_TABLE, items[1]!.id) - - await db.commitTransaction(tx) + const tx = await db.createTransaction() + await db.saveBatch(TEST_TABLE, items, { tx }) + await db.saveBatch(TEST_TABLE, [{ ...items[2]!, k1: 'k1_mod' }], { tx }) + await db.deleteByIds(TEST_TABLE, [items[1]!.id], { tx }) + await tx.commit() const { rows } = await db.runQuery(queryAll()) const expected = [items[0], { ...items[2]!, k1: 'k1_mod' }] @@ -384,11 +300,18 @@ export function runCommonDBTest( test('transaction rollback', async () => { // It should fail on id == null - const tx = DBTransaction.create() - .deleteById(TEST_TABLE, items[2]!.id) - .save(TEST_TABLE, { ...items[0]!, k1: 5, id: null as any }) + let err: any + + try { + const tx = await db.createTransaction() + await db.deleteByIds(TEST_TABLE, [items[2]!.id], { tx }) + await db.saveBatch(TEST_TABLE, [{ ...items[0]!, k1: 5, id: null as any }], { tx }) + await tx.commit() + } catch (err_) { + err = err_ + } - await expect(db.commitTransaction(tx)).rejects.toThrow() + expect(err).toBeDefined() const { rows } = await db.runQuery(queryAll()) const expected = [items[0], { ...items[2]!, k1: 'k1_mod' }] @@ -396,7 +319,7 @@ export function runCommonDBTest( }) } - if (updateByQuery) { + if (support.updateByQuery) { test('updateByQuery simple', async () => { // cleanup, reset initial data await db.deleteByQuery(queryAll()) @@ -419,7 +342,7 @@ export function runCommonDBTest( expectMatch(expected, rows, quirks) }) - if (dbIncrement) { + if (support.dbIncrement) { test('updateByQuery DBIncrement', async () => { // cleanup, reset initial data await db.deleteByQuery(queryAll()) @@ -447,7 +370,7 @@ export function runCommonDBTest( } } - if (querying) { + if (support.queries) { test('cleanup', async () => { // CLEAN UP await db.deleteByQuery(queryAll()) diff --git a/src/testing/index.ts b/src/testing/index.ts index 69d8369..3e13c79 100644 --- a/src/testing/index.ts +++ b/src/testing/index.ts @@ -1,9 +1,5 @@ import { runCommonDaoTest } from './daoTest' -import { - CommonDBImplementationFeatures, - CommonDBImplementationQuirks, - runCommonDBTest, -} from './dbTest' +import { CommonDBImplementationQuirks, runCommonDBTest } from './dbTest' import { runCommonKeyValueDBTest } from './keyValueDBTest' import { createTestItemBM, @@ -21,13 +17,7 @@ import { TEST_TABLE, } from './test.model' -export type { - TestItemDBM, - TestItemBM, - TestItemTM, - CommonDBImplementationFeatures, - CommonDBImplementationQuirks, -} +export type { TestItemDBM, TestItemBM, TestItemTM, CommonDBImplementationQuirks } export { TEST_TABLE, diff --git a/src/timeseries/commonTimeSeriesDao.ts b/src/timeseries/commonTimeSeriesDao.ts index 04a46a4..649e8db 100644 --- a/src/timeseries/commonTimeSeriesDao.ts +++ b/src/timeseries/commonTimeSeriesDao.ts @@ -1,5 +1,4 @@ import { _isTruthy, ObjectWithId } from '@naturalcycles/js-lib' -import { DBTransaction } from '..' import { DBQuery } from '../query/dbQuery' import { CommonTimeSeriesDaoCfg, @@ -53,19 +52,19 @@ export class CommonTimeSeriesDao { async commitTransaction(ops: TimeSeriesSaveBatchOp[]): Promise { if (!ops.length) return - const tx = DBTransaction.create() + const tx = await this.cfg.db.createTransaction() - ops.forEach(op => { + for (const op of ops) { const rows: ObjectWithId[] = op.dataPoints.map(([ts, v]) => ({ id: String(ts), // Convert Number id into String id, as per CommonDB ts, // to allow querying by ts, since querying by id is not always available (Datastore is one example) v, })) - tx.saveBatch(`${op.series}${_TIMESERIES_RAW}`, rows) - }) + await this.cfg.db.saveBatch(`${op.series}${_TIMESERIES_RAW}`, rows, { tx }) + } - await this.cfg.db.commitTransaction(tx) + await tx.commit() } async deleteById(series: string, tsMillis: number): Promise { diff --git a/src/transaction/dbTransaction.ts b/src/transaction/dbTransaction.ts deleted file mode 100644 index 42e2102..0000000 --- a/src/transaction/dbTransaction.ts +++ /dev/null @@ -1,67 +0,0 @@ -import { ObjectWithId } from '@naturalcycles/js-lib' -import type { CommonDB } from '../common.db' -import type { CommonDBSaveOptions, DBOperation } from '../db.model' - -/** - * Convenience class that stores the list of DBOperations and provides a fluent API to add them. - */ -export class DBTransaction { - protected constructor(public ops: DBOperation[] = []) {} - - /** - * Convenience method. - */ - static create(ops: DBOperation[] = []): DBTransaction { - return new DBTransaction(ops) - } - - save>(table: string, row: ROW): this { - this.ops.push({ - type: 'saveBatch', - table, - rows: [row], - }) - return this - } - - saveBatch>(table: string, rows: ROW[]): this { - this.ops.push({ - type: 'saveBatch', - table, - rows, - }) - return this - } - - deleteById(table: string, id: string): this { - this.ops.push({ - type: 'deleteByIds', - table, - ids: [id], - }) - return this - } - - deleteByIds(table: string, ids: string[]): this { - this.ops.push({ - type: 'deleteByIds', - table, - ids, - }) - return this - } -} - -/** - * Extends DBTransaction by providing a convenient `commit` method that delegates - * to CommonDB.commitTransaction(). - */ -export class RunnableDBTransaction extends DBTransaction { - constructor(public db: CommonDB) { - super() - } - - async commit>(opt?: CommonDBSaveOptions): Promise { - await this.db.commitTransaction(this, opt) - } -} diff --git a/src/transaction/dbTransaction.util.ts b/src/transaction/dbTransaction.util.ts index ec24984..c9ce0d2 100644 --- a/src/transaction/dbTransaction.util.ts +++ b/src/transaction/dbTransaction.util.ts @@ -1,7 +1,7 @@ +import { ObjectWithId } from '@naturalcycles/js-lib' import type { CommonDB } from '../common.db' -import { CommonDBSaveOptions, DBOperation } from '../db.model' +import { CommonDBOptions, CommonDBSaveOptions, DBTransaction, RunQueryResult } from '../db.model' import { DBQuery } from '../query/dbQuery' -import { DBTransaction } from './dbTransaction' /** * Optimizes the Transaction (list of DBOperations) to do less operations. @@ -11,9 +11,9 @@ import { DBTransaction } from './dbTransaction' * Currently only takes into account SaveBatch and DeleteByIds ops. * Output ops are maximum 1 per entity - save or delete. */ -export function mergeDBOperations(ops: DBOperation[]): DBOperation[] { - return ops // currently "does nothing" -} +// export function mergeDBOperations(ops: DBOperation[]): DBOperation[] { +// return ops // currently "does nothing" +// } // Commented out as "overly complicated" /* @@ -70,23 +70,62 @@ export function mergeDBOperations(ops: DBOperation[]): DBOperation[] { * Does NOT actually implement a Transaction, cause partial ops application will happen * in case of an error in the middle. */ -export async function commitDBTransactionSimple( - db: CommonDB, - tx: DBTransaction, - opt?: CommonDBSaveOptions, -): Promise { - // const ops = mergeDBOperations(tx.ops) +// export async function commitDBTransactionSimple( +// db: CommonDB, +// ops: DBOperation[], +// opt?: CommonDBSaveOptions, +// ): Promise { +// // const ops = mergeDBOperations(tx.ops) +// +// for await (const op of ops) { +// if (op.type === 'saveBatch') { +// await db.saveBatch(op.table, op.rows, { ...op.opt, ...opt }) +// } else if (op.type === 'deleteByIds') { +// await db.deleteByQuery(DBQuery.create(op.table).filter('id', 'in', op.ids), { +// ...op.opt, +// ...opt, +// }) +// } else { +// throw new Error(`DBOperation not supported: ${(op as any).type}`) +// } +// } +// } - for await (const op of tx.ops) { - if (op.type === 'saveBatch') { - await db.saveBatch(op.table, op.rows, { ...op.opt, ...opt }) - } else if (op.type === 'deleteByIds') { - await db.deleteByQuery(DBQuery.create(op.table).filter('id', 'in', op.ids), { - ...op.opt, - ...opt, - }) - } else { - throw new Error(`DBOperation not supported: ${(op as any).type}`) - } +/** + * Fake implementation of DBTransactionContext, + * which executes all operations instantly, without any Transaction involved. + */ +export class FakeDBTransaction implements DBTransaction { + constructor(protected db: CommonDB) {} + + async commit(): Promise {} + async rollback(): Promise {} + + async getByIds( + table: string, + ids: string[], + opt?: CommonDBOptions, + ): Promise { + return await this.db.getByIds(table, ids, opt) + } + async runQuery( + q: DBQuery, + opt?: CommonDBOptions, + ): Promise> { + return await this.db.runQuery(q, opt) + } + async saveBatch>( + table: string, + rows: ROW[], + opt?: CommonDBSaveOptions, + ): Promise { + return await this.db.saveBatch(table, rows, opt) + } + async deleteByIds( + table: string, + ids: string[], + opt?: CommonDBOptions | undefined, + ): Promise { + return await this.db.deleteByIds(table, ids, opt) } } diff --git a/src/validation/index.ts b/src/validation/index.ts index 03cf9f2..704779d 100644 --- a/src/validation/index.ts +++ b/src/validation/index.ts @@ -17,8 +17,8 @@ import { } from '../query/dbQuery' export const commonDBOptionsSchema = objectSchema({ - onlyCache: booleanSchema.optional(), - skipCache: booleanSchema.optional(), + ['onlyCache' as any]: booleanSchema.optional(), + ['skipCache' as any]: booleanSchema.optional(), }) export const commonDBSaveOptionsSchema = objectSchema({ diff --git a/tsconfig.json b/tsconfig.json index 839aab3..b4f8db7 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,8 +1,8 @@ { "extends": "@naturalcycles/dev-lib/cfg/tsconfig.json", "compilerOptions": { - "outDir": "dist" + "outDir": "dist", }, "include": ["src"], - "exclude": ["**/__exclude"] + "exclude": ["**/__exclude"], } diff --git a/yarn.lock b/yarn.lock index acc8bbd..98406fd 100644 --- a/yarn.lock +++ b/yarn.lock @@ -866,9 +866,9 @@ zod "^3.20.2" "@naturalcycles/nodejs-lib@^13.0.1", "@naturalcycles/nodejs-lib@^13.0.2", "@naturalcycles/nodejs-lib@^13.1.1": - version "13.6.0" - resolved "https://registry.yarnpkg.com/@naturalcycles/nodejs-lib/-/nodejs-lib-13.6.0.tgz#33acd08d9104499ad37ffdafbac208f774ee4493" - integrity sha512-9Ol32xy24dmP38qhWCfZM8QkgLCsRRki+GFZjJeJQh0q42WhACahAwac9TF8XWCAEXQkOm1ft8zw/PQknEVNfw== + version "13.7.0" + resolved "https://registry.yarnpkg.com/@naturalcycles/nodejs-lib/-/nodejs-lib-13.7.0.tgz#8d49d85d9c3165f20a9bbc08024bc213f5318cae" + integrity sha512-mPGMLS5pBP8U9ToVwM91+7ydir+fCUqYE3NVDotvfQW6jYf+LExKqnyMDGCEnwkGZhkafvGHbHBh5eqluyXsWA== dependencies: "@naturalcycles/js-lib" "^14.0.0" "@types/js-yaml" "^4.0.9" @@ -1072,9 +1072,9 @@ integrity sha512-hov8bUuiLiyFPGyFPE1lwWhmzYbirOXQNNo40+y3zow8aFVTeyn3VWL0VFFfdNddA8S4Vf0Tc062rzyNr7Paag== "@types/node@*", "@types/node@^20.1.0", "@types/node@^20.2.1": - version "20.11.4" - resolved "https://registry.yarnpkg.com/@types/node/-/node-20.11.4.tgz#c724a5d6723182af758b91b994209336f4439cb7" - integrity sha512-6I0fMH8Aoy2lOejL3s4LhyIYX34DPwY8bl5xlNjBvUEk8OHrcuzsFt+Ied4LvJihbtXPM+8zUqdydfIti86v9g== + version "20.11.5" + resolved "https://registry.yarnpkg.com/@types/node/-/node-20.11.5.tgz#be10c622ca7fcaa3cf226cf80166abc31389d86e" + integrity sha512-g557vgQjUUfN76MZAN/dt1z3dzcUsimuysco0KeluHgrPdJXkP/XdAURgyO2W9fZWHRtRBiVKzKn8vyOAwlG+w== dependencies: undici-types "~5.26.4" @@ -1739,9 +1739,9 @@ camelcase@^6.2.0: integrity sha512-Gmy6FhYlCY7uOElZUSbxo2UCDH8owEk996gkbrpsgGtrJLM3J7jGxl9Ic7Qwwj4ivOE5AWZWRMecDdF7hqGjFA== caniuse-lite@^1.0.30001565: - version "1.0.30001577" - resolved "https://registry.yarnpkg.com/caniuse-lite/-/caniuse-lite-1.0.30001577.tgz#a24991eb4ad67324ba8b96716340d53151f2f6f8" - integrity sha512-rs2ZygrG1PNXMfmncM0B5H1hndY5ZCC9b5TkFaVNfZ+AUlyqcMyVIQtc3fsezi0NUCk5XZfDf9WS6WxMxnfdrg== + version "1.0.30001578" + resolved "https://registry.yarnpkg.com/caniuse-lite/-/caniuse-lite-1.0.30001578.tgz#11741580434ce60aae4b4a9abee9f9f8d7bf5be5" + integrity sha512-J/jkFgsQ3NEl4w2lCoM9ZPxrD+FoBNJ7uJUpGVjIg/j0OwJosWM36EPDv+Yyi0V4twBk9pPmlFS+PLykgEvUmg== chalk@5.3.0: version "5.3.0" @@ -2308,9 +2308,9 @@ ee-first@1.1.1: integrity sha512-WMwm9LhRUo+WUaRN+vRuETqG89IgZphVSNkdFgeb6sS/E4OrDIN7t48CAewSHXc6C8lefD8KKfr5vY61brQlow== electron-to-chromium@^1.4.601: - version "1.4.632" - resolved "https://registry.yarnpkg.com/electron-to-chromium/-/electron-to-chromium-1.4.632.tgz#df6253483b802eb83eee2fdc0e5067bd46f36f11" - integrity sha512-JGmudTwg7yxMYvR/gWbalqqQiyu7WTFv2Xu3vw4cJHXPFxNgAk0oy8UHaer8nLF4lZJa+rNoj6GsrKIVJTV6Tw== + version "1.4.635" + resolved "https://registry.yarnpkg.com/electron-to-chromium/-/electron-to-chromium-1.4.635.tgz#e4e064b8711a98827652ce17cc11b0e0184c40d1" + integrity sha512-iu/2D0zolKU3iDGXXxdOzNf72Jnokn+K1IN6Kk4iV6l1Tr2g/qy+mvmtfAiBwZe5S3aB5r92vp+zSZ69scYRrg== emittery@^0.13.1: version "0.13.1" @@ -4718,9 +4718,9 @@ prelude-ls@^1.2.1: integrity sha512-vkcDPrRZo1QZLbn5RLGPpg/WmIQ65qoWWhcGKf/b5eplkkarX0m9z8ppCat4mlOqUsWpyNuYgO3VRyrYHSzX5g== prettier@^3.0.0: - version "3.2.2" - resolved "https://registry.yarnpkg.com/prettier/-/prettier-3.2.2.tgz#96e580f7ca9c96090ad054616c0c4597e2844b65" - integrity sha512-HTByuKZzw7utPiDO523Tt2pLtEyK7OibUD9suEJQrPUCYQqrHr74GGX6VidMrovbf/I50mPqr8j/II6oBAuc5A== + version "3.2.4" + resolved "https://registry.yarnpkg.com/prettier/-/prettier-3.2.4.tgz#4723cadeac2ce7c9227de758e5ff9b14e075f283" + integrity sha512-FWu1oLHKCrtpO1ypU6J0SbK2d9Ckwysq6bHj/uaCP26DxrPpppCLQRGVuqAxSTvhF00AcvDRyYrLNW7ocBhFFQ== pretty-bytes@^5.4.1: version "5.6.0"