from copy import deepcopy
import numpy as np
import pandas as pd
import torch as t
import torch.nn as nn
from scipy import constants, linalg
from pyhdx.fileIO import dataframe_to_file
from pyhdx.models import Protein
from pyhdx.config import cfg
# TORCH_DTYPE = t.double
# TORCH_DEVICE = t.device('cpu')
class DeltaGFit(nn.Module):
    def __init__(self, dG):
        super().__init__()
        self.register_parameter(name="dG", param=nn.Parameter(dG))

    def forward(self, temperature, X, k_int, timepoints):
        """
        Inputs:
            temperature: scalar (1,)
            X: (N_peptides, N_residues)
            k_int: (N_residues, 1)
            timepoints: (1, N_timepoints)
        """
        pfact = t.exp(self.dG / (constants.R * temperature))
        uptake = 1 - t.exp(-t.matmul((k_int / (1 + pfact)), timepoints))
        return t.matmul(X, uptake)
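
# Minimal usage sketch of the forward model above (synthetic shapes and values,
# not pyhdx data; all names are local to this example). Protection factors
# follow pfact = exp(dG / RT), observed rates are k_int / (1 + pfact), and the
# coupling matrix X maps per-residue uptake onto peptide-level D-uptake.
def _example_deltagfit_forward():
    N_res, N_pep, N_t = 5, 2, 3
    dG = t.full((N_res, 1), 20e3, dtype=t.float64)  # J/mol, one value per residue
    temperature = t.tensor(300.0, dtype=t.float64)  # K
    k_int = t.rand(N_res, 1, dtype=t.float64)  # intrinsic exchange rates (1/s)
    X = t.ones(N_pep, N_res, dtype=t.float64)  # peptide/residue coupling matrix
    timepoints = t.tensor([[10.0, 100.0, 1000.0]], dtype=t.float64)  # s

    model = DeltaGFit(dG)
    return model(temperature, X, k_int, timepoints)  # shape (N_pep, N_t)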


def estimate_errors(hdxm, dG):
    """
    Calculate covariances and uncertainty (perr, experimental).

    Parameters
    ----------
    hdxm : :class:`~pyhdx.models.HDXMeasurement`
    dG : :class:`~pandas.Series`
        dG values, indexed by residue number.

    Returns
    -------
    cov_series : :class:`~pandas.Series`
        Uncertainty estimate from the inverse Hessian of the MSE loss.
    perr_series : :class:`~pandas.Series`
        Uncertainty estimate from the SVD of the residual Jacobian, scaled by
        the reduced chi-squared.
    """
dtype = t.float64
joined = pd.concat([dG, hdxm.coverage["exchanges"]], axis=1, keys=["dG", "ex"])
dG = joined.query("ex==True")["dG"]
dG_tensor = t.tensor(dG.to_numpy(), dtype=dtype)
tensors = {
k: v.cpu() for k, v in hdxm.get_tensors(exchanges=True, dtype=dtype).items()
}
def hes_loss(dG_input):
criterion = t.nn.MSELoss(reduction="sum")
pfact = t.exp(dG_input.unsqueeze(-1) / (constants.R * tensors["temperature"]))
uptake = 1 - t.exp(
-t.matmul((tensors["k_int"] / (1 + pfact)), tensors["timepoints"])
)
d_calc = t.matmul(tensors["X"], uptake)
loss = criterion(d_calc, tensors["d_exp"])
return loss
hessian = t.autograd.functional.hessian(hes_loss, dG_tensor)
hessian_inverse = t.inverse(-hessian)
    # 1-sigma estimate: square root of the diagonal of the inverse Hessian
    covariance = np.sqrt(np.abs(np.diagonal(hessian_inverse)))
cov_series = pd.Series(covariance, index=dG.index, name="covariance")
def jac_loss(dG_input):
pfact = t.exp(dG_input.unsqueeze(-1) / (constants.R * tensors["temperature"]))
uptake = 1 - t.exp(
-t.matmul((tensors["k_int"] / (1 + pfact)), tensors["timepoints"])
)
d_calc = t.matmul(tensors["X"], uptake)
residuals = d_calc - tensors["d_exp"]
return residuals.flatten()
# https://stackoverflow.com/questions/42388139/how-to-compute-standard-deviation-errors-with-scipy-optimize-least-squares
jacobian = t.autograd.functional.jacobian(jac_loss, dG_tensor).numpy()
U, s, Vh = linalg.svd(jacobian, full_matrices=False)
tol = np.finfo(float).eps * s[0] * max(jacobian.shape)
w = s > tol
cov = (Vh[w].T / s[w] ** 2) @ Vh[w] # robust covariance matrix
res = jac_loss(dG_tensor).numpy()
chi2dof = np.sum(res**2) / (res.size - dG_tensor.numpy().size)
cov *= chi2dof
perr = np.sqrt(np.diag(cov))
perr_series = pd.Series(perr, index=dG.index, name="perr")
return cov_series, perr_series
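
# Standalone sketch of the SVD-based error estimate used above (following the
# stackoverflow recipe linked in `estimate_errors`); the function name and
# arguments are illustrative, not pyhdx API. Given the Jacobian of the residual
# vector of any least-squares fit, the parameter covariance is the
# pseudo-inverse of J^T J, scaled by the reduced chi-squared.
def _example_perr_from_jacobian(jacobian: np.ndarray, residuals: np.ndarray) -> np.ndarray:
    U, s, Vh = linalg.svd(jacobian, full_matrices=False)
    tol = np.finfo(float).eps * s[0] * max(jacobian.shape)
    w = s > tol  # drop singular values below numerical tolerance
    cov = (Vh[w].T / s[w] ** 2) @ Vh[w]  # pseudo-inverse of J^T J
    dof = residuals.size - jacobian.shape[1]  # observations minus parameters
    cov *= np.sum(residuals**2) / dof  # scale by reduced chi-squared
    return np.sqrt(np.diag(cov))  # 1-sigma parameter uncertainties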


class TorchFitResult(object):
    """
    PyTorch fit result object.

    Parameters
    ----------
    hdxm_set : :class:`~pyhdx.models.HDXMeasurementSet`
    model : :class:`~torch.nn.Module`
        The fitted model, typically :class:`DeltaGFit`.
    **metadata
        Additional metadata to store with the result.
    """

def __init__(self, hdxm_set, model, losses=None, **metadata):
self.hdxm_set = hdxm_set
self.model = model
self.losses = losses
self.metadata = metadata
self.metadata["model_name"] = type(model).__name__
if losses is not None:
self.metadata["total_loss"] = self.total_loss
self.metadata["mse_loss"] = self.mse_loss
self.metadata["reg_loss"] = self.reg_loss
self.metadata["regularization_percentage"] = self.regularization_percentage
self.metadata["epochs_run"] = len(self.losses)
self.names = [hdxm.name for hdxm in self.hdxm_set.hdxm_list]
dfs = [
self.generate_output(hdxm, self.dG[g_column])
for hdxm, g_column in zip(self.hdxm_set, self.dG)
]
df = pd.concat(
dfs, keys=self.names, names=["state", "quantity"], axis=1, sort=True
)
self.output = df

    def get_peptide_mse(self):
        """Get a dataframe with the mean squared error per peptide (i.e. the squared error per peptide, averaged over time)"""
squared_errors = self.get_squared_errors()
dfs = {}
for mse_sample, hdxm in zip(squared_errors, self.hdxm_set):
peptide_data = hdxm[0].data
mse = np.mean(mse_sample, axis=1)
            # index `mse` up to Np to discard zero-padded peptide entries
passthrough_fields = ["start", "end", "sequence"]
df = peptide_data[passthrough_fields].copy()
df["peptide_mse"] = mse[: hdxm.Np]
dfs[hdxm.name] = df
mse_df = pd.concat(
dfs.values(),
keys=dfs.keys(),
names=["state", "quantity"],
axis=1,
sort=True,
)
return mse_df

    def get_residue_mse(self):
        """Get a dataframe with residue mean squared errors.

        Errors are peptide MSE values, reduced to residue level by weighted
        averaging with the normalized coverage matrix (`Z_norm`).
        """
peptide_mse = self.get_peptide_mse()
residue_mse_list = []
for hdxm in self.hdxm_set:
peptide_mse_values = (
peptide_mse[hdxm.name, "peptide_mse"].dropna().to_numpy()
)
residue_mse_values = hdxm.coverage.Z_norm.T.dot(peptide_mse_values)
residue_mse = pd.Series(residue_mse_values, index=hdxm.coverage.r_number)
residue_mse_list.append(residue_mse)
residue_mse = pd.concat(
residue_mse_list, keys=self.hdxm_set.names, axis=1, sort=True
)
columns = pd.MultiIndex.from_tuples(
[(name, "residue_mse") for name in self.hdxm_set.names],
names=["state", "quantity"],
)
residue_mse.columns = columns
return residue_mse
@property
    def mse_loss(self):
        """:obj:`float`: Loss from the mean squared error part of the Lagrangian"""
mse_loss = self.losses["mse_loss"].iloc[-1]
return float(mse_loss)
@property
    def total_loss(self):
        """:obj:`float`: Total loss value of the Lagrangian"""
total_loss = self.losses.iloc[-1].sum()
return float(total_loss)
@property
    def reg_loss(self):
        """:obj:`float`: Loss from the regularization part of the Lagrangian"""
return self.total_loss - self.mse_loss
@property
    def regularization_percentage(self):
        """:obj:`float`: Percentage of the total loss due to regularization"""
return (self.reg_loss / self.total_loss) * 100
@property
    def dG(self):
        """:class:`~pandas.DataFrame`: dG output, indexed by residue number with one column per state"""
        g_values = self.model.dG.cpu().detach().numpy().squeeze()
        dG = pd.DataFrame(
            g_values.T, index=self.hdxm_set.coverage.index, columns=self.hdxm_set.names
        )
return dG

    @staticmethod
    def generate_output(hdxm, dG):
        """
        Generate the per-residue output dataframe for a single measurement.

        Parameters
        ----------
        hdxm : :class:`~pyhdx.models.HDXMeasurement`
        dG : :class:`~pandas.Series`
            dG values with residue number (r_number) as index.

        Returns
        -------
        df : :class:`~pandas.DataFrame`
            Dataframe with sequence, dG, pfact, k_obs and covariance columns.
        """
sequence = hdxm.coverage["sequence"].reindex(dG.index)
out_dict = {"sequence": sequence, "_dG": dG}
out_dict["dG"] = out_dict["_dG"].copy()
exchanges = hdxm.coverage["exchanges"].reindex(dG.index, fill_value=False)
out_dict["dG"][~exchanges] = np.nan
pfact = np.exp(out_dict["dG"] / (constants.R * hdxm.temperature))
out_dict["pfact"] = pfact
        # residues without coverage get NaN intrinsic rates (and thus NaN k_obs)
        k_int = hdxm.coverage["k_int"].reindex(dG.index, fill_value=np.nan)
k_obs = k_int / (1 + pfact)
out_dict["k_obs"] = k_obs
        covariance, perr = estimate_errors(hdxm, dG)
        df = pd.DataFrame(out_dict, index=dG.index)
        df = df.join(covariance)  # perr is experimental and not included in the output
return df

    def to_file(
self,
file_path,
include_version=True,
include_metadata=True,
fmt="csv",
**kwargs,
    ):
        """Save only the fit output table to file."""
        # pass the metadata dict when requested; otherwise pass through the falsy flag
        metadata = self.metadata if include_metadata else include_metadata
dataframe_to_file(
file_path,
self.output,
include_version=include_version,
include_metadata=metadata,
fmt=fmt,
**kwargs,
)

    def get_squared_errors(self) -> np.ndarray:
        """Returns the squared error per peptide per timepoint as an Ns x Np x Nt array"""
d_calc = self(self.hdxm_set.timepoints)
errors = (d_calc - self.hdxm_set.d_exp) ** 2
return errors

    def __call__(self, timepoints):
        """Evaluate the model at `timepoints`.

        `timepoints` shape must be (Ns, Nt) or (Nt,); a one-dimensional vector
        is broadcast to all samples. Output is an (Ns, Np, Nt) array.
        """
        # todo: needs tests
        timepoints = np.array(timepoints)
if timepoints.ndim == 1:
time_reshaped = np.tile(timepoints, (self.hdxm_set.Ns, 1, 1))
elif timepoints.ndim == 2:
Ns, Nt = timepoints.shape
assert (
Ns == self.hdxm_set.Ns
), "First dimension of 'timepoints' must match the number of samples"
time_reshaped = timepoints.reshape(Ns, 1, Nt)
elif timepoints.ndim == 3:
assert (
timepoints.shape[0] == self.hdxm_set.Ns
), "First dimension of 'timepoints' must match the number of samples"
time_reshaped = timepoints
else:
            raise ValueError("'timepoints' must have at most 3 dimensions")
dtype = t.float64
with t.no_grad():
tensors = self.hdxm_set.get_tensors()
inputs = [tensors[key] for key in ["temperature", "X", "k_int"]]
time_tensor = t.tensor(time_reshaped, dtype=dtype)
inputs.append(time_tensor)
output = self.model(*inputs)
        array = output.detach().cpu().numpy()  # move to CPU in case tensors are on GPU
return array

    def get_dcalc(self, timepoints=None):
        """Returns calculated D-uptake at the given timepoints.

        If no timepoints are given, a default set of logarithmically spaced
        timepoints is generated, spanning the measured range plus 5% padding.
        """
if timepoints is None:
timepoints = self.hdxm_set.timepoints
tmin = np.log10(timepoints[np.nonzero(timepoints)].min())
tmax = np.log10(timepoints.max())
            pad = 0.05 * (tmax - tmin)  # 5% padding in log space
tvec = np.logspace(tmin - pad, tmax + pad, num=100, endpoint=True)
else:
tvec = timepoints
df = self.eval(tvec)
return df

    def eval(self, timepoints):
        """Evaluate the model at the given timepoints and return the result as a dataframe"""
assert timepoints.ndim == 1, "Timepoints must be one-dimensional"
array = self(timepoints)
Ns, Np, Nt = array.shape
reshaped = array.reshape(Ns * Np, Nt)
columns = pd.MultiIndex.from_product(
[self.hdxm_set.names, np.arange(Np), ["d_calc"]],
names=["state", "peptide_id", "quantity"],
)
index = pd.Index(timepoints, name="exposure")
df = pd.DataFrame(reshaped.T, index=index, columns=columns)
df = df.loc[
:, (df != 0).any(axis=0)
] # remove zero columns, replace with NaN when possible
return df
def __len__(self):
return self.hdxm_set.Ns
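
# Usage sketch for TorchFitResult. Hedged: `fit_result` stands for the object
# returned by a fitting routine such as pyhdx.fitting.fit_gibbs_global, and the
# output file name is illustrative.
def _example_fit_result_usage(fit_result):
    dG = fit_result.dG  # DataFrame, residue index, one column per state (J/mol)
    peptide_mse = fit_result.get_peptide_mse()  # fit error per peptide
    d_calc = fit_result(np.array([10.0, 100.0, 1000.0]))  # (Ns, Np, Nt) array
    fit_result.to_file("fit_output.csv")  # save the output table
    return dG, peptide_mse, d_calc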


class TorchFitResultSet(object):
    """
    Set of multiple :class:`TorchFitResult` objects.

    Parameters
    ----------
    results : :obj:`list`
        List of :class:`TorchFitResult` objects.
    """

    def __init__(self, results):
        # todo: enforce that results objects always hold length-one hdxm_sets?
        # yes, but only in the GUI -> assume that in the GUI it is one
        self.results = results
dfs = [result.output for result in self.results]
self.output = pd.concat(dfs, axis=1, sort=True)
dfs = [result.losses for result in self.results]
names = ["_".join(result.hdxm_set.names) for result in self.results]
self.losses = pd.concat(dfs, axis=1, keys=names, sort=True)
@property
def metadata(self):
return {
"_".join(result.hdxm_set.names): result.metadata for result in self.results
}

    def to_file(
self,
file_path,
include_version=True,
include_metadata=True,
fmt="csv",
**kwargs,
    ):
        """Save only the combined output table to file."""
        # pass the metadata dict when requested; otherwise pass through the falsy flag
        metadata = self.metadata if include_metadata else include_metadata
dataframe_to_file(
file_path,
self.output,
include_version=include_version,
include_metadata=metadata,
fmt=fmt,
**kwargs,
)

    def get_peptide_mse(self):
        """Get a dataframe with mean squared errors per peptide, concatenated over all results"""
dfs = [result.get_peptide_mse() for result in self.results]
df = pd.concat(dfs, axis=1, sort=True)
return df

    def get_residue_mse(self):
        """Get a dataframe with residue mean squared errors, concatenated over all results"""
dfs = [result.get_residue_mse() for result in self.results]
df = pd.concat(dfs, axis=1, sort=True)
return df

    def get_dcalc(self, timepoints=None):
        """Returns calculated D-uptake for all results; see :meth:`TorchFitResult.get_dcalc`"""
        # todo: or do we want a timepoints range per measurement? probably not
if timepoints is None:
all_timepoints = np.concatenate(
[result.hdxm_set.timepoints.flatten() for result in self.results]
)
tmin = np.log10(all_timepoints[np.nonzero(all_timepoints)].min())
tmax = np.log10(all_timepoints.max())
            pad = 0.05 * (tmax - tmin)  # 5% padding in log space
tvec = np.logspace(tmin - pad, tmax + pad, num=100, endpoint=True)
else:
tvec = timepoints
dfs = [result.get_dcalc(tvec) for result in self.results]
df = pd.concat(dfs, axis=1)
return df

    def eval(self, timepoints):
        """Evaluate all results at the given timepoints and return a concatenated dataframe"""
        # use .eval() rather than __call__ so that each result returns a dataframe
        dfs = [result.eval(timepoints) for result in self.results]
        df = pd.concat(dfs, axis=1, sort=True)
        return df
def __len__(self):
return len(self.results)


class Callback(object):
    """Base class for callbacks; called by the fitting loop after each epoch"""

    def __call__(self, epoch, model, optimizer):
        pass


class CheckPoint(Callback):
    """Callback which snapshots the model state dict every `epoch_step` epochs"""

    def __init__(self, epoch_step=1000):
        self.epoch_step = epoch_step
        self.model_history = {}
def __call__(self, epoch, model, optimizer):
if epoch % self.epoch_step == 0:
self.model_history[epoch] = deepcopy(model.state_dict())

    def to_dataframe(self, names=None, field="dG"):
        """Convert the history of `field` into a dataframe.

        For batch fits (3-dimensional `field`), `names` must be given, with
        length equal to the number of states.
        """
        entry = next(iter(self.model_history.values()))
        g = entry[field]
        if g.ndim == 3:
            num_states = g.shape[0]  # shape is Ns x Nr x 1
            if names is None or len(names) != num_states:
                raise ValueError(
                    f"Number of names provided must be equal to the number of states ({num_states})"
                )
dfs = []
for i in range(num_states):
df = pd.DataFrame(
{
k: v[field].numpy()[i].squeeze()
for k, v in self.model_history.items()
}
)
dfs.append(df)
full_df = pd.concat(dfs, keys=names, axis=1)
else:
full_df = pd.DataFrame(
{k: v[field].numpy().squeeze() for k, v in self.model_history.items()}
)
return full_df
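
# Usage sketch of the callback protocol (the loop below is illustrative, not
# pyhdx's optimizer code): a fitting routine is expected to invoke each
# callback once per epoch with the current model and optimizer.
def _example_checkpoint_usage(model, n_epochs=5000):
    optimizer = t.optim.SGD(model.parameters(), lr=10.0)  # lr is arbitrary here
    checkpoint = CheckPoint(epoch_step=1000)
    for epoch in range(n_epochs):
        # ... compute loss, loss.backward(), optimizer.step() ...
        checkpoint(epoch, model, optimizer)
    # dG history, one column per checkpointed epoch; for batch fits (3-D dG),
    # pass `names` with one entry per state
    return checkpoint.to_dataframe()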