Source code for pyrad.io.config

"""
pyrad.io.config
===============

Functions for reading pyrad config files

.. autosummary::
    :toctree: generated/

    read_config
    get_num_elements
    string_to_datatype
    get_array
    get_struct
    get_array_type
    init_array

"""
import re
import os
import yaml

# Allows parsing of environment variables in YAML safe loader
env_var_pattern = re.compile(r".*\$\{([^}^{]+)\}.*")


def env_var_constructor(loader, node):
    value = loader.construct_scalar(node)
    return os.path.expandvars(value)


yaml.SafeLoader.add_implicit_resolver("!env_var", env_var_pattern, None)
yaml.SafeLoader.add_constructor("!env_var", env_var_constructor)

# Define type mappings
SCALAR_TYPES = {
    "BYTE": int,
    "BOOL": lambda x: str(x).lower() in ("true", "1"),
    "INT": int,
    "LONG": int,
    "HEX": lambda x: int(x, 16)
    if isinstance(x, str) and x.startswith("0x")
    else int(x),
    "EXP": float,
    "FLOAT": float,
    "DOUBLE": float,
    "STRING": str,
}

ARRAY_TYPE_NAMES = {
    "BYTARR": "BYTE",
    "INTARR": "INT",
    "LONARR": "LONG",
    "HEXARR": "HEX",
    "EXPARR": "EXP",
    "FLTARR": "FLOAT",
    "DBLARR": "DOUBLE",
    "STRARR": "STRING",
}


[docs] def read_config(fname, cfg=None, defaults=None): """ Read a pyrad config file. It can use either the classical pyrad config syntax or yaml files Parameters ---------- fname : str Name of the configuration file to read. cfg : dict of dicts, optional dictionary of dictionaries containing configuration parameters where the new parameters will be placed defaults: dict of dicts, optional dictionary of dictionaries containing default values. If a key is contained in the defaults dict but not in the configuration file, the value from the defaults dict will be assigned Returns ------- cfg : dict of dicts dictionary of dictionaries containing the configuration parameters """ # if config dictionary does not exist yet create it if cfg is None: cfg = dict() # check if the file can be read try: with open(fname, "r", encoding="utf-8", errors="ignore") as cfgfile: # Figure out if it is yaml or not if fname.endswith(".yaml") or fname.endswith(".yml"): cfg_new = yaml.load(cfgfile, Loader=yaml.SafeLoader) else: cfg_new = _read_config_pyrad(cfgfile) except Exception as err: raise Exception( f"ERROR: Could not find|open config file {fname}, error is {err}" ) # Merge with existing dict cfg = merge_dicts(cfg, cfg_new) # if default does not exist, create it if defaults is None: defaults = dict() # Verify that all keys in default are in newly created config cfg = merge_dicts(cfg, defaults) return cfg
def _read_config_pyrad(cfgfile): lines = [ os.path.expandvars(line).split("#")[0].strip() for line in cfgfile.readlines() if line.strip() and not line.strip().startswith("#") ] def parse_block(line, index, expected_count): result = {} count = 0 while index < len(lines): line = lines[index] parts = line.split(maxsplit=2) if len(parts) < 2: raise ValueError(f"Invalid line at {index + 1}: '{line}'") key = parts[0] type_str = parts[1].upper() if type_str == "STRUCT": if len(parts) != 3: raise ValueError( f"Missing struct count at line {index + 1}: '{line}'" ) struct_count = int(parts[2]) index += 1 struct_data, index = parse_block(line, index, struct_count) result[key] = struct_data count += 1 elif type_str in ARRAY_TYPE_NAMES: if len(parts) != 3: raise ValueError( f"Missing array count at line {index + 1}: '{line}'" ) arr_count = int(parts[2]) index += 1 arr = [] for _ in range(arr_count): if index >= len(lines): raise ValueError( f"Expected {arr_count} array items for '{key}'" ) parser = SCALAR_TYPES[ARRAY_TYPE_NAMES[type_str]] arr.append(parser(lines[index])) index += 1 result[key] = arr count += 1 elif type_str in SCALAR_TYPES: if len(parts) != 3: raise ValueError( f"Expected value for key '{key}' at line {index + 1}" ) raw_val = parts[2] parser = SCALAR_TYPES[type_str] value = parser(raw_val) result[key] = value index += 1 count += 1 else: break # End of current block if expected_count is not None and count == expected_count: break if expected_count is not None and count != expected_count: raise ValueError( f"At line {line}, expected {expected_count} elements, but parsed {count}" ) return result, index config = {} index = 0 while index < len(lines): line = lines[index] parts = line.split(maxsplit=2) if len(parts) < 2: raise ValueError( f"Invalid top-level declaration at line {index+1}: '{line}'" ) key, type_str = parts[0], parts[1].upper() if type_str == "STRUCT": if len(parts) != 3: raise ValueError(f"Missing struct count at line {index + 1}: '{line}'") struct_count = int(parts[2]) index += 1 struct_data, index = parse_block(line, index, struct_count) config[key] = struct_data elif type_str in ARRAY_TYPE_NAMES: if len(parts) != 3: raise ValueError(f"Missing array count at line {index + 1}: '{line}'") arr_count = int(parts[2]) index += 1 arr = [] for _ in range(arr_count): if index >= len(lines): raise ValueError(f"Expected {arr_count} array items for '{key}'") if lines[index] == "\n" or "STRUCT" in lines[index]: raise ValueError( f"Parsed illegal line while reading array for {line}" ) arr.append(lines[index]) index += 1 config[key] = arr elif type_str in SCALAR_TYPES: if len(parts) != 3: raise ValueError(f"Expected value for key '{key}' at line {index + 1}") raw_val = parts[2] parser = SCALAR_TYPES[type_str] value = parser(raw_val) config[key] = value index += 1 else: raise ValueError(f"Unknown type '{type_str}' at line {index+1}") if "dataSetList" in config: if len(config["dataSetList"]): # Check if all elements are in config for dset in config["dataSetList"]: if ":" in dset: dset = dset.split(":")[1] if dset not in config: raise ValueError( f"Dataset {dset} specificied in dataSetList was not found in config file" ) return config def merge_dicts(ref, defaults): """ Merge two nested dictionaries recursively. Parameters: ----------- ref : dict The base dictionary to be merged into. defaults : dict The dictionary whose keys and values are merged into dictionary ref. Returns: -------- ref: dict The base dictionary after addition of missing keys from defaults """ for key, value in defaults.items(): if key in ref: if isinstance(ref[key], dict) and isinstance(value, dict): merge_dicts(ref[key], value) else: ref[key] = value return ref