Skip to content

Commit

Permalink
avoid Promise.race in combinators
Browse files Browse the repository at this point in the history
  • Loading branch information
brainkim committed Sep 2, 2020
1 parent a06867a commit 03f4560
Showing 1 changed file with 141 additions and 97 deletions.
238 changes: 141 additions & 97 deletions packages/repeater/src/repeater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -594,50 +594,25 @@ export class Repeater<T, TReturn = any, TNext = unknown> {

/*** COMBINATOR FUNCTIONS ***/
// TODO: move these combinators to their own file.

function isAsyncIterable(value: any): value is AsyncIterable<unknown> {
return value != null && typeof value[Symbol.asyncIterator] === "function";
}

function isIterable(value: any): value is Iterable<unknown> {
return value != null && typeof value[Symbol.iterator] === "function";
}

function asyncIterators(
contenders: Iterable<any>,
function getIterators(
values: Iterable<any>,
options: { yieldValues?: boolean; returnValues?: boolean },
): AsyncIterator<any>[] {
const { yieldValues, returnValues } = options;
const iters: AsyncIterator<any>[] = [];
for (const contender of contenders) {
if (isAsyncIterable(contender)) {
iters.push((contender as AsyncIterable<any>)[Symbol.asyncIterator]());
} else if (isIterable(contender)) {
const iter = (contender as Iterable<any>)[Symbol.iterator]();
iters.push(
(async function* syncToAsyncIterator() {
try {
let result = iter.next();
while (!result.done) {
yield result.value;
result = iter.next();
}

return result.value;
} finally {
iter.return && iter.return();
}
})(),
);
): Array<AsyncIterator<any> | Iterator<any>> {
const iters: Array<AsyncIterator<any> | Iterator<any>> = [];
for (const value of values) {
if (value != null && typeof value[Symbol.asyncIterator] === "function") {
iters.push((value as AsyncIterable<any>)[Symbol.asyncIterator]());
} else if (value != null && typeof value[Symbol.iterator] === "function") {
iters.push((value as Iterable<any>)[Symbol.iterator]());
} else {
iters.push(
(async function* valueToAsyncIterator() {
if (yieldValues) {
yield contender;
if (options.yieldValues) {
yield value;
}

if (returnValues) {
return contender;
if (options.returnValues) {
return value;
}
})(),
);
Expand All @@ -647,6 +622,7 @@ function asyncIterators(
return iters;
}

// NOTE: whenever you see any variables called `advance` or `advances`, know that it is a hack to get around the fact that `Promise.race` leaks memory. These variables are intended to be set to the resolve function of a promise which is constructed and awaited as an alternative to Promise.race. For more information, see this comment in the Node.js issue tracker: https://github.com/nodejs/node/issues/17469#issuecomment-685216777.
function race<T>(
contenders: Iterable<T>,
): Repeater<
Expand All @@ -656,39 +632,51 @@ function race<T>(
: U
: never
> {
const iters = asyncIterators(contenders, { returnValues: true });
const iters = getIterators(contenders, { returnValues: true });
return new Repeater(async (push, stop) => {
if (!iters.length) {
stop();
return;
}

let advance!: (value?: IteratorYieldResult<unknown>) => unknown;
let stopped = false;
stop.then(() => (stopped = true));
let returned: any;
stop.then(() => {
advance();
stopped = true;
});

let finalIteration: IteratorReturnResult<unknown> | undefined;
try {
let iteration: IteratorYieldResult<unknown> | undefined;
let i = 0;
while (!stopped) {
const results = iters.map((iter) => iter.next());
for (const result of results) {
Promise.resolve(result).then(
(result) => {
if (result.done && !stopped) {
const j = i;
for (const iter of iters) {
Promise.resolve(iter.next()).then(
(iteration) => {
if (iteration.done) {
stop();
stopped = true;
returned = result.value;
if (finalIteration === undefined) {
finalIteration = iteration;
}
} else if (i === j) {
// This iterator has won, advance i and resolve the promise.
i++;
advance(iteration);
}
},
(err) => stop(err),
);
}

const result = await Promise.race([stop, ...results]);
if (result !== undefined && !result.done) {
await push(result.value);
iteration = await new Promise((resolve) => (advance = resolve));
if (iteration !== undefined) {
await push(iteration.value as any);
}
}

return returned;
return finalIteration && finalIteration.value;
} finally {
stop();
await Promise.race(iters.map((iter) => iter.return && iter.return()));
Expand All @@ -707,37 +695,56 @@ function merge<T>(
? U
: T
> {
const iters = asyncIterators(contenders, { yieldValues: true });
const iters = getIterators(contenders, { yieldValues: true });
return new Repeater(async (push, stop) => {
if (!iters.length) {
stop();
return;
}

const advances: Array<(value?: IteratorResult<unknown>) => unknown> = [];
let stopped = false;
stop.then(() => (stopped = true));
let returned: any;
await Promise.all(
iters.map(async (iter) => {
try {
while (!stopped) {
const result = await Promise.race([iter.next(), stop]);
if (result !== undefined) {
if (result.done) {
returned = result.value;
return;
}
stop.then(() => {
stopped = true;
for (const advance of advances) {
advance();
}
});

await push(result.value);
let finalIteration: IteratorReturnResult<unknown> | undefined;
try {
await Promise.all(
iters.map(async (iter, i) => {
try {
while (!stopped) {
Promise.resolve(iter.next()).then(
(iteration) => advances[i](iteration),
(err) => stop(err),
);
const iteration:
| IteratorResult<unknown>
| undefined = await new Promise((resolve) => {
advances[i] = resolve;
});

if (iteration !== undefined) {
if (iteration.done) {
finalIteration = iteration;
return;
}

await push(iteration.value as any);
}
}
} finally {
iter.return && (await iter.return());
}
} finally {
iter.return && (await iter.return());
}
}),
);
stop();
return returned;
}),
);
return finalIteration && finalIteration.value;
} finally {
stop();
}
});
}

Expand Down Expand Up @@ -768,25 +775,36 @@ function zip<T1, T2, T3, T4, T5, T6, T7, T8, T9>(contenders: [Contender<T1>, Con
// prettier-ignore
function zip<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>, Contender<T5>, Contender<T6>, Contender<T7>, Contender<T8>, Contender<T9>, Contender<T10>]): Repeater<[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10]>;
function zip(contenders: Iterable<any>) {
const iters = asyncIterators(contenders, { returnValues: true });
const iters = getIterators(contenders, { returnValues: true });
return new Repeater(async (push, stop) => {
if (!iters.length) {
stop();
return [];
}

let advance!: (iterations?: Array<IteratorResult<unknown>>) => unknown;
let stopped = false;
stop.then(() => (stopped = true));
stop.then(() => {
advance();
stopped = true;
});

try {
while (!stopped) {
const resultsP = Promise.all(iters.map((iter) => iter.next()));
const results = await Promise.race([stop, resultsP]);
if (results === undefined) {
Promise.all(iters.map((iter) => iter.next())).then(
(iterations) => advance(iterations),
(err) => stop(err),
);

const iterations: Array<IteratorResult<unknown>> | undefined = await new Promise(
(resolve) => (advance = resolve),
);
if (iterations === undefined) {
return;
}

const values = results.map((result) => result.value);
if (results.some((result) => result.done)) {
const values = iterations.map((iteration) => iteration.value);
if (iterations.some((iteration) => iteration.done)) {
return values;
}

Expand Down Expand Up @@ -820,47 +838,73 @@ function latest<T1, T2, T3, T4, T5, T6, T7, T8, T9>(contenders: [Contender<T1>,
// prettier-ignore
function latest<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>, Contender<T5>, Contender<T6>, Contender<T7>, Contender<T8>, Contender<T9>, Contender<T10>]): Repeater<[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10]>;
function latest(contenders: Iterable<any>) {
const iters = asyncIterators(contenders, {
const iters = getIterators(contenders, {
yieldValues: true,
returnValues: true,
});

return new Repeater(async (push, stop) => {
if (!iters.length) {
stop();
return [];
}

let advance!: (iterations?: Array<IteratorResult<unknown>>) => unknown;
const advances: Array<(iteration?: IteratorResult<unknown>) => unknown> = [];
let stopped = false;
stop.then(() => (stopped = true));
stop.then(() => {
advance();
for (const advance1 of advances) {
advance1();
}
stopped = true;
});

try {
const resultsP = Promise.all(iters.map((iter) => iter.next()));
const results = await Promise.race([stop, resultsP]);
if (results === undefined) {
Promise.all(iters.map((iter) => iter.next())).then(
(iterations) => advance(iterations),
(err) => stop(err),
);

const iterations:
| Array<IteratorResult<unknown>>
| undefined = await new Promise((resolve) => (advance = resolve));
if (iterations === undefined) {
return;
}

const values = results.map((result) => result.value);
if (results.every((result) => result.done)) {
const values = iterations.map((iteration) => iteration.value);
if (iterations.every((iteration) => iteration.done)) {
return values;
}

// We continuously yield and mutate the same values array so we shallow copy it each time it is pushed.
await push(values.slice());
return await Promise.all(
iters.map(async (iter, i) => {
if (results[i].done) {
return results[i].value;
if (iterations[i].done) {
return iterations[i].value;
}

while (!stopped) {
const result = await Promise.race([stop, iter.next()]);
if (result !== undefined) {
if (result.done) {
return result.value;
}

values[i] = result.value;
await push(values.slice());
Promise.resolve(iter.next()).then(
(iteration) => advances[i](iteration),
(err) => stop(err),
);

const iteration:
| IteratorResult<unknown>
| undefined = await new Promise(
(resolve) => (advances[i] = resolve),
);
if (iteration === undefined) {
return iterations[i].value;
} else if (iteration.done) {
return iteration.value;
}

values[i] = iteration.value;
await push(values.slice());
}
}),
);
Expand Down

0 comments on commit 03f4560

Please sign in to comment.