Skip to content

Commit

Permalink
fix: lower batch size on BulkWriter retry (#1549)
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Chen authored Jun 30, 2021
1 parent 8d97847 commit 26d480b
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 8 deletions.
39 changes: 35 additions & 4 deletions dev/src/bulk-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ import api = google.firestore.v1;
*/
const MAX_BATCH_SIZE = 20;

/*!
* The maximum number of writes can be can in a single batch that is being retried.
*/
export const RETRY_MAX_BATCH_SIZE = 10;

/*!
* The starting maximum number of operations per second as allowed by the
* 500/50/5 rule.
Expand Down Expand Up @@ -213,6 +218,13 @@ class BulkCommitBatch extends WriteBatch {
// been resolved.
readonly pendingOps: Array<BulkWriterOperation> = [];

readonly maxBatchSize: number;

constructor(firestore: Firestore, maxBatchSize: number) {
super(firestore);
this.maxBatchSize = maxBatchSize;
}

has(documentRef: firestore.DocumentReference<unknown>): boolean {
return this.docPaths.has(documentRef.path);
}
Expand Down Expand Up @@ -333,14 +345,17 @@ export class BulkWriter {
* Visible for testing.
* @private
*/
_maxBatchSize = MAX_BATCH_SIZE;
private _maxBatchSize = MAX_BATCH_SIZE;

/**
* The batch that is currently used to schedule operations. Once this batch
* reaches maximum capacity, a new batch is created.
* @private
*/
private _bulkCommitBatch = new BulkCommitBatch(this.firestore);
private _bulkCommitBatch = new BulkCommitBatch(
this.firestore,
this._maxBatchSize
);

/**
* A pointer to the tail of all active BulkWriter operations. This pointer
Expand Down Expand Up @@ -384,6 +399,16 @@ export class BulkWriter {
return this._bufferedOperations.length;
}

// Visible for testing.
_setMaxBatchSize(size: number): void {
assert(
this._bulkCommitBatch.pendingOps.length === 0,
'BulkCommitBatch should be empty'
);
this._maxBatchSize = size;
this._bulkCommitBatch = new BulkCommitBatch(this.firestore, size);
}

/**
* The maximum number of pending operations that can be enqueued onto this
* BulkWriter instance. Once the this number of writes have been enqueued,
Expand Down Expand Up @@ -840,7 +865,6 @@ export class BulkWriter {
if (this._bulkCommitBatch._opCount === 0) return;

const pendingBatch = this._bulkCommitBatch;
this._bulkCommitBatch = new BulkCommitBatch(this.firestore);

// Use the write with the longest backoff duration when determining backoff.
const highestBackoffDuration = pendingBatch.pendingOps.reduce((prev, cur) =>
Expand All @@ -849,6 +873,13 @@ export class BulkWriter {
const backoffMsWithJitter = BulkWriter._applyJitter(highestBackoffDuration);
const delayedExecution = new Deferred<void>();

// A backoff duration greater than 0 implies that this batch is a retry.
// Retried writes are sent with a batch size of 10 in order to guarantee
// that the batch is under the 10MiB limit.
const maxBatchSize =
highestBackoffDuration > 0 ? RETRY_MAX_BATCH_SIZE : this._maxBatchSize;
this._bulkCommitBatch = new BulkCommitBatch(this.firestore, maxBatchSize);

if (backoffMsWithJitter > 0) {
delayExecution(() => delayedExecution.resolve(), backoffMsWithJitter);
} else {
Expand Down Expand Up @@ -988,7 +1019,7 @@ export class BulkWriter {
enqueueOnBatchCallback(this._bulkCommitBatch);
this._bulkCommitBatch.processLastOperation(op);

if (this._bulkCommitBatch._opCount === this._maxBatchSize) {
if (this._bulkCommitBatch._opCount === this._bulkCommitBatch.maxBatchSize) {
this._scheduleCurrentBatch();
} else if (op.flushed) {
// If flush() was called before this operation was enqueued into a batch,
Expand Down
47 changes: 44 additions & 3 deletions dev/test/bulk-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
DEFAULT_INITIAL_OPS_PER_SECOND_LIMIT,
DEFAULT_JITTER_FACTOR,
DEFAULT_MAXIMUM_OPS_PER_SECOND_LIMIT,
RETRY_MAX_BATCH_SIZE,
} from '../src/bulk-writer';
import {
ApiOverride,
Expand Down Expand Up @@ -576,7 +577,7 @@ describe('BulkWriter', () => {
},
]);
bulkWriter._setMaxPendingOpCount(6);
bulkWriter._maxBatchSize = 3;
bulkWriter._setMaxBatchSize(3);
bulkWriter
.set(firestore.doc('collectionId/doc1'), {foo: 'bar'})
.then(incrementOpCount);
Expand Down Expand Up @@ -822,6 +823,46 @@ describe('BulkWriter', () => {
expect(timeoutHandlerCounter).to.equal(3);
});

it('retries with smaller batch size', async () => {
const nLengthArray = (n: number): number[] => Array.from(Array(n).keys());

const bulkWriter = await instantiateInstance([
{
request: createRequest(
nLengthArray(15).map((_, i) => setOp('doc' + i, 'bar'))
),
response: mergeResponses(
nLengthArray(15).map(() => failedResponse(Status.ABORTED))
),
},
{
request: createRequest(
nLengthArray(RETRY_MAX_BATCH_SIZE).map((_, i) =>
setOp('doc' + i, 'bar')
)
),
response: mergeResponses(
nLengthArray(RETRY_MAX_BATCH_SIZE).map(() => successResponse(1))
),
},
{
request: createRequest(
nLengthArray(15 - RETRY_MAX_BATCH_SIZE).map((_, i) =>
setOp('doc' + i + RETRY_MAX_BATCH_SIZE, 'bar')
)
),
response: mergeResponses(
nLengthArray(15 - RETRY_MAX_BATCH_SIZE).map(() => successResponse(1))
),
},
]);
for (let i = 0; i < 15; i++) {
bulkWriter.set(firestore.doc('collectionId/doc' + i), {foo: 'bar'});
}

await bulkWriter.close();
});

it('retries maintain correct write resolution ordering', async () => {
const bulkWriter = await instantiateInstance([
{
Expand Down Expand Up @@ -910,7 +951,7 @@ describe('BulkWriter', () => {
},
]);

bulkWriter._maxBatchSize = 2;
bulkWriter._setMaxBatchSize(2);
for (let i = 0; i < 6; i++) {
bulkWriter
.set(firestore.doc('collectionId/doc' + i), {foo: 'bar'})
Expand Down Expand Up @@ -942,7 +983,7 @@ describe('BulkWriter', () => {
},
]);

bulkWriter._maxBatchSize = 3;
bulkWriter._setMaxBatchSize(3);
const promise1 = bulkWriter
.set(firestore.doc('collectionId/doc1'), {foo: 'bar'})
.then(incrementOpCount);
Expand Down
2 changes: 1 addition & 1 deletion dev/test/recursive-delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ describe('recursiveDelete() method:', () => {
const firestore = await createInstance(overrides);

const bulkWriter = firestore.bulkWriter();
bulkWriter._maxBatchSize = maxBatchSize;
bulkWriter._setMaxBatchSize(maxBatchSize);
await firestore._recursiveDelete(
firestore.collection('root'),
maxPendingOps,
Expand Down

0 comments on commit 26d480b

Please sign in to comment.