# Source code for golos.broadcast

# -*- coding: utf-8 -*-

import hashlib
import struct
import time
from binascii import hexlify, unhexlify
from calendar import timegm
from datetime import datetime, timedelta

import ecdsa

from .base58 import Base58
from .operations import type_op
from .storage import chain_id, expiration, operations, prefix, time_format, time_format_utc


class Tx:
    """Build, sign and broadcast transactions through an RPC client.

    The ``rpc`` object only needs a ``call(method, *args)`` method; this
    class uses it for ``get_dynamic_global_properties`` and
    ``broadcast_transaction_synchronous``.
    """

    # How many times constructTx asks the node for the head-block parameters
    # before giving up, and how many seconds it waits between attempts.
    MAX_ATTEMPTS = 10
    RETRY_DELAY = 1

    def __init__(self, rpc, **kwargs):
        """Store the RPC client and the chain settings.

        :param rpc: object exposing ``call(method, *args)``.
        :param kwargs: optional ``expiration`` -- number of seconds added to
            the node time to obtain the transaction expiration; defaults to
            the ``expiration`` value imported from the storage module.
        """
        self.rpc = rpc
        self.expiration = int(kwargs.pop("expiration", expiration))
        self.time_format = time_format
        self.time_format_utc = time_format_utc
        self.chain_id = chain_id
        self.prefix = prefix

    # ----- broadcast -----
    def finalizeOp(self, ops, wif):
        """Construct, sign and synchronously broadcast a transaction.

        :param ops: list of operations placed into the transaction.
        :param wif: private key (WIF) used for signing.
        :return: the transaction dict extended with ``block_num`` and ``id``
            taken from the node's answer, or ``False`` when the transaction
            could not be constructed or the RPC call returned a bool.
        """
        tx = self.constructTx(ops, wif)  # build the initial transaction

        # constructTx hands back an unsigned skeleton when it could not get
        # the head-block parameters; there is no point in broadcasting it.
        if not tx["signatures"]:
            return False

        # The RPC layer signals failure with a bool, so the check is on the
        # data type rather than on the truthiness of the answer.
        result = self.rpc.call('broadcast_transaction_synchronous', tx)
        if isinstance(result, bool):
            return False

        tx["block_num"] = result["block_num"]
        tx["id"] = result["id"]
        return tx

    def constructTx(self, ops, wif):
        """Fill in the block reference and expiration, then sign.

        The head-block parameters are requested up to ``MAX_ATTEMPTS`` times
        with ``RETRY_DELAY`` seconds between attempts, because this request
        is the step that fails most often.

        :param ops: list of operations placed into the transaction.
        :param wif: private key (WIF) used for signing.
        :return: the signed transaction dict. When every attempt to read the
            head-block parameters fails, the unsigned skeleton is returned
            (``signatures`` is then an empty list).
        """
        tx = {
            "ref_block_num": None,  # construct
            "ref_block_prefix": None,  # construct
            "expiration": None,  # construct
            "operations": ops,
            "extensions": [],
            "signatures": [],  # construct
        }

        # Obtain ref_block_num and ref_block_prefix from the node.
        for attempt in range(self.MAX_ATTEMPTS):
            try:
                props = self.rpc.call('get_dynamic_global_properties')
                tx["ref_block_num"] = props["head_block_number"] & 0xFFFF
                tx["ref_block_prefix"] = struct.unpack_from(
                    "<I", unhexlify(props["head_block_id"]), 4)[0]
                break
            except Exception:
                # Exception (not a bare except) so that Ctrl-C and
                # SystemExit still interrupt the retry loop.
                print('error constructTx get_block_params')
                if attempt + 1 < self.MAX_ATTEMPTS:
                    time.sleep(self.RETRY_DELAY)
        else:
            return tx

        # Expiration = node time + self.expiration seconds, formatted with
        # the same time format the node time was parsed with.
        node_time = datetime.strptime(props["time"], self.time_format)
        expires_at = node_time + timedelta(seconds=self.expiration)
        tx["expiration"] = expires_at.strftime(self.time_format)

        digest = self.get_digest(tx)  # hash of the serialized transaction
        tx["signatures"] = self.sign(wif, digest)  # signature(s) over it
        return tx

    def get_digest(self, tx):
        """Serialize ``tx`` and return the SHA-256 digest to be signed.

        Layout: ref_block_num (uint16 LE), ref_block_prefix (uint32 LE),
        expiration as epoch seconds (uint32 LE), varint operation count,
        for every operation its varint id followed by its fields in the
        order given by ``type_op``, and the varint count of extensions.
        The chain id bytes are prepended before hashing.

        :param tx: transaction dict with ``ref_block_num``,
            ``ref_block_prefix``, ``expiration``, ``operations`` and
            ``extensions`` filled in.
        :return: 32-byte digest.
        """
        expires = timegm(
            time.strptime(tx["expiration"] + "UTC", self.time_format_utc))
        ops = tx["operations"]

        parts = [
            struct.pack("<H", tx["ref_block_num"]),  # Uint16
            struct.pack("<I", tx["ref_block_prefix"]),  # Uint32
            struct.pack("<I", expires),  # PointInTime
            self.varint(len(ops)),  # Array length
        ]
        for op in ops:
            op_name, op_data = op[0], op[1]
            parts.append(self.varint(operations[op_name]))  # operation id
            for key, type_value in type_op[op_name]:
                parts.append(bytes(type_value(op_data[key])))
        parts.append(self.varint(len(tx["extensions"])))  # Set = Array []

        # chain_id + serialized tx; self.chain_id is the value stored by
        # __init__, so an instance-level override is honoured.
        message = unhexlify(self.chain_id) + b"".join(parts)
        return hashlib.sha256(message).digest()

    def varint(self, n):
        """Encode a non-negative integer as a varint.

        Seven bits per byte, least-significant group first; every byte but
        the last has its high bit set.

        :raises ValueError: if ``n`` is negative.
        """
        if n < 0:
            raise ValueError("varint cannot encode a negative number: %d" % n)
        data = bytearray()
        while n >= 0x80:
            data.append((n & 0x7f) | 0x80)
            n >>= 7
        data.append(n)
        return bytes(data)

    def sign(self, wif, digest):
        """Sign ``digest`` and return a list with one hex-encoded signature.

        Nonces are generated until the DER-encoded signature has 32-byte
        ``r`` and ``s`` values (canonical form). The result is one header
        byte (recovery id + 4 for "compressed" + 27 for "compact") followed
        by the 64-byte ``r || s`` string.

        :param wif: private key (WIF), decoded with ``Base58`` using
            ``self.prefix``.
        :param digest: 32-byte digest produced by :meth:`get_digest`.
        :return: list containing the hex string of the signature.
        """
        secret = bytes(Base58(wif, prefix=self.prefix))
        sk = ecdsa.SigningKey.from_string(secret, curve=ecdsa.SECP256k1)
        order = sk.curve.generator.order()
        expected_key = sk.get_verifying_key().to_string()

        attempts = 0
        while True:
            attempts += 1
            if not attempts % 20:
                print("Still searching for a canonical signature. Tried %d times already!" % attempts)

            # Deterministic k; the local time is mixed in so that every
            # attempt produces a different signature.
            k = ecdsa.rfc6979.generate_k(
                order,
                sk.privkey.secret_multiplier,
                hashlib.sha256,
                hashlib.sha256(
                    digest + struct.pack("d", time.time())
                ).digest())

            # Sign, then re-encode DER -> fixed-width r || s.
            sigder = sk.sign_digest(
                digest, sigencode=ecdsa.util.sigencode_der, k=k)
            r, s = ecdsa.util.sigdecode_der(sigder, order)
            signature = ecdsa.util.sigencode_string(r, s, order)

            # Make sure the signature is canonical. Compared with ==, not
            # "is": identity of ints is an implementation detail.
            len_r = sigder[3]
            len_s = sigder[5 + len_r]
            if len_r != 32 or len_s != 32:
                continue

            # Derive the recovery parameter: the id whose recovered key
            # equals our own public key.
            for recovery_id in range(4):
                candidate = self.recover_public_key(
                    digest, signature, recovery_id)
                if candidate is not None and candidate.to_string() == expected_key:
                    break
            else:
                # No id recovered our key; try another nonce instead of
                # emitting a signature with a wrong header byte.
                continue
            break

        header = recovery_id + 4 + 27  # + compressed + compact
        sigstr = struct.pack("<B", header) + signature
        return [hexlify(sigstr).decode('ascii')]

    def recover_public_key(self, digest, signature, i):
        """Recover the public key from the signature.

        Follows SEC 1 v2, section 4.1.6.

        :param digest: digest that was signed.
        :param signature: ``r || s`` signature string.
        :param i: recovery id (0..3); its low bit selects the parity of
            the y coordinate, ``i // 2`` selects ``x = r + (i // 2) * order``.
        :return: ``ecdsa.VerifyingKey`` for the recovered point, or ``None``
            when that key does not verify the signature.
        """
        curve = ecdsa.SECP256k1.curve
        G = ecdsa.SECP256k1.generator
        order = ecdsa.SECP256k1.order
        yp = i % 2
        r, s = ecdsa.util.sigdecode_string(signature, order)

        # 1.1
        x = r + (i // 2) * order

        # 1.3. This computes either 02||X or 03||X depending on the recovery
        # id instead of always 02||X as specified. It substitutes for the
        # lack of reversing R later on: -R is just R with the y coordinate
        # flipped.
        alpha = ((x * x * x) + (curve.a() * x) + curve.b()) % curve.p()
        beta = ecdsa.numbertheory.square_root_mod_prime(alpha, curve.p())
        y = beta if (beta - yp) % 2 == 0 else curve.p() - beta

        # 1.4 The Point constructor is supposed to check whether nR is at
        # infinity.
        R = ecdsa.ellipticcurve.Point(curve, x, y, order)

        # 1.5 Compute e
        e = ecdsa.util.string_to_number(digest)

        # 1.6 Compute Q = r^-1 (sR - eG)
        Q = ecdsa.numbertheory.inverse_mod(r, order) * (s * R + (-e % order) * G)

        # Not strictly necessary, but verify the signature with the
        # recovered key. verify_digest raises on a bad signature rather
        # than returning False, so both outcomes are mapped to None.
        key = ecdsa.VerifyingKey.from_public_point(Q, curve=ecdsa.SECP256k1)
        try:
            verified = key.verify_digest(
                signature, digest, sigdecode=ecdsa.util.sigdecode_string)
        except ecdsa.BadSignatureError:
            return None
        return key if verified else None

    def compressedPubkey(self, pk):
        """Return the compressed encoding of public key ``pk``.

        One prefix byte (2 when y is even, 3 when y is odd) followed by
        the x coordinate encoded with ``ecdsa.util.number_to_string``.
        """
        order = pk.curve.generator.order()
        point = pk.pubkey.point
        x_str = ecdsa.util.number_to_string(point.x(), order)
        return bytes([2 + (point.y() & 1)]) + x_str
# ----- main -----
# Nothing runs when the module is executed directly; the guard is kept as a
# placeholder.
if __name__ == '__main__':
    pass