Source code for genipe.tools.impute2_merger


# This file is part of genipe.
#
# This work is licensed under the Creative Commons Attribution-NonCommercial
# 4.0 International License. To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc/4.0/ or send a letter to Creative
# Commons, PO Box 1866, Mountain View, CA 94042, USA.


import os
import re
import sys
import shlex
import logging
import argparse
from collections import defaultdict

import numpy as np

from .. import __version__
from ..formats import impute2
from ..error import GenipeError


__author__ = "Louis-Philippe Lemieux Perreault"
__copyright__ = "Copyright 2014, Beaulieu-Saucier Pharmacogenomics Centre"
__license__ = "Attribution-NonCommercial 4.0 International (CC BY-NC 4.0)"


[docs]def main(args=None): """The main function. Args: args (argparse.Namespace): the arguments to be parsed (if :py:func:`main` is called by another modulel) """ # Creating the option parser desc = ("Concatenate IMPUTE2 output files and retrieve some " "statistics. This script is part of the 'genipe' package, " "version {}.".format(__version__)) parser = argparse.ArgumentParser(description=desc) # Files that need closing logging_fh = None try: # Parsing the options args = parse_args(parser, args) # Adding the logging capability log_file = args.prefix + ".log" logging_fh = logging.FileHandler(log_file, mode="w") logging.basicConfig( format="[%(asctime)s %(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.DEBUG if args.debug else logging.INFO, handlers=[logging.StreamHandler(), logging_fh] ) logging.info("Logging everything into '{}'".format(log_file)) logging.info("Program arguments: {}".format( " ".join(shlex.quote(part) for part in sys.argv[1:]) )) # Checking the options check_args(args) # Concatenate and extract information concatenate_files(args.impute2, args.prefix, args.chrom, args) # Catching the Ctrl^C except KeyboardInterrupt: logging.info("Cancelled by user") sys.exit(0) # Catching the GenipeError except GenipeError as e: logging.error(e) parser.error(e.message) except Exception as e: logging.error(e) raise finally: if logging_fh is not None: logging_fh.close()
[docs]def concatenate_files(i_filenames, out_prefix, real_chrom, options): """Concatenates and extracts information from IMPUTE2 GEN file(s). Args: i_filenames (list): the list of input filenames (to concatenate) out_prefix (str): the output prefix for the output files real_chrom (str): the chromosome contained in all the input files options (argparse.Namespace): the options This function will create the following seven files: +-----------------------+-------------------------------------------------+ | File name | Description | +=======================+=================================================+ | ``.impute2`` | Imputation results (merged from all the input | | | files). | +-----------------------+-------------------------------------------------+ | ``.alleles`` | Description of the reference and alternative | | | allele at each sites. | +-----------------------+-------------------------------------------------+ | ``.imputed_sites`` | List of imputed sites (excluding sites that | | | were previously genotyped in the study cohort). | +-----------------------+-------------------------------------------------+ | ``.impute2_info`` | SNP-wise information file with one line per SNP | | | and a single header line at the beginning. | +-----------------------+-------------------------------------------------+ | ``.completion_rates`` | Number of missing values and completion rate | | | for all sites (using the probability threshold | | | set by the user, where the default is higher | | | and equal to 0.9). | +-----------------------+-------------------------------------------------+ | ``.good_sites`` | List of sites which pass the completion rate | | | threshold (set by the user, where the default | | | is higher and equal to 0.98) using the | | | probability threshold (set by the user, where | | | the default is higher and equal to 0.9). | +-----------------------+-------------------------------------------------+ | ``.map`` | A map file describing the genomic location of | | | all sites. | +-----------------------+-------------------------------------------------+ | ``.maf`` | File containing the minor allele frequency | | | (along with minor allele identification) for | | | all sites using the probabilitty threshold of | | | 0.9. When no genotypes are available (because | | | they are all below the threshold), the MAF is | | | ``NA``. | +-----------------------+-------------------------------------------------+ """ # Opening output files impute2_o_file = open(out_prefix + ".impute2", "w") impute2_info_o_file = open(out_prefix + ".impute2_info", "w") alleles_o_file = open(out_prefix + ".alleles", "w") imputed_sites_o_file = open(out_prefix + ".imputed_sites", "w") completion_o_file = open(out_prefix + ".completion_rates", "w") good_sites_o_file = open(out_prefix + ".good_sites", "w") map_o_file = open(out_prefix + ".map", "w") maf_o_file = open(out_prefix + ".maf", "w") # Printing the headers print("name", "nb_missing", "completion_rate", sep="\t", file=completion_o_file) print("name", "a1", "a2", sep="\t", file=alleles_o_file) print("name", "major", "minor", "maf", sep="\t", file=maf_o_file) # The markers that were already seen already_seen = defaultdict(int) # Opening the input file(s) one by one chr23_par_already_warned = False info_header_printed = False for i_filename in i_filenames: logging.info("Working with {}".format(i_filename)) # Getting the expected number of lines from summary file summary = None with open(i_filename + "_summary", "r") as i_file: summary = i_file.read() r = re.search(r"-Output file\n --\d+ type 0 SNPs\n --\d+ type 1 SNPs" r"\n --\d+ type 2 SNPs\n --\d+ type 3 SNPs\n" r" --(\d+) total SNPs", summary) if r is None: raise GenipeError("{}: unknown " "format".format(i_filename + "_summary")) nb_expected = int(r.group(1)) logging.info(" - expecting {:,d} lines".format(nb_expected)) # The input files i_file = open(i_filename, "r") i_info_file = open(i_filename + "_info", "r") # Reading the info header info_header_row = i_info_file.readline().rstrip("\r\n").split(" ") info_header = {name: i for i, name in enumerate(info_header_row)} # Printing the header for the info file only once if not info_header_printed: print("chr", "name", "position", *info_header_row[info_header["position"]+1:], sep="\t", file=impute2_info_o_file) info_header_printed = True nb_line = 0 for line in i_file: # The number of line nb_line += 1 # Splitting the line row = line.rstrip("\r\n").split(" ") # Gathering genotypes (chrom, name, pos, a1, a2), geno = impute2.matrix_from_line(row) # Splitting the info line info_row = i_info_file.readline() if info_row == "": raise GenipeError("{}: missing information for '{}'".format( i_filename + "_info", name, )) info_row = info_row.rstrip("\r\n").split(" ") # Checking that the two names and position are the same if ((name != info_row[info_header["rs_id"]]) or (pos != info_row[info_header["position"]])): raise GenipeError("{} and {}: not same order".format( i_filename, i_filename + "_info", )) # Checking the name of the marker if name == ".": name = "{}:{}".format(real_chrom, pos) # Have we seen this marker? if name in already_seen: new_name = "{}_{}".format(name, already_seen[name]) while new_name in already_seen: already_seen[name] += 1 new_name = "{}_{}".format(name, already_seen[name]) name = new_name already_seen[name] += 1 # Checking the chromosome if chrom == "---": # This site is imputed print(name, file=imputed_sites_o_file) elif chrom != real_chrom: # The chromosome in the impute2 file is not the same as # the one required in the options if (chrom == "23") and (real_chrom == "25"): # This might be that we are in the PAR # (pseudo-autosomal) region, so we print a warning # and continue if not chr23_par_already_warned: logging.warning("WARNING: asked for chromosome 25, " "but in chromosome 23: be sure to be " "in the pseudo-autosomal region") chr23_par_already_warned = True else: # This is a problem, so we quit raise GenipeError("{} != {}: not same " "chromosome".format(chrom, real_chrom)) # Adding the information to the coding file print(name, a1, a2, sep="\t", file=alleles_o_file) # Checking the completion rate and saving it good_calls = impute2.get_good_probs(geno, options.probability) nb = np.sum(good_calls) comp = 0 if geno.shape[0] != 0: comp = nb / geno.shape[0] print(name, geno.shape[0] - nb, comp, sep="\t", file=completion_o_file) # Computing the MAF and saving it maf, minor, major = impute2.maf_from_probs( prob_matrix=geno[good_calls], a1=a1, a2=a2, ) print(name, major, minor, maf, sep="\t", file=maf_o_file) # Checking the information value info_value = float(info_row[info_header["info"]]) if (comp >= options.completion) and (info_value >= options.info): # The completion is over the thresholds print(name, file=good_sites_o_file) # Saving the map file print(real_chrom, name, "0", pos, sep="\t", file=map_o_file) # Saving the data print(real_chrom, name, pos, a1, a2, *row[5:], sep=" ", file=impute2_o_file) # Saving the information data print(real_chrom, name, pos, *info_row[info_header["position"]+1:], sep="\t", file=impute2_info_o_file) # Closing the input file i_file.close() i_info_file.close() if nb_line != nb_expected: logging.warning(" - number of lines ({:,d}) is not as expected " "({:,d})".format(nb_line, nb_expected)) # Closing output files impute2_o_file.close() impute2_info_o_file.close() alleles_o_file.close() imputed_sites_o_file.close() completion_o_file.close() good_sites_o_file.close() map_o_file.close() maf_o_file.close()
[docs]def check_args(args): """Checks the arguments and options. Args: args (argparse.Namespace): the options to verify Note ---- If there is a problem, a :py:class:`genipe.error.GenipeError` is raised. """ # Checking the input files for filename in args.impute2: if not os.path.isfile(filename): raise GenipeError("{}: no such file".format(filename)) summary_file = filename + "_summary" if not os.path.isfile(summary_file): raise GenipeError("{}: no such file".format(summary_file)) info_file = filename + "_info" if not os.path.isfile(info_file): raise GenipeError("{}: no such file".format(info_file)) # Checking the header of the info file with open(info_file, "r") as i_file: header = set(i_file.readline().rstrip("\r\n").split(" ")) for name in ("rs_id", "position", "info"): if name not in header: raise GenipeError("{}: missing column '{}'".format( info_file, name, )) # Checking the chromosome valid_chromosome = [str(i) for i in range(1, 24)] valid_chromosome.append("25") if args.chrom not in valid_chromosome: raise GenipeError("{}: invalid chromosome".format(args.chrom)) if args.chrom == "23": logging.warning("MAF computation is wrong for chromosome 23 (males " "have 2 alleles in the computation, instead of 1)...") # Checking that probability, completion and info are between 0 and 1 if args.probability < 0 or args.probability > 1: raise GenipeError("{}: invalid probability".format(args.probability)) if args.completion < 0 or args.completion > 1: raise GenipeError("{}: invalid completion".format(args.completion)) if args.info < 0 or args.info > 1: raise GenipeError("{}: invalid info".format(args.info)) return True
[docs]def parse_args(parser, args=None): """Parses the command line options and arguments. Args: parser (argparse.ArgumentParser): the argument parser args (list): the list of arguments (if not taken from ``sys.argv``) Returns: argparse.Namespace: the list of options and arguments Note ---- The only check that is done here is by the parser itself. Values are verified later by the :py:func:`check_args` function. """ # The parser object parser.add_argument( "--version", action="version", version="%(prog)s, part of genipe version {}".format(__version__), ) parser.add_argument( "--debug", action="store_true", help="set the logging level to debug", ) # The input files group = parser.add_argument_group("Input Files") group.add_argument( "-i", "--impute2", type=str, metavar="FILE", required=True, nargs="+", help="IMPUTE2 file(s) to merge.", ) # The options group = parser.add_argument_group("Options") group.add_argument( "--chr", type=str, metavar="CHR", required=True, dest="chrom", help="The chromosome on witch the imputation was made.", ) group.add_argument( "--probability", type=float, metavar="FLOAT", default=0.9, help="The probability threshold for no calls. [<%(default).1f]", ) group.add_argument( "--completion", type=float, metavar="FLOAT", default=0.98, help="The completion rate threshold for site exclusion. " "[<%(default).2f]", ) group.add_argument( "--info", type=float, metavar="FLOAT", default=0, help="The measure of the observed statistical information associated " "with the allele frequency estimate threshold for site " "exclusion. [<%(default).2f]", ) # The output files group = parser.add_argument_group("Output Files") group.add_argument( "--prefix", type=str, metavar="FILE", default="imputed", help="The prefix for the output files. [%(default)s]", ) if args is not None: return parser.parse_args(args) return parser.parse_args()
# Calling the main, if necessary if __name__ == "__main__": main()