Reorganize code in preparation
Neil Booth committed Feb 11, 2021
1 parent 6264ece commit 1379529
Showing 1 changed file with 64 additions and 55 deletions.
119 changes: 64 additions & 55 deletions electrumx/server/block_processor.py
@@ -161,15 +161,25 @@ def __init__(self, env, db, daemon, notifications):
         self.daemon = daemon
         self.notifications = notifications
 
-        self.coin = env.coin
+        # Set when there is block processing to do, e.g. when new blocks come in, or a
+        # reorg is needed.
         self.blocks_event = asyncio.Event()
+
+        # If the lock is successfully acquired, in-memory chain state
+        # is consistent with self.height
+        self.state_lock = asyncio.Lock()
+
+        # Signalled after backing up during a reorg
+        self.backed_up_event = asyncio.Event()
+
+        self.coin = env.coin
         self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event)
         self.logger = class_logger(__name__, self.__class__.__name__)
 
         # Meta
         self.next_cache_check = 0
         self.touched = set()
-        self.reorg_count = 0
+        self.reorg_count = None
         self.height = -1
         self.tip = None
         self.tx_count = 0
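
The attributes grouped above are the coordination primitives the rest of the commit builds on. Below is a standalone sketch (not part of the commit; all names are illustrative) of the pattern they support: a producer signals pending work through an asyncio.Event, and the consumer touches shared chain state only while holding an asyncio.Lock.

import asyncio

async def main():
    work_event = asyncio.Event()   # cf. blocks_event
    state_lock = asyncio.Lock()    # cf. state_lock
    pending = []                   # stands in for prefetched raw blocks

    async def producer():
        for height in (1, 2, 3):
            pending.append(height)
            work_event.set()       # "there is block processing to do"
            await asyncio.sleep(0)

    async def consumer():
        processed = 0
        while processed < 3:
            await work_event.wait()
            work_event.clear()
            async with state_lock:  # in-memory state is consistent while held
                while pending:
                    print('advancing to height', pending.pop(0))
                    processed += 1

    await asyncio.gather(producer(), consumer())

asyncio.run(main())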
@@ -184,63 +194,27 @@ def __init__(self, env, db, daemon, notifications):
         self.utxo_cache = {}
         self.db_deletes = []
 
-        # If the lock is successfully acquired, in-memory chain state
-        # is consistent with self.height
-        self.state_lock = asyncio.Lock()
-
-        # Signalled after backing up during a reorg
-        self.backed_up_event = asyncio.Event()
-
     async def run_with_lock(self, coro):
-        # Shielded so that cancellations from shutdown don't lose work - when the task
-        # completes the data will be flushed and then we shut down. Take the state lock
-        # to be certain in-memory state is consistent and not being updated elsewhere.
+        # Shielded so that cancellations from shutdown don't lose work. Cancellation will
+        # cause fetch_and_process_blocks to block on the lock in flush(), the task completes,
+        # and then the data is flushed. We also don't want user-signalled reorgs to happen
+        # in the middle of processing blocks; they need to wait.
         async def run_locked():
             async with self.state_lock:
                 return await coro
         return await asyncio.shield(run_locked())
 
-    async def check_and_advance_blocks(self, raw_blocks):
-        '''Process the list of raw blocks passed. Detects and handles
-        reorgs.
-        '''
-        if not raw_blocks:
-            return
-        blocks = [self.coin.block(raw_block) for raw_block in raw_blocks]
-        headers = [block.header for block in blocks]
-        hprevs = [self.coin.header_prevhash(h) for h in headers]
-        chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]
-
-        if hprevs == chain:
-            start = time.monotonic()
-            await self.run_with_lock(self.advance_blocks(blocks))
-            await self._maybe_flush()
-            if not self.db.first_sync:
-                s = '' if len(blocks) == 1 else 's'
-                blocks_size = sum(len(block) for block in raw_blocks) / 1_000_000
-                self.logger.info(f'processed {len(blocks):,d} block{s} size {blocks_size:.2f} MB '
-                                 f'in {time.monotonic() - start:.1f}s')
-            if self._caught_up_event.is_set():
-                await self.notifications.on_block(self.touched, self.height)
-            self.touched = set()
-        elif hprevs[0] != chain[0]:
-            await self.reorg_chain()
-        else:
-            # It is probably possible but extremely rare that what
-            # bitcoind returns doesn't form a chain because it
-            # reorg-ed the chain as it was processing the batched
-            # block hash requests. Should this happen it's simplest
-            # just to reset the prefetcher and try again.
-            self.logger.warning('daemon blocks do not form a chain; '
-                                'resetting the prefetcher')
-            await self.prefetcher.reset_height(self.height)
+    def schedule_reorg(self, count):
+        '''A count >= 0 is a user-forced reorg; < 0 is a natural reorg.'''
+        self.reorg_count = count
+        self.blocks_event.set()
 
-    async def reorg_chain(self, count=None):
+    async def reorg_chain(self, count):
         '''Handle a chain reorganisation.
 
         Count is the number of blocks to simulate a reorg, or None for
         a real reorg.'''
-        if count is None:
+        if count < 0:
             self.logger.info('chain reorg detected')
         else:
             self.logger.info(f'faking a reorg of {count:,d} blocks')
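
The new run_with_lock comment carries the shutdown story. Below is a standalone sketch (not part of the commit; names are illustrative) of the asyncio.shield behaviour it relies on: cancelling the outer await does not cancel the shielded inner work, so an in-flight flush still runs to completion.

import asyncio

async def main():
    done = asyncio.Event()

    async def critical_work():
        await asyncio.sleep(0.05)   # stands in for advancing blocks and flushing
        done.set()

    async def run_shielded():
        await asyncio.shield(critical_work())

    task = asyncio.create_task(run_shielded())
    await asyncio.sleep(0.01)
    task.cancel()                   # simulated shutdown
    try:
        await task                  # the awaiter sees CancelledError...
    except asyncio.CancelledError:
        pass
    await done.wait()               # ...but the shielded work still finishes
    print('work completed despite cancellation')

asyncio.run(main())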
@@ -299,7 +273,7 @@ def diff_pos(hashes1, hashes2):
                     return n
             return len(hashes)
 
-        if count is None:
+        if count < 0:
             # A real reorg
             start = self.height - 1
             count = 1
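
diff_pos, shown as context above, is how reorg_chain locates the fork point between our recorded hashes and the daemon's. A standalone sketch (not from the commit; the hash values are illustrative) of the same scan:

def diff_pos(hashes1, hashes2):
    '''Index of the first difference, or the common length if none.'''
    for n, (h1, h2) in enumerate(zip(hashes1, hashes2)):
        if h1 != h2:
            return n
    return min(len(hashes1), len(hashes2))

ours   = ['a', 'b', 'c', 'x', 'y']   # our recorded hashes by height
daemon = ['a', 'b', 'c', 'd', 'e']   # what the daemon now reports
print(diff_pos(ours, daemon))        # -> 3: back up blocks from index 3 onward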
@@ -381,6 +355,41 @@ def check_cache_size(self):
             return utxo_MB >= cache_MB * 4 // 5
         return None
 
+    async def check_and_advance_blocks(self, raw_blocks):
+        '''Process the list of raw blocks passed. Detects and handles
+        reorgs.
+        '''
+        if not raw_blocks:
+            return
+        blocks = [self.coin.block(raw_block) for raw_block in raw_blocks]
+        headers = [block.header for block in blocks]
+        hprevs = [self.coin.header_prevhash(h) for h in headers]
+        chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]
+
+        if hprevs == chain:
+            start = time.monotonic()
+            await self.run_with_lock(self.advance_blocks(blocks))
+            await self._maybe_flush()
+            if not self.db.first_sync:
+                s = '' if len(blocks) == 1 else 's'
+                blocks_size = sum(len(block) for block in raw_blocks) / 1_000_000
+                self.logger.info(f'processed {len(blocks):,d} block{s} size {blocks_size:.2f} MB '
+                                 f'in {time.monotonic() - start:.1f}s')
+            if self._caught_up_event.is_set():
+                await self.notifications.on_block(self.touched, self.height)
+            self.touched = set()
+        elif hprevs[0] != chain[0]:
+            self.schedule_reorg(-1)
+        else:
+            # It is probably possible but extremely rare that what
+            # bitcoind returns doesn't form a chain because it
+            # reorg-ed the chain as it was processing the batched
+            # block hash requests. Should this happen it's simplest
+            # just to reset the prefetcher and try again.
+            self.logger.warning('daemon blocks do not form a chain; '
+                                'resetting the prefetcher')
+            await self.prefetcher.reset_height(self.height)
+
     async def advance_blocks(self, blocks):
         '''Advance the blocks.
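
The hprevs == chain comparison above is the whole reorg detector: each header's prev-hash must equal the hash of its predecessor, with the current tip anchoring the first header. A standalone sketch (not from the commit; the toy header layout and helper names are assumptions) of the same test:

from hashlib import sha256

def header_hash(header: bytes) -> bytes:
    return sha256(sha256(header).digest()).digest()   # bitcoin-style double SHA-256

def header_prevhash(header: bytes) -> bytes:
    return header[:32]                                # toy layout: prev-hash comes first

def forms_chain(tip: bytes, headers) -> bool:
    hprevs = [header_prevhash(h) for h in headers]
    chain = [tip] + [header_hash(h) for h in headers[:-1]]
    return hprevs == chain

tip = b'\x00' * 32
h1 = tip + b'block 1'
h2 = header_hash(h1) + b'block 2'
print(forms_chain(tip, [h1, h2]))           # True: the blocks extend our tip
print(forms_chain(b'\x11' * 32, [h1, h2]))  # False: prev-hash mismatch at the tip -> reorg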
@@ -631,11 +640,13 @@ async def _process_prefetched_blocks(self):
             if not self._caught_up_event.is_set():
                 await self._first_caught_up()
                 self._caught_up_event.set()
+
             await self.blocks_event.wait()
             self.blocks_event.clear()
-            if self.reorg_count:
+
+            if self.reorg_count is not None:
                 await self.reorg_chain(self.reorg_count)
-                self.reorg_count = 0
+                self.reorg_count = None
             else:
                 blocks = self.prefetcher.get_prefetched_blocks()
                 await self.check_and_advance_blocks(blocks)
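
The switch from a default of 0 to None matters because schedule_reorg treats count >= 0 as a user-forced reorg, so 0 is a legitimate pending value that the old truthiness test would drop. A minimal illustration (not from the commit):

pending = 0                  # a user-forced reorg of zero blocks was scheduled
if pending:                  # old test: 0 is falsy, the reorg is silently skipped
    print('old test fires')
if pending is not None:      # new test: fires for any scheduled count
    print('new test fires, count =', pending)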
@@ -647,8 +658,7 @@ async def _first_caught_up(self):
         self.db.first_sync = False
         await self.flush(True)
         if first_sync:
-            self.logger.info(f'{electrumx.version} synced to '
-                             f'height {self.height:,d}')
+            self.logger.info(f'{electrumx.version} synced to height {self.height:,d}')
         # Reopen for serving
         await self.db.open_for_serving()
@@ -690,7 +700,6 @@ def force_chain_reorg(self, count):
         Returns True if a reorg is queued, false if not caught up.
         '''
         if self._caught_up_event.is_set():
-            self.reorg_count = count
-            self.blocks_event.set()
+            self.schedule_reorg(count)
             return True
         return False
