-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcollection_utils.py
140 lines (106 loc) · 3.49 KB
/
collection_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import json
import logging
from json import JSONEncoder
from decimal import Decimal
from typing import Any, Callable, Optional, Iterable
from .mypy import Predicate
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def add_list_elem(dct, key, elem):
    """
    Append elem to the list stored under key, creating the list if needed.

    A missing key and a key whose value is None are treated the same way:
    the entry is (re)initialised to a one-element list holding elem.
    The dictionary is modified in place; nothing is returned.
    """
    missing_or_none = key not in dct or dct[key] is None
    if missing_or_none:
        dct[key] = [elem]
        return
    dct[key].append(elem)
def map_dict(dct: dict, map_fn: Callable) -> dict:
    """
    Build a new dict with the same keys and values replaced by map_fn(key, value).

    The input dictionary is not modified.
    """
    mapped = {}
    for key, value in dct.items():
        mapped[key] = map_fn(key, value)
    return mapped
def to_list(dct: dict, sort_by_key: bool = False, sorting_key_fn: Optional[Callable] = None):
    """
    Return the dictionary's (key, value) pairs as a list.

    If sort_by_key is true the pairs are ordered by key. Otherwise, if
    sorting_key_fn is given, it is used as the sort key and receives each
    (key, value) pair. sort_by_key takes precedence when both are supplied.
    With neither, the pairs keep the dictionary's iteration order.
    """
    if sort_by_key:
        return sorted(dct.items(), key=lambda pair: pair[0])
    if sorting_key_fn:
        return sorted(dct.items(), key=sorting_key_fn)
    return list(dct.items())
def split_chunks(lst: list, chunk_size: int) -> list[list]:
    """
    Split lst into consecutive slices of at most chunk_size elements.

    The last chunk may be shorter than chunk_size. An empty list yields an
    empty result.

    Raises:
        ValueError: if chunk_size is less than 1. A non-positive size can
            never advance through the list, so it is rejected up front
            instead of looping forever.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")
    return [lst[start: start + chunk_size] for start in range(0, len(lst), chunk_size)]
def filter_map(dct: dict, predicate: Predicate) -> dict:
    """
    Return a new dict holding only the entries whose key satisfies predicate.

    The predicate is called with the key only (not the value). The input
    dictionary is not modified.
    """
    kept = {}
    for key in dct:
        if predicate(key):
            kept[key] = dct[key]
    return kept
def filter_map_keys(dct: dict, keys: Iterable) -> dict:
    """
    Return a new dict holding only the entries of dct whose key is in keys.

    keys may be any iterable or container. One-shot iterators (generators,
    iter(...), map objects, ...) are materialised first: testing membership
    directly on an iterator consumes it, so later lookups would miss keys
    that were already skipped over.

    The input dictionary is not modified; entries keep dct's order.
    """
    if hasattr(keys, '__next__'):
        # An iterator can only be scanned once; snapshot it so every
        # membership test sees the full set of requested keys.
        keys = list(keys)
    return {k: dct[k] for k in dct if k in keys}
def get_or_add_new(dct: dict, key: Any, new_elem_func: Callable) -> Any:
    """
    Return dct[key], first creating it with new_elem_func() if key is absent.

    new_elem_func is called with no arguments, and only when the key is
    missing. The freshly created value is stored in dct before being returned.
    """
    if key in dct:
        return dct[key]
    dct[key] = new_elem_func()
    return dct[key]
def is_list_or_tuple(x: Any) -> bool:
    """
    Tell whether x is a list or a tuple (subclasses included).
    """
    return isinstance(x, list) or isinstance(x, tuple)
def list2comma_separated(lst: list) -> str:
    """
    Join the str() of every element of lst with commas (no spaces).

    A falsy lst (empty list or None) gives an empty string.
    """
    if lst:
        return ','.join(str(item) for item in lst)
    return ''
def dict_encode(obj, encoder: Callable = None):
    """
    Recursively turn obj into plain dicts, lists and scalars.
    Useful for serializing objects to JSON
    Use: json.dumps(dict_encode(obj, encoder=some_encoder_func))

    Rules, applied in this order:
    - None, str, int, float and bool are returned unchanged
    - dicts are rebuilt with every value encoded (keys are kept as they are)
    - any other object exposing __iter__ becomes a list of encoded items
    - objects without a __dict__ are returned unchanged
    - everything else is passed to encoder, and its result is encoded again

    Raises Exception when an object needs the encoder but none was given.
    """
    # The scalar check must come before the __iter__ check: str is iterable.
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj
    if isinstance(obj, dict):
        return {key: dict_encode(value, encoder) for key, value in obj.items()}
    if hasattr(obj, '__iter__'):
        return [dict_encode(item, encoder) for item in obj]
    if not hasattr(obj, '__dict__'):
        return obj
    if encoder:
        # The encoder's output may itself contain objects, so encode it too.
        return dict_encode(encoder(obj), encoder)
    raise Exception(f"Can't encode object of type {type(obj)} without an encoder")
def first(predicate: Predicate, iterable: Iterable) -> Optional[Any]:
    """
    Return the first item of iterable for which predicate(item) is truthy.

    Iteration stops at the first match. None is returned when nothing matches.
    """
    for candidate in iterable:
        if not predicate(candidate):
            continue
        return candidate
    return None
def exists(predicate: Predicate, iterable: Iterable) -> bool:
    """
    Tell whether any item of iterable satisfies predicate.

    Iteration stops at the first match. An empty iterable gives False.
    """
    # Evaluate the predicate itself rather than comparing a found item with
    # None, so a matching item that happens to be None (or any other falsy
    # value) is still reported as existing.
    return any(predicate(item) for item in iterable)
class DictJsonEncoder(JSONEncoder):
    """
    JSONEncoder with fallbacks for objects json cannot serialize natively.

    Use: json.dumps(obj, cls=DictJsonEncoder)
    """

    def default(self, o):
        """
        Return a serializable stand-in for o.

        - an object with a non-empty __dict__ is replaced by that dict
        - a Decimal is converted to float
        - anything else is tried with json.dumps; if that raises TypeError
          a warning is logged and str(o) is used instead
        """
        attributes = getattr(o, '__dict__', None)
        if attributes:
            return attributes
        if isinstance(o, Decimal):
            return float(o)
        try:
            encoded = json.dumps(o)
        except TypeError:
            logger.warning(f"Don't know how to json encode {o} - using str")
            encoded = str(o)
        return encoded
def filter_json_encodable(dct: dict) -> dict:
    """
    Return a new dict holding only the entries of dct that json can encode.

    Each entry is checked by encoding it as a one-entry JSON object, so both
    the value and the key's suitability as a JSON object key are validated.
    Entries that fail are silently dropped; the input is not modified.
    """
    rv = {}
    for k, v in dct.items():
        try:
            # Encoding the key on its own is not enough: e.g. a tuple dumps
            # fine as a JSON array but is rejected as an object key.
            json.dumps({k: v})
        except Exception:
            # Best-effort filter: any encoding failure just drops the entry.
            # Exception (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            continue
        rv[k] = v
    return rv
def load_json(string: str, default: Optional[Any] = None) -> Any:
    """
    Parse string as JSON when it is non-empty.
    For an empty (or None) string, return default if it is not None,
    otherwise return a new empty dict.
    """
    if not string:
        return {} if default is None else default
    return json.loads(string)
def contains_non_empty(lst: list) -> bool:
    """
    Tell whether lst holds at least one truthy element.

    An empty list gives False.
    """
    return any(lst)