Host-customized fork of https://github.com/tecnovert/basicswap/
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

368 lines
14 KiB

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2022-2023 tecnovert
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import random
import hashlib
from .btc import BTCInterface, find_vout_for_address_from_txobj
from basicswap.util import (
i2b,
ensure,
)
from basicswap.rpc import make_rpc_func
from basicswap.util.crypto import hash160
from basicswap.util.address import decodeAddress
from basicswap.chainparams import Coins
from basicswap.interface.contrib.firo_test_framework.script import (
CScript,
OP_DUP,
OP_EQUAL,
OP_HASH160,
OP_CHECKSIG,
OP_EQUALVERIFY,
)
from basicswap.interface.contrib.firo_test_framework.mininode import (
CBlock,
FromHex,
CTransaction,
)
class FIROInterface(BTCInterface):
    """Coin interface derived from BTCInterface and identified by Coins.FIRO."""

    @staticmethod
    def coin_type():
        """Return Coins.FIRO, the coin identifier for this interface."""
        return Coins.FIRO
def __init__(self, coin_settings, network, swap_client=None):
    """Run the base-class initialiser, then bind the wallet RPC callable.

    All three arguments are forwarded unchanged to BTCInterface.__init__.
    """
    super(FIROInterface, self).__init__(coin_settings, network, swap_client)
    # No multiwallet support: a single RPC function, built from the instance's
    # port, auth and host, serves as the wallet RPC.
    self.rpc_wallet = make_rpc_func(
        self._rpcport,
        self._rpcauth,
        host=self._rpc_host,
    )
def checkWallets(self) -> int:
    """Return the constant 1.

    NOTE(review): presumably the number of usable wallets, given that this
    interface has no multiwallet support - confirm against the base class.
    """
    wallet_count: int = 1
    return wallet_count
def getExchangeName(self, exchange_name):
    """Return 'zcoin' regardless of which *exchange_name* is passed."""
    name_on_exchanges = 'zcoin'
    return name_on_exchanges
def initialiseWallet(self, key):
    """Do nothing and return None; *key* is ignored.

    The wallet is instead loaded with the -hdseed= parameter.
    """
    return None
def getNewAddress(self, use_segwit, label='swap_receive'):
    """Request a new address labelled *label* through the getnewaddress RPC.

    *use_segwit* is accepted but not used: the address is returned exactly as
    the RPC reports it, without a follow-up addwitnessaddress conversion.
    """
    new_address = self.rpc('getnewaddress', [label])
    return new_address
def decodeAddress(self, address):
    """Decode *address* with the imported decodeAddress helper, dropping the first byte.

    NOTE(review): the dropped leading byte is presumably the address version
    prefix - confirm against basicswap.util.address.
    """
    decoded = decodeAddress(address)
    return decoded[1:]
def encodeSegwitAddress(self, script):
    """Not implemented: always raises ValueError('TODO')."""
    not_implemented = ValueError('TODO')
    raise not_implemented
def decodeSegwitAddress(self, addr):
    """Not implemented: always raises ValueError('TODO')."""
    not_implemented = ValueError('TODO')
    raise not_implemented
def isWatchOnlyAddress(self, address):
    """Return the 'iswatchonly' field of the validateaddress RPC result for *address*."""
    return self.rpc('validateaddress', [address])['iswatchonly']
def isAddressMine(self, address: str, or_watch_only: bool = False) -> bool:
    """Report whether validateaddress marks *address* as belonging to the wallet.

    With *or_watch_only* set, an address flagged 'iswatchonly' also counts.
    The 'iswatchonly' field is only read when *or_watch_only* is set and
    'ismine' is falsy.
    """
    info = self.rpc('validateaddress', [address])
    is_mine = info['ismine']
    if or_watch_only:
        return is_mine or info['iswatchonly']
    return is_mine
def getSCLockScriptAddress(self, lock_script):
    """Return the address of *lock_script*, importing it into the wallet if unknown.

    The script is turned into its destination script with getScriptDest and
    encoded with encodeScriptDest.  When isAddressMine (watch-only included)
    does not recognise the address, the destination script is imported through
    the importaddress RPC under the label 'bid lock'.

    Returns:
        The encoded address string.
    """
    lock_tx_dest = self.getScriptDest(lock_script)
    address = self.encodeScriptDest(lock_tx_dest)

    if not self.isAddressMine(address, or_watch_only=True):
        # Expects P2WSH nested in BIP16_P2SH
        # Trailing positional flags are (False, True); in the Bitcoin Core RPC
        # these are rescan and p2sh - presumably the same for this node.
        self.rpc('importaddress', [lock_tx_dest.hex(), 'bid lock', False, True])

    return address
def getLockTxHeight(self, txid, dest_address, bid_amount, rescan_from, find_index: bool = False):
    """Look up confirmation depth and block height of a lock transaction.

    Args:
        txid: Transaction id as bytes, or None to search the unspent outputs
            of *dest_address* for one whose amount equals *bid_amount*.
        dest_address: Address the lock output pays to.
        bid_amount: Expected output amount, compared against make_int(amount).
        rescan_from: Height passed to rescanBlockchainForAddress when the
            address first has to be imported as watch-only.
        find_index: When true, also report the output index under 'index'.

    Returns:
        None when no matching output is found or the transaction lookup fails;
        otherwise a dict with 'depth' and 'height', plus 'index' when
        *find_index* is set and 'txid' (hex) when *txid* was passed as None.
    """
    # Add watchonly address and rescan if required
    if not self.isAddressMine(dest_address, or_watch_only=True):
        self.importWatchOnlyAddress(dest_address, 'bid')
        self._log.info('Imported watch-only addr: {}'.format(dest_address))
        self._log.info('Rescanning {} chain from height: {}'.format(self.coin_name(), rescan_from))
        self.rescanBlockchainForAddress(rescan_from, dest_address)

    return_txid = txid is None
    if return_txid:
        unspents = self.rpc('listunspent', [0, 9999999, [dest_address, ]])
        # First unspent output carrying exactly the bid amount, if any.
        match = next((u for u in unspents if self.make_int(u['amount']) == bid_amount), None)
        if match is None:
            return None
        txid = bytes.fromhex(match['txid'])

    try:
        wallet_tx = self.rpc('gettransaction', [txid.hex()])

        # Height stays 0 while the transaction is not yet in a block.
        height = 0
        if 'blockhash' in wallet_tx:
            height = self.rpc('getblockheader', [wallet_tx['blockhash']])['height']

        rv = {
            'depth': wallet_tx['confirmations'] if 'confirmations' in wallet_tx else 0,
            'height': height,
        }
    except Exception as e:
        self._log.debug('getLockTxHeight gettransaction failed: %s, %s', txid.hex(), str(e))
        return None

    if find_index:
        decoded_tx = self.rpc('decoderawtransaction', [wallet_tx['hex']])
        rv['index'] = find_vout_for_address_from_txobj(decoded_tx, dest_address)

    if return_txid:
        rv['txid'] = txid.hex()

    return rv
def createSCLockTx(self, value: int, script: bytearray, vkbv: bytes = None) -> bytes:
    """Serialise an input-less transaction with one output paying *value* to *script*.

    The output script is getScriptDest(script) and the output object is built
    with the type returned by txoType().  *vkbv* is accepted but not used.
    """
    lock_tx = CTransaction()
    lock_tx.nVersion = self.txVersion()
    make_output = self.txoType()
    lock_output = make_output(value, self.getScriptDest(script))
    lock_tx.vout.append(lock_output)
    return lock_tx.serialize()
def fundSCLockTx(self, tx_bytes, feerate, vkbv=None):
    """Delegate to fundTx(tx_bytes, feerate) and return its result; *vkbv* is unused."""
    funded_tx = self.fundTx(tx_bytes, feerate)
    return funded_tx
def signTxWithWallet(self, tx):
    """Sign the raw transaction bytes *tx* via signrawtransaction.

    Returns the bytes decoded from the 'hex' field of the RPC result.
    """
    signed_hex = self.rpc('signrawtransaction', [tx.hex()])['hex']
    return bytes.fromhex(signed_hex)
def createRawFundedTransaction(self, addr_to: str, amount: int, sub_fee: bool = False, lock_unspents: bool = True) -> str:
    """Create and fund a raw transaction paying *amount* to *addr_to*.

    Args:
        addr_to: Destination address.
        amount: Amount to send; rendered with format_amount for the RPC.
        sub_fee: When true, the fee is subtracted from output 0.
        lock_unspents: Passed to fundrawtransaction as 'lockUnspents'.

    Returns:
        The 'hex' field of the fundrawtransaction result.
    """
    outputs = {addr_to: self.format_amount(amount)}
    txn = self.rpc('createrawtransaction', [[], outputs])

    fee_rate, fee_src = self.get_fee_rate(self._conf_target)
    self._log.debug(f'Fee rate: {fee_rate}, source: {fee_src}, block target: {self._conf_target}')

    options = {'lockUnspents': lock_unspents, 'feeRate': fee_rate}
    if sub_fee:
        options['subtractFeeFromOutputs'] = [0]

    funded = self.rpc('fundrawtransaction', [txn, options])
    return funded['hex']
def createRawSignedTransaction(self, addr_to, amount) -> str:
    """Build a funded transaction to *addr_to* and sign it with signrawtransaction.

    Returns the 'hex' field of the signing RPC result.
    """
    funded_hex = self.createRawFundedTransaction(addr_to, amount)
    signed = self.rpc('signrawtransaction', [funded_hex])
    return signed['hex']
def getScriptForPubkeyHash(self, pkh: bytes) -> bytearray:
    """Return the P2PKH script for the public key hash *pkh*.

    Layout: OP_DUP OP_HASH160 <pkh> OP_EQUALVERIFY OP_CHECKSIG.
    """
    p2pkh_ops = [OP_DUP, OP_HASH160, pkh, OP_EQUALVERIFY, OP_CHECKSIG]
    return CScript(p2pkh_ops)
def getScriptDest(self, script: bytearray) -> bytearray:
    """Return the P2SH output script for *script*.

    Layout: OP_HASH160 <hash160(script)> OP_EQUAL.  The hash is asserted to be
    20 bytes long.
    """
    digest = hash160(script)
    assert len(digest) == 20
    p2sh_ops = [OP_HASH160, digest, OP_EQUAL]
    return CScript(p2sh_ops)
def getSeedHash(self, seed: bytes) -> bytes:
    """Return hash160(seed) with its byte order reversed."""
    digest = hash160(seed)
    return digest[::-1]
def encodeScriptDest(self, script_dest: bytes) -> str:
    """Encode the script hash embedded in *script_dest* as an address.

    The hash is taken as everything between the first two bytes and the last
    byte of *script_dest*, then passed to sh_to_address.
    """
    hash_start, hash_end = 2, -1
    return self.sh_to_address(script_dest[hash_start:hash_end])
def getDestForScriptHash(self, script_hash):
    """Return the script OP_HASH160 <script_hash> OP_EQUAL.

    *script_hash* is asserted to be exactly 20 bytes long.
    """
    expected_len = 20
    assert len(script_hash) == expected_len
    return CScript([OP_HASH160, script_hash, OP_EQUAL])
def withdrawCoin(self, value, addr_to, subfee):
    """Send *value* to *addr_to* via sendtoaddress and return the RPC result.

    The two empty strings fill the positional slots between the amount and
    *subfee* (in the Bitcoin Core RPC these are comment and comment_to -
    presumably the same for this node).
    """
    return self.rpc('sendtoaddress', [addr_to, value, '', '', subfee])
def getWalletSeedID(self):
    """Return the 'hdmasterkeyid' field reported by the getwalletinfo RPC."""
    wallet_info = self.rpc('getwalletinfo')
    return wallet_info['hdmasterkeyid']
def getSpendableBalance(self) -> int:
    """Return the getwalletinfo 'balance' field converted with make_int."""
    wallet_info = self.rpc('getwalletinfo')
    return self.make_int(wallet_info['balance'])
def getBLockSpendTxFee(self, tx, fee_rate: int) -> int:
    """Compute the fee for *tx* at *fee_rate* per 1000 bytes.

    The size used is the witness-serialised length of *tx* plus a fixed
    107-byte allowance (presumably for data added when the transaction is
    signed - confirm).  The fee is rounded with round() and logged at info level.
    """
    add_bytes = 107
    serialized_len = len(tx.serialize_with_witness())
    size = serialized_len + add_bytes
    pay_fee = round(fee_rate * size / 1000)
    self._log.info(f'BLockSpendTx fee_rate, size, fee: {fee_rate}, {size}, {pay_fee}.')
    return pay_fee
def signTxWithKey(self, tx: bytes, key: bytes) -> bytes:
    """Sign the raw transaction *tx* with the single private key *key*.

    The key is encoded with encodeKey and handed to signrawtransaction as the
    only signing key, with an empty previous-transactions list.  Returns the
    bytes decoded from the 'hex' field of the RPC result.
    """
    signing_keys = [self.encodeKey(key)]
    signed = self.rpc('signrawtransaction', [tx.hex(), [], signing_keys])
    return bytes.fromhex(signed['hex'])
def findTxnByHash(self, txid_hex: str):
    """Look up a sufficiently confirmed wallet transaction by its hex txid.

    Only works for wallet txns, because the lookup uses gettransaction.

    Returns:
        {'txid': txid_hex, 'amount': 0, 'height': <block height>} once the
        transaction has at least blocks_confirmed confirmations; None when the
        RPC fails or the transaction is not confirmed deeply enough.
    """
    try:
        wallet_tx = self.rpc('gettransaction', [txid_hex])
    except Exception:
        self._log.debug('findTxnByHash getrawtransaction failed: {}'.format(txid_hex))
        return None

    is_confirmed = 'confirmations' in wallet_tx and wallet_tx['confirmations'] >= self.blocks_confirmed
    if not is_confirmed:
        return None

    block_height = self.getBlockHeader(wallet_tx['blockhash'])['height']
    return {'txid': txid_hex, 'amount': 0, 'height': block_height}
def getProofOfFunds(self, amount_for, extra_commit_bytes):
    """Produce a signed proof that one wallet address controls *amount_for*.

    Spendable outputs from listunspent are grouped by address.  An address is
    viable when its largest outputs reach *amount_for* using at most four
    outputs.  One viable address is picked at random, its largest outputs are
    collected until they cover *amount_for*, and a message committing to the
    address, those outpoints and *extra_commit_bytes* is signed with
    signmessage.

    Args:
        amount_for: Amount to prove, in the integer units of make_int.
        extra_commit_bytes: Extra bytes whose hex is appended to the signed message.

    Returns:
        Tuple (address signed for, signature, list of (txid bytes, vout) outpoints).

    Raises:
        Whatever ensure() raises when no address qualifies.
    """
    # TODO: Lock unspent and use same output/s to fund bid
    max_utxos: int = 4

    def largest_first(utxos):
        # Largest amounts first, so the fewest outputs are needed to reach the target.
        return sorted(utxos, key=lambda x: x[0], reverse=True)

    # Group spendable outputs by address as (amount, txid, vout) with a running total.
    unspents_by_addr = dict()
    for u in self.rpc('listunspent'):
        if u['spendable'] is not True:
            continue
        entry = unspents_by_addr.setdefault(u['address'], {'total': 0, 'utxos': []})
        utxo_amount: int = self.make_int(u['amount'], r=1)
        entry['total'] += utxo_amount
        entry['utxos'].append((utxo_amount, u['txid'], u['vout']))

    viable_addrs = []
    for addr, data in unspents_by_addr.items():
        if data['total'] < amount_for:
            continue
        # Count the outputs required to reach amount_for, taking the largest first.
        utxos_req: int = 0
        sum_value: int = 0
        for utxo in largest_first(data['utxos']):
            sum_value += utxo[0]
            utxos_req += 1
            if sum_value >= amount_for:
                break
        if utxos_req <= max_utxos:
            viable_addrs.append(addr)

    ensure(len(viable_addrs) > 0, 'Could not find address with enough funds for proof')

    sign_for_addr: str = random.choice(viable_addrs)
    self._log.debug('sign_for_addr %s', sign_for_addr)

    # Collect the outpoints that make up the proof and hash them in order.
    prove_utxos = []
    hasher = hashlib.sha256()
    sum_value: int = 0
    for utxo in largest_first(unspents_by_addr[sign_for_addr]['utxos']):
        sum_value += utxo[0]
        outpoint = (bytes.fromhex(utxo[1]), utxo[2])
        prove_utxos.append(outpoint)
        hasher.update(outpoint[0])
        # The 2-byte big-endian vout encoding must match verifyProofOfFunds.
        hasher.update(outpoint[1].to_bytes(2, 'big'))
        if sum_value >= amount_for:
            break
    utxos_hash = hasher.digest()

    if self.using_segwit():  # TODO: Use isSegwitAddress when scantxoutset can use combo
        # 'Address does not refer to key' for non p2pkh
        pkh = self.decodeAddress(sign_for_addr)
        sign_for_addr = self.pkh_to_address(pkh)
        self._log.debug('sign_for_addr converted %s', sign_for_addr)

    signature = self.rpc('signmessage', [sign_for_addr, sign_for_addr + '_swap_proof_' + utxos_hash.hex() + extra_commit_bytes.hex()])

    return (sign_for_addr, signature, prove_utxos)
def verifyProofOfFunds(self, address, signature, utxos, extra_commit_bytes):
    """Verify a proof of funds and return the total value of its outputs.

    The message is rebuilt from *address*, the SHA-256 of the outpoints
    (txid bytes followed by the vout as 2 big-endian bytes) and
    *extra_commit_bytes*, then checked with verifyMessage.  Each outpoint is
    then looked up with gettxout and its value summed.

    Args:
        address: Address the proof was signed for.
        signature: Signature over the proof message.
        utxos: Iterable of (txid bytes, vout) outpoints.
        extra_commit_bytes: Extra bytes whose hex was appended to the signed message.

    Returns:
        Sum of make_int(value) over all listed outputs.

    Raises:
        Whatever ensure() raises when the signature does not verify, or when
        gettxout reports no output for a listed outpoint.
    """
    hasher = hashlib.sha256()
    for outpoint in utxos:
        hasher.update(outpoint[0])
        hasher.update(outpoint[1].to_bytes(2, 'big'))
    utxos_hash = hasher.digest()

    passed = self.verifyMessage(address, address + '_swap_proof_' + utxos_hash.hex() + extra_commit_bytes.hex(), signature)
    ensure(passed is True, 'Proof of funds signature invalid')

    if self.using_segwit():
        address = self.encodeSegwitAddress(decodeAddress(address)[1:])

    # NOTE(review): the outputs below are not checked against `address`, so the
    # proof does not show that the signer controls them - confirm whether an
    # ownership check is intended here.
    sum_value: int = 0
    for outpoint in utxos:
        txout = self.rpc('gettxout', [outpoint[0].hex(), outpoint[1]])
        # gettxout yields no result for a spent or unknown output; fail with a
        # clear message instead of a TypeError on the subscript below.
        ensure(txout is not None, 'Proof of funds utxo not found or already spent')
        sum_value += self.make_int(txout['value'])

    return sum_value
def rescanBlockchainForAddress(self, height_start: int, addr_find: str):
    """Make the node reprocess the newest block that pays to *addr_find*.

    Very ugly workaround for missing `rescanblockchain` rpc command: blocks are
    walked from the chain tip down to, but not including, *height_start*.  The
    first block found with an output whose scriptPubKey matches the address is
    invalidated and immediately reconsidered, and the search stops there.

    Args:
        height_start: Exclusive lower bound of the heights searched.
        addr_find: Address to look for; decoded with decodeAddress and turned
            into a script with getDestForScriptHash.

    Returns:
        None, whether or not a matching output was found.
    """
    chain_height: int = self.getChainHeight()
    script_hash: bytes = self.decodeAddress(addr_find)
    find_scriptPubKey = self.getDestForScriptHash(script_hash)

    for height in range(chain_height, height_start, -1):
        block_hash = self.rpc('getblockhash', [height])
        block = self.rpc('getblock', [block_hash, False])
        decoded_block = FromHex(CBlock(), block)
        for tx in decoded_block.vtx:
            if not any(txo.scriptPubKey == find_scriptPubKey for txo in tx.vout):
                continue
            tx.rehash()
            txid = i2b(tx.sha256)
            self._log.info('Found output to addr: {} in tx {} in block {}'.format(addr_find, txid.hex(), block_hash))
            self._log.info('rescanblockchain hack invalidateblock {}'.format(block_hash))
            try:
                self.rpc('invalidateblock', [block_hash])
            finally:
                # Always undo the invalidation so a failure cannot leave the
                # block marked invalid.
                self.rpc('reconsiderblock', [block_hash])
            return
def getBlockWithTxns(self, block_hash):
    """Return header fields of a block together with its decoded transactions.

    The raw block is fetched with getblock and parsed locally; each
    transaction is re-serialised and decoded with decoderawtransaction, and
    its raw hex is stored under 'hex' when the decoder did not supply one.

    Returns:
        Dict with 'hash', 'tx' (list of decoded transactions) and the
        'confirmations', 'height', 'version' and 'merkleroot' fields copied
        from getblockheader.
    """
    # TODO: Bypass decoderawtransaction and getblockheader
    raw_block = self.rpc('getblock', [block_hash, False])
    header = self.rpc('getblockheader', [block_hash])
    parsed_block = FromHex(CBlock(), raw_block)

    decoded_txs = []
    for tx in parsed_block.vtx:
        tx_hex = tx.serialize_with_witness().hex()
        tx_dec = self.rpc('decoderawtransaction', [tx_hex])
        if 'hex' not in tx_dec:
            tx_dec['hex'] = tx_hex
        decoded_txs.append(tx_dec)

    block_rv = {'hash': block_hash, 'tx': decoded_txs}
    for field in ('confirmations', 'height', 'version', 'merkleroot'):
        block_rv[field] = header[field]
    return block_rv