[SPARK-51402][SQL][TESTS] Test TimeType in UDF #50194

Open · wants to merge 16 commits into base: master
Conversation

calilisantos

### What changes were proposed in this pull request?

Add tests for `TimeType` in UDFs, covering it both as an input parameter and as a result type.
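
For illustration, a minimal sketch of the kind of test added (the time value is hypothetical, and the snippet assumes the `UDFSuite` scaffolding: `SharedSparkSession`, `spark.implicits._`, and `checkAnswer`):

```scala
// Sketch only: exercise TimeType as both the UDF input and the UDF result.
val input = Seq(java.time.LocalTime.parse("10:20:30")).toDF("currentTime")
val plusHour = udf((t: java.time.LocalTime) => t.plusHours(1))
checkAnswer(
  input.select(plusHour($"currentTime").as("newTime")),
  Row(java.time.LocalTime.parse("11:20:30")))
```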

### Why are the changes needed?

It follows https://issues.apache.org/jira/browse/SPARK-51402 to improve test coverage.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests.

github-actions bot added the SQL label Mar 6, 2025
```diff
@@ -862,7 +862,7 @@ class UDFSuite extends QueryTest with SharedSparkSession {
       .select(myUdf1(Column("col"))),
     Row(ArrayBuffer(100)))

-    val myUdf2 = udf((a: immutable.ArraySeq[Int]) =>
+    val myUdf2 = udf((a: immutable.ArraySeq[Int]) =>
```
Member:
Are the changes necessary?

Author:

This is to follow the style guide's indentation rules. Does that make sense?

Member:

OK, let's leave it, but please avoid unrelated changes in the future:

  • There are many places in the code base where you could fix indentation. It is better to open a separate PR and focus only on that.
  • Such changes can cause merge conflicts in downstream branches.

calilisantos and others added 11 commits March 6, 2025 20:07
…rk-client

### What changes were proposed in this pull request?

This PR proposes to change release script to publish `pyspark-client`.

### Why are the changes needed?

We should have the release available in PyPI.

### Does this PR introduce _any_ user-facing change?

Yes, it releases `pyspark-client` package into PyPI.

### How was this patch tested?

Ran basic tests for the individual commands. This is similar to apache#46049.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#50203 from HyukjinKwon/SPARK-51433.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…s are mismatched

### What changes were proposed in this pull request?

This PR proposes to raise a proper error instead of an `assert` when the returned Arrow schema mismatches the specified SQL return type. Users can see this error, and it is no longer an internal error message.

### Why are the changes needed?

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf

pandas_udf("int")
def func(s: pd.Series) -> pd.Series:
    return pd.Series([1.0] * len(s))

spark.range(1).select(func("id")).show()
```

Now it throws

```
org.apache.spark.SparkException: [ARROW_TYPE_MISMATCH] Invalid schema from pandas_udf(): expected IntegerType, got IntegerType. SQLSTATE: 42K0G
	at org.apache.spark.sql.errors.QueryExecutionErrors$.arrowDataTypeMismatchError(QueryExecutionErrors.scala:857)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonEvaluatorFactory.$anonfun$evaluate$2(ArrowEvalPythonExec.scala:131)
	at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:594)
```
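
Schematically, the fix swaps an internal assertion for a classified, user-facing error along these lines (a rough sketch: `checkArrowSchema` is a hypothetical helper, and a plain exception stands in for Spark's error-class machinery):

```scala
// Sketch: raise a descriptive, user-facing error instead of failing an assert.
def checkArrowSchema(expected: String, actual: String): Unit = {
  // Before (roughly): assert(expected == actual), an opaque internal failure.
  if (expected != actual) {
    throw new RuntimeException(
      s"[ARROW_TYPE_MISMATCH] Invalid schema from pandas_udf(): " +
        s"expected $expected, got $actual. SQLSTATE: 42K0G")
  }
}
```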

### Does this PR introduce _any_ user-facing change?

Yes, it changes the user-facing error message.

### How was this patch tested?

Manually tested as described above.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#50201 from HyukjinKwon/SPARK-51432.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…e BarrierCoordinator

### What changes were proposed in this pull request?
There are a couple of changes proposed as part of this PR:
1. Change the non-daemon timer thread to a daemon thread so that the JVM exits on Spark application end.
2. Use the `Futures.cancel(true)` API to cancel the `timerTasks`.

### Why are the changes needed?
In Barrier Execution Mode, the Spark driver JVM could hang around after calling spark.stop(). Although the SparkContext was shut down, the JVM was still running.
The reason was a non-daemon timer thread named `BarrierCoordinator barrier epoch increment timer`, which prevented the driver JVM from stopping.
In [SPARK-46895](https://issues.apache.org/jira/browse/SPARK-46895) the `Timer` class was changed to `ScheduledThreadPoolExecutor`, but the corresponding methods such as `timer.cancel` were not updated, which made the cancellation logic a no-op, as explained [here](apache#47956 (comment)).
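
A minimal, self-contained Scala sketch of the daemon-thread idea (illustrative only, not the actual BarrierCoordinator code; the thread name is borrowed from the description above):

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}

object DaemonTimerSketch extends App {
  // A scheduled executor whose worker thread is a daemon, so it cannot
  // keep the JVM alive once the application has finished.
  val timer = new ScheduledThreadPoolExecutor(1, new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "BarrierCoordinator barrier epoch increment timer")
      t.setDaemon(true) // the key change: daemon instead of non-daemon
      t
    }
  })

  val task = timer.scheduleAtFixedRate(() => println("tick"), 0L, 1L, TimeUnit.SECONDS)

  Thread.sleep(2500)
  // cancel(true) interrupts the task if it is running; with the earlier
  // Timer-style cancellation this call path was effectively a no-op.
  task.cancel(true)
  timer.shutdownNow()
}
```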

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
- Ran the following scripts locally using `spark-submit`.
- Without this change, the JVM would hang and not exit.
- With this change, it exits successfully.

**_Code samples that simulate the problem and verify the fix are attached below:_**

[barrier_example.py](https://gist.github.com/jshmchenxi/5d7e7c61e1eedd03cd6d676699059e9b#file-barrier_example-py)
[xgboost-test.py](https://gist.github.com/bcheena/510230e19120eb9ae631dcafa804409f).

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#50020 from jjayadeep06/master.

Authored-by: jjayadeep06 <[email protected]>
Signed-off-by: beliefer <[email protected]>
…rossValidator

### What changes were proposed in this pull request?
Optimize CrossValidator by eliminating the JVM-Python data exchange

### Why are the changes needed?
The logic in CrossValidator's UDF is very simple, so there is no need to introduce such a data exchange.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
CI and a manual test:

```
import time
import tempfile
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel

dataset = spark.createDataFrame(
    [(Vectors.dense([0.0]), 0.0),
     (Vectors.dense([0.4]), 1.0),
     (Vectors.dense([0.5]), 0.0),
     (Vectors.dense([0.6]), 1.0),
     (Vectors.dense([1.0]), 1.0)] * 1000,
    ["features", "label"])

dataset.persist()
dataset.count()
dataset.count()

lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [m for m in range(10)]).build()
evaluator = BinaryClassificationEvaluator()

cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, parallelism=1)

tic = time.time()
cvModel = cv.fit(dataset)
toc = time.time()

toc - tic
```

master:
8.992794752120972

this PR:
7.696600914001465

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#50184 from zhengruifeng/py_ml_cv_udf.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
…rsive CTE Subqueries

### What changes were proposed in this pull request?

Change the place where we check whether there is a recursive CTE within a subquery. Also, change the implementation so that instead of collecting all subqueries into one array, we do an in-place traversal to check everything.

### Why are the changes needed?

It's more efficient to do an in-place traversal instead of collecting subqueries into an array and traversing it, so this change is a small optimization.
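
A schematic Scala illustration of the difference (a hypothetical tree type, not Spark's actual TreeNode API):

```scala
// Hypothetical tree type, used only to illustrate the optimization.
case class Node(name: String, children: Seq[Node])

// Before (schematically): collect every node of the subtree into a buffer, then scan it.
def containsByCollecting(root: Node, target: String): Boolean = {
  val all = scala.collection.mutable.ArrayBuffer.empty[Node]
  def collect(n: Node): Unit = { all += n; n.children.foreach(collect) }
  collect(root)
  all.exists(_.name == target)
}

// After (schematically): traverse in place and short-circuit on the first match,
// without materializing any intermediate collection.
def containsInPlace(root: Node, target: String): Boolean =
  root.name == target || root.children.exists(containsInPlace(_, target))
```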

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Will be tested in [49955](apache#49955).

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#50208 from Pajaraja/pavle-martinovic_data/SmallOptimizeToCTERefIllegal.

Authored-by: pavle-martinovic_data <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>

```scala
val input = Seq(java.time.LocalTime.parse(mockTimeStr)).toDF("currentTime")
// Regular case
val plusHour = udf((l: java.time.LocalTime) => l.plusHours(1))
val result = input.select(plusHour($"currentTime").cast(TimeType()).as("newTime"))
```

Member:
I guess the cast is not needed since the type is TimeType() already.
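
That is, the projection could presumably drop the cast (a sketch, assuming the UDF already returns `TimeType`):

```scala
val result = input.select(plusHour($"currentTime").as("newTime"))
```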

Comment on lines +140 to +145
```sh
svn update "pyspark-client-$RELEASE_VERSION.tar.gz"
svn update "pyspark-client-$RELEASE_VERSION.tar.gz.asc"
TWINE_USERNAME=spark-upload TWINE_PASSWORD="$PYPI_PASSWORD" twine upload \
  --repository-url https://upload.pypi.org/legacy/ \
  "pyspark-client-$RELEASE_VERSION.tar.gz" \
  "pyspark-client-$RELEASE_VERSION.tar.gz.asc"
```

Member:
Please revert unrelated changes.
