Skip to content

Commit

Permalink
Rename LockOrchestrator to Ledger (#2081)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartfeenstra authored Oct 6, 2024
1 parent 0d79663 commit e4dabe0
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 27 deletions.
12 changes: 6 additions & 6 deletions betty/cache/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Generic, Self, overload, AsyncContextManager, Literal, TypeVar

from betty.cache import Cache, CacheItem, CacheItemValueSetter
from betty.concurrent import AsynchronizedLock, LockOrchestrator
from betty.concurrent import AsynchronizedLock, Ledger
from typing_extensions import override

_CacheItemValueCoT = TypeVar("_CacheItemValueCoT", covariant=True)
Expand Down Expand Up @@ -42,7 +42,7 @@ def __init__(
self._scopes = scopes or ()
self._scoped_caches: MutableMapping[str, Self] = {}
self._cache_lock = AsynchronizedLock.threading()
self._cache_item_lock_orchestrator = LockOrchestrator(self._cache_lock)
self._cache_item_lock_ledger = Ledger(self._cache_lock)

@override
def with_scope(self, scope: str) -> Self:
Expand All @@ -61,7 +61,7 @@ def _with_scope(self, scope: str) -> Self:
async def get(
self, cache_item_id: str
) -> AsyncIterator[CacheItem[_CacheItemValueContraT] | None]:
async with self._cache_item_lock_orchestrator.orchestrate(cache_item_id):
async with self._cache_item_lock_ledger.ledger(cache_item_id):
yield await self._get(cache_item_id)

@abstractmethod
Expand All @@ -78,7 +78,7 @@ async def set(
*,
modified: int | float | None = None,
) -> None:
async with self._cache_item_lock_orchestrator.orchestrate(cache_item_id):
async with self._cache_item_lock_ledger.ledger(cache_item_id):
await self._set(cache_item_id, value, modified=modified)

@abstractmethod
Expand Down Expand Up @@ -122,7 +122,7 @@ async def getset(
CacheItemValueSetter[_CacheItemValueContraT] | None,
]
]:
lock = self._cache_item_lock_orchestrator.orchestrate(cache_item_id)
lock = self._cache_item_lock_ledger.ledger(cache_item_id)
if await lock.acquire(wait=wait):
try:

Expand All @@ -137,7 +137,7 @@ async def _setter(value: _CacheItemValueContraT) -> None:

@override
async def delete(self, cache_item_id: str) -> None:
async with self._cache_item_lock_orchestrator.orchestrate(cache_item_id):
async with self._cache_item_lock_ledger.ledger(cache_item_id):
await self._delete(cache_item_id)

@abstractmethod
Expand Down
40 changes: 20 additions & 20 deletions betty/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,56 +144,56 @@ async def wait(self) -> None:
self._available -= 1


class _OrchestratedLock(Lock):
class _Transaction(Lock):
def __init__(
self,
target: Hashable,
transaction_id: Hashable,
orchestrator_lock: Lock,
identifiers: MutableMapping[Hashable, bool],
ledger: MutableMapping[Hashable, bool],
):
self._target = target
self._orchestrator_lock = orchestrator_lock
self._targets = identifiers
self._transaction_id = transaction_id
self._ledger_lock = orchestrator_lock
self._ledger = ledger

@override
async def acquire(self, *, wait: bool = True) -> bool:
if wait:
while True:
async with self._orchestrator_lock:
async with self._ledger_lock:
if self._can_acquire():
return self._acquire()
await sleep(0)
else:
async with self._orchestrator_lock:
async with self._ledger_lock:
if self._can_acquire():
return self._acquire()
return False

def _can_acquire(self) -> bool:
return not self._targets[self._target]
return not self._ledger[self._transaction_id]

def _acquire(self) -> bool:
self._targets[self._target] = True
self._ledger[self._transaction_id] = True
return True

@override
async def release(self) -> None:
self._targets[self._target] = False
self._ledger[self._transaction_id] = False


class LockOrchestrator:
class Ledger:
"""
Orchestrate the lazy creation of locks, using a primary orchestrator lock to guard all administrative tasks.
Lazily create locks by keeping a ledger.
The primary orchestrator lock is released once a orchestrated lock is acquired.
The ledger lock is released once a transaction lock is acquired.
"""

def __init__(self, orchestrator_lock: Lock):
self._orchestrator_lock = orchestrator_lock
self._targets: MutableMapping[Hashable, bool] = defaultdict(lambda: False)
def __init__(self, ledger_lock: Lock):
self._ledger_lock = ledger_lock
self._ledger: MutableMapping[Hashable, bool] = defaultdict(lambda: False)

def orchestrate(self, target: Hashable) -> Lock:
def ledger(self, transaction_id: Hashable) -> Lock:
"""
Create a new lock for the given target.
Ledger a new lock for the given transaction ID.
"""
return _OrchestratedLock(target, self._orchestrator_lock, self._targets)
return _Transaction(transaction_id, self._ledger_lock, self._ledger)
2 changes: 1 addition & 1 deletion betty/tests/coverage/test_coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class TestKnownToBeMissing:
# This is an abstract method.
"release": TestKnownToBeMissing,
},
"LockOrchestrator": TestKnownToBeMissing,
"Ledger": TestKnownToBeMissing,
"RateLimiter": {
"__aenter__": TestKnownToBeMissing,
"__aexit__": TestKnownToBeMissing,
Expand Down

0 comments on commit e4dabe0

Please sign in to comment.