Source code for parpe.misc

"""Various helper functions"""
import h5py
import numpy as np
from typing import Optional, Union, Iterable, List, Any
import sys
import importlib
from petab.C import SIMULATION, MEASUREMENT


"""
TODO test    
    import tempfile
    tempfile.mktemp()
"""

from .plotting import correlation_coefficient


[docs] def get_amici_model(model_name: str, model_dir: Optional[str] = None) -> 'amici.Model': """Load AMICI model module with given name from given path and return a model instance Arguments: model_name: Name of the model module model_dir: Path to directory containing the AMICI model module Returns: The given model instance """ sys.path.insert(0, model_dir) model_module = importlib.import_module(model_name) return model_module.getModel()
[docs] def get_global_name_for_local_parameter( sbml_model: 'libsbml.Model', needle_parameter_id: str) -> Union[str, None]: """Find a local SBML parameter in kinetic laws and return {reaction_id}_{local_parameter_id}""" for reaction in sbml_model.getListOfReactions(): kl = reaction.getKineticLaw() for p in kl.getListOfParameters(): parameter_id = p.getId() if parameter_id.endswith(needle_parameter_id): return f'{reaction.getId()}_{parameter_id}' return None
[docs] def unique_ordered(seq: Iterable[Any]) -> List[Any]: """ Make unique, preserving order of first occurrence Arguments: seq: any sequence """ seen = set() seen_add = seen.add return [x for x in seq if not (x in seen or seen_add(x))]
[docs] def getCostTrajectories(filename, starts = None): """ Read cost trajectory from HDF5 result file. Arguments: filename: result file name starts: list with indices or None meaning all starts """ with h5py.File(filename, 'r') as f: trajectories = None if starts: for ms in starts: trajectory = np.transpose(f['/multistarts/%s/iterCostFunCost'%ms][:]) trajectories = concatenateStarts(trajectories, trajectory) else: for mspath in f['/multistarts/']: trajectory = np.transpose(f['/multistarts/%s/iterCostFunCost'%mspath][:]) trajectories = concatenateStarts(trajectories, trajectory) return trajectories
[docs] def getCostTrajectory2(filename): """Read cost trajectory from file #Arguments: #filename: HDF5 file as generated by misc/extractMultiStartParameters.py Returns: ndarray(numIterations x numStarts): cost over iterations for all optimizations """ with h5py.File(filename, 'r') as f: numStarts = len(f['/multistarts']) # should take max of present ones numIter = 0 # find max iter for ms in range(numStarts): ms = int(ms) numIter = np.max( [len(f['/multistarts/%d/iteration' % ms]), numIter]) costTrajectory = np.full(shape=(numIter, numStarts), fill_value=np.nan) for ms in range(numStarts): for iteration in range(numIter): try: if not '/multistarts/%d/iteration/%d/costFunCost' % ( ms, iteration) in f: continue value = np.nanmin(f[ '/multistarts/%d/iteration/%d/costFunCost' % ( ms, iteration)]) costTrajectory[iteration, ms] = value except Exception as e: print(e) return costTrajectory
[docs] def getCostTrajectoryFromSummary(filename): """Read cost trajectory from file Arguments: filename: HDF5 file as generated by misc/extractMultiStartParameters.py Returns: ndarray(numIterations x numStarts): cost over iterations for all optimizations """ with h5py.File(filename, 'r') as pe_summary: costTrajectory = pe_summary['/costTrajectory'][:] return costTrajectory
[docs] def concatenateStarts(a, b): """Concatenate two optimization trajectory matrices (numIteration x numStarts). Dimensions can be different for a and b. """ if a is None or not a.size: return b if b is None or not b.size: return a maxIter = max(a.shape[0], b.shape[0]) a = np.pad(a, pad_width=[(0, maxIter - a.shape[0]), (0, 0)], mode='constant', constant_values=np.nan) b = np.pad(b, pad_width=[(0, maxIter - b.shape[0]), (0, 0)], mode='constant', constant_values=np.nan) concat = np.concatenate((a, b), axis=1) return concat
[docs] def readSimulationsFromFile(filename): """ Read result from simulations at final point from data file for all starts Returns: (measured, simulated, time, llh)[startString] [nCondition][nTimepoints, nObservables] """ sim = {} mes = {} llh = {} time = {} with h5py.File(filename, 'r') as f: for ms in f['/multistarts']: try: llh[ms] = f[f'/multistarts/{ms}/llh'][:] except OSError: print(f'Getting the data for start {ms} failed.') continue mes[ms] = [] sim[ms] = [] time[ms] = [] for condition_idx in range(len(f[f'/multistarts/{ms}/t'])): mes[ms].append(f[f'/multistarts/{ms}/yMes/{condition_idx}'][:]) sim[ms].append(f[f'/multistarts/{ms}/ySim/{condition_idx}'][:]) time[ms].append(f[f'/multistarts/{ms}/t/{condition_idx}'][:]) return mes, sim, time, llh
[docs] def readSimulationsFromFileLegacy(filename): """ Read result from simulations at final point from data file for all starts Returns: (measure, simulated)[startString][nCondition, nTimepoints, nObservables] """ sim = {} mes = {} llh = {} with h5py.File(filename, 'r') as f: for ms in f['/multistarts']: mes[ms] = f['/multistarts/%s/yMes'%ms][:] sim[ms] = f['/multistarts/%s/ySim'%ms][:] llh[ms] = f['/multistarts/%s/llh'%ms][:] return mes, sim, llh
[docs] def getConditionNames(filename): with h5py.File(filename, 'r') as f: if '/inputData/fixedParameters/conditionNames' in f: conditionNames = f['/inputData/fixedParameters/conditionNames'][:] else: conditionNames = f['/fixedParameters/conditionNames'][:] return conditionNames
[docs] def getFinalCostFromSummary(filename): """Read final cost from file Arguments: filename: HDF5 file as generated by misc/extractMultiStartParameters.py Returns: ndarray(numStarts): cost at last iteration for all optimizations """ with h5py.File(filename, 'r') as pe_summary: finalCost = pe_summary['/finalCost'][:] return finalCost
[docs] def getCorrTable(mes, sim, minimum_number_datapoints=0): """Generate correlation table by observable Arguments: simulationResults: simulationResults as obtained from parpe.readSimulationsFromFile Returns: ndarray with correlations of measurement and simulation for each start and observables in simulationResults """ numStarts = len(mes) numCond = len(list(mes.values())[0]) numObs = list(mes.values())[0][0].shape[1] corr = np.full((numStarts, numObs), np.nan) for ims, ms in enumerate(mes): ymes = None ysim = None for icond in range(numCond): if ymes is None: ymes = mes[ms][icond] ysim = sim[ms][icond] else: ymes = np.append(ymes, mes[ms][icond], axis=0) ysim = np.append(ysim, sim[ms][icond], axis=0) for iobservable in range(numObs): if np.sum(np.isfinite( ymes[:, iobservable])) < minimum_number_datapoints: corr[ims, iobservable] = np.nan continue corr[ims, iobservable] = correlation_coefficient( ymes[:, iobservable], ysim[:, iobservable]) return corr
[docs] def getCorrTableLegacy(simulationResults, minimum_number_datapoints=0): """Generate correlation table by observable Old data format Arguments: simulationResults: simulationResults as obtained from parpe.readSimulationsFromFile Returns: ndarray with correlations of measurement and simulation for each start and observables in simulationResults """ numStarts = len(simulationResults[0]) numObs = list(simulationResults[0].values())[0].shape[2] corr = np.full((numStarts, numObs), np.nan) for ims, ms in enumerate(simulationResults[0]): ymes = simulationResults[0][ms] ysim = simulationResults[1][ms] for observable in range(ymes.shape[2]): if np.sum(np.isfinite( ymes[:, :, observable])) < minimum_number_datapoints: corr[ims, observable] = np.nan continue corr[ims, observable] = correlation_coefficient( ymes[:, :, observable], ysim[:, :, observable]) return corr
[docs] class ParameterEstimationSummaryFile: """Interface to a parPE parameter estimation summary file as created with misc/extractMultiStartParameters.py """ pass
[docs] class ParameterEstimationResultFile: """Interface to a parPE parameter estimation result file""" pass
[docs] def simulation_to_df(mes_df, sim, result_file, start, observable_ids, root_path: str = "/"): """Put simulation results in new PEtab simulation df based on PEtab measurement df mes_df: petab measurement dataframe on which optimization was based sim: simulation results obtained by readSimulationsFromFile() result_file: HDF5-file with optimization results (or input HDF5-file) start: start for which simulation results should be taken observable_ids: observable ids which should be considered root_path: root path to where group fixedParameters is saved in HDF5file """ with h5py.File(result_file, 'r') as f: condition_names = f[root_path + 'fixedParameters/conditionNames'][:] simulation_conditions = f[root_path + 'fixedParameters/simulationConditions'][ :] cond_id_to_idx = {id_: idx for idx, id_ in enumerate(condition_names)} cond_comb_to_idx = { (condition_names[preeq_idx] if not np.isnan( preeq_idx) and preeq_idx >= 0 else -1.0, condition_names[sim_idx] if sim_idx >= 0 else -1.0): comb_idx for comb_idx, (preeq_idx, sim_idx, _) in enumerate( simulation_conditions)} obs_id_to_idx = {id_: idx for idx, id_ in enumerate(observable_ids)} mes_df[MEASUREMENT] = np.nan mes_df = mes_df.rename(columns={MEASUREMENT: SIMULATION}) time_tracker = {} for _, row in mes_df.iterrows(): if np.isnan(row.preequilibrationConditionId): row.preequilibrationConditionId = -1 condition_idx = cond_comb_to_idx[ row.preequilibrationConditionId, row.simulationConditionId] observable_idx = obs_id_to_idx[row.observableId] """ Replicate simulations will be identical, but lets consider them""" try: time_tracker[(condition_idx, observable_idx, row.time)] += 1 except KeyError: time_tracker[(condition_idx, observable_idx, row.time)] = 0 time_idx = time_tracker[(condition_idx, observable_idx, row.time)] # time_idx = 0 mes_df.loc[row.name, SIMULATION] = \ sim[start][condition_idx][time_idx, observable_idx] return mes_df
[docs] def compare_optimization_results_to_true_parameters( filename: str, start_idx: str = '0'): """Compare parameter estimates to true parameters. Print as table. Used in example notebooks. Arguments: filename: Parameter estimation result file name start_idx: optimizer run index/name to use """ with h5py.File(filename, 'r') as f: pscale = f['/inputData/parameters/pscaleOptimization'][:] names = f['/inputData/parameters/parameterNames'][:] true_parameters = f['/inputData/parameters/true_parameters'][:] expectedNllh = -f['/inputData/parameters/true_llh'][:] final_parameters = f[f'/multistarts/{start_idx}/finalParameters'][:] exit_status = f[f'/multistarts/{start_idx}/exitStatus'][:] final_cost = f[f'/multistarts/{start_idx}/finalCost'][:] for i, p in enumerate(pscale): if p == 2: final_parameters[i] = np.power(10, final_parameters[i]) print("# __Exp____ __Act______ __Err______ __RelErr___ __ID_______") for i in range(len(true_parameters)): error = final_parameters[i]-true_parameters[i] rel_error = error / true_parameters[i] print('%d: %9.5f %11.5f %11.5f %11.5f %s' % (i, true_parameters[i], final_parameters[i], error, rel_error, names[i])) print() print('Status: %d' % exit_status) # FIXME: expectedNllh not correctly written to file print('Cost: %f (expected: %f)' % (final_cost, expectedNllh))