Skip to content

Commit

Permalink
Optimize insert from delta file.
Browse files: browse the repository at this point in the history
  • Loading branch information
fchirica committed Jan 10, 2025
1 parent 1c9e210 commit a8fc678
Showing 1 changed file with 38 additions and 24 deletions.
62 changes: 38 additions & 24 deletions chia/data_layer/data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,43 @@ async def insert_into_data_store_from_file(
node_hash = leaf_hash(serialized_node.value1, serialized_node.value2)
terminal_nodes[node_hash] = (kid, vid)

missing_hashes: list[bytes32] = []
merkle_blob_queries: dict[bytes32, list[int]] = defaultdict(list)

for _, (left, right) in internal_nodes.items():
for node_hash in (left, right):
if node_hash not in internal_nodes and node_hash not in terminal_nodes:
missing_hashes.append(node_hash)

async with self.db_wrapper.reader() as reader:
for node_hash in missing_hashes:
cursor = await reader.execute(
"SELECT root_hash, idx FROM nodes WHERE hash = ? AND store_id = ?",
(
node_hash,
store_id,
),
)

row = await cursor.fetchone()
if row is None:
raise Exception(f"Unknown hash {node_hash.hex()}")

root_hash_blob = row["root_hash"]
index = row["idx"]
merkle_blob_queries[bytes32(root_hash_blob)].append(index)

for root_hash_blob, indexes in merkle_blob_queries.items():
merkle_blob = await self.get_merkle_blob(root_hash_blob, read_only=True)
for index in indexes:
nodes = merkle_blob.get_nodes_with_indexes(index=index)
index_to_hash = {index: bytes32(node.hash) for index, node in nodes}
for _, node in nodes:
if isinstance(node, RawLeafMerkleNode):
terminal_nodes[bytes32(node.hash)] = (node.key, node.value)
elif isinstance(node, RawInternalMerkleNode):
internal_nodes[bytes32(node.hash)] = (index_to_hash[node.left], index_to_hash[node.right])

merkle_blob = MerkleBlob(blob=bytearray())
if root_hash is not None:
await self.build_blob_from_nodes(internal_nodes, terminal_nodes, root_hash, merkle_blob, store_id)
Expand Down Expand Up @@ -534,30 +571,7 @@ async def build_blob_from_nodes(
store_id: bytes32,
) -> TreeIndex:
if node_hash not in terminal_nodes and node_hash not in internal_nodes:
async with self.db_wrapper.reader() as reader:
cursor = await reader.execute(
"SELECT root_hash, idx FROM nodes WHERE hash = ? AND store_id = ?",
(
node_hash,
store_id,
),
)

row = await cursor.fetchone()
if row is None:
raise Exception(f"Unknown hash {node_hash}")

root_hash = row["root_hash"]
index = row["idx"]

other_merkle_blob = await self.get_merkle_blob(root_hash, read_only=True)
nodes = other_merkle_blob.get_nodes_with_indexes(index=index)
index_to_hash = {index: bytes32(node.hash) for index, node in nodes}
for _, node in nodes:
if isinstance(node, RawLeafMerkleNode):
terminal_nodes[bytes32(node.hash)] = (node.key, node.value)
elif isinstance(node, RawInternalMerkleNode):
internal_nodes[bytes32(node.hash)] = (index_to_hash[node.left], index_to_hash[node.right])
raise Exception(f"Unknown hash {node_hash.hex()}")

index = merkle_blob.get_new_index()
if node_hash in terminal_nodes:
Expand Down

0 comments on commit a8fc678

Please sign in to comment.