#!/usr/bin/env python3
"""Utilities for interacting with the KnowEnG MySQL db through python.
Contains the class MySQL, which provides functionality for interacting with
the MySQL database.
Contains module functions::
combine_tables(alias, args=None)
create_dictionary(results)
import_nodes(version_dict, args=None)
query_all_mappings(version_dict, args=None)
create_mapping_dicts(version_dict, args=None)
get_database(db=None, args=None)
get_insert_cmd(step)
import_ensembl(alias, args=None)
"""
import json
import os
import shlex
import shutil
import subprocess
from argparse import ArgumentParser

import mysql.connector as sql

import config_utilities as cf
def deploy_container(args=None):
    """Deploy a container with marathon running MySQL using the specified
    args.

    This stages a MySQL config directory, replaces the placeholder values in
    the json describing how to deploy a container running mysql with those
    supplied in the user's arguments, writes the resulting job description
    under the logs directory and, unless ``args.test_mode`` is set, submits it
    to marathon with ``curl``.

    Args:
        args (Namespace): args as populated namespace or 'None' for defaults
    """
    if args is None:
        args = cf.config_args()

    # put mysql config on the storage directory if one is configured,
    # otherwise under the working directory
    base_dir = args.storage_dir if args.storage_dir else args.working_dir
    config_root = os.path.join(base_dir, args.data_path, 'mysql_config')

    # set up config directory
    conf_path = os.path.join(config_root, args.mysql_conf)
    if not os.path.exists(conf_path):
        os.makedirs(conf_path)
    # NOTE(review): opens up the parent of the config root to everyone,
    # presumably so the container user can reach it -- confirm.
    os.chmod(os.path.dirname(config_root), 0o777)
    conf_template = os.path.join(args.code_path, 'mysql', args.mysql_conf)
    conf_file = os.path.join(conf_path, 'my.cnf')
    shutil.copy(os.path.join(conf_template, 'my.cnf'), conf_file)

    # add port to end of config file (the file was freshly copied above, so
    # the line is appended exactly once per deployment)
    with open(conf_file, "a") as cnffile:
        cnffile.write("port = " + args.mysql_port)

    # modify marathon job template
    template_job = os.path.join(args.code_path, 'marathon', 'mysql.json')
    with open(template_job, 'r') as infile:
        deploy_dict = json.load(infile)
    deploy_dict["id"] = os.path.basename(args.mysql_dir)
    deploy_dict["cpus"] = float(args.mysql_cpu)
    deploy_dict["mem"] = int(args.mysql_mem)
    # Compare by value: an identity check (`is not`) on strings only works by
    # accident when both names happen to reference the same object.
    if args.mysql_host != cf.DEFAULT_MYSQL_URL:
        deploy_dict["constraints"] = [["hostname", "CLUSTER", args.mysql_host]]
    else:
        deploy_dict["constraints"] = []
    deploy_dict["container"]["volumes"][0]["hostPath"] = args.mysql_dir
    deploy_dict["container"]["volumes"][1]["hostPath"] = conf_path
    deploy_dict["container"]["docker"]["parameters"][0]["value"] = \
        "MYSQL_ROOT_PASSWORD=KnowEnG"

    # write and curl marathon job
    deploy_dir = os.path.join(args.working_dir, args.logs_path, 'marathon_jobs')
    if not os.path.exists(deploy_dir):
        os.makedirs(deploy_dir)
    out_path = os.path.join(deploy_dir, "kn_mysql-" + args.mysql_port + '.json')
    job_json = json.dumps(deploy_dict)
    with open(out_path, 'w') as outfile:
        outfile.write(job_json)
    job = 'curl -fX POST -H "Content-type: application/json" ' + args.marathon + "/v2/apps -d '"
    job += job_json + "'"
    print(job)
    if not args.test_mode:
        try:
            subprocess.check_output(job, shell=True)
        except subprocess.CalledProcessError as ex1:
            # best effort: report curl's output instead of aborting
            print(ex1.output)
    else:
        print(job)
def combine_tables(alias, args=None):
    """Combine all of the data imported from ensembl for the provided alias
    into a single database.

    This combines the imported tables into a single table <alias>_mappings
    with information from genes, transcripts, and translations. It then merges
    an upper-cased copy of this table into KnowNet.all_mappings for use in
    gene identifier mapping.

    Args:
        alias (str): An alias defined in ensembl.aliases.
        args (Namespace): args as populated namespace or 'None' for defaults
    """
    if args is None:
        args = cf.config_args()
    alias_db = 'ensembl_' + alias
    combined_db = 'KnowNet'
    combined_table = alias + '_mappings'
    all_table = 'all_mappings'
    source_table = alias_db + '.' + combined_table
    steps = ['transcript', 'translation', 'transcript2stable',
             'translation2stable', 'synonyms']
    db = MySQL(alias_db, args)
    try:
        # seed the per-alias table from the gene xrefs, then append the rest
        db.create_table(combined_table, get_insert_cmd('gene'))
        for step in steps:
            db.insert(combined_table, get_insert_cmd(step))
        db.use_db(combined_db)
        # "WHERE 1=2" selects no rows: only the column layout (plus the extra
        # species column) is used to create all_mappings
        cmd = ("SELECT *, db_display_name AS species FROM " + source_table +
               " WHERE 1=2")
        db.create_table(all_table, cmd)
        cmd = ("SELECT UCASE(dbprimary_acc) as dbprimary_acc, "
               "UCASE(display_label) AS display_label, "
               "UCASE(db_name) AS db_name, priority, "
               "UCASE(db_display_name) AS db_display_name, "
               "UCASE(stable_id) AS stable_id, '" + alias + "' AS species FROM " +
               source_table)
        db.insert(all_table, cmd)
    finally:
        # release the connection even when one of the statements fails
        db.close()
def create_dictionary(results):
    """Build a dictionary from two-column MySQL results.

    Every row of the fetched results is expected to hold exactly two values.
    Both are converted to strings; the first becomes the key and the second
    the value. When a key occurs more than once, the last row wins.

    Args:
        results (list): a list of the results returned from a MySQL query

    Returns:
        dict: dictionary with first column as key and second as values
    """
    return {str(key): str(value) for key, value in results}
def import_nodes(version_dict, args=None):
    """Imports the gene nodes into the KnowNet nodes and node_species tables.

    Queries the imported ensembl nodes and uses the stable ids as nodes for
    the KnowNet node table, uses the taxid to create the corresponding
    node_species rows, and records the biotype and taxid of every gene in
    node_meta.

    Args:
        version_dict (dict): the version dictionary describing the
            source:alias
        args (Namespace): args as populated namespace or 'None' for defaults

    Returns:
        list: the fetched (node_id, n_alias, n_type_id) rows for every gene
    """
    if args is None:
        args = cf.config_args()
    alias = version_dict['alias']
    # only the part of alias_info before the first '::' is used as the taxid
    taxid = version_dict['alias_info'].split('::')[0]
    alias_db = 'ensembl_' + alias
    db = MySQL(alias_db, args)
    try:
        cmd = ("SELECT DISTINCT UCASE(gene.stable_id) AS node_id, "
               "SUBSTRING(gene.description, 1, 512) AS n_alias, "
               "'Gene' AS n_type_id "
               "FROM gene "
               "ON DUPLICATE KEY UPDATE node_id=node_id")
        db.insert('KnowNet.node', cmd)

        cmd = ("SELECT DISTINCT UCASE(gene.stable_id) AS node_id, " + taxid +
               " AS taxon FROM gene ON DUPLICATE KEY UPDATE node_id=node_id")
        db.insert('KnowNet.node_species', cmd)

        cmd = ("SELECT DISTINCT UCASE(gene.stable_id) AS node_id, "
               "'biotype' AS info_type, "
               "gene.biotype AS info_desc "
               "FROM gene "
               "ON DUPLICATE KEY UPDATE node_id=node_id")
        db.insert('KnowNet.node_meta', cmd)

        cmd = ("SELECT DISTINCT UCASE(gene.stable_id) AS node_id, 'taxid' AS info_type, " + taxid +
               " AS info_desc FROM gene ON DUPLICATE KEY UPDATE node_id=node_id")
        db.insert('KnowNet.node_meta', cmd)

        cmd = ("SELECT DISTINCT UCASE(gene.stable_id) AS node_id, "
               "gene.description AS n_alias, "
               "'Gene' AS n_type_id "
               "FROM gene")
        return db.run(cmd)
    finally:
        # the connection was previously left open; always release it
        db.close()
def _resolve_lrg(raw, mapped, lrg_dict):
    """Return mapped after following LRG redirects keyed by raw, then mapped."""
    if str(raw) in lrg_dict:
        mapped = lrg_dict[str(raw)]
    if str(mapped) in lrg_dict:
        mapped = lrg_dict[str(mapped)]
    return mapped


def query_all_mappings(version_dict, args=None):
    """Creates the all mappings dictionary for the provided alias.

    Produces a dictionary keyed by '<alias_info>::<hint>::<identifier>' whose
    values are ensembl stable ids, covering the stable ids themselves
    (hint 'ENSEMBL_STABLE_ID') and every display label / primary accession
    (hint is the db_name). Identifiers found in the ENS_LRG_GENE mappings are
    redirected to the stable id recorded there. The dictionary is saved as
    <alias>_all.json in the map directory.

    Args:
        version_dict (dict): the version dictionary describing the
            source:alias
        args (Namespace): args as populated namespace or 'None' for defaults
    """
    if args is None:
        args = cf.config_args()
    alias = version_dict['alias']
    # NOTE(review): the full alias_info string is used as the key prefix here,
    # whereas import_nodes only uses the part before '::' -- confirm intended.
    taxid = version_dict['alias_info']
    database = 'ensembl_' + alias
    table = alias + '_mappings'
    map_dir = os.path.join(args.data_path, cf.DEFAULT_MAP_PATH)
    if os.path.isdir(args.working_dir):
        map_dir = os.path.join(args.working_dir, map_dir)
    # makedirs also creates missing parents and tolerates an existing dir
    os.makedirs(map_dir, exist_ok=True)

    db = MySQL(database, args)
    try:
        lrg_rows = db.query_distinct('dbprimary_acc, stable_id', table,
                                     "WHERE db_name='ENS_LRG_GENE'")
        stable_rows = db.query_distinct('stable_id, stable_id', table)
        xref_rows = db.query_distinct(
            'display_label AS dbprimary_acc, db_name, stable_id', table)
        xref_rows.extend(db.query_distinct('dbprimary_acc, db_name, stable_id',
                                           table))
    finally:
        # the connection was previously left open; always release it
        db.close()

    lrg_dict = create_dictionary(lrg_rows)
    map_dict = dict()
    for (raw, mapped) in stable_rows:
        # '::' is the key separator, so it must not occur inside identifiers
        raw = raw.replace("::", "|")
        mapped = _resolve_lrg(raw, mapped, lrg_dict)
        map_dict[taxid + '::ENSEMBL_STABLE_ID::' + str(raw)] = str(mapped)
    for (raw, hint, mapped) in xref_rows:
        raw = raw.replace("::", "|")
        mapped = _resolve_lrg(raw, mapped, lrg_dict)
        map_dict['::'.join([taxid, str(hint), str(raw)])] = str(mapped)

    with open(os.path.join(map_dir, alias + '_all.json'), 'w') as outfile:
        json.dump(map_dict, outfile, indent=4)
def create_mapping_dicts(version_dict, args=None):
    """Creates the mapping dictionaries for the provided alias.

    Produces the ensembl stable mappings dictionary and the all unique mappings
    dictionary for the provided alias from KnowNet.all_mappings. It then saves
    them as json objects to <alias>_stable.json and <alias>_unique.json in the
    map directory.

    Args:
        version_dict (dict): the version dictionary describing the
            source:alias
        args (Namespace): args as populated namespace or 'None' for defaults
    """
    if args is None:
        args = cf.config_args()
    alias = version_dict['alias']
    database = 'KnowNet'
    table = 'all_mappings'
    cmd = "WHERE species='" + alias + "'"
    map_dir = os.path.join(args.data_path, cf.DEFAULT_MAP_PATH)
    if os.path.isdir(args.working_dir):
        map_dir = os.path.join(args.working_dir, map_dir)
    # makedirs also creates missing parents and tolerates an existing dir
    os.makedirs(map_dir, exist_ok=True)

    db = MySQL(database, args)
    try:
        results = db.query_distinct('stable_id, stable_id', table, cmd)
        with open(os.path.join(map_dir, alias + '_stable.json'), 'w') as outfile:
            json.dump(create_dictionary(results), outfile, indent=4)

        results = db.query_distinct('display_label AS dbprimary_acc, stable_id',
                                    table, cmd)
        results.extend(db.query_distinct('dbprimary_acc, stable_id', table, cmd))
        # LRG rows are appended last so that they win when create_dictionary
        # encounters the same identifier more than once
        lrg_cmd = cmd + " AND db_name='ENS_LRG_gene'"
        results.extend(db.query_distinct('dbprimary_acc, stable_id', table,
                                         lrg_cmd))
        with open(os.path.join(map_dir, alias + '_unique.json'), 'w') as outfile:
            json.dump(create_dictionary(results), outfile, indent=4)
    finally:
        # the connection was previously left open; always release it
        db.close()
def get_database(db=None, args=None):
    """Return a connected MySQL object.

    Convenience factory for code that imports this module: falls back to the
    default configuration when no args are given and hands both values to the
    MySQL constructor.

    Args:
        db (str): optional db to connect to
        args (Namespace): args as populated namespace or 'None' for defaults

    Returns:
        MySQL: a source class object
    """
    config = cf.config_args() if args is None else args
    return MySQL(db, config)
def create_KnowNet(args=None):
    """Return a MySQL object after initializing the KnowNet db.

    Connects to the MySQL server without selecting a database, runs
    init_knownet() on the connection and returns the still-open object so the
    caller can keep using it.

    Args:
        args (Namespace): args as populated namespace or 'None' for defaults

    Returns:
        MySQL: a source class object
    """
    config = cf.config_args() if args is None else args
    knownet = MySQL(None, config)
    knownet.init_knownet()
    return knownet
# SELECT statements used to fill the combined <alias>_mappings table, keyed by
# the step name accepted by get_insert_cmd().
_INSERT_COMMANDS = {
    'gene': ("SELECT DISTINCT xref.dbprimary_acc, xref.display_label, "
             "external_db.db_name, external_db.priority, "
             "external_db.db_display_name, gene.stable_id "
             "FROM xref INNER JOIN external_db "
             "ON xref.external_db_id = external_db.external_db_id "
             "INNER JOIN object_xref "
             "ON xref.xref_id = object_xref.xref_id "
             "INNER JOIN gene "
             "ON object_xref.ensembl_id = gene.gene_id "
             "WHERE object_xref.ensembl_object_type = 'Gene'"),
    'transcript': ("SELECT DISTINCT xref.dbprimary_acc, xref.display_label, "
                   "external_db.db_name, external_db.priority, "
                   "external_db.db_display_name, gene.stable_id "
                   "FROM xref INNER JOIN external_db "
                   "ON xref.external_db_id = external_db.external_db_id "
                   "INNER JOIN object_xref "
                   "ON xref.xref_id = object_xref.xref_id "
                   "INNER JOIN transcript "
                   "ON object_xref.ensembl_id = transcript.transcript_id "
                   "INNER JOIN gene "
                   "ON transcript.gene_id = gene.gene_id "
                   "WHERE object_xref.ensembl_object_type = 'Transcript'"),
    'translation': ("SELECT DISTINCT xref.dbprimary_acc, xref.display_label, "
                    "external_db.db_name, external_db.priority, "
                    "external_db.db_display_name, gene.stable_id "
                    "FROM xref INNER JOIN external_db "
                    "ON xref.external_db_id = external_db.external_db_id "
                    "INNER JOIN object_xref "
                    "ON xref.xref_id = object_xref.xref_id "
                    "INNER JOIN translation "
                    "ON object_xref.ensembl_id = translation.translation_id "
                    "INNER JOIN transcript "
                    "ON translation.transcript_id = transcript.transcript_id "
                    "INNER JOIN gene "
                    "ON transcript.gene_id = gene.gene_id "
                    "WHERE object_xref.ensembl_object_type = 'Translation'"),
    'transcript2stable': ("SELECT DISTINCT transcript.stable_id AS dbprimary_acc, "
                          "transcript.stable_id AS display_label, "
                          "'ensembl' AS db_name, "
                          "1000 AS priority, "
                          "'ensembl' AS db_display_name, "
                          "gene.stable_id "
                          "FROM transcript "
                          "INNER JOIN gene "
                          "ON transcript.gene_id = gene.gene_id"),
    'translation2stable': ("SELECT DISTINCT translation.stable_id AS dbprimary_acc, "
                           "translation.stable_id AS display_label, "
                           "'ensembl' AS db_name, "
                           "1000 AS priority, "
                           "'ensembl' AS db_display_name, "
                           "gene.stable_id "
                           "FROM translation "
                           "INNER JOIN transcript "
                           "ON translation.transcript_id = transcript.transcript_id "
                           "INNER JOIN gene "
                           "ON transcript.gene_id = gene.gene_id"),
    # object_xref.ensembl_id is only a gene_id for rows whose
    # ensembl_object_type is 'Gene'; without the filter, transcript and
    # translation ids that happen to equal a gene_id would attach synonyms to
    # unrelated genes. The other xref steps already filter on the object type.
    'synonyms': ("SELECT DISTINCT external_synonym.synonym AS dbprimary_acc, "
                 "external_synonym.synonym AS display_label, "
                 "'ensembl_external_synonym' AS db_name, "
                 "1000 AS priority, "
                 "'ensembl_external_synonym' AS db_display_name, "
                 "gene.stable_id "
                 "FROM external_synonym "
                 "INNER JOIN object_xref "
                 "ON external_synonym.xref_id = object_xref.xref_id "
                 "INNER JOIN gene "
                 "ON object_xref.ensembl_id = gene.gene_id "
                 "WHERE object_xref.ensembl_object_type = 'Gene'"),
}


def get_insert_cmd(step):
    """Returns the command to be used with an insert for the provided step.

    This takes a predefined step to determine which type of insert is being
    performed during the production of the combined mappings tables. Based off
    of this step, it returns a MySQL SELECT command to be used with an
    INSERT INTO (or CREATE TABLE) statement.

    Args:
        step (str): the step indicating the step during the production of the
            combined mapping tables. One of 'gene', 'transcript',
            'translation', 'transcript2stable', 'translation2stable' or
            'synonyms'.

    Returns:
        str: the command to be used with an INSERT INTO statement at this
            step, or the empty string for an unknown step
    """
    return _INSERT_COMMANDS.get(step, '')
def import_ensembl(alias, args=None):
    """Imports the ensembl data for the provided alias into the KnowEnG
    database.

    This produces the local copy of the fetched ensembl database for alias.
    It initializes KnowNet, drops the existing ensembl_<alias> database,
    creates a new one from 'schema.sql', and imports every '*.txt' table file.
    Both file names are relative, so they are resolved against the current
    working directory of the process.

    Args:
        alias (str): An alias defined in ensembl.aliases.
        args (Namespace): args as populated namespace or 'None' for defaults
    """
    if args is None:
        args = cf.config_args()
    database = 'ensembl_' + alias
    db = MySQL(None, args)
    try:
        db.init_knownet()
        db.drop_db(database)
        db.import_schema(database, 'schema.sql')
        db.import_table(database, '*.txt')
    finally:
        # release the connection even when one of the import steps fails
        db.close()
class MySQL(object):
    """Class providing functionality for interacting with the MySQL database.

    This class serves as a wrapper for interacting with the KnowEnG MySQL db.
    Instances can be used as context managers; leaving the ``with`` block
    commits and closes the connection.

    Attributes:
        host (str): the MySQL db hostname
        user (str): the MySQL db username
        port (str): the MySQL db port
        passw (str): the MySQL db password
        database (str): the MySQL database to connect to
        args (Namespace): the configuration the object was built from
        conn (object): connection object for the database
        cursor (object): cursor object for the database
    """

    def __init__(self, database=None, args=None):
        """Init a MySQL object with the provided parameters.

        Constructs a MySQL object with the provided parameters, and connect to
        the relevant database.

        Args:
            database (str): the MySQL database to connect to (optional)
            args (Namespace): args as populated namespace or 'None' for defaults
        """
        if args is None:
            args = cf.config_args()
        self.user = args.mysql_user
        self.host = args.mysql_host
        self.port = args.mysql_port
        self.passw = args.mysql_pass
        self.database = database
        self.args = args
        conn_kwargs = {
            'host': self.host,
            'port': self.port,
            'user': self.user,
            'password': self.passw,
            # LOCAL_FILES is the client flag requested so that load_data()
            # can issue LOAD DATA LOCAL INFILE
            'client_flags': [sql.ClientFlag.LOCAL_FILES],
        }
        # only select a default database when one was requested
        if self.database is not None:
            conn_kwargs['db'] = self.database
        self.conn = sql.connect(**conn_kwargs)
        self.cursor = self.conn.cursor()

    def __enter__(self):
        """Return the object itself so it can be used in a with statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Commit and close the connection; never suppresses exceptions."""
        self.close()
        return False

    def _execute_commit(self, statement):
        """Execute a single SQL statement on the cursor and commit."""
        self.cursor.execute(statement)
        self.conn.commit()

    def _client_args(self, program):
        """Return the shell-quoted words invoking program with the
        connection options (user, host, port and password) of this object.
        """
        return [program, '-u', shlex.quote(self.user),
                '-h', shlex.quote(self.host),
                '--port', shlex.quote(self.port),
                '--password=' + shlex.quote(self.passw)]

    def drop_db(self, database):
        """Remove a database from the MySQL server

        Drops the provided database from the MySQL server.

        Args:
            database (str): name of the database to remove from the MySQL server
        """
        self._execute_commit('DROP DATABASE IF EXISTS ' + database + ';')

    def init_knownet(self):
        """Inits the Knowledge Network MySQL DB.

        Creates the KnowNet database and imports its schema from KnowNet.sql
        in the code's mysql directory. Also imports the node_type and
        edge_type files with mysqlimport's --ignore flag, so lines that have
        the same unique key as those already in the tables are skipped.
        """
        import_tables = ['node_type.txt', 'edge_type.txt']
        mysql_dir = os.path.join(self.args.code_path, 'mysql')
        self.import_schema('KnowNet', os.path.join(mysql_dir, 'KnowNet.sql'))
        for table in import_tables:
            tablefile = os.path.join(mysql_dir, table)
            self.import_table('KnowNet', tablefile, '--ignore')
        #self.cursor.execute("SET @@GLOBAL.SQL_MODE = REPLACE(@@SQL_MODE, " + \
        #                    "'NO_ZERO_DATE', '')")
        self.conn.commit()

    def create_db(self, database):
        """Add a database to the MySQL server

        Adds the provided database to the MySQL server if it does not exist.

        Args:
            database (str): name of the database to add to the MySQL server
        """
        self._execute_commit('CREATE DATABASE IF NOT EXISTS ' + database + ';')

    def use_db(self, database):
        """Use a database from the MySQL server

        Use the provided database from the MySQL server.

        Args:
            database (str): name of the database to use from the MySQL server
        """
        self._execute_commit('USE ' + database + ';')

    def create_table(self, tablename, cmd=''):
        """Add a table to the MySQL database.

        Adds the provided tablename to the MySQL database if it does not
        exist. If cmd is specified, it is appended to the CREATE TABLE
        statement (e.g. a SELECT or a LIKE clause).

        Args:
            tablename (str): name of the table to add to the MySQL database
            cmd (str): optional string to overwrite default create table
        """
        self._execute_commit('CREATE TABLE IF NOT EXISTS ' + tablename + ' ' +
                             cmd + ';')

    def create_temp_table(self, tablename, cmd=''):
        """Add a temporary table to the MySQL database.

        Adds the provided tablename to the MySQL database as a temporary
        table if it does not exist. If cmd is specified, it is appended to
        the CREATE TEMPORARY TABLE statement.

        Args:
            tablename (str): name of the table to add to the MySQL database
            cmd (str): optional additional command
        """
        self._execute_commit('CREATE TEMPORARY TABLE IF NOT EXISTS ' +
                             tablename + ' ' + cmd + ';')

    def load_data(self, filename, tablename, cmd='', sep='\\t', enc='"'):
        """Import data into table in the MySQL database.

        Loads the data located on the local machine into the provided MySQL
        table. Uses the LOAD DATA LOCAL INFILE command.

        Args:
            filename (str): name of the file to import from
            tablename (str): name of the table to import into
            cmd (str): optional additional command
            sep (str): separator for fields in file
            enc (str): enclosing character for fields in file
        """
        self._execute_commit("LOAD DATA LOCAL INFILE '" + filename +
                             "' INTO TABLE " + tablename +
                             " FIELDS TERMINATED BY '" + sep + "'" +
                             " OPTIONALLY ENCLOSED BY '" + enc + "' " +
                             cmd + ";")

    def drop_temp_table(self, tablename):
        """Remove a temporary table from the MySQL database

        Drops the provided temporary tablename from the MySQL database.

        Args:
            tablename (str): name of the table to remove from the MySQL database
        """
        self._execute_commit('DROP TEMPORARY TABLE IF EXISTS ' + tablename + ';')

    def drop_table(self, tablename):
        """Remove a table from the MySQL database

        Drops the provided tablename from the MySQL database.

        Args:
            tablename (str): name of the table to remove from the MySQL database
        """
        self._execute_commit('DROP TABLE IF EXISTS ' + tablename + ';')

    def move_table(self, old_database, old_table, new_database, new_table):
        """Move a table in the MySQL database

        Renames old_database.old_table to new_database.new_table.

        Args:
            old_database (str): name of the database to move from
            old_table (str): name of the table to move from
            new_database (str): name of the database to move to
            new_table (str): name of the table to move to
        """
        self._execute_commit('ALTER TABLE ' + old_database + '.' + old_table +
                             ' RENAME ' + new_database + '.' + new_table + ';')

    def copy_table(self, old_database, old_table, new_database, new_table):
        """Copy a table in the MySQL database

        Creates new_database.new_table with the layout of
        old_database.old_table (if it does not exist yet) and inserts every
        row of the old table into it.

        Args:
            old_database (str): name of the database to copy from
            old_table (str): name of the table to copy from
            new_database (str): name of the database to copy to
            new_table (str): name of the table to copy to
        """
        table1 = old_database + '.' + old_table
        table2 = new_database + '.' + new_table
        self.create_table(table2, ' LIKE ' + table1)
        self._execute_commit('INSERT INTO ' + table2 + ' SELECT * FROM ' + table1)

    def insert(self, tablename, cmd):
        """Insert into tablename using cmd.

        Args:
            tablename (str): name of the table to add to the MySQL database
            cmd (str): a valid SQL command to use for inserting into tablename
        """
        self._execute_commit('INSERT INTO ' + tablename + ' ' + cmd + ';')

    def set_isolation(self, duration='', level='REPEATABLE READ'):
        """Sets the transaction isolation level.

        Modify the transaction isolation level to modulate lock status behavior.
        Default InnoDB is repeatable read. For other levels check online at
        https://dev.mysql.com/doc/refman/5.7/en/set-transaction.html

        Args:
            duration (str): time for isolation level to be used. Can be empty,
                GLOBAL, or SESSION
            level (str): isolation level. In order of locking level:
                SERIALIZABLE, REPEATABLE READ, READ COMMITTED, READ UNCOMMITTED
        """
        cmd = 'SET ' + duration + ' TRANSACTION ISOLATION LEVEL ' + level + ';'
        # no commit here, unlike most other methods of this class
        self.cursor.execute(cmd)

    def start_transaction(self, level='REPEATABLE READ'):
        """Starts a mysql transaction with the provided isolation level

        Uses the provided isolation level to start a MySQL transaction using
        the current connection. Transaction persists until the next commit.

        Args:
            level (str): isolation level. In order of locking level:
                SERIALIZABLE, REPEATABLE READ, READ COMMITTED, READ UNCOMMITTED
        """
        self.conn.start_transaction(isolation_level=level)

    def replace(self, tablename, cmd):
        """Replace into tablename using cmd.

        Args:
            tablename (str): name of the table to add to the MySQL database
            cmd (str): a valid SQL command to use for inserting into tablename
        """
        self._execute_commit('REPLACE INTO ' + tablename + ' ' + cmd + ';')

    def replace_safe(self, tablename, cmd, values):
        """Replace into tablename using cmd with separately passed values.

        The values are handed to the cursor alongside the statement instead of
        being concatenated into it.

        Args:
            tablename (str): name of the table to add to the MySQL database
            cmd (str): a valid SQL command to use for inserting into tablename
            values: the parameters passed to the cursor together with the
                statement
        """
        self.cursor.execute('REPLACE INTO ' + tablename + ' ' + cmd + ';', values)
        self.conn.commit()

    def insert_ignore(self, tablename, cmd=''):
        """Insert ignore into tablename using cmd.

        Args:
            tablename (str): name of the table to add to the MySQL database
            cmd (str): a valid SQL command to use for inserting into tablename
        """
        self._execute_commit('INSERT IGNORE INTO ' + tablename + ' ' + cmd + ';')

    def run(self, cmd):
        """Run the provided command in MySQL.

        This runs the provided command using the current MySQL connection and
        cursor.

        Args:
            cmd (str): the SQL command to run on the MySQL server

        Returns:
            list: the fetched results; empty when reading rows from the
                cursor raises a connector error
        """
        self.cursor.execute(cmd + ';')
        try:
            results = list(self.cursor)
        except sql.Error:
            results = list()
        self.conn.commit()
        return results

    def query_distinct(self, query, table, cmd=''):
        """Run the provided query distinct in MySQL.

        This runs the provided distinct query from the provided table with the
        optional extra cmd using the current MySQL connection and cursor. It
        then returns the fetched results.

        Args:
            query (str): the SQL query to run on the MySQL server
            table (str): the table to query from
            cmd (str): the additional SQL command to run on the MySQL server
                (optional)

        Returns:
            list: the fetched results
        """
        cmd = 'SELECT DISTINCT ' + query + ' FROM ' + table + ' ' + cmd + ';'
        self.cursor.execute(cmd)
        return self.cursor.fetchall()

    def import_schema(self, database, sqlfile):
        """Import the schema for the provided database from sqlfile.

        Creates the provided database if it does not exist and feeds the
        provided sqlfile to the mysql command line client.

        Args:
            database (str): name of the database to add to the MySQL server
            sqlfile (str): name of the sql file specifying the format for the
                database

        Raises:
            subprocess.CalledProcessError: if the mysql client exits non-zero
        """
        self.create_db(database)
        # values are shell-quoted so that special characters (e.g. in the
        # password or the path) cannot break or alter the command line
        cmd = self._client_args('mysql') + [shlex.quote(database), '<',
                                            shlex.quote(sqlfile)]
        subprocess.check_call(' '.join(cmd), shell=True)

    def import_table(self, database, tablefile, import_flags='--delete'):
        """Import the data for the table in the provided database described by
        tablefile.

        Imports the data as defined in the provided tablefile using
        mysqlimport. The exit status of mysqlimport is not checked.

        Args:
            database (str): name of the database to add to the MySQL server
            tablefile (str): name of the txt file specifying the data for the
                table; passed to the shell unquoted because callers in this
                module rely on glob patterns such as '*.txt'
            import_flags (str): additional flags to pass to mysqlimport
        """
        # import_flags is left unquoted too; presumably it may carry several
        # space-separated flags -- verify against callers
        cmd = self._client_args('mysqlimport') + [import_flags,
                                                  shlex.quote(database),
                                                  '-L', tablefile, '-v']
        subprocess.call(' '.join(cmd), shell=True)

    def dump_table(self, table, file):
        """Dump the data for the table in the provided file(name).

        Runs 'SELECT * FROM <table>' through the mysql command line client in
        batch/silent mode against this object's database and redirects the
        output into file.

        Args:
            table (str): name of the table to dump
            file (str): name of the file to write the rows to

        Raises:
            subprocess.CalledProcessError: if the mysql client exits non-zero
        """
        cmd = self._client_args('mysql') + [
            shlex.quote(self.database),
            '--execute', shlex.quote('SELECT * FROM ' + table),
            '--batch', '--silent', '>', shlex.quote(file)]
        subprocess.check_call(' '.join(cmd), shell=True)

    def disable_keys(self):
        """Disables keys for faster operations.

        Turns off autocommit, unique_checks, and foreign_key_checks for
        the MySQL database.
        """
        for setting in ('autocommit', 'unique_checks', 'foreign_key_checks'):
            self.cursor.execute('SET ' + setting + '=0;')
        self.conn.commit()

    def enable_keys(self):
        """Enables keys for safer operations.

        Turns on autocommit, unique_checks, and foreign_key_checks for
        the MySQL database.
        """
        for setting in ('autocommit', 'unique_checks', 'foreign_key_checks'):
            self.cursor.execute('SET ' + setting + '=1;')
        self.conn.commit()

    def close(self):
        """Close connection to the MySQL server.

        This commits any changes remaining and closes the connection to the
        MySQL server.
        """
        self.conn.commit()
        self.conn.close()
def main():
    """Deploy a MySQL container using marathon with the provided command line
    arguments.

    Builds an argument parser, lets config_utilities register its options on
    it, parses the command line, echoes the parsed namespace and hands it to
    deploy_container.
    """
    cli_parser = cf.add_config_args(ArgumentParser())
    parsed = cli_parser.parse_args()
    print(parsed)
    deploy_container(parsed)


# only deploy when executed as a script, not when imported
if __name__ == "__main__":
    main()