from argparse import ArgumentParser
from datetime import datetime, timezone
import os
import sys
import csv
import subprocess
from collections import defaultdict
import yaml
import numpy as np
import scipy.sparse as ss
import scipy.sparse.csgraph  # makes ss.csgraph available for connected_components
import config_utilities as cf
import redis_utilities as ru
import mysql_utilities as mu
import sanitize_utilities as su
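
# This module exports one subnetwork per (species, edge_type) pair: it pulls
# 'production' edges from the KnowNet MySQL database, normalizes them with
# sanitize_utilities, resolves node descriptions through Redis, and writes
# <species>.<edge_type>.{edge,node_map,metadata} files under the export
# directory (see main() below).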
def get_gg(db, et, taxon):
    """Get gene-gene edges of edge type `et` for species `taxon`.
    """
    return db.run("SELECT s.n1_id, s.n2_id, s.weight, s.et_name, rl.file_id, rl.line_num "
                  "FROM status s JOIN node_species n1 ON s.n1_id = n1.node_id "
                  "JOIN node_species n2 ON s.n2_id = n2.node_id "
                  "JOIN raw_line rl ON s.line_hash = rl.line_hash "
                  "WHERE s.et_name = '{}' AND n1.taxon = {} AND n2.taxon = {} "
                  "AND s.status = 'production';".format(et, taxon, taxon))
def get_pg(db, et, taxon):
    """Get property-gene edges of edge type `et` for species `taxon`.
    """
    return db.run("SELECT s.n1_id, s.n2_id, s.weight, s.et_name, rl.file_id, rl.line_num "
                  "FROM status s JOIN node_species n2 ON s.n2_id = n2.node_id "
                  "JOIN raw_line rl ON s.line_hash = rl.line_hash "
                  "WHERE s.et_name = '{}' AND n2.taxon = {} "
                  "AND s.status = 'production';".format(et, taxon))
def num_connected_components(edges, nodes):
    """Count the number of connected components in a graph given the edges and the nodes.
    """
    nodes = list(nodes)
    rev_nodes = {v: i for i, v in enumerate(nodes)}
    row = []
    col = []
    for edge in edges:
        r, c = edge[:2]
        row.append(rev_nodes[r])
        col.append(rev_nodes[c])
    mat = ss.coo_matrix((np.ones(len(edges)), (row, col)), shape=(len(nodes), len(nodes)))
    num, _ = ss.csgraph.connected_components(mat)
    return num
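
# Example (hypothetical node ids): edges a-b and b-c leave 'd' isolated, so
#   num_connected_components([('a', 'b'), ('b', 'c')], {'a', 'b', 'c', 'd'})
# returns 2 (the weakly connected components {a, b, c} and {d}).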
def norm_edges(edges, args):
    """Normalize and clean edges according to the specified arguments.

    Returns the cleaned edges and a list of edge counts recorded before
    and after each cleaning step.
    """
    lines = []
    lines.append(len(edges))
    if args.make_unweighted:
        edges = su.make_network_unweighted(edges, 2)
    lines.append(len(edges))
    if args.make_undirected:  # TODO: less important, yes, no, auto
        edges = su.make_network_undirected(edges)
    lines.append(len(edges))
    edges = su.sort_network(edges)
    lines.append(len(edges))
    edges = su.drop_duplicates_by_type_or_node(edges, 0, 1, 3)
    lines.append(len(edges))
    if args.make_undirected:  # TODO: less important, yes, no, auto
        edges = su.upper_triangle(edges, 0, 1)
    lines.append(len(edges))
    edges = su.normalize_network_by_type(edges, 3, 2)  # TODO: none, all, type
    lines.append(len(edges))
    return edges, lines
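
# The returned counts line up with the steps above:
#   lines = [raw, unweighted, undirected, sorted, deduplicated,
#            upper-triangle, normalized]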
def convert_nodes(args, nodes):
    """Uses redis_utilities to convert a set of nodes.
    """
    rdb = ru.get_database(args)
    return ru.get_node_info(rdb, nodes, None, None, args.species)
def get_sources(edges):
    """Given a list of edges, determine the set of source file ids involved.
    """
    return set(edge[4] for edge in edges)
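
# e.g. get_sources([(1, 2, 1.0, 'et', 7, 10), (3, 4, 0.5, 'et', 9, 2)]) -> {7, 9}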
def get_log_query(sources):
    """Build a query for the log entries of the given source file ids.

    Filtering on `sources` is an assumption about the intended behavior;
    with no sources the query matches nothing.
    """
    if not sources:
        return "SELECT filename, info_type, info_value FROM log WHERE filename IS NULL"
    files = ", ".join("'{}'".format(s) for s in sources)
    return ("SELECT filename, info_type, info_value FROM log "
            "WHERE filename IN ({});".format(files))
def should_skip(cls, res):
    """Determine if the subnetwork is especially small, and if we should skip it.
    """
    return (cls == 'Property' and len(res) < 4000) or (cls == 'Gene' and len(res) < 125000)
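
# The cutoffs are minimum edge counts: property-gene subnetworks need at
# least 4,000 production edges and gene-gene subnetworks at least 125,000
# to be exported.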
def main():
    """Parses arguments and then exports the specified subnetwork.
    """
    parser = ArgumentParser()
    parser = cf.add_config_args(parser)
    parser = su.add_config_args(parser)
    parser.add_argument("-e", "--edge_type", help="Edge type")
    parser.add_argument("-s", "--species", help="Species")
    args = parser.parse_args()
    db = mu.get_database(args=args)
    db.use_db("KnowNet")
    cls, bidir = figure_out_class(db, args.edge_type)
    edges_fn = '{}.{}.edge'.format(args.species, args.edge_type)
    nodes_fn = '{}.{}.node_map'.format(args.species, args.edge_type)
    meta_fn = '{}.{}.metadata'.format(args.species, args.edge_type)
    bucket_dir = os.path.join(cls, args.species, args.edge_type)
    sync_dir = os.path.join(args.working_dir, args.export_path, bucket_dir)
    sync_edges = os.path.join(sync_dir, edges_fn)
    sync_nodes = os.path.join(sync_dir, nodes_fn)
    sync_meta = os.path.join(sync_dir, meta_fn)
    if not args.force_fetch and all(map(os.path.exists, [sync_edges, sync_nodes, sync_meta])):
        print("Files already exist. Skipping.")
        return
    get = get_gg if cls == 'Gene' else get_pg
    res = get(db, args.edge_type, args.species)
    print("ProductionLines: " + str(len(res)))
    if not args.force_fetch and should_skip(cls, res):
        print('Skipping {}.{}'.format(args.species, args.edge_type))
        return
    res, lines = norm_edges(res, args)
    n1des = list(set(i[0] for i in res))
    n2des = list(set(i[1] for i in res))
    n1des_desc = convert_nodes(args, n1des)
    n2des_desc = convert_nodes(args, n2des)
    nodes_desc = set(n1des_desc) | set(n2des_desc)
    metadata = get_metadata(db, res, nodes_desc, lines, args.species, args.edge_type, args)
    db.close()
    os.makedirs(sync_dir, exist_ok=True)
    with open(sync_edges, 'w') as file:
        csvw = csv.writer(file, delimiter='\t')
        csvw.writerows(res)
    with open(sync_nodes, 'w', encoding='utf-8') as file:
        csvw = csv.writer(file, delimiter='\t')
        csvw.writerows(nodes_desc)
    with open(sync_meta, 'w') as file:
        yaml.dump(metadata, file, default_flow_style=False)


if __name__ == "__main__":
    main()
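
# Example invocation (the module filename is assumed; cf.add_config_args and
# su.add_config_args supply the remaining options read from args, such as
# working_dir, export_path, force_fetch, make_unweighted, make_undirected):
#   python3 export_utilities.py -s <taxon_id> -e <edge_type>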