Skip to content

Commit

Permalink
fix: CommonDao BM/DBM now extends BaseDBEntity
Browse files Browse the repository at this point in the history
BaseDBEntity now has a non-optional `id`
(and optional `created`/`updated` fields).
This is another step in (hopefully) simplifying DB model constraints.
  • Loading branch information
kirillgroshkov committed Jan 21, 2024
1 parent 59e40bd commit d8dc9c2
Show file tree
Hide file tree
Showing 24 changed files with 234 additions and 243 deletions.
4 changes: 2 additions & 2 deletions src/adapter/cachedb/cache.db.model.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CommonLogger, PartialObjectWithId } from '@naturalcycles/js-lib'
import { CommonLogger, ObjectWithId } from '@naturalcycles/js-lib'
import { CommonDB } from '../../common.db'
import {
CommonDBCreateOptions,
Expand Down Expand Up @@ -62,7 +62,7 @@ export interface CacheDBOptions extends CommonDBOptions {
onlyCache?: boolean
}

export interface CacheDBSaveOptions<ROW extends PartialObjectWithId>
export interface CacheDBSaveOptions<ROW extends ObjectWithId>
extends CacheDBOptions,
CommonDBSaveOptions<ROW> {}

Expand Down
29 changes: 14 additions & 15 deletions src/adapter/cachedb/cache.db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ import {
_isTruthy,
JsonSchemaObject,
JsonSchemaRootObject,
PartialObjectWithId,
Saved,
ObjectWithId,
StringMap,
} from '@naturalcycles/js-lib'
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
Expand Down Expand Up @@ -59,13 +58,13 @@ export class CacheDB extends BaseCommonDB implements CommonDB {
return await this.cfg.downstreamDB.getTables()
}

override async getTableSchema<ROW extends PartialObjectWithId>(
override async getTableSchema<ROW extends ObjectWithId>(
table: string,
): Promise<JsonSchemaRootObject<ROW>> {
return await this.cfg.downstreamDB.getTableSchema<ROW>(table)
}

override async createTable<ROW extends PartialObjectWithId>(
override async createTable<ROW extends ObjectWithId>(
table: string,
schema: JsonSchemaObject<ROW>,
opt: CacheDBCreateOptions = {},
Expand All @@ -79,12 +78,12 @@ export class CacheDB extends BaseCommonDB implements CommonDB {
}
}

override async getByIds<ROW extends PartialObjectWithId>(
override async getByIds<ROW extends ObjectWithId>(
table: string,
ids: string[],
opt: CacheDBSaveOptions<ROW> = {},
): Promise<Saved<ROW>[]> {
const resultMap: StringMap<Saved<ROW>> = {}
): Promise<ROW[]> {
const resultMap: StringMap<ROW> = {}
const missingIds: string[] = []

if (!opt.skipCache && !this.cfg.skipCache) {
Expand Down Expand Up @@ -125,7 +124,7 @@ export class CacheDB extends BaseCommonDB implements CommonDB {
return ids.map(id => resultMap[id]).filter(_isTruthy)
}

override async saveBatch<ROW extends PartialObjectWithId>(
override async saveBatch<ROW extends ObjectWithId>(
table: string,
rows: ROW[],
opt: CacheDBSaveOptions<ROW> = {},
Expand Down Expand Up @@ -154,10 +153,10 @@ export class CacheDB extends BaseCommonDB implements CommonDB {
}
}

override async runQuery<ROW extends PartialObjectWithId>(
override async runQuery<ROW extends ObjectWithId>(
q: DBQuery<ROW>,
opt: CacheDBSaveOptions<ROW> = {},
): Promise<RunQueryResult<Saved<ROW>>> {
): Promise<RunQueryResult<ROW>> {
if (!opt.onlyCache && !this.cfg.onlyCache) {
const { rows, ...queryResult } = await this.cfg.downstreamDB.runQuery(q, opt)

Expand All @@ -184,7 +183,7 @@ export class CacheDB extends BaseCommonDB implements CommonDB {
return { rows, ...queryResult }
}

override async runQueryCount<ROW extends PartialObjectWithId>(
override async runQueryCount<ROW extends ObjectWithId>(
q: DBQuery<ROW>,
opt: CacheDBOptions = {},
): Promise<number> {
Expand All @@ -201,10 +200,10 @@ export class CacheDB extends BaseCommonDB implements CommonDB {
return count
}

override streamQuery<ROW extends PartialObjectWithId>(
override streamQuery<ROW extends ObjectWithId>(
q: DBQuery<ROW>,
opt: CacheDBStreamOptions = {},
): ReadableTyped<Saved<ROW>> {
): ReadableTyped<ROW> {
if (!opt.onlyCache && !this.cfg.onlyCache) {
const stream = this.cfg.downstreamDB.streamQuery<ROW>(q, opt)

Expand Down Expand Up @@ -240,7 +239,7 @@ export class CacheDB extends BaseCommonDB implements CommonDB {
return stream
}

override async deleteByQuery<ROW extends PartialObjectWithId>(
override async deleteByQuery<ROW extends ObjectWithId>(
q: DBQuery<ROW>,
opt: CacheDBOptions = {},
): Promise<number> {
Expand Down Expand Up @@ -272,7 +271,7 @@ export class CacheDB extends BaseCommonDB implements CommonDB {
return deletedIds
}

override async updateByQuery<ROW extends PartialObjectWithId>(
override async updateByQuery<ROW extends ObjectWithId>(
q: DBQuery<ROW>,
patch: DBPatch<ROW>,
opt: CacheDBOptions = {},
Expand Down
6 changes: 3 additions & 3 deletions src/adapter/file/file.db.model.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { CommonLogger, PartialObjectWithId, Saved } from '@naturalcycles/js-lib'
import { CommonLogger, ObjectWithId } from '@naturalcycles/js-lib'
import { DBSaveBatchOperation } from '../../db.model'
import type { DBQueryOrder } from '../../query/dbQuery'

export interface FileDBPersistencePlugin {
ping: () => Promise<void>
getTables: () => Promise<string[]>
loadFile: <ROW extends PartialObjectWithId>(table: string) => Promise<Saved<ROW>[]>
loadFile: <ROW extends ObjectWithId>(table: string) => Promise<ROW[]>
saveFiles: (ops: DBSaveBatchOperation<any>[]) => Promise<void>
}

Expand All @@ -15,7 +15,7 @@ export interface FileDBCfg {
/**
* @default undefined, which means "insertion order"
*/
sortOnSave?: DBQueryOrder
sortOnSave?: DBQueryOrder<any>

/**
* @default true
Expand Down
33 changes: 15 additions & 18 deletions src/adapter/file/file.db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import {
JsonSchemaRootObject,
_filterUndefinedValues,
_assert,
Saved,
PartialObjectWithId,
ObjectWithId,
} from '@naturalcycles/js-lib'
import { readableCreate, ReadableTyped, dimGrey } from '@naturalcycles/nodejs-lib'
import {
Expand Down Expand Up @@ -74,24 +73,24 @@ export class FileDB extends BaseCommonDB implements CommonDB {
return tables
}

override async getByIds<ROW extends PartialObjectWithId>(
override async getByIds<ROW extends ObjectWithId>(
table: string,
ids: string[],
_opt?: CommonDBOptions,
): Promise<Saved<ROW>[]> {
): Promise<ROW[]> {
const byId = _by(await this.loadFile<ROW>(table), r => r.id)
return ids.map(id => byId[id]!).filter(Boolean)
}

override async saveBatch<ROW extends PartialObjectWithId>(
override async saveBatch<ROW extends ObjectWithId>(
table: string,
rows: ROW[],
_opt?: CommonDBSaveOptions<ROW>,
): Promise<void> {
if (!rows.length) return // save some api calls

// 1. Load the whole file
const byId = _by(await this.loadFile<Saved<ROW>>(table), r => r.id)
const byId = _by(await this.loadFile<ROW>(table), r => r.id)

// 2. Merge with new data (using ids)
let saved = 0
Expand All @@ -111,23 +110,23 @@ export class FileDB extends BaseCommonDB implements CommonDB {
}
}

override async runQuery<ROW extends PartialObjectWithId>(
override async runQuery<ROW extends ObjectWithId>(
q: DBQuery<ROW>,
_opt?: CommonDBOptions,
): Promise<RunQueryResult<Saved<ROW>>> {
): Promise<RunQueryResult<ROW>> {
return {
rows: queryInMemory(q, await this.loadFile<ROW>(q.table)),
}
}

override async runQueryCount<ROW extends PartialObjectWithId>(
override async runQueryCount<ROW extends ObjectWithId>(
q: DBQuery<ROW>,
_opt?: CommonDBOptions,
): Promise<number> {
return (await this.loadFile(q.table)).length
}

override streamQuery<ROW extends PartialObjectWithId>(
override streamQuery<ROW extends ObjectWithId>(
q: DBQuery<ROW>,
opt?: CommonDBStreamOptions,
): ReadableTyped<ROW> {
Expand All @@ -141,7 +140,7 @@ export class FileDB extends BaseCommonDB implements CommonDB {
return readable
}

override async deleteByQuery<ROW extends PartialObjectWithId>(
override async deleteByQuery<ROW extends ObjectWithId>(
q: DBQuery<ROW>,
_opt?: CommonDBOptions,
): Promise<number> {
Expand Down Expand Up @@ -181,7 +180,7 @@ export class FileDB extends BaseCommonDB implements CommonDB {
return deleted
}

override async getTableSchema<ROW extends PartialObjectWithId>(
override async getTableSchema<ROW extends ObjectWithId>(
table: string,
): Promise<JsonSchemaRootObject<ROW>> {
const rows = await this.loadFile(table)
Expand All @@ -192,15 +191,15 @@ export class FileDB extends BaseCommonDB implements CommonDB {
}

// wrapper, to handle logging
async loadFile<ROW extends PartialObjectWithId>(table: string): Promise<Saved<ROW>[]> {
async loadFile<ROW extends ObjectWithId>(table: string): Promise<ROW[]> {
const started = this.logStarted(`loadFile(${table})`)
const rows = await this.cfg.plugin.loadFile<ROW>(table)
this.logFinished(started, `loadFile(${table}) ${rows.length} row(s)`)
return rows
}

// wrapper, to handle logging, sorting rows before saving
async saveFile<ROW extends PartialObjectWithId>(table: string, _rows: ROW[]): Promise<void> {
async saveFile<ROW extends ObjectWithId>(table: string, _rows: ROW[]): Promise<void> {
// if (!_rows.length) return // NO, it should be able to save file with 0 rows!

// Sort the rows, if needed
Expand All @@ -212,9 +211,7 @@ export class FileDB extends BaseCommonDB implements CommonDB {
this.logFinished(started, op)
}

async saveFiles<ROW extends PartialObjectWithId>(
ops: DBSaveBatchOperation<ROW>[],
): Promise<void> {
async saveFiles<ROW extends ObjectWithId>(ops: DBSaveBatchOperation<ROW>[]): Promise<void> {
if (!ops.length) return
const op =
`saveFiles ${ops.length} op(s):\n` + ops.map(o => `${o.table} (${o.rows.length})`).join('\n')
Expand All @@ -227,7 +224,7 @@ export class FileDB extends BaseCommonDB implements CommonDB {
// return new FileDBTransaction(this)
// }

sortRows<ROW extends PartialObjectWithId>(rows: ROW[]): ROW[] {
sortRows<ROW extends ObjectWithId>(rows: ROW[]): ROW[] {
rows = rows.map(r => _filterUndefinedValues(r))

if (this.cfg.sortOnSave) {
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/file/inMemory.persistence.plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export class InMemoryPersistencePlugin implements FileDBPersistencePlugin {
return Object.values(this.data[table] || ({} as any))
}

async saveFiles(ops: DBSaveBatchOperation[]): Promise<void> {
async saveFiles(ops: DBSaveBatchOperation<any>[]): Promise<void> {
ops.forEach(op => {
this.data[op.table] = _by(op.rows, r => r.id)
})
Expand Down
10 changes: 5 additions & 5 deletions src/adapter/file/localFile.persistence.plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import fs from 'node:fs'
import fsp from 'node:fs/promises'
import { Readable } from 'node:stream'
import { createGzip, createUnzip } from 'node:zlib'
import { pMap, PartialObjectWithId, Saved } from '@naturalcycles/js-lib'
import { ObjectWithId, pMap } from '@naturalcycles/js-lib'
import {
transformJsonParse,
transformSplit,
Expand Down Expand Up @@ -48,7 +48,7 @@ export class LocalFilePersistencePlugin implements FileDBPersistencePlugin {
.map(f => f.split('.ndjson')[0]!)
}

async loadFile<ROW extends PartialObjectWithId>(table: string): Promise<Saved<ROW>[]> {
async loadFile<ROW extends ObjectWithId>(table: string): Promise<ROW[]> {
await fs2.ensureDirAsync(this.cfg.storagePath)
const ext = `ndjson${this.cfg.gzip ? '.gz' : ''}`
const filePath = `${this.cfg.storagePath}/${table}.${ext}`
Expand All @@ -57,7 +57,7 @@ export class LocalFilePersistencePlugin implements FileDBPersistencePlugin {

const transformUnzip = this.cfg.gzip ? [createUnzip()] : []

const rows: Saved<ROW>[] = []
const rows: ROW[] = []

await _pipeline([
fs.createReadStream(filePath),
Expand All @@ -70,11 +70,11 @@ export class LocalFilePersistencePlugin implements FileDBPersistencePlugin {
return rows
}

async saveFiles(ops: DBSaveBatchOperation[]): Promise<void> {
async saveFiles(ops: DBSaveBatchOperation<any>[]): Promise<void> {
await pMap(ops, async op => await this.saveFile(op.table, op.rows), { concurrency: 16 })
}

async saveFile<ROW extends PartialObjectWithId>(table: string, rows: ROW[]): Promise<void> {
async saveFile<ROW extends ObjectWithId>(table: string, rows: ROW[]): Promise<void> {
await fs2.ensureDirAsync(this.cfg.storagePath)
const ext = `ndjson${this.cfg.gzip ? '.gz' : ''}`
const filePath = `${this.cfg.storagePath}/${table}.${ext}`
Expand Down
6 changes: 3 additions & 3 deletions src/adapter/file/noop.persistence.plugin.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PartialObjectWithId, Saved } from '@naturalcycles/js-lib'
import { ObjectWithId } from '@naturalcycles/js-lib'
import { DBSaveBatchOperation } from '../../db.model'
import { FileDBPersistencePlugin } from './file.db.model'

Expand All @@ -9,9 +9,9 @@ export class NoopPersistencePlugin implements FileDBPersistencePlugin {
return []
}

async loadFile<ROW extends PartialObjectWithId>(_table: string): Promise<Saved<ROW>[]> {
async loadFile<ROW extends ObjectWithId>(_table: string): Promise<ROW[]> {
return []
}

async saveFiles(_ops: DBSaveBatchOperation[]): Promise<void> {}
async saveFiles(_ops: DBSaveBatchOperation<any>[]): Promise<void> {}
}
Loading

0 comments on commit d8dc9c2

Please sign in to comment.