Source code for genipe.tests.test_main_pipeline


# This file is part of genipe.
#
# This work is licensed under the Creative Commons Attribution-NonCommercial
# 4.0 International License. To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc/4.0/ or send a letter to Creative
# Commons, PO Box 1866, Mountain View, CA 94042, USA.


import os
import unittest
from random import randint
from tempfile import TemporaryDirectory

from .. import autosomes
from ..pipeline import cli
from ..error import GenipeError

if cli.HAS_PYFAIDX:
    import pyfaidx


__author__ = "Louis-Philippe Lemieux Perreault"
__copyright__ = "Copyright 2014, Beaulieu-Saucier Pharmacogenomics Centre"
__license__ = "Attribution-NonCommercial 4.0 International (CC BY-NC 4.0)"


__all__ = ["TestMainPipeline"]


[docs]class TestMainPipeline(unittest.TestCase):
[docs] def setUp(self): """Setup the tests.""" # Creating the temporary directory self.output_dir = TemporaryDirectory(prefix="genipe_test_")
[docs] def tearDown(self): """Finishes the test.""" # Deleting the output directory self.output_dir.cleanup()
[docs] def test_file_sorter(self): """Tests the 'file_sorter' function.""" filenames = [ "chr1.1000_100000.impute2", "chr2.10000_1002300.impute2", "chr1.1_100.impute2", "/some/path/chr1.1_10.impute2", "chr25_1.3_40.impute2", "chr23.100_400.impute2", "some/path/to_file/chr25_2.1000_1500.impute2.gz", "chr25_2.1600_1650.impute2.gz", "chr1.100000_2000000.impute2.some_extension", ] expected_filenames = [ "/some/path/chr1.1_10.impute2", "chr1.1_100.impute2", "chr1.1000_100000.impute2", "chr1.100000_2000000.impute2.some_extension", "chr2.10000_1002300.impute2", "chr23.100_400.impute2", "chr25_1.3_40.impute2", "some/path/to_file/chr25_2.1000_1500.impute2.gz", "chr25_2.1600_1650.impute2.gz", ] expected_results = [(1, 1000, 100000), (2, 10000, 1002300), (1, 1, 100), (1, 1, 10), (25, 3, 40), (23, 100, 400), (25, 1000, 1500), (25, 1600, 1650), (1, 100000, 2000000)] # Trying the function for filename, expected in zip(filenames, expected_results): self.assertEqual(expected, cli.file_sorter(filename)) # Trying the sort function filenames.sort(key=cli.file_sorter) self.assertEqual(filenames, expected_filenames)
[docs] def test_get_chromosome_length(self): """Tests the 'get_chromosome_length' function.""" # The expected chromosome expected_chrom = {3, 6, 9, 23, 25} expected_length = {} # Writing some legend file for different chromosome legend_template = os.path.join(self.output_dir.name, "chr{chrom}.legend") legend_chr23 = os.path.join(self.output_dir.name, "chr23_nonPAR.legend") legend_par1 = os.path.join(self.output_dir.name, "chr23_PAR1.legend") legend_par2 = os.path.join(self.output_dir.name, "chr23_PAR2.legend") for chrom in expected_chrom: if chrom == 23: # Getting the positions and expected length positions = sorted([randint(5000, 100000) for i in range(100)]) expected_length[chrom] = (min(positions), max(positions)) # Writing the positions to file with open(legend_chr23, "w") as o_file: print("id", "position", file=o_file) for i, position in enumerate(positions): print("marker_{}".format(i+1), position, file=o_file) # Continuing to next chromosome continue if chrom == 25: # Getting the positions and expected length positions = sorted([randint(1, 4999) for i in range(100)]) expected_length[chrom] = [max(positions)] # Writing the positions to file with open(legend_par1, "w") as o_file: print("id", "position", file=o_file) for i, position in enumerate(positions): print("marker_{}".format(i+1), position, file=o_file) # Getting the positions and expected length positions = sorted([ randint(100000, 200000) for i in range(100) ]) expected_length[chrom].extend([min(positions), max(positions)]) expected_length[chrom] = tuple(expected_length[chrom]) # Writing the positions to file with open(legend_par2, "w") as o_file: print("id", "position", file=o_file) for i, position in enumerate(positions): print("marker_{}".format(i+1), position, file=o_file) # Continuing to next chromosome continue # Getting the position and saving the expected length positions = sorted([randint(1, 2000000) for i in range(1000)]) expected_length[chrom] = max(positions) # Saving the file with open(legend_template.format(chrom=chrom), "w") as o_file: print("id", "position", file=o_file) for i, position in enumerate(positions): print("marker_{}".format(i+1), position, file=o_file) # Getting the chromosome length chrom_length = cli.get_chromosome_length( required_chrom=expected_chrom, legend=legend_template, legend_chr23=legend_chr23, legend_par1=legend_par1, legend_par2=legend_par2, out_dir=self.output_dir.name, ) # Checking the expected length self.assertEqual(expected_length, chrom_length) # Checking the file was created self.assertTrue(os.path.isfile(os.path.join(self.output_dir.name, "chromosome_lengths.txt"))) # Tests that we correctly read the file expected_chrom = {6: expected_length[6], 9: expected_length[9], 23: expected_length[23], 25: expected_length[25]} # Writing the file chrom_filename = os.path.join(self.output_dir.name, "chromosome_lengths.txt") with open(chrom_filename, "w") as o_file: for chrom, length in expected_chrom.items(): if (chrom == 23) or (chrom == 25): print(chrom, *length, sep="\t", file=o_file) else: print(chrom, length, sep="\t", file=o_file) # Comparing what we got chrom_length = cli.get_chromosome_length( required_chrom=expected_chrom.keys(), legend=legend_template, legend_chr23=legend_chr23, legend_par1=legend_par1, legend_par2=legend_par2, out_dir=self.output_dir.name, ) self.assertEqual(expected_chrom, chrom_length) # Removing some autosomes from the file del expected_chrom[9] del expected_chrom[23] del expected_chrom[25] with open(chrom_filename, "w") as o_file: for chrom, length in expected_chrom.items(): if (chrom == 23) or (chrom == 25): print(chrom, *length, sep="\t", file=o_file) else: print(chrom, length, sep="\t", file=o_file) expected_chrom[9] = expected_length[9] expected_chrom[23] = expected_length[23] expected_chrom[25] = expected_length[25] # Tests that a warning is logged if there is a missing chromosome with self.assertLogs(level="WARNING") as cm: chrom_length = cli.get_chromosome_length( required_chrom=sorted(expected_chrom.keys()), legend=legend_template, legend_chr23=legend_chr23, legend_par1=legend_par1, legend_par2=legend_par2, out_dir=self.output_dir.name, ) log_m = [ "WARNING:root:missing length for chromosome 9", "WARNING:root:missing length for chromosome 23", "WARNING:root:missing length for chromosome 25", ] self.assertEqual(log_m, cm.output) # Testing the content self.assertEqual(expected_chrom, chrom_length)
[docs] @unittest.skipIf(not cli.HAS_PYFAIDX, "optional requirement (pyfaidx) not satisfied") def test_get_chrom_encoding(self): """Tests the 'get_chrom_encoding' function.""" # Creating the reference file (fasta file) and index (using samtools) fasta_content = [[">{}".format(i), "ACGT"] for i in range(1, 25)] fasta_content.append([">26", "ACGT"]) fasta_content.append([">Unaligned", "ACGT"]) index_content = [ ["1", "4", "3", "4", "5"], ["2", "4", "11", "4", "5"], ["3", "4", "19", "4", "5"], ["4", "4", "27", "4", "5"], ["5", "4", "35", "4", "5"], ["6", "4", "43", "4", "5"], ["7", "4", "51", "4", "5"], ["8", "4", "59", "4", "5"], ["9", "4", "67", "4", "5"], ["10", "4", "76", "4", "5"], ["11", "4", "85", "4", "5"], ["12", "4", "94", "4", "5"], ["13", "4", "103", "4", "5"], ["14", "4", "112", "4", "5"], ["15", "4", "121", "4", "5"], ["16", "4", "130", "4", "5"], ["17", "4", "139", "4", "5"], ["18", "4", "148", "4", "5"], ["19", "4", "157", "4", "5"], ["20", "4", "166", "4", "5"], ["21", "4", "175", "4", "5"], ["22", "4", "184", "4", "5"], ["23", "4", "193", "4", "5"], ["24", "4", "202", "4", "5"], ["26", "4", "210", "4", "5"], ["Unaligned", "4", "226", "4", "5"], ] reference_filename = os.path.join(self.output_dir.name, "ref.fasta") with open(reference_filename, "w") as o_file: for chromosome_fasta in fasta_content: print(*chromosome_fasta, sep="\n", file=o_file) with open(reference_filename + ".fai", "w") as o_file: for line in index_content: print(*line, sep="\t", file=o_file) # Reading the reference using pyfaidx reference = pyfaidx.Fasta(reference_filename, as_raw=True) # The expected result expected = {str(i): str(i) for i in range(1, 25)} expected["26"] = "26" # The observed result observed = cli.get_chrom_encoding(reference) self.assertEqual(expected, observed) reference.close() # Replacing 23 and 24 to X and Y fasta_content[22][0] = ">X" fasta_content[23][0] = ">Y" index_content[22][0] = "X" index_content[23][0] = "Y" # Writing to file with open(reference_filename, "w") as o_file: for chromosome_fasta in fasta_content: print(*chromosome_fasta, sep="\n", file=o_file) with open(reference_filename + ".fai", "w") as o_file: for line in index_content: print(*line, sep="\t", file=o_file) # Reading the reference using pyfaidx reference = pyfaidx.Fasta(reference_filename, as_raw=True) # The expected result expected["23"] = "X" expected["24"] = "Y" # The observed result observed = cli.get_chrom_encoding(reference) self.assertEqual(expected, observed) reference.close() # Adding chr everywhere for i in range(len(fasta_content)): fasta_content[i][0] = ">chr" + fasta_content[i][0][1:] index_content[i][0] = "chr" + index_content[i][0] # Writing to file with open(reference_filename, "w") as o_file: for chromosome_fasta in fasta_content: print(*chromosome_fasta, sep="\n", file=o_file) with open(reference_filename + ".fai", "w") as o_file: for line in index_content: print(*line, sep="\t", file=o_file) # Reading the reference using pyfaidx reference = pyfaidx.Fasta(reference_filename, as_raw=True) # The expected result expected = {str(i): "chr{}".format(i) for i in range(1, 23)} expected["23"] = "chrX" expected["24"] = "chrY" expected["26"] = "chr26" # The observed result observed = cli.get_chrom_encoding(reference) self.assertEqual(expected, observed) reference.close() # Replacing 23 and 24 to X and Y fasta_content[22][0] = ">chr23" fasta_content[23][0] = ">chr24" index_content[22][0] = "chr23" index_content[23][0] = "chr24" # Writing to file with open(reference_filename, "w") as o_file: for chromosome_fasta in fasta_content: print(*chromosome_fasta, sep="\n", file=o_file) with open(reference_filename + ".fai", "w") as o_file: for line in index_content: print(*line, sep="\t", file=o_file) # Reading the reference using pyfaidx reference = pyfaidx.Fasta(reference_filename, as_raw=True) # The expected result expected["23"] = "chr23" expected["24"] = "chr24" # The observed result observed = cli.get_chrom_encoding(reference) self.assertEqual(expected, observed) reference.close() # Finally, removing from chromosome 18 to trigger warnings fasta_content = fasta_content[:18] index_content = index_content[:18] # Writing to file with open(reference_filename, "w") as o_file: for chromosome_fasta in fasta_content: print(*chromosome_fasta, sep="\n", file=o_file) with open(reference_filename + ".fai", "w") as o_file: for line in index_content: print(*line, sep="\t", file=o_file) # Reading the reference using pyfaidx reference = pyfaidx.Fasta(reference_filename, as_raw=True) # The observed result with self.assertLogs(level="WARNING") as cm: cli.get_chrom_encoding(reference) log_m = [ "WARNING:root:{}: chromosome not in reference".format(i) for i in range(19, 27) if i != 25 ] self.assertEqual(log_m, cm.output) reference.close()
[docs] @unittest.skipIf(not cli.HAS_PYFAIDX, "optional requirement (pyfaidx) not satisfied") def test_is_reversed(self): """Tests the 'is_reversed' function.""" # Creating the reference file (fasta file) and index (using samtools) fasta_content = ( ">1\n" "ACGT\n" ">2\n" "ACGT\n" ">3\n" "acgt\n" ) index_content = ( "1\t4\t3\t4\t5\n" "2\t4\t11\t4\t5\n" "3\t4\t19\t4\t5\n" ) reference_filename = os.path.join(self.output_dir.name, "ref.fasta") with open(reference_filename, "w") as o_file: o_file.write(fasta_content) with open(reference_filename + ".fai", "w") as o_file: o_file.write(index_content) # Reading the reference using pyfaidx reference = pyfaidx.Fasta(reference_filename, as_raw=True) # The chromosome encoding encoding = {"1": "1", "2": "2", "3": "3"} # Testing invalid allele (should return False) self.assertFalse(cli.is_reversed( "1", 1, "I", "D", reference, encoding), ) self.assertFalse(cli.is_reversed( "1", 1, "Z", "A", reference, encoding), ) self.assertFalse(cli.is_reversed( "1", 1, "A", "K", reference, encoding), ) # Testing invalid chromosome (should return False) self.assertFalse(cli.is_reversed( "23", 1, "A", "C", reference, encoding), ) # Testing invalid position (should return False) self.assertFalse(cli.is_reversed( "1", 100, "A", "C", reference, encoding), ) # Testing valid input, without strand problem (should return False) self.assertFalse(cli.is_reversed( "1", 3, "G", "T", reference, encoding), ) self.assertFalse(cli.is_reversed( "2", 4, "G", "T", reference, encoding), ) self.assertFalse(cli.is_reversed( "3", 2, "g", "c", reference, encoding), ) # Testing valid input, but strand problem (should return True) self.assertTrue(cli.is_reversed("1", 1, "T", "G", reference, encoding)) self.assertTrue(cli.is_reversed("2", 2, "t", "g", reference, encoding)) self.assertTrue(cli.is_reversed("3", 3, "T", "C", reference, encoding)) self.assertTrue(cli.is_reversed("1", 4, "A", "C", reference, encoding)) # Closing the reference reference.close()
[docs] @unittest.skip("Test not implemented") def test_read_preamble(self): """Tests the 'read_preamble' function.""" self.fail("Test not implemented")
[docs] @unittest.skip("Test not implemented") def test_get_cross_validation_results(self): """Tests the 'get_cross_validation_results' function.""" self.fail("Test not implemented")
[docs] @unittest.skip("Test not implemented") def test_gather_imputation_stats(self): """Tests the 'gather_imputation_stats' function.""" self.fail("Test not implemented")
[docs] def test_gather_maf_stats(self): """Tests the 'gather_maf_stats' function.""" # Creating one file per chromosome with two markers in each header = ["name", "maf"] content = [ ["marker_1", "0.0"], ["marker_2", "0.2"], ["marker_3", "NA"], ["marker_4", "0.3"], ["marker_5", "9.4e-05"], ["marker_6", "0.04"], ["marker_7", "0.01"], ["marker_8", "0.003"], ["marker_9", "0.015"], ["marker_10", "0.020"], ["marker_11", "0.005"], ["marker_12", "0.004"], ["marker_13", "0.05"], ["marker_14", "0.055"], ["marker_15", "0.001"], ["marker_16", "0.004"], ["marker_17", "0.056"], ["marker_18", "0.0123"], ["marker_19", "0.005"], ["marker_20", "0.012"], ["marker_21", "0.001"], ["marker_22", "0.0316"], ["marker_23", "0.432"], ["marker_24", "0.03423"], ["marker_25", "0.00514"], ["marker_26", "0.01004"], ["marker_27", "0.011"], ["marker_28", "0.051"], ["marker_29", "0.048"], ["marker_30", "0.0484"], ["marker_31", "0.0871"], ["marker_32", "0.5"], ["marker_33", "0.006"], ["marker_34", "0.06"], ["marker_35", "0.08"], ["marker_36", "0.0784"], ["marker_37", "0.0984"], ["marker_38", "0.19444"], ["marker_39", "1.5e-04"], ["marker_40", "NA"], ["marker_41", "4.87e-07"], ["marker_42", "5.4e-08"], ["marker_43", "0.394"], ["marker_44", "0.004"], ] good_sites = ["marker_1", "marker_2", "marker_5", "marker_6", "marker_7", "marker_8", "marker_9", "marker_10", "marker_11", "marker_12", "marker_13", "marker_14", "marker_15", "marker_16", "marker_17", "marker_18", "marker_19", "marker_20", "marker_21", "marker_22", "marker_23", "marker_24", "marker_25", "marker_26", "marker_27", "marker_28", "marker_29", "marker_30", "marker_31", "marker_32", "marker_33", "marker_34", "marker_35", "marker_36", "marker_37", "marker_38", "marker_39", "marker_41", "marker_43", "marker_44"] # The PDF generated frequency_barh = "" if cli.HAS_MATPLOTLIB: frequency_barh = os.path.join(self.output_dir.name, "frequency_barh.pdf") # The expected results nb_sites = len(good_sites) expected_results = { "nb_marker_with_maf": str(nb_sites), "nb_maf_geq_01": "26", "pct_maf_geq_01": "{:.1f}".format(26 / nb_sites * 100), "nb_maf_geq_05": "14", "pct_maf_geq_05": "{:.1f}".format(14 / nb_sites * 100), "nb_maf_lt_05": "26", "pct_maf_lt_05": "{:.1f}".format(26 / nb_sites * 100), "nb_maf_lt_01": "14", "pct_maf_lt_01": "{:.1f}".format(14 / nb_sites * 100), "nb_maf_geq_01_lt_05": "12", "pct_maf_geq_01_lt_05": "{:.1f}".format(12 / nb_sites * 100), "nb_maf_nan": "0", "frequency_barh": frequency_barh, } # Creating the files for the test filename_template = os.path.join(self.output_dir.name, "chr{chrom}", "final_impute2", "chr{chrom}.imputed.{suffix}") for chrom in autosomes: # Getting the name of the file maf_filename = filename_template.format(chrom=chrom, suffix="maf") good_sites_filename = filename_template.format(chrom=chrom, suffix="good_sites") # Getting the directory and create it dirname = os.path.dirname(maf_filename) if not os.path.isdir(dirname): os.makedirs(dirname) self.assertTrue(os.path.isdir(dirname)) # Creating the content of the maf file with open(maf_filename, "w") as o_file: print(*header, sep="\t", file=o_file) for i in range(2): print(*content.pop(), sep="\t", file=o_file) # Creating the content of the good sites file with open(good_sites_filename, "w") as o_file: print(*good_sites, sep="\n", file=o_file) # Checking the files were created self.assertTrue(os.path.isfile(maf_filename)) self.assertTrue(os.path.isfile(good_sites_filename)) # Checking we passed all the content (MAF) self.assertEqual(0, len(content)) # Executing the command (getting the observed data) observed = cli.gather_maf_stats( required_chrom=autosomes, o_dir=self.output_dir.name, ) # Checking the observed results self.assertEqual(len(expected_results), len(observed)) for expected_key, expected_value in expected_results.items(): self.assertTrue(expected_key in observed) self.assertEqual(expected_value, observed[expected_key]) # If matplotlib is installed, checking we have a figure (and not # otherwise) if cli.HAS_MATPLOTLIB: self.assertTrue(os.path.isfile(frequency_barh)) else: self.assertFalse(os.path.isfile(frequency_barh)) # Testing an invalid entry changed_filename = filename_template.format(chrom=1, suffix="maf") with open(changed_filename, "w") as o_file: print(*header, sep="\t", file=o_file) print("marker_1", "0.6", sep="\t", file=o_file) # This should raise an exception with self.assertRaises(GenipeError) as cm: cli.gather_maf_stats( required_chrom=autosomes, o_dir=self.output_dir.name, ) self.assertEqual("{}: {}: invalid MAF".format("marker_1", round(0.6, 3)), str(cm.exception)) # Testing an invalid entry changed_filename = filename_template.format(chrom=1, suffix="maf") with open(changed_filename, "w") as o_file: print(*header, sep="\t", file=o_file) print("marker_1", "-0.01", sep="\t", file=o_file) # This should raise an exception with self.assertRaises(GenipeError) as cm: cli.gather_maf_stats( required_chrom=autosomes, o_dir=self.output_dir.name, ) self.assertEqual("{}: {}: invalid MAF".format("marker_1", round(-0.01, 3)), str(cm.exception)) # Testing a good site with NA MAF changed_filename = filename_template.format(chrom=1, suffix="maf") with open(changed_filename, "w") as o_file: print(*header, sep="\t", file=o_file) print("marker_1", "NA", sep="\t", file=o_file) # This should issue a warning with self.assertLogs(level="WARNING") as cm: cli.gather_maf_stats( required_chrom=autosomes, o_dir=self.output_dir.name, ) log_m = "WARNING:root:chr1: good sites with invalid MAF (NaN)" self.assertEqual(1, len(cm.output)) self.assertEqual(log_m, cm.output[0]) # Clearing the good sites file to see if we have a warning for chrom in autosomes: filename = filename_template.format(chrom=chrom, suffix="good_sites") with open(filename, "w") as o_file: pass # This should issue a warning with self.assertLogs(level="WARNING") as cm: cli.gather_maf_stats( required_chrom=autosomes, o_dir=self.output_dir.name, ) log_m = ("WARNING:root:There were no marker with MAF (something went " "wrong)") self.assertEqual(1, len(cm.output)) self.assertEqual(log_m, cm.output[0]) # Deleting a good sites file, and checking we have an error removed_filename = filename_template.format(chrom=1, suffix="good_sites") os.remove(removed_filename) self.assertFalse(os.path.isfile(removed_filename)) # This should raise an exception with self.assertRaises(GenipeError) as cm: cli.gather_maf_stats( required_chrom=autosomes, o_dir=self.output_dir.name, ) self.assertEqual("{}: no such file".format(removed_filename), str(cm.exception)) # Deleting a MAF sites file, and checking we have an error removed_filename = filename_template.format(chrom=1, suffix="maf") os.remove(removed_filename) self.assertFalse(os.path.isfile(removed_filename)) # This should raise an exception with self.assertRaises(GenipeError) as cm: cli.gather_maf_stats( required_chrom=autosomes, o_dir=self.output_dir.name, ) self.assertEqual("{}: no such file".format(removed_filename), str(cm.exception))
[docs] @unittest.skip("Test not implemented") def test_gather_execution_time(self): """Tests the 'gather_execution_time' function.""" self.fail("Test not implemented")