Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce subscription lock usage to unsubscribe workflow only #17483

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 13 additions & 18 deletions chia/data_layer/data_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -782,15 +782,13 @@ async def subscribe(self, store_id: bytes32, urls: list[str]) -> Subscription:
parsed_urls = [url.rstrip("/") for url in urls]
subscription = Subscription(store_id, [ServerInfo(url, 0, 0) for url in parsed_urls])
await self.wallet_rpc.dl_track_new(subscription.store_id)
async with self.subscription_lock:
await self.data_store.subscribe(subscription)
await self.data_store.subscribe(subscription)
self.log.info(f"Done adding subscription: {subscription.store_id}")
return subscription

async def remove_subscriptions(self, store_id: bytes32, urls: list[str]) -> None:
parsed_urls = [url.rstrip("/") for url in urls]
async with self.subscription_lock:
await self.data_store.remove_subscriptions(store_id, parsed_urls)
await self.data_store.remove_subscriptions(store_id, parsed_urls)

async def unsubscribe(self, store_id: bytes32, retain_data: bool) -> None:
async with self.subscription_lock:
Expand Down Expand Up @@ -846,8 +844,7 @@ async def process_unsubscribe(self, store_id: bytes32, retain_data: bool) -> Non
pass

async def get_subscriptions(self) -> list[Subscription]:
async with self.subscription_lock:
return await self.data_store.get_subscriptions()
return await self.data_store.get_subscriptions()

async def add_mirror(self, store_id: bytes32, urls: list[str], amount: uint64, fee: uint64) -> None:
if not urls:
Expand Down Expand Up @@ -892,16 +889,15 @@ async def get_kv_diff_paginated(
async def periodically_manage_data(self) -> None:
manage_data_interval = self.config.get("manage_data_interval", 60)
while not self._shut_down:
async with self.subscription_lock:
try:
subscriptions = await self.data_store.get_subscriptions()
for subscription in subscriptions:
await self.wallet_rpc.dl_track_new(subscription.store_id)
break
except aiohttp.client_exceptions.ClientConnectorError:
pass
except Exception as e:
self.log.error(f"Exception while requesting wallet track subscription: {type(e)} {e}")
try:
subscriptions = await self.data_store.get_subscriptions()
for subscription in subscriptions:
await self.wallet_rpc.dl_track_new(subscription.store_id)
break
except aiohttp.client_exceptions.ClientConnectorError:
pass
except Exception as e:
self.log.error(f"Exception while requesting wallet track subscription: {type(e)} {e}")

self.log.warning("Cannot connect to the wallet. Retrying in 3s.")

Expand All @@ -913,8 +909,7 @@ async def periodically_manage_data(self) -> None:

while not self._shut_down:
# Add existing subscriptions
async with self.subscription_lock:
subscriptions = await self.data_store.get_subscriptions()
subscriptions = await self.data_store.get_subscriptions()

# pseudo-subscribe to all unsubscribed owned stores
# Need this to make sure we process updates and generate DAT files
Expand Down
Loading