Skip to content

Commit

Permalink
Keeping old indexes in indexeddb upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
arietrouw committed Dec 24, 2024
1 parent 20ebf38 commit 0a48c99
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 103 deletions.
143 changes: 41 additions & 102 deletions packages/modules/packages/archivist/packages/indexeddb/src/Archivist.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { uniq } from '@xylabs/array'
import { assertEx } from '@xylabs/assert'
import { exists } from '@xylabs/exists'
import { Hash, Hex } from '@xylabs/hex'
import { Logger } from '@xylabs/logger'
import { AbstractArchivist } from '@xyo-network/archivist-abstract'
import {
ArchivistAllQuerySchema,
Expand All @@ -21,10 +20,15 @@ import {
Payload, Schema, SequenceConstants, WithStorageMeta,
} from '@xyo-network/payload-model'
import {
IDBPCursorWithValue, IDBPDatabase, IDBPObjectStore, openDB,
IDBPCursorWithValue, IDBPDatabase, openDB,
} from 'idb'

import { IndexedDbArchivistConfigSchema } from './Config.ts'
import {
createStore,
getExistingIndexes,
useDb, useReadOnlyStore, useReadWriteStore,
} from './IndexedDbHelpers.ts'
import { IndexedDbArchivistParams } from './Params.ts'

export interface PayloadStore {
Expand Down Expand Up @@ -142,72 +146,15 @@ export class IndexedDbArchivist<
]
}

private static createStore(db: IDBPDatabase<PayloadStore>, storeName: string, indexes: IndexDescription[], logger?: Logger) {
logger?.log(`Creating store ${storeName}`)
// Create the store
const store = db.createObjectStore(storeName, {
// If it isn't explicitly set, create a value by auto incrementing.
autoIncrement: true,
})
// Name the store
store.name = storeName
// Create an index on the hash
for (const {
key, multiEntry, unique,
} of indexes) {
const indexKeys = Object.keys(key)
const keys = indexKeys.length === 1 ? indexKeys[0] : indexKeys
const indexName = buildStandardIndexName({ key, unique })
store.createIndex(indexName, keys, { multiEntry, unique })
}
}

private static async useDb<T>(dbName: string, callback: (db: IDBPDatabase<PayloadStore>) => Promise<T> | T): Promise<T> {
const db = await openDB<PayloadStore>(dbName)
try {
return await callback(db)
} finally {
db.close()
}
}

private static async useReadOnlyStore<T>(
db: IDBPDatabase<PayloadStore>,
storeName: string,
callback: (store: IDBPObjectStore<PayloadStore, [string], string, 'readonly'>) => Promise<T> | T,
): Promise<T> {
const transaction = db.transaction(storeName, 'readonly')
const store = transaction.objectStore(storeName)
try {
return await callback(store)
} finally {
await transaction.done
}
}

private static async useReadWriteStore<T>(
db: IDBPDatabase<PayloadStore>,
storeName: string,
callback: (store: IDBPObjectStore<PayloadStore, [string], string, 'readwrite'>) => Promise<T> | T,
): Promise<T> {
const transaction = db.transaction(storeName, 'readwrite')
const store = transaction.objectStore(storeName)
try {
return await callback(store)
} finally {
await transaction.done
}
}

protected override async allHandler(): Promise<WithStorageMeta<Payload>[]> {
// Get all payloads from the store
const payloads = await this.useInitializedDb(db => db.getAll(this.storeName))
const payloads = await this.useDb(db => db.getAll(this.storeName))
// Remove any metadata before returning to the client
return payloads
}

protected override async clearHandler(): Promise<void> {
await this.useInitializedDb(db => db.clear(this.storeName))
await this.useDb(db => db.clear(this.storeName))
}

protected override async deleteHandler(hashes: Hash[]): Promise<Hash[]> {
Expand All @@ -220,7 +167,7 @@ export class IndexedDbArchivist<
}))).flat()
// Remove any duplicates
const distinctHashes = [...new Set(hashesToDelete)]
return await this.useInitializedDb(async (db) => {
return await this.useDb(async (db) => {
// Only return hashes that were successfully deleted
const found = await Promise.all(
distinctHashes.map(async (hash) => {
Expand Down Expand Up @@ -249,7 +196,7 @@ export class IndexedDbArchivist<
cursor?: Hex,
): Promise<WithStorageMeta[]> {
// TODO: We have to handle the case where the cursor is not found, and then find the correct cursor to start with (thunked cursor)
return await IndexedDbArchivist.useReadOnlyStore(db, storeName, async (store) => {
return await useReadOnlyStore(db, storeName, async (store) => {
const sequenceIndex = assertEx(store.index(IndexedDbArchivist.sequenceIndexName), () => 'Failed to get sequence index')
let sequenceCursor: IDBPCursorWithValue<PayloadStore, [string]> | null | undefined
const parsedCursor = cursor === SequenceConstants.minLocalSequence ? null : cursor
Expand Down Expand Up @@ -303,7 +250,7 @@ export class IndexedDbArchivist<
indexName: string,
key: IDBValidKey,
): Promise<[number, WithStorageMeta] | undefined> {
return await IndexedDbArchivist.useReadOnlyStore(db, storeName, async (store) => {
return await useReadOnlyStore(db, storeName, async (store) => {
const index = store.index(indexName)
const cursor = await index.openCursor(key)
if (cursor) {
Expand All @@ -319,7 +266,7 @@ export class IndexedDbArchivist<
}

protected override async getHandler(hashes: string[]): Promise<WithStorageMeta[]> {
const payloads = await this.useInitializedDb(db =>
const payloads = await this.useDb(db =>
Promise.all(
// Filter duplicates to prevent unnecessary DB queries
uniq(hashes).map(async (hash) => {
Expand Down Expand Up @@ -354,12 +301,12 @@ export class IndexedDbArchivist<
}

protected override async insertHandler(payloads: WithStorageMeta<Payload>[]): Promise<WithStorageMeta<Payload>[]> {
return await this.useInitializedDb(async (db) => {
return await this.useDb(async (db) => {
// Perform all inserts via a single transaction to ensure atomicity
// with respect to checking for the pre-existence of the hash.
// This is done to prevent duplicate root hashes due to race
// conditions between checking vs insertion.
return await IndexedDbArchivist.useReadWriteStore(db, this.storeName, async (store) => {
return await useReadWriteStore(db, this.storeName, async (store) => {
// Return only the payloads that were successfully inserted
const inserted: WithStorageMeta<Payload>[] = []
await Promise.all(
Expand All @@ -382,7 +329,7 @@ export class IndexedDbArchivist<
const {
limit, cursor, order,
} = options ?? {}
return await this.useInitializedDb(async (db) => {
return await this.useDb(async (db) => {
return await this.getFromCursor(db, this.storeName, order, limit ?? 10, cursor)
})
}
Expand All @@ -391,48 +338,40 @@ export class IndexedDbArchivist<
await super.startHandler()
// NOTE: We could defer this creation to first access but we
// want to fail fast here in case something is wrong
await this.useInitializedDb(() => {})
await this.useDb(() => {})
return true
}

private async checkIndexes(db: IDBPDatabase<PayloadStore>) {
private async checkIndexes(db: IDBPDatabase<PayloadStore>): Promise<IndexDescription[]> {
const { indexes, storeName } = this
if (db.objectStoreNames.contains(storeName)) {
try {
return await IndexedDbArchivist.useReadWriteStore(db, storeName, (store) => {
const existingIndexes = store.indexNames
for (const { key, unique } of indexes) {
const indexName = buildStandardIndexName({ key, unique })
if (!existingIndexes.contains(indexName)) {
// the index is missing, so trigger an upgrade
this._dbVersion = this._dbVersion === undefined ? 0 : this._dbVersion + 1
return
}
}
})
} finally {
db.close()
const existingIndexes = await getExistingIndexes(db, storeName)
const existingIndexNames = new Set(existingIndexes.map(({ name }) => name).filter(exists))
for (const { key, unique } of indexes) {
const indexName = buildStandardIndexName({ key, unique })
if (!existingIndexNames.has(indexName)) {
// the index is missing, so trigger an upgrade
this._dbVersion = this._dbVersion === undefined ? 0 : this._dbVersion + 1
break
}
}
return existingIndexes
}
return []
}

private async checkObjectStore() {
private async checkObjectStore(): Promise<IndexDescription[]> {
const { dbName, storeName } = this
return await IndexedDbArchivist.useDb(dbName, (db) => {
return await useDb(dbName, (db) => {
// we check the version here to see if someone else upgraded it past where we think we are
if (db.version >= (this._dbVersion ?? 0)) {
this._dbVersion = db.version
}
try {
if (db.objectStoreNames.contains(storeName)) {
return this.checkIndexes(db)
} else {
this.logger.debug(`IndexedDbArchivist: Object store ${storeName} missing, triggering upgrade ${(this._dbVersion ?? 0) + 1}`)
this._dbVersion = (this._dbVersion ?? 0) + 1
return
}
} finally {
db.close()
if (db.objectStoreNames.contains(storeName)) {
return this.checkIndexes(db)
} else {
this._dbVersion = (this._dbVersion ?? 0) + 1
return []
}
})
}
Expand All @@ -442,11 +381,10 @@ export class IndexedDbArchivist<
* @returns The initialized DB
*/
private async getInitializedDb(): Promise<IDBPDatabase<PayloadStore>> {
await this.checkObjectStore()
const existingIndexes = await this.checkObjectStore()
const {
dbName, dbVersion, indexes, storeName, logger,
} = this
this.logger.debug(`IndexedDbArchivist: Opening DB ${dbName} Store ${storeName} version ${dbVersion}`)
return await openDB<PayloadStore>(dbName, dbVersion, {
blocked(currentVersion, blockedVersion, event) {
logger.warn(`IndexedDbArchivist: Blocked from upgrading from ${currentVersion} to ${blockedVersion}`, event)
Expand All @@ -458,7 +396,6 @@ export class IndexedDbArchivist<
logger.log('IndexedDbArchivist: Terminated')
},
upgrade(database, oldVersion, newVersion, transaction) {
logger.debug(`IndexedDbArchivist: upgrade from ${oldVersion} to ${newVersion}`)
// NOTE: This is called whenever the DB is created/updated. We could simply ensure the desired end
// state but, out of an abundance of caution, we will just delete (so we know where we are starting
// from a known good point) and recreate the desired state. This prioritizes resilience over data
Expand All @@ -476,7 +413,10 @@ export class IndexedDbArchivist<
}
}
}
IndexedDbArchivist.createStore(database, storeName, indexes, logger)
// keep any indexes that were there before but are not required by this config
// we do this incase there are two or more configs trying to use the db and they have mismatched indexes, so they do not erase each other's indexes
const existingIndexesToKeep = existingIndexes.filter(({ name: existingName }) => !indexes.some(({ name }) => name === existingName))
createStore(database, storeName, [...existingIndexesToKeep, ...indexes], logger)
},
})
}
Expand All @@ -486,8 +426,7 @@ export class IndexedDbArchivist<
* @param callback The method to execute with the initialized DB
* @returns
*/
private async useInitializedDb<T>(callback: (db: IDBPDatabase<PayloadStore>) => Promise<T> | T): Promise<T> {
this.logger.debug('IndexedDbArchivist: Using DB')
private async useDb<T>(callback: (db: IDBPDatabase<PayloadStore>) => Promise<T> | T): Promise<T> {
// Get the initialized DB
const db = await this.getInitializedDb()
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import type { Logger } from '@xylabs/logger'
import type { IndexDescription, IndexDirection } from '@xyo-network/archivist-model'
import { buildStandardIndexName } from '@xyo-network/archivist-model'
import type { IDBPDatabase, IDBPObjectStore } from 'idb'
import { openDB } from 'idb'

import type { PayloadStore } from './Archivist.ts'

export function createStore(db: IDBPDatabase<PayloadStore>, storeName: string, indexes: IndexDescription[], logger?: Logger) {
logger?.log(`Creating store ${storeName}`)
// Create the store
const store = db.createObjectStore(storeName, {
// If it isn't explicitly set, create a value by auto incrementing.
autoIncrement: true,
})
// Name the store
store.name = storeName
// Create an index on the hash
for (const {
key, multiEntry, unique,
} of indexes) {
const indexKeys = Object.keys(key)
const keys = indexKeys.length === 1 ? indexKeys[0] : indexKeys
const indexName = buildStandardIndexName({ key, unique })
store.createIndex(indexName, keys, { multiEntry, unique })
}
}

export async function getExistingIndexes(db: IDBPDatabase<PayloadStore>, storeName: string): Promise<IndexDescription[]> {
return await useReadOnlyStore(db, storeName, (store) => {
return [...store.indexNames].map((indexName) => {
const index = store.index(indexName)
const key: Record<string, IndexDirection> = {}
if (Array.isArray(index.keyPath)) {
for (const keyPath of index.keyPath) {
key[keyPath] = 1
}
} else {
key[index.keyPath] = 1
}
const desc: IndexDescription = {
name: indexName,
key,
unique: index.unique,
multiEntry: index.multiEntry,
}
return desc
})
})
}

export async function useDb<T>(dbName: string, callback: (db: IDBPDatabase<PayloadStore>) => Promise<T> | T): Promise<T> {
const db = await openDB<PayloadStore>(dbName)
try {
return await callback(db)
} finally {
db.close()
}
}

export async function useReadOnlyStore<T>(
db: IDBPDatabase<PayloadStore>,
storeName: string,
callback: (store: IDBPObjectStore<PayloadStore, [string], string, 'readonly'>) => Promise<T> | T,
): Promise<T> {
const transaction = db.transaction(storeName, 'readonly')
const store = transaction.objectStore(storeName)
try {
return await callback(store)
} finally {
await transaction.done
}
}

export async function useReadWriteStore<T>(
db: IDBPDatabase<PayloadStore>,
storeName: string,
callback: (store: IDBPObjectStore<PayloadStore, [string], string, 'readwrite'>) => Promise<T> | T,
): Promise<T> {
const transaction = db.transaction(storeName, 'readwrite')
const store = transaction.objectStore(storeName)
try {
return await callback(store)
} finally {
await transaction.done
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ describe('IndexedDbArchivist', () => {
archivistModule = await IndexedDbArchivist.create({
account,
config: {
dbName, schema: IndexedDbArchivistConfigSchema, storeName, consoleLogger: LogLevel.debug,
dbName, schema: IndexedDbArchivistConfigSchema, storeName,
},
})
})
Expand Down

0 comments on commit 0a48c99

Please sign in to comment.