Skip to content

Commit b391677

Browse files
committed
feat: add temporary result storage and smoke test options
1 parent 448487d commit b391677

File tree

5 files changed

+99
-1
lines changed

5 files changed

+99
-1
lines changed

tests/smoke.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,15 @@
1111
from rich.table import Table
1212

1313
from wherobots.db import connect, connect_direct
14-
from wherobots.db.constants import DEFAULT_ENDPOINT, DEFAULT_SESSION_TYPE
14+
from wherobots.db.constants import (
15+
DEFAULT_ENDPOINT,
16+
DEFAULT_SESSION_TYPE,
17+
DEFAULT_STORAGE_FORMAT,
18+
)
1519
from wherobots.db.connection import Connection
1620
from wherobots.db.region import Region
1721
from wherobots.db.session_type import SessionType
22+
from wherobots.db.result_storage import StorageFormat, Store
1823

1924
if __name__ == "__main__":
2025
parser = argparse.ArgumentParser()
@@ -48,7 +53,35 @@
4853
parser.add_argument(
4954
"--wide", help="Enable wide output", action="store_const", const=80, default=30
5055
)
56+
parser.add_argument(
57+
"-s",
58+
"--store",
59+
help="Store results in temporary storage",
60+
action="store_true",
61+
)
5162
parser.add_argument("sql", nargs="+", help="SQL query to execute")
63+
64+
args, unknown = parser.parse_known_args()
65+
if args.store:
66+
parser.add_argument(
67+
"-sf",
68+
"--storage-format",
69+
help="Storage format for the results",
70+
default=DEFAULT_STORAGE_FORMAT,
71+
choices=[sf.value for sf in StorageFormat],
72+
)
73+
parser.add_argument(
74+
"--single",
75+
help="Generate only a single part file",
76+
action="store_true",
77+
)
78+
parser.add_argument(
79+
"-p",
80+
"--presigned-url",
81+
help="Generate a presigned URL for the results (only when --single is set)",
82+
action="store_true",
83+
)
84+
5285
args = parser.parse_args()
5386

5487
logging.basicConfig(
@@ -73,6 +106,16 @@
73106
token = f.read().strip()
74107
headers = {"Authorization": f"Bearer {token}"}
75108

109+
store = None
110+
if args.store:
111+
store = Store(
112+
format=StorageFormat(args.storage_format)
113+
if args.storage_format
114+
else DEFAULT_STORAGE_FORMAT,
115+
single=args.single,
116+
generate_presigned_url=args.presigned_url,
117+
)
118+
76119
if args.ws_url:
77120
conn_func = functools.partial(connect_direct, uri=args.ws_url, headers=headers)
78121
else:
@@ -86,6 +129,7 @@
86129
region=Region(args.region) if args.region else Region.AWS_US_WEST_2,
87130
version=args.version,
88131
session_type=SessionType(args.session_type),
132+
store=store,
89133
)
90134

91135
def render(results: pandas.DataFrame) -> None:

wherobots/db/connection.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
)
2525
from wherobots.db.cursor import Cursor
2626
from wherobots.db.errors import NotSupportedError, OperationalError
27+
from wherobots.db.result_storage import Store
2728

2829

2930
@dataclass
@@ -56,12 +57,14 @@ def __init__(
5657
results_format: Union[ResultsFormat, None] = None,
5758
data_compression: Union[DataCompression, None] = None,
5859
geometry_representation: Union[GeometryRepresentation, None] = None,
60+
store: Union[Store, None] = None,
5961
):
6062
self.__ws = ws
6163
self.__read_timeout = read_timeout
6264
self.__results_format = results_format
6365
self.__data_compression = data_compression
6466
self.__geometry_representation = geometry_representation
67+
self.__store = store
6568

6669
self.__queries: dict[str, Query] = {}
6770
self.__thread = threading.Thread(
@@ -134,6 +137,9 @@ def __listen(self) -> None:
134137
# On a state_updated event telling us the query succeeded,
135138
# ask for results.
136139
if kind == EventKind.STATE_UPDATED:
140+
logging.info(
141+
"Query %s succeeded; full message is %s", execution_id, message
142+
)
137143
self.__request_results(execution_id)
138144
return
139145

@@ -209,6 +215,17 @@ def __execute_sql(self, sql: str, handler: Callable[[Any], None]) -> str:
209215
"statement": sql,
210216
}
211217

218+
if self.__store:
219+
request["store"] = {}
220+
if self.__store.format:
221+
request["store"]["format"] = self.__store.format.value
222+
if self.__store.single:
223+
request["store"]["single"] = str(self.__store.single)
224+
if self.__store.generate_presigned_url:
225+
request["store"]["generate_presigned_url"] = str(
226+
self.__store.generate_presigned_url
227+
)
228+
212229
self.__queries[execution_id] = Query(
213230
sql=sql,
214231
execution_id=execution_id,

wherobots/db/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from .region import Region
66
from .runtime import Runtime
77
from .session_type import SessionType
8+
from .result_storage import StorageFormat
89

910

1011
DEFAULT_ENDPOINT: str = "api.cloud.wherobots.com" # "api.cloud.wherobots.com"
@@ -14,6 +15,7 @@
1415
DEFAULT_REGION: Region = Region.AWS_US_WEST_2
1516
DEFAULT_VERSION: str = "latest"
1617
DEFAULT_SESSION_TYPE: SessionType = SessionType.MULTI
18+
DEFAULT_STORAGE_FORMAT: StorageFormat = StorageFormat.PARQUET
1719
DEFAULT_READ_TIMEOUT_SECONDS: float = 0.25
1820
DEFAULT_SESSION_WAIT_TIMEOUT_SECONDS: float = 900
1921

wherobots/db/driver.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
)
4141
from .region import Region
4242
from .runtime import Runtime
43+
from .result_storage import Store
4344

4445
apilevel = "2.0"
4546
threadsafety = 1
@@ -72,6 +73,7 @@ def connect(
7273
results_format: Union[ResultsFormat, None] = None,
7374
data_compression: Union[DataCompression, None] = None,
7475
geometry_representation: Union[GeometryRepresentation, None] = None,
76+
store: Union[Store, None] = None,
7577
) -> Connection:
7678
if not token and not api_key:
7779
raise ValueError("At least one of `token` or `api_key` is required")
@@ -157,6 +159,7 @@ def get_session_uri() -> str:
157159
results_format=results_format,
158160
data_compression=data_compression,
159161
geometry_representation=geometry_representation,
162+
store=store,
160163
)
161164

162165

@@ -177,6 +180,7 @@ def connect_direct(
177180
results_format: Union[ResultsFormat, None] = None,
178181
data_compression: Union[DataCompression, None] = None,
179182
geometry_representation: Union[GeometryRepresentation, None] = None,
183+
store: Union[Store, None] = None,
180184
) -> Connection:
181185
uri_with_protocol = f"{uri}/{protocol}"
182186

@@ -199,4 +203,5 @@ def connect_direct(
199203
results_format=results_format,
200204
data_compression=data_compression,
201205
geometry_representation=geometry_representation,
206+
store=store,
202207
)

wherobots/db/result_storage.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from enum import auto
2+
from strenum import LowercaseStrEnum
3+
from typing import Union
4+
5+
6+
class StorageFormat(LowercaseStrEnum):
7+
PARQUET = auto()
8+
CSV = auto()
9+
GEOJSON = auto()
10+
11+
12+
class Store:
13+
def __init__(
14+
self,
15+
format: Union[StorageFormat, None] = None,
16+
single: bool = False,
17+
generate_presigned_url: bool = False,
18+
):
19+
self.format = format
20+
self.single = single
21+
self.generate_presigned_url = generate_presigned_url
22+
assert (
23+
single or not generate_presigned_url
24+
), "Presigned URL can only be generated when single part file is requested."
25+
26+
def __repr__(self):
27+
return f"Store(format={self.format}, single={self.single}, generate_presigned_url={self.generate_presigned_url})"
28+
29+
def __str__(self):
30+
return f"Store(format={self.format}, single={self.single}, generate_presigned_url={self.generate_presigned_url})"

0 commit comments

Comments
 (0)