basicswap_miserver/basicswap/util.py
tecnovert deb71856e8
Poll chainstates.
Litecoin download link changed.
Fix fee comparison tx weight difference.
Remove format8.
New stalled for test bid state.
Moved sequence code to coin interfaces.
Display estimated time lock refund tx will be valid.
2021-02-03 16:01:27 +02:00

327 lines
8.1 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (c) 2018-2021 tecnovert
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import json
import time
import struct
import decimal
import hashlib
from .script import OpCodes
from .contrib.segwit_addr import bech32_decode, convertbits, bech32_encode
# Integer units per whole coin; equals 10 ** 8, the default scale used by make_int.
COIN = 10 ** 8

# Decimal context with 20 significant digits, used by float_to_str.
decimal_ctx = decimal.Context(prec=20)
def assert_cond(v, err='Bad opcode'):
    """Raise ValueError(err) when `v` is falsy; return None otherwise."""
    if v:
        return
    raise ValueError(err)
def toBool(s):
    """Return True when the string `s` is "1" or, ignoring case, "true"."""
    lowered = s.lower()
    return lowered == "1" or lowered == "true"
def jsonDecimal(obj):
    """json.dumps `default` hook: render decimal.Decimal values as strings.

    Raises TypeError for any other type, which tells json.dumps the object is
    not serialisable.
    """
    if not isinstance(obj, decimal.Decimal):
        raise TypeError
    return str(obj)
def dumpj(jin, indent=4):
    """Serialise `jin` to an indented JSON string, writing Decimals via jsonDecimal."""
    options = {'indent': indent, 'default': jsonDecimal}
    return json.dumps(jin, **options)
def dumpje(jin):
    """Serialise `jin` to compact JSON with every double quote backslash-escaped.

    Decimals are written via jsonDecimal.
    """
    encoded = json.dumps(jin, default=jsonDecimal)
    return encoded.replace('"', '\\"')
# Base58 alphabet: digits and letters without 0, O, I and l.
__b58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'


def b58decode(v, length=None):
    """Decode the base58 string `v` to bytes.

    Each leading '1' in `v` becomes one leading zero byte. No checksum is
    verified here.

    Returns None when `v` contains a character outside the base58 alphabet, or
    when `length` is given and the decoded data is not exactly `length` bytes.
    """
    long_value = 0
    for c in v:
        ofs = __b58chars.find(c)
        if ofs < 0:
            return None
        long_value = long_value * 58 + ofs

    # Count the leading '1' characters; each one encodes a zero byte.
    nPad = 0
    for c in v:
        if c != __b58chars[0]:
            break
        nPad += 1

    # A zero value contributes no bytes of its own: its zero bytes are already
    # accounted for by the padding, so '1' decodes to exactly b'\x00'.
    value_len = (long_value.bit_length() + 7) // 8
    result = bytes(nPad) + long_value.to_bytes(value_len, 'big')

    if length is not None and len(result) != length:
        return None
    return result


def b58encode(v):
    """Encode the bytes-like `v` as a base58 string.

    Each leading zero byte in `v` becomes one leading '1'; empty input yields
    an empty string.
    """
    long_value = int.from_bytes(v, 'big')

    # Collect digits least-significant first. A zero value emits no digits, so
    # all-zero input is represented purely by the padding below.
    digits = []
    while long_value > 0:
        long_value, mod = divmod(long_value, 58)
        digits.append(__b58chars[mod])

    # Leading 0-bytes in the input become leading-1s.
    nPad = 0
    for c in v:
        if c != 0:
            break
        nPad += 1

    return (__b58chars[0] * nPad) + ''.join(reversed(digits))
def decodeWif(network_key):
    """Extract the raw key bytes from the base58 WIF string `network_key`.

    The first byte (the prefix written by toWIF) and the trailing 4 checksum
    bytes are dropped. If 33 bytes remain, the last one is dropped as well
    (toWIF appends a 0x01 byte when compressed=True). The checksum itself is
    not verified.

    Raises ValueError when `network_key` is not valid base58.
    """
    decoded = b58decode(network_key)
    if decoded is None:
        # b58decode signals invalid characters with None; fail with a clear
        # error instead of a TypeError from slicing None.
        raise ValueError('Invalid WIF key: not valid base58')
    key = decoded[1:-4]
    if len(key) == 33:
        return key[:-1]
    return key
def toWIF(prefix_byte, b, compressed=True):
    """Encode key bytes `b` as a base58 WIF string.

    The payload is `prefix_byte`, then `b`, then a 0x01 byte when `compressed`
    is true. The first 4 bytes of the double SHA-256 of that payload are
    appended as a checksum before base58 encoding.
    """
    payload = bytes((prefix_byte,)) + b
    if compressed:
        payload += b'\x01'
    checksum = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]
    return b58encode(payload + checksum)
def bech32Decode(hrp, addr):
    """Decode the bech32 string `addr` and return its data as bytes.

    Returns None when the decoded human-readable part differs from `hrp`, when
    regrouping the 5-bit data into 8-bit values fails, or when the result is
    not between 2 and 40 bytes long.
    """
    hrpgot, data = bech32_decode(addr)
    if hrpgot != hrp:
        return None
    # The trailing False is presumably convertbits' padding flag - TODO confirm.
    decoded = convertbits(data, 5, 8, False)
    if decoded is None:
        return None
    if not 2 <= len(decoded) <= 40:
        return None
    return bytes(decoded)
def bech32Encode(hrp, data):
    """Encode `data` as a bech32 string with human-readable part `hrp`.

    The result is decoded again with bech32Decode as a sanity check; None is
    returned when that round trip fails.
    """
    encoded = bech32_encode(hrp, convertbits(data, 8, 5))
    return None if bech32Decode(hrp, encoded) is None else encoded
def decodeAddress(address_str):
    """Decode a base58check address and return it without the checksum.

    The last 4 bytes of the decoded data must equal the first 4 bytes of the
    double SHA-256 of the preceding bytes.

    Returns None when `address_str` is not valid base58.
    Raises AssertionError('Checksum mismatch') when the checksum is wrong.
    """
    b58_addr = b58decode(address_str)
    if b58_addr is None:
        return None
    address, checksum = b58_addr[:-4], b58_addr[-4:]
    expected = hashlib.sha256(hashlib.sha256(address).digest()).digest()[:4]
    if expected != checksum:
        # Raised explicitly rather than with an `assert` statement so the check
        # still runs under `python -O`; the exception type is kept so existing
        # callers behave the same.
        raise AssertionError('Checksum mismatch')
    return address
def encodeAddress(address):
    """Base58-encode `address` followed by its 4-byte double SHA-256 checksum."""
    first_pass = hashlib.sha256(address).digest()
    checksum = hashlib.sha256(first_pass).digest()[:4]
    return b58encode(address + checksum)
def getKeyID(bytes):
    """Return the RIPEMD-160 digest of the SHA-256 digest of `bytes` (20 bytes).

    The parameter name shadows the builtin `bytes` inside this function; it is
    kept because it is part of the existing signature.
    """
    sha_digest = hashlib.sha256(bytes).digest()
    ripemd = hashlib.new("ripemd160")
    ripemd.update(sha_digest)
    return ripemd.digest()
def pubkeyToAddress(prefix, pubkey):
    """Build a base58check address from the byte `prefix` and the key id of `pubkey`."""
    payload = bytes((prefix,)) + getKeyID(pubkey)
    return encodeAddress(payload)
def SerialiseNum(n):
    """Serialise the integer `n` as bytes.

    0 becomes the single byte 0x00 and 1..16 become the single byte 0x50 + n.
    Any other value becomes a length byte followed by the magnitude in
    little-endian order, with the sign carried in the top bit of the last byte.
    """
    if n == 0:
        return b'\x00'
    if 0 < n <= 16:
        return bytes((0x50 + n,))

    is_negative = n < 0
    magnitude = -n if is_negative else n
    encoded = bytearray()
    while magnitude:
        encoded.append(magnitude & 0xff)
        magnitude >>= 8

    if encoded[-1] & 0x80:
        # The top bit is already used by the magnitude, so the sign needs an
        # extra byte of its own.
        encoded.append(0x80 if is_negative else 0x00)
    elif is_negative:
        encoded[-1] |= 0x80

    return bytes((len(encoded),)) + encoded
def DeserialiseNum(b, o=0):
    """Decode the integer serialised in `b` starting at offset `o`.

    A first byte of 0x00 decodes to 0 and 0x51..0x60 decode to 1..16.
    Otherwise the first byte is a length, followed by that many little-endian
    bytes whose top bit of the last byte marks a negative value.
    """
    first = b[o]
    if first == 0:
        return 0
    if 0x50 < first <= 0x50 + 16:
        return first - 0x50

    start = o + 1
    # The bytes occupy disjoint bit ranges, so summing is the same as OR-ing.
    value = sum(b[start + i] << (8 * i) for i in range(first))

    # If the most significant byte has 0x80 set, strip that bit and negate.
    sign_bit = 0x80 << (8 * (first - 1))
    if b[start + first - 1] & 0x80:
        return -(value & ~sign_bit)
    return value
def decodeScriptNum(script_bytes, o):
    """Decode a number from `script_bytes` at offset `o`.

    Returns a tuple (value, bytes_consumed).

    Bytes in the range OpCodes.OP_1..OpCodes.OP_16 decode directly to 1..16.
    Otherwise the byte at `o` is a length (at most 4) followed by that many
    little-endian bytes, with the sign in the top bit of the last byte.

    Raises ValueError when the length exceeds 4 or runs past the script.
    """
    num_len = script_bytes[o]
    if OpCodes.OP_1 <= num_len <= OpCodes.OP_16:
        return (num_len - OpCodes.OP_1 + 1, 1)

    if num_len > 4:
        raise ValueError('Bad scriptnum length')  # Max 4 bytes
    if num_len + o >= len(script_bytes):
        raise ValueError('Bad script length')

    start = o + 1
    last_index = num_len - 1
    value = 0
    for i in range(num_len):
        byte = script_bytes[start + i]
        # The negative flag lives in the last byte. A positive number whose top
        # bit would be set carries an extra 0x00 byte instead.
        if i == last_index and byte & 0x80:
            value += int(byte & 0x7f) << (8 * i)
            value = -value
        else:
            value += int(byte) << (8 * i)
    return (value, 1 + num_len)
def getCompactSizeLen(v):
    """Return how many bytes SerialiseNumCompact uses to encode `v`.

    Values below 253 take 1 byte; larger values take a marker byte plus a
    2-, 4- or 8-byte integer.

    Raises ValueError when `v` is negative or exceeds 64 bits. Negative values
    are rejected here because SerialiseNumCompact cannot encode them either.
    """
    if v < 0:
        raise ValueError('Value must not be negative')
    if v < 253:
        return 1
    if v <= 0xffff:  # USHRT_MAX
        return 3
    if v <= 0xffffffff:  # UINT_MAX
        return 5
    if v <= 0xffffffffffffffff:  # ULLONG_MAX
        return 9
    raise ValueError('Value too large')
def SerialiseNumCompact(v):
    """Serialise the non-negative integer `v` in compact-size form.

    Values below 253 are a single byte. Larger values are a marker byte
    (253, 254 or 255) followed by a little-endian 2-, 4- or 8-byte integer.

    Raises ValueError when `v` does not fit in 64 bits.
    """
    if v < 253:
        return bytes((v,))
    layouts = (
        (253, 'H', 0xffff),  # USHRT_MAX
        (254, 'I', 0xffffffff),  # UINT_MAX
        (255, 'Q', 0xffffffffffffffff),  # ULLONG_MAX
    )
    for marker, code, limit in layouts:
        if v <= limit:
            return struct.pack('<B' + code, marker, v)
    raise ValueError('Value too large')
def float_to_str(f):
    """Render the float `f` in fixed-point notation, never with an exponent.

    The repr of `f` is parsed into a Decimal using the module-level
    decimal_ctx and then formatted with the 'f' presentation type.
    """
    # stackoverflow.com/questions/38847690
    as_decimal = decimal_ctx.create_decimal(repr(f))
    return '{:f}'.format(as_decimal)
def make_int(v, scale=8, r=0):
    """Convert the amount `v` to an integer count of 10 ** -scale units.

    `v` may be an int (multiplied by 10 ** scale directly), a float or
    decimal.Decimal (converted to fixed-point text first), or a decimal
    string with an optional leading '-'.

    `r` controls what happens when `v` has more than `scale` decimal places:
        r == 0: raise ValueError('Mantissa too long')
        r > 0:  round half up on the first excess digit (away from zero)
        r < 0:  drop the excess digits (truncate toward zero)

    Raises ValueError for any character that is not a digit, a single '.' or
    a leading '-'.
    """
    if isinstance(v, float):
        v = float_to_str(v)
    elif isinstance(v, int):
        return v * 10 ** scale
    elif isinstance(v, decimal.Decimal):
        # 'f' avoids exponent notation such as '1E-8'.
        v = format(v, 'f')

    # Parse the magnitude and re-apply the sign at the end, so signed strings
    # and floats are accepted just like signed ints.
    sign = 1
    if v.startswith('-'):
        sign = -1
        v = v[1:]

    ep = 10 ** scale
    have_dp = False
    rv = 0
    for c in v:
        if c == '.':
            if have_dp:
                # A second decimal point would silently re-scale the result.
                raise ValueError('Invalid char: ' + c)
            rv *= ep
            have_dp = True
            continue
        if not c.isdigit():
            raise ValueError('Invalid char: ' + c)
        if have_dp:
            ep //= 10
            if ep <= 0:
                # More decimal places than `scale` allows.
                if r == 0:
                    raise ValueError('Mantissa too long')
                if r > 0 and int(c) > 4:
                    rv += 1
                break
            rv += ep * int(c)
        else:
            rv = rv * 10 + int(c)
    if not have_dp:
        rv *= ep
    return sign * rv
def validate_amount(amount, scale=8):
    """Check that `amount` is a plain non-negative decimal number.

    Floats are converted with float_to_str and everything else with str().
    The text may contain only digits and at most one '.', with no more than
    `scale` digits after it.

    Returns True when valid; raises ValueError otherwise.
    """
    if type(amount) is float:
        str_amount = float_to_str(amount)
    else:
        str_amount = str(amount)

    seen_point = False
    for ch in str_amount:
        if ch == '.' and not seen_point:
            seen_point = True
        elif not ch.isdigit():
            raise ValueError('Invalid amount')

    # At most one '.' remains after the loop above, so partition splits the
    # text into whole and fractional parts.
    _, point, fraction = str_amount.partition('.')
    if point and len(fraction) > scale:
        raise ValueError('Too many decimal places in amount {}'.format(str_amount))
    return True
def format_amount(i, display_scale, scale=None):
    """Format the integer amount `i` as a fixed-point decimal string.

    `scale` is the number of decimal places `i` is stored with and defaults
    to `display_scale`. `display_scale` is the number of decimal places shown:
    when smaller than `scale` the extra digits are truncated, when larger the
    fraction is padded with trailing zeros.

    Raises ValueError when `i` is not an int.
    """
    if not isinstance(i, int):
        # Raise error instead of converting as amounts should always be integers
        raise ValueError('Amount must be an integer.')
    if scale is None:
        scale = display_scale

    quotient, remainder = divmod(abs(i), 10 ** scale)
    # Rescale the fractional part so it keeps the MOST significant digits;
    # a plain modulo here would show the least significant ones instead.
    if display_scale < scale:
        remainder //= 10 ** (scale - display_scale)
    elif display_scale > scale:
        remainder *= 10 ** (display_scale - scale)

    rv = '{}.{:0>{width}}'.format(quotient, remainder, width=display_scale)
    if i < 0:
        rv = '-' + rv
    return rv
def format_timestamp(value, with_seconds=False):
    """Format the epoch timestamp `value` in local time.

    The output is 'YYYY-MM-DD HH:MM', with ':SS' appended when `with_seconds`
    is true.
    """
    str_format = '%Y-%m-%d %H:%M:%S' if with_seconds else '%Y-%m-%d %H:%M'
    return time.strftime(str_format, time.localtime(value))
def getP2SHScriptForHash(p2sh):
    """Build the script bytes: OP_HASH160, a 0x14 length byte, `p2sh`, OP_EQUAL.

    The 0x14 byte announces 20 bytes, so `p2sh` is expected to be 20 bytes
    long; that is not checked here.
    """
    head = bytes((OpCodes.OP_HASH160, 0x14))
    tail = bytes((OpCodes.OP_EQUAL,))
    return head + p2sh + tail
def getP2WSH(script):
    """Build the script bytes: OP_0, a 0x20 length byte, SHA-256 digest of `script`."""
    script_hash = hashlib.sha256(script).digest()
    return bytes((OpCodes.OP_0, 0x20)) + script_hash