diff --git a/pubsub.ts b/pubsub.ts index bad2812d..2f231c80 100644 --- a/pubsub.ts +++ b/pubsub.ts @@ -87,7 +87,16 @@ class RedisSubscriptionImpl implements RedisSubscription { let forceReconnect = false; while (this.isConnected) { try { - const rep = (await readArrayReply(this.connection.reader!)) as string[]; + let rep: string[]; + try { + rep = (await readArrayReply(this.connection.reader)) as string[]; + } catch (err) { + if (err instanceof Deno.errors.BadResource) { // Connection already closed. + this.connection.close(); + break; + } + throw err; + } const ev = rep[0]; if (ev === "message" && rep.length === 3) { diff --git a/tests/pubsub_test.ts b/tests/pubsub_test.ts index 8baee93d..fc95ffc8 100644 --- a/tests/pubsub_test.ts +++ b/tests/pubsub_test.ts @@ -128,4 +128,19 @@ suite.test("testSubscribe4", async () => { stopRedis(tempServer); }); +suite.test( + "SubscriptionShouldNotThrowBadResourceErrorWhenConnectionIsClosed (#89)", + async () => { + const redis = await newClient(opts); + const sub = await redis.subscribe("test"); + const subscriptionPromise = (async () => { + // deno-lint-ignore no-empty + for await (const _ of sub.receive()) {} + })(); + redis.close(); + await subscriptionPromise; + assert(sub.isClosed); + }, +); + suite.runTests();