Host-customized fork of https://github.com/tecnovert/basicswap/
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
531 lines
19 KiB
531 lines
19 KiB
#!/usr/bin/env python3 |
|
# -*- coding: utf-8 -*- |
|
|
|
# Copyright (c) 2019-2024 tecnovert |
|
# Distributed under the MIT software license, see the accompanying |
|
# file LICENSE or http://www.opensource.org/licenses/mit-license.php. |
|
|
|
import hashlib |
|
import random |
|
import secrets |
|
import unittest |
|
|
|
import basicswap.contrib.ed25519_fast as edf |
|
import basicswap.ed25519_fast_util as edu |
|
|
|
from coincurve.ed25519 import ed25519_get_pubkey |
|
from coincurve.ecdsaotves import ( |
|
ecdsaotves_enc_sign, |
|
ecdsaotves_enc_verify, |
|
ecdsaotves_dec_sig, |
|
ecdsaotves_rec_enc_key) |
|
from coincurve.keys import ( |
|
PrivateKey) |
|
|
|
from basicswap.contrib.mnemonic import Mnemonic |
|
from basicswap.util import i2b, h2b |
|
from basicswap.util.address import decodeAddress |
|
from basicswap.util.crypto import ripemd160, hash160, blake256 |
|
from basicswap.util.extkey import ExtKeyPair |
|
from basicswap.util.integer import encode_varint, decode_varint |
|
from basicswap.util.network import is_private_ip_address |
|
from basicswap.util.rfc2440 import rfc2440_hash_password |
|
from basicswap.util_xmr import encode_address as xmr_encode_address |
|
from basicswap.interface.btc import BTCInterface |
|
from basicswap.interface.xmr import XMRInterface |
|
from tests.basicswap.mnemonics import mnemonics |
|
from tests.basicswap.util import REQUIRED_SETTINGS |
|
|
|
from basicswap.basicswap_util import ( |
|
TxLockTypes) |
|
from basicswap.util import ( |
|
make_int, |
|
SerialiseNum, |
|
format_amount, |
|
DeserialiseNum, |
|
validate_amount) |
|
from basicswap.messages_npb import ( |
|
BidMessage, |
|
) |
|
from basicswap.contrib.test_framework.script import hash160 as hash160_btc |
|
|
|
|
|
class Test(unittest.TestCase): |
|
|
|
def test_serialise_num(self): |
|
def test_case(v, nb=None): |
|
b = SerialiseNum(v) |
|
if nb is not None: |
|
assert (len(b) == nb) |
|
assert (v == DeserialiseNum(b)) |
|
test_case(0, 1) |
|
test_case(1, 1) |
|
test_case(16, 1) |
|
|
|
test_case(-1, 2) |
|
test_case(17, 2) |
|
|
|
test_case(500) |
|
test_case(-500) |
|
test_case(4194642) |
|
|
|
def test_sequence(self): |
|
coin_settings = {'rpcport': 0, 'rpcauth': 'none'} |
|
coin_settings.update(REQUIRED_SETTINGS) |
|
|
|
ci = BTCInterface(coin_settings, 'regtest') |
|
|
|
time_val = 48 * 60 * 60 |
|
encoded = ci.getExpectedSequence(TxLockTypes.SEQUENCE_LOCK_TIME, time_val) |
|
decoded = ci.decodeSequence(encoded) |
|
assert (decoded >= time_val) |
|
assert (decoded <= time_val + 512) |
|
|
|
time_val = 24 * 60 |
|
encoded = ci.getExpectedSequence(TxLockTypes.SEQUENCE_LOCK_TIME, time_val) |
|
decoded = ci.decodeSequence(encoded) |
|
assert (decoded >= time_val) |
|
assert (decoded <= time_val + 512) |
|
|
|
blocks_val = 123 |
|
encoded = ci.getExpectedSequence(TxLockTypes.SEQUENCE_LOCK_BLOCKS, blocks_val) |
|
decoded = ci.decodeSequence(encoded) |
|
assert (decoded == blocks_val) |
|
|
|
def test_make_int(self): |
|
def test_case(vs, vf, expect_int): |
|
i = make_int(vs) |
|
assert (i == expect_int and isinstance(i, int)) |
|
i = make_int(vf) |
|
assert (i == expect_int and isinstance(i, int)) |
|
vs_out = format_amount(i, 8) |
|
# Strip |
|
for i in range(7): |
|
if vs_out[-1] == '0': |
|
vs_out = vs_out[:-1] |
|
if '.' in vs: |
|
assert (vs_out == vs) |
|
else: |
|
assert (vs_out[:-2] == vs) |
|
test_case('0', 0, 0) |
|
test_case('1', 1, 100000000) |
|
test_case('10', 10, 1000000000) |
|
test_case('0.00899999', 0.00899999, 899999) |
|
test_case('899999.0', 899999.0, 89999900000000) |
|
test_case('899999.00899999', 899999.00899999, 89999900899999) |
|
test_case('0.0', 0.0, 0) |
|
test_case('1.0', 1.0, 100000000) |
|
test_case('1.1', 1.1, 110000000) |
|
test_case('1.2', 1.2, 120000000) |
|
test_case('0.00899991', 0.00899991, 899991) |
|
test_case('0.0089999', 0.0089999, 899990) |
|
test_case('0.0089991', 0.0089991, 899910) |
|
test_case('0.123', 0.123, 12300000) |
|
test_case('123000.000123', 123000.000123, 12300000012300) |
|
|
|
try: |
|
make_int('0.123456789') |
|
assert (False) |
|
except Exception as e: |
|
assert (str(e) == 'Mantissa too long') |
|
validate_amount('0.12345678') |
|
|
|
# floor |
|
assert (make_int('0.123456789', r=-1) == 12345678) |
|
# Round up |
|
assert (make_int('0.123456789', r=1) == 12345679) |
|
|
|
def test_make_int12(self): |
|
def test_case(vs, vf, expect_int): |
|
i = make_int(vs, 12) |
|
assert (i == expect_int and isinstance(i, int)) |
|
i = make_int(vf, 12) |
|
assert (i == expect_int and isinstance(i, int)) |
|
vs_out = format_amount(i, 12) |
|
# Strip |
|
for i in range(7): |
|
if vs_out[-1] == '0': |
|
vs_out = vs_out[:-1] |
|
if '.' in vs: |
|
assert (vs_out == vs) |
|
else: |
|
assert (vs_out[:-2] == vs) |
|
test_case('0.123456789', 0.123456789, 123456789000) |
|
test_case('0.123456789123', 0.123456789123, 123456789123) |
|
try: |
|
make_int('0.1234567891234', 12) |
|
assert (False) |
|
except Exception as e: |
|
assert (str(e) == 'Mantissa too long') |
|
validate_amount('0.123456789123', 12) |
|
try: |
|
validate_amount('0.1234567891234', 12) |
|
assert (False) |
|
except Exception as e: |
|
assert ('Too many decimal places' in str(e)) |
|
try: |
|
validate_amount(0.1234567891234, 12) |
|
assert (False) |
|
except Exception as e: |
|
assert ('Too many decimal places' in str(e)) |
|
|
|
def test_ed25519(self): |
|
privkey = edu.get_secret() |
|
pubkey = edu.encodepoint(edf.scalarmult_B(privkey)) |
|
|
|
privkey_bytes = i2b(privkey) |
|
pubkey_test = ed25519_get_pubkey(privkey_bytes) |
|
assert (pubkey == pubkey_test) |
|
|
|
def test_ecdsa_otves(self): |
|
coin_settings = {'rpcport': 0, 'rpcauth': 'none'} |
|
coin_settings.update(REQUIRED_SETTINGS) |
|
ci = BTCInterface(coin_settings, 'regtest') |
|
vk_sign = ci.getNewSecretKey() |
|
vk_encrypt = ci.getNewSecretKey() |
|
|
|
pk_sign = ci.getPubkey(vk_sign) |
|
pk_encrypt = ci.getPubkey(vk_encrypt) |
|
sign_hash = secrets.token_bytes(32) |
|
|
|
cipher_text = ecdsaotves_enc_sign(vk_sign, pk_encrypt, sign_hash) |
|
|
|
assert (ecdsaotves_enc_verify(pk_sign, pk_encrypt, sign_hash, cipher_text)) |
|
|
|
sig = ecdsaotves_dec_sig(vk_encrypt, cipher_text) |
|
|
|
assert (ci.verifySig(pk_sign, sign_hash, sig)) |
|
|
|
recovered_key = ecdsaotves_rec_enc_key(pk_encrypt, cipher_text, sig) |
|
|
|
assert (vk_encrypt == recovered_key) |
|
|
|
def test_sign(self): |
|
coin_settings = {'rpcport': 0, 'rpcauth': 'none'} |
|
coin_settings.update(REQUIRED_SETTINGS) |
|
ci = BTCInterface(coin_settings, 'regtest') |
|
|
|
vk = ci.getNewSecretKey() |
|
pk = ci.getPubkey(vk) |
|
|
|
message = 'test signing message' |
|
message_hash = hashlib.sha256(bytes(message, 'utf-8')).digest() |
|
eck = PrivateKey(vk) |
|
sig = eck.sign(message.encode('utf-8')) |
|
|
|
ci.verifySig(pk, message_hash, sig) |
|
|
|
def test_sign_compact(self): |
|
coin_settings = {'rpcport': 0, 'rpcauth': 'none'} |
|
coin_settings.update(REQUIRED_SETTINGS) |
|
ci = BTCInterface(coin_settings, 'regtest') |
|
|
|
vk = ci.getNewSecretKey() |
|
pk = ci.getPubkey(vk) |
|
sig = ci.signCompact(vk, 'test signing message') |
|
assert (len(sig) == 64) |
|
ci.verifyCompactSig(pk, 'test signing message', sig) |
|
|
|
# Nonce is set deterministically (using default libsecp256k1 method rfc6979) |
|
sig2 = ci.signCompact(vk, 'test signing message') |
|
assert (sig == sig2) |
|
|
|
def test_sign_recoverable(self): |
|
coin_settings = {'rpcport': 0, 'rpcauth': 'none'} |
|
coin_settings.update(REQUIRED_SETTINGS) |
|
ci = BTCInterface(coin_settings, 'regtest') |
|
|
|
vk = ci.getNewSecretKey() |
|
pk = ci.getPubkey(vk) |
|
sig = ci.signRecoverable(vk, 'test signing message') |
|
assert (len(sig) == 65) |
|
pk_rec = ci.verifySigAndRecover(sig, 'test signing message') |
|
assert (pk == pk_rec) |
|
|
|
# Nonce is set deterministically (using default libsecp256k1 method rfc6979) |
|
sig2 = ci.signRecoverable(vk, 'test signing message') |
|
assert (sig == sig2) |
|
|
|
def test_pubkey_to_address(self): |
|
coin_settings = {'rpcport': 0, 'rpcauth': 'none'} |
|
coin_settings.update(REQUIRED_SETTINGS) |
|
ci = BTCInterface(coin_settings, 'regtest') |
|
pk = h2b('02c26a344e7d21bcc6f291532679559f2fd234c881271ff98714855edc753763a6') |
|
addr = ci.pubkey_to_address(pk) |
|
assert (addr == 'mj6SdSxmWRmdDqR5R3FfZmRiLmQfQAsLE8') |
|
|
|
def test_dleag(self): |
|
coin_settings = {'rpcport': 0, 'walletrpcport': 0, 'walletrpcauth': 'none'} |
|
coin_settings.update(REQUIRED_SETTINGS) |
|
|
|
ci = XMRInterface(coin_settings, 'regtest') |
|
|
|
key = ci.getNewSecretKey() |
|
proof = ci.proveDLEAG(key) |
|
assert (ci.verifyDLEAG(proof)) |
|
|
|
def test_rate(self): |
|
scale_from = 8 |
|
scale_to = 12 |
|
amount_from = make_int(100, scale_from) |
|
rate = make_int(0.1, scale_to) |
|
|
|
amount_to = int((amount_from * rate) // (10 ** scale_from)) |
|
assert ('100.00000000' == format_amount(amount_from, scale_from)) |
|
assert ('10.000000000000' == format_amount(amount_to, scale_to)) |
|
|
|
rate_check = make_int((amount_to / amount_from), scale_from) |
|
assert (rate == rate_check) |
|
|
|
scale_from = 12 |
|
scale_to = 8 |
|
amount_from = make_int(1, scale_from) |
|
rate = make_int(12, scale_to) |
|
|
|
amount_to = int((amount_from * rate) // (10 ** scale_from)) |
|
assert ('1.000000000000' == format_amount(amount_from, scale_from)) |
|
assert ('12.00000000' == format_amount(amount_to, scale_to)) |
|
|
|
rate_check = make_int((amount_to / amount_from), scale_from) |
|
assert (rate == rate_check) |
|
|
|
scale_from = 8 |
|
scale_to = 8 |
|
amount_from = make_int(0.073, scale_from) |
|
amount_to = make_int(10, scale_to) |
|
rate = make_int(amount_to / amount_from, scale_to, r=1) |
|
amount_to_recreate = int((amount_from * rate) // (10 ** scale_from)) |
|
assert ('10.00000000' == format_amount(amount_to_recreate, scale_to)) |
|
|
|
scale_from = 8 |
|
scale_to = 12 |
|
amount_from = make_int(10.0, scale_from) |
|
amount_to = make_int(0.06935, scale_to) |
|
rate = make_int(amount_to / amount_from, scale_from, r=1) |
|
amount_to_recreate = int((amount_from * rate) // (10 ** scale_from)) |
|
assert ('0.069350000000' == format_amount(amount_to_recreate, scale_to)) |
|
|
|
scale_from = 12 |
|
scale_to = 8 |
|
amount_from = make_int(0.06935, scale_from) |
|
amount_to = make_int(10.0, scale_to) |
|
rate = make_int(amount_to / amount_from, scale_from, r=1) |
|
amount_to_recreate = int((amount_from * rate) // (10 ** scale_from)) |
|
assert ('10.00000000' == format_amount(amount_to_recreate, scale_to)) |
|
|
|
coin_settings = {'rpcport': 0, 'rpcauth': 'none', 'walletrpcport': 0, 'walletrpcauth': 'none'} |
|
coin_settings.update(REQUIRED_SETTINGS) |
|
ci_xmr = XMRInterface(coin_settings, 'regtest') |
|
ci_btc = BTCInterface(coin_settings, 'regtest') |
|
|
|
for i in range(10000): |
|
test_pairs = random.randint(0, 3) |
|
if test_pairs == 0: |
|
ci_from = ci_btc |
|
ci_to = ci_xmr |
|
elif test_pairs == 1: |
|
ci_from = ci_xmr |
|
ci_to = ci_btc |
|
elif test_pairs == 2: |
|
ci_from = ci_xmr |
|
ci_to = ci_xmr |
|
else: |
|
ci_from = ci_btc |
|
ci_to = ci_btc |
|
|
|
test_range = random.randint(0, 5) |
|
if test_range == 0: |
|
amount_from = random.randint(10000, 1 * ci_from.COIN()) |
|
elif test_range == 1: |
|
amount_from = random.randint(10000, 1000 * ci_from.COIN()) |
|
elif test_range == 2: |
|
amount_from = random.randint(10000, 2100 * ci_from.COIN()) |
|
elif test_range == 3: |
|
amount_from = random.randint(10000, 210000 * ci_from.COIN()) |
|
elif test_range == 4: |
|
amount_from = random.randint(10000, 21000000 * ci_from.COIN()) |
|
else: |
|
amount_from = random.randint(10000, 2100000000 * ci_from.COIN()) |
|
|
|
test_range = random.randint(0, 5) |
|
if test_range == 0: |
|
amount_to = random.randint(10000, 1 * ci_to.COIN()) |
|
elif test_range == 1: |
|
amount_to = random.randint(10000, 1000 * ci_to.COIN()) |
|
elif test_range == 2: |
|
amount_to = random.randint(10000, 2100 * ci_to.COIN()) |
|
elif test_range == 3: |
|
amount_to = random.randint(10000, 210000 * ci_to.COIN()) |
|
elif test_range == 4: |
|
amount_to = random.randint(10000, 21000000 * ci_to.COIN()) |
|
else: |
|
amount_to = random.randint(10000, 2100000000 * ci_to.COIN()) |
|
|
|
offer_rate = ci_from.make_int(amount_to / amount_from, r=1) |
|
amount_to_from_rate: int = int((int(amount_from) * offer_rate) // (10 ** scale_from)) |
|
|
|
scale_from = 24 |
|
offer_rate = make_int(amount_to, scale_from) // amount_from |
|
amount_to_from_rate: int = int((int(amount_from) * offer_rate) // (10 ** scale_from)) |
|
|
|
if abs(amount_to - amount_to_from_rate) == 1: |
|
offer_rate += 1 |
|
|
|
offer_rate_human_read: int = int(offer_rate // (10 ** (scale_from - ci_from.exp()))) |
|
amount_to_from_rate: int = int((int(amount_from) * offer_rate) // (10 ** scale_from)) |
|
|
|
if amount_to != amount_to_from_rate: |
|
print('from exp, amount', ci_from.exp(), amount_from) |
|
print('to exp, amount', ci_to.exp(), amount_to) |
|
print('amount_to_from_rate', amount_to_from_rate) |
|
raise ValueError('Bad amount_to') |
|
|
|
scale_to = 24 |
|
reversed_rate = make_int(amount_from, scale_to) // amount_to |
|
|
|
amount_from_from_rate: int = int((int(amount_to) * reversed_rate) // (10 ** scale_to)) |
|
if abs(amount_from - amount_from_from_rate) == 1: |
|
reversed_rate += 1 |
|
|
|
amount_from_from_rate: int = int((int(amount_to) * reversed_rate) // (10 ** scale_to)) |
|
|
|
if amount_from != amount_from_from_rate: |
|
print('from exp, amount', ci_from.exp(), amount_from) |
|
print('to exp, amount', ci_to.exp(), amount_to) |
|
print('amount_from_from_rate', amount_from_from_rate) |
|
raise ValueError('Bad amount_from') |
|
|
|
def test_rfc2440(self): |
|
password = 'test' |
|
salt = bytes.fromhex('B7A94A7E4988630E') |
|
password_hash = rfc2440_hash_password(password, salt=salt) |
|
|
|
assert (password_hash == '16:B7A94A7E4988630E6095334BA67F06FBA509B2A7136A04C9C1B430F539') |
|
|
|
def test_ripemd160(self): |
|
input_data = b'hash this' |
|
assert (ripemd160(input_data).hex() == 'd5443a154f167e2c1332f6de72cfb4c6ab9c8c17') |
|
|
|
def test_hash160(self): |
|
# hash160 is RIPEMD(SHA256(data)) |
|
input_data = b'hash this' |
|
assert (hash160(input_data).hex() == '072985b3583a4a71f548494a5e1d5f6b00d0fe13') |
|
assert (hash160_btc(input_data).hex() == '072985b3583a4a71f548494a5e1d5f6b00d0fe13') |
|
|
|
def test_protobuf(self): |
|
msg_buf = BidMessage() |
|
msg_buf.protocol_version = 2 |
|
msg_buf.time_valid = 1024 |
|
serialised_msg = msg_buf.to_bytes() |
|
|
|
msg_buf_2 = BidMessage() |
|
msg_buf_2.from_bytes(serialised_msg) |
|
assert (msg_buf_2.protocol_version == 2) |
|
assert (msg_buf_2.time_valid == 1024) |
|
assert (msg_buf_2.amount == 0) |
|
assert (msg_buf_2.pkhash_buyer is not None) |
|
assert (len(msg_buf_2.pkhash_buyer) == 0) |
|
|
|
# Decode only the first field |
|
msg_buf_3 = BidMessage() |
|
msg_buf_3.from_bytes(serialised_msg[:2]) |
|
assert (msg_buf_3.protocol_version == 2) |
|
assert (msg_buf_3.time_valid == 0) |
|
|
|
try: |
|
msg_buf_4 = BidMessage(doesnotexist=1) |
|
except Exception as e: |
|
assert ('unexpected keyword argument' in str(e)) |
|
else: |
|
raise ValueError('Should have errored.') |
|
|
|
def test_is_private_ip_address(self): |
|
test_addresses = [ |
|
('localhost', True), |
|
('127.0.0.1', True), |
|
('10.0.0.0', True), |
|
('172.16.0.0', True), |
|
('192.168.0.0', True), |
|
('20.87.245.0', False), |
|
('particl.io', False), |
|
] |
|
for addr, is_private in test_addresses: |
|
assert (is_private_ip_address(addr) is is_private) |
|
|
|
def test_varint(self): |
|
test_vectors = [ |
|
(0, 1), |
|
(1, 1), |
|
(127, 1), |
|
(128, 2), |
|
(253, 2), |
|
(8321, 2), |
|
(16383, 2), |
|
(16384, 3), |
|
(2097151, 3), |
|
(2097152, 4), |
|
] |
|
for i, expect_length in test_vectors: |
|
b = encode_varint(i) |
|
assert (len(b) == expect_length) |
|
assert (decode_varint(b) == (i, expect_length)) |
|
|
|
def test_base58(self): |
|
kv = edu.get_secret() |
|
Kv = edu.encodepoint(edf.scalarmult_B(kv)) |
|
ks = edu.get_secret() |
|
Ks = edu.encodepoint(edf.scalarmult_B(ks)) |
|
|
|
addr = xmr_encode_address(Kv, Ks) |
|
assert (addr.startswith('4')) |
|
|
|
addr = xmr_encode_address(Kv, Ks, 4146) |
|
assert (addr.startswith('Wo')) |
|
|
|
def test_blake256(self): |
|
test_vectors = [ |
|
('716f6e863f744b9ac22c97ec7b76ea5f5908bc5b2f67c61510bfc4751384ea7a', b''), |
|
('7576698ee9cad30173080678e5965916adbb11cb5245d386bf1ffda1cb26c9d7', b'The quick brown fox jumps over the lazy dog'), |
|
] |
|
for expect_hash, data in test_vectors: |
|
assert (blake256(data).hex() == expect_hash) |
|
|
|
def test_extkey(self): |
|
test_key = 'XPARHAr37YxmFP8wyjkaHAQWmp84GiyLikL7EL8j9BCx4LkB8Q1Bw5Kr8sA1GA3Ym53zNLcaxxFHr6u81JVTeCaD61c6fKS1YRAuti8Zu5SzJCjh' |
|
test_key_c0 = 'XPARHAt1XMcNYAwP5wEnQXknBAkGSzaetdZt2eoJZehdB4WXfV1xbSjpgHe44AivmumcSejW5KaYx6L5M6MyR1WyXrsWTwaiUEfHq2RrqCfXj3ZW' |
|
test_key_c0_p = 'PPARTKPL4rp5WLnrYP6jZfuRjx6jrmvbsz5QdHofPfFqJdm918mQwdPLq6Dd9TkdbQeKUqjbHWkyzWe7Pftd7itzm7ETEoUMq4cbG4fY9FKH1YSU' |
|
test_key_c0h = 'XPARHAt1XMcNgWbv48LwoQbjs1bC8kCXKomzvJLRT5xmbQ2GKf9e8Vfr1MMcfiWJC34RyDp5HvAfjeiNyLDfkFm1UrRCrPkVC9GGaAWa3nXMWew8' |
|
|
|
ek_data = decodeAddress(test_key)[4:] |
|
|
|
ek = ExtKeyPair() |
|
ek.decode(ek_data) |
|
assert (ek.encode_v() == ek_data) |
|
|
|
m_0 = ek.derive(0) |
|
|
|
ek_c0_data = decodeAddress(test_key_c0)[4:] |
|
assert (m_0.encode_v() == ek_c0_data) |
|
|
|
child_no: int = 0 | (1 << 31) |
|
m_0h = ek.derive(child_no) |
|
|
|
ek_c0h_data = decodeAddress(test_key_c0h)[4:] |
|
assert (m_0h.encode_v() == ek_c0h_data) |
|
|
|
ek.neuter() |
|
assert (ek.has_key() is False) |
|
m_0 = ek.derive(0) |
|
|
|
ek_c0_p_data = decodeAddress(test_key_c0_p)[4:] |
|
assert (m_0.encode_p() == ek_c0_p_data) |
|
|
|
def test_mnemonic(self): |
|
entropy0: bytes = Mnemonic('english').to_entropy(mnemonics[0]) |
|
assert (entropy0.hex() == '0002207e9b744ea2d7ab41702f31f000') |
|
mnemonic_recovered: str = Mnemonic('english').to_mnemonic(entropy0) |
|
assert (mnemonic_recovered == mnemonics[0]) |
|
|
|
|
|
# Run the test suite when this module is executed directly.
if __name__ == '__main__':
    unittest.main()
|
|
|