Source code for parpe.hierarchical_optimization

"""Functions related to hierarchical optimization

https://doi.org/10.1093/bioinformatics/btz581
"""

import logging
from numbers import Number
from typing import Dict, List, Set, Tuple

import numpy as np
import pandas as pd
import petab.C as ptc
import sympy as sp
from numpy import isnan
from petab import split_parameter_replacement_list

from .petab import get_parameter_override_id_to_placeholder_id


logger = logging.getLogger(__name__)


[docs] def parameter_is_scaling_parameter(parameter: str, formula: str) -> bool: """ Check if is scaling parameter. Arguments: parameter: Some identifier. formula: Some sympy-compatible formula. Returns: ``True`` if parameter ``parameter`` is a scaling parameter in formula ``formula``. """ sym_parameter = sp.sympify(parameter) sym_formula = sp.sympify(formula) return sym_parameter not in (sym_formula / sym_parameter).free_symbols
[docs] def parameter_is_offset_parameter(parameter: str, formula: str) -> bool: """ Check if is offset parameter. Arguments: parameter: Some identifier. formula: Some sympy-compatible formula. Returns: ``True`` if parameter ``parameter`` is an offset parameter with positive sign in formula ``formula``. """ sym_parameter = sp.sympify(parameter) sym_formula = sp.sympify(formula) return sym_parameter not in (sym_formula - sym_parameter).free_symbols
[docs] def get_candidates_for_hierarchical( observable_df: pd.DataFrame, measurement_df: pd.DataFrame, parameter_df: pd.DataFrame): """Based on PEtab files, check which parameters are suitable for hierarchical optimization. Arguments: observable_df: PEtab observable table measurement_df: PEtab measurement table parameter_df: PEtab measurement table Returns: """ observable_parameter_override_id_to_placeholder_id, \ noise_parameter_override_id_to_placeholder_id = \ get_parameter_override_id_to_placeholder_id( observable_df=observable_df, measurement_df=measurement_df) # parameters selected for hierarchical optimization hierarchical_candidates = parameter_df.index[ (parameter_df.estimate == 1) & (parameter_df.hierarchicalOptimization == 1)] offset_candidates = set() scaling_candidates = set() sigma_candidates = set() for optimization_parameter_id in hierarchical_candidates: # check which model parameter this one overrides if optimization_parameter_id \ in observable_parameter_override_id_to_placeholder_id: placeholder_ids = \ observable_parameter_override_id_to_placeholder_id[ optimization_parameter_id] # check in which observables this parameter occurs for placeholder_id in placeholder_ids: observable_id = '_'.join(placeholder_id.split('_')[1:]) observable_formula = observable_df.loc[observable_id, ptc.OBSERVABLE_FORMULA] _add_hierarchical_parameter_candidate( placeholder_id, optimization_parameter_id, observable_id, observable_formula, offset_candidates, scaling_candidates ) elif optimization_parameter_id \ in noise_parameter_override_id_to_placeholder_id: # TODO: what is there to check? formula - sigma == 0! # TODO: must not occur anywhere else logger.warning(f"Did not verify that '{optimization_parameter_id}'" " is suitable for hierarchical optimization. " "Assuming you did.") sigma_candidates.add(optimization_parameter_id) else: # handle parameters that are no overrides parameter_added = False for observable_id, observable_formula, noise_formula in zip( observable_df.index, observable_df[ptc.OBSERVABLE_FORMULA], observable_df[ptc.NOISE_FORMULA]): for formula in (observable_formula, noise_formula): free_syms = sp.sympify(formula).free_symbols if any(sym.name == optimization_parameter_id for sym in free_syms): _add_hierarchical_parameter_candidate( param_or_placeholder_id=optimization_parameter_id, param_id=optimization_parameter_id, observable_id=observable_id, observable_formula=formula, offset_candidates=offset_candidates, scaling_candidates=scaling_candidates ) logger.warning( f"Did not verify that '{optimization_parameter_id}'" " is suitable for hierarchical optimization. " "Assuming you did.") parameter_added = True break # TODO ensure this is only output parameter if not parameter_added: raise RuntimeError( f'Parameter {optimization_parameter_id} selected ' 'for hierarchical optimization but is neither ' 'offset, proportionality or sigma parameter. ' 'Dunno what to do.') # check if scalingIndices lists are non-overlapping for x in offset_candidates: if x in scaling_candidates: raise RuntimeError( f"Determined {x} as candidate for both offset and scaling.") if x in sigma_candidates: raise RuntimeError( f"Determined {x} as candidate for both offset and sigma.") for x in scaling_candidates: if x in sigma_candidates: raise RuntimeError( f"Determined {x} as candidate for both scaling and sigma.") # TODO Can't use hierarchical optimization with non-normal or # transformation yet if (offset_candidates or scaling_candidates or sigma_candidates): not_normal = ptc.NOISE_DISTRIBUTION in observable_df \ and not np.all(x == ptc.NORMAL or (isinstance(x, Number) and np.isnan(x)) for x in observable_df[ptc.NOISE_DISTRIBUTION]) not_lin = ptc.OBSERVABLE_TRANSFORMATION in observable_df \ and not np.all(x == ptc.LIN or (isinstance(x, Number) and np.isnan(x)) for x in observable_df[ ptc.OBSERVABLE_TRANSFORMATION]) if not_normal or not_lin: raise ValueError("Can't use hierarchical optimization with " "non-normal noise or observable transformation " "yet.") return (list(offset_candidates), list(scaling_candidates), list(sigma_candidates))
def _add_hierarchical_parameter_candidate( param_or_placeholder_id: str, param_id: str, observable_id: str, observable_formula: str, offset_candidates: Set, scaling_candidates: Set, ): """Check if the given parameter is suitable for hierarchical optimization and if so, add it to the appropriate candidate set""" if parameter_is_offset_parameter( param_or_placeholder_id, observable_formula): offset_candidates.add(param_id) elif parameter_is_scaling_parameter( param_or_placeholder_id, observable_formula): scaling_candidates.add(param_id) else: raise RuntimeError( f'Parameter {param_id} selected ' 'for hierarchical optimization but is neither ' 'offset, proportionality or sigma parameter in ' f'{observable_id}: {observable_formula}.' 'Dunno what to do.')
[docs] def get_analytical_parameter_table( hierarchical_candidate_ids: list, parameter_type: str, condition_id_to_index: Dict[str, int], observable_df: pd.DataFrame, measurement_df: pd.DataFrame, observable_ids, condition_map, no_preeq_condition_idx: int ) -> List[Tuple[int, int, int]]: """Generate (scalingIdx, conditionIdx, observableIdx) table for all occurrences of the given parameter names. Parameters: hierarchical_candidate_ids: Ids of optimization parameters for hierarchical optimization. This table depends on ordering of this list. parameter_type: 'observable' or 'noise' Returns: list of (scalingIdx, conditionIdx, observableIdx) tuples """ # need list, not ndarray condition_map_list = [list(x) for x in condition_map] if parameter_type == 'observable': def _get_overrides(): return split_parameter_replacement_list( row.get(ptc.OBSERVABLE_PARAMETERS, "")) elif parameter_type == 'noise': def _get_overrides(): return split_parameter_replacement_list( row.get(ptc.NOISE_PARAMETERS, "")) else: raise ValueError("parameter_type must be 'noise' or " f"'observable', but got {parameter_type}") use = [] # case 1: parameter selected for hierarchical optimization overrides # a placeholder for _, row in measurement_df.iterrows(): overrides = _get_overrides() sim_cond_idx = \ condition_id_to_index[row.simulationConditionId] preeq_cond_idx = no_preeq_condition_idx if ptc.PREEQUILIBRATION_CONDITION_ID in row \ and not (isinstance(row.preequilibrationConditionId, Number) and isnan(row.preequilibrationConditionId)): preeq_cond_idx = condition_id_to_index[ row.preequilibrationConditionId] for s in overrides: # print(s, parametersForHierarchical) try: candidate_idx = hierarchical_candidate_ids.index(s) except ValueError: continue # current parameter not in list condition_idx = condition_map_list.index( [preeq_cond_idx, sim_cond_idx]) observable_idx = observable_ids.index(row.observableId) tup = (candidate_idx, condition_idx, observable_idx) # Don't add a new line for each timepoint # We don't allow separate parameters for individual time-points # (Can be implemented via different observables) if tup not in use: use.append(tup) # case 2: parameter selected for hierarchical optimization is a regular # parameter observable_id_to_parameter_idx = {} for observable_id, observable_formula, noise_formula in zip( observable_df.index, observable_df[ptc.OBSERVABLE_FORMULA], observable_df[ptc.NOISE_FORMULA]): # determine hierarchical candidates occuring in the given observable formula = observable_formula if parameter_type == 'observable' \ else noise_formula free_syms = {sym.name for sym in sp.sympify(formula).free_symbols} matching_parameters = [hierarchical_candidate_ids.index(parameter_id) for parameter_id in hierarchical_candidate_ids if parameter_id in free_syms] if matching_parameters: observable_id_to_parameter_idx[observable_id] = matching_parameters # determine all conditions that have data for this observable for _, row in measurement_df.iterrows(): candidate_idxs = observable_id_to_parameter_idx.get( row.get(ptc.OBSERVABLE_ID), None) if not candidate_idxs: continue sim_cond_idx = condition_id_to_index[row.simulationConditionId] preeq_cond_idx = no_preeq_condition_idx if ptc.PREEQUILIBRATION_CONDITION_ID in row \ and not (isinstance(row.preequilibrationConditionId, Number) and isnan(row.preequilibrationConditionId)): preeq_cond_idx = condition_id_to_index[ row.preequilibrationConditionId] for candidate_idx in candidate_idxs: condition_idx = condition_map_list.index( [preeq_cond_idx, sim_cond_idx]) observable_idx = observable_ids.index(row.observableId) tup = (candidate_idx, condition_idx, observable_idx) # Don't add a new line for each timepoint # We don't allow separate parameters for individual time-points # (Can be implemented via different observables) if tup not in use: use.append(tup) if not len(use): raise AssertionError("Candidates were: " f"{hierarchical_candidate_ids} but nothing " "usable found") return use