Skip to content

Commit

Permalink
feat: refactor and improve long transaction api
Browse files Browse the repository at this point in the history
To match modern Firestore, Mongo, etc API.
  • Loading branch information
kirillgroshkov committed Jan 17, 2024
1 parent cc87e57 commit 32ac3ab
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 155 deletions.
22 changes: 8 additions & 14 deletions src/adapter/file/file.db.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,23 @@
import {
generateJsonSchemaFromData,
pMap,
StringMap,
_by,
_deepEquals,
_since,
_sortBy,
_sortObjectDeep,
_stringMapValues,
_uniq,
JsonSchemaRootObject,
_filterUndefinedValues,
ObjectWithId,
_assert,
_deepCopy,
_stringMapEntries,
Saved,
} from '@naturalcycles/js-lib'
import { readableCreate, ReadableTyped, dimGrey } from '@naturalcycles/nodejs-lib'
import {
BaseCommonDB,
commonDBFullSupport,
CommonDBSupport,
DBOperation,
DBSaveBatchOperation,
DBTransaction,
queryInMemory,
} from '../..'
import { CommonDB } from '../../common.db'
Expand Down Expand Up @@ -55,7 +48,7 @@ export class FileDB extends BaseCommonDB implements CommonDB {
updateSaveMethod: false,
updateByQuery: false,
createTable: false,
transactions: false,
transactions: false, // todo
}

constructor(cfg: FileDBCfg) {
Expand Down Expand Up @@ -228,9 +221,9 @@ export class FileDB extends BaseCommonDB implements CommonDB {
this.logFinished(started, op)
}

override async createTransaction(): Promise<FileDBTransaction> {
return new FileDBTransaction(this)
}
// override async createTransaction(): Promise<FileDBTransaction> {
// return new FileDBTransaction(this)
// }

sortRows<ROW extends ObjectWithId>(rows: ROW[]): ROW[] {
rows = rows.map(r => _filterUndefinedValues(r))
Expand Down Expand Up @@ -260,14 +253,14 @@ export class FileDB extends BaseCommonDB implements CommonDB {
}
}

// todo: get back and fix it
// Implementation is optimized for loading/saving _whole files_.
/*
export class FileDBTransaction implements DBTransaction {
constructor(private db: FileDB) {}
ops: DBOperation[] = []
/**
* Implementation is optimized for loading/saving _whole files_.
*/
async commit(): Promise<void> {
// data[table][id] => row
const data: StringMap<StringMap<ObjectWithId>> = {}
Expand Down Expand Up @@ -335,3 +328,4 @@ export class FileDBTransaction implements DBTransaction {
this.ops = []
}
}
*/
85 changes: 44 additions & 41 deletions src/adapter/inmemory/inMemory.db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import {
CommonLogger,
_deepCopy,
_assert,
_omit,
} from '@naturalcycles/js-lib'
import {
bufferReviver,
Expand All @@ -37,6 +36,7 @@ import {
DBIncrement,
DBOperation,
DBPatch,
DBTransactionFn,
queryInMemory,
} from '../..'
import {
Expand Down Expand Up @@ -177,17 +177,6 @@ export class InMemoryDB implements CommonDB {
rows: ROW[],
opt: CommonDBSaveOptions<ROW> = {},
): Promise<void> {
const { tx } = opt
if (tx) {
;(tx as InMemoryDBTransaction).ops.push({
type: 'saveBatch',
table: _table,
rows,
opt: _omit(opt, ['tx']),
})
return
}

const table = this.cfg.tablesPrefix + _table
this.data[table] ||= {}

Expand Down Expand Up @@ -216,41 +205,18 @@ export class InMemoryDB implements CommonDB {

async deleteByQuery<ROW extends ObjectWithId>(
q: DBQuery<ROW>,
opt: CommonDBOptions = {},
_opt?: CommonDBOptions,
): Promise<number> {
const table = this.cfg.tablesPrefix + q.table
if (!this.data[table]) return 0
const ids = queryInMemory(q, Object.values(this.data[table]!) as ROW[]).map(r => r.id)

const { tx } = opt
if (tx) {
;(tx as InMemoryDBTransaction).ops.push({
type: 'deleteByIds',
table: q.table,
ids,
opt: _omit(opt, ['tx']),
})
return ids.length
}

return await this.deleteByIds(q.table, ids)
}

async deleteByIds(_table: string, ids: string[], opt: CommonDBOptions = {}): Promise<number> {
async deleteByIds(_table: string, ids: string[], _opt?: CommonDBOptions): Promise<number> {
const table = this.cfg.tablesPrefix + _table
if (!this.data[table]) return 0

const { tx } = opt
if (tx) {
;(tx as InMemoryDBTransaction).ops.push({
type: 'deleteByIds',
table: _table,
ids,
opt: _omit(opt, ['tx']),
})
return ids.length
}

let count = 0
ids.forEach(id => {
if (!this.data[table]![id]) return
Expand All @@ -268,8 +234,6 @@ export class InMemoryDB implements CommonDB {
const patchEntries = Object.entries(patch)
if (!patchEntries.length) return 0

// todo: can we support tx here? :thinking:

const table = this.cfg.tablesPrefix + q.table
const rows = queryInMemory(q, Object.values(this.data[table] || {}) as ROW[])
rows.forEach((row: any) => {
Expand Down Expand Up @@ -309,8 +273,15 @@ export class InMemoryDB implements CommonDB {
return Readable.from(queryInMemory(q, Object.values(this.data[table] || {}) as ROW[]))
}

async createTransaction(): Promise<DBTransaction> {
return new InMemoryDBTransaction(this)
async runInTransaction(fn: DBTransactionFn): Promise<void> {
const tx = new InMemoryDBTransaction(this)
try {
await fn(tx)
await tx.commit()
} catch (err) {
await tx.rollback()
throw err
}
}

/**
Expand Down Expand Up @@ -394,6 +365,37 @@ export class InMemoryDBTransaction implements DBTransaction {

ops: DBOperation[] = []

async getByIds<ROW extends ObjectWithId>(
table: string,
ids: string[],
opt?: CommonDBOptions,
): Promise<ROW[]> {
return await this.db.getByIds(table, ids, opt)
}

async saveBatch<ROW extends Partial<ObjectWithId>>(
table: string,
rows: ROW[],
opt?: CommonDBSaveOptions<ROW>,
): Promise<void> {
this.ops.push({
type: 'saveBatch',
table,
rows,
opt,
})
}

async deleteByIds(table: string, ids: string[], opt?: CommonDBOptions): Promise<number> {
this.ops.push({
type: 'deleteByIds',
table,
ids,
opt,
})
return ids.length
}

async commit(): Promise<void> {
const backup = _deepCopy(this.db.data)

Expand All @@ -411,6 +413,7 @@ export class InMemoryDBTransaction implements DBTransaction {
this.ops = []
} catch (err) {
// rollback
this.ops = []
this.db.data = backup
this.db.cfg.logger!.log('InMemoryDB transaction rolled back')

Expand Down
8 changes: 5 additions & 3 deletions src/base.common.db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
CommonDBOptions,
CommonDBSaveOptions,
DBPatch,
DBTransaction,
DBTransactionFn,
RunQueryResult,
} from './db.model'
import { DBQuery } from './query/dbQuery'
Expand Down Expand Up @@ -83,7 +83,9 @@ export class BaseCommonDB implements CommonDB {
throw new Error('deleteByIds is not implemented')
}

async createTransaction(): Promise<DBTransaction> {
return new FakeDBTransaction(this)
async runInTransaction(fn: DBTransactionFn): Promise<void> {
const tx = new FakeDBTransaction(this)
await fn(tx)
// there's no try/catch and rollback, as there's nothing to rollback
}
}
10 changes: 7 additions & 3 deletions src/common.db.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { JsonSchemaObject, JsonSchemaRootObject, ObjectWithId } from '@naturalcycles/js-lib'
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import type { ReadableTyped } from '@naturalcycles/nodejs-lib'
import {
CommonDBCreateOptions,
CommonDBOptions,
CommonDBSaveOptions,
CommonDBStreamOptions,
DBPatch,
DBTransaction,
DBTransactionFn,
RunQueryResult,
} from './db.model'
import { DBQuery } from './query/dbQuery'
Expand Down Expand Up @@ -159,8 +159,12 @@ export interface CommonDB {
/**
* Should be implemented as a Transaction (best effort), which means that
* either ALL or NONE of the operations should be applied.
*
* Transaction is automatically committed if fn resolves normally.
* Transaction is rolled back if fn throws, the error is re-thrown in that case.
* Graceful rollback is allowed on tx.rollback()
*/
createTransaction: () => Promise<DBTransaction>
runInTransaction: (fn: DBTransactionFn) => Promise<void>
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/commondao/common.dao.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ async function getEven(): Promise<TestItemBM[]> {
test('runInTransaction', async () => {
const items = createTestItemsBM(4)

await dao.useTransaction(async tx => {
await dao.runInTransaction(async tx => {
await tx.save(dao, items[0]!)
await tx.save(dao, items[1]!)
await tx.save(dao, items[3]!)
Expand Down
Loading

0 comments on commit 32ac3ab

Please sign in to comment.