-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtask-buffer.ts
87 lines (77 loc) · 2.06 KB
/
task-buffer.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import {
createChannel,
Err,
Ok,
Operation,
Resolve,
resource,
Result,
spawn,
Stream,
Task,
useScope,
withResolvers,
} from "effection";
/**
 * Handle to a concurrency-limited group of tasks.
 *
 * The buffer is itself an operation: yielding to it (`yield* buffer`)
 * waits until there are no running tasks and no queued spawn requests left.
 */
export interface TaskBuffer extends Operation<void> {
  /**
   * Queue `op` to be run once the buffer has a free slot.
   *
   * The outer operation completes as soon as the request has been queued;
   * the operation it returns completes with the `Task` once that task has
   * actually been started.
   */
  spawn<TResult>(
    op: () => Operation<TResult>,
  ): Operation<Operation<Task<TResult>>>;
}
/**
 * Create a {@link TaskBuffer} that runs at most `max` tasks at the same time.
 *
 * Spawn requests are kept in a FIFO queue. A scheduler loop starts a queued
 * request whenever fewer than `max` tasks are running, and otherwise waits
 * for a running task to finish before trying again.
 *
 * @param max - Maximum number of simultaneously running tasks. Must be
 *   greater than 0.
 * @returns An operation yielding the task buffer.
 * @throws RangeError if `max` is not greater than 0 (including `NaN`).
 */
export function useTaskBuffer(max: number): Operation<TaskBuffer> {
  return resource(function* (provide) {
    // With a limit of 0, a negative limit or NaN, `buffer.size < max` is
    // never true: no request would ever start and every waiter would hang
    // forever. Fail loudly instead of dead-locking silently.
    if (!(max > 0)) {
      throw new RangeError(
        `useTaskBuffer: max must be greater than 0, got ${max}`,
      );
    }

    // `input` signals that a new request was queued; `output` signals that
    // a running task finished (successfully or not).
    let input = createChannel<void, never>();
    let output = createChannel<Result<unknown>, never>();

    let buffer = new Set<Task<unknown>>();
    let scope = yield* useScope();

    // FIFO queue: new requests are unshifted at the front, the oldest one is
    // popped from the back.
    let requests: SpawnRequest<unknown>[] = [];

    yield* spawn(function* () {
      while (true) {
        if (requests.length === 0) {
          // Nothing queued: sleep until `spawn()` announces a new request.
          yield* next(input);
        } else if (buffer.size < max) {
          let request = requests.pop()!;
          let task = yield* scope.spawn(request.operation);
          buffer.add(task);

          // Watcher: once the task settles, free its slot and announce it.
          yield* spawn(function* () {
            let result: Result<unknown>;
            try {
              result = Ok(yield* task);
            } catch (error) {
              result = Err(error as Error);
            }
            // The slot is released before signalling so that anyone woken
            // by `output` already observes the reduced buffer size.
            buffer.delete(task);
            yield* output.send(result);
          });

          request.resolve(task);
        } else {
          // Buffer is full: wait for any running task to finish.
          yield* next(output);
        }
      }
    });

    yield* provide({
      // Yielding to the buffer drains it: keep consuming completion signals
      // until nothing is running and nothing is queued.
      *[Symbol.iterator]() {
        let outputs = yield* output;
        while (buffer.size > 0 || requests.length > 0) {
          yield* outputs.next();
        }
      },
      *spawn<T>(fn: () => Operation<T>) {
        let { operation, resolve } = withResolvers<Task<T>>();
        requests.unshift({
          operation: fn,
          resolve: resolve as Resolve<unknown>,
        });
        // Wake the scheduler in case it is idle waiting for work.
        yield* input.send();
        return operation;
      },
    });
  });
}
/**
 * A queued request to start a task: the operation to run, plus the callback
 * through which the started task is handed back to the requester.
 */
interface SpawnRequest<TResult> {
  /** Produces the operation that will be run as a task. */
  operation(): Operation<TResult>;

  /** Called with the task once it has been started. */
  resolve: Resolve<Task<TResult>>;
}
/**
 * Subscribe to `stream` and wait for a single item from it.
 *
 * A fresh subscription is created on every call, and exactly one `next()`
 * is pulled from it.
 *
 * @param stream - The stream to read from.
 * @returns The iterator result produced by the subscription's first `next()`.
 */
function* next<T, TClose>(
  stream: Stream<T, TClose>,
): Operation<IteratorResult<T, TClose>> {
  const subscription = yield* stream;
  const item = yield* subscription.next();
  return item;
}