[SPARK-46543][PYTHON][CONNECT] Make json_tuple throw PySparkValueError for empty fields

### What changes were proposed in this pull request?
Make `json_tuple` throw PySparkValueError for empty fields
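A minimal, self-contained sketch of the added check (the `PySparkValueError` stub and the placeholder return value here are illustrative stand-ins for `pyspark.errors.PySparkValueError` and `_invoke_function`; the validation itself follows the diff below):

```python
class PySparkValueError(ValueError):
    """Illustrative stand-in for pyspark.errors.PySparkValueError."""

    def __init__(self, error_class: str, message_parameters: dict):
        self.error_class = error_class
        self.message_parameters = message_parameters
        item = message_parameters["item"]
        super().__init__(f"[{error_class}] At least one {item} must be specified.")


def json_tuple(col, *fields):
    # The new check: fail fast on the Python side instead of deferring
    # to the Scala-side error.
    if len(fields) == 0:
        raise PySparkValueError(
            error_class="CANNOT_BE_EMPTY",
            message_parameters={"item": "field"},
        )
    # Placeholder for _invoke_function("json_tuple", ...).
    return ("json_tuple", col, fields)
```

With this change, `json_tuple(df.jstring)` raises immediately with a clear message rather than failing later during analysis.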

### Why are the changes needed?
The Python side should perform the same validation as the Scala side:
https://github.com/apache/spark/blob/fa4096eb6aba4c66f0d9c5dcbabdfc0804064fff/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L6330-L6334

### Does this PR introduce _any_ user-facing change?
Yes. `json_tuple` now raises `PySparkValueError` when called with no fields.

### How was this patch tested?
Added a unit test.

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #44534 from zhengruifeng/py_check_functions.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
zhengruifeng committed Jan 2, 2024
1 parent e608211 commit f3e1623
Showing 3 changed files with 24 additions and 0 deletions.
5 changes: 5 additions & 0 deletions python/pyspark/sql/connect/functions/builtin.py
@@ -1928,6 +1928,11 @@ def inline_outer(col: "ColumnOrName") -> Column:


def json_tuple(col: "ColumnOrName", *fields: str) -> Column:
    if len(fields) == 0:
        raise PySparkValueError(
            error_class="CANNOT_BE_EMPTY",
            message_parameters={"item": "field"},
        )
    return _invoke_function("json_tuple", _to_col(col), *[lit(field) for field in fields])


5 changes: 5 additions & 0 deletions python/pyspark/sql/functions/builtin.py
@@ -14246,6 +14246,11 @@ def json_tuple(col: "ColumnOrName", *fields: str) -> Column:
    >>> df.select(df.key, json_tuple(df.jstring, 'f1', 'f2')).collect()
    [Row(key='1', c0='value1', c1='value2'), Row(key='2', c0='value12', c1=None)]
    """
    if len(fields) == 0:
        raise PySparkValueError(
            error_class="CANNOT_BE_EMPTY",
            message_parameters={"item": "field"},
        )
    sc = _get_active_spark_context()
    return _invoke_function("json_tuple", _to_java_column(col), _to_seq(sc, fields))

14 changes: 14 additions & 0 deletions python/pyspark/sql/tests/test_functions.py
@@ -1452,6 +1452,20 @@ def test_current_timestamp(self):
        self.assertIsInstance(df.first()[0], datetime.datetime)
        self.assertEqual(df.schema.names[0], "now()")

    def test_json_tuple_empty_fields(self):
        df = self.spark.createDataFrame(
            [
                ("1", """{"f1": "value1", "f2": "value2"}"""),
                ("2", """{"f1": "value12"}"""),
            ],
            ("key", "jstring"),
        )
        self.assertRaisesRegex(
            PySparkValueError,
            "At least one field must be specified",
            lambda: df.select(F.json_tuple(df.jstring)),
        )


class FunctionsTests(ReusedSQLTestCase, FunctionsTestsMixin):
pass
