# Source code for genipe.tests.test_impute2_merger


# This file is part of genipe.
#
# This work is licensed under the Creative Commons Attribution-NonCommercial
# 4.0 International License. To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc/4.0/ or send a letter to Creative
# Commons, PO Box 1866, Mountain View, CA 94042, USA.


import os
import logging
import unittest
from tempfile import TemporaryDirectory

from ..tools import impute2_merger


__author__ = "Louis-Philippe Lemieux Perreault"
__copyright__ = "Copyright 2014, Beaulieu-Saucier Pharmacogenomics Centre"
__license__ = "Attribution-NonCommercial 4.0 International (CC BY-NC 4.0)"


__all__ = ["TestImpute2Merger"]


class TestImpute2Merger(unittest.TestCase):
    """Checks the files written by ``impute2_merger.main``.

    Before each test, seven small IMPUTE2 input files (with their
    ``_summary`` and ``_info`` companions) are written to a temporary
    directory, and the merger is executed three times with different option
    values.  Each test then compares one kind of output file, for every run,
    against hard-coded expected content.
    """

    # Column names written (space separated) as the first line of every mock
    # "_info" input file
    _INFO_HEADER = (
        "snp_id", "rs_id", "position", "a0", "a1", "exp_freq_a1", "info",
        "certainty", "type", "info_type0", "concord_type0", "r2_type0",
    )

    @staticmethod
    def clean_logging_handlers():
        """Removes every handler attached to the root logger."""
        # Iterating over a copy, since removing mutates the handler list
        for handler in list(logging.root.handlers):
            logging.root.removeHandler(handler)

    def setUp(self):
        """Setup the tests."""
        # Creating the temporary directory
        self.output_dir = TemporaryDirectory(prefix="genipe_test_")

        # The content of the files (one line, hence one site, per file)
        file_content = [
            "--- rs12345 1231415 A G 1 0 0 0.988 0.002 0 0 0.997 0.003",
            "1 rs23456 3214569 T C 0.869 0.130 0 0.903 0.095 0.002 0 0 1",
            "--- rs23457 3214570 T TC 0.869 0.130 0 0 1 0 0 0 1",
            "--- rs23457 3214570 T TC 0.869 0.130 0 0 1 0 0 0 1",
            "--- rs23457 3214570 T TC 0.869 0.130 0 0 1 0 0 0 1",
            "--- . 3214570 T TC 0.869 0.130 0 0 1 0 0 0 1",
            "--- . 3214570 T TC 0.869 0.130 0 0.869 0.130 0 0.869 0.130 0",
        ]

        # The summary content (the same for every input file)
        summary_content = [
            "-Output file\n"
            " --1 type 0 SNPs\n"
            " --0 type 1 SNPs\n"
            " --1 type 2 SNPs\n"
            " --1 type 3 SNPs\n"
            " --1 total SNPs"
        ] * len(file_content)

        # The SNP-wise information content
        info_content = [
            "--- rs12345 1231415 A G 0.006 0.359 0.987 0 -1 -1 -1",
            "1 rs23456 3214569 T C 0.082 0.362 0.866 0 -1 -1 -1",
            "--- rs23457 3214570 T TC 0.126 0.299 0.832 0 -1 -1 -1",
            "--- rs23457 3214570 T TC 0.060 0.300 0.909 0 -1 -1 -1",
            "--- rs23457 3214570 T TC 0.084 0.203 0.854 0 -1 -1 -1",
            "--- . 3214570 T TC 0.371 0.339 0.619 0 -1 -1 -1",
            "--- . 3214570 T TC 0.174 0.589 0.831 0 -1 -1 -1",
        ]

        # The small input files ("input_1.impute2" to "input_7.impute2")
        self.filenames = [
            os.path.join(self.output_dir.name, "input_{}.impute2".format(i))
            for i in range(1, len(file_content) + 1)
        ]

        # Creating the files
        zipped = zip(self.filenames, file_content, summary_content,
                     info_content)
        for filename, content, s_content, i_content in zipped:
            with open(filename, "w") as o_file:
                print(content, file=o_file)

            with open(filename + "_summary", "w") as o_file:
                print(s_content, file=o_file)

            with open(filename + "_info", "w") as o_file:
                print(*self._INFO_HEADER, file=o_file)
                print(i_content, file=o_file)

        # Executing the script three times with different values, and saving
        # the prefixes (the first run doesn't use the '--info' option)
        self.prefixes = [
            self._run_merger("genipe_results_1", "0.9", "0.98"),
            self._run_merger("genipe_results_2", "0.8", "0.98", info="0.21"),
            self._run_merger("genipe_results_3", "0.9", "0.6", info="0.3"),
        ]

    def tearDown(self):
        """Finishes the test."""
        # Deleting the output directory
        self.output_dir.cleanup()

    def _run_merger(self, name, probability, completion, info=None):
        """Runs the merger on the input files and returns the output prefix.

        Args:
            name (str): the name of the output prefix (it is created inside
                        the temporary directory)
            probability (str): the value of the ``--probability`` option
            completion (str): the value of the ``--completion`` option
            info (str): the value of the ``--info`` option (the option is
                        omitted when ``None``)

        Returns:
            str: the output prefix given to the script

        """
        prefix = os.path.join(self.output_dir.name, name)

        # The '--impute2' option comes last, since it is followed by the
        # input file names
        args = [
            "--chr", "1",
            "--probability", probability,
            "--completion", completion,
            "--prefix", prefix,
        ]
        if info is not None:
            args += ["--info", info]
        args.append("--impute2")
        args += self.filenames

        try:
            impute2_merger.main(args=args)
        finally:
            # Cleaning the handlers (even if the script failed, so that they
            # don't leak into the next execution)
            TestImpute2Merger.clean_logging_handlers()

        return prefix

    def _read_output(self, prefix, suffix):
        """Returns the whole content of the ``prefix + suffix`` file."""
        with open(prefix + suffix, "r") as i_file:
            return i_file.read()

    def _check_same_for_all(self, suffix, expected):
        """Checks that every run produced exactly ``expected``."""
        for prefix in self.prefixes:
            self.assertEqual(expected, self._read_output(prefix, suffix))

    def _expected_by_prefix(self, all_expected):
        """Pairs each prefix with its own expected content.

        The test fails if there isn't exactly one expected content per
        prefix.

        """
        # There should be enough expected values
        if len(self.prefixes) != len(all_expected):
            self.fail("Wrong number of expected values...")
        return zip(self.prefixes, all_expected)

    def _assert_same_int(self, expected, observed):
        """Checks that two fields represent the same integer."""
        self.assertEqual(int(expected), int(observed))

    def _assert_close_or_na(self, expected, observed):
        """Checks that two fields are almost equal floats (or both 'NA')."""
        if expected == "NA":
            self.assertEqual(expected, observed)
        else:
            self.assertAlmostEqual(float(expected), float(observed))

    def _assert_same_table(self, expected, observed, checks):
        """Compares two tab separated tables, field by field.

        Args:
            expected (str): the expected content (header line included)
            observed (str): the observed content (header line included)
            checks (list): one callable per column, each one called with the
                           expected and the observed field (as strings)

        The header lines need to be identical, and both tables need to have
        the same number of lines and the same number of fields per line.

        """
        # Splitting lines
        expected = expected.splitlines()
        observed = observed.splitlines()

        # Should have the same number of lines
        self.assertEqual(len(expected), len(observed))

        # Comparing the results
        for i, (e_line, o_line) in enumerate(zip(expected, observed)):
            if i == 0:
                # The header
                self.assertEqual(e_line, o_line)
                continue

            # Splitting
            e_fields = e_line.split("\t")
            o_fields = o_line.split("\t")

            # Should be the same length (with one check per field)
            self.assertEqual(len(e_fields), len(o_fields))
            self.assertEqual(len(checks), len(e_fields),
                             "Wrong number of values")

            # Values should be the same
            for check, e_field, o_field in zip(checks, e_fields, o_fields):
                check(e_field, o_field)

    def test_check_output_files(self):
        """Checks the presence of all the output files."""
        suffixes = [".alleles", ".completion_rates", ".good_sites",
                    ".impute2", ".imputed_sites", ".log", ".maf", ".map",
                    ".impute2_info"]
        for suffix in suffixes:
            for prefix in self.prefixes:
                filename = prefix + suffix
                self.assertTrue(
                    os.path.isfile(filename),
                    "{}: missing output file".format(filename),
                )

    def test_check_alleles(self):
        """Checks the '.alleles' file."""
        expected = (
            "name\ta1\ta2\n"
            "rs12345\tA\tG\n"
            "rs23456\tT\tC\n"
            "rs23457\tT\tTC\n"
            "rs23457_1\tT\tTC\n"
            "rs23457_2\tT\tTC\n"
            "1:3214570\tT\tTC\n"
            "1:3214570_1\tT\tTC\n"
        )

        # Checking the files
        self._check_same_for_all(".alleles", expected)

    def test_completion_rates(self):
        """Checks the '.completion_rates' file."""
        # Expected for the first and third runs (both executed with a
        # probability of 0.9)
        with_missing = (
            "name\tnb_missing\tcompletion_rate\n"
            "rs12345\t0\t{}\n"
            "rs23456\t1\t{}\n"
            "rs23457\t1\t{}\n"
            "rs23457_1\t1\t{}\n"
            "rs23457_2\t1\t{}\n"
            "1:3214570\t1\t{}\n"
            "1:3214570_1\t3\t{}\n"
        ).format(1, 2/3, 2/3, 2/3, 2/3, 2/3, 0)

        # Expected for the second run (executed with a probability of 0.8)
        without_missing = (
            "name\tnb_missing\tcompletion_rate\n"
            "rs12345\t0\t{}\n"
            "rs23456\t0\t{}\n"
            "rs23457\t0\t{}\n"
            "rs23457_1\t0\t{}\n"
            "rs23457_2\t0\t{}\n"
            "1:3214570\t0\t{}\n"
            "1:3214570_1\t0\t{}\n"
        ).format(1, 1, 1, 1, 1, 1, 1)

        all_expected = [with_missing, without_missing, with_missing]

        # The name (as is), the number of missing (integer) and the
        # completion rate (floating point)
        checks = [
            self.assertEqual,
            self._assert_same_int,
            self._assert_close_or_na,
        ]

        # Checking the files
        for prefix, expected in self._expected_by_prefix(all_expected):
            observed = self._read_output(prefix, ".completion_rates")
            self._assert_same_table(expected, observed, checks)

    def test_good_sites(self):
        """Checks the '.good_sites' file."""
        all_expected = [
            # First run
            "rs12345\n",

            # Second run
            "rs12345\n"
            "rs23456\n"
            "rs23457\n"
            "rs23457_1\n"
            "1:3214570\n"
            "1:3214570_1\n",

            # Third run
            "rs12345\n"
            "rs23456\n"
            "rs23457_1\n"
            "1:3214570\n",
        ]

        # Checking the files
        for prefix, expected in self._expected_by_prefix(all_expected):
            self.assertEqual(expected,
                             self._read_output(prefix, ".good_sites"))

    def test_impute2(self):
        """Checks the '.impute2' file."""
        expected = (
            "1 rs12345 1231415 A G 1 0 0 0.988 0.002 0 0 0.997 0.003\n"
            "1 rs23456 3214569 T C 0.869 0.130 0 0.903 0.095 0.002 0 0 1\n"
            "1 rs23457 3214570 T TC 0.869 0.130 0 0 1 0 0 0 1\n"
            "1 rs23457_1 3214570 T TC 0.869 0.130 0 0 1 0 0 0 1\n"
            "1 rs23457_2 3214570 T TC 0.869 0.130 0 0 1 0 0 0 1\n"
            "1 1:3214570 3214570 T TC 0.869 0.130 0 0 1 0 0 0 1\n"
            "1 1:3214570_1 3214570 T TC 0.869 0.130 0 0.869 0.130 0 0.869 "
            "0.130 0\n"
        )

        # Checking the files
        self._check_same_for_all(".impute2", expected)

    def test_impute2_info(self):
        """Checks the '.impute2_info' file."""
        expected = (
            "chr\tname\tposition\ta0\ta1\texp_freq_a1\tinfo\tcertainty\ttype\t"
            "info_type0\tconcord_type0\tr2_type0\n"
            "1\trs12345\t1231415\tA\tG\t0.006\t0.359\t0.987\t0\t-1\t-1\t-1\n"
            "1\trs23456\t3214569\tT\tC\t0.082\t0.362\t0.866\t0\t-1\t-1\t-1\n"
            "1\trs23457\t3214570\tT\tTC\t0.126\t0.299\t0.832\t0\t-1\t-1\t-1\n"
            "1\trs23457_1\t3214570\tT\tTC\t0.060\t0.300\t0.909\t0\t-1\t-1\t"
            "-1\n"
            "1\trs23457_2\t3214570\tT\tTC\t0.084\t0.203\t0.854\t0\t-1\t-1\t"
            "-1\n"
            "1\t1:3214570\t3214570\tT\tTC\t0.371\t0.339\t0.619\t0\t-1\t-1\t"
            "-1\n"
            "1\t1:3214570_1\t3214570\tT\tTC\t0.174\t0.589\t0.831\t0\t-1\t-1\t"
            "-1\n"
        )

        # Checking the files
        self._check_same_for_all(".impute2_info", expected)

    def test_imputed_sites(self):
        """Checks the '.imputed_sites' file."""
        expected = (
            "rs12345\n"
            "rs23457\n"
            "rs23457_1\n"
            "rs23457_2\n"
            "1:3214570\n"
            "1:3214570_1\n"
        )

        # Checking the files
        self._check_same_for_all(".imputed_sites", expected)

    def test_maf(self):
        """Checks the '.maf' file."""
        # Expected for the first and third runs (both executed with a
        # probability of 0.9)
        with_na = (
            "name\tmajor\tminor\tmaf\n"
            "rs12345\tA\tG\t{}\n"
            "rs23456\tT\tC\t{}\n"
            "rs23457\tTC\tT\t{}\n"
            "rs23457_1\tTC\tT\t{}\n"
            "rs23457_2\tTC\tT\t{}\n"
            "1:3214570\tTC\tT\t{}\n"
            "1:3214570_1\tT\tTC\t{}\n"
        ).format(1/6, 0.5, 1/4, 1/4, 1/4, 1/4, "NA")

        # Expected for the second run (executed with a probability of 0.8)
        without_na = (
            "name\tmajor\tminor\tmaf\n"
            "rs12345\tA\tG\t{}\n"
            "rs23456\tT\tC\t{}\n"
            "rs23457\tT\tTC\t{}\n"
            "rs23457_1\tT\tTC\t{}\n"
            "rs23457_2\tT\tTC\t{}\n"
            "1:3214570\tT\tTC\t{}\n"
            "1:3214570_1\tT\tTC\t{}\n"
        ).format(1/6, 2/6, 0.5, 0.5, 0.5, 0.5, 0)

        all_expected = [with_na, without_na, with_na]

        # The name and the two alleles (as is), then the MAF (floating point,
        # or 'NA')
        checks = [
            self.assertEqual,
            self.assertEqual,
            self.assertEqual,
            self._assert_close_or_na,
        ]

        # Checking the files
        for prefix, expected in self._expected_by_prefix(all_expected):
            observed = self._read_output(prefix, ".maf")
            self._assert_same_table(expected, observed, checks)

    def test_map(self):
        """Checks the '.map' file."""
        expected = (
            "1\trs12345\t0\t1231415\n"
            "1\trs23456\t0\t3214569\n"
            "1\trs23457\t0\t3214570\n"
            "1\trs23457_1\t0\t3214570\n"
            "1\trs23457_2\t0\t3214570\n"
            "1\t1:3214570\t0\t3214570\n"
            "1\t1:3214570_1\t0\t3214570\n"
        )

        # Checking the files
        self._check_same_for_all(".map", expected)