From afba6730858b0050cab4b0fcbe5d0c11d4503f61 Mon Sep 17 00:00:00 2001 From: tecnovert Date: Sun, 11 Sep 2022 17:16:51 +0200 Subject: [PATCH] refactor: Use persistent connection when expiring messages. --- basicswap/__init__.py | 2 +- basicswap/basicswap.py | 42 ++++++++++++++++++++++++++++-------------- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/basicswap/__init__.py b/basicswap/__init__.py index 3e17ae5..9953fb3 100644 --- a/basicswap/__init__.py +++ b/basicswap/__init__.py @@ -1,3 +1,3 @@ name = "basicswap" -__version__ = "0.11.36" +__version__ = "0.11.37" diff --git a/basicswap/basicswap.py b/basicswap/basicswap.py index a532d2e..da2b711 100644 --- a/basicswap/basicswap.py +++ b/basicswap/basicswap.py @@ -155,20 +155,26 @@ def zeroIfNone(value): return value -def threadPollChainState(swap_client, coin_type): +def threadPollXMRChainState(swap_client, coin_type): + ci = swap_client.ci(coin_type) + cc = swap_client.coin_clients[coin_type] while not swap_client.delay_event.is_set(): try: - ci = swap_client.ci(coin_type) - cc = swap_client.coin_clients[coin_type] - if coin_type == Coins.XMR: - new_height = ci.getChainHeight() - if new_height != cc['chain_height']: - swap_client.log.debug('New {} block at height: {}'.format(str(coin_type), new_height)) - with swap_client.mxDB: - cc['chain_height'] = new_height - continue + new_height = ci.getChainHeight() + if new_height != cc['chain_height']: + swap_client.log.debug('New {} block at height: {}'.format(str(coin_type), new_height)) + with swap_client.mxDB: + cc['chain_height'] = new_height + except Exception as e: + swap_client.log.warning('threadPollXMRChainState {}, error: {}'.format(str(coin_type), str(e))) + swap_client.delay_event.wait(random.randrange(20, 30)) # random to stagger updates - # Not XMR + +def threadPollChainState(swap_client, coin_type): + ci = swap_client.ci(coin_type) + cc = swap_client.coin_clients[coin_type] + while not swap_client.delay_event.is_set(): + try: chain_state = ci.getBlockchainInfo() if chain_state['bestblockhash'] != cc['chain_best_block']: swap_client.log.debug('New {} block at height: {}'.format(str(coin_type), chain_state['blocks'])) @@ -572,7 +578,10 @@ class BasicSwap(BaseApp): self.log.info('%s Core version %d', ci.coin_name(), core_version) self.coin_clients[c]['core_version'] = core_version - t = threading.Thread(target=threadPollChainState, args=(self, c)) + if c == Coins.XMR: + t = threading.Thread(target=threadPollXMRChainState, args=(self, c)) + else: + t = threading.Thread(target=threadPollChainState, args=(self, c)) self.threads.append(t) t.start() @@ -3553,10 +3562,13 @@ class BasicSwap(BaseApp): def expireMessages(self): self.mxDB.acquire() + rpc_conn = None try: + ci_part = self.ci(Coins.PART) + rpc_conn = ci_part.open_rpc() now = int(time.time()) options = {'encoding': 'none'} - ro = self.callrpc('smsginbox', ['all', '', options]) + ro = ci_part.json_request(rpc_conn, 'smsginbox', ['all', '', options]) num_messages = 0 num_removed = 0 for msg in ro['messages']: @@ -3564,7 +3576,7 @@ class BasicSwap(BaseApp): expire_at = msg['sent'] + msg['ttl'] if expire_at < now: options = {'encoding': 'none', 'delete': True} - del_msg = self.callrpc('smsg', [msg['msgid'], options]) + del_msg = ci_part.json_request(rpc_conn, 'smsg', [msg['msgid'], options]) num_removed += 1 if num_messages + num_removed > 0: @@ -3573,6 +3585,8 @@ class BasicSwap(BaseApp): self.log.debug('TODO: Expire records from db') finally: + if rpc_conn: + ci_part.close_rpc(rpc_conn) self.mxDB.release() def countQueuedActions(self, session, bid_id, action_type):