-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutil.py
More file actions
82 lines (65 loc) · 2.3 KB
/
util.py
File metadata and controls
82 lines (65 loc) · 2.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import logging
import sqlite3
import typing as t
from collections import OrderedDict
import attr
import hishel
from attrs import define
from cattrs.preconf.json import make_converter as make_json_converter
from cattrs.preconf.pyyaml import make_converter as make_yaml_converter
from cratedb_about.settings import settings
logger = logging.getLogger()
@define
class Metadata:
version: t.Union[float, None] = None
type: t.Union[str, None] = None
@define
class DictTools:
def to_dict(self) -> t.Dict[str, t.Any]:
return attr.asdict(self, dict_factory=OrderedDict)
@classmethod
def from_dict(cls, data: t.Dict[str, t.Any]):
return cls(**data)
@define
class Dumpable(DictTools):
"""
Todo: Refactor to `pueblo.data`.
"""
meta: t.Union[Metadata, None] = None
def to_json(self) -> str:
converter = make_json_converter(dict_factory=OrderedDict)
return converter.dumps(self.to_dict())
def to_yaml(self) -> str:
converter = make_yaml_converter(dict_factory=OrderedDict)
return converter.dumps(self.to_dict())
@classmethod
def from_json(cls, json_str: str):
converter = make_json_converter(dict_factory=OrderedDict)
return converter.loads(json_str, cls)
@classmethod
def from_yaml(cls, yaml_str: str):
converter = make_yaml_converter(dict_factory=OrderedDict)
return converter.loads(yaml_str, cls)
def get_cache_client(ttl: t.Optional[t.Union[int, float]] = settings.http_cache_ttl):
"""
Return the configured cache client.
https://hishel.com/
"""
# Configure Hishel, a httpx client with caching.
logger.info(f"Configuring cache. ttl={ttl}, path={settings.http_cache_path}")
try:
controller = hishel.Controller(allow_stale=True)
storage = hishel.SQLiteStorage(
connection=sqlite3.connect(settings.http_cache_path, check_same_thread=False),
ttl=ttl,
)
return hishel.CacheClient(
controller=controller, storage=storage, timeout=settings.http_timeout
)
except Exception as e:
msg = (
f"Failed to configure Hishel cache with SQLite. "
f"ttl={ttl}, path={settings.http_cache_path}. Reason: {e}"
)
logger.exception(msg)
raise e.__class__(msg) from e