From f675ca5afa31bf4cbd4705453263783db6c0a6c5 Mon Sep 17 00:00:00 2001 From: Calin Culianu Date: Fri, 15 Mar 2019 12:50:45 +0200 Subject: [PATCH] Follow up to #70 - Fix #97 (#103) Fixes: - Race conditions/weirdness when last player delays things and he broadcasts late and the other players have already given up (#70, #97) - Found another race condition where coins got erroneously unfrozen and their reserved addresses unreserved when the protocol threads time out and are killed by the master backgroundshufflingthread (oops!) -- glad I caught this! - Clean up / trivial nit in wallet.py - Some misc. clean ups and simplification and/or rm dead code --- lib/wallet.py | 8 +-- plugins/shuffle/client.py | 99 ++++++++++++++++++++++++++------- plugins/shuffle/coin_shuffle.py | 6 +- 3 files changed, 88 insertions(+), 25 deletions(-) diff --git a/lib/wallet.py b/lib/wallet.py index 7afebd45f0d1..44c1cef8dc49 100644 --- a/lib/wallet.py +++ b/lib/wallet.py @@ -1187,16 +1187,16 @@ def set_frozen_coin_state(self, utxos, freeze): for utxo in utxos: if isinstance(utxo, str): if freeze: - self.frozen_coins |= { utxo } + self.frozen_coins.add( utxo ) else: - self.frozen_coins -= { utxo } + self.frozen_coins.discard( utxo ) ok += 1 elif isinstance(utxo, dict) and self.is_mine(utxo['address']): txo = "{}:{}".format(utxo['prevout_hash'], utxo['prevout_n']) if freeze: - self.frozen_coins |= { txo } + self.frozen_coins.add( txo ) else: - self.frozen_coins -= { txo } + self.frozen_coins.discard( txo ) utxo['is_frozen_coin'] = bool(freeze) ok += 1 if ok: diff --git a/plugins/shuffle/client.py b/plugins/shuffle/client.py index 584fceeffcd4..db6a1356a9f0 100644 --- a/plugins/shuffle/client.py +++ b/plugins/shuffle/client.py @@ -37,7 +37,6 @@ def __init__(self, *, host, port, coin, addr_new_addr, change_addr, version, coin_value, logger=None, ssl=False, comm_timeout=60.0, ctimeout=5.0, - reserved_change=True, typ=Messages.DEFAULT # NB: For now only 'DEFAULT' type is supported ): @@ -70,11 +69,11 @@ def __init__(self, *, host, port, coin, self.addr_new = addr_new_addr.to_storage_string() # used by internal protocol code self.change_addr = change_addr #outside self.change = change_addr.to_storage_string() #inside - self.reserved_change = reserved_change self.protocol = None self.tx = None self.ts = time.time() self.done = threading.Event() + self.already_did_cleanup = False # if this flag is set, stop_protocol_thread should avoid doing any cleanup def not_time_to_die(func): "Check if 'done' event appear" @@ -327,6 +326,12 @@ def __init__(self, window, wallet, network_settings, self._paused = False self._coins_busy_shuffling = set() # 'prevout_hash:n' (name) set of all coins that are currently being shuffled by a ProtocolThread. Both wallet locks should be held to read/write this. self._last_server_check = 0.0 # timestamp in seconds unix time + self._dummy_address = wallet.dummy_address() # usually receiving[0] ... address to use when we don't want to use an address + # below 4 vars are related to the "delayed unreserve address" mechanism as part of the bug #70 & #97 workaround and the complexity created by it.. + self._delayed_unreserve_addresses = dict() # dict of Address -> time.time() timestamp when its shuffle ended + self._last_delayed_unreserve_check = 0.0 # timestamp in seconds unix time + self._delayed_unreserve_check_interval = 60.0 # check these addresses every 60 seconds. + self._delayed_unreserve_timeout = 600.0 # how long before the delayed-unreserve addresses expire; 10 minutes def set_password(self, password): with self.lock: @@ -425,6 +430,7 @@ def run(self): continue self.check_server_if_errored_or_not_checked_in_a_while() # NB: this normally is a noop but if server port is bad or not checked in a while, blocks for up to 10.0 seconds self.check_idle_threads() + self.check_delayed_unreserve_addresses() self.print_error("Stopped") finally: self._unreserve_addresses() @@ -465,7 +471,6 @@ def _do_check_server(timeout, ssl_verify): self.logger.send(MSG_SERVER_OK, "MAINLOG") return True - def check_idle_threads(self): if self.stop_flg.is_set(): return @@ -485,6 +490,31 @@ def check_idle_threads(self): self.done_utxos.pop(utxo, None) self.logger.send("forget {}".format(utxo), "MAINLOG") + def check_delayed_unreserve_addresses(self): + ''' Expire addresses put in the "delayed unreserve" dict which are + >600 seconds old.''' + if self.stop_flg.is_set(): + return + now = time.time() + if not self._last_delayed_unreserve_check: + self._last_delayed_unreserve_check = now + return + if now - self._last_delayed_unreserve_check > self._delayed_unreserve_check_interval: + self._last_delayed_unreserve_check = now + ct = 0 + with self.wallet.lock, self.wallet.transaction_lock: + # We use the above 2 locks to ensure GUI or other threads don't + # touch the addresses_cashshuffle_reserved set while we mutate it. + # This code path is executed very infrequently so it's not really + # a huge hit. + for addr, ts in self._delayed_unreserve_addresses.copy().items(): + if now - ts > self._delayed_unreserve_timeout: + self.wallet._addresses_cashshuffle_reserved.discard(addr) + self._delayed_unreserve_addresses.pop(addr, None) + ct += 1 + if ct: + self.print_error("Freed {} 'delayed unreserve' addresses in {:.02f} msec".format(ct, (time.time()-now)*1e3)) + def process_shared_chan(self): timeLeft = 0.0 # this variable is modified by _loopCondition() call below def _loopCondition(t0): @@ -571,34 +601,65 @@ def _unreserve_addresses(self): def stop_protocol_thread(self, thr, scale, sender, message): self.print_error("Stop protocol thread for scale: {}".format(scale)) retVal = False - if sender: + if sender and not thr.already_did_cleanup: if message.endswith('complete protocol'): # remember this 'just spent' coin for self.timeout amount of # time as a guard to ensure that we wait for the tx to show # up in the wallet before considerng it again for shuffling self.done_utxos[sender] = time.time() retVal = True # indicate to interesteed callers that we had a completion. Our thread loop uses this retval to decide to scan for UTXOs to shuffle immediately. - if thr.protocol and not thr.protocol.did_use_change: - # The change address was not used. - # Immediately unreserve this change_addr (may be dummy_address, - # which is a harmless noop) so that it doesn't 'leak' - # (create gaps) and so that other threads may reserve it - # for shuffles immediately. - self.wallet._addresses_cashshuffle_reserved.discard(thr.change_addr) + # Note that when "complete protocol" happens we never unreserve + # reserved output addresses, as a paranoia measure. This is + # because technically there are tx's with those addresses "in + # the wild" which are signed and can be broadcast at any time. + # A caveat here is if the blockchain is bad or there's lots of + # reorgs or the user is switching forks -- or the completed + # protocol tx is never confirmed -- then the strategy here can + # create gaps in the change addresses.. but that's ok. Next + # app restart or CashShuffle plugin reload those gaps + # will go away since the 'reserved address' set will be cleared. + # The above note is a corner case consideration, though, + # and it's best to err on the side of caution here. In practice + # gaps in the change addresses will be a rare occurrence. + was_fake_change_addr = thr.change_addr == self._dummy_address + need_to_discard_change_if_errored = not was_fake_change_addr with self.wallet.lock, self.wallet.transaction_lock: + if need_to_discard_change_if_errored and thr.protocol and not thr.protocol.did_use_change: + # The reserved change output address was definitely not used. + # Immediately unreserve this change_addr so that it doesn't + # 'leak' (create gaps) and so that other threads may reserve + # it for shuffles immediately. + self.wallet._addresses_cashshuffle_reserved.discard(thr.change_addr) + need_to_discard_change_if_errored = False self.wallet.set_frozen_coin_state([sender], False) self._coins_busy_shuffling.discard(sender) self.wallet.storage.put(ConfKeys.PerWallet.COINS_FROZEN_BY_SHUFFLING, list(self._coins_busy_shuffling)) if message.startswith("Error"): - # unreserve addresses that were previously reserved iff error - self.wallet._addresses_cashshuffle_reserved.discard(thr.addr_new_addr) - self.wallet._addresses_cashshuffle_reserved.discard(thr.change_addr) # NB: may be a dummy address in which case this is a harmless noop - #self.print_error("Unreserving", thr.addr_new_addr, thr.change_addr) + if thr.protocol and thr.protocol.did_reach_tentative_stage: + # The shuffle got to a stage where maybe the tx will + # get broadcast sometime soon by a lagged player + # (see #70, #97). Since we don't want the next round to + # possibly use the previous round's shuffle addresses + # for output again (that would break privacy), we need + # to mark these addresses to be unreserved at a later + # time rather than right away. + now = time.time() + self._delayed_unreserve_addresses[thr.addr_new_addr] = now + if need_to_discard_change_if_errored: + self._delayed_unreserve_addresses[thr.change_addr] = now + self.print_error("Shuffle of coin {} did reach the 'tentative' stage. Will unreserve its reserved addresses in {} minutes." + .format(sender, self._delayed_unreserve_timeout / 60.0)) + else: + # unreserve addresses that were previously reserved iff error + self.wallet._addresses_cashshuffle_reserved.discard(thr.addr_new_addr) + if need_to_discard_change_if_errored: + self.wallet._addresses_cashshuffle_reserved.discard(thr.change_addr) + thr.already_did_cleanup = True # mark this thread as 'cleaned up'. this is necessary because this function may reenter with this thread again later and doing the clean-up twice would create bugs as it would unreserve addresses, etc, that may already be taken self.tell_gui_to_refresh() self.logger.send(message, sender) - else: + elif not sender: self.print_error("No sender! Thr={}".format(str(thr))) - if thr == self.threads[scale]: + if thr is self.threads[scale]: self.threads[scale] = None elif thr.is_alive(): self.print_error("WARNING: Stopping thread ({}) which was not in the self.threads dict for scale = {} coin = {}" @@ -704,7 +765,7 @@ def get_coin_for_shuffling(scale, coins, scale_lower_bound, scale_upper_bound): # it definitely won't be used. :/ # We'll just take the wallet API's 'dummy' address (receiving[0]) -- # Don't worry: It's 100% guaranteed we won't be using this address. - change = self.wallet.dummy_address() + change = self._dummy_address self.print_error("Scale {} Coin {} OutAddr {} {} {} make_protocol_thread".format(scale, utxo_name, output.to_storage_string(), "Change" if may_receive_change else "FakeChange", change.to_storage_string())) #self.print_error("Reserved addresses:", self.wallet._addresses_cashshuffle_reserved) ctimeout = 12.5 if (Network.get_instance() and Network.get_instance().get_proxies()) else 5.0 # allow for 12.5 second connection timeouts if using a proxy server @@ -712,7 +773,7 @@ def get_coin_for_shuffling(scale, coins, scale_lower_bound, scale_upper_bound): comm_timeout=self.timeout, ctimeout=ctimeout, # comm timeout and connect timeout coin=utxo_name, scale=scale, fee=self.FEE, coin_value=coin['value'], - addr_new_addr=output, change_addr=change, reserved_change=may_receive_change, + addr_new_addr=output, change_addr=change, sk=id_sk, sks=sks, inputs=inputs, pubk=id_pub, logger=None, version=self.version, typ=self.type) thr.logger = ChannelSendLambda(lambda msg: self.protocol_thread_callback(thr, msg)) diff --git a/plugins/shuffle/coin_shuffle.py b/plugins/shuffle/coin_shuffle.py index 059052e2393b..9df4b81e1dcd 100644 --- a/plugins/shuffle/coin_shuffle.py +++ b/plugins/shuffle/coin_shuffle.py @@ -55,6 +55,7 @@ def __init__(self, coin_utils, crypto, messages, self.transaction = None self.tx = None self.did_use_change = True # This will get recomputed later as the shuffle proceeds based on actual amounts in shuffle (#68) + self.did_reach_tentative_stage = False self.done = None if self.number_of_players == len(set(players.values())): if self.vk in players.values(): @@ -320,8 +321,9 @@ def process_equivocation_check(self): self.messages.add_signatures(signatures) self.send_message() self.log_message("send transction signatures") - # workaround for issue #70 - self.logchan.send("add_tentative_shuffle: {}".format(self._get_tentative_shuffle_string())) + # workaround for issue #70, #97 + self.did_reach_tentative_stage = True # Flag that tells BackgroungShuffleThread not to unreserve our output address (for a time) if things go bad and we fail, since after this point the tx may end up broadcast by a lagged client (see #70) + self.logchan.send("add_tentative_shuffle: {}".format(self._get_tentative_shuffle_string())) # workaround for #70 def process_verification_and_submission(self):