Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: performance fix #34

Merged
merged 12 commits into from
Aug 19, 2024
Merged
57 changes: 57 additions & 0 deletions _raw_semaphore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* @internal
*/
export class RawSemaphore {
#resolves: (() => void)[] = [];
#value: number;
#size: number;

/**
* Creates a new semaphore with the specified limit.
*
* @param size The maximum number of times the semaphore can be acquired before blocking.
* @throws {RangeError} if the size is not a positive safe integer.
*/
constructor(size: number) {
if (size <= 0 || !Number.isSafeInteger(size)) {
throw new RangeError(
`size must be a positive safe integer, got ${size}`,
);
}
this.#value = size;
this.#size = size;
}

/**
* Returns true if the semaphore is currently locked.
*/
get locked(): boolean {
return this.#value === 0;
}

/**
* Acquires the semaphore, blocking until the semaphore is available.
*/
acquire(): Promise<void> {
if (this.#value > 0) {
this.#value -= 1;
return Promise.resolve();
} else {
const { promise, resolve } = Promise.withResolvers<void>();
this.#resolves.push(resolve);
return promise;
}
}

/**
* Releases the semaphore, allowing the next waiting operation to proceed.
*/
release(): void {
const resolve = this.#resolves.shift();
if (resolve) {
resolve();
} else if (this.#value < this.#size) {
this.#value += 1;
}
}
}
29 changes: 14 additions & 15 deletions barrier.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { Notify } from "./notify.ts";

/**
* A synchronization primitive that allows multiple tasks to wait until all of
* them have reached a certain point of execution before continuing.
Expand All @@ -26,8 +24,8 @@ import { Notify } from "./notify.ts";
* ```
*/
export class Barrier {
#notify = new Notify();
#rest: number;
#waiter: PromiseWithResolvers<void> = Promise.withResolvers();
#value: number;

/**
* Creates a new `Barrier` that blocks until `size` threads have called `wait`.
Expand All @@ -41,23 +39,24 @@ export class Barrier {
`size must be a positive safe integer, got ${size}`,
);
}
this.#rest = size;
this.#value = size;
}

/**
* Wait for all threads to reach the barrier.
* Blocks until all threads reach the barrier.
*/
async wait({ signal }: { signal?: AbortSignal } = {}): Promise<void> {
signal?.throwIfAborted();
this.#rest -= 1;
if (this.#rest === 0) {
await Promise.all([
this.#notify.notified({ signal }),
this.#notify.notifyAll(),
]);
} else {
await this.#notify.notified({ signal });
wait({ signal }: { signal?: AbortSignal } = {}): Promise<void> {
if (signal?.aborted) {
return Promise.reject(signal.reason);
}
const { promise, resolve, reject } = this.#waiter;
const abort = () => reject(signal!.reason);
signal?.addEventListener("abort", abort, { once: true });
this.#value -= 1;
if (this.#value === 0) {
resolve();
}
return promise.finally(() => signal?.removeEventListener("abort", abort));
}
}
23 changes: 23 additions & 0 deletions barrier_bench.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Barrier as Barrier100 } from "jsr:@core/asyncutil@~1.0.0/barrier";
import { Barrier } from "./barrier.ts";

const length = 1_000;

Deno.bench({
name: "current",
fn: async () => {
const barrier = new Barrier(length);
await Promise.all(Array.from({ length }).map(() => barrier.wait()));
},
group: "Barrier#wait",
baseline: true,
});

Deno.bench({
name: "v1.0.0",
fn: async () => {
const barrier = new Barrier100(length);
await Promise.all(Array.from({ length }).map(() => barrier.wait()));
},
group: "Barrier#wait",
});
1 change: 1 addition & 0 deletions deno.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
],
"exclude": [
"**/*_test.ts",
"**/*_bench.ts",
".*"
]
},
Expand Down
7 changes: 7 additions & 0 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 9 additions & 5 deletions lock.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Mutex } from "./mutex.ts";
import { RawSemaphore } from "./_raw_semaphore.ts";

/**
* A mutual exclusion lock that provides safe concurrent access to a shared value.
Expand All @@ -16,7 +16,7 @@
* ```
*/
export class Lock<T> {
#mu = new Mutex();
#sem = new RawSemaphore(1);
#value: T;

/**
Expand All @@ -32,7 +32,7 @@
* Returns true if the lock is currently locked, false otherwise.
*/
get locked(): boolean {
return this.#mu.locked;
return this.#sem.locked;

Check warning on line 35 in lock.ts

View check run for this annotation

Codecov / codecov/patch

lock.ts#L35

Added line #L35 was not covered by tests
}

/**
Expand All @@ -43,7 +43,11 @@
* @returns A Promise that resolves with the result of the function.
*/
async lock<R>(fn: (value: T) => R | PromiseLike<R>): Promise<R> {
using _lock = await this.#mu.acquire();
return await fn(this.#value);
await this.#sem.acquire();
try {
return await fn(this.#value);
} finally {
this.#sem.release();
}
}
}
23 changes: 23 additions & 0 deletions lock_bench.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Lock as Lock100 } from "jsr:@core/asyncutil@~1.0.0/lock";
import { Lock } from "./lock.ts";

const length = 1_000;

Deno.bench({
name: "current",
fn: async () => {
const lock = new Lock(0);
await Promise.all(Array.from({ length }).map(() => lock.lock(() => {})));
},
group: "Lock#lock",
baseline: true,
});

Deno.bench({
name: "v1.0.0",
fn: async () => {
const lock = new Lock100(0);
await Promise.all(Array.from({ length }).map(() => lock.lock(() => {})));
},
group: "Lock#lock",
});
22 changes: 8 additions & 14 deletions mutex.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { RawSemaphore } from "./_raw_semaphore.ts";

/**
* A mutex (mutual exclusion) is a synchronization primitive that grants
* exclusive access to a shared resource.
Expand Down Expand Up @@ -26,33 +28,25 @@
* ```
*/
export class Mutex {
#waiters: Set<Promise<void>> = new Set();
#sem: RawSemaphore = new RawSemaphore(1);

/**
* Returns true if the mutex is locked, false otherwise.
*/
get locked(): boolean {
return this.#waiters.size > 0;
return this.#sem.locked;

Check warning on line 37 in mutex.ts

View check run for this annotation

Codecov / codecov/patch

mutex.ts#L37

Added line #L37 was not covered by tests
}

/**
* Acquire the mutex and return a promise with disposable that releases the mutex when disposed.
*
* @returns A Promise with Disposable that releases the mutex when disposed.
*/
acquire(): Promise<Disposable> & Disposable {
const waiters = [...this.#waiters];
const { promise, resolve } = Promise.withResolvers<void>();
this.#waiters.add(promise);
const disposable = {
acquire(): Promise<Disposable> {
return this.#sem.acquire().then(() => ({
[Symbol.dispose]: () => {
resolve();
this.#waiters.delete(promise);
this.#sem.release();
},
};
return Object.assign(
Promise.all(waiters).then(() => disposable),
disposable,
);
}));
}
}
33 changes: 33 additions & 0 deletions mutex_bench.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Mutex as Mutex100 } from "jsr:@core/asyncutil@~1.0.0/mutex";
import { Mutex } from "./mutex.ts";

const length = 1_000;

Deno.bench({
name: "current",
fn: async () => {
const mutex = new Mutex();
await Promise.all(
Array.from({ length }).map(async () => {
const lock = await mutex.acquire();
lock[Symbol.dispose]();
}),
);
},
group: "Mutex#wait",
baseline: true,
});

Deno.bench({
name: "v1.0.0",
fn: async () => {
const mutex = new Mutex100();
await Promise.all(
Array.from({ length }).map(async () => {
const lock = await mutex.acquire();
lock[Symbol.dispose]();
}),
);
},
group: "Mutex#wait",
});
38 changes: 16 additions & 22 deletions notify.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import { iter } from "@core/iterutil/iter";
import { take } from "@core/iterutil/take";

/**
* Async notifier that allows one or more "waiters" to wait for a notification.
*
Expand All @@ -23,13 +20,13 @@ import { take } from "@core/iterutil/take";
* ```
*/
export class Notify {
#waiters: Set<PromiseWithResolvers<void>> = new Set();
#waiters: PromiseWithResolvers<void>[] = [];

/**
* Returns the number of waiters that are waiting for notification.
*/
get waiterCount(): number {
return this.#waiters.size;
return this.#waiters.length;
}

/**
Expand All @@ -43,40 +40,37 @@ export class Notify {
if (n <= 0 || !Number.isSafeInteger(n)) {
throw new RangeError(`n must be a positive safe integer, got ${n}`);
}
const it = iter(this.#waiters);
for (const waiter of take(it, n)) {
waiter.resolve();
}
this.#waiters = new Set(it);
this.#waiters.splice(0, n).forEach(({ resolve }) => resolve());
}

/**
* Notifies all waiters that are waiting for notification. Resolves each of the notified waiters.
*/
notifyAll(): void {
for (const waiter of this.#waiters) {
waiter.resolve();
}
this.#waiters = new Set();
this.#waiters.forEach(({ resolve }) => resolve());
this.#waiters = [];
}

/**
* Asynchronously waits for notification. The caller's execution is suspended until
* the `notify` method is called. The method returns a Promise that resolves when the caller is notified.
* Optionally takes an AbortSignal to abort the waiting if the signal is aborted.
*/
async notified({ signal }: { signal?: AbortSignal } = {}): Promise<void> {
notified({ signal }: { signal?: AbortSignal } = {}): Promise<void> {
if (signal?.aborted) {
throw signal.reason;
return Promise.reject(signal.reason);
}
const waiter = Promise.withResolvers<void>();
const abort = () => {
this.#waiters.delete(waiter);
waiter.reject(signal!.reason);
const waiter = this.#waiters.shift();
if (waiter) {
waiter.reject(signal!.reason);
}
};
signal?.addEventListener("abort", abort, { once: true });
this.#waiters.add(waiter);
await waiter.promise;
signal?.removeEventListener("abort", abort);
const waiter = Promise.withResolvers<void>();
this.#waiters.push(waiter);
return waiter.promise.finally(() => {
signal?.removeEventListener("abort", abort);
});
}
}
Loading