import os
import tempfile
import uuid
from concurrent import futures
from functools import partial
from importlib import import_module
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import proplot as pplt
import pylatex as pyl
from tqdm.auto import tqdm
from pyhdx.plot import FitResultPlotBase, SCATTER_KWARGS
# Page margins (LaTeX ``geometry`` package options) shared by every report document.
geometry_options = dict(lmargin="1in", rmargin="1in", tmargin="1in")
# pyl.NoEscape
# geometry_options = {
# 'total': pyl.NoEscape(r'{170mm, 157mm}'),
# 'left': '20mm',
# 'top': '20mm'
# }
# Usable page width in mm: assumes A4 paper (210 mm wide) and subtracts the left
# and right margins from ``geometry_options``, each converted to mm by ``pplt.units``.
PAGE_WIDTH = (
210
- pplt.units(geometry_options["lmargin"], dest="mm")
- pplt.units(geometry_options["rmargin"], dest="mm")
)
# PAGE_WIDTH = 170
class BaseReport:
    """Common base class for report types; it currently defines no behavior."""
class Report(BaseReport):
    """Placeholder report type; constructing it always raises."""

    def __init__(self, hdxm_set, **kwargs):
        """Reject construction, since this report type has no implementation.

        Args:
            hdxm_set: Accepted but unused.
            **kwargs: Accepted but unused.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError
class FitReport(FitResultPlotBase):
    """
    Create .pdf output of a fit result

    Figures are not rendered when they are added. Each ``add_*`` method queues a
    ``(file_path, figure_function)`` pair in ``figure_queue`` and stores matching
    LaTeX placement callables in ``tex_dict``. ``generate_figures`` renders the
    queue to disk, ``generate_latex`` writes the figure environments into the
    document and ``generate_pdf`` compiles it.
    """

    def __init__(
        self, fit_result, title=None, doc=None, add_date=True, temp_dir=None, **kwargs
    ):
        """
        Args:
            fit_result: Fit result to report on; forwarded to the base class.
            title: Document title; defaults to ``"Fit report"``.
            doc: Existing pylatex document to write into. When None, a new
                document is created with ``_init_doc``.
            add_date: Whether a newly created document shows today's date.
            temp_dir: Directory for rendered figure files. When not given, a
                directory is created with ``make_temp_dir``.
            **kwargs: Accepted for call compatibility; currently unused.
        """
        super().__init__(fit_result)
        self.title = title or "Fit report"
        # Compare against None explicitly: pylatex containers are list-like, so a
        # freshly created (still empty) document is falsy and ``doc or ...`` would
        # silently replace it with a new document.
        self.doc = self._init_doc(add_date=add_date) if doc is None else doc
        self._temp_dir = Path(temp_dir or self.make_temp_dir())

        # (file_path, figure_function) pairs waiting to be rendered.
        self.figure_queue = []
        # {graph type: {state name: [callables which add the tex output to a doc]}}
        self.tex_dict = {}
        self._figure_number = 0  # todo automate

    def make_temp_dir(self):
        """Create (if needed) and return this report's temporary directory.

        The directory lives in the system temp dir and is named after
        ``id(self)``.

        Returns:
            str: Absolute path of the directory.
        """
        tmp_path = os.path.abspath(
            os.path.join(tempfile.gettempdir(), str(id(self)))
        )
        # exist_ok avoids the race between an existence check and the creation.
        os.makedirs(tmp_path, exist_ok=True)
        return tmp_path

    def _init_doc(self, add_date=True):
        """Build a new document with title page and table of contents.

        Args:
            add_date: When True the date is ``\\today``; otherwise it is empty.

        Returns:
            The new ``pyl.Document``.
        """
        doc = pyl.Document(geometry_options=geometry_options)
        for package in ("float", "hyperref"):
            doc.packages.append(pyl.Package(package))

        doc.preamble.append(pyl.Command("title", self.title))
        date = pyl.NoEscape(r"\today" if add_date else r"")
        doc.preamble.append(pyl.Command("date", date))

        doc.append(pyl.NoEscape(r"\maketitle"))
        doc.append(pyl.NewPage())
        doc.append(pyl.Command("tableofcontents"))
        doc.append(pyl.NewPage())
        return doc

    def reset_doc(self, add_date=True):
        """Replace ``self.doc`` with a freshly initialized document."""
        self.doc = self._init_doc(add_date=add_date)

    def _queue_figure(self, fig_func, extension=".pdf"):
        """Queue ``fig_func`` for rendering and return its tex placement callable.

        Args:
            fig_func: Zero-argument callable producing the figure.
            extension: File extension of the rendered figure.

        Returns:
            Partial of ``_place_figure`` bound to the (uniquely named) file in
            the temporary directory the figure will be saved to.
        """
        file_name = f"{uuid.uuid4()}.{extension.strip('.')}"
        file_path = self._temp_dir / file_name
        self.figure_queue.append((file_path, fig_func))
        return partial(_place_figure, file_path)

    def add_standard_figure(self, name, **kwargs):
        """Queue one figure per argument set for the ``pyhdx.plot`` function ``name``.

        Args:
            name: Name of a plot function in ``pyhdx.plot``; also the key under
                which the output is stored in ``tex_dict``.
            **kwargs: Passed to the plot function. ``width`` defaults to
                ``PAGE_WIDTH``.
        """
        plot_func = getattr(import_module("pyhdx.plot"), name)
        # ``_get_arg`` is not defined in this class; presumably the base class
        # provides it and maps a label to the plot function's first argument.
        arg_dict = self._get_arg(name)
        width = kwargs.pop("width", PAGE_WIDTH)

        section = {}
        for args_name, arg in arg_dict.items():
            # todo perhaps something like fig = lazy(func(args, **kwargs))?
            fig_func = partial(plot_func, arg, width=width, **kwargs)
            section[args_name] = [self._queue_figure(fig_func)]
        # Stored only once everything above succeeded, so a failed lookup does
        # not leave an empty section behind.
        self.tex_dict[name] = section

    def add_peptide_uptake_curves(self, layout=(5, 4), time_axis=None):
        """Queue pages of per-peptide uptake curves for every sample.

        Args:
            layout: ``(nrows, ncols)`` of subplots per page; each page shows up
                to ``nrows * ncols`` peptides.
            time_axis: Time points at which the fit is evaluated, either 1D
                (shared by all samples) or 2D with one row per sample. When
                None, ``get_fit_timepoints()`` is used.
        """
        nrows, ncols = layout
        per_page = nrows * ncols

        # ``time_axis or ...`` raises for multi-element arrays (ambiguous truth
        # value), so test for None and accept any array-like.
        if time_axis is None:
            time = self.get_fit_timepoints()
        else:
            time = np.asarray(time_axis)
        if time.ndim == 1:
            # todo move shape change to FitResult object
            time = np.tile(time, (len(self.fit_result), 1))
        d_calc = self.fit_result(time)  # Ns x Np x Nt

        fig_factory = partial(
            pplt.subplots,
            ncols=ncols,
            nrows=nrows,
            sharex=1,
            sharey=1,
            width=f"{PAGE_WIDTH}mm",
        )

        section = {}
        # Iterate over samples; every sample is plotted against its own row of
        # the time axis rather than always against the first row.
        for hdxm, d_calc_s, time_s in zip(self.fit_result.hdxm_set, d_calc, time):
            indices = range(hdxm.Np)
            chunks = [
                indices[i : i + per_page] for i in range(0, len(indices), per_page)
            ]
            tex = []
            for chunk in chunks:
                fig_func = partial(
                    _peptide_uptake_figure, fig_factory, chunk, time_s, d_calc_s, hdxm
                )
                tex.append(self._queue_figure(fig_func))
            section[hdxm.name] = tex
        self.tex_dict["peptide_uptake"] = section

    def generate_latex(self, sort_by="graphs"):
        """Write all queued figure environments into ``self.doc``.

        Args:
            sort_by: Only ``"graphs"`` is supported: one section per graph type
                with one subsection per state.

        Raises:
            NotImplementedError: For any other value of ``sort_by``.
        """
        # todo allow for setting which graphs to output
        if sort_by != "graphs":
            raise NotImplementedError("Sorting by protein state not implemented")

        for graph_type, state_dict in self.tex_dict.items():
            # todo map graph type to human readable section name
            with self.doc.create(pyl.Section(graph_type)):
                for state, tex_list in state_dict.items():
                    with self.doc.create(pyl.Subsection(state)):
                        for tex_func in tex_list:
                            tex_func(doc=self.doc)

    def generate_figures(self, executor="local"):
        """Render every queued figure to its file.

        Args:
            executor: A ``concurrent.futures.Executor`` instance, ``"process"``
                for a new ``ProcessPoolExecutor``, or ``"local"`` to render in
                the calling thread.

        Raises:
            ValueError: If ``executor`` is none of the above.
            Exception: Any exception raised while rendering a figure is re-raised.
        """
        owns_executor = True
        if isinstance(executor, futures.Executor):
            pool = executor
            owns_executor = False  # the caller manages its own executor
        elif executor == "process":
            pool = futures.ProcessPoolExecutor()
        elif executor == "local":
            pool = LocalThreadExecutor()
        else:
            raise ValueError("Invalid value for 'executor'")

        try:
            pending = [pool.submit(run, item) for item in self.figure_queue]
            with tqdm(total=len(pending), desc="Generating figures") as pbar:
                for future in futures.as_completed(pending):
                    # Surface worker failures instead of dropping them silently.
                    future.result()
                    pbar.update(1)
        finally:
            if owns_executor:
                pool.shutdown()

    def generate_pdf(self, file_path, cleanup=True, **kwargs):
        """Compile ``self.doc`` to a pdf.

        Args:
            file_path: Output path, passed to ``doc.generate_pdf``.
            cleanup: Currently unused; the temporary figure files are kept.
            **kwargs: Passed to ``doc.generate_pdf``; ``compiler_args`` defaults
                to ``["--xelatex"]``.
        """
        options = {"compiler_args": ["--xelatex"], **kwargs}
        self.doc.generate_pdf(file_path, **options)
        # TODO: honor ``cleanup`` by removing the temporary figure directory.
def _place_figure(file_path, width=r"\textwidth", doc=None):
    """Append a figure environment showing the image at ``file_path`` to ``doc``.

    ``doc`` is a keyword with a default so that the function can be bound to a
    file path with ``functools.partial`` first and receive the document later.

    Args:
        file_path: Path of the image file; converted with ``str``.
        width: LaTeX width of the image, inserted without escaping.
        doc: pylatex document (or container) the figure is created in.

    Raises:
        ValueError: If ``doc`` is None.
    """
    if doc is None:
        # Fail with a clear message instead of an AttributeError on ``None.create``.
        raise ValueError("'doc' is required to place a figure, got None")
    with doc.create(pyl.Figure(position="H")) as tex_fig:
        tex_fig.add_image(str(file_path), width=pyl.NoEscape(width))
def _peptide_uptake_figure(fig_factory, indices, _t, _d, hdxm):
    """Draw one page of peptide uptake curves and return the figure.

    Args:
        fig_factory: Zero-argument callable returning ``(fig, axes)``.
        indices: Peptide indices to draw, one per axis in axes order.
        _t: x values of the fitted curves.
        _d: Fitted values, indexed by peptide index.
        hdxm: Measurement object supplying ``timepoints``, ``d_exp`` and
            ``coverage.data`` (with ``_start`` / ``_end`` columns).

    Returns:
        The figure created by ``fig_factory``.
    """
    fig, axes = fig_factory()
    panels = iter(axes)

    for peptide in indices:
        panel = next(panels)
        # Fitted curve in red, measured points in black.
        panel.plot(_t, _d[peptide], color="r")
        measured = hdxm.d_exp.iloc[peptide]
        panel.scatter(hdxm.timepoints, measured, color="k", **SCATTER_KWARGS)
        start, end = hdxm.coverage.data.iloc[peptide][["_start", "_end"]]
        panel.format(title=f"Peptide_{peptide}: {start} - {end}")

    # Axes left over on a partially filled page are hidden.
    for spare in panels:
        spare.axis("off")

    # todo second y axis with RFU
    shared_format = {
        "xscale": "log",
        "xlabel": "Time (s)",
        "ylabel": "Corrected D-uptake",
        "xformatter": "log",
        "ylim": (0, None),
    }
    axes.format(**shared_format)
    return fig
def run(item):
    """Render one queued figure and save it to disk.

    Args:
        item: ``(file_path, fig_func)`` pair. ``fig_func`` is called without
            arguments and returns either a matplotlib figure or a sequence
            whose first element is the figure.
    """
    file_path, fig_func = item
    fig = fig_func()
    if not isinstance(fig, plt.Figure):
        # Plot functions may return e.g. (fig, axes); keep only the figure.
        fig = fig[0]
    try:
        fig.savefig(file_path)
    finally:
        # Close even when saving fails so the figure is not kept alive by pyplot.
        plt.close(fig)
class LocalThreadExecutor(futures.Executor):
    """Executor which runs every submitted call immediately in the calling thread.

    No threads or processes are started; ``submit`` returns only after the call
    has finished, with a future that is already completed.
    """

    def submit(self, f, *args, **kwargs):
        """Call ``f(*args, **kwargs)`` right away and wrap the result in a future.

        An exception raised by ``f`` is not stored in the future; it propagates
        directly out of ``submit``.

        Returns:
            concurrent.futures.Future: Completed future holding the result.
        """
        future = futures.Future()
        future.set_result(f(*args, **kwargs))
        return future

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Do nothing; there are no resources to release.

        ``cancel_futures`` is accepted to match the signature of
        ``Executor.shutdown`` in Python 3.9+ and is ignored.
        """
        pass