Source code for parpe.hdf5_pe_input

"""Functions for generating parPE parameter estimation HDF5 input files"""
import argparse
import logging
import sys
from numbers import Number
from typing import Any, Collection, Optional, Dict, Tuple, Iterator

import amici
import coloredlogs
import h5py
import numpy as np
import pandas as pd
import petab
from amici.petab_import import PREEQ_INDICATOR_ID
from amici.petab_import import petab_scale_to_amici_scale
from colorama import Fore
from colorama import init as init_colorama
from pandas import DataFrame
# noinspection PyPep8Naming
from petab import C as ptc
from petab import to_float_if_float

from .hdf5 import write_string_array
from .hierarchical_optimization import (get_candidates_for_hierarchical,
                                        get_analytical_parameter_table)
from .misc import get_amici_model, unique_ordered

logger = logging.getLogger(__name__)

# TODO: transpose all datasets?


# For condition mapping table in HDF5 file: value indicating no
# reference/preequilibration condition
NO_PREEQ_CONDITION_IDX: int = -1

# For parameter mapping table in HDF5 file: value indicating unmapped model
# parameter in opt<->sim mapping
UNMAPPED_PARAMETER: int = -1


def requires_preequilibration(measurement_df: DataFrame) -> bool:
    """Check whether any measurement requires preequilibration"""
    return ptc.PREEQUILIBRATION_CONDITION_ID in measurement_df \
        and not np.issubdtype(
            measurement_df[ptc.PREEQUILIBRATION_CONDITION_ID].dtype,
            np.number)
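
# Illustrative usage sketch (editorial addition, not part of parPE):
# requires_preequilibration() is False when the preequilibration column is
# absent or all-NaN (an all-NaN column has a numeric dtype), and True once
# the column holds actual condition IDs.
def _example_requires_preequilibration() -> None:
    """Hypothetical helper, for illustration only."""
    df = pd.DataFrame({ptc.SIMULATION_CONDITION_ID: ["c0", "c1"]})
    assert not requires_preequilibration(df)  # column absent
    df[ptc.PREEQUILIBRATION_CONDITION_ID] = np.nan
    assert not requires_preequilibration(df)  # all-NaN => numeric dtype
    df[ptc.PREEQUILIBRATION_CONDITION_ID] = ["preeq0", "preeq1"]
    assert requires_preequilibration(df)  # real condition IDs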

class HDF5DataGenerator:
    """
    Generate HDF5 file with fixed parameters and measurements for an
    AMICI-imported SBML model based on a PEtab problem.

    Attributes:
        amici_model: AMICI model for which to estimate parameters
        petab_problem: PEtab optimization problem (will be modified in place)
        compression: h5py compression to be used
        condition_ids: numpy.array condition IDs (different condition
            vectors, both simulation and preequilibration)
        num_condition_vectors: Number of condition vectors, including
            simulation and preequilibration. Not necessarily equal to the
            number of simulations.
        unique_timepoints: time points for which there is data
        f: h5py.File HDF5 file which is being created
    """

    def __init__(self, petab_problem: petab.Problem,
                 amici_model: amici.Model, verbose=1):
        """
        Arguments:
            petab_problem: PEtab problem for which to generate the parPE
                input file (will be modified in place)
            amici_model: AMICI model matching ``petab_problem``
            verbose: verbosity level
        """
        self.condition_ids = None
        self.f: Optional[h5py.File] = None
        self.num_condition_vectors: int = 0
        self.unique_timepoints = None
        self.parameter_mapping: Optional[petab.ParMappingDict] = None
        self.parameter_scale_mapping: Optional[petab.ScaleMappingDict] = None
        self.optimization_parameter_name_to_index: Dict[str, int] = {}
        self.nk: int = 0
        self.condition_map = None
        self.observable_ids = None
        self.ny: int = 0
        self.verbose = verbose

        self.petab_problem: petab.Problem = petab_problem
        self.amici_model: amici.Model = amici_model

        # ensure we have valid inputs
        petab.lint_problem(self.petab_problem)

        # index for no reference/preequilibration condition
        self.NO_PREEQ_CONDITION_IDX: int = NO_PREEQ_CONDITION_IDX
        # value for unmapped model parameter in opt<->sim mapping
        self.UNMAPPED_PARAMETER: int = UNMAPPED_PARAMETER
        # hdf5 dataset compression
        self.compression = "gzip"

    def generate_file(self, hdf5_file_name: str) -> None:
        """
        Create the output file

        Arguments:
            hdf5_file_name: filename of the HDF5 file to be generated
        """
        self.f = h5py.File(hdf5_file_name, "w")

        self._save_metadata()
        self._collect_simulation_conditions()
        self._process_condition_table()

        logger.info(Fore.GREEN + "Generating simulation condition list...")
        self._generate_simulation_condition_map()

        logger.info(Fore.GREEN + "Generating parameter list...")
        self._generate_parameter_list()
        self._generate_simulation_to_optimization_parameter_mapping()

        logger.info(Fore.GREEN + "Generating measurement matrix...")
        self._generate_measurement_matrices()

        logger.info(Fore.GREEN + "Handling scaling parameters...")
        self._generate_hierarchical_optimization_data()

        logger.info(Fore.GREEN + "Copying default AMICI options...")
        self._write_amici_options()

        logger.info(Fore.GREEN + "Writing default optimization options...")
        write_optimization_options(self.f)
        self._write_bounds()
        self._write_starting_points()

    def _collect_simulation_conditions(self) -> None:
        """
        Read cost_fun file and determine number of conditions and timepoints
        """
        measurement_df = self.petab_problem.measurement_df
        if self.verbose:
            logger.info(f"{Fore.CYAN}Number of data points:{Fore.RESET} "
                        f"{measurement_df.shape[0]}")

        # Get list of used condition vectors
        self.condition_ids = unique_ordered(
            measurement_df[ptc.SIMULATION_CONDITION_ID].values)
        if requires_preequilibration(measurement_df):
            self.condition_ids = unique_ordered(
                [*self.condition_ids,
                 *measurement_df[ptc.PREEQUILIBRATION_CONDITION_ID].values])
        self.num_condition_vectors = len(self.condition_ids)

        self.condition_id_to_index = {name: idx for idx, name
                                      in enumerate(self.condition_ids)}

        # list of unique timepoints, just for info and AMICI model default
        # setting
        self.unique_timepoints = sorted(measurement_df[ptc.TIME].unique())

        logger.info(f"{Fore.CYAN}Num condition vectors:{Fore.RESET} "
                    f"{self.num_condition_vectors}")
        logger.info(f"{Fore.CYAN}Num timepoints:{Fore.RESET} "
                    f"{len(self.unique_timepoints)}")
        logger.debug(f"Timepoints: {self.unique_timepoints}")

    def _process_condition_table(self) -> None:
        """
        Load file and select relevant conditions
        """
        condition_df = self.petab_problem.condition_df
        if ptc.PARAMETER_NAME in condition_df:
            # we don't need this column; drop it so we don't have to care
            # about it later on
            condition_df.drop(columns=ptc.PARAMETER_NAME, inplace=True)

        logger.info(f"{Fore.CYAN}Conditions original:{Fore.RESET} "
                    f"{condition_df.shape}")

        # drop conditions that do not have measurements
        drop_rows = [label for label in condition_df.index
                     if label not in self.condition_ids]
        condition_df.drop(index=drop_rows, inplace=True)
        logger.info(f"{Fore.CYAN}Conditions with data:{Fore.RESET} "
                    f"{condition_df.shape}")

    def _save_metadata(self):
        """Save some extra information in the generated file"""
        g = self.f.require_group('/metadata')

        g.attrs['invocation'] = ' '.join(sys.argv)
        g.attrs['amici_version'] = amici.__version__
        g.attrs['petab_version'] = petab.__version__
        # TODO: parPE version
        # g.attrs['parpe_version'] = parpe.__version__

        # Model info
        # Allows for checking in C++ code whether we are likely to use the
        # correct model
        g = self.f.require_group('/model')
        g.attrs['model_name'] = self.amici_model.getName()
        write_string_array(g, "observableIds",
                           self.amici_model.getObservableIds())
        write_string_array(g, "parameterIds",
                           self.amici_model.getParameterIds())
        write_string_array(g, "fixedParameterIds",
                           self.amici_model.getFixedParameterIds())
        write_string_array(g, "stateIds",
                           self.amici_model.getStateIds())

    def _generate_parameter_list(self) -> None:
        """
        Optimization to simulation parameter mapping. Write parameter names.
        """
        # simulation parameters from model
        model_parameter_ids = np.array(self.amici_model.getParameterIds())
        # TODO: rename to "ID"
        write_string_array(self.f, "/parameters/modelParameterNames",
                           model_parameter_ids)
        logger.info(f"{Fore.CYAN}Number of model parameters: {Fore.RESET}"
                    f"{len(model_parameter_ids)}")

        self.problem_parameter_ids = self.petab_problem \
            .get_optimization_parameters()

        # sanity check: estimated parameters should not be AMICI fixed
        # parameters
        fixed_opt_pars = set(self.problem_parameter_ids) \
            & set(self.amici_model.getFixedParameterIds())
        if fixed_opt_pars:
            raise RuntimeError(f"Parameters {fixed_opt_pars} are to be "
                               "optimized, but are fixed parameters in the "
                               "model. This should not happen.")

        logger.info(f"{Fore.CYAN}Number of optimization parameters: "
                    f"{Fore.RESET}{len(self.problem_parameter_ids)}")

        write_string_array(self.f, "/parameters/parameterNames",
                           self.problem_parameter_ids)

    def _generate_simulation_to_optimization_parameter_mapping(self) -> None:
        """
        Create dataset n_parameters_simulation x n_conditions with indices
        of respective parameters in parameters_optimization
        """
        # get the list of parameter mapping tuples for all conditions
        self.parameter_mapping = self.petab_problem \
            .get_optimization_to_simulation_parameter_mapping(
                warn_unmapped=False, scaled_parameters=False,
                allow_timepoint_specific_numeric_noise_parameters=True)

        variable_par_ids = self.amici_model.getParameterIds()
        fixed_par_ids = self.amici_model.getFixedParameterIds()

        # Translate parameter ID mapping to index mapping
        # create inverse mapping for faster lookup
        logger.debug(f"Problem parameters: {self.problem_parameter_ids}")
        optimization_parameter_name_to_index = {
            name: idx for idx, name in enumerate(self.problem_parameter_ids)}
        self.optimization_parameter_name_to_index = \
            optimization_parameter_name_to_index

        # use in-memory matrix, don't write every entry to file directly
        num_model_parameters = self.amici_model.np()
        mapping_matrix = np.zeros(
            shape=(num_model_parameters, self.condition_map.shape[0]),
            dtype='<i4')
        override_matrix = np.full(shape=mapping_matrix.shape,
                                  fill_value=np.nan)
        pscale_matrix = np.full(shape=mapping_matrix.shape, dtype='<i4',
                                fill_value=amici.ParameterScaling_none)

        # AMICI fixed parameters
        self.nk = len(fixed_par_ids)
        logger.info(f"{Fore.CYAN}Number of fixed parameters:{Fore.RESET} "
                    f"{len(fixed_par_ids)}")
        # TODO: can fixed parameters differ between the same condition vector
        #  used in different sim x preeq combinations?
        fixed_parameter_matrix = np.full(
            shape=(self.nk, self.num_condition_vectors),
            fill_value=np.nan)

        # For handling initial states
        species_in_condition_table = [
            col for col in self.petab_problem.condition_df
            if self.petab_problem.sbml_model.getSpecies(col) is not None]
        # for reinitialization
        state_id_to_idx = {
            id: i for i, id in enumerate(self.amici_model.getStateIds())}
        state_idxs_reinitialization_all = []

        # Merge preeq and sim parameters, filter fixed parameters
        for condition_idx, \
                (condition_map_preeq, condition_map_sim,
                 condition_scale_map_preeq, condition_scale_map_sim) \
                in enumerate(self.parameter_mapping):
            preeq_cond_idx, sim_cond_idx = self.condition_map[condition_idx]
            preeq_cond_id = self.condition_ids[preeq_cond_idx] \
                if preeq_cond_idx != NO_PREEQ_CONDITION_IDX else None
            sim_cond_id = self.condition_ids[sim_cond_idx]
            logger.debug(f"preeq_cond_idx: {preeq_cond_idx}, "
                         f"preeq_cond_id: {preeq_cond_id}, "
                         f"sim_cond_idx: {sim_cond_idx}, "
                         f"sim_cond_id: {sim_cond_id}")

            if len(condition_map_preeq) != len(condition_scale_map_preeq) \
                    or len(condition_map_sim) \
                    != len(condition_scale_map_sim):
                raise AssertionError(
                    "Number of parameters and number of parameter "
                    "scales do not match.")
            if len(condition_map_preeq) \
                    and len(condition_map_preeq) != len(condition_map_sim):
                # logger.debug(f"Preequilibration parameter map: "
                #              f"{condition_map_preeq}")
                # logger.debug(f"Simulation parameter map: "
                #              f"{condition_map_sim}")
                raise AssertionError(
                    "Number of parameters for preequilibration "
                    "and simulation do not match.")

            # state indices for reinitialization
            # for the current simulation condition
            state_idxs_for_reinitialization_cur = []

            # TODO: requires special handling of initial concentrations
            if species_in_condition_table:
                # set indicator fixed parameter for preeq
                # (we expect here that this parameter was added during AMICI
                # model import and that it was not added by the user with a
                # different meaning...)
                if preeq_cond_idx != NO_PREEQ_CONDITION_IDX:
                    condition_map_preeq[PREEQ_INDICATOR_ID] = 1.0
                    condition_scale_map_preeq[PREEQ_INDICATOR_ID] = ptc.LIN

                condition_map_sim[PREEQ_INDICATOR_ID] = 0.0
                condition_scale_map_sim[PREEQ_INDICATOR_ID] = ptc.LIN

                def _set_initial_concentration(condition_id, species_id,
                                               init_par_id, par_map,
                                               scale_map):
                    value = petab.to_float_if_float(
                        self.petab_problem.condition_df.loc[
                            condition_id, species_id])
                    if isinstance(value, Number):
                        # numeric initial state
                        par_map[init_par_id] = value
                        scale_map[init_par_id] = ptc.LIN
                    else:
                        # parametric initial state
                        try:
                            # try to find it in the mapping
                            par_map[init_par_id] = par_map[value]
                            scale_map[init_par_id] = scale_map[value]
                        except KeyError:
                            # otherwise look it up in the parameter table
                            if (self.petab_problem.parameter_df.loc[
                                    value, ptc.ESTIMATE] == 0):
                                par_map[init_par_id] = \
                                    self.petab_problem.parameter_df.loc[
                                        value, ptc.NOMINAL_VALUE]
                            else:
                                par_map[init_par_id] = value

                            if (ptc.PARAMETER_SCALE
                                    not in self.petab_problem.parameter_df
                                    or not
                                    self.petab_problem.parameter_df.loc[
                                        value, ptc.PARAMETER_SCALE]):
                                scale_map[init_par_id] = ptc.LIN
                            else:
                                scale_map[init_par_id] = \
                                    self.petab_problem.parameter_df.loc[
                                        value, ptc.PARAMETER_SCALE]

                for species_id in species_in_condition_table:
                    # for preequilibration
                    init_par_id = f'initial_{species_id}_preeq'
                    # The preeq initial parameter is always added during
                    # PEtab import, independently of whether preeq is used.
                    # We need to set a dummy value for the preeq parameter
                    # anyway, as it is expected in the parameter mapping
                    # below (set to 0, not nan, because it will be multiplied
                    # with the indicator variable in the initial assignment)
                    condition_map_sim[init_par_id] = 0.0
                    condition_scale_map_sim[init_par_id] = ptc.LIN

                    if preeq_cond_idx != NO_PREEQ_CONDITION_IDX:
                        _set_initial_concentration(
                            preeq_cond_id, species_id, init_par_id,
                            condition_map_preeq, condition_scale_map_preeq)

                        # Check if reinitialization is requested
                        value_sim = petab.to_float_if_float(
                            self.petab_problem.condition_df.loc[
                                sim_cond_id, species_id])
                        if not isinstance(value_sim, Number) \
                                or not np.isnan(value_sim):
                            # mark for reinitialization,
                            # unless the value is nan
                            species_idx = state_id_to_idx[species_id]
                            state_idxs_for_reinitialization_cur.append(
                                species_idx)

                        # Set the preequilibration value also for simulation.
                        # Either it will be overwritten in the next step,
                        # or it will not be used anywhere.
                        # Setting it to the same value as for
                        # preequilibration avoids issues with AMICI,
                        # where we cannot provide different values for
                        # dynamic parameters for preequilibration and
                        # simulation.
                        condition_map_sim[init_par_id] = \
                            condition_map_preeq[init_par_id]
                        condition_scale_map_sim[init_par_id] = \
                            condition_scale_map_preeq[init_par_id]

                    # for simulation
                    init_par_id = f'initial_{species_id}_sim'
                    _set_initial_concentration(
                        sim_cond_id, species_id, init_par_id,
                        condition_map_sim, condition_scale_map_sim)

            state_idxs_reinitialization_all.append(
                state_idxs_for_reinitialization_cur)

            logger.debug(f"condition_map_preeq: {condition_map_preeq}, "
                         f"condition_map_sim: {condition_map_sim}")

            # split into fixed and variable parameters:
            condition_map_preeq_var = condition_scale_map_preeq_var = None
            condition_map_preeq_fix = None
            if condition_map_preeq:
                condition_map_preeq_var, condition_map_preeq_fix = \
                    _subset_dict(condition_map_preeq, variable_par_ids,
                                 fixed_par_ids)
                condition_scale_map_preeq_var, _ = \
                    _subset_dict(condition_scale_map_preeq,
                                 variable_par_ids, fixed_par_ids)

            condition_map_sim_var, condition_map_sim_fix = \
                _subset_dict(condition_map_sim, variable_par_ids,
                             fixed_par_ids)
            condition_scale_map_sim_var, condition_scale_map_sim_fix = \
                _subset_dict(condition_scale_map_sim, variable_par_ids,
                             fixed_par_ids)

            if condition_map_preeq:
                # merge after having removed potentially fixed parameters
                # which may differ between preequilibration and simulation
                petab.merge_preeq_and_sim_pars_condition(
                    condition_map_preeq_var, condition_map_sim_var,
                    condition_scale_map_preeq_var,
                    condition_scale_map_sim_var, condition_idx)

            # mapping for each model parameter
            for model_parameter_idx, model_parameter_id \
                    in enumerate(variable_par_ids):
                mapped_parameter = condition_map_sim[model_parameter_id]
                mapped_parameter = to_float_if_float(mapped_parameter)
                try:
                    (mapped_idx, override) = self.get_index_mapping_for_par(
                        mapped_parameter,
                        optimization_parameter_name_to_index)
                    mapping_matrix[model_parameter_idx, condition_idx] = \
                        mapped_idx
                    override_matrix[model_parameter_idx, condition_idx] = \
                        override
                except IndexError as e:
                    logger.critical(f"{Fore.RED}Error in parameter mapping:"
                                    f"{Fore.RESET} {e}")
                    logger.critical(f"{model_parameter_idx}, "
                                    f"{mapped_parameter}")
                    logger.critical(f"{self.parameter_mapping}")
                    raise e

            # Set parameter scales for simulation
            pscale_matrix[:, condition_idx] = list(
                petab_scale_to_amici_scale(condition_scale_map_sim[amici_id])
                for amici_id in variable_par_ids)

            fixed_parameter_matrix[
                :, self.condition_map[condition_idx, 1]] = \
                np.array([condition_map_sim_fix[par_id]
                          for par_id in fixed_par_ids])

            if condition_map_preeq:
                fixed_parameter_matrix[
                    :, self.condition_map[condition_idx, 0]] = \
                    np.array([condition_map_preeq_fix[par_id]
                              for par_id in fixed_par_ids])

        # write to file
        self.create_fixed_parameter_dataset_and_write_attributes(
            fixed_par_ids, fixed_parameter_matrix)

        self.f.require_dataset('/parameters/pscaleSimulation',
                               shape=pscale_matrix.T.shape, dtype="<i4",
                               data=pscale_matrix.T)

        write_parameter_map(self.f, mapping_matrix, override_matrix,
                            num_model_parameters, self.compression)

        # for cost function parameters
        petab_opt_par_scale = \
            self.petab_problem.get_optimization_parameter_scales()
        pscale_opt_par = np.array(
            list(petab_scale_to_amici_scale(petab_opt_par_scale[par_id])
                 for par_id in self.problem_parameter_ids))

        self.f.require_dataset('/parameters/pscaleOptimization',
                               shape=pscale_opt_par.shape, dtype="<i4",
                               data=pscale_opt_par)

        # Ragged array of state indices for reinitialization
        data_type = h5py.vlen_dtype(np.dtype('int32'))
        dset = self.f.create_dataset(
            '/fixedParameters/reinitializationIndices',
            shape=(self.condition_map.shape[0],),
            dtype=data_type)
        for i, state_idxs in enumerate(state_idxs_reinitialization_all):
            dset[i] = state_idxs
            logger.debug(f"Reinitialization [{i}]: {dset[i]}")

        self.f.flush()

    def get_index_mapping_for_par(
            self, mapped_parameter: Any,
            optimization_parameter_name_to_index: Dict[str, int]
    ) -> Tuple[int, float]:
        """Get index mapping for a given parameter

        Arguments:
            mapped_parameter: value mapped to some model parameter
            optimization_parameter_name_to_index: Dictionary mapping
                optimization parameter IDs to their position in the
                optimization parameter vector

        Returns:
            Index of the parameter to map to, and the value for the override
            matrix
        """
        if isinstance(mapped_parameter, str):
            # actually a mapped optimization parameter
            return (optimization_parameter_name_to_index[mapped_parameter],
                    np.nan)

        if np.isnan(mapped_parameter):
            # This condition does not use any parameter override.
            # We override with 1.0, since NaN would cause AMICI warnings.
            return self.UNMAPPED_PARAMETER, 1.0

        # Have numeric override
        return self.UNMAPPED_PARAMETER, mapped_parameter
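
    # Illustrative sketch (editorial addition, not parPE API): the returned
    # tuple encodes either an estimated parameter (index into the
    # optimization parameter vector, NaN override) or a fixed numeric value
    # (UNMAPPED_PARAMETER, value); these pairs fill the two matrices written
    # by write_parameter_map().
    def _example_get_index_mapping_for_par(self) -> None:
        """Hypothetical method, for illustration only."""
        name_to_idx = {'k1': 0, 'k2': 1}  # assumed optimization parameters
        idx, override = self.get_index_mapping_for_par('k2', name_to_idx)
        assert idx == 1 and np.isnan(override)
        idx, override = self.get_index_mapping_for_par(0.5, name_to_idx)
        assert idx == self.UNMAPPED_PARAMETER and override == 0.5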

    def create_fixed_parameter_dataset_and_write_attributes(
            self, fixed_parameter_ids: Collection[str],
            data) -> h5py.Dataset:
        """
        Create fixed parameters dataset and annotations
        """
        g = self.f.require_group("fixedParameters")

        write_string_array(g, "parameterNames", fixed_parameter_ids)
        write_string_array(g, "conditionNames", self.condition_ids)

        # chunked for reading experiment-wise
        nk = len(fixed_parameter_ids)

        if len(data):
            dset = g.create_dataset("k", dtype='f8',
                                    chunks=(nk, 1),
                                    compression=self.compression,
                                    data=data)
            # set dimension scales
            g['parameterNames'].make_scale('parameterNames')
            g['conditionNames'].make_scale('conditionNames')
            dset.dims[0].attach_scale(g['parameterNames'])
            dset.dims[1].attach_scale(g['conditionNames'])
        else:
            dset = g.create_dataset("k", dtype='f8')

        return dset

    def _generate_simulation_condition_map(self):
        """
        Write index map for independent simulations to be performed
        (preequilibrationConditionIdx, simulationConditionIdx) referencing
        the fixed parameter table.

        Get PEtab simulation conditions and write them to an HDF5 dataset.
        If no preequilibration condition is given, the preequilibration
        index is set to NO_PREEQ_CONDITION_IDX, otherwise to the respective
        condition index.
        """
        # PEtab condition list
        simulations = \
            self.petab_problem.get_simulation_conditions_from_measurement_df()
        # empty dataset
        condition_map = np.full(shape=(len(simulations), 2),
                                fill_value=self.NO_PREEQ_CONDITION_IDX)
        condition_id_to_idx = {condition_id: idx for idx, condition_id
                               in enumerate(self.condition_ids)}
        if simulations.shape[1] == 1:
            # preeq always nan, we only need the simulation condition id
            condition_map[:, 1] = \
                list(condition_id_to_idx[condition_id]
                     for condition_id
                     in simulations[ptc.SIMULATION_CONDITION_ID])
        else:
            # We need to set both preeq and simulation condition.
            # Mind the different ordering (preeq/sim vs. sim/preeq) here
            # and in PEtab!
            for sim_idx, (sim_id, preeq_id) \
                    in enumerate(simulations.iloc[:, 0:2].values):
                if preeq_id:
                    condition_map[sim_idx, 0] = condition_id_to_idx[preeq_id]
                condition_map[sim_idx, 1] = condition_id_to_idx[sim_id]

        logger.info(f"{Fore.CYAN}Number of simulation conditions:"
                    f"{Fore.RESET} {len(simulations)}")

        self.condition_map = condition_map

        self.f.create_dataset("/fixedParameters/simulationConditions",
                              dtype="<i4", data=condition_map)

    def _generate_measurement_matrices(self):
        """
        Generate matrices with training data for conditions, observables
        and timepoints.

        Matrices are numbered by simulation conditions, each of dimension
        num_timepoints x num_observables. num_observables will always be ny
        from the model, since this is what is required for AMICI.
        num_timepoints is condition dependent.
        """
        if petab.measurement_table_has_timepoint_specific_mappings(
                self.petab_problem.measurement_df,
                allow_scalar_numeric_noise_parameters=True):
            raise RuntimeError("Timepoint-specific overrides are not yet "
                               "supported.")

        self.f.create_group("/measurements")
        self.observable_ids = self.amici_model.getObservableIds()
        self.ny = self.amici_model.ny
        write_string_array(self.f, "/measurements/observableNames",
                           self.observable_ids)
        logger.info(f"{Fore.CYAN}Number of observables:{Fore.RESET} "
                    f"{self.ny}")

        self.write_measurements()
        self.f.flush()

    def write_measurements(self):
        """
        Write measurements to HDF5 dataset
        """
        # create inverse mapping for faster lookup
        observable_id_to_index = {
            name: idx for idx, name in enumerate(self.observable_ids)}

        measurement_df = self.petab_problem.measurement_df
        if ptc.NOISE_PARAMETERS not in measurement_df:
            measurement_df[ptc.NOISE_PARAMETERS] = np.nan

        for sim_idx, (preeq_cond_idx, sim_cond_idx) \
                in enumerate(self.condition_map):
            # print("Condition", sim_idx, (preeq_cond_idx, sim_cond_idx))

            cur_mes_df = self._get_measurements_for_condition(
                sim_idx, preeq_cond_idx, sim_cond_idx)

            # get the required, possibly non-unique (replicates) timepoints
            grp = cur_mes_df.groupby([ptc.OBSERVABLE_ID, ptc.TIME]).size() \
                .reset_index().rename(columns={0: 'sum'}) \
                .groupby([ptc.TIME])['sum'].agg(['max']).reset_index()
            timepoints = np.sort(
                np.repeat(grp[ptc.TIME], grp['max'])).tolist()
            mes = np.full(shape=(len(timepoints), self.ny),
                          fill_value=np.nan)
            sd = mes.copy()

            # write measurements from each row in measurement_df
            for index, row in cur_mes_df.iterrows():
                observable_idx = \
                    observable_id_to_index[row[ptc.OBSERVABLE_ID]]
                time_idx = timepoints.index(row.time)
                # find the next free slot for replicate measurements
                while not np.isnan(mes[time_idx, observable_idx]):
                    time_idx += 1
                    if timepoints[time_idx] != row.time:
                        raise AssertionError(
                            "Replicate handling failed for "
                            f'{row[ptc.SIMULATION_CONDITION_ID]} - '
                            f'{row[ptc.OBSERVABLE_ID]}'
                            f' time {row[ptc.TIME]}\n' + str(cur_mes_df))
                mes[time_idx, observable_idx] = float(row[ptc.MEASUREMENT])
                sigma = to_float_if_float(row[ptc.NOISE_PARAMETERS])
                if isinstance(sigma, Number):
                    sd[time_idx, observable_idx] = sigma

            # write to file
            g = self.f.require_group("measurements")
            g.create_dataset(name=f"y/{sim_idx}", data=mes, dtype='f8',
                             compression=self.compression)
            g.create_dataset(name=f"ysigma/{sim_idx}", data=sd, dtype='f8',
                             compression=self.compression)
            g.create_dataset(name=f"t/{sim_idx}", data=timepoints,
                             dtype='f8', compression=self.compression)
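
    # Illustrative sketch (editorial addition): the groupby chain in
    # write_measurements() above repeats each timepoint as often as the
    # maximum replicate count of any observable at that timepoint.
    @staticmethod
    def _example_replicate_timepoints() -> None:
        """Hypothetical method, for illustration only."""
        mes = pd.DataFrame({ptc.OBSERVABLE_ID: ['obs1', 'obs1', 'obs2'],
                            ptc.TIME: [1.0, 1.0, 1.0]})
        grp = mes.groupby([ptc.OBSERVABLE_ID, ptc.TIME]).size() \
            .reset_index().rename(columns={0: 'sum'}) \
            .groupby([ptc.TIME])['sum'].agg(['max']).reset_index()
        timepoints = np.sort(np.repeat(grp[ptc.TIME], grp['max'])).tolist()
        # obs1 has two replicates at t=1.0, so t=1.0 appears twice
        assert timepoints == [1.0, 1.0]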

    def _get_measurements_for_condition(self, sim_idx,
                                        preeq_cond_idx, sim_cond_idx):
        """Get subset of the measurement table for the given condition"""
        measurement_df = self.petab_problem.measurement_df

        row_filter = measurement_df[ptc.SIMULATION_CONDITION_ID] \
            == self.condition_ids[sim_cond_idx]
        if ptc.PREEQUILIBRATION_CONDITION_ID in measurement_df:
            if preeq_cond_idx == self.NO_PREEQ_CONDITION_IDX:
                row_filter &= \
                    measurement_df[
                        ptc.PREEQUILIBRATION_CONDITION_ID].isnull()
            else:
                row_filter &= \
                    measurement_df[ptc.PREEQUILIBRATION_CONDITION_ID] \
                    == self.condition_ids[preeq_cond_idx]
        cur_mes_df = measurement_df.loc[row_filter, :]
        if not len(cur_mes_df):
            # Should have been filtered out before
            raise AssertionError("No measurements for this condition: ",
                                 sim_idx, (preeq_cond_idx, sim_cond_idx))
        return cur_mes_df

    def _generate_hierarchical_optimization_data(self, verbose=1):
        """
        Deal with offsets, proportionality factors and sigmas for
        hierarchical optimization

        Generate the respective index lists and mappings
        """
        parameter_df = self.petab_problem.parameter_df
        if 'hierarchicalOptimization' not in parameter_df:
            logger.warning("Missing hierarchicalOptimization column in "
                           "parameter table. Skipping.")
            return

        offset_candidates, scaling_candidates, sigma_candidates = \
            get_candidates_for_hierarchical(
                measurement_df=self.petab_problem.measurement_df,
                observable_df=self.petab_problem.observable_df,
                parameter_df=self.petab_problem.parameter_df)

        logger.info(f"{Fore.CYAN}offset_candidates:{Fore.RESET} "
                    f"{offset_candidates}")
        logger.info(f"{Fore.CYAN}scaling_candidates:{Fore.RESET} "
                    f"{scaling_candidates}")
        logger.info(f"{Fore.CYAN}sigma_candidates:{Fore.RESET} "
                    f"{sigma_candidates}")

        self._handle_proportionality_factors(scaling_candidates)
        # must be called after _handle_proportionality_factors
        self.handle_offset_parameter(offset_candidates)
        self._handle_sigmas(sigma_candidates)
        self._check_hierarchical_optimization_tables()

    def _check_hierarchical_optimization_tables(self):
        """Try spotting potential errors in hierarchical optimization
        tables"""
        if '/scalingParametersMapToObservables' not in self.f:
            return
        if '/sigmaParametersMapToObservables' not in self.f:
            return

        scaling_map = self.f['/scalingParametersMapToObservables'][:]
        sigma_map = self.f['/sigmaParametersMapToObservables'][:]
        df = pd.DataFrame(data=dict(scaling_id=scaling_map[:, 0],
                                    condition_id=scaling_map[:, 1],
                                    observable_id=scaling_map[:, 2]))
        df.set_index(['observable_id', 'condition_id'], inplace=True)
        df2 = pd.DataFrame(data=dict(sigma_id=sigma_map[:, 0],
                                     condition_id=sigma_map[:, 1],
                                     observable_id=sigma_map[:, 2]))
        df2.set_index(['observable_id', 'condition_id'], inplace=True)
        df = df.join(df2)
        del df2

        # TODO: smarter check
        if df.isnull().values.any():
            logger.warning("Couldn't verify that parameter selection "
                           "for hierarchical optimization is ok.")
        else:
            df_grouped = \
                df.groupby(['scaling_id', 'sigma_id']).size().reset_index()
            # must be the same, otherwise one scaling parameter is used
            # with multiple sigmas
            if len(df_grouped) != len(df_grouped.scaling_id.unique()):
                raise AssertionError(
                    "A scaling parameter selected for hierarchical "
                    "optimization is used with multiple sigmas.")
        # TODO: same check for offsets

    def handle_offset_parameter(self, offset_candidates):
        """
        Write list of offset parameters selected for hierarchical
        optimization
        """
        # don't create dataset if it would be empty
        if not len(offset_candidates):
            return

        # find indices for names
        offsets_for_hierarchical_indices = [
            self.optimization_parameter_name_to_index[x]
            for x in offset_candidates]
        order = np.argsort(offsets_for_hierarchical_indices)
        offsets_for_hierarchical_indices = \
            [offsets_for_hierarchical_indices[i] for i in order]
        offset_candidates = [offset_candidates[i] for i in order]

        logger.info(
            f"{Fore.CYAN}Number of offset parameters for hierarchical "
            f"optimization:{Fore.RESET} "
            f"{len(offsets_for_hierarchical_indices)}")

        self.f.require_dataset(
            "/offsetParameterIndices",
            shape=(len(offsets_for_hierarchical_indices),),
            dtype='<i4',
            data=offsets_for_hierarchical_indices)

        # find usages for the selected parameters
        use = get_analytical_parameter_table(
            offset_candidates, 'observable', self.condition_id_to_index,
            self.petab_problem.observable_df,
            self.petab_problem.measurement_df,
            self.observable_ids, self.condition_map,
            self.NO_PREEQ_CONDITION_IDX)

        self.f.require_dataset("/offsetParametersMapToObservables",
                               shape=(len(use), 3),
                               dtype='<i4', data=use)

    def _handle_proportionality_factors(self, scaling_candidates):
        """
        Write datasets specifying which proportionality factors to consider
        for hierarchical optimization
        """
        if not len(scaling_candidates):
            return

        scalings_for_hierarchical_indices = [
            self.optimization_parameter_name_to_index[x]
            for x in scaling_candidates]
        order = np.argsort(scalings_for_hierarchical_indices)
        scalings_for_hierarchical_indices = \
            [scalings_for_hierarchical_indices[i] for i in order]
        scaling_candidates = [scaling_candidates[i] for i in order]

        self.f.require_dataset(
            "/scalingParameterIndices",
            shape=(len(scalings_for_hierarchical_indices),),
            dtype='<i4',
            data=scalings_for_hierarchical_indices)
        logger.info(f"{Fore.CYAN}Number of proportionality factors for "
                    f"hierarchical optimization:{Fore.RESET} "
                    f"{len(scalings_for_hierarchical_indices)}")

        # find usages for the selected parameters
        use = get_analytical_parameter_table(
            scaling_candidates, 'observable', self.condition_id_to_index,
            self.petab_problem.observable_df,
            self.petab_problem.measurement_df,
            self.observable_ids, self.condition_map,
            self.NO_PREEQ_CONDITION_IDX)

        self.f.require_dataset("/scalingParametersMapToObservables",
                               shape=(len(use), 3),
                               dtype='<i4', data=use)

    def _handle_sigmas(self, sigma_candidates):
        """
        Write data for dealing with sigma parameters in hierarchical
        optimization

        Parameters:
            sigma_candidates:
                IDs of optimization parameters which are sigmas to be
                hierarchically computed. No particular order.
        """
        if not len(sigma_candidates):
            return

        # must sort by indices for C++ code
        sigmas_for_hierarchical_indices = [
            self.optimization_parameter_name_to_index[x]
            for x in sigma_candidates]
        order = np.argsort(sigmas_for_hierarchical_indices)
        sigmas_for_hierarchical_indices = \
            [sigmas_for_hierarchical_indices[i] for i in order]
        sigma_candidates = [sigma_candidates[i] for i in order]

        self.f.require_dataset(
            "/sigmaParameterIndices",
            shape=(len(sigmas_for_hierarchical_indices),),
            dtype='<i4',
            data=sigmas_for_hierarchical_indices)
        logger.info(f"{Fore.CYAN}Number of sigmas for hierarchical "
                    f"optimization:{Fore.RESET} "
                    f"{len(sigmas_for_hierarchical_indices)}")

        # find usages for the selected parameters
        use = get_analytical_parameter_table(
            sigma_candidates, 'noise', self.condition_id_to_index,
            self.petab_problem.observable_df,
            self.petab_problem.measurement_df,
            self.observable_ids, self.condition_map,
            self.NO_PREEQ_CONDITION_IDX)

        self.f.require_dataset("/sigmaParametersMapToObservables",
                               shape=(len(use), 3),
                               dtype='<i4', data=use)

    def _write_amici_options(self) -> None:
        """
        Write simulation options
        """
        g = self.f.require_group("/amiciOptions")
        g.attrs['sensi'] = amici.SensitivityOrder.first
        g.attrs['sensi_meth'] = amici.SensitivityMethod.adjoint
        g.attrs['sensi_meth_preeq'] = amici.SensitivityMethod.adjoint
        # TODO PEtab support: get from file
        g.attrs['tstart'] = 0.0
        g.attrs['atol'] = 1e-14
        g.attrs['interpType'] = amici.InterpolationType_hermite
        g.attrs['ism'] = amici.InternalSensitivityMethod_simultaneous
        g.attrs['iter'] = amici.NonlinearSolverIteration_newton
        g.attrs['linsol'] = amici.LinearSolver_KLU
        g.attrs['lmm'] = amici.LinearMultistepMethod_BDF
        g.attrs['maxsteps'] = 10000  # fail fast, retry w/ looser tolerances
        g.attrs['maxstepsB'] = 10000
        g.attrs['newton_preeq'] = 0
        g.attrs['nmaxevent'] = 0
        g.attrs['ordering'] = 0
        g.attrs['rtol'] = 1e-6
        g.attrs['stldet'] = 1
        # Required to handle parametric initial concentrations where the
        # Newton solver would fail with a KLU error
        g.attrs['steadyStateSensitivityMode'] = \
            amici.SteadyStateSensitivityMode.integrationOnly

        num_model_parameters = \
            self.f['/parameters/modelParameterNames'].shape[0]
        # parameter indices w.r.t. which to compute sensitivities
        self.f.require_dataset(
            '/amiciOptions/sens_ind', shape=(num_model_parameters,),
            dtype="<i4", data=range(num_model_parameters))

        # TODO not really meaningful anymore - remove?
        self.f.require_dataset(
            '/amiciOptions/ts', shape=(len(self.unique_timepoints),),
            dtype="f8", data=self.unique_timepoints)

    def _get_analytically_computed_optimization_parameter_indices(self):
        """
        Get optimization parameter indices of all analytically computed
        parameters.
        """
        indices = []
        if '/offsetParameterIndices' in self.f:
            indices.extend(self.f['/offsetParameterIndices'])
        if '/scalingParameterIndices' in self.f:
            indices.extend(self.f['/scalingParameterIndices'])
        if '/sigmaParameterIndices' in self.f:
            indices.extend(self.f['/sigmaParameterIndices'])
        return list(set(indices))

    def _write_bounds(self):
        """
        Parameter bounds for the optimizer

        Offset parameters are allowed to be negative
        """
        lower_bound = np.array([petab.scale(
            self.petab_problem.parameter_df.loc[par_id, ptc.LOWER_BOUND],
            self.petab_problem.parameter_df.loc[par_id, ptc.PARAMETER_SCALE])
            for par_id in self.problem_parameter_ids])

        upper_bound = np.array([petab.scale(
            self.petab_problem.parameter_df.loc[par_id, ptc.UPPER_BOUND],
            self.petab_problem.parameter_df.loc[par_id, ptc.PARAMETER_SCALE])
            for par_id in self.problem_parameter_ids])

        self.f.require_dataset('/parameters/lowerBound',
                               shape=lower_bound.shape,
                               data=lower_bound, dtype='f8')
        self.f.require_dataset('/parameters/upperBound',
                               shape=upper_bound.shape,
                               data=upper_bound, dtype='f8')

    def _write_starting_points(self):
        """
        Write a list of random starting points sampled as specified in the
        PEtab file.
        """
        num_params = len(self.problem_parameter_ids)
        num_starting_points = 100
        np.random.seed(0)

        starting_points = self.f.require_dataset(
            '/optimizationOptions/randomStarts',
            [num_params, num_starting_points], 'f8')
        starting_points[:] = \
            self.petab_problem.sample_parameter_startpoints(
                num_starting_points).T

        # Write nominal values for testing purposes
        if 'nominalValue' in self.petab_problem.parameter_df:
            # TODO: remove estimated=0, which should be part of mapping
            #  tables
            self.f['/parameters/nominalValues'] = list(petab.map_scale(
                self.petab_problem.parameter_df[ptc.NOMINAL_VALUE][
                    self.problem_parameter_ids],
                self.petab_problem.parameter_df[ptc.PARAMETER_SCALE][
                    self.problem_parameter_ids]))

def write_parameter_map(f: h5py.File, mapping_matrix: np.ndarray,
                        override_matrix: np.ndarray,
                        num_model_parameters: int,
                        compression=None) -> None:
    """Write parameter mapping"""
    f.require_dataset(
        name='/parameters/parameterOverrides',
        chunks=(num_model_parameters, 1),
        dtype='f8',
        fillvalue=np.nan,
        compression=compression,
        shape=override_matrix.shape,
        data=override_matrix)

    f.require_dataset(
        name='/parameters/optimizationSimulationMapping',
        chunks=(num_model_parameters, 1),
        dtype='<i4',
        fillvalue=-1,
        compression=compression,
        shape=mapping_matrix.shape,
        data=mapping_matrix)

def _subset_dict(
        full: dict[Any, Any],
        *args: Collection[Any]
) -> Iterator[dict[Any, Any]]:
    """Get subsets of a dictionary based on the provided keys

    :param full: Dictionary to subset
    :param args: Collections of keys to be contained in the different
        subsets
    :return: Iterator over dictionary subsets, one per key collection in
        ``args``
    """
    for keys in args:
        yield {key: val for (key, val) in full.items() if key in keys}
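
# Illustrative usage sketch (editorial addition): _subset_dict() yields one
# dictionary per key collection, so it can be unpacked into variable and
# fixed parts as done in
# HDF5DataGenerator._generate_simulation_to_optimization_parameter_mapping().
def _example_subset_dict() -> None:
    """Hypothetical helper, for illustration only."""
    full = {'p1': 1.0, 'p2': 2.0, 'k1': 3.0}
    variable, fixed = _subset_dict(full, ['p1', 'p2'], ['k1'])
    assert variable == {'p1': 1.0, 'p2': 2.0}
    assert fixed == {'k1': 3.0}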

def write_optimization_options(f: h5py.File) -> None:
    """
    Create groups and write some default optimization settings
    """
    # set common options
    g = f.require_group('optimizationOptions')
    g.attrs['optimizer'] = 0  # IpOpt
    g.attrs['retryOptimization'] = 1
    g.attrs['hierarchicalOptimization'] = 1
    g.attrs['numStarts'] = 1

    # set IpOpt options
    g = f.require_group('optimizationOptions/ipopt')
    g.attrs['max_iter'] = 100
    g.attrs['hessian_approximation'] = np.string_("limited-memory")
    g.attrs["limited_memory_update_type"] = np.string_("bfgs")
    g.attrs["tol"] = 1e-9
    g.attrs["acceptable_iter"] = 1
    # set ridiculously high, so only the acceptable_* options below matter
    g.attrs["acceptable_tol"] = 1e20
    g.attrs["acceptable_obj_change_tol"] = 1e-12
    g.attrs["watchdog_shortened_iter_trigger"] = 0

    # set CERES options
    g = f.require_group('optimizationOptions/ceres')
    g.attrs['max_num_iterations'] = 100

    # set toms611/SUMSL options
    g = f.require_group('optimizationOptions/toms611')
    g.attrs['mxfcal'] = 1e8
# TODO mini-batch options
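
# A minimal sketch (an assumption, not a parPE API) of how the defaults
# written by write_optimization_options() could be adjusted after file
# generation, e.g. to run more local optimizations:
def _example_override_optimization_options(hdf5_file_name: str) -> None:
    """Hypothetical helper, for illustration only."""
    with h5py.File(hdf5_file_name, 'r+') as f:
        f['/optimizationOptions'].attrs['numStarts'] = 10
        f['/optimizationOptions/ipopt'].attrs['max_iter'] = 500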

def parse_cli_args():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(
        description='Create HDF5 input file for parameter estimation with '
                    'parPE.')

    parser.add_argument('-d', '--model-dir', dest='model_dir',
                        help='Model directory containing the python module')
    parser.add_argument('-n', '--model-name', dest='model_name',
                        required=True,
                        help='Name of the AMICI model module')
    parser.add_argument('-y', '--yaml', dest='petab_yaml',
                        required=True,
                        help='PEtab problem yaml file')
    parser.add_argument('-o', dest='hdf5_file_name', default='data.h5',
                        help='Name of HDF5 file to generate')
    parser.add_argument('--flatten', dest='flatten', default=False,
                        action='store_true',
                        help='Flatten measurement specific overrides of '
                             'observable and noise parameters')

    return parser.parse_args()

def main():
    """Generate HDF5 file based on CLI options"""
    init_colorama(autoreset=True)
    coloredlogs.DEFAULT_LEVEL_STYLES['debug'] = dict(color='white',
                                                     faint='true')
    coloredlogs.install(level='DEBUG', logger=logging.root)

    args = parse_cli_args()

    petab_problem = petab.Problem.from_yaml(args.petab_yaml)
    if args.flatten:
        petab.flatten_timepoint_specific_output_overrides(petab_problem)

    amici_model = get_amici_model(model_name=args.model_name,
                                  model_dir=args.model_dir)

    h5gen = HDF5DataGenerator(
        petab_problem=petab_problem, amici_model=amici_model)
    h5gen.generate_file(args.hdf5_file_name)
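
# Entry-point guard (an assumption; the module is typically installed as a
# console script, but this also allows `python -m parpe.hdf5_pe_input`):
if __name__ == '__main__':
    main()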