From 6f1e3040800d0dcc5ed3f9f7cef16fe41804266a Mon Sep 17 00:00:00 2001 From: Brian Chen Date: Wed, 5 May 2021 11:06:34 -0500 Subject: [PATCH] feat: add recursive delete to Firestore class (#1494) --- dev/src/bulk-writer.ts | 2 +- dev/src/index.ts | 120 +++---- dev/src/recursive-delete.ts | 314 ++++++++++++++++++ dev/src/reference.ts | 40 ++- dev/system-test/firestore.ts | 150 +++++---- dev/test/bulk-writer.ts | 132 ++++---- dev/test/query.ts | 60 +++- dev/test/recursive-delete.ts | 603 +++++++++++++++++++++++++++++++++++ types/firestore.d.ts | 40 +++ 9 files changed, 1253 insertions(+), 208 deletions(-) create mode 100644 dev/src/recursive-delete.ts create mode 100644 dev/test/recursive-delete.ts diff --git a/dev/src/bulk-writer.ts b/dev/src/bulk-writer.ts index 5871af522..72c3b2949 100644 --- a/dev/src/bulk-writer.ts +++ b/dev/src/bulk-writer.ts @@ -765,7 +765,7 @@ export class BulkWriter { * Throws an error if the BulkWriter instance has been closed. * @private */ - private _verifyNotClosed(): void { + _verifyNotClosed(): void { if (this._closing) { throw new Error('BulkWriter has already been closed.'); } diff --git a/dev/src/index.ts b/dev/src/index.ts index c4b6e66b8..8742429a7 100644 --- a/dev/src/index.ts +++ b/dev/src/index.ts @@ -40,7 +40,7 @@ import { validateResourcePath, } from './path'; import {ClientPool} from './pool'; -import {CollectionReference, Query, QueryOptions} from './reference'; +import {CollectionReference} from './reference'; import {DocumentReference} from './reference'; import {Serializer} from './serializer'; import {Timestamp} from './timestamp'; @@ -76,6 +76,7 @@ const serviceConfig = interfaces['google.firestore.v1.Firestore']; import api = google.firestore.v1; import {CollectionGroup} from './collection-group'; +import {RecursiveDelete} from './recursive-delete'; export { CollectionReference, @@ -141,7 +142,7 @@ const CLOUD_RESOURCE_HEADER = 'google-cloud-resource-prefix'; /*! 
* The maximum number of times to retry idempotent requests. */ -const MAX_REQUEST_RETRIES = 5; +export const MAX_REQUEST_RETRIES = 5; /*! * The default number of idle GRPC channel to keep. @@ -156,18 +157,6 @@ const DEFAULT_MAX_IDLE_CHANNELS = 1; */ const MAX_CONCURRENT_REQUESTS_PER_CLIENT = 100; -/** - * Datastore allowed numeric IDs where Firestore only allows strings. Numeric - * IDs are exposed to Firestore as __idNUM__, so this is the lowest possible - * negative numeric value expressed in that format. - * - * This constant is used to specify startAt/endAt values when querying for all - * descendants in a single collection. - * - * @private - */ -const REFERENCE_NAME_MIN_ID = '__id-9223372036854775808__'; - /** * Document data (e.g. for use with * [set()]{@link DocumentReference#set}) consisting of fields mapped @@ -399,6 +388,26 @@ export class Firestore implements firestore.Firestore { */ private registeredListenersCount = 0; + /** + * A lazy-loaded BulkWriter instance to be used with recursiveDelete() if no + * BulkWriter instance is provided. + * + * @private + */ + private _bulkWriter: BulkWriter | undefined; + + /** + * Lazy-load the Firestore's default BulkWriter. + * + * @private + */ + private getBulkWriter(): BulkWriter { + if (!this._bulkWriter) { + this._bulkWriter = this.bulkWriter(); + } + return this._bulkWriter; + } + /** * Number of pending operations on the client. * @@ -1200,50 +1209,49 @@ export class Firestore implements firestore.Firestore { } /** - * Retrieves all descendant documents nested under the provided reference. + * Recursively deletes all documents and subcollections at and under the + * specified level. * - * @private - * @return {Stream} Stream of descendant documents. + * If any delete fails, the promise is rejected with an error message + * containing the number of failed deletes and the stack trace of the last + * failed delete. The provided reference is deleted regardless of whether + * all deletes succeeded. 
+ * + * `recursiveDelete()` uses a BulkWriter instance with default settings to + * perform the deletes. To customize throttling rates or add success/error + * callbacks, pass in a custom BulkWriter instance. + * + * @param ref The reference of a document or collection to delete. + * @param bulkWriter A custom BulkWriter instance used to perform the + * deletes. + * @return A promise that resolves when all deletes have been performed. + * The promise is rejected if any of the deletes fail. + * + * @example + * // Recursively delete a reference and log the references of failures. + * const bulkWriter = firestore.bulkWriter(); + * bulkWriter + * .onWriteError((error) => { + * if ( + * error.failedAttempts < MAX_RETRY_ATTEMPTS + * ) { + * return true; + * } else { + * console.log('Failed write at document: ', error.documentRef.path); + * return false; + * } + * }); + * await firestore.recursiveDelete(docRef, bulkWriter); */ - // TODO(chenbrian): Make this a private method after adding recursive delete. - _getAllDescendants( - ref: CollectionReference | DocumentReference - ): NodeJS.ReadableStream { - // The parent is the closest ancestor document to the location we're - // deleting. If we are deleting a document, the parent is the path of that - // document. If we are deleting a collection, the parent is the path of the - // document containing that collection (or the database root, if it is a - // root collection). - let parentPath = ref._resourcePath; - if (ref instanceof CollectionReference) { - parentPath = parentPath.popLast(); - } - const collectionId = - ref instanceof CollectionReference ? ref.id : ref.parent.id; - - let query: Query = new Query( - this, - QueryOptions.forKindlessAllDescendants(parentPath, collectionId) - ); - - // Query for names only to fetch empty snapshots. 
- query = query.select(FieldPath.documentId()); - - if (ref instanceof CollectionReference) { - // To find all descendants of a collection reference, we need to use a - // composite filter that captures all documents that start with the - // collection prefix. The MIN_KEY constant represents the minimum key in - // this collection, and a null byte + the MIN_KEY represents the minimum - // key is the next possible collection. - const nullChar = String.fromCharCode(0); - const startAt = collectionId + '/' + REFERENCE_NAME_MIN_ID; - const endAt = collectionId + nullChar + '/' + REFERENCE_NAME_MIN_ID; - query = query - .where(FieldPath.documentId(), '>=', startAt) - .where(FieldPath.documentId(), '<', endAt); - } - - return query.stream(); + recursiveDelete( + ref: + | firestore.CollectionReference + | firestore.DocumentReference, + bulkWriter?: BulkWriter + ): Promise { + const writer = bulkWriter ?? this.getBulkWriter(); + const deleter = new RecursiveDelete(this, writer, ref); + return deleter.run(); } /** diff --git a/dev/src/recursive-delete.ts b/dev/src/recursive-delete.ts new file mode 100644 index 000000000..5aeecfd20 --- /dev/null +++ b/dev/src/recursive-delete.ts @@ -0,0 +1,314 @@ +/*! + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +import * as firestore from '@google-cloud/firestore'; + +import * as assert from 'assert'; + +import Firestore, { + BulkWriter, + CollectionReference, + DocumentReference, + FieldPath, + Query, + QueryDocumentSnapshot, +} from '.'; +import {Deferred, wrapError} from './util'; +import {GoogleError, Status} from 'google-gax'; +import {BulkWriterError} from './bulk-writer'; +import {QueryOptions} from './reference'; + +/** + * Datastore allowed numeric IDs where Firestore only allows strings. Numeric + * IDs are exposed to Firestore as __idNUM__, so this is the lowest possible + * negative numeric value expressed in that format. + * + * This constant is used to specify startAt/endAt values when querying for all + * descendants in a single collection. + * + * @private + */ +export const REFERENCE_NAME_MIN_ID = '__id-9223372036854775808__'; + +/*! + * The query limit used for recursive deletes when fetching all descendants of + * the specified reference to delete. This is done to prevent the query stream + * from streaming documents faster than Firestore can delete. + */ +// Visible for testing. +export const MAX_PENDING_OPS = 5000; + +/** + * The number of pending BulkWriter operations at which RecursiveDelete + * starts the next limit query to fetch descendants. By starting the query + * while there are pending operations, Firestore can improve BulkWriter + * throughput. This helps prevent BulkWriter from idling while Firestore + * fetches the next query. + */ +const MIN_PENDING_OPS = 1000; + +/** + * Class used to store state required for running a recursive delete operation. + * Each recursive delete call should use a new instance of the class. + * @private + */ +export class RecursiveDelete { + /** + * The number of deletes that failed with a permanent error. + * @private + */ + private errorCount = 0; + + /** + * The most recently thrown error. Used to populate the developer-facing + * error message when the recursive delete operation completes. 
+ * @private + */ + private lastError: GoogleError | BulkWriterError | undefined; + + /** + * Whether there are still documents to delete that still need to be fetched. + * @private + */ + private documentsPending = true; + + /** + * A deferred promise that resolves when the recursive delete operation + * is completed. + * @private + */ + private readonly completionDeferred = new Deferred(); + + /** + * Whether a query stream is currently in progress. Only one stream can be + * run at a time. + * @private + */ + private streamInProgress = false; + + /** + * The last document snapshot returned by the stream. Used to set the + * startAfter() field in the subsequent stream. + * @private + */ + private lastDocumentSnap: QueryDocumentSnapshot | undefined; + + /** + * The number of pending BulkWriter operations. Used to determine when the + * next query can be run. + * @private + */ + private pendingOpsCount = 0; + + private errorStack = ''; + + /** + * + * @param firestore The Firestore instance to use. + * @param writer The BulkWriter instance to use for delete operations. + * @param ref The document or collection reference to recursively delete. + */ + constructor( + private readonly firestore: Firestore, + private readonly writer: BulkWriter, + private readonly ref: + | firestore.CollectionReference + | firestore.DocumentReference + ) {} + + /** + * Recursively deletes the reference provided in the class constructor. + * Returns a promise that resolves when all descendants have been deleted, or + * if an error occurs. + */ + run(): Promise { + assert( + this.documentsPending, + 'The recursive delete operation has already been completed.' + ); + + // Capture the error stack to preserve stack tracing across async calls. + this.errorStack = Error().stack!; + + this.writer._verifyNotClosed(); + this.setupStream(); + return this.completionDeferred.promise; + } + + /** + * Creates a query stream and attaches event handlers to it. 
+ * @private + */ + private setupStream(): void { + const limit = MAX_PENDING_OPS; + const stream = this.getAllDescendants( + this.ref instanceof CollectionReference + ? (this.ref as CollectionReference) + : (this.ref as DocumentReference), + limit + ); + this.streamInProgress = true; + let streamedDocsCount = 0; + stream + .on('error', err => { + err.code = Status.UNAVAILABLE; + err.stack = 'Failed to fetch children documents: ' + err.stack; + this.lastError = err; + this.onQueryEnd(); + }) + .on('data', (snap: QueryDocumentSnapshot) => { + streamedDocsCount++; + this.lastDocumentSnap = snap; + this.deleteRef(snap.ref); + }) + .on('end', () => { + this.streamInProgress = false; + // If there are fewer than the number of documents specified in the + // limit() field, we know that the query is complete. + if (streamedDocsCount < limit) { + this.onQueryEnd(); + } else if (this.pendingOpsCount === 0) { + this.setupStream(); + } + }); + } + + /** + * Retrieves all descendant documents nested under the provided reference. + * @param ref The reference to fetch all descendants for. + * @param limit The number of descendants to fetch in the query. + * @private + * @return {Stream} Stream of descendant documents. + */ + private getAllDescendants( + ref: CollectionReference | DocumentReference, + limit: number + ): NodeJS.ReadableStream { + // The parent is the closest ancestor document to the location we're + // deleting. If we are deleting a document, the parent is the path of that + // document. If we are deleting a collection, the parent is the path of the + // document containing that collection (or the database root, if it is a + // root collection). + let parentPath = ref._resourcePath; + if (ref instanceof CollectionReference) { + parentPath = parentPath.popLast(); + } + const collectionId = + ref instanceof CollectionReference + ? 
ref.id + : (ref as DocumentReference).parent.id; + + let query: Query = new Query( + this.firestore, + QueryOptions.forKindlessAllDescendants( + parentPath, + collectionId, + /* requireConsistency= */ false + ) + ); + + // Query for names only to fetch empty snapshots. + query = query.select(FieldPath.documentId()).limit(limit); + + if (ref instanceof CollectionReference) { + // To find all descendants of a collection reference, we need to use a + // composite filter that captures all documents that start with the + // collection prefix. The MIN_KEY constant represents the minimum key in + // this collection, and a null byte + the MIN_KEY represents the minimum + // key in the next possible collection. + const nullChar = String.fromCharCode(0); + const startAt = collectionId + '/' + REFERENCE_NAME_MIN_ID; + const endAt = collectionId + nullChar + '/' + REFERENCE_NAME_MIN_ID; + query = query + .where(FieldPath.documentId(), '>=', startAt) + .where(FieldPath.documentId(), '<', endAt); + } + + if (this.lastDocumentSnap) { + query = query.startAfter(this.lastDocumentSnap); + } + + return query.stream(); + } + + /** + * Called when all descendants of the provided reference have been streamed + * or if a permanent error occurs during the stream. Deletes the developer + * provided reference and wraps any errors that occurred. + * @private + */ + private onQueryEnd(): void { + this.documentsPending = false; + if (this.ref instanceof DocumentReference) { + this.writer.delete(this.ref).catch(err => this.incrementErrorCount(err)); + } + this.writer.flush().then(async () => { + if (this.lastError === undefined) { + this.completionDeferred.resolve(); + } else { + let error = new GoogleError( + `${this.errorCount} ` + + `${this.errorCount !== 1 ? 'deletes' : 'delete'} ` + + 'failed. 
The last delete failed with: ' + ); + if (this.lastError.code !== undefined) { + error.code = (this.lastError.code as number) as Status; + } + error = wrapError(error, this.errorStack); + + // Wrap the BulkWriter error last to provide the full stack trace. + this.completionDeferred.reject( + this.lastError.stack + ? wrapError(error, this.lastError.stack ?? '') + : error + ); + } + }); + } + + /** + * Deletes the provided reference and starts the next stream if conditions + * are met. + * @private + */ + private deleteRef(docRef: DocumentReference): void { + this.pendingOpsCount++; + this.writer + .delete(docRef) + .catch(err => { + this.incrementErrorCount(err); + }) + .then(() => { + this.pendingOpsCount--; + + // We wait until the previous stream has ended in order to ensure the + // startAfter document is correct. Starting the next stream while + // there are pending operations allows Firestore to maximize + // BulkWriter throughput. + if ( + this.documentsPending && + !this.streamInProgress && + this.pendingOpsCount < MIN_PENDING_OPS + ) { + this.setupStream(); + } + }); + } + + private incrementErrorCount(err: Error): void { + this.errorCount++; + this.lastError = err; + } +} diff --git a/dev/src/reference.ts b/dev/src/reference.ts index 40953c5c9..06283c4b0 100644 --- a/dev/src/reference.ts +++ b/dev/src/reference.ts @@ -1006,7 +1006,11 @@ export class QueryOptions { readonly projection?: api.StructuredQuery.IProjection, // Whether to select all documents under `parentPath`. By default, only // collections that match `collectionId` are selected. - readonly kindless = false + readonly kindless = false, + // Whether to require consistent documents when restarting the query. By + // default, restarting the query uses the readTime offset of the original + // query to provide consistent results. 
+ readonly requireConsistency = true ) {} /** @@ -1053,7 +1057,8 @@ export class QueryOptions { */ static forKindlessAllDescendants( parent: ResourcePath, - id: string + id: string, + requireConsistency = true ): QueryOptions { let options = new QueryOptions( parent, @@ -1066,6 +1071,7 @@ export class QueryOptions { options = options.with({ kindless: true, + requireConsistency, }); return options; } @@ -1088,7 +1094,8 @@ export class QueryOptions { coalesce(settings.limitType, this.limitType), coalesce(settings.offset, this.offset), coalesce(settings.projection, this.projection), - coalesce(settings.kindless, this.kindless) + coalesce(settings.kindless, this.kindless), + coalesce(settings.requireConsistency, this.requireConsistency) ); } @@ -1133,7 +1140,8 @@ export class QueryOptions { deepEqual(this.startAt, other.startAt) && deepEqual(this.endAt, other.endAt) && deepEqual(this.projection, other.projection) && - this.kindless === other.kindless + this.kindless === other.kindless && + this.requireConsistency === other.requireConsistency ); } } @@ -1515,25 +1523,21 @@ export class Query implements firestore.Query { } const fieldOrders = this._queryOptions.fieldOrders.slice(); - let hasDocumentId = false; + // If no explicit ordering is specified, use the first inequality to + // define an implicit order. if (fieldOrders.length === 0) { - // If no explicit ordering is specified, use the first inequality to - // define an implicit order. for (const fieldFilter of this._queryOptions.fieldFilters) { if (fieldFilter.isInequalityFilter()) { fieldOrders.push(new FieldOrder(fieldFilter.field)); break; } } - } else { - for (const fieldOrder of fieldOrders) { - if (FieldPath.documentId().isEqual(fieldOrder.field)) { - hasDocumentId = true; - } - } } + const hasDocumentId = !!fieldOrders.find(fieldOrder => + FieldPath.documentId().isEqual(fieldOrder.field) + ); if (!hasDocumentId) { // Add implicit sorting by name, using the last specified direction. 
const lastDirection = @@ -2181,9 +2185,13 @@ export class Query implements firestore.Query { // query cursor. Note that we do not use backoff here. The call to // `requestStream()` will backoff should the restart fail before // delivering any results. - request = this.startAfter(lastReceivedDocument).toProto( - lastReceivedDocument.readTime - ); + if (this._queryOptions.requireConsistency) { + request = this.startAfter(lastReceivedDocument).toProto( + lastReceivedDocument.readTime + ); + } else { + request = this.startAfter(lastReceivedDocument).toProto(); + } } streamActive.resolve(/* active= */ true); } else { diff --git a/dev/system-test/firestore.ts b/dev/system-test/firestore.ts index 6a388b6d8..77922d4cd 100644 --- a/dev/system-test/firestore.ts +++ b/dev/system-test/firestore.ts @@ -2629,69 +2629,105 @@ describe('BulkWriter class', () => { return firestore.terminate(); }); - // TODO(chenbrian): This is a temporary test used to validate that the - // StructuredQuery calls work properly. Remove these tests after adding - // recursive delete tests. 
- it('finds nested documents and collection', async () => { - // ROOT-DB - // └── randomCol - // ├── anna - // └── bob - // └── parentsCol - // ├── charlie - // └── daniel - // └── childCol - // ├── ernie - // └── francis - const batch = firestore.batch(); - batch.set(randomCol.doc('anna'), {name: 'anna'}); - batch.set(randomCol.doc('bob'), {name: 'bob'}); - batch.set(randomCol.doc('bob/parentsCol/charlie'), {name: 'charlie'}); - batch.set(randomCol.doc('bob/parentsCol/daniel'), {name: 'daniel'}); - batch.set(randomCol.doc('bob/parentsCol/daniel/childCol/ernie'), { - name: 'ernie', - }); - batch.set(randomCol.doc('bob/parentsCol/daniel/childCol/francis'), { - name: 'francis', - }); - await batch.commit(); + describe('recursiveDelete()', () => { + async function countDocumentChildren( + ref: DocumentReference + ): Promise { + let count = 0; + const collections = await ref.listCollections(); + for (const collection of collections) { + count += await countCollectionChildren(collection); + } + return count; + } - const numStreamItems = async ( - stream: NodeJS.ReadableStream - ): Promise => { + async function countCollectionChildren( + ref: CollectionReference + ): Promise { let count = 0; - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const _ of stream) { - ++count; + const docs = await ref.listDocuments(); + for (const doc of docs) { + count += (await countDocumentChildren(doc)) + 1; } return count; - }; + } - // Query all descendants of collections. 
- let descendantsStream = await firestore._getAllDescendants(randomCol); - expect(await numStreamItems(descendantsStream)).to.equal(6); - descendantsStream = await firestore._getAllDescendants( - randomCol.doc('bob').collection('parentsCol') - ); - expect(await numStreamItems(descendantsStream)).to.equal(4); - descendantsStream = await firestore._getAllDescendants( - randomCol.doc('bob').collection('parentsCol/daniel/childCol') - ); - expect(await numStreamItems(descendantsStream)).to.equal(2); + beforeEach(async () => { + // ROOT-DB + // └── randomCol + // ├── anna + // └── bob + // └── parentsCol + // ├── charlie + // └── daniel + // └── childCol + // ├── ernie + // └── francis + const batch = firestore.batch(); + batch.set(randomCol.doc('anna'), {name: 'anna'}); + batch.set(randomCol.doc('bob'), {name: 'bob'}); + batch.set(randomCol.doc('bob/parentsCol/charlie'), {name: 'charlie'}); + batch.set(randomCol.doc('bob/parentsCol/daniel'), {name: 'daniel'}); + batch.set(randomCol.doc('bob/parentsCol/daniel/childCol/ernie'), { + name: 'ernie', + }); + batch.set(randomCol.doc('bob/parentsCol/daniel/childCol/francis'), { + name: 'francis', + }); + await batch.commit(); + }); - // Query all descendants of documents. 
- descendantsStream = await firestore._getAllDescendants( - randomCol.doc('bob') - ); - expect(await numStreamItems(descendantsStream)).to.equal(4); - descendantsStream = await firestore._getAllDescendants( - randomCol.doc('bob/parentsCol/daniel') - ); - expect(await numStreamItems(descendantsStream)).to.equal(2); - descendantsStream = await firestore._getAllDescendants( - randomCol.doc('anna') - ); - expect(await numStreamItems(descendantsStream)).to.equal(0); + it('on top-level collection', async () => { + await firestore.recursiveDelete(randomCol); + expect(await countCollectionChildren(randomCol)).to.equal(0); + }); + + it('on nested collection', async () => { + const coll = randomCol.doc('bob').collection('parentsCol'); + await firestore.recursiveDelete(coll); + + expect(await countCollectionChildren(coll)).to.equal(0); + expect(await countCollectionChildren(randomCol)).to.equal(2); + }); + + it('on nested document', async () => { + const doc = randomCol.doc('bob/parentsCol/daniel'); + await firestore.recursiveDelete(doc); + + const docSnap = await doc.get(); + expect(docSnap.exists).to.be.false; + expect(await countDocumentChildren(randomCol.doc('bob'))).to.equal(1); + expect(await countCollectionChildren(randomCol)).to.equal(3); + }); + + it('on leaf document', async () => { + const doc = randomCol.doc('bob/parentsCol/daniel/childCol/ernie'); + await firestore.recursiveDelete(doc); + + const docSnap = await doc.get(); + expect(docSnap.exists).to.be.false; + expect(await countCollectionChildren(randomCol)).to.equal(5); + }); + + it('does not affect other collections', async () => { + // Add other nested collection that shouldn't be deleted. 
+ const collB = firestore.collection('doggos'); + await collB.doc('doggo').set({name: 'goodboi'}); + + await firestore.recursiveDelete(collB); + expect(await countCollectionChildren(randomCol)).to.equal(6); + expect(await countCollectionChildren(collB)).to.equal(0); + }); + + it('with custom BulkWriter instance', async () => { + const bulkWriter = firestore.bulkWriter(); + let callbackCount = 0; + bulkWriter.onWriteResult(() => { + callbackCount++; + }); + await firestore.recursiveDelete(randomCol, bulkWriter); + expect(callbackCount).to.equal(6); + }); }); it('can retry failed writes with a provided callback', async () => { diff --git a/dev/test/bulk-writer.ts b/dev/test/bulk-writer.ts index ad904e1ff..b9c49ef58 100644 --- a/dev/test/bulk-writer.ts +++ b/dev/test/bulk-writer.ts @@ -64,6 +64,73 @@ interface RequestResponse { response: api.IBatchWriteResponse; } +export function createRequest(requests: api.IWrite[]): api.IBatchWriteRequest { + return { + writes: requests, + }; +} + +export function successResponse( + updateTimeSeconds: number +): api.IBatchWriteResponse { + return { + writeResults: [ + { + updateTime: { + nanos: 0, + seconds: updateTimeSeconds, + }, + }, + ], + status: [{code: Status.OK}], + }; +} + +export function failedResponse( + code = Status.DEADLINE_EXCEEDED +): api.IBatchWriteResponse { + return { + writeResults: [ + { + updateTime: null, + }, + ], + status: [{code}], + }; +} + +export function mergeResponses( + responses: api.IBatchWriteResponse[] +): api.IBatchWriteResponse { + return { + writeResults: responses.map(v => v.writeResults![0]), + status: responses.map(v => v.status![0]), + }; +} + +export function setOp(doc: string, value: string): api.IWrite { + return set({ + document: document(doc, 'foo', value), + }).writes![0]; +} + +export function updateOp(doc: string, value: string): api.IWrite { + return update({ + document: document(doc, 'foo', value), + mask: updateMask('foo'), + }).writes![0]; +} + +export function createOp(doc: 
string, value: string): api.IWrite { + return create({ + document: document(doc, 'foo', value), + }).writes![0]; +} + +export function deleteOp(doc: string): api.IWrite { + return remove(doc).writes![0]; +} + describe('BulkWriter', () => { let firestore: Firestore; let requestCounter: number; @@ -93,71 +160,6 @@ describe('BulkWriter', () => { expect(opCount).to.equal(expected); } - function setOp(doc: string, value: string): api.IWrite { - return set({ - document: document(doc, 'foo', value), - }).writes![0]; - } - - function updateOp(doc: string, value: string): api.IWrite { - return update({ - document: document(doc, 'foo', value), - mask: updateMask('foo'), - }).writes![0]; - } - - function createOp(doc: string, value: string): api.IWrite { - return create({ - document: document(doc, 'foo', value), - }).writes![0]; - } - - function deleteOp(doc: string): api.IWrite { - return remove(doc).writes![0]; - } - - function createRequest(requests: api.IWrite[]): api.IBatchWriteRequest { - return { - writes: requests, - }; - } - - function successResponse(updateTimeSeconds: number): api.IBatchWriteResponse { - return { - writeResults: [ - { - updateTime: { - nanos: 0, - seconds: updateTimeSeconds, - }, - }, - ], - status: [{code: Status.OK}], - }; - } - - function failedResponse( - code = Status.DEADLINE_EXCEEDED - ): api.IBatchWriteResponse { - return { - writeResults: [ - { - updateTime: null, - }, - ], - status: [{code}], - }; - } - - function mergeResponses( - responses: api.IBatchWriteResponse[] - ): api.IBatchWriteResponse { - return { - writeResults: responses.map(v => v.writeResults![0]), - status: responses.map(v => v.status![0]), - }; - } - /** * Creates an instance with the mocked objects. 
*/ diff --git a/dev/test/query.ts b/dev/test/query.ts index 1c7264504..f92a1ac5f 100644 --- a/dev/test/query.ts +++ b/dev/test/query.ts @@ -49,6 +49,7 @@ import { } from './util/helpers'; import api = google.firestore.v1; +import protobuf = google.protobuf; const PROJECT_ID = 'test-project'; const DATABASE_ROOT = `projects/${PROJECT_ID}/databases/(default)`; @@ -202,7 +203,7 @@ export function orderBy( return {orderBy}; } -function limit(n: number): api.IStructuredQuery { +export function limit(n: number): api.IStructuredQuery { return { limit: { value: n, @@ -216,11 +217,14 @@ function offset(n: number): api.IStructuredQuery { }; } -function allDescendants(): api.IStructuredQuery { +export function allDescendants(kindless = false): api.IStructuredQuery { + if (kindless) { + return {from: [{allDescendants: true}]}; + } return {from: [{collectionId: 'collectionId', allDescendants: true}]}; } -function select(...fields: string[]): api.IStructuredQuery { +export function select(...fields: string[]): api.IStructuredQuery { const select: api.StructuredQuery.IProjection = { fields: [], }; @@ -282,27 +286,50 @@ function endAt( return {endAt: cursor}; } -export function queryEquals( +/** + * Returns the timestamp value for the provided readTimes, or the default + * readTime value used in tests if no values are provided. 
+ */ +export function readTime( + seconds?: number, + nanos?: number +): protobuf.ITimestamp { + if (seconds === undefined && nanos === undefined) { + return {seconds: '5', nanos: 6}; + } + return {seconds: String(seconds), nanos: nanos}; +} + +export function queryEqualsWithParent( actual: api.IRunQueryRequest | undefined, + parent: string, ...protoComponents: api.IStructuredQuery[] -) { +): void { expect(actual).to.not.be.undefined; + if (parent !== '') { + parent = '/' + parent; + } + const query: api.IRunQueryRequest = { - parent: DATABASE_ROOT + '/documents', - structuredQuery: { - from: [ - { - collectionId: 'collectionId', - }, - ], - }, + parent: DATABASE_ROOT + '/documents' + parent, + structuredQuery: {}, }; for (const protoComponent of protoComponents) { extend(true, query.structuredQuery, protoComponent); } + // We add the `from` selector here in order to avoid setting collectionId on + // kindless queries. + if (query.structuredQuery!.from === undefined) { + query.structuredQuery!.from = [ + { + collectionId: 'collectionId', + }, + ]; + } + // 'extend' removes undefined fields in the request object. The backend // ignores these fields, but we need to manually strip them before we compare // the expected and the actual request. 
@@ -310,6 +337,13 @@ export function queryEquals( expect(actual).to.deep.eq(query); } +export function queryEquals( + actual: api.IRunQueryRequest | undefined, + ...protoComponents: api.IStructuredQuery[] +): void { + queryEqualsWithParent(actual, /* parent= */ '', ...protoComponents); +} + function bundledQueryEquals( actual: firestore.IBundledQuery | undefined, limitType: firestore.BundledQuery.LimitType | undefined, diff --git a/dev/test/recursive-delete.ts b/dev/test/recursive-delete.ts new file mode 100644 index 000000000..ad62e28c1 --- /dev/null +++ b/dev/test/recursive-delete.ts @@ -0,0 +1,603 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+import {afterEach, beforeEach, describe, it} from 'mocha'; +import {fail} from 'assert'; +import {expect} from 'chai'; +import {GoogleError, Status} from 'google-gax'; +import sinon = require('sinon'); + +import {google} from '../protos/firestore_v1_proto_api'; + +import * as Firestore from '../src'; +import {setTimeoutHandler} from '../src/backoff'; +import { + ApiOverride, + createInstance, + postConverter, + response, + stream, + verifyInstance, +} from './util/helpers'; +import { + allDescendants, + fieldFilters, + limit, + orderBy, + queryEquals, + queryEqualsWithParent, + result, + select, + startAt as queryStartAt, +} from './query'; +import { + createRequest, + deleteOp, + failedResponse, + mergeResponses, + successResponse, +} from './bulk-writer'; +import {MAX_REQUEST_RETRIES} from '../src'; + +import api = google.firestore.v1; +import {MAX_PENDING_OPS, REFERENCE_NAME_MIN_ID} from '../src/recursive-delete'; + +const PROJECT_ID = 'test-project'; +const DATABASE_ROOT = `projects/${PROJECT_ID}/databases/(default)`; + +describe('recursiveDelete() method:', () => { + // We store errors from batchWrite inside an error object since errors thrown + // in batchWrite do not affect the recursiveDelete promise. 
+ let batchWriteError: Error | undefined; + let firestore: Firestore.Firestore; + + beforeEach(() => { + setTimeoutHandler(setImmediate); + }); + + afterEach(() => { + verifyInstance(firestore); + setTimeoutHandler(setTimeout); + expect(batchWriteError, 'batchWrite should not have errored').to.be + .undefined; + }); + + function instantiateInstance( + childrenDocs: Array, + deleteDocRef = '', + responses?: api.IBatchWriteResponse + ): Promise { + const overrides: ApiOverride = { + runQuery: () => { + return stream(...childrenDocs.map(docId => result(docId))); + }, + batchWrite: request => { + const documents = childrenDocs; + if (deleteDocRef.length > 0) { + documents.push(deleteDocRef); + } + const expected = createRequest(documents.map(docId => deleteOp(docId))); + try { + expect(request.writes).to.deep.equal(expected.writes); + } catch (e) { + batchWriteError = e; + } + const returnedResponse = + responses ?? mergeResponses(documents.map(() => successResponse(1))); + + return response({ + writeResults: returnedResponse.writeResults, + status: returnedResponse.status, + }); + }, + }; + + return createInstance(overrides); + } + + describe('calls getAllDescendants() with correct StructuredQuery', () => { + function startAt(name: string): api.IValue { + return { + referenceValue: + DATABASE_ROOT + '/documents/' + name + '/' + REFERENCE_NAME_MIN_ID, + }; + } + + function endAt(name: string): api.IValue { + return { + referenceValue: + DATABASE_ROOT + + '/documents/' + + name + + String.fromCharCode(0) + + '/' + + REFERENCE_NAME_MIN_ID, + }; + } + + it('for root-level collections', async () => { + const overrides: ApiOverride = { + runQuery: req => { + queryEquals( + req, + select('__name__'), + allDescendants(/* kindless= */ true), + fieldFilters( + '__name__', + 'GREATER_THAN_OR_EQUAL', + startAt('root'), + '__name__', + 'LESS_THAN', + endAt('root') + ), + limit(MAX_PENDING_OPS) + ); + return stream(); + }, + }; + firestore = await createInstance(overrides); + 
return firestore.recursiveDelete(firestore.collection('root')); + }); + + it('for nested collections', async () => { + const overrides: ApiOverride = { + runQuery: req => { + queryEqualsWithParent( + req, + 'root/doc', + select('__name__'), + allDescendants(/* kindless= */ true), + fieldFilters( + '__name__', + 'GREATER_THAN_OR_EQUAL', + startAt('root/doc/nestedCol'), + '__name__', + 'LESS_THAN', + endAt('root/doc/nestedCol') + ), + limit(MAX_PENDING_OPS) + ); + return stream(); + }, + }; + firestore = await createInstance(overrides); + return firestore.recursiveDelete( + firestore.collection('root/doc/nestedCol') + ); + }); + + it('documents', async () => { + const overrides: ApiOverride = { + runQuery: req => { + queryEqualsWithParent( + req, + 'root/doc', + select('__name__'), + allDescendants(/* kindless= */ true), + limit(MAX_PENDING_OPS) + ); + return stream(); + }, + // Include dummy response for the deleted docRef. + batchWrite: () => response(successResponse(1)), + }; + firestore = await createInstance(overrides); + return firestore.recursiveDelete(firestore.doc('root/doc')); + }); + + it('creates retry query after stream exception with last received doc', async () => { + let callCount = 0; + const overrides: ApiOverride = { + runQuery: request => { + callCount++; + if (callCount === 1) { + return stream(result('doc1'), new Error('failed in test')); + } else { + queryEqualsWithParent( + request, + /* parent= */ '', + select('__name__'), + allDescendants(/* kindless= */ true), + orderBy('__name__', 'ASCENDING'), + queryStartAt(false, { + referenceValue: + `projects/${PROJECT_ID}/databases/(default)/` + + 'documents/collectionId/doc1', + }), + fieldFilters( + '__name__', + 'GREATER_THAN_OR_EQUAL', + startAt('root'), + '__name__', + 'LESS_THAN', + endAt('root') + ), + limit(MAX_PENDING_OPS) + ); + return stream(); + } + }, + batchWrite: () => response(successResponse(1)), + }; + + const firestore = await createInstance(overrides); + await 
firestore.recursiveDelete(firestore.collection('root')); + }); + + it('creates a second query with the correct startAfter', async () => { + const firstStream = Array.from( + Array(MAX_PENDING_OPS).keys() + ).map((_, i) => result('doc' + i)); + + // Use an array to store that the queryEquals() method succeeded, since + // thrown errors do not result in the recursiveDelete() method failing. + const called: number[] = []; + const overrides: ApiOverride = { + runQuery: request => { + if (called.length === 0) { + queryEquals( + request, + select('__name__'), + allDescendants(/* kindless= */ true), + fieldFilters( + '__name__', + 'GREATER_THAN_OR_EQUAL', + startAt('root'), + '__name__', + 'LESS_THAN', + endAt('root') + ), + limit(MAX_PENDING_OPS) + ); + called.push(1); + return stream(...firstStream); + } else if (called.length === 1) { + queryEquals( + request, + select('__name__'), + allDescendants(/* kindless= */ true), + orderBy('__name__', 'ASCENDING'), + fieldFilters( + '__name__', + 'GREATER_THAN_OR_EQUAL', + startAt('root'), + '__name__', + 'LESS_THAN', + endAt('root') + ), + queryStartAt(false, { + referenceValue: + `projects/${PROJECT_ID}/databases/(default)/` + + 'documents/collectionId/doc' + + (MAX_PENDING_OPS - 1), + }), + limit(MAX_PENDING_OPS) + ); + called.push(2); + return stream(); + } else { + called.push(3); + return stream(); + } + }, + batchWrite: () => { + const responses = mergeResponses( + Array.from(Array(500).keys()).map(() => successResponse(1)) + ); + return response({ + writeResults: responses.writeResults, + status: responses.status, + }); + }, + }; + const firestore = await createInstance(overrides); + + // Use a custom batch size with BulkWriter to simplify the dummy + // batchWrite() response logic. 
+ const bulkWriter = firestore.bulkWriter(); + bulkWriter._maxBatchSize = 500; + await firestore.recursiveDelete(firestore.collection('root'), bulkWriter); + expect(called).to.deep.equal([1, 2]); + }); + }); + + describe('deletes', () => { + it('collection', async () => { + // The four documents are under the 'collectionId' collection, and is + // automatically prefixed by `instantiateInstance()`. + firestore = await instantiateInstance([ + 'anna', + 'bob', + 'bob/children/charlie', + 'bob/children/daniel', + ]); + await firestore.recursiveDelete(firestore.collection('collectionId')); + }); + + it('document along with reference', async () => { + firestore = await instantiateInstance( + ['bob/children/brian', 'bob/children/charlie', 'bob/children/daniel'], + 'bob' + ); + await firestore.recursiveDelete( + firestore.collection('collectionId').doc('bob') + ); + }); + + it('promise is rejected with the last error code if writes fail', async () => { + firestore = await instantiateInstance( + ['bob/children/brian', 'bob/children/charlie', 'bob/children/daniel'], + 'bob', + mergeResponses([ + successResponse(1), + failedResponse(Status.CANCELLED), + failedResponse(Status.PERMISSION_DENIED), + successResponse(1), + ]) + ); + try { + await firestore.recursiveDelete( + firestore.collection('collectionId').doc('bob') + ); + fail('recursiveDelete should have failed'); + } catch (err) { + expect(err.code).to.equal(Status.PERMISSION_DENIED); + expect(err.message).to.contain('2 deletes failed'); + } + }); + + it('promise is rejected if BulkWriter success handler fails', async () => { + firestore = await instantiateInstance(['bob/children/brian'], 'bob'); + + const bulkWriter = firestore.bulkWriter(); + bulkWriter.onWriteResult(() => { + throw new Error('User provided result callback failed'); + }); + + try { + await firestore.recursiveDelete( + firestore.collection('collectionId').doc('bob'), + bulkWriter + ); + fail('recursiveDelete() should have failed'); + } catch (err) { + 
expect(err.message).to.contain('2 deletes failed'); + expect(err.stack).to.contain('User provided result callback failed'); + } + }); + + it('BulkWriter success handler provides the correct references and results', async () => { + firestore = await instantiateInstance( + ['bob/children/brian', 'bob/children/charlie'], + 'bob', + mergeResponses([ + successResponse(1), + successResponse(2), + successResponse(3), + ]) + ); + const results: number[] = []; + const refs: string[] = []; + const bulkWriter = firestore.bulkWriter(); + bulkWriter.onWriteResult((ref, result) => { + results.push(result.writeTime.seconds); + refs.push(ref.path); + }); + + await firestore.recursiveDelete( + firestore.collection('collectionId').doc('bob'), + bulkWriter + ); + expect(results).to.deep.equal([1, 2, 3]); + expect(refs).to.deep.equal([ + 'collectionId/bob/children/brian', + 'collectionId/bob/children/charlie', + 'collectionId/bob', + ]); + }); + + it('BulkWriter error handler provides the correct information', async () => { + firestore = await instantiateInstance( + ['bob/children/brian', 'bob/children/charlie'], + 'bob', + mergeResponses([ + failedResponse(Status.PERMISSION_DENIED), + failedResponse(Status.UNAVAILABLE), + failedResponse(Status.INTERNAL), + ]) + ); + const codes: Status[] = []; + const refs: string[] = []; + const bulkWriter = firestore.bulkWriter(); + bulkWriter.onWriteError(err => { + codes.push((err.code as unknown) as Status); + refs.push(err.documentRef.path); + return false; + }); + + try { + await firestore.recursiveDelete( + firestore.collection('collectionId').doc('bob'), + bulkWriter + ); + fail('recursiveDelete() should have failed'); + } catch (err) { + expect(codes).to.deep.equal([ + Status.PERMISSION_DENIED, + Status.UNAVAILABLE, + Status.INTERNAL, + ]); + expect(refs).to.deep.equal([ + 'collectionId/bob/children/brian', + 'collectionId/bob/children/charlie', + 'collectionId/bob', + ]); + } + }); + + it('promise is rejected if provided reference was not 
deleted', async () => { + const overrides: ApiOverride = { + runQuery: () => stream(), + batchWrite: () => { + throw new GoogleError('batchWrite() failed in test'); + }, + }; + firestore = await createInstance(overrides); + try { + await firestore.recursiveDelete(firestore.doc('root/doc')); + } catch (err) { + expect(err.stack).to.contain('batchWrite() failed in test'); + } + }); + + it('retries stream errors', async () => { + let attempts = 0; + const overrides: ApiOverride = { + runQuery: () => { + attempts++; + throw new Error('runQuery() error in test'); + }, + batchWrite: () => response(successResponse(1)), + }; + firestore = await createInstance(overrides); + try { + await firestore.recursiveDelete(firestore.doc('coll/foo')); + fail('recursiveDelete() should have failed'); + } catch (err) { + expect(err.code).to.equal(Status.UNAVAILABLE); + expect(err.stack).to.contain('Failed to fetch children documents'); + expect(err.stack).to.contain('runQuery() error in test'); + expect(attempts).to.equal(MAX_REQUEST_RETRIES); + } + }); + + it('handles successful stream error retries', async () => { + let requestCounter = 0; + const streamItems = [ + [result('a'), result('b'), new Error('runQuery() error in test')], + [new Error('runQuery() error in test')], + [result('c'), new Error('runQuery() error in test')], + [result('d')], + ]; + + const overrides: ApiOverride = { + runQuery: () => { + const streamPromise = stream(...streamItems[requestCounter]); + requestCounter++; + return streamPromise; + }, + batchWrite: request => { + const expected = createRequest([ + deleteOp('a'), + deleteOp('b'), + deleteOp('c'), + deleteOp('d'), + ]); + try { + expect(request.writes).to.deep.equal(expected.writes); + } catch (e) { + batchWriteError = e; + } + return response( + mergeResponses(expected.writes!.map(() => successResponse(1))) + ); + }, + }; + firestore = await createInstance(overrides); + await firestore.recursiveDelete(firestore.collection('letters')); + }); + + 
it('handles multiple calls to recursiveDelete()', async () => { + let requestCounter = 0; + const docIds = ['a', 'b', 'c']; + const streamItems = docIds.map(docId => [result(docId)]); + const expected = docIds.map(docId => createRequest([deleteOp(docId)])); + const responses = docIds.map(() => successResponse(1)); + + const overrides: ApiOverride = { + runQuery: () => { + return stream(...streamItems[requestCounter]); + }, + batchWrite: request => { + try { + expect(request.writes).to.deep.equal( + expected[requestCounter]!.writes + ); + } catch (e) { + batchWriteError = e; + } + const responsePromise = response(responses[requestCounter]); + requestCounter++; + return responsePromise; + }, + }; + firestore = await createInstance(overrides); + await firestore.recursiveDelete(firestore.collection('a')); + await firestore.recursiveDelete(firestore.collection('b')); + await firestore.recursiveDelete(firestore.collection('c')); + }); + + it('accepts references with converters', async () => { + const overrides: ApiOverride = { + runQuery: () => stream(), + // Include response for deleting the provided document reference. 
+ batchWrite: () => response(successResponse(1)), + }; + firestore = await createInstance(overrides); + await firestore.recursiveDelete( + firestore.doc('root/doc').withConverter(postConverter) + ); + await firestore.recursiveDelete( + firestore.collection('root').withConverter(postConverter) + ); + }); + }); + + describe('BulkWriter instance', () => { + it('uses custom BulkWriter instance if provided', async () => { + firestore = await instantiateInstance(['a', 'b', 'c']); + let callbackCount = 0; + const bulkWriter = firestore.bulkWriter(); + bulkWriter.onWriteResult(() => { + callbackCount++; + }); + await firestore.recursiveDelete(firestore.collection('foo'), bulkWriter); + expect(callbackCount).to.equal(3); + }); + + it('default: uses the same BulkWriter instance across calls', async () => { + const overrides: ApiOverride = { + runQuery: () => stream(), + }; + firestore = await createInstance(overrides); + const spy = sinon.spy(firestore, 'bulkWriter'); + + await firestore.recursiveDelete(firestore.collection('foo')); + await firestore.recursiveDelete(firestore.collection('boo')); + await firestore.recursiveDelete(firestore.collection('moo')); + + // Only the first recursiveDelete() call should have called the + // constructor. Subsequent calls should have used the same bulkWriter. + expect(spy.callCount).to.equal(1); + }); + + it('throws error if BulkWriter instance is closed', async () => { + firestore = await createInstance(); + const bulkWriter = firestore.bulkWriter(); + await bulkWriter.close(); + await expect(() => () => + firestore.recursiveDelete(firestore.collection('foo'), bulkWriter) + ).to.throw; + }); + }); +}); diff --git a/types/firestore.d.ts b/types/firestore.d.ts index 91ec3cc7a..8763562b2 100644 --- a/types/firestore.d.ts +++ b/types/firestore.d.ts @@ -240,6 +240,46 @@ declare namespace FirebaseFirestore { > ): Promise>>; + /** + * Recursively deletes all documents and subcollections at and under the + * specified level. 
+ * + * If any delete fails, the promise is rejected with an error message + * containing the number of failed deletes and the stack trace of the last + * failed delete. The provided reference is deleted regardless of whether + * all deletes succeeded. + * + * `recursiveDelete()` uses a BulkWriter instance with default settings to + * perform the deletes. To customize throttling rates or add success/error + * callbacks, pass in a custom BulkWriter instance. + * + * @param ref The reference of a document or collection to delete. + * @param bulkWriter A custom BulkWriter instance used to perform the + * deletes. + * @return A promise that resolves when all deletes have been performed. + * The promise is rejected if any of the deletes fail. + * + * @example + * // Recursively delete a reference and log the references of failures. + * const bulkWriter = firestore.bulkWriter(); + * bulkWriter + * .onWriteError((error) => { + * if ( + * error.failedAttempts < MAX_RETRY_ATTEMPTS + * ) { + * return true; + * } else { + * console.log('Failed write at document: ', error.documentRef.path); + * return false; + * } + * }); + * await firestore.recursiveDelete(docRef, bulkWriter); + */ + recursiveDelete( + ref: CollectionReference | DocumentReference, + bulkWriter?: BulkWriter + ): Promise; + /** * Terminates the Firestore client and closes all open streams. *