# This file is part of genipe.
#
# This work is licensed under the Creative Commons Attribution-NonCommercial
# 4.0 International License. To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc/4.0/ or send a letter to Creative
# Commons, PO Box 1866, Mountain View, CA 94042, USA.
import os
import logging
import sqlite3
from datetime import datetime
__author__ = "Louis-Philippe Lemieux Perreault"
__copyright__ = "Copyright 2014, Beaulieu-Saucier Pharmacogenomics Centre"
__license__ = "Attribution-NonCommercial 4.0 International (CC BY-NC 4.0)"
__all__ = ["create_task_db", "check_task_completion", "create_task_entry",
"mark_task_completed", "mark_task_incomplete", "get_task_runtime",
"get_all_runtimes", "mark_drmaa_task_completed"]
[docs]def create_task_db(out_dir):
"""Creates a task DB.
Args:
out_dir (str): the directory where the DB will be saved
Returns:
str: the name of the file containing the DB
A SQLITE database will be created in the ``out_dir`` directory (with the
name ``tasks.db``. The ``genipe_task`` table is automatically created.
"""
# The name
db_name = os.path.join(out_dir, "tasks.db")
logging.info("Connecting to DB '{}'".format(db_name))
# The DB
conn, c = _create_db_connection(db_name)
# Creating the table if it doesn't exists
c.execute("""CREATE TABLE IF NOT EXISTS genipe_task (
name TEXT PRIMARY KEY,
launch TIMESTAMP,
start TIMESTAMP,
end TIMESTAMP,
completed INT)""")
# Committing the changes
conn.commit()
conn.close()
return db_name
def _create_db_connection(db_name):
"""Creates a DB connection.
Args:
db_name (str): the name of the database (usually a file)
Returns:
tuple: a tuple containing the connection object and a cursor to that
object
"""
conn = sqlite3.connect(
db_name,
timeout=1800,
detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
)
c = conn.cursor()
return conn, c
[docs]def check_task_completion(task_id, db_name):
"""Checks if the task exists and if it's completed.
Args:
task_id (str): the ID of the task
db_name (str): the name of the database (usually a file)
Returns:
bool: ``True`` if the task exists **and** is completed, ``False``
otherwise
Note
----
A task is completed if the column ``completed`` equals 1. It is not
completed otherwise.
"""
conn, c = _create_db_connection(db_name)
# Retrieving the task information
c.execute("SELECT completed FROM genipe_task WHERE name=?", (task_id, ))
r = c.fetchone()
conn.close()
if r is None:
# There is not entry with this task ID
logging.debug("'{}' no entry".format(task_id))
return False
if r[0] is None or r[0] != 1:
# There is an entry, but it wasn't completed
logging.debug("'{}' not completed ({})".format(task_id, r[0]))
return False
return True
[docs]def create_task_entry(task_id, db_name):
"""Creates (or updates) a task.
Args:
task_id (str): the ID of the task
db_name (str): the name of the database (usually a file)
If the task ID doesn't exist in the DB, a new one will be created with the
current time as launch and start time.
If the task ID already exist, it is presumed that the task will be
relaunched, hence the database entry is updated to the current time (for
launch and start time) and ``completed`` is set to ``0``.
"""
conn, c = _create_db_connection(db_name)
# Checking if the entry already exists
c.execute("SELECT name FROM genipe_task WHERE name=?", (task_id, ))
r = c.fetchone()
# The time of launch
time = datetime.now()
if r is None:
# This is the first time we see this task, so we create an new entry
c.execute("INSERT INTO genipe_task (name, launch, start) "
"VALUES (?, ?, ?)",
(task_id, time, time))
else:
# We saw this task, but we need to relaunch it (setting completed=0)
c.execute("UPDATE genipe_task "
"SET launch=?, start=?, completed=0 WHERE name=?",
(time, time, task_id))
conn.commit()
conn.close()
[docs]def mark_task_completed(task_id, db_name):
"""Marks the task as completed.
Args:
task_id (str): the ID of the task
db_name (str): the name of the DB (usually a file)
The task entry is modified so that ``completed=1`` and the end time is
updated to the current time.
"""
conn, c = _create_db_connection(db_name)
# Updating the end time
c.execute("UPDATE genipe_task SET end=?, completed=1 WHERE name=?",
(datetime.now(), task_id))
conn.commit()
conn.close()
[docs]def mark_task_incomplete(task_id, db_name):
"""Marks a task as incomplete.
Args:
task_id (str): the ID of the task
db_name (str): the name of the DB (usually a file)
The task entry is set as incomplete by updating the ``completed`` value to
``0``.
"""
conn, c = _create_db_connection(db_name)
# Setting the completion to 0 for this task
c.execute("UPDATE genipe_task SET completed=0 WHERE name=?", (task_id, ))
conn.commit()
conn.close()
[docs]def mark_drmaa_task_completed(task_id, launch_time, start_time, end_time,
db_name):
"""Marks a task run by DRMAA as completed (while updating times).
Args:
task_id (str): the ID of the task
launch_time (float): the launch time (according to DRMAA)
start_time (float): the start time (according to DRMAA)
end_time (float): the end time (according to DRMAA)
db_name (str): the name of the DB (usually a file)
The task entry is updated with the launch, start and end time. Those times
come from the DRMAA library. The launch time is the time at which the task
was launched to the scheduler. The start time correspond to the time that
the scheduler started the job on the cluster. Finally, the end time is the
time that the job was completed.
"""
conn, c = _create_db_connection(db_name)
# The time
launch_time = datetime.fromtimestamp(launch_time)
start_time = datetime.fromtimestamp(start_time)
end_time = datetime.fromtimestamp(end_time)
# Updating
c.execute("UPDATE genipe_task SET launch=?, start=?, end=?, completed=1 "
"WHERE name=?", (launch_time, start_time, end_time, task_id))
conn.commit()
conn.close()
[docs]def get_task_runtime(task_id, db_name):
"""Gets the task run time.
Args:
task_id (str): the ID of the task
db_name (str): the name of the DB (usually a file)
Returns:
int: the execution time of the task (in seconds)
"""
conn, c = _create_db_connection(db_name)
# Getting the start and end time
c.execute("SELECT start, end FROM genipe_task WHERE name=?", (task_id, ))
r = c.fetchone()
conn.close()
return int(round((r[1] - r[0]).total_seconds(), ndigits=0))
[docs]def get_all_runtimes(db_name):
"""Gets all tasks execution time.
Args:
db_name (str): the name of the DB (usually a file)
Returns:
dict: the execution time (seconds) of all the tasks in the database
This function returns a dictionary of task ID (keys) pointing to execution
time (in second) (int).
"""
conn, c = _create_db_connection(db_name)
# Getting the start and end time
c.execute("SELECT name, start, end FROM genipe_task")
r = c.fetchall()
conn.close()
# Computing the execution time
final = {}
for entry in r:
name, start, end = entry
if (start is None) or (end is None):
logging.warning("{}: no execution time for task".format(name))
continue
final[name] = int(round((end - start).total_seconds(), ndigits=0))
return final