From 47556c0f2c9da48ef8b40cb2fb03431a9c6ae5c9 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Wed, 11 Sep 2024 13:46:55 +0100 Subject: [PATCH] fix!: upgrade to libp2p@2.x.x (#84) Incorporates changes necessary to upgrade to libp2p@2.x.x BREAKING CHANGE: requires libp2p@2.x.x --- package.json | 10 +++---- src/config.ts | 18 ++++++------ src/constants.ts | 32 +++++--------------- src/decode.ts | 7 ++--- src/errors.ts | 71 +++++++++++++++++++++++++++++++++++++++++++++ src/muxer.ts | 38 ++++++++++++------------ src/stream.ts | 9 +++--- test/codec.util.ts | 5 ++-- test/decode.spec.ts | 3 +- test/muxer.spec.ts | 3 +- test/stream.spec.ts | 3 +- 11 files changed, 125 insertions(+), 74 deletions(-) create mode 100644 src/errors.ts diff --git a/package.json b/package.json index 0e570b3..037cb90 100644 --- a/package.json +++ b/package.json @@ -172,8 +172,8 @@ "docs": "aegir docs" }, "dependencies": { - "@libp2p/interface": "^1.5.0", - "@libp2p/utils": "^5.2.5", + "@libp2p/interface": "^2.0.0", + "@libp2p/utils": "^6.0.0", "get-iterator": "^2.0.1", "it-foreach": "^2.0.6", "it-pushable": "^3.2.3", @@ -182,9 +182,9 @@ }, "devDependencies": { "@dapplion/benchmark": "^0.2.4", - "@libp2p/interface-compliance-tests": "^5.3.1", - "@libp2p/logger": "^4.0.6", - "@libp2p/mplex": "^10.0.15", + "@libp2p/interface-compliance-tests": "^6.0.0", + "@libp2p/logger": "^5.0.0", + "@libp2p/mplex": "^11.0.0", "aegir": "^44.1.1", "it-drain": "^3.0.5", "it-pair": "^2.0.6", diff --git a/src/config.ts b/src/config.ts index 62fd092..2cd9c54 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,5 +1,5 @@ -import { CodeError } from '@libp2p/interface' -import { ERR_INVALID_CONFIG, INITIAL_STREAM_WINDOW, MAX_STREAM_WINDOW } from './constants.js' +import { InvalidParametersError } from '@libp2p/interface' +import { INITIAL_STREAM_WINDOW, MAX_STREAM_WINDOW } from './constants.js' // TOOD use config items or delete them export interface Config { @@ -58,24 +58,24 @@ export const defaultConfig: Config = { export function verifyConfig (config: Config): void { if (config.keepAliveInterval <= 0) { - throw new CodeError('keep-alive interval must be positive', ERR_INVALID_CONFIG) + throw new InvalidParametersError('keep-alive interval must be positive') } if (config.maxInboundStreams < 0) { - throw new CodeError('max inbound streams must be larger or equal 0', ERR_INVALID_CONFIG) + throw new InvalidParametersError('max inbound streams must be larger or equal 0') } if (config.maxOutboundStreams < 0) { - throw new CodeError('max outbound streams must be larger or equal 0', ERR_INVALID_CONFIG) + throw new InvalidParametersError('max outbound streams must be larger or equal 0') } if (config.initialStreamWindowSize < INITIAL_STREAM_WINDOW) { - throw new CodeError('InitialStreamWindowSize must be larger or equal 256 kB', ERR_INVALID_CONFIG) + throw new InvalidParametersError('InitialStreamWindowSize must be larger or equal 256 kB') } if (config.maxStreamWindowSize < config.initialStreamWindowSize) { - throw new CodeError('MaxStreamWindowSize must be larger than the InitialStreamWindowSize', ERR_INVALID_CONFIG) + throw new InvalidParametersError('MaxStreamWindowSize must be larger than the InitialStreamWindowSize') } if (config.maxStreamWindowSize > 2 ** 32 - 1) { - throw new CodeError('MaxStreamWindowSize must be less than equal MAX_UINT32', ERR_INVALID_CONFIG) + throw new InvalidParametersError('MaxStreamWindowSize must be less than equal MAX_UINT32') } if (config.maxMessageSize < 1024) { - throw new CodeError('MaxMessageSize must be greater than a kilobyte', ERR_INVALID_CONFIG) + throw new InvalidParametersError('MaxMessageSize must be greater than a kilobyte') } } diff --git a/src/constants.ts b/src/constants.ts index d288300..546b38f 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -1,33 +1,17 @@ // Protocol violation errors -export const ERR_INVALID_FRAME = 'ERR_INVALID_FRAME' -export const ERR_UNREQUESTED_PING = 'ERR_UNREQUESTED_PING' -export const ERR_NOT_MATCHING_PING = 'ERR_NOT_MATCHING_PING' -export const ERR_STREAM_ALREADY_EXISTS = 'ERR_STREAM_ALREADY_EXISTS' -export const ERR_DECODE_INVALID_VERSION = 'ERR_DECODE_INVALID_VERSION' -export const ERR_BOTH_CLIENTS = 'ERR_BOTH_CLIENTS' -export const ERR_RECV_WINDOW_EXCEEDED = 'ERR_RECV_WINDOW_EXCEEDED' +import { BothClientsError, DecodeInvalidVersionError, InvalidFrameError, NotMatchingPingError, ReceiveWindowExceededError, StreamAlreadyExistsError, UnrequestedPingError } from './errors.js' export const PROTOCOL_ERRORS = new Set([ - ERR_INVALID_FRAME, - ERR_UNREQUESTED_PING, - ERR_NOT_MATCHING_PING, - ERR_STREAM_ALREADY_EXISTS, - ERR_DECODE_INVALID_VERSION, - ERR_BOTH_CLIENTS, - ERR_RECV_WINDOW_EXCEEDED + InvalidFrameError.name, + UnrequestedPingError.name, + NotMatchingPingError.name, + StreamAlreadyExistsError.name, + DecodeInvalidVersionError.name, + BothClientsError.name, + ReceiveWindowExceededError.name ]) -// local errors - -export const ERR_INVALID_CONFIG = 'ERR_INVALID_CONFIG' -export const ERR_MUXER_LOCAL_CLOSED = 'ERR_MUXER_LOCAL_CLOSED' -export const ERR_MUXER_REMOTE_CLOSED = 'ERR_MUXER_REMOTE_CLOSED' -export const ERR_STREAM_RESET = 'ERR_STREAM_RESET' -export const ERR_STREAM_ABORT = 'ERR_STREAM_ABORT' -export const ERR_MAX_OUTBOUND_STREAMS_EXCEEDED = 'ERROR_MAX_OUTBOUND_STREAMS_EXCEEDED' -export const ERR_DECODE_IN_PROGRESS = 'ERR_DECODE_IN_PROGRESS' - /** * INITIAL_STREAM_WINDOW is the initial stream window size. * diff --git a/src/decode.ts b/src/decode.ts index 56440b9..dccb75d 100644 --- a/src/decode.ts +++ b/src/decode.ts @@ -1,6 +1,5 @@ -import { CodeError } from '@libp2p/interface' import { Uint8ArrayList } from 'uint8arraylist' -import { ERR_DECODE_INVALID_VERSION, ERR_DECODE_IN_PROGRESS } from './constants.js' +import { InvalidFrameError, InvalidStateError } from './errors.js' import { type FrameHeader, FrameType, HEADER_LENGTH, YAMUX_VERSION } from './frame.js' import type { Source } from 'it-stream-types' @@ -15,7 +14,7 @@ const twoPow24 = 2 ** 24 */ export function decodeHeader (data: Uint8Array): FrameHeader { if (data[0] !== YAMUX_VERSION) { - throw new CodeError('Invalid frame version', ERR_DECODE_INVALID_VERSION) + throw new InvalidFrameError('Invalid frame version') } return { type: data[1], @@ -87,7 +86,7 @@ export class Decoder { // Sanity check to ensure a header isn't read when another frame is partially decoded // In practice this shouldn't happen if (this.frameInProgress) { - throw new CodeError('decoding frame already in progress', ERR_DECODE_IN_PROGRESS) + throw new InvalidStateError('decoding frame already in progress') } if (this.buffer.length < HEADER_LENGTH) { diff --git a/src/errors.ts b/src/errors.ts new file mode 100644 index 0000000..acf0bbd --- /dev/null +++ b/src/errors.ts @@ -0,0 +1,71 @@ +export class InvalidFrameError extends Error { + static name = 'InvalidFrameError' + + constructor (message = 'The frame was invalid') { + super(message) + this.name = 'InvalidFrameError' + } +} + +export class UnrequestedPingError extends Error { + static name = 'UnrequestedPingError' + + constructor (message = 'Unrequested ping error') { + super(message) + this.name = 'UnrequestedPingError' + } +} + +export class NotMatchingPingError extends Error { + static name = 'NotMatchingPingError' + + constructor (message = 'Unrequested ping error') { + super(message) + this.name = 'NotMatchingPingError' + } +} + +export class InvalidStateError extends Error { + static name = 'InvalidStateError' + + constructor (message = 'Invalid state') { + super(message) + this.name = 'InvalidStateError' + } +} + +export class StreamAlreadyExistsError extends Error { + static name = 'StreamAlreadyExistsError' + + constructor (message = 'Strean already exists') { + super(message) + this.name = 'StreamAlreadyExistsError' + } +} + +export class DecodeInvalidVersionError extends Error { + static name = 'DecodeInvalidVersionError' + + constructor (message = 'Decode invalid version') { + super(message) + this.name = 'DecodeInvalidVersionError' + } +} + +export class BothClientsError extends Error { + static name = 'BothClientsError' + + constructor (message = 'Both clients') { + super(message) + this.name = 'BothClientsError' + } +} + +export class ReceiveWindowExceededError extends Error { + static name = 'ReceiveWindowExceededError' + + constructor (message = 'Receive window exceeded') { + super(message) + this.name = 'ReceiveWindowExceededError' + } +} diff --git a/src/muxer.ts b/src/muxer.ts index 944ce08..adb7ecb 100644 --- a/src/muxer.ts +++ b/src/muxer.ts @@ -1,11 +1,12 @@ -import { CodeError, serviceCapabilities, setMaxListeners } from '@libp2p/interface' +import { InvalidParametersError, MuxerClosedError, TooManyOutboundProtocolStreamsError, serviceCapabilities, setMaxListeners } from '@libp2p/interface' import { getIterator } from 'get-iterator' import { pushable, type Pushable } from 'it-pushable' import { Uint8ArrayList } from 'uint8arraylist' import { type Config, defaultConfig, verifyConfig } from './config.js' -import { ERR_BOTH_CLIENTS, ERR_INVALID_FRAME, ERR_MAX_OUTBOUND_STREAMS_EXCEEDED, ERR_MUXER_LOCAL_CLOSED, ERR_MUXER_REMOTE_CLOSED, ERR_NOT_MATCHING_PING, ERR_STREAM_ALREADY_EXISTS, ERR_UNREQUESTED_PING, PROTOCOL_ERRORS } from './constants.js' +import { PROTOCOL_ERRORS } from './constants.js' import { Decoder } from './decode.js' import { encodeHeader } from './encode.js' +import { InvalidFrameError, NotMatchingPingError, UnrequestedPingError } from './errors.js' import { Flag, type FrameHeader, FrameType, GoAwayCode } from './frame.js' import { StreamState, YamuxStream } from './stream.js' import type { YamuxMuxerComponents } from './index.js' @@ -139,10 +140,9 @@ export class YamuxMuxer implements StreamMuxer { } reason = GoAwayCode.NormalTermination - } catch (err: unknown) { + } catch (err: any) { // either a protocol or internal error - const errCode = (err as { code: string }).code - if (PROTOCOL_ERRORS.has(errCode)) { + if (PROTOCOL_ERRORS.has(err.name)) { this.log?.error('protocol error in sink', err) reason = GoAwayCode.ProtocolError } else { @@ -187,10 +187,10 @@ export class YamuxMuxer implements StreamMuxer { newStream (name?: string | undefined): YamuxStream { if (this.remoteGoAway !== undefined) { - throw new CodeError('muxer closed remotely', ERR_MUXER_REMOTE_CLOSED) + throw new MuxerClosedError('Muxer closed remotely') } if (this.localGoAway !== undefined) { - throw new CodeError('muxer closed locally', ERR_MUXER_LOCAL_CLOSED) + throw new MuxerClosedError('Muxer closed locally') } const id = this.nextStreamID @@ -198,7 +198,7 @@ export class YamuxMuxer implements StreamMuxer { // check against our configured maximum number of outbound streams if (this.numOutboundStreams >= this.config.maxOutboundStreams) { - throw new CodeError('max outbound streams exceeded', ERR_MAX_OUTBOUND_STREAMS_EXCEEDED) + throw new TooManyOutboundProtocolStreamsError('max outbound streams exceeded') } this.log?.trace('new outgoing stream id=%s', id) @@ -224,10 +224,10 @@ export class YamuxMuxer implements StreamMuxer { */ async ping (): Promise { if (this.remoteGoAway !== undefined) { - throw new CodeError('muxer closed remotely', ERR_MUXER_REMOTE_CLOSED) + throw new MuxerClosedError('Muxer closed remotely') } if (this.localGoAway !== undefined) { - throw new CodeError('muxer closed locally', ERR_MUXER_LOCAL_CLOSED) + throw new MuxerClosedError('Muxer closed locally') } // An active ping does not yet exist, handle the process here @@ -239,7 +239,7 @@ export class YamuxMuxer implements StreamMuxer { // this promise awaits resolution or the close controller aborting promise: new Promise((resolve, reject) => { const closed = (): void => { - reject(new CodeError('muxer closed locally', ERR_MUXER_LOCAL_CLOSED)) + reject(new MuxerClosedError('Muxer closed locally')) } this.closeController.signal.addEventListener('abort', closed, { once: true }) _resolve = (): void => { @@ -357,7 +357,7 @@ export class YamuxMuxer implements StreamMuxer { /** Create a new stream */ private _newStream (id: number, name: string | undefined, state: StreamState, direction: 'inbound' | 'outbound'): YamuxStream { if (this._streams.get(id) != null) { - throw new CodeError('Stream already exists', ERR_STREAM_ALREADY_EXISTS, { id }) + throw new InvalidParametersError('Stream already exists with that id') } const stream = new YamuxStream({ @@ -428,7 +428,7 @@ export class YamuxMuxer implements StreamMuxer { { this.handleGoAway(length); return } default: // Invalid state - throw new CodeError('Invalid frame type', ERR_INVALID_FRAME, { header }) + throw new InvalidFrameError('Invalid frame type') } } else { switch (header.type) { @@ -437,7 +437,7 @@ export class YamuxMuxer implements StreamMuxer { { await this.handleStreamMessage(header, readData); return } default: // Invalid state - throw new CodeError('Invalid frame type', ERR_INVALID_FRAME, { header }) + throw new InvalidFrameError('Invalid frame type') } } } @@ -452,18 +452,18 @@ export class YamuxMuxer implements StreamMuxer { this.handlePingResponse(header.length) } else { // Invalid state - throw new CodeError('Invalid frame flag', ERR_INVALID_FRAME, { header }) + throw new InvalidFrameError('Invalid frame flag') } } private handlePingResponse (pingId: number): void { if (this.activePing === undefined) { // this ping was not requested - throw new CodeError('ping not requested', ERR_UNREQUESTED_PING) + throw new UnrequestedPingError('ping not requested') } if (this.activePing.id !== pingId) { // this ping doesn't match our active ping request - throw new CodeError('ping doesn\'t match our id', ERR_NOT_MATCHING_PING) + throw new NotMatchingPingError('ping doesn\'t match our id') } // valid ping response @@ -522,7 +522,7 @@ export class YamuxMuxer implements StreamMuxer { private incomingStream (id: number): void { if (this.client !== (id % 2 === 0)) { - throw new CodeError('both endpoints are clients', ERR_BOTH_CLIENTS) + throw new InvalidParametersError('Both endpoints are clients') } if (this._streams.has(id)) { return @@ -565,7 +565,7 @@ export class YamuxMuxer implements StreamMuxer { this.log?.trace('sending frame %o', header) if (header.type === FrameType.Data) { if (data === undefined) { - throw new CodeError('invalid frame', ERR_INVALID_FRAME) + throw new InvalidFrameError('Invalid frame') } this.source.push( new Uint8ArrayList(encodeHeader(header), data) diff --git a/src/stream.ts b/src/stream.ts index 3ebfa8f..7e17922 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,7 +1,8 @@ -import { CodeError } from '@libp2p/interface' +import { AbortError } from '@libp2p/interface' import { AbstractStream, type AbstractStreamInit } from '@libp2p/utils/abstract-stream' import each from 'it-foreach' -import { ERR_RECV_WINDOW_EXCEEDED, ERR_STREAM_ABORT, INITIAL_STREAM_WINDOW } from './constants.js' +import { INITIAL_STREAM_WINDOW } from './constants.js' +import { ReceiveWindowExceededError } from './errors.js' import { Flag, type FrameHeader, FrameType, HEADER_LENGTH } from './frame.js' import type { Config } from './config.js' import type { AbortOptions } from '@libp2p/interface' @@ -173,7 +174,7 @@ export class YamuxStream extends AbstractStream { let reject: (err: Error) => void const abort = (): void => { if (this.status === 'open' || this.status === 'closing') { - reject(new CodeError('stream aborted', ERR_STREAM_ABORT)) + reject(new AbortError('Stream aborted')) } else { // the stream was closed already, ignore the failure to send resolve() @@ -219,7 +220,7 @@ export class YamuxStream extends AbstractStream { // check that our recv window is not exceeded if (this.recvWindowCapacity < header.length) { - throw new CodeError('receive window exceeded', ERR_RECV_WINDOW_EXCEEDED, { available: this.recvWindowCapacity, recv: header.length }) + throw new ReceiveWindowExceededError('Receive window exceeded') } const data = await readData() diff --git a/test/codec.util.ts b/test/codec.util.ts index fffe5f7..9f40b7c 100644 --- a/test/codec.util.ts +++ b/test/codec.util.ts @@ -1,5 +1,4 @@ -import { CodeError } from '@libp2p/interface' -import { ERR_DECODE_INVALID_VERSION } from '../src/constants.js' +import { InvalidFrameError } from '../src/errors.js' import { type FrameHeader, HEADER_LENGTH, YAMUX_VERSION } from '../src/frame.js' // Slower encode / decode functions that use dataview @@ -8,7 +7,7 @@ export function decodeHeaderNaive (data: Uint8Array): FrameHeader { const view = new DataView(data.buffer, data.byteOffset, data.byteLength) if (view.getUint8(0) !== YAMUX_VERSION) { - throw new CodeError('Invalid frame version', ERR_DECODE_INVALID_VERSION) + throw new InvalidFrameError('Invalid frame version') } return { type: view.getUint8(1), diff --git a/test/decode.spec.ts b/test/decode.spec.ts index e9c799e..89b4926 100644 --- a/test/decode.spec.ts +++ b/test/decode.spec.ts @@ -1,7 +1,6 @@ /* eslint-disable @typescript-eslint/dot-notation */ import { expect } from 'aegir/chai' import { type Pushable, pushable } from 'it-pushable' -import { ERR_DECODE_IN_PROGRESS } from '../src/constants.js' import { Decoder } from '../src/decode.js' import { encodeHeader } from '../src/encode.js' import { Flag, type FrameHeader, FrameType, GoAwayCode } from '../src/frame.js' @@ -344,7 +343,7 @@ describe('Decoder', () => { } expect.fail('decoding another frame before the first is finished should error') } catch (e) { - expect((e as { code: string }).code).to.equal(ERR_DECODE_IN_PROGRESS) + expect(e).to.have.property('name', 'InvalidStateError') } }) }) diff --git a/test/muxer.spec.ts b/test/muxer.spec.ts index 4ba2164..7071a8f 100644 --- a/test/muxer.spec.ts +++ b/test/muxer.spec.ts @@ -4,7 +4,6 @@ import { expect } from 'aegir/chai' import { duplexPair } from 'it-pair/duplex' import { pipe } from 'it-pipe' import { type Uint8ArrayList } from 'uint8arraylist' -import { ERR_MUXER_LOCAL_CLOSED } from '../src/constants.js' import { sleep, testClientServer, testYamuxMuxer, type YamuxFixture } from './util.js' describe('muxer', () => { @@ -104,7 +103,7 @@ describe('muxer', () => { expect(() => { client.newStream() - }).to.throw().with.property('code', ERR_MUXER_LOCAL_CLOSED, 'should not be able to open a stream after close') + }).to.throw().with.property('name', 'MuxerClosedError', 'should not be able to open a stream after close') }) it('test keep alive', async () => { diff --git a/test/stream.spec.ts b/test/stream.spec.ts index 2acf27f..8f118e7 100644 --- a/test/stream.spec.ts +++ b/test/stream.spec.ts @@ -4,7 +4,6 @@ import { expect } from 'aegir/chai' import { pipe } from 'it-pipe' import { type Pushable, pushable } from 'it-pushable' import { defaultConfig } from '../src/config.js' -import { ERR_RECV_WINDOW_EXCEEDED } from '../src/constants.js' import { GoAwayCode } from '../src/frame.js' import { StreamState } from '../src/stream.js' import { sleep, testClientServer, type YamuxFixture } from './util.js' @@ -220,7 +219,7 @@ describe('stream', () => { try { await Promise.all([sendPipe, recvPipe]) } catch (e) { - expect((e as { code: string }).code).to.equal(ERR_RECV_WINDOW_EXCEEDED) + expect(e).to.have.property('name', 'ReceiveWindowExceededError') } expect(client).to.have.property('remoteGoAway', GoAwayCode.ProtocolError)