Skip to content

Commit

Permalink
avoid Promise.race in combinators
Browse files Browse the repository at this point in the history
  • Loading branch information
brainkim committed Sep 2, 2020
1 parent a06867a commit 104cc16
Showing 1 changed file with 133 additions and 75 deletions.
208 changes: 133 additions & 75 deletions packages/repeater/src/repeater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -603,40 +603,24 @@ function isIterable(value: any): value is Iterable<unknown> {
return value != null && typeof value[Symbol.iterator] === "function";
}

function asyncIterators(
function getIterators(
contenders: Iterable<any>,
options: { yieldValues?: boolean; returnValues?: boolean },
): AsyncIterator<any>[] {
const { yieldValues, returnValues } = options;
const iters: AsyncIterator<any>[] = [];
): Array<AsyncIterator<any> | Iterator<any>> {
const iters: Array<AsyncIterator<any> | Iterator<any>> = [];
for (const contender of contenders) {
if (isAsyncIterable(contender)) {
iters.push((contender as AsyncIterable<any>)[Symbol.asyncIterator]());
} else if (isIterable(contender)) {
const iter = (contender as Iterable<any>)[Symbol.iterator]();
iters.push(
(async function* syncToAsyncIterator() {
try {
let result = iter.next();
while (!result.done) {
yield result.value;
result = iter.next();
}

return result.value;
} finally {
iter.return && iter.return();
}
})(),
);
iters.push((contender as Iterable<any>)[Symbol.iterator]());
} else {
iters.push(
(async function* valueToAsyncIterator() {
if (yieldValues) {
if (options.yieldValues) {
yield contender;
}

if (returnValues) {
if (options.returnValues) {
return contender;
}
})(),
Expand All @@ -656,42 +640,60 @@ function race<T>(
: U
: never
> {
const iters = asyncIterators(contenders, { returnValues: true });
const iters = getIterators(contenders, { returnValues: true });
return new Repeater(async (push, stop) => {
if (!iters.length) {
if (iters.length === 0) {
stop();
return;
}

let advance!: (value?: IteratorYieldResult<unknown>) => unknown;
let stopped = false;
stop.then(() => (stopped = true));
let returned: any;
stop.then(() => {
advance();
stopped = true;
});

let finalIteration: IteratorReturnResult<unknown> | undefined;
try {
let iteration: IteratorYieldResult<unknown> | undefined;
let i = 0;
while (!stopped) {
const results = iters.map((iter) => iter.next());
for (const result of results) {
Promise.resolve(result).then(
(result) => {
if (result.done && !stopped) {
const j = i;
for (const iter of iters) {
Promise.resolve(iter.next()).then(
(iteration) => {
if (iteration.done) {
stop();
stopped = true;
returned = result.value;
if (finalIteration === undefined) {
finalIteration = iteration;
}
} else if (i === j) {
// This iterator has won, advance i and resolve the promise.
i++;
advance(iteration);
}
},
(err) => stop(err),
);
}

const result = await Promise.race([stop, ...results]);
if (result !== undefined && !result.done) {
await push(result.value);
iteration = await new Promise((resolve) => (advance = resolve));
if (iteration !== undefined) {
await push(iteration.value as any);
}
}

return returned;
return finalIteration && finalIteration.value;
} finally {
stop();
await Promise.race(iters.map((iter) => iter.return && iter.return()));
await Promise.race(
iters.map((iter) => {
if (iter.return) {
return iter.return();
}
}),
);
}
});
}
Expand All @@ -707,37 +709,56 @@ function merge<T>(
? U
: T
> {
const iters = asyncIterators(contenders, { yieldValues: true });
const iters = getIterators(contenders, { yieldValues: true });
return new Repeater(async (push, stop) => {
if (!iters.length) {
stop();
return;
}

const advances: Array<(value?: IteratorResult<unknown>) => unknown> = [];
let stopped = false;
stop.then(() => (stopped = true));
let returned: any;
await Promise.all(
iters.map(async (iter) => {
try {
while (!stopped) {
const result = await Promise.race([iter.next(), stop]);
if (result !== undefined) {
if (result.done) {
returned = result.value;
return;
}
stop.then(() => {
stopped = true;
for (const advance of advances) {
advance();
}
});

await push(result.value);
let finalIteration: IteratorReturnResult<unknown> | undefined;
try {
await Promise.all(
iters.map(async (iter, i) => {
try {
while (!stopped) {
Promise.resolve(iter.next()).then(
(iteration) => advances[i](iteration),
(err) => stop(err),
);
const iteration:
| IteratorResult<unknown>
| undefined = await new Promise((resolve) => {
advances[i] = resolve;
});

if (iteration !== undefined) {
if (iteration.done) {
finalIteration = iteration;
return;
}

await push(iteration.value as any);
}
}
} finally {
iter.return && (await iter.return());
}
} finally {
iter.return && (await iter.return());
}
}),
);
stop();
return returned;
}),
);
return finalIteration && finalIteration.value;
} finally {
stop();
}
});
}

Expand Down Expand Up @@ -768,19 +789,30 @@ function zip<T1, T2, T3, T4, T5, T6, T7, T8, T9>(contenders: [Contender<T1>, Con
// prettier-ignore
function zip<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>, Contender<T5>, Contender<T6>, Contender<T7>, Contender<T8>, Contender<T9>, Contender<T10>]): Repeater<[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10]>;
function zip(contenders: Iterable<any>) {
const iters = asyncIterators(contenders, { returnValues: true });
const iters = getIterators(contenders, { returnValues: true });
return new Repeater(async (push, stop) => {
if (!iters.length) {
stop();
return [];
}

let advance!: (results?: Array<IteratorResult<unknown>>) => unknown;
let stopped = false;
stop.then(() => (stopped = true));
stop.then(() => {
advance();
stopped = true;
});

try {
while (!stopped) {
const resultsP = Promise.all(iters.map((iter) => iter.next()));
const results = await Promise.race([stop, resultsP]);
Promise.all(iters.map((iter) => iter.next())).then(
(results) => advance(results),
(err) => stop(err),
);

const results: Array<IteratorResult<unknown>> = await new Promise(
(resolve) => (advance = resolve),
);
if (results === undefined) {
return;
}
Expand Down Expand Up @@ -820,21 +852,37 @@ function latest<T1, T2, T3, T4, T5, T6, T7, T8, T9>(contenders: [Contender<T1>,
// prettier-ignore
function latest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>, Contender<T5>, Contender<T6>, Contender<T7>, Contender<T8>, Contender<T9>, Contender<T10>]): Repeater<[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10]>;
function latest(contenders: Iterable<any>) {
const iters = asyncIterators(contenders, {
const iters = getIterators(contenders, {
yieldValues: true,
returnValues: true,
});

return new Repeater(async (push, stop) => {
if (!iters.length) {
stop();
return [];
}

let advance!: (results?: Array<IteratorResult<unknown>>) => unknown;
const advances: Array<(result?: IteratorResult<unknown>) => unknown> = [];
let stopped = false;
stop.then(() => (stopped = true));
stop.then(() => {
advance();
for (const advance1 of advances) {
advance1();
}
stopped = true;
});

try {
const resultsP = Promise.all(iters.map((iter) => iter.next()));
const results = await Promise.race([stop, resultsP]);
Promise.all(iters.map((iter) => iter.next())).then(
(results) => advance(results),
(err) => stop(err),
);

const results:
| Array<IteratorResult<unknown>>
| undefined = await new Promise((resolve) => (advance = resolve));
if (results === undefined) {
return;
}
Expand All @@ -844,6 +892,7 @@ function latest(contenders: Iterable<any>) {
return values;
}

// We continuously yield the values array so we must shallow copy it each time it is pushed.
await push(values.slice());
return await Promise.all(
iters.map(async (iter, i) => {
Expand All @@ -852,15 +901,24 @@ function latest(contenders: Iterable<any>) {
}

while (!stopped) {
const result = await Promise.race([stop, iter.next()]);
if (result !== undefined) {
if (result.done) {
return result.value;
}

values[i] = result.value;
await push(values.slice());
Promise.resolve(iter.next()).then(
(result) => advances[i](result),
(err) => stop(err),
);

const result:
| IteratorResult<unknown>
| undefined = await new Promise(
(resolve) => (advances[i] = resolve),
);
if (result === undefined) {
return results[i].value;
} else if (result.done) {
return result.value;
}

values[i] = result.value;
await push(values.slice());
}
}),
);
Expand Down

0 comments on commit 104cc16

Please sign in to comment.