# Source code for genipe.task.launcher


# This file is part of genipe.
#
# This work is licensed under the Creative Commons Attribution-NonCommercial
# 4.0 International License. To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc/4.0/ or send a letter to Creative
# Commons, PO Box 1866, Mountain View, CA 94042, USA.


import os
import re
import sys
import time
import shlex
import logging
import traceback
from os.path import isfile
from datetime import datetime
from multiprocessing import Pool
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from multiprocessing.pool import ThreadPool

from ..db import utils as db
from ..error import GenipeError


__author__ = "Louis-Philippe Lemieux Perreault"
__copyright__ = "Copyright 2014, Beaulieu-Saucier Pharmacogenomics Centre"
__license__ = "Attribution-NonCommercial 4.0 International (CC BY-NC 4.0)"


__all__ = ["launch_tasks", ]


def launch_tasks(to_process, nb_threads, check_rc=True, hpc=False,
                 hpc_options=None, out_dir=None, preamble=""):
    """Executes commands.

    Args:
        to_process (list): a list of tasks to process
        nb_threads (int): the number of processes that is required
        check_rc (bool): whether or not to check the return code of the task
        hpc (bool): whether or not to execute the tasks on a cluster (DRMAA)
        hpc_options (dict): the DRMAA options
        out_dir (str): the output directory
        preamble (str): the script preamble (for DRMAA)

    Raises:
        GenipeError: if at least one task did not complete properly.

    Each task is a dictionary which must contain the ``name``, ``task_id``,
    ``task_db`` and ``o_files`` keys. The task dictionaries are modified in
    place (the execution options are added to them).

    A task which is flagged as completed in the database, and for which all
    the output files are present, is not executed again.

    Note
    ----
        If a DRMAA session is created, it is always closed before returning,
        even if an error occurs while preparing or executing the tasks.

    """
    # Do we need a DRMAA session?
    drmaa_session = None
    if hpc:
        import drmaa
        drmaa_session = drmaa.Session()
        drmaa_session.initialize()

    # From this point, the DRMAA session (if any) needs to be closed, whatever
    # happens (including an error while preparing the tasks)
    try:
        to_run = []
        for task in to_process:
            assert "name" in task
            assert "task_id" in task
            assert "task_db" in task
            assert "o_files" in task

            task_name = task["name"]
            task_id = task["task_id"]
            db_name = task["task_db"]
            o_files = task["o_files"]

            # Checking if we need to run this task
            if db.check_task_completion(task_id, db_name):
                if _check_output_files(o_files, task_id):
                    run_time = db.get_task_runtime(task_id, db_name)
                    logging.info("Task '{}': already performed in {:,d} "
                                 "seconds".format(task_name, run_time))
                    continue

                # The DB said the task was completed, but there is a missing
                # output file. Setting this task completion to '0'
                db.mark_task_incomplete(task_id, db_name)

            # Some options to add
            task["check_retcode"] = check_rc
            task["out_dir"] = out_dir

            # Setting the task options
            if hpc:
                assert hpc_options is not None
                task_options = hpc_options.get(task_id, {})
                task["walltime"] = task_options.get("walltime")
                task["nodes"] = task_options.get("nodes")
                task["preamble"] = preamble
                task["drmaa_session"] = drmaa_session

            # Adding to list to run
            to_run.append(task)

        # Nothing to execute, so there is no need to create a pool
        if not to_run:
            return

        # The execution function and the execution type (DRMAA jobs are only
        # submitted and waited for, hence the threads instead of processes)
        execute_func = _execute_command
        pool_class = Pool
        if hpc:
            execute_func = _execute_command_drmaa
            pool_class = ThreadPool

        # Launching the command
        if nb_threads > 1:
            # Running all the processes
            pool = pool_class(processes=nb_threads)
            try:
                results = pool.map(execute_func, to_run)

            except Exception:
                pool.terminate()
                traceback.print_exc(file=sys.stdout)
                raise

            finally:
                # Closing the pool
                pool.close()

            # Checking the results
            problems = []
            for result in results:
                if not result[0]:
                    problems.append(result[1])
                    logging.error(
                        "Task '{}': did not finish...".format(result[1])
                    )
                else:
                    logging.info("Task '{}': {} in {:,d} "
                                 "seconds".format(result[1], result[2],
                                                  result[3]))

            if problems:
                raise GenipeError("the following task did not work: " +
                                  repr(problems))

        else:
            # Sequential execution (stopping at the first failure)
            for data in to_run:
                logging.info("Executing {}".format(data["name"]))
                result = execute_func(data)
                if not result[0]:
                    raise GenipeError(
                        "problem executing {}".format(data["name"])
                    )

                logging.info("Task '{}': {} in {:,d} seconds".format(
                    result[1], result[2], result[3],
                ))

    finally:
        if drmaa_session is not None:
            drmaa_session.exit()
def _check_output_files(o_files, task):
    """Check that the files exist.

    Args:
        o_files (list): the list of files to check
        task (str): the name of the task

    Returns:
        bool: ``True`` if all files exist, ``False`` otherwise

    If the file to check is an impute2 file, and that this file is missing, we
    check for further statistics using the :py:func:`_check_impute2_file`.
    In the same way, a missing ``.snp.strand`` file is checked using
    :py:func:`_check_shapeit_align_file`.

    Note
    ----
        If the file name ends with ``.impute2`` and the file doesn't exist, we
        look for the compressed file (``.impute2.gz``) instead.

    """
    for filename in o_files:
        if filename.endswith(".impute2"):
            # IMPUTE2 files might be gzipped
            if isfile(filename) or isfile(filename + ".gz"):
                continue

            if not _check_impute2_file(filename, task):
                return False

        elif filename.endswith(".snp.strand"):
            # SHAPEIT alignment file might not exist
            if isfile(filename):
                continue

            if not _check_shapeit_align_file(filename, task):
                return False

        elif not isfile(filename):
            return False

    return True


def _check_shapeit_failed_rc(fn, task=None):
    """Checks the log to explain a failure return code.

    Args:
        fn (str): the name of the file to check
        task (str): the name of the task

    Returns:
        bool: ``True`` if everything is normal, ``False`` otherwise

    This function looks for a known error message in the log file. If the
    message ``ERROR: Reference and Main panels are not well aligned:`` appears
    in the log file, then it's normal that the job failed.

    If ``fn`` is ``None`` (no alignment file was expected), or if the log file
    is missing, the failure can't be explained and ``False`` is returned.

    """
    if fn is None:
        return False

    # The name of the log file
    log_fn = fn.replace(".snp.strand", "") + ".log"
    if not os.path.isfile(log_fn):
        # The log file doesn't exist...
        return False

    # Reading the content of the file
    with open(log_fn, "r") as i_file:
        log = i_file.read()

    # Looking for the strand alignment error message
    match = re.search(
        r"\sERROR: Reference and Main panels are not well aligned:\n",
        log,
    )
    return match is not None


def _check_shapeit_align_file(fn, task=None):
    """Checks the log to explain the absence of an .snp.strand file.

    Args:
        fn (str): the name of the file to check
        task (str): the name of the task

    Returns:
        bool: ``True`` if everything is normal, ``False`` otherwise.

    This function looks for known message in the log file. If the SNPs were
    read from the legend file and the haplotypes were read from the hap file,
    then there were no SNPs flip issue.

    If ``fn`` is ``None``, or if the log file is missing, ``False`` is
    returned.

    """
    if fn is None:
        return False

    # The name of the log file
    log_fn = fn.replace(".snp.strand", "") + ".log"
    if not os.path.isfile(log_fn):
        # The log file doesn't exist...
        return False

    # Reading the content of the file
    with open(log_fn, "r") as i_file:
        log = i_file.read()

    # Checking that the SNPs were read in the legend file (the strand error
    # message is just after this notice).
    if re.search(r"\sReading SNPs in \[.+\]\n", log) is None:
        return False

    # Checking if the step after was run (i.e. meaning there were no strand
    # issue)
    if re.search(r"\sReading reference haplotypes in \[.+\]\n", log) is None:
        return False

    # We are here, so it is normal that no *.snp.strand file exists
    if task:
        logging.info("{}: there are no flip issue".format(task))
    return True


def _check_impute2_file(fn, task=None):
    """Checks the summary to explain the absence of an .impute2 file.

    Args:
        fn (str): the name of the file to check
        task (str): the name of the task

    Returns:
        bool: ``True`` if everything is normal, ``False`` otherwise.

    This function looks for known message in the summary file. Three possible
    ways that an impute2 file is missing:

    1. there are no SNPs in the imputation interval;
    2. there are no type 2 SNPs after applying the settings;
    3. there are no SNPs for output.

    If ``fn`` is ``None``, or if the summary file is missing, ``False`` is
    returned.

    """
    if fn is None:
        return False

    # The name of the summary file
    summary_fn = fn + "_summary"
    if not os.path.isfile(summary_fn):
        # The summary file doesn't exist...
        return False

    # Reading the file content
    with open(summary_fn, "r") as i_file:
        summary = i_file.read()

    # The known messages (regular expression) explaining a missing file, each
    # one with the warning to log when it is found (checked in this order)
    known_messages = (
        # There are no SNPs in the imputation interval
        (r"\sThere are no SNPs in the imputation interval, so there is "
         "nothing for IMPUTE2 to analyze; the program will quit now.",
         "{}: there are no SNPs in the imputation interval"),

        # There are no type 2 SNPs
        (r"\sERROR: There are no type 2 SNPs after applying the command-line "
         "settings for this run, which makes it impossible to perform "
         "imputation.",
         "{}: there are no type 2 SNPs for this run"),

        # There are no output SNPs
        (r"\sYour current command-line settings imply that there will not be "
         "any SNPs in the output file, so IMPUTE2 will not perform any "
         "analysis or print output files.",
         "{}: no SNPs in the output file"),
    )

    for pattern, warning in known_messages:
        if re.search(pattern, summary) is not None:
            if task:
                logging.warning(warning.format(task))
            return True

    # If attained, there is a problem
    return False


def _is_expected_failure(task_id, o_files):
    """Checks if a failure return code is normal for a task.

    Args:
        task_id (str): the ID of the task
        o_files (list): the list of expected output files of the task

    Returns:
        bool: ``True`` if the failure is explained by a known message,
              ``False`` otherwise

    Only tasks for which the ID starts with ``impute2`` or ``shapeit_check``
    might have a normal failure. If the task has no matching output file
    (``.impute2`` or ``.alignments.snp.strand``, respectively), the failure
    can't be explained.

    """
    if task_id.startswith("impute2"):
        # Task is IMPUTE2, and it might be normal according to message in the
        # summary file
        impute2_fn = next(
            (fn for fn in o_files if fn.endswith(".impute2")), None,
        )
        return _check_impute2_file(impute2_fn)

    if task_id.startswith("shapeit_check"):
        # Task is the SHAPEIT check, and it might be normal according to
        # message in the log file
        shapeit_fn = next(
            (fn for fn in o_files if fn.endswith(".alignments.snp.strand")),
            None,
        )
        return _check_shapeit_failed_rc(shapeit_fn)

    return False


def _execute_command(command_info):
    """Executes a single command.

    Args:
        command_info (dict): information about the command

    Returns:
        tuple: a tuple containing 4 entries: whether the task completed (bool),
               the name of the task (str), the status of the run (str) and the
               execution time in seconds (int)

    Note
    ----
        The execution time is ``None`` if there was a problem with the task.
        The standard output and standard error of the command are captured,
        but not used.

    """
    # Some assertions
    assert "task_id" in command_info
    assert "name" in command_info
    assert "command" in command_info
    assert "check_retcode" in command_info
    assert "task_db" in command_info
    assert "o_files" in command_info

    # Getting the command's information
    name = command_info["name"]
    command = command_info["command"]
    check_rc = command_info["check_retcode"]
    task_id = command_info["task_id"]
    db_name = command_info["task_db"]
    o_files = command_info["o_files"]

    logging.debug("Checking status for '{}'".format(task_id))

    # Checking if the command was completed
    if db.check_task_completion(task_id, db_name):
        if _check_output_files(o_files, task_id):
            logging.debug("'{}' completed".format(task_id))
            runtime = db.get_task_runtime(task_id, db_name)
            return True, name, "already performed", runtime

        logging.debug("'{}' problem with output files".format(task_id))
        db.mark_task_incomplete(task_id, db_name)

    logging.debug("'{}' to run".format(task_id))

    # Creating a new entry in the database
    db.create_task_entry(task_id, db_name)

    # Launching the command and waiting for the process to terminate (the
    # pipes need to be read so that the process doesn't block on a full buffer)
    proc = Popen(command, stdout=PIPE, stderr=PIPE)
    proc.communicate()
    rc = proc.returncode
    logging.debug("'{}' finished".format(task_id))

    # Checking the return code (a failure might be normal for some tasks)
    if check_rc and rc != 0 and not _is_expected_failure(task_id, o_files):
        logging.debug("'{}' exit status problem".format(task_id))
        return False, name, "problem", None

    # Checking all the required files were generated
    if not _check_output_files(o_files, task_id):
        logging.debug("'{}' exit status problem".format(task_id))
        return False, name, "problem", None

    # The task was performed correctly, so we update to completed
    db.mark_task_completed(task_id, db_name)

    # Everything went well
    logging.debug("'{}' everything was fine".format(task_id))
    return True, name, "performed", db.get_task_runtime(task_id, db_name)


def _execute_command_drmaa(command_info):
    """Executes a command using DRMAA (usually on a HPC).

    Args:
        command_info (dict): information about the command

    Returns:
        tuple: a tuple containing 4 entries: whether the task completed (bool),
               the name of the task (str), the status of the run (str) and the
               execution time in seconds (int)

    Note
    ----
        The preamble (if required) is inserted between the shebang line and the
        actual command. The execution time is ``None`` if there was a problem
        with the task.

    """
    # Some import
    from drmaa import Session, JobControlAction

    # Some assertions
    assert "out_dir" in command_info
    assert "command" in command_info
    assert "walltime" in command_info
    assert "nodes" in command_info
    assert "task_id" in command_info
    assert "task_db" in command_info
    assert "name" in command_info
    assert "check_retcode" in command_info
    assert "o_files" in command_info
    assert "preamble" in command_info
    assert "drmaa_session" in command_info

    # Getting the command's information
    name = command_info["name"]
    command = command_info["command"]
    task_id = command_info["task_id"]
    db_name = command_info["task_db"]
    out_dir = command_info["out_dir"]
    check_rc = command_info["check_retcode"]
    preamble = command_info["preamble"]
    drmaa_session = command_info["drmaa_session"]
    o_files = command_info["o_files"]

    # Checking if the command was completed
    logging.debug("Checking status for '{}'".format(task_id))
    if db.check_task_completion(task_id, db_name):
        if _check_output_files(o_files, task_id):
            logging.debug("'{}' completed".format(task_id))
            runtime = db.get_task_runtime(task_id, db_name)
            return True, name, "already performed", runtime

        logging.debug("'{}' problem with output files".format(task_id))
        db.mark_task_incomplete(task_id, db_name)

    else:
        logging.debug("'{}' to run because not completed".format(task_id))

    logging.debug("'{}' to run".format(task_id))

    # Creating the script (the file is kept after closing, since the job needs
    # it; it is removed once the job is done)
    with NamedTemporaryFile(mode="w", suffix="_execute.sh", delete=False,
                            dir=out_dir) as tmp_file:
        # Writing the shebang
        print("#!/usr/bin/env bash", file=tmp_file)

        # Writing the preamble
        print(preamble, file=tmp_file)

        # Writing the command (only the arguments are quoted)
        print(command[0], end=" ", file=tmp_file)
        for chunk in command[1:]:
            print(shlex.quote(chunk), end=" ", file=tmp_file)
        print("", file=tmp_file)

    # Making the script executable
    script_fn = tmp_file.name
    os.chmod(script_fn, 0o755)

    # Creating the job template (which is deleted whatever happens next)
    job = drmaa_session.createJobTemplate()
    try:
        job.remoteCommand = script_fn
        job.jobName = "_{}".format(task_id)
        job.workingDirectory = os.getcwd()
        if command_info["walltime"] is not None:
            job.hardWallclockTimeLimit = command_info["walltime"]
        if command_info["nodes"] is not None:
            job.nativeSpecification = command_info["nodes"]

        # Creating a new entry in the database
        db.create_task_entry(task_id, db_name)

        # Running the job
        job_id = drmaa_session.runJob(job)

        # Waiting for the job
        try:
            ret_val = drmaa_session.wait(job_id, Session.TIMEOUT_WAIT_FOREVER)

        except KeyboardInterrupt:
            drmaa_session.control(job_id, JobControlAction.TERMINATE)
            logging.warning("{}: terminated".format(task_id))
            raise

    finally:
        drmaa_session.deleteJobTemplate(job)

    # The job is done
    logging.debug("'{}' finished".format(task_id))

    # Removing the temporary file
    os.remove(script_fn)

    # Checking the task's return values
    if ret_val.hasCoreDump or ret_val.wasAborted or ret_val.hasSignal:
        logging.debug("'{}' problems ({}, {}, {})".format(
            task_id,
            ret_val.hasCoreDump,
            ret_val.wasAborted,
            ret_val.hasSignal,
        ))
        return False, name, "problem", None

    # Checking the return code (a failure might be normal for some tasks)
    if check_rc and ret_val.exitStatus != 0:
        if not _is_expected_failure(task_id, o_files):
            logging.debug("'{}' exit status problem".format(task_id))
            return False, name, "problem", None

    # Checking all the required files were generated
    if not _check_output_files(o_files, task_id):
        logging.debug("'{}' exit status problem".format(task_id))
        return False, name, "problem", None

    # Getting the launch time (should always be present)
    launch_time = float(ret_val.resourceUsage["submission_time"])

    # Getting the start time. If 'start_time' is missing from the dictionary
    # (e.g. using Slurm), we use the launch time as the start time
    start_time = float(ret_val.resourceUsage.get("start_time", launch_time))

    # Getting the end time. If 'end_time' is missing from the dictionary
    # (e.g. using Slurm), we use the current time as the end time
    end_time = float(ret_val.resourceUsage.get(
        "end_time",
        time.mktime(datetime.now().timetuple()),
    ))

    # The task was performed correctly, so we update to completed
    db.mark_drmaa_task_completed(task_id, launch_time, start_time, end_time,
                                 db_name)

    # Everything went well
    logging.debug("'{}' everything was fine".format(task_id))
    return True, name, "performed", db.get_task_runtime(task_id, db_name)