Source code for job_utilities

"""Utiliites for Jobs class which store all important information for each job to run

Classes:
    Job: Stores all important information for each job to run on the cluster

Contains module functions::

    queue_starter_job(args, jobname='starter-jobname', dummy=1)
    run_job_step(args, job_type, tmpdict)

    run_local_fetch(args)
    curl_handler(args, jobname, job_str)
    chronos_parent_str(parentlist)

Attributes:
    CURL_PREFIX (list): parts of the chronos curl command
"""

import os
import subprocess
import json
import stat
import config_utilities as cf

CURL_PREFIX = ["curl", "-i", "-L", "-H", "'Content-Type: application/json'",
               "-X", "POST"]

class Job:
    """Base class for each job to be run in pipeline.

    This Job class provides attributes and default functions to store
    information about and perform operations on job.

    Attributes:
        jobtype (str): the type of job to be referenced in components.json
        jobname (str): name of job as appears on chronos
        tmpdict (dict): dictionary of default tmp variable substitutions
        cjobfile (str): chronos local file name of json job descriptor
        cjobstr (str): contents of json job descriptor as single string.
        args (namespace): command line arguments and default arguments to method
    """

    def __init__(self, jobtype, args):
        """Init a Job object with the provided parameters.

        Constructs a Job object with the provided parameters.

        Args:
            jobtype (str): the type of job to be referenced in components.json
            args (namespace): command line arguments and default arguments to method
        """
        self.jobtype = jobtype
        self.args = args
        self.jobname = jobtype + "_job"
        self.cjobfile = 'missing.json'
        # read in dummy template
        self.cjobstr = ""
        with open(os.path.join(args.code_path, "template", "job_template.json"),
                  'r') as infile:
            self.cjobstr = infile.read(10000)
        # read in job dictionaries
        jobsdesc = ""
        with open(os.path.join(args.code_path, "template", "components.json"),
                  'r') as infile:
            jobsdesc = json.load(infile)
        # prepare volume mount
        vmntstr = '{"containerPath": "TMPWORKDIR", "hostPath":"TMPWORKDIR", "mode":"RW"}'
        if self.args.storage_dir and self.args.working_dir != self.args.storage_dir:
            vmntstr += ', {"containerPath": "TMPSHAREDIR", "hostPath":"TMPSHAREDIR", "mode":"RW"}'
        if os.path.abspath(self.args.code_path) != os.path.abspath(cf.DEFAULT_CODE_PATH):
            vmntstr += ', {"containerPath": "TMPCODEPATH", "hostPath":"TMPCODEPATH", "mode":"RW"}'
        # replace global dummy values with job specific ones
        self.tmpdict = jobsdesc[jobtype]
        self.tmpdict["TMPVMNTSTR"] = vmntstr
        self.replace_jobtmp(self.tmpdict)

    def replace_jobtmp(self, tmpdict):
        """Replaces temporary strings in self.cjobstr with specific values

        This loops through all keys in tmpdict and replaces any placeholder
        matches in self.cjobstr with the key values. Also, adds tmpdict to
        self.tmpdict.

        Args:
            tmpdict (dict): dictionary of default tmp variable substitutions

        Returns:
        """
        envstr = str(self.tmpdict.get("TMPENV", "[]"))
        for key in tmpdict:
            # print(key + " " + str(tmpdict[key]))
            if key == 'TMPJOB':
                self.jobname = tmpdict[key]
            self.cjobstr = self.cjobstr.replace(key, str(tmpdict[key]))
            envstr = envstr.replace(key, str(tmpdict[key]))
            self.tmpdict[key] = tmpdict[key]
        self.tmpdict['TMPENV'] = envstr

    def print_chronos_job(self):
        """Prints out job description to .json file

        This creates a directory and in it prints a .json file containing
        self.cjobstr. It saves the created file as self.cjobfile

        Args:

        Returns:
        """
        jobs_dir = os.path.join(self.args.working_dir, self.args.logs_path, 'chron_jobs')
        if not os.path.exists(jobs_dir):
            os.makedirs(jobs_dir)
        cjobfile = jobs_dir + os.sep + self.jobname + ".json"
        self.cjobfile = cjobfile
        with open(cjobfile, 'w') as outfile:
            outfile.write(self.cjobstr)

    def run_local_job(self):
        """prints and runs the job in the local environment

        Using the args, tmpdict, and cjobstr, create a command line call
        to execute the job
        """
        jobjson = json.loads(self.cjobstr)
        command = jobjson["command"]
        print(command)
        if not self.args.test_mode:
            # subprocess.call(command, shell=True)
            try:
                subprocess.check_output(command, shell=True)
            except subprocess.CalledProcessError as ex1:
                print(ex1.output)

    def run_docker_job(self):
        """runs the job locally using docker

        Using the args, tmpdict, and cjobstr, create a command line call
        to docker run that executes the job and removes itself
        """
        jobjson = json.loads(self.cjobstr)
        envvars = json.loads(self.tmpdict.get("TMPENV", "[]"))
        envstr = ""
        vmntstr = "-v " + self.args.working_dir + ":" + self.args.working_dir
        if self.args.storage_dir and self.args.working_dir != self.args.storage_dir:
            vmntstr += " -v " + self.args.storage_dir + ":" + self.args.storage_dir
        if os.path.abspath(self.args.code_path) != os.path.abspath(cf.DEFAULT_CODE_PATH):
            vmntstr += " -v " + self.args.code_path + ":" + self.args.code_path
        for env in envvars:
            envstr += ' -e ' + env['variable'] + '=' + env['value']
        docker_cmd = ["docker", "run", "--name", jobjson["name"], "--rm=true", vmntstr,
                      self.args.build_image, jobjson["command"]]
        print("\n" + " ".join(docker_cmd))
        if not self.args.test_mode:
            # subprocess.call(' '.join(docker_cmd), shell=True)
            try:
                subprocess.check_output(' '.join(docker_cmd), shell=True)
            except subprocess.CalledProcessError as ex1:
                print(ex1.output)

    def queue_chronos_job(self):
        """puts the job on the chronos queue

        Using the chronos url from args.chronos, this creates a tmp .sh
        script that runs the curl statement to send the job to chronos
        """
        self.print_chronos_job()
        curl_cmd = list(CURL_PREFIX)
        curl_cmd.extend(["-d@" + self.cjobfile])
        if self.tmpdict["TMPLAUNCH"].find("schedule") > -1:
            curl_cmd.extend([self.args.chronos + "/scheduler/iso8601"])
        else:
            curl_cmd.extend([self.args.chronos + "/scheduler/dependency"])
        print(" ".join(curl_cmd))
        print(self.cjobstr)
        if not self.args.test_mode:
            # subprocess.call(curl_cmd, shell=True)
            shfile = self.cjobfile.replace(".json", ".sh")
            with open(shfile, 'w') as outfile:
                outfile.write(" ".join(curl_cmd))
            os.chmod(shfile, stat.S_IRWXU)
            subprocess.call(['sh', "-c", shfile])
            os.remove(shfile)

    def run_job(self):
        """Runs or queues the job depending on args.chronos

        If args.chronos is LOCAL the job runs directly on the host, if it
        is DOCKER the job runs in a local docker container, otherwise the
        job is queued on chronos
        """
        if self.args.chronos == "LOCAL":
            self.run_local_job()
        elif self.args.chronos == "DOCKER":
            self.run_docker_job()
        else:
            self.queue_chronos_job()

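# Worked example of the placeholder substitution done by Job.replace_jobtmp()
# (values are illustrative only; the real placeholders and defaults live in
# template/job_template.json and template/components.json, which are not shown
# here): if myjob.cjobstr contains '"name": "TMPJOB"' and the call is
#
#   myjob.replace_jobtmp({'TMPJOB': 'example-step',
#                         'TMPLAUNCH': chronos_parent_str(['starter-jobname'])})
#
# then myjob.jobname becomes 'example-step', every occurrence of TMPJOB and
# TMPLAUNCH in myjob.cjobstr is replaced with the supplied values, and
# myjob.tmpdict records the substitutions (including an updated TMPENV string).
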
def queue_starter_job(args, jobname='starter-jobname', dummy=1):
    """Queues a starter job.

    If dummy=1, creates and queues a dummy job that will never run, else
    it queues a simple job with a single print statement that will run
    immediately and on which other jobs will depend

    Args:
        args (namespace): command line arguments and default arguments to method
        jobname (str): name of the starter job as it appears on chronos
        dummy (bool): 1, queue a job that does not run, 0, queue a job that does

    Returns:
        Job: the created starter Job object
    """
    myjob = Job('starter', args)
    if args.chronos == "LOCAL":
        return myjob
    elif args.chronos == "DOCKER":
        return myjob
    tmpdict = {'TMPLAUNCH': r'"schedule": "R1\/\/P3M"',
               'TMPMSG': 'date | sed \\"s#^#TMPJOB begun #g\\"'}
    if dummy == 1:
        tmpdict['TMPLAUNCH'] = r'"schedule": "R1\/2200-01-01T06:00:00Z\/P3M"'
        tmpdict['TMPMSG'] = 'echo \\"TMPJOB was not supposed to run\\"'
    tmpdict['TMPJOB'] = jobname
    myjob.replace_jobtmp(tmpdict)
    myjob.queue_chronos_job()
    return myjob

def run_job_step(args, job_type, tmpdict):
    """Creates and runs a job.

    Using the tmpdict description of the job, this will create and queue
    a new job that runs in the correct mode when its dependencies finish

    Args:
        args (namespace): arguments from main_parse_args().
        job_type (str): type of job to be created
        tmpdict (dict): dictionary with all of the argument values required

    Returns:
        Job: the created Job object
    """
    myjob = Job(job_type, args)
    myjob.replace_jobtmp(tmpdict)
    myjob.run_job()
    return myjob

def chronos_parent_str(parentlist):
    """Returns correct string for parent dependencies.

    Formats the list of parent job names as the '"parents": [...]' json
    fragment expected by chronos

    Args:
        parentlist (list): names of parent jobs

    Returns:
        str: the json parents fragment
    """
    return '"parents": {0}'.format(str(parentlist).replace("'", "\""))
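
# Minimal usage sketch (illustrative only): the job type 'example_step' and the
# args namespace are assumptions here; any namespace exposing the attributes
# read above (chronos, working_dir, storage_dir, code_path, logs_path,
# build_image, test_mode) would do, and real job types come from
# template/components.json.
#
#   starter = queue_starter_job(args, dummy=1)
#   tmpdict = {'TMPLAUNCH': chronos_parent_str([starter.jobname])}
#   step_job = run_job_step(args, 'example_step', tmpdict)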