From 40314d72ea52d85578cf441dcad98754720cb365 Mon Sep 17 00:00:00 2001 From: Wang Guan Date: Sat, 6 Apr 2024 02:18:41 +0900 Subject: [PATCH] wip --- src/concurrency/locks/lockable.spec.ts | 3 +++ src/concurrency/locks/lockable.ts | 15 +++++------ src/concurrency/resource-pool.ts | 35 +++++++++++++++++++------- 3 files changed, 35 insertions(+), 18 deletions(-) create mode 100644 src/concurrency/locks/lockable.spec.ts diff --git a/src/concurrency/locks/lockable.spec.ts b/src/concurrency/locks/lockable.spec.ts new file mode 100644 index 0000000..99a0e25 --- /dev/null +++ b/src/concurrency/locks/lockable.spec.ts @@ -0,0 +1,3 @@ +describe('ResourcePool lock', () => { + it.skip(''); +}); diff --git a/src/concurrency/locks/lockable.ts b/src/concurrency/locks/lockable.ts index ec29ff4..da8c236 100644 --- a/src/concurrency/locks/lockable.ts +++ b/src/concurrency/locks/lockable.ts @@ -1,14 +1,11 @@ +import { ResourcePool } from '../resource-pool'; + export class LockableError extends Error { static A = class L2 extends LockableError {}; } -interface AutoClosable { - close(): void; - [Symbol.dispose](): void; -} - -export abstract class Mutex { - abstract lock(): AutoClosable; - abstract unlock(): void; - abstract tryLock(): boolean; +export interface Lease { + value: T; + // close(): Promise; + [Symbol.asyncDispose](): Promise; } diff --git a/src/concurrency/resource-pool.ts b/src/concurrency/resource-pool.ts index f1b88ed..e795cfc 100644 --- a/src/concurrency/resource-pool.ts +++ b/src/concurrency/resource-pool.ts @@ -5,6 +5,8 @@ * - NOT supported: replace / refresh / timeout of tasks */ import { wait } from './timing'; +import { Lease } from './locks/lockable'; +import { lazyThenable } from './lazy-thenable'; export class ResourcePool { // can be used as a mutex @@ -38,16 +40,11 @@ export class ResourcePool { } async use(task: (res: T) => R): Promise> { - const r = await this.borrow(); - try { - return await task(r); - } finally { - this.resources.push(r); - this.balance(); - } + await using lease = await this.borrow(); + return await task(lease.value); } - tryUse(task: (res: T | null) => R): R | Promise { + tryUse(task: (res: T | null) => R): R | Promise> { if (/** some resource is immediately available */ this.freeCount > 0) { return this.use(task); } else { @@ -88,13 +85,33 @@ export class ResourcePool { } } - private borrow(): Promise { + async borrow(timeout?: number): Promise> { + const v = await this._borrow(); + + const _return = lazyThenable(async () => { + this._return(v); + }); + + return { + value: v, + async [Symbol.asyncDispose]() { + await _return; + }, + }; + } + + private _borrow(): Promise { return new Promise((f) => { this.consumers.push(f); this.balance(); }); } + private _return(value: T): void { + this.resources.push(value); + this.balance(); + } + private balance(): void { while (this.resources.length && this.consumers.length) { const r = this.resources.shift()!;