basicswap_miserver/basicswap/base.py
2023-03-09 00:53:54 +02:00

214 lines
7.8 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (c) 2019-2023 tecnovert
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import os
import time
import shlex
import socks
import socket
import urllib
import logging
import threading
import traceback
import subprocess
from sockshandler import SocksiPyHandler
import basicswap.config as cfg
from .rpc import (
callrpc,
)
from .util import (
TemporaryError,
)
from .chainparams import (
Coins,
chainparams,
)
def getaddrinfo_tor(*args):
return [(socket.AF_INET, socket.SOCK_STREAM, 6, "", (args[0], args[1]))]
class BaseApp:
def __init__(self, fp, data_dir, settings, chain, log_name='BasicSwap'):
self.log_name = log_name
self.fp = fp
self.is_running = True
self.fail_code = 0
self.mock_time_offset = 0
self.data_dir = data_dir
self.chain = chain
self.settings = settings
self.coin_clients = {}
self.coin_interfaces = {}
self.mxDB = threading.RLock()
self.debug = self.settings.get('debug', False)
self.delay_event = threading.Event()
self._network = None
self.prepareLogging()
self.log.info('Network: {}'.format(self.chain))
self.use_tor_proxy = self.settings.get('use_tor', False)
self.tor_proxy_host = self.settings.get('tor_proxy_host', '127.0.0.1')
self.tor_proxy_port = self.settings.get('tor_proxy_port', 9050)
self.tor_control_password = self.settings.get('tor_control_password', None)
self.tor_control_port = self.settings.get('tor_control_port', 9051)
self.default_socket = socket.socket
self.default_socket_timeout = socket.getdefaulttimeout()
self.default_socket_getaddrinfo = socket.getaddrinfo
def stopRunning(self, with_code=0):
self.fail_code = with_code
with self.mxDB:
self.is_running = False
self.delay_event.set()
def prepareLogging(self):
self.log = logging.getLogger(self.log_name)
self.log.propagate = False
# Remove any existing handlers
self.log.handlers = []
formatter = logging.Formatter('%(asctime)s %(levelname)s : %(message)s')
stream_stdout = logging.StreamHandler()
if self.log_name != 'BasicSwap':
stream_stdout.setFormatter(logging.Formatter('%(asctime)s %(name)s %(levelname)s : %(message)s'))
else:
stream_stdout.setFormatter(formatter)
stream_fp = logging.StreamHandler(self.fp)
stream_fp.setFormatter(formatter)
self.log.setLevel(logging.DEBUG if self.debug else logging.INFO)
self.log.addHandler(stream_fp)
self.log.addHandler(stream_stdout)
def getChainClientSettings(self, coin):
try:
return self.settings['chainclients'][chainparams[coin]['name']]
except Exception:
return {}
def setDaemonPID(self, name, pid) -> None:
if isinstance(name, Coins):
self.coin_clients[name]['pid'] = pid
return
for c, v in self.coin_clients.items():
if v['name'] == name:
v['pid'] = pid
def getChainDatadirPath(self, coin) -> str:
datadir = self.coin_clients[coin]['datadir']
testnet_name = '' if self.chain == 'mainnet' else chainparams[coin][self.chain].get('name', self.chain)
return os.path.join(datadir, testnet_name)
def getCoinIdFromName(self, coin_name: str):
for c, params in chainparams.items():
if coin_name.lower() == params['name'].lower():
return c
raise ValueError('Unknown coin: {}'.format(coin_name))
def callrpc(self, method, params=[], wallet=None):
cc = self.coin_clients[Coins.PART]
return callrpc(cc['rpcport'], cc['rpcauth'], method, params, wallet, cc['rpchost'])
def callcoinrpc(self, coin, method, params=[], wallet=None):
cc = self.coin_clients[coin]
return callrpc(cc['rpcport'], cc['rpcauth'], method, params, wallet, cc['rpchost'])
def calltx(self, cmd):
bindir = self.coin_clients[Coins.PART]['bindir']
args = [os.path.join(bindir, cfg.PARTICL_TX), ]
if self.chain != 'mainnet':
args.append('-' + self.chain)
args += shlex.split(cmd)
p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out = p.communicate()
if len(out[1]) > 0:
raise ValueError('TX error ' + str(out[1]))
return out[0].decode('utf-8').strip()
def callcoincli(self, coin_type, params, wallet=None, timeout=None):
bindir = self.coin_clients[coin_type]['bindir']
datadir = self.coin_clients[coin_type]['datadir']
command_cli = os.path.join(bindir, chainparams[coin_type]['name'] + '-cli' + ('.exe' if os.name == 'nt' else ''))
args = [command_cli, ]
if self.chain != 'mainnet':
args.append('-' + self.chain)
args.append('-datadir=' + datadir)
args += shlex.split(params)
p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out = p.communicate(timeout=timeout)
if len(out[1]) > 0:
raise ValueError('CLI error ' + str(out[1]))
return out[0].decode('utf-8').strip()
def is_transient_error(self, ex) -> bool:
if isinstance(ex, TemporaryError):
return True
str_error = str(ex).lower()
return 'read timed out' in str_error or 'no connection to daemon' in str_error
def setConnectionParameters(self, timeout=120):
opener = urllib.request.build_opener()
opener.addheaders = [('User-agent', 'Mozilla/5.0')]
urllib.request.install_opener(opener)
if self.use_tor_proxy:
socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, self.tor_proxy_host, self.tor_proxy_port, rdns=True)
socket.socket = socks.socksocket
socket.getaddrinfo = getaddrinfo_tor # Without this accessing .onion links would fail
socket.setdefaulttimeout(timeout)
def popConnectionParameters(self) -> None:
if self.use_tor_proxy:
socket.socket = self.default_socket
socket.getaddrinfo = self.default_socket_getaddrinfo
socket.setdefaulttimeout(self.default_socket_timeout)
def readURL(self, url: str, timeout: int = 120, headers=None) -> bytes:
open_handler = None
if self.use_tor_proxy:
open_handler = SocksiPyHandler(socks.PROXY_TYPE_SOCKS5, self.tor_proxy_host, self.tor_proxy_port)
opener = urllib.request.build_opener(open_handler) if self.use_tor_proxy else urllib.request.build_opener()
opener.addheaders = [('User-agent', 'Mozilla/5.0')]
request = urllib.request.Request(url, headers=headers)
return opener.open(request, timeout=timeout).read()
def logException(self, message) -> None:
self.log.error(message)
if self.debug:
self.log.error(traceback.format_exc())
def torControl(self, query):
try:
command = 'AUTHENTICATE "{}"\r\n{}\r\nQUIT\r\n'.format(self.tor_control_password, query).encode('utf-8')
c = socket.create_connection((self.tor_proxy_host, self.tor_control_port))
c.send(command)
response = bytearray()
while True:
rv = c.recv(1024)
if not rv:
break
response += rv
c.close()
return response
except Exception as e:
self.log.error(f'torControl {e}')
return
def getTime(self) -> int:
return int(time.time()) + self.mock_time_offset
def setMockTimeOffset(self, new_offset: int) -> None:
self.log.warning(f'Setting mocktime to {new_offset}')
self.mock_time_offset = new_offset