Skip to content

Commit

Permalink
feat: long transaction support (#16)
Browse files Browse the repository at this point in the history
BREAKING CHANGE:
multiple CommonDB interface changes:
- DBTransaction
- deleteByIds
- CommonDBSupport
  • Loading branch information
kirillgroshkov authored Jan 17, 2024
1 parent ede8bca commit cc87e57
Show file tree
Hide file tree
Showing 23 changed files with 627 additions and 552 deletions.
2 changes: 1 addition & 1 deletion scripts/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
//
{
"extends": "@naturalcycles/dev-lib/scripts/tsconfig.json",
"exclude": ["**/__exclude"]
"exclude": ["**/__exclude"],
}
9 changes: 7 additions & 2 deletions src/adapter/cachedb/cache.db.model.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { CommonLogger, ObjectWithId } from '@naturalcycles/js-lib'
import { CommonDB } from '../../common.db'
import { CommonDBCreateOptions, CommonDBSaveOptions, CommonDBStreamOptions } from '../../db.model'
import {
CommonDBCreateOptions,
CommonDBOptions,
CommonDBSaveOptions,
CommonDBStreamOptions,
} from '../../db.model'

export interface CacheDBCfg {
name: string
Expand Down Expand Up @@ -45,7 +50,7 @@ export interface CacheDBCfg {
logger?: CommonLogger
}

export interface CacheDBOptions {
export interface CacheDBOptions extends CommonDBOptions {
/**
* @default false
*/
Expand Down
15 changes: 7 additions & 8 deletions src/adapter/cachedb/cache.db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import {
StringMap,
} from '@naturalcycles/js-lib'
import { BaseCommonDB } from '../../base.common.db'
import { CommonDB } from '../../common.db'
import { CommonDBOptions, DBPatch, RunQueryResult } from '../../db.model'
import { CommonDB, commonDBFullSupport, CommonDBSupport } from '../../common.db'
import { DBPatch, RunQueryResult } from '../../db.model'
import { DBQuery } from '../../query/dbQuery'
import { DBTransaction } from '../../transaction/dbTransaction'
import {
CacheDBCfg,
CacheDBCreateOptions,
Expand All @@ -26,6 +25,11 @@ import {
* Queries always hit downstream (unless `onlyCache` is passed)
*/
export class CacheDB extends BaseCommonDB implements CommonDB {
override support: CommonDBSupport = {
...commonDBFullSupport,
transactions: false,
}

constructor(cfg: CacheDBCfg) {
super()
this.cfg = {
Expand Down Expand Up @@ -284,9 +288,4 @@ export class CacheDB extends BaseCommonDB implements CommonDB {

return updated || 0
}

override async commitTransaction(tx: DBTransaction, opt?: CommonDBOptions): Promise<void> {
await this.cfg.downstreamDB.commitTransaction(tx, opt)
await this.cfg.cacheDB.commitTransaction(tx, opt)
}
}
14 changes: 2 additions & 12 deletions src/adapter/file/file.db.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,6 @@ const db = new FileDB({
plugin: new InMemoryPersistencePlugin(),
})

describe('runCommonDBTest', () =>
runCommonDBTest(db, {
bufferSupport: false, // todo: implement
insert: false,
update: false,
updateByQuery: false,
createTable: false,
}))
describe('runCommonDBTest', () => runCommonDBTest(db))

describe('runCommonDaoTest', () =>
runCommonDaoTest(db, {
createTable: false,
}))
describe('runCommonDaoTest', () => runCommonDaoTest(db))
190 changes: 121 additions & 69 deletions src/adapter/file/file.db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@ import {
Saved,
} from '@naturalcycles/js-lib'
import { readableCreate, ReadableTyped, dimGrey } from '@naturalcycles/nodejs-lib'
import { BaseCommonDB, DBSaveBatchOperation, queryInMemory } from '../..'
import {
BaseCommonDB,
commonDBFullSupport,
CommonDBSupport,
DBOperation,
DBSaveBatchOperation,
DBTransaction,
queryInMemory,
} from '../..'
import { CommonDB } from '../../common.db'
import {
CommonDBOptions,
Expand All @@ -27,7 +35,6 @@ import {
RunQueryResult,
} from '../../db.model'
import { DBQuery } from '../../query/dbQuery'
import { DBTransaction } from '../../transaction/dbTransaction'
import { FileDBCfg } from './file.db.model'

/**
Expand All @@ -41,6 +48,16 @@ import { FileDBCfg } from './file.db.model'
* Each save operation saves *whole* file to the persistence layer.
*/
export class FileDB extends BaseCommonDB implements CommonDB {
override support: CommonDBSupport = {
...commonDBFullSupport,
bufferValues: false, // todo: implement
insertSaveMethod: false,
updateSaveMethod: false,
updateByQuery: false,
createTable: false,
transactions: false,
}

constructor(cfg: FileDBCfg) {
super()
this.cfg = {
Expand Down Expand Up @@ -101,72 +118,6 @@ export class FileDB extends BaseCommonDB implements CommonDB {
}
}

/**
* Implementation is optimized for loading/saving _whole files_.
*/
override async commitTransaction(tx: DBTransaction, _opt?: CommonDBOptions): Promise<void> {
// data[table][id] => row
const data: StringMap<StringMap<ObjectWithId>> = {}

// 1. Load all tables data (concurrently)
const tables = _uniq(tx.ops.map(o => o.table))

await pMap(
tables,
async table => {
const rows = await this.loadFile(table)
data[table] = _by(rows, r => r.id)
},
{ concurrency: 16 },
)

const backup = _deepCopy(data)

// 2. Apply ops one by one (in order)
tx.ops.forEach(op => {
if (op.type === 'deleteByIds') {
op.ids.forEach(id => delete data[op.table]![id])
} else if (op.type === 'saveBatch') {
op.rows.forEach(r => {
if (!r.id) {
throw new Error('FileDB: row has an empty id')
}
data[op.table]![r.id] = r
})
} else {
throw new Error(`DBOperation not supported: ${(op as any).type}`)
}
})

// 3. Sort, turn it into ops
// Not filtering empty arrays, cause it's already filtered in this.saveFiles()
const ops: DBSaveBatchOperation[] = _stringMapEntries(data).map(([table, map]) => {
return {
type: 'saveBatch',
table,
rows: this.sortRows(_stringMapValues(map)),
}
})

// 4. Save all files
try {
await this.saveFiles(ops)
} catch (err) {
const ops: DBSaveBatchOperation[] = _stringMapEntries(backup).map(([table, map]) => {
return {
type: 'saveBatch',
table,
rows: this.sortRows(_stringMapValues(map)),
}
})

// Rollback, ignore rollback error (if any)
await this.saveFiles(ops).catch(_ => {})

throw err
}
}

override async runQuery<ROW extends ObjectWithId>(
q: DBQuery<ROW>,
_opt?: CommonDBOptions,
Expand Down Expand Up @@ -216,6 +167,27 @@ export class FileDB extends BaseCommonDB implements CommonDB {
return deleted
}

override async deleteByIds(
table: string,
ids: string[],
_opt?: CommonDBOptions,
): Promise<number> {
const byId = _by(await this.loadFile(table), r => r.id)

let deleted = 0
ids.forEach(id => {
if (!byId[id]) return
delete byId[id]
deleted++
})

if (deleted > 0) {
await this.saveFile(table, _stringMapValues(byId))
}

return deleted
}

override async getTableSchema<ROW extends ObjectWithId>(
table: string,
): Promise<JsonSchemaRootObject<ROW>> {
Expand Down Expand Up @@ -256,7 +228,11 @@ export class FileDB extends BaseCommonDB implements CommonDB {
this.logFinished(started, op)
}

private sortRows<ROW extends ObjectWithId>(rows: ROW[]): ROW[] {
override async createTransaction(): Promise<FileDBTransaction> {
return new FileDBTransaction(this)
}

sortRows<ROW extends ObjectWithId>(rows: ROW[]): ROW[] {
rows = rows.map(r => _filterUndefinedValues(r))

if (this.cfg.sortOnSave) {
Expand All @@ -283,3 +259,79 @@ export class FileDB extends BaseCommonDB implements CommonDB {
this.cfg.logger?.log(`<< ${op} ${dimGrey(`in ${_since(started)}`)}`)
}
}

export class FileDBTransaction implements DBTransaction {
constructor(private db: FileDB) {}

ops: DBOperation[] = []

/**
* Implementation is optimized for loading/saving _whole files_.
*/
async commit(): Promise<void> {
// data[table][id] => row
const data: StringMap<StringMap<ObjectWithId>> = {}

// 1. Load all tables data (concurrently)
const tables = _uniq(this.ops.map(o => o.table))

await pMap(
tables,
async table => {
const rows = await this.db.loadFile(table)
data[table] = _by(rows, r => r.id)
},
{ concurrency: 16 },
)

const backup = _deepCopy(data)

// 2. Apply ops one by one (in order)
this.ops.forEach(op => {
if (op.type === 'deleteByIds') {
op.ids.forEach(id => delete data[op.table]![id])
} else if (op.type === 'saveBatch') {
op.rows.forEach(r => {
if (!r.id) {
throw new Error('FileDB: row has an empty id')
}
data[op.table]![r.id] = r
})
} else {
throw new Error(`DBOperation not supported: ${(op as any).type}`)
}
})

// 3. Sort, turn it into ops
// Not filtering empty arrays, cause it's already filtered in this.saveFiles()
const ops: DBSaveBatchOperation[] = _stringMapEntries(data).map(([table, map]) => {
return {
type: 'saveBatch',
table,
rows: this.db.sortRows(_stringMapValues(map)),
}
})

// 4. Save all files
try {
await this.db.saveFiles(ops)
} catch (err) {
const ops: DBSaveBatchOperation[] = _stringMapEntries(backup).map(([table, map]) => {
return {
type: 'saveBatch',
table,
rows: this.db.sortRows(_stringMapValues(map)),
}
})

// Rollback, ignore rollback error (if any)
await this.db.saveFiles(ops).catch(_ => {})

throw err
}
}

async rollback(): Promise<void> {
this.ops = []
}
}
14 changes: 2 additions & 12 deletions src/adapter/file/localFile.persistence.plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,6 @@ const db = new FileDB({
}),
})

describe('runCommonDBTest', () =>
runCommonDBTest(db, {
bufferSupport: false, // todo: use bufferReviver
insert: false,
update: false,
updateByQuery: false,
createTable: false,
}))
describe('runCommonDBTest', () => runCommonDBTest(db))

describe('runCommonDaoTest', () =>
runCommonDaoTest(db, {
createTable: false,
}))
describe('runCommonDaoTest', () => runCommonDaoTest(db))
Loading

0 comments on commit cc87e57

Please sign in to comment.