#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (c) 2020-2024 tecnovert # Distributed under the MIT software license, see the accompanying # file LICENSE or http://www.opensource.org/licenses/mit-license.php. import base64 import hashlib import json import logging import traceback from io import BytesIO from basicswap.basicswap_util import ( getVoutByAddress, getVoutByScriptPubKey, ) from basicswap.contrib.test_framework import ( segwit_addr, ) from basicswap.interface.base import ( Secp256k1Interface, ) from basicswap.util import ( ensure, b2h, i2b, b2i, i2h, ) from basicswap.util.ecc import ( pointToCPK, CPKToPoint, ) from basicswap.util.script import ( decodeScriptNum, getCompactSizeLen, SerialiseNumCompact, getWitnessElementLen, ) from basicswap.util.address import ( toWIF, b58encode, decodeWif, decodeAddress, pubkeyToAddress, ) from basicswap.util.crypto import ( hash160, sha256, ) from coincurve.keys import ( PrivateKey, PublicKey, ) from coincurve.ecdsaotves import ( ecdsaotves_enc_sign, ecdsaotves_enc_verify, ecdsaotves_dec_sig, ecdsaotves_rec_enc_key ) from basicswap.contrib.test_framework.messages import ( COIN, COutPoint, CTransaction, CTxIn, CTxInWitness, CTxOut, ) from basicswap.contrib.test_framework.script import ( CScript, CScriptOp, OP_IF, OP_ELSE, OP_ENDIF, OP_0, OP_2, OP_CHECKSIG, OP_CHECKMULTISIG, OP_CHECKSEQUENCEVERIFY, OP_DROP, OP_HASH160, OP_EQUAL, SIGHASH_ALL, SegwitV0SignatureHash, ) from basicswap.basicswap_util import ( TxLockTypes ) from basicswap.chainparams import Coins from basicswap.rpc import make_rpc_func, openrpc SEQUENCE_LOCKTIME_GRANULARITY = 9 # 512 seconds SEQUENCE_LOCKTIME_TYPE_FLAG = (1 << 22) SEQUENCE_LOCKTIME_MASK = 0x0000ffff def ensure_op(v, err_string='Bad opcode'): ensure(v, err_string) def findOutput(tx, script_pk: bytes): for i in range(len(tx.vout)): if tx.vout[i].scriptPubKey == script_pk: return i return None def find_vout_for_address_from_txobj(tx_obj, addr: str) -> int: """ Locate the vout index of the given transaction sending to the given address. Raises runtime error exception if not found. """ for i in range(len(tx_obj["vout"])): scriptPubKey = tx_obj["vout"][i]["scriptPubKey"] if "addresses" in scriptPubKey: if any([addr == a for a in scriptPubKey["addresses"]]): return i elif "address" in scriptPubKey: if addr == scriptPubKey["address"]: return i raise RuntimeError("Vout not found for address: txid={}, addr={}".format(tx_obj['txid'], addr)) def extractScriptLockScriptValues(script_bytes: bytes) -> (bytes, bytes): script_len = len(script_bytes) ensure(script_len == 71, 'Bad script length') o = 0 ensure_op(script_bytes[o] == OP_2) ensure_op(script_bytes[o + 1] == 33) o += 2 pk1 = script_bytes[o: o + 33] o += 33 ensure_op(script_bytes[o] == 33) o += 1 pk2 = script_bytes[o: o + 33] o += 33 ensure_op(script_bytes[o] == OP_2) ensure_op(script_bytes[o + 1] == OP_CHECKMULTISIG) return pk1, pk2 def extractScriptLockRefundScriptValues(script_bytes: bytes): script_len = len(script_bytes) ensure(script_len > 73, 'Bad script length') ensure_op(script_bytes[0] == OP_IF) ensure_op(script_bytes[1] == OP_2) ensure_op(script_bytes[2] == 33) pk1 = script_bytes[3: 3 + 33] ensure_op(script_bytes[36] == 33) pk2 = script_bytes[37: 37 + 33] ensure_op(script_bytes[70] == OP_2) ensure_op(script_bytes[71] == OP_CHECKMULTISIG) ensure_op(script_bytes[72] == OP_ELSE) o = 73 csv_val, nb = decodeScriptNum(script_bytes, o) o += nb ensure(script_len == o + 5 + 33, 'Bad script length') # Fails if script too long ensure_op(script_bytes[o] == OP_CHECKSEQUENCEVERIFY) o += 1 ensure_op(script_bytes[o] == OP_DROP) o += 1 ensure_op(script_bytes[o] == 33) o += 1 pk3 = script_bytes[o: o + 33] o += 33 ensure_op(script_bytes[o] == OP_CHECKSIG) o += 1 ensure_op(script_bytes[o] == OP_ENDIF) return pk1, pk2, csv_val, pk3 class BTCInterface(Secp256k1Interface): @staticmethod def coin_type(): return Coins.BTC @staticmethod def COIN(): return COIN @staticmethod def exp() -> int: return 8 @staticmethod def nbk() -> int: return 32 @staticmethod def nbK() -> int: # No. of bytes requires to encode a public key return 33 @staticmethod def witnessScaleFactor() -> int: return 4 @staticmethod def txVersion() -> int: return 2 @staticmethod def getTxOutputValue(tx) -> int: rv = 0 for output in tx.vout: rv += output.nValue return rv @staticmethod def xmr_swap_a_lock_spend_tx_vsize() -> int: return 147 @staticmethod def xmr_swap_b_lock_spend_tx_vsize() -> int: return 110 @staticmethod def txoType(): return CTxOut @staticmethod def getExpectedSequence(lockType: int, lockVal: int) -> int: ensure(lockVal >= 1, 'Bad lockVal') if lockType == TxLockTypes.SEQUENCE_LOCK_BLOCKS: return lockVal if lockType == TxLockTypes.SEQUENCE_LOCK_TIME: secondsLocked = lockVal # Ensure the locked time is never less than lockVal if secondsLocked % (1 << SEQUENCE_LOCKTIME_GRANULARITY) != 0: secondsLocked += (1 << SEQUENCE_LOCKTIME_GRANULARITY) secondsLocked >>= SEQUENCE_LOCKTIME_GRANULARITY return secondsLocked | SEQUENCE_LOCKTIME_TYPE_FLAG raise ValueError('Unknown lock type') @staticmethod def decodeSequence(lock_value: int) -> int: # Return the raw value if lock_value & SEQUENCE_LOCKTIME_TYPE_FLAG: return (lock_value & SEQUENCE_LOCKTIME_MASK) << SEQUENCE_LOCKTIME_GRANULARITY return lock_value & SEQUENCE_LOCKTIME_MASK @staticmethod def depth_spendable() -> int: return 0 def __init__(self, coin_settings, network, swap_client=None): super().__init__(network) self._rpc_host = coin_settings.get('rpchost', '127.0.0.1') self._rpcport = coin_settings['rpcport'] self._rpcauth = coin_settings['rpcauth'] self.rpc = make_rpc_func(self._rpcport, self._rpcauth, host=self._rpc_host) self._rpc_wallet = 'wallet.dat' self.rpc_wallet = make_rpc_func(self._rpcport, self._rpcauth, host=self._rpc_host, wallet=self._rpc_wallet) self.blocks_confirmed = coin_settings['blocks_confirmed'] self.setConfTarget(coin_settings['conf_target']) self._use_segwit = coin_settings['use_segwit'] self._connection_type = coin_settings['connection_type'] self._sc = swap_client self._log = self._sc.log if self._sc and self._sc.log else logging self._expect_seedid_hex = None def open_rpc(self, wallet=None): return openrpc(self._rpcport, self._rpcauth, wallet=wallet, host=self._rpc_host) def json_request(self, rpc_conn, method, params): try: v = rpc_conn.json_request(method, params) r = json.loads(v.decode('utf-8')) except Exception as ex: traceback.print_exc() raise ValueError('RPC Server Error ' + str(ex)) if 'error' in r and r['error'] is not None: raise ValueError('RPC error ' + str(r['error'])) return r['result'] def close_rpc(self, rpc_conn): rpc_conn.close() def checkWallets(self) -> int: wallets = self.rpc('listwallets') # Wallet name is "" for some LTC and PART installs on older cores if self._rpc_wallet not in wallets and len(wallets) > 0: self._log.debug('Changing {} wallet name.'.format(self.ticker())) for wallet_name in wallets: # Skip over other expected wallets if wallet_name in ('mweb', ): continue self._rpc_wallet = wallet_name self._log.info('Switched {} wallet name to {}.'.format(self.ticker(), self._rpc_wallet)) self.rpc_wallet = make_rpc_func(self._rpcport, self._rpcauth, host=self._rpc_host, wallet=self._rpc_wallet) break return len(wallets) def testDaemonRPC(self, with_wallet=True) -> None: self.rpc_wallet('getwalletinfo' if with_wallet else 'getblockchaininfo') def getDaemonVersion(self): return self.rpc('getnetworkinfo')['version'] def getBlockchainInfo(self): return self.rpc('getblockchaininfo') def getChainHeight(self) -> int: return self.rpc('getblockcount') def getMempoolTx(self, txid): return self.rpc('getrawtransaction', [txid.hex()]) def getBlockHeaderFromHeight(self, height): block_hash = self.rpc('getblockhash', [height]) return self.rpc('getblockheader', [block_hash]) def getBlockHeader(self, block_hash): return self.rpc('getblockheader', [block_hash]) def getBlockHeaderAt(self, time: int, block_after=False): blockchaininfo = self.rpc('getblockchaininfo') last_block_header = self.rpc('getblockheader', [blockchaininfo['bestblockhash']]) max_tries = 5000 for i in range(max_tries): prev_block_header = self.rpc('getblockheader', [last_block_header['previousblockhash']]) if prev_block_header['time'] <= time: return last_block_header if block_after else prev_block_header last_block_header = prev_block_header raise ValueError(f'Block header not found at time: {time}') def initialiseWallet(self, key_bytes: bytes) -> None: key_wif = self.encodeKey(key_bytes) self.rpc_wallet('sethdseed', [True, key_wif]) def getWalletInfo(self): rv = self.rpc_wallet('getwalletinfo') rv['encrypted'] = 'unlocked_until' in rv rv['locked'] = rv.get('unlocked_until', 1) <= 0 rv['locked_utxos'] = len(self.rpc_wallet('listlockunspent')) return rv def getWalletRestoreHeight(self) -> int: start_time = self.rpc_wallet('getwalletinfo')['keypoololdest'] blockchaininfo = self.getBlockchainInfo() best_block = blockchaininfo['bestblockhash'] chain_synced = round(blockchaininfo['verificationprogress'], 3) if chain_synced < 1.0: raise ValueError('{} chain isn\'t synced.'.format(self.coin_name())) self._log.debug('Finding block at time: {}'.format(start_time)) rpc_conn = self.open_rpc() try: block_hash = best_block while True: block_header = self.json_request(rpc_conn, 'getblockheader', [block_hash]) if block_header['time'] < start_time: return block_header['height'] block_hash = block_header['previousblockhash'] finally: self.close_rpc(rpc_conn) raise ValueError('{} wallet restore height not found.'.format(self.coin_name())) def getWalletSeedID(self) -> str: wi = self.rpc_wallet('getwalletinfo') return 'Not found' if 'hdseedid' not in wi else wi['hdseedid'] def checkExpectedSeed(self, expect_seedid) -> bool: self._expect_seedid_hex = expect_seedid return expect_seedid == self.getWalletSeedID() def getNewAddress(self, use_segwit: bool, label: str = 'swap_receive') -> str: args = [label] if use_segwit: args.append('bech32') return self.rpc_wallet('getnewaddress', args) def isValidAddress(self, address: str) -> bool: try: rv = self.rpc_wallet('validateaddress', [address]) if rv['isvalid'] is True: return True except Exception as ex: self._log.debug('validateaddress failed: {}'.format(address)) return False def isAddressMine(self, address: str, or_watch_only: bool = False) -> bool: addr_info = self.rpc_wallet('getaddressinfo', [address]) if not or_watch_only: return addr_info['ismine'] return addr_info['ismine'] or addr_info['iswatchonly'] def checkAddressMine(self, address: str) -> None: addr_info = self.rpc_wallet('getaddressinfo', [address]) ensure(addr_info['ismine'], 'ismine is false') if self.sc._restrict_unknown_seed_wallets: ensure(addr_info['hdseedid'] == self._expect_seedid_hex, 'unexpected seedid') def get_fee_rate(self, conf_target: int = 2) -> (float, str): chain_client_settings = self._sc.getChainClientSettings(self.coin_type()) # basicswap.json override_feerate = chain_client_settings.get('override_feerate', None) if override_feerate: self._log.debug('Fee rate override used for %s: %f', self.coin_name(), override_feerate) return override_feerate, 'override_feerate' min_relay_fee = chain_client_settings.get('min_relay_fee', None) def try_get_fee_rate(self, conf_target): try: fee_rate: float = self.rpc_wallet('estimatesmartfee', [conf_target])['feerate'] assert (fee_rate > 0.0), 'Negative feerate' return fee_rate, 'estimatesmartfee' except Exception: try: fee_rate: float = self.rpc_wallet('getwalletinfo')['paytxfee'] assert (fee_rate > 0.0), 'Non positive feerate' return fee_rate, 'paytxfee' except Exception: fee_rate: float = self.rpc('getnetworkinfo')['relayfee'] return fee_rate, 'relayfee' fee_rate, rate_src = try_get_fee_rate(self, conf_target) if min_relay_fee and min_relay_fee > fee_rate: self._log.warning('Feerate {} ({}) is below min relay fee {} for {}'.format(self.format_amount(fee_rate, True, 1), rate_src, self.format_amount(min_relay_fee, True, 1), self.coin_name())) return min_relay_fee, 'min_relay_fee' return fee_rate, rate_src def isSegwitAddress(self, address: str) -> bool: return address.startswith(self.chainparams_network()['hrp'] + '1') def decodeAddress(self, address: str) -> bytes: bech32_prefix = self.chainparams_network()['hrp'] if len(bech32_prefix) > 0 and address.startswith(bech32_prefix + '1'): return bytes(segwit_addr.decode(bech32_prefix, address)[1]) return decodeAddress(address)[1:] def pubkey_to_segwit_address(self, pk: bytes) -> str: bech32_prefix = self.chainparams_network()['hrp'] version = 0 pkh = hash160(pk) return segwit_addr.encode(bech32_prefix, version, pkh) def pkh_to_address(self, pkh: bytes) -> str: # pkh is ripemd160(sha256(pk)) assert (len(pkh) == 20) prefix = self.chainparams_network()['pubkey_address'] data = bytes((prefix,)) + pkh checksum = sha256(sha256(data)) return b58encode(data + checksum[0:4]) def sh_to_address(self, sh: bytes) -> str: assert (len(sh) == 20) prefix = self.chainparams_network()['script_address'] data = bytes((prefix,)) + sh checksum = hashlib.sha256(hashlib.sha256(data).digest()).digest() return b58encode(data + checksum[0:4]) def encode_p2wsh(self, script: bytes) -> str: bech32_prefix = self.chainparams_network()['hrp'] version = 0 program = script[2:] # strip version and length return segwit_addr.encode(bech32_prefix, version, program) def encodeScriptDest(self, script: bytes) -> str: return self.encode_p2wsh(script) def encode_p2sh(self, script: bytes) -> str: return pubkeyToAddress(self.chainparams_network()['script_address'], script) def pubkey_to_address(self, pk: bytes) -> str: assert (len(pk) == 33) return self.pkh_to_address(hash160(pk)) def getAddressHashFromKey(self, key: bytes) -> bytes: pk = self.getPubkey(key) return hash160(pk) def getSeedHash(self, seed) -> bytes: return self.getAddressHashFromKey(seed)[::-1] def encodeKey(self, key_bytes: bytes) -> str: wif_prefix = self.chainparams_network()['key_prefix'] return toWIF(wif_prefix, key_bytes) def encodePubkey(self, pk: bytes) -> bytes: return pointToCPK(pk) def encodeSegwitAddress(self, key_hash: bytes) -> str: return segwit_addr.encode(self.chainparams_network()['hrp'], 0, key_hash) def decodeSegwitAddress(self, addr: str) -> bytes: return bytes(segwit_addr.decode(self.chainparams_network()['hrp'], addr)[1]) def decodePubkey(self, pke): return CPKToPoint(pke) def decodeKey(self, k: str) -> bytes: return decodeWif(k) def getScriptForPubkeyHash(self, pkh: bytes) -> CScript: # p2wpkh return CScript([OP_0, pkh]) def loadTx(self, tx_bytes: bytes) -> CTransaction: # Load tx from bytes to internal representation tx = CTransaction() tx.deserialize(BytesIO(tx_bytes)) return tx def createSCLockTx(self, value: int, script: bytearray, vkbv: bytes = None) -> bytes: tx = CTransaction() tx.nVersion = self.txVersion() tx.vout.append(self.txoType()(value, self.getScriptDest(script))) return tx.serialize() def fundSCLockTx(self, tx_bytes, feerate, vkbv=None): return self.fundTx(tx_bytes, feerate) def genScriptLockRefundTxScript(self, Kal, Kaf, csv_val) -> CScript: Kal_enc = Kal if len(Kal) == 33 else self.encodePubkey(Kal) Kaf_enc = Kaf if len(Kaf) == 33 else self.encodePubkey(Kaf) return CScript([ CScriptOp(OP_IF), 2, Kal_enc, Kaf_enc, 2, CScriptOp(OP_CHECKMULTISIG), CScriptOp(OP_ELSE), csv_val, CScriptOp(OP_CHECKSEQUENCEVERIFY), CScriptOp(OP_DROP), Kaf_enc, CScriptOp(OP_CHECKSIG), CScriptOp(OP_ENDIF)]) def createSCLockRefundTx(self, tx_lock_bytes, script_lock, Kal, Kaf, lock1_value, csv_val, tx_fee_rate, vkbv=None): tx_lock = CTransaction() tx_lock = self.loadTx(tx_lock_bytes) output_script = self.getScriptDest(script_lock) locked_n = findOutput(tx_lock, output_script) ensure(locked_n is not None, 'Output not found in tx') locked_coin = tx_lock.vout[locked_n].nValue tx_lock.rehash() tx_lock_id_int = tx_lock.sha256 refund_script = self.genScriptLockRefundTxScript(Kal, Kaf, csv_val) tx = CTransaction() tx.nVersion = self.txVersion() tx.vin.append(CTxIn(COutPoint(tx_lock_id_int, locked_n), nSequence=lock1_value, scriptSig=self.getScriptScriptSig(script_lock))) tx.vout.append(self.txoType()(locked_coin, self.getScriptDest(refund_script))) dummy_witness_stack = self.getScriptLockTxDummyWitness(script_lock) witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize(tx, add_witness_bytes=witness_bytes) pay_fee = round(tx_fee_rate * vsize / 1000) tx.vout[0].nValue = locked_coin - pay_fee tx.rehash() self._log.info('createSCLockRefundTx %s:\n fee_rate, vsize, fee: %ld, %ld, %ld.', i2h(tx.sha256), tx_fee_rate, vsize, pay_fee) return tx.serialize(), refund_script, tx.vout[0].nValue def createSCLockRefundSpendTx(self, tx_lock_refund_bytes, script_lock_refund, pkh_refund_to, tx_fee_rate, vkbv=None): # Returns the coinA locked coin to the leader # The follower will sign the multisig path with a signature encumbered by the leader's coinB spend pubkey # If the leader publishes the decrypted signature the leader's coinB spend privatekey will be revealed to the follower tx_lock_refund = self.loadTx(tx_lock_refund_bytes) output_script = self.getScriptDest(script_lock_refund) locked_n = findOutput(tx_lock_refund, output_script) ensure(locked_n is not None, 'Output not found in tx') locked_coin = tx_lock_refund.vout[locked_n].nValue tx_lock_refund.rehash() tx_lock_refund_hash_int = tx_lock_refund.sha256 tx = CTransaction() tx.nVersion = self.txVersion() tx.vin.append(CTxIn(COutPoint(tx_lock_refund_hash_int, locked_n), nSequence=0, scriptSig=self.getScriptScriptSig(script_lock_refund))) tx.vout.append(self.txoType()(locked_coin, self.getScriptForPubkeyHash(pkh_refund_to))) dummy_witness_stack = self.getScriptLockRefundSpendTxDummyWitness(script_lock_refund) witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize(tx, add_witness_bytes=witness_bytes) pay_fee = round(tx_fee_rate * vsize / 1000) tx.vout[0].nValue = locked_coin - pay_fee tx.rehash() self._log.info('createSCLockRefundSpendTx %s:\n fee_rate, vsize, fee: %ld, %ld, %ld.', i2h(tx.sha256), tx_fee_rate, vsize, pay_fee) return tx.serialize() def createSCLockRefundSpendToFTx(self, tx_lock_refund_bytes, script_lock_refund, pkh_dest, tx_fee_rate, vkbv=None): # lock refund swipe tx # Sends the coinA locked coin to the follower tx_lock_refund = self.loadTx(tx_lock_refund_bytes) output_script = self.getScriptDest(script_lock_refund) locked_n = findOutput(tx_lock_refund, output_script) ensure(locked_n is not None, 'Output not found in tx') locked_coin = tx_lock_refund.vout[locked_n].nValue A, B, lock2_value, C = extractScriptLockRefundScriptValues(script_lock_refund) tx_lock_refund.rehash() tx_lock_refund_hash_int = tx_lock_refund.sha256 tx = CTransaction() tx.nVersion = self.txVersion() tx.vin.append(CTxIn(COutPoint(tx_lock_refund_hash_int, locked_n), nSequence=lock2_value, scriptSig=self.getScriptScriptSig(script_lock_refund))) tx.vout.append(self.txoType()(locked_coin, self.getScriptForPubkeyHash(pkh_dest))) dummy_witness_stack = self.getScriptLockRefundSwipeTxDummyWitness(script_lock_refund) witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize(tx, add_witness_bytes=witness_bytes) pay_fee = round(tx_fee_rate * vsize / 1000) tx.vout[0].nValue = locked_coin - pay_fee tx.rehash() self._log.info('createSCLockRefundSpendToFTx %s:\n fee_rate, vsize, fee: %ld, %ld, %ld.', i2h(tx.sha256), tx_fee_rate, vsize, pay_fee) return tx.serialize() def createSCLockSpendTx(self, tx_lock_bytes, script_lock, pkh_dest, tx_fee_rate, vkbv=None, fee_info={}): tx_lock = self.loadTx(tx_lock_bytes) output_script = self.getScriptDest(script_lock) locked_n = findOutput(tx_lock, output_script) ensure(locked_n is not None, 'Output not found in tx') locked_coin = tx_lock.vout[locked_n].nValue tx_lock.rehash() tx_lock_id_int = tx_lock.sha256 tx = CTransaction() tx.nVersion = self.txVersion() tx.vin.append(CTxIn(COutPoint(tx_lock_id_int, locked_n), scriptSig=self.getScriptScriptSig(script_lock))) tx.vout.append(self.txoType()(locked_coin, self.getScriptForPubkeyHash(pkh_dest))) dummy_witness_stack = self.getScriptLockTxDummyWitness(script_lock) witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize(tx, add_witness_bytes=witness_bytes) pay_fee = round(tx_fee_rate * vsize / 1000) tx.vout[0].nValue = locked_coin - pay_fee fee_info['fee_paid'] = pay_fee fee_info['rate_used'] = tx_fee_rate fee_info['witness_bytes'] = witness_bytes fee_info['vsize'] = vsize tx.rehash() self._log.info('createSCLockSpendTx %s:\n fee_rate, vsize, fee: %ld, %ld, %ld.', i2h(tx.sha256), tx_fee_rate, vsize, pay_fee) return tx.serialize() def verifySCLockTx(self, tx_bytes, script_out, swap_value, Kal, Kaf, feerate, check_lock_tx_inputs, vkbv=None): # Verify: # # Not necessary to check the lock txn is mineable, as protocol will wait for it to confirm # However by checking early we can avoid wasting time processing unmineable txns # Check fee is reasonable tx = self.loadTx(tx_bytes) txid = self.getTxid(tx) self._log.info('Verifying lock tx: {}.'.format(b2h(txid))) ensure(tx.nVersion == self.txVersion(), 'Bad version') ensure(tx.nLockTime == 0, 'Bad nLockTime') # TODO match txns created by cores script_pk = self.getScriptDest(script_out) locked_n = findOutput(tx, script_pk) ensure(locked_n is not None, 'Output not found in tx') locked_coin = tx.vout[locked_n].nValue # Check value ensure(locked_coin == swap_value, 'Bad locked value') # Check script A, B = extractScriptLockScriptValues(script_out) ensure(A == Kal, 'Bad script pubkey') ensure(B == Kaf, 'Bad script pubkey') if check_lock_tx_inputs: # TODO: Check that inputs are unspent # Verify fee rate inputs_value = 0 add_bytes = 0 add_witness_bytes = getCompactSizeLen(len(tx.vin)) for pi in tx.vin: ptx = self.rpc('getrawtransaction', [i2h(pi.prevout.hash), True]) prevout = ptx['vout'][pi.prevout.n] inputs_value += self.make_int(prevout['value']) prevout_type = prevout['scriptPubKey']['type'] if prevout_type == 'witness_v0_keyhash': add_witness_bytes += 107 # sig 72, pk 33 and 2 size bytes add_witness_bytes += getCompactSizeLen(107) else: # Assume P2PKH, TODO more types add_bytes += 107 # OP_PUSH72 OP_PUSH33 outputs_value = 0 for txo in tx.vout: outputs_value += txo.nValue fee_paid = inputs_value - outputs_value assert (fee_paid > 0) vsize = self.getTxVSize(tx, add_bytes, add_witness_bytes) fee_rate_paid = fee_paid * 1000 // vsize self._log.info('tx amount, vsize, feerate: %ld, %ld, %ld', locked_coin, vsize, fee_rate_paid) if not self.compareFeeRates(fee_rate_paid, feerate): self._log.warning('feerate paid doesn\'t match expected: %ld, %ld', fee_rate_paid, feerate) # TODO: Display warning to user return txid, locked_n def verifySCLockRefundTx(self, tx_bytes, lock_tx_bytes, script_out, prevout_id, prevout_n, prevout_seq, prevout_script, Kal, Kaf, csv_val_expect, swap_value, feerate, vkbv=None): # Verify: # Must have only one input with correct prevout and sequence # Must have only one output to the p2wsh of the lock refund script # Output value must be locked_coin - lock tx fee tx = self.loadTx(tx_bytes) txid = self.getTxid(tx) self._log.info('Verifying lock refund tx: {}.'.format(b2h(txid))) ensure(tx.nVersion == self.txVersion(), 'Bad version') ensure(tx.nLockTime == 0, 'nLockTime not 0') ensure(len(tx.vin) == 1, 'tx doesn\'t have one input') ensure(tx.vin[0].nSequence == prevout_seq, 'Bad input nSequence') ensure(tx.vin[0].scriptSig == self.getScriptScriptSig(prevout_script), 'Input scriptsig mismatch') ensure(tx.vin[0].prevout.hash == b2i(prevout_id) and tx.vin[0].prevout.n == prevout_n, 'Input prevout mismatch') ensure(len(tx.vout) == 1, 'tx doesn\'t have one output') script_pk = self.getScriptDest(script_out) locked_n = findOutput(tx, script_pk) ensure(locked_n is not None, 'Output not found in tx') locked_coin = tx.vout[locked_n].nValue # Check script and values A, B, csv_val, C = extractScriptLockRefundScriptValues(script_out) ensure(A == Kal, 'Bad script pubkey') ensure(B == Kaf, 'Bad script pubkey') ensure(csv_val == csv_val_expect, 'Bad script csv value') ensure(C == Kaf, 'Bad script pubkey') fee_paid = swap_value - locked_coin assert (fee_paid > 0) dummy_witness_stack = self.getScriptLockTxDummyWitness(prevout_script) witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize(tx, add_witness_bytes=witness_bytes) fee_rate_paid = fee_paid * 1000 // vsize self._log.info('tx amount, vsize, feerate: %ld, %ld, %ld', locked_coin, vsize, fee_rate_paid) if not self.compareFeeRates(fee_rate_paid, feerate): raise ValueError('Bad fee rate, expected: {}'.format(feerate)) return txid, locked_coin, locked_n def verifySCLockRefundSpendTx(self, tx_bytes, lock_refund_tx_bytes, lock_refund_tx_id, prevout_script, Kal, prevout_n, prevout_value, feerate, vkbv=None): # Verify: # Must have only one input with correct prevout (n is always 0) and sequence # Must have only one output sending lock refund tx value - fee to leader's address, TODO: follower shouldn't need to verify destination addr tx = self.loadTx(tx_bytes) txid = self.getTxid(tx) self._log.info('Verifying lock refund spend tx: {}.'.format(b2h(txid))) ensure(tx.nVersion == self.txVersion(), 'Bad version') ensure(tx.nLockTime == 0, 'nLockTime not 0') ensure(len(tx.vin) == 1, 'tx doesn\'t have one input') ensure(tx.vin[0].nSequence == 0, 'Bad input nSequence') ensure(tx.vin[0].scriptSig == self.getScriptScriptSig(prevout_script), 'Input scriptsig mismatch') ensure(tx.vin[0].prevout.hash == b2i(lock_refund_tx_id) and tx.vin[0].prevout.n == 0, 'Input prevout mismatch') ensure(len(tx.vout) == 1, 'tx doesn\'t have one output') # Destination doesn't matter to the follower ''' p2wpkh = CScript([OP_0, hash160(Kal)]) locked_n = findOutput(tx, p2wpkh) ensure(locked_n is not None, 'Output not found in lock refund spend tx') ''' tx_value = tx.vout[0].nValue fee_paid = prevout_value - tx_value assert (fee_paid > 0) dummy_witness_stack = self.getScriptLockRefundSpendTxDummyWitness(prevout_script) witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize(tx, add_witness_bytes=witness_bytes) fee_rate_paid = fee_paid * 1000 // vsize self._log.info('tx amount, vsize, feerate: %ld, %ld, %ld', tx_value, vsize, fee_rate_paid) if not self.compareFeeRates(fee_rate_paid, feerate): raise ValueError('Bad fee rate, expected: {}'.format(feerate)) return True def verifySCLockSpendTx(self, tx_bytes, lock_tx_bytes, lock_tx_script, a_pkhash_f, feerate, vkbv=None): # Verify: # Must have only one input with correct prevout (n is always 0) and sequence # Must have only one output with destination and amount tx = self.loadTx(tx_bytes) txid = self.getTxid(tx) self._log.info('Verifying lock spend tx: {}.'.format(b2h(txid))) ensure(tx.nVersion == self.txVersion(), 'Bad version') ensure(tx.nLockTime == 0, 'nLockTime not 0') ensure(len(tx.vin) == 1, 'tx doesn\'t have one input') lock_tx = self.loadTx(lock_tx_bytes) lock_tx_id = self.getTxid(lock_tx) output_script = self.getScriptDest(lock_tx_script) locked_n = findOutput(lock_tx, output_script) ensure(locked_n is not None, 'Output not found in tx') locked_coin = lock_tx.vout[locked_n].nValue ensure(tx.vin[0].nSequence == 0, 'Bad input nSequence') ensure(tx.vin[0].scriptSig == self.getScriptScriptSig(lock_tx_script), 'Input scriptsig mismatch') ensure(tx.vin[0].prevout.hash == b2i(lock_tx_id) and tx.vin[0].prevout.n == locked_n, 'Input prevout mismatch') ensure(len(tx.vout) == 1, 'tx doesn\'t have one output') p2wpkh = self.getScriptForPubkeyHash(a_pkhash_f) ensure(tx.vout[0].scriptPubKey == p2wpkh, 'Bad output destination') # The value of the lock tx output should already be verified, if the fee is as expected the difference will be the correct amount fee_paid = locked_coin - tx.vout[0].nValue assert (fee_paid > 0) dummy_witness_stack = self.getScriptLockTxDummyWitness(lock_tx_script) witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize(tx, add_witness_bytes=witness_bytes) fee_rate_paid = fee_paid * 1000 // vsize self._log.info('tx amount, vsize, feerate: %ld, %ld, %ld', tx.vout[0].nValue, vsize, fee_rate_paid) if not self.compareFeeRates(fee_rate_paid, feerate): raise ValueError('Bad fee rate, expected: {}'.format(feerate)) return True def signTx(self, key_bytes: bytes, tx_bytes: bytes, input_n: int, prevout_script: bytes, prevout_value: int) -> bytes: tx = self.loadTx(tx_bytes) sig_hash = SegwitV0SignatureHash(prevout_script, tx, input_n, SIGHASH_ALL, prevout_value) eck = PrivateKey(key_bytes) return eck.sign(sig_hash, hasher=None) + bytes((SIGHASH_ALL,)) def signTxOtVES(self, key_sign: bytes, pubkey_encrypt: bytes, tx_bytes: bytes, input_n: int, prevout_script: bytes, prevout_value: int) -> bytes: tx = self.loadTx(tx_bytes) sig_hash = SegwitV0SignatureHash(prevout_script, tx, input_n, SIGHASH_ALL, prevout_value) return ecdsaotves_enc_sign(key_sign, pubkey_encrypt, sig_hash) def verifyTxOtVES(self, tx_bytes: bytes, ct: bytes, Ks: bytes, Ke: bytes, input_n: int, prevout_script: bytes, prevout_value): tx = self.loadTx(tx_bytes) sig_hash = SegwitV0SignatureHash(prevout_script, tx, input_n, SIGHASH_ALL, prevout_value) return ecdsaotves_enc_verify(Ks, Ke, sig_hash, ct) def decryptOtVES(self, k: bytes, esig: bytes) -> bytes: return ecdsaotves_dec_sig(k, esig) + bytes((SIGHASH_ALL,)) def recoverEncKey(self, esig, sig, K): return ecdsaotves_rec_enc_key(K, esig, sig[:-1]) # Strip sighash type def verifyTxSig(self, tx_bytes: bytes, sig: bytes, K: bytes, input_n: int, prevout_script: bytes, prevout_value: int) -> bool: tx = self.loadTx(tx_bytes) sig_hash = SegwitV0SignatureHash(prevout_script, tx, input_n, SIGHASH_ALL, prevout_value) pubkey = PublicKey(K) return pubkey.verify(sig[: -1], sig_hash, hasher=None) # Pop the hashtype byte def fundTx(self, tx: bytes, feerate) -> bytes: feerate_str = self.format_amount(feerate) # TODO: unlock unspents if bid cancelled options = { 'lockUnspents': True, 'feeRate': feerate_str, } rv = self.rpc_wallet('fundrawtransaction', [tx.hex(), options]) return bytes.fromhex(rv['hex']) def listInputs(self, tx_bytes: bytes): tx = self.loadTx(tx_bytes) all_locked = self.rpc_wallet('listlockunspent') inputs = [] for pi in tx.vin: txid_hex = i2h(pi.prevout.hash) islocked = any([txid_hex == a['txid'] and pi.prevout.n == a['vout'] for a in all_locked]) inputs.append({'txid': txid_hex, 'vout': pi.prevout.n, 'islocked': islocked}) return inputs def unlockInputs(self, tx_bytes): tx = self.loadTx(tx_bytes) inputs = [] for pi in tx.vin: inputs.append({'txid': i2h(pi.prevout.hash), 'vout': pi.prevout.n}) self.rpc_wallet('lockunspent', [True, inputs]) def signTxWithWallet(self, tx: bytes) -> bytes: rv = self.rpc_wallet('signrawtransactionwithwallet', [tx.hex()]) return bytes.fromhex(rv['hex']) def signTxWithKey(self, tx: bytes, key: bytes) -> bytes: key_wif = self.encodeKey(key) rv = self.rpc('signrawtransactionwithkey', [tx.hex(), [key_wif, ]]) return bytes.fromhex(rv['hex']) def publishTx(self, tx: bytes): return self.rpc('sendrawtransaction', [tx.hex()]) def encodeTx(self, tx) -> bytes: return tx.serialize() def getTxid(self, tx) -> bytes: if isinstance(tx, str): tx = bytes.fromhex(tx) if isinstance(tx, bytes): tx = self.loadTx(tx) tx.rehash() return i2b(tx.sha256) def getTxOutputPos(self, tx, script): if isinstance(tx, bytes): tx = self.loadTx(tx) script_pk = self.getScriptDest(script) return findOutput(tx, script_pk) def getPubkeyHash(self, K: bytes) -> bytes: return hash160(K) def getScriptDest(self, script): return CScript([OP_0, sha256(script)]) def getScriptScriptSig(self, script: bytes) -> bytes: return bytes() def getP2SHP2WSHDest(self, script): script_hash = sha256(script) assert len(script_hash) == 32 p2wsh_hash = hash160(CScript([OP_0, script_hash])) assert len(p2wsh_hash) == 20 return CScript([OP_HASH160, p2wsh_hash, OP_EQUAL]) def getP2SHP2WSHScriptSig(self, script): script_hash = sha256(script) assert len(script_hash) == 32 return CScript([CScript([OP_0, script_hash, ]), ]) def getPkDest(self, K: bytes) -> bytearray: return self.getScriptForPubkeyHash(self.getPubkeyHash(K)) def scanTxOutset(self, dest): return self.rpc('scantxoutset', ['start', ['raw({})'.format(dest.hex())]]) def getTransaction(self, txid: bytes): try: return bytes.fromhex(self.rpc('getrawtransaction', [txid.hex()])) except Exception as ex: # TODO: filter errors return None def getWalletTransaction(self, txid: bytes): try: return bytes.fromhex(self.rpc_wallet('gettransaction', [txid.hex()])) except Exception as ex: # TODO: filter errors return None def setTxSignature(self, tx_bytes: bytes, stack) -> bytes: tx = self.loadTx(tx_bytes) tx.wit.vtxinwit.clear() tx.wit.vtxinwit.append(CTxInWitness()) tx.wit.vtxinwit[0].scriptWitness.stack = stack return tx.serialize() def setTxScriptSig(self, tx_bytes: bytes, input_no: int, script_sig: bytes) -> bytes: tx = self.loadTx(tx_bytes) tx.vin[0].scriptSig = script_sig return tx.serialize() def stripTxSignature(self, tx_bytes) -> bytes: tx = self.loadTx(tx_bytes) tx.wit.vtxinwit.clear() return tx.serialize() def extractLeaderSig(self, tx_bytes: bytes) -> bytes: tx = self.loadTx(tx_bytes) return tx.wit.vtxinwit[0].scriptWitness.stack[1] def extractFollowerSig(self, tx_bytes: bytes) -> bytes: tx = self.loadTx(tx_bytes) return tx.wit.vtxinwit[0].scriptWitness.stack[2] def createBLockTx(self, Kbs, output_amount, vkbv=None) -> bytes: tx = CTransaction() tx.nVersion = self.txVersion() p2wpkh_script_pk = self.getPkDest(Kbs) tx.vout.append(self.txoType()(output_amount, p2wpkh_script_pk)) return tx.serialize() def encodeSharedAddress(self, Kbv, Kbs): return self.pubkey_to_segwit_address(Kbs) def publishBLockTx(self, kbv, Kbs, output_amount, feerate, unlock_time: int = 0) -> bytes: b_lock_tx = self.createBLockTx(Kbs, output_amount) b_lock_tx = self.fundTx(b_lock_tx, feerate) b_lock_tx_id = self.getTxid(b_lock_tx) b_lock_tx = self.signTxWithWallet(b_lock_tx) return bytes.fromhex(self.publishTx(b_lock_tx)) def getTxVSize(self, tx, add_bytes: int = 0, add_witness_bytes: int = 0) -> int: wsf = self.witnessScaleFactor() len_full = len(tx.serialize_with_witness()) + add_bytes + add_witness_bytes len_nwit = len(tx.serialize_without_witness()) + add_bytes weight = len_nwit * (wsf - 1) + len_full return (weight + wsf - 1) // wsf def findTxB(self, kbv, Kbs, cb_swap_value, cb_block_confirmed, restore_height, bid_sender): dest_address = self.pubkey_to_segwit_address(Kbs) if self.using_segwit() else self.pubkey_to_address(Kbs) return self.getLockTxHeight(None, dest_address, cb_swap_value, restore_height) ''' raw_dest = self.getPkDest(Kbs) rv = self.scanTxOutset(raw_dest) for utxo in rv['unspents']: if 'height' in utxo and utxo['height'] > 0 and rv['height'] - utxo['height'] > cb_block_confirmed: if self.make_int(utxo['amount']) != cb_swap_value: self._log.warning('Found output to lock tx pubkey of incorrect value: %s', str(utxo['amount'])) else: return {'txid': utxo['txid'], 'vout': utxo['vout'], 'amount': utxo['amount'], 'height': utxo['height']} return None ''' def getBLockSpendTxFee(self, tx, fee_rate: int) -> int: witness_bytes = 109 vsize = self.getTxVSize(tx, add_witness_bytes=witness_bytes) pay_fee = round(fee_rate * vsize / 1000) self._log.info(f'BLockSpendTx fee_rate, vsize, fee: {fee_rate}, {vsize}, {pay_fee}.') return pay_fee def spendBLockTx(self, chain_b_lock_txid: bytes, address_to: str, kbv: bytes, kbs: bytes, cb_swap_value: int, b_fee: int, restore_height: int, lock_tx_vout=None) -> bytes: self._log.info('spendBLockTx %s:\n', chain_b_lock_txid.hex()) wtx = self.rpc_wallet('gettransaction', [chain_b_lock_txid.hex(), ]) lock_tx = self.loadTx(bytes.fromhex(wtx['hex'])) Kbs = self.getPubkey(kbs) script_pk = self.getPkDest(Kbs) locked_n = findOutput(lock_tx, script_pk) ensure(locked_n is not None, 'Output not found in tx') pkh_to = self.decodeAddress(address_to) tx = CTransaction() tx.nVersion = self.txVersion() script_lock = self.getScriptForPubkeyHash(Kbs) chain_b_lock_txid_int = b2i(chain_b_lock_txid) tx.vin.append(CTxIn(COutPoint(chain_b_lock_txid_int, locked_n), nSequence=0, scriptSig=self.getScriptScriptSig(script_lock))) tx.vout.append(self.txoType()(cb_swap_value, self.getScriptForPubkeyHash(pkh_to))) pay_fee = self.getBLockSpendTxFee(tx, b_fee) tx.vout[0].nValue = cb_swap_value - pay_fee b_lock_spend_tx = tx.serialize() b_lock_spend_tx = self.signTxWithKey(b_lock_spend_tx, kbs) return bytes.fromhex(self.publishTx(b_lock_spend_tx)) def importWatchOnlyAddress(self, address: str, label: str): self.rpc_wallet('importaddress', [address, label, False]) def isWatchOnlyAddress(self, address: str): addr_info = self.rpc_wallet('getaddressinfo', [address]) return addr_info['iswatchonly'] def getSCLockScriptAddress(self, lock_script: bytes) -> str: lock_tx_dest = self.getScriptDest(lock_script) return self.encodeScriptDest(lock_tx_dest) def getLockTxHeight(self, txid, dest_address, bid_amount, rescan_from, find_index: bool = False, vout: int = -1): # Add watchonly address and rescan if required if not self.isAddressMine(dest_address, or_watch_only=True): self.importWatchOnlyAddress(dest_address, 'bid') self._log.info('Imported watch-only addr: {}'.format(dest_address)) self._log.info('Rescanning {} chain from height: {}'.format(self.coin_name(), rescan_from)) self.rpc_wallet('rescanblockchain', [rescan_from]) return_txid = True if txid is None else False if txid is None: txns = self.rpc_wallet('listunspent', [0, 9999999, [dest_address, ]]) for tx in txns: if self.make_int(tx['amount']) == bid_amount: txid = bytes.fromhex(tx['txid']) break if txid is None: return None try: tx = self.rpc_wallet('gettransaction', [txid.hex()]) block_height = 0 if 'blockhash' in tx: block_header = self.rpc('getblockheader', [tx['blockhash']]) block_height = block_header['height'] rv = { 'depth': 0 if 'confirmations' not in tx else tx['confirmations'], 'height': block_height} except Exception as e: self._log.debug('getLockTxHeight gettransaction failed: %s, %s', txid.hex(), str(e)) return None if find_index: tx_obj = self.rpc('decoderawtransaction', [tx['hex']]) rv['index'] = find_vout_for_address_from_txobj(tx_obj, dest_address) if return_txid: rv['txid'] = txid.hex() return rv def getOutput(self, txid, dest_script, expect_value, xmr_swap=None): # TODO: Use getrawtransaction if txindex is active utxos = self.rpc('scantxoutset', ['start', ['raw({})'.format(dest_script.hex())]]) if 'height' in utxos: # chain_height not returned by v18 codebase chain_height = utxos['height'] else: chain_height = self.getChainHeight() rv = [] for utxo in utxos['unspents']: if txid and txid.hex() != utxo['txid']: continue if expect_value != self.make_int(utxo['amount']): continue rv.append({ 'depth': 0 if 'height' not in utxo else (chain_height - utxo['height']) + 1, 'height': 0 if 'height' not in utxo else utxo['height'], 'amount': self.make_int(utxo['amount']), 'txid': utxo['txid'], 'vout': utxo['vout']}) return rv, chain_height def withdrawCoin(self, value: float, addr_to: str, subfee: bool): params = [addr_to, value, '', '', subfee, True, self._conf_target] return self.rpc_wallet('sendtoaddress', params) def signCompact(self, k, message: str) -> bytes: message_hash = sha256(bytes(message, 'utf-8')) privkey = PrivateKey(k) return privkey.sign_recoverable(message_hash, hasher=None)[:64] def signRecoverable(self, k, message: str) -> bytes: message_hash = sha256(bytes(message, 'utf-8')) privkey = PrivateKey(k) return privkey.sign_recoverable(message_hash, hasher=None) def verifyCompactSig(self, K, message: str, sig) -> None: message_hash = sha256(bytes(message, 'utf-8')) pubkey = PublicKey(K) rv = pubkey.verify_compact(sig, message_hash, hasher=None) assert (rv is True) def verifySigAndRecover(self, sig, message: str) -> bytes: message_hash = sha256(bytes(message, 'utf-8')) pubkey = PublicKey.from_signature_and_message(sig, message_hash, hasher=None) return pubkey.format() def verifyMessage(self, address: str, message: str, signature: str, message_magic: str = None) -> bool: if message_magic is None: message_magic = self.chainparams()['message_magic'] message_bytes = SerialiseNumCompact(len(message_magic)) + bytes(message_magic, 'utf-8') + SerialiseNumCompact(len(message)) + bytes(message, 'utf-8') message_hash = sha256(sha256(message_bytes)) signature_bytes = base64.b64decode(signature) rec_id = (signature_bytes[0] - 27) & 3 signature_bytes = signature_bytes[1:] + bytes((rec_id,)) try: pubkey = PublicKey.from_signature_and_message(signature_bytes, message_hash, hasher=None) except Exception as e: self._log.info('verifyMessage failed: ' + str(e)) return False address_hash = self.decodeAddress(address) pubkey_hash = hash160(pubkey.format()) return True if address_hash == pubkey_hash else False def showLockTransfers(self, kbv, Kbs, restore_height): raise ValueError('Unimplemented') def getWitnessStackSerialisedLength(self, witness_stack): length = getCompactSizeLen(len(witness_stack)) for e in witness_stack: length += getWitnessElementLen(len(e)) # See core SerializeTransaction length += 1 # vinDummy length += 1 # flags return length def describeTx(self, tx_hex: str): return self.rpc('decoderawtransaction', [tx_hex]) def getSpendableBalance(self) -> int: return self.make_int(self.rpc_wallet('getbalances')['mine']['trusted']) def createUTXO(self, value_sats: int): # Create a new address and send value_sats to it spendable_balance = self.getSpendableBalance() if spendable_balance < value_sats: raise ValueError('Balance too low') address = self.getNewAddress(self._use_segwit, 'create_utxo') return self.withdrawCoin(self.format_amount(value_sats), address, False), address def createRawFundedTransaction(self, addr_to: str, amount: int, sub_fee: bool = False, lock_unspents: bool = True) -> str: txn = self.rpc('createrawtransaction', [[], {addr_to: self.format_amount(amount)}]) options = { 'lockUnspents': lock_unspents, 'conf_target': self._conf_target, } if sub_fee: options['subtractFeeFromOutputs'] = [0,] return self.rpc_wallet('fundrawtransaction', [txn, options])['hex'] def createRawSignedTransaction(self, addr_to, amount) -> str: txn_funded = self.createRawFundedTransaction(addr_to, amount) return self.rpc_wallet('signrawtransactionwithwallet', [txn_funded])['hex'] def getBlockWithTxns(self, block_hash: str): return self.rpc('getblock', [block_hash, 2]) def getUnspentsByAddr(self): unspent_addr = dict() unspent = self.rpc_wallet('listunspent') for u in unspent: if u.get('spendable', False) is False: continue if 'address' not in u: continue if 'desc' in u: desc = u['desc'] if self.using_segwit: if self.use_p2shp2wsh(): if not desc.startswith('sh(wpkh'): continue else: if not desc.startswith('wpkh'): continue else: if not desc.startswith('pkh'): continue unspent_addr[u['address']] = unspent_addr.get(u['address'], 0) + self.make_int(u['amount'], r=1) return unspent_addr def getUTXOBalance(self, address: str): num_blocks = self.rpc('getblockcount') sum_unspent = 0 self._log.debug('[rm] scantxoutset start') # scantxoutset is slow ro = self.rpc('scantxoutset', ['start', ['addr({})'.format(address)]]) # TODO: Use combo(address) where possible self._log.debug('[rm] scantxoutset end') for o in ro['unspents']: sum_unspent += self.make_int(o['amount']) return sum_unspent def getProofOfFunds(self, amount_for, extra_commit_bytes): # TODO: Lock unspent and use same output/s to fund bid unspent_addr = self.getUnspentsByAddr() sign_for_addr = None for addr, value in unspent_addr.items(): if value >= amount_for: sign_for_addr = addr break ensure(sign_for_addr is not None, 'Could not find address with enough funds for proof') self._log.debug('sign_for_addr %s', sign_for_addr) if self.using_segwit(): # TODO: Use isSegwitAddress when scantxoutset can use combo # 'Address does not refer to key' for non p2pkh pkh = self.decodeAddress(sign_for_addr) sign_for_addr = self.pkh_to_address(pkh) self._log.debug('sign_for_addr converted %s', sign_for_addr) signature = self.rpc_wallet('signmessage', [sign_for_addr, sign_for_addr + '_swap_proof_' + extra_commit_bytes.hex()]) prove_utxos = [] # TODO: Send specific utxos return (sign_for_addr, signature, prove_utxos) def encodeProofUtxos(self, proof_utxos): packed_utxos = bytes() for utxo in proof_utxos: packed_utxos += utxo[0] + utxo[1].to_bytes(2, 'big') return packed_utxos def decodeProofUtxos(self, msg_utxos): proof_utxos = [] if len(msg_utxos) > 0: num_utxos = len(msg_utxos) // 34 p: int = 0 for i in range(num_utxos): proof_utxos.append((msg_utxos[p: p + 32], int.from_bytes(msg_utxos[p + 32: p + 34], 'big'))) p += 34 return proof_utxos def verifyProofOfFunds(self, address, signature, utxos, extra_commit_bytes): passed = self.verifyMessage(address, address + '_swap_proof_' + extra_commit_bytes.hex(), signature) ensure(passed is True, 'Proof of funds signature invalid') if self.using_segwit(): address = self.encodeSegwitAddress(decodeAddress(address)[1:]) return self.getUTXOBalance(address) def isWalletEncrypted(self) -> bool: wallet_info = self.rpc_wallet('getwalletinfo') return 'unlocked_until' in wallet_info def isWalletLocked(self) -> bool: wallet_info = self.rpc_wallet('getwalletinfo') if 'unlocked_until' in wallet_info and wallet_info['unlocked_until'] <= 0: return True return False def isWalletEncryptedLocked(self) -> (bool, bool): wallet_info = self.rpc_wallet('getwalletinfo') encrypted = 'unlocked_until' in wallet_info locked = encrypted and wallet_info['unlocked_until'] <= 0 return encrypted, locked def changeWalletPassword(self, old_password: str, new_password: str): self._log.info('changeWalletPassword - {}'.format(self.ticker())) if old_password == '': if self.isWalletEncrypted(): raise ValueError('Old password must be set') return self.rpc_wallet('encryptwallet', [new_password]) self.rpc_wallet('walletpassphrasechange', [old_password, new_password]) def unlockWallet(self, password: str): if password == '': return self._log.info('unlockWallet - {}'.format(self.ticker())) if self.coin_type() == Coins.BTC: # Recreate wallet if none found # Required when encrypting an existing btc wallet, workaround is to delete the btc wallet and recreate wallets = self.rpc('listwallets') if len(wallets) < 1: self._log.info('Creating wallet.dat for {}.'.format(self.coin_name())) # wallet_name, disable_private_keys, blank, passphrase, avoid_reuse, descriptors self.rpc('createwallet', ['wallet.dat', False, True, '', False, False]) self.rpc_wallet('encryptwallet', [password]) # Max timeout value, ~3 years self.rpc_wallet('walletpassphrase', [password, 100000000]) self._sc.checkWalletSeed(self.coin_type()) def lockWallet(self): self._log.info('lockWallet - {}'.format(self.ticker())) self.rpc_wallet('walletlock') def get_p2sh_script_pubkey(self, script: bytearray) -> bytearray: script_hash = hash160(script) assert len(script_hash) == 20 return CScript([OP_HASH160, script_hash, OP_EQUAL]) def get_p2wsh_script_pubkey(self, script: bytearray) -> bytearray: return CScript([OP_0, sha256(script)]) def findTxnByHash(self, txid_hex: str): # Only works for wallet txns try: rv = self.rpc_wallet('gettransaction', [txid_hex]) except Exception as ex: self._log.debug('findTxnByHash getrawtransaction failed: {}'.format(txid_hex)) return None if 'confirmations' in rv and rv['confirmations'] >= self.blocks_confirmed: return {'txid': txid_hex, 'amount': 0, 'height': rv['blockheight']} return None def createRedeemTxn(self, prevout, output_addr: str, output_value: int, txn_script: bytes = None) -> str: tx = CTransaction() tx.nVersion = self.txVersion() prev_txid = b2i(bytes.fromhex(prevout['txid'])) tx.vin.append(CTxIn(COutPoint(prev_txid, prevout['vout']))) pkh = self.decodeAddress(output_addr) script = self.getScriptForPubkeyHash(pkh) tx.vout.append(self.txoType()(output_value, script)) tx.rehash() return tx.serialize().hex() def createRefundTxn(self, prevout, output_addr: str, output_value: int, locktime: int, sequence: int, txn_script: bytes = None) -> str: tx = CTransaction() tx.nVersion = self.txVersion() tx.nLockTime = locktime prev_txid = b2i(bytes.fromhex(prevout['txid'])) tx.vin.append(CTxIn(COutPoint(prev_txid, prevout['vout']), nSequence=sequence,)) pkh = self.decodeAddress(output_addr) script = self.getScriptForPubkeyHash(pkh) tx.vout.append(self.txoType()(output_value, script)) tx.rehash() return tx.serialize().hex() def ensureFunds(self, amount: int) -> None: if self.getSpendableBalance() < amount: raise ValueError('Balance too low') def getHTLCSpendTxVSize(self, redeem: bool = True) -> int: tx_vsize = 5 # Add a few bytes, sequence in script takes variable amount of bytes if self.using_segwit(): tx_vsize += 143 if redeem else 134 else: tx_vsize += 323 if redeem else 287 return tx_vsize def find_prevout_info(self, txn_hex: str, txn_script: bytes): txjs = self.rpc('decoderawtransaction', [txn_hex]) if self.using_segwit(): p2wsh = self.getScriptDest(txn_script) n = getVoutByScriptPubKey(txjs, p2wsh.hex()) else: addr_to = self.encode_p2sh(txn_script) n = getVoutByAddress(txjs, addr_to) return { 'txid': txjs['txid'], 'vout': n, 'scriptPubKey': txjs['vout'][n]['scriptPubKey']['hex'], 'redeemScript': txn_script.hex(), 'amount': txjs['vout'][n]['value'] } def isTxExistsError(self, err_str: str) -> bool: return 'Transaction already in block chain' in err_str def isTxNonFinalError(self, err_str: str) -> bool: return 'non-BIP68-final' in err_str or 'non-final' in err_str def testBTCInterface(): print('TODO: testBTCInterface') if __name__ == "__main__": testBTCInterface()