# Source code for cortexpy.graph.parser.header

"""Cortex graph headers
=======================

This module contains classes for parsing and representing a Cortex file header
"""

import struct
from struct import unpack

import attr

from cortexpy.graph.parser.constants import (
    CORTEX_MAGIC_WORD, CORTEX_VERSION, UINT8_T, UINT32_T,
    UINT64_T,
    ERROR_RATE_SIZE,
)


def none_or_greater_than_zero(_, attribute, value):
    """Validate that ``value`` is either ``None`` or strictly positive.

    Takes ``(instance, attribute, value)``; the first argument is ignored.

    :param attribute: object whose ``name`` is used in the error message
    :param value: the value to check
    :raises ValueError: if ``value`` is not ``None`` and ``value <= 0``
    """
    if value is None:
        # ``None`` means "not set" and is always accepted.
        return
    if value <= 0:
        message = "'{}' has to be greater than 0!".format(attribute.name)
        raise ValueError(message)


def greater_than_zero(_, attribute, value):
    """Validate that ``value`` is at least 1.

    Takes ``(instance, attribute, value)``; the first argument is ignored.

    :param attribute: object whose ``name`` is used in the error message
    :param value: the value to check
    :raises ValueError: if ``value < 1``
    """
    too_small = value < 1
    if too_small:
        message = "'{}' has to be greater than 0!".format(attribute.name)
        raise ValueError(message)


def odd(_, attribute, value):
    """Validate that ``value`` is not divisible by two.

    Takes ``(instance, attribute, value)``; the first argument is ignored.

    :param attribute: object whose ``name`` is used in the error message
    :param value: the value to check
    :raises ValueError: if ``value % 2 == 0``
    """
    remainder = value % 2
    if remainder == 0:
        message = "'{}' has to be odd!".format(attribute.name)
        raise ValueError(message)





@attr.s(slots=True)
class HeaderFromStreamBuilder(object):
    stream = attr.ib()
    header = attr.ib(attr.Factory(Header))

    def _extract_magic_word(self):
        """Read six bytes from the stream and unpack them as six single characters.

        :return: the tuple produced by ``unpack('6c', ...)``
        """
        raw_word = self.stream.read(6)
        return unpack('6c', raw_word)

    def extract_initial_magic_word(self):
        """Consume the magic word at the current stream position and verify it.

        :return: this builder, so that extraction steps can be chained
        :raises ValueError: if the word read differs from ``CORTEX_MAGIC_WORD``
        """
        observed = self._extract_magic_word()
        if observed != CORTEX_MAGIC_WORD:
            message = "Saw initial magic word {} but was expecting {}".format(
                observed, CORTEX_MAGIC_WORD
            )
            raise ValueError(message)
        return self

    def extract_concluding_magic_word(self):
        """Consume the closing magic word and hand back the header built so far.

        Unlike the other extraction steps this one returns the header rather
        than the builder.

        :return: ``self.header``
        :raises ValueError: if the word read differs from ``CORTEX_MAGIC_WORD``;
            the message includes up to 1000 further bytes read from the stream
            and the header parsed so far
        """
        trailing_word = self._extract_magic_word()
        if trailing_word != CORTEX_MAGIC_WORD:
            template = '\n'.join([
                'Concluding magic word {} != starting magic word {}',
                'Unparsed: {}',
                'Parsed header: {}',
            ])
            # Reading ahead consumes the stream; it is only done to aid debugging.
            unparsed = self.stream.read(1000)
            raise ValueError(
                template.format(trailing_word, CORTEX_MAGIC_WORD, unparsed, self.header)
            )
        return self.header

    def fill_first_four_params(self):
        """Read four unsigned ints (16 bytes) and store them on the header.

        In stream order the values are ``version``, ``kmer_size``,
        ``kmer_container_size`` and ``num_colors``.

        :return: this builder, so that extraction steps can be chained
        """
        version, kmer_size, kmer_container_size, num_colors = unpack(
            '4I', self.stream.read(16)
        )
        self.header = attr.evolve(
            self.header,
            version=version,
            kmer_size=kmer_size,
            kmer_container_size=kmer_container_size,
            num_colors=num_colors,
        )
        return self

    def extract_mean_read_lengths(self):
        """Read one unsigned int per color and store them as ``mean_read_lengths``.

        The number of values read is ``self.header.num_colors``, so that field
        has to be populated before this step runs.

        :return: this builder, so that extraction steps can be chained
        """
        num_colors = self.header.num_colors
        fmt = '{}I'.format(num_colors)
        raw = self.stream.read(struct.calcsize('I') * num_colors)
        self.header = attr.evolve(self.header, mean_read_lengths=unpack(fmt, raw))
        return self

    def extract_total_sequences(self):
        """Read one unsigned 64-bit integer per color and store them as ``total_sequences``.

        The number of values read is ``self.header.num_colors``, so that field
        has to be populated before this step runs.

        :return: this builder, so that extraction steps can be chained
        """
        num_colors = self.header.num_colors
        # 'Q' (unsigned long long) is 8 bytes in native mode, whereas the size
        # of 'L' (unsigned long) varies by platform and is only 4 bytes on some
        # (e.g. Windows). Using 'Q' removes the need for a runtime size
        # assertion, which would also be stripped when running under ``-O``.
        fmt = '{}Q'.format(num_colors)
        # Derive the read size from the format so the two can never disagree.
        raw = self.stream.read(struct.calcsize(fmt))
        self.header = attr.evolve(self.header, total_sequences=unpack(fmt, raw))
        return self

    def extract_sample_names_from_stream(self):
        """Read one length-prefixed sample name per color and store them as ``sample_names``.

        Each name is encoded as an unsigned int giving its length in bytes,
        followed by that many bytes. Names are kept as byte strings and stored
        as a tuple.

        :return: this builder, so that extraction steps can be chained
        """
        length_field_size = struct.calcsize('I')
        names = []
        for _ in range(self.header.num_colors):
            (name_length,) = unpack('I', self.stream.read(length_field_size))
            # Unpacking (rather than using the raw read) fails on a short read.
            name_chars = unpack('{}c'.format(name_length), self.stream.read(name_length))
            names.append(b''.join(name_chars))
        self.header = attr.evolve(self.header, sample_names=tuple(names))
        return self

    def extract_error_rates(self):
        """Read one 16-byte chunk per color and store them as ``error_rates``.

        The chunks are stored exactly as read (not decoded), as a tuple.

        :return: this builder, so that extraction steps can be chained
        """
        # NOTE(review): the module imports ERROR_RATE_SIZE but this read uses a
        # literal 16 -- presumably the same value; confirm before swapping.
        rates = tuple(self.stream.read(16) for _ in range(self.header.num_colors))
        self.header = attr.evolve(self.header, error_rates=rates)
        return self

    def extract_color_info_blocks(self):
        """Read one color info block per color and append each to the header.

        Every block starts with a fixed part of four single characters and
        three unsigned ints; the last of those ints is the byte length of the
        variable-length field that follows (presumably the name of the graph
        the color was cleaned against -- confirm against the format spec).

        Unlike the other extraction steps, this one appends to
        ``self.header.color_info_blocks`` in place rather than replacing the
        header via ``attr.evolve``.

        :return: this builder, so that extraction steps can be chained
        """
        fixed_part_size = 4 + 3 * struct.calcsize('I')
        for _ in range(self.header.num_colors):
            fixed_fields = unpack('4c3I', self.stream.read(fixed_part_size))
            # Index 6 is the third unsigned int: the length of the trailing field.
            trailing_length = fixed_fields[6]
            trailing_field = self.stream.read(trailing_length)
            self.header.color_info_blocks.append((fixed_fields, trailing_field))
        return self