Ensure pid file is correct before reading authcookie.
This commit is contained in:
parent
8796433c11
commit
31ed5e7142
@ -1,3 +1,3 @@
|
||||
name = "basicswap"
|
||||
|
||||
__version__ = "0.0.2"
|
||||
__version__ = "0.0.3"
|
||||
|
@ -359,7 +359,7 @@ class BasicSwap():
|
||||
self.zmqSubscriber.setsockopt_string(zmq.SUBSCRIBE, 'smsg')
|
||||
|
||||
for c in Coins:
|
||||
self.coin_clients[c] = self.setCoinConnectParams(c)
|
||||
self.setCoinConnectParams(c)
|
||||
|
||||
def prepareLogging(self):
|
||||
self.log = logging.getLogger(self.log_name)
|
||||
@ -385,6 +385,7 @@ class BasicSwap():
|
||||
return {}
|
||||
|
||||
def setCoinConnectParams(self, coin):
|
||||
# Set anything that does not require the daemon to be running
|
||||
chain_client_settings = self.getChainClientSettings(coin)
|
||||
|
||||
bindir = os.path.expanduser(chain_client_settings.get('bindir', ''))
|
||||
@ -399,22 +400,6 @@ class BasicSwap():
|
||||
elif 'rpcpassword' in chain_client_settings:
|
||||
rpcauth = chain_client_settings['rpcuser'] + ':' + chain_client_settings['rpcpassword']
|
||||
self.log.debug('Read %s rpc credentials from json settings', coin)
|
||||
if rpcauth is None:
|
||||
if self.chain == 'mainnet':
|
||||
testnet_name = ''
|
||||
else:
|
||||
testnet_name = chainparams[coin][self.chain].get('name', self.chain)
|
||||
authcookiepath = os.path.join(datadir, testnet_name, '.cookie')
|
||||
self.log.debug('Reading %s rpc credentials from auth cookie %s', coin, authcookiepath)
|
||||
# Wait for daemon to start
|
||||
for i in range(10):
|
||||
if not os.path.exists(authcookiepath):
|
||||
time.sleep(0.5)
|
||||
try:
|
||||
with open(authcookiepath, 'rb') as fp:
|
||||
rpcauth = fp.read().decode('utf-8')
|
||||
except Exception:
|
||||
self.log.warning('Unable to read authcookie for %s, %s', str(coin), authcookiepath)
|
||||
|
||||
session = scoped_session(self.session_factory)
|
||||
try:
|
||||
@ -424,8 +409,9 @@ class BasicSwap():
|
||||
session.close()
|
||||
session.remove()
|
||||
|
||||
return {
|
||||
self.coin_clients[coin] = {
|
||||
'coin': coin,
|
||||
'name': chainparams[coin]['name'],
|
||||
'connection_type': connection_type,
|
||||
'bindir': bindir,
|
||||
'datadir': datadir,
|
||||
@ -437,8 +423,44 @@ class BasicSwap():
|
||||
'last_height_checked': last_height_checked,
|
||||
'use_segwit': chain_client_settings.get('use_segwit', False),
|
||||
'use_csv': chain_client_settings.get('use_csv', True),
|
||||
'pid': None,
|
||||
}
|
||||
|
||||
def setDaemonPID(self, name, pid):
|
||||
for c, v in self.coin_clients.items():
|
||||
if v['name'] == name:
|
||||
v['pid'] = pid
|
||||
|
||||
def getChainDatadirPath(self, coin):
|
||||
datadir = self.coin_clients[coin]['datadir']
|
||||
testnet_name = '' if self.chain == 'mainnet' else chainparams[coin][self.chain].get('name', self.chain)
|
||||
return os.path.join(datadir, testnet_name)
|
||||
|
||||
def setCoinRunParams(self, coin):
|
||||
cc = self.coin_clients[coin]
|
||||
if cc['connection_type'] == 'rpc' and cc['rpcauth'] is None:
|
||||
chain_client_settings = self.getChainClientSettings(coin)
|
||||
authcookiepath = os.path.join(self.getChainDatadirPath(coin), '.cookie')
|
||||
pidfilepath = os.path.join(self.getChainDatadirPath(coin), cc['name'] + '.pid')
|
||||
self.log.debug('Reading %s rpc credentials from auth cookie %s', coin, authcookiepath)
|
||||
# Wait for daemon to start
|
||||
# Test pids to ensure authcookie is read for the correct process
|
||||
for i in range(20):
|
||||
try:
|
||||
with open(pidfilepath, 'rb') as fp:
|
||||
datadir_pid = int(fp.read().decode('utf-8'))
|
||||
assert(datadir_pid == cc['pid'])
|
||||
assert(os.path.exists(authcookiepath))
|
||||
except Exception:
|
||||
time.sleep(0.5)
|
||||
try:
|
||||
assert(datadir_pid == cc['pid'])
|
||||
with open(authcookiepath, 'rb') as fp:
|
||||
cc['rpcauth'] = fp.read().decode('utf-8')
|
||||
except Exception:
|
||||
self.log.error('Unable to read authcookie for %s, %s, datadir pid %d, daemon pid %s', str(coin), authcookiepath, datadir_pid, cc['pid'])
|
||||
raise ValueError('Error, terminating')
|
||||
|
||||
def start(self):
|
||||
self.log.info('Starting BasicSwap %s\n\n', __version__)
|
||||
self.log.info('sqlalchemy version %s', sa.__version__)
|
||||
@ -446,6 +468,7 @@ class BasicSwap():
|
||||
self.upgradeDatabase(self.db_version)
|
||||
|
||||
for c in Coins:
|
||||
self.setCoinRunParams(c)
|
||||
if self.coin_clients[c]['connection_type'] == 'rpc':
|
||||
self.waitForDaemonRPC(c)
|
||||
core_version = self.callcoinrpc(c, 'getnetworkinfo')['version']
|
||||
@ -458,6 +481,38 @@ class BasicSwap():
|
||||
|
||||
self.initialise()
|
||||
|
||||
def stopDaemon(self, coin):
|
||||
num_tries = 10
|
||||
authcookiepath = os.path.join(self.getChainDatadirPath(coin), '.cookie')
|
||||
stopping = False
|
||||
try:
|
||||
for i in range(num_tries):
|
||||
rv = self.callcoincli(coin, 'stop', timeout=10)
|
||||
self.log.debug('Trying to stop %s', str(coin))
|
||||
stopping = True
|
||||
time.sleep(i + 1)
|
||||
except Exception as ex:
|
||||
if 'Could not connect' in str(ex):
|
||||
if stopping:
|
||||
for i in range(30):
|
||||
# The lock file doesn't get deleted
|
||||
# Using .cookie is a temporary workaround, will only work if rpc password is unset.
|
||||
# TODO: Query lock on .lock properly
|
||||
if os.path.exists(authcookiepath):
|
||||
self.log.debug('Waiting on .cookie file %s', str(coin))
|
||||
time.sleep(i + 1)
|
||||
time.sleep(4) # Extra time to settle
|
||||
return
|
||||
self.log.error('stopDaemon %s', str(ex))
|
||||
traceback.print_exc()
|
||||
raise ValueError('Could not stop {}'.format(str(coin)))
|
||||
|
||||
def stopDaemons(self):
|
||||
for c in Coins:
|
||||
chain_client_settings = self.getChainClientSettings(c)
|
||||
if self.coin_clients[c]['connection_type'] == 'rpc' and chain_client_settings['manage_daemon'] is True:
|
||||
self.stopDaemon(c)
|
||||
|
||||
def stopRunning(self, with_code=0):
|
||||
self.fail_code = with_code
|
||||
self.is_running = False
|
||||
@ -2391,14 +2446,14 @@ class BasicSwap():
|
||||
raise ValueError('TX error ' + str(out[1]))
|
||||
return out[0].decode('utf-8').strip()
|
||||
|
||||
def callcoincli(self, coin_type, params, wallet=None):
|
||||
def callcoincli(self, coin_type, params, wallet=None, timeout=None):
|
||||
bindir = self.coin_clients[coin_type]['bindir']
|
||||
datadir = self.coin_clients[coin_type]['datadir']
|
||||
command_cli = os.path.join(bindir, chainparams[coin_type]['name'] + '-cli' + ('.exe' if os.name == 'nt' else ''))
|
||||
chainname = '' if self.chain == 'mainnet' else (' -' + self.chain)
|
||||
args = command_cli + chainname + ' ' + '-datadir=' + datadir + ' ' + params
|
||||
p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
|
||||
out = p.communicate()
|
||||
out = p.communicate(timeout=timeout)
|
||||
if len(out[1]) > 0:
|
||||
raise ValueError('CLI error ' + str(out[1]))
|
||||
return out[0].decode('utf-8').strip()
|
||||
|
@ -20,7 +20,7 @@ import subprocess
|
||||
import logging
|
||||
|
||||
from basicswap import __version__
|
||||
from basicswap.basicswap import BasicSwap
|
||||
from basicswap.basicswap import BasicSwap, Coins
|
||||
from basicswap.http_server import HttpThread
|
||||
|
||||
|
||||
@ -58,8 +58,11 @@ def runClient(fp, data_dir, chain, test_mode):
|
||||
with open(settings_path) as fs:
|
||||
settings = json.load(fs)
|
||||
|
||||
swap_client = BasicSwap(fp, data_dir, settings, chain)
|
||||
|
||||
daemons = []
|
||||
pids = []
|
||||
threads = []
|
||||
|
||||
if os.path.exists(pids_path):
|
||||
with open(pids_path) as fd:
|
||||
@ -67,6 +70,11 @@ def runClient(fp, data_dir, chain, test_mode):
|
||||
# TODO: try close
|
||||
logger.warning('Found pid for daemon {} '.format(ln.strip()))
|
||||
|
||||
# Ensure daemons are stopped
|
||||
swap_client.stopDaemons()
|
||||
|
||||
try:
|
||||
# Try start daemons
|
||||
for c, v in settings['chainclients'].items():
|
||||
if v['manage_daemon'] is True:
|
||||
logger.info('Starting {} daemon'.format(c.capitalize()))
|
||||
@ -75,22 +83,19 @@ def runClient(fp, data_dir, chain, test_mode):
|
||||
daemons.append(startDaemon(v['datadir'], v['bindir'], filename))
|
||||
pid = daemons[-1].pid
|
||||
pids.append((c, pid))
|
||||
swap_client.setDaemonPID(c, pid)
|
||||
logger.info('Started {} {}'.format(filename, pid))
|
||||
|
||||
if len(pids) > 0:
|
||||
with open(pids_path, 'w') as fd:
|
||||
for p in pids:
|
||||
fd.write('{}:{}\n'.format(*p))
|
||||
|
||||
swap_client = BasicSwap(fp, data_dir, settings, chain)
|
||||
|
||||
if not test_mode:
|
||||
# signal only works in main thread
|
||||
# Signal only works in main thread
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
signal.signal(signal.SIGTERM, signal_handler)
|
||||
swap_client.start()
|
||||
|
||||
threads = []
|
||||
if 'htmlhost' in settings:
|
||||
swap_client.log.info('Starting server at %s:%d.' % (settings['htmlhost'], settings['htmlport']))
|
||||
allow_cors = settings['allowcors'] if 'allowcors' in settings else ALLOW_CORS
|
||||
@ -98,12 +103,11 @@ def runClient(fp, data_dir, chain, test_mode):
|
||||
threads.append(tS1)
|
||||
tS1.start()
|
||||
|
||||
try:
|
||||
logger.info('Exit with Ctrl + c.')
|
||||
while swap_client.is_running:
|
||||
time.sleep(0.5)
|
||||
swap_client.update()
|
||||
except Exception:
|
||||
except Exception as ex:
|
||||
traceback.print_exc()
|
||||
|
||||
swap_client.log.info('Stopping threads.')
|
||||
|
Loading…
Reference in New Issue
Block a user