Skip to content

Commit

Permalink
feat: cancel readable streams as effectively as possible
Browse files Browse the repository at this point in the history
  • Loading branch information
wheresrhys committed Jul 25, 2024
1 parent c3548a5 commit aa3b899
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 20 deletions.
7 changes: 4 additions & 3 deletions packages/core/src/Router.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,16 @@ export default class Router {
'The operation was aborted.',
'AbortError',
);
reject(error);

if (request?.body && request.body instanceof ReadableStream) {
request.body.cancel(error);
const requestBody = request?.body || options?.body;
if (requestBody instanceof ReadableStream) {
requestBody.cancel(error);
}

if (callLog?.response?.body) {
callLog.response.body.cancel(error);
}
reject(error);
};
if (callLog.signal.aborted) {
abort();
Expand Down
73 changes: 56 additions & 17 deletions packages/core/src/__tests__/FetchMock/response-negotiation.test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { beforeEach, describe, expect, it } from 'vitest';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import fetchMock from '../../FetchMock';

describe('response negotiation', () => {
Expand Down Expand Up @@ -161,36 +161,33 @@ describe('response negotiation', () => {
});

describe('abortable fetch', () => {
const RESPONSE_DELAY = 50;
const ABORT_DELAY = 10;

const getDelayedAbortController = (delay) => {
const controller = new AbortController();
setTimeout(() => controller.abort(), delay);
return controller;
};

it('error on signal abort', async () => {
fm.route('*', 200, { delay: RESPONSE_DELAY });
fm.route('*', 200, { delay: 50 });
await expect(
fm.fetchHandler('http://a.com', {
signal: getDelayedAbortController(ABORT_DELAY).signal,
signal: getDelayedAbortController(10).signal,
}),
).rejects.toThrowError(
new DOMException('The operation was aborted.', 'ABortError'),
new DOMException('The operation was aborted.', 'AbortError'),
);
});

it('error on signal abort for request object', async () => {
fm.route('*', 200, { delay: RESPONSE_DELAY });
fm.route('*', 200, { delay: 50 });
await expect(
fm.fetchHandler(
new fm.config.Request('http://a.com', {
signal: getDelayedAbortController(ABORT_DELAY).signal,
signal: getDelayedAbortController(10).signal,
}),
),
).rejects.toThrowError(
new DOMException('The operation was aborted.', 'ABortError'),
new DOMException('The operation was aborted.', 'AbortError'),
);
});

Expand All @@ -203,33 +200,75 @@ describe('response negotiation', () => {
signal: controller.signal,
}),
).rejects.toThrowError(
new DOMException('The operation was aborted.', 'ABortError'),
new DOMException('The operation was aborted.', 'AbortError'),
);
});

it('aborts sending request options body stream', async () => {
fm.route('*', 200, { delay: 50 });
const body = new ReadableStream();
vi.spyOn(body, 'cancel');
await expect(
fm.fetchHandler('http://a.com', {
method: 'post',
body,
signal: getDelayedAbortController(10).signal,
}),
).rejects.toThrowError(
new DOMException('The operation was aborted.', 'AbortError'),
);
expect(body.cancel).toHaveBeenCalledWith(
new DOMException('The operation was aborted.', 'AbortError'),
);
});

// this doesn't work as the callLog creatde from the request awaits the body
it.skip('aborts sending request body stream', async () => {
fm.route('*', 200, { delay: 50 });
const body = new ReadableStream();
vi.spyOn(body, 'cancel');
const request = new Request('http://a.com', {
method: 'post',
body,
duplex: 'half',
signal: getDelayedAbortController(10).signal,
});
await expect(fm.fetchHandler(request)).rejects.toThrowError(
new DOMException('The operation was aborted.', 'AbortError'),
);
expect(body.cancel).toHaveBeenCalledWith(
new DOMException('The operation was aborted.', 'AbortError'),
);
});

it.skip('aborts receiving response body stream', async () => {
// so fiddly to implement a test for this. Uses the same mechanism as cancelling request body though
// so I trust that if one works the other does
});

it('go into `done` state even when aborted', async () => {
fm.once('http://a.com', 200, { delay: RESPONSE_DELAY });
fm.once('http://a.com', 200, { delay: 50 });

await expect(
fm.fetchHandler('http://a.com', {
signal: getDelayedAbortController(ABORT_DELAY).signal,
signal: getDelayedAbortController(10).signal,
}),
).rejects.toThrowError(
new DOMException('The operation was aborted.', 'ABortError'),
new DOMException('The operation was aborted.', 'AbortError'),
);

expect(fm.callHistory.done()).toBe(true);
});

it('will flush even when aborted', async () => {
fm.route('http://a.com', 200, { delay: RESPONSE_DELAY });
fm.route('http://a.com', 200, { delay: 50 });

await expect(
fm.fetchHandler('http://a.com', {
signal: getDelayedAbortController(ABORT_DELAY).signal,
signal: getDelayedAbortController(10).signal,
}),
).rejects.toThrowError(
new DOMException('The operation was aborted.', 'ABortError'),
new DOMException('The operation was aborted.', 'AbortError'),
);
await fm.callHistory.flush();
expect(fm.callHistory.done()).toBe(true);
Expand Down

0 comments on commit aa3b899

Please sign in to comment.