diff --git a/src/tesk_core/callback_sender.py b/src/tesk_core/callback_sender.py
new file mode 100644
index 0000000..efcc013
--- /dev/null
+++ b/src/tesk_core/callback_sender.py
@@ -0,0 +1,46 @@
+import logging
+
+import requests
+
+
+class CallbackSender:
+    """Implementation of the callback sender class."""
+
+    def __init__(self, task_id="", url=""):
+        self.url = url
+        self.task_id = task_id
+
+    def send(self, state):
+        """Send the state to the callback receiver.
+        Args:
+            state (str): Descriptor of the state of the task.
+        Returns:
+            response (requests.models.Response): Response from the callback receiver.
+            None: If the callback URL is not set or an error occurs.
+        """
+
+        if not self.url:
+            return None
+        sent = False
+        retries = 0
+        response = None
+        while not sent:
+            try:
+                data = {"id": self.task_id, "state": state}
+                headers = {"Content-Type": "application/json"}
+                response = requests.post(self.url, json=data, headers=headers)
+                sent = True
+            except requests.exceptions.Timeout:
+                retries += 1
+                if retries > 3:
+                    logging.error("Callback Timeout")
+                    break
+                continue
+            except requests.exceptions.TooManyRedirects as err:
+                logging.error("Bad URL: %s", err)
+                break
+            except requests.exceptions.RequestException as err:
+                logging.error(err)
+                break
+
+        return response
diff --git a/src/tesk_core/job.py b/src/tesk_core/job.py
index a164831..60abb0f 100644
--- a/src/tesk_core/job.py
+++ b/src/tesk_core/job.py
@@ -4,11 +4,12 @@
 from kubernetes import client, config
 from kubernetes.client.rest import ApiException
 from tesk_core.Util import pprint
+from tesk_core.callback_sender import CallbackSender
 
 logging.basicConfig(format='%(message)s', level=logging.INFO)
 
 class Job:
-    def __init__(self, body, name='task-job', namespace='default'):
+    def __init__(self, body, name='task-job', namespace='default', callback_url=None):
         self.name = name
         self.namespace = namespace
         self.status = 'Initialized'
@@ -17,6 +18,10 @@ def __init__(self, body, name='task-job', namespace='default'):
         self.timeout = 240
         self.body = body
         self.body['metadata']['name'] = self.name
+        self.callback = None
+        if callback_url:
+            task_name = '-'.join(name.split('-')[:2])
+            self.callback = CallbackSender(task_name, callback_url)
 
     def run_to_completion(self, poll_interval, check_cancelled, pod_timeout):
 
@@ -34,9 +39,15 @@ def run_to_completion(self, poll_interval, check_cancelled, pod_timeout):
                 raise ApiException(ex.status, ex.reason)
         is_all_pods_running = False
         status, is_all_pods_running = self.get_status(is_all_pods_running)
+        # notify the callback receiver that the job is running
+        if self.callback and status == 'Running':
+            self.callback.send('RUNNING')
         while status == 'Running':
             if check_cancelled():
                 self.delete()
+                # notify the callback receiver that the task is cancelled
+                if self.callback:
+                    self.callback.send('CANCELED')
                 return 'Cancelled'
             time.sleep(poll_interval)
             status, is_all_pods_running = self.get_status(is_all_pods_running)
diff --git a/src/tesk_core/taskmaster.py b/src/tesk_core/taskmaster.py
index a3113a2..5765525 100755
--- a/src/tesk_core/taskmaster.py
+++ b/src/tesk_core/taskmaster.py
@@ -4,20 +4,25 @@
 import json
 import os
 import re
+from subprocess import call
 import sys
 import logging
 from kubernetes import client, config
 from tesk_core.job import Job
 from tesk_core.pvc import PVC
 from tesk_core.filer_class import Filer
+from tesk_core.callback_sender import CallbackSender
 
 created_jobs = []
 poll_interval = 5
 task_volume_basename = 'task-volume'
 args = None
 logger = None
+callback = CallbackSender()
 
 
 def run_executor(executor, namespace, pvc=None):
+    # notify the callback receiver that an executor is queued
+    callback.send('QUEUED')
     jobname = executor['metadata']['name']
     spec = executor['spec']['template']['spec']
@@ -31,7 +36,7 @@ def run_executor(executor, namespace, pvc=None):
         volumes.extend([{'name': task_volume_basename, 'persistentVolumeClaim': {
             'readonly': False, 'claimName': pvc.name}}])
     logger.debug('Created job: ' + jobname)
-    job = Job(executor, jobname, namespace)
+    job = Job(executor, jobname, namespace, callback.url)
     logger.debug('Job spec: ' + str(job.body))
 
     global created_jobs
@@ -39,6 +44,9 @@ def run_executor(executor, namespace, pvc=None):
     status = job.run_to_completion(poll_interval, check_cancelled,args.pod_timeout)
     if status != 'Complete':
+        # notify the callback receiver about the error
+        if status in ('Failed', 'Error'):
+            callback.send('EXECUTOR_ERROR')
         if status == 'Error':
             job.delete()
         exit_cancelled('Got status ' + status)
@@ -98,6 +106,8 @@ def generate_mounts(data, pvc):
 
 
 def init_pvc(data, filer):
+    # notify the callback receiver that pvc initialization is queued
+    callback.send('QUEUED')
     task_name = data['executors'][0]['metadata']['labels']['taskmaster-name']
     pvc_name = task_name + '-pvc'
     pvc_size = data['resources']['disk_gb']
@@ -127,6 +137,9 @@ def init_pvc(data, filer):
     # filerjob.run_to_completion(poll_interval)
     status = filerjob.run_to_completion(poll_interval, check_cancelled, args.pod_timeout)
     if status != 'Complete':
+        # notify the callback receiver about the error
+        if status in ('Failed', 'Error'):
+            callback.send('SYSTEM_ERROR')
         exit_cancelled('Got status ' + status)
 
     return pvc
@@ -150,10 +163,10 @@ def run_task(data, filer_name, filer_version):
 
     pvc = init_pvc(data, filer)
 
+    # run executors
     for executor in data['executors']:
         run_executor(executor, args.namespace, pvc)
 
-    # run executors
     logging.debug("Finished running executors")
 
     # upload files and delete pvc
@@ -167,12 +180,18 @@ def run_task(data, filer_name, filer_version):
     created_jobs.append(filerjob)
 
     # filerjob.run_to_completion(poll_interval)
-    status = filerjob.run_to_completion(poll_interval, check_cancelled, args.pod_timeout)
-    if status != 'Complete':
-        exit_cancelled('Got status ' + status)
+    filer_status = filerjob.run_to_completion(poll_interval, check_cancelled, args.pod_timeout)
+    if filer_status != 'Complete':
+        # send "SYSTEM_ERROR" to callback receiver if taskmaster completes
+        # but the output filer fails
+        callback.send('SYSTEM_ERROR')
+        exit_cancelled('Got status ' + filer_status)
     else:
         pvc.delete()
 
+    # notify the callback receiver upon task completion
+    callback.send('COMPLETE')
+
 
 def newParser():
@@ -283,10 +302,17 @@ def main():
     global created_pvc
     created_pvc = None
 
+    # Fill information for callback object
+    callback.url = os.getenv('CALLBACK_URL', '')
+    callback.task_id = data['executors'][0]['metadata']['labels']['taskmaster-name']
+    # Check if we're cancelled during init
     if check_cancelled():
+        callback.send('CANCELED')
         exit_cancelled('Cancelled during init')
 
+    # notify the callback receiver upon its initialization
+    callback.send('INITIALIZING')
     run_task(data, args.filer_name, args.filer_version)
@@ -297,7 +323,6 @@ def clean_on_interrupt():
             job.delete()
 
 
-
 def exit_cancelled(reason='Unknown reason'):
     logger.error('Cancelling taskmaster: ' + reason)
     sys.exit(0)
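
Note on the callback protocol introduced above: CallbackSender issues a plain HTTP POST with a JSON body of the form {"id": <taskmaster-name>, "state": <STATE>} and a Content-Type: application/json header, and the taskmaster sends the states QUEUED, INITIALIZING, RUNNING, COMPLETE, CANCELED, EXECUTOR_ERROR and SYSTEM_ERROR. The snippet below is a minimal usage sketch only; the URL and task id are placeholder values (in the taskmaster they come from the CALLBACK_URL environment variable and the taskmaster-name label), and the receiver endpoint is an assumption, not something defined by this diff.

# Minimal usage sketch for CallbackSender (illustrative; the URL and task id
# below are placeholders, not values defined by this change).
from tesk_core.callback_sender import CallbackSender

sender = CallbackSender(task_id="task-1234", url="https://callback-receiver.example/v1/events")

# POSTs {"id": "task-1234", "state": "RUNNING"} as JSON; returns the
# requests.Response on success, or None if no URL is set or the request fails.
response = sender.send("RUNNING")
if response is not None:
    print(response.status_code)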