# This file is part of genipe.
#
# This work is licensed under the Creative Commons Attribution-NonCommercial
# 4.0 International License. To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc/4.0/ or send a letter to Creative
# Commons, PO Box 1866, Mountain View, CA 94042, USA.
import re
import os
import sys
import stat
import shlex
import datetime
import logging
import argparse
import platform
import traceback
from shutil import which
from multiprocessing import Pool
from subprocess import Popen, PIPE
from collections import namedtuple
import jinja2
import pandas as pd
from numpy.linalg.linalg import LinAlgError
from .. import __version__
from ..formats import impute2
from ..error import GenipeError
__author__ = ["Louis-Philippe Lemieux Perreault", "Marc-Andre Legault"]
__copyright__ = "Copyright 2014, Beaulieu-Saucier Pharmacogenomics Centre"
__license__ = "Attribution-NonCommercial 4.0 International (CC BY-NC 4.0)"
# Check if lifelines is installed
try:
from lifelines import CoxPHFitter
HAS_LIFELINES = True
except ImportError:
HAS_LIFELINES = False
# Check if statsmodels is installed
try:
import statsmodels.api as sm
import statsmodels.formula.api as smf
from patsy import dmatrices
HAS_STATSMODELS = True
except ImportError:
HAS_STATSMODELS = False
def _has_skat():
"""Checks if the SKAT R library is installed.
Returns:
bool: True if SKAT is installed, False otherwise.
"""
proc = Popen(
["Rscript", "-e", 'is.element("SKAT", installed.packages()[,1])'],
stdout=PIPE,
)
out = proc.communicate()[0].decode().strip()
return out.endswith("TRUE")
# Check if R and SKAT is installed
HAS_R = which("Rscript") is not None
HAS_SKAT = _has_skat() if HAS_R else False
# An IMPUTE2 row to process
_Row = namedtuple("_Row", ("row", "samples", "pheno", "pheno_name", "use_ml",
"categorical", "formula", "time_to_event", "event",
"inter_c", "is_chrx", "gender_c", "del_g", "scale",
"maf_t", "prob_t", "analysis_type",
"number_to_print", "random_effects", "mixedlm_p"))
# The Cox's regression required values
_COX_REQ_COLS = [
"coef", "se(coef)", "coef lower 95%", "coef upper 95%", "z", "p",
]
[docs]def main(args=None):
"""The main function.
Args:
args (argparse.Namespace): the arguments to be parsed (if
:py:func:`main` is called by another
modulel)
"""
# Creating the option parser
desc = ("Performs statistical analysis on imputed data (either SKAT "
"analysis, or linear, logistic or survival regression). This "
"script is part of the 'genipe' package, version "
"{}.".format(__version__))
parser = argparse.ArgumentParser(description=desc)
# Files that need closing
logging_fh = None
try:
# Parsing the options
args = parse_args(parser, args)
# Adding the logging capability
log_file = args.out + ".log"
logging_fh = logging.FileHandler(log_file, mode="w")
logging.basicConfig(
format="[%(asctime)s %(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
level=logging.DEBUG if args.debug else logging.INFO,
handlers=[logging.StreamHandler(), logging_fh]
)
logging.info("Logging everything into '{}'".format(log_file))
logging.info("Program arguments: {}".format(
" ".join(shlex.quote(part) for part in sys.argv[1:])
))
# Checking the options
check_args(args)
# Reading the phenotype file
logging.info("Reading phenotype file")
phenotypes, remove_gender = read_phenotype(
args.pheno,
args,
check_duplicated=args.analysis_type != "mixedlm",
)
logging.info(" - {:,d} samples with phenotype".format(
len(phenotypes.index.unique()),
))
# Reading the sample file
logging.info("Reading the sample file")
samples = read_samples(args.sample)
logging.info(" - {:,d} samples with imputation data".format(
len(samples),
))
# Reading the sites to extract (if required)
sites_to_extract = None
if args.extract_sites is not None:
logging.info("Reading the sites to extract for analysis")
sites_to_extract = read_sites_to_extract(args.extract_sites)
logging.info(" - {:,d} sites to extract".format(
len(sites_to_extract),
))
# Computing the statistics
if args.analysis_type != "skat":
compute_statistics(
impute2_filename=args.impute2,
samples=samples,
markers_to_extract=sites_to_extract,
phenotypes=phenotypes,
remove_gender=remove_gender,
out_prefix=args.out,
options=args,
)
else:
skat_parse_impute2(
impute2_filename=args.impute2,
samples=samples,
markers_to_extract=sites_to_extract,
phenotypes=phenotypes,
remove_gender=remove_gender,
out_prefix=args.out,
args=args,
)
logging.info("Analysis completed")
# Catching the Ctrl^C
except KeyboardInterrupt:
logging.info("Cancelled by user")
sys.exit(0)
# Catching the GenipeError
except GenipeError as e:
logging.error(e)
parser.error(e.message)
except Exception as e:
logging.error(e)
raise
finally:
if logging_fh is not None:
logging_fh.close()
[docs]def read_phenotype(i_filename, opts, check_duplicated=True):
"""Reads the phenotype file.
Args:
i_filename (str): the name of the input file
opts (argparse.Namespace): the options
check_duplicated (bool): whether or not to check for duplicated samples
Returns:
pandas.DataFrame: the phenotypes
This file is expected to be a tab separated file of phenotypes and
covariates. The columns to use will be determined by the
``--sample-column`` and the ``--covar`` options.
For analysis including the X chromosome, the gender is automatically
added as a covariate. The results are not shown to the user unless asked
for.
"""
# Reading the data (and setting the index)
pheno = pd.read_csv(i_filename, sep="\t", na_values=opts.missing_value)
pheno[opts.sample_column] = pheno[opts.sample_column].astype(str)
pheno = pheno.set_index(opts.sample_column,
verify_integrity=check_duplicated)
# Finding the required column
required_columns = opts.covar.copy()
if opts.analysis_type == "cox":
required_columns.extend([opts.tte, opts.event])
else:
required_columns.append(opts.pheno_name)
# Interaction?
if opts.interaction is not None:
if opts.interaction not in required_columns:
required_columns.append(opts.interaction)
# Do we need the gender column?
remove_gender_column = False
if opts.chrx and (opts.gender_column not in required_columns):
# It's not already in the gender column, so we keep it, but we need to
# remove it later on...
required_columns.append(opts.gender_column)
remove_gender_column = True
# We need to exclude unknown gender only if gender was required
if (opts.gender_column in required_columns) and (not remove_gender_column):
# Log the number of male/female/missing.
sex_counts = pheno.groupby(opts.gender_column).size()
# Computing the number of unknown sex
unknown = sex_counts.loc[
~((sex_counts.index == 1) | (sex_counts.index == 2))
]
nb_unknown = 0
if unknown.shape[0] > 0:
nb_unknown = unknown.sum()
logging.info(" - {:,d} males / {:,d} females ({:,d} unknown)".format(
sex_counts.get(1, 0), sex_counts.get(2, 0), nb_unknown,
))
pheno = pheno[(pheno[opts.gender_column] == 1) |
(pheno[opts.gender_column] == 2)]
# Extracting the required column
pheno = pheno.loc[:, required_columns]
# Returning the phenotypes (dropping the nan values)
return pheno.dropna(axis=0), remove_gender_column
[docs]def read_samples(i_filename):
"""Reads the sample file (produced by SHAPEIT).
Args:
i_filename (str): the name of the input file
Returns:
pandas.DataFrame: the list of samples
This file contains the list of samples that are contained in the
``impute2`` file (with same order). The expected format for this file is a
tab separated file with a first row containing the following columns: ::
ID_1 ID_2 missing father mother sex plink_pheno
The subsequent row will be discarded and should contain: ::
0 0 0 D D D B
Notes
-----
We are mostly interested in the sample IDs corresponding to the
``ID_2`` column. Their uniqueness is verified by pandas.
"""
samples = pd.read_csv(i_filename, sep=" ", usecols=[0, 1])
samples = samples.drop(samples.index[0], axis=0)
samples["ID_2"] = samples["ID_2"].astype(str)
return samples.set_index("ID_2", verify_integrity=True)
[docs]def skat_read_snp_set(i_filename):
"""Reads the SKAT SNP set file.
Args:
i_filename (str): the name of the input file
Returns:
pandas.DataFrame: the SNP set for the SKAT analysis
This file has to be supplied by the user. The recognized columns are:
``variant``, ``snp_set`` and ``weight``. The ``weight`` column is optional
and can be used to specify a custom weighting scheme for SKAT. If nothing
is specified, the default Beta weights are used.
The file has to be tab delimited.
"""
skat_info = pd.read_csv(i_filename, sep="\t", header=0)
if "variant" not in skat_info.columns:
raise GenipeError("The SKAT SNP set file needs to have a 'variant' "
"column containing the ID of every variant of "
"interest.")
if "snp_set" not in skat_info.columns:
raise GenipeError("The SKAT SNP set file needs to have a 'snp_set' "
"column containing the SNP set ID for every "
"variant. The user is free to choose the SNP ID")
return skat_info
[docs]def skat_parse_impute2(impute2_filename, samples, markers_to_extract,
phenotypes, remove_gender, out_prefix, args):
"""Read the impute2 file and run the SKAT analysis.
Args:
impute2_filename (str): the name of the input file
samples (pandas.DataFrame): the samples
markers_to_extract (set): the set of markers to analyse
phenotypes (pandas.DataFrame): the phenotypes
remove_gender (bool): whether or not to remove the gender column
out_prefix (str): the output prefix
args (argparse.Namespace): the options
This function does most of the "dispatching" to run SKAT. It writes the
input files to the disk, runs the generated R scripts to do the actual
analysis and then writes the results to disk.
"""
# We keep track of the files that are generated because we will need the
# paths to generate the R script correctly.
r_files = {"snp_sets": [], "covariates": None, "outcome": None,
"weights": None}
# Read the SNP set and create the output files.
snp_set = skat_read_snp_set(args.snp_sets)
# We will use a temporary directory for our analysis.
dir_name = "{}.{}".format(
args.out,
datetime.datetime.today().strftime("skat.%Y.%m.%d")
)
dir_name = os.path.abspath(dir_name)
if not os.path.isdir(dir_name):
os.makedirs(dir_name)
else:
# This should not happen because the folder name contains a timestamp
raise GenipeError("A folder named '{}' already exists.".format(
dir_name
))
# If weights were provided, we will write a weight vector to disk for R.
if "weight" in snp_set.columns:
logging.info("SKAT will use the provided weights (from the SNP set "
"file).")
weight_filename = os.path.join(dir_name, "weights.csv")
snp_set[["weight"]].to_csv(weight_filename, index=False, header=False)
r_files["weights"] = weight_filename
# Open genotype CSV files for every SNP set. Those CSV files will be
# read in R and used by SKAT.
genotype_files = {}
snp_sets = snp_set["snp_set"].unique()
for set_id in snp_sets:
filename = os.path.join(dir_name, "{}.genotypes.csv".format(set_id))
r_files["snp_sets"].append(filename) # Track the filenames.
genotype_files[set_id] = open(filename, "w")
# We write the column headers for every file.
print("", *samples.index, sep=",", file=genotype_files[set_id])
# The markers of interest are the markers we want to include in the
# analysis. Concretely, they are the markers that were included in the snp
# sets. If a list of markers to extract is provided by the user, we also
# look at this to filter ou undesired markers.
#
# We also fill a set of written_markers to stop whenever all the markers
# were written to file.
markers_of_interest = set(snp_set["variant"])
written_markers = set()
if markers_to_extract is not None:
markers_of_interest = markers_of_interest & markers_to_extract
# Open the file. We use subprocess if it's gunzipped because it's faster.
# We use gzip -d -c instead of zcat because the default Mac OS zcat has
# weird behavior.
if impute2_filename.endswith(".gz"):
proc = Popen(["gzip", "-d", "-c", impute2_filename], stdout=PIPE)
i_file = proc.stdout
else:
i_file = open(impute2_filename, "rb")
for line in i_file:
# If we already found everything, we will stop here.
if len(written_markers) == len(markers_of_interest):
if written_markers == markers_of_interest:
logging.info("Found all the necessary markers, skipping the "
"rest of the file.")
break
line = line.decode("ascii")
# TODO: Add the gender and do QC (men with hetero on chrX).
line = _skat_parse_line(line, markers_of_interest, samples)
if line is not None:
name, dosage = line
written_markers.add(name)
_skat_write_marker(name, dosage, snp_set, genotype_files)
# We will warn the user if some variants were not found.
number_missing = len(markers_of_interest) - len(written_markers)
if number_missing > 0:
logging.warning("{} markers of interest were not found in the Impute2 "
"file.".format(number_missing))
i_file.close()
# Close the genotype files.
for file_handle in genotype_files.values():
file_handle.close()
# Write the covariate file.
phenotype_df = samples.join(phenotypes)
phenotype_df.index.name = "sample"
if args.covar:
# Make sure the samples are consistent by merging phenotype and
# samples.
filename = os.path.join(dir_name, "covariates.csv")
phenotype_df[args.covar].to_csv(
filename,
sep=",",
)
r_files["covariates"] = filename
# Write the phenotype file.
filename = os.path.join(
dir_name,
"{}.{}.csv".format(args.pheno_name, args.outcome_type)
)
phenotype_df[[args.pheno_name]].to_csv(
filename,
sep=",",
)
r_files["outcome"] = filename
r_scripts = _skat_generate_r_script(dir_name, r_files, args)
# Run the SKAT analysis by calling Rscript either in different subprocesses
# or linearly.
logging.info("Launching SKAT using {} processes on {} SNP sets.".format(
args.nb_process, len(snp_sets)
))
results = []
if args.nb_process > 1:
with Pool(processes=args.nb_process) as pool:
results = pool.map(_skat_run_job, r_scripts)
else:
for script in r_scripts:
results.append(_skat_run_job(script))
# Finally, write the SKAT output to disk.
output_filename = args.out + ".skat.dosage"
logging.info("SKAT completed, writing the results to disk ({}).".format(
output_filename
))
with open(output_filename, "w") as f:
# Write the header.
if args.skat_o:
print("snp_set_id", "p_value", sep="\t", file=f)
else:
print("snp_set_id", "p_value", "q_value", sep="\t", file=f)
# The order in the snp_sets list should have been consistent
# across the whole analysis. We just want to make sure that we
# actually have the right number of results.
assert len(snp_sets) == len(results)
# Write a tab separated file containing the set and p-values.
for i, (p_value, q_value) in enumerate(results):
set_id = snp_sets[i]
if args.skat_o:
print(set_id, p_value, sep="\t", file=f)
else:
print(set_id, p_value, q_value, sep="\t", file=f)
def _skat_run_job(script_filename):
"""Calls Rscript with the generated script and parses the results.
Args:
script_filename (str): the name of the script
Returns:
tuple: two values: the *p-value* and the *q-value* (for SKAT-O, the
*q-value* is set to None)
The results should be somewhere in the standard output. The expected
format is: ::
_PYTHON_HOOK_QVAL:[0.123]
_PYTHON_HOOK_PVAL:[0.123]
If the template script is modified, this format should still be respected.
It is also noteworthy that this function uses ``Rscript`` to run the
analysis. Hence, it should be in the path when using the imputed_stats
skat mode.
"""
# Parse the p-value.
proc = Popen(
["Rscript", script_filename],
stdout=PIPE,
stderr=PIPE,
)
out, err = proc.communicate()
if err:
logging.info("SKAT Warning: " + err.decode("utf-8"))
# Decoding
out = out.decode("utf-8")
# Getting the p value
p_match = re.search(r"_PYTHON_HOOK_PVAL:\[(.+)\]", out)
if p_match is None:
raise GenipeError("SKAT did not return properly. See script "
"'{}' for details.".format(script_filename))
# Getting the Q value
q_match = re.search(r"_PYTHON_HOOK_QVAL:\[(.+)\]", out)
if q_match is None:
raise GenipeError("SKAT did not return properly. See script "
"'{}' for details.".format(script_filename))
if q_match.group(1) == "NA":
# For SKAT-O, the Q will be set to NA.
return float(p_match.group(1)), None
else:
return float(p_match.group(1)), float(q_match.group(1))
def _skat_generate_r_script(dir_name, r_files, args):
"""Uses jinja2 to generate an R script to do the SKAT analysis.
Args:
dir_name (str): the output directory name to write the scripts in
r_files (dict): contains the different input files required by the R
script
args (argparse.Namespace): the parsed arguments
Returns:
list: the list of all script names
"""
jinja_env = jinja2.Environment(
loader=jinja2.PackageLoader("genipe", "script_templates")
)
template = jinja_env.get_template("run_skat.R")
scripts = []
# Create one file per SNP set for parallelism.
for snp_set_file in r_files["snp_sets"]:
rendered_script = template.render(
version=__version__,
snp_set_file=snp_set_file,
covariate_file=r_files["covariates"],
outcome_file=r_files["outcome"],
weights=r_files["weights"],
outcome_type="C" if args.outcome_type == "continuous" else "D",
skat_o=args.skat_o,
)
# Write the rendered script to disk.
script_filename = "run_skat_{}R".format(
# We parse the set id name.
os.path.basename(snp_set_file)[:-len(".genotype.csv")]
)
script_filename = os.path.join(dir_name, script_filename)
with open(script_filename, "w") as f:
f.write(rendered_script)
scripts.append(script_filename)
return scripts
def _skat_parse_line(line, markers_of_interest, samples, gender=None):
"""Parses a single line of the Impute2 file.
Args:
line (str): a line from the Impute2 file
markers_of_interest (set): a set of markers that are required for the
analysis
samples (pandas.DataFrame): contains the samples IDs (this is useful to
make sure we return a dosage vector with
the appropriate data)
Returns:
tuple: Either None if the marker is not of interest or a tuple of
``(name , dosage_vector)`` where ``name`` is a ``str``
representing the variant ID and ``dosage_vector`` is a numpy
array containing the dosage values for every sample in the
``samples`` dataframe.
"""
line = line.split(" ")
# info_tuple contains: chrom, name, pos, a1, a2
# proba_matrix is a matrix of sample x (aa, ab, bb)
info_tuple, proba_matrix = impute2.matrix_from_line(line)
chrom, name, pos, a1, a2 = info_tuple
# If this marker is not of interest, we don't bother continuing.
if name not in markers_of_interest:
return None
# We need to compute the dosage vector.
# This is given wrt to the minor and major alleles, so we need to get the
# MAF to identify those.
maf, minor_i, major_i = impute2.maf_from_probs(
prob_matrix=proba_matrix,
a1=0,
a2=2,
gender=gender,
site_name=name,
)
# We don't pass a scale parameter, because we want additive coding.
dosage = impute2.dosage_from_probs(
homo_probs=proba_matrix[:, minor_i],
hetero_probs=proba_matrix[:, 1],
)
return (name, dosage)
def _skat_write_marker(name, dosage, snp_set, genotype_files):
"""Write the dosage information to the appropriate genotype file.
Args:
name (str): the name of the marker
dosage (numpy.array): the dosage vector
snp_set (pandas.DataFrame: the dataframe that allows us to identify the
correct SNP set for the specified variant
genotype_files (dict): a dictionary containing the opened CSV files for
the genotypes
"""
# Identify the correct SNP set.
this_snp_set = snp_set.loc[snp_set["variant"] == name, "snp_set"].unique()
for set_id in this_snp_set:
file_object = genotype_files[set_id]
print(name, *dosage, sep=",", file=file_object)
def _extract_mixedlm_random_effect(fitted):
"""Extracts the random effects from a MixedLM fit object.
Args:
fitted (MixedLMResultsWrapper): The fitted object.
Returns:
pandas.DataFrame: The random effects as a DataFrame (with a column
named "RE").
Note
====
Depending of the version of StatsModels, the object might be a pandas
DataFrame or a dictionary...
"""
# Getting the random effects
random_effects = fitted.random_effects
# If it's a dictionary, we need to create a DataFrame
if isinstance(random_effects, dict):
return pd.DataFrame(random_effects).T.rename(
columns={"groups": "RE", "Group": "RE"},
)
return random_effects.rename(columns={"Intercept": "RE"})
[docs]def compute_statistics(impute2_filename, samples, markers_to_extract,
phenotypes, remove_gender, out_prefix, options):
"""Parses IMPUTE2 file while computing statistics.
Args:
impute2_filename (str): the name of the input file
samples (pandas.DataFrame): the list of samples
markers_to_extract (set): the set of markers to extract
phenotypes (pandas.DataFrame): the phenotypes
remove_gender (bool): whether or not to remove the gender column
out_prefix (str): the output prefix
options (argparse.Namespace): the options
This function takes care of parallelism. It reads the Impute2 file and
fills a queue that will trigger the analysis when full.
If the number of process to launch is 1, the rows are analyzed as they
come.
"""
# The name of the output file
o_name = "{}.{}.dosage".format(out_prefix, options.analysis_type)
# Do we need to create a formula?
formula = None
if options.analysis_type != "cox":
formula = get_formula(
phenotype=options.pheno_name,
covars=options.covar,
interaction=options.interaction,
gender_c=options.gender_column,
categorical=options.categorical,
)
logging.info("{}: '{}'".format(options.analysis_type, formula))
if options.analysis_type == "mixedlm" and options.use_ml:
logging.info(" - using ML")
else:
# This is Cox
formula = get_formula(
phenotype=options.tte + " + " + options.event,
covars=options.covar,
interaction=options.interaction,
gender_c=options.gender_column,
categorical=options.categorical,
)
# Reading the IMPUTE2 file one line (site) at a time, creating a subprocess
# if required
proc = None
i_file = None
o_file = open(o_name, "w")
pool = None
# Multiprocessing?
if options.nb_process > 1:
pool = Pool(processes=options.nb_process)
try:
if impute2_filename.endswith(".gz"):
proc = Popen(["gzip", "-d", "-c", impute2_filename], stdout=PIPE)
i_file = proc.stdout
else:
i_file = open(impute2_filename, "rb")
# Printing the header of the output file
header = ("chr", "pos", "snp", "major", "minor", "maf", "n", "coef",
"se", "lower", "upper",
"t" if options.analysis_type == "linear" else "z", "p")
if options.analysis_type == "linear":
header = header + ("adj.r-squared", )
if options.analysis_type == "mixedlm":
header = header + ("type", )
print(*header, sep="\t", file=o_file)
# The sites to process (if multiprocessing)
sites_to_process = []
# We need to compute the random effects if it's a MixedLM analysis
random_effects = None
if options.analysis_type == "mixedlm" and options.interaction is None:
random_effects = _extract_mixedlm_random_effect(smf.mixedlm(
formula=formula.replace("_GenoD + ", ""),
data=phenotypes,
groups=phenotypes.index,
).fit(reml=not options.use_ml))
# Reading the file
nb_processed = 0
for line in i_file:
row = line.decode().rstrip("\r\n").split(" ")
# Is this site required?
if markers_to_extract and (row[1] not in markers_to_extract):
continue
# Constructing the row object
site = _Row(
row=row,
samples=samples,
pheno=phenotypes,
use_ml=vars(options).get("use_ml", None),
pheno_name=vars(options).get("pheno_name", None),
formula=formula,
time_to_event=vars(options).get("tte", None),
event=vars(options).get("event", None),
inter_c=options.interaction,
is_chrx=options.chrx,
gender_c=options.gender_column,
del_g=remove_gender,
scale=options.scale,
maf_t=options.maf,
prob_t=options.prob,
analysis_type=options.analysis_type,
number_to_print=len(header),
categorical=options.categorical,
random_effects=random_effects,
mixedlm_p=vars(options).get("p_threshold", None),
)
# Is there more than one process
if options.nb_process > 1:
# Saving this site to process later
sites_to_process.append(site)
# Is there enough sites to process?
if len(sites_to_process) >= options.nb_lines:
for result in pool.map(process_impute2_site,
sites_to_process):
print(*result, sep="\t", file=o_file)
# Logging
nb_processed += options.nb_lines
logging.info("Processed {:,d} lines".format(nb_processed))
# Resetting the sites to process
sites_to_process = []
else:
# Processing this row
print(*process_impute2_site(site), sep="\t", file=o_file)
if len(sites_to_process) > 0:
for result in pool.map(process_impute2_site, sites_to_process):
print(*result, sep="\t", file=o_file)
# Logging
nb_processed += len(sites_to_process)
logging.info("Processed {:,d} lines".format(nb_processed))
except Exception:
if pool is not None:
pool.terminate()
logging.error(traceback.format_exc())
raise
finally:
# Closing the input file
i_file.close()
# Finishing the rows if required
if (options.nb_process > 1) and (pool is not None):
pool.close()
# Closing the output file
o_file.close()
# Closing the proc
if proc is not None:
if proc.wait() != 0:
raise GenipeError("{}: problem while reading the GZ "
"file".format(impute2_filename))
[docs]def process_impute2_site(site_info):
"""Process an IMPUTE2 site (a line in an IMPUTE2 file).
Args:
site_info (list): the impute2 line (split by space)
Returns:
list: the results of the analysis
"""
# Getting the probability matrix and site information
(chrom, name, pos, a1, a2), geno = impute2.matrix_from_line(site_info.row)
# The name of the dosage column
dosage_columns = ["_D1", "_D2", "_D3"]
# Allele encoding
allele_encoding = {dosage_columns[0]: a1, dosage_columns[-1]: a2}
# Creating the sample data frame
samples = site_info.samples
for i, col_name in enumerate(dosage_columns):
samples[col_name] = geno[:, i]
# Merging with phenotypes
data = pd.merge(
site_info.pheno,
samples,
how="inner",
left_index=True,
right_index=True
).dropna(axis=0)[list(site_info.pheno.columns) + dosage_columns]
# Keeping only good quality markers
data = data[
impute2.get_good_probs(data[dosage_columns].values, site_info.prob_t)
]
# FIXME: Quick and dirty fix for mixedlm...
# If the analysis type is not MixedLM, then t_data is just a pointer to the
# real data. Otherwise, t_data is the grouped values (first occurrence,
# which shouldn't cause a problem, since gender and probabilities are the
# same for each measurement).
t_data = data
if site_info.analysis_type == "mixedlm":
t_data = data.groupby(level=0).first()
# Checking gender if required
gender = None
if site_info.is_chrx:
# We want to exclude males with heterozygous calls for the rest of the
# analysis
invalid_rows = samples_with_hetero_calls(
t_data.loc[t_data[site_info.gender_c] == 1, dosage_columns],
dosage_columns[1]
)
if len(invalid_rows) > 0:
logging.warning("There were {:,d} males with heterozygous "
"calls for {}".format(len(invalid_rows), name))
logging.debug(t_data.shape)
data = data.drop(invalid_rows, axis=0)
t_data = t_data.drop(invalid_rows, axis=0)
logging.debug(t_data.shape)
# Getting the genders
gender = t_data[site_info.gender_c].values
# Computing the frequency
maf, minor, major = impute2.maf_from_probs(
prob_matrix=t_data[dosage_columns].values,
a1=dosage_columns[0],
a2=dosage_columns[-1],
gender=gender,
site_name=name,
)
# What we want to print
to_return = [chrom, pos, name, allele_encoding[major],
allele_encoding[minor], maf, t_data.shape[0]]
# If the marker is too rare, we continue with the rest
if (maf == "NA") or (maf < site_info.maf_t):
to_return.extend(["NA"] * (site_info.number_to_print - len(to_return)))
return to_return
# Computing the dosage on the minor allele
data["_GenoD"] = impute2.dosage_from_probs(
homo_probs=data[minor],
hetero_probs=data[dosage_columns[1]],
scale=site_info.scale,
)
# Removing the unwanted columns
unwanted_columns = dosage_columns
if site_info.del_g:
unwanted_columns.append(site_info.gender_c)
data = data.drop(unwanted_columns, axis=1)
# The column to get the result from
result_from_column = "_GenoD"
if site_info.inter_c is not None:
if ((site_info.inter_c == site_info.gender_c) or
(site_info.inter_c in site_info.categorical)):
result_from_column = "_GenoD:C({})[T.{}]".format(
site_info.inter_c,
sorted(data[site_info.inter_c].unique())[-1],
)
else:
result_from_column = "_GenoD:" + site_info.inter_c
# Fitting
results = []
try:
results = _fit_map[site_info.analysis_type](
data=data,
groups=data.index.values,
time_to_event=site_info.time_to_event,
event=site_info.event,
formula=site_info.formula,
result_col=result_from_column,
use_ml=site_info.use_ml,
random_effects=site_info.random_effects,
mixedlm_p=site_info.mixedlm_p,
interaction=site_info.inter_c is not None,
)
except LinAlgError as e:
# Something strange happened...
logging.warning("{}: numpy LinAlgError: {}".format(name, str(e)))
except ValueError as e:
if ((site_info.analysis_type == "cox")
and ("convergence halted" in str(e).lower())):
# With newer versions of lifelines (cox), if the convergence halted
# prematurely, a ValueError is raised containing the 'Convergence
# halted' message.
logging.warning("{}: Convergence halted".format(name))
else:
raise
# Extending the list to return
if len(results) == 0:
results = ["NA"] * (site_info.number_to_print - len(to_return))
to_return.extend(results)
return to_return
[docs]def samples_with_hetero_calls(data, hetero_c):
"""Gets male and heterozygous calls.
Args:
data (pandas.DataFrame): the probability matrix
hetero_c (str): the name of the heterozygous column
Returns:
pandas.Index: samples where call is heterozygous
Note
----
If there are no data (i.e. no males), an empty list is returned.
"""
if data.shape[0] == 0:
return []
return data[data.idxmax(axis=1) == hetero_c].index
[docs]def fit_cox(data, time_to_event, event, formula, result_col, **kwargs):
"""Fit a Cox' proportional hazard to the data.
Args:
data (pandas.DataFrame): the data to analyse
time_to_event (str): the time to event column for the survival analysis
event (str): the event column for the survival analysis
formula (str): the formula for the data preparation
result_col (str): the column that will contain the results
Returns:
numpy.array: the results from the survival analysis
Note
----
Using alpha of 0.95, and default parameters.
"""
# Preparing the data using Patsy
y, X = dmatrices(formula, data=data, return_type="dataframe")
data = pd.merge(y, X.drop("Intercept", axis=1), left_index=True,
right_index=True)
# Fitting
cf = CoxPHFitter()
cf.fit(data, duration_col=time_to_event, event_col=event)
return cf.summary.loc[result_col, _COX_REQ_COLS].values
[docs]def fit_linear(data, formula, result_col, **kwargs):
"""Fit a linear regression to the data.
Args:
data (pandas.DataFrame): the data to analyse
formula (str): the formula for the linear regression
result_col (str): the column that will contain the results
Returns:
list: the results from the linear regression
"""
return _get_result_from_linear(
smf.ols(formula=formula, data=data).fit(),
result_col=result_col,
)
[docs]def fit_logistic(data, formula, result_col, **kwargs):
"""Fit a logistic regression to the data.
Args:
data (pandas.DataFrame): the data to analyse
formula (str): the formula for the logistic regression
result_col (str): the column that will contain the results
Returns:
list: the results from the logistic regression
"""
return _get_result_from_logistic_mixedlm(
smf.glm(formula=formula, data=data,
family=sm.families.Binomial()).fit(),
result_col=result_col,
)
[docs]def fit_mixedlm(data, formula, use_ml, groups, result_col, random_effects,
mixedlm_p, interaction, **kwargs):
"""Fit a linear mixed effects model to the data.
Args:
data (pandas.DataFrame): the data to analyse
formula (str): the formula for the linear mixed effects model
use_ml (bool): whether to use ML instead of REML
groups (str): the column containing the groups
result_col (str): the column that will contain the results
random_effects (pandas.Series): the random effects
mixedlm_p (float): the p-value threshold for which loci will be
computed with the real MixedLM analysis
interaction (bool): Whether there is an interaction or not
Returns:
list: the results from the linear mixed effects model
"""
# We perform the optimization if there is no interaction
if not interaction:
# Getting the single copy of each genotypes
geno = data.reset_index()[["index", "_GenoD"]]
indexes = geno[["index"]].drop_duplicates().index
geno = geno.loc[indexes, :]
# Merging with the random effects
t_data = pd.merge(random_effects, geno.set_index("index"),
left_index=True, right_index=True)
# Approximating the results
approximate_r = _get_result_from_linear(
smf.ols(formula="RE ~ _GenoD", data=t_data).fit(),
result_col="_GenoD",
)
# If the approximated p-value is higher or equal to the threshold, we
# return the approximation
if approximate_r[5] >= mixedlm_p:
result = ["NA"] * (len(approximate_r) - 2)
result.append(approximate_r[5])
result.append("TS-MixedLM")
return result
# If we get here, it's because the p-value was low enough so we compute the
# real statistics
result = _get_result_from_logistic_mixedlm(
smf.mixedlm(formula=formula, data=data,
groups=groups).fit(reml=not use_ml),
result_col=result_col,
)
result.append("MixedLM")
return result
_fit_map = {
"cox": fit_cox,
"linear": fit_linear,
"logistic": fit_logistic,
"mixedlm": fit_mixedlm,
}
def _get_result_from_linear(fit_result, result_col):
"""Gets results from either a linear, a logistic or a mixedlm regression.
Args:
fit_result (RegressionResults): the results from the regression
result_col (str): the name of the result column
Returns:
list: the regression results
"""
conf_int = fit_result.conf_int().loc[result_col, :].values
assert len(conf_int) == 2
return [
fit_result.params[result_col],
fit_result.bse[result_col],
conf_int[0],
conf_int[1],
fit_result.tvalues[result_col],
fit_result.pvalues[result_col],
fit_result.rsquared_adj,
]
def _get_result_from_logistic_mixedlm(fit_result, result_col):
"""Gets results from either a linear, a logistic or a mixedlm regression.
Args:
fit_result (RegressionResults): the results from the regression
result_col (str): the name of the result column
Returns:
list: the regression results
"""
conf_int = fit_result.conf_int().loc[result_col, :].values
assert len(conf_int) == 2
return [
fit_result.params[result_col],
fit_result.bse[result_col],
conf_int[0],
conf_int[1],
fit_result.tvalues[result_col],
fit_result.pvalues[result_col],
]
[docs]def is_file_like(fn):
"""Checks if the path is like a file (it might be a named pipe).
Args:
fn (str): the path to check
Returns:
bool: True if path is like a file, False otherwise.
"""
return os.path.isfile(fn) or stat.S_ISFIFO(os.stat(fn).st_mode)
[docs]def check_args(args):
"""Checks the arguments and options.
Args:
args (argparse.Namespace): the options to verify
Note
----
If there is a problem, a :py:class:`genipe.error.GenipeError` is
raised.
"""
# Checking if we have the requirements
if args.analysis_type == "cox":
if not HAS_LIFELINES:
raise GenipeError("missing optional module: lifelines")
elif args.analysis_type in {"linear", "logistic", "mixedlm"}:
if not HAS_STATSMODELS:
raise GenipeError("missing optional module: statsmodels")
elif args.analysis_type == "skat":
if not HAS_R:
raise GenipeError("R is not installed")
if not HAS_SKAT:
raise GenipeError("R library missing: SKAT")
# Checking the required input files
for filename in [args.impute2, args.sample, args.pheno]:
if not os.path.isfile(filename):
raise GenipeError("{}: no such file".format(filename))
# Checking the optional input files
for filename in [args.extract_sites]:
if filename is not None:
if not is_file_like(filename):
raise GenipeError("{}: no such file".format(filename))
# Checking the number of process
on_mac_os = platform.system() == "Darwin"
if args.nb_process < 1:
raise GenipeError("{}: invalid number of "
"processes".format(args.nb_process))
if (args.nb_process > 1 and on_mac_os and args.analysis_type != "skat"):
raise GenipeError("multiprocessing is not supported on Mac OS when "
"using linear regression, logistic regression or "
"Cox.")
# Checking the number of lines to read
if args.nb_lines < 1:
raise GenipeError("{}: invalid number of lines to "
"read".format(args.nb_lines))
# Checking the MAF threshold
if args.maf < 0 or args.maf > 1:
raise GenipeError("{}: invalid MAF".format(args.maf))
# Checking the probability threshold
if args.prob < 0 or args.prob > 1:
raise GenipeError("{}: invalid probability "
"threshold".format(args.prob))
# Reading all the variables in the phenotype file
header = None
with open(args.pheno, "r") as i_file:
header = {name for name in i_file.readline().rstrip("\n").split("\t")}
# Checking the restricted columns
restricted_columns = {"_D1", "_D2", "_D3", "_MaxD", "_GenoD", "_Inter"}
if len(restricted_columns & header) != 0:
raise GenipeError("{}: {}: restricted variables".format(
args.pheno,
restricted_columns & header,
))
# Checking the required columns
variables_to_check = None
if args.analysis_type == "cox":
variables_to_check = {args.tte, args.event}
else:
variables_to_check = {args.pheno_name}
for variable in variables_to_check:
if variable not in header:
raise GenipeError("{}: {}: missing variable for {}".format(
args.pheno,
variable,
args.analysis_type,
))
# The categorical variables
categorical_set = set()
if args.categorical != "":
categorical_set = set(args.categorical.split(","))
for name in categorical_set:
if name not in header:
raise GenipeError("{}: {}: missing categorical value".format(
args.pheno,
name,
))
if args.analysis_type != "cox":
if args.pheno_name in categorical_set:
raise GenipeError("{}: {}: should not be in categorical "
"list".format(args.pheno, args.pheno_name))
args.categorical = categorical_set
# Checking the co-variables
covar_list = []
if args.covar != "":
covar_list = args.covar.split(",")
for covar in covar_list:
if covar not in header:
raise GenipeError("{}: {}: missing co-variable".format(
args.pheno,
covar,
))
args.covar = covar_list
# Checking the sample column
if args.sample_column not in header:
raise GenipeError("{}: {}: no such column (--sample-column)".format(
args.pheno,
args.sample_column,
))
# Checking the gender column (only if required)
if args.gender_column != "None":
if args.gender_column not in header:
raise GenipeError(
"{}: {}: no such column (--gender-column)".format(
args.pheno,
args.gender_column,
)
)
# Checking the interaction column (if required)
if args.interaction is not None:
if args.interaction not in header:
raise GenipeError(
"{}: {}: no such column (--interaction)".format(
args.pheno,
args.interaction,
)
)
if args.interaction in args.categorical:
logging.warning("interaction term is categorical: the last "
"category will be used in the results")
if args.analysis_type == "mixedlm":
logging.warning("when using interaction, mixedlm optimization "
"cannot be performed, analysis will be slow")
return True
[docs]def parse_args(parser, args=None):
"""Parses the command line options and arguments.
Args:
parser (argparse.ArgumentParser): the argument parser
args (list): the list of arguments (if not taken from ``sys.argv``)
Returns:
argparse.Namespace: the list of options and arguments
Note
----
The only check that is done here is by the parser itself. Values are
verified later by the :py:func:`check_args` function.
"""
# Adding the version option for the main parser
parser.add_argument(
"-v",
"--version",
action="version",
version="%(prog)s, part of genipe version {}".format(__version__),
)
# Creating a parent parser (for common options between analysis type)
p_parser = argparse.ArgumentParser(add_help=False)
# The parser object
p_parser.add_argument(
"-v", "--version", action="version",
version="%(prog)s (part of genipe version {})".format(__version__),
)
p_parser.add_argument(
"--debug", action="store_true",
help="set the logging level to debug",
)
# The input files
group = p_parser.add_argument_group("Input Files")
group.add_argument(
"--impute2", type=str, metavar="FILE", required=True,
help="The output from IMPUTE2.",
)
group.add_argument(
"--sample", type=str, metavar="FILE", required=True,
help="The sample file (the order should be the same as in the IMPUTE2 "
"files).",
)
group.add_argument(
"--pheno", type=str, metavar="FILE", required=True,
help="The file containing phenotypes and co variables.",
)
group.add_argument(
"--extract-sites", type=str, metavar="FILE",
help="A list of sites to extract for analysis (optional).",
)
# The output files
group = p_parser.add_argument_group("Output Options")
group.add_argument(
"--out", metavar="FILE", default="imputed_stats",
help="The prefix for the output files. [%(default)s]",
)
# General options
group = p_parser.add_argument_group("General Options")
group.add_argument(
"--nb-process", type=int, metavar="INT", default=1,
help="The number of process to use. [%(default)d]",
)
group.add_argument(
"--nb-lines", type=int, metavar="INT", default=1000,
help="The number of line to read at a time. [%(default)d]",
)
group.add_argument(
"--chrx", action="store_true",
help="The analysis is performed for the non pseudo-autosomal region "
"of the chromosome X (male dosage will be divided by 2 to get "
"values [0, 0.5] instead of [0, 1]) (males are coded as 1 and "
"option '--gender-column' should be used).",
)
group.add_argument(
"--gender-column", type=str, metavar="NAME", default="Gender",
help="The name of the gender column (use to exclude samples with "
"unknown gender (i.e. not 1, male, or 2, female). If gender not "
"available, use 'None'. [%(default)s]",
)
# The dosage options
group = p_parser.add_argument_group("Dosage Options")
group.add_argument(
"--scale", type=int, metavar="INT", default=2, choices=[1, 2],
help="Scale dosage so that values are in [0, n] (possible values are "
"1 (no scaling) or 2). [%(default)d]",
)
group.add_argument(
"--prob", type=float, metavar="FLOAT", default=0.9,
help="The minimal probability for which a genotype should be "
"considered. [>=%(default).1f]",
)
group.add_argument(
"--maf", type=float, metavar="FLOAT", default=0.01,
help="Minor allele frequency threshold for which marker will be "
"skipped. [<%(default).2f]",
)
# The general phenotype options
group = p_parser.add_argument_group("Phenotype Options")
group.add_argument(
"--covar", type=str, metavar="NAME", default="",
help="The co variable names (in the phenotype file), separated by "
"coma.",
)
group.add_argument(
"--categorical", type=str, metavar="NAME", default="",
help="The name of the variables that are categorical (note that the "
"gender is always categorical). The variables are separated by "
"coma.",
)
group.add_argument(
"--missing-value", type=str, metavar="NAME",
help="The missing value in the phenotype file.",
)
group.add_argument(
"--sample-column", type=str, metavar="NAME", default="sample_id",
help="The name of the sample ID column (in the phenotype file). "
"[%(default)s]",
)
group.add_argument(
"--interaction", type=str, metavar="NAME",
help="Add an interaction between the genotype and this variable.",
)
# Sub parsers
subparsers = parser.add_subparsers(
title="Statistical Analysis Type",
description="The type of statistical analysis to be performed on the "
"imputed data.",
dest="analysis_type",
)
subparsers.required = True
# The Cox parser
cox_parser = subparsers.add_parser(
"cox",
help="Cox's proportional hazard model (survival regression).",
description="Performs a survival regression on imputed data using "
"Cox's proportional hazard model. This script is part of "
"the 'genipe' package, version {}.".format(__version__),
parents=[p_parser],
)
group = cox_parser.add_argument_group("Cox's Proportional Hazard Model "
"Options")
group.add_argument(
"--time-to-event", type=str, metavar="NAME", required=True, dest="tte",
help="The time to event variable (in the pheno file).",
)
group.add_argument(
"--event", type=str, metavar="NAME", required=True,
help="The event variable (1 if observed, 0 if not observed)",
)
# The linear parser
lin_parser = subparsers.add_parser(
"linear",
help="Linear regression (ordinary least squares).",
description="Performs a linear regression (ordinary least squares) on "
"imputed data. This script is part of the 'genipe' "
"package, version {}.".format(__version__),
parents=[p_parser],
)
# The phenotype options for linear regression
group = lin_parser.add_argument_group("Linear Regression Options")
group.add_argument(
"--pheno-name", type=str, metavar="NAME", required=True,
help="The phenotype.",
)
# The logistic parser
logit_parser = subparsers.add_parser(
"logistic",
help="Logistic regression (GLM with binomial distribution).",
description="Performs a logistic regression on imputed data using a "
"GLM with a binomial distribution. This script is part of "
"the 'genipe' package, version {}.".format(__version__),
parents=[p_parser],
)
# The phenotype options for logistic regression
group = logit_parser.add_argument_group("Logistic Regression Options")
group.add_argument(
"--pheno-name", type=str, metavar="NAME", required=True,
help="The phenotype.",
)
# The mixed effect model parser
mixedlm_parser = subparsers.add_parser(
"mixedlm",
help="Linear mixed effect model (random intercept).",
description="Performs a linear mixed effects regression on imputed "
"data using a random intercept for each group. A p-value "
"approximation is performed so that computation time is "
"acceptable for imputed data. This script is part of the "
"'genipe' package, version {}.".format(__version__),
parents=[p_parser],
)
# The phenotype options for the mixed effect model
group = mixedlm_parser.add_argument_group("Linear Mixed Effects Options")
group.add_argument(
"--pheno-name", type=str, metavar="NAME", required=True,
help="The phenotype.",
)
group.add_argument(
"--use-ml", action="store_true",
help="Fit the standard likelihood using maximum likelihood (ML) "
"estimation instead of REML (default is REML).",
)
group.add_argument(
"--p-threshold", type=float, metavar="FLOAT", default=1e-4,
help="The p-value threshold for which the real MixedLM analysis will "
"be performed. [<%(default).4f]",
)
# The SKAT parser.
skat_parser = subparsers.add_parser(
"skat",
help="SKAT analysis.",
description="Uses the SKAT R package to analyze user defined gene "
"sets. This script is part of the 'genipe' package, "
"version {}.".format(__version__),
parents=[p_parser],
)
# Additional options for SKAT analysis.
group = skat_parser.add_argument_group("SKAT Options")
# The SNP set file.
group.add_argument(
"--snp-sets", type=str, metavar="FILE", required=True,
help="A file indicating a snp_set and an optional weight for every "
"variant."
)
# The outcome type: discrete or continuous.
group.add_argument(
"--outcome-type", type=str, choices=("continuous", "discrete"),
required=True,
help="The variable type for the outcome. This will be passed to SKAT."
)
# SKAT-O flag.
group.add_argument(
"--skat-o", action="store_true",
help="By default, the regular SKAT is used. Setting this flag will "
"use the SKAT-O algorithm instead."
)
group.add_argument(
"--pheno-name", type=str, metavar="NAME", required=True,
help="The phenotype.",
)
if args is not None:
return parser.parse_args(args)
return parser.parse_args()
# Calling the main, if necessary
if __name__ == "__main__":
main()