210 lines
7.8 KiB
Python
210 lines
7.8 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (c) 2022 tecnovert
|
|
# Distributed under the MIT software license, see the accompanying
|
|
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
|
|
|
|
import hashlib
|
|
from .btc import BTCInterface, find_vout_for_address_from_txobj
|
|
from basicswap.chainparams import Coins
|
|
|
|
from basicswap.util.address import decodeAddress
|
|
from basicswap.contrib.test_framework.script import (
|
|
CScript,
|
|
OP_0,
|
|
OP_DUP,
|
|
OP_EQUAL,
|
|
OP_HASH160,
|
|
OP_CHECKSIG,
|
|
OP_EQUALVERIFY,
|
|
hash160,
|
|
)
|
|
from basicswap.contrib.test_framework.messages import (
|
|
CTransaction,
|
|
)
|
|
|
|
|
|
class FIROInterface(BTCInterface):
|
|
@staticmethod
|
|
def coin_type():
|
|
return Coins.FIRO
|
|
|
|
def getExchangeName(self, exchange_name):
|
|
return 'zcoin'
|
|
|
|
def initialiseWallet(self, key):
|
|
# load with -hdseed= parameter
|
|
pass
|
|
|
|
def getNewAddress(self, use_segwit, label='swap_receive'):
|
|
return self.rpc_callback('getnewaddress', [label])
|
|
# addr_plain = self.rpc_callback('getnewaddress', [label])
|
|
# return self.rpc_callback('addwitnessaddress', [addr_plain])
|
|
|
|
def decodeAddress(self, address):
|
|
return decodeAddress(address)[1:]
|
|
|
|
def encodeSegwitAddress(self, script):
|
|
raise ValueError('TODO')
|
|
|
|
def decodeSegwitAddress(self, addr):
|
|
raise ValueError('TODO')
|
|
|
|
def isWatchOnlyAddress(self, address):
|
|
addr_info = self.rpc_callback('validateaddress', [address])
|
|
return addr_info['iswatchonly']
|
|
|
|
def isAddressMine(self, address: str, or_watch_only: bool = False) -> bool:
|
|
addr_info = self.rpc_callback('validateaddress', [address])
|
|
if not or_watch_only:
|
|
return addr_info['ismine']
|
|
return addr_info['ismine'] or addr_info['iswatchonly']
|
|
|
|
def getSCLockScriptAddress(self, lock_script):
|
|
lock_tx_dest = self.getScriptDest(lock_script)
|
|
address = self.encodeScriptDest(lock_tx_dest)
|
|
|
|
if not self.isAddressMine(address, or_watch_only=True):
|
|
# Expects P2WSH nested in BIP16_P2SH
|
|
ro = self.rpc_callback('importaddress', [lock_tx_dest.hex(), 'bid lock', False, True])
|
|
addr_info = self.rpc_callback('validateaddress', [address])
|
|
|
|
return address
|
|
|
|
def getLockTxHeightFiro(self, txid, lock_script, bid_amount, rescan_from, find_index=False):
|
|
# Add watchonly address and rescan if required
|
|
lock_tx_dest = self.getScriptDest(lock_script)
|
|
dest_address = self.encodeScriptDest(lock_tx_dest)
|
|
if not self.isAddressMine(dest_address, or_watch_only=True):
|
|
self.rpc_callback('importaddress', [lock_tx_dest.hex(), 'bid lock', False, True])
|
|
self._log.info('Imported watch-only addr: {}'.format(dest_address))
|
|
self._log.info('Rescanning {} chain from height: {}'.format(self.coin_name(), rescan_from))
|
|
self.rpc_callback('rescanblockchain', [rescan_from])
|
|
|
|
return_txid = True if txid is None else False
|
|
if txid is None:
|
|
txns = self.rpc_callback('listunspent', [0, 9999999, [dest_address, ]])
|
|
|
|
for tx in txns:
|
|
if self.make_int(tx['amount']) == bid_amount:
|
|
txid = bytes.fromhex(tx['txid'])
|
|
break
|
|
|
|
if txid is None:
|
|
return None
|
|
|
|
try:
|
|
tx = self.rpc_callback('gettransaction', [txid.hex()])
|
|
|
|
block_height = 0
|
|
if 'blockhash' in tx:
|
|
block_header = self.rpc_callback('getblockheader', [tx['blockhash']])
|
|
block_height = block_header['height']
|
|
|
|
rv = {
|
|
'depth': 0 if 'confirmations' not in tx else tx['confirmations'],
|
|
'height': block_height}
|
|
|
|
except Exception as e:
|
|
self._log.debug('getLockTxHeight gettransaction failed: %s, %s', txid.hex(), str(e))
|
|
return None
|
|
|
|
if find_index:
|
|
tx_obj = self.rpc_callback('decoderawtransaction', [tx['hex']])
|
|
rv['index'] = find_vout_for_address_from_txobj(tx_obj, dest_address)
|
|
|
|
if return_txid:
|
|
rv['txid'] = txid.hex()
|
|
|
|
return rv
|
|
|
|
def createSCLockTx(self, value: int, script: bytearray, vkbv: bytes = None) -> bytes:
|
|
tx = CTransaction()
|
|
tx.nVersion = self.txVersion()
|
|
tx.vout.append(self.txoType()(value, self.getScriptDest(script)))
|
|
|
|
return tx.serialize()
|
|
|
|
def fundSCLockTx(self, tx_bytes, feerate, vkbv=None):
|
|
return self.fundTx(tx_bytes, feerate)
|
|
|
|
def signTxWithWallet(self, tx):
|
|
rv = self.rpc_callback('signrawtransaction', [tx.hex()])
|
|
return bytes.fromhex(rv['hex'])
|
|
|
|
def createRawFundedTransaction(self, addr_to: str, amount: int, sub_fee: bool = False, lock_unspents: bool = True) -> str:
|
|
txn = self.rpc_callback('createrawtransaction', [[], {addr_to: self.format_amount(amount)}])
|
|
fee_rate, fee_src = self.get_fee_rate(self._conf_target)
|
|
self._log.debug(f'Fee rate: {fee_rate}, source: {fee_src}, block target: {self._conf_target}')
|
|
options = {
|
|
'lockUnspents': lock_unspents,
|
|
'feeRate': fee_rate,
|
|
}
|
|
if sub_fee:
|
|
options['subtractFeeFromOutputs'] = [0,]
|
|
return self.rpc_callback('fundrawtransaction', [txn, options])['hex']
|
|
|
|
def createRawSignedTransaction(self, addr_to, amount) -> str:
|
|
txn_funded = self.createRawFundedTransaction(addr_to, amount)
|
|
return self.rpc_callback('signrawtransaction', [txn_funded])['hex']
|
|
|
|
def getScriptForPubkeyHash(self, pkh: bytes) -> bytearray:
|
|
# Return P2PKH
|
|
return CScript([OP_DUP, OP_HASH160, pkh, OP_EQUALVERIFY, OP_CHECKSIG])
|
|
|
|
def getScriptDest(self, script: bytearray) -> bytearray:
|
|
# P2WSH nested in BIP16_P2SH
|
|
|
|
script_hash = hashlib.sha256(script).digest()
|
|
assert len(script_hash) == 32
|
|
script_hash_hash = hash160(script_hash)
|
|
assert len(script_hash_hash) == 20
|
|
|
|
return CScript([OP_HASH160, script_hash_hash, OP_EQUAL])
|
|
|
|
def getSeedHash(self, seed: bytes) -> bytes:
|
|
return hash160(seed)[::-1]
|
|
|
|
def encodeScriptDest(self, script_dest: bytes) -> str:
|
|
# Extract hash from script
|
|
script_hash = script_dest[2:-1]
|
|
return self.sh_to_address(script_hash)
|
|
|
|
def getScriptScriptSig(self, script: bytes) -> bytearray:
|
|
return CScript([OP_0, hashlib.sha256(script).digest()])
|
|
|
|
def withdrawCoin(self, value, addr_to, subfee):
|
|
params = [addr_to, value, '', '', subfee]
|
|
return self.rpc_callback('sendtoaddress', params)
|
|
|
|
def getWalletSeedID(self):
|
|
return self.rpc_callback('getwalletinfo')['hdmasterkeyid']
|
|
|
|
def getSpendableBalance(self) -> int:
|
|
return self.make_int(self.rpc_callback('getwalletinfo')['balance'])
|
|
|
|
def getBLockSpendTxFee(self, tx, fee_rate: int) -> int:
|
|
add_bytes = 107
|
|
size = len(tx.serialize_with_witness()) + add_bytes
|
|
pay_fee = round(fee_rate * size / 1000)
|
|
self._log.info(f'BLockSpendTx fee_rate, size, fee: {fee_rate}, {size}, {pay_fee}.')
|
|
return pay_fee
|
|
|
|
def signTxWithKey(self, tx: bytes, key: bytes) -> bytes:
|
|
key_wif = self.encodeKey(key)
|
|
rv = self.rpc_callback('signrawtransaction', [tx.hex(), [], [key_wif, ]])
|
|
return bytes.fromhex(rv['hex'])
|
|
|
|
def findTxnByHash(self, txid_hex: str):
|
|
# Only works for wallet txns
|
|
try:
|
|
rv = self.rpc_callback('gettransaction', [txid_hex])
|
|
except Exception as ex:
|
|
self._log.debug('findTxnByHash getrawtransaction failed: {}'.format(txid_hex))
|
|
return None
|
|
if 'confirmations' in rv and rv['confirmations'] >= self.blocks_confirmed:
|
|
block_height = self.getBlockHeader(rv['blockhash'])['height']
|
|
return {'txid': txid_hex, 'amount': 0, 'height': block_height}
|
|
return None
|