|
|
|
@ -244,6 +244,10 @@ class BasicSwap(BaseApp): |
|
|
|
|
self._possibly_revoked_offers = collections.deque([], maxlen=48) # TODO: improve |
|
|
|
|
self._updating_wallets_info = {} |
|
|
|
|
self._last_updated_wallets_info = 0 |
|
|
|
|
self._zmq_queue_enabled = self.settings.get('zmq_queue_enabled', True) |
|
|
|
|
self._poll_smsg = self.settings.get('poll_smsg', False) |
|
|
|
|
self.check_smsg_seconds = self.settings.get('check_smsg_seconds', 10) |
|
|
|
|
self._last_checked_smsg = 0 |
|
|
|
|
|
|
|
|
|
self._notifications_enabled = self.settings.get('notifications_enabled', True) |
|
|
|
|
self._disabled_notification_types = self.settings.get('disabled_notification_types', []) |
|
|
|
@ -336,11 +340,12 @@ class BasicSwap(BaseApp): |
|
|
|
|
session.close() |
|
|
|
|
session.remove() |
|
|
|
|
|
|
|
|
|
self.zmqContext = zmq.Context() |
|
|
|
|
self.zmqSubscriber = self.zmqContext.socket(zmq.SUB) |
|
|
|
|
if self._zmq_queue_enabled: |
|
|
|
|
self.zmqContext = zmq.Context() |
|
|
|
|
self.zmqSubscriber = self.zmqContext.socket(zmq.SUB) |
|
|
|
|
|
|
|
|
|
self.zmqSubscriber.connect(self.settings['zmqhost'] + ':' + str(self.settings['zmqport'])) |
|
|
|
|
self.zmqSubscriber.setsockopt_string(zmq.SUBSCRIBE, 'smsg') |
|
|
|
|
self.zmqSubscriber.connect(self.settings['zmqhost'] + ':' + str(self.settings['zmqport'])) |
|
|
|
|
self.zmqSubscriber.setsockopt_string(zmq.SUBSCRIBE, 'smsg') |
|
|
|
|
|
|
|
|
|
for c in Coins: |
|
|
|
|
if c in chainparams: |
|
|
|
@ -388,7 +393,8 @@ class BasicSwap(BaseApp): |
|
|
|
|
else: |
|
|
|
|
self.thread_pool.shutdown() |
|
|
|
|
|
|
|
|
|
self.zmqContext.destroy() |
|
|
|
|
if self._zmq_queue_enabled: |
|
|
|
|
self.zmqContext.destroy() |
|
|
|
|
|
|
|
|
|
self.swaps_in_progress.clear() |
|
|
|
|
close_all_sessions() |
|
|
|
@ -690,6 +696,9 @@ class BasicSwap(BaseApp): |
|
|
|
|
upgradeDatabase(self, self.db_version) |
|
|
|
|
upgradeDatabaseData(self, self.db_data_version) |
|
|
|
|
|
|
|
|
|
if self._zmq_queue_enabled and self._poll_smsg: |
|
|
|
|
self.log.warning('SMSG polling and zmq listener enabled.') |
|
|
|
|
|
|
|
|
|
for c in Coins: |
|
|
|
|
if c not in chainparams: |
|
|
|
|
continue |
|
|
|
@ -908,28 +917,37 @@ class BasicSwap(BaseApp): |
|
|
|
|
self._is_encrypted, self._is_locked = self.ci(Coins.PART).isWalletEncryptedLocked() |
|
|
|
|
|
|
|
|
|
def unlockWallets(self, password: str, coin=None) -> None: |
|
|
|
|
self._read_zmq_queue = False |
|
|
|
|
for c in self.getListOfWalletCoins(): |
|
|
|
|
if coin and c != coin: |
|
|
|
|
continue |
|
|
|
|
self.ci(c).unlockWallet(password) |
|
|
|
|
if c == Coins.PART: |
|
|
|
|
self._is_locked = False |
|
|
|
|
try: |
|
|
|
|
self._read_zmq_queue = False |
|
|
|
|
for c in self.getListOfWalletCoins(): |
|
|
|
|
if coin and c != coin: |
|
|
|
|
continue |
|
|
|
|
try: |
|
|
|
|
self.ci(c).unlockWallet(password) |
|
|
|
|
except Exception as e: |
|
|
|
|
self.log.warning('Failed to unlock wallet {}'.format(getCoinName(c))) |
|
|
|
|
if coin is not None or c == Coins.PART: |
|
|
|
|
raise e |
|
|
|
|
if c == Coins.PART: |
|
|
|
|
self._is_locked = False |
|
|
|
|
|
|
|
|
|
self.loadFromDB() |
|
|
|
|
self._read_zmq_queue = True |
|
|
|
|
self.loadFromDB() |
|
|
|
|
finally: |
|
|
|
|
self._read_zmq_queue = True |
|
|
|
|
|
|
|
|
|
def lockWallets(self, coin=None) -> None: |
|
|
|
|
self._read_zmq_queue = False |
|
|
|
|
self.swaps_in_progress.clear() |
|
|
|
|
try: |
|
|
|
|
self._read_zmq_queue = False |
|
|
|
|
self.swaps_in_progress.clear() |
|
|
|
|
|
|
|
|
|
for c in self.getListOfWalletCoins(): |
|
|
|
|
if coin and c != coin: |
|
|
|
|
continue |
|
|
|
|
self.ci(c).lockWallet() |
|
|
|
|
if c == Coins.PART: |
|
|
|
|
self._is_locked = True |
|
|
|
|
self._read_zmq_queue = True |
|
|
|
|
for c in self.getListOfWalletCoins(): |
|
|
|
|
if coin and c != coin: |
|
|
|
|
continue |
|
|
|
|
self.ci(c).lockWallet() |
|
|
|
|
if c == Coins.PART: |
|
|
|
|
self._is_locked = True |
|
|
|
|
finally: |
|
|
|
|
self._read_zmq_queue = True |
|
|
|
|
|
|
|
|
|
def initialiseWallet(self, coin_type, raise_errors: bool = False) -> None: |
|
|
|
|
if coin_type == Coins.PART: |
|
|
|
@ -4236,7 +4254,7 @@ class BasicSwap(BaseApp): |
|
|
|
|
self.log.error(traceback.format_exc()) |
|
|
|
|
|
|
|
|
|
now: int = self.getTime() |
|
|
|
|
options = {'encoding': 'none'} |
|
|
|
|
options = {'encoding': 'none', 'setread': False} |
|
|
|
|
inbox_messages = ci_part.json_request(rpc_conn, 'smsginbox', ['all', '', options])['messages'] |
|
|
|
|
for msg in inbox_messages: |
|
|
|
|
remove_if_expired(msg) |
|
|
|
@ -6055,15 +6073,25 @@ class BasicSwap(BaseApp): |
|
|
|
|
self.processMsg(msg) |
|
|
|
|
|
|
|
|
|
def update(self) -> None: |
|
|
|
|
try: |
|
|
|
|
if self._read_zmq_queue: |
|
|
|
|
message = self.zmqSubscriber.recv(flags=zmq.NOBLOCK) |
|
|
|
|
if message == b'smsg': |
|
|
|
|
self.processZmqSmsg() |
|
|
|
|
except zmq.Again as ex: |
|
|
|
|
pass |
|
|
|
|
except Exception as ex: |
|
|
|
|
self.logException(f'smsg zmq {ex}') |
|
|
|
|
if self._zmq_queue_enabled: |
|
|
|
|
try: |
|
|
|
|
if self._read_zmq_queue: |
|
|
|
|
message = self.zmqSubscriber.recv(flags=zmq.NOBLOCK) |
|
|
|
|
if message == b'smsg': |
|
|
|
|
self.processZmqSmsg() |
|
|
|
|
except zmq.Again as ex: |
|
|
|
|
pass |
|
|
|
|
except Exception as ex: |
|
|
|
|
self.logException(f'smsg zmq {ex}') |
|
|
|
|
|
|
|
|
|
if self._poll_smsg: |
|
|
|
|
now: int = self.getTime() |
|
|
|
|
if now - self._last_checked_smsg >= self.check_smsg_seconds: |
|
|
|
|
self._last_checked_smsg = now |
|
|
|
|
options = {'encoding': 'hex', 'setread': True} |
|
|
|
|
msgs = self.callrpc('smsginbox', ['unread', '', options]) |
|
|
|
|
for msg in msgs['messages']: |
|
|
|
|
self.processMsg(msg) |
|
|
|
|
|
|
|
|
|
self.mxDB.acquire() |
|
|
|
|
try: |
|
|
|
|