Skip to content

Commit 81c29ee

Browse files
jackywang-dbcloud-fan
authored and committed
[SPARK-52238][SDP] Rename Pipeline Spec Field "definitions" to "libraries"
### What changes were proposed in this pull request?

Rename the pipeline spec field from "definitions" to "libraries". This field allows users to include pipeline source code files.

```diff
name: libraries-test
- definitions:
+ libraries:
  - glob:
      include: transformations/**/*.py
  - glob:
      include: transformations/**/*.sql
```

### Why are the changes needed?

Open up the possibility to add other types of dependencies for a pipeline execution, such as Python wheels. `libraries` is a more general term.

### Does this PR introduce _any_ user-facing change?

Yes, but SDP has not been released yet.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52294 from JiaqiWang18/rename-spec-field-libraries.

Authored-by: Jacky Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent d61a7b9 commit 81c29ee

File tree

3 files changed

+30
-30
lines changed

3 files changed

+30
-30
lines changed

python/pyspark/pipelines/cli.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@
5252

5353

5454
@dataclass(frozen=True)
55-
class DefinitionsGlob:
56-
"""A glob pattern for finding pipeline definitions files."""
55+
class LibrariesGlob:
56+
"""A glob pattern for finding pipeline source codes."""
5757

5858
include: str
5959

@@ -66,14 +66,14 @@ class PipelineSpec:
6666
:param catalog: The default catalog to use for the pipeline.
6767
:param database: The default database to use for the pipeline.
6868
:param configuration: A dictionary of Spark configuration properties to set for the pipeline.
69-
:param definitions: A list of glob patterns for finding pipeline definitions files.
69+
:param libraries: A list of glob patterns for finding pipeline source codes.
7070
"""
7171

7272
name: str
7373
catalog: Optional[str]
7474
database: Optional[str]
7575
configuration: Mapping[str, str]
76-
definitions: Sequence[DefinitionsGlob]
76+
libraries: Sequence[LibrariesGlob]
7777

7878

7979
def find_pipeline_spec(current_dir: Path) -> Path:
@@ -113,7 +113,7 @@ def load_pipeline_spec(spec_path: Path) -> PipelineSpec:
113113

114114

115115
def unpack_pipeline_spec(spec_data: Mapping[str, Any]) -> PipelineSpec:
116-
ALLOWED_FIELDS = {"name", "catalog", "database", "schema", "configuration", "definitions"}
116+
ALLOWED_FIELDS = {"name", "catalog", "database", "schema", "configuration", "libraries"}
117117
REQUIRED_FIELDS = ["name"]
118118
for key in spec_data.keys():
119119
if key not in ALLOWED_FIELDS:
@@ -133,9 +133,9 @@ def unpack_pipeline_spec(spec_data: Mapping[str, Any]) -> PipelineSpec:
133133
catalog=spec_data.get("catalog"),
134134
database=spec_data.get("database", spec_data.get("schema")),
135135
configuration=validate_str_dict(spec_data.get("configuration", {}), "configuration"),
136-
definitions=[
137-
DefinitionsGlob(include=entry["glob"]["include"])
138-
for entry in spec_data.get("definitions", [])
136+
libraries=[
137+
LibrariesGlob(include=entry["glob"]["include"])
138+
for entry in spec_data.get("libraries", [])
139139
],
140140
)
141141

@@ -178,8 +178,8 @@ def register_definitions(
178178
with change_dir(path):
179179
with graph_element_registration_context(registry):
180180
log_with_curr_timestamp(f"Loading definitions. Root directory: '{path}'.")
181-
for definition_glob in spec.definitions:
182-
glob_expression = definition_glob.include
181+
for libraries_glob in spec.libraries:
182+
glob_expression = libraries_glob.include
183183
matching_files = [p for p in path.glob(glob_expression) if p.is_file()]
184184
log_with_curr_timestamp(
185185
f"Found {len(matching_files)} files matching glob '{glob_expression}'"

python/pyspark/pipelines/init_cli.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
SPEC = """
2121
name: {{ name }}
22-
definitions:
22+
libraries:
2323
- glob:
2424
include: transformations/**/*.py
2525
- glob:

python/pyspark/pipelines/tests/test_cli.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
load_pipeline_spec,
3535
register_definitions,
3636
unpack_pipeline_spec,
37-
DefinitionsGlob,
37+
LibrariesGlob,
3838
PipelineSpec,
3939
run,
4040
)
@@ -58,7 +58,7 @@ def test_load_pipeline_spec(self):
5858
"key1": "value1",
5959
"key2": "value2"
6060
},
61-
"definitions": [
61+
"libraries": [
6262
{"glob": {"include": "test_include"}}
6363
]
6464
}
@@ -70,8 +70,8 @@ def test_load_pipeline_spec(self):
7070
assert spec.catalog == "test_catalog"
7171
assert spec.database == "test_database"
7272
assert spec.configuration == {"key1": "value1", "key2": "value2"}
73-
assert len(spec.definitions) == 1
74-
assert spec.definitions[0].include == "test_include"
73+
assert len(spec.libraries) == 1
74+
assert spec.libraries[0].include == "test_include"
7575

7676
def test_load_pipeline_spec_name_is_required(self):
7777
with tempfile.NamedTemporaryFile(mode="w") as tmpfile:
@@ -84,7 +84,7 @@ def test_load_pipeline_spec_name_is_required(self):
8484
"key1": "value1",
8585
"key2": "value2"
8686
},
87-
"definitions": [
87+
"libraries": [
8888
{"glob": {"include": "test_include"}}
8989
]
9090
}
@@ -110,7 +110,7 @@ def test_load_pipeline_spec_schema_fallback(self):
110110
"key1": "value1",
111111
"key2": "value2"
112112
},
113-
"definitions": [
113+
"libraries": [
114114
{"glob": {"include": "test_include"}}
115115
]
116116
}
@@ -121,8 +121,8 @@ def test_load_pipeline_spec_schema_fallback(self):
121121
assert spec.catalog == "test_catalog"
122122
assert spec.database == "test_database"
123123
assert spec.configuration == {"key1": "value1", "key2": "value2"}
124-
assert len(spec.definitions) == 1
125-
assert spec.definitions[0].include == "test_include"
124+
assert len(spec.libraries) == 1
125+
assert spec.libraries[0].include == "test_include"
126126

127127
def test_load_pipeline_spec_invalid(self):
128128
with tempfile.NamedTemporaryFile(mode="w") as tmpfile:
@@ -134,7 +134,7 @@ def test_load_pipeline_spec_invalid(self):
134134
"key1": "value1",
135135
"key2": "value2"
136136
},
137-
"definitions": [
137+
"libraries": [
138138
{"glob": {"include": "test_include"}}
139139
]
140140
}
@@ -150,7 +150,7 @@ def test_load_pipeline_spec_invalid(self):
150150

151151
def test_unpack_empty_pipeline_spec(self):
152152
empty_spec = PipelineSpec(
153-
name="test_pipeline", catalog=None, database=None, configuration={}, definitions=[]
153+
name="test_pipeline", catalog=None, database=None, configuration={}, libraries=[]
154154
)
155155
self.assertEqual(unpack_pipeline_spec({"name": "test_pipeline"}), empty_spec)
156156

@@ -176,7 +176,7 @@ def test_find_pipeline_spec_in_current_directory(self):
176176
{
177177
"catalog": "test_catalog",
178178
"configuration": {},
179-
"definitions": []
179+
"libraries": []
180180
}
181181
"""
182182
)
@@ -193,7 +193,7 @@ def test_find_pipeline_spec_in_current_directory_yml(self):
193193
{
194194
"catalog": "test_catalog",
195195
"configuration": {},
196-
"definitions": []
196+
"libraries": []
197197
}
198198
"""
199199
)
@@ -226,7 +226,7 @@ def test_find_pipeline_spec_in_parent_directory(self):
226226
{
227227
"catalog": "test_catalog",
228228
"configuration": {},
229-
"definitions": []
229+
"libraries": []
230230
}
231231
"""
232232
)
@@ -240,15 +240,15 @@ def test_register_definitions(self):
240240
catalog=None,
241241
database=None,
242242
configuration={},
243-
definitions=[DefinitionsGlob(include="subdir1/*")],
243+
libraries=[LibrariesGlob(include="subdir1/*")],
244244
)
245245
with tempfile.TemporaryDirectory() as temp_dir:
246246
outer_dir = Path(temp_dir)
247247
subdir1 = outer_dir / "subdir1"
248248
subdir1.mkdir()
249249
subdir2 = outer_dir / "subdir2"
250250
subdir2.mkdir()
251-
with (subdir1 / "definitions.py").open("w") as f:
251+
with (subdir1 / "libraries.py").open("w") as f:
252252
f.write(
253253
textwrap.dedent(
254254
"""
@@ -260,7 +260,7 @@ def mv1():
260260
)
261261
)
262262

263-
with (subdir2 / "definitions.py").open("w") as f:
263+
with (subdir2 / "libraries.py").open("w") as f:
264264
f.write(
265265
textwrap.dedent(
266266
"""
@@ -283,7 +283,7 @@ def test_register_definitions_file_raises_error(self):
283283
catalog=None,
284284
database=None,
285285
configuration={},
286-
definitions=[DefinitionsGlob(include="*")],
286+
libraries=[LibrariesGlob(include="*")],
287287
)
288288
with tempfile.TemporaryDirectory() as temp_dir:
289289
outer_dir = Path(temp_dir)
@@ -301,7 +301,7 @@ def test_register_definitions_unsupported_file_extension_matches_glob(self):
301301
catalog=None,
302302
database=None,
303303
configuration={},
304-
definitions=[DefinitionsGlob(include="*")],
304+
libraries=[LibrariesGlob(include="*")],
305305
)
306306
with tempfile.TemporaryDirectory() as temp_dir:
307307
outer_dir = Path(temp_dir)
@@ -355,7 +355,7 @@ def test_python_import_current_directory(self):
355355
catalog=None,
356356
database=None,
357357
configuration={},
358-
definitions=[DefinitionsGlob(include="defs.py")],
358+
libraries=[LibrariesGlob(include="defs.py")],
359359
),
360360
)
361361

0 commit comments

Comments
 (0)