Skip to content

fitting

DUptakeFitResult dataclass

Source code in pyhdx/fitting.py
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
@dataclass
class DUptakeFitResult:
    result: np.ndarray
    """Array with raw D-uptake fit values, including (guessed/interpolated) D-uptake at 
        residues without coverage."""
    mse_loss: np.ndarray
    reg_loss: np.ndarray
    hdx_obj: Union[HDXMeasurement, HDXTimepoint]
    metadata: dict

    @property
    def d_uptake(self) -> np.ndarray:
        d = self.result.copy()
        d[..., ~self.exchanges] = np.nan

        return d

    @property
    def percentiles(self) -> pd.DataFrame:
        percentiles = [5, 25, 75, 95]
        if isinstance(self.hdx_obj, HDXMeasurement):
            perc = np.percentile(self.result, percentiles, axis=1)
            perc[..., ~self.exchanges] = np.nan
            cols = pd.MultiIndex.from_product(
                [
                    self.hdx_obj.timepoints,
                    [f"percentile_{p:02}" for p in percentiles],
                ],
                names=["exposure", "quantity"],
            )
            reshaped = perc.T.reshape(perc.shape[-1], -1)
            perc_df = pd.DataFrame(reshaped, columns=cols, index=self.r_number)
        elif isinstance(self.hdx_obj, HDXTimepoint):
            perc = np.percentile(self.result, percentiles, axis=0)
            perc[..., ~self.exchanges] = np.nan
            perc_df = pd.DataFrame(
                perc.T,
                columns=[f"percentile_{p:02}" for p in percentiles],
                index=self.r_number,
            )

        return perc_df

    @property
    def means(self) -> pd.DataFrame:
        if isinstance(self.hdx_obj, HDXMeasurement):
            means = self.d_uptake.mean(axis=1)
            cols = pd.MultiIndex.from_product(
                [self.hdx_obj.timepoints, ["d_uptake"]], names=["exposure", "quantity"]
            )
            means_df = pd.DataFrame(means.T, columns=cols, index=self.r_number)

            return means_df

        elif isinstance(self.hdx_obj, HDXTimepoint):
            means = self.d_uptake.mean(axis=0)
            means_df = pd.DataFrame(means, index=self.r_number, columns=["d_uptake"])

            return means_df

    @property
    def output(self) -> Union[pd.Series, pd.DataFrame]:
        if isinstance(self.hdx_obj, HDXMeasurement):
            combined = pd.concat([self.percentiles, self.means], axis=1).sort_index(axis=1, level=0)
            combined.columns = multiindex_astype(combined.columns, 0, "float")
            return combined

        elif isinstance(self.hdx_obj, HDXTimepoint):
            combined = pd.concat([self.percentiles, self.means], axis=1).sort_index(axis=1, level=0)

            return combined

    @property
    def exchanges(self) -> pd.Series:
        if isinstance(self.hdx_obj, HDXMeasurement):
            return self.hdx_obj.coverage["exchanges"]
        elif isinstance(self.hdx_obj, HDXTimepoint):
            return self.hdx_obj["exchanges"]

    @property
    def r_number(self) -> pd.Index:
        if isinstance(self.hdx_obj, HDXMeasurement):
            return self.hdx_obj.coverage.r_number
        elif isinstance(self.hdx_obj, HDXTimepoint):
            return self.hdx_obj.r_number

result: np.ndarray instance-attribute

Array with raw D-uptake fit values, including (guessed/interpolated) D-uptake at residues without coverage.

KineticsFitResult

Bases: object

Fit result object. Generally used for initial guess results.

Parameters

:class:~pyhdx.models.HDXMeasurement

HDX measurement object to fit

intervals: :obj:list List of tuples with intervals (inclusive, exclusive) describing which residues results and models refer to results: :obj:list List of :class:~symfit.FitResults models: :obj:list Lis of :class:~pyhdx.fit_models.KineticsModel

Source code in pyhdx/fitting.py
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
class KineticsFitResult(object):
    """
    Fit result object. Generally used for initial guess results.

    Parameters
    ----------

    hdxm : :class:`~pyhdx.models.HDXMeasurement`
        HDX measurement object to fit
    intervals: :obj:`list`
        List of tuples with intervals (inclusive, exclusive) describing which residues `results` and `models` refer to
    results: :obj:`list`
        List of :class:`~symfit.FitResults`
    models: :obj:`list`
        Lis of :class:`~pyhdx.fit_models.KineticsModel`

    """

    def __init__(self, hdxm, intervals, results, models):
        """
        each model with corresponding interval covers a region in the protein corresponding to r_number
        """
        assert len(results) == len(models)
        #        assert len(models) == len(block_length)
        self.hdxm = hdxm
        self.r_number = hdxm.coverage.r_number  # pandas RangeIndex
        self.intervals = intervals  # inclusive, excluive
        self.results = results
        self.models = models

    @property
    def model_type(self):
        # Most likely all current instances have model_type `single`
        if np.all([isinstance(m, SingleKineticModel) for m in self.models]):
            return "Single"
        else:
            raise ValueError("Unsupported model types")

    @property
    def name(self):
        return self.hdxm.name

    def __call__(self, timepoints):
        """call the result with timepoints to get fitted uptake per peptide back"""
        # todo outdated
        d_list = []
        if self.model_type == "Single":
            for time in timepoints:
                p = self.get_p(time)
                p = np.nan_to_num(p)
                d = self.hdxm.coverage.X.dot(p)
                d_list.append(d)
        elif self.model_type == "Global":
            for time in timepoints:
                d = self.get_d(time)
                d_list.append(d)

        uptake = np.vstack(d_list).T
        return uptake

    def get_p(self, t):
        """
        Calculate P at timepoint t. Only for wt average type fitting results

        """
        p = np.full_like(self.r_number, fill_value=np.nan, dtype=float)
        for (s, e), result, model in zip(self.intervals, self.results, self.models):
            i0, i1 = np.searchsorted(self.r_number, [s, e])
            p[i0:i1] = model(t, **result.params)

        return p

    def get_d(self, t):
        "calculate d at timepoint t only for lsqkinetics (refactor glocal) type fitting results (scores per peptide)"
        d_arr = np.array([])
        for result, model in zip(self.results, self.models):
            d = np.array(model(t, **result.params)).flatten()
            d_arr = np.append(d_arr, d)
        return d_arr

    def __len__(self):
        return len(self.results)

    def __iter__(self):
        raise DeprecationWarning()
        iterable = [(r, m, b) for r, m, b in zip(self.results, self.models, self.block_length)]
        return iterable.__iter__()

    def get_param(self, name):
        """
        Get an array of parameter with name `name` from the fit result. The length of the array is equal to the
        number of amino acids.

        Parameters
        ----------
        name : :obj:`str`
            Name of the parameter to extract

        Returns
        -------
        par_arr : :class:`~numpy.ndarray`
            Array with parameter values

        """

        output = np.full_like(self.r_number, np.nan, dtype=float)
        for (s, e), result, model in zip(self.intervals, self.results, self.models):
            try:
                dummy_name = model.names[name]  ## dummy parameter name
                value = result.params[dummy_name]  # value is scalar
            except KeyError:  # is a lsqmodel funky town
                value = model.get_param_values(name, **result.params)  # value is vector
                # values = model.get_parameter(name)            #todo unify nomenclature param/parameter

            i0, i1 = np.searchsorted(self.r_number, [s, e])
            output[i0:i1] = value

        return output

    @property
    def rate(self):
        """Returns an array with the exchange rates"""
        # todo pd series/dataframe in favour of searchsorted
        output = np.full_like(self.r_number, np.nan, dtype=float)
        for (s, e), result, model in zip(self.intervals, self.results, self.models):
            rate = model.get_rate(**result.params)
            i0, i1 = np.searchsorted(self.r_number, [s, e])
            output[i0:i1] = rate
        return output

    @property
    def tau(self):
        """Returns an array with the exchange rates"""
        return 1 / self.rate

    def get_output(self, names):
        # this does not seem to work:
        # df_dict = {name: getattr(self, name, self.get_param(name)) for name in names}
        df_dict = {}
        for name in names:
            try:
                df_dict[name] = getattr(self, name)
            except AttributeError:
                df_dict[name] = self.get_param(name)

        df = pd.DataFrame(df_dict, index=self.r_number)

        return df

    @property
    def output(self):
        """:class:`~pandas.Dataframe`: Dataframe with fitted rates per residue"""
        df = self.get_output(["rate", "k1", "k2", "r"])
        return df

output property

:class:~pandas.Dataframe: Dataframe with fitted rates per residue

rate property

Returns an array with the exchange rates

tau property

Returns an array with the exchange rates

__call__(timepoints)

call the result with timepoints to get fitted uptake per peptide back

Source code in pyhdx/fitting.py
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
def __call__(self, timepoints):
    """call the result with timepoints to get fitted uptake per peptide back"""
    # todo outdated
    d_list = []
    if self.model_type == "Single":
        for time in timepoints:
            p = self.get_p(time)
            p = np.nan_to_num(p)
            d = self.hdxm.coverage.X.dot(p)
            d_list.append(d)
    elif self.model_type == "Global":
        for time in timepoints:
            d = self.get_d(time)
            d_list.append(d)

    uptake = np.vstack(d_list).T
    return uptake

__init__(hdxm, intervals, results, models)

each model with corresponding interval covers a region in the protein corresponding to r_number

Source code in pyhdx/fitting.py
969
970
971
972
973
974
975
976
977
978
979
def __init__(self, hdxm, intervals, results, models):
    """
    each model with corresponding interval covers a region in the protein corresponding to r_number
    """
    assert len(results) == len(models)
    #        assert len(models) == len(block_length)
    self.hdxm = hdxm
    self.r_number = hdxm.coverage.r_number  # pandas RangeIndex
    self.intervals = intervals  # inclusive, excluive
    self.results = results
    self.models = models

get_d(t)

calculate d at timepoint t only for lsqkinetics (refactor glocal) type fitting results (scores per peptide)

Source code in pyhdx/fitting.py
1023
1024
1025
1026
1027
1028
1029
def get_d(self, t):
    "calculate d at timepoint t only for lsqkinetics (refactor glocal) type fitting results (scores per peptide)"
    d_arr = np.array([])
    for result, model in zip(self.results, self.models):
        d = np.array(model(t, **result.params)).flatten()
        d_arr = np.append(d_arr, d)
    return d_arr

get_p(t)

Calculate P at timepoint t. Only for wt average type fitting results

Source code in pyhdx/fitting.py
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
def get_p(self, t):
    """
    Calculate P at timepoint t. Only for wt average type fitting results

    """
    p = np.full_like(self.r_number, fill_value=np.nan, dtype=float)
    for (s, e), result, model in zip(self.intervals, self.results, self.models):
        i0, i1 = np.searchsorted(self.r_number, [s, e])
        p[i0:i1] = model(t, **result.params)

    return p

get_param(name)

Get an array of parameter with name name from the fit result. The length of the array is equal to the number of amino acids.

Parameters

name : :obj:str Name of the parameter to extract

Returns

par_arr : :class:~numpy.ndarray Array with parameter values

Source code in pyhdx/fitting.py
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
def get_param(self, name):
    """
    Get an array of parameter with name `name` from the fit result. The length of the array is equal to the
    number of amino acids.

    Parameters
    ----------
    name : :obj:`str`
        Name of the parameter to extract

    Returns
    -------
    par_arr : :class:`~numpy.ndarray`
        Array with parameter values

    """

    output = np.full_like(self.r_number, np.nan, dtype=float)
    for (s, e), result, model in zip(self.intervals, self.results, self.models):
        try:
            dummy_name = model.names[name]  ## dummy parameter name
            value = result.params[dummy_name]  # value is scalar
        except KeyError:  # is a lsqmodel funky town
            value = model.get_param_values(name, **result.params)  # value is vector
            # values = model.get_parameter(name)            #todo unify nomenclature param/parameter

        i0, i1 = np.searchsorted(self.r_number, [s, e])
        output[i0:i1] = value

    return output

RatesFitResult dataclass

Accumulates multiple Generic/KineticsFit Results

Source code in pyhdx/fitting.py
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
@dataclass
class RatesFitResult:
    """Accumulates multiple Generic/KineticsFit Results"""

    results: list

    @property
    def output(self):
        dfs = [result.output for result in self.results]
        keys = [result.name for result in self.results]
        combined_results = pd.concat(dfs, axis=1, keys=keys, names=["state", "quantity"])

        return combined_results

check_bounds(fit_result)

Check if the obtained fit result is within bounds

Source code in pyhdx/fitting.py
473
474
475
476
477
478
479
480
481
def check_bounds(fit_result):
    """Check if the obtained fit result is within bounds"""
    for param in fit_result.model.params:
        value = fit_result.params[param.name]
        if value < param.min:
            return False
        elif value > param.max:
            return False
    return True

d_uptake_cost_func(x, A, b, d)

Cost functions for residue-level D-uptake Calculates ||Ax - b|| + d*regularization

Where the regularization is the sum of the absolute value of the gradient of x:

.. math:: \frac{d}{N-1}\sum^{N-1}i |x_i - x|

Parameters:

Name Type Description Default
x ndarray

D-uptake values per residue.

required
A ndarray

Coupling matrix ('X'), connecting peptides to residues

required
b ndarray

D-uptake values per residue

required
d float

regularization parameter

required

Returns:

Type Description
float

Value of the cost function

Source code in pyhdx/fitting.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def d_uptake_cost_func(x: np.ndarray, A: np.ndarray, b: np.ndarray, d: float) -> float:
    r"""
    Cost functions for residue-level D-uptake
    Calculates ||Ax - b|| + d*regularization

    Where the regularization is the sum of the absolute value of the gradient of x:

    .. math::
        \frac{d}{N-1}\sum^{N-1}_i |x_i - x_{i+1}|

    Args:
        x: D-uptake values per residue.
        A: Coupling matrix ('X'), connecting peptides to residues
        b: D-uptake values per residue
        d: regularization parameter

    Returns:
        Value of the cost function

    """
    norm = np.mean((A.dot(x) - b) ** 2)
    reg = d * np.mean(np.abs(np.diff(x)))

    return norm + reg

fit_d_uptake(hdx_obj, guess=None, r1=1.0, bounds=True, repeats=10, verbose=True, client=None)

Fit residue-level D-uptake to a HDX measurement of multiple timepoints or a single HDX timepoint.

Parameters:

Name Type Description Default
hdx_obj Union[HDXMeasurement, HDXTimepoint]

Input HDX object, either HDXMeasurement or HDXTimepoint.

required
guess Optional[ndarray]

Optional guess array of D-uptake values.

None
r1 float

Value for r1 regularizer.

1.0
bounds Union[Bounds, list[tuple[Optional[float], Optional[float]]], None, bool]

Optional bounds. Default is True, which are bounds [0, 1] for all elements. Set to False or None to disable. Custom bounds can be supplied as list of tuples or scipy bounds object.

True
repeats

Number of times to repeat the fit.

10
verbose

Show/hide progress bar

True

Returns:

Type Description
DUptakeFitResult

D-Uptake fit result object.

Source code in pyhdx/fitting.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
def fit_d_uptake(
    hdx_obj: Union[HDXMeasurement, HDXTimepoint],
    guess: Optional[np.ndarray] = None,
    r1: float = 1.0,
    bounds: Union[Bounds, list[tuple[Optional[float], Optional[float]]], None, bool] = True,
    repeats=10,
    verbose=True,
    client: Union[Client, Literal["worker_client"], DummyClient, None] = None,
) -> DUptakeFitResult:
    """
    Fit residue-level D-uptake to a HDX measurement of multiple timepoints or a single HDX
    timepoint.

    Args:
        hdx_obj: Input HDX object, either HDXMeasurement or HDXTimepoint.
        guess: Optional guess array of D-uptake values.
        r1: Value for r1 regularizer.
        bounds: Optional bounds. Default is `True`, which are bounds [0, 1] for all elements.
            Set to `False` or `None` to disable. Custom bounds can be supplied as list of
            tuples or scipy bounds object.
        repeats: Number of times to repeat the fit.
        verbose: Show/hide progress bar

    Returns:
        D-Uptake fit result object.
    """

    client = client or DummyClient()

    if isinstance(hdx_obj, HDXMeasurement):
        Nt = hdx_obj.Nt
        iterable = hdx_obj
    elif isinstance(hdx_obj, HDXTimepoint):
        Nt = 1
        iterable = [hdx_obj]
    else:
        raise TypeError(f"Invalid type for 'hdx_obj': {type(hdx_obj)!r}")

    out = np.empty((Nt, repeats, hdx_obj.Nr))
    mse_arr = np.empty((Nt, repeats))
    reg_arr = np.empty((Nt, repeats))

    pbar = tqdm(total=Nt * repeats, disable=not verbose)
    pbar_wrapper = pbar_decorator(pbar)

    for Ni, hdx_t in enumerate(iterable):
        X = hdx_t.X
        d_uptake = hdx_t.data["uptake_corrected"].values
        pfunc = partial(_fit_single_d_update, X, d_uptake, guess=guess, r1=r1, bounds=bounds)
        if isinstance(client, DummyClient):
            pbar_func = pbar_wrapper(pfunc)
        else:
            pbar_func = pfunc

        if client == "worker_client":
            with worker_client() as c:
                futures = [c.submit(pbar_func, pure=False) for r in range(repeats)]
                results = c.gather(futures)
        else:
            futures = [client.submit(pbar_func, pure=False) for r in range(repeats)]
            results = client.gather(futures)

        for r, (res, mse_loss, reg_loss) in enumerate(results):
            out[Ni, r, :] = res.x
            mse_arr[Ni, r] = mse_loss
            reg_arr[Ni, r] = reg_loss

            # futures = client.map(pbar_func)

    # if client is None:
    #     for d, model in zip(d_list, models):
    #         result = fit_kinetics(hdxm.timepoints, d, model, chisq_thd=chisq_thd)
    #         results.append(result)
    # else:
    #     iterables = [[hdxm.timepoints] * len(d_list), d_list, models]
    #
    #     if isinstance(client, Client):
    #         futures = client.map(fit_kinetics, *iterables, chisq_thd=chisq_thd)
    #         results = client.gather(futures)
    #     elif client == "worker_client":
    #         with worker_client() as client:
    #             futures = client.map(fit_kinetics, *iterables, chisq_thd=chisq_thd)
    #             results = client.gather(futures)

    # for Ni, hdx_t in enumerate(iterable):
    #     for r in range(repeats):
    #         res, mse_loss, reg_loss = fit_single_d_update(hdx_t, guess=guess, r1=r1, bounds=bounds)
    #         out[Ni, r, :] = res.x
    #         mse_arr[Ni, r] = mse_loss
    #         reg_arr[Ni, r] = reg_loss
    #
    #         pbar.update()

    metadata = {"r1": r1, "repeats": repeats}
    result = DUptakeFitResult(
        result=out.squeeze(),
        mse_loss=mse_arr.squeeze(),
        reg_loss=reg_arr.squeeze(),
        hdx_obj=hdx_obj,
        metadata=metadata,
    )

    return result

    ...

fit_gibbs_global(hdxm, initial_guess, r1=R1, epochs=EPOCHS, patience=PATIENCE, stop_loss=STOP_LOSS, optimizer='SGD', callbacks=None, **optimizer_kwargs)

Fit Gibbs free energies globally to all D-uptake data in the supplied hdxm

Parameters

hdxm : :class:~pyhdx.models.HDXMeasurement Input HDX measurement initial_guess : :class:~pandas.Series or :class:~numpy.ndarray Gibbs free energy initial guesses (shape Nr, units J/mol) r1 : :obj:float Regularizer value r1 (along residues) epochs: :obj:int Maximum number of fitting iterations patience: :obj:int Number of epochs to wait until termination when progress between epochs is below stop_loss stop_loss: :obj:float Threshold for difference in loss between epochs when an epoch is considered to make no more progress. optimizer : :obj:str Which optimizer to use. Default is Stochastic Gradient Descent. See PyTorch documentation for information. callbacks: :obj:list or None List of callback objects. Call signature is callback(epoch, model, optimizer) **optimizer_kwargs Additional keyword arguments passed to the optimizer.

Returns

result: :class:~pyhdx.fitting_torch.TorchSingleFitResult

Source code in pyhdx/fitting.py
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
def fit_gibbs_global(
    hdxm,
    initial_guess,
    r1=R1,
    epochs=EPOCHS,
    patience=PATIENCE,
    stop_loss=STOP_LOSS,
    optimizer="SGD",
    callbacks=None,
    **optimizer_kwargs,
) -> TorchFitResult:
    """
    Fit Gibbs free energies globally to all D-uptake data in the supplied hdxm

    Parameters
    ----------
    hdxm : :class:`~pyhdx.models.HDXMeasurement`
        Input HDX measurement
    initial_guess : :class:`~pandas.Series` or :class:`~numpy.ndarray`
        Gibbs free energy initial guesses (shape Nr, units J/mol)
    r1 : :obj:`float`
        Regularizer value r1 (along residues)
    epochs: :obj:`int`
        Maximum number of fitting iterations
    patience: :obj:`int`
        Number of epochs to wait until termination when progress between epochs is below `stop_loss`
    stop_loss: :obj:`float`
        Threshold for difference in loss between epochs when an epoch is considered to make no more progress.
    optimizer : :obj:`str`
        Which optimizer to use. Default is Stochastic Gradient Descent. See PyTorch documentation for information.
    callbacks: :obj:`list` or None
        List of callback objects. Call signature is callback(epoch, model, optimizer)
    **optimizer_kwargs
        Additional keyword arguments passed to the optimizer.

    Returns
    -------
    result: :class:`~pyhdx.fitting_torch.TorchSingleFitResult`

    """

    fit_keys = ["r1", "epochs", "patience", "stop_loss", "optimizer"]
    locals_dict = locals()
    fit_kwargs = {k: locals_dict[k] for k in fit_keys}

    tensors = hdxm.get_tensors()
    inputs = [tensors[key] for key in ["temperature", "X", "k_int", "timepoints"]]
    output_data = tensors["d_exp"]

    if isinstance(initial_guess, pd.Series):
        assert (
            initial_guess.index.inferred_type == "integer"
        ), "Invalid dtype for initial guess index, must be 'integer'"
        # Map guesses to covered residue range and fill NaN gaps
        initial_guess = initial_guess.reindex(hdxm.coverage.r_number).interpolate(
            limit_direction="both"
        )
        initial_guess = initial_guess.to_numpy()

    assert len(initial_guess) == hdxm.Nr, "Invalid length of initial guesses"
    assert not np.any(np.isnan(initial_guess)), "Initial guess has NaN entries"

    dtype = torch.float64
    dG_par = torch.nn.Parameter(
        torch.tensor(initial_guess, dtype=cfg.TORCH_DTYPE, device=cfg.TORCH_DEVICE).unsqueeze(-1)
    )  # reshape (nr, 1)

    model = DeltaGFit(dG_par)
    criterion = torch.nn.MSELoss(reduction="mean")

    # Take default optimizer kwargs and update them with supplied kwargs
    optimizer_kwargs = {
        **optimizer_defaults.get(optimizer, {}),
        **optimizer_kwargs,
    }  # Take defaults and override with user-specified
    optimizer_klass = getattr(torch.optim, optimizer)

    reg_func = partial(regularizer_1d, r1)

    # returned_model is the same object as model
    losses_array, returned_model = run_optimizer(
        inputs,
        output_data,
        optimizer_klass,
        optimizer_kwargs,
        model,
        criterion,
        reg_func,
        epochs=epochs,
        patience=patience,
        stop_loss=stop_loss,
        callbacks=callbacks,
    )
    losses = _loss_df(losses_array)
    fit_kwargs.update(optimizer_kwargs)
    hdxm_set = HDXMeasurementSet([hdxm])
    result = TorchFitResult(hdxm_set, model, losses=losses, **fit_kwargs)

    return result

fit_gibbs_global_batch(hdx_set, initial_guess, r1=R1, r2=R2, r2_reference=False, epochs=EPOCHS, patience=PATIENCE, stop_loss=STOP_LOSS, optimizer='SGD', callbacks=None, **optimizer_kwargs)

Fit Gibbs free energies globally to all D-uptake data in multiple HDX measurements

Parameters

hdx_set : :class:~pyhdx.models.HDXMeasurementSet Input HDX measurements initial_guess : :class:~pandas.Series or :class:~pandas.DataFrame or :class:~numpy.ndarray Gibbs free energy initial guesses (shape Ns x Nr or Nr, units J/mol) r1 : :obj:float Regularizer value r1 (along residues) r2 : :obj:float Regularizer value r2 (along protein states/samples) r2_reference : :obj:bool: If True the first dataset is used as a reference to calculate r2 differences, otherwise the mean is used epochs: :obj:int Maximum number of fitting iterations patience: :obj:int Number of epochs to wait until termination when progress between epochs is below stop_loss stop_loss: :obj:float Threshold for difference in loss between epochs when an epoch is considered to make no more progress. optimizer : :obj:str Which optimizer to use. Default is Stochastic Gradient Descent. See PyTorch documentation for information. callbacks: :obj:list or None List of callback objects. Call signature is callback(epoch, model, optimizer) **optimizer_kwargs Additional keyword arguments passed to the optimizer.

Returns

result: :class:~pyhdx.fitting_torch.TorchBatchFitResult

Source code in pyhdx/fitting.py
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
def fit_gibbs_global_batch(
    hdx_set,
    initial_guess,
    r1=R1,
    r2=R2,
    r2_reference=False,
    epochs=EPOCHS,
    patience=PATIENCE,
    stop_loss=STOP_LOSS,
    optimizer="SGD",
    callbacks=None,
    **optimizer_kwargs,
):
    """
    Fit Gibbs free energies globally to all D-uptake data in multiple HDX measurements

    Parameters
    ----------
    hdx_set : :class:`~pyhdx.models.HDXMeasurementSet`
        Input HDX measurements
    initial_guess : :class:`~pandas.Series` or :class:`~pandas.DataFrame` or :class:`~numpy.ndarray`
        Gibbs free energy initial guesses (shape Ns x Nr or Nr, units J/mol)
    r1 : :obj:`float`
        Regularizer value r1 (along residues)
    r2 : :obj:`float`
        Regularizer value r2 (along protein states/samples)
    r2_reference : :obj:`bool`:
        If `True` the first dataset is used as a reference to calculate r2 differences, otherwise the mean is used
    epochs: :obj:`int`
        Maximum number of fitting iterations
    patience: :obj:`int`
        Number of epochs to wait until termination when progress between epochs is below `stop_loss`
    stop_loss: :obj:`float`
        Threshold for difference in loss between epochs when an epoch is considered to make no more progress.
    optimizer : :obj:`str`
        Which optimizer to use. Default is Stochastic Gradient Descent. See PyTorch documentation for information.
    callbacks: :obj:`list` or None
        List of callback objects. Call signature is callback(epoch, model, optimizer)
    **optimizer_kwargs
        Additional keyword arguments passed to the optimizer.

    Returns
    -------
    result: :class:`~pyhdx.fitting_torch.TorchBatchFitResult`

    """
    # todo still some repeated code with fit_gibbs single

    fit_keys = [
        "r1",
        "r2",
        "r2_reference",
        "epochs",
        "patience",
        "stop_loss",
        "optimizer",
        "callbacks",
    ]
    locals_dict = locals()
    fit_kwargs = {k: locals_dict[k] for k in fit_keys}

    if r2_reference:
        reg_func = partial(regularizer_2d_reference, r1, r2)
    else:
        reg_func = partial(regularizer_2d_mean, r1, r2)

    return _batch_fit(hdx_set, initial_guess, reg_func, fit_kwargs, optimizer_kwargs)

fit_gibbs_global_batch_aligned(hdx_set, initial_guess, r1=R1, r2=R2, epochs=EPOCHS, patience=PATIENCE, stop_loss=STOP_LOSS, optimizer='SGD', callbacks=None, **optimizer_kwargs)

Batch fit gibbs free energies to two HDX measurements. The supplied HDXMeasurementSet must have alignment information (supplied by HDXMeasurementSet.add_alignment)

Parameters

hdx_set : :class:~pyhdx.models.HDXMeasurementSet Input HDX measurements initial_guess : :class:~pandas.Series or :class:~pandas.DataFrame or :class:~numpy.ndarray Gibbs free energy initial guesses (shape Ns x Nr or Nr, units J/mol) r1 : :obj:float Regularizer value r1 (along residues) r2 : :obj:float Regularizer value r2 (along protein states/samples) epochs: :obj:int Maximum number of fitting iterations patience: :obj:int Number of epochs to wait until termination when progress between epochs is below stop_loss stop_loss: :obj:float Threshold for difference in loss between epochs when an epoch is considered to make no more progress. optimizer : :obj:str Which optimizer to use. Default is Stochastic Gradient Descent. See PyTorch documentation for information. callbacks: :obj:list or None List of callback objects. Call signature is callback(epoch, model, optimizer) **optimizer_kwargs Additional keyword arguments passed to the optimizer.

Returns

result: :class:~pyhdx.fitting_torch.TorchBatchFitResult

Source code in pyhdx/fitting.py
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
def fit_gibbs_global_batch_aligned(
    hdx_set,
    initial_guess,
    r1=R1,
    r2=R2,
    epochs=EPOCHS,
    patience=PATIENCE,
    stop_loss=STOP_LOSS,
    optimizer="SGD",
    callbacks=None,
    **optimizer_kwargs,
):
    """
    Batch fit gibbs free energies to two HDX measurements. The supplied HDXMeasurementSet must have alignment information
    (supplied by HDXMeasurementSet.add_alignment)

    Parameters
    ----------
    hdx_set : :class:`~pyhdx.models.HDXMeasurementSet`
        Input HDX measurements
    initial_guess : :class:`~pandas.Series` or :class:`~pandas.DataFrame` or :class:`~numpy.ndarray`
        Gibbs free energy initial guesses (shape Ns x Nr or Nr, units J/mol)
    r1 : :obj:`float`
        Regularizer value r1 (along residues)
    r2 : :obj:`float`
        Regularizer value r2 (along protein states/samples)
    epochs: :obj:`int`
        Maximum number of fitting iterations
    patience: :obj:`int`
        Number of epochs to wait until termination when progress between epochs is below `stop_loss`
    stop_loss: :obj:`float`
        Threshold for difference in loss between epochs when an epoch is considered to make no more progress.
    optimizer : :obj:`str`
        Which optimizer to use. Default is Stochastic Gradient Descent. See PyTorch documentation for information.
    callbacks: :obj:`list` or None
        List of callback objects. Call signature is callback(epoch, model, optimizer)
    **optimizer_kwargs
        Additional keyword arguments passed to the optimizer.

    Returns
    -------
    result: :class:`~pyhdx.fitting_torch.TorchBatchFitResult`

    """
    # todo expand to N proteins

    assert hdx_set.Ns == 2, "Aligned batch fitting is limited to two states"
    if hdx_set.aligned_indices is None:
        raise ValueError("No alignment added to HDX measurements")

    indices = [torch.tensor(i, dtype=torch.long) for i in hdx_set.aligned_indices]
    reg_func = partial(regularizer_2d_aligned, r1, r2, indices)

    fit_keys = ["r1", "r2", "epochs", "patience", "stop_loss", "optimizer", "callbacks"]
    locals_dict = locals()
    fit_kwargs = {k: locals_dict[k] for k in fit_keys}

    return _batch_fit(hdx_set, initial_guess, reg_func, fit_kwargs, optimizer_kwargs)

fit_kinetics(t, d, model, chisq_thd=100)

Fit time kinetics with two time components and corresponding relative amplitude.

Parameters

t : :class:~numpy.ndarray Array of time points d : :class:~numpy.ndarray Array of uptake values model : :class:~pyhdx.fit_models.KineticsModel chisq_thd : :obj:float Threshold chi squared above which the fitting is repeated with the Differential Evolution algorithm.

Returns

res : :class:~symfit.FitResults Symfit fitresults object.

Source code in pyhdx/fitting.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
def fit_kinetics(t, d, model, chisq_thd=100):
    """
    Fit time kinetics with two time components and corresponding relative amplitude.

    Parameters
    ----------
    t : :class:`~numpy.ndarray`
        Array of time points
    d : :class:`~numpy.ndarray`
        Array of uptake values
    model : :class:`~pyhdx.fit_models.KineticsModel`
    chisq_thd : :obj:`float`
        Threshold chi squared above which the fitting is repeated with the Differential Evolution algorithm.

    Returns
    -------
    res : :class:`~symfit.FitResults`
        Symfit fitresults object.
    """

    if np.any(np.isnan(d)):
        raise ValueError("There shouldnt be NaNs anymore")
        er = EmptyResult(np.nan, {p.name: np.nan for p in model.sf_model.params})
        return er

    model.initial_guess(t, d)
    with temporary_seed(43):
        fit = Fit(model.sf_model, t, d, minimizer=Powell)
        res = fit.execute()

        if (
            not check_bounds(res)
            or np.any(np.isnan(list(res.params.values())))
            or res.chi_squared > chisq_thd
        ):
            fit = Fit(model.sf_model, t, d, minimizer=DifferentialEvolution)
            # grid = model.initial_grid(t, d, step=5)
            res = fit.execute()

    return res

fit_rates(hdxm, method='wt_avg', **kwargs)

Fit observed rates of exchange to HDX-MS data in hdxm

Parameters

hdxm : :class:~pyhdx.models.HDXMeasurement method : :obj:str Method to use to determine rates of exchange kwargs Additional kwargs passed to fitting

Returns

fit_result : :class:~pyhdx.fitting.KineticsFitResult

Source code in pyhdx/fitting.py
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
def fit_rates(hdxm, method="wt_avg", **kwargs):
    """
    Fit observed rates of exchange to HDX-MS data in `hdxm`

    Parameters
    ----------
    hdxm : :class:`~pyhdx.models.HDXMeasurement`
    method : :obj:`str`
        Method to use to determine rates of exchange
    kwargs
        Additional kwargs passed to fitting

    Returns
    -------

    fit_result : :class:`~pyhdx.fitting.KineticsFitResult`

    """

    if method == "wt_avg":
        result = fit_rates_weighted_average(hdxm, **kwargs)
    else:
        raise ValueError(f"Invalid value for 'method': {method}")

    return result

fit_rates_half_time_interpolate(hdxm)

Calculates exchange rates based on weighted averaging followed by interpolation to determine half-time, which is then calculated to rates.

Parameters

hdxm : :class:~pyhdx.models.HDXMeasurement

Returns

dataclass

dataclass with fit result

Source code in pyhdx/fitting.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def fit_rates_half_time_interpolate(hdxm):
    """
    Calculates exchange rates based on weighted averaging followed by interpolation to determine half-time, which is
    then calculated to rates.

    Parameters
    ----------

    hdxm : :class:`~pyhdx.models.HDXMeasurement`


    Returns
    -------

    output: dataclass
        dataclass with fit result

    """
    # find t_50
    interpolated = np.array(
        [np.interp(0.5, d_uptake, hdxm.timepoints) for d_uptake in hdxm.rfu_residues.to_numpy()]
    )  # iterate over residues
    rate = np.log(2) / interpolated  # convert to rate

    output = pd.DataFrame({"rate": rate}, index=hdxm.coverage.r_number)
    result = GenericFitResult(
        output=output, fit_function="fit_rates_half_time_interpolate", name=hdxm.name
    )

    return result

fit_rates_weighted_average(hdxm, bounds=None, chisq_thd=0.2, model_type='association', client=None, pbar=None)

Fit a model specified by 'model_type' to D-uptake kinetics. D-uptake is weighted averaged across peptides per timepoint to obtain residue-level D-uptake.

Parameters

hdxm : :class:~pyhdx.models.HDXMeasurement bounds : :obj:tuple, optional Tuple of lower and upper bounds of rate constants in the model used. chisq_thd : :obj:float Threshold of chi squared result, values above will trigger a second round of fitting using DifferentialEvolution model_type : :obj:str Missing docstring client : : ?? Controls delegation of fitting tasks to Dask clusters. Options are: None: Do not use task, fitting is done in the local thread in a for loop. :class: Dask Client : Uses the supplied Dask client to schedule fitting task. worker_client: The function was ran by a Dask worker and the additional fitting tasks created are scheduled on the same Cluster. pbar: Not implemented

Returns

fit_result : :class:~pyhdx.fitting.KineticsFitResult

Source code in pyhdx/fitting.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
def fit_rates_weighted_average(
    hdxm, bounds=None, chisq_thd=0.20, model_type="association", client=None, pbar=None
):
    """
    Fit a model specified by 'model_type' to D-uptake kinetics. D-uptake is weighted averaged across peptides per
    timepoint to obtain residue-level D-uptake.

    Parameters
    ----------
    hdxm : :class:`~pyhdx.models.HDXMeasurement`
    bounds : :obj:`tuple`, optional
        Tuple of lower and upper bounds of rate constants in the model used.
    chisq_thd : :obj:`float`
        Threshold of chi squared result, values above will trigger a second round of fitting using DifferentialEvolution
    model_type : :obj:`str`
        Missing docstring
    client : : ??
        Controls delegation of fitting tasks to Dask clusters. Options are: `None`: Do not use task, fitting is done
        in the local thread in a for loop. :class: Dask Client : Uses the supplied Dask client to schedule fitting task.
        `worker_client`: The function was ran by a Dask worker and the additional fitting tasks created are scheduled
        on the same Cluster.
    pbar:
        Not implemented

    Returns
    -------

    fit_result : :class:`~pyhdx.fitting.KineticsFitResult`

    """
    d_list, intervals, models = _prepare_wt_avg_fit(hdxm, model_type=model_type, bounds=bounds)
    if pbar:
        raise NotImplementedError()
    else:
        inc = lambda: None

    results = []

    if client is None:
        for d, model in zip(d_list, models):
            result = fit_kinetics(hdxm.timepoints, d, model, chisq_thd=chisq_thd)
            results.append(result)
    else:
        iterables = [[hdxm.timepoints] * len(d_list), d_list, models]

        if isinstance(client, Client):
            futures = client.map(fit_kinetics, *iterables, chisq_thd=chisq_thd)
            results = client.gather(futures)
        elif client == "worker_client":
            with worker_client() as client:
                futures = client.map(fit_kinetics, *iterables, chisq_thd=chisq_thd)
                results = client.gather(futures)

    fit_result = KineticsFitResult(hdxm, intervals, results, models)

    return fit_result

get_bounds(times)

estimate default bound for rate fitting from a series of timepoints

Parameters

times : array_like

Returns

bounds : :obj:tuple lower and upper bounds

Source code in pyhdx/fitting.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def get_bounds(times):
    """
    estimate default bound for rate fitting from a series of timepoints

    Parameters
    ----------
    times : array_like

    Returns
    -------
    bounds : :obj:`tuple`
        lower and upper bounds

    """
    # todo document

    nonzero_times = times[np.nonzero(times)]
    t_first = np.min(nonzero_times)
    t_last = np.max(nonzero_times)
    b_upper = 50 * np.log(2) / t_first
    b_lower = np.log(2) / (t_last * 50)

    return b_lower, b_upper

run_optimizer(inputs, output_data, optimizer_klass, optimizer_kwargs, model, criterion, regularizer, epochs=EPOCHS, patience=PATIENCE, stop_loss=STOP_LOSS, callbacks=None, verbose=True)

Runs optimization/fitting of PyTorch model.

Parameters

inputs : :obj:list List of input Tensors output_data : :class:~torch.Tensor comparison data to model output optimizer_klass : :mod:~torch.optim optimizer_kwargs : :obj:dict kwargs to pass to pytorch optimizer model : :class:~torch.nn.Module pytorch model criterion: callable loss function regularizer callable regularizer function epochs : :obj:int Max number of epochs patience : :obj:int Number of epochs with less progress than stop_loss before terminating optimization stop_loss : :obj:float Threshold of optimization value below which no progress is made callbacks: :obj:list or None List of callback functions verbose : :obj:bool Toggle progress bar

Returns

Source code in pyhdx/fitting.py
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
def run_optimizer(
    inputs,
    output_data,
    optimizer_klass,
    optimizer_kwargs,
    model,
    criterion,
    regularizer,
    epochs=EPOCHS,
    patience=PATIENCE,
    stop_loss=STOP_LOSS,
    callbacks=None,
    verbose=True,
):
    """

    Runs optimization/fitting of PyTorch model.

    Parameters
    ----------
    inputs : :obj:`list`
        List of input Tensors
    output_data : :class:`~torch.Tensor`
        comparison data to model output
    optimizer_klass : :mod:`~torch.optim`
    optimizer_kwargs : :obj:`dict`
        kwargs to pass to pytorch optimizer
    model : :class:`~torch.nn.Module`
        pytorch model
    criterion: callable
        loss function
    regularizer callable
        regularizer function
    epochs : :obj:`int`
        Max number of epochs
    patience : :obj:`int`
        Number of epochs with less progress than `stop_loss` before terminating optimization
    stop_loss : :obj:`float`
        Threshold of optimization value below which no progress is made
    callbacks: :obj:`list` or `None`
        List of callback functions
    verbose : :obj:`bool`
        Toggle progress bar

    Returns
    -------

    """

    optimizer_obj = optimizer_klass(model.parameters(), **optimizer_kwargs)

    # todo these seeds should be temporary
    np.random.seed(43)
    torch.manual_seed(43)

    callbacks = callbacks or []
    losses_list = [[np.inf]]

    def closure():
        output = model(*inputs)
        loss = criterion(output, output_data)
        losses_list.append([loss.item()])  # store mse loss
        reg_loss_tuple = regularizer(model.dG)
        for r in reg_loss_tuple:
            loss += r

        losses_list[-1] += [r.item() for r in reg_loss_tuple]  # store reg losses

        loss.backward()
        return loss

    stop = 0
    iter = trange(epochs) if verbose else range(epochs)
    for epoch in iter:
        optimizer_obj.zero_grad()
        loss = optimizer_obj.step(closure)

        for cb in callbacks:
            cb(epoch, model, optimizer_obj)

        diff = sum(losses_list[-2]) - sum(losses_list[-1])
        if diff < stop_loss:
            stop += 1
            if stop > patience:
                break
        else:
            stop = 0

    return np.array(losses_list[1:]), model