diff --git a/.changeset/tricky-badgers-tan.md b/.changeset/tricky-badgers-tan.md new file mode 100644 index 0000000000..371d749714 --- /dev/null +++ b/.changeset/tricky-badgers-tan.md @@ -0,0 +1,5 @@ +--- +"@farcaster/shuttle": patch +--- + +feat(shuttle): Parametrize hub connection timeouts for HubSubscriber and MessageReconciliation diff --git a/packages/shuttle/src/shuttle.integration.test.ts b/packages/shuttle/src/shuttle.integration.test.ts index 87847c1c54..3ff8ac60b2 100644 --- a/packages/shuttle/src/shuttle.integration.test.ts +++ b/packages/shuttle/src/shuttle.integration.test.ts @@ -703,8 +703,8 @@ describe("shuttle", () => { ); }, getAllCastMessagesByFid: async (_request: FidRequest, _metadata: Metadata, _options: Partial) => { - // force wait for 2 seconds to trigger failure - await new Promise((resolve) => setTimeout(resolve, 5000)); + // force wait longer than MessageReconciliation's configured timeout to trigger failure + await new Promise((resolve) => setTimeout(resolve, 550)); return ok( MessagesResponse.create({ messages: [ @@ -743,8 +743,8 @@ describe("shuttle", () => { _metadata: Metadata, _options: Partial, ) => { - // force wait for 2 seconds to trigger failure - await new Promise((resolve) => setTimeout(resolve, 5000)); + // force wait longer than MessageReconciliation's configured timeout to trigger failure + await new Promise((resolve) => setTimeout(resolve, 550)); return ok( MessagesResponse.create({ messages: [], @@ -767,7 +767,7 @@ describe("shuttle", () => { }; // Only include 2 of the 3 messages in the time window - const reconciler = new MessageReconciliation(mockRPCClient as unknown as HubRpcClient, db, log); + const reconciler = new MessageReconciliation(mockRPCClient as unknown as HubRpcClient, db, log, 500); const messagesOnHub: Message[] = []; const messagesInDb: { hash: Uint8Array; @@ -791,7 +791,7 @@ describe("shuttle", () => { startTimestamp, ), ).rejects.toThrow(); - }, 15000); // Need to make sure this is long enough to handle the timeout termination + }, 5000); // Need to make sure this is long enough to handle the timeout termination test("marks messages as pruned", async () => { const addMessage = await Factories.ReactionAddMessage.create({}, { transient: { signer } }); diff --git a/packages/shuttle/src/shuttle/hubSubscriber.ts b/packages/shuttle/src/shuttle/hubSubscriber.ts index db0950cd24..a22804db57 100644 --- a/packages/shuttle/src/shuttle/hubSubscriber.ts +++ b/packages/shuttle/src/shuttle/hubSubscriber.ts @@ -55,6 +55,7 @@ export class BaseHubSubscriber extends HubSubscriber { private stream: ClientReadableStream | null = null; private totalShards: number | undefined; private shardIndex: number | undefined; + private connectionTimeout: number; // milliseconds constructor( label: string, @@ -63,6 +64,7 @@ export class BaseHubSubscriber extends HubSubscriber { eventTypes?: HubEventType[], totalShards?: number, shardIndex?: number, + connectionTimeout = 30000, ) { super(); this.label = label; @@ -71,6 +73,7 @@ export class BaseHubSubscriber extends HubSubscriber { this.totalShards = totalShards; this.shardIndex = shardIndex; this.eventTypes = eventTypes || DEFAULT_EVENT_TYPES; + this.connectionTimeout = connectionTimeout; } public override stop() { @@ -156,13 +159,13 @@ export class BaseHubSubscriber extends HubSubscriber { // Do not allow hanging unresponsive connections to linger: let cancel = setTimeout(() => { this.destroy(); - }, 30000); + }, this.connectionTimeout); for await (const event of stream) { await this.processHubEvent(event); clearTimeout(cancel); cancel = setTimeout(() => { this.destroy(); - }, 30000); + }, this.connectionTimeout); } clearTimeout(cancel); // biome-ignore lint/suspicious/noExplicitAny: error catching diff --git a/packages/shuttle/src/shuttle/messageReconciliation.ts b/packages/shuttle/src/shuttle/messageReconciliation.ts index 14f50ef763..4a1a79b2ed 100644 --- a/packages/shuttle/src/shuttle/messageReconciliation.ts +++ b/packages/shuttle/src/shuttle/messageReconciliation.ts @@ -35,11 +35,13 @@ export class MessageReconciliation { private stream: ClientDuplexStream | undefined; private db: DB; private log: pino.Logger; + private connectionTimeout: number; // milliseconds - constructor(client: HubRpcClient, db: DB, log: pino.Logger) { + constructor(client: HubRpcClient, db: DB, log: pino.Logger, connectionTimeout = 30000) { this.client = client; this.db = db; this.log = log; + this.connectionTimeout = connectionTimeout; this.establishStream(); } @@ -182,7 +184,10 @@ export class MessageReconciliation { const id = randomUUID(); const result = new Promise>((resolve) => { // Do not allow hanging unresponsive connections to linger: - const cancel = setTimeout(() => resolve(err(new HubError("unavailable", "server timeout"))), 5000); + const cancel = setTimeout( + () => resolve(err(new HubError("unavailable", "server timeout"))), + this.connectionTimeout, + ); if (!this.stream) { fallback()