basicswap_miserver/tests/basicswap/test_other.py

474 lines
17 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2019-2024 tecnovert
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import hashlib
import random
import secrets
import unittest
import basicswap.contrib.ed25519_fast as edf
import basicswap.ed25519_fast_util as edu
from coincurve.ed25519 import ed25519_get_pubkey
from coincurve.ecdsaotves import (
ecdsaotves_enc_sign,
ecdsaotves_enc_verify,
ecdsaotves_dec_sig,
ecdsaotves_rec_enc_key)
from coincurve.keys import (
PrivateKey)
from basicswap.util import i2b, h2b
from basicswap.util.integer import encode_varint, decode_varint
from basicswap.util.crypto import ripemd160, hash160
from basicswap.util.network import is_private_ip_address
from basicswap.util.rfc2440 import rfc2440_hash_password
from basicswap.util_xmr import encode_address as xmr_encode_address
from basicswap.interface.btc import BTCInterface
from basicswap.interface.xmr import XMRInterface
from basicswap.basicswap_util import (
TxLockTypes)
from basicswap.util import (
make_int,
SerialiseNum,
format_amount,
DeserialiseNum,
validate_amount)
from basicswap.messages_pb2 import (
BidMessage,
BidMessage_test,
)
from basicswap.contrib.test_framework.script import hash160 as hash160_btc
class Test(unittest.TestCase):
REQUIRED_SETTINGS = {'blocks_confirmed': 1, 'conf_target': 1, 'use_segwit': True, 'connection_type': 'rpc'}
def test_serialise_num(self):
def test_case(v, nb=None):
b = SerialiseNum(v)
if nb is not None:
assert (len(b) == nb)
assert (v == DeserialiseNum(b))
test_case(0, 1)
test_case(1, 1)
test_case(16, 1)
test_case(-1, 2)
test_case(17, 2)
test_case(500)
test_case(-500)
test_case(4194642)
def test_sequence(self):
coin_settings = {'rpcport': 0, 'rpcauth': 'none'}
coin_settings.update(self.REQUIRED_SETTINGS)
ci = BTCInterface(coin_settings, 'regtest')
time_val = 48 * 60 * 60
encoded = ci.getExpectedSequence(TxLockTypes.SEQUENCE_LOCK_TIME, time_val)
decoded = ci.decodeSequence(encoded)
assert (decoded >= time_val)
assert (decoded <= time_val + 512)
time_val = 24 * 60
encoded = ci.getExpectedSequence(TxLockTypes.SEQUENCE_LOCK_TIME, time_val)
decoded = ci.decodeSequence(encoded)
assert (decoded >= time_val)
assert (decoded <= time_val + 512)
blocks_val = 123
encoded = ci.getExpectedSequence(TxLockTypes.SEQUENCE_LOCK_BLOCKS, blocks_val)
decoded = ci.decodeSequence(encoded)
assert (decoded == blocks_val)
def test_make_int(self):
def test_case(vs, vf, expect_int):
i = make_int(vs)
assert (i == expect_int and isinstance(i, int))
i = make_int(vf)
assert (i == expect_int and isinstance(i, int))
vs_out = format_amount(i, 8)
# Strip
for i in range(7):
if vs_out[-1] == '0':
vs_out = vs_out[:-1]
if '.' in vs:
assert (vs_out == vs)
else:
assert (vs_out[:-2] == vs)
test_case('0', 0, 0)
test_case('1', 1, 100000000)
test_case('10', 10, 1000000000)
test_case('0.00899999', 0.00899999, 899999)
test_case('899999.0', 899999.0, 89999900000000)
test_case('899999.00899999', 899999.00899999, 89999900899999)
test_case('0.0', 0.0, 0)
test_case('1.0', 1.0, 100000000)
test_case('1.1', 1.1, 110000000)
test_case('1.2', 1.2, 120000000)
test_case('0.00899991', 0.00899991, 899991)
test_case('0.0089999', 0.0089999, 899990)
test_case('0.0089991', 0.0089991, 899910)
test_case('0.123', 0.123, 12300000)
test_case('123000.000123', 123000.000123, 12300000012300)
try:
make_int('0.123456789')
assert (False)
except Exception as e:
assert (str(e) == 'Mantissa too long')
validate_amount('0.12345678')
# floor
assert (make_int('0.123456789', r=-1) == 12345678)
# Round up
assert (make_int('0.123456789', r=1) == 12345679)
def test_make_int12(self):
def test_case(vs, vf, expect_int):
i = make_int(vs, 12)
assert (i == expect_int and isinstance(i, int))
i = make_int(vf, 12)
assert (i == expect_int and isinstance(i, int))
vs_out = format_amount(i, 12)
# Strip
for i in range(7):
if vs_out[-1] == '0':
vs_out = vs_out[:-1]
if '.' in vs:
assert (vs_out == vs)
else:
assert (vs_out[:-2] == vs)
test_case('0.123456789', 0.123456789, 123456789000)
test_case('0.123456789123', 0.123456789123, 123456789123)
try:
make_int('0.1234567891234', 12)
assert (False)
except Exception as e:
assert (str(e) == 'Mantissa too long')
validate_amount('0.123456789123', 12)
try:
validate_amount('0.1234567891234', 12)
assert (False)
except Exception as e:
assert ('Too many decimal places' in str(e))
try:
validate_amount(0.1234567891234, 12)
assert (False)
except Exception as e:
assert ('Too many decimal places' in str(e))
def test_ed25519(self):
privkey = edu.get_secret()
pubkey = edu.encodepoint(edf.scalarmult_B(privkey))
privkey_bytes = i2b(privkey)
pubkey_test = ed25519_get_pubkey(privkey_bytes)
assert (pubkey == pubkey_test)
def test_ecdsa_otves(self):
coin_settings = {'rpcport': 0, 'rpcauth': 'none'}
coin_settings.update(self.REQUIRED_SETTINGS)
ci = BTCInterface(coin_settings, 'regtest')
vk_sign = ci.getNewSecretKey()
vk_encrypt = ci.getNewSecretKey()
pk_sign = ci.getPubkey(vk_sign)
pk_encrypt = ci.getPubkey(vk_encrypt)
sign_hash = secrets.token_bytes(32)
cipher_text = ecdsaotves_enc_sign(vk_sign, pk_encrypt, sign_hash)
assert (ecdsaotves_enc_verify(pk_sign, pk_encrypt, sign_hash, cipher_text))
sig = ecdsaotves_dec_sig(vk_encrypt, cipher_text)
assert (ci.verifySig(pk_sign, sign_hash, sig))
recovered_key = ecdsaotves_rec_enc_key(pk_encrypt, cipher_text, sig)
assert (vk_encrypt == recovered_key)
def test_sign(self):
coin_settings = {'rpcport': 0, 'rpcauth': 'none'}
coin_settings.update(self.REQUIRED_SETTINGS)
ci = BTCInterface(coin_settings, 'regtest')
vk = ci.getNewSecretKey()
pk = ci.getPubkey(vk)
message = 'test signing message'
message_hash = hashlib.sha256(bytes(message, 'utf-8')).digest()
eck = PrivateKey(vk)
sig = eck.sign(message.encode('utf-8'))
ci.verifySig(pk, message_hash, sig)
def test_sign_compact(self):
coin_settings = {'rpcport': 0, 'rpcauth': 'none'}
coin_settings.update(self.REQUIRED_SETTINGS)
ci = BTCInterface(coin_settings, 'regtest')
vk = ci.getNewSecretKey()
pk = ci.getPubkey(vk)
sig = ci.signCompact(vk, 'test signing message')
assert (len(sig) == 64)
ci.verifyCompactSig(pk, 'test signing message', sig)
# Nonce is set deterministically (using default libsecp256k1 method rfc6979)
sig2 = ci.signCompact(vk, 'test signing message')
assert (sig == sig2)
def test_sign_recoverable(self):
coin_settings = {'rpcport': 0, 'rpcauth': 'none'}
coin_settings.update(self.REQUIRED_SETTINGS)
ci = BTCInterface(coin_settings, 'regtest')
vk = ci.getNewSecretKey()
pk = ci.getPubkey(vk)
sig = ci.signRecoverable(vk, 'test signing message')
assert (len(sig) == 65)
pk_rec = ci.verifySigAndRecover(sig, 'test signing message')
assert (pk == pk_rec)
# Nonce is set deterministically (using default libsecp256k1 method rfc6979)
sig2 = ci.signRecoverable(vk, 'test signing message')
assert (sig == sig2)
def test_pubkey_to_address(self):
coin_settings = {'rpcport': 0, 'rpcauth': 'none'}
coin_settings.update(self.REQUIRED_SETTINGS)
ci = BTCInterface(coin_settings, 'regtest')
pk = h2b('02c26a344e7d21bcc6f291532679559f2fd234c881271ff98714855edc753763a6')
addr = ci.pubkey_to_address(pk)
assert (addr == 'mj6SdSxmWRmdDqR5R3FfZmRiLmQfQAsLE8')
def test_dleag(self):
coin_settings = {'rpcport': 0, 'walletrpcport': 0, 'walletrpcauth': 'none'}
coin_settings.update(self.REQUIRED_SETTINGS)
ci = XMRInterface(coin_settings, 'regtest')
key = ci.getNewSecretKey()
proof = ci.proveDLEAG(key)
assert (ci.verifyDLEAG(proof))
def test_rate(self):
scale_from = 8
scale_to = 12
amount_from = make_int(100, scale_from)
rate = make_int(0.1, scale_to)
amount_to = int((amount_from * rate) // (10 ** scale_from))
assert ('100.00000000' == format_amount(amount_from, scale_from))
assert ('10.000000000000' == format_amount(amount_to, scale_to))
rate_check = make_int((amount_to / amount_from), scale_from)
assert (rate == rate_check)
scale_from = 12
scale_to = 8
amount_from = make_int(1, scale_from)
rate = make_int(12, scale_to)
amount_to = int((amount_from * rate) // (10 ** scale_from))
assert ('1.000000000000' == format_amount(amount_from, scale_from))
assert ('12.00000000' == format_amount(amount_to, scale_to))
rate_check = make_int((amount_to / amount_from), scale_from)
assert (rate == rate_check)
scale_from = 8
scale_to = 8
amount_from = make_int(0.073, scale_from)
amount_to = make_int(10, scale_to)
rate = make_int(amount_to / amount_from, scale_to, r=1)
amount_to_recreate = int((amount_from * rate) // (10 ** scale_from))
assert ('10.00000000' == format_amount(amount_to_recreate, scale_to))
scale_from = 8
scale_to = 12
amount_from = make_int(10.0, scale_from)
amount_to = make_int(0.06935, scale_to)
rate = make_int(amount_to / amount_from, scale_from, r=1)
amount_to_recreate = int((amount_from * rate) // (10 ** scale_from))
assert ('0.069350000000' == format_amount(amount_to_recreate, scale_to))
scale_from = 12
scale_to = 8
amount_from = make_int(0.06935, scale_from)
amount_to = make_int(10.0, scale_to)
rate = make_int(amount_to / amount_from, scale_from, r=1)
amount_to_recreate = int((amount_from * rate) // (10 ** scale_from))
assert ('10.00000000' == format_amount(amount_to_recreate, scale_to))
coin_settings = {'rpcport': 0, 'rpcauth': 'none', 'walletrpcport': 0, 'walletrpcauth': 'none'}
coin_settings.update(self.REQUIRED_SETTINGS)
ci_xmr = XMRInterface(coin_settings, 'regtest')
ci_btc = BTCInterface(coin_settings, 'regtest')
for i in range(10000):
test_pairs = random.randint(0, 3)
if test_pairs == 0:
ci_from = ci_btc
ci_to = ci_xmr
elif test_pairs == 1:
ci_from = ci_xmr
ci_to = ci_btc
elif test_pairs == 2:
ci_from = ci_xmr
ci_to = ci_xmr
else:
ci_from = ci_btc
ci_to = ci_btc
test_range = random.randint(0, 5)
if test_range == 0:
amount_from = random.randint(10000, 1 * ci_from.COIN())
elif test_range == 1:
amount_from = random.randint(10000, 1000 * ci_from.COIN())
elif test_range == 2:
amount_from = random.randint(10000, 2100 * ci_from.COIN())
elif test_range == 3:
amount_from = random.randint(10000, 210000 * ci_from.COIN())
elif test_range == 4:
amount_from = random.randint(10000, 21000000 * ci_from.COIN())
else:
amount_from = random.randint(10000, 2100000000 * ci_from.COIN())
test_range = random.randint(0, 5)
if test_range == 0:
amount_to = random.randint(10000, 1 * ci_to.COIN())
elif test_range == 1:
amount_to = random.randint(10000, 1000 * ci_to.COIN())
elif test_range == 2:
amount_to = random.randint(10000, 2100 * ci_to.COIN())
elif test_range == 3:
amount_to = random.randint(10000, 210000 * ci_to.COIN())
elif test_range == 4:
amount_to = random.randint(10000, 21000000 * ci_to.COIN())
else:
amount_to = random.randint(10000, 2100000000 * ci_to.COIN())
offer_rate = ci_from.make_int(amount_to / amount_from, r=1)
amount_to_from_rate: int = int((int(amount_from) * offer_rate) // (10 ** scale_from))
scale_from = 24
offer_rate = make_int(amount_to, scale_from) // amount_from
amount_to_from_rate: int = int((int(amount_from) * offer_rate) // (10 ** scale_from))
if abs(amount_to - amount_to_from_rate) == 1:
offer_rate += 1
offer_rate_human_read: int = int(offer_rate // (10 ** (scale_from - ci_from.exp())))
amount_to_from_rate: int = int((int(amount_from) * offer_rate) // (10 ** scale_from))
if amount_to != amount_to_from_rate:
print('from exp, amount', ci_from.exp(), amount_from)
print('to exp, amount', ci_to.exp(), amount_to)
print('amount_to_from_rate', amount_to_from_rate)
raise ValueError('Bad amount_to')
scale_to = 24
reversed_rate = make_int(amount_from, scale_to) // amount_to
amount_from_from_rate: int = int((int(amount_to) * reversed_rate) // (10 ** scale_to))
if abs(amount_from - amount_from_from_rate) == 1:
reversed_rate += 1
amount_from_from_rate: int = int((int(amount_to) * reversed_rate) // (10 ** scale_to))
if amount_from != amount_from_from_rate:
print('from exp, amount', ci_from.exp(), amount_from)
print('to exp, amount', ci_to.exp(), amount_to)
print('amount_from_from_rate', amount_from_from_rate)
raise ValueError('Bad amount_from')
def test_rfc2440(self):
password = 'test'
salt = bytes.fromhex('B7A94A7E4988630E')
password_hash = rfc2440_hash_password(password, salt=salt)
assert (password_hash == '16:B7A94A7E4988630E6095334BA67F06FBA509B2A7136A04C9C1B430F539')
def test_ripemd160(self):
input_data = b'hash this'
assert (ripemd160(input_data).hex() == 'd5443a154f167e2c1332f6de72cfb4c6ab9c8c17')
def test_hash160(self):
# hash160 is RIPEMD(SHA256(data))
input_data = b'hash this'
assert (hash160(input_data).hex() == '072985b3583a4a71f548494a5e1d5f6b00d0fe13')
assert (hash160_btc(input_data).hex() == '072985b3583a4a71f548494a5e1d5f6b00d0fe13')
def test_protobuf(self):
# Ensure old protobuf templates can be read
msg_buf = BidMessage_test()
msg_buf.protocol_version = 2
msg_buf.time_valid = 1024
serialised_msg = msg_buf.SerializeToString()
msg_buf_v2 = BidMessage()
msg_buf_v2.ParseFromString(serialised_msg)
assert (msg_buf_v2.protocol_version == 2)
assert (msg_buf_v2.time_valid == 1024)
# Decode only the first field
msg_buf_v2.ParseFromString(serialised_msg[:2])
assert (msg_buf_v2.protocol_version == 2)
assert (msg_buf_v2.time_valid == 0)
def test_is_private_ip_address(self):
assert (is_private_ip_address('localhost'))
assert (is_private_ip_address('127.0.0.1'))
assert (is_private_ip_address('10.0.0.0'))
assert (is_private_ip_address('172.16.0.0'))
assert (is_private_ip_address('192.168.0.0'))
assert (is_private_ip_address('20.87.245.0') is False)
assert (is_private_ip_address('particl.io') is False)
def test_varint(self):
def test_case(i, expect_length):
b = encode_varint(i)
assert (len(b) == expect_length)
assert (decode_varint(b) == i)
test_case(0, 1)
test_case(1, 1)
test_case(127, 1)
test_case(128, 2)
test_case(253, 2)
test_case(8321, 2)
test_case(16383, 2)
test_case(16384, 3)
test_case(2097151, 3)
test_case(2097152, 4)
def test_base58(self):
kv = edu.get_secret()
Kv = edu.encodepoint(edf.scalarmult_B(kv))
ks = edu.get_secret()
Ks = edu.encodepoint(edf.scalarmult_B(ks))
addr = xmr_encode_address(Kv, Ks)
assert (addr.startswith('4'))
addr = xmr_encode_address(Kv, Ks, 4146)
assert (addr.startswith('Wo'))
if __name__ == '__main__':
unittest.main()