Source code for genipe.formats.index


# This file is part of genipe, but came from gepyto.
#
# This work is licensed under the Creative Commons Attribution-NonCommercial
# 4.0 International License. To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc/4.0/ or send a letter to Creative
# Commons, PO Box 1866, Mountain View, CA 94042, USA.


import io
import os
import zlib
import logging

import numpy as np
import pandas as pd

from ..error import GenipeError


__author__ = "Louis-Philippe Lemieux Perreault"
__copyright__ = "Copyright 2014, Beaulieu-Saucier Pharmacogenomics Centre"
__license__ = "Attribution-NonCommercial 4.0 International (CC BY-NC 4.0)"


__all__ = ["get_index", "get_open_func"]


_CHECK_STRING = b"GENIPE INDEX FILE"

try:
    from Bio.bgzf import BgzfReader
    HAS_BIOPYTHON = True
except ImportError:
    HAS_BIOPYTHON = False


def _seek_generator(f):
    """Yields seek position for each line.

    Args:
        f (file): the file object

    """
    yield 0
    for line in f:
        yield f.tell()


def generate_index(fn, cols=None, names=None, sep=" "):
    """Build a index for the given file.

    Args:
        fn (str): the name of the file
        cols (list): a list containing column to keep (as int)
        names (list): the name corresponding to the column to keep (as str)
        sep (str): the field separator

    Returns:
        pandas.DataFrame: the index

    """
    logging.info("Generating index for '{}'".format(fn))

    # Some assertions
    assert cols is not None, "'cols' was not set"
    assert names is not None, "'names' was not set"
    assert len(cols) == len(names)

    # Getting the open function
    bgzip, open_func = get_open_func(fn, return_fmt=True)

    # Reading the required columns
    data = pd.read_csv(fn, sep=sep, engine="c", usecols=cols, names=names,
                       compression="gzip" if bgzip else None)

    # Getting the seek information
    f = open_func(fn, "rb")
    data["seek"] = np.fromiter(_seek_generator(f), dtype=np.uint)[:-1]
    f.close()

    # Saving the index to file
    write_index(get_index_fn(fn), data)

    return data


[docs]def get_open_func(fn, return_fmt=False): """Get the opening function. Args: fn (str): the name of the file return_fmt (bool): if the file format needs to be returned Returns: tuple: either a tuple containing two elements: a boolean telling if the format is bgzip, and the opening function. """ # The file might be compressed using bgzip bgzip = None with open(fn, "rb") as i_file: bgzip = i_file.read(3) == b"\x1f\x8b\x08" if bgzip and not HAS_BIOPYTHON: raise GenipeError("needs BioPython to index a bgzip file") open_func = open if bgzip: open_func = BgzfReader # Trying to read try: with open_func(fn, "r") as i_file: if bgzip: if not i_file.seekable(): raise ValueError pass except ValueError: raise GenipeError("{}: use bgzip for compression...".format(fn)) if return_fmt: return bgzip, open_func return open_func
[docs]def get_index(fn, cols, names, sep): """Restores the index for a given file. Args: fn (str): the name of the file cols (list): a list containing column to keep (as int) names (list): the name corresponding to the column to keep (as str) sep (str): the field separator Returns: pandas.DataFrame: the index If the index doesn't exist for the file, it is first created. """ if not has_index(fn): # The index doesn't exists, generate it return generate_index(fn, cols, names, sep) # Retrieving the index logging.info("Retrieving the index for '{}'".format(fn)) file_index = read_index(get_index_fn(fn)) # Checking the names are there if len(set(names) - (set(file_index.columns) - {'seek'})) != 0: raise GenipeError("{}: missing index columns: reindex".format(fn)) if "seek" not in file_index.columns: raise GenipeError("{}: invalid index: reindex".format(fn)) return file_index
def write_index(fn, index): """Writes the index to file. Args: fn (str): the name of the file that will contain the index index (pandas.DataFrame): the index """ with open(fn, "wb") as o_file: o_file.write(_CHECK_STRING) o_file.write(zlib.compress(bytes( index.to_csv(None, index=False, encoding="utf-8"), encoding="utf-8", ))) def read_index(fn): """Reads index from file. Args: fn (str): the name of the file containing the index Returns: pandas.DataFrame: the index of the file Before reading the index, we check the first couple of bytes to see if it is a valid index file. """ index = None with open(fn, "rb") as i_file: if i_file.read(len(_CHECK_STRING)) != _CHECK_STRING: raise GenipeError("{}: not a valid index file".format(fn)) index = pd.read_csv(io.StringIO( zlib.decompress(i_file.read()).decode(encoding="utf-8"), )) return index def get_index_fn(fn): """Generates the index filename from the path to the indexed file. Args: fn (str): the name of the file for which we want an index Returns: str: the name of the file containing the index """ return os.path.abspath("{}.idx".format(fn)) def has_index(fn): """Checks if the index exists, if not, create it. Args: fn (str): the name of the file for which we want the index Returns: bool: ``True`` if the file contains an index, ``False`` otherwise """ return os.path.isfile(get_index_fn(fn))