# Source code for pyrad.io.config

"""
pyrad.io.config
===============

Functions for reading pyrad config files

.. autosummary::
    :toctree: generated/

    read_config
    get_num_elements
    string_to_datatype
    get_array
    get_struct
    get_array_type
    init_array
    merge_dicts

"""

import copy
import os
from warnings import warn

import numpy as np

# Default values for the location configuration. The *ImageConfig entries
# hold per-plot-type figure settings; the remaining entries are scalar
# defaults.
_defaults_loc = dict(
    ppiImageConfig=dict(
        xsize=10, ysize=10, dpi=72, xmin=-50, xmax=50, ymin=-50, ymax=50
    ),
    ppiMapImageConfig=dict(xsize=10, ysize=10, dpi=72),
    rhiImageConfig=dict(
        xsize=10, ysize=5, dpi=72, xmin=0, xmax=50, ymin=0, ymax=12
    ),
    xsecImageConfig=dict(
        xsize=10, ysize=5, dpi=72, xmin=0, xmax=50, ymin=0, ymax=12
    ),
    gridMapImageConfig=dict(xsize=10, ysize=10, dpi=72),
    sunhitsImageConfig=dict(
        xsize=10,
        ysize=5,
        dpi=72,
        azmin=-2.0,
        azmax=2.0,
        elmin=-2.0,
        elmax=2.0,
        azres=0.1,
        elres=0.1,
    ),
    spectraImageConfig=dict(
        xsize=10,
        ysize=5,
        dpi=72,
        ymin=None,
        ymax=None,
        velmin=None,
        velmax=None,
    ),
    ScanPeriod=5,
    IconRunFreq=3,
    IconForecasted=7,
    RadarName=None,
    RadarRes=None,
    NumRadars=1,
    TimeTol=3600.0,
    ScanList=None,
)

# Default values for the main configuration: every path-like entry defaults
# to None; only the METRANET reading library has a concrete default.
_defaults_main = dict.fromkeys(
    (
        "lastStateFile",
        "datapath",
        "satpath",
        "iconpath",
        "psrpath",
        "iqpath",
        "colocgatespath",
        "excessgatespath",
        "dempath",
        "smnpath",
        "disdropath",
        "solarfluxpath",
        "selfconsistencypath",
        "loadbasepath",
        "loadname",
        "gecsxbasepath",
        "gecsxname",
    )
)
_defaults_main["metranet_read_lib"] = "C"

# Defaults grouped by the kind of configuration file they apply to.
DEFAULT_CONFIG = dict(loc=_defaults_loc, main=_defaults_main)


def read_config(fname, cfg=None, defaults=None):
    """
    Read a pyrad config file.

    Each non-comment line has the form ``<name> <type> <value...>``. For
    array (``*ARR``) and ``STRUCT`` types the value is the number of
    elements, which are read from the lines that follow.

    Parameters
    ----------
    fname : str
        Name of the configuration file to read.
    cfg : dict of dicts, optional
        dictionary of dictionaries containing configuration parameters where
        the new parameters will be placed. It is updated in place.
    defaults : dict of dicts, optional
        dictionary of dictionaries containing default values. If a key is
        contained in the defaults dict but not in the configuration file,
        the value from the defaults dict will be assigned

    Returns
    -------
    cfg : dict of dicts
        dictionary of dictionaries containing the configuration parameters

    Raises
    ------
    Exception
        If the file cannot be opened, or if a line has fewer than three
        whitespace-separated elements. Errors raised while parsing a line
        are re-raised after a warning naming the offending line.
    """
    # check if the file can be read
    try:
        cfgfile = open(fname, "r", encoding="utf-8", errors="ignore")
    except Exception as err:
        raise Exception(
            "ERROR: Could not find|open config file '{}'".format(fname)
        ) from err

    # if config dictionary does not exist yet create it
    if cfg is None:
        cfg = dict()

    # if default does not exist, create it
    if defaults is None:
        defaults = dict()

    # The context manager guarantees the file is closed even when parsing
    # fails part-way through.
    with cfgfile:
        while True:
            # readline() is used (not iteration) because tell()/seek() are
            # needed to hand the file position to get_struct/get_array.
            raw = cfgfile.readline()
            if not raw:
                break  # end of file

            line = os.path.expandvars(raw).strip()

            # ignore white lines and full-line comments
            if not line or line.startswith("#"):
                continue

            # remove trailing comments
            line = line.partition("#")[0].strip()

            try:
                vals = line.split()
                fieldname = vals[0]
                if len(vals) < 3:
                    raise Exception(
                        "FILE FORMAT ERROR: file: {}, variable: {}"
                        ": Wrong number of elements!".format(fname, fieldname)
                    )

                valtype = vals[1]
                valuestr = vals[2:]
                nel, isstruct = get_num_elements(valtype, valuestr)

                if nel > 0:
                    # multi-line value: elements follow on the next lines
                    pos = cfgfile.tell()
                    if isstruct:
                        fieldvalue, newpos = get_struct(
                            cfgfile, pos, nel, fname
                        )
                    else:
                        fieldvalue, newpos = get_array(
                            cfgfile, pos, nel, valtype
                        )
                    cfgfile.seek(newpos)
                else:
                    fieldvalue = string_to_datatype(valtype, valuestr)

                cfg[fieldname] = fieldvalue
            except Exception:
                if line != "":
                    warn("Parsing failed after line {:s}".format(line))
                raise

    # Verify that all keys in default are in newly created config
    return merge_dicts(cfg, defaults)
# Strings (lower-cased, stripped) that a BOOL config value treats as False.
_FALSE_STRINGS = frozenset(("", "0", "false", "f", "no", "n", "off"))

# Config-file array type -> scalar type of its elements.
_ARRAY_ELEMENT_TYPES = {
    "BYTARR": "BYTE",
    "INTARR": "INT",
    "LONARR": "LONG",
    "HEXARR": "HEX",
    "EXPARR": "EXP",
    "FLTARR": "FLOAT",
    "DBLARR": "DOUBLE",
    "STRARR": "STRING",
}

# Scalar config type -> numpy dtype used to store an array of that type.
_NUMPY_DTYPES = {
    "BYTE": "byte",
    "INT": "int32",
    "LONG": "int64",
    "HEX": "H",
    "EXP": "float64",
    "FLOAT": "float64",
    "DOUBLE": "double",
}


def get_num_elements(dtype, nelstr):
    """
    Checks if data type is an array or a structure.

    Parameters
    ----------
    dtype : str
        data type specifier
    nelstr : list of str
        value tokens that follow the type specifier. For *ARR and STRUCT
        types the first token is the number of elements

    Returns
    -------
    nel : int
        number of elements if type is *ARR or STRUCT. 0 otherwise
    isstruct : bool
        true if the type is STRUCT
    """
    uptype = dtype.upper()
    isstruct = uptype == "STRUCT"
    if isstruct or "ARR" in uptype:
        return int(nelstr[0]), isstruct
    return 0, isstruct


def _parse_bool(token):
    """Interpret a config token as a boolean value."""
    text = str(token).strip().lower()
    if text in _FALSE_STRINGS:
        return False
    try:
        # numeric tokens: zero is False, anything else is True
        return float(text) != 0.0
    except ValueError:
        # any other non-empty text keeps the historical meaning (True)
        return True


def _parse_hex(token):
    """Interpret a config token as an integer, accepting hex notation."""
    try:
        # decimal reading first so existing decimal-valued files are unchanged
        return int(token)
    except ValueError:
        return int(token, 16)


def string_to_datatype(dtype, strval):
    """
    Converts a string containing a value into its Python value

    Parameters
    ----------
    dtype : str
        data type specifier
    strval : list of str
        value tokens; only the first one is converted

    Returns
    -------
    val : scalar
        value contained in the string. For BOOL, the tokens "0", "false",
        "f", "no", "n" and "off" (case-insensitive) and numeric zero give
        False; any other non-empty token gives True. For HEX, the token is
        read as decimal when possible and as hexadecimal (with or without
        a "0x" prefix) otherwise.

    Raises
    ------
    Exception
        If the data type is not recognised.
    """
    uptype = dtype.upper()
    if uptype in ("BYTE", "INT", "LONG"):
        return int(strval[0])
    if uptype == "BOOL":
        return _parse_bool(strval[0])
    if uptype == "HEX":
        return _parse_hex(strval[0])
    if uptype in ("EXP", "FLOAT", "DOUBLE"):
        return float(strval[0])
    if uptype == "STRING":
        # Replace all environment variables
        return os.path.expandvars(str(strval[0]))
    raise Exception("ERROR: Unexpected data type " + uptype)


def get_array(cfgfile, pos, nel, valtype):
    """
    reads an array in a config file

    One element is read per line, starting at file position `pos`.

    Parameters
    ----------
    cfgfile : file object
        config file
    pos : int
        position in file object
    nel : int
        number of elements of the array
    valtype : str
        type of array

    Returns
    -------
    arr : array or list
        array values (a list for string arrays, a numpy array otherwise)
    newpos : int
        new position in file object
    """
    arr_type = get_array_type(valtype)
    is_string = arr_type == "STRING"
    arr = [] if is_string else init_array(nel, arr_type)

    newpos = pos
    for i in range(nel):
        cfgfile.seek(newpos)
        # strip whitespace and any trailing comment
        line = cfgfile.readline().strip().partition("#")[0].strip()
        value = string_to_datatype(arr_type, line.split())
        if is_string:
            arr.append(value)
        else:
            arr[i] = value
        newpos = cfgfile.tell()

    return arr, newpos


def get_struct(cfgfile, pos, nels, fname):
    """
    reads a struct in a config file

    Parameters
    ----------
    cfgfile : file object
        config file
    pos : int
        position in file object
    nels : int
        number of elements of the struct
    fname : str
        config file name

    Returns
    -------
    struct : dict
        dictionary of struct values
    newpos : int
        new position in file object

    Raises
    ------
    Exception
        If a struct line has fewer than three whitespace-separated
        elements. Errors raised while parsing a line are re-raised after
        a warning naming the offending line.
    """
    struct = dict()
    newpos = pos
    for _ in range(nels):
        cfgfile.seek(newpos)
        line = os.path.expandvars(cfgfile.readline())
        # strip whitespace and any trailing comment
        line = line.strip().partition("#")[0].strip()

        try:
            vals = line.split()
            sfieldname = vals[0]
            if len(vals) < 3:
                raise Exception(
                    "FILE FORMAT ERROR: file: "
                    + fname
                    + ", struct variable: "
                    + sfieldname
                    + ": Wrong number of elements!"
                )

            svaltype = vals[1]
            svaluestr = vals[2:]
            nel, isstruct = get_num_elements(svaltype, svaluestr)

            if nel > 0:
                # nested multi-line value: elements follow on the next lines
                pos = cfgfile.tell()
                if isstruct:
                    sfieldvalue, newpos = get_struct(cfgfile, pos, nel, fname)
                else:
                    sfieldvalue, newpos = get_array(
                        cfgfile, pos, nel, svaltype
                    )
                cfgfile.seek(newpos)
            else:
                newpos = cfgfile.tell()
                sfieldvalue = string_to_datatype(svaltype, svaluestr)

            struct[sfieldname] = sfieldvalue
        except Exception:
            if line != "":
                warn("Parsing failed after line {:s}".format(line))
            raise

    return struct, newpos


def get_array_type(dtype):
    """
    Determines Python array type from the config file array type

    Parameters
    ----------
    dtype : str
        config file data type

    Returns
    -------
    pytype : str
        scalar type name of the array elements, as understood by
        string_to_datatype and init_array

    Raises
    ------
    Exception
        If the array type is not recognised.
    """
    uptype = dtype.upper()
    try:
        return _ARRAY_ELEMENT_TYPES[uptype]
    except KeyError:
        raise Exception("ERROR: Unexpected data type " + uptype) from None


def init_array(nel, dtype):
    """
    Initializes a Python array

    Parameters
    ----------
    nel : int
        number of elements in the array
    dtype : str
        config file data type

    Returns
    -------
    pyarr : array or list
        uninitialized numpy array of `nel` elements, or an empty list for
        the STRING type

    Raises
    ------
    Exception
        If the data type is not recognised.
    """
    uptype = dtype.upper()
    if uptype == "STRING":
        return []
    if uptype in _NUMPY_DTYPES:
        return np.empty(nel, dtype=_NUMPY_DTYPES[uptype])
    raise Exception("ERROR: Unexpected array data type " + uptype)


def merge_dicts(ref, defaults):
    """
    Merge two nested dictionaries recursively.

    Keys already present in `ref` are kept; only missing keys are taken
    from `defaults`. When both values are dictionaries they are merged
    recursively.

    Parameters
    ----------
    ref : dict
        The base dictionary to be merged into. It is modified in place.
    defaults : dict
        The dictionary whose keys and values are merged into dictionary ref.
        It is not modified.

    Returns
    -------
    ref : dict
        The base dictionary after addition of missing keys from defaults
    """
    for key, value in defaults.items():
        if key not in ref:
            # copy so later changes to the config never leak back into the
            # (typically module-level, shared) defaults
            ref[key] = copy.deepcopy(value)
        elif isinstance(ref[key], dict) and isinstance(value, dict):
            merge_dicts(ref[key], value)
    return ref