Skip to content

Commit d969aaf

Browse files
feat(clp-package): Add support for extracting JSON streams from archives. (#569)
Co-authored-by: kirkrodrigues <[email protected]>
1 parent ac4f1c1 commit d969aaf

File tree

20 files changed

+415
-198
lines changed

20 files changed

+415
-198
lines changed

components/clp-package-utils/clp_package_utils/general.py

+9-8
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@
3333
# CONSTANTS
3434
EXTRACT_FILE_CMD = "x"
3535
EXTRACT_IR_CMD = "i"
36+
EXTRACT_JSON_CMD = "j"
3637

3738
# Paths
3839
CONTAINER_CLP_HOME = pathlib.Path("/") / "opt" / "clp"
@@ -84,7 +85,7 @@ def __init__(self, clp_home: pathlib.Path, docker_clp_home: pathlib.Path):
8485
self.data_dir: typing.Optional[DockerMount] = None
8586
self.logs_dir: typing.Optional[DockerMount] = None
8687
self.archives_output_dir: typing.Optional[DockerMount] = None
87-
self.ir_output_dir: typing.Optional[DockerMount] = None
88+
self.stream_output_dir: typing.Optional[DockerMount] = None
8889

8990

9091
def get_clp_home():
@@ -251,17 +252,17 @@ def generate_container_config(
251252
container_clp_config.archive_output.directory,
252253
)
253254

254-
container_clp_config.ir_output.directory = pathlib.Path("/") / "mnt" / "ir-output"
255+
container_clp_config.stream_output.directory = pathlib.Path("/") / "mnt" / "stream-output"
255256
if not is_path_already_mounted(
256257
clp_home,
257258
CONTAINER_CLP_HOME,
258-
clp_config.ir_output.directory,
259-
container_clp_config.ir_output.directory,
259+
clp_config.stream_output.directory,
260+
container_clp_config.stream_output.directory,
260261
):
261-
docker_mounts.ir_output_dir = DockerMount(
262+
docker_mounts.stream_output_dir = DockerMount(
262263
DockerMountType.BIND,
263-
clp_config.ir_output.directory,
264-
container_clp_config.ir_output.directory,
264+
clp_config.stream_output.directory,
265+
container_clp_config.stream_output.directory,
265266
)
266267

267268
return container_clp_config, docker_mounts
@@ -482,7 +483,7 @@ def validate_results_cache_config(
482483
def validate_worker_config(clp_config: CLPConfig):
483484
clp_config.validate_input_logs_dir()
484485
clp_config.validate_archive_output_dir()
485-
clp_config.validate_ir_output_dir()
486+
clp_config.validate_stream_output_dir()
486487

487488

488489
def validate_webui_config(

components/clp-package-utils/clp_package_utils/scripts/decompress.py

+35-15
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414
dump_container_config,
1515
EXTRACT_FILE_CMD,
1616
EXTRACT_IR_CMD,
17+
EXTRACT_JSON_CMD,
1718
generate_container_config,
1819
generate_container_name,
1920
generate_container_start_cmd,
@@ -146,11 +147,11 @@ def handle_extract_file_cmd(
146147
return 0
147148

148149

149-
def handle_extract_ir_cmd(
150+
def handle_extract_stream_cmd(
150151
parsed_args, clp_home: pathlib.Path, default_config_file_path: pathlib.Path
151152
) -> int:
152153
"""
153-
Handles the IR extraction command.
154+
Handles the stream extraction command.
154155
:param parsed_args:
155156
:param clp_home:
156157
:param default_config_file_path:
@@ -174,29 +175,41 @@ def handle_extract_ir_cmd(
174175
)
175176

176177
# fmt: off
178+
job_command = parsed_args.command
177179
extract_cmd = [
178180
"python3",
179181
"-m", "clp_package_utils.scripts.native.decompress",
180182
"--config", str(generated_config_path_on_container),
181-
EXTRACT_IR_CMD,
182-
str(parsed_args.msg_ix),
183+
job_command
183184
]
184185
# fmt: on
185-
if parsed_args.orig_file_id:
186-
extract_cmd.append("--orig-file-id")
187-
extract_cmd.append(str(parsed_args.orig_file_id))
186+
187+
if EXTRACT_IR_CMD == job_command:
188+
extract_cmd.append(str(parsed_args.msg_ix))
189+
if parsed_args.orig_file_id:
190+
extract_cmd.append("--orig-file-id")
191+
extract_cmd.append(str(parsed_args.orig_file_id))
192+
else:
193+
extract_cmd.append("--orig-file-path")
194+
extract_cmd.append(str(parsed_args.orig_file_path))
195+
if parsed_args.target_uncompressed_size:
196+
extract_cmd.append("--target-uncompressed-size")
197+
extract_cmd.append(str(parsed_args.target_uncompressed_size))
198+
elif EXTRACT_JSON_CMD == job_command:
199+
extract_cmd.append(str(parsed_args.archive_id))
200+
if parsed_args.target_chunk_size:
201+
extract_cmd.append("--target-chunk-size")
202+
extract_cmd.append(str(parsed_args.target_chunk_size))
188203
else:
189-
extract_cmd.append("--orig-file-path")
190-
extract_cmd.append(str(parsed_args.orig_file_path))
191-
if parsed_args.target_uncompressed_size:
192-
extract_cmd.append("--target-uncompressed-size")
193-
extract_cmd.append(str(parsed_args.target_uncompressed_size))
204+
logger.error(f"Unexpected command: {job_command}")
205+
return -1
206+
194207
cmd = container_start_cmd + extract_cmd
195208

196209
try:
197210
subprocess.run(cmd, check=True)
198211
except subprocess.CalledProcessError:
199-
logger.exception("Docker or IR extraction command failed.")
212+
logger.exception("Docker or stream extraction command failed.")
200213
return -1
201214

202215
# Remove generated files
@@ -241,13 +254,20 @@ def main(argv):
241254
group.add_argument("--orig-file-id", type=str, help="Original file's ID.")
242255
group.add_argument("--orig-file-path", type=str, help="Original file's path.")
243256

257+
# JSON extraction command parser
258+
json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD)
259+
json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID")
260+
json_extraction_parser.add_argument(
261+
"--target-chunk-size", type=int, help="Target chunk size", default=100000
262+
)
263+
244264
parsed_args = args_parser.parse_args(argv[1:])
245265

246266
command = parsed_args.command
247267
if EXTRACT_FILE_CMD == command:
248268
return handle_extract_file_cmd(parsed_args, clp_home, default_config_file_path)
249-
elif EXTRACT_IR_CMD == command:
250-
return handle_extract_ir_cmd(parsed_args, clp_home, default_config_file_path)
269+
elif command in (EXTRACT_IR_CMD, EXTRACT_JSON_CMD):
270+
return handle_extract_stream_cmd(parsed_args, clp_home, default_config_file_path)
251271
else:
252272
logger.exception(f"Unexpected command: {command}")
253273
return -1

components/clp-package-utils/clp_package_utils/scripts/native/decompress.py

+60-36
Original file line number | Diff line number | Diff line change
@@ -12,12 +12,17 @@
1212
from clp_py_utils.clp_config import CLP_METADATA_TABLE_PREFIX, CLPConfig, Database
1313
from clp_py_utils.sql_adapter import SQL_Adapter
1414
from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType
15-
from job_orchestration.scheduler.job_config import ExtractIrJobConfig
15+
from job_orchestration.scheduler.job_config import (
16+
ExtractIrJobConfig,
17+
ExtractJsonJobConfig,
18+
QueryJobConfig,
19+
)
1620

1721
from clp_package_utils.general import (
1822
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
1923
EXTRACT_FILE_CMD,
2024
EXTRACT_IR_CMD,
25+
EXTRACT_JSON_CMD,
2126
get_clp_home,
2227
load_config_file,
2328
)
@@ -70,45 +75,37 @@ def get_orig_file_id(db_config: Database, path: str) -> Optional[str]:
7075
return results[0]["orig_file_id"]
7176

7277

73-
def submit_and_monitor_ir_extraction_job_in_db(
78+
def submit_and_monitor_extraction_job_in_db(
7479
db_config: Database,
75-
orig_file_id: str,
76-
msg_ix: int,
77-
target_uncompressed_size: Optional[int],
80+
job_type: QueryJobType,
81+
job_config: QueryJobConfig,
7882
) -> int:
7983
"""
80-
Submits an IR extraction job to the scheduler and waits until the job finishes.
84+
Submits a stream extraction job to the scheduler and waits until it finishes.
8185
:param db_config:
82-
:param orig_file_id:
83-
:param msg_ix:
84-
:param target_uncompressed_size:
86+
:param job_type:
87+
:param job_config:
8588
:return: 0 on success, -1 otherwise.
8689
"""
87-
extract_ir_config = ExtractIrJobConfig(
88-
orig_file_id=orig_file_id,
89-
msg_ix=msg_ix,
90-
target_uncompressed_size=target_uncompressed_size,
91-
)
92-
9390
sql_adapter = SQL_Adapter(db_config)
94-
job_id = submit_query_job(sql_adapter, extract_ir_config, QueryJobType.EXTRACT_IR)
91+
job_id = submit_query_job(sql_adapter, job_config, job_type)
9592
job_status = wait_for_query_job(sql_adapter, job_id)
9693

9794
if QueryJobStatus.SUCCEEDED == job_status:
98-
logger.info(f"Finished IR extraction job {job_id}.")
95+
logger.info(f"Finished extraction job {job_id}.")
9996
return 0
10097

101-
logger.error(
102-
f"IR extraction job {job_id} finished with unexpected status: {job_status.to_str()}."
103-
)
98+
logger.error(f"Extraction job {job_id} finished with unexpected status: {job_status.to_str()}.")
10499
return -1
105100

106101

107-
def handle_extract_ir_cmd(
108-
parsed_args: argparse.Namespace, clp_home: pathlib.Path, default_config_file_path: pathlib.Path
102+
def handle_extract_stream_cmd(
103+
parsed_args: argparse.Namespace,
104+
clp_home: pathlib.Path,
105+
default_config_file_path: pathlib.Path,
109106
) -> int:
110107
"""
111-
Handles the IR extraction command.
108+
Handles the stream extraction command.
112109
:param parsed_args:
113110
:param clp_home:
114111
:param default_config_file_path:
@@ -121,26 +118,46 @@ def handle_extract_ir_cmd(
121118
if clp_config is None:
122119
return -1
123120

124-
orig_file_id: str
125-
if parsed_args.orig_file_id:
126-
orig_file_id = parsed_args.orig_file_id
121+
command = parsed_args.command
122+
123+
job_config: QueryJobConfig
124+
job_type: QueryJobType
125+
if EXTRACT_IR_CMD == command:
126+
job_type = QueryJobType.EXTRACT_IR
127+
orig_file_id: str
128+
if parsed_args.orig_file_id:
129+
orig_file_id = parsed_args.orig_file_id
130+
else:
131+
orig_file_path = parsed_args.orig_file_path
132+
orig_file_id = get_orig_file_id(clp_config.database, orig_file_path)
133+
if orig_file_id is None:
134+
logger.error(f"Cannot find orig_file_id corresponding to '{orig_file_path}'.")
135+
return -1
136+
job_config = ExtractIrJobConfig(
137+
orig_file_id=orig_file_id,
138+
msg_ix=parsed_args.msg_ix,
139+
target_uncompressed_size=parsed_args.target_uncompressed_size,
140+
)
141+
elif EXTRACT_JSON_CMD == command:
142+
job_type = QueryJobType.EXTRACT_JSON
143+
job_config = ExtractJsonJobConfig(
144+
archive_id=parsed_args.archive_id, target_chunk_size=parsed_args.target_chunk_size
145+
)
127146
else:
128-
orig_file_id = get_orig_file_id(clp_config.database, parsed_args.orig_file_path)
129-
if orig_file_id is None:
130-
return -1
147+
logger.error(f"Unsupported stream extraction command: {command}")
148+
return -1
131149

132150
try:
133151
return asyncio.run(
134152
run_function_in_process(
135-
submit_and_monitor_ir_extraction_job_in_db,
153+
submit_and_monitor_extraction_job_in_db,
136154
clp_config.database,
137-
orig_file_id,
138-
parsed_args.msg_ix,
139-
parsed_args.target_uncompressed_size,
155+
job_type,
156+
job_config,
140157
)
141158
)
142159
except asyncio.CancelledError:
143-
logger.error("IR extraction cancelled.")
160+
logger.error("Stream extraction cancelled.")
144161
return -1
145162

146163

@@ -278,13 +295,20 @@ def main(argv):
278295
group.add_argument("--orig-file-id", type=str, help="Original file's ID.")
279296
group.add_argument("--orig-file-path", type=str, help="Original file's path.")
280297

298+
# JSON extraction command parser
299+
json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD)
300+
json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID")
301+
json_extraction_parser.add_argument(
302+
"--target-chunk-size", type=int, help="Target chunk size.", required=True
303+
)
304+
281305
parsed_args = args_parser.parse_args(argv[1:])
282306

283307
command = parsed_args.command
284308
if EXTRACT_FILE_CMD == command:
285309
return handle_extract_file_cmd(parsed_args, clp_home, default_config_file_path)
286-
elif EXTRACT_IR_CMD == command:
287-
return handle_extract_ir_cmd(parsed_args, clp_home, default_config_file_path)
310+
elif command in (EXTRACT_IR_CMD, EXTRACT_JSON_CMD):
311+
return handle_extract_stream_cmd(parsed_args, clp_home, default_config_file_path)
288312
else:
289313
logger.exception(f"Unexpected command: {command}")
290314
return -1

components/clp-package-utils/clp_package_utils/scripts/start_clp.py

+8-8
Original file line number | Diff line number | Diff line change
@@ -286,7 +286,7 @@ def create_results_cache_indices(
286286
"python3",
287287
str(clp_py_utils_dir / "create-results-cache-indices.py"),
288288
"--uri", container_clp_config.results_cache.get_uri(),
289-
"--ir-collection", container_clp_config.results_cache.ir_collection_name,
289+
"--stream-collection", container_clp_config.results_cache.stream_collection_name,
290290
]
291291
# fmt: on
292292

@@ -660,10 +660,10 @@ def start_query_worker(
660660
celery_method = "job_orchestration.executor.query"
661661
celery_route = f"{QueueName.QUERY}"
662662

663-
query_worker_mount = [mounts.ir_output_dir]
663+
query_worker_mount = [mounts.stream_output_dir]
664664
query_worker_env = {
665-
"CLP_IR_OUTPUT_DIR": container_clp_config.ir_output.directory,
666-
"CLP_IR_COLLECTION": clp_config.results_cache.ir_collection_name,
665+
"CLP_STREAM_OUTPUT_DIR": container_clp_config.stream_output.directory,
666+
"CLP_STREAM_COLLECTION_NAME": clp_config.results_cache.stream_collection_name,
667667
}
668668

669669
generic_start_worker(
@@ -710,7 +710,7 @@ def generic_start_worker(
710710

711711
# Create necessary directories
712712
clp_config.archive_output.directory.mkdir(parents=True, exist_ok=True)
713-
clp_config.ir_output.directory.mkdir(parents=True, exist_ok=True)
713+
clp_config.stream_output.directory.mkdir(parents=True, exist_ok=True)
714714

715715
clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages"
716716
# fmt: off
@@ -933,9 +933,9 @@ def start_log_viewer_webui(
933933
"MongoDbHost": clp_config.results_cache.host,
934934
"MongoDbPort": clp_config.results_cache.port,
935935
"MongoDbName": clp_config.results_cache.db_name,
936-
"MongoDbIrFilesCollectionName": clp_config.results_cache.ir_collection_name,
936+
"MongoDbStreamFilesCollectionName": clp_config.results_cache.stream_collection_name,
937937
"ClientDir": str(container_log_viewer_webui_dir / "client"),
938-
"IrFilesDir": str(container_clp_config.ir_output.directory),
938+
"StreamFilesDir": str(container_clp_config.stream_output.directory),
939939
"LogViewerDir": str(container_log_viewer_webui_dir / "yscope-log-viewer"),
940940
}
941941
settings_json = read_and_update_settings_json(settings_json_path, settings_json_updates)
@@ -961,7 +961,7 @@ def start_log_viewer_webui(
961961
# fmt: on
962962
necessary_mounts = [
963963
mounts.clp_home,
964-
mounts.ir_output_dir,
964+
mounts.stream_output_dir,
965965
]
966966
for mount in necessary_mounts:
967967
if mount:

0 commit comments

Comments
 (0)