Source code for ccobra.benchmark.evaluator

""" CCOBRA evaluation module.

"""

import copy
import logging
import time

import numpy as np
import pandas as pd

from ..model import CCobraModel

from . import contextmanager
from . import modelimporter


# Initialize module-level logger
logger = logging.getLogger(__name__)

[docs]class Evaluator(): """ CCOBRA evaluation routine. """ def __init__(self, benchmark, is_silent=False, cache_df=None): """ Initializes the evaluator object by preparing the data representations and precomputing the required training and adaption steps. Parameters ---------- benchmarks : ccobra.Benchmark Benchmark container. is_silent : bool, optional Flag indicating that output is supposed to be suppressed. cache_df : pandas.DataFrame, option Cache result dataframe. """ logger.info('Setting up evaluator...') # Store the information self.benchmark = benchmark self.is_silent = is_silent self.cache_df = cache_df # Extract the dataset information self.dict_test = benchmark.data_test.to_eval_dict() self.dict_pre_train = None self.dict_pre_train_person = None self.dict_pre_person_background = None if benchmark.data_pre_train is not None: logger.debug('Supplied training data to evaluation.') self.dict_pre_train = benchmark.data_pre_train.to_eval_dict() if benchmark.data_pre_train_person is not None: logger.debug('Supplied person training data to evaluation.') self.dict_pre_train_person = benchmark.data_pre_train_person.to_eval_dict() if benchmark.data_pre_person_background is not None: logger.debug('Supplied person background data to evaluation.') self.dict_pre_person_background = benchmark.data_pre_person_background.to_eval_dict() if benchmark.type == 'coverage': self.dict_pre_train_person = self.dict_test # Extract the functionality to apply self.do_adapt = (benchmark.type == 'adaption') self.do_pre_train_global = (self.dict_pre_train is not None) and not benchmark.corresponding_data self.do_pre_train_leaveoneout = (self.dict_pre_train is not None) and benchmark.corresponding_data self.do_pre_train_person = (self.dict_pre_train_person is not None) self.do_pre_person_background = (self.dict_pre_person_background is not None) logger.debug('Evaluation ready:') logger.debug(' do_adapt: %s', self.do_adapt) logger.debug(' do_pre_train_global: %s', self.do_pre_train_global) logger.debug(' do_pre_train_leaveoneout: %s', self.do_pre_train_leaveoneout) logger.debug(' do_pre_train_person: %s', self.do_pre_train_person) logger.debug(' do_pre_person_background: %s', self.do_pre_person_background)
[docs] def evaluate(self): """ Core evaluation routine. Returns ------- pd.DataFrame Pandas dataframe containing the evaluation results. """ logger.info('Starting evaluation routine...') model_logging_results = {} model_name_cache = set() if self.cache_df is None else set(self.cache_df['model'].unique()) # Activate model context for model_idx, modelinfo in enumerate(self.benchmark.models): # Print the progress log_str = "Evaluating '{}' ({}/{})...".format( modelinfo.path, model_idx + 1, len(self.benchmark.models)) logger.debug(''.join(['='] * 80)) logger.info(log_str) logger.debug(''.join(['='] * 80)) if not self.is_silent: print(log_str) # Initialize the dictionary for the models logging output model_logging_dict = {} # Setup model context with contextmanager.dir_context(modelinfo.path): # Dynamically import the CCOBRA model importer = modelimporter.ModelImporter( modelinfo.path, CCobraModel, load_specific_class=modelinfo.load_specific_class ) # Instantiate and prepare the model for predictions pre_model = importer.instantiate(modelinfo.args) pre_model.setup_environment(self.benchmark.type) # Check if model is applicable to domains/response types self.check_model_applicability(pre_model) # Only use the model's name if no override is specified model_name = modelinfo.override_name if not model_name: model_name = pre_model.name # Ensure that names are unique and show a warning if duplicates are detected original_model_name = model_name changed = False while model_name in model_name_cache: model_name = model_name + '\'' changed = True model_name_cache.add(model_name) if changed: logger.warning( 'Duplicate model name detected ("%s"). Changed to "%s".', original_model_name, model_name ) # Only perform general pre-training if training data is # supplied and corresponding data is false. Otherwise, the # model has to be re-trained for each subject. if self.do_pre_train_global: logger.debug('General pre-training for %s...', model_name) pre_model.pre_train(list(self.dict_pre_train.values())) # Iterate subject for subj_key_identifier, subj_data in self.dict_test.items(): start_subject = time.time() subj_id = subj_data[0]['item'].identifier model = copy.deepcopy(pre_model) # Set the model to new participant model.start_participant(id=subj_id) # Perform pre-training for individual subjects only if # corresponding data is set to true if self.do_pre_train_leaveoneout: logger.debug('Individual pre-training for %s...', model_name) cur_train_data = [ value for key, value in self.dict_pre_train.items() if key != subj_id] model.pre_train(cur_train_data) # Perform background fitting if self.do_pre_person_background: logger.debug('Person background training for %s...', model_name) cur_train_data = self.dict_pre_person_background.get(subj_key_identifier, []) model.pre_person_background(cur_train_data) # Perform person training if self.do_pre_train_person: logger.debug('Person training for %s...', model_name) subj_person_train_data = self.dict_pre_train_person.get(subj_key_identifier, []) model.pre_train_person(subj_person_train_data) # Iterate over individual tasks start_eval = time.time() for task_idx, task in enumerate(subj_data): start_task = time.time() logger.debug('Querying for task %s/%s...', task_idx + 1, len(subj_data)) # Integrity checks assert task['item'].identifier == subj_id # Query models for predictions for eh in self.benchmark.evaluation_handlers: target = task[eh.data_column] eh.predict(model, model_name, task['item'], target, task['aux']) # Perform model adaption if self.do_adapt: for eh in self.benchmark.evaluation_handlers: target = task[eh.data_column] eh.adapt(model, task['item'], task['full']) logger.debug( 'Task {} took {:4f}s'.format(task_idx + 1, time.time() - start_task)) # Finalize subject evaluation and allow the model to store parameters model_log = {} model.end_participant(subj_id, model_log) if len(model_log) > 0: model_logging_dict[subj_id]= model_log logger.debug('Subject evaluation took {:.4}s'.format(time.time() - start_eval)) logger.debug('Subject {} done. took {:.4}s'.format( subj_id, time.time() - start_subject)) # Save the models logging information if available if len(model_logging_dict) > 0: model_logging_results[model_name] = model_logging_dict # Unload the imported model and its dependencies. Might cause garbage collection # issues importer.unimport() res_df = None on_list = [ 'model', 'id', 'domain', 'response_type', 'sequence', 'task', 'choices' ] for enc in self.benchmark.evaluation_handlers: if res_df is None: logger.debug('Preparing new result dataframe based on evaluation handler') res_df = enc.get_result_df() else: logger.debug('Adding evaluation handler result to result dataframe') res_df = res_df.merge(enc.get_result_df(), on=on_list, suffixes=('', '_' + enc.data_column)) # Rename score column res_df = res_df.rename(columns={'score' : 'score_response'}) # Integrate cache if self.cache_df is None: logger.debug('Empty cache. Returning only result dataframe.') return res_df, model_logging_results if res_df.empty: logger.debug('Empty result dataframe. Returning cache only.') return self.cache_df, {} logger.debug('Merging cache and result dataframe...') assert sorted(list(res_df)) == sorted(list(self.cache_df)), 'Incompatible cache' return pd.concat([res_df, self.cache_df]), model_logging_results
[docs] def check_model_applicability(self, pre_model): """ Verifies the applicability of a model by checking its supported domains and response types and comparing them with the evaluation dataset. Parameters ---------- pre_model : CCobraModel Model to check applicability for. Raises ------ ValueError Exception thrown when model is not applicable to some domains or response types in the test data. """ missing_domains = set(self.benchmark.data_test.domains) - set(pre_model.supported_domains) if missing_domains: raise ValueError( 'Model {} is not applicable to domains {} found in ' \ 'the test dataset.'.format( pre_model.name, missing_domains)) missing_response_types = set(self.benchmark.data_test.response_types) - set(pre_model.supported_response_types) if missing_response_types: raise ValueError( 'Model {} is not applicable to response_types {} ' \ 'found in the test dataset.'.format( pre_model.name, missing_response_types))