-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathhelper.py
347 lines (260 loc) · 8.34 KB
/
helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
from rbnf.core.Tokenizer import Tokenizer
import yapypy.extended_python.extended_ast as ex_ast
import ast
import typing as t
class ExprContextFixer(ast.NodeVisitor):
def __init__(self, ctx):
self.ctx = ctx
def _store_simply(self, node):
node.ctx = self.ctx
def _store_recursively(self, node):
node.ctx = self.ctx
self.generic_visit(node)
def _store_ex_dict(self, node: ex_ast.ExDict):
node.ctx = self.ctx
for each in node.values:
self.visit(each)
visit_Name = _store_simply
visit_Subscript = _store_simply
visit_Attribute = _store_simply
visit_Tuple = _store_recursively
visit_List = _store_recursively
visit_ExDict = _store_ex_dict
visit_Starred = _store_recursively
_fix_store = ExprContextFixer(ast.Store()).visit
_fix_del = ExprContextFixer(ast.Del()).visit
class Loc:
def __matmul__(self, other: t.Union[ast.AST, Tokenizer]):
return {
'lineno': other.lineno,
'col_offset':
other.col_offset if hasattr(other, 'col_offset') else other.colno
}
class LocatedError(Exception):
def __init__(self, lineno: int, exc: Exception):
self.lineno = lineno
self.exc = exc
loc = Loc()
def as_store(it):
if hasattr(it, '_fields'):
_fix_store(it)
return it
return it
def as_del(it):
if hasattr(it, '_fields'):
_fix_del(it)
return it
return it
def _parse_expr(token: Tokenizer):
expr = ast.parse(token.value).body[0].value
expr.lineno = token.lineno
expr.col_offset = token.colno
return expr
def _value(code: Tokenizer):
return code.value
def raise_exp(e):
raise e
def str_maker(*strs: Tokenizer):
head = strs[0]
return ast.JoinedStr(**(loc @ head), values=list(map(_parse_expr, strs)))
def atom_expr_rewrite(a: t.Optional[Tokenizer], atom: ast.AST,
trailers: t.List[t.Callable[[ast.AST], ast.Suite]]):
for each in trailers:
atom = each(atom)
if a:
atom = ast.Await(**(loc @ a), value=atom)
return atom
def shift_expr_rewrite(head, tail):
if tail:
for op, each in tail:
head = ast.BinOp(head,
{
'>>': ast.RShift,
'<<': ast.LShift
}[op.value](), each, **loc @ op)
return head
def comp_op_rewrite(op: t.Union[Tokenizer, t.List[Tokenizer]]):
"""
('<'|'>'|'=='|'>='|'<='|'<>'|'!='|'in'|'not' 'in'|'is'|'is' 'not')
"""
if isinstance(op, list):
op = tuple(map(lambda it: it.value, op))
else:
op = op.value
return {
'<': ast.Lt,
'>': ast.Gt,
'==': ast.Eq,
'>=': ast.GtE,
'<=': ast.LtE,
'<>': lambda: raise_exp(NotImplemented),
'!=': ast.NotEq,
'in': ast.In,
('is', ): ast.Is,
('is', 'not'): ast.IsNot,
('not', 'in'): ast.NotIn,
}[op]()
def expr_rewrite(head, tail):
if tail:
for op, each in tail:
head = ast.BinOp(head, ast.BitOr(), each, **loc @ op)
return head
def xor_expr_rewrite(head, tail):
if tail:
for op, each in tail:
head = ast.BinOp(head, ast.BitXor(), each, **loc @ op)
return head
def and_expr_rewrite(head, tail):
if tail:
for op, each in tail:
head = ast.BinOp(head, ast.BitAnd(), each, **loc @ op)
return head
def arith_expr_rewrite(head, tail):
if tail:
for op, each in tail:
head = ast.BinOp(
head,
{
'+': ast.Add,
'-': ast.Sub
}[op.value](),
each,
**loc @ op,
)
return head
def term_rewrite(head, tail):
if tail:
for op, each in tail:
head = ast.BinOp(
head,
{
'*': ast.Mult,
'@': ast.MatMult,
'%': ast.Mod,
'//': ast.FloorDiv,
'/': ast.Div
}[op.value](),
each,
**loc @ op,
)
return head
def factor_rewrite(mark: Tokenizer, factor, power):
return power if power else ast.UnaryOp(
**(loc @ mark),
op={
'~': ast.Invert,
'+': ast.UAdd,
'-': ast.USub
}[mark.value](),
operand=factor,
)
def split_args_helper(arglist):
positional = []
keywords = []
for each in arglist:
if isinstance(each, ast.keyword):
keywords.append(each)
else:
positional.append(each)
return positional, keywords
def def_rewrite(mark: Tokenizer,
name: Tokenizer,
args: ast.arguments,
ret: ast.AST,
body: list,
is_async: bool = False):
name = name.value
ty = ast.AsyncFunctionDef if is_async else ast.FunctionDef
return ty(name, args, body, [], ret, **loc @ mark)
def augassign_rewrite(it: Tokenizer):
return {
'+=': ast.Add,
'-=': ast.Sub,
'*=': ast.Mult,
'/=': ast.Div,
'//=': ast.FloorDiv,
'@=': ast.MatMult,
'%=': ast.Mod,
'&=': ast.BitAnd,
'|=': ast.BitOr,
'^=': ast.BitXor,
'<<=': ast.LShift,
'>>=': ast.RShift,
'**=': ast.Pow,
}[it.value]
def expr_stmt_rewrite(lhs, ann, aug, aug_exp, rhs: t.Optional[list]):
if rhs:
as_store(lhs)
*init, end = rhs
for each in init:
as_store(each)
return ast.Assign([lhs, *init], end)
if ann:
as_store(lhs)
anno, value = ann
return ast.AnnAssign(lhs, anno, value, 1)
if aug_exp:
as_store(lhs)
return ast.AugAssign(lhs, aug(), aug_exp)
# NO AS STORE HERE!
return ast.Expr(lhs)
def if_stmt_rewrite(marks, tests, bodies, orelse):
orelse = orelse or []
head = None
for mark, test, body, in reversed(tuple(zip(marks, tests, bodies))):
head = ast.If(test, body, orelse, **loc @ mark)
orelse = [head]
return head
def while_stmt_rewrite(test, body, orelse):
orelse = orelse or []
return ast.While(test, body, orelse)
def for_stmt_rewrite(target, iter, body, orelse, is_async=False):
orelse = orelse or []
as_store(target)
ty = ast.AsyncFor if is_async else ast.For
return ty(target, iter, body, orelse)
def try_stmt_rewrite(mark, body, excs, rescues, orelse, final):
excs = excs or []
rescues = rescues or []
def handlers():
for (type, name), body in zip(excs, rescues):
yield ast.ExceptHandler(type, name, body)
return ast.Try(body, list(handlers()), orelse or [], final or [], **loc @ mark)
def with_stmt_rewrite(mark, items, body, is_async=False):
ty = ast.AsyncWith if is_async else ast.With
return ty(items, body, **loc @ mark)
def check_call_args(loc, seq: t.List[ast.expr]):
in_keyword_section = False
for each in seq:
if isinstance(each, ast.keyword):
in_keyword_section = True
elif in_keyword_section and not isinstance(each, ast.Starred):
error = SyntaxError()
error.lineno = loc['lineno']
error.msg = 'non-keyword argument follows keyword argument'
raise error
return seq
def atom_rewrite(loc, name, token, value, number, strs, namedc, ellipsis, dict, is_dict,
is_gen, is_list, comp, yield_expr):
if name:
if not token:
return ast.Name(name.value, ast.Load(), **loc @ name)
return ex_ast.AssignExpr(
ast.Name(name.value, ast.Store(), **loc @ name), value=value, **loc @ token)
if number:
return ast.Num(eval(number.value), **loc @ number)
if strs:
return str_maker(*strs)
if ellipsis:
return ast.Ellipsis()
if namedc:
return ast.NameConstant(eval(namedc.value), **loc @ namedc)
if is_dict:
return dict or ex_ast.ExDict([], [], ast.Load(), **loc @ is_dict)
if is_gen:
if yield_expr:
return yield_expr
return comp(is_tuple=True) if comp else ast.Tuple([], ast.Load(), **loc @ is_gen)
if is_list:
return comp(is_list=True) if comp else ast.List([], ast.Load(), **loc @ is_list)
raise TypeError