Source code for cortexpy.graph.parser.random_access

"""Random access of Cortex graphs
===================================

This module contains classes for inspecting Cortex graphs with random access to their kmers.
"""
from bisect import bisect_left
from collections.abc import Sequence, Mapping
from functools import lru_cache
from io import SEEK_END

import attr
import numpy as np

import cortexpy.graph.cortex
import cortexpy.graph.parser.header
from cortexpy.utils import lexlo
from .constants import UINT64_T
from .kmer import (
    Kmer, KmerData, KmerUintComparator,
    StringKmerConverter,
)
from .streaming import (
    kmer_generator_from_stream_and_header,
    kmer_string_generator_from_stream_and_header,
)


@attr.s(slots=True)
class SlurpedRandomAccess(Mapping):
    """Read-only mapping from kmer string to kmer, held fully in memory.

    Item access, length and iteration are all delegated to ``kmer_dict``;
    graph metadata is read from ``header``.
    """
    # Parsed graph header; backs the metadata properties at the bottom of the class.
    header = attr.ib()
    # Dictionary keyed by kmer string; defaults to a fresh empty dict per instance.
    kmer_dict = attr.ib(attr.Factory(dict))

    @classmethod
    def from_handle(cls, graph_handle, kmer_cache_size=None):
        """Load every kmer from ``graph_handle`` into a dictionary.

        The header is parsed from the stream first, then all kmers produced by
        ``kmer_generator_from_stream_and_header`` are stored under their
        ``kmer`` attribute, so later lookups are plain dict lookups.

        ``kmer_cache_size`` is accepted but ignored.
        """
        parsed_header = cortexpy.graph.parser.header.Header.from_stream(graph_handle)
        kmers = kmer_generator_from_stream_and_header(graph_handle, parsed_header)
        return cls(header=parsed_header, kmer_dict={kmer.kmer: kmer for kmer in kmers})

    def __getitem__(self, item):
        """Return the kmer stored under the key ``item`` (``KeyError`` if absent)."""
        return self.kmer_dict[item]

    def __len__(self):
        """Return the number of kmers held in ``kmer_dict``."""
        return len(self.kmer_dict)

    def __iter__(self):
        """Iterate over the keys of ``kmer_dict``."""
        return iter(self.kmer_dict)

    def get_kmer_for_string(self, string):
        """Normalise ``string`` with ``lexlo`` and return the kmer stored under the result."""
        lexlo_string = lexlo(string)
        return self[lexlo_string]

    @property
    def num_colors(self):
        """``num_colors`` as reported by the header."""
        return self.header.num_colors

    @property
    def colors(self):
        """``colors`` as reported by the header."""
        return self.header.colors

    @property
    def sample_names(self):
        """``sample_names`` as reported by the header."""
        return self.header.sample_names

    @property
    def kmer_size(self):
        """``kmer_size`` as reported by the header."""
        return self.header.kmer_size


@attr.s(slots=True, repr=False)
class RandomAccess(Mapping):
    """Mapping from kmer string to kmer, backed by a seekable Cortex graph stream.

    Kmers are not loaded up front. A lookup converts the kmer string to its
    uint representation, asks ``graph_kmer_sequence`` for the index of that
    representation, and reads the matching record from ``graph_sequence``.
    The original documentation states that access takes log(n) time, where n
    is the number of kmers in the graph.

    ``kmer_cache_size`` bounds the LRU cache of string -> (uints, index)
    results; when left as ``None`` it is set to the number of records.
    """
    graph_handle = attr.ib()
    kmer_cache_size = attr.ib(None)
    header = attr.ib(init=False)
    graph_sequence = attr.ib(init=False)
    graph_kmer_sequence = attr.ib(init=False)
    n_records = attr.ib(init=False)
    _cached_get_uints_index_for_string = attr.ib(init=False)

    def __attrs_post_init__(self):
        """Parse the header, size the graph body and build the record sequences.

        Raises:
            ValueError: if the body size is not a whole multiple of the
                header's record size.
        """
        handle = self.graph_handle
        assert handle.seekable()

        # Read the header from the very start; the body begins where it ends.
        handle.seek(0)
        self.header = cortexpy.graph.parser.header.Header.from_stream(handle)
        body_start = handle.tell()

        # The distance from the body start to the end of the stream is the body size.
        handle.seek(0, SEEK_END)
        body_size = handle.tell() - body_start

        record_size = self.header.record_size
        if body_size % record_size != 0:
            raise ValueError(
                "Body size ({}) % Record size ({}) != 0".format(body_size, record_size))
        self.n_records = body_size // record_size

        if self.kmer_cache_size is None:
            self.kmer_cache_size = self.n_records

        # Both sequence views share the same handle and body geometry.
        sequence_kwargs = dict(
            graph_handle=handle,
            body_start=body_start,
            header=self.header,
            n_records=self.n_records,
        )
        self.graph_sequence = KmerRecordSequence(**sequence_kwargs)
        self.graph_kmer_sequence = KmerUintSequence(**sequence_kwargs)

        # Per-instance cache wrapped around the bound method.
        memoize = lru_cache(maxsize=self.kmer_cache_size)
        self._cached_get_uints_index_for_string = memoize(
            self._get_uints_and_index_for_string)

    def _seek_to_body_start(self):
        """Move the graph handle to the first kmer record."""
        self.graph_handle.seek(self.graph_sequence.body_start)

    def _get_uints_and_index_for_string(self, kmer_string):
        """Return ``(uints, index)``: the uint form of ``kmer_string`` and its index.

        The index comes from ``graph_kmer_sequence.index_uint_vector``; callers
        must still check that the record at that index really is this kmer.
        """
        kmer_sequence = self.graph_kmer_sequence
        uints = kmer_sequence.kmer_string_converter.to_uints(kmer_string)
        return uints, kmer_sequence.index_uint_vector(uints)

    def _get_kmer_data_for_string(self, lexlo_string):
        """Return the kmer data stored for ``lexlo_string``.

        Raises:
            KeyError: if the index found for the string is past the last
                record, or the record at that index is a different kmer.
        """
        uints, index = self._cached_get_uints_index_for_string(lexlo_string)
        in_range = index < self.n_records
        if in_range and KmerUintComparator(uints) == self.graph_kmer_sequence[index]:
            kmer_data = self.graph_sequence[index]
            # Store the already-known string on the record (presumably so it
            # need not be decoded from the raw bytes again -- confirm in KmerData).
            kmer_data._kmer = lexlo_string
            return kmer_data
        raise KeyError('Could not retrieve kmer: ' + lexlo_string)

    def __getitem__(self, lexlo_string):
        """Return kmer associated with kmer string

        No check is performed to make sure that the input string is a
        lexicographically-lowest kmer string. Use
        :py:func:`get_kmer_for_string` in order to convert a kmer string to its
        lexlo form before retrieving it from the cortex object.
        """
        kmer_data = self._get_kmer_data_for_string(lexlo_string)
        return Kmer.from_kmer_data(kmer_data)

    def __len__(self):
        """Return the number of records in the graph, never less than zero."""
        record_count = self.n_records
        return max(0, record_count)

    def __iter__(self):
        """Iterate over kmer strings in graph in order stored in graph"""
        self._seek_to_body_start()
        return kmer_string_generator_from_stream_and_header(self.graph_handle, self.header)

    def items(self):
        """Iterate over kmer strings and kmers in graph in order stored in graph"""
        self._seek_to_body_start()
        kmers = kmer_generator_from_stream_and_header(self.graph_handle, self.header)
        return ((kmer.kmer, kmer) for kmer in kmers)

    def values(self):
        """Iterate over kmers in cortex graph"""
        self._seek_to_body_start()
        return kmer_generator_from_stream_and_header(self.graph_handle, self.header)

    def get_kmer_for_string(self, string):
        """Normalise ``string`` with ``lexlo`` and return the kmer stored under the result."""
        lexlo_string = lexlo(string)
        return self[lexlo_string]

    @property
    def num_colors(self):
        """``num_colors`` as reported by the header."""
        return self.header.num_colors

    @property
    def colors(self):
        """``colors`` as reported by the header."""
        return self.header.colors

    @property
    def sample_names(self):
        """``sample_names`` as reported by the header."""
        return self.header.sample_names

    @property
    def kmer_size(self):
        """``kmer_size`` as reported by the header."""
        return self.header.kmer_size


@attr.s(slots=True)
class KmerRecordSequence(Sequence):
    """Sequence view over the kmer records in a graph body.

    Item ``i`` is the ``KmerData`` built from the ``record_size`` bytes found
    at ``body_start + record_size * i`` in ``graph_handle``.
    """
    graph_handle = attr.ib()
    header = attr.ib()
    body_start = attr.ib()
    n_records = attr.ib()
    record_size = attr.ib(init=False)
    num_colors = attr.ib(init=False)
    kmer_size = attr.ib(init=False)
    kmer_container_size = attr.ib(init=False)

    def __attrs_post_init__(self):
        """Copy the record geometry out of the header."""
        header = self.header
        self.record_size = header.record_size
        self.kmer_size = header.kmer_size
        self.num_colors = header.num_colors
        self.kmer_container_size = header.kmer_container_size

    def __getitem__(self, item):
        """Return the ``KmerData`` at index ``item``.

        Negative indices are not supported.

        Raises:
            IndexError: if ``item`` is negative or not below ``n_records``.
        """
        if item >= self.n_records or item < 0:
            raise IndexError("Index ({}) is out of range".format(item))
        return self._get_kmer_data_for_item(item)

    def __len__(self):
        """Return the number of records, never less than zero."""
        record_count = self.n_records
        return max(0, record_count)

    def _get_kmer_data_for_item(self, item):
        """Seek to record ``item`` and wrap its raw bytes in ``KmerData`` (no bounds check)."""
        record_offset = self.body_start + self.record_size * item
        self.graph_handle.seek(record_offset)
        record_bytes = self.graph_handle.read(self.record_size)
        return KmerData(
            record_bytes,
            kmer_size=self.kmer_size,
            num_colors=self.num_colors,
        )


@attr.s(slots=True)
class KmerUintSequence(Sequence):
    """Sequence view over only the kmer part of each record in a graph body.

    Item ``i`` is a ``KmerUintComparator`` wrapping the little-endian unsigned
    64-bit integers read from the start of record ``i``. ``bisect_left`` is
    run over this sequence to locate kmers.
    """
    graph_handle = attr.ib()
    header = attr.ib()
    body_start = attr.ib()
    n_records = attr.ib()
    record_size = attr.ib(init=False)
    kmer_container_size = attr.ib(init=False)
    kmer_string_converter = attr.ib(init=False)

    def __attrs_post_init__(self):
        """Copy the record geometry out of the header and build the string converter."""
        header = self.header
        self.record_size = header.record_size
        self.kmer_container_size = header.kmer_container_size
        self.kmer_string_converter = StringKmerConverter(header.kmer_size)

    def __getitem__(self, item):
        """Return a ``KmerUintComparator`` for the kmer at index ``item``.

        Negative indices are not supported.

        Raises:
            IndexError: if ``item`` is negative or not below ``n_records``.
        """
        if item >= self.n_records or item < 0:
            raise IndexError("Index ({}) is out of range".format(item))
        uints = self._get_kmer_data_for_item(item)
        return KmerUintComparator(kmer_uints=uints)

    def __len__(self):
        """Return the number of records, never less than zero."""
        record_count = self.n_records
        return max(0, record_count)

    def _get_kmer_data_for_item(self, item):
        """Read the kmer uints at the start of record ``item`` (no bounds check).

        Only ``kmer_container_size * UINT64_T`` bytes are read, not the whole
        record, and they are interpreted as little-endian unsigned 64-bit ints.
        """
        record_offset = self.body_start + self.record_size * item
        self.graph_handle.seek(record_offset)
        n_kmer_bytes = self.kmer_container_size * UINT64_T
        raw_kmer = self.graph_handle.read(n_kmer_bytes)
        return np.frombuffer(raw_kmer, dtype='<u8')

    def index_kmer_string(self, kmer_string):
        """Convert ``kmer_string`` to uints and return its ``bisect_left`` index."""
        return self.index_uint_vector(self.kmer_string_converter.to_uints(kmer_string))

    def index_uint_vector(self, uints):
        """Return the ``bisect_left`` insertion index of ``uints`` in this sequence.

        The result may equal ``len(self)``, and the record at the returned
        index is not guaranteed to equal ``uints``; callers must check.
        """
        target = KmerUintComparator(uints)
        return bisect_left(self, target)


def load_ra_cortex_graph(file_handle, ra_parser_args=None):
    """Build a cortex graph whose kmers are served by a ``RandomAccess`` parser.

    Args:
        file_handle: graph stream handed to ``RandomAccess``.
        ra_parser_args: optional dict of extra keyword arguments for
            ``RandomAccess``; ``None`` means no extra arguments.

    Returns:
        The result of ``cortexpy.graph.cortex.build_cortex_graph`` called with
        the parser's metadata and a ``CortexGraphMapping`` around the parser.
    """
    parser_kwargs = {} if ra_parser_args is None else ra_parser_args
    ra_parser = RandomAccess(file_handle, **parser_kwargs)
    kmer_mapping = cortexpy.graph.cortex.CortexGraphMapping(ra_parser)
    return cortexpy.graph.cortex.build_cortex_graph(
        sample_names=ra_parser.sample_names,
        kmer_size=ra_parser.kmer_size,
        num_colors=ra_parser.num_colors,
        colors=ra_parser.colors,
        kmer_mapping=kmer_mapping,
    )