Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
callebtc committed Jul 6, 2024
1 parent deaa3e5 commit 20099a4
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions cashu/core/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,38 +205,45 @@ async def connect(
lock_select_statement: Optional[str] = None,
lock_timeout: Optional[float] = None,
):
async def _handle_lock_retry(retry_delay, timeout, start_time):
await asyncio.sleep(retry_delay)
retry_delay = min(retry_delay * 2, timeout - (time.time() - start_time))

def _is_lock_exception(e):
return "database is locked" in str(e) or "could not obtain lock" in str(e)

timeout = lock_timeout or 5 # default to 5 seconds
start_time = time.time()
conn = None
retry_delay = 0.1
random_int = int(time.time() * 1000)
trial = 0
while time.time() - start_time < timeout:
trial += 1
try:
logger.trace("Connecting to database")
logger.trace("Connecting to database (trial: {trial})")
async with self.engine.begin() as conn: # type: ignore
assert type(conn) == AsyncConnection
assert isinstance(conn, AsyncConnection)
logger.trace("Connected to database. Starting transaction")
wconn = Connection(conn, None, self.type, self.name, self.schema)
if lock_table:
logger.trace("Acquiring lock")
await self.acquire_lock(
wconn, lock_table, lock_select_statement
)
logger.trace("Lock acquired")
logger.trace(f"> Yielding connection. Lock: {lock_table}")
logger.trace(
f"> Yielding connection. Lock: {lock_table} - {random_int} - {trial}"
)
yield wconn
logger.trace(f"< Connection yielded. Unlock: {lock_table}")
logger.trace(
f"< Connection yielded. Unlock: {lock_table} - {random_int} - {trial}"
)
return
except psycopg2.errors.LockNotAvailable as e:
# we don't raise exceptions related to locks
logger.trace(f"Table {lock_table} is already locked: {e}")
await asyncio.sleep(0.1)
# OperationalError
await _handle_lock_retry(retry_delay, timeout, start_time)
except Exception as e:
# check error strings for SQLite and PostgreSQL
if "database is locked" in str(e) or "could not obtain lock" in str(e):
# we don't raise exceptions related to locks
await asyncio.sleep(0.1)
if _is_lock_exception(e):
await _handle_lock_retry(retry_delay, timeout, start_time)
else:
# we raise all other exceptions
raise e
raise Exception("failed to acquire database lock")

Expand Down

0 comments on commit 20099a4

Please sign in to comment.