diff --git a/README.md b/README.md index 2a3502d..233fcfa 100644 --- a/README.md +++ b/README.md @@ -141,6 +141,17 @@ Type: `Hypercore` Add a hypercore to the indexer. Must have the same value encoding as other hypercores already in the indexer. +### indexer.removeCoreAndUnlinkIndexStorage(core) + +#### core + +_Required_\ +Type: `Hypercore` + +Remove a core from being indexed and unlink its storage. To be clear, this destroys the index's storage and doesn't touch the Hypercore's storage. + +If the core is not being indexed (or was previously removed), this is a no-op. + ### indexer.close() Stop the indexer and flush index state to storage. This will not close the diff --git a/index.js b/index.js index d928100..32e1377 100644 --- a/index.js +++ b/index.js @@ -29,6 +29,8 @@ const MOVING_AVG_FACTOR = 5 class MultiCoreIndexer extends TypedEmitter { #indexStream #writeStream + /** @type {Map, CoreIndexStream>} */ + #coreIndexStreamsByCore = new Map() #batch /** @type {import('./lib/types').IndexStateCurrent} */ #state = 'indexing' @@ -53,9 +55,9 @@ class MultiCoreIndexer extends TypedEmitter { constructor(cores, { batch, maxBatch = DEFAULT_BATCH_SIZE, storage }) { super() this.#createStorage = MultiCoreIndexer.defaultStorage(storage) - const coreIndexStreams = cores.map((core) => { - return new CoreIndexStream(core, this.#createStorage) - }) + const coreIndexStreams = cores.map((core) => + this.#createCoreIndexStream(core) + ) this.#indexStream = new MultiCoreIndexStream(coreIndexStreams, { highWaterMark: maxBatch, }) @@ -92,10 +94,25 @@ class MultiCoreIndexer extends TypedEmitter { * @param {import('hypercore')} core */ addCore(core) { - const coreIndexStream = new CoreIndexStream(core, this.#createStorage) + const coreIndexStream = this.#createCoreIndexStream(core) this.#indexStream.addStream(coreIndexStream) } + /** + * Remove a core from being indexed and unlink its storage. To be clear, this destroys the index's storage and doesn't touch the Hypercore's storage. + * + * If the core is not being indexed (or was previously removed), this is a no-op. + * + * @param {import('hypercore')} core + * @returns {Promise} + */ + async removeCoreAndUnlinkIndexStorage(core) { + const coreIndexStream = this.#coreIndexStreamsByCore.get(core) + if (!coreIndexStream) return + await this.#indexStream.removeStreamAndUnlinkStorage(coreIndexStream) + this.#coreIndexStreamsByCore.delete(core) + } + /** * Resolves when indexing state is 'idle' */ @@ -118,6 +135,16 @@ class MultiCoreIndexer extends TypedEmitter { ]) } + /** + * @param {import('hypercore')} core + * @returns {CoreIndexStream} + */ + #createCoreIndexStream(core) { + const coreIndexStream = new CoreIndexStream(core, this.#createStorage) + this.#coreIndexStreamsByCore.set(core, coreIndexStream) + return coreIndexStream + } + /** @param {Entry[]} entries */ async #handleEntries(entries) { this.#emitState() diff --git a/lib/core-index-stream.js b/lib/core-index-stream.js index 6ef9615..d68ddff 100644 --- a/lib/core-index-stream.js +++ b/lib/core-index-stream.js @@ -105,6 +105,27 @@ class CoreIndexStream extends Readable { this.#inProgressBitfield?.set(index, false) } + /** + * @returns {Promise} + */ + unlinkStorage() { + return new Promise((resolve, reject) => { + const storage = this.#storage + + if (storage) { + storage.unlink((err) => { + if (err) { + reject(err) + } else { + resolve() + } + }) + } else { + resolve() + } + }) + } + async #destroy() { this.#core.removeListener('append', this.#handleAppendBound) this.#core.removeListener('download', this.#handleDownloadBound) diff --git a/lib/multi-core-index-stream.js b/lib/multi-core-index-stream.js index 51fd0cd..59e19b4 100644 --- a/lib/multi-core-index-stream.js +++ b/lib/multi-core-index-stream.js @@ -29,8 +29,6 @@ class MultiCoreIndexStream extends Readable { #readable = new Set() #pending = pDefer() #destroying = false - // We cache drained state here rather than reading all streams every time - #drained /** * @@ -45,7 +43,6 @@ class MultiCoreIndexStream extends Readable { highWaterMark: opts.highWaterMark || 16, byteLength: () => 1, }) - this.#drained = streams.length === 0 this.#handleIndexingBound = this.#handleIndexing.bind(this) this.#handleDrainedBound = this.#handleDrained.bind(this) for (const s of streams) { @@ -62,7 +59,10 @@ class MultiCoreIndexStream extends Readable { } get drained() { - return this.#drained + for (const stream of this.#streams.keys()) { + if (!stream.drained) return false + } + return true } /** @@ -84,13 +84,14 @@ class MultiCoreIndexStream extends Readable { */ addStream(stream) { if (this.#streams.has(stream)) return - this.#drained = false // Do this so that we can remove this listener when we destroy the stream const handleReadableFn = this.#handleReadable.bind(this, stream) this.#streams.set(stream, handleReadableFn) stream.core .ready() .then(() => { + // This can happen if the stream was removed between the call to `.ready()` and the time it resolved. + if (!this.#streams.has(stream)) return const coreKey = stream.core.key /* istanbul ignore next: this is set after ready */ if (!coreKey) return @@ -103,6 +104,45 @@ class MultiCoreIndexStream extends Readable { stream.on('drained', this.#handleDrainedBound) } + /** + * Remove a stream and unlink its storage. If the stream is not found, this is a no-op. + * + * @param {CoreIndexStream} stream + * @returns {Promise} + */ + async removeStreamAndUnlinkStorage(stream) { + const handleReadableFn = this.#streams.get(stream) + if (!handleReadableFn) return + + const wasDrained = this.drained + await this.#removeStream(stream, handleReadableFn) + await stream.unlinkStorage() + if (!wasDrained && this.drained) this.emit('drained') + } + + /** + * @param {CoreIndexStream} stream + * @param {() => void} handleReadableFn + * @returns {Promise} + */ + async #removeStream(stream, handleReadableFn) { + // It's important to delete these before returning any control to + // the event loop, otherwise multiple calls to this method could + // cause a stream to be destroyed multiple times. + this.#readable.delete(stream) + this.#streams.delete(stream) + const coreKeyString = stream.core.key?.toString('hex') + if (coreKeyString) this.#streamsById.delete(coreKeyString) + + stream.off('readable', handleReadableFn) + stream.off('indexing', this.#handleIndexingBound) + stream.off('drained', this.#handleDrainedBound) + + const closePromise = once(stream, 'close') + stream.destroy() + await closePromise + } + /** @param {any} cb */ _open(cb) { cb() @@ -124,15 +164,11 @@ class MultiCoreIndexStream extends Readable { } async #destroy() { - const closePromises = [] + const removePromises = [] for (const [stream, handleReadableFn] of this.#streams) { - stream.off('readable', handleReadableFn) - stream.off('indexing', this.#handleIndexingBound) - stream.off('drained', this.#handleDrainedBound) - stream.destroy() - closePromises.push(once(stream, 'close')) + removePromises.push(this.#removeStream(stream, handleReadableFn)) } - await Promise.all(closePromises) + await Promise.all(removePromises) } async #read() { @@ -163,24 +199,21 @@ class MultiCoreIndexStream extends Readable { this.#pending.resolve() } - // Whenever a source stream emits an indexing event, bubble it up so that the - // `indexing` event always fires at the start of indexing in the chain of - // streams (the `drained` event should happen at the end of the chain once - // everything is read) #handleIndexing() { - if (!this.#drained) return - this.#drained = false - this.emit('indexing') + let indexingCount = 0 + for (const stream of this.#streams.keys()) { + if (!stream.drained) indexingCount++ + // We only care if there's exactly 1, so we can break early as an optimization. + if (indexingCount >= 2) break + } + const isFirstIndexing = indexingCount === 1 + + if (isFirstIndexing) this.emit('indexing') } #handleDrained() { - let drained = true - for (const stream of this.#streams.keys()) { - if (!stream.drained) drained = false - } - if (drained === this.#drained && !drained) return - this.#drained = drained - this.emit('drained') + const allDrained = this.drained + if (allDrained) this.emit('drained') } } diff --git a/test/multi-core-indexer.test.js b/test/multi-core-indexer.test.js index 62696dc..6c7381d 100644 --- a/test/multi-core-indexer.test.js +++ b/test/multi-core-indexer.test.js @@ -155,6 +155,180 @@ test('Indexes cores added with addCore method', async (t) => { t.pass('Indexer closed') }) +test('removing cores', async (t) => { + t.test('removing a core that was never added', async (t) => { + const ignoredCores = await createMultiple(5) + const coreToRemove = await create() + + const indexer = new MultiCoreIndexer(ignoredCores, { + batch: async () => {}, + storage: () => new ram(), + }) + await indexer.idle() + + indexer.removeCoreAndUnlinkIndexStorage(coreToRemove) + t.equal( + indexer.state.current, + 'idle', + 'Immediately idle after "removing" a core that was never there' + ) + + await indexer.close() + t.pass('Indexer closed') + }) + + t.test('double-removing a core', async (t) => { + const cores = await createMultiple(5) + const [coreToRemove] = cores + + const indexer = new MultiCoreIndexer(cores, { + batch: async () => {}, + storage: () => new ram(), + }) + await indexer.idle() + + indexer.removeCoreAndUnlinkIndexStorage(coreToRemove) + indexer.removeCoreAndUnlinkIndexStorage(coreToRemove) + t.equal( + indexer.state.current, + 'idle', + 'Immediately idle after double-removing a core' + ) + + await indexer.close() + t.pass('Indexer closed') + }) + + t.test('before anything is appended', async (t) => { + const cores = await createMultiple(5) + const [coreToRemove] = cores + /** @type {Entry[]} */ + const entries = [] + const indexer = new MultiCoreIndexer(cores, { + batch: async (data) => { + entries.push(...data) + }, + storage: () => new ram(), + }) + indexer.removeCoreAndUnlinkIndexStorage(coreToRemove) + const expected = (await generateFixtures(cores, 100)).filter( + (entry) => !entry.key.equals(coreToRemove.key) + ) + await indexer.idle() + t.same(sortEntries(entries), sortEntries(expected)) + await indexer.close() + t.pass('Indexer closed') + }) + + t.test('after some appends', async (t) => { + const cores = await createMultiple(5) + const [coreToRemove] = cores + + /** @type {Entry[]} */ + const entries = [] + const indexer = new MultiCoreIndexer(cores, { + batch: async (data) => { + entries.push(...data) + }, + storage: () => new ram(), + }) + const expected1 = await generateFixtures(cores, 100) + await indexer.idle() + + indexer.removeCoreAndUnlinkIndexStorage(coreToRemove) + await indexer.idle() + const expected2 = (await generateFixtures(cores, 100)).filter( + (entry) => !entry.key.equals(coreToRemove.key) + ) + await indexer.idle() + t.same(sortEntries(entries), sortEntries([...expected1, ...expected2])) + await indexer.close() + t.pass('Indexer closed') + }) + + t.test('re-indexes a core that was re-added', async (t) => { + const core = await create() + /** @type {Entry[]} */ + const entries = [] + const indexer = new MultiCoreIndexer([core], { + batch: async (data) => { + entries.push(...data) + }, + storage: () => new ram(), + }) + const expectedFirstBatch = await generateFixtures([core], 100) + await indexer.idle() + + await indexer.removeCoreAndUnlinkIndexStorage(core) + const expectedSecondBatch = await generateFixtures([core], 100) + await indexer.idle() + t.same(sortEntries(entries), sortEntries(expectedFirstBatch)) + + indexer.addCore(core) + await indexer.idle() + t.same( + sortEntries(entries), + sortEntries([ + ...expectedFirstBatch, + ...expectedFirstBatch, + ...expectedSecondBatch, + ]) + ) + + await indexer.close() + t.pass('Indexer closed') + }) + + t.test('removing non-ready cores', async (t) => { + const core = new Hypercore(() => new ram()) + + let entriesSeen = 0 + const indexer = new MultiCoreIndexer([core], { + batch: async (data) => { + entriesSeen += data.length + }, + storage: () => new ram(), + }) + + t.ok(!core.key, 'Core should not have a key. Test is not set up correctly') + + await indexer.removeCoreAndUnlinkIndexStorage(core) + + await generateFixtures([core], 100) + + t.equal(entriesSeen, 0, 'No entries should have been seen') + + await indexer.close() + t.pass('Indexer closed') + }) + + t.test("failure to remove a core's storage", async (t) => { + class StorageThatCannotBeUnlinked extends ram { + _unlink(req) { + req.callback(new Error('Failed to unlink!'), null) + } + } + + const core = new Hypercore(() => new ram()) + + const indexer = new MultiCoreIndexer([core], { + batch: async () => {}, + storage: () => new StorageThatCannotBeUnlinked(), + }) + + await core.ready() + await indexer.idle() + + await t.rejects( + indexer.removeCoreAndUnlinkIndexStorage(core), + 'Failed to unlink!' + ) + + await indexer.close() + t.pass('Indexer closed') + }) +}) + test('index sparse hypercores', async (t) => { const coreCount = 5 const localCores = await createMultiple(coreCount)