Commit a8f817d

Kontinuation authored and HyukjinKwon committed
[SPARK-54745][PYTHON] Fix PySpark import error caused by missing UnixStreamServer on Windows
### What changes were proposed in this pull request?

This PR fixes an error caused by `socketserver.UnixStreamServer` not being available on Windows. We define a fallback `AccumulatorUnixServer` that raises an exception on construction and tells the user to disable `spark.python.unix.domain.socket.enabled`.

### Why are the changes needed?

`import pyspark` has failed with the following message on Windows since PySpark 4.1.0:

```
sedona\spark\__init__.py:19: in <module>
    import pyspark
.venv\Lib\site-packages\pyspark\__init__.py:71: in <module>
    from pyspark.accumulators import Accumulator, AccumulatorParam
.venv\Lib\site-packages\pyspark\accumulators.py:324: in <module>
    class AccumulatorUnixServer(socketserver.UnixStreamServer):
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E   AttributeError: module 'socketserver' has no attribute 'UnixStreamServer'
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually tested on Windows 11.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53546 from Kontinuation/fix-uds-windows-compat.

Authored-by: Kristin Cowalcijk <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
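The guard described above can be tried out in isolation. The following is a minimal sketch, not the PySpark implementation: `build_unix_server_class`, `DemoUnixServer`, and `fake_socketserver` are hypothetical names introduced for illustration, and the module is passed in as a parameter only so that the fallback branch can be exercised on any platform.

```python
import socketserver
import types
from typing import Type


def build_unix_server_class(ss_module) -> type:
    """Pick the server base class when the class is defined.

    Hypothetical helper for illustration; PySpark performs the same
    check at module level in accumulators.py rather than in a function.
    """
    if hasattr(ss_module, "UnixStreamServer"):

        class DemoUnixServer(ss_module.UnixStreamServer):
            pass

    else:

        # Fallback stub: defining it is harmless, so importing succeeds;
        # only constructing it fails, with an actionable message.
        class DemoUnixServer(socketserver.TCPServer):
            def __init__(
                self, socket_path: str, handler: Type[socketserver.BaseRequestHandler]
            ):
                raise NotImplementedError(
                    "Unix Domain Sockets are not supported on this platform."
                )

    return DemoUnixServer


# Simulate a platform whose socketserver module lacks UnixStreamServer.
fake_socketserver = types.SimpleNamespace()
FallbackServer = build_unix_server_class(fake_socketserver)

try:
    FallbackServer("/tmp/demo.sock", socketserver.BaseRequestHandler)
    fallback_raised = False
except NotImplementedError:
    fallback_raised = True
```

Because the error moves from class definition to construction, a missing `UnixStreamServer` no longer breaks `import`; it only surfaces if the Unix-domain-socket code path is actually taken.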
1 parent 532de64 commit a8f817d

python/pyspark/accumulators.py

Lines changed: 30 additions & 15 deletions
@@ -345,21 +345,36 @@ def shutdown(self) -> None:
         self.server_close()
 
 
-class AccumulatorUnixServer(socketserver.UnixStreamServer):
-    server_shutdown = False
-
-    def __init__(
-        self, socket_path: str, RequestHandlerClass: Type[socketserver.BaseRequestHandler]
-    ):
-        super().__init__(socket_path, RequestHandlerClass)
-        self.auth_token = None
-
-    def shutdown(self) -> None:
-        self.server_shutdown = True
-        super().shutdown()
-        self.server_close()
-        if os.path.exists(self.server_address):  # type: ignore[arg-type]
-            os.remove(self.server_address)  # type: ignore[arg-type]
+# socketserver.UnixStreamServer is not available on Windows yet
+# (https://github.com/python/cpython/issues/77589).
+if hasattr(socketserver, "UnixStreamServer"):
+
+    class AccumulatorUnixServer(socketserver.UnixStreamServer):
+        server_shutdown = False
+
+        def __init__(
+            self, socket_path: str, RequestHandlerClass: Type[socketserver.BaseRequestHandler]
+        ):
+            super().__init__(socket_path, RequestHandlerClass)
+            self.auth_token = None
+
+        def shutdown(self) -> None:
+            self.server_shutdown = True
+            super().shutdown()
+            self.server_close()
+            if os.path.exists(self.server_address):  # type: ignore[arg-type]
+                os.remove(self.server_address)  # type: ignore[arg-type]
+
+else:
+
+    class AccumulatorUnixServer(socketserver.TCPServer):  # type: ignore[no-redef]
+        def __init__(
+            self, socket_path: str, RequestHandlerClass: Type[socketserver.BaseRequestHandler]
+        ):
+            raise NotImplementedError(
+                "Unix Domain Sockets are not supported on this platform. "
+                "Please disable it by setting spark.python.unix.domain.socket.enabled to false."
+            )
 
 
 def _start_update_server(
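
The fallback's error message asks the user to set `spark.python.unix.domain.socket.enabled` to false. One way this might be done from Python is sketched below. It assumes the option is read when the session starts, so it is set on the builder before the session is created; `build_session_without_uds` is a hypothetical helper name, and the `pyspark` import is deferred so that the snippet can be loaded without PySpark installed.

```python
def build_session_without_uds():
    """Create a SparkSession with Unix domain sockets disabled.

    Hypothetical helper for illustration. Assumes the option takes effect
    only if it is set before the session (and its SparkContext) exists.
    """
    # Imported lazily: requires the pyspark package at call time.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .config("spark.python.unix.domain.socket.enabled", "false")
        .getOrCreate()
    )
```

Calling `build_session_without_uds()` on a machine with PySpark installed would return a session; whether the setting can also be changed on an already running session is not stated in this commit.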
