[SPARK-51981][SS] Add JobTags to queryStartedEvent #50780

Closed
wants to merge 4 commits

Conversation


@gjxdxh gjxdxh commented May 1, 2025

What changes were proposed in this pull request?

Adding a new jobTags parameter to QueryStartedEvent so that the event can be connected to the Spark Connect command that actually triggered the streaming query. Besides adding the parameter, a fix has been applied to the timestamp, because the JSON handling previously read the wrong argument.

Why are the changes needed?

Without this, there is no way to tell where a streaming query originated from.

Does this PR introduce any user-facing change?

No

How was this patch tested?

A unit test is added.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the PYTHON label May 2, 2025
@anishshri-db
Contributor

@gjxdxh - just put SS in the PR title - thx !

Comment on lines 195 to 199
jobTags = set()
javaIterator = jevent.jobTags().iterator()
while javaIterator.hasNext():
    jobTags.add(javaIterator.next().toString())
Author

Not sure what the best way is to convert a Java Set object to a Python set. I tried something like set(jevent.jobTags().toArray()) but it didn't work; this iterator approach seems to be working.

@anishshri-db
Contributor

@HeartSaVioR - PTAL, thanks !

@gjxdxh gjxdxh changed the title [SPARK-51981][Strcutured Streaming]Add JobTags to queryStartedEvent [SPARK-51981][Strcutured Streaming][SS]Add JobTags to queryStartedEvent May 2, 2025
@gjxdxh gjxdxh force-pushed the lingkai-kong_data/SPARK-51981 branch from c3306ba to 14406b5 Compare May 5, 2025 21:01
@HyukjinKwon HyukjinKwon changed the title [SPARK-51981][Strcutured Streaming][SS]Add JobTags to queryStartedEvent [SPARK-51981][SS] Add JobTags to queryStartedEvent May 6, 2025
  * @since 2.1.0
  */
 @Evolving
 class QueryStartedEvent private[sql] (
     val id: UUID,
     val runId: UUID,
     val name: String,
-    val timestamp: String)
+    val timestamp: String,
+    val jobTags: Set[String] = Set())
Member

This isn't actually binary compatible. We should define another constructor.

Author

Are you suggesting that we leave this constructor unchanged and create a new constructor that takes the new parameter? Do we have any examples of how to do it in a binary-compatible way?

Contributor

Do we have a MiMa check for this class? If this is not binary compatible, it would be ideal for the linter to fail.

Author

Updated the code to add another constructor that keeps the original signature; let me know if that's the right approach. Also, just to confirm, this would only be needed for Scala code, right?

Member

This isn't compatible in Scala either ..

Author

Do you mind sharing how to make it compatible, then? Should I move jobTags out of the primary constructor and have it as a var instead?

Member

I meant that adding a parameter with a default value isn't binary compatible .. it has to be done with a new constructor. See https://github.com/databricks/scala-style-guide?tab=readme-ov-file#default-parameter-values
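
For illustration only, here is a minimal sketch of the auxiliary-constructor pattern being referred to; it is not the merged code, and the access modifiers of the real class are dropped so the snippet compiles standalone:

import java.util.UUID

// Sketch of preserving binary compatibility by adding a constructor rather
// than a default parameter value. Default arguments are resolved at the call
// site against synthetic methods, so adding one changes the existing
// constructor's signature for already-compiled callers.
class QueryStartedEvent(
    val id: UUID,
    val runId: UUID,
    val name: String,
    val timestamp: String,
    val jobTags: Set[String]) {

  // Auxiliary constructor keeping the original four-argument signature,
  // so code compiled against the old constructor still links.
  def this(id: UUID, runId: UUID, name: String, timestamp: String) =
    this(id, runId, name, timestamp, Set.empty[String])
}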

Member

This isn't compatible in Scala either, not only Java.

Member

ohhhh it's private[sql]. Okay seems fine. The constructor is private. My bad :-).

Author

No worries, I removed that default value anyway since it's not being used.

Contributor

@HeartSaVioR HeartSaVioR left a comment

Looks OK to me except comments from @HyukjinKwon and the expected size of job tags.

runId,
name,
progressReporter.formatTimestamp(startTimestamp),
sparkSession.sparkContext.getJobTags()
Contributor

How many elements are we anticipating to have here? The size of the event should be reasonably small.

Author

Yeah, it should be quite small; we already have the job tags in other listener events, for example here.
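
For context, the tags returned by getJobTags() above are whatever tags are currently attached to the SparkContext; a minimal sketch of setting and reading them follows, with an illustrative tag value (Spark Connect attaches its own per-command tags, which is what the PR description refers to):

import org.apache.spark.sql.SparkSession

object JobTagSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("job-tag-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Tags apply to jobs submitted from this thread until cleared.
    // "my-command-tag" is an illustrative value only.
    sc.addJobTag("my-command-tag")
    println(sc.getJobTags()) // Set(my-command-tag)

    sc.clearJobTags()
    spark.stop()
  }
}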

Contributor

Thanks for the explanation.
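
To show where the new field surfaces for users, here is a hedged sketch of a listener reading event.jobTags; the listener class name and the println are illustrative, and jobTags is the field this PR adds:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Illustrative listener: onQueryStarted can now correlate the started query
// with the job tags (e.g. the Spark Connect command tag) that were active
// when the query was started.
class TagLoggingListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"query ${event.id} started with tags: ${event.jobTags.mkString(", ")}")

  override def onQueryProgress(event: QueryProgressEvent): Unit = ()

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

// Registration is unchanged: spark.streams.addListener(new TagLoggingListener)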

@gjxdxh gjxdxh requested review from HyukjinKwon and HeartSaVioR May 7, 2025 18:25
@gjxdxh gjxdxh force-pushed the lingkai-kong_data/SPARK-51981 branch from 7dba6ae to fe439b8 Compare May 7, 2025 20:38
Contributor

@HeartSaVioR HeartSaVioR left a comment

+1

Looks like @HyukjinKwon 's comments are addressed, but I'll defer to him for the final review.


@classmethod
def fromJObject(cls, jevent: "JavaObject") -> "QueryStartedEvent":
    job_tags = set()
    java_iterator = jevent.jobTags().iterator()
Member

No biggie, but you can call set(jobTags().toList()), which will automatically become a Python list. Having individual Py4J calls is actually pretty expensive. But tags are supposed to be few, so I don't mind it. Leaving it to you, with my approval.

Author

I tried, but it didn't work. See the comment I put above: #50780 (comment). I don't know why, though; this is the test result I got when running locally:

lingkai.kong@K9WHXLR93K spark % python/run-tests --testnames 'pyspark.sql.tests.streaming.test_streaming_listener StreamingListenerTests.test_listener_events'
Running PySpark tests. Output is in /Users/lingkai.kong/spark/python/unit-tests.log
Will test against the following Python executables: ['python3.9']
Will test the following Python tests: ['pyspark.sql.tests.streaming.test_streaming_listener StreamingListenerTests.test_listener_events']
python3.9 python_implementation is CPython
python3.9 version is: Python 3.9.6
Starting test(python3.9): pyspark.sql.tests.streaming.test_streaming_listener StreamingListenerTests.test_listener_events (temp output: /Users/lingkai.kong/spark/python/target/ed1f4b6e-6661-4815-84fd-00bf6cedd0ab/python3.9__pyspark.sql.tests.streaming.test_streaming_listener_StreamingListenerTests.test_listener_events__ck29eqqs.log)
WARNING: Using incubator modules: jdk.incubator.vector
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
test_listener_events (pyspark.sql.tests.streaming.test_streaming_listener.StreamingListenerTests) ... FAIL

======================================================================
FAIL: test_listener_events (pyspark.sql.tests.streaming.test_streaming_listener.StreamingListenerTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/lingkai.kong/spark/python/pyspark/sql/tests/streaming/test_streaming_listener.py", line 413, in test_listener_events
    verify(TestListenerV1())
  File "/Users/lingkai.kong/spark/python/pyspark/sql/tests/streaming/test_streaming_listener.py", line 396, in verify
    self.check_start_event(start_event)
  File "/Users/lingkai.kong/spark/python/pyspark/sql/tests/streaming/test_streaming_listener.py", line 40, in check_start_event
    self.assertTrue(isinstance(event, QueryStartedEvent))
AssertionError: False is not true

----------------------------------------------------------------------
Ran 1 test in 5.903s

FAILED (failures=1)

Had test failures in pyspark.sql.tests.streaming.test_streaming_listener StreamingListenerTests.test_listener_events with python3.9; see logs.

Could this be an issue related to package versions, etc.?

Member

ah okie that's fine

Author

Sg, thanks! Can you help me merge this PR once the tests pass?

@HyukjinKwon
Member

Merged to master.
