diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index 59f7856688ee..e557fe1cd8fb 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -321,21 +321,36 @@ def shutdown(self) -> None: self.server_close() -class AccumulatorUnixServer(socketserver.UnixStreamServer): - server_shutdown = False - - def __init__( - self, socket_path: str, RequestHandlerClass: Type[socketserver.BaseRequestHandler] - ): - super().__init__(socket_path, RequestHandlerClass) - self.auth_token = None - - def shutdown(self) -> None: - self.server_shutdown = True - super().shutdown() - self.server_close() - if os.path.exists(self.server_address): # type: ignore[arg-type] - os.remove(self.server_address) # type: ignore[arg-type] +# socketserver.UnixStreamServer is not available on Windows yet +# (https://github.com/python/cpython/issues/77589). +if hasattr(socketserver, "UnixStreamServer"): + + class AccumulatorUnixServer(socketserver.UnixStreamServer): + server_shutdown = False + + def __init__( + self, socket_path: str, RequestHandlerClass: Type[socketserver.BaseRequestHandler] + ): + super().__init__(socket_path, RequestHandlerClass) + self.auth_token = None + + def shutdown(self) -> None: + self.server_shutdown = True + super().shutdown() + self.server_close() + if os.path.exists(self.server_address): # type: ignore[arg-type] + os.remove(self.server_address) # type: ignore[arg-type] + +else: + + class AccumulatorUnixServer(socketserver.TCPServer): # type: ignore[no-redef] + def __init__( + self, socket_path: str, RequestHandlerClass: Type[socketserver.BaseRequestHandler] + ): + raise NotImplementedError( + "Unix Domain Sockets are not supported on this platform. " + "Please disable it by setting spark.python.unix.domain.socket.enabled to false." + ) def _start_update_server(