diff --git a/packages/modules/packages/archivist/packages/indexeddb/src/Archivist.ts b/packages/modules/packages/archivist/packages/indexeddb/src/Archivist.ts index f646a7f9e2..1c774f56cc 100644 --- a/packages/modules/packages/archivist/packages/indexeddb/src/Archivist.ts +++ b/packages/modules/packages/archivist/packages/indexeddb/src/Archivist.ts @@ -2,7 +2,6 @@ import { uniq } from '@xylabs/array' import { assertEx } from '@xylabs/assert' import { exists } from '@xylabs/exists' import { Hash, Hex } from '@xylabs/hex' -import { Logger } from '@xylabs/logger' import { AbstractArchivist } from '@xyo-network/archivist-abstract' import { ArchivistAllQuerySchema, @@ -21,10 +20,15 @@ import { Payload, Schema, SequenceConstants, WithStorageMeta, } from '@xyo-network/payload-model' import { - IDBPCursorWithValue, IDBPDatabase, IDBPObjectStore, openDB, + IDBPCursorWithValue, IDBPDatabase, openDB, } from 'idb' import { IndexedDbArchivistConfigSchema } from './Config.ts' +import { + createStore, + getExistingIndexes, + useDb, useReadOnlyStore, useReadWriteStore, +} from './IndexedDbHelpers.ts' import { IndexedDbArchivistParams } from './Params.ts' export interface PayloadStore { @@ -142,72 +146,15 @@ export class IndexedDbArchivist< ] } - private static createStore(db: IDBPDatabase, storeName: string, indexes: IndexDescription[], logger?: Logger) { - logger?.log(`Creating store ${storeName}`) - // Create the store - const store = db.createObjectStore(storeName, { - // If it isn't explicitly set, create a value by auto incrementing. - autoIncrement: true, - }) - // Name the store - store.name = storeName - // Create an index on the hash - for (const { - key, multiEntry, unique, - } of indexes) { - const indexKeys = Object.keys(key) - const keys = indexKeys.length === 1 ? indexKeys[0] : indexKeys - const indexName = buildStandardIndexName({ key, unique }) - store.createIndex(indexName, keys, { multiEntry, unique }) - } - } - - private static async useDb(dbName: string, callback: (db: IDBPDatabase) => Promise | T): Promise { - const db = await openDB(dbName) - try { - return await callback(db) - } finally { - db.close() - } - } - - private static async useReadOnlyStore( - db: IDBPDatabase, - storeName: string, - callback: (store: IDBPObjectStore) => Promise | T, - ): Promise { - const transaction = db.transaction(storeName, 'readonly') - const store = transaction.objectStore(storeName) - try { - return await callback(store) - } finally { - await transaction.done - } - } - - private static async useReadWriteStore( - db: IDBPDatabase, - storeName: string, - callback: (store: IDBPObjectStore) => Promise | T, - ): Promise { - const transaction = db.transaction(storeName, 'readwrite') - const store = transaction.objectStore(storeName) - try { - return await callback(store) - } finally { - await transaction.done - } - } - protected override async allHandler(): Promise[]> { // Get all payloads from the store - const payloads = await this.useInitializedDb(db => db.getAll(this.storeName)) + const payloads = await this.useDb(db => db.getAll(this.storeName)) // Remove any metadata before returning to the client return payloads } protected override async clearHandler(): Promise { - await this.useInitializedDb(db => db.clear(this.storeName)) + await this.useDb(db => db.clear(this.storeName)) } protected override async deleteHandler(hashes: Hash[]): Promise { @@ -220,7 +167,7 @@ export class IndexedDbArchivist< }))).flat() // Remove any duplicates const distinctHashes = [...new Set(hashesToDelete)] - return await this.useInitializedDb(async (db) => { + return await this.useDb(async (db) => { // Only return hashes that were successfully deleted const found = await Promise.all( distinctHashes.map(async (hash) => { @@ -249,7 +196,7 @@ export class IndexedDbArchivist< cursor?: Hex, ): Promise { // TODO: We have to handle the case where the cursor is not found, and then find the correct cursor to start with (thunked cursor) - return await IndexedDbArchivist.useReadOnlyStore(db, storeName, async (store) => { + return await useReadOnlyStore(db, storeName, async (store) => { const sequenceIndex = assertEx(store.index(IndexedDbArchivist.sequenceIndexName), () => 'Failed to get sequence index') let sequenceCursor: IDBPCursorWithValue | null | undefined const parsedCursor = cursor === SequenceConstants.minLocalSequence ? null : cursor @@ -303,7 +250,7 @@ export class IndexedDbArchivist< indexName: string, key: IDBValidKey, ): Promise<[number, WithStorageMeta] | undefined> { - return await IndexedDbArchivist.useReadOnlyStore(db, storeName, async (store) => { + return await useReadOnlyStore(db, storeName, async (store) => { const index = store.index(indexName) const cursor = await index.openCursor(key) if (cursor) { @@ -319,7 +266,7 @@ export class IndexedDbArchivist< } protected override async getHandler(hashes: string[]): Promise { - const payloads = await this.useInitializedDb(db => + const payloads = await this.useDb(db => Promise.all( // Filter duplicates to prevent unnecessary DB queries uniq(hashes).map(async (hash) => { @@ -354,12 +301,12 @@ export class IndexedDbArchivist< } protected override async insertHandler(payloads: WithStorageMeta[]): Promise[]> { - return await this.useInitializedDb(async (db) => { + return await this.useDb(async (db) => { // Perform all inserts via a single transaction to ensure atomicity // with respect to checking for the pre-existence of the hash. // This is done to prevent duplicate root hashes due to race // conditions between checking vs insertion. - return await IndexedDbArchivist.useReadWriteStore(db, this.storeName, async (store) => { + return await useReadWriteStore(db, this.storeName, async (store) => { // Return only the payloads that were successfully inserted const inserted: WithStorageMeta[] = [] await Promise.all( @@ -382,7 +329,7 @@ export class IndexedDbArchivist< const { limit, cursor, order, } = options ?? {} - return await this.useInitializedDb(async (db) => { + return await this.useDb(async (db) => { return await this.getFromCursor(db, this.storeName, order, limit ?? 10, cursor) }) } @@ -391,48 +338,40 @@ export class IndexedDbArchivist< await super.startHandler() // NOTE: We could defer this creation to first access but we // want to fail fast here in case something is wrong - await this.useInitializedDb(() => {}) + await this.useDb(() => {}) return true } - private async checkIndexes(db: IDBPDatabase) { + private async checkIndexes(db: IDBPDatabase): Promise { const { indexes, storeName } = this if (db.objectStoreNames.contains(storeName)) { - try { - return await IndexedDbArchivist.useReadWriteStore(db, storeName, (store) => { - const existingIndexes = store.indexNames - for (const { key, unique } of indexes) { - const indexName = buildStandardIndexName({ key, unique }) - if (!existingIndexes.contains(indexName)) { - // the index is missing, so trigger an upgrade - this._dbVersion = this._dbVersion === undefined ? 0 : this._dbVersion + 1 - return - } - } - }) - } finally { - db.close() + const existingIndexes = await getExistingIndexes(db, storeName) + const existingIndexNames = new Set(existingIndexes.map(({ name }) => name).filter(exists)) + for (const { key, unique } of indexes) { + const indexName = buildStandardIndexName({ key, unique }) + if (!existingIndexNames.has(indexName)) { + // the index is missing, so trigger an upgrade + this._dbVersion = this._dbVersion === undefined ? 0 : this._dbVersion + 1 + break + } } + return existingIndexes } + return [] } - private async checkObjectStore() { + private async checkObjectStore(): Promise { const { dbName, storeName } = this - return await IndexedDbArchivist.useDb(dbName, (db) => { + return await useDb(dbName, (db) => { // we check the version here to see if someone else upgraded it past where we think we are if (db.version >= (this._dbVersion ?? 0)) { this._dbVersion = db.version } - try { - if (db.objectStoreNames.contains(storeName)) { - return this.checkIndexes(db) - } else { - this.logger.debug(`IndexedDbArchivist: Object store ${storeName} missing, triggering upgrade ${(this._dbVersion ?? 0) + 1}`) - this._dbVersion = (this._dbVersion ?? 0) + 1 - return - } - } finally { - db.close() + if (db.objectStoreNames.contains(storeName)) { + return this.checkIndexes(db) + } else { + this._dbVersion = (this._dbVersion ?? 0) + 1 + return [] } }) } @@ -442,11 +381,10 @@ export class IndexedDbArchivist< * @returns The initialized DB */ private async getInitializedDb(): Promise> { - await this.checkObjectStore() + const existingIndexes = await this.checkObjectStore() const { dbName, dbVersion, indexes, storeName, logger, } = this - this.logger.debug(`IndexedDbArchivist: Opening DB ${dbName} Store ${storeName} version ${dbVersion}`) return await openDB(dbName, dbVersion, { blocked(currentVersion, blockedVersion, event) { logger.warn(`IndexedDbArchivist: Blocked from upgrading from ${currentVersion} to ${blockedVersion}`, event) @@ -458,7 +396,6 @@ export class IndexedDbArchivist< logger.log('IndexedDbArchivist: Terminated') }, upgrade(database, oldVersion, newVersion, transaction) { - logger.debug(`IndexedDbArchivist: upgrade from ${oldVersion} to ${newVersion}`) // NOTE: This is called whenever the DB is created/updated. We could simply ensure the desired end // state but, out of an abundance of caution, we will just delete (so we know where we are starting // from a known good point) and recreate the desired state. This prioritizes resilience over data @@ -476,7 +413,10 @@ export class IndexedDbArchivist< } } } - IndexedDbArchivist.createStore(database, storeName, indexes, logger) + // keep any indexes that were there before but are not required by this config + // we do this incase there are two or more configs trying to use the db and they have mismatched indexes, so they do not erase each other's indexes + const existingIndexesToKeep = existingIndexes.filter(({ name: existingName }) => !indexes.some(({ name }) => name === existingName)) + createStore(database, storeName, [...existingIndexesToKeep, ...indexes], logger) }, }) } @@ -486,8 +426,7 @@ export class IndexedDbArchivist< * @param callback The method to execute with the initialized DB * @returns */ - private async useInitializedDb(callback: (db: IDBPDatabase) => Promise | T): Promise { - this.logger.debug('IndexedDbArchivist: Using DB') + private async useDb(callback: (db: IDBPDatabase) => Promise | T): Promise { // Get the initialized DB const db = await this.getInitializedDb() try { diff --git a/packages/modules/packages/archivist/packages/indexeddb/src/IndexedDbHelpers.ts b/packages/modules/packages/archivist/packages/indexeddb/src/IndexedDbHelpers.ts new file mode 100644 index 0000000000..872498431f --- /dev/null +++ b/packages/modules/packages/archivist/packages/indexeddb/src/IndexedDbHelpers.ts @@ -0,0 +1,87 @@ +import type { Logger } from '@xylabs/logger' +import type { IndexDescription, IndexDirection } from '@xyo-network/archivist-model' +import { buildStandardIndexName } from '@xyo-network/archivist-model' +import type { IDBPDatabase, IDBPObjectStore } from 'idb' +import { openDB } from 'idb' + +import type { PayloadStore } from './Archivist.ts' + +export function createStore(db: IDBPDatabase, storeName: string, indexes: IndexDescription[], logger?: Logger) { + logger?.log(`Creating store ${storeName}`) + // Create the store + const store = db.createObjectStore(storeName, { + // If it isn't explicitly set, create a value by auto incrementing. + autoIncrement: true, + }) + // Name the store + store.name = storeName + // Create an index on the hash + for (const { + key, multiEntry, unique, + } of indexes) { + const indexKeys = Object.keys(key) + const keys = indexKeys.length === 1 ? indexKeys[0] : indexKeys + const indexName = buildStandardIndexName({ key, unique }) + store.createIndex(indexName, keys, { multiEntry, unique }) + } +} + +export async function getExistingIndexes(db: IDBPDatabase, storeName: string): Promise { + return await useReadOnlyStore(db, storeName, (store) => { + return [...store.indexNames].map((indexName) => { + const index = store.index(indexName) + const key: Record = {} + if (Array.isArray(index.keyPath)) { + for (const keyPath of index.keyPath) { + key[keyPath] = 1 + } + } else { + key[index.keyPath] = 1 + } + const desc: IndexDescription = { + name: indexName, + key, + unique: index.unique, + multiEntry: index.multiEntry, + } + return desc + }) + }) +} + +export async function useDb(dbName: string, callback: (db: IDBPDatabase) => Promise | T): Promise { + const db = await openDB(dbName) + try { + return await callback(db) + } finally { + db.close() + } +} + +export async function useReadOnlyStore( + db: IDBPDatabase, + storeName: string, + callback: (store: IDBPObjectStore) => Promise | T, +): Promise { + const transaction = db.transaction(storeName, 'readonly') + const store = transaction.objectStore(storeName) + try { + return await callback(store) + } finally { + await transaction.done + } +} + +export async function useReadWriteStore( + db: IDBPDatabase, + storeName: string, + callback: (store: IDBPObjectStore) => Promise | T, +): Promise { + const transaction = db.transaction(storeName, 'readwrite') + const store = transaction.objectStore(storeName) + try { + return await callback(store) + } finally { + await transaction.done + } +} diff --git a/packages/modules/packages/archivist/packages/indexeddb/src/spec/Archivist.spec.ts b/packages/modules/packages/archivist/packages/indexeddb/src/spec/Archivist.spec.ts index 842cc87fde..48137b5eb8 100644 --- a/packages/modules/packages/archivist/packages/indexeddb/src/spec/Archivist.spec.ts +++ b/packages/modules/packages/archivist/packages/indexeddb/src/spec/Archivist.spec.ts @@ -348,7 +348,7 @@ describe('IndexedDbArchivist', () => { archivistModule = await IndexedDbArchivist.create({ account, config: { - dbName, schema: IndexedDbArchivistConfigSchema, storeName, consoleLogger: LogLevel.debug, + dbName, schema: IndexedDbArchivistConfigSchema, storeName, }, }) })