# Source code for helpers.config

"""
Core configuration management for the project.

This module centralises all configuration handling using the Dynaconf library.
It loads settings from multiple sources, including default TOML files, local
overrides, and secrets. It also provides a custom loader to import specific
experimental scenarios from an Excel file based on a configuration ID.

Furthermore, it includes utility functions for:
- Saving the current configuration state to a file for reproducibility.
- Setting up a flexible logging system based on an external config file.
- Generating standardised, run-specific output paths for both local and
  remote (cluster) environments.

:Authors:
 - Sascha Holzhauer <Sascha.Holzhauer@uni-kassel.de>
 - Ivan Digel <ivan.digel@uni-kassel.de>

"""
from dynaconf import Dynaconf
from dynaconf import inspect_settings
from dynaconf import loaders
from dynaconf.utils.boxing import DynaBox

try:
    from dynaconf.config import AHIDConfig
    stopeval = True
except ImportError:
    stopeval = False
    
import pandas as pd
import os
import logging.config
import pathlib
import datetime
import gitinfo
from helpers.information import get_git_version

# Absolute path of the directory that contains this module; used as the
# Dynaconf root path, so the relative settings file names below are given
# relative to it.
current_directory = pathlib.Path(__file__).parent.absolute()
# The local settings file can be replaced through the environment variable
# AHOI_SETTINGS_FILE_FOR_DYNACONF; otherwise "settings/settings_local.toml"
# is used.
settings_filename = os.getenv("AHOI_SETTINGS_FILE_FOR_DYNACONF", default = "settings/settings_local.toml")

# Project-wide Dynaconf settings object. Files are listed as: project
# defaults, local file (see above), secrets, constants; "config_post.toml"
# is passed as an include.
# NOTE(review): presumably later files override/merge into earlier ones
# (merge_enabled=True) and AHOI_* environment variables override file
# values - confirm against the Dynaconf documentation.
settings = Dynaconf(root_path=current_directory,
                    merge_enabled=True,
                    envvar_prefix="AHOI",
                    settings_files=["settings/settings.toml", settings_filename,
                                    "settings/.secrets.toml", "settings/constants.toml"],
                    includes=["config_post.toml"],)

# Task name most recently used by get_output_path(); that function reads and
# updates this module-level variable.
output_path_task = None

# Project logger shared by the functions of this module.
logger = logging.getLogger("ahoi")

#Custom excel loader
def settings_loader(settings, filename, config_id, delimiter=","):
    """
    Custom Dynaconf loader that imports one scenario row from an Excel file.

    Sheet ``Sheet1`` of `filename` is read and the first row whose ``ID``
    column equals `config_id` is selected. Every other column of that row is
    stored in `settings` under the column name. A string cell that contains
    `delimiter` is split into a list; each non-blank element is converted to
    ``int`` if possible, otherwise to ``float``, otherwise kept as stripped
    text. All other cells are stored unchanged.

    Parameters
    ----------
    settings: Dynaconf
        The settings object to update (anything supporting item assignment).
    filename: str
        Path to the Excel scenario file.
    config_id: int
        Value of the ``ID`` column identifying the row to load.
    delimiter: str, optional
        Separator that marks a cell as a list, by default ",".

    Raises
    ------
    ValueError
        If no row with the requested ``ID`` exists.
    """

    def to_scalar(token):
        """Return `token` as int, else as float, else as stripped string."""
        text = token.strip()
        for convert in (int, float):
            try:
                return convert(text)
            except ValueError:
                continue
        return text

    table = pd.read_excel(filename, sheet_name="Sheet1")
    selected = table[table['ID'] == config_id]
    if selected.empty:
        raise ValueError(f"No configuration found for ID {config_id} in {filename}")

    # Only the first matching row is used; the ID column itself is not a setting.
    row = selected.drop(columns=['ID']).to_dict(orient='records')[0]

    for column, cell in row.items():
        is_list_cell = isinstance(cell, str) and delimiter in cell
        if not is_list_cell:
            settings[column] = cell
            continue
        # Blank fragments (e.g. from a trailing delimiter) are dropped.
        settings[column] = [
            to_scalar(part) for part in cell.split(delimiter) if part.strip()
        ]
def load_config_for_id(config_id):
    """
    Load one scenario from the project's Excel scenario file.

    Convenience wrapper around `settings_loader`: the Excel file is taken
    from ``settings.main.excel_scenario_file``, resolved relative to the
    parent of this module's directory, and the selected row is applied to
    the global `settings` object.

    Parameters
    ----------
    config_id : int
        The ID of the configuration row to load.
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    scenario_file = os.path.join(
        module_dir, "../", settings.main.excel_scenario_file
    )
    settings_loader(settings, filename=scenario_file, config_id=config_id)
def output_conf(settings_to_output=None):
    """
    Write the current settings and the settings history to files.

    Two files are created in the settings subfolder of the run's output
    path: a dump of the settings as a dictionary, extended by a ``repos``
    entry holding git information and the project version, and a JSON dump
    produced by Dynaconf's `inspect_settings`.

    Parameters
    ----------
    settings_to_output : Dynaconf, optional
        The settings object to output. If None, the global `settings`
        object is used, by default None.

    Other Parameters
    ----------------
    settings.output.output_settings_path : str
        Subfolder within the run's output path that receives both files.
    settings.output.output_settings_filename : str
        File name of the settings dump.
    settings.output.output_settingshistory_filename : str
        File name of the settings history dump.
    settings.output.output_settingshistory_key : str
        Key passed to `inspect_settings`.

    Notes
    -----
    The 'other parameters' are always taken from the global `settings`
    object, even when `settings_to_output` is given. If ``AHIDConfig`` could
    be imported, its ``evaluate`` flag is switched on while dumping and is
    switched off again afterwards, also when dumping fails.
    """
    if settings_to_output is None:
        settings_to_output = settings

    if stopeval:
        AHIDConfig.evaluate = True
    try:
        data = settings_to_output.as_dict()

        # NOTE(review): gitinfo appears to return None when no repository is
        # found - confirm; fall back to an empty dict so the version can
        # still be recorded instead of failing with a TypeError.
        repo_info = gitinfo.get_git_info()
        data["repos"] = repo_info if repo_info is not None else {}
        data["repos"]["version"] = get_git_version()

        # Both files go to the same folder; resolve (and create) it once.
        target_dir = get_output_path(
            subfolder=settings.output.output_settings_path
        )

        loaders.write(
            os.path.join(target_dir, settings.output.output_settings_filename),
            DynaBox(data).to_dict(),
        )
        inspect_settings(
            settings_to_output,
            to_file=os.path.join(
                target_dir, settings.output.output_settingshistory_filename
            ),
            dumper="json",
            key=settings.output.output_settingshistory_key,
        )
    finally:
        # Always switch evaluation off again so that a failed dump does not
        # leave it enabled.
        if stopeval:
            AHIDConfig.evaluate = False
def stop_dynaconf_evaluation():
    """
    Switch off evaluation in Dynaconf's ``AHIDConfig``.

    Intended to be called once all configuration has been loaded. Nothing
    happens when ``AHIDConfig`` could not be imported (module flag
    `stopeval` is False); otherwise ``AHIDConfig.evaluate`` is set to False
    and a warning is written to the "ahoi" logger.
    """
    if not stopeval:
        return

    AHIDConfig.evaluate = False
    logging.getLogger("ahoi").warning(
        "Set Dynaconf's AHIDConfig.evaluate to False!"
    )
def config_logging():
    """
    Configure the project's logging system from a file.

    The logging configuration file named in ``settings.logging.configfile``
    (relative to this module's directory) is applied via
    `logging.config.fileConfig`. Afterwards, every handler of a named logger
    whose handler name contains "file" is replaced by a `logging.FileHandler`
    writing (mode "w") to ``logs/ahoi<run_id>_<logger name>[_<handler
    name>].log``. The replacement keeps the formatter and level of the
    handler it replaces. The handler name is only appended from the second
    file handler of a logger onwards.

    Other Parameters
    ----------------
    settings.logging.configfile : str
        Logging configuration file, relative to this module's directory.
    settings.main.run_id
        Run ID used in the log file names.

    Notes
    -----
    The "logs" folder is relative to the current working directory. Handlers
    of the root logger are not redirected because the root logger is not
    part of ``logging.root.manager.loggerDict``.
    """
    configfile = os.path.join(
        os.path.dirname(__file__), settings.logging.configfile
    )
    logfolder = "logs"
    run_id = settings.main.run_id

    logging.config.fileConfig(configfile)
    os.makedirs(logfolder, exist_ok=True)

    # Iterate over snapshots: logging.getLogger() may touch the logger
    # registry, and handlers are removed/added below. Mutating the live
    # handler list while iterating it would skip the handler that follows a
    # replaced one.
    for name in list(logging.root.manager.loggerDict):
        logger_to_redirect = logging.getLogger(name)
        numfilehandlers = 0
        for handler in list(logger_to_redirect.handlers):
            if handler.name is None or "file" not in handler.name:
                continue
            numfilehandlers += 1
            # Disambiguate file names if a logger has several file handlers.
            postfix = "" if numfilehandlers < 2 else "_" + handler.name
            newhandler = logging.FileHandler(
                os.path.join(logfolder, f"ahoi{run_id}_{name}{postfix}.log"),
                "w",
            )
            newhandler.setFormatter(handler.formatter)
            newhandler.setLevel(handler.level)
            # The replaced handler is not closed here because it may still be
            # attached to other loggers (e.g. the root logger).
            logger_to_redirect.removeHandler(handler)
            logger_to_redirect.addHandler(newhandler)

    logger.info("Logging set up using config file %s", configfile)
def get_output_path(
    runid=None,
    subfolder=None,
    task=None,
    createfolder=True,
):
    """
    Return the output path for output data, logging, settings storage and
    slurm files, i.e. ``<output_path>/<task>/<runid>/<subfolder>``.

    Parameters
    ----------
    runid: int or string, optional
        The run ID the output data is associated with. To refer to "RS"
        runs, the runid may be a string such as "RS0912". Defaults to
        ``settings.main.run_id``.
    subfolder: str, optional
        Last part of the output folder. Defaults to "" (no subfolder).
    task: str, optional
        Task name forming the folder above the run ID. Defaults to
        ``settings.main.task``. If no task is available (the value is falsy
        but not the empty string), the task name resolved by a previous call
        is reused; if there is none, a timestamp of the form
        ``YYYY-MM-DD_HH-MM-SS`` is generated once and reused by later calls.
        An empty string is used as given, so the task folder level is
        omitted.
    createfolder: bool, optional
        If True (default), the folder is created if it does not exist.

    Other Parameters
    ----------------
    - settings.main.run_id
    - settings.main.task
    - settings.main.output_path (a relative path is resolved against the
      directory two levels above this module's directory)

    Returns
    -------
    str
        The path to the designated output directory.
    """
    global output_path_task

    if runid is None:
        runid = settings.main.run_id
    if subfolder is None:
        subfolder = ""
    if task is None:
        task = settings.main.task

    if not task and task != "":
        # No task configured: keep all outputs of this process in one folder
        # by generating the timestamp only once and caching it at module
        # level.
        if not output_path_task:
            output_path_task = datetime.datetime.now().strftime(
                "%Y-%m-%d_%H-%M-%S"
            )
        task = output_path_task
    else:
        output_path_task = task

    base_path = settings.main.output_path
    if not os.path.isabs(base_path):
        base_path = os.path.join(
            os.path.dirname(__file__), "..", "..", base_path
        )
    opath = os.path.abspath(os.path.join(base_path, task, str(runid)))

    output_path_sub = os.path.join(opath, subfolder)
    if createfolder:
        os.makedirs(output_path_sub, exist_ok=True)
    return output_path_sub
def get_cluster_output_path(
    runid=None,
    subfolder=None,
):
    """
    Return the output folder of output data, logging, settings storage and
    slurm files on the cluster, i.e.
    ``<target_cluster_mainpath>/<task>/<runid>/<subfolder>`` in POSIX
    notation. No folder is created.

    Parameters
    ----------
    runid: int or string, optional
        The run ID the output data is associated with. Defaults to the
        current value of ``settings.main.run_id``, looked up at call time
        (consistent with `get_output_path`) rather than frozen at import.
    subfolder: str, optional
        Last part of the output folder. Defaults to "" (no subfolder).

    Other Parameters
    ----------------
    - settings.slurm.target_cluster_mainpath
    - settings.main.task
    - settings.main.run_id

    Returns
    -------
    str
        Cluster output path with forward slashes.
    """
    if runid is None:
        runid = settings.main.run_id
    if subfolder is None:
        subfolder = ""

    opath = os.path.join(
        settings.slurm.target_cluster_mainpath,
        settings.main.task,
        str(runid),
    )
    # as_posix() already returns a str with forward slashes.
    return pathlib.Path(os.path.join(opath, subfolder)).as_posix()