#!/usr/bin/env python3
"""Utiliites for running single or multiple steps of the setup or data pipeline
either locally , in docker, or on the cloud.
Contains module functions::
list_sources(args)
generic_dict(args, ns_parent=None)
run_check(args)
run_fetch(args)
run_table(args)
run_map(args)
run_import(args)
run_export(args)
main_parse_args()
main()
Attributes:
DEFAULT_START_STEP (str): first step of setup
POSSIBLE_STEPS (list): list of all steps
SETUP_FILES (list): list of setup SrcClasses
SPECIAL_MODES (list): list of modes that run breadth first
Examples:
To view all optional arguments that can be specified::
$ python3 code/workflow_utilities.py -h
To run just the check step of one setup source (e.g. ppi) locally::
$ python3 code/workflow_utilities.py CHECK -su -os -c LOCAL -p ppi
To run all steps of setup on the cloud::
$ python3 code/workflow_utilities.py CHECK -su
To run all steps of one pipeline source (e.g. kegg) locally::
$ python3 code/workflow_utilities.py CHECK -os -c LOCAL -p kegg
"""
import os
import sys
import json
import time
from argparse import ArgumentParser
import config_utilities as cf
import mysql_utilities as db
import job_utilities as ju
DEFAULT_START_STEP = 'CHECK'
POSSIBLE_STEPS = ['CHECK', 'FETCH', 'TABLE', 'MAP', 'IMPORT', 'EXPORT']
SETUP_FILES = ['ppi', 'ensembl']
SPECIAL_MODES = ['LOCAL', 'DOCKER']
def main_parse_args():
"""Processes command line arguments.
Expects one positional argument (start_step) and a number of optional
arguments. If an argument is missing, a default value is supplied.
.. csv-table::
:header: parameter,argument type,flag,description
:widths: 4,2,2,12
:delim: |
[start_step] | | |string indicating which pipeline stage to start with
--setup | |-su |run db inits instead of source specific pipelines
--one_step | |-os |run for a single step instead of rest of pipeline
--step_parameters |str |-p |parameters to specify calls of a single step in pipeline
--no_ensembl | |-ne |do not run ensembl in setup pipeline
--dependencies |str |-d |names of parent jobs that must finish
Returns:
Namespace: args as populated namespace
"""
parser = ArgumentParser()
parser.add_argument('start_step', default=DEFAULT_START_STEP,
help='start step, must be ' + str(POSSIBLE_STEPS))
parser.add_argument('-su', '--setup', default=False, action='store_true',
help='run db inits instead of source specific pipelines')
parser.add_argument('-os', '--one_step', default=False, action='store_true',
help='run for a single step instead of pipeline')
parser.add_argument('-p', '--step_parameters', default='',
help='parameters to specify calls of a single step in pipeline')
parser.add_argument('-ne', '--no_ensembl', action='store_true', default=False,
help='do not run ensembl in setup pipeline', )
parser.add_argument('-d', '--dependencies', default='',
help='names of parent jobs that must finish')
parser = cf.add_config_args(parser)
args = parser.parse_args()
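# Re-scan sys.argv to rebuild the option strings handed to downstream jobs:
# the positional start_step and per-step values are dropped, configuration
# options are kept in config_opts, and workflow-only flags are collected
# separately in workflow_opts.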
config_opts = sys.argv[1:]
for opt in [args.start_step, '-p', '--step_parameters', args.step_parameters,
'-d', '--dependencies', args.dependencies]:
if opt in config_opts:
config_opts.remove(opt)
workflow_opts = []
for opt in ['-su', '--setup', '-os', '--one_step', '-ne', '--no_ensembl']:
if opt in config_opts:
config_opts.remove(opt)
workflow_opts.extend([opt])
args.time_stamp = time.strftime('_%m-%d_%H-%M-%S')
args.config_opts = " ".join(config_opts)
args.workflow_opts = " ".join(workflow_opts)
args.working_dir = args.working_dir.rstrip('/')
if args.storage_dir:
args.storage_dir = args.storage_dir.rstrip('/')
return args
def list_sources(args):
""" creates a list of all sources for step to process
Depending on args.setup, loops through all sources in the srccode
directory pulling out valid names or return SETUP_FILES
Args:
args (Namespace): args as populated namespace from parse_args
"""
src_list = []
if args.step_parameters == "":
if args.setup:
for srcstr in SETUP_FILES:
if srcstr == 'ensembl' and args.no_ensembl:
continue
src_list.extend([srcstr])
else:
local_src_code_dir = os.path.join(args.code_path, args.src_path)
if not os.path.exists(local_src_code_dir):
raise IOError("ERROR: cannot find {0}!".format(local_src_code_dir))
src_pys_list = sorted(os.listdir(local_src_code_dir))
for filename in src_pys_list:
if not filename.endswith(".py"):
continue
if 'utilities' in filename:
continue
srcstr = os.path.splitext(filename)[0]
if srcstr in SETUP_FILES:
continue
src_list.extend([srcstr])
else:
src_list = args.step_parameters.split(",,")
return sorted(src_list)
def generic_dict(args, ns_parent=None):
""" Creates a dictionary to specify variables for a job
Creates a dictionary used to substitute temporary job variables in the specification
of the command line call. ns_parent should be defined for only a next step caller
job.
Args:
args (Namespace): args as populated namespace from parse_args
ns_parent (str): name of the parent job; set only for a next step caller job
Returns:
dict: tmp substitution dictionary with appropriate values depending on args
"""
job_dict = {'TMPLAUNCH': r'"schedule": "R1\/\/P3M"',
'TMPWORKDIR': args.working_dir,
'TMPDATAPATH': args.data_path,
'TMPCODEPATH': args.code_path,
'TMPLOGSPATH': args.logs_path,
'TMPOPTS': args.config_opts,
'TMPSHAREDIR': args.storage_dir,
'TMPSHAREBOOL': 'false',
'TMPIMG': args.build_image
}
if ns_parent is None: # regular job
if args.dependencies != "": # continuation job
job_dict['TMPLAUNCH'] = ju.chronos_parent_str(args.dependencies.split(",,"))
else: # next step caller job
job_dict['TMPLAUNCH'] = ju.chronos_parent_str([ns_parent])
if args.storage_dir and args.storage_dir != args.working_dir:
job_dict['TMPSHAREBOOL'] = 'true'
else:
job_dict['TMPSHAREDIR'] = job_dict['TMPWORKDIR']
return job_dict
def run_check(args):
"""Runs checks for all sources.
This loops through args.step_parameters sources, creates a job for each
that calls check_utilities clean() (and, if not args.one_step, calls
workflow_utilities FETCH), and runs the job in the args.chronos location.
Args:
args (Namespace): args as populated namespace from parse_args
"""
ctr = 0
src_list = list_sources(args)
ns_parameters = []
step_job = ju.Job("checker", args)
for module in src_list:
ctr += 1
print(str(ctr) + "\t" + module)
jobname = "-".join(["check", module])
jobname = jobname.replace(".", "-")
jobdict = generic_dict(args, None)
jobdict.update({'TMPJOB': jobname,
'TMPSRC': module
})
step_job = ju.run_job_step(args, "checker", jobdict)
ns_parameters.extend([module])
if not args.one_step and args.chronos not in SPECIAL_MODES:
ns_jobname = "-".join([jobname, "next_step"])
ns_dict = generic_dict(args, step_job.jobname)
ns_dict.update({'TMPJOB': ns_jobname,
'TMPNEXTSTEP': "FETCH",
'TMPSTART': module,
'TMPOPTS': " ".join([args.config_opts, args.workflow_opts,
'-d', ns_jobname])
})
ju.run_job_step(args, "next_step_caller", ns_dict)
if not args.one_step and args.chronos in SPECIAL_MODES and ns_parameters:
ns_dict = generic_dict(args, step_job.jobname)
ns_dict.update({'TMPJOB': "-".join(["check", "next_step"]),
'TMPSTART': ",,".join(ns_parameters),
'TMPNEXTSTEP': "FETCH",
'TMPOPTS': " ".join([args.config_opts, args.workflow_opts,
'-d', "-".join(["check", "next_step"])])
})
tmpargs = args
tmpargs.chronos = "LOCAL"
ju.run_job_step(tmpargs, "next_step_caller", ns_dict)
return 0
def run_fetch(args):
"""Runs fetches for all aliases of a single source.
This loops through aliases of args.parameters sources, creates a job for
each that calls fetch_utilities main() (and if not args.one_step, calls
workflow_utilities TABLE), and runs job in args.chronos location.
Args:
args (Namespace): args as populated namespace from parse_args, must
specify --step_parameters(-p) as ',,' separated list of sources
"""
src_list = list_sources(args)
ns_parameters = []
step_job = ju.Job("fetcher", args)
for src in src_list:
local_src_dir = os.path.join(args.working_dir, args.data_path, src)
if not os.path.exists(local_src_dir):
raise IOError("ERROR: source specified with --step_parameters (-p) option, \
{0}, does not have data directory: {1}".format(src, local_src_dir))
alias_ctr = 0
if args.chronos not in SPECIAL_MODES:
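# On the cloud, first register a placeholder job for every alias with a
# far-future start date (year 2200) so it never runs on a timer; this
# appears to reserve the job names so that later dependency references to
# "fetch-<src>-<alias>" resolve even before the real fetch jobs exist.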
for alias in sorted(os.listdir(local_src_dir)):
jobname = "-".join(["fetch", src, alias])
jobname = jobname.replace(".", "-")
jobdict = generic_dict(args, None)
jobdict.update({'TMPJOB': jobname,
'TMPLAUNCH': r'"schedule": "R1\/2200-01-01T06:00:00Z\/P3M"'
})
ju.run_job_step(args, "placeholder", jobdict)
for alias in sorted(os.listdir(local_src_dir)):
alias_path = os.path.join(src, alias)
local_alias_dir = os.path.join(local_src_dir, alias)
alias_ctr += 1
print("\t".join([src, str(alias_ctr), alias]))
## check for dependencies
parents = []
if args.dependencies != "":
parents = args.dependencies.split(",,")
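# file_metadata.json is assumed to be written during the check step; it
# records this alias's dependencies on other aliases, whether the alias is
# an id mapping file (is_map), and whether a fresh fetch is needed.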
metadata_file = os.path.join(local_alias_dir, "file_metadata.json")
if not os.path.isfile(metadata_file):
raise IOError("ERROR: Missing {0}".format(metadata_file))
version_dict = {}
with open(metadata_file, 'r') as infile:
version_dict = json.load(infile)
dependencies = version_dict["dependencies"]
ismap = version_dict["is_map"]
fetch_needed = version_dict["fetch_needed"] or args.force_fetch
if dependencies:
for dep in dependencies:
parent_string = "-".join(["fetch", src, dep])
parents.extend([parent_string])
launchstr = r'"schedule": "R1\/\/P3M"'
if parents:
launchstr = ju.chronos_parent_str(parents)
jobname = "-".join(["fetch", src, alias])
jobname = jobname.replace(".", "-")
jobdict = generic_dict(args, None)
jobdict.update({'TMPJOB': jobname,
'TMPLAUNCH': launchstr,
'TMPALIASPATH': alias_path
})
step_job = ju.run_job_step(args, "fetcher", jobdict)
if not ismap and fetch_needed:
ns_parameters.extend([",".join([src, alias])])
if not args.setup and not args.one_step and not ismap and \
args.chronos not in SPECIAL_MODES and fetch_needed:
ns_jobname = "-".join([jobname, "next_step"])
ns_dict = generic_dict(args, step_job.jobname)
ns_dict.update({'TMPJOB': ns_jobname,
'TMPNEXTSTEP': "TABLE",
'TMPSTART': ",".join([src, alias]),
'TMPOPTS': " ".join([args.config_opts, args.workflow_opts,
'-d', ns_jobname])
})
ju.run_job_step(args, "next_step_caller", ns_dict)
if not args.setup and not args.one_step and args.chronos in SPECIAL_MODES \
and ns_parameters:
ns_dict = generic_dict(args, step_job.jobname)
ns_dict.update({'TMPJOB': "-".join(["fetch", "next_step"]),
'TMPNEXTSTEP': "TABLE",
'TMPSTART': ",,".join(ns_parameters),
'TMPOPTS': " ".join([args.config_opts, args.workflow_opts,
'-d', "-".join(["fetch", "next_step"])])
})
tmpargs = args
tmpargs.chronos = "LOCAL"
ju.run_job_step(tmpargs, "next_step_caller", ns_dict)
return 0
def run_table(args):
"""Runs tables for all chunks of a single source alias.
This loops through chunks of args.parameters aliases, creates a job for
each that calls table_utilities main() (and if not args.one_step, calls
workflow_utilities MAP), and runs job in args.chronos location.
Args:
args (Namespace): args as populated namespace from parse_args, must
specify --step_parameters(-p) as ',,' separated list of
'source,alias'
"""
alias_list = args.step_parameters.split(",,")
if args.step_parameters == "":
raise ValueError("ERROR: 'source,alias' must be specified with --step_parameters (-p)")
ns_parameters = []
step_job = ju.Job("tabler", args)
for pair in alias_list:
src, alias = pair.split(",")
alias_path = os.path.join(src, alias)
local_chunk_dir = os.path.join(args.working_dir, args.data_path, alias_path, "chunks")
if not os.path.exists(local_chunk_dir):
raise IOError('ERROR: "source,alias" specified with --step_parameters '
'(-p) option, ' + pair + ' does not have chunk directory:'
+ local_chunk_dir)
chunk_ctr = 0
for chunk_name in sorted(os.listdir(local_chunk_dir)):
if "raw_line" not in chunk_name or "unique" in chunk_name:
continue
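# Chunk files are assumed to be named '<source>.<alias>.raw_line.<n>.txt';
# the tabler's outputs replace 'raw_line' with other stage names, so
# TMPFILES uses a '.*.' glob to cover all files produced for this chunk.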
output_files = chunk_name.replace('.raw_line.', '.*.')
chunk_ctr += 1
print("\t".join([str(chunk_ctr), chunk_name]))
jobname = "-".join(["table", chunk_name])
jobname = jobname.replace(".", "-")
jobname = jobname.replace(".txt", "")
jobdict = generic_dict(args, None)
jobdict.update({'TMPJOB': jobname,
'TMPALIASPATH': alias_path,
'TMPCHUNK': os.path.join("chunks", chunk_name),
'TMPFILES': os.path.join("chunks", output_files)
})
step_job = ju.run_job_step(args, "tabler", jobdict)
ns_parameters.extend([chunk_name.replace('.raw_line.', '.table.')])
if not args.setup and not args.one_step and args.chronos not in SPECIAL_MODES:
ns_jobname = "-".join([jobname, "next_step"])
ns_dict = generic_dict(args, step_job.jobname)
ns_dict.update({'TMPJOB': ns_jobname,
'TMPNEXTSTEP': "MAP",
'TMPSTART': chunk_name.replace('.raw_line.', '.table.'),
'TMPOPTS': " ".join([args.config_opts, args.workflow_opts,
'-d', ns_jobname])
})
ju.run_job_step(args, "next_step_caller", ns_dict)
if not args.setup and not args.one_step and args.chronos in SPECIAL_MODES and \
ns_parameters:
ns_dict = generic_dict(args, step_job.jobname)
ns_dict.update({'TMPJOB': "-".join(["table", "next_step"]),
'TMPNEXTSTEP': "MAP",
'TMPSTART': ",,".join(ns_parameters),
'TMPOPTS': " ".join([args.config_opts, args.workflow_opts,
'-d', "-".join(["table", "next_step"])])
})
tmpargs = args
tmpargs.chronos = "LOCAL"
ju.run_job_step(tmpargs, "next_step_caller", ns_dict)
return 0
def run_map(args):
"""Runs id conversion for a single .table. file on the cloud.
This loops through args.parameters tablefiles, creates a job for each that
calls conv_utilities main(), and runs job in args.chronos location.
Args:
args (Namespace): args as populated namespace from parse_args, must
specify --step_parameters(-p) as ',,' separated list of
'source.alias.table.chunk.txt' file names
"""
tablefile_list = args.step_parameters.split(",,")
if args.step_parameters == "":
raise ValueError("ERROR: 'tablefile' must be specified with --step_parameters (-p)")
ju.Job("mapper", args)
ctr = 0
for filestr in tablefile_list:
tablefile = os.path.basename(filestr)
output_files = tablefile.replace('.table.', '.*.')
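# A tablefile is expected to be named '<source>.<alias>.table.<chunk>.txt'
# (the alias itself may contain dots), so the source is the first
# dot-separated field and the alias is whatever sits between the source
# prefix and '.table.'.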
src = tablefile.split('.')[0]
alias = tablefile.split('.table.')[0].replace(src+'.', '', 1)
chunk_path = os.path.join(src, alias, "chunks")
local_chunk_dir = os.path.join(args.working_dir, args.data_path, chunk_path)
local_tablefile = os.path.join(local_chunk_dir, tablefile)
if not os.path.exists(local_tablefile):
raise IOError('ERROR: "tablefile" specified with --step_parameters (-p) '
'option, ' + filestr + ' does not exist: ' + local_tablefile)
ctr += 1
print("\t".join([str(ctr), tablefile]))
jobname = "-".join(["map", tablefile])
jobname = jobname.replace(".", "-")
jobname = jobname.replace(".txt", "")
jobdict = generic_dict(args, None)
jobdict.update({'TMPJOB': jobname,
'TMPTABLEPATH': os.path.join(chunk_path, tablefile),
'TMPFILES': os.path.join(chunk_path, output_files)
})
ju.run_job_step(args, "mapper", jobdict)
return 0
def run_import(args):
"""Merges sorted files and runs import on output file on the cloud.
This loops through args.step_parameters (see Args below), and creates a job
for each that merges the already sorted and unique files found in the data
path (if args.merge is True), then calls import_utilities main().
Args:
args (Namespace): args as populated namespace from parse_args,
specify --step_parameters(-p) as ',,' separated list of files to
import or the allowed possible SQL table names: node, node_meta,
edge2line, status, or edge_meta. If not specified, by default it
will try to import all tables.
"""
importfile_list = args.step_parameters.split(",,")
tables = ['node', 'node_meta', 'edge2line', 'status', 'edge_meta', 'edge', 'raw_line']
if args.step_parameters == "":
importfile_list = tables
ju.Job("importer", args)
ctr = 0
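# Each parameter is either one of the known SQL table names (in which case
# the merged 'unique.<table>.txt' file is imported) or an explicit path to
# a file to import.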
for importfile in importfile_list:
if importfile in tables:
mergefile = 'unique.' + importfile + '.txt'
output_files = mergefile
filestr = importfile
else:
output_files = importfile
filestr = os.path.basename(importfile)
if not os.path.exists(importfile):
raise IOError('ERROR: "importfile" specified with --step_parameters (-p) '
'option, ' + importfile + ' does not exist: ' + importfile)
ctr += 1
print("\t".join([str(ctr), filestr]))
jobname = "-".join(["import", filestr])
jobname = jobname.replace(".", "-")
jobname = jobname.replace(".txt", "")
jobdict = generic_dict(args, None)
base_dir = os.path.join(jobdict['TMPWORKDIR'], jobdict['TMPDATAPATH'])
output_files = output_files.replace(base_dir, '')
jobdict.update({'TMPJOB': jobname,
'TMPIMPORTPATH': importfile,
'TMPFILES': output_files
})
ju.run_job_step(args, "importer", jobdict)
return 0
def run_export(args):
"""
TODO: Documentationr.
Args:
args (Namespace): args as populated namespace from parse_args,
specify --step_parameters(-p) as ',,' separated list of files to
import or the allowed possible SQL table names: node, node_meta,
edge2line, status, or edge_meta. If not specified, by default it
will try to import all tables.
"""
export_list = args.step_parameters.split(",,")
for num, expair in enumerate(export_list):
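# Each export pair is expected in the form '<species taxon>::<edge type>';
# '::' is also rewritten to '--' below so the job name stays valid.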
print("\t".join([str(num), expair]))
species, etype = expair.split('::')
jobname = "-".join(["export", expair])
jobname = jobname.replace(".", "-")
jobname = jobname.replace("::", "--")
jobdict = generic_dict(args, None)
jobdict.update({'TMPJOB': jobname,
'TMPTAXON': species,
'TMPETYPE': etype,
})
ju.run_job_step(args, "exporter", jobdict)
return 0
def main():
"""Runs the 'start_step' step of the main or args.setup pipeline on the
args.chronos location, and all subsequent steps if not args.one_step
Parses the arguments and runs the specified part of the pipeline using the
specified local or cloud resources.
"""
args = main_parse_args()
stage = 'PARSE'
init_job = ''
if args.dependencies == "":
if args.setup:
knownet = db.MySQL(None, args)
knownet.init_knownet()
stage = 'SETUP'
elif args.start_step == 'IMPORT':
stage = 'IMPORT'
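# With no upstream dependencies this is the first invocation, so a
# "file_setup" starter job is created and recorded (via args.dependencies)
# as the parent of the jobs submitted below; its far-future schedule
# (year 2200) appears to keep it from being launched on a timer.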
jobdict = generic_dict(args, None)
jobdict['TMPJOB'] = "KN_starter_" + stage + args.time_stamp
jobdict['TMPLAUNCH'] = r'"schedule": "R1\/2200-01-01T06:00:00Z\/P3M"'
file_setup_job = ju.run_job_step(args, "file_setup", jobdict)
args.dependencies = file_setup_job.jobname
init_job = file_setup_job.jobname
if args.setup:
if args.start_step == 'CHECK':
run_check(args)
elif args.start_step == 'FETCH':
run_fetch(args)
else:
if args.start_step == 'CHECK':
run_check(args)
elif args.start_step == 'FETCH':
run_fetch(args)
elif args.start_step == 'TABLE':
run_table(args)
elif args.start_step == 'MAP':
run_map(args)
elif args.start_step == 'IMPORT':
run_import(args)
elif args.start_step == 'EXPORT':
run_export(args)
else:
print(args.start_step + ' is an unacceptable start_step. Must be ' +
str(POSSIBLE_STEPS))
if init_job != '' and args.chronos not in SPECIAL_MODES:
args.dependencies = ""
jobdict = generic_dict(args, None)
jobdict['TMPJOB'] = "KN_starter_" + stage + args.time_stamp
file_setup_job = ju.run_job_step(args, "file_setup", jobdict)
if __name__ == "__main__":
main()