Skip to content

Commit

Permalink
Further clarify logic
Browse files Browse the repository at this point in the history
Moved lock-grabbing to outer level.
  • Loading branch information
Neil Booth committed Feb 11, 2021
1 parent ddf63f9 commit 48db78b
Showing 1 changed file with 43 additions and 41 deletions.
84 changes: 43 additions & 41 deletions electrumx/server/block_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,17 +311,7 @@ def flush_data(self):

async def flush(self, flush_utxos):
self.db.flush_dbs(self.flush_data(), flush_utxos, self.estimate_txs_remaining)

async def _maybe_flush(self):
# If caught up, flush everything as client queries are
# performed on the DB.
if self._caught_up_event.is_set():
await self.flush(True)
elif time.monotonic() > self.next_cache_check:
flush_arg = self.check_cache_size()
if flush_arg is not None:
await self.flush(flush_arg)
self.next_cache_check = time.monotonic() + 30
self.next_cache_check = time.monotonic() + 30

def check_cache_size(self):
'''Flush a cache if it gets too big.'''
Expand Down Expand Up @@ -349,29 +339,39 @@ def check_cache_size(self):
return utxo_MB >= cache_MB * 4 // 5
return None

async def check_and_advance_blocks(self, raw_blocks):
async def _advance_blocks(self, raw_blocks):
'''Process the list of raw blocks passed. Detects and handles reorgs.'''
start = time.monotonic()
for raw_block in raw_blocks:
block = self.coin.block(raw_block)
if self.coin.header_prevhash(block.header) != self.tip:
self.schedule_reorg(-1)
return
await self.advance_block(block)

await self._maybe_flush()
await self._advance_block(block)
end = time.monotonic()

if not self.db.first_sync:
s = '' if len(raw_blocks) == 1 else 's'
blocks_size = sum(len(block) for block in raw_blocks) / 1_000_000
self.logger.info(f'processed {len(raw_blocks):,d} block{s} size {blocks_size:.2f} MB '
f'in {time.monotonic() - start:.1f}s')
f'in {end - start:.1f}s')

# If caught up, flush everything as client queries are performed on the DB,
# otherwise check at regular intervals.
if self.height == self.daemon.cached_height():
await self.flush(True)
await self._on_caught_up()
elif end > self.next_cache_check:
flush_arg = self.check_cache_size()
if flush_arg is not None:
await self.flush(flush_arg)

if self._caught_up_event.is_set():
await self.notifications.on_block(self.touched, self.height)

self.touched = set()

async def advance_block(self, block):
async def _advance_block(self, block):
'''Advance once block. It is already verified they correctly connect onto our tip.'''
min_height = self.db.min_undo_height(self.daemon.cached_height())
height = self.height + 1
Expand Down Expand Up @@ -607,34 +607,36 @@ def spend_utxo(self, tx_hash, tx_idx):
raise ChainError('UTXO {} / {:,d} not found in "h" table'
.format(hash_to_hex_str(tx_hash), tx_idx))

async def _process_prefetched_blocks(self):
async def _process_blocks(self):
'''Loop forever processing blocks as they arrive.'''
while True:
if self.height == self.daemon.cached_height():
if not self._caught_up_event.is_set():
await self.run_with_lock(self._first_caught_up())
self._caught_up_event.set()

await self.blocks_event.wait()
self.blocks_event.clear()

async def process_event():
'''Perform any pending reorg, the process prefetched blocks.'''
if self.reorg_count is not None:
await self.run_with_lock(self.reorg_chain(self.reorg_count))
await self.reorg_chain(self.reorg_count)
self.reorg_count = None

blocks = self.prefetcher.get_prefetched_blocks()
await self.run_with_lock(self.check_and_advance_blocks(blocks))
await self._advance_blocks(blocks)

async def _first_caught_up(self):
self.logger.info(f'caught up to height {self.height}')
# Flush everything but with first_sync->False state.
first_sync = self.db.first_sync
self.db.first_sync = False
await self.flush(True)
if first_sync:
self.logger.info(f'{electrumx.version} synced to height {self.height:,d}')
# Reopen for serving
await self.db.open_for_serving()
# This must be done to set state before the main loop
if self.height == self.daemon.cached_height():
await self._on_caught_up()

while True:
await self.blocks_event.wait()
self.blocks_event.clear()
await self.run_with_lock(process_event())

async def _on_caught_up(self):
if not self._caught_up_event.is_set():
self._caught_up_event.set()
self.logger.info(f'caught up to height {self.height}')
# Flush everything but with first_sync->False state.
first_sync = self.db.first_sync
self.db.first_sync = False
if first_sync:
self.logger.info(f'{electrumx.version} synced to height {self.height:,d}')
# Reopen for serving
await self.db.open_for_serving()

async def _first_open_dbs(self):
await self.db.open_for_sync()
Expand All @@ -661,7 +663,7 @@ async def fetch_and_process_blocks(self, caught_up_event):
try:
async with TaskGroup() as group:
await group.spawn(self.prefetcher.main_loop(self.height))
await group.spawn(self._process_prefetched_blocks())
await group.spawn(self._process_blocks())
# Don't flush for arbitrary exceptions as they might be a cause or consequence of
# corrupted data
except CancelledError:
Expand Down

0 comments on commit 48db78b

Please sign in to comment.