Skip to content

Commit

Permalink
Revert back to original implementation of take()
Browse files Browse the repository at this point in the history
  • Loading branch information
jerelmiller committed Jan 11, 2025
1 parent bae1f93 commit bb1bfc2
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions src/testing/internal/ObservableStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@ export class ObservableStream<T> {
}

take({ timeout = 100 }: TakeOptions = {}) {
return new Promise<ObservableEvent<T>>((resolve, reject) => {
this.reader.read().then((result) => resolve(result.value!));
setTimeout(reject, timeout, new Error("Timeout waiting for next event"));
});
return Promise.race([
this.reader.read().then((result) => result.value!),
new Promise<ObservableEvent<T>>((_, reject) => {
setTimeout(
reject,
timeout,
new Error("Timeout waiting for next event")
);
}),
]);
}

unsubscribe() {
Expand All @@ -57,7 +63,6 @@ export class ObservableStream<T> {

async takeComplete(options?: TakeOptions): Promise<void> {
const event = await this.take(options);

validateEquals(event, { type: "complete" });
}
}
Expand Down

0 comments on commit bb1bfc2

Please sign in to comment.