
Commit b0c5642

[SPARK-49287][SPARK-49432][CONNECT][SQL] Move streaming classes to sql/api
### What changes were proposed in this pull request?
This PR moves a number of streaming classes into sql/api. This is the initial step in creating a shared interface for streaming operations.

### Why are the changes needed?
It is a step towards a shared streaming interface that both the classic and the Connect Scala clients can implement.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48017 from hvanhovell/SPARK-49287.

Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
1 parent 14e4908 commit b0c5642
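
As background for the description above, here is a minimal sketch of the shared-interface pattern this PR begins: a backend-agnostic trait (playing the role of what moves into sql/api) plus a concrete implementation that only supplies behaviour. All names in the sketch are simplified and hypothetical; the real traits in org.apache.spark.sql.api carry more members and type parameters.

```scala
import java.util.UUID

// Backend-agnostic contract, analogous to the trait that moves into sql/api.
trait AbstractStreamingQuery {
  def id: UUID
  def name: String
  def isActive: Boolean
  def stop(): Unit
}

// A concrete backend (a stand-in for e.g. the Connect client's RemoteStreamingQuery):
// it only supplies behaviour, while the user-facing surface stays identical.
final class InMemoryStreamingQuery(override val name: String) extends AbstractStreamingQuery {
  override val id: UUID = UUID.randomUUID()
  @volatile private var active: Boolean = true
  override def isActive: Boolean = active
  override def stop(): Unit = active = false
}

object SharedInterfaceDemo extends App {
  val query: AbstractStreamingQuery = new InMemoryStreamingQuery("demo")
  query.stop()
  assert(!query.isActive)
}
```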

File tree

15 files changed: +339 -1200 lines changed

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo
+
+import org.apache.spark.annotation.DeveloperApi
+
+@DeveloperApi
+@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
+trait SparkListenerEvent {
+  /* Whether output this event to the event log */
+  protected[spark] def logEvent: Boolean = true
+}

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala

Lines changed: 6 additions & 151 deletions
@@ -18,166 +18,21 @@
 package org.apache.spark.sql.streaming
 
 import java.util.UUID
-import java.util.concurrent.TimeoutException
 
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.annotation.Evolving
 import org.apache.spark.connect.proto.Command
 import org.apache.spark.connect.proto.ExecutePlanResponse
 import org.apache.spark.connect.proto.StreamingQueryCommand
 import org.apache.spark.connect.proto.StreamingQueryCommandResult
 import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.StreamingQueryInstance
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{api, Dataset, SparkSession}
 
-/**
- * A handle to a query that is executing continuously in the background as new data arrives. All
- * these methods are thread-safe.
- * @since 3.5.0
- */
-@Evolving
-trait StreamingQuery {
-  // This is a copy of StreamingQuery in sql/core/.../streaming/StreamingQuery.scala
-
-  /**
-   * Returns the user-specified name of the query, or null if not specified. This name can be
-   * specified in the `org.apache.spark.sql.streaming.DataStreamWriter` as
-   * `dataframe.writeStream.queryName("query").start()`. This name, if set, must be unique across
-   * all active queries.
-   *
-   * @since 3.5.0
-   */
-  def name: String
-
-  /**
-   * Returns the unique id of this query that persists across restarts from checkpoint data. That
-   * is, this id is generated when a query is started for the first time, and will be the same
-   * every time it is restarted from checkpoint data. Also see [[runId]].
-   *
-   * @since 3.5.0
-   */
-  def id: UUID
-
-  /**
-   * Returns the unique id of this run of the query. That is, every start/restart of a query will
-   * generate a unique runId. Therefore, every time a query is restarted from checkpoint, it will
-   * have the same [[id]] but different [[runId]]s.
-   */
-  def runId: UUID
-
-  /**
-   * Returns the `SparkSession` associated with `this`.
-   *
-   * @since 3.5.0
-   */
-  def sparkSession: SparkSession
-
-  /**
-   * Returns `true` if this query is actively running.
-   *
-   * @since 3.5.0
-   */
-  def isActive: Boolean
-
-  /**
-   * Returns the [[StreamingQueryException]] if the query was terminated by an exception.
-   * @since 3.5.0
-   */
-  def exception: Option[StreamingQueryException]
-
-  /**
-   * Returns the current status of the query.
-   *
-   * @since 3.5.0
-   */
-  def status: StreamingQueryStatus
-
-  /**
-   * Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. The
-   * number of progress updates retained for each stream is configured by Spark session
-   * configuration `spark.sql.streaming.numRecentProgressUpdates`.
-   *
-   * @since 3.5.0
-   */
-  def recentProgress: Array[StreamingQueryProgress]
-
-  /**
-   * Returns the most recent [[StreamingQueryProgress]] update of this streaming query.
-   *
-   * @since 3.5.0
-   */
-  def lastProgress: StreamingQueryProgress
-
-  /**
-   * Waits for the termination of `this` query, either by `query.stop()` or by an exception. If
-   * the query has terminated with an exception, then the exception will be thrown.
-   *
-   * If the query has terminated, then all subsequent calls to this method will either return
-   * immediately (if the query was terminated by `stop()`), or throw the exception immediately (if
-   * the query has terminated with exception).
-   *
-   * @throws StreamingQueryException
-   *   if the query has terminated with an exception.
-   * @since 3.5.0
-   */
-  @throws[StreamingQueryException]
-  def awaitTermination(): Unit
-
-  /**
-   * Waits for the termination of `this` query, either by `query.stop()` or by an exception. If
-   * the query has terminated with an exception, then the exception will be thrown. Otherwise, it
-   * returns whether the query has terminated or not within the `timeoutMs` milliseconds.
-   *
-   * If the query has terminated, then all subsequent calls to this method will either return
-   * `true` immediately (if the query was terminated by `stop()`), or throw the exception
-   * immediately (if the query has terminated with exception).
-   *
-   * @throws StreamingQueryException
-   *   if the query has terminated with an exception
-   * @since 3.5.0
-   */
-  @throws[StreamingQueryException]
-  def awaitTermination(timeoutMs: Long): Boolean
-
-  /**
-   * Blocks until all available data in the source has been processed and committed to the sink.
-   * This method is intended for testing. Note that in the case of continually arriving data, this
-   * method may block forever. Additionally, this method is only guaranteed to block until data
-   * that has been synchronously appended data to a
-   * `org.apache.spark.sql.execution.streaming.Source` prior to invocation. (i.e. `getOffset` must
-   * immediately reflect the addition).
-   * @since 3.5.0
-   */
-  def processAllAvailable(): Unit
-
-  /**
-   * Stops the execution of this query if it is running. This waits until the termination of the
-   * query execution threads or until a timeout is hit.
-   *
-   * By default stop will block indefinitely. You can configure a timeout by the configuration
-   * `spark.sql.streaming.stopTimeout`. A timeout of 0 (or negative) milliseconds will block
-   * indefinitely. If a `TimeoutException` is thrown, users can retry stopping the stream. If the
-   * issue persists, it is advisable to kill the Spark application.
-   *
-   * @since 3.5.0
-   */
-  @throws[TimeoutException]
-  def stop(): Unit
-
-  /**
-   * Prints the physical plan to the console for debugging purposes.
-   * @since 3.5.0
-   */
-  def explain(): Unit
-
-  /**
-   * Prints the physical plan to the console for debugging purposes.
-   *
-   * @param extended
-   *   whether to do extended explain or not
-   * @since 3.5.0
-   */
-  def explain(extended: Boolean): Unit
+/** @inheritdoc */
+trait StreamingQuery extends api.StreamingQuery[Dataset] {
+
+  /** @inheritdoc */
+  override def sparkSession: SparkSession
 }
 
 class RemoteStreamingQuery(
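
Usage note (a hedged sketch, not part of the diff): because the declarations removed above are now inherited from api.StreamingQuery, existing call sites against the Connect client's StreamingQuery should compile unchanged. The helper below is illustrative only.

```scala
import org.apache.spark.sql.streaming.StreamingQuery

object StreamingQueryOps {
  // Illustrative helper: every method used here is inherited from the shared
  // api.StreamingQuery parent rather than declared on the Connect trait.
  def waitAndReport(query: StreamingQuery): Unit = {
    println(s"query ${query.name} (id=${query.id}) active=${query.isActive}")
    query.awaitTermination()
  }
}
```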

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala

Lines changed: 4 additions & 35 deletions
@@ -145,21 +145,6 @@ object CheckConnectJvmClientCompatibility {
     checkMiMaCompatibility(clientJar, protobufJar, includedRules, excludeRules)
   }
 
-  private lazy val mergeIntoWriterExcludeRules: Seq[ProblemFilter] = {
-    // Exclude some auto-generated methods in [[MergeIntoWriter]] classes.
-    // The incompatible changes are due to the uses of [[proto.Expression]] instead
-    // of [[catalyst.Expression]] in the method signature.
-    val classNames = Seq("WhenMatched", "WhenNotMatched", "WhenNotMatchedBySource")
-    val methodNames = Seq("apply", "condition", "copy", "copy$*", "unapply")
-
-    classNames.flatMap { className =>
-      methodNames.map { methodName =>
-        ProblemFilters.exclude[IncompatibleSignatureProblem](
-          s"org.apache.spark.sql.$className.$methodName")
-      }
-    }
-  }
-
   private def checkMiMaCompatibilityWithSqlModule(
       clientJar: File,
       sqlJar: File): List[Problem] = {
@@ -269,12 +254,6 @@
         "org.apache.spark.sql.streaming.TestGroupState"),
       ProblemFilters.exclude[MissingClassProblem](
         "org.apache.spark.sql.streaming.TestGroupState$"),
-      ProblemFilters.exclude[MissingClassProblem](
-        "org.apache.spark.sql.streaming.PythonStreamingQueryListener"),
-      ProblemFilters.exclude[MissingClassProblem](
-        "org.apache.spark.sql.streaming.PythonStreamingQueryListenerWrapper"),
-      ProblemFilters.exclude[MissingTypesProblem](
-        "org.apache.spark.sql.streaming.StreamingQueryListener$Event"),
 
       // SQLImplicits
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.rddToDatasetHolder"),
@@ -286,10 +265,6 @@
       ProblemFilters.exclude[MissingClassProblem](
         "org.apache.spark.sql.artifact.ArtifactManager$"),
 
-      // UDFRegistration
-      ProblemFilters.exclude[DirectMissingMethodProblem](
-        "org.apache.spark.sql.UDFRegistration.register"),
-
       // ColumnNode conversions
       ProblemFilters.exclude[DirectMissingMethodProblem](
         "org.apache.spark.sql.SparkSession.Converter"),
@@ -304,6 +279,8 @@
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.scalalang.*"),
 
       // UDFRegistration
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.UDFRegistration.register"),
       ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.UDFRegistration"),
       ProblemFilters.exclude[DirectMissingMethodProblem](
         "org.apache.spark.sql.UDFRegistration.log*"),
@@ -326,14 +303,7 @@
       ProblemFilters.exclude[DirectMissingMethodProblem](
         "org.apache.spark.sql.DataFrameReader.validateJsonSchema"),
       ProblemFilters.exclude[DirectMissingMethodProblem](
-        "org.apache.spark.sql.DataFrameReader.validateXmlSchema"),
-
-      // Datasource V2 partition transforms
-      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.PartitionTransform"),
-      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.PartitionTransform$"),
-      ProblemFilters.exclude[MissingClassProblem](
-        "org.apache.spark.sql.PartitionTransform$ExtractTransform")) ++
-      mergeIntoWriterExcludeRules
+        "org.apache.spark.sql.DataFrameReader.validateXmlSchema"))
 
     checkMiMaCompatibility(clientJar, sqlJar, includedRules, excludeRules)
   }
@@ -443,8 +413,7 @@
 
       // Encoders are in the wrong JAR
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Encoders"),
-      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Encoders$")) ++
-      mergeIntoWriterExcludeRules
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Encoders$"))
 
     checkMiMaCompatibility(sqlJar, clientJar, includedRules, excludeRules)
   }

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 0 additions & 9 deletions
@@ -22,22 +22,13 @@ import javax.annotation.Nullable
 
 import scala.collection.Map
 
-import com.fasterxml.jackson.annotation.JsonTypeInfo
-
 import org.apache.spark.TaskEndReason
 import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
 
-@DeveloperApi
-@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
-trait SparkListenerEvent {
-  /* Whether output this event to the event log */
-  protected[spark] def logEvent: Boolean = true
-}
-
 
 @DeveloperApi
 case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
   extends SparkListenerEvent
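
Since SparkListenerEvent keeps its package (org.apache.spark.scheduler) while moving into its own file, user-defined events and listeners should continue to compile. A minimal sketch with purely illustrative names (QueryMetricsEvent, MetricsListener):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

// Hypothetical custom event; extending SparkListenerEvent is unchanged by this move.
case class QueryMetricsEvent(queryName: String, rowsPerSecond: Double) extends SparkListenerEvent

// Custom events are delivered through SparkListener.onOtherEvent.
class MetricsListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case QueryMetricsEvent(name, rate) => println(s"$name: $rate rows/s")
    case _ => // ignore events we do not care about
  }
}

// Registration against an active SparkContext: sc.addSparkListener(new MetricsListener)
```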

project/MimaExcludes.scala

Lines changed: 16 additions & 0 deletions
@@ -159,6 +159,22 @@ object MimaExcludes {
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CreateTableWriter"),
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrameWriterV2"),
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WriteConfigMethods"),
+
+    // SPARK-49287: Shared Streaming interfaces
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.scheduler.SparkListenerEvent"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.ForeachWriter"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.SourceProgress"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.SourceProgress$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StateOperatorProgress"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StateOperatorProgress$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$Event"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryIdleEvent"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryStartedEvent"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryStatus"),
   )
 
   // Default exclude rules
Lines changed: 1 addition & 1 deletion
@@ -89,7 +89,7 @@ package org.apache.spark.sql
  * });
  * }}}
  *
- * @since 3.5.0
+ * @since 2.0.0
  */
 abstract class ForeachWriter[T] extends Serializable {
 
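
For reference, a minimal ForeachWriter implementation (the class whose @since tag is corrected above). The sink logic is illustrative only and would normally be attached with df.writeStream.foreach(...):

```scala
import org.apache.spark.sql.ForeachWriter

// Illustrative sink that prints each record of a streaming Dataset[String].
class ConsoleForeachWriter extends ForeachWriter[String] {
  // Called once per partition and epoch; return false to skip the partition.
  override def open(partitionId: Long, epochId: Long): Boolean = true

  // Called for every record in the partition.
  override def process(value: String): Unit = println(value)

  // Called when the partition finishes, with the error if one occurred.
  override def close(errorOrNull: Throwable): Unit = ()
}
```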