Skip to content

Commit

Permalink
Follow up to #70 - Fix #97 (#103)
Browse files Browse the repository at this point in the history
Fixes:

- Race conditions/weirdness when last player delays things and he broadcasts late and the other 
  players have already given up (#70, #97)
- Found another race condition where coins got erroneously unfrozen and their reserved addresses 
  unreserved when the protocol threads time out and are killed by the master 
  backgroundshufflingthread (oops!) -- glad I caught this!
- Clean up / trivial nit in wallet.py
- Some misc. clean ups and simplification and/or rm dead code
  • Loading branch information
cculianu authored Mar 15, 2019
1 parent 0bb3caa commit f675ca5
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 25 deletions.
8 changes: 4 additions & 4 deletions lib/wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -1187,16 +1187,16 @@ def set_frozen_coin_state(self, utxos, freeze):
for utxo in utxos:
if isinstance(utxo, str):
if freeze:
self.frozen_coins |= { utxo }
self.frozen_coins.add( utxo )
else:
self.frozen_coins -= { utxo }
self.frozen_coins.discard( utxo )
ok += 1
elif isinstance(utxo, dict) and self.is_mine(utxo['address']):
txo = "{}:{}".format(utxo['prevout_hash'], utxo['prevout_n'])
if freeze:
self.frozen_coins |= { txo }
self.frozen_coins.add( txo )
else:
self.frozen_coins -= { txo }
self.frozen_coins.discard( txo )
utxo['is_frozen_coin'] = bool(freeze)
ok += 1
if ok:
Expand Down
99 changes: 80 additions & 19 deletions plugins/shuffle/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ def __init__(self, *, host, port, coin,
addr_new_addr, change_addr, version, coin_value,
logger=None, ssl=False,
comm_timeout=60.0, ctimeout=5.0,
reserved_change=True,
typ=Messages.DEFAULT # NB: For now only 'DEFAULT' type is supported
):

Expand Down Expand Up @@ -70,11 +69,11 @@ def __init__(self, *, host, port, coin,
self.addr_new = addr_new_addr.to_storage_string() # used by internal protocol code
self.change_addr = change_addr #outside
self.change = change_addr.to_storage_string() #inside
self.reserved_change = reserved_change
self.protocol = None
self.tx = None
self.ts = time.time()
self.done = threading.Event()
self.already_did_cleanup = False # if this flag is set, stop_protocol_thread should avoid doing any cleanup

def not_time_to_die(func):
"Check if 'done' event appear"
Expand Down Expand Up @@ -327,6 +326,12 @@ def __init__(self, window, wallet, network_settings,
self._paused = False
self._coins_busy_shuffling = set() # 'prevout_hash:n' (name) set of all coins that are currently being shuffled by a ProtocolThread. Both wallet locks should be held to read/write this.
self._last_server_check = 0.0 # timestamp in seconds unix time
self._dummy_address = wallet.dummy_address() # usually receiving[0] ... address to use when we don't want to use an address
# below 4 vars are related to the "delayed unreserve address" mechanism as part of the bug #70 & #97 workaround and the complexity created by it..
self._delayed_unreserve_addresses = dict() # dict of Address -> time.time() timestamp when its shuffle ended
self._last_delayed_unreserve_check = 0.0 # timestamp in seconds unix time
self._delayed_unreserve_check_interval = 60.0 # check these addresses every 60 seconds.
self._delayed_unreserve_timeout = 600.0 # how long before the delayed-unreserve addresses expire; 10 minutes

def set_password(self, password):
with self.lock:
Expand Down Expand Up @@ -425,6 +430,7 @@ def run(self):
continue
self.check_server_if_errored_or_not_checked_in_a_while() # NB: this normally is a noop but if server port is bad or not checked in a while, blocks for up to 10.0 seconds
self.check_idle_threads()
self.check_delayed_unreserve_addresses()
self.print_error("Stopped")
finally:
self._unreserve_addresses()
Expand Down Expand Up @@ -465,7 +471,6 @@ def _do_check_server(timeout, ssl_verify):
self.logger.send(MSG_SERVER_OK, "MAINLOG")
return True


def check_idle_threads(self):
if self.stop_flg.is_set():
return
Expand All @@ -485,6 +490,31 @@ def check_idle_threads(self):
self.done_utxos.pop(utxo, None)
self.logger.send("forget {}".format(utxo), "MAINLOG")

def check_delayed_unreserve_addresses(self):
''' Expire addresses put in the "delayed unreserve" dict which are
>600 seconds old.'''
if self.stop_flg.is_set():
return
now = time.time()
if not self._last_delayed_unreserve_check:
self._last_delayed_unreserve_check = now
return
if now - self._last_delayed_unreserve_check > self._delayed_unreserve_check_interval:
self._last_delayed_unreserve_check = now
ct = 0
with self.wallet.lock, self.wallet.transaction_lock:
# We use the above 2 locks to ensure GUI or other threads don't
# touch the addresses_cashshuffle_reserved set while we mutate it.
# This code path is executed very infrequently so it's not really
# a huge hit.
for addr, ts in self._delayed_unreserve_addresses.copy().items():
if now - ts > self._delayed_unreserve_timeout:
self.wallet._addresses_cashshuffle_reserved.discard(addr)
self._delayed_unreserve_addresses.pop(addr, None)
ct += 1
if ct:
self.print_error("Freed {} 'delayed unreserve' addresses in {:.02f} msec".format(ct, (time.time()-now)*1e3))

def process_shared_chan(self):
timeLeft = 0.0 # this variable is modified by _loopCondition() call below
def _loopCondition(t0):
Expand Down Expand Up @@ -571,34 +601,65 @@ def _unreserve_addresses(self):
def stop_protocol_thread(self, thr, scale, sender, message):
self.print_error("Stop protocol thread for scale: {}".format(scale))
retVal = False
if sender:
if sender and not thr.already_did_cleanup:
if message.endswith('complete protocol'):
# remember this 'just spent' coin for self.timeout amount of
# time as a guard to ensure that we wait for the tx to show
# up in the wallet before considerng it again for shuffling
self.done_utxos[sender] = time.time()
retVal = True # indicate to interesteed callers that we had a completion. Our thread loop uses this retval to decide to scan for UTXOs to shuffle immediately.
if thr.protocol and not thr.protocol.did_use_change:
# The change address was not used.
# Immediately unreserve this change_addr (may be dummy_address,
# which is a harmless noop) so that it doesn't 'leak'
# (create gaps) and so that other threads may reserve it
# for shuffles immediately.
self.wallet._addresses_cashshuffle_reserved.discard(thr.change_addr)
# Note that when "complete protocol" happens we never unreserve
# reserved output addresses, as a paranoia measure. This is
# because technically there are tx's with those addresses "in
# the wild" which are signed and can be broadcast at any time.
# A caveat here is if the blockchain is bad or there's lots of
# reorgs or the user is switching forks -- or the completed
# protocol tx is never confirmed -- then the strategy here can
# create gaps in the change addresses.. but that's ok. Next
# app restart or CashShuffle plugin reload those gaps
# will go away since the 'reserved address' set will be cleared.
# The above note is a corner case consideration, though,
# and it's best to err on the side of caution here. In practice
# gaps in the change addresses will be a rare occurrence.
was_fake_change_addr = thr.change_addr == self._dummy_address
need_to_discard_change_if_errored = not was_fake_change_addr
with self.wallet.lock, self.wallet.transaction_lock:
if need_to_discard_change_if_errored and thr.protocol and not thr.protocol.did_use_change:
# The reserved change output address was definitely not used.
# Immediately unreserve this change_addr so that it doesn't
# 'leak' (create gaps) and so that other threads may reserve
# it for shuffles immediately.
self.wallet._addresses_cashshuffle_reserved.discard(thr.change_addr)
need_to_discard_change_if_errored = False
self.wallet.set_frozen_coin_state([sender], False)
self._coins_busy_shuffling.discard(sender)
self.wallet.storage.put(ConfKeys.PerWallet.COINS_FROZEN_BY_SHUFFLING, list(self._coins_busy_shuffling))
if message.startswith("Error"):
# unreserve addresses that were previously reserved iff error
self.wallet._addresses_cashshuffle_reserved.discard(thr.addr_new_addr)
self.wallet._addresses_cashshuffle_reserved.discard(thr.change_addr) # NB: may be a dummy address in which case this is a harmless noop
#self.print_error("Unreserving", thr.addr_new_addr, thr.change_addr)
if thr.protocol and thr.protocol.did_reach_tentative_stage:
# The shuffle got to a stage where maybe the tx will
# get broadcast sometime soon by a lagged player
# (see #70, #97). Since we don't want the next round to
# possibly use the previous round's shuffle addresses
# for output again (that would break privacy), we need
# to mark these addresses to be unreserved at a later
# time rather than right away.
now = time.time()
self._delayed_unreserve_addresses[thr.addr_new_addr] = now
if need_to_discard_change_if_errored:
self._delayed_unreserve_addresses[thr.change_addr] = now
self.print_error("Shuffle of coin {} did reach the 'tentative' stage. Will unreserve its reserved addresses in {} minutes."
.format(sender, self._delayed_unreserve_timeout / 60.0))
else:
# unreserve addresses that were previously reserved iff error
self.wallet._addresses_cashshuffle_reserved.discard(thr.addr_new_addr)
if need_to_discard_change_if_errored:
self.wallet._addresses_cashshuffle_reserved.discard(thr.change_addr)
thr.already_did_cleanup = True # mark this thread as 'cleaned up'. this is necessary because this function may reenter with this thread again later and doing the clean-up twice would create bugs as it would unreserve addresses, etc, that may already be taken
self.tell_gui_to_refresh()
self.logger.send(message, sender)
else:
elif not sender:
self.print_error("No sender! Thr={}".format(str(thr)))
if thr == self.threads[scale]:
if thr is self.threads[scale]:
self.threads[scale] = None
elif thr.is_alive():
self.print_error("WARNING: Stopping thread ({}) which was not in the self.threads dict for scale = {} coin = {}"
Expand Down Expand Up @@ -704,15 +765,15 @@ def get_coin_for_shuffling(scale, coins, scale_lower_bound, scale_upper_bound):
# it definitely won't be used. :/
# We'll just take the wallet API's 'dummy' address (receiving[0]) --
# Don't worry: It's 100% guaranteed we won't be using this address.
change = self.wallet.dummy_address()
change = self._dummy_address
self.print_error("Scale {} Coin {} OutAddr {} {} {} make_protocol_thread".format(scale, utxo_name, output.to_storage_string(), "Change" if may_receive_change else "FakeChange", change.to_storage_string()))
#self.print_error("Reserved addresses:", self.wallet._addresses_cashshuffle_reserved)
ctimeout = 12.5 if (Network.get_instance() and Network.get_instance().get_proxies()) else 5.0 # allow for 12.5 second connection timeouts if using a proxy server
thr = ProtocolThread(host=self.host, port=self.port, ssl=self.ssl,
comm_timeout=self.timeout, ctimeout=ctimeout, # comm timeout and connect timeout
coin=utxo_name,
scale=scale, fee=self.FEE, coin_value=coin['value'],
addr_new_addr=output, change_addr=change, reserved_change=may_receive_change,
addr_new_addr=output, change_addr=change,
sk=id_sk, sks=sks, inputs=inputs, pubk=id_pub,
logger=None, version=self.version, typ=self.type)
thr.logger = ChannelSendLambda(lambda msg: self.protocol_thread_callback(thr, msg))
Expand Down
6 changes: 4 additions & 2 deletions plugins/shuffle/coin_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(self, coin_utils, crypto, messages,
self.transaction = None
self.tx = None
self.did_use_change = True # This will get recomputed later as the shuffle proceeds based on actual amounts in shuffle (#68)
self.did_reach_tentative_stage = False
self.done = None
if self.number_of_players == len(set(players.values())):
if self.vk in players.values():
Expand Down Expand Up @@ -320,8 +321,9 @@ def process_equivocation_check(self):
self.messages.add_signatures(signatures)
self.send_message()
self.log_message("send transction signatures")
# workaround for issue #70
self.logchan.send("add_tentative_shuffle: {}".format(self._get_tentative_shuffle_string()))
# workaround for issue #70, #97
self.did_reach_tentative_stage = True # Flag that tells BackgroungShuffleThread not to unreserve our output address (for a time) if things go bad and we fail, since after this point the tx may end up broadcast by a lagged client (see #70)
self.logchan.send("add_tentative_shuffle: {}".format(self._get_tentative_shuffle_string())) # workaround for #70


def process_verification_and_submission(self):
Expand Down

0 comments on commit f675ca5

Please sign in to comment.