From f5d92da81cefe2edbf504c187d87bd9dde357005 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Wed, 7 Feb 2024 21:08:05 +0100 Subject: [PATCH 1/2] fix: send data during graceful close. (#73) When a stream is closed gracefully, it's status goes from `'open'` to `'closing'` then to either `'closed'`, `'aborted'` or `'reset'`. While it's `'closing'` we should still try to send any queued data, this can be aborted by calling `.abort` on the stream or by the signal passed to `.close` firing the `'abort'` event. This change makes the tests added in https://github.com/libp2p/js-libp2p/pull/2398 pass. --- src/stream.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/stream.ts b/src/stream.ts index 3297673..79bb444 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -98,12 +98,14 @@ export class YamuxStream extends AbstractStream { while (buf.byteLength !== 0) { // wait for the send window to refill if (this.sendWindowCapacity === 0) { + this.log?.trace('wait for send window capacity', this.status) await this.waitForSendWindowCapacity(options) - } - // check we didn't close while waiting for send window capacity - if (this.status !== 'open') { - return + // check we didn't close while waiting for send window capacity + if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') { + this.log?.trace('%s while waiting for send window capacity', this.status) + return + } } // send as much as we can From f77b9ec7118720d23073bbfcd96504c0a179f4b2 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Thu, 8 Feb 2024 09:00:07 +0100 Subject: [PATCH 2/2] fix: send data during graceful close (#75) #73 was merged on red, this fixes the build --- package.json | 18 +++++++++--------- src/stream.ts | 4 ++-- test/stream.spec.ts | 7 +++++-- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/package.json b/package.json index 9de8003..19535a0 100644 --- a/package.json +++ b/package.json @@ -172,21 +172,21 @@ "docs": "aegir docs" }, "dependencies": { - "@libp2p/interface": "^1.0.0", - "@libp2p/utils": "^5.0.0", + "@libp2p/interface": "^1.1.3", + "@libp2p/utils": "^5.2.5", "get-iterator": "^2.0.1", - "it-foreach": "^2.0.3", + "it-foreach": "^2.0.6", "it-pipe": "^3.0.1", - "it-pushable": "^3.2.0", - "uint8arraylist": "^2.4.3" + "it-pushable": "^3.2.3", + "uint8arraylist": "^2.4.8" }, "devDependencies": { "@dapplion/benchmark": "^0.2.4", - "@libp2p/interface-compliance-tests": "^5.0.0", - "@libp2p/logger": "^4.0.0", - "@libp2p/mplex": "^10.0.0", + "@libp2p/interface-compliance-tests": "^5.3.1", + "@libp2p/logger": "^4.0.6", + "@libp2p/mplex": "^10.0.15", "aegir": "^41.1.10", - "it-drain": "^3.0.2", + "it-drain": "^3.0.5", "it-pair": "^2.0.6", "it-stream-types": "^2.0.1" } diff --git a/src/stream.ts b/src/stream.ts index 79bb444..3ebfa8f 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -98,7 +98,7 @@ export class YamuxStream extends AbstractStream { while (buf.byteLength !== 0) { // wait for the send window to refill if (this.sendWindowCapacity === 0) { - this.log?.trace('wait for send window capacity', this.status) + this.log?.trace('wait for send window capacity, status %s', this.status) await this.waitForSendWindowCapacity(options) // check we didn't close while waiting for send window capacity @@ -172,7 +172,7 @@ export class YamuxStream extends AbstractStream { let resolve: () => void let reject: (err: Error) => void const abort = (): void => { - if (this.status === 'open') { + if (this.status === 'open' || this.status === 'closing') { reject(new CodeError('stream aborted', ERR_STREAM_ABORT)) } else { // the stream was closed already, ignore the failure to send diff --git a/test/stream.spec.ts b/test/stream.spec.ts index 3670ac4..2acf27f 100644 --- a/test/stream.spec.ts +++ b/test/stream.spec.ts @@ -244,8 +244,11 @@ describe('stream', () => { await sleep(10) - // the client should close gracefully even though it was waiting to send more data - await client.close() + // the client should fail to close gracefully because there is unsent data + // that will never be sent + await client.close({ + signal: AbortSignal.timeout(10) + }) p.end() await sendPipe