"""Source code for pyhdx.support"""

from __future__ import annotations

import contextlib
import hashlib
import itertools
import re
import warnings
from collections import OrderedDict
from functools import wraps
from io import StringIO
from itertools import count, groupby
from pathlib import Path
from typing import Iterable, Mapping, Any

import numpy as np
import pandas as pd
import typer
from skimage.filters import threshold_multiotsu

from pyhdx.config import cfg


def make_tuple(item):
    if isinstance(item, list):
        return tuple(make_tuple(i) for i in item)
    elif isinstance(item, dict):
        return tuple((key, make_tuple(value)) for key, value in item.items())
    else:
        return item

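# Illustrative usage (not part of the original module): make_tuple recursively
# converts nested lists and dicts into hashable tuples:
# >>> make_tuple({"a": [1, 2], "b": 3})
# (('a', (1, 2)), ('b', 3))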

def hash_dataframe(df, method="builtin"):
    if method == "builtin":
        tup = (
            *pd.util.hash_pandas_object(df, index=True).values,
            *df.columns,
            *df.columns.names,
            df.index.name,
        )

        return hash(tup)

    elif method == "md5":
        pd_values = list(pd.util.hash_pandas_object(df, index=True).values)
        if isinstance(df.columns, pd.MultiIndex):
            columns = [name for cols in df.columns for name in cols]
        else:
            columns = list(df.columns)

        all_vals = pd_values + columns + list(df.index.names) + list(df.columns.names)
        h = hashlib.md5()
        for val in all_vals:
            h.update(str(val).encode("UTF-8"))

        return h.digest().hex()

    else:
        raise ValueError(f"Invalid method {method!r}, must be 'builtin' or 'md5'")

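# Illustrative usage (not part of the original module). Python's builtin hash()
# is salted per interpreter session for strings, so only the 'md5' digest is
# stable across runs:
# >>> df = pd.DataFrame({"x": [1.0, 2.0]})
# >>> hash_dataframe(df, method="md5")      # hex digest str, reproducible
# >>> hash_dataframe(df, method="builtin")  # int, varies between sessions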

def hash_array(array, method="builtin"):
    if method == "buitin":
        return hash(array.data.tobytes())
    elif method == "md5":
        h = hashlib.md5(array.data.tobytes())
        return h.digest().hex()
    else:
        raise ValueError(f"Invalid method {method!r}, must be 'builtin' or 'md5'")


def multiindex_apply_function(
    index: pd.MultiIndex,
    level: int,
    func: str,
    args: Iterable | None = None,
    kwargs: Mapping | None = None,
) -> pd.MultiIndex:
    args = args or []
    kwargs = kwargs or {}
    new_index = index.set_levels(
        getattr(index.levels[level], func)(*args, **kwargs), level=level
    )

    return new_index


def multiindex_set_categories(
    index: pd.MultiIndex,
    level: int,
    categories: Any,  # index-like
    ordered: bool = False,
    rename: bool = False,
) -> pd.MultiIndex:
    new_index = multiindex_apply_function(
        index,
        level,
        "set_categories",
        args=[categories],
        kwargs=dict(ordered=ordered, rename=rename),
    )
    return new_index


def multiindex_add_categories(
    index: pd.MultiIndex,
    level: int,
    categories: Any,  # index-like
) -> pd.MultiIndex:
    new_index = multiindex_apply_function(
        index, level, "add_categories", args=[categories]
    )
    return new_index


def multiindex_astype(
    index: pd.MultiIndex,
    level: int,
    dtype: str,
) -> pd.MultiIndex:
    new_index = multiindex_apply_function(index, level, "astype", args=[dtype])
    return new_index

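# Illustrative usage of the multiindex helpers (not part of the original
# module): cast a level to categorical, then extend its categories:
# >>> mi = pd.MultiIndex.from_product([["a", "b"], [1, 2]])
# >>> cat_index = multiindex_astype(mi, 0, "category")
# >>> multiindex_set_categories(cat_index, 0, ["a", "b", "c"], ordered=True)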

# alternatively use df.to_string() (e.g. print(df.to_string()))
def df_fullstr(df):
    with pd.option_context(
        "display.max_rows",
        None,
        "display.max_columns",
        None,
        "display.expand_frame_repr",
        False,
    ):
        s = df.__str__()

    return s


def pprint_df(df):
    print(df_fullstr(df))


def get_reduced_blocks(coverage, max_combine=2, max_join=5):
    block_length = list(coverage.block_length.copy())

    i = 0
    subblock = 0
    while i < len(block_length):
        curr_size = block_length[i]
        if curr_size <= max_combine:
            del block_length[i]
            subblock += curr_size
        elif subblock != 0:
            # if block_length > 1:
            block_length.insert(i, subblock)
            subblock = 0
            i += 1
        else:
            i += 1
    if subblock != 0:
        block_length.append(subblock)

    changes = True
    while changes:
        i = 0
        changes = False
        while i < len(block_length):
            curr_size = block_length[i]
            if curr_size < max_join:
                changes = True
                if i == 0:  # beginning of list
                    block_length[i + 1] += curr_size
                    del block_length[i]
                elif i == len(block_length) - 1:  # end of the list
                    block_length[i - 1] += curr_size
                    del block_length[i]
                else:
                    if block_length[i - 1] < block_length[i + 1]:
                        block_length[i - 1] += curr_size
                    else:
                        block_length[i + 1] += curr_size
                    del block_length[i]
            else:
                i += 1

    return block_length

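# Worked example (illustrative; SimpleNamespace stands in for a coverage
# object, which only needs a 'block_length' attribute here):
# >>> from types import SimpleNamespace
# >>> cov = SimpleNamespace(block_length=[1, 2, 8, 3, 12])
# >>> get_reduced_blocks(cov, max_combine=2, max_join=5)
# [14, 12]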

def get_constant_blocks(coverage, block_size=10, initial_block=5):
    num_repeats = (coverage.prot_len - initial_block) // block_size
    remainder = (coverage.prot_len - initial_block) % block_size

    blocks = [initial_block] + [block_size] * num_repeats
    if remainder:
        blocks += [remainder]

    return blocks

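# Worked example (illustrative): a 58-residue protein gives an initial block of
# 5, five full blocks of 10, and a remainder of 3:
# >>> from types import SimpleNamespace
# >>> get_constant_blocks(SimpleNamespace(prot_len=58), block_size=10, initial_block=5)
# [5, 10, 10, 10, 10, 10, 3]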

def get_original_blocks(coverage):
    block_length = list(coverage.block_length.copy())
    return block_length


def reduce_inter(
    args: list[tuple[int, int]], gap_size: int = -1
) -> list[tuple[int, int]]:
    """Reduce overlapping intervals to their non-overlapping interval parts

    Author: Brent Pedersen
    Source: https://github.com/brentp/interlap/blob/3c4a5923c97a5d9a11571e0c9ea5bb7ea4e784ee/interlap.py#L224

    gap_size : :obj:`int`
        Gaps of this size between adjacent peptides are not considered to overlap.
        A value of -1 means that peptides with exactly zero overlap are separated.
        With gap_size=0 peptides with exactly zero overlap are not separated, and
        larger values tolerate larger gap sizes.

    >>> reduce_inter([(2, 4), (4, 9)])
    [(2, 4), (4, 9)]
    >>> reduce_inter([(2, 6), (4, 10)])
    [(2, 10)]
    """

    gap_size += 1
    if len(args) < 2:
        return args

    args.sort()
    ret = [args[0]]
    for next_i, (s, e) in enumerate(args, start=1):
        if next_i == len(args):
            ret[-1] = ret[-1][0], max(ret[-1][1], e)
            break

        ns, ne = args[next_i]  # next start, next end
        if (
            e + gap_size > ns or ret[-1][1] + gap_size > ns
        ):  # current end is past next start (overlap), OR the merged interval's end is past next start
            ret[-1] = (
                ret[-1][0],
                max(e, ne, ret[-1][1]),
            )  # extend the end value of the current interval to the new end
        else:
            ret.append((ns, ne))

    return ret


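# Additional illustrative example (not part of the original docstring): with
# gap_size=0, intervals that merely touch are merged:
# >>> reduce_inter([(2, 4), (4, 9)], gap_size=0)
# [(2, 9)]

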
@contextlib.contextmanager
def temporary_seed(seed):
    # https://stackoverflow.com/questions/49555991/can-i-create-a-local-numpy-random-seed
    state = np.random.get_state()
    np.random.seed(seed)
    try:
        yield
    finally:
        np.random.set_state(state)


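# Illustrative usage (not part of the original module): draws inside the block
# are reproducible, and the global RNG state is restored on exit:
# >>> with temporary_seed(42):
# ...     a = np.random.rand(3)  # same values on every run

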
def grouper(n, iterable, padvalue=None):
    "grouper(3, 'abcdefg', 'x') --> ('a','b','c'), ('d','e','f'), ('g','x','x')"
    return itertools.zip_longest(*[iter(iterable)] * n, fillvalue=padvalue)


def _get_f_width(data, sign):
    i = 1 if sign else 0

    w_pos = np.log10(np.nanmax(data)) + i
    w_neg = np.log10(np.nanmax(-data)) + 1

    w = np.nanmax([w_pos, w_neg]) + 1
    try:
        width = int(np.floor(w))
    except OverflowError:  # all zeros
        width = 0
    return width


# move to fileIO?
def fmt_export(
    arr,
    delimiter="\t",
    header=True,
    sig_fig=8,
    width="auto",
    justify="left",
    sign=False,
    pad="",
):
    warnings.warn("fmt_export is moved to pyhdx.fileIO", PendingDeprecationWarning)
    with np.testing.suppress_warnings() as sup:
        sup.filter(RuntimeWarning)
        flag1 = "" if justify != "left" else "-"
        flag2 = "+" if sign else ""
        flag3 = "0" if pad == "0" else ""
        fmt = []
        hdr = []
        for j, name in enumerate(arr.dtype.names):
            dtype = arr[name].dtype

            if dtype.kind in ["b"]:
                specifier = "i"
                precision = ""
                w = 4 if np.all(arr[name]) else 5
            elif dtype.kind in ["i", "u"]:
                specifier = "i"
                precision = ""
                w = _get_f_width(arr[name], sign)
            elif dtype.kind in ["f", "c"]:
                specifier = "g"
                precision = "." + str(sig_fig)

                # float notation width
                # todo check for nan widths
                w_f = _get_f_width(arr[name], sign) + sig_fig

                # scientific notation width
                i = 1 if sign or np.any(arr[name] < 0) else 0
                w_s = (
                    sig_fig + 4 + i + 1
                )  # +1 for decimal point which is not always needed
                w = min(w_f, w_s) + 1
            elif dtype.kind in ["U", "S", "O"]:
                specifier = "s"
                precision = ""
                w = np.max([len(str(item)) for item in arr[name]])
            else:
                raise TypeError(f"Invalid dtype kind {dtype.kind} for field {name}")

            if width == "auto":
                col_w = w
            elif isinstance(width, int):
                col_w = width
            else:
                raise ValueError("Invalid width")

            if header:
                i = 2 if j == 0 else 0  # Additional space for header comment #
                if width == "auto":
                    _width = max(col_w, len(name) + i)
                elif isinstance(width, int):
                    _width = col_w

                func = str.ljust if justify == "left" else str.rjust
                fill = flag3 if flag3 else " "
                h = func(name, _width - i, fill)
                hdr.append(h)
            else:
                _width = col_w

            s = f"%{flag1}{flag2}{flag3}{_width}{precision}{specifier}"
            fmt.append(s)

    fmt = delimiter.join(fmt)
    hdr = delimiter.join(hdr)
    return fmt, hdr


# move to fileIO?
def np_from_txt(file_path, delimiter="\t"):
    warnings.warn(
        "np_from_txt is moved to pyhdx.fileIO as txt_to_np", PendingDeprecationWarning
    )
    if isinstance(file_path, StringIO):
        file_obj = file_path
    else:
        file_obj = open(file_path, "r")

    names = None
    header_lines = 0
    while True:
        header = file_obj.readline().strip()
        if header.startswith("#"):
            names = header[2:].split(delimiter)
            header_lines += 1
        else:
            break
    file_obj.seek(0)

    return np.genfromtxt(
        file_obj,
        dtype=None,
        names=names,
        skip_header=header_lines,
        delimiter=delimiter,
        encoding=None,
        autostrip=True,
        comments=None,
    )


def try_wrap(start, end, wrap, margin=4):
    """Check for a given coverage if the value of wrap is high enough to not have
    peptides overlapping within margin

    The start, end interval is inclusive, exclusive
    """
    assert len(start) == len(end), "Unequal length of 'start' and 'end' vectors"

    offset = np.min(start)
    start = np.array(start) - offset
    end = np.array(end) - offset

    x = np.zeros((wrap, len(start) + margin))
    wrap_gen = itertools.cycle(range(wrap))
    for i, s, e in zip(wrap_gen, start, end):
        section = x[i, s : e + margin]
        if np.any(section):
            return False
        section[:] = 1

    return True


def autowrap(start, end, margin=4, step=5):
    """
    Automatically finds a wrap value for coverage such that peptides do not
    overlap within `margin`

    Parameters
    ----------
    start
        Peptide start residue numbers
    end
        Peptide end residue numbers (exclusive)
    margin
        Minimum spacing between peptides on the same row
    step
        Increment used when searching for a wrap value

    Returns
    -------
    wrap : :obj:`int`
        Found wrap value

    """
    assert len(start) == len(end), "Unequal length of 'start' and 'end' vectors"
    wrap = step
    while True:
        wraps = try_wrap(start, end, wrap, margin=margin)
        wrap += step
        if wraps or wrap > len(start):
            break

    return wrap


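# Illustrative usage (not part of the original module). Note that `wrap` is
# incremented once more before the loop exits, so the returned value is one
# `step` above the first wrap that fits:
# >>> start, end = [1, 5, 12], [10, 14, 20]
# >>> wrap = autowrap(start, end, margin=4)

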
# https://stackoverflow.com/questions/15182381/how-to-return-a-view-of-several-columns-in-numpy-structured-array/
def fields_view(arr, fields):
    dtype2 = np.dtype({name: arr.dtype.fields[name] for name in fields})
    return np.ndarray(arr.shape, dtype2, arr, 0, arr.strides)


# https://stackoverflow.com/questions/15182381/how-to-return-a-view-of-several-columns-in-numpy-structured-array/
def make_view(arr, fields, dtype):
    offsets = [arr.dtype.fields[f][1] for f in fields]
    offset = min(offsets)
    stride = max(offsets)
    return np.ndarray(
        (len(arr), 2),
        buffer=arr,
        offset=offset,
        strides=(arr.strides[0], stride - offset),
        dtype=dtype,
    )


vhex = np.vectorize(hex)
base_v = np.vectorize(np.base_repr)


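# Illustrative usage (not part of the original module): fields_view returns a
# zero-copy view on a subset of columns of a structured array:
# >>> arr = np.zeros(3, dtype=[("a", "f8"), ("b", "f8"), ("c", "i4")])
# >>> view = fields_view(arr, ["a", "b"])
# >>> view["a"][:] = 1.0  # writes through to `arr`

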
def rgb_to_hex(rgb_a):
    """Converts rgba to hex colors

    Input values are [0, 255]; alpha is set to zero; colors are returned
    as '#000000' strings
    """
    # Single value
    if isinstance(rgb_a, tuple):
        try:
            r, g, b, a = rgb_a
        except ValueError:
            r, g, b = rgb_a
        return f"#{r:02x}{g:02x}{b:02x}"
    elif isinstance(rgb_a, list):
        try:
            rgba_array = np.array(
                [[b, g, r, 0] for r, g, b, a in rgb_a], dtype=np.uint8
            )
        except ValueError:
            # todo this only works with lists of lists and gives the wrong result? tests needed
            rgba_array = np.array([[b, g, r, 0] for r, g, b in rgb_a], dtype=np.uint8)
    elif isinstance(rgb_a, np.ndarray):
        # todo: allow rgb arrays
        assert rgb_a.shape[-1] == 4
        if rgb_a.data.c_contiguous:
            rgba_array = rgb_a
        else:
            rgba_array = np.array(rgb_a)
    else:
        raise TypeError(f"Invalid type for 'rgb_a': {rgb_a}")

    ints = rgba_array.astype(np.uint8).view(dtype=np.uint32).byteswap()
    padded = np.char.rjust(base_v(ints // 2**8, 16), 6, "0")
    result = np.char.add("#", padded).squeeze()

    return result


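# Illustrative usage (not part of the original module). Single tuples take the
# f-string path (lowercase digits); arrays go through the uint32 view, where
# np.base_repr yields uppercase hex digits:
# >>> rgb_to_hex((255, 0, 0, 255))
# '#ff0000'

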
def hex_to_rgb(h):
    """returns rgb as int 0-255"""
    r, g, b = tuple(int(h.lstrip("#")[2 * i : 2 * i + 2], 16) for i in range(3))
    return r, g, b


def hex_to_rgba(h, alpha=255):
    r, g, b = tuple(int(h.lstrip("#")[2 * i : 2 * i + 2], 16) for i in range(3))
    return r, g, b, alpha


def group_with_index(arr):
    # https://stackoverflow.com/questions/25438491/finding-consecutively-repeating-strings-in-python-list/25438531#25438531
    i = 0
    for k, vs in itertools.groupby(arr):
        c = sum(1 for _ in vs)
        yield k, c, i
        i += c


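# Illustrative usage of group_with_index (not part of the original module):
# yields (value, run length, start index) for each consecutive run:
# >>> list(group_with_index([1, 1, 2, 2, 2, 3]))
# [(1, 2, 0), (2, 3, 2), (3, 1, 5)]

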
# move to output?
# pymol coloring functions need cleaning/refactoring
# remove colors_to_pymol
# instead use apply_cmap then color_pymol / color_pymol_script
def colors_to_pymol(r_number, color_arr, c_term=None, no_coverage="#8c8c8c"):
    """Converts colors (hexadecimal format) and corresponding residue numbers to
    a pml script to color structures in pymol

    Residue ranges in the output are inclusive, inclusive

    c_term:
        Optional residue number of the C-terminal residue, for when the last
        peptide doesn't cover the C-terminus
    """
    # todo replace with pandas dataframe magic
    c_term = c_term or np.max(r_number)

    pd_series = pd.Series(color_arr, index=r_number)
    pd_series = pd_series.reindex(np.arange(1, c_term + 1))
    pd_series = pd_series.replace("nan", no_coverage)  # No coverage at nan entries
    pd_series = pd_series.replace(np.nan, no_coverage)  # Numpy NaNs

    return series_to_pymol(pd_series)


def apply_cmap(pd_series_or_df, cmap, norm=None):
    values = pd_series_or_df if norm is None else norm(pd_series_or_df)
    rgb_colors = cmap(values, bytes=True)
    hex_colors = rgb_to_hex(rgb_colors)

    if isinstance(pd_series_or_df, pd.Series):
        return pd.Series(hex_colors, index=pd_series_or_df.index)
    elif isinstance(pd_series_or_df, pd.DataFrame):
        return pd.DataFrame(
            hex_colors, index=pd_series_or_df.index, columns=pd_series_or_df.columns
        )


def color_pymol(pd_series, cmd, model=None):
    grp = pd_series.groupby(pd_series)
    for c, pd_series in grp:
        result = [
            list(g)
            for _, g in groupby(pd_series.index, key=lambda n, c=count(): n - next(c))
        ]
        r, g, b = hex_to_rgb(c)
        residues = [f"resi {g[0]}-{g[-1]}" for g in result]
        selection = " + ".join(residues)
        if model:
            selection = f"model {model} and ({selection})"

        cmd.set_color(c, [r, g, b])
        cmd.color(c, selection=selection)


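# Illustrative usage of apply_cmap (not part of the original module; assumes
# matplotlib is installed). The result is a Series of '#RRGGBB' strings
# indexed by residue number:
# >>> import matplotlib.pyplot as plt
# >>> from matplotlib.colors import Normalize
# >>> series = pd.Series([0.1, 0.5, 0.9], index=[1, 2, 3])
# >>> apply_cmap(series, plt.get_cmap("viridis"), norm=Normalize(0, 1))

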
def series_to_pymol(pd_series):
    """
    Converts a pandas series to a pymol script to color protein structures in pymol

    Series must have hexadecimal color values and residue numbers as index

    Parameters
    ----------
    pd_series : :class:`~pandas.Series`

    Returns
    -------
    s_out : :obj:`str`

    """
    # https://stackoverflow.com/questions/33483670/how-to-group-a-series-by-values-in-pandas
    grp = pd_series.groupby(pd_series)
    s_out = ""
    for c, pd_series in grp:
        r, g, b = hex_to_rgb(c)
        s_out += f"set_color color_{c}, [{r},{g},{b}]\n"

    # https://stackoverflow.com/questions/30993182/how-to-get-the-index-range-of-a-list-that-the-values-satisfy-some-criterion-in-p
    for c, pd_series in grp:
        result = [
            list(g)
            for _, g in groupby(pd_series.index, key=lambda n, c=count(): n - next(c))
        ]
        residues = [f"resi {g[0]}-{g[-1]}" for g in result]
        s_out += f"color color_{c}, " + " + ".join(residues) + "\n"

    return s_out


def make_monomer(input_file, output_file):
    """reads input_file pdb file and removes all chains except chain A and all water"""
    with open(input_file, "r") as f_in:
        with open(output_file, "w") as f_out:
            for line in iter(f_in.readline, ""):
                if line.startswith("COMPND") and "CHAIN" in line:
                    res = re.findall(":(.*);", line)[0]
                    line = line.replace(res + ";", " A;" + " " * (len(res) - 2))
                if line.startswith("ATOM") and " A " not in line:
                    continue
                elif line.startswith("HETATM") and "HOH" in line:
                    continue
                f_out.write(line)


# move to output?
def make_color_array(rates, colors, thds, no_coverage="#8c8c8c"):
    """
    :param rates: array of rates
    :param colors: list of colors (slow to fast)
    :param thds: list of thresholds
    :param no_coverage: color value for no coverage
    :return: array of color values
    """
    output = np.full_like(rates, fill_value=no_coverage, dtype="U7")

    full_thds = [-np.inf] + list(thds) + [np.inf]
    for lower, upper, color in zip(full_thds[:-1], full_thds[1:], colors):
        b = (rates > lower) & (rates <= upper)
        output[b] = color

    return output


def multi_otsu(*rates, classes=3):
    """
    Global otsu thresholding of multiple rate arrays in log space

    Parameters
    ----------
    rates : iterable
        Iterable of numpy structured arrays with a 'rate' field
    classes : :obj:`int`
        Number of classes to divide the data into

    Returns
    -------
    thds : :obj:`tuple`
        Tuple with thresholds

    """
    all_rates = np.concatenate([data["rate"] for data in rates])
    thd_rates = np.log(all_rates[~np.isnan(all_rates)])
    thds = threshold_multiotsu(thd_rates, classes=classes)

    return tuple(np.e**thd for thd in thds)


def scale(x, out_range=(-1, 1)):
    """rescale input array x to range `out_range`"""
    domain = np.nanmin(x), np.nanmax(x)
    y = (x - (domain[1] + domain[0]) / 2) / (domain[1] - domain[0])
    return y * (out_range[1] - out_range[0]) + (out_range[1] + out_range[0]) / 2


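# Illustrative usage (not part of the original module):
# >>> scale(np.array([0.0, 5.0, 10.0]), out_range=(0, 1))
# array([0. , 0.5, 1. ])

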
def gen_subclasses(cls):
    """Recursively find all subclasses of cls"""
    for sub_cls in cls.__subclasses__():
        yield sub_cls
        yield from gen_subclasses(sub_cls)


def pprint_df_to_file(df, file_path_or_obj):
    """
    Pretty print (human-readable) a dataframe to a file

    Parameters
    ----------
    df : :class:`~pandas.DataFrame`
    file_path_or_obj : :obj:`str`, Path or :class:`~io.StringIO`

    """
    with pd.option_context(
        "display.max_rows",
        None,
        "display.max_columns",
        None,
        "display.expand_frame_repr",
        False,
    ):  # more options can be specified also
        if isinstance(file_path_or_obj, str):
            pth = Path(file_path_or_obj)
            pth.write_text(df.__str__())
        elif isinstance(file_path_or_obj, Path):
            file_path_or_obj.write_text(df.__str__())
        elif isinstance(file_path_or_obj, StringIO):
            file_path_or_obj.write(df.__str__())


def clean_types(d: Any) -> Any:
    """cleans up nested dict/list/tuple/other `d` for exporting as yaml

    Converts library specific types to python native types, including numpy dtypes,
    OrderedDict, numpy arrays

    # https://stackoverflow.com/questions/59605943/python-convert-types-in-deeply-nested-dictionary-or-array
    """
    if isinstance(d, np.floating):
        return float(d)

    if isinstance(d, np.integer):
        return int(d)

    if isinstance(d, np.ndarray):
        return d.tolist()

    if isinstance(d, list):
        return [clean_types(item) for item in d]

    if isinstance(d, tuple):
        return tuple(clean_types(item) for item in d)

    if isinstance(d, OrderedDict):
        return clean_types(dict(d))

    if isinstance(d, dict):
        return {k: clean_types(v) for k, v in d.items()}

    else:
        return d


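# Illustrative usage (not part of the original module):
# >>> clean_types({"a": np.float64(1.5), "b": np.arange(2), "c": (np.int32(3),)})
# {'a': 1.5, 'b': [0, 1], 'c': (3,)}

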
def select_config() -> None:
    """When the .pyhdx directory has multiple config files, prompts the user for
    which config to use and subsequently loads it.
    """
    pyhdx_dir = Path().home() / ".pyhdx"
    config_options = list(pyhdx_dir.glob("*.yaml"))
    if len(config_options) > 1:
        s = "Found multiple configuration files:\n"
        for i, cfg_file in enumerate(config_options, start=1):
            s += f"{i}: {cfg_file.stem}\n"
        print(s)
        choice = typer.prompt("Which config file to use?", type=int)
        if choice < 1 or choice > len(config_options):
            print(f"Invalid option: {choice}")
        else:
            cfg.load_config(config_options[choice - 1])


def pbar_decorator(pbar):
    """Wraps a progress bar around a function, updating the progress bar with each
    function call"""

    def func_wrapper(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            pbar.update()
            return result

        return wrapper

    return func_wrapper


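# Illustrative usage (not part of the original module; tqdm is just an
# example, any object with an update() method works):
# >>> from tqdm import tqdm
# >>> pbar = tqdm(total=10)
# >>> @pbar_decorator(pbar)
# ... def step(x):
# ...     return x * 2
# >>> step(3)  # advances the bar by one tick
# 6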