"""Representing Cortex graphs as :py:class:`nx.Graph` objects
=============================================================
This module contains classes for representing Cortex graphs as
objects that are compatible with :py:mod:`networkx` algorithms.
todo: Simplify the Graph implementations
"""
import itertools
from collections.abc import Collection, Mapping, MutableMapping
import attr
import networkx as nx
from cortexpy.constants import EdgeTraversalOrientation
from cortexpy.graph.parser.kmer import find_all_neighbors, disconnect_kmers
from cortexpy.utils import lexlo
def build_cortex_graph_from_header(header, **kwargs):
return build_cortex_graph(sample_names=header.sample_names,
kmer_size=header.kmer_size,
num_colors=header.num_colors,
colors=header.colors,
**kwargs)
def build_empty_cortex_graph_from_ra_parser(ra_parser):
return build_cortex_graph(sample_names=ra_parser.sample_names,
kmer_size=ra_parser.kmer_size,
num_colors=ra_parser.num_colors,
colors=ra_parser.colors)
[docs]def build_cortex_graph(*, sample_names, kmer_size, num_colors, colors, kmer_generator=None,
kmer_mapping=None):
"""Colored de Bruijn graph constructor"""
assert (kmer_generator is None) or (kmer_mapping is None)
if kmer_generator is not None:
graph = CortexDiGraph({k.kmer: k for k in kmer_generator})
elif kmer_mapping is not None:
graph = CortexDiGraph(kmer_mapping)
else:
graph = CortexDiGraph()
graph.graph['sample_names'] = sample_names
graph.graph['kmer_size'] = kmer_size
graph.graph['num_colors'] = num_colors
graph.graph['colors'] = colors
return graph
[docs]@attr.s(slots=True)
class CortexGraphMapping(MutableMapping):
"""Create a dict-like kmer mapping from a RandomAccess parser (ra_parser)
The exclusion set tracks kmers deleted from the ra_parser.
The new_kmers track kmers that have been added to the mapping.
Kmers that exist in both new_kmers and ra_parser are considered overwritten. The kmers in
new_kmers have precedence.
"""
ra_parser = attr.ib()
_exclusion_set = attr.ib(attr.Factory(set))
_new_kmers = attr.ib(attr.Factory(dict))
_n_duplicates = attr.ib(0)
def __attrs_post_init__(self):
if isinstance(self.ra_parser, type(self)):
self._exclusion_set = self.ra_parser._exclusion_set
self.ra_parser = self.ra_parser.ra_parser
def __getitem__(self, key):
lexlo_key = lexlo(key)
if lexlo_key in self._exclusion_set:
raise KeyError
if lexlo_key in self._new_kmers:
return self._new_kmers[lexlo_key]
return self.ra_parser[lexlo_key]
def __setitem__(self, key, value):
lexlo_key = lexlo(key)
if lexlo_key in self._exclusion_set:
self._exclusion_set.discard(lexlo_key)
if lexlo_key in self.ra_parser and lexlo_key not in self._new_kmers:
self._n_duplicates += 1
self._new_kmers[lexlo_key] = value
def __delitem__(self, item):
lexlo_string = lexlo(item)
if lexlo_string in self._exclusion_set:
raise KeyError
in_new_kmers = lexlo_string in self._new_kmers
in_ra_parser = lexlo_string in self.ra_parser
if in_new_kmers:
del self._new_kmers[lexlo_string]
if in_ra_parser:
self._exclusion_set.add(lexlo_string)
if in_new_kmers and in_ra_parser:
self._n_duplicates -= 1
def __iter__(self):
for kmer_string in self._new_kmers:
yield kmer_string
for kmer_string in self.ra_parser:
if kmer_string not in self._exclusion_set and kmer_string not in self._new_kmers:
yield kmer_string
def __len__(self):
return len(self.ra_parser) + len(self._new_kmers) - len(
self._exclusion_set) - self._n_duplicates
[docs] def disconnect_kmers(self, first, second, colors):
"""Disconnect two kmers"""
are_neighbors = False
for ref_kmer, flip_kmer, ref_letter, flip_letter in find_all_neighbors(first, second):
are_neighbors = True
if colors:
self[ref_kmer.kmer] = ref_kmer
self[flip_kmer.kmer] = flip_kmer
for color in colors:
ref_kmer.edges[color].remove_edge(ref_letter)
flip_kmer.edges[color].remove_edge(flip_letter)
if not are_neighbors:
raise ValueError(
'first kmer ({}) cannot be connected to second kmer ({})'.format(first.kmer,
second.kmer)
)
[docs] def connect_kmers(self, first, second, color, identical_kmer_check=True):
"""Connect two kmers"""
if identical_kmer_check and first == second and first is not second:
raise ValueError('Kmers are equal, but not the same object')
are_neighbors = False
for ref_kmer, flip_kmer, ref_letter, flip_letter in find_all_neighbors(first, second):
are_neighbors = True
self[ref_kmer.kmer] = ref_kmer
self[flip_kmer.kmer] = flip_kmer
ref_kmer.edges[color].add_edge(ref_letter)
flip_kmer.edges[color].add_edge(flip_letter)
if not are_neighbors:
raise ValueError(
'first kmer ({}) cannot be connected to second kmer ({})'.format(first.kmer,
second.kmer)
)
[docs]@attr.s(slots=True)
class CortexDiGraph(Collection):
"""Stores cortex k-mers and conforms to parts of the interface of :py:class:`nx.MultiDiGraph`"""
_kmer_mapping = attr.ib(attr.Factory(dict))
graph = attr.ib(attr.Factory(dict))
def __attrs_post_init__(self):
# todo: implement .from_ra_parser class method as described in
# http://www.attrs.org/en/stable/init.html
if isinstance(self._kmer_mapping, type(self)):
self.graph = self._kmer_mapping.graph
self._kmer_mapping = self._kmer_mapping._kmer_mapping
else:
self._kmer_mapping = CortexGraphMapping(self._kmer_mapping)
@property
def nodes(self):
return NodeView(self._kmer_mapping)
@property
def node(self):
return NodeView(self._kmer_mapping)
def __len__(self):
return len(self._kmer_mapping)
def __iter__(self):
return iter(self._kmer_mapping)
def __contains__(self, item):
return item in self._kmer_mapping.keys()
def __getitem__(self, item):
return self.succ[item]
def add_node(self, kmer_string, *, kmer):
self._kmer_mapping[kmer_string] = kmer
def add_nodes_from(self, node_iterable):
for node in node_iterable:
self.add_node(node[0], kmer=node[1])
def is_multigraph(self):
return True
def is_directed(self):
return True
def is_consistent(self):
return False
@property
def edges(self):
return EdgeView(self)
def out_edges(self, node, keys=False, default=None, data=None):
kmer = self.node[node]
is_lexlo = kmer.kmer == node
for color in kmer.colors:
for out_node in kmer.edges[color].get_outgoing_kmer_strings(node, is_lexlo=is_lexlo):
if keys:
yield (node, out_node, color)
else:
yield (node, out_node)
def in_edges(self, node, keys=False, default=None, data=None):
kmer = self.node[node]
is_lexlo = kmer.kmer == node
for color in kmer.colors:
for in_node in kmer.edges[color].get_incoming_kmer_strings(node, is_lexlo=is_lexlo):
if keys:
yield (in_node, node, color)
else:
yield (in_node, node)
def out_degree(self, node):
return len(list(self.out_edges(node)))
def in_degree(self, node):
return len(list(self.in_edges(node)))
[docs] def add_edge(self, first, second, *, key):
"""Note: edges can only be added to existing nodes"""
first_kmer = self.node[first]
second_kmer = self.node[second]
self._kmer_mapping.connect_kmers(first_kmer, second_kmer, color=key)
@property
def succ(self):
return MultiAdjacencyView(self._kmer_mapping, EdgeTraversalOrientation.original)
@property
def pred(self):
return MultiAdjacencyView(self._kmer_mapping, EdgeTraversalOrientation.reverse)
def remove_node(self, node):
try:
node_kmer = self.node[node]
except KeyError:
return
for succ in self.succ[node]:
succ_kmer = self.node[succ]
self._kmer_mapping.disconnect_kmers(node_kmer, succ_kmer, node_kmer.colors)
for pred in self.pred[node]:
pred_kmer = self.node[pred]
self._kmer_mapping.disconnect_kmers(node_kmer, pred_kmer, node_kmer.colors)
del self._kmer_mapping[node]
def remove_nodes_from(self, nodes):
for node in nodes:
self.remove_node(node)
[docs] def nbunch_iter(self, nbunch=None):
"""Return an iterator over nodes contained in nbunch that are
also in the graph.
This code has been copied from :py:mod:`networkx`.
The nodes in nbunch are checked for membership in the graph
and if not are silently ignored.
Parameters
----------
nbunch : single node, container, or all nodes (default= all nodes)
The view will only report edges incident to these nodes.
Returns
-------
niter : iterator
An iterator over nodes in nbunch that are also in the graph.
If nbunch is None, iterate over all nodes in the graph.
Raises
------
NetworkXError
If nbunch is not a node or or sequence of nodes.
If a node in nbunch is not hashable.
See Also
--------
Graph.__iter__
Notes
-----
When nbunch is an iterator, the returned iterator yields values
directly from nbunch, becoming exhausted when nbunch is exhausted.
To test whether nbunch is a single node, one can use
"if nbunch in self:", even after processing with this routine.
If nbunch is not a node or a (possibly empty) sequence/iterator
or None, a :exc:`NetworkXError` is raised. Also, if any object in
nbunch is not hashable, a :exc:`NetworkXError` is raised.
Licence
-------
This method was copied from Networkx version 2.1 and then modified
"""
if nbunch is None: # include all nodes via iterator
bunch = iter(self._adj)
elif nbunch in self: # if nbunch is a single node
bunch = iter([nbunch])
else: # if nbunch is a sequence of nodes
def bunch_iter(nlist, adj):
try:
for n in nlist:
if n in adj:
yield n
except TypeError as e:
message = e.args[0]
# capture error for non-sequence/iterator nbunch.
if 'iter' in message:
msg = "nbunch is not a node or a sequence of nodes."
raise nx.NetworkXError(msg)
# capture error for unhashable node.
elif 'hashable' in message:
msg = "Node {} in sequence nbunch is not a valid node."
raise nx.NetworkXError(msg.format(n))
else:
raise
bunch = bunch_iter(nbunch, self._adj)
return bunch
[docs]@attr.s(slots=True)
class ConsistentCortexDiGraph(Collection):
"""Graph that stores kmer strings that are consistent with each other"""
_kmer_mapping = attr.ib(attr.Factory(dict))
graph = attr.ib(attr.Factory(dict)) # refers to graph attribute of nx.Graph
def __attrs_post_init__(self):
if isinstance(self._kmer_mapping, (CortexDiGraph, ConsistentCortexDiGraph)):
self.graph = self._kmer_mapping.graph
if isinstance(self._kmer_mapping, ConsistentCortexDiGraph):
self._kmer_mapping = self._kmer_mapping._kmer_mapping
def is_consistent(self):
return True
def is_directed(self):
return True
def is_multigraph(self):
return True
def __contains__(self, item):
return item in self._kmer_mapping
def __iter__(self):
yield from self._kmer_mapping
def __len__(self):
return len(self._kmer_mapping)
def __getitem__(self, item):
return self.succ[item]
@property
def succ(self):
return MultiAdjacencyView(self._kmer_mapping,
EdgeTraversalOrientation.original,
return_lexlo_kmers=False)
@property
def pred(self):
return MultiAdjacencyView(self._kmer_mapping,
EdgeTraversalOrientation.reverse,
return_lexlo_kmers=False)
@property
def nodes(self):
return NodeView(self._kmer_mapping)
@property
def node(self):
return NodeView(self._kmer_mapping)
def add_node(self, kmer_string, *, kmer):
self._kmer_mapping[kmer_string] = kmer
def remove_edge(self, k, v, key):
disconnect_kmers(self._kmer_mapping[k], self._kmer_mapping[v], [key])
@property
def edges(self):
return EdgeView(self)
def out_degree(self, node):
return len(list(self.out_edges(node)))
def in_degree(self, node):
return len(list(self.in_edges(node)))
def out_edges(self, node, keys=False, default=None, data=None):
kmer = self._kmer_mapping[node]
is_lexlo = kmer.kmer == node
for color in kmer.colors:
for out_node in kmer.edges[color].get_outgoing_kmer_strings(node, is_lexlo=is_lexlo):
if keys:
yield (node, out_node, color)
else:
yield (node, out_node)
def in_edges(self, node, keys=False, default=None, data=None):
kmer = self.node[node]
is_lexlo = kmer.kmer == node
for color in kmer.colors:
for in_node in kmer.edges[color].get_incoming_kmer_strings(node, is_lexlo=is_lexlo):
if keys:
yield (in_node, node, color)
else:
yield (in_node, node)
@attr.s(slots=True)
class NodeView(Collection):
_nodes = attr.ib()
def __call__(self, *args, data=False, **kwargs):
if data:
yield from self._nodes.items()
else:
yield from self._nodes.keys()
def __len__(self):
return len(self._nodes)
def __iter__(self):
return iter(self._nodes)
def __contains__(self, item):
return item in self._nodes
def __getitem__(self, item):
return self._nodes[item]
[docs]def get_canonical_edge(first, second):
"""Get canonical edge.
Canonical edges are between lexlo kmers and are ordered lexicographically
Return canonical edge, if the first and second nodes were lexlo"""
lexlo_first = lexlo(first)
lexlo_second = lexlo(second)
flip_first_second = lexlo_second < lexlo_first
if flip_first_second:
lexlo_second, lexlo_first = lexlo_first, lexlo_second
return lexlo_first, lexlo_second, flip_first_second
@attr.s(slots=True)
class EdgeView(object):
graph = attr.ib()
def __call__(self, *args, data=False, keys=False, **kwargs):
if not args:
if self.graph.is_consistent():
yield from self._edge_iter_consistent(data=data, keys=keys)
else:
yield from self._edge_iter(data=data, keys=keys)
else:
if self.graph.is_directed():
yield from self.graph.out_edges(args[0], data=data, keys=keys)
else:
yield from self._edge_iter(data=data, keys=keys)
def __len__(self):
return len(list(self()))
def __iter__(self):
return self()
def __contains__(self, item):
return item in self()
def _edge_iter(self, data=False, keys=False):
"""Non-consistent edges of graph. Only returns canonical edges"""
seen_self_edges = set()
for node, kmer in self.graph.nodes(data=True):
for color in kmer.colors:
edge = kmer.edges[color]
neighbors = itertools.chain(
edge.get_outgoing_kmers(kmer.kmer),
edge.get_incoming_kmers(kmer.kmer)
)
for neighbor in neighbors:
if neighbor not in self.graph:
continue
if neighbor < kmer.kmer:
continue
if neighbor == kmer.kmer:
if neighbor in seen_self_edges:
continue
else:
seen_self_edges.add(neighbor)
ret = [node, neighbor]
if keys:
ret.append(color)
if data:
ret.append({})
yield tuple(ret)
def _edge_iter_consistent(self, data=False, keys=False):
for node, kmer in self.graph.nodes(data=True):
is_lexlo = node == kmer.kmer
for color in kmer.colors:
for kmer_string in kmer.edges[color].get_outgoing_kmer_strings(node,
is_lexlo=is_lexlo):
if kmer_string in self.graph.nodes:
ret = [node, kmer_string]
if keys:
ret.append(color)
if data:
ret.append({})
yield tuple(ret)
@attr.s(slots=True)
class AdjancencyView(object):
_nodes = attr.ib()
kmer = attr.ib()
query = attr.ib()
orientation = attr.ib()
return_lexlo_kmers = attr.ib(True)
@property
def colors(self):
return self.kmer.colors
def __iter__(self):
edge_kmers = set()
query_is_lexlo = self.kmer.kmer == self.query
for color in self.colors:
if self.orientation == EdgeTraversalOrientation.original:
node_iter = self.kmer.edges[color].get_outgoing_kmer_strings(
self.query, is_lexlo=query_is_lexlo
)
else:
node_iter = self.kmer.edges[color].get_incoming_kmer_strings(
self.query, is_lexlo=query_is_lexlo
)
for out_node in node_iter:
if self.return_lexlo_kmers:
out_node = lexlo(out_node)
edge_kmers.add(out_node)
return iter(edge_kmers)
def __getitem__(self, item):
other = self._nodes[item]
edge_colors = set()
if self.orientation == EdgeTraversalOrientation.original:
edge_func = self.kmer.has_outgoing_edge_to_kmer_in_color
else:
edge_func = self.kmer.has_incoming_edge_from_kmer_in_color
try:
for color in self.colors:
if edge_func(other, color):
edge_colors.add(color)
except ValueError:
pass
return edge_colors
@attr.s(slots=True)
class MultiAdjacencyView(object):
_nodes = attr.ib()
orientation = attr.ib()
return_lexlo_kmers = attr.ib(True)
def __getitem__(self, item):
kmer = self._nodes[item]
return AdjancencyView(self._nodes,
kmer,
query=item,
orientation=self.orientation,
return_lexlo_kmers=self.return_lexlo_kmers)
@attr.s(slots=True)
class DictView(Mapping):
base_dict = attr.ib()
allowed_keys = attr.ib(None)
def __getitem__(self, item):
if item in self.allowed_keys:
return self.base_dict[item]
def __iter__(self):
for key, val in self.base_dict.items():
if key in self.allowed_keys:
yield key, val
def __len__(self):
num_overlap = len(self.allowed_keys & self.base_dict.keys())
return len(self.base_dict) - num_overlap
def __copy__(self):
return {k: v for k, v in self}