Source code for cortexpy.links

"""Link-informed graph traversal
================================

This module provides classes for parsing Mccortex link files and for traversing graphs while using
links.
"""
import copy
import json
from collections import defaultdict
from collections.abc import Sequence
from enum import Enum
from logging import getLogger

import attr

from cortexpy.utils import lexlo

logger = getLogger('cortexpy.links')


[docs]@attr.s(slots=True) class LinkedGraphTraverser(Sequence): """Adapter for linked walkers to be able to work with :py:func:`nx.all_simple_paths`""" graph = attr.ib() walkers = attr.ib(attr.Factory(dict)) @classmethod def from_graph_and_link_walker(cls, graph, link_walker): return cls(graph, {link_walker.current_unitig: link_walker}) def __contains__(self, item): return item in self.graph def __iter__(self): pass def __len__(self): pass
[docs] def __getitem__(self, item): """Get the children of :py:obj:`item` according to the walker object associated with :py:obj:`item` Warning: This scheme only works with depth-first search. """ parent_walker = self.walkers[item] successors = list(parent_walker.successors()) if len(successors) == 0: return [] child_walkers = [copy.copy(parent_walker) for _ in range(len(successors))] children = [] for succ, walker in zip(successors, child_walkers): walker.choose(succ) children.append(walker.current_unitig) self.walkers[children[-1]] = walker return children
@classmethod def is_multigraph(cls): return False
[docs]@attr.s(slots=True) class UnitigLinkWalker: """Traverses a unitig graph with links""" link_walker = attr.ib() unitigs = attr.ib() kmer_size = attr.ib() current_unitig = attr.ib() @classmethod def from_links_unitigs_kmer_size_unitig(cls, links, unitigs, kmer_size, unitig): obj = cls(LinkWalker.from_links(links), unitigs, kmer_size, unitig) logger.debug('Creating UnitigWalker with unitig: %s', unitigs.nodes[obj.current_unitig]) obj.link_walker.load_kmer(obj._current_unitig_right_kmer()) return obj
[docs] def successors(self): """Returns nodes from links or all available junctions if no link info exists""" successors = list(self.unitigs.successors(self.current_unitig)) if len(successors) < 2: return successors j_unitigs = list(self.link_successors()) if len(j_unitigs) != 0: return j_unitigs return successors
[docs] def choose(self, successor): """Register the choice of a successor and advance""" logger.debug('Choosing next unitig: %s', self.unitigs.nodes[successor]) next_unitigs = list(self.unitigs.successors(self.current_unitig)) assert successor in next_unitigs if len(next_unitigs) > 1: if next(self.link_successors(), None) is not None: self.link_walker.choose_branch(self._unitig_choice_base(successor)) self._advance_to_successor(successor) return self
def __copy__(self): return UnitigLinkWalker(copy.copy(self.link_walker), self.unitigs, self.kmer_size, self.current_unitig) def _advance_to_successor(self, successor): self.current_unitig = successor self.link_walker.load_kmer(self._current_unitig_right_kmer()) def _current_unitig_string(self): return self.unitigs.nodes[self.current_unitig]['unitig'] def _current_unitig_right_kmer(self): unitig_string = self._current_unitig_string() return unitig_string[(len(unitig_string) - self.kmer_size):] def _unitig_choice_base(self, unitig_id): return self.unitigs.nodes[unitig_id]['unitig'][self.kmer_size - 1]
[docs]@attr.s(slots=True) class LinkWalker: """Manages the loading and walking of links for kmers""" links = attr.ib() junctions = attr.ib() @classmethod def from_links(cls, links): junctions = defaultdict(list) return cls(links, junctions) @property def n_junctions(self): return sum(len(juncs) for juncs in self.junctions.values())
[docs] def load_kmer(self, kmer): """Load the link group for a kmer in the orientation of the kmer.""" lexlo_kmer = lexlo(kmer) is_lexlo = lexlo_kmer == kmer try: link_group = self.links.body[lexlo_kmer] logger.debug('Loaded link group for kmer %s: %s', kmer, link_group) except KeyError: pass else: for junc in link_group.get_link_junctions_in_kmer_orientation(is_lexlo): self.junctions[junc[0]].append(junc) return self
[docs] def choose_branch(self, base): """Choose a branch and advance all links. Keep only links consistent with branch.""" if base in self.junctions: junctions_to_traverse = self.junctions[base] self.clear() for junc in junctions_to_traverse: if len(junc) > 1: self.junctions[junc[1]].append(junc[1:]) return self raise KeyError('Invalid junction choice. Valid junction choices are: %s', self.junctions.keys())
[docs] def next_junction_bases(self): """Returns the the bases of the branches that can be chosen.""" return self.junctions.keys()
def clear(self): self.junctions.clear() return self def __copy__(self): return LinkWalker(self.links, copy.copy(self.junctions))
[docs]class LinkOrientation(Enum): F = 0 R = 1 @classmethod def other(cls, orientation): if orientation == cls.F: return cls.R else: return cls.F
@attr.s(slots=True) class Links: header = attr.ib() body = attr.ib() @classmethod def from_binary_stream(cls, stream): header = LinksHeader.from_binary_stream(stream) if header.json['graph']['num_colours'] != 1: raise NotImplementedError body = LinksBody.from_binary_stream(stream) return cls(header, body) @attr.s(slots=True) class LinksHeader: json = attr.ib() @classmethod def from_binary_stream(cls, stream): lines = [] bases = list(b'ACTG') last_line = False while not last_line: line = stream.readline() peek = stream.peek(1) # this is a really nasty way of determining when to stop reading but I've got nothing better -,- if len(peek) == 0 or peek[0] in bases: last_line = True if line.startswith((b'#', b'\n')): continue lines.append(line.decode().rstrip()) return cls(json.loads(''.join(lines))) @attr.s(slots=True) class LinksBody: @classmethod def from_binary_stream(cls, stream): body_dict = {} for group in link_groups(useful_lines(stream)): body_dict[group.kmer] = group return body_dict @attr.s(slots=True) class LinkGroup: kmer = attr.ib() coverage = attr.ib() link_lines = attr.ib(attr.Factory(list)) def get_link_junctions_in_kmer_orientation(self, is_lexlo): """kmer orientation is from the perspective of the potentially non-lexlo kmer""" if is_lexlo: orientation = LinkOrientation.F else: orientation = LinkOrientation.R for line in self.link_lines: if line.orientation != orientation: continue yield line.juncs def __str__(self): elements = [f'<{self.kmer} {self.coverage}: '] for line in self.link_lines: elements.append('|'.join([str(l) for l in self.link_lines])) elements.append('>') return ''.join(elements) @attr.s(slots=True) class LinkLine: orientation = attr.ib() num_juncs = attr.ib() juncs = attr.ib() counts = attr.ib(attr.Factory(list)) @classmethod def from_list(cls, fields): num_juncs = int(fields[1]) juncs = fields[3] assert len(juncs) == num_juncs return cls(orientation=LinkOrientation[fields[0]], num_juncs=num_juncs, counts=[int(fields[2])], juncs=juncs) def __str__(self): return f'{self.orientation.name} {self.num_juncs} ' + \ f'{",".join([str(c) for c in self.counts])} {self.juncs}' def link_groups(lines): lg = None for line in lines: line = line.decode() fields = line.rstrip().split() if 2 == len(fields): if lg is not None: yield lg lg = LinkGroup(fields[0], int(fields[1])) continue lg.link_lines.append(LinkLine.from_list(fields)) if lg is not None: yield lg def useful_lines(lines): for line in lines: if line == b'\n' or line.startswith(b'#'): continue yield line