refactor: Use persistent connection when expiring messages.

This commit is contained in:
tecnovert 2022-09-11 17:16:51 +02:00
parent 1694e73f92
commit afba673085
No known key found for this signature in database
GPG Key ID: 8ED6D8750C4E3F93
2 changed files with 29 additions and 15 deletions

View File

@ -1,3 +1,3 @@
name = "basicswap" name = "basicswap"
__version__ = "0.11.36" __version__ = "0.11.37"

View File

@ -155,20 +155,26 @@ def zeroIfNone(value):
return value return value
def threadPollChainState(swap_client, coin_type): def threadPollXMRChainState(swap_client, coin_type):
ci = swap_client.ci(coin_type)
cc = swap_client.coin_clients[coin_type]
while not swap_client.delay_event.is_set(): while not swap_client.delay_event.is_set():
try: try:
ci = swap_client.ci(coin_type) new_height = ci.getChainHeight()
cc = swap_client.coin_clients[coin_type] if new_height != cc['chain_height']:
if coin_type == Coins.XMR: swap_client.log.debug('New {} block at height: {}'.format(str(coin_type), new_height))
new_height = ci.getChainHeight() with swap_client.mxDB:
if new_height != cc['chain_height']: cc['chain_height'] = new_height
swap_client.log.debug('New {} block at height: {}'.format(str(coin_type), new_height)) except Exception as e:
with swap_client.mxDB: swap_client.log.warning('threadPollXMRChainState {}, error: {}'.format(str(coin_type), str(e)))
cc['chain_height'] = new_height swap_client.delay_event.wait(random.randrange(20, 30)) # random to stagger updates
continue
# Not XMR
def threadPollChainState(swap_client, coin_type):
ci = swap_client.ci(coin_type)
cc = swap_client.coin_clients[coin_type]
while not swap_client.delay_event.is_set():
try:
chain_state = ci.getBlockchainInfo() chain_state = ci.getBlockchainInfo()
if chain_state['bestblockhash'] != cc['chain_best_block']: if chain_state['bestblockhash'] != cc['chain_best_block']:
swap_client.log.debug('New {} block at height: {}'.format(str(coin_type), chain_state['blocks'])) swap_client.log.debug('New {} block at height: {}'.format(str(coin_type), chain_state['blocks']))
@ -572,7 +578,10 @@ class BasicSwap(BaseApp):
self.log.info('%s Core version %d', ci.coin_name(), core_version) self.log.info('%s Core version %d', ci.coin_name(), core_version)
self.coin_clients[c]['core_version'] = core_version self.coin_clients[c]['core_version'] = core_version
t = threading.Thread(target=threadPollChainState, args=(self, c)) if c == Coins.XMR:
t = threading.Thread(target=threadPollXMRChainState, args=(self, c))
else:
t = threading.Thread(target=threadPollChainState, args=(self, c))
self.threads.append(t) self.threads.append(t)
t.start() t.start()
@ -3553,10 +3562,13 @@ class BasicSwap(BaseApp):
def expireMessages(self): def expireMessages(self):
self.mxDB.acquire() self.mxDB.acquire()
rpc_conn = None
try: try:
ci_part = self.ci(Coins.PART)
rpc_conn = ci_part.open_rpc()
now = int(time.time()) now = int(time.time())
options = {'encoding': 'none'} options = {'encoding': 'none'}
ro = self.callrpc('smsginbox', ['all', '', options]) ro = ci_part.json_request(rpc_conn, 'smsginbox', ['all', '', options])
num_messages = 0 num_messages = 0
num_removed = 0 num_removed = 0
for msg in ro['messages']: for msg in ro['messages']:
@ -3564,7 +3576,7 @@ class BasicSwap(BaseApp):
expire_at = msg['sent'] + msg['ttl'] expire_at = msg['sent'] + msg['ttl']
if expire_at < now: if expire_at < now:
options = {'encoding': 'none', 'delete': True} options = {'encoding': 'none', 'delete': True}
del_msg = self.callrpc('smsg', [msg['msgid'], options]) del_msg = ci_part.json_request(rpc_conn, 'smsg', [msg['msgid'], options])
num_removed += 1 num_removed += 1
if num_messages + num_removed > 0: if num_messages + num_removed > 0:
@ -3573,6 +3585,8 @@ class BasicSwap(BaseApp):
self.log.debug('TODO: Expire records from db') self.log.debug('TODO: Expire records from db')
finally: finally:
if rpc_conn:
ci_part.close_rpc(rpc_conn)
self.mxDB.release() self.mxDB.release()
def countQueuedActions(self, session, bid_id, action_type): def countQueuedActions(self, session, bid_id, action_type):