Skip to content

Commit

Permalink
Merge branch 'main' into clvm_mypie
Browse files Browse the repository at this point in the history
  • Loading branch information
altendky committed Dec 27, 2023
2 parents 14362a8 + cd78dba commit b8bcc46
Show file tree
Hide file tree
Showing 17 changed files with 186 additions and 119 deletions.
11 changes: 8 additions & 3 deletions benchmarks/mempool-long-lived.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import asyncio
from dataclasses import dataclass
from time import monotonic
from typing import Dict, Optional
from typing import Collection, Dict, List, Optional

from chia_rs import G2Element
from clvm.casts import int_to_bytes
Expand Down Expand Up @@ -81,8 +81,13 @@ def fake_block_record(block_height: uint32, timestamp: uint64) -> BenchBlockReco
async def run_mempool_benchmark() -> None:
coin_records: Dict[bytes32, CoinRecord] = {}

async def get_coin_record(coin_id: bytes32) -> Optional[CoinRecord]:
return coin_records.get(coin_id)
async def get_coin_record(coin_ids: Collection[bytes32]) -> List[CoinRecord]:
ret: List[CoinRecord] = []
for name in coin_ids:
r = coin_records.get(name)
if r is not None:
ret.append(r)
return ret

timestamp = uint64(1631794488)

Expand Down
15 changes: 10 additions & 5 deletions benchmarks/mempool.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dataclasses import dataclass
from subprocess import check_call
from time import monotonic
from typing import Dict, Iterator, List, Optional, Tuple
from typing import Collection, Dict, Iterator, List, Optional, Tuple

from chia.consensus.coinbase import create_farmer_coin, create_pool_coin
from chia.consensus.default_constants import DEFAULT_CONSTANTS
Expand Down Expand Up @@ -78,8 +78,13 @@ def fake_block_record(block_height: uint32, timestamp: uint64) -> BenchBlockReco
async def run_mempool_benchmark() -> None:
all_coins: Dict[bytes32, CoinRecord] = {}

async def get_coin_record(coin_id: bytes32) -> Optional[CoinRecord]:
return all_coins.get(coin_id)
async def get_coin_records(coin_ids: Collection[bytes32]) -> List[CoinRecord]:
ret: List[CoinRecord] = []
for name in coin_ids:
r = all_coins.get(name)
if r is not None:
ret.append(r)
return ret

wt = WalletTool(DEFAULT_CONSTANTS)

Expand Down Expand Up @@ -156,7 +161,7 @@ async def get_coin_record(coin_id: bytes32) -> Optional[CoinRecord]:
else:
print("\n== Multi-threaded")

mempool = MempoolManager(get_coin_record, DEFAULT_CONSTANTS, single_threaded=single_threaded)
mempool = MempoolManager(get_coin_records, DEFAULT_CONSTANTS, single_threaded=single_threaded)

height = start_height
rec = fake_block_record(height, timestamp)
Expand Down Expand Up @@ -186,7 +191,7 @@ async def add_spend_bundles(spend_bundles: List[SpendBundle]) -> None:
print(f" time: {stop - start:0.4f}s")
print(f" per call: {(stop - start) / total_bundles * 1000:0.2f}ms")

mempool = MempoolManager(get_coin_record, DEFAULT_CONSTANTS, single_threaded=single_threaded)
mempool = MempoolManager(get_coin_records, DEFAULT_CONSTANTS, single_threaded=single_threaded)

height = start_height
rec = fake_block_record(height, timestamp)
Expand Down
2 changes: 1 addition & 1 deletion chia/clvm/spend_sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ async def managed(

async with DBWrapper2.managed(database=uri, uri=True, reader_count=1, db_version=2) as self.db_wrapper:
self.coin_store = await CoinStore.create(self.db_wrapper)
self.mempool_manager = MempoolManager(self.coin_store.get_coin_record, defaults)
self.mempool_manager = MempoolManager(self.coin_store.get_coin_records, defaults)
self.defaults = defaults

# Load the next data if there is any
Expand Down
2 changes: 1 addition & 1 deletion chia/full_node/coin_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ async def get_coin_record(self, coin_name: bytes32) -> Optional[CoinRecord]:
return CoinRecord(coin, row[0], row[1], row[2], row[6])
return None

async def get_coin_records(self, names: List[bytes32]) -> List[CoinRecord]:
async def get_coin_records(self, names: Collection[bytes32]) -> List[CoinRecord]:
if len(names) == 0:
return []

Expand Down
10 changes: 2 additions & 8 deletions chia/full_node/full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,6 @@ class FullNode:
_add_transaction_semaphore: Optional[asyncio.Semaphore] = None
_db_wrapper: Optional[DBWrapper2] = None
_hint_store: Optional[HintStore] = None
transaction_responses: List[Tuple[bytes32, MempoolInclusionStatus, Optional[Err]]] = dataclasses.field(
default_factory=list
)
_block_store: Optional[BlockStore] = None
_coin_store: Optional[CoinStore] = None
_mempool_manager: Optional[MempoolManager] = None
Expand Down Expand Up @@ -276,7 +273,7 @@ async def manage(self) -> AsyncIterator[None]:
)

self._mempool_manager = MempoolManager(
get_coin_record=self.coin_store.get_coin_record,
get_coin_records=self.coin_store.get_coin_records,
consensus_constants=self.constants,
multiprocessing_context=self.multiprocessing_context,
single_threaded=single_threaded,
Expand All @@ -285,7 +282,6 @@ async def manage(self) -> AsyncIterator[None]:
# Transactions go into this queue from the server, and get sent to respond_transaction
self._transaction_queue = TransactionQueue(1000, self.log)
self._transaction_queue_task: asyncio.Task[None] = asyncio.create_task(self._handle_transactions())
self.transaction_responses = []

self._init_weight_proof = asyncio.create_task(self.initialize_weight_proof())

Expand Down Expand Up @@ -470,9 +466,7 @@ async def _handle_one_transaction(self, entry: TransactionQueueEntry) -> None:
peer = entry.peer
try:
inc_status, err = await self.add_transaction(entry.transaction, entry.spend_name, peer, entry.test)
self.transaction_responses.append((entry.spend_name, inc_status, err))
if len(self.transaction_responses) > 50:
self.transaction_responses = self.transaction_responses[1:]
entry.done.set_result((inc_status, err))
except asyncio.CancelledError:
error_stack = traceback.format_exc()
self.log.debug(f"Cancelling _handle_one_transaction, closing: {error_stack}")
Expand Down
23 changes: 7 additions & 16 deletions chia/full_node/full_node_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple

import anyio
from chia_rs import AugSchemeMPL, G1Element, G2Element
from chiabip158 import PyBIP158

Expand Down Expand Up @@ -1271,22 +1272,12 @@ async def send_transaction(
response = wallet_protocol.TransactionAck(spend_name, uint8(MempoolInclusionStatus.SUCCESS), None)
return make_msg(ProtocolMessageTypes.transaction_ack, response)

await self.full_node.transaction_queue.put(
TransactionQueueEntry(request.transaction, None, spend_name, None, test), peer_id=None, high_priority=True
)
# Waits for the transaction to go into the mempool, times out after 45 seconds.
status, error = None, None
sleep_time = 0.01
for i in range(int(45 / sleep_time)):
await asyncio.sleep(sleep_time)
for potential_name, potential_status, potential_error in self.full_node.transaction_responses:
if spend_name == potential_name:
status = potential_status
error = potential_error
break
if status is not None:
break
if status is None:
queue_entry = TransactionQueueEntry(request.transaction, None, spend_name, None, test)
await self.full_node.transaction_queue.put(queue_entry, peer_id=None, high_priority=True)
try:
with anyio.fail_after(delay=45):
status, error = await queue_entry.done
except TimeoutError:
response = wallet_protocol.TransactionAck(spend_name, uint8(MempoolInclusionStatus.PENDING), None)
else:
error_name = error.name if error is not None else None
Expand Down
73 changes: 59 additions & 14 deletions chia/full_node/mempool_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from concurrent.futures.process import ProcessPoolExecutor
from dataclasses import dataclass
from multiprocessing.context import BaseContext
from typing import Awaitable, Callable, Dict, List, Optional, Set, Tuple, TypeVar
from typing import Awaitable, Callable, Collection, Dict, List, Optional, Set, Tuple, TypeVar

from chia_rs import ELIGIBLE_FOR_DEDUP, GTElement
from chiabip158 import PyBIP158
Expand Down Expand Up @@ -152,7 +152,7 @@ class MempoolManager:
pool: Executor
constants: ConsensusConstants
seen_bundle_hashes: Dict[bytes32, bytes32]
get_coin_record: Callable[[bytes32], Awaitable[Optional[CoinRecord]]]
get_coin_records: Callable[[Collection[bytes32]], Awaitable[List[CoinRecord]]]
nonzero_fee_minimum_fpc: int
mempool_max_total_cost: int
# a cache of MempoolItems that conflict with existing items in the pool
Expand All @@ -165,7 +165,7 @@ class MempoolManager:

def __init__(
self,
get_coin_record: Callable[[bytes32], Awaitable[Optional[CoinRecord]]],
get_coin_records: Callable[[Collection[bytes32]], Awaitable[List[CoinRecord]]],
consensus_constants: ConsensusConstants,
multiprocessing_context: Optional[BaseContext] = None,
*,
Expand All @@ -176,7 +176,7 @@ def __init__(
# Keep track of seen spend_bundles
self.seen_bundle_hashes: Dict[bytes32, bytes32] = {}

self.get_coin_record = get_coin_record
self.get_coin_records = get_coin_records

# The fee per cost must be above this amount to consider the fee "nonzero", and thus able to kick out other
# transactions. This prevents spam. This is equivalent to 0.055 XCH per block, or about 0.00005 XCH for two
Expand Down Expand Up @@ -310,7 +310,12 @@ async def pre_validate_spendbundle(
return ret

async def add_spend_bundle(
self, new_spend: SpendBundle, npc_result: NPCResult, spend_name: bytes32, first_added_height: uint32
self,
new_spend: SpendBundle,
npc_result: NPCResult,
spend_name: bytes32,
first_added_height: uint32,
get_coin_records: Optional[Callable[[Collection[bytes32]], Awaitable[List[CoinRecord]]]] = None,
) -> Tuple[Optional[uint64], MempoolInclusionStatus, Optional[Err]]:
"""
Validates and adds to mempool a new_spend with the given NPCResult, and spend_name, and the current mempool.
Expand All @@ -334,8 +339,14 @@ async def add_spend_bundle(
if existing_item is not None:
return existing_item.cost, MempoolInclusionStatus.SUCCESS, None

if get_coin_records is None:
get_coin_records = self.get_coin_records
err, item, remove_items = await self.validate_spend_bundle(
new_spend, npc_result, spend_name, first_added_height
new_spend,
npc_result,
spend_name,
first_added_height,
get_coin_records,
)
if err is None:
# No error, immediately add to mempool, after removing conflicting TXs.
Expand Down Expand Up @@ -365,6 +376,7 @@ async def validate_spend_bundle(
npc_result: NPCResult,
spend_name: bytes32,
first_added_height: uint32,
get_coin_records: Callable[[Collection[bytes32]], Awaitable[List[CoinRecord]]],
) -> Tuple[Optional[Err], Optional[MempoolItem], List[bytes32]]:
"""
Validates new_spend with the given NPCResult, and spend_name, and the current mempool. The mempool should
Expand Down Expand Up @@ -427,11 +439,14 @@ async def validate_spend_bundle(

removal_record_dict: Dict[bytes32, CoinRecord] = {}
removal_amount: int = 0
removal_records = await get_coin_records(removal_names)
for record in removal_records:
removal_record_dict[record.coin.name()] = record

for name in removal_names:
removal_record = await self.get_coin_record(name)
if removal_record is None and name not in additions_dict:
if name not in removal_record_dict and name not in additions_dict:
return Err.UNKNOWN_UNSPENT, None, []
elif name in additions_dict:
if name in additions_dict:
removal_coin = additions_dict[name]
# The timestamp and block-height of this coin being spent needs
# to be consistent with what we use to check time-lock
Expand All @@ -447,10 +462,10 @@ async def validate_spend_bundle(
False,
self.peak.timestamp,
)

assert removal_record is not None
removal_record_dict[name] = removal_record
else:
removal_record = removal_record_dict[name]
removal_amount = removal_amount + removal_record.coin.amount
removal_record_dict[name] = removal_record

fees = uint64(removal_amount - addition_amount)

Expand Down Expand Up @@ -641,9 +656,35 @@ async def new_peak(
old_pool = self.mempool
self.mempool = Mempool(old_pool.mempool_info, old_pool.fee_estimator)
self.seen_bundle_hashes = {}

# in order to make this a bit quicker, we look-up all the spends in
# a single query, rather than one at a time.
coin_records: Dict[bytes32, CoinRecord] = {}

removals: Set[bytes32] = set()
for item in old_pool.all_items():
for s in item.spend_bundle.coin_spends:
removals.add(s.coin.name())

for record in await self.get_coin_records(removals):
name = record.coin.name()
coin_records[name] = record

async def local_get_coin_records(names: Collection[bytes32]) -> List[CoinRecord]:
ret: List[CoinRecord] = []
for name in names:
r = coin_records.get(name)
if r is not None:
ret.append(r)
return ret

for item in old_pool.all_items():
_, result, err = await self.add_spend_bundle(
item.spend_bundle, item.npc_result, item.spend_bundle_name, item.height_added_to_mempool
item.spend_bundle,
item.npc_result,
item.spend_bundle_name,
item.height_added_to_mempool,
local_get_coin_records,
)
# Only add to `seen` if inclusion worked, so it can be resubmitted in case of a reorg
if result == MempoolInclusionStatus.SUCCESS:
Expand All @@ -660,7 +701,11 @@ async def new_peak(
txs_added = []
for item in potential_txs.values():
cost, status, error = await self.add_spend_bundle(
item.spend_bundle, item.npc_result, item.spend_bundle_name, item.height_added_to_mempool
item.spend_bundle,
item.npc_result,
item.spend_bundle_name,
item.height_added_to_mempool,
self.get_coin_records,
)
if status == MempoolInclusionStatus.SUCCESS:
txs_added.append((item.spend_bundle, item.npc_result, item.spend_bundle_name))
Expand Down
31 changes: 13 additions & 18 deletions chia/types/transaction_queue_entry.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional
import asyncio
from dataclasses import dataclass, field
from typing import Optional, Tuple

from chia.server.ws_connection import WSChiaConnection
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.mempool_inclusion_status import MempoolInclusionStatus
from chia.types.spend_bundle import SpendBundle
from chia.util.errors import Err


@dataclass(frozen=True)
Expand All @@ -14,20 +17,12 @@ class TransactionQueueEntry:
A transaction received from peer. This is put into a queue, and not yet in the mempool.
"""

transaction: SpendBundle
transaction_bytes: Optional[bytes]
transaction: SpendBundle = field(compare=False)
transaction_bytes: Optional[bytes] = field(compare=False)
spend_name: bytes32
peer: Optional[WSChiaConnection]
test: bool

def __lt__(self, other: TransactionQueueEntry) -> bool:
return self.spend_name < other.spend_name

def __le__(self, other: TransactionQueueEntry) -> bool:
return self.spend_name <= other.spend_name

def __gt__(self, other: TransactionQueueEntry) -> bool:
return self.spend_name > other.spend_name

def __ge__(self, other: TransactionQueueEntry) -> bool:
return self.spend_name >= other.spend_name
peer: Optional[WSChiaConnection] = field(compare=False)
test: bool = field(compare=False)
done: asyncio.Future[Tuple[MempoolInclusionStatus, Optional[Err]]] = field(
default_factory=asyncio.Future,
compare=False,
)
6 changes: 3 additions & 3 deletions chia/wallet/did_wallet/did_wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,9 @@ async def generate_new_decentralised_id(
origin_id=origin.name(),
)

if tx_record.spend_bundle is None:
return None

genesis_launcher_solution = Program.to([did_puzzle_hash, amount, bytes(0x80)])

launcher_cs = make_spend(launcher_coin, genesis_launcher_puz, genesis_launcher_solution)
Expand All @@ -1317,9 +1320,6 @@ async def generate_new_decentralised_id(
await self.add_parent(eve_coin.parent_coin_info, eve_parent)
await self.add_parent(eve_coin.name(), future_parent)

if tx_record.spend_bundle is None:
return None

# Only want to save this information if the transaction is valid
did_info = DIDInfo(
origin_coin=launcher_coin,
Expand Down
2 changes: 1 addition & 1 deletion chia/wallet/puzzles/clawback/drivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import logging
from typing import Any, List, Optional, Set, Union

from chia.types.blockchain_format.serialized_program import SerializedProgram
from chia.consensus.default_constants import DEFAULT_CONSTANTS
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.serialized_program import SerializedProgram
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_spend import CoinSpend, make_spend
from chia.types.condition_opcodes import ConditionOpcode
Expand Down
Loading

0 comments on commit b8bcc46

Please sign in to comment.