"""Cortex kmers
===============
This module provides classes and functions for working with Cortex kmers.
"""
import math
import struct
from itertools import repeat
import attr
import numpy as np
import cortexpy.edge_set
from cortexpy.utils import revcomp, lexlo
from cortexpy.graph.parser.constants import (
UINT64_T, UINT32_T, LETTER_TO_NUM,
NUM_LETTERS_PER_UINT, NUM_TO_BITS,
)
from cortexpy.graph.parser.kmer_ext import raw_kmer_to_string, raw_edges_to_list, raw_to_coverage
def check_kmer_string(kmer_string):
if len(kmer_string) % 2 == 0:
raise ValueError('kmer_string needs to be odd length')
if len(kmer_string) < 3:
raise ValueError('kmer_string needs to length 3 or more')
@attr.s(slots=True)
class EmptyKmerBuilder(object):
num_colors = attr.ib(1)
default_coverage = attr.ib(1)
_seen_kmers = attr.ib(attr.Factory(dict))
@num_colors.validator # noqa
def not_less_than_zero(self, attribute, value): # noqa
if value < 0:
raise ValueError('value less than zero')
def _build_from_lexlo(self, kmer_string):
"""Build empty kmer from a lexicographically lowest string"""
return Kmer.from_kmer_data(EmptyKmer(kmer=kmer_string,
coverage=tuple(
repeat(self.default_coverage, self.num_colors)),
kmer_size=len(kmer_string),
num_colors=self.num_colors))
def build(self, kmer_string):
"""Build empty kmer from a kmer string"""
check_kmer_string(kmer_string)
kmer_string_to_use = lexlo(kmer_string)
return self._build_from_lexlo(kmer_string_to_use)
def build_or_get(self, kmer_string):
"""Build empty kmer or return a cached kmer for a kmer string"""
check_kmer_string(kmer_string)
kmer_string_to_use = lexlo(kmer_string)
if kmer_string_to_use in self._seen_kmers.keys():
return self._seen_kmers[kmer_string_to_use]
kmer = self._build_from_lexlo(kmer_string_to_use)
self._seen_kmers[kmer_string_to_use] = kmer
return kmer
def revcomp_target_to_match_ref(target, ref, rc_is_after_reference_kmer=True):
target_revcomp = revcomp(target)
if rc_is_after_reference_kmer:
ref_core = ref[1:]
flip_core = target[:-1]
flip_revcomp_core = target_revcomp[:-1]
else:
ref_core = ref[:-1]
flip_core = target[1:]
flip_revcomp_core = target_revcomp[1:]
if ref_core == flip_core:
return target, False
elif ref_core == flip_revcomp_core:
return target_revcomp, True
else:
raise ValueError(
"ref_core ({}) does not match"
" flip_core ({}) or flip_revcomp_core ({})".format(ref_core,
flip_core,
flip_revcomp_core))
def find_neighbors(first, second, *, rc_is_after_reference_kmer, revcomp_second):
if revcomp_second:
revcomp_kmer, ref_kmer = second, first
else:
revcomp_kmer, ref_kmer = first, second
try:
revcomp_string, is_revcomp = revcomp_target_to_match_ref(
revcomp_kmer.kmer, ref_kmer.kmer,
rc_is_after_reference_kmer=rc_is_after_reference_kmer
)
except ValueError:
pass
else:
if rc_is_after_reference_kmer:
ref_letter = revcomp_string[-1]
flip_letter = ref_kmer.kmer[0].lower()
else:
ref_letter = revcomp_string[0].lower()
flip_letter = ref_kmer.kmer[-1]
if is_revcomp:
flip_letter = revcomp(flip_letter).swapcase()
yield ref_kmer, revcomp_kmer, ref_letter, flip_letter
[docs]def find_all_neighbors(first, second):
"""Return kmers and letters to get from first kmer to second"""
for rc_is_after_reference_kmer in [True, False]:
for revcomp_second in [True, False]:
yield from find_neighbors(first, second,
rc_is_after_reference_kmer=rc_is_after_reference_kmer,
revcomp_second=revcomp_second)
[docs]def connect_kmers(first, second, color, identical_kmer_check=True):
"""Connect two kmers"""
if identical_kmer_check and first == second and first is not second:
raise ValueError('Kmers are equal, but not the same object')
are_neighbors = False
for ref_kmer, flip_kmer, ref_letter, flip_letter in find_all_neighbors(first, second):
are_neighbors = True
ref_kmer.edges[color].add_edge(ref_letter)
flip_kmer.edges[color].add_edge(flip_letter)
if not are_neighbors:
raise ValueError(
'first kmer ({}) cannot be connected to second kmer ({})'.format(first.kmer,
second.kmer)
)
[docs]def disconnect_kmers(first, second, colors):
"""Disconnect two kmers"""
are_neighbors = False
for ref_kmer, flip_kmer, ref_letter, flip_letter in find_all_neighbors(first, second):
are_neighbors = True
for color in colors:
ref_kmer.edges[color].remove_edge(ref_letter)
flip_kmer.edges[color].remove_edge(flip_letter)
if not are_neighbors:
raise ValueError(
'first kmer ({}) cannot be connected to second kmer ({})'.format(first.kmer,
second.kmer)
)
def kmer_eq(self, other):
if any([self.kmer != other.kmer,
self.kmer_size != other.kmer_size,
self.num_colors != other.num_colors,
not np.all(self.coverage == other.coverage),
(self.edges is None) != (other.edges is None),
(self.edges is not None and not np.all(self.edges == other.edges))]):
return False
return True
@attr.s(slots=True, eq=False)
class EmptyKmer(object):
kmer = attr.ib()
coverage = attr.ib()
kmer_size = attr.ib()
num_colors = attr.ib()
edges = attr.ib(init=False)
def __attrs_post_init__(self):
self.edges = [cortexpy.edge_set.empty() for _ in range(len(self.coverage))]
def get_raw_kmer(self):
return StringKmerConverter(self.kmer_size).to_raw(self.kmer)
@attr.s(slots=True)
class RawKmerConverter(object):
kmer_size = attr.ib()
def to_string(self, raw_kmer):
return raw_kmer_to_string(self.kmer_size, raw_kmer)
def calc_kmer_container_size(kmer_size):
return math.ceil(kmer_size / NUM_LETTERS_PER_UINT)
[docs]@attr.s(slots=True)
class StringKmerConverter(object):
"""Converts kmer strings to various binary representations"""
kmer_size = attr.ib()
_kmer_container_size_in_uint64ts = attr.ib(init=False)
_padding_array = attr.ib(init=False)
def __attrs_post_init__(self):
self._kmer_container_size_in_uint64ts = calc_kmer_container_size(self.kmer_size)
padding_size = NUM_LETTERS_PER_UINT - self.kmer_size % NUM_LETTERS_PER_UINT
if padding_size == NUM_LETTERS_PER_UINT:
padding_size = 0
self._padding_array = np.zeros(padding_size, dtype=np.uint8)
[docs] def to_uints(self, kmer_string):
"""Converts kmer_string to big-endian uint64 array"""
kmer_string = str(kmer_string)
encoded_kmer_string = kmer_string.encode()
translated_kmer_string = encoded_kmer_string.translate(LETTER_TO_NUM)
letter_vals = np.frombuffer(translated_kmer_string, dtype=np.uint8)
letter_vals = np.concatenate((letter_vals[:-self.kmer_size],
self._padding_array,
letter_vals[-self.kmer_size:]))
letter_val_bits = NUM_TO_BITS[letter_vals]
return np.packbits(letter_val_bits).view('uint64').newbyteorder()
def to_raw(self, kmer_string):
uints = self.to_uints(kmer_string)
little_endian_uints = uints.astype('<u8')
return little_endian_uints.tobytes()
@attr.s(slots=True, eq=False)
class KmerData(object):
_data = attr.ib()
kmer_size = attr.ib()
num_colors = attr.ib()
_kmer = attr.ib(None, init=False)
_coverage = attr.ib(None, init=False)
_edges = attr.ib(None, init=False)
def get_raw_kmer(self):
return self._data[:self.kmer_container_size_in_uint64ts * UINT64_T]
@property
def kmer(self):
if self._kmer is None:
self._kmer = raw_kmer_to_string(self.kmer_size, self.get_raw_kmer())
return self._kmer
@property
def coverage(self):
if self._coverage is None:
self._coverage = raw_to_coverage(self._data,
self.kmer_container_size_in_uint64ts * UINT64_T,
self.num_colors)
return self._coverage
@coverage.setter
def coverage(self, val):
self._coverage = val
@property
def edges(self):
if self._edges is None:
start = (
self.kmer_container_size_in_uint64ts * UINT64_T + self.num_colors * UINT32_T
)
tuples = raw_edges_to_list(self._data[start:])
self._edges = [cortexpy.edge_set.EdgeSet(t) for t in tuples]
return self._edges
@edges.setter
def edges(self, val):
self._edges = val
@property
def kmer_container_size_in_uint64ts(self):
return calc_kmer_container_size(self.kmer_size)
[docs]@attr.s(slots=True, eq=False)
class Kmer:
"""Represents a Cortex kmer
This class wraps a kmer data object with attributes and methods for inspecting and manipulating
the underlying kmer data object."""
_kmer_data = attr.ib()
num_colors = attr.ib()
kmer_size = attr.ib()
_revcomp = attr.ib(None)
@num_colors.validator
def check(self, attribute, value):
if value < 1:
raise ValueError("num_colors must be greater than 0")
@classmethod
def from_kmer_data(cls, kmer_data):
return cls(kmer_data, kmer_size=kmer_data.kmer_size, num_colors=kmer_data.num_colors)
@property
def revcomp(self):
if self._revcomp is None:
self._revcomp = revcomp(self.kmer)
return self._revcomp
@property
def kmer_container_size(self):
return self._kmer_data.kmer_container_size_in_uint64ts
@property
def kmer(self):
return self._kmer_data.kmer
@property
def coverage(self):
return self._kmer_data.coverage
@coverage.setter
def coverage(self, value):
assert isinstance(value, tuple)
if len(value) != self.num_colors:
raise ValueError(f'coverage length ({value}) must match num_colors ({self.num_colors})')
assert any(c > 0 for c in value)
self._kmer_data.coverage = value
@property
def edges(self):
return self._kmer_data.edges
@edges.setter
def edges(self, val):
self._kmer_data.edges = val
[docs] def increment_color_coverage(self, color):
"""Increment the coverage of a color by one"""
num_colors = max(self.num_colors, color + 1)
coverage = list(self.coverage)
if num_colors > self.num_colors:
coverage += list(repeat(0, num_colors - self.num_colors))
for edge_idx in range(self.num_colors, num_colors):
assert len(self.edges) < edge_idx + 1
self.edges.append(cortexpy.edge_set.empty())
coverage[color] += 1
self.num_colors = num_colors
self.coverage = tuple(coverage)
def __eq__(self, other):
return kmer_eq(self, other)
def __str__(self):
string_parts = [self.kmer]
string_parts += [str(c) for c in self.coverage]
string_parts += [e.to_str() for e in self.edges]
return ' '.join(string_parts)
def __repr__(self):
return str(self)
@property
def colors(self):
return range(self.num_colors)
def has_outgoing_edge_to_kmer_in_color(self, other, color):
try:
_, _, ref_letter, flip_letter = next(find_neighbors(self, other,
revcomp_second=True,
rc_is_after_reference_kmer=True))
except StopIteration:
raise ValueError('Kmers ({}) and ({}) do not agree on connection'.format(self, other))
edge_set = self.edges[color]
if edge_set.is_edge(ref_letter) != other.edges[color].is_edge(flip_letter):
raise ValueError('Kmers ({}) and ({}) do not agree on connection'.format(self, other))
return edge_set.is_edge(ref_letter)
def has_incoming_edge_from_kmer_in_color(self, other, color):
try:
_, _, ref_letter, flip_letter = next(find_neighbors(self, other,
revcomp_second=True,
rc_is_after_reference_kmer=False))
except StopIteration:
raise ValueError('Kmers ({}) and ({}) do not agree on connection'.format(self, other))
edge_set = self.edges[color]
if edge_set.is_edge(ref_letter) != other.edges[color].is_edge(flip_letter):
raise ValueError('Kmers do not agree on connection')
return edge_set.is_edge(ref_letter)
def get_raw_kmer(self):
return self._kmer_data.get_raw_kmer()
def dump(self, buffer):
if self.kmer == self._kmer_data.kmer:
buffer.write(self.get_raw_kmer())
else:
raise NotImplementedError
for cov in self.coverage:
buffer.write(struct.pack('I', cov))
for edge_set in self.edges:
edge_set.dump(buffer)
def __getitem__(self, item):
"""Allow kmers to be used as node objects in networkx graphs"""
if item == 'kmer':
return self
raise KeyError
@attr.s(slots=True, eq=False)
class KmerByStringComparator(object):
kmer = attr.ib(None)
kmer_object = attr.ib(None)
def __attrs_post_init__(self):
if self.kmer is None:
self.kmer = self.kmer_object.kmer
def __eq__(self, other):
return self.kmer == other.kmer
def __lt__(self, other):
return self.kmer < other.kmer
@attr.s(slots=True, eq=False)
class KmerUintComparator(object):
kmer_uints = attr.ib()
def __lt__(self, other):
for i, val in enumerate(self.kmer_uints):
if val < other.kmer_uints[i]:
return True
elif val != other.kmer_uints[i]:
return False
return False
def __eq__(self, other):
return np.all(self.kmer_uints == other.kmer_uints)
def __len__(self):
return len(self.kmer_uints)