This repository was archived by the owner on Nov 15, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 186
Expand file tree
/
Copy pathUtils.py
More file actions
405 lines (323 loc) · 12 KB
/
Utils.py
File metadata and controls
405 lines (323 loc) · 12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
import binascii
from neo.Core.BigInteger import BigInteger
from neo.Core.Fixed8 import Fixed8
from neo.Core.Helper import Helper
from neo.Core.Blockchain import Blockchain
from neo.Wallets.Coin import CoinState
from neo.Core.TX.TransactionAttribute import TransactionAttribute, TransactionAttributeUsage
from neo.SmartContract.ContractParameter import ContractParameterType
from neo.Core.Cryptography.ECCurve import ECDSA
from decimal import Decimal
from neo.logging import log_manager
from neo.Wallets import NEP5Token
from neo.Core.Cryptography.Crypto import Crypto
from typing import TYPE_CHECKING
from neo.Network.common import blocking_prompt as prompt
if TYPE_CHECKING:
from neo.Wallets.Wallet import Wallet
logger = log_manager.getLogger()
def get_asset_attachments(params):
to_remove = []
neo_to_attach = None
gas_to_attach = None
for item in params:
if type(item) is str:
if '--attach-neo=' in item:
to_remove.append(item)
try:
neo_to_attach = Fixed8.TryParse(int(item.replace('--attach-neo=', '')))
except Exception as e:
pass
if '--attach-gas=' in item:
to_remove.append(item)
try:
gas_to_attach = Fixed8.FromDecimal(float(item.replace('--attach-gas=', '')))
except Exception as e:
pass
for item in to_remove:
params.remove(item)
return params, neo_to_attach, gas_to_attach
def get_owners_from_params(params):
to_remove = []
owners = None
for item in params:
if type(item) is str:
if '--owners=' in item:
owners = []
to_remove.append(item)
try:
owner_list = eval(item.replace('--owners=', ''))
owners = set()
for o in owner_list:
shash = Helper.AddrStrToScriptHash(o)
owners.add(shash)
except Exception as e:
logger.info("Could not parse owner %s " % e)
pass
for item in to_remove:
params.remove(item)
return params, owners
def get_asset_id(wallet, asset_str):
assetId = None
# check to see if this is a token
for token in wallet.GetTokens().values():
if asset_str == token.symbol:
return token
elif asset_str == token.ScriptHash.ToString():
return token
if asset_str.lower() == 'neo':
assetId = Blockchain.Default().SystemShare().Hash
elif asset_str.lower() == 'gas':
assetId = Blockchain.Default().SystemCoin().Hash
elif Blockchain.Default().GetAssetState(asset_str):
assetId = Blockchain.Default().GetAssetState(asset_str).AssetId
return assetId
def get_asset_amount(amount, assetId):
f8amount = Fixed8.TryParse(amount, require_positive=True)
if f8amount is None:
print("invalid amount format")
return False
elif f8amount.value % pow(10, 8 - Blockchain.Default().GetAssetState(assetId.ToBytes()).Precision) != 0:
print("incorrect amount precision")
return False
return f8amount
def get_withdraw_from_watch_only(wallet, scripthash_from):
withdraw_from_watch_only = 0
# check to see if contract address is in the wallet
wallet_contract = wallet.GetContract(scripthash_from)
# if it is not, check to see if it in the wallet watch_addr
if wallet_contract is None:
if scripthash_from in wallet._watch_only:
withdraw_from_watch_only = CoinState.WatchOnly
wallet_contract = scripthash_from
if wallet_contract is None:
print("please add this contract into your wallet before withdrawing from it")
print("Use import watch_addr {ADDR}, then rebuild your wallet")
return None
return withdraw_from_watch_only
def get_from_addr(params):
from_addr = None
for item in params:
if '--from-addr' in item:
params.remove(item)
from_addr = item.replace('--from-addr=', '')
return params, from_addr
def get_change_addr(params):
change_addr = None
for item in params:
if '--change-addr' in item:
params.remove(item)
change_addr = item.replace('--change-addr=', '')
return params, change_addr
def get_to_addr(params):
to_addr = None
for item in params:
if '--to-addr' in item:
params.remove(item)
to_addr = item.replace('--to-addr=', '')
return params, to_addr
def get_return_type_from_args(params):
return_type = None
for item in params:
if '--return-type' in item:
params.remove(item)
return_type = item.replace('--return-type=', '')
return params, return_type
def get_fee(params):
fee = None
for item in params:
if '--fee=' in item:
params.remove(item)
fee = get_asset_amount(item.replace('--fee=', ''), Blockchain.SystemCoin().Hash)
return params, fee
def get_parse_addresses(params):
if '--no-parse-addr' in params:
params.remove('--no-parse-addr')
return params, False
return params, True
def get_tx_attr_from_args(params):
tx_attr_dict = []
for item in params:
if '--tx-attr=' in item:
params.remove(item)
try:
attr_str = item.replace('--tx-attr=', '')
# this doesn't work for loading in bytearrays
# tx_attr_obj = json.loads(attr_str)
tx_attr_obj = eval(attr_str)
if type(tx_attr_obj) is dict:
if attr_obj_to_tx_attr(tx_attr_obj) is not None:
tx_attr_dict.append(attr_obj_to_tx_attr(tx_attr_obj))
elif type(tx_attr_obj) is list:
for obj in tx_attr_obj:
if attr_obj_to_tx_attr(obj) is not None:
tx_attr_dict.append(attr_obj_to_tx_attr(obj))
else:
logger.error("Invalid transaction attribute specification: %s " % type(tx_attr_obj))
except Exception as e:
logger.error("Could not parse json from tx attrs: %s " % e)
return params, tx_attr_dict
def attr_obj_to_tx_attr(obj):
try:
datum = obj['data']
if type(datum) is str:
datum = datum.encode('utf-8')
usage = obj['usage']
if usage == TransactionAttributeUsage.Script and len(datum) == 40:
datum = binascii.unhexlify(datum)
return TransactionAttribute(usage=usage, data=datum)
except Exception as e:
logger.error("could not convert object %s into TransactionAttribute: %s " % (obj, e))
return None
def parse_param(p, wallet=None, ignore_int=False, prefer_hex=True, parse_addr=True):
# first, we'll try to parse an array
try:
items = eval(p, {"__builtins__": {'list': list}}, {})
return items
except Exception as e:
# print("Could not eval items as array %s " % e)
pass
if not ignore_int:
try:
val = int(p)
out = BigInteger(val)
return out
except Exception as e:
pass
try:
val = eval(p, {"__builtins__": {'bytearray': bytearray, 'bytes': bytes, 'list': list}}, {})
if type(val) is bytearray:
return val
elif type(val) is bytes:
# try to unhex
try:
val = binascii.unhexlify(val)
except Exception as e:
pass
# now it should be unhexxed no matter what, and we can hex it
return val.hex().encode('utf-8')
elif type(val) is bool:
return val
except Exception as e:
pass
if type(p) is str:
if wallet is not None:
for na in wallet.NamedAddr:
if na.Title == p:
return bytearray(na.ScriptHash)
# check for address strings like 'ANE2ECgA6YAHR5Fh2BrSsiqTyGb5KaS19u' and
# convert them to a bytearray
if parse_addr and len(p) == 34 and p[0] == 'A':
addr = Helper.AddrStrToScriptHash(p).Data
return addr
if prefer_hex:
return binascii.hexlify(p.encode('utf-8'))
else:
return p.encode('utf-8')
return p
def get_arg(arguments, index=0, convert_to_int=False, do_parse=False):
try:
arg = arguments[index]
if convert_to_int:
return int(arg)
if do_parse:
return parse_param(arg)
return arg
except Exception as e:
pass
return None
def lookup_addr_str(wallet, addr):
for alias in wallet.NamedAddr:
if addr == alias.Title:
return alias.UInt160ScriptHash()
try:
script_hash = wallet.ToScriptHash(addr)
return script_hash
except Exception as e:
print(e)
def string_from_fixed8(amount, decimals):
precision_mult = pow(10, decimals)
amount = Decimal(amount) / Decimal(precision_mult)
formatter_str = '.%sf' % decimals
amount_str = format(amount, formatter_str)
return amount_str
def get_input_prompt(message):
return prompt(message)
def verify_params(ptype, param):
if ptype == ContractParameterType.String:
return str(param), False
elif ptype == ContractParameterType.Integer:
return int(param), False
elif ptype == ContractParameterType.Boolean:
return bool(param), False
elif ptype == ContractParameterType.PublicKey:
try:
return ECDSA.decode_secp256r1(param).G, False
except ValueError:
return None, True
elif ptype == ContractParameterType.ByteArray:
if isinstance(param, str) and len(param) == 34 and param[0] == 'A':
return Helper.AddrStrToScriptHash(param).Data, False
res = eval(param, {"__builtins__": {'bytearray': bytearray, 'bytes': bytes}}, {})
if isinstance(res, bytes):
return bytearray(res), False
return res, False
elif ptype == ContractParameterType.Array:
res = eval(param)
if isinstance(res, list):
return res, False
raise Exception("Please provide a list")
else:
raise Exception("Unknown param type %s " % ptype.name)
def gather_param(index, param_type, do_continue=True):
ptype = ContractParameterType(param_type)
prompt_message = '[Param %s] %s input: ' % (index, ptype.name)
try:
result = get_input_prompt(prompt_message)
except KeyboardInterrupt:
print("Input cancelled")
return None, True
except Exception as e:
print(str(e))
# no results, abort True
return None, True
try:
return verify_params(ptype, result)
except KeyboardInterrupt: # Control-C pressed: exit
return None, True
except Exception as e:
print("Could not parse param as %s : %s " % (ptype, e))
if do_continue:
return gather_param(index, param_type, do_continue)
return None, True
def get_token(wallet: 'Wallet', token_str: str) -> 'NEP5Token.NEP5Token':
"""
Try to get a NEP-5 token based on the symbol or script_hash
Args:
wallet: wallet instance
token_str: symbol or script_hash (accepts script hash with or without 0x prefix)
Raises:
ValueError: if token is not found
Returns:
NEP5Token instance if found.
"""
if token_str.startswith('0x'):
token_str = token_str[2:]
token = None
for t in wallet.GetTokens().values():
if token_str in [t.symbol, t.ScriptHash.ToString()]:
token = t
break
if not isinstance(token, NEP5Token.NEP5Token):
raise ValueError("The given token argument does not represent a known NEP5 token")
return token
def is_valid_public_key(key):
if len(key) != 66:
return False
try:
Crypto.ToScriptHash(key, unhex=True)
except Exception:
# the UINT160 inside ToScriptHash can throw Exception
return False
else:
return True