#!/usr/bin/env python3
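"""Utilities for interacting with the KnowEnG Redis db through python.

Contains module functions::

    deploy_container(args=None)
    get_database(args=None)
    import_ensembl(alias, args=None)
    import_gene_nodes(node_table, args=None)
    get_node_info(rdb, fk_array, ntype, hint, taxid)
    conv_gene(rdb, fk_array, hint, taxid)
    node_desc(rdb, stable_array)
"""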
import json
import os
from argparse import ArgumentParser
import subprocess
import csv
import redis
import config_utilities as cf
# Maximum number of keys requested from Redis in a single MGET call; longer
# key lists are split into slices of this size (see conv_gene).
MGET_CHUNK = 5000
def deploy_container(args=None):
    """Deploys a container with marathon running Redis using the specified
    args.

    This replaces the placeholder args in the json describing how to deploy a
    container running Redis with those supplied in the user's arguments,
    writes the resulting job description to the marathon_jobs log directory
    and POSTs it to marathon with curl (or only prints the curl command when
    args.test_mode is set).

    Args:
        args (Namespace): args as populated namespace or 'None' for defaults
    """
    # Local import: only needed to quote the JSON payload for the shell.
    import shlex

    if args is None:
        args = cf.config_args()
    deploy_dir = os.path.join(args.working_dir, args.logs_path, 'marathon_jobs')
    os.makedirs(deploy_dir, exist_ok=True)
    template_job = os.path.join(args.code_path, 'marathon', 'redis.json')
    with open(template_job, 'r') as infile:
        deploy_dict = json.load(infile)
    # Accept the port as either str or int.
    redis_port = str(args.redis_port)
    deploy_dict["id"] = os.path.basename(args.redis_dir)
    deploy_dict["cmd"] = "redis-server --appendonly yes --requirepass " + \
                         args.redis_pass + " --port " + redis_port
    deploy_dict["cpus"] = float(args.redis_cpu)
    deploy_dict["mem"] = int(args.redis_mem)
    # Compare by value: identity ('is not') on strings is unreliable, so an
    # equal-but-distinct host string would wrongly add a constraint.
    if args.redis_host != cf.DEFAULT_REDIS_URL:
        deploy_dict["constraints"] = [["hostname", "CLUSTER", args.redis_host]]
    else:
        deploy_dict["constraints"] = []
    deploy_dict["container"]["volumes"][0]["hostPath"] = args.redis_dir
    payload = json.dumps(deploy_dict)
    out_path = os.path.join(deploy_dir, "kn_redis-" + redis_port + '.json')
    with open(out_path, 'w') as outfile:
        outfile.write(payload)
    # shlex.quote wraps the payload in single quotes (JSON always contains
    # shell-special characters) and escapes any embedded single quote, e.g.
    # one inside the password, which would otherwise break the command.
    job = 'curl -fX POST -H "Content-type: application/json" ' + \
          args.marathon + "/v2/apps -d " + shlex.quote(payload)
    if not args.test_mode:
        try:
            subprocess.check_output(job, shell=True)
        except subprocess.CalledProcessError as ex1:
            # Best effort: report curl's output instead of aborting.
            print(ex1.output)
    else:
        print(job)
def get_database(args=None):
    """Returns a Redis database connection.

    Builds a StrictRedis client from the host, port and password stored in
    the supplied (or default) configuration.

    Args:
        args (Namespace): args as populated namespace or 'None' for defaults

    Returns:
        StrictRedis: a redis connection object
    """
    settings = cf.config_args() if args is None else args
    connection = redis.StrictRedis(
        host=settings.redis_host,
        port=settings.redis_port,
        password=settings.redis_pass,
    )
    return connection
def _store_mapping(rdb, keystr, ens_id):
    """Store ens_id under keystr, or mark the key 'unmapped-many' on conflict.

    The key is overwritten with ens_id; if it previously held a different
    value (including 'unmapped-many'), it is set to 'unmapped-many'.
    """
    previous = rdb.getset(keystr, ens_id)
    if previous is not None and previous.decode() != ens_id:
        rdb.set(keystr, 'unmapped-many')


def import_ensembl(alias, args=None):
    """Imports the ensembl data for the provided alias into the Redis database.

    This stores the foreign key to ensembl stable id mappings in Redis. It
    loads the ``<alias>_all.json`` mapping dictionary from the map directory,
    whose keys are '::'-separated with the taxid in the first field and the
    hint and foreign key in the last two fields, and whose values are ensembl
    stable ids. Hint, foreign key and stable id are upper-cased. For each
    entry the stable id is stored under four keys:

        unique::<foreign_key>
        hint::<foreign_key>::<hint>
        taxon::<foreign_key>::<taxid>
        triplet::<foreign_key>::<taxid>::<hint>

    If one of these keys already held a different value, that key is set to
    'unmapped-many' instead. Finally stable::<stable_id>::alias is set to the
    foreign key; for WIKIGENE hints this happens only when no alias is stored
    yet, so that aliases coming from other hints are not overwritten.

    Args:
        alias (str): An alias defined in ensembl.aliases.
        args (Namespace): args as populated namespace or 'None' for defaults
    """
    if args is None:
        args = cf.config_args()
    rdb = get_database(args)
    map_dir = os.path.join(args.working_dir, args.data_path, cf.DEFAULT_MAP_PATH)
    with open(os.path.join(map_dir, alias + '_all.json')) as infile:
        map_dict = json.load(infile)
    for key, stable_id in map_dict.items():
        (taxid, _, _, hint, foreign_key) = key.split('::')
        hint = hint.upper()
        ens_id = stable_id.upper()
        foreign_key = foreign_key.upper()
        _store_mapping(rdb, 'unique::' + foreign_key, ens_id)
        _store_mapping(rdb, 'hint::' + foreign_key + '::' + hint, ens_id)
        _store_mapping(rdb, 'taxon::' + foreign_key + '::' + taxid, ens_id)
        _store_mapping(rdb, 'triplet::' + foreign_key + '::' + taxid + '::' + hint,
                       ens_id)
        alias_key = '::'.join(['stable', ens_id, 'alias'])
        # WIKIGENE foreign keys only fill in a missing alias; every other
        # hint overwrites whatever alias is currently stored.
        if hint != 'WIKIGENE' or rdb.get(alias_key) is None:
            rdb.set(alias_key, foreign_key)
def import_gene_nodes(node_table, args=None):
    """Import gene node metadata into redis.

    For every row of node_table the description and type are stored under
    stable::<NODE_ID>::desc and stable::<NODE_ID>::type, with the node id
    upper-cased.

    Args:
        node_table (iterable): rows of (node_id, node_desc, node_type)
        args (Namespace): args as populated namespace or 'None' for defaults
    """
    if args is None:
        args = cf.config_args()
    rdb = get_database(args)
    for raw_id, description, kind in node_table:
        key_prefix = 'stable::' + raw_id.upper() + '::'
        rdb.set(key_prefix + 'desc', description)
        rdb.set(key_prefix + 'type', kind)
def get_node_info(rdb, fk_array, ntype, hint, taxid):
    """Uses the redis database to convert a node alias to KN internal id

    Figures out the type of node for the ids in fk_array (when ntype is not
    given) and then returns all of the metadata associated or unmapped-*.
    Gene identifiers are translated with conv_gene; Property identifiers are
    used as-is.

    Args:
        rdb (redis object): redis connection to the mapping db
        fk_array (list): the array of foreign gene identifiers to be translated
        ntype (str): 'Gene' or 'Property'; None or '' to detect it from redis
        hint (str): a hint for conversion, None or '' if there is none
        taxid (str): the species taxid, None or '' if unknown

    Returns:
        list: one tuple per entry of fk_array holding the foreign key followed
        by the five values node_desc returns for it (stable id, type, alias,
        description, biotype)

    Raises:
        ValueError: if fk_array mixes Property and Gene nodes, or if ntype is
            neither 'Gene' nor 'Property'
    """
    hint = None if hint == '' or hint is None else hint.upper()
    taxid = None if taxid == '' or taxid is None else str(taxid)
    if ntype == '':
        ntype = None
    if ntype is None:
        type_keys = ['::'.join(['stable', str(fk), 'type']) for fk in fk_array]
        found_types = set()
        # Query in MGET_CHUNK-sized slices, as conv_gene does, so a large
        # fk_array is not sent as one huge MGET; an empty fk_array issues no
        # request at all.
        for start in range(0, len(type_keys), MGET_CHUNK):
            for res in rdb.mget(type_keys[start:start + MGET_CHUNK]):
                if res is not None:
                    found_types.add(res.decode())
        if 'Property' in found_types and 'Gene' in found_types:
            raise ValueError("Mixture of property and gene nodes.")
        # Default to 'Gene' when no identifier is a known Property node.
        ntype = 'Property' if 'Property' in found_types else 'Gene'
    if ntype == "Gene":
        stable_array = conv_gene(rdb, fk_array, hint, taxid)
    elif ntype == "Property":
        stable_array = fk_array
    else:
        raise ValueError("Invalid ntype")
    return list(zip(fk_array, *node_desc(rdb, stable_array)))
def conv_gene(rdb, fk_array, hint, taxid):
    """Uses the redis database to convert genes to ensembl stable ids

    Every foreign key starts out as 'unmapped-none'. Keys that are still
    'unmapped-none' are then looked up, in this order, under
    triplet::<fk>::<taxid>::<hint> (hint and taxid given),
    taxon::<fk>::<taxid> (taxid given), hint::<fk>::<hint> (hint given) and
    unique::<fk> (no taxid given). Foreign keys are upper-cased for lookup and
    the first value found is kept.

    Args:
        rdb (redis object): redis connection to the mapping db
        fk_array (list): the foreign gene identifiers to be translated
        hint (str): a hint for conversion, None or '' if there is none
        taxid (str): the species taxid, None or '' if unknown

    Returns:
        list: for each entry of fk_array, the value stored in redis or
        'unmapped-none' if nothing was found
    """
    if hint is None or hint == '':
        hint = None
    else:
        hint = hint.upper()
    if taxid is None or taxid == '':
        taxid = None
    else:
        taxid = str(taxid)
    # use ensembl internal uniprot mappings
    if hint in ('UNIPROT', 'UNIPROTKB'):
        hint = 'UNIPROT_GN'
    mapped = ['unmapped-none'] * len(fk_array)

    def fill_unmapped(results, key_template):
        """Look up every still-unmapped gene using key_template."""
        pending = [idx for idx, current in enumerate(results)
                   if current == 'unmapped-none']
        for start in range(0, len(pending), MGET_CHUNK):
            batch = pending[start:start + MGET_CHUNK]
            keys = [key_template.format(str(fk_array[idx]).upper(), taxid, hint)
                    for idx in batch]
            for idx, found in zip(batch, rdb.mget(keys)):
                if found is not None:
                    results[idx] = found.decode()

    # (applies?, key template) pairs, tried from most to least specific.
    lookups = (
        (hint is not None and taxid is not None, 'triplet::{0}::{1}::{2}'),
        (taxid is not None, 'taxon::{0}::{1}'),
        (hint is not None, 'hint::{0}::{2}'),
        (taxid is None, 'unique::{0}'),
    )
    for applies, template in lookups:
        if applies:
            fill_unmapped(mapped, template)
    return mapped
def node_desc(rdb, stable_array):
    """Uses the redis database to find metadata about nodes given stable ids

    For every element of stable_array that does not start with 'unmapped',
    looks up stable::<id>::type, ::alias, ::desc and ::biotype in redis.
    Identifiers are converted with str() for the lookup, so non-string ids
    (e.g. integers) are accepted.

    Args:
        rdb (redis object): redis connection to the mapping db
        stable_array (list): the array of stable identifiers to be searched

    Returns:
        tuple: (stable_array, types, aliases, descriptions, biotypes), where
        the last four are lists parallel to stable_array. Missing values are
        the string "None" for types and 'unmapped-none' for the others.
    """
    count = len(stable_array)
    fields = ('type', 'alias', 'desc', 'biotype')
    columns = {
        'type': ["None"] * count,
        'alias': ["unmapped-none"] * count,
        'desc': ["unmapped-none"] * count,
        'biotype': ["unmapped-none"] * count,
    }
    # (position, id as str) for every identifier that was actually mapped.
    mapped = [(idx, str(st)) for idx, st in enumerate(stable_array)
              if not str(st).startswith('unmapped')]
    if mapped:
        for field in fields:
            keys = ['::'.join(['stable', st, field]) for _, st in mapped]
            for (idx, _), val in zip(mapped, rdb.mget(keys)):
                if val is not None:
                    columns[field][idx] = val.decode()
    return (stable_array, columns['type'], columns['alias'], columns['desc'],
            columns['biotype'])
def main():
    """Deploy a Redis container using marathon with the provided command line
    arguments.

    Parses the command line with the options registered by
    config_utilities.add_config_args and hands the resulting namespace to
    deploy_container.
    """
    cli_parser = cf.add_config_args(ArgumentParser())
    deploy_container(cli_parser.parse_args())


if __name__ == "__main__":
    main()