Skip to content

Commit

Permalink
[SPARK-42960][PYTHON][SS][TESTS] Factor Connect/non-Connect specific …
Browse files Browse the repository at this point in the history
…logics out

### What changes were proposed in this pull request?

This PR factor Connect/non-Connect specific logics out into dedicated test classes. This PR is a followup of #40785

### Why are the changes needed?

In order to avoid test failure such as #44698 (comment) by missing dependencies

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

CI in this PR should verify it.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44715 from HyukjinKwon/SPARK-42960-followup.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
  • Loading branch information
HyukjinKwon committed Jan 13, 2024
1 parent 1c9b022 commit 57121c7
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@


class StreamingParityTests(StreamingTestsMixin, ReusedConnectTestCase):
pass
def _assert_exception_tree_contains_msg(self, exception, msg):
self.assertTrue(
msg in exception._message,
"Exception tree doesn't contain the expected message: %s" % msg,
)


if __name__ == "__main__":
Expand Down
29 changes: 7 additions & 22 deletions python/pyspark/sql/tests/streaming/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.testing.sqlutils import ReusedSQLTestCase
from pyspark.errors.exceptions.connect import SparkConnectException


class StreamingTestsMixin:
Expand Down Expand Up @@ -295,26 +294,6 @@ def test_stream_exception(self):
self.assertIsInstance(exception, StreamingQueryException)
self._assert_exception_tree_contains_msg(exception, "ZeroDivisionError")

def _assert_exception_tree_contains_msg(self, exception, msg):
if isinstance(exception, SparkConnectException):
self._assert_exception_tree_contains_msg_connect(exception, msg)
else:
self._assert_exception_tree_contains_msg_default(exception, msg)

def _assert_exception_tree_contains_msg_connect(self, exception, msg):
self.assertTrue(
msg in exception._message,
"Exception tree doesn't contain the expected message: %s" % msg,
)

def _assert_exception_tree_contains_msg_default(self, exception, msg):
e = exception
contains = msg in e._desc
while e._cause is not None and not contains:
e = e._cause
contains = msg in e._desc
self.assertTrue(contains, "Exception tree doesn't contain the expected message: %s" % msg)

def test_query_manager_get(self):
df = self.spark.readStream.format("rate").load()
for q in self.spark.streams.active:
Expand Down Expand Up @@ -408,7 +387,13 @@ def test_streaming_with_temporary_view(self):


class StreamingTests(StreamingTestsMixin, ReusedSQLTestCase):
pass
def _assert_exception_tree_contains_msg(self, exception, msg):
e = exception
contains = msg in e._desc
while e._cause is not None and not contains:
e = e._cause
contains = msg in e._desc
self.assertTrue(contains, "Exception tree doesn't contain the expected message: %s" % msg)


if __name__ == "__main__":
Expand Down

0 comments on commit 57121c7

Please sign in to comment.