Skip to content

Commit

Permalink
feat: CommonDB time machine (readAt aka PITR) support
Browse files Browse the repository at this point in the history
  • Loading branch information
kirillgroshkov committed Nov 3, 2024
1 parent 7b4c4f2 commit eb9074b
Show file tree
Hide file tree
Showing 9 changed files with 458 additions and 459 deletions.
4 changes: 3 additions & 1 deletion src/adapter/inmemory/inMemory.db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
import {
CommonDB,
commonDBFullSupport,
CommonDBSupport,
CommonDBTransactionOptions,
CommonDBType,
DBOperation,
Expand Down Expand Up @@ -94,8 +95,9 @@ export interface InMemoryDBCfg {
export class InMemoryDB implements CommonDB {
dbType = CommonDBType.document

support = {
support: CommonDBSupport = {
...commonDBFullSupport,
timeMachine: false,
}

constructor(cfg?: Partial<InMemoryDBCfg>) {
Expand Down
11 changes: 7 additions & 4 deletions src/common.db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { ReadableTyped } from '@naturalcycles/nodejs-lib'
import {
CommonDBCreateOptions,
CommonDBOptions,
CommonDBReadOptions,
CommonDBSaveOptions,
CommonDBStreamOptions,
CommonDBTransactionOptions,
Expand Down Expand Up @@ -71,7 +72,7 @@ export interface CommonDB {
getByIds: <ROW extends ObjectWithId>(
table: string,
ids: string[],
opt?: CommonDBOptions,
opt?: CommonDBReadOptions,
) => Promise<ROW[]>

// QUERY
Expand All @@ -80,12 +81,12 @@ export interface CommonDB {
*/
runQuery: <ROW extends ObjectWithId>(
q: DBQuery<ROW>,
opt?: CommonDBOptions,
opt?: CommonDBReadOptions,
) => Promise<RunQueryResult<ROW>>

runQueryCount: <ROW extends ObjectWithId>(
q: DBQuery<ROW>,
opt?: CommonDBOptions,
opt?: CommonDBReadOptions,
) => Promise<number>

streamQuery: <ROW extends ObjectWithId>(
Expand Down Expand Up @@ -133,7 +134,7 @@ export interface CommonDB {
patchByQuery: <ROW extends ObjectWithId>(
q: DBQuery<ROW>,
patch: Partial<ROW>,
opt?: CommonDBOptions,
opt?: CommonDBReadOptions,
) => Promise<number>

// TRANSACTION
Expand Down Expand Up @@ -192,6 +193,7 @@ export interface CommonDBSupport {
bufferValues?: boolean
nullValues?: boolean
transactions?: boolean
timeMachine?: boolean
}

export const commonDBFullSupport: CommonDBSupport = {
Expand All @@ -210,4 +212,5 @@ export const commonDBFullSupport: CommonDBSupport = {
bufferValues: true,
nullValues: true,
transactions: true,
timeMachine: true,
}
11 changes: 10 additions & 1 deletion src/commondao/common.dao.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
CommonLogger,
ErrorMode,
Promisable,
UnixTimestampNumber,
ZodError,
ZodSchema,
} from '@naturalcycles/js-lib'
Expand Down Expand Up @@ -251,6 +252,14 @@ export interface CommonDaoOptions extends CommonDBOptions {
table?: string
}

export interface CommonDaoReadOptions extends CommonDaoOptions {
/**
* If provided (and supported by the DB) - will read the data at that point in time (aka "Time machine" feature).
* This feature is named PITR (point-in-time-recovery) query in Datastore.
*/
readAt?: UnixTimestampNumber
}

export interface CommonDaoSaveOptions<BM extends BaseDBEntity, DBM extends BaseDBEntity>
extends CommonDaoSaveBatchOptions<DBM> {
/**
Expand Down Expand Up @@ -314,7 +323,7 @@ export interface CommonDaoStreamForEachOptions<IN>
TransformMapOptions<IN, any> {}

export interface CommonDaoStreamOptions<IN>
extends CommonDaoOptions,
extends CommonDaoReadOptions,
TransformLogProgressOptions<IN> {
/**
* @default true (for streams)
Expand Down
44 changes: 24 additions & 20 deletions src/commondao/common.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import {
CommonDaoOptions,
CommonDaoPatchByIdOptions,
CommonDaoPatchOptions,
CommonDaoReadOptions,
CommonDaoSaveBatchOptions,
CommonDaoSaveOptions,
CommonDaoStreamDeleteOptions,
Expand Down Expand Up @@ -116,7 +117,7 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM, I
// overrides are disabled now, as they obfuscate errors when ID branded type is used
// async getById(id: undefined | null, opt?: CommonDaoOptions): Promise<null>
// async getById(id?: ID | null, opt?: CommonDaoOptions): Promise<BM | null>
async getById(id?: ID | null, opt: CommonDaoOptions = {}): Promise<BM | null> {
async getById(id?: ID | null, opt: CommonDaoReadOptions = {}): Promise<BM | null> {
if (!id) return null
const op = `getById(${id})`
const table = opt.table || this.cfg.table
Expand All @@ -132,7 +133,7 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM, I
return bm || null
}

async getByIdOrEmpty(id: ID, part: Partial<BM> = {}, opt?: CommonDaoOptions): Promise<BM> {
async getByIdOrEmpty(id: ID, part: Partial<BM> = {}, opt?: CommonDaoReadOptions): Promise<BM> {
const bm = await this.getById(id, opt)
if (bm) return bm

Expand All @@ -141,7 +142,7 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM, I

// async getByIdAsDBM(id: undefined | null, opt?: CommonDaoOptions): Promise<null>
// async getByIdAsDBM(id?: ID | null, opt?: CommonDaoOptions): Promise<DBM | null>
async getByIdAsDBM(id?: ID | null, opt: CommonDaoOptions = {}): Promise<DBM | null> {
async getByIdAsDBM(id?: ID | null, opt: CommonDaoReadOptions = {}): Promise<DBM | null> {
if (!id) return null
const op = `getByIdAsDBM(${id})`
const table = opt.table || this.cfg.table
Expand All @@ -156,7 +157,7 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM, I
return dbm || null
}

async getByIds(ids: ID[], opt: CommonDaoOptions = {}): Promise<BM[]> {
async getByIds(ids: ID[], opt: CommonDaoReadOptions = {}): Promise<BM[]> {
if (!ids.length) return []
const op = `getByIds ${ids.length} id(s) (${_truncate(ids.slice(0, 10).join(', '), 50)})`
const table = opt.table || this.cfg.table
Expand All @@ -173,7 +174,7 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM, I
return bms
}

async getByIdsAsDBM(ids: ID[], opt: CommonDaoOptions = {}): Promise<DBM[]> {
async getByIdsAsDBM(ids: ID[], opt: CommonDaoReadOptions = {}): Promise<DBM[]> {
if (!ids.length) return []
const op = `getByIdsAsDBM ${ids.length} id(s) (${_truncate(ids.slice(0, 10).join(', '), 50)})`
const table = opt.table || this.cfg.table
Expand All @@ -189,15 +190,15 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM, I
return dbms
}

async requireById(id: ID, opt: CommonDaoOptions = {}): Promise<BM> {
async requireById(id: ID, opt: CommonDaoReadOptions = {}): Promise<BM> {
const r = await this.getById(id, opt)
if (!r) {
this.throwRequiredError(id, opt)
}
return r
}

async requireByIdAsDBM(id: ID, opt: CommonDaoOptions = {}): Promise<DBM> {
async requireByIdAsDBM(id: ID, opt: CommonDaoReadOptions = {}): Promise<DBM> {
const r = await this.getByIdAsDBM(id, opt)
if (!r) {
this.throwRequiredError(id, opt)
Expand Down Expand Up @@ -246,16 +247,16 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM, I
}
}

async getBy(by: keyof DBM, value: any, limit = 0, opt?: CommonDaoOptions): Promise<BM[]> {
async getBy(by: keyof DBM, value: any, limit = 0, opt?: CommonDaoReadOptions): Promise<BM[]> {
return await this.query().filterEq(by, value).limit(limit).runQuery(opt)
}

async getOneBy(by: keyof DBM, value: any, opt?: CommonDaoOptions): Promise<BM | null> {
async getOneBy(by: keyof DBM, value: any, opt?: CommonDaoReadOptions): Promise<BM | null> {
const [bm] = await this.query().filterEq(by, value).limit(1).runQuery(opt)
return bm || null
}

async getAll(opt?: CommonDaoOptions): Promise<BM[]> {
async getAll(opt?: CommonDaoReadOptions): Promise<BM[]> {
return await this.query().runQuery(opt)
}

Expand All @@ -267,12 +268,12 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM, I
return new RunnableDBQuery<BM, DBM, ID>(this, table)
}

async runQuery(q: DBQuery<DBM>, opt?: CommonDaoOptions): Promise<BM[]> {
async runQuery(q: DBQuery<DBM>, opt?: CommonDaoReadOptions): Promise<BM[]> {
const { rows } = await this.runQueryExtended(q, opt)
return rows
}

async runQuerySingleColumn<T = any>(q: DBQuery<DBM>, opt?: CommonDaoOptions): Promise<T[]> {
async runQuerySingleColumn<T = any>(q: DBQuery<DBM>, opt?: CommonDaoReadOptions): Promise<T[]> {
_assert(
q._selectedFieldNames?.length === 1,
`runQuerySingleColumn requires exactly 1 column to be selected: ${q.pretty()}`,
Expand All @@ -289,14 +290,17 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM, I
* Does deduplication by id.
* Order is not guaranteed, as queries run in parallel.
*/
async runUnionQueries(queries: DBQuery<DBM>[], opt?: CommonDaoOptions): Promise<BM[]> {
async runUnionQueries(queries: DBQuery<DBM>[], opt?: CommonDaoReadOptions): Promise<BM[]> {
const results = (
await pMap(queries, async q => (await this.runQueryExtended(q, opt)).rows)
).flat()
return _uniqBy(results, r => r.id)
}

async runQueryExtended(q: DBQuery<DBM>, opt: CommonDaoOptions = {}): Promise<RunQueryResult<BM>> {
async runQueryExtended(
q: DBQuery<DBM>,
opt: CommonDaoReadOptions = {},
): Promise<RunQueryResult<BM>> {
this.validateQueryIndexes(q) // throws if query uses `excludeFromIndexes` property
q.table = opt.table || q.table
const op = `runQuery(${q.pretty()})`
Expand All @@ -317,14 +321,14 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM, I
}
}

async runQueryAsDBM(q: DBQuery<DBM>, opt?: CommonDaoOptions): Promise<DBM[]> {
async runQueryAsDBM(q: DBQuery<DBM>, opt?: CommonDaoReadOptions): Promise<DBM[]> {
const { rows } = await this.runQueryExtendedAsDBM(q, opt)
return rows
}

async runQueryExtendedAsDBM(
q: DBQuery<DBM>,
opt: CommonDaoOptions = {},
opt: CommonDaoReadOptions = {},
): Promise<RunQueryResult<DBM>> {
this.validateQueryIndexes(q) // throws if query uses `excludeFromIndexes` property
q.table = opt.table || q.table
Expand All @@ -343,7 +347,7 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM, I
return { rows: dbms, ...queryResult }
}

async runQueryCount(q: DBQuery<DBM>, opt: CommonDaoOptions = {}): Promise<number> {
async runQueryCount(q: DBQuery<DBM>, opt: CommonDaoReadOptions = {}): Promise<number> {
this.validateQueryIndexes(q) // throws if query uses `excludeFromIndexes` property
q.table = opt.table || q.table
const op = `runQueryCount(${q.pretty()})`
Expand Down Expand Up @@ -548,7 +552,7 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM, I
)
}

async queryIds(q: DBQuery<DBM>, opt: CommonDaoOptions = {}): Promise<ID[]> {
async queryIds(q: DBQuery<DBM>, opt: CommonDaoReadOptions = {}): Promise<ID[]> {
this.validateQueryIndexes(q) // throws if query uses `excludeFromIndexes` property
q.table = opt.table || q.table
const { rows } = await this.cfg.db.runQuery(q.select(['id']), opt)
Expand Down Expand Up @@ -1411,15 +1415,15 @@ export class CommonDaoTransaction {
async getById<BM extends BaseDBEntity, DBM extends BaseDBEntity, ID = BM['id']>(
dao: CommonDao<BM, DBM, ID>,
id?: ID | null,
opt?: CommonDaoOptions,
opt?: CommonDaoReadOptions,
): Promise<BM | null> {
return await dao.getById(id, { ...opt, tx: this.tx })
}

async getByIds<BM extends BaseDBEntity, DBM extends BaseDBEntity, ID = BM['id']>(
dao: CommonDao<BM, DBM, ID>,
ids: ID[],
opt?: CommonDaoOptions,
opt?: CommonDaoReadOptions,
): Promise<BM[]> {
return await dao.getByIds(ids, { ...opt, tx: this.tx })
}
Expand Down
12 changes: 10 additions & 2 deletions src/db.model.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ObjectWithId } from '@naturalcycles/js-lib'
import { ObjectWithId, UnixTimestampNumber } from '@naturalcycles/js-lib'
import { CommonDB } from './common.db'

/**
Expand Down Expand Up @@ -52,6 +52,14 @@ export interface CommonDBOptions {
tx?: DBTransaction
}

export interface CommonDBReadOptions extends CommonDBOptions {
/**
* If provided (and supported by the DB) - will read the data at that point in time (aka "Time machine" feature).
* This feature is named PITR (point-in-time-recovery) query in Datastore.
*/
readAt?: UnixTimestampNumber
}

/**
* All properties default to undefined.
*/
Expand All @@ -72,7 +80,7 @@ export interface CommonDBSaveOptions<ROW extends ObjectWithId> extends CommonDBO
assignGeneratedIds?: boolean
}

export type CommonDBStreamOptions = CommonDBOptions
export type CommonDBStreamOptions = CommonDBReadOptions

export interface CommonDBCreateOptions extends CommonDBOptions {
/**
Expand Down
15 changes: 8 additions & 7 deletions src/query/dbQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import {
CommonDaoOptions,
CommonDaoReadOptions,
CommonDaoStreamDeleteOptions,
CommonDaoStreamForEachOptions,
CommonDaoStreamOptions,
Expand Down Expand Up @@ -251,27 +252,27 @@ export class RunnableDBQuery<
super(table || dao.cfg.table)
}

async runQuery(opt?: CommonDaoOptions): Promise<BM[]> {
async runQuery(opt?: CommonDaoReadOptions): Promise<BM[]> {
return await this.dao.runQuery(this, opt)
}

async runQuerySingleColumn<T = any>(opt?: CommonDaoOptions): Promise<T[]> {
async runQuerySingleColumn<T = any>(opt?: CommonDaoReadOptions): Promise<T[]> {
return await this.dao.runQuerySingleColumn<T>(this, opt)
}

async runQueryAsDBM(opt?: CommonDaoOptions): Promise<DBM[]> {
async runQueryAsDBM(opt?: CommonDaoReadOptions): Promise<DBM[]> {
return await this.dao.runQueryAsDBM(this, opt)
}

async runQueryExtended(opt?: CommonDaoOptions): Promise<RunQueryResult<BM>> {
async runQueryExtended(opt?: CommonDaoReadOptions): Promise<RunQueryResult<BM>> {
return await this.dao.runQueryExtended(this, opt)
}

async runQueryExtendedAsDBM(opt?: CommonDaoOptions): Promise<RunQueryResult<DBM>> {
async runQueryExtendedAsDBM(opt?: CommonDaoReadOptions): Promise<RunQueryResult<DBM>> {
return await this.dao.runQueryExtendedAsDBM(this, opt)
}

async runQueryCount(opt?: CommonDaoOptions): Promise<number> {
async runQueryCount(opt?: CommonDaoReadOptions): Promise<number> {
return await this.dao.runQueryCount(this, opt)
}

Expand Down Expand Up @@ -301,7 +302,7 @@ export class RunnableDBQuery<
return this.dao.streamQueryAsDBM(this, opt)
}

async queryIds(opt?: CommonDaoOptions): Promise<ID[]> {
async queryIds(opt?: CommonDaoReadOptions): Promise<ID[]> {
return await this.dao.queryIds(this, opt)
}

Expand Down
13 changes: 12 additions & 1 deletion src/testing/dbTest.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { _deepFreeze, _filterObject, _pick, _sortBy, pMap } from '@naturalcycles/js-lib'
import { _deepFreeze, _filterObject, _pick, _sortBy, localTime, pMap } from '@naturalcycles/js-lib'
import { CommonDB, CommonDBType } from '../common.db'
import { DBQuery } from '../query/dbQuery'
import {
Expand Down Expand Up @@ -77,6 +77,17 @@ export function runCommonDBTest(db: CommonDB, quirks: CommonDBImplementationQuir
expect(await db.getByIds(TEST_TABLE, ['abc', 'abcd'])).toEqual([])
})

// TimeMachine
if (support.timeMachine) {
test('getByIds(...) 10 minutes ago should return []', async () => {
expect(
await db.getByIds(TEST_TABLE, [item1.id, 'abc'], {
readAt: localTime.now().minus(10, 'minute').unix,
}),
).toEqual([])
})
}

// SAVE
if (support.nullValues) {
test('should allow to save and load null values', async () => {
Expand Down
Loading

0 comments on commit eb9074b

Please sign in to comment.