From aa3b89989bd223e788db895b03c4fabc56f061d2 Mon Sep 17 00:00:00 2001 From: Rhys Evans Date: Thu, 25 Jul 2024 13:06:01 +0100 Subject: [PATCH] feat: cancel readable streams as effectively as possible --- packages/core/src/Router.js | 7 +- .../FetchMock/response-negotiation.test.js | 73 ++++++++++++++----- 2 files changed, 60 insertions(+), 20 deletions(-) diff --git a/packages/core/src/Router.js b/packages/core/src/Router.js index 08654ef2..ab220837 100644 --- a/packages/core/src/Router.js +++ b/packages/core/src/Router.js @@ -161,15 +161,16 @@ export default class Router { 'The operation was aborted.', 'AbortError', ); - reject(error); - if (request?.body && request.body instanceof ReadableStream) { - request.body.cancel(error); + const requestBody = request?.body || options?.body; + if (requestBody instanceof ReadableStream) { + requestBody.cancel(error); } if (callLog?.response?.body) { callLog.response.body.cancel(error); } + reject(error); }; if (callLog.signal.aborted) { abort(); diff --git a/packages/core/src/__tests__/FetchMock/response-negotiation.test.js b/packages/core/src/__tests__/FetchMock/response-negotiation.test.js index 27810f36..0fe7605e 100644 --- a/packages/core/src/__tests__/FetchMock/response-negotiation.test.js +++ b/packages/core/src/__tests__/FetchMock/response-negotiation.test.js @@ -1,4 +1,4 @@ -import { beforeEach, describe, expect, it } from 'vitest'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; import fetchMock from '../../FetchMock'; describe('response negotiation', () => { @@ -161,9 +161,6 @@ describe('response negotiation', () => { }); describe('abortable fetch', () => { - const RESPONSE_DELAY = 50; - const ABORT_DELAY = 10; - const getDelayedAbortController = (delay) => { const controller = new AbortController(); setTimeout(() => controller.abort(), delay); @@ -171,26 +168,26 @@ describe('response negotiation', () => { }; it('error on signal abort', async () => { - fm.route('*', 200, { delay: RESPONSE_DELAY }); + fm.route('*', 200, { delay: 50 }); await expect( fm.fetchHandler('http://a.com', { - signal: getDelayedAbortController(ABORT_DELAY).signal, + signal: getDelayedAbortController(10).signal, }), ).rejects.toThrowError( - new DOMException('The operation was aborted.', 'ABortError'), + new DOMException('The operation was aborted.', 'AbortError'), ); }); it('error on signal abort for request object', async () => { - fm.route('*', 200, { delay: RESPONSE_DELAY }); + fm.route('*', 200, { delay: 50 }); await expect( fm.fetchHandler( new fm.config.Request('http://a.com', { - signal: getDelayedAbortController(ABORT_DELAY).signal, + signal: getDelayedAbortController(10).signal, }), ), ).rejects.toThrowError( - new DOMException('The operation was aborted.', 'ABortError'), + new DOMException('The operation was aborted.', 'AbortError'), ); }); @@ -203,33 +200,75 @@ describe('response negotiation', () => { signal: controller.signal, }), ).rejects.toThrowError( - new DOMException('The operation was aborted.', 'ABortError'), + new DOMException('The operation was aborted.', 'AbortError'), + ); + }); + + it('aborts sending request options body stream', async () => { + fm.route('*', 200, { delay: 50 }); + const body = new ReadableStream(); + vi.spyOn(body, 'cancel'); + await expect( + fm.fetchHandler('http://a.com', { + method: 'post', + body, + signal: getDelayedAbortController(10).signal, + }), + ).rejects.toThrowError( + new DOMException('The operation was aborted.', 'AbortError'), ); + expect(body.cancel).toHaveBeenCalledWith( + new DOMException('The operation was aborted.', 'AbortError'), + ); + }); + + // this doesn't work as the callLog creatde from the request awaits the body + it.skip('aborts sending request body stream', async () => { + fm.route('*', 200, { delay: 50 }); + const body = new ReadableStream(); + vi.spyOn(body, 'cancel'); + const request = new Request('http://a.com', { + method: 'post', + body, + duplex: 'half', + signal: getDelayedAbortController(10).signal, + }); + await expect(fm.fetchHandler(request)).rejects.toThrowError( + new DOMException('The operation was aborted.', 'AbortError'), + ); + expect(body.cancel).toHaveBeenCalledWith( + new DOMException('The operation was aborted.', 'AbortError'), + ); + }); + + it.skip('aborts receiving response body stream', async () => { + // so fiddly to implement a test for this. Uses the same mechanism as cancelling request body though + // so I trust that if one works the other does }); it('go into `done` state even when aborted', async () => { - fm.once('http://a.com', 200, { delay: RESPONSE_DELAY }); + fm.once('http://a.com', 200, { delay: 50 }); await expect( fm.fetchHandler('http://a.com', { - signal: getDelayedAbortController(ABORT_DELAY).signal, + signal: getDelayedAbortController(10).signal, }), ).rejects.toThrowError( - new DOMException('The operation was aborted.', 'ABortError'), + new DOMException('The operation was aborted.', 'AbortError'), ); expect(fm.callHistory.done()).toBe(true); }); it('will flush even when aborted', async () => { - fm.route('http://a.com', 200, { delay: RESPONSE_DELAY }); + fm.route('http://a.com', 200, { delay: 50 }); await expect( fm.fetchHandler('http://a.com', { - signal: getDelayedAbortController(ABORT_DELAY).signal, + signal: getDelayedAbortController(10).signal, }), ).rejects.toThrowError( - new DOMException('The operation was aborted.', 'ABortError'), + new DOMException('The operation was aborted.', 'AbortError'), ); await fm.callHistory.flush(); expect(fm.callHistory.done()).toBe(true);