Skip to content

Commit

Permalink
IndexedDb Archivist Auto Upgrade based on Indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
arietrouw committed Dec 24, 2024
1 parent 084b0f8 commit 20ebf38
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"@xylabs/assert": "^4.4.26",
"@xylabs/exists": "^4.4.26",
"@xylabs/hex": "^4.4.26",
"@xylabs/logger": "^4.4.26",
"@xyo-network/archivist-abstract": "workspace:^",
"@xyo-network/archivist-model": "workspace:^",
"@xyo-network/module-model": "workspace:^",
Expand Down
252 changes: 169 additions & 83 deletions packages/modules/packages/archivist/packages/indexeddb/src/Archivist.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/* eslint-disable complexity */
import { uniq } from '@xylabs/array'
import { assertEx } from '@xylabs/assert'
import { exists } from '@xylabs/exists'
import { Hash, Hex } from '@xylabs/hex'
import { Logger } from '@xylabs/logger'
import { AbstractArchivist } from '@xyo-network/archivist-abstract'
import {
ArchivistAllQuerySchema,
Expand All @@ -21,7 +21,7 @@ import {
Payload, Schema, SequenceConstants, WithStorageMeta,
} from '@xyo-network/payload-model'
import {
IDBPCursorWithValue, IDBPDatabase, openDB,
IDBPCursorWithValue, IDBPDatabase, IDBPObjectStore, openDB,
} from 'idb'

import { IndexedDbArchivistConfigSchema } from './Config.ts'
Expand Down Expand Up @@ -67,6 +67,7 @@ export class IndexedDbArchivist<
static readonly sequenceIndexName = buildStandardIndexName(IndexedDbArchivist.sequenceIndex)

private _dbName?: string
private _dbVersion?: number
private _storeName?: string

/**
Expand Down Expand Up @@ -97,7 +98,8 @@ export class IndexedDbArchivist<
* The database version. If not supplied via config, it defaults to 1.
*/
get dbVersion() {
return this.config?.dbVersion ?? IndexedDbArchivist.defaultDbVersion
this._dbVersion = this._dbVersion ?? this.config?.dbVersion ?? IndexedDbArchivist.defaultDbVersion
return this._dbVersion
}

override get queries() {
Expand Down Expand Up @@ -140,15 +142,72 @@ export class IndexedDbArchivist<
]
}

/**
 * Creates the named object store and all configured indexes on it.
 * NOTE(review): createObjectStore is only legal inside an IndexedDB upgrade
 * transaction (e.g. openDB's `upgrade` callback) — confirm all call sites.
 * @param db The database connection (within an upgrade transaction)
 * @param storeName Name of the object store to create
 * @param indexes Descriptions of the indexes to create on the store
 * @param logger Optional logger for diagnostics
 */
private static createStore(db: IDBPDatabase<PayloadStore>, storeName: string, indexes: IndexDescription[], logger?: Logger) {
logger?.log(`Creating store ${storeName}`)
// Create the store
const store = db.createObjectStore(storeName, {
// If it isn't explicitly set, create a value by auto incrementing.
autoIncrement: true,
})
// Name the store (redundant: createObjectStore already used storeName, but kept as-is)
store.name = storeName
// Create every configured index (not just the hash index), using the standard index name
for (const {
key, multiEntry, unique,
} of indexes) {
const indexKeys = Object.keys(key)
// Single-key indexes use a plain keyPath string; compound indexes use an array
const keys = indexKeys.length === 1 ? indexKeys[0] : indexKeys
const indexName = buildStandardIndexName({ key, unique })
store.createIndex(indexName, keys, { multiEntry, unique })
}
}

/**
 * Opens the named database, runs the supplied callback against it, and
 * guarantees the connection is released afterwards, even on error.
 * @param dbName Name of the IndexedDB database to open
 * @param callback Work to perform with the open connection
 * @returns Whatever the callback resolves to
 */
private static async useDb<T>(dbName: string, callback: (db: IDBPDatabase<PayloadStore>) => Promise<T> | T): Promise<T> {
  const connection = await openDB<PayloadStore>(dbName)
  try {
    const result = await callback(connection)
    return result
  } finally {
    // Always close so this connection cannot block later version upgrades
    connection.close()
  }
}

/**
 * Runs the supplied callback against a readonly object store, then waits
 * for the transaction to settle before returning.
 * @param db An open database connection
 * @param storeName Name of the object store to operate on
 * @param callback Work to perform against the readonly store
 * @returns Whatever the callback resolves to
 */
private static async useReadOnlyStore<T>(
  db: IDBPDatabase<PayloadStore>,
  storeName: string,
  callback: (store: IDBPObjectStore<PayloadStore, [string], string, 'readonly'>) => Promise<T> | T,
): Promise<T> {
  const tx = db.transaction(storeName, 'readonly')
  const objectStore = tx.objectStore(storeName)
  try {
    return await callback(objectStore)
  } finally {
    // Ensure the transaction has fully completed before handing back control
    await tx.done
  }
}

/**
 * Runs the supplied callback against a readwrite object store, then waits
 * for the transaction to settle (commit or abort) before returning.
 * @param db An open database connection
 * @param storeName Name of the object store to operate on
 * @param callback Work to perform against the readwrite store
 * @returns Whatever the callback resolves to
 */
private static async useReadWriteStore<T>(
  db: IDBPDatabase<PayloadStore>,
  storeName: string,
  callback: (store: IDBPObjectStore<PayloadStore, [string], string, 'readwrite'>) => Promise<T> | T,
): Promise<T> {
  const tx = db.transaction(storeName, 'readwrite')
  const objectStore = tx.objectStore(storeName)
  try {
    return await callback(objectStore)
  } finally {
    // Ensure writes are committed (or the abort surfaces) before returning
    await tx.done
  }
}

protected override async allHandler(): Promise<WithStorageMeta<Payload>[]> {
// Get all payloads from the store
const payloads = await this.useDb(db => db.getAll(this.storeName))
const payloads = await this.useInitializedDb(db => db.getAll(this.storeName))
// Remove any metadata before returning to the client
return payloads
}

protected override async clearHandler(): Promise<void> {
await this.useDb(db => db.clear(this.storeName))
await this.useInitializedDb(db => db.clear(this.storeName))
}

protected override async deleteHandler(hashes: Hash[]): Promise<Hash[]> {
Expand All @@ -161,7 +220,7 @@ export class IndexedDbArchivist<
}))).flat()
// Remove any duplicates
const distinctHashes = [...new Set(hashesToDelete)]
return await this.useDb(async (db) => {
return await this.useInitializedDb(async (db) => {
// Only return hashes that were successfully deleted
const found = await Promise.all(
distinctHashes.map(async (hash) => {
Expand Down Expand Up @@ -190,42 +249,44 @@ export class IndexedDbArchivist<
cursor?: Hex,
): Promise<WithStorageMeta[]> {
// TODO: We have to handle the case where the cursor is not found, and then find the correct cursor to start with (thunked cursor)
const transaction = db.transaction(storeName, 'readonly')
const store = transaction.objectStore(storeName)
const sequenceIndex = assertEx(store.index(IndexedDbArchivist.sequenceIndexName), () => 'Failed to get sequence index')
let sequenceCursor: IDBPCursorWithValue<PayloadStore, [string]> | null | undefined = undefined
const parsedCursor = cursor === SequenceConstants.minLocalSequence ? null : cursor
sequenceCursor = assertEx(await sequenceIndex.openCursor(
null,
order === 'desc' ? 'prev' : 'next',
), () => `Failed to get cursor [${parsedCursor}, ${cursor}]`)
if (!sequenceCursor?.value) return []
try {
sequenceCursor = parsedCursor
? sequenceCursor.value._sequence === parsedCursor ? await sequenceCursor?.advance(1) : await (await sequenceCursor?.continue(parsedCursor))?.advance(1)
: sequenceCursor // advance to skip the initial value
} catch {
return []
}
return await IndexedDbArchivist.useReadOnlyStore(db, storeName, async (store) => {
const sequenceIndex = assertEx(store.index(IndexedDbArchivist.sequenceIndexName), () => 'Failed to get sequence index')
let sequenceCursor: IDBPCursorWithValue<PayloadStore, [string]> | null | undefined
const parsedCursor = cursor === SequenceConstants.minLocalSequence ? null : cursor
sequenceCursor = assertEx(await sequenceIndex.openCursor(
null,
order === 'desc' ? 'prev' : 'next',
), () => `Failed to get cursor [${parsedCursor}, ${cursor}]`)
if (!sequenceCursor?.value) return []
try {
sequenceCursor = parsedCursor
? sequenceCursor.value._sequence === parsedCursor
? await sequenceCursor?.advance(1)
: await (await sequenceCursor?.continue(parsedCursor))?.advance(1)
: sequenceCursor // advance to skip the initial value
} catch {
return []
}

let remaining = limit
const result: WithStorageMeta[] = []
while (remaining) {
const value = sequenceCursor?.value
if (value) {
result.push(value)
try {
sequenceCursor = await sequenceCursor?.advance(1)
} catch {
break
}
if (sequenceCursor === null) {
break
let remaining = limit
const result: WithStorageMeta[] = []
while (remaining) {
const value = sequenceCursor?.value
if (value) {
result.push(value)
try {
sequenceCursor = await sequenceCursor?.advance(1)
} catch {
break
}
if (sequenceCursor === null) {
break
}
}
remaining--
}
remaining--
}
return result
return result
})
}

/**
Expand All @@ -242,23 +303,23 @@ export class IndexedDbArchivist<
indexName: string,
key: IDBValidKey,
): Promise<[number, WithStorageMeta] | undefined> {
const transaction = db.transaction(storeName, 'readonly')
const store = transaction.objectStore(storeName)
const index = store.index(indexName)
const cursor = await index.openCursor(key)
if (cursor) {
const singleValue = cursor.value
// NOTE: It's known to be a number because we are using IndexedDB supplied auto-incrementing keys
if (typeof cursor.primaryKey !== 'number') {
throw new TypeError('primaryKey must be a number')
}
return await IndexedDbArchivist.useReadOnlyStore(db, storeName, async (store) => {
const index = store.index(indexName)
const cursor = await index.openCursor(key)
if (cursor) {
const singleValue = cursor.value
// NOTE: It's known to be a number because we are using IndexedDB supplied auto-incrementing keys
if (typeof cursor.primaryKey !== 'number') {
throw new TypeError('primaryKey must be a number')
}

return [cursor.primaryKey, singleValue]
}
return [cursor.primaryKey, singleValue]
}
})
}

protected override async getHandler(hashes: string[]): Promise<WithStorageMeta[]> {
const payloads = await this.useDb(db =>
const payloads = await this.useInitializedDb(db =>
Promise.all(
// Filter duplicates to prevent unnecessary DB queries
uniq(hashes).map(async (hash) => {
Expand Down Expand Up @@ -293,17 +354,14 @@ export class IndexedDbArchivist<
}

protected override async insertHandler(payloads: WithStorageMeta<Payload>[]): Promise<WithStorageMeta<Payload>[]> {
return await this.useDb(async (db) => {
return await this.useInitializedDb(async (db) => {
// Perform all inserts via a single transaction to ensure atomicity
// with respect to checking for the pre-existence of the hash.
// This is done to prevent duplicate root hashes due to race
// conditions between checking vs insertion.
const tx = db.transaction(this.storeName, 'readwrite')
// Get the object store
const store = tx.objectStore(this.storeName)
// Return only the payloads that were successfully inserted
const inserted: WithStorageMeta<Payload>[] = []
try {
return await IndexedDbArchivist.useReadWriteStore(db, this.storeName, async (store) => {
// Return only the payloads that were successfully inserted
const inserted: WithStorageMeta<Payload>[] = []
await Promise.all(
payloads.map(async (payload) => {
// only insert if hash does not already exist
Expand All @@ -315,19 +373,16 @@ export class IndexedDbArchivist<
}
}),
)
} finally {
// Ensure the transaction is closed
await tx.done
}
return inserted
return inserted
})
})
}

protected override async nextHandler(options?: ArchivistNextOptions): Promise<WithStorageMeta<Payload>[]> {
const {
limit, cursor, order,
} = options ?? {}
return await this.useDb(async (db) => {
return await this.useInitializedDb(async (db) => {
return await this.getFromCursor(db, this.storeName, order, limit ?? 10, cursor)
})
}
Expand All @@ -336,18 +391,62 @@ export class IndexedDbArchivist<
await super.startHandler()
// NOTE: We could defer this creation to first access but we
// want to fail fast here in case something is wrong
await this.useDb(() => {})
await this.useInitializedDb(() => {})
return true
}

/**
 * Verifies that every configured index exists on the object store; if any
 * index is missing, bumps the cached DB version so the next open triggers
 * an upgrade (which recreates the store and its indexes).
 * No-op when the store itself does not exist.
 * NOTE(review): when the store exists, this closes the supplied connection
 * in its finally block — callers must not reuse `db` afterwards.
 * @param db An open database connection to inspect
 */
private async checkIndexes(db: IDBPDatabase<PayloadStore>) {
const { indexes, storeName } = this
if (db.objectStoreNames.contains(storeName)) {
try {
return await IndexedDbArchivist.useReadWriteStore(db, storeName, (store) => {
const existingIndexes = store.indexNames
for (const { key, unique } of indexes) {
const indexName = buildStandardIndexName({ key, unique })
if (!existingIndexes.contains(indexName)) {
// the index is missing, so trigger an upgrade by bumping the cached version
// NOTE(review): the undefined branch sets 0, which is not a valid version to
// open IndexedDB with — presumably unreachable since checkObjectStore syncs
// _dbVersion from db.version first; confirm
this._dbVersion = this._dbVersion === undefined ? 0 : this._dbVersion + 1
return
}
}
})
} finally {
db.close()
}
}
}

/**
 * Ensures the target object store (and its indexes) exist in the database,
 * bumping the cached DB version to force an upgrade on the next open when
 * they do not. Also syncs the cached version with the on-disk version in
 * case another connection upgraded past where we think we are.
 */
private async checkObjectStore() {
  const { dbName, storeName } = this
  return await IndexedDbArchivist.useDb(dbName, async (db) => {
    // we check the version here to see if someone else upgraded it past where we think we are
    if (db.version >= (this._dbVersion ?? 0)) {
      this._dbVersion = db.version
    }
    try {
      if (db.objectStoreNames.contains(storeName)) {
        // BUG FIX: await the async index check inside the try so the finally
        // does not close the connection while the check is still running, and
        // so any rejection from it surfaces here instead of escaping the try
        return await this.checkIndexes(db)
      } else {
        this.logger.debug(`IndexedDbArchivist: Object store ${storeName} missing, triggering upgrade ${(this._dbVersion ?? 0) + 1}`)
        this._dbVersion = (this._dbVersion ?? 0) + 1
        return
      }
    } finally {
      // Idempotent: checkIndexes may already have closed this connection
      db.close()
    }
  })
}

/**
* Returns that the desired DB/Store initialized to the correct version
* @returns The initialized DB
*/
private async getInitializedDb(): Promise<IDBPDatabase<PayloadStore>> {
await this.checkObjectStore()
const {
dbName, dbVersion, indexes, storeName, logger,
} = this
this.logger.debug(`IndexedDbArchivist: Opening DB ${dbName} Store ${storeName} version ${dbVersion}`)
return await openDB<PayloadStore>(dbName, dbVersion, {
blocked(currentVersion, blockedVersion, event) {
logger.warn(`IndexedDbArchivist: Blocked from upgrading from ${currentVersion} to ${blockedVersion}`, event)
Expand All @@ -359,6 +458,7 @@ export class IndexedDbArchivist<
logger.log('IndexedDbArchivist: Terminated')
},
upgrade(database, oldVersion, newVersion, transaction) {
logger.debug(`IndexedDbArchivist: upgrade from ${oldVersion} to ${newVersion}`)
// NOTE: This is called whenever the DB is created/updated. We could simply ensure the desired end
// state but, out of an abundance of caution, we will just delete (so we know where we are starting
// from a known good point) and recreate the desired state. This prioritizes resilience over data
Expand All @@ -376,22 +476,7 @@ export class IndexedDbArchivist<
}
}
}
// Create the store
const store = database.createObjectStore(storeName, {
// If it isn't explicitly set, create a value by auto incrementing.
autoIncrement: true,
})
// Name the store
store.name = storeName
// Create an index on the hash
for (const {
key, multiEntry, unique,
} of indexes) {
const indexKeys = Object.keys(key)
const keys = indexKeys.length === 1 ? indexKeys[0] : indexKeys
const indexName = buildStandardIndexName({ key, unique })
store.createIndex(indexName, keys, { multiEntry, unique })
}
IndexedDbArchivist.createStore(database, storeName, indexes, logger)
},
})
}
Expand All @@ -401,7 +486,8 @@ export class IndexedDbArchivist<
* @param callback The method to execute with the initialized DB
* @returns
*/
private async useDb<T>(callback: (db: IDBPDatabase<PayloadStore>) => Promise<T> | T): Promise<T> {
private async useInitializedDb<T>(callback: (db: IDBPDatabase<PayloadStore>) => Promise<T> | T): Promise<T> {
this.logger.debug('IndexedDbArchivist: Using DB')
// Get the initialized DB
const db = await this.getInitializedDb()
try {
Expand Down
Loading

0 comments on commit 20ebf38

Please sign in to comment.