Optimize DL query. (#18949)
Make query use the table primary key, optimizing the runtime
significantly.
altendky authored Nov 29, 2024
2 parents aeea85f + c8a9574 commit 9e6f75d
Showing 1 changed file with 18 additions and 7 deletions.
chia/data_layer/data_store.py (25 changes: 18 additions & 7 deletions)
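Why the primary-key predicate matters: below is a minimal, hypothetical sketch of the query-plan difference, assuming the nodes table is keyed on a composite primary key covering store_id and hash (the exact schema is an assumption for illustration, not taken from this diff). With only hash constrained, SQLite cannot use that key and scans the table; constraining both columns lets it seek through the key's index.

# Hypothetical, self-contained repro of the plan difference; the real
# schema in chia/data_layer may differ (PRIMARY KEY (store_id, hash) is
# assumed here purely for illustration).
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE nodes("
    "store_id BLOB, hash BLOB, root_hash BLOB, idx INTEGER, "
    "PRIMARY KEY (store_id, hash))"
)

# Old query: hash alone is not a usable prefix of the key, so SQLite scans.
for row in conn.execute(
    "EXPLAIN QUERY PLAN SELECT root_hash, idx FROM nodes WHERE hash = ?",
    (b"h",),
):
    print(row)  # typically '... SCAN nodes'

# New query: both key columns constrained, so SQLite seeks via the key index.
for row in conn.execute(
    "EXPLAIN QUERY PLAN SELECT root_hash, idx FROM nodes "
    "WHERE hash = ? AND store_id = ?",
    (b"h", b"s"),
):
    print(row)  # typically '... SEARCH nodes USING INDEX ...' (wording varies by SQLite version)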
@@ -232,7 +232,7 @@ async def insert_into_data_store_from_file(
 
         merkle_blob = MerkleBlob(blob=bytearray())
         if root_hash is not None:
-            await self.build_blob_from_nodes(internal_nodes, terminal_nodes, root_hash, merkle_blob)
+            await self.build_blob_from_nodes(internal_nodes, terminal_nodes, root_hash, merkle_blob, store_id)
 
         await self.insert_root_from_merkle_blob(merkle_blob, store_id, Status.COMMITTED)
         await self.add_node_hashes(store_id)
@@ -318,13 +318,13 @@ async def migrate_db(self, server_files_location: Path) -> None:
                 log.error(f"Cannot recover data from {filename}: {e}")
                 break
 
-    async def get_merkle_blob(self, root_hash: Optional[bytes32]) -> MerkleBlob:
+    async def get_merkle_blob(self, root_hash: Optional[bytes32], read_only: bool = False) -> MerkleBlob:
         if root_hash is None:
             return MerkleBlob(blob=bytearray())
 
         existing_blob = self.recent_merkle_blobs.get(root_hash)
         if existing_blob is not None:
-            return copy.deepcopy(existing_blob)
+            return existing_blob if read_only else copy.deepcopy(existing_blob)
 
         async with self.db_wrapper.reader() as reader:
             cursor = await reader.execute(
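The new read_only flag avoids deep-copying the cached blob on the hot path; the trade-off is that a read_only caller receives the cached object itself and must not mutate it. A small sketch of that contract, using a plain dict as a hypothetical stand-in for recent_merkle_blobs:

# Illustrative only: a dict stands in for the real recent_merkle_blobs cache.
import copy

cache: dict[bytes, bytearray] = {b"root": bytearray(b"cached-blob")}

def get_blob(root_hash: bytes, read_only: bool = False) -> bytearray:
    existing = cache[root_hash]
    # read_only=True returns the cached object itself: no copy cost, but any
    # caller mutation would corrupt the cache for every other reader.
    return existing if read_only else copy.deepcopy(existing)

shared = get_blob(b"root", read_only=True)   # cheap, must stay untouched
private = get_blob(b"root")                  # safe to mutate
private.extend(b"-edited")
assert cache[b"root"] == b"cached-blob"      # cache unaffected by the copy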
@@ -514,10 +514,17 @@ async def build_blob_from_nodes(
         terminal_nodes: dict[bytes32, tuple[KVId, KVId]],
         node_hash: bytes32,
         merkle_blob: MerkleBlob,
+        store_id: bytes32,
     ) -> TreeIndex:
         if node_hash not in terminal_nodes and node_hash not in internal_nodes:
             async with self.db_wrapper.reader() as reader:
-                cursor = await reader.execute("SELECT root_hash, idx FROM nodes WHERE hash = ?", (node_hash,))
+                cursor = await reader.execute(
+                    "SELECT root_hash, idx FROM nodes WHERE hash = ? AND store_id = ?",
+                    (
+                        node_hash,
+                        store_id,
+                    ),
+                )
 
                 row = await cursor.fetchone()
                 if row is None:
@@ -526,7 +533,7 @@
                 root_hash = row["root_hash"]
                 index = row["idx"]
 
-            other_merkle_blob = await self.get_merkle_blob(root_hash)
+            other_merkle_blob = await self.get_merkle_blob(root_hash, read_only=True)
             nodes = other_merkle_blob.get_nodes_with_indexes(index=index)
             index_to_hash = {index: bytes32(node.hash) for index, node in nodes}
             for _, node in nodes:
@@ -557,8 +564,12 @@
                 ),
             )
             left_hash, right_hash = internal_nodes[node_hash]
-            left_index = await self.build_blob_from_nodes(internal_nodes, terminal_nodes, left_hash, merkle_blob)
-            right_index = await self.build_blob_from_nodes(internal_nodes, terminal_nodes, right_hash, merkle_blob)
+            left_index = await self.build_blob_from_nodes(
+                internal_nodes, terminal_nodes, left_hash, merkle_blob, store_id
+            )
+            right_index = await self.build_blob_from_nodes(
+                internal_nodes, terminal_nodes, right_hash, merkle_blob, store_id
+            )
             for child_index in (left_index, right_index):
                 merkle_blob.update_entry(index=child_index, parent=index)
             merkle_blob.update_entry(index=index, left=left_index, right=right_index)
