basicswap_miserver/basicswap/interface/xmr.py

581 lines
23 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2020-2024 tecnovert
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import json
import logging
import basicswap.contrib.ed25519_fast as edf
import basicswap.ed25519_fast_util as edu
import basicswap.util_xmr as xmr_util
from coincurve.ed25519 import (
ed25519_add,
ed25519_get_pubkey,
ed25519_scalar_add,
)
from coincurve.keys import PrivateKey
from coincurve.dleag import (
dleag_prove,
dleag_verify,
dleag_proof_len,
verify_ed25519_point,
)
from basicswap.interface import (
Curves)
from basicswap.util import (
i2b, b2i, b2h,
dumpj,
ensure,
make_int,
TemporaryError)
from basicswap.util.network import (
is_private_ip_address)
from basicswap.rpc_xmr import (
make_xmr_rpc_func,
make_xmr_rpc2_func)
from basicswap.chainparams import XMR_COIN, CoinInterface, Coins
class XMRInterface(CoinInterface):
    '''
    Monero (XMR) coin interface.

    Wraps three RPC callables created in __init__:
      rpc        - daemon JSON-RPC endpoint
      rpc2       - daemon non-JSON endpoint
      rpc_wallet - wallet JSON-RPC endpoint

    Methods that open a wallet file do so while holding self._mx_wallet.
    NOTE(review): _mx_wallet and _wallet_filename are not assigned in
    __init__ here; _wallet_filename is set through setWalletFilename and
    _mx_wallet is presumably provided by CoinInterface - confirm.
    '''

    @staticmethod
    def curve_type():
        '''Return the curve identifier used by this coin (ed25519).'''
        return Curves.ed25519

    @staticmethod
    def coin_type():
        '''Return the Coins enum member for Monero.'''
        return Coins.XMR

    @staticmethod
    def COIN():
        '''Return XMR_COIN from chainparams.'''
        return XMR_COIN

    @staticmethod
    def exp() -> int:
        '''Return the decimal exponent passed to make_int for amounts.'''
        return 12

    @staticmethod
    def nbk() -> int:
        # Presumably the no. of bytes required to encode a private key - confirm
        return 32

    @staticmethod
    def nbK() -> int:  # No. of bytes required to encode a public key
        return 32

    @staticmethod
    def depth_spendable() -> int:
        '''Return the depth at which outputs are treated as spendable.'''
        return 10

    @staticmethod
    def xmr_swap_a_lock_spend_tx_vsize() -> int:
        '''Always raises ValueError.'''
        raise ValueError('Not possible')

    @staticmethod
    def xmr_swap_b_lock_spend_tx_vsize() -> int:
        '''Return a fixed size estimate for the chain B lock spend tx.'''
        # TODO: Estimate with ringsize
        return 1507

    def __init__(self, coin_settings, network, swap_client=None):
        '''
        Read settings and build the daemon and wallet RPC callables.

        coin_settings keys read here:
          required: blocks_confirmed, rpcport, walletrpcport, walletrpcauth
          optional: restore_height (0), fee_priority (0), rpcuser,
                    rpcpassword, rpchost ('127.0.0.1'),
                    walletrpchost ('127.0.0.1'), rpctimeout (60),
                    walletrpctimeout (120), walletrpctimeoutlong (600)

        swap_client may be None, in which case the logging module is used
        for log output and no proxy is configured.
        '''
        super().__init__(network)

        self.blocks_confirmed = coin_settings['blocks_confirmed']
        self._restore_height = coin_settings.get('restore_height', 0)
        self.setFeePriority(coin_settings.get('fee_priority', 0))
        self._sc = swap_client
        self._log = self._sc.log if self._sc and self._sc.log else logging
        self._wallet_password = None
        self._have_checked_seed = False

        daemon_login = None
        if coin_settings.get('rpcuser', '') != '':
            daemon_login = (coin_settings.get('rpcuser', ''), coin_settings.get('rpcpassword', ''))

        rpchost = coin_settings.get('rpchost', '127.0.0.1')
        proxy_host = None
        proxy_port = None
        # Connect to the daemon over a proxy if not running locally
        if swap_client:
            chain_client_settings = swap_client.getChainClientSettings(self.coin_type())
            manage_daemon: bool = chain_client_settings['manage_daemon']
            if swap_client.use_tor_proxy:
                if manage_daemon is False:
                    log_str: str = ''
                    have_cc_tor_opt = 'use_tor' in chain_client_settings
                    if have_cc_tor_opt and chain_client_settings['use_tor'] is False:
                        log_str = ' bypassing proxy (use_tor false for XMR)'
                    elif have_cc_tor_opt is False and is_private_ip_address(rpchost):
                        log_str = ' bypassing proxy (private ip address)'
                    else:
                        proxy_host = swap_client.tor_proxy_host
                        proxy_port = swap_client.tor_proxy_port
                        log_str = f' through proxy at {proxy_host}'
                    self._log.info(f'Connecting to remote {self.coin_name()} daemon at {rpchost}{log_str}.')
                else:
                    self._log.info(f'Not connecting to local {self.coin_name()} daemon through proxy.')
            elif manage_daemon is False:
                self._log.info(f'Connecting to remote {self.coin_name()} daemon at {rpchost}.')

        self._rpctimeout = coin_settings.get('rpctimeout', 60)
        self._walletrpctimeout = coin_settings.get('walletrpctimeout', 120)
        self._walletrpctimeoutlong = coin_settings.get('walletrpctimeoutlong', 600)

        self.rpc = make_xmr_rpc_func(coin_settings['rpcport'], daemon_login, host=rpchost, proxy_host=proxy_host, proxy_port=proxy_port, default_timeout=self._rpctimeout, tag='Node(j) ')
        self.rpc2 = make_xmr_rpc2_func(coin_settings['rpcport'], daemon_login, host=rpchost, proxy_host=proxy_host, proxy_port=proxy_port, default_timeout=self._rpctimeout, tag='Node ')  # non-json endpoint
        # The wallet RPC connection is never routed through the proxy.
        self.rpc_wallet = make_xmr_rpc_func(coin_settings['walletrpcport'], coin_settings['walletrpcauth'], host=coin_settings.get('walletrpchost', '127.0.0.1'), default_timeout=self._walletrpctimeout, tag='Wallet ')

    def checkWallets(self) -> int:
        '''Always returns 1.'''
        return 1

    def setFeePriority(self, new_priority):
        '''Set the fee priority; ensure() rejects values outside 0..3.'''
        ensure(0 <= new_priority < 4, 'Invalid fee_priority value')
        self._fee_priority = new_priority

    def setWalletFilename(self, wallet_filename):
        '''Set the filename of the main wallet opened by most methods.'''
        self._wallet_filename = wallet_filename

    def createWallet(self, params):
        '''
        Call the wallet RPC generate_from_keys with params.

        Adds the cached wallet password to params (in place) when one is set.
        '''
        if self._wallet_password is not None:
            params['password'] = self._wallet_password
        rv = self.rpc_wallet('generate_from_keys', params)
        self._log.info('generate_from_keys %s', dumpj(rv))

    def openWallet(self, filename):
        '''
        Close any open wallet, then open filename through the wallet RPC.

        Errors from close_wallet are ignored; errors from open_wallet
        propagate to the caller.
        '''
        params = {'filename': filename}
        if self._wallet_password is not None:
            params['password'] = self._wallet_password

        try:
            # Can't reopen the same wallet in windows, !is_keys_file_locked()
            self.rpc_wallet('close_wallet')
        except Exception:
            # Best effort only, failure to close is deliberately ignored.
            pass
        self.rpc_wallet('open_wallet', params)

    def initialiseWallet(self, key_view, key_spend, restore_height=None):
        '''
        Open the main wallet, creating it from the given keys if opening fails.

        NOTE(review): the restore_height argument is not used; the wallet is
        created with self._restore_height from the coin settings.
        '''
        with self._mx_wallet:
            try:
                self.openWallet(self._wallet_filename)
                # TODO: Check address
                return  # Wallet exists
            except Exception:
                # Any failure to open is treated as the wallet not existing.
                pass

            Kbv = self.getPubkey(key_view)
            Kbs = self.getPubkey(key_spend)
            address_b58 = xmr_util.encode_address(Kbv, Kbs)

            params = {
                'filename': self._wallet_filename,
                'address': address_b58,
                # Keys are byte-reversed before hex encoding for the RPC.
                'viewkey': b2h(key_view[::-1]),
                'spendkey': b2h(key_spend[::-1]),
                'restore_height': self._restore_height,
            }
            self.createWallet(params)
            self.openWallet(self._wallet_filename)

    def ensureWalletExists(self) -> None:
        '''Open the main wallet; raises if it cannot be opened.'''
        with self._mx_wallet:
            self.openWallet(self._wallet_filename)

    def testDaemonRPC(self, with_wallet=True) -> None:
        '''
        Issue a get_languages call to the wallet RPC; raises on failure.

        NOTE(review): with_wallet is not used.
        '''
        self.rpc_wallet('get_languages')

    def getDaemonVersion(self):
        '''Return the version field of the wallet RPC get_version reply.'''
        return self.rpc_wallet('get_version')['version']

    def getBlockchainInfo(self):
        '''
        Return a dict describing the sync state of the daemon.

        Always contains blocks and verificationprogress. known_block_count
        and bootstrapping are added when available. If the extra queries
        fail the error is logged and verificationprogress is reset to 0.0.
        '''
        get_height = self.rpc2('get_height', timeout=self._rpctimeout)
        rv = {
            'blocks': get_height['height'],
            'verificationprogress': 0.0,
        }

        try:
            # get_block_count.block_count is how many blocks are in the longest chain known to the node.
            # get_block_count returns "Internal error" if bootstrap-daemon is active
            if get_height['untrusted'] is True:
                rv['bootstrapping'] = True
                get_info = self.rpc2('get_info', timeout=self._rpctimeout)
                if 'height_without_bootstrap' in get_info:
                    rv['blocks'] = get_info['height_without_bootstrap']

                rv['known_block_count'] = get_info['height']
                if rv['known_block_count'] > rv['blocks']:
                    rv['verificationprogress'] = rv['blocks'] / rv['known_block_count']
            else:
                rv['known_block_count'] = self.rpc('get_block_count', timeout=self._rpctimeout)['count']
                rv['verificationprogress'] = rv['blocks'] / rv['known_block_count']
        except Exception as e:
            self._log.warning('XMR get_block_count failed with: %s', str(e))
            rv['verificationprogress'] = 0.0

        return rv

    def getChainHeight(self):
        '''Return the height reported by the daemon get_height call.'''
        return self.rpc2('get_height', timeout=self._rpctimeout)['height']

    def getWalletInfo(self):
        '''
        Return balance and lock state of the main wallet as a dict.

        If opening fails with a message containing "Failed to open wallet"
        the wallet is reported as encrypted and locked with zero balances.
        Any other error from opening the wallet is re-raised.
        '''
        with self._mx_wallet:
            try:
                self.openWallet(self._wallet_filename)
            except Exception as e:
                if 'Failed to open wallet' in str(e):
                    return {'encrypted': True, 'locked': True, 'balance': 0, 'unconfirmed_balance': 0}
                raise

            self.rpc_wallet('refresh')
            balance_info = self.rpc_wallet('get_balance')

            rv = {}
            rv['balance'] = self.format_amount(balance_info['unlocked_balance'])
            rv['unconfirmed_balance'] = self.format_amount(balance_info['balance'] - balance_info['unlocked_balance'])
            rv['encrypted'] = self._wallet_password is not None
            rv['locked'] = False
            return rv

    def walletRestoreHeight(self):
        '''Return the restore height read from the coin settings.'''
        return self._restore_height

    def getMainWalletAddress(self) -> str:
        '''Return the address field of the main wallet get_address reply.'''
        with self._mx_wallet:
            self.openWallet(self._wallet_filename)
            return self.rpc_wallet('get_address')['address']

    def getNewAddress(self, placeholder) -> str:
        '''
        Create a new address in account 0 of the main wallet and return it.

        The wallet is stored after the address is created.
        placeholder is not used.
        '''
        with self._mx_wallet:
            self.openWallet(self._wallet_filename)
            new_address = self.rpc_wallet('create_address', {'account_index': 0})['address']
            self.rpc_wallet('store')
            return new_address

    def get_fee_rate(self, conf_target: int = 2):
        '''Return the fixed tuple (0.0, 'unused'); no estimation is done.'''
        self._log.warning('TODO - estimate XMR fee rate?')
        return 0.0, 'unused'

    def getNewSecretKey(self) -> bytes:
        '''Return a new secret from edu.get_secret() converted with i2b.'''
        # Note: Returned bytes are in big endian order
        return i2b(edu.get_secret())

    def pubkey(self, key: bytes) -> bytes:
        '''Return edf.scalarmult_B(key).'''
        return edf.scalarmult_B(key)

    def encodeKey(self, vk: bytes) -> str:
        '''Return the byte-reversed key as a hex string.'''
        return vk[::-1].hex()

    def decodeKey(self, k_hex: str) -> bytes:
        '''Inverse of encodeKey: decode hex, then reverse the bytes.'''
        return bytes.fromhex(k_hex)[::-1]

    def encodePubkey(self, pk: bytes) -> str:
        '''Return edu.encodepoint(pk).'''
        return edu.encodepoint(pk)

    def decodePubkey(self, pke):
        '''Return edf.decodepoint(pke).'''
        return edf.decodepoint(pke)

    def getPubkey(self, privkey):
        '''Return the ed25519 public key for privkey.'''
        return ed25519_get_pubkey(privkey)

    def getAddressFromKeys(self, key_view: bytes, key_spend: bytes) -> str:
        '''Return the address encoded from the pubkeys of both private keys.'''
        pk_view = self.getPubkey(key_view)
        pk_spend = self.getPubkey(key_spend)
        return xmr_util.encode_address(pk_view, pk_spend)

    def verifyKey(self, k: bytes) -> bool:
        '''Return True if b2i(k) lies strictly between 8 and edf.l.'''
        i = b2i(k)
        return 8 < i < edf.l

    def verifyPubkey(self, pubkey_bytes):
        '''Return the result of verify_ed25519_point for pubkey_bytes.'''
        # Calls ed25519_decode_check_point() in secp256k1
        # Checks for small order
        return verify_ed25519_point(pubkey_bytes)

    def proveDLEAG(self, key: bytes) -> bytes:
        '''Return a DLEAG proof produced by dleag_prove for key.'''
        privkey = PrivateKey(key)
        return dleag_prove(privkey)

    def verifyDLEAG(self, dleag_bytes: bytes) -> bool:
        '''Return the result of dleag_verify for dleag_bytes.'''
        return dleag_verify(dleag_bytes)

    def lengthDLEAG(self) -> int:
        '''Return dleag_proof_len().'''
        return dleag_proof_len()

    def sumKeys(self, ka: bytes, kb: bytes) -> bytes:
        '''Return ed25519_scalar_add(ka, kb).'''
        return ed25519_scalar_add(ka, kb)

    def sumPubkeys(self, Ka: bytes, Kb: bytes) -> bytes:
        '''Return ed25519_add(Ka, Kb).'''
        return ed25519_add(Ka, Kb)

    def encodeSharedAddress(self, Kbv: bytes, Kbs: bytes) -> str:
        '''Return the address encoded from a public view and spend key.'''
        return xmr_util.encode_address(Kbv, Kbs)

    def publishBLockTx(self, kbv: bytes, Kbs: bytes, output_amount: int, feerate: int, delay_for: int = 10, unlock_time: int = 0) -> bytes:
        '''
        Send output_amount from the main wallet to the shared swap address.

        kbv is the private view key and Kbs the public spend key of the
        shared address. Returns the tx hash as bytes.

        When the swap client is in debug mode the wallet is then polled
        once per second until the transfer is no longer listed as pending,
        delay_event is set, or delay_for waits have elapsed.

        NOTE(review): feerate is not used; the fee is controlled only by
        the configured fee priority.
        '''
        with self._mx_wallet:
            self.openWallet(self._wallet_filename)
            self.rpc_wallet('refresh')

            Kbv = self.getPubkey(kbv)
            shared_addr = xmr_util.encode_address(Kbv, Kbs)

            params = {'destinations': [{'amount': output_amount, 'address': shared_addr}], 'unlock_time': unlock_time}
            if self._fee_priority > 0:
                params['priority'] = self._fee_priority
            rv = self.rpc_wallet('transfer', params)
            self._log.info('publishBLockTx %s to address_b58 %s', rv['tx_hash'], shared_addr)
            tx_hash = bytes.fromhex(rv['tx_hash'])

            # swap_client is optional in __init__, so guard before use.
            if self._sc is not None and self._sc.debug:
                waits = 0
                while not self._sc.delay_event.is_set():
                    gt_params = {'out': True, 'pending': True, 'failed': True, 'pool': True}
                    rv = self.rpc_wallet('get_transfers', gt_params)
                    self._log.debug('get_transfers {}'.format(dumpj(rv)))
                    if 'pending' not in rv:
                        break
                    if waits >= delay_for:
                        break
                    self._sc.delay_event.wait(1.0)
                    # Count the wait so that delay_for bounds the loop.
                    waits += 1

            return tx_hash

    def findTxB(self, kbv, Kbs, cb_swap_value, cb_block_confirmed, restore_height, bid_sender):
        '''
        Look for the chain B lock tx in a view-only wallet of the shared address.

        The wallet file is named after the address and is created from the
        view key if it cannot be opened.

        Returns:
          dict with txid, amount and height for an available transfer of
            exactly cb_swap_value,
          -1 if only time-locked or wrong-amount transfers were seen,
          None if no transfers were found.

        NOTE(review): cb_block_confirmed and bid_sender are not used.
        '''
        with self._mx_wallet:
            Kbv = self.getPubkey(kbv)
            address_b58 = xmr_util.encode_address(Kbv, Kbs)

            kbv_le = kbv[::-1]
            params = {
                'restore_height': restore_height,
                'filename': address_b58,
                'address': address_b58,
                'viewkey': b2h(kbv_le),
            }

            try:
                self.openWallet(address_b58)
            except Exception:
                # Any failure to open is treated as the wallet not existing.
                self.createWallet(params)
                self.openWallet(address_b58)

            self.rpc_wallet('refresh', timeout=self._walletrpctimeoutlong)

            # Debug
            # try:
            #     current_height = self.rpc_wallet('get_height')['height']
            #     self._log.info('findTxB XMR current_height %d\nAddress: %s', current_height, address_b58)
            # except Exception as e:
            #     self._log.info('rpc failed %s', str(e))
            #     current_height = None  # If the transfer is available it will be deep enough
            # and (current_height is None or current_height - transfer['block_height'] > cb_block_confirmed):

            params = {'transfer_type': 'available'}
            transfers = self.rpc_wallet('incoming_transfers', params)
            rv = None
            if 'transfers' in transfers:
                for transfer in transfers['transfers']:
                    # unlocked <- wallet->is_transfer_unlocked() checks unlock_time and CRYPTONOTE_DEFAULT_TX_SPENDABLE_AGE
                    if not transfer['unlocked']:
                        full_tx = self.rpc_wallet('get_transfer_by_txid', {'txid': transfer['tx_hash']})
                        unlock_time = full_tx['transfer']['unlock_time']
                        if unlock_time != 0:
                            self._log.warning('Coin b lock txn is locked: {}, unlock_time {}'.format(transfer['tx_hash'], unlock_time))
                            rv = -1
                            continue
                    if transfer['amount'] == cb_swap_value:
                        return {'txid': transfer['tx_hash'], 'amount': transfer['amount'], 'height': transfer.get('block_height', 0)}
                    self._log.warning('Incorrect amount detected for coin b lock txn: {}'.format(transfer['tx_hash']))
                    rv = -1
            return rv

    def findTxnByHash(self, txid):
        '''
        Look for an available incoming transfer with hash txid in the main wallet.

        txid is compared to the tx_hash strings returned by the wallet.
        Returns a dict with txid, amount and height once the transfer is
        more than blocks_confirmed blocks below the chain height, else None.
        If the chain height cannot be fetched the depth check is skipped.
        '''
        with self._mx_wallet:
            self.openWallet(self._wallet_filename)
            self.rpc_wallet('refresh', timeout=self._walletrpctimeoutlong)

            try:
                current_height = self.rpc2('get_height', timeout=self._rpctimeout)['height']
                self._log.info('findTxnByHash XMR current_height %d\nhash: %s', current_height, txid)
            except Exception as e:
                self._log.info('rpc failed %s', str(e))
                current_height = None  # If the transfer is available it will be deep enough

            params = {'transfer_type': 'available'}
            rv = self.rpc_wallet('incoming_transfers', params)
            if 'transfers' in rv:
                for transfer in rv['transfers']:
                    if transfer['tx_hash'] != txid:
                        continue
                    if current_height is None or current_height - transfer['block_height'] > self.blocks_confirmed:
                        return {'txid': transfer['tx_hash'], 'amount': transfer['amount'], 'height': transfer['block_height']}

            return None

    def spendBLockTx(self, chain_b_lock_txid: bytes, address_to: str, kbv: bytes, kbs: bytes, cb_swap_value: int, b_fee_rate: int, restore_height: int, spend_actual_balance: bool = False) -> bytes:
        '''
        Sweep the shared swap address to address_to and return the tx hash.

        A full wallet named <address>_spend is opened, or created from the
        private view and spend keys if it cannot be opened.

        If the balance is below cb_swap_value and the first outgoing
        transfer recorded by the wallet has an address equal to the shared
        address, its txid is returned instead of sweeping again.

        Raises TemporaryError if the balance is too low (unless
        spend_actual_balance is set) or if the unlocked balance is too low.

        NOTE(review): chain_b_lock_txid and b_fee_rate are not used.

        Notes:
        "Error: No unlocked balance in the specified subaddress(es)" can mean not enough funds after tx fee.
        '''
        with self._mx_wallet:
            Kbv = self.getPubkey(kbv)
            Kbs = self.getPubkey(kbs)
            address_b58 = xmr_util.encode_address(Kbv, Kbs)

            wallet_filename = address_b58 + '_spend'

            params = {
                'filename': wallet_filename,
                'address': address_b58,
                'viewkey': b2h(kbv[::-1]),
                'spendkey': b2h(kbs[::-1]),
                'restore_height': restore_height,
            }

            try:
                self.openWallet(wallet_filename)
            except Exception:
                # Any failure to open is treated as the wallet not existing.
                self.createWallet(params)
                self.openWallet(wallet_filename)

            self.rpc_wallet('refresh')
            rv = self.rpc_wallet('get_balance')
            if rv['balance'] < cb_swap_value:
                self._log.warning('Balance is too low, checking for existing spend.')
                txns = self.rpc_wallet('get_transfers', {'out': True})
                if 'out' in txns:
                    txns = txns['out']
                    if len(txns) > 0:
                        txid = txns[0]['txid']
                        self._log.warning(f'spendBLockTx detected spending tx: {txid}.')
                        if txns[0]['address'] == address_b58:
                            return bytes.fromhex(txid)

                self._log.error('wallet {} balance {}, expected {}'.format(wallet_filename, rv['balance'], cb_swap_value))

                if not spend_actual_balance:
                    raise TemporaryError('Invalid balance')

            if spend_actual_balance and rv['balance'] != cb_swap_value:
                self._log.warning('Spending actual balance {}, not swap value {}.'.format(rv['balance'], cb_swap_value))
                cb_swap_value = rv['balance']
            if rv['unlocked_balance'] < cb_swap_value:
                self._log.error('wallet {} balance {}, expected {}, blocks_to_unlock {}'.format(wallet_filename, rv['unlocked_balance'], cb_swap_value, rv['blocks_to_unlock']))
                raise TemporaryError('Invalid unlocked_balance')

            params = {'address': address_to}
            if self._fee_priority > 0:
                params['priority'] = self._fee_priority

            rv = self.rpc_wallet('sweep_all', params)
            self._log.debug('sweep_all {}'.format(json.dumps(rv)))

            return bytes.fromhex(rv['tx_hash_list'][0])

    def withdrawCoin(self, value, addr_to: str, sweepall: bool, estimate_fee: bool = False) -> 'str | dict':
        '''
        Send value, or the whole balance if sweepall is set, to addr_to.

        With estimate_fee set the transaction is built with do_not_relay
        and a dict with num_txns, sum_amount, sum_fee and sum_weight is
        returned. Otherwise the tx hash string is returned; for a sweep
        that is the first entry of tx_hash_list.

        Raises ValueError if sweepall is requested while part of the
        balance is still locked.
        '''
        with self._mx_wallet:
            self.openWallet(self._wallet_filename)
            self.rpc_wallet('refresh')

            if sweepall:
                balance = self.rpc_wallet('get_balance')
                if balance['balance'] != balance['unlocked_balance']:
                    raise ValueError('Balance must be fully confirmed to use sweep all.')
                self._log.info('XMR {} sweep_all.'.format('estimate fee' if estimate_fee else 'withdraw'))
                self._log.debug('XMR balance: {}'.format(balance['balance']))
                params = {'address': addr_to, 'do_not_relay': estimate_fee}
                if self._fee_priority > 0:
                    params['priority'] = self._fee_priority
                rv = self.rpc_wallet('sweep_all', params)
                if estimate_fee:
                    return {'num_txns': len(rv['fee_list']), 'sum_amount': sum(rv['amount_list']), 'sum_fee': sum(rv['fee_list']), 'sum_weight': sum(rv['weight_list'])}
                return rv['tx_hash_list'][0]

            value_sats: int = make_int(value, self.exp())
            params = {'destinations': [{'amount': value_sats, 'address': addr_to}], 'do_not_relay': estimate_fee}
            if self._fee_priority > 0:
                params['priority'] = self._fee_priority
            rv = self.rpc_wallet('transfer', params)
            if estimate_fee:
                return {'num_txns': 1, 'sum_amount': rv['amount'], 'sum_fee': rv['fee'], 'sum_weight': rv['weight']}
            return rv['tx_hash']

    def estimateFee(self, value: int, addr_to: str, sweepall: bool) -> dict:
        '''Return the fee estimate dict of withdrawCoin without relaying.'''
        return self.withdrawCoin(value, addr_to, sweepall, estimate_fee=True)

    def showLockTransfers(self, kbv, Kbs, restore_height):
        '''
        Return the get_transfers reply for the shared swap address wallet.

        Tries the <address>_spend wallet first, then the view-only wallet
        named after the address, creating the latter from the view key if
        neither can be opened. The name of the wallet tried last is added
        to the reply under filename.

        Never raises: any error is returned as {'error': message}.
        '''
        with self._mx_wallet:
            try:
                Kbv = self.getPubkey(kbv)
                address_b58 = xmr_util.encode_address(Kbv, Kbs)
                wallet_file = address_b58 + '_spend'
                try:
                    self.openWallet(wallet_file)
                except Exception:
                    wallet_file = address_b58
                    try:
                        self.openWallet(wallet_file)
                    except Exception:
                        self._log.info(f'showLockTransfers trying to create wallet for address {address_b58}.')
                        kbv_le = kbv[::-1]
                        params = {
                            'restore_height': restore_height,
                            'filename': address_b58,
                            'address': address_b58,
                            'viewkey': b2h(kbv_le),
                        }
                        self.createWallet(params)
                        self.openWallet(address_b58)

                self.rpc_wallet('refresh')

                rv = self.rpc_wallet('get_transfers', {'in': True, 'out': True, 'pending': True, 'failed': True})
                rv['filename'] = wallet_file
                return rv
            except Exception as e:
                return {'error': str(e)}

    def getSpendableBalance(self) -> int:
        '''Return the unlocked balance of the main wallet after a refresh.'''
        with self._mx_wallet:
            self.openWallet(self._wallet_filename)
            self.rpc_wallet('refresh')
            balance_info = self.rpc_wallet('get_balance')
            return balance_info['unlocked_balance']

    def changeWalletPassword(self, old_password, new_password):
        '''
        Change the password of the main wallet file.

        A non-empty old_password replaces the cached password before the
        wallet is opened. On failure the previously cached password is
        restored and the error re-raised.

        NOTE(review): on success the cached password is left as
        old_password, not new_password, and self._mx_wallet is not held
        here - confirm callers account for both.
        '''
        self._log.info('changeWalletPassword - {}'.format(self.ticker()))
        orig_password = self._wallet_password
        if old_password != '':
            self._wallet_password = old_password
        try:
            self.openWallet(self._wallet_filename)
            self.rpc_wallet('change_wallet_password', {'old_password': old_password, 'new_password': new_password})
        except Exception:
            self._wallet_password = orig_password
            raise

    def unlockWallet(self, password: str) -> None:
        '''
        Cache password for subsequent wallet opens.

        If the seed has not been checked yet the swap client is asked to
        check it for this coin.
        '''
        self._log.info('unlockWallet - {}'.format(self.ticker()))
        self._wallet_password = password

        if not self._have_checked_seed:
            self._sc.checkWalletSeed(self.coin_type())

    def lockWallet(self) -> None:
        '''Forget the cached wallet password.'''
        self._log.info('lockWallet - {}'.format(self.ticker()))
        self._wallet_password = None

    def isAddressMine(self, address):
        '''Always returns True; ownership is not checked yet.'''
        # TODO
        return True

    def ensureFunds(self, amount: int) -> None:
        '''Raise ValueError if the spendable balance is below amount.'''
        if self.getSpendableBalance() < amount:
            raise ValueError('Balance too low')

    def getTransaction(self, txid: bytes):
        '''Return the daemon get_transactions reply for a single txid.'''
        return self.rpc2('get_transactions', {'txs_hashes': [txid.hex(), ]})