From dc94650ce26df16248f8438d637af6d245c02cf1 Mon Sep 17 00:00:00 2001 From: PApostol Date: Wed, 24 Nov 2021 17:58:21 +0000 Subject: [PATCH 1/9] Use consistent path separators where possible --- CHANGELOG.md | 3 +++ spark_submit/sparkjob.py | 6 +++--- spark_submit/system.py | 6 +++--- tests/run_test.py | 8 ++++---- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e563a3..42ac8fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ### Spark-submit +##### 1.1.1 (TBD) +- Use consistent path separators where possible + ##### 1.1.0 (2021-11-13) - Use of `concluded` attribute to keep track of submitted job - Addition of `get_submit_cmd()` method diff --git a/spark_submit/sparkjob.py b/spark_submit/sparkjob.py index 3da02d7..dbcad77 100644 --- a/spark_submit/sparkjob.py +++ b/spark_submit/sparkjob.py @@ -63,11 +63,11 @@ def __init__(self, main_file: str, **spark_args: Any) -> None: self.spark_args = {**__defaults__, **spark_args} if main_file.startswith('s3') or main_file.startswith('local:') or os.path.isfile(os.path.expanduser(main_file)): - self.spark_args['main_file'] = main_file + self.spark_args['main_file'] = main_file.replace(os.path.sep, '/') else: raise FileNotFoundError(f'File {main_file} does not exist.') - spark_home = self.spark_args['spark_home'] + spark_home = self.spark_args['spark_home'].replace(os.path.sep, '/') self.spark_bin = spark_home + '/bin/spark-submit' if not os.path.isfile(self.spark_bin): raise FileNotFoundError(f'bin/spark-submit was not found in "{spark_home}". Please add SPARK_HOME to path or provide it as an argument: spark_home') @@ -178,7 +178,7 @@ def submit(self, await_result: int=0, use_env_vars: bool=False) -> None: threading.Thread(name=self.spark_args['name'], target=self._await_result, args=(await_result, )).start() - def get_submit_cmd(self, multiline=False) -> str: + def get_submit_cmd(self, multiline: bool=False) -> str: """Gets the associated spark-submit command Parameters diff --git a/spark_submit/system.py b/spark_submit/system.py index 909352d..1af643a 100644 --- a/spark_submit/system.py +++ b/spark_submit/system.py @@ -16,7 +16,7 @@ def _get_env_vars() -> dict: 'PYSPARK_PYTHON': os.environ.get('PYSPARK_PYTHON', sys.executable), 'PYSPARK_DRIVER_PYTHON': os.environ.get('PYSPARK_DRIVER_PYTHON', sys.executable) } - return {env_var: _quote_spaces(val) for env_var, val in env_vars.items()} + return {env_var: _quote_spaces(val).replace(os.path.sep, '/') for env_var, val in env_vars.items()} def _execute_cmd(cmd: str, silent: bool=True) -> Tuple[str, int]: @@ -37,10 +37,10 @@ def system_info() -> str: Returns: str: system information """ - spark_bin = os.environ.get('SPARK_HOME', os.path.expanduser('~/spark_home')) + '/bin/spark-submit' + spark_bin = os.environ.get('SPARK_HOME', os.path.expanduser('~/spark_home')).replace(os.path.sep, '/') + '/bin/spark-submit' info_cmd = _quote_spaces(spark_bin) + ' --version' - JAVA_HOME = os.environ.get('JAVA_HOME', '') + JAVA_HOME = os.environ.get('JAVA_HOME', '').replace(os.path.sep, '/') if JAVA_HOME: java_bin = JAVA_HOME + '/bin/java' info_cmd += f' ; {_quote_spaces(java_bin)} -version' diff --git a/tests/run_test.py b/tests/run_test.py index 55b9f3a..469bb38 100644 --- a/tests/run_test.py +++ b/tests/run_test.py @@ -3,10 +3,10 @@ print('---Running example on local mode---') job = SparkJob('tests/resources/pyspark_example.py', main_file_args ='100000') -print('spark-submit command:\n' + job.get_submit_cmd(multiline=True)) +print(f'spark-submit command:\n 
{job.get_submit_cmd(multiline=True)}') print('\nJob running...\n') job.submit() -print('Job output:\n{0}\n'.format(job.get_output())) -print('Job state: {0}\n'.format(job.get_state())) -print('Job return code: {0}\n'.format(job.get_code())) +print(f'Job output:\n{job.get_output()}\n') +print(f'Job state: {job.get_state()}\n') +print(f'Job return code: {job.get_code()}\n') From b5e2dc440cbfb3fcfe594234aaaefb4949eb6491 Mon Sep 17 00:00:00 2001 From: PApostol Date: Tue, 7 Dec 2021 16:16:04 +0000 Subject: [PATCH 2/9] Add dedicated file for defaults and end-states --- spark_submit/_defaults.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 spark_submit/_defaults.py diff --git a/spark_submit/_defaults.py b/spark_submit/_defaults.py new file mode 100644 index 0000000..fe55b6e --- /dev/null +++ b/spark_submit/_defaults.py @@ -0,0 +1,37 @@ +import os + +# some basic default spark-submit arguments +__defaults__ = { + 'spark_home': os.environ.get('SPARK_HOME', os.path.expanduser('~/spark_home')), + 'master': 'local[*]', + 'name': 'spark-submit-task', + 'deploy_mode': 'client', + 'driver_memory': '1g', + 'executor_memory': '1g', + 'executor_cores': '1', + 'total_executor_cores': '2', + 'py_files': None, + 'files': None, + 'class': None, + 'jars': None, + 'packages': None, + 'exclude_packages': None, + 'repositories': None, + 'verbose': False, + 'supervise': False, + 'properties_file': None, + 'conf': [], + 'main_file_args': '' +} + +# Possible Spark driver states: +# SUBMITTED: Submitted but not yet scheduled on a worker +# RUNNING: Has been allocated to a worker to run +# FINISHED: Previously ran and exited cleanly +# RELAUNCHING: Exited non-zero or due to worker failure, but has not yet started running again +# UNKNOWN: The state of the driver is temporarily not known due to master failure recovery +# KILLED: A user manually killed this driver +# FAILED: The driver exited non-zero and was not supervised +# ERROR: Unable to run or restart due to an unrecoverable error (e.g. 
missing jar file) + +__end_states__ = {'FINISHED', 'UNKNOWN', 'KILLED', 'FAILED', 'ERROR'} # states that conclude a job From 5fefe5e65986bddab0283fb3df82440802e8bab5 Mon Sep 17 00:00:00 2001 From: PApostol Date: Tue, 7 Dec 2021 16:17:42 +0000 Subject: [PATCH 3/9] Add timeout functionality --- spark_submit/system.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spark_submit/system.py b/spark_submit/system.py index 1af643a..1687590 100644 --- a/spark_submit/system.py +++ b/spark_submit/system.py @@ -4,14 +4,14 @@ import re import subprocess import sys -from typing import Tuple +from typing import Dict, Optional, Tuple def _quote_spaces(val: str) -> str: return f'"{val}"' if ' ' in val else val -def _get_env_vars() -> dict: +def _get_env_vars() -> Dict[str, str]: env_vars = {'JAVA_HOME': os.environ.get('JAVA_HOME', ''), 'PYSPARK_PYTHON': os.environ.get('PYSPARK_PYTHON', sys.executable), 'PYSPARK_DRIVER_PYTHON': os.environ.get('PYSPARK_DRIVER_PYTHON', sys.executable) @@ -19,9 +19,9 @@ def _get_env_vars() -> dict: return {env_var: _quote_spaces(val).replace(os.path.sep, '/') for env_var, val in env_vars.items()} -def _execute_cmd(cmd: str, silent: bool=True) -> Tuple[str, int]: +def _execute_cmd(cmd: str, silent: bool = True, timeout: Optional[int] = None) -> Optional[Tuple[str, int]]: p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) - o, _ = p.communicate() + o, _ = p.communicate(timeout=timeout) o = o.decode() code = p.returncode From 66d3ab8672cb095b84c24323c9b8c0a8aaef1a1b Mon Sep 17 00:00:00 2001 From: PApostol Date: Wed, 8 Dec 2021 20:56:35 +0000 Subject: [PATCH 4/9] Improve yarn and k8s cluster support --- spark_submit/sparkjob.py | 107 ++++++++++++++++++--------------------- 1 file changed, 50 insertions(+), 57 deletions(-) diff --git a/spark_submit/sparkjob.py b/spark_submit/sparkjob.py index dbcad77..b96a19a 100644 --- a/spark_submit/sparkjob.py +++ b/spark_submit/sparkjob.py @@ -1,6 +1,7 @@ from .system import _get_env_vars, _execute_cmd from .exceptions import SparkSubmitError, SparkJobKillError -from typing import Any +from ._defaults import __defaults__, __end_states__ +from typing import Any, List, Optional, Tuple import logging import os import platform @@ -9,39 +10,6 @@ import threading import time -# some basic default spark-submit arguments -__defaults__ = { - 'spark_home': os.environ.get('SPARK_HOME', os.path.expanduser('~/spark_home')), - 'master': 'local[*]', - 'name': 'spark-submit-task', - 'class': None, - 'py_files': None, - 'files': None, - 'deploy_mode': 'client', - 'driver_memory': '1g', - 'executor_memory': '1g', - 'executor_cores': '1', - 'total_executor_cores': '2', - 'jars': None, - 'verbose': False, - 'supervise': False, - 'properties_file': None, - 'conf': [], - 'main_file_args': '' -} - -# Possible Spark driver states: -# SUBMITTED: Submitted but not yet scheduled on a worker -# RUNNING: Has been allocated to a worker to run -# FINISHED: Previously ran and exited cleanly -# RELAUNCHING: Exited non-zero or due to worker failure, but has not yet started running again -# UNKNOWN: The state of the driver is temporarily not known due to master failure recovery -# KILLED: A user manually killed this driver -# FAILED: The driver exited non-zero and was not supervised -# ERROR: Unable to run or restart due to an unrecoverable error (e.g. 
missing jar file) - -end_states = {'FINISHED', 'UNKNOWN', 'KILLED', 'FAILED', 'ERROR'} # states that conclude a job - class SparkJob: """SparkJob class encapsulates the basics needed to submit jobs to Spark master based on user input and monitor the outcome @@ -76,13 +44,16 @@ def __init__(self, main_file: str, **spark_args: Any) -> None: if not self.env_vars['JAVA_HOME']: logging.warning('"JAVA_HOME" is not defined in environment variables.') + self.is_yarn = 'yarn' in self.spark_args['master'] + self.is_k8s = 'k8s' in self.spark_args['master'] + self.submit_cmd = self._get_submit_cmd() self.submit_response = {'output': '', 'code': -1, 'submission_id': '', 'driver_state': ''} self.concluded = False def _update_concluded(self) -> None: - self.concluded = self.submit_response['driver_state'] in end_states + self.concluded = self.submit_response['driver_state'] in __end_states__ def _get_submit_cmd(self) -> str: @@ -110,20 +81,24 @@ def _get_api_url(self, endpoint: str) -> str: return '{0}/v1/submissions/{1}/{2}'.format(self.spark_args['master'].replace('spark://', 'http://'), endpoint, self.submit_response['submission_id']) - def _get_api_cmd(self, arg: str) -> str: - return '{0} --master {1} --{2} {3}'.format(self.spark_bin, self.spark_args['master'], arg, self.submit_response['submission_id']) + def _get_status_response(self) -> str: + if self.is_yarn: + status_cmd = 'yarn application -status {0}'.format(self.submit_response['submission_id']) + response, _ = _execute_cmd(status_cmd) + elif self.is_k8s: + status_cmd = '{0} --master status --{1} {2}'.format(self.spark_bin, self.spark_args['master'], self.submit_response['submission_id']) + response, _ = _execute_cmd(status_cmd) + else: + status_url = self._get_api_url('status') + response = requests.get(status_url).text + return response def _check_submit(self) -> None: if self.submit_response['submission_id'] and not self.concluded: - if self.spark_args['master'].startswith('spark://'): - status_url = self._get_api_url('status') - response = requests.get(status_url).text - else: - status_cmd = self._get_api_cmd('status') - response, _ = _execute_cmd(status_cmd) - + response = self._get_status_response() driver_state = re.findall('\"driverState\" : \"(.+)\"', response) + if len(driver_state) < 1: logging.warning('driverState not found for in output "{0}" for Spark job "{1}"'.format(response, self.spark_args['name'])) self.submit_response['driver_state'] = 'UNKNOWN' @@ -132,12 +107,23 @@ def _check_submit(self) -> None: self._update_concluded() - def submit(self, await_result: int=0, use_env_vars: bool=False) -> None: + def _get_submission_id(self, output: str) -> List[str]: + if self.is_yarn: + re_exp = '(application[0-9_]+)' + elif self.is_k8s: + re_exp = '\s*pod name: ((.+?)-([a-z0-9]+)-driver)' + else: + re_exp = '\"submissionId\" : \"(.+)\"' + return re.findall(re_exp, output) + + + def submit(self, await_result: int = 0, use_env_vars: bool = False, timeout: Optional[int] = None) -> None: """Submits the current Spark job to Spark master Parameters await_result (int): how often to poll for the Spark driver state in a background thread (default: 0, don't monitor in a background thread) use_env_vars (bool): whether the environment variables obtained should be used (default: False) + timeout (int): a `TimeoutExpired` exception is raised if spark-submit does not terminate after `timeout` seconds (default: None) Returns: None @@ -153,7 +139,7 @@ def submit(self, await_result: int=0, use_env_vars: bool=False) -> None: 
self.submit_response['driver_state'] = 'SUBMITTED' self._update_concluded() - output, code = _execute_cmd(env_vars + self.submit_cmd) + output, code = _execute_cmd(env_vars + self.submit_cmd, timeout=timeout) self.submit_response['output'] = output self.submit_response['code'] = code @@ -167,18 +153,18 @@ def submit(self, await_result: int=0, use_env_vars: bool=False) -> None: self._update_concluded() else: - submission_id = re.findall('\"submissionId\" : \"(.+)\"', output) + submission_id = self._get_submission_id(output) if len(submission_id) < 1: logging.warning('submissionId not found in output "{0}" for Spark job "{1}"'.format(output, self.spark_args['name'])) self.submit_response['driver_state'] = 'UNKNOWN' self._update_concluded() else: self.submit_response['submission_id'] = submission_id[0] - if await_result > 0: + if await_result > 0 and not (self.is_yarn or self.is_k8s): threading.Thread(name=self.spark_args['name'], target=self._await_result, args=(await_result, )).start() - def get_submit_cmd(self, multiline: bool=False) -> str: + def get_submit_cmd(self, multiline: bool = False) -> str: """Gets the associated spark-submit command Parameters @@ -221,6 +207,19 @@ def get_code(self) -> int: return self.submit_response['code'] + def _get_kill_response(self) -> Tuple[str, int]: + if self.is_yarn: + kill_cmd = 'yarn application -kill {0}'.format(self.submit_response['submission_id']) + return _execute_cmd(kill_cmd) + elif self.is_k8s: + kill_cmd = '{0} --master {1} --kill {2}'.format(self.spark_bin, self.spark_args['master'], self.submit_response['submission_id']) + return _execute_cmd(kill_cmd) + else: + kill_url = self._get_api_url('kill') + r = requests.get(kill_url) + return r.text, r.status_code + + def kill(self) -> None: """Kills the running Spark job (cluster mode only) @@ -231,16 +230,10 @@ def kill(self) -> None: logging.warning('Spark job "{0}" has concluded with state {1} and cannot be killed.'.format(self.spark_args['name'], self.submit_response['driver_state'])) elif self.submit_response['submission_id']: - if self.spark_args['master'].startswith('spark://'): - kill_url = self._get_api_url('kill') - response = requests.post(kill_url) - resp_text, code = response.text, response.status_code - else: - kill_cmd = self._get_api_cmd('kill') - resp_text, code = _execute_cmd(kill_cmd) + response, code = self._get_kill_response() if code not in {0, 200}: - raise SparkJobKillError('Problem with killing Spark job "{0}" with submission ID {1}: {2}\nReturn code: {3}'.format(self.spark_args['name'], self.submit_response['submission_id'], resp_text, code)) + raise SparkJobKillError('Problem with killing Spark job "{0}" with submission ID {1}: {2}\nReturn code: {3}'.format(self.spark_args['name'], self.submit_response['submission_id'], response, code)) else: self.submit_response['driver_state'] = 'KILLED' self._update_concluded() From 2d13359d8f22463995140ceba05ae486c90810c5 Mon Sep 17 00:00:00 2001 From: PApostol Date: Wed, 8 Dec 2021 20:57:17 +0000 Subject: [PATCH 5/9] Update README --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 7f3f7e5..86834c6 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ ## Spark-submit [![PyPI version](https://badge.fury.io/py/spark-submit.svg)](https://badge.fury.io/py/spark-submit) +[![Downloads](https://static.pepy.tech/personalized-badge/spark-submit?period=month&units=international_system&left_color=grey&right_color=green&left_text=total%20downloads)](https://pepy.tech/project/spark-submit) [![PyPI - 
Downloads](https://img.shields.io/pypi/dm/spark-submit)](https://pypi.org/project/spark-submit/) [![](https://img.shields.io/badge/python-3.6+-blue.svg)](https://www.python.org/downloads/) [![License](https://img.shields.io/badge/License-MIT-blue)](#license "Go to license section") From 378cd177ef20ddb0c73d6184bb52d7712711bb09 Mon Sep 17 00:00:00 2001 From: PApostol Date: Thu, 9 Dec 2021 18:26:21 +0000 Subject: [PATCH 6/9] Update CHANGELOG for new release --- CHANGELOG.md | 5 ++++- spark_submit/__info__.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 42ac8fc..f6d1f77 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,9 @@ ### Spark-submit -##### 1.1.1 (TBD) +##### 1.2.0 (2021-12-09) +- Improve support for Yarn and Kubernetes clusters on polling and killing submitted tasks +- Use dedicated file for default arguments and Spark states +- Added optional `timeout` argument for `submit()` - Use consistent path separators where possible ##### 1.1.0 (2021-11-13) diff --git a/spark_submit/__info__.py b/spark_submit/__info__.py index fbf37a8..3f09695 100644 --- a/spark_submit/__info__.py +++ b/spark_submit/__info__.py @@ -11,7 +11,7 @@ __author_email__ = 'foo@bar.com' __maintainer__ = 'PApostol' __license__ = 'MIT' -__version__ = '1.1.0' +__version__ = '1.2.0' __description__ = 'Python manager for spark-submit jobs' __url__ = f'https://github.com/{__author__}/{__title__}' __bugtrack_url__ = f'{__url__}/issues' From e2438131709a1e0b81c036a35dc83786335fd43a Mon Sep 17 00:00:00 2001 From: PApostol Date: Thu, 13 Jan 2022 19:40:30 +0000 Subject: [PATCH 7/9] Ignore .mypy_cache --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 394f219..a2ba63d 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,4 @@ Thumbs.db __pycache__/ junk/ dist/ +.mypy_cache/ From 4c3e16bdf115addae4701138d1cc8609a8648b9e Mon Sep 17 00:00:00 2001 From: PApostol Date: Sat, 15 Jan 2022 09:57:59 +0000 Subject: [PATCH 8/9] Refactor to comply with pylint, isort and mypy checks --- setup.py | 9 +++-- spark_submit/__info__.py | 2 +- spark_submit/__init__.py | 14 ++------ spark_submit/_defaults.py | 9 +++-- spark_submit/exceptions.py | 4 +-- spark_submit/sparkjob.py | 56 ++++++++++++++++-------------- spark_submit/system.py | 37 ++++++++++---------- tests/resources/pyspark_example.py | 8 +++-- tests/run_test.py | 1 + 9 files changed, 73 insertions(+), 67 deletions(-) diff --git a/setup.py b/setup.py index 472f0b2..99e515a 100644 --- a/setup.py +++ b/setup.py @@ -1,9 +1,12 @@ """spark-submit module installation script""" -from setuptools import setup, find_packages import os +from typing import Dict + +from setuptools import find_packages, setup info_location = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'spark_submit', '__info__.py') -about = {} +about: Dict[str, str] = {} + with open(info_location, 'r') as f: exec(f.read(), about) @@ -37,4 +40,4 @@ platforms = ['any'], python_requires = '~=3.6', keywords = ['apache', 'spark', 'submit'], - ) \ No newline at end of file + ) diff --git a/spark_submit/__info__.py b/spark_submit/__info__.py index 3f09695..7d063a5 100644 --- a/spark_submit/__info__.py +++ b/spark_submit/__info__.py @@ -1,4 +1,4 @@ -""" __ __ _ __ +r""" __ __ _ __ _________ ____ ______/ /__ _______ __/ /_ ____ ___ (_) /_ / ___/ __ \/ __ `/ ___/ //_/_____/ ___/ / / / __ \/ __ `__ \/ / __/ (__ ) /_/ / /_/ / / / ,< /_____(__ ) /_/ / /_/ / / / / / / / /_ diff --git a/spark_submit/__init__.py 
b/spark_submit/__init__.py index c66470a..dae948d 100644 --- a/spark_submit/__init__.py +++ b/spark_submit/__init__.py @@ -1,17 +1,9 @@ """spark-submit module""" +from .__info__ import (__author__, __author_email__, __bugtrack_url__, + __description__, __license__, __maintainer__, __title__, + __url__, __version__) from .sparkjob import SparkJob from .system import system_info -from .__info__ import ( - __title__, - __author__, - __author_email__, - __maintainer__, - __license__, - __version__, - __description__, - __url__, - __bugtrack_url__ -) __all__ = ['SparkJob', 'system_info'] diff --git a/spark_submit/_defaults.py b/spark_submit/_defaults.py index fe55b6e..13b10d0 100644 --- a/spark_submit/_defaults.py +++ b/spark_submit/_defaults.py @@ -1,7 +1,9 @@ +"""Some basic default spark-submit arguments and driver end states""" import os +from typing import Any, Dict, Set -# some basic default spark-submit arguments -__defaults__ = { + +__defaults__: Dict[str, Any ] = { 'spark_home': os.environ.get('SPARK_HOME', os.path.expanduser('~/spark_home')), 'master': 'local[*]', 'name': 'spark-submit-task', @@ -34,4 +36,5 @@ # FAILED: The driver exited non-zero and was not supervised # ERROR: Unable to run or restart due to an unrecoverable error (e.g. missing jar file) -__end_states__ = {'FINISHED', 'UNKNOWN', 'KILLED', 'FAILED', 'ERROR'} # states that conclude a job +# states that conclude a job +__end_states__: Set[str] = {'FINISHED', 'UNKNOWN', 'KILLED', 'FAILED', 'ERROR'} diff --git a/spark_submit/exceptions.py b/spark_submit/exceptions.py index 5489908..088b616 100644 --- a/spark_submit/exceptions.py +++ b/spark_submit/exceptions.py @@ -1,8 +1,8 @@ """Exceptions raised by SparkJob class""" class SparkSubmitError(Exception): - pass + """Raised if spark-submit command fails""" class SparkJobKillError(Exception): - pass + """Raised if spark-submit command fails""" diff --git a/spark_submit/sparkjob.py b/spark_submit/sparkjob.py index b96a19a..02ae45b 100644 --- a/spark_submit/sparkjob.py +++ b/spark_submit/sparkjob.py @@ -1,14 +1,17 @@ -from .system import _get_env_vars, _execute_cmd -from .exceptions import SparkSubmitError, SparkJobKillError -from ._defaults import __defaults__, __end_states__ -from typing import Any, List, Optional, Tuple +"""SparkJob class functionality""" import logging import os import platform import re -import requests import threading import time +from typing import Any, Dict, List, Optional, Tuple + +import requests + +from ._defaults import __defaults__, __end_states__ +from .exceptions import SparkJobKillError, SparkSubmitError +from .system import _execute_cmd, _get_env_vars class SparkJob: @@ -48,7 +51,7 @@ def __init__(self, main_file: str, **spark_args: Any) -> None: self.is_k8s = 'k8s' in self.spark_args['master'] self.submit_cmd = self._get_submit_cmd() - self.submit_response = {'output': '', 'code': -1, 'submission_id': '', 'driver_state': ''} + self.submit_response: Dict[str, Any] = {'output': '', 'code': -1, 'submission_id': '', 'driver_state': ''} self.concluded = False @@ -59,6 +62,7 @@ def _update_concluded(self) -> None: def _get_submit_cmd(self) -> str: booleans = {arg for arg, val in self.spark_args.items() if isinstance(val, bool)} exclude = {'spark_home', 'main_file', 'conf', 'main_file_args'} ^ booleans + args = ['--{0} {1}'.format(arg.replace('_', '-'), val) for arg, val in self.spark_args.items() if arg not in exclude and val is not None] confs = [f'--conf {c}' for c in self.spark_args['conf']] @@ -97,10 +101,10 @@ def _get_status_response(self) -> 
str: def _check_submit(self) -> None: if self.submit_response['submission_id'] and not self.concluded: response = self._get_status_response() - driver_state = re.findall('\"driverState\" : \"(.+)\"', response) + driver_state = re.findall(r'\"driverState\" : \"(.+)\"', response) if len(driver_state) < 1: - logging.warning('driverState not found for in output "{0}" for Spark job "{1}"'.format(response, self.spark_args['name'])) + logging.warning('driverState not found for in output "%s" for Spark job "%s"', response, self.spark_args['name']) self.submit_response['driver_state'] = 'UNKNOWN' else: self.submit_response['driver_state'] = driver_state[0] @@ -109,11 +113,11 @@ def _check_submit(self) -> None: def _get_submission_id(self, output: str) -> List[str]: if self.is_yarn: - re_exp = '(application[0-9_]+)' + re_exp = r'(application[0-9_]+)' elif self.is_k8s: - re_exp = '\s*pod name: ((.+?)-([a-z0-9]+)-driver)' + re_exp = r'\s*pod name: ((.+?)-([a-z0-9]+)-driver)' else: - re_exp = '\"submissionId\" : \"(.+)\"' + re_exp = r'\"submissionId\" : \"(.+)\"' return re.findall(re_exp, output) @@ -148,14 +152,14 @@ def submit(self, await_result: int = 0, use_env_vars: bool = False, timeout: Opt self._update_concluded() raise SparkSubmitError(f'{output}\nReturn code: {code}') - elif self.spark_args['deploy_mode'] == 'client': + if self.spark_args['deploy_mode'] == 'client': self.submit_response['driver_state'] = 'FINISHED' self._update_concluded() else: submission_id = self._get_submission_id(output) if len(submission_id) < 1: - logging.warning('submissionId not found in output "{0}" for Spark job "{1}"'.format(output, self.spark_args['name'])) + logging.warning('submissionId not found in output "%s" for Spark job "%s"', output, self.spark_args['name']) self.submit_response['driver_state'] = 'UNKNOWN' self._update_concluded() else: @@ -175,8 +179,7 @@ def get_submit_cmd(self, multiline: bool = False) -> str: """ if multiline: return self.submit_cmd.replace(' --', ' \ \n--').replace(' ' + self.spark_args['main_file'], ' \ \n' + self.spark_args['main_file']) - else: - return self.submit_cmd + return self.submit_cmd def get_state(self) -> str: @@ -211,13 +214,14 @@ def _get_kill_response(self) -> Tuple[str, int]: if self.is_yarn: kill_cmd = 'yarn application -kill {0}'.format(self.submit_response['submission_id']) return _execute_cmd(kill_cmd) - elif self.is_k8s: + + if self.is_k8s: kill_cmd = '{0} --master {1} --kill {2}'.format(self.spark_bin, self.spark_args['master'], self.submit_response['submission_id']) return _execute_cmd(kill_cmd) - else: - kill_url = self._get_api_url('kill') - r = requests.get(kill_url) - return r.text, r.status_code + + kill_url = self._get_api_url('kill') + resp = requests.get(kill_url) + return resp.text, resp.status_code def kill(self) -> None: @@ -227,15 +231,15 @@ def kill(self) -> None: None """ if self.concluded: - logging.warning('Spark job "{0}" has concluded with state {1} and cannot be killed.'.format(self.spark_args['name'], self.submit_response['driver_state'])) + logging.warning('Spark job "%s" has concluded with state "%s" and cannot be killed.', self.spark_args['name'], self.submit_response['driver_state']) elif self.submit_response['submission_id']: response, code = self._get_kill_response() if code not in {0, 200}: - raise SparkJobKillError('Problem with killing Spark job "{0}" with submission ID {1}: {2}\nReturn code: {3}'.format(self.spark_args['name'], self.submit_response['submission_id'], response, code)) - else: - self.submit_response['driver_state'] 
= 'KILLED' - self._update_concluded() + raise SparkJobKillError('Error killing Spark job "%s" with submission ID "%s": %s\nReturn code: %i', self.spark_args['name'], self.submit_response['submission_id'], response, code) + + self.submit_response['driver_state'] = 'KILLED' + self._update_concluded() else: - raise SparkJobKillError('Spark job "{0}" has no submission ID to kill.'.format(self.spark_args['name'])) + raise SparkJobKillError('Spark job "%s" has no submission ID to kill.', self.spark_args['name']) diff --git a/spark_submit/system.py b/spark_submit/system.py index 1687590..6f86987 100644 --- a/spark_submit/system.py +++ b/spark_submit/system.py @@ -1,3 +1,4 @@ +"""System related functionalities""" import logging import os import platform @@ -19,16 +20,15 @@ def _get_env_vars() -> Dict[str, str]: return {env_var: _quote_spaces(val).replace(os.path.sep, '/') for env_var, val in env_vars.items()} -def _execute_cmd(cmd: str, silent: bool = True, timeout: Optional[int] = None) -> Optional[Tuple[str, int]]: - p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) - o, _ = p.communicate(timeout=timeout) - o = o.decode() - code = p.returncode +def _execute_cmd(cmd: str, silent: bool = True, timeout: Optional[int] = None) -> Tuple[str, int]: + with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) as process: + output, _ = process.communicate(timeout=timeout) + res = output.decode() + code = process.returncode if code != 0 and not silent: - logging.warning(o) - else: - return o, code + logging.warning(res) + return res, code def system_info() -> str: @@ -37,12 +37,13 @@ def system_info() -> str: Returns: str: system information """ - spark_bin = os.environ.get('SPARK_HOME', os.path.expanduser('~/spark_home')).replace(os.path.sep, '/') + '/bin/spark-submit' + spark_home = os.environ.get('SPARK_HOME', os.path.expanduser('~/spark_home')).replace(os.path.sep, '/') + spark_bin = spark_home + '/bin/spark-submit' info_cmd = _quote_spaces(spark_bin) + ' --version' - JAVA_HOME = os.environ.get('JAVA_HOME', '').replace(os.path.sep, '/') - if JAVA_HOME: - java_bin = JAVA_HOME + '/bin/java' + java_home = os.environ.get('JAVA_HOME', '').replace(os.path.sep, '/') + if java_home: + java_bin = java_home + '/bin/java' info_cmd += f' ; {_quote_spaces(java_bin)} -version' info_cmd += f' ; {_quote_spaces(sys.executable)} -m pip show pyspark' @@ -56,12 +57,12 @@ def system_info() -> str: 'PySpark version': 'Version: (.+)' } - sys_info = {} - for k, v in info_re.items(): - i = re.findall(v, info_stdout, re.IGNORECASE) + sys_info: Dict[str, str] = {} + for key, val in info_re.items(): + i = re.findall(val, info_stdout, re.IGNORECASE) if i: - sys_info[k] = i[0].strip() + sys_info[key] = i[0].strip() - sys_info['Python version'] = sys.version.split(' ')[0] + sys_info['Python version'] = sys.version.split(' ', maxsplit=1)[0] sys_info['OS'] = platform.platform() - return '\n'.join([f'{k}: {v}' for k, v in sys_info.items()]) + return '\n'.join([f'{key}: {val}' for key, val in sys_info.items()]) diff --git a/tests/resources/pyspark_example.py b/tests/resources/pyspark_example.py index 910616f..72b8c54 100644 --- a/tests/resources/pyspark_example.py +++ b/tests/resources/pyspark_example.py @@ -1,7 +1,9 @@ -from pyspark.sql import SparkSession -from random import random -from operator import add +"""pyspark example to calculate pi""" import sys +from operator import add +from random import random + +from pyspark.sql import SparkSession spark = 
SparkSession.builder.appName('Calculate Pi').getOrCreate() diff --git a/tests/run_test.py b/tests/run_test.py index 469bb38..bc3f577 100644 --- a/tests/run_test.py +++ b/tests/run_test.py @@ -1,3 +1,4 @@ +"""Simple spark-submit test""" from spark_submit import SparkJob print('---Running example on local mode---') From e9b1adfcd943a21f305013756ec42dc8f31b54a6 Mon Sep 17 00:00:00 2001 From: PApostol Date: Sun, 16 Jan 2022 14:15:25 +0000 Subject: [PATCH 9/9] Update CHANGELOG and bump version --- CHANGELOG.md | 4 ++++ spark_submit/__info__.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f6d1f77..f6f00de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ### Spark-submit +##### 1.2.1 (2022-01-16) +- Refactor to comply with `pylint`, `isort` and `mypy` checks +- Minor improvements + ##### 1.2.0 (2021-12-09) - Improve support for Yarn and Kubernetes clusters on polling and killing submitted tasks - Use dedicated file for default arguments and Spark states diff --git a/spark_submit/__info__.py b/spark_submit/__info__.py index 7d063a5..b994403 100644 --- a/spark_submit/__info__.py +++ b/spark_submit/__info__.py @@ -11,7 +11,7 @@ __author_email__ = 'foo@bar.com' __maintainer__ = 'PApostol' __license__ = 'MIT' -__version__ = '1.2.0' +__version__ = '1.2.1' __description__ = 'Python manager for spark-submit jobs' __url__ = f'https://github.com/{__author__}/{__title__}' __bugtrack_url__ = f'{__url__}/issues'
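
For reference, below is a minimal usage sketch (not part of the commits above) of the spark_submit API touched by this patch series: the get_submit_cmd() helper, the timeout argument added to submit() in 1.2.0, and the polling/kill support extended to YARN and Kubernetes clusters. The script path, the S3 path, the master/deploy_mode values and the timeout are placeholders adapted from tests/run_test.py.

"""Illustrative sketch of spark-submit 1.2.x usage (placeholder values)."""
from spark_submit import SparkJob, system_info

# Spark, Java, PySpark and Python versions detected on this machine.
print(system_info())

# Same example application as tests/run_test.py; any PySpark script path,
# s3 path or local: path accepted by SparkJob would do here.
job = SparkJob('tests/resources/pyspark_example.py', main_file_args='100000')

# Inspect the generated spark-submit command before running it.
print(job.get_submit_cmd(multiline=True))

# `timeout` (added in 1.2.0) bounds how long spark-submit may run;
# subprocess.TimeoutExpired is raised if it elapses. 600 s is an
# arbitrary illustration.
job.submit(use_env_vars=False, timeout=600)

print('state:', job.get_state())    # e.g. FINISHED in client mode
print('code:', job.get_code())      # spark-submit return code
print('output:', job.get_output())  # captured stdout/stderr

# In cluster mode, a standalone master (spark://...) can be polled in a
# background thread via submit(await_result=...); with the YARN and
# Kubernetes support added in this series, the driver state can still be
# queried with get_state() and the job stopped with kill(). For example
# (placeholder paths and master):
#   cluster_job = SparkJob('s3://bucket/app.py', master='yarn',
#                          deploy_mode='cluster')
#   cluster_job.submit()
#   print(cluster_job.get_state())
#   cluster_job.kill()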