Merge pull request #7 from PApostol/develop
Develop -> Master
PApostol authored Jan 16, 2022
2 parents 6b951b8 + 08e34d9 commit c679479
Showing 11 changed files with 79 additions and 68 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -37,3 +37,4 @@ Thumbs.db
__pycache__/
junk/
dist/
.mypy_cache/
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
### Spark-submit

##### 1.2.1 (2022-01-16)
- Refactor to comply with `pylint`, `isort` and `mypy` checks
- Minor improvements

##### 1.2.0 (2021-12-09)
- Improve support for Yarn and Kubernetes clusters on polling and killing submitted tasks
- Use dedicated file for default arguments and Spark states
9 changes: 6 additions & 3 deletions setup.py
@@ -1,9 +1,12 @@
"""spark-submit module installation script"""
from setuptools import setup, find_packages
import os
from typing import Dict

from setuptools import find_packages, setup

info_location = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'spark_submit', '__info__.py')
about = {}
about: Dict[str, str] = {}

with open(info_location, 'r') as f:
exec(f.read(), about)

@@ -37,4 +40,4 @@
platforms = ['any'],
python_requires = '~=3.6',
keywords = ['apache', 'spark', 'submit'],
)
)
4 changes: 2 additions & 2 deletions spark_submit/__info__.py
@@ -1,4 +1,4 @@
""" __ __ _ __
r""" __ __ _ __
_________ ____ ______/ /__ _______ __/ /_ ____ ___ (_) /_
/ ___/ __ \/ __ `/ ___/ //_/_____/ ___/ / / / __ \/ __ `__ \/ / __/
(__ ) /_/ / /_/ / / / ,< /_____(__ ) /_/ / /_/ / / / / / / / /_
@@ -11,7 +11,7 @@
__author_email__ = '[email protected]'
__maintainer__ = 'PApostol'
__license__ = 'MIT'
__version__ = '1.2.0'
__version__ = '1.2.1'
__description__ = 'Python manager for spark-submit jobs'
__url__ = f'https://github.com/{__author__}/{__title__}'
__bugtrack_url__ = f'{__url__}/issues'
14 changes: 3 additions & 11 deletions spark_submit/__init__.py
@@ -1,17 +1,9 @@
"""spark-submit module"""

from .__info__ import (__author__, __author_email__, __bugtrack_url__,
__description__, __license__, __maintainer__, __title__,
__url__, __version__)
from .sparkjob import SparkJob
from .system import system_info
from .__info__ import (
__title__,
__author__,
__author_email__,
__maintainer__,
__license__,
__version__,
__description__,
__url__,
__bugtrack_url__
)

__all__ = ['SparkJob', 'system_info']
9 changes: 6 additions & 3 deletions spark_submit/_defaults.py
@@ -1,7 +1,9 @@
"""Some basic default spark-submit arguments and driver end states"""
import os
from typing import Any, Dict, Set

# some basic default spark-submit arguments
__defaults__ = {

__defaults__: Dict[str, Any] = {
'spark_home': os.environ.get('SPARK_HOME', os.path.expanduser('~/spark_home')),
'master': 'local[*]',
'name': 'spark-submit-task',
@@ -34,4 +36,5 @@
# FAILED: The driver exited non-zero and was not supervised
# ERROR: Unable to run or restart due to an unrecoverable error (e.g. missing jar file)

__end_states__ = {'FINISHED', 'UNKNOWN', 'KILLED', 'FAILED', 'ERROR'} # states that conclude a job
# states that conclude a job
__end_states__: Set[str] = {'FINISHED', 'UNKNOWN', 'KILLED', 'FAILED', 'ERROR'}
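
For orientation, a minimal sketch of how a caller could use `__end_states__` to decide whether a polled driver state concludes a job; the `is_concluded` helper is illustrative and not part of the package:

from spark_submit._defaults import __end_states__

def is_concluded(driver_state: str) -> bool:
    # a job is over once the driver reaches FINISHED, UNKNOWN, KILLED, FAILED or ERROR
    return driver_state in __end_states__

print(is_concluded('RUNNING'))   # False
print(is_concluded('FINISHED'))  # True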
4 changes: 2 additions & 2 deletions spark_submit/exceptions.py
@@ -1,8 +1,8 @@
"""Exceptions raised by SparkJob class"""

class SparkSubmitError(Exception):
pass
"""Raised if spark-submit command fails"""


class SparkJobKillError(Exception):
pass
"""Raised if spark-submit command fails"""
56 changes: 30 additions & 26 deletions spark_submit/sparkjob.py
@@ -1,14 +1,17 @@
from .system import _get_env_vars, _execute_cmd
from .exceptions import SparkSubmitError, SparkJobKillError
from ._defaults import __defaults__, __end_states__
from typing import Any, List, Optional, Tuple
"""SparkJob class functionality"""
import logging
import os
import platform
import re
import requests
import threading
import time
from typing import Any, Dict, List, Optional, Tuple

import requests

from ._defaults import __defaults__, __end_states__
from .exceptions import SparkJobKillError, SparkSubmitError
from .system import _execute_cmd, _get_env_vars


class SparkJob:
@@ -48,7 +51,7 @@ def __init__(self, main_file: str, **spark_args: Any) -> None:
self.is_k8s = 'k8s' in self.spark_args['master']

self.submit_cmd = self._get_submit_cmd()
self.submit_response = {'output': '', 'code': -1, 'submission_id': '', 'driver_state': ''}
self.submit_response: Dict[str, Any] = {'output': '', 'code': -1, 'submission_id': '', 'driver_state': ''}
self.concluded = False


@@ -59,6 +62,7 @@ def _update_concluded(self) -> None:
def _get_submit_cmd(self) -> str:
booleans = {arg for arg, val in self.spark_args.items() if isinstance(val, bool)}
exclude = {'spark_home', 'main_file', 'conf', 'main_file_args'} ^ booleans

args = ['--{0} {1}'.format(arg.replace('_', '-'), val) for arg, val in self.spark_args.items() if arg not in exclude and val is not None]
confs = [f'--conf {c}' for c in self.spark_args['conf']]

@@ -97,10 +101,10 @@ def _get_status_response(self) -> str:
def _check_submit(self) -> None:
if self.submit_response['submission_id'] and not self.concluded:
response = self._get_status_response()
driver_state = re.findall('\"driverState\" : \"(.+)\"', response)
driver_state = re.findall(r'\"driverState\" : \"(.+)\"', response)

if len(driver_state) < 1:
logging.warning('driverState not found for in output "{0}" for Spark job "{1}"'.format(response, self.spark_args['name']))
logging.warning('driverState not found in output "%s" for Spark job "%s"', response, self.spark_args['name'])
self.submit_response['driver_state'] = 'UNKNOWN'
else:
self.submit_response['driver_state'] = driver_state[0]
@@ -109,11 +113,11 @@ def _check_submit(self) -> None:

def _get_submission_id(self, output: str) -> List[str]:
if self.is_yarn:
re_exp = '(application[0-9_]+)'
re_exp = r'(application[0-9_]+)'
elif self.is_k8s:
re_exp = '\s*pod name: ((.+?)-([a-z0-9]+)-driver)'
re_exp = r'\s*pod name: ((.+?)-([a-z0-9]+)-driver)'
else:
re_exp = '\"submissionId\" : \"(.+)\"'
re_exp = r'\"submissionId\" : \"(.+)\"'
return re.findall(re_exp, output)


@@ -148,14 +152,14 @@ def submit(self, await_result: int = 0, use_env_vars: bool = False, timeout: Opt
self._update_concluded()
raise SparkSubmitError(f'{output}\nReturn code: {code}')

elif self.spark_args['deploy_mode'] == 'client':
if self.spark_args['deploy_mode'] == 'client':
self.submit_response['driver_state'] = 'FINISHED'
self._update_concluded()

else:
submission_id = self._get_submission_id(output)
if len(submission_id) < 1:
logging.warning('submissionId not found in output "{0}" for Spark job "{1}"'.format(output, self.spark_args['name']))
logging.warning('submissionId not found in output "%s" for Spark job "%s"', output, self.spark_args['name'])
self.submit_response['driver_state'] = 'UNKNOWN'
self._update_concluded()
else:
@@ -175,8 +179,7 @@ def get_submit_cmd(self, multiline: bool = False) -> str:
"""
if multiline:
return self.submit_cmd.replace(' --', ' \ \n--').replace(' ' + self.spark_args['main_file'], ' \ \n' + self.spark_args['main_file'])
else:
return self.submit_cmd
return self.submit_cmd


def get_state(self) -> str:
@@ -211,13 +214,14 @@ def _get_kill_response(self) -> Tuple[str, int]:
if self.is_yarn:
kill_cmd = 'yarn application -kill {0}'.format(self.submit_response['submission_id'])
return _execute_cmd(kill_cmd)
elif self.is_k8s:

if self.is_k8s:
kill_cmd = '{0} --master {1} --kill {2}'.format(self.spark_bin, self.spark_args['master'], self.submit_response['submission_id'])
return _execute_cmd(kill_cmd)
else:
kill_url = self._get_api_url('kill')
r = requests.get(kill_url)
return r.text, r.status_code

kill_url = self._get_api_url('kill')
resp = requests.get(kill_url)
return resp.text, resp.status_code


def kill(self) -> None:
@@ -227,15 +231,15 @@ def kill(self) -> None:
None
"""
if self.concluded:
logging.warning('Spark job "{0}" has concluded with state {1} and cannot be killed.'.format(self.spark_args['name'], self.submit_response['driver_state']))
logging.warning('Spark job "%s" has concluded with state "%s" and cannot be killed.', self.spark_args['name'], self.submit_response['driver_state'])

elif self.submit_response['submission_id']:
response, code = self._get_kill_response()

if code not in {0, 200}:
raise SparkJobKillError('Problem with killing Spark job "{0}" with submission ID {1}: {2}\nReturn code: {3}'.format(self.spark_args['name'], self.submit_response['submission_id'], response, code))
else:
self.submit_response['driver_state'] = 'KILLED'
self._update_concluded()
raise SparkJobKillError('Error killing Spark job "{0}" with submission ID "{1}": {2}\nReturn code: {3}'.format(self.spark_args['name'], self.submit_response['submission_id'], response, code))

self.submit_response['driver_state'] = 'KILLED'
self._update_concluded()
else:
raise SparkJobKillError('Spark job "{0}" has no submission ID to kill.'.format(self.spark_args['name']))
raise SparkJobKillError('Spark job "{0}" has no submission ID to kill.'.format(self.spark_args['name']))
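
Putting this file together, a rough end-to-end sketch of the class after this change; the master URL, deploy mode and file path are placeholders, and `await_result` is assumed to be the polling interval, in seconds, used while waiting for the driver to reach an end state:

from spark_submit import SparkJob

job = SparkJob(
    'file:/path/to/app.py',
    master='yarn',
    deploy_mode='cluster',
    name='example-task',
)

print(job.get_submit_cmd(multiline=True))  # inspect the generated spark-submit command

job.submit(await_result=10)  # assumed to poll the driver state every 10 seconds until it concludes
print(job.get_state())       # e.g. FINISHED, FAILED or KILLED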
37 changes: 19 additions & 18 deletions spark_submit/system.py
@@ -1,3 +1,4 @@
"""System related functionalities"""
import logging
import os
import platform
@@ -19,16 +20,15 @@ def _get_env_vars() -> Dict[str, str]:
return {env_var: _quote_spaces(val).replace(os.path.sep, '/') for env_var, val in env_vars.items()}


def _execute_cmd(cmd: str, silent: bool = True, timeout: Optional[int] = None) -> Optional[Tuple[str, int]]:
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
o, _ = p.communicate(timeout=timeout)
o = o.decode()
code = p.returncode
def _execute_cmd(cmd: str, silent: bool = True, timeout: Optional[int] = None) -> Tuple[str, int]:
with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) as process:
output, _ = process.communicate(timeout=timeout)
res = output.decode()
code = process.returncode

if code != 0 and not silent:
logging.warning(o)
else:
return o, code
logging.warning(res)
return res, code


def system_info() -> str:
@@ -37,12 +37,13 @@ def system_info() -> str:
Returns:
str: system information
"""
spark_bin = os.environ.get('SPARK_HOME', os.path.expanduser('~/spark_home')).replace(os.path.sep, '/') + '/bin/spark-submit'
spark_home = os.environ.get('SPARK_HOME', os.path.expanduser('~/spark_home')).replace(os.path.sep, '/')
spark_bin = spark_home + '/bin/spark-submit'
info_cmd = _quote_spaces(spark_bin) + ' --version'

JAVA_HOME = os.environ.get('JAVA_HOME', '').replace(os.path.sep, '/')
if JAVA_HOME:
java_bin = JAVA_HOME + '/bin/java'
java_home = os.environ.get('JAVA_HOME', '').replace(os.path.sep, '/')
if java_home:
java_bin = java_home + '/bin/java'
info_cmd += f' ; {_quote_spaces(java_bin)} -version'

info_cmd += f' ; {_quote_spaces(sys.executable)} -m pip show pyspark'
@@ -56,12 +57,12 @@ def system_info() -> str:
'PySpark version': 'Version: (.+)'
}

sys_info = {}
for k, v in info_re.items():
i = re.findall(v, info_stdout, re.IGNORECASE)
sys_info: Dict[str, str] = {}
for key, val in info_re.items():
i = re.findall(val, info_stdout, re.IGNORECASE)
if i:
sys_info[k] = i[0].strip()
sys_info[key] = i[0].strip()

sys_info['Python version'] = sys.version.split(' ')[0]
sys_info['Python version'] = sys.version.split(' ', maxsplit=1)[0]
sys_info['OS'] = platform.platform()
return '\n'.join([f'{k}: {v}' for k, v in sys_info.items()])
return '\n'.join([f'{key}: {val}' for key, val in sys_info.items()])
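
For completeness, the public helper this module backs can be called directly; the exact fields reported depend on the local Spark, Java and PySpark installations:

from spark_submit import system_info

# prints one 'key: value' line per detected component (Spark version, Java version, PySpark version, Python version, OS)
print(system_info())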
8 changes: 5 additions & 3 deletions tests/resources/pyspark_example.py
@@ -1,7 +1,9 @@
from pyspark.sql import SparkSession
from random import random
from operator import add
"""pyspark example to calculate pi"""
import sys
from operator import add
from random import random

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Calculate Pi').getOrCreate()

1 change: 1 addition & 0 deletions tests/run_test.py
@@ -1,3 +1,4 @@
"""Simple spark-submit test"""
from spark_submit import SparkJob

print('---Running example on local mode---')
