From 67bdc6fe6d76709f080709ed5b90b20e5a060a07 Mon Sep 17 00:00:00 2001 From: Ty Book Date: Tue, 15 Oct 2024 13:11:15 -0400 Subject: [PATCH 1/5] feat(shuttle): Parametrize hub connection timeouts for HubSubscriber and MessageReconciliation --- .changeset/tricky-badgers-tan.md | 5 +++++ packages/shuttle/src/shuttle/hubSubscriber.ts | 7 +++++-- packages/shuttle/src/shuttle/messageReconciliation.ts | 6 ++++-- 3 files changed, 14 insertions(+), 4 deletions(-) create mode 100644 .changeset/tricky-badgers-tan.md diff --git a/.changeset/tricky-badgers-tan.md b/.changeset/tricky-badgers-tan.md new file mode 100644 index 0000000000..371d749714 --- /dev/null +++ b/.changeset/tricky-badgers-tan.md @@ -0,0 +1,5 @@ +--- +"@farcaster/shuttle": patch +--- + +feat(shuttle): Parametrize hub connection timeouts for HubSubscriber and MessageReconciliation diff --git a/packages/shuttle/src/shuttle/hubSubscriber.ts b/packages/shuttle/src/shuttle/hubSubscriber.ts index db0950cd24..a22804db57 100644 --- a/packages/shuttle/src/shuttle/hubSubscriber.ts +++ b/packages/shuttle/src/shuttle/hubSubscriber.ts @@ -55,6 +55,7 @@ export class BaseHubSubscriber extends HubSubscriber { private stream: ClientReadableStream | null = null; private totalShards: number | undefined; private shardIndex: number | undefined; + private connectionTimeout: number; // milliseconds constructor( label: string, @@ -63,6 +64,7 @@ export class BaseHubSubscriber extends HubSubscriber { eventTypes?: HubEventType[], totalShards?: number, shardIndex?: number, + connectionTimeout = 30000, ) { super(); this.label = label; @@ -71,6 +73,7 @@ export class BaseHubSubscriber extends HubSubscriber { this.totalShards = totalShards; this.shardIndex = shardIndex; this.eventTypes = eventTypes || DEFAULT_EVENT_TYPES; + this.connectionTimeout = connectionTimeout; } public override stop() { @@ -156,13 +159,13 @@ export class BaseHubSubscriber extends HubSubscriber { // Do not allow hanging unresponsive connections to linger: let cancel = setTimeout(() => { this.destroy(); - }, 30000); + }, this.connectionTimeout); for await (const event of stream) { await this.processHubEvent(event); clearTimeout(cancel); cancel = setTimeout(() => { this.destroy(); - }, 30000); + }, this.connectionTimeout); } clearTimeout(cancel); // biome-ignore lint/suspicious/noExplicitAny: error catching diff --git a/packages/shuttle/src/shuttle/messageReconciliation.ts b/packages/shuttle/src/shuttle/messageReconciliation.ts index 14f50ef763..8716def59e 100644 --- a/packages/shuttle/src/shuttle/messageReconciliation.ts +++ b/packages/shuttle/src/shuttle/messageReconciliation.ts @@ -35,11 +35,13 @@ export class MessageReconciliation { private stream: ClientDuplexStream | undefined; private db: DB; private log: pino.Logger; + private connectionTimeout: number; // milliseconds - constructor(client: HubRpcClient, db: DB, log: pino.Logger) { + constructor(client: HubRpcClient, db: DB, log: pino.Logger, connectionTimeout = 30000) { this.client = client; this.db = db; this.log = log; + this.connectionTimeout = connectionTimeout; this.establishStream(); } @@ -182,7 +184,7 @@ export class MessageReconciliation { const id = randomUUID(); const result = new Promise>((resolve) => { // Do not allow hanging unresponsive connections to linger: - const cancel = setTimeout(() => resolve(err(new HubError("unavailable", "server timeout"))), 5000); + const cancel = setTimeout(() => resolve(err(new HubError("unavailable", "server timeout"))), this.connectionTimeout); if (!this.stream) { fallback() From aeb3bd6f6c8775b41cef8e6dced919ebfae286a5 Mon Sep 17 00:00:00 2001 From: Ty Book Date: Tue, 15 Oct 2024 13:26:30 -0400 Subject: [PATCH 2/5] Linting fix --- packages/shuttle/src/shuttle/messageReconciliation.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/shuttle/src/shuttle/messageReconciliation.ts b/packages/shuttle/src/shuttle/messageReconciliation.ts index 8716def59e..4a1a79b2ed 100644 --- a/packages/shuttle/src/shuttle/messageReconciliation.ts +++ b/packages/shuttle/src/shuttle/messageReconciliation.ts @@ -184,7 +184,10 @@ export class MessageReconciliation { const id = randomUUID(); const result = new Promise>((resolve) => { // Do not allow hanging unresponsive connections to linger: - const cancel = setTimeout(() => resolve(err(new HubError("unavailable", "server timeout"))), this.connectionTimeout); + const cancel = setTimeout( + () => resolve(err(new HubError("unavailable", "server timeout"))), + this.connectionTimeout, + ); if (!this.stream) { fallback() From 7276ea1c3f1bef2f5091732405c88a1a636a7496 Mon Sep 17 00:00:00 2001 From: Ty Book Date: Tue, 15 Oct 2024 13:49:52 -0400 Subject: [PATCH 3/5] Try fixing test --- packages/shuttle/src/shuttle.integration.test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/shuttle/src/shuttle.integration.test.ts b/packages/shuttle/src/shuttle.integration.test.ts index 87847c1c54..41e1a06d7b 100644 --- a/packages/shuttle/src/shuttle.integration.test.ts +++ b/packages/shuttle/src/shuttle.integration.test.ts @@ -704,7 +704,7 @@ describe("shuttle", () => { }, getAllCastMessagesByFid: async (_request: FidRequest, _metadata: Metadata, _options: Partial) => { // force wait for 2 seconds to trigger failure - await new Promise((resolve) => setTimeout(resolve, 5000)); + await new Promise((resolve) => setTimeout(resolve, 30000)); return ok( MessagesResponse.create({ messages: [ @@ -744,7 +744,7 @@ describe("shuttle", () => { _options: Partial, ) => { // force wait for 2 seconds to trigger failure - await new Promise((resolve) => setTimeout(resolve, 5000)); + await new Promise((resolve) => setTimeout(resolve, 30000)); return ok( MessagesResponse.create({ messages: [], @@ -791,7 +791,7 @@ describe("shuttle", () => { startTimestamp, ), ).rejects.toThrow(); - }, 15000); // Need to make sure this is long enough to handle the timeout termination + }, 40000); // Need to make sure this is long enough to handle the timeout termination test("marks messages as pruned", async () => { const addMessage = await Factories.ReactionAddMessage.create({}, { transient: { signer } }); From 2fbbf1e4ea62e8339ad6dd55ad473a4dd3b040b9 Mon Sep 17 00:00:00 2001 From: Ty Book Date: Wed, 16 Oct 2024 14:10:10 -0400 Subject: [PATCH 4/5] Decrease test timeouts --- packages/shuttle/src/shuttle.integration.test.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/shuttle/src/shuttle.integration.test.ts b/packages/shuttle/src/shuttle.integration.test.ts index 41e1a06d7b..d1f1118e86 100644 --- a/packages/shuttle/src/shuttle.integration.test.ts +++ b/packages/shuttle/src/shuttle.integration.test.ts @@ -704,7 +704,7 @@ describe("shuttle", () => { }, getAllCastMessagesByFid: async (_request: FidRequest, _metadata: Metadata, _options: Partial) => { // force wait for 2 seconds to trigger failure - await new Promise((resolve) => setTimeout(resolve, 30000)); + await new Promise((resolve) => setTimeout(resolve, 500)); return ok( MessagesResponse.create({ messages: [ @@ -744,7 +744,7 @@ describe("shuttle", () => { _options: Partial, ) => { // force wait for 2 seconds to trigger failure - await new Promise((resolve) => setTimeout(resolve, 30000)); + await new Promise((resolve) => setTimeout(resolve, 500)); return ok( MessagesResponse.create({ messages: [], @@ -767,7 +767,7 @@ describe("shuttle", () => { }; // Only include 2 of the 3 messages in the time window - const reconciler = new MessageReconciliation(mockRPCClient as unknown as HubRpcClient, db, log); + const reconciler = new MessageReconciliation(mockRPCClient as unknown as HubRpcClient, db, log, 500); const messagesOnHub: Message[] = []; const messagesInDb: { hash: Uint8Array; @@ -791,7 +791,7 @@ describe("shuttle", () => { startTimestamp, ), ).rejects.toThrow(); - }, 40000); // Need to make sure this is long enough to handle the timeout termination + }, 1000); // Need to make sure this is long enough to handle the timeout termination test("marks messages as pruned", async () => { const addMessage = await Factories.ReactionAddMessage.create({}, { transient: { signer } }); From 7d66f0416a3a041381b4f66534cf4723ffb13240 Mon Sep 17 00:00:00 2001 From: Ty Book Date: Wed, 16 Oct 2024 15:26:04 -0400 Subject: [PATCH 5/5] Fix test again --- packages/shuttle/src/shuttle.integration.test.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/shuttle/src/shuttle.integration.test.ts b/packages/shuttle/src/shuttle.integration.test.ts index d1f1118e86..3ff8ac60b2 100644 --- a/packages/shuttle/src/shuttle.integration.test.ts +++ b/packages/shuttle/src/shuttle.integration.test.ts @@ -703,8 +703,8 @@ describe("shuttle", () => { ); }, getAllCastMessagesByFid: async (_request: FidRequest, _metadata: Metadata, _options: Partial) => { - // force wait for 2 seconds to trigger failure - await new Promise((resolve) => setTimeout(resolve, 500)); + // force wait longer than MessageReconciliation's configured timeout to trigger failure + await new Promise((resolve) => setTimeout(resolve, 550)); return ok( MessagesResponse.create({ messages: [ @@ -743,8 +743,8 @@ describe("shuttle", () => { _metadata: Metadata, _options: Partial, ) => { - // force wait for 2 seconds to trigger failure - await new Promise((resolve) => setTimeout(resolve, 500)); + // force wait longer than MessageReconciliation's configured timeout to trigger failure + await new Promise((resolve) => setTimeout(resolve, 550)); return ok( MessagesResponse.create({ messages: [], @@ -791,7 +791,7 @@ describe("shuttle", () => { startTimestamp, ), ).rejects.toThrow(); - }, 1000); // Need to make sure this is long enough to handle the timeout termination + }, 5000); // Need to make sure this is long enough to handle the timeout termination test("marks messages as pruned", async () => { const addMessage = await Factories.ReactionAddMessage.create({}, { transient: { signer } });