Source code for pyrad.proc.process_intercomp

"""
pyrad.proc.process_intercomp
============================

Functions used in the inter-comparison between radars

.. autosummary::
    :toctree: generated/

    process_time_stats
    process_time_stats2
    process_time_avg
    process_weighted_time_avg
    process_time_avg_flag
    process_colocated_gates
    process_intercomp
    process_intercomp_time_avg
    process_fields_diff
    process_intercomp_fields

"""

from copy import deepcopy
from warnings import warn
import datetime
import numpy as np
import scipy
from netCDF4 import num2date

import pyart

from ..io.io_aux import get_datatype_fields, get_fieldname_pyart
from ..io.io_aux import get_save_dir, make_filename
from ..io.read_data_other import read_colocated_gates, read_colocated_data
from ..io.read_data_other import read_colocated_data_time_avg
from ..io.read_data_radar import interpol_field

from ..util.radar_utils import time_avg_range, get_range_bins_to_avg
from ..util.radar_utils import find_colocated_indexes


[docs] def process_time_stats(procstatus, dscfg, radar_list=None): """ computes the temporal statistics of a field Parameters ---------- procstatus : int Processing status: 0 initializing, 1 processing volume, 2 post-processing dscfg : dictionary of dictionaries data set configuration. Accepted Configuration Keywords:: datatype : list of string. Dataset keyword Arbitrary data type supported by pyrad and contained in the radar data period : float. Dataset keyword the period to average [s]. If -1 the statistics are going to be performed over the entire data. Default 3600. start_average : float. Dataset keyword when to start the average [s from midnight UTC]. Default 0. lin_trans: int. Dataset keyword If 1 apply linear transformation before averaging use_nan : bool. Dataset keyword If true non valid data will be used nan_value : float. Dataset keyword The value of the non valid data. Default 0 stat: string. Dataset keyword Statistic to compute: Can be mean, std, cov, min, max. Default mean radar_list : list of Radar objects Optional. list of radar objects Returns ------- new_dataset : dict dictionary containing the statistic computed on the input field, as well as "nsamples", as well as "sum2" (sum-squared) if stat in (cov, std), as well as ind_rad : int radar index """ for datatypedescr in dscfg["datatype"]: radarnr, _, datatype, _, _ = get_datatype_fields(datatypedescr) field_name = get_fieldname_pyart(datatype) break ind_rad = int(radarnr[5:8]) - 1 start_average = dscfg.get("start_average", 0.0) period = dscfg.get("period", 3600.0) lin_trans = dscfg.get("lin_trans", 0) use_nan = dscfg.get("use_nan", 0) nan_value = dscfg.get("nan_value", 0.0) stat = dscfg.get("stat", "mean") if procstatus == 0: return None, None if procstatus == 1: if radar_list[ind_rad] is None: warn("No valid radar") return None, None radar = radar_list[ind_rad] if field_name not in radar.fields: warn(field_name + " not available.") return None, None # Prepare auxiliary radar field = deepcopy(radar.fields[field_name]) if stat in ("mean", "std", "cov"): if lin_trans: field["data"] = np.ma.power(10.0, 0.1 * field["data"]) if use_nan: field["data"] = np.ma.asarray(field["data"].filled(nan_value)) if stat in ("std", "cov"): sum2_dict = pyart.config.get_metadata("sum_squared") sum2_dict["data"] = field["data"] * field["data"] else: if use_nan: field["data"] = np.ma.asarray(field["data"].filled(nan_value)) npoints_dict = pyart.config.get_metadata("number_of_samples") npoints_dict["data"] = np.ma.asarray( np.logical_not(np.ma.getmaskarray(field["data"])), dtype=int ) radar_aux = deepcopy(radar) radar_aux.fields = dict() radar_aux.add_field(field_name, field) radar_aux.add_field("number_of_samples", npoints_dict) if stat in ("std", "cov"): radar_aux.add_field("sum_squared", sum2_dict) # first volume: initialize start and end time of averaging if dscfg["initialized"] == 0: avg_par = dict() if period != -1: date_00 = dscfg["timeinfo"].replace( hour=0, minute=0, second=0, microsecond=0 ) avg_par.update( {"starttime": date_00 + datetime.timedelta(seconds=start_average)} ) avg_par.update( { "endtime": avg_par["starttime"] + datetime.timedelta(seconds=period) } ) else: avg_par.update({"starttime": dscfg["timeinfo"]}) avg_par.update({"endtime": dscfg["timeinfo"]}) avg_par.update({"timeinfo": dscfg["timeinfo"]}) dscfg["global_data"] = avg_par dscfg["initialized"] = 1 if dscfg["initialized"] == 0: return None, None dscfg["global_data"]["timeinfo"] = dscfg["timeinfo"] # no radar object in global data: create it if "radar_out" not in dscfg["global_data"]: if period != -1: # get start and stop times of new radar object ( dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], ) = time_avg_range( dscfg["timeinfo"], dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], period, ) # check if volume time older than starttime if dscfg["timeinfo"] > dscfg["global_data"]["starttime"]: dscfg["global_data"].update({"radar_out": radar_aux}) else: dscfg["global_data"].update({"radar_out": radar_aux}) return None, None # still accumulating: add field to global field if period == -1 or dscfg["timeinfo"] < dscfg["global_data"]["endtime"]: if period == -1: dscfg["global_data"]["endtime"] = dscfg["timeinfo"] field_interp = interpol_field( dscfg["global_data"]["radar_out"], radar_aux, field_name ) npoints_interp = interpol_field( dscfg["global_data"]["radar_out"], radar_aux, "number_of_samples" ) if use_nan: field_interp["data"] = np.ma.asarray( field_interp["data"].filled(nan_value) ) dscfg["global_data"]["radar_out"].fields["number_of_samples"][ "data" ] += np.ma.asarray( npoints_interp["data"].filled(fill_value=1), dtype=int ) else: dscfg["global_data"]["radar_out"].fields["number_of_samples"][ "data" ] += np.ma.asarray( npoints_interp["data"].filled(fill_value=0), dtype=int ) if stat in ("mean", "std", "cov"): masked_sum = np.ma.getmaskarray( dscfg["global_data"]["radar_out"].fields[field_name]["data"] ) valid_sum = np.logical_and( np.logical_not(masked_sum), np.logical_not(np.ma.getmaskarray(field_interp["data"])), ) dscfg["global_data"]["radar_out"].fields[field_name]["data"][ masked_sum ] = field_interp["data"][masked_sum] dscfg["global_data"]["radar_out"].fields[field_name]["data"][ valid_sum ] += field_interp["data"][valid_sum] if stat in ("cov", "std"): dscfg["global_data"]["radar_out"].fields["sum_squared"]["data"][ masked_sum ] = ( field_interp["data"][masked_sum] * field_interp["data"][masked_sum] ) dscfg["global_data"]["radar_out"].fields["sum_squared"]["data"][ valid_sum ] += ( field_interp["data"][valid_sum] * field_interp["data"][valid_sum] ) elif stat == "max": dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = np.maximum( dscfg["global_data"]["radar_out"] .fields[field_name]["data"] .filled(fill_value=-1.0e300), field_interp["data"].filled(fill_value=-1.0e300), ) dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = np.ma.masked_values( dscfg["global_data"]["radar_out"].fields[field_name]["data"], -1.0e300, ) elif stat == "min": dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = np.minimum( dscfg["global_data"]["radar_out"] .fields[field_name]["data"] .filled(fill_value=1.0e300), field_interp["data"].filled(fill_value=1.0e300), ) dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = np.ma.masked_values( dscfg["global_data"]["radar_out"].fields[field_name]["data"], 1.0e300, ) return None, None # we have reached the end of the accumulation period: do the averaging # and start a new object (only reachable if period != -1) if stat in ("mean", "std", "cov"): field_mean = ( dscfg["global_data"]["radar_out"].fields[field_name]["data"] / dscfg["global_data"]["radar_out"].fields["number_of_samples"]["data"] ) if stat == "mean": if lin_trans: dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = 10.0 * np.ma.log10(field_mean) else: dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = field_mean elif stat in ("std", "cov"): field_std = np.ma.sqrt( dscfg["global_data"]["radar_out"].fields["sum_squared"]["data"] / dscfg["global_data"]["radar_out"].fields["number_of_samples"][ "data" ] - field_mean * field_mean ) if stat == "std": if lin_trans: dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = 10.0 * np.ma.log10(field_std) else: dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = field_std else: if lin_trans: dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = 10.0 * np.ma.log10(field_std / field_mean) else: dscfg["global_data"]["radar_out"].fields[field_name]["data"] = ( field_std / field_mean ) new_dataset = { "radar_out": deepcopy(dscfg["global_data"]["radar_out"]), "timeinfo": dscfg["global_data"]["endtime"], } dscfg["global_data"]["starttime"] += datetime.timedelta(seconds=period) dscfg["global_data"]["endtime"] += datetime.timedelta(seconds=period) # remove old radar object from global_data dictionary dscfg["global_data"].pop("radar_out", None) # get start and stop times of new radar object ( dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], ) = time_avg_range( dscfg["timeinfo"], dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], period, ) # check if volume time older than starttime if dscfg["timeinfo"] > dscfg["global_data"]["starttime"]: dscfg["global_data"].update({"radar_out": radar_aux}) return new_dataset, ind_rad # no more files to process if there is global data pack it up if procstatus == 2: if dscfg["initialized"] == 0: return None, None if "radar_out" not in dscfg["global_data"]: return None, None if stat in ("mean", "std", "cov"): field_mean = ( dscfg["global_data"]["radar_out"].fields[field_name]["data"] / dscfg["global_data"]["radar_out"].fields["number_of_samples"]["data"] ) if stat == "mean": if lin_trans: dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = 10.0 * np.ma.log10(field_mean) else: dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = field_mean elif stat in ("std", "cov"): field_std = np.ma.sqrt( dscfg["global_data"]["radar_out"].fields["sum_squared"]["data"] / dscfg["global_data"]["radar_out"].fields["number_of_samples"][ "data" ] - field_mean * field_mean ) if stat == "std": if lin_trans: dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = 10.0 * np.ma.log10(field_std) else: dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = field_std else: dscfg["global_data"]["radar_out"].fields[field_name]["data"] = ( field_std / field_mean ) new_dataset = { "radar_out": deepcopy(dscfg["global_data"]["radar_out"]), "timeinfo": dscfg["global_data"]["endtime"], } return new_dataset, ind_rad
[docs] def process_time_stats2(procstatus, dscfg, radar_list=None): """ computes the temporal mean of a field Parameters ---------- procstatus : int Processing status: 0 initializing, 1 processing volume, 2 post-processing dscfg : dictionary of dictionaries data set configuration. Accepted Configuration Keywords:: datatype : list of string. Dataset keyword Arbitrary data type supported by pyrad and contianed in the radar data period : float. Dataset keyword the period to average [s]. If -1 the statistics are going to be performed over the entire data. Default 3600. start_average : float. Dataset keyword when to start the average [s from midnight UTC]. Default 0. stat: string. Dataset keyword Statistic to compute: Can be median, mode, percentileXX use_nan : bool. Dataset keyword If true non valid data will be used nan_value : float. Dataset keyword The value of the non valid data. Default 0 radar_list : list of Radar objects Optional. list of radar objects Returns ------- new_dataset : dict dictionary containing the statistic computed on the input field, as well as "nsamples" ind_rad : int radar index """ for datatypedescr in dscfg["datatype"]: radarnr, _, datatype, _, _ = get_datatype_fields(datatypedescr) field_name = get_fieldname_pyart(datatype) break ind_rad = int(radarnr[5:8]) - 1 start_average = dscfg.get("start_average", 0.0) period = dscfg.get("period", 3600.0) use_nan = dscfg.get("use_nan", 0) nan_value = dscfg.get("nan_value", 0.0) stat = dscfg.get("stat", "median") if "percentile" in stat: percentile = float(stat.replace("percentile", "")) if procstatus == 0: return None, None if procstatus == 1: if radar_list[ind_rad] is None: warn("No valid radar") return None, None radar = radar_list[ind_rad] if field_name not in radar.fields: warn(field_name + " not available.") return None, None # prepare auxiliary radar field = deepcopy(radar.fields[field_name]) if use_nan: field["data"] = np.ma.asarray(field["data"].filled(nan_value)) npoints_dict = pyart.config.get_metadata("number_of_samples") npoints_dict["data"] = np.ma.asarray( np.logical_not(np.ma.getmaskarray(field["data"])), dtype=int ) radar_aux = deepcopy(radar) radar_aux.fields = dict() radar_aux.add_field(field_name, field) radar_aux.add_field("number_of_samples", npoints_dict) # first volume: initialize start and end time of averaging if dscfg["initialized"] == 0: avg_par = dict() if period != -1: date_00 = dscfg["timeinfo"].replace( hour=0, minute=0, second=0, microsecond=0 ) avg_par.update( {"starttime": date_00 + datetime.timedelta(seconds=start_average)} ) avg_par.update( { "endtime": avg_par["starttime"] + datetime.timedelta(seconds=period) } ) else: avg_par.update({"starttime": dscfg["timeinfo"]}) avg_par.update({"endtime": dscfg["timeinfo"]}) avg_par.update({"timeinfo": dscfg["timeinfo"]}) dscfg["global_data"] = avg_par dscfg["initialized"] = 1 if dscfg["initialized"] == 0: return None, None dscfg["global_data"]["timeinfo"] = dscfg["timeinfo"] # no radar object in global data: create it if "radar_out" not in dscfg["global_data"]: if period != -1: # get start and stop times of new radar object ( dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], ) = time_avg_range( dscfg["timeinfo"], dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], period, ) # check if volume time older than starttime if dscfg["timeinfo"] > dscfg["global_data"]["starttime"]: dscfg["global_data"].update({"radar_out": radar_aux}) dscfg["global_data"].update( { "field_data": np.atleast_3d( radar_aux.fields[field_name]["data"] ) } ) else: dscfg["global_data"].update({"radar_out": radar_aux}) dscfg["global_data"].update( {"field_data": np.atleast_3d(radar_aux.fields[field_name]["data"])} ) return None, None # still accumulating: add field to global field if period == -1 or dscfg["timeinfo"] < dscfg["global_data"]["endtime"]: if period == -1: dscfg["global_data"]["endtime"] = dscfg["timeinfo"] field_interp = interpol_field( dscfg["global_data"]["radar_out"], radar_aux, field_name ) npoints_interp = interpol_field( dscfg["global_data"]["radar_out"], radar_aux, "number_of_samples" ) if use_nan: field_interp["data"] = np.ma.asarray( field_interp["data"].filled(nan_value) ) dscfg["global_data"]["radar_out"].fields["number_of_samples"][ "data" ] += np.ma.asarray( npoints_interp["data"].filled(fill_value=1), dtype=int ) else: dscfg["global_data"]["radar_out"].fields["number_of_samples"][ "data" ] += np.ma.asarray( npoints_interp["data"].filled(fill_value=0), dtype=int ) dscfg["global_data"]["field_data"] = np.ma.append( dscfg["global_data"]["field_data"], np.atleast_3d(field_interp["data"]), axis=2, ) return None, None # we have reached the end of the accumulation period: do the averaging # and start a new object (only reachable if period != -1) if stat == "median": dscfg["global_data"]["radar_out"].fields[field_name]["data"] = np.ma.median( dscfg["global_data"]["field_data"], axis=2 ) elif stat == "mode": mode_data, _ = scipy.stats.mode( dscfg["global_data"]["field_data"].filled(fill_value=np.nan), axis=2, nan_policy="omit", ) dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = np.ma.masked_invalid(np.squeeze(mode_data, axis=2)) elif "percentile" in stat: percent_data = np.nanpercentile( dscfg["global_data"]["field_data"].filled(fill_value=np.nan), percentile, axis=2, ) dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = np.ma.masked_invalid(percent_data) new_dataset = { "radar_out": deepcopy(dscfg["global_data"]["radar_out"]), "timeinfo": dscfg["global_data"]["endtime"], } dscfg["global_data"]["starttime"] += datetime.timedelta(seconds=period) dscfg["global_data"]["endtime"] += datetime.timedelta(seconds=period) # remove old radar object from global_data dictionary dscfg["global_data"].pop("radar_out", None) # get start and stop times of new radar object ( dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], ) = time_avg_range( dscfg["timeinfo"], dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], period, ) # check if volume time older than starttime if dscfg["timeinfo"] > dscfg["global_data"]["starttime"]: dscfg["global_data"].update({"radar_out": radar_aux}) return new_dataset, ind_rad # no more files to process if there is global data pack it up if procstatus == 2: if dscfg["initialized"] == 0: return None, None if "radar_out" not in dscfg["global_data"]: return None, None if stat == "median": dscfg["global_data"]["radar_out"].fields[field_name]["data"] = np.ma.median( dscfg["global_data"]["field_data"], axis=2 ) elif stat == "mode": mode_data, _ = scipy.stats.mode( dscfg["global_data"]["field_data"].filled(fill_value=np.nan), axis=2, nan_policy="omit", ) dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = np.ma.masked_invalid(np.squeeze(mode_data, axis=2)) elif "percentile" in stat: percent_data = np.nanpercentile( dscfg["global_data"]["field_data"].filled(fill_value=np.nan), percentile, axis=2, ) dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = np.ma.masked_invalid(percent_data) new_dataset = { "radar_out": deepcopy(dscfg["global_data"]["radar_out"]), "timeinfo": dscfg["global_data"]["endtime"], } return new_dataset, ind_rad
[docs] def process_time_avg(procstatus, dscfg, radar_list=None): """ computes the temporal mean of a field Parameters ---------- procstatus : int Processing status: 0 initializing, 1 processing volume, 2 post-processing dscfg : dictionary of dictionaries data set configuration. Accepted Configuration Keywords:: datatype : list of string. Dataset keyword Arbitrary data type supported by pyrad and contained in the radar data period : float. Dataset keyword the period to average [s]. Default 3600. start_average : float. Dataset keyword when to start the average [s from midnight UTC]. Default 0. lin_trans: int. Dataset keyword If 1 apply linear transformation before averaging radar_list : list of Radar objects Optional. list of radar objects Returns ------- new_dataset : dict dictionary containing the statistic computed on the input field, as well as "nsamples" ind_rad : int radar index """ for datatypedescr in dscfg["datatype"]: radarnr, _, datatype, _, _ = get_datatype_fields(datatypedescr) field_name = get_fieldname_pyart(datatype) break ind_rad = int(radarnr[5:8]) - 1 lin_trans = dscfg.get("lin_trans", 0) if procstatus == 0: return None, None if procstatus == 1: if radar_list[ind_rad] is None: warn("No valid radar") return None, None radar = radar_list[ind_rad] if field_name not in radar.fields: warn(field_name + " not available.") return None, None period = dscfg.get("period", 3600.0) field = deepcopy(radar.fields[field_name]) if lin_trans: field["data"] = np.ma.power(10.0, 0.1 * field["data"]) field["data"] = field["data"].filled(fill_value=0.0) field["data"] = np.ma.asarray(field["data"]) radar_aux = deepcopy(radar) radar_aux.fields = dict() radar_aux.add_field(field_name, field) npoints_dict = pyart.config.get_metadata("number_of_samples") npoints_dict["data"] = np.ma.ones((radar.nrays, radar.ngates), dtype=int) radar_aux.add_field("number_of_samples", npoints_dict) # first volume: initialize start and end time of averaging if dscfg["initialized"] == 0: start_average = dscfg.get("start_average", 0.0) date_00 = dscfg["timeinfo"].replace( hour=0, minute=0, second=0, microsecond=0 ) avg_par = dict() avg_par.update( {"starttime": date_00 + datetime.timedelta(seconds=start_average)} ) avg_par.update( {"endtime": avg_par["starttime"] + datetime.timedelta(seconds=period)} ) avg_par.update({"timeinfo": dscfg["timeinfo"]}) dscfg["global_data"] = avg_par dscfg["initialized"] = 1 if dscfg["initialized"] == 0: return None, None dscfg["global_data"]["timeinfo"] = dscfg["timeinfo"] # no radar object in global data: create it if "radar_out" not in dscfg["global_data"]: # get start and stop times of new radar object ( dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], ) = time_avg_range( dscfg["timeinfo"], dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], period, ) # check if volume time older than starttime if dscfg["timeinfo"] > dscfg["global_data"]["starttime"]: dscfg["global_data"].update({"radar_out": radar_aux}) return None, None # still accumulating: add field to global field if dscfg["timeinfo"] < dscfg["global_data"]["endtime"]: field_interp = interpol_field( dscfg["global_data"]["radar_out"], radar_aux, field_name ) npoints_interp = interpol_field( dscfg["global_data"]["radar_out"], radar_aux, "number_of_samples" ) dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] += field_interp["data"].filled(fill_value=0) dscfg["global_data"]["radar_out"].fields["number_of_samples"]["data"] += ( npoints_interp["data"].filled(fill_value=0) ).astype("int") return None, None # we have reached the end of the accumulation period: do the averaging # and start a new object dscfg["global_data"]["radar_out"].fields[field_name]["data"] /= dscfg[ "global_data" ]["radar_out"].fields["number_of_samples"]["data"] if lin_trans: dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = 10.0 * np.ma.log10( dscfg["global_data"]["radar_out"].fields[field_name]["data"] ) new_dataset = { "radar_out": deepcopy(dscfg["global_data"]["radar_out"]), "timeinfo": dscfg["global_data"]["endtime"], } dscfg["global_data"]["starttime"] += datetime.timedelta(seconds=period) dscfg["global_data"]["endtime"] += datetime.timedelta(seconds=period) # remove old radar object from global_data dictionary dscfg["global_data"].pop("radar_out", None) # get start and stop times of new radar object ( dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], ) = time_avg_range( dscfg["timeinfo"], dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], period, ) # check if volume time older than starttime if dscfg["timeinfo"] > dscfg["global_data"]["starttime"]: dscfg["global_data"].update({"radar_out": radar_aux}) return new_dataset, ind_rad # no more files to process if there is global data pack it up if procstatus == 2: if dscfg["initialized"] == 0: return None, None if "radar_out" not in dscfg["global_data"]: return None, None (dscfg["global_data"]["radar_out"].fields[field_name]["data"]) /= dscfg[ "global_data" ]["radar_out"].fields["number_of_samples"]["data"] if lin_trans: dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] = 10.0 * np.ma.log10( dscfg["global_data"]["radar_out"].fields[field_name]["data"] ) new_dataset = { "radar_out": deepcopy(dscfg["global_data"]["radar_out"]), "timeinfo": dscfg["global_data"]["endtime"], } return new_dataset, ind_rad
[docs] def process_weighted_time_avg(procstatus, dscfg, radar_list=None): """ computes the temporal mean of a field weighted by the reflectivity Parameters ---------- procstatus : int Processing status: 0 initializing, 1 processing volume, 2 post-processing dscfg : dictionary of dictionaries data set configuration. Accepted Configuration Keywords:: datatype : list of string. Dataset keyword Arbitrary data type supported by pyrad and contained in the radar data, as well as "dBZ" or "dBZc" or "dBuZ" or "dBZv" or "dBZvc" or "dBuZv" (refl. weighting) period : float. Dataset keyword the period to average [s]. Default 3600. start_average : float. Dataset keyword when to start the average [s from midnight UTC]. Default 0. radar_list : list of Radar objects Optional. list of radar objects Returns ------- new_dataset : dict dictionary containing the statistic computed on the input field ind_rad : int radar index """ for datatypedescr in dscfg["datatype"]: radarnr, _, datatype, _, _ = get_datatype_fields(datatypedescr) if datatype in ("dBZ", "dBZc", "dBuZ", "dBZv", "dBZvc", "dBuZv"): refl_name = get_fieldname_pyart(datatype) else: field_name = get_fieldname_pyart(datatype) ind_rad = int(radarnr[5:8]) - 1 if procstatus == 0: return None, None if procstatus == 1: if radar_list[ind_rad] is None: warn("No valid radar") return None, None radar = radar_list[ind_rad] if field_name not in radar.fields or refl_name not in radar.fields: warn("Unable to compute weighted average. Missing data") return None, None period = dscfg.get("period", 3600.0) field = deepcopy(radar.fields[field_name]) field["data"] = field["data"].filled(fill_value=0.0) field["data"] = np.ma.asarray(field["data"]) refl_field = deepcopy(radar.fields[refl_name]) refl_field["data"] = np.ma.power(10.0, 0.1 * refl_field["data"]) refl_field["data"] = refl_field["data"].filled(fill_value=0.0) refl_field["data"] = np.ma.asarray(refl_field["data"]) field["data"] *= refl_field["data"] radar_aux = deepcopy(radar) radar_aux.fields = dict() radar_aux.add_field(field_name, field) radar_aux.add_field(refl_name, refl_field) # first volume: initialize start and end time of averaging if dscfg["initialized"] == 0: start_average = dscfg.get("start_average", 0.0) date_00 = dscfg["timeinfo"].replace( hour=0, minute=0, second=0, microsecond=0 ) avg_par = dict() avg_par.update( {"starttime": date_00 + datetime.timedelta(seconds=start_average)} ) avg_par.update( {"endtime": avg_par["starttime"] + datetime.timedelta(seconds=period)} ) avg_par.update({"timeinfo": dscfg["timeinfo"]}) dscfg["global_data"] = avg_par dscfg["initialized"] = 1 if dscfg["initialized"] == 0: return None, None dscfg["global_data"]["timeinfo"] = dscfg["timeinfo"] # no radar object in global data: create it if "radar_out" not in dscfg["global_data"]: # get start and stop times of new radar object ( dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], ) = time_avg_range( dscfg["timeinfo"], dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], period, ) # check if volume time older than starttime if dscfg["timeinfo"] > dscfg["global_data"]["starttime"]: dscfg["global_data"].update({"radar_out": radar_aux}) return None, None # still accumulating: add field to global field if dscfg["timeinfo"] < dscfg["global_data"]["endtime"]: field_interp = interpol_field( dscfg["global_data"]["radar_out"], radar_aux, field_name ) dscfg["global_data"]["radar_out"].fields[field_name][ "data" ] += field_interp["data"].filled(fill_value=0) refl_interp = interpol_field( dscfg["global_data"]["radar_out"], radar_aux, refl_name ) dscfg["global_data"]["radar_out"].fields[refl_name]["data"] += refl_interp[ "data" ].filled(fill_value=0) return None, None # we have reached the end of the accumulation period: do the averaging # and start a new object dscfg["global_data"]["radar_out"].fields[field_name]["data"] /= dscfg[ "global_data" ]["radar_out"].fields[refl_name]["data"] new_dataset = { "radar_out": deepcopy(dscfg["global_data"]["radar_out"]), "timeinfo": dscfg["global_data"]["endtime"], } dscfg["global_data"]["starttime"] += datetime.timedelta(seconds=period) dscfg["global_data"]["endtime"] += datetime.timedelta(seconds=period) # remove old radar object from global_data dictionary dscfg["global_data"].pop("radar_out", None) # get start and stop times of new radar object ( dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], ) = time_avg_range( dscfg["timeinfo"], dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], period, ) # check if volume time older than starttime if dscfg["timeinfo"] > dscfg["global_data"]["starttime"]: dscfg["global_data"].update({"radar_out": radar_aux}) return new_dataset, ind_rad # no more files to process if there is global data pack it up if procstatus == 2: if dscfg["initialized"] == 0: return None, None if "radar_out" not in dscfg["global_data"]: return None, None dscfg["global_data"]["radar_out"].fields[field_name]["data"] /= dscfg[ "global_data" ]["radar_out"].fields[refl_name]["data"] new_dataset = { "radar_out": deepcopy(dscfg["global_data"]["radar_out"]), "timeinfo": dscfg["global_data"]["endtime"], } return new_dataset, ind_rad
[docs] def process_time_avg_flag(procstatus, dscfg, radar_list=None): """ computes a flag field describing the conditions of the data used while averaging Parameters ---------- procstatus : int Processing status: 0 initializing, 1 processing volume, 2 post-processing dscfg : dictionary of dictionaries data set configuration. Accepted Configuration Keywords:: datatype : list of string. Dataset keyword The input data types, must be "PhiDP" or "PhiDPc" (Optional, for PhiDP flagging), and, "echoID" (Optional, for echoID flagging), and, "hydro" (Optional, for no rain flagging), and, "TEMP" (Optional, for solid precip flagging), and, "H_ISO0" (Optional, also for solid precip flagging) period : float. Dataset keyword the period to average [s]. Default 3600. start_average : float. Dataset keyword when to start the average [s from midnight UTC]. Default 0. phidpmax: float. Dataset keyword maximum PhiDP beamwidth : float. Dataset keyword the antenna beamwidth [deg]. If None that of the keys radar_beam_width_h or radar_beam_width_v in attribute instrument_parameters of the radar object will be used. If the key or the attribute are not present the beamwidth will be set to None radar_list : list of Radar objects Optional. list of radar objects Returns ------- new_dataset : dict dictionary containing the field "time_avg_flag" ind_rad : int radar index """ temp_name = None hydro_name = None iso0_name = None echo_name = None for datatypedescr in dscfg["datatype"]: radarnr, _, datatype, _, _ = get_datatype_fields(datatypedescr) if datatype in ("PhiDP", "PhiDPc"): phidp_name = get_fieldname_pyart(datatype) elif datatype == "echoID": echo_name = get_fieldname_pyart(datatype) elif datatype == "hydro": hydro_name = get_fieldname_pyart(datatype) elif datatype == "TEMP": temp_name = get_fieldname_pyart(datatype) elif datatype == "H_ISO0": iso0_name = "height_over_iso0" ind_rad = int(radarnr[5:8]) - 1 if procstatus == 0: return None, None if procstatus == 1: if radar_list[ind_rad] is None: warn("No valid radar") return None, None radar = radar_list[ind_rad] phidpmax = dscfg.get("phidpmax", 60.0) period = dscfg.get("period", 3600.0) time_avg_flag = pyart.config.get_metadata("time_avg_flag") time_avg_flag["data"] = np.ma.zeros((radar.nrays, radar.ngates), dtype=int) if phidp_name not in radar.fields: warn("Missing PhiDP data") time_avg_flag["data"] += 1 else: phidp_field = radar.fields[phidp_name] time_avg_flag["data"][phidp_field["data"] > phidpmax] += 1 if echo_name is not None: if echo_name not in radar.fields: warn("Missing echo ID data") time_avg_flag["data"] += 100 else: echo_field = radar.fields[echo_name] time_avg_flag["data"][echo_field["data"] == 2] += 100 if hydro_name is not None and echo_name is not None: if (hydro_name not in radar.fields) or (echo_name not in radar.fields): warn("Missing hydrometeor classification data") time_avg_flag["data"] += 10000 else: hydro_field = radar.fields[hydro_name] # check where is no rain is_not_rain = np.logical_and( hydro_field["data"] != 4, hydro_field["data"] != 6 ) # where is no rain should be precip is_not_rain = np.logical_and(is_not_rain, echo_field["data"] == 3) time_avg_flag["data"][is_not_rain] += 10000 elif temp_name is not None: if temp_name not in radar.fields: warn("Missing temperature data") time_avg_flag["data"] += 10000 else: beamwidth = dscfg.get("beamwidth", None) if beamwidth is None: if radar.instrument_parameters is not None: if "radar_beam_width_h" in radar.instrument_parameters: beamwidth = radar.instrument_parameters[ "radar_beam_width_h" ]["data"][0] elif "radar_beam_width_v" in radar.instrument_parameters: beamwidth = radar.instrument_parameters[ "radar_beam_width_v" ]["data"][0] if beamwidth is None: warn("Antenna beam width unknown.") mask_fzl, _ = pyart.correct.get_mask_fzl( radar, fzl=None, doc=None, min_temp=0.0, max_h_iso0=0.0, thickness=700.0, beamwidth=beamwidth, temp_field=temp_name, iso0_field=iso0_name, temp_ref="temperature", ) time_avg_flag["data"][mask_fzl] += 10000 elif iso0_name is not None: if iso0_name not in radar.fields: warn("Missing height relative to iso0 data") time_avg_flag["data"] += 10000 else: beamwidth = dscfg.get("beamwidth", None) if beamwidth is None: if radar.instrument_parameters is not None: if "radar_beam_width_h" in radar.instrument_parameters: beamwidth = radar.instrument_parameters[ "radar_beam_width_h" ]["data"][0] elif "radar_beam_width_v" in radar.instrument_parameters: beamwidth = radar.instrument_parameters[ "radar_beam_width_v" ]["data"][0] if beamwidth is None: warn("Antenna beam width unknown.") mask_fzl, _ = pyart.correct.get_mask_fzl( radar, fzl=None, doc=None, min_temp=0.0, max_h_iso0=0.0, thickness=700.0, beamwidth=beamwidth, temp_field=temp_name, iso0_field=iso0_name, temp_ref="height_over_iso0", ) time_avg_flag["data"][mask_fzl] += 10000 radar_aux = deepcopy(radar) radar_aux.fields = dict() radar_aux.add_field("time_avg_flag", time_avg_flag) # first volume: initialize start and end time of averaging if dscfg["initialized"] == 0: start_average = dscfg.get("start_average", 0.0) date_00 = dscfg["timeinfo"].replace( hour=0, minute=0, second=0, microsecond=0 ) avg_par = dict() avg_par.update( {"starttime": date_00 + datetime.timedelta(seconds=start_average)} ) avg_par.update( {"endtime": avg_par["starttime"] + datetime.timedelta(seconds=period)} ) avg_par.update({"timeinfo": dscfg["timeinfo"]}) dscfg["global_data"] = avg_par dscfg["initialized"] = 1 if dscfg["initialized"] == 0: return None, None dscfg["global_data"]["timeinfo"] = dscfg["timeinfo"] # no radar object in global data: create it if "radar_out" not in dscfg["global_data"]: # get start and stop times of new radar object ( dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], ) = time_avg_range( dscfg["timeinfo"], dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], period, ) # check if volume time older than starttime if dscfg["timeinfo"] > dscfg["global_data"]["starttime"]: dscfg["global_data"].update({"radar_out": radar_aux}) return None, None # still accumulating: add field to global field if dscfg["timeinfo"] < dscfg["global_data"]["endtime"]: flag_interp = interpol_field( dscfg["global_data"]["radar_out"], radar_aux, "time_avg_flag" ) dscfg["global_data"]["radar_out"].fields["time_avg_flag"]["data"] += ( flag_interp["data"].filled(fill_value=0) ).astype(int) return None, None # we have reached the end of the accumulation: start a new object new_dataset = { "radar_out": deepcopy(dscfg["global_data"]["radar_out"]), "timeinfo": dscfg["global_data"]["endtime"], } dscfg["global_data"]["starttime"] += datetime.timedelta(seconds=period) dscfg["global_data"]["endtime"] += datetime.timedelta(seconds=period) # remove old radar object from global_data dictionary dscfg["global_data"].pop("radar_out", None) # get start and stop times of new radar object ( dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], ) = time_avg_range( dscfg["timeinfo"], dscfg["global_data"]["starttime"], dscfg["global_data"]["endtime"], period, ) # check if volume time older than starttime if dscfg["timeinfo"] > dscfg["global_data"]["starttime"]: dscfg["global_data"].update({"radar_out": radar_aux}) return new_dataset, ind_rad # no more files to process if there is global data pack it up if procstatus == 2: if dscfg["initialized"] == 0: return None, None if "radar_out" not in dscfg["global_data"]: return None, None new_dataset = { "radar_out": deepcopy(dscfg["global_data"]["radar_out"]), "timeinfo": dscfg["global_data"]["endtime"], } return new_dataset, ind_rad
[docs] def process_colocated_gates(procstatus, dscfg, radar_list=None): """ Find colocated gates within two radars Parameters ---------- procstatus : int Processing status: 0 initializing, 1 processing volume, 2 post-processing dscfg : dictionary of dictionaries data set configuration. Accepted Configuration Keywords:: datatype : list of string. Dataset keyword The input data types to use to check colocated gates (one for every radar) Any datatype supported by pyrad and available in both radars is accepted. If visibility filtering is desired, the fields "visibility" or "visibility_polar" must be specified for both radars. h_tol : float. Dataset keyword Tolerance in altitude difference between radar gates [m]. Default 100. latlon_tol : float. Dataset keyword Tolerance in latitude and longitude position between radar gates [deg]. Default 0.0005 vol_d_tol : float. Dataset keyword Tolerance in pulse volume diameter [m]. Default 100. vismin : float. Dataset keyword Minimum visibility [percent]. Default None. hmin : float. Dataset keyword Minimum altitude [m MSL]. Default None. hmax : float. Dataset keyword Maximum altitude [m MSL]. Default None. rmin : float. Dataset keyword Minimum range [m]. Default None. rmax : float. Dataset keyword Maximum range [m]. Default None. elmin : float. Dataset keyword Minimum elevation angle [deg]. Default None. elmax : float. Dataset keyword Maximum elevation angle [deg]. Default None. azrad1min : float. Dataset keyword Minimum azimuth angle [deg] for radar 1. Default None. azrad1max : float. Dataset keyword Maximum azimuth angle [deg] for radar 1. Default None. azrad2min : float. Dataset keyword Minimum azimuth angle [deg] for radar 2. Default None. azrad2max : float. Dataset keyword Maximum azimuth angle [deg] for radar 2. Default None. radar_list : list of Radar objects Optional. list of radar objects Returns ------- new_dataset : dict dictionary containing the field "colocated_gates" ind_rad : int radar index """ if procstatus != 1: return None, None # check how many radars are there radarnr_dict = dict() ind_radar_list = set() for datatypedescr in dscfg["datatype"]: radarnr = datatypedescr.split(":")[0] radarnr_dict.update({radarnr: []}) ind_radar_list.add(int(radarnr[5:8]) - 1) ind_radar_list = list(ind_radar_list) if (len(radarnr_dict) != 2) or (len(radar_list) < 2): warn("Intercomparison requires data from two different radars") return None, None # create the list of data types for each radar for datatypedescr in dscfg["datatype"]: radarnr, _, datatype, _, _ = get_datatype_fields(datatypedescr) if radarnr in radarnr_dict: radarnr_dict[radarnr].append(get_fieldname_pyart(datatype)) radar1 = radar_list[ind_radar_list[0]] radar2 = radar_list[ind_radar_list[1]] if radar1 is None or radar2 is None: warn("Unable to inter-compare radars. Missing radar") if "instrument_name" in radar1.metadata: print("Radar 1: " + radar1.metadata["instrument_name"]) if "instrument_name" in radar2.metadata: print("Radar 2: " + radar2.metadata["instrument_name"]) coloc_gates_field = "colocated_gates" h_tol = dscfg.get("h_tol", 100.0) latlon_tol = dscfg.get("latlon_tol", 0.0005) vol_d_tol = dscfg.get("vol_d_tol", 100.0) vismin = dscfg.get("vismin", None) hmin = dscfg.get("hmin", None) hmax = dscfg.get("hmax", None) rmin = dscfg.get("rmin", None) rmax = dscfg.get("rmax", None) elmin = dscfg.get("elmin", None) elmax = dscfg.get("elmax", None) azrad1min = dscfg.get("azrad1min", None) azrad1max = dscfg.get("azrad1max", None) azrad2min = dscfg.get("azrad2min", None) azrad2max = dscfg.get("azrad2max", None) visib_field = None if "visibility" in radarnr_dict["RADAR" + "{:03d}".format(ind_radar_list[0] + 1)]: visib_field = "visibility" # older IDL visibility codes elif ( "visibility_polar" in radarnr_dict["RADAR" + "{:03d}".format(ind_radar_list[0] + 1)] ): visib_field = "visibility_polar" # GECSX format if vismin is not None and visib_field is None: warn( "Unable to filter data according to visibility. " + "Visibility field for RADAR" + "{:03d}".format(ind_radar_list[0] + 1) + " not available" ) gate_coloc_rad1_dict = pyart.util.intersection( radar1, radar2, h_tol=h_tol, latlon_tol=latlon_tol, vol_d_tol=vol_d_tol, vismin=vismin, hmin=hmin, hmax=hmax, rmin=rmin, rmax=rmax, elmin=elmin, elmax=elmax, azmin=azrad1min, azmax=azrad1max, visib_field=visib_field, intersec_field=coloc_gates_field, ) visib_field = None if "visibility" in radarnr_dict["RADAR" + "{:03d}".format(ind_radar_list[1] + 1)]: visib_field = "visibility" # older IDL visibility codes elif ( "visibility_polar" in radarnr_dict["RADAR" + "{:03d}".format(ind_radar_list[1] + 1)] ): visib_field = "visibility_polar" # GECSX format if vismin is not None and visib_field is None: warn( "Unable to filter data according to visibility. " + "Visibility field for RADAR" + "{:03d}".format(ind_radar_list[1] + 1) + " not available" ) gate_coloc_rad2_dict = pyart.util.intersection( radar2, radar1, h_tol=h_tol, latlon_tol=latlon_tol, vol_d_tol=vol_d_tol, vismin=vismin, hmin=hmin, hmax=hmax, rmin=rmin, rmax=rmax, elmin=elmin, elmax=elmax, azmin=azrad2min, azmax=azrad2max, visib_field=visib_field, intersec_field=coloc_gates_field, ) new_rad1 = deepcopy(radar1) new_rad1.fields = dict() new_rad1.add_field("colocated_gates", gate_coloc_rad1_dict) new_rad2 = deepcopy(radar2) new_rad2.fields = dict() new_rad2.add_field("colocated_gates", gate_coloc_rad2_dict) coloc_rad1_dict, new_rad1.fields["colocated_gates"] = pyart.util.colocated_gates( new_rad1, new_rad2, h_tol=h_tol, latlon_tol=latlon_tol, coloc_gates_field=coloc_gates_field, ) coloc_rad2_dict, new_rad2.fields["colocated_gates"] = pyart.util.colocated_gates( new_rad2, new_rad1, h_tol=h_tol, latlon_tol=latlon_tol, coloc_gates_field=coloc_gates_field, ) # prepare output rad1_dict = {"coloc_dict": coloc_rad1_dict, "radar_out": new_rad1} rad2_dict = {"coloc_dict": coloc_rad2_dict, "radar_out": new_rad2} new_dataset = { "RADAR" + "{:03d}".format(ind_radar_list[0] + 1): rad1_dict, "RADAR" + "{:03d}".format(ind_radar_list[1] + 1): rad2_dict, } return new_dataset, ind_radar_list
[docs] def process_intercomp(procstatus, dscfg, radar_list=None): """ intercomparison between two radars at co-located gates. The variables compared must be of the same type. Parameters ---------- procstatus : int Processing status: 0 initializing, 1 processing volume, 2 post-processing dscfg : dictionary of dictionaries data set configuration. Accepted Configuration Keywords:: datatype : list of string. Dataset keyword The input data types (one for every radar). Any arbitrary datatype supported by pyrad and available in both radar is accepted. colocgatespath : string. base path to the file containing the coordinates of the co-located gates coloc_radars_name : string. Dataset keyword string identifying the radar names rays_are_indexed : bool. Dataset keyword If True it is considered that the rays are indexed and that the data can be selected simply looking at the ray number. Default false azi_tol : float. Dataset keyword azimuth tolerance between the two radars. Default 0.5 deg ele_tol : float. Dataset keyword elevation tolerance between the two radars. Default 0.5 deg rng_tol : float. Dataset keyword range tolerance between the two radars. Default 50 m coloc_data_dir : string. Dataset keyword name of the directory containing the csv file with colocated data radar_list : list of Radar objects Optional. list of radar objects Returns ------- new_dataset : dict dictionary containing a dictionary with intercomparison data and the key "final" which contains a boolean that is true when all volumes have been processed ind_rad : int radar index """ # check how many radars have to be compared and which datatype to use ind_radar_list = [] field_name_list = [] datatype_list = [] for datatypedescr in dscfg["datatype"]: radarnr, _, datatype, _, _ = get_datatype_fields(datatypedescr) field_name = get_fieldname_pyart(datatype) ind_radar_list.append(int(radarnr[5:8]) - 1) datatype_list.append(datatype) field_name_list.append(field_name) ind_radar_list = np.array(ind_radar_list) datatype_list = np.array(datatype_list) field_name_list = np.array(field_name_list) if (ind_radar_list.size != 2) or (np.unique(ind_radar_list).size < 2): warn("Intercomparison requires data from two different radars") return None, None if np.unique(field_name_list).size > 1: warn("Intercomparison must be performed on the same variable") return None, None if procstatus == 0: savedir = dscfg["colocgatespath"] + dscfg["coloc_radars_name"] + "/" prdtype = "info" if "prdtype" in dscfg: prdtype = dscfg["prdtype"] fname = make_filename( prdtype, "COLOCATED_GATES", dscfg["coloc_radars_name"], ["csv"], timeinfo=None, )[0] ( rad1_ray_ind, rad1_rng_ind, rad1_ele, rad1_azi, rad1_rng, rad2_ray_ind, rad2_rng_ind, rad2_ele, rad2_azi, rad2_rng, ) = read_colocated_gates(savedir + fname) if rad1_ele is None: raise ValueError( "Unable to intercompare radars. " + "Missing colocated gates file" ) dscfg["global_data"] = { "rad1_ray_ind": rad1_ray_ind, "rad1_rng_ind": rad1_rng_ind, "rad1_ele": rad1_ele, "rad1_azi": rad1_azi, "rad1_rng": rad1_rng, "rad2_ray_ind": rad2_ray_ind, "rad2_rng_ind": rad2_rng_ind, "rad2_ele": rad2_ele, "rad2_azi": rad2_azi, "rad2_rng": rad2_rng, } return None, None if procstatus == 1: radar1 = radar_list[ind_radar_list[0]] radar2 = radar_list[ind_radar_list[1]] if radar1 is None or radar2 is None: warn("Unable to inter-compare radars. Missing radar") return None, None if (field_name not in radar1.fields) or (field_name not in radar2.fields): warn( "Unable to get values of field " + field_name + " at colocated range bins. " + "Field missing in one of the radars" ) return None, None if not dscfg["initialized"]: dscfg["global_data"].update({"timeinfo": dscfg["timeinfo"]}) dscfg["global_data"].update( {"rad1_name": dscfg["RadarName"][ind_radar_list[0]]} ) dscfg["global_data"].update( {"rad2_name": dscfg["RadarName"][ind_radar_list[1]]} ) dscfg["initialized"] = 1 rad1_field = radar1.fields[field_name]["data"] rad2_field = radar2.fields[field_name]["data"] intercomp_dict = { "rad1_time": [], "rad1_ray_ind": [], "rad1_rng_ind": [], "rad1_ele": [], "rad1_azi": [], "rad1_rng": [], "rad1_val": [], "rad2_time": [], "rad2_ray_ind": [], "rad2_rng_ind": [], "rad2_ele": [], "rad2_azi": [], "rad2_rng": [], "rad2_val": [], "rad1_name": dscfg["global_data"]["rad1_name"], "rad2_name": dscfg["global_data"]["rad2_name"], } # determine if radar data has to be averaged avg_rad1, avg_rad2, avg_rad_lim = get_range_bins_to_avg( radar1.range["data"], radar2.range["data"] ) # rays are indexed to regular grid rays_are_indexed = dscfg.get("rays_are_indexed", False) if not rays_are_indexed: azi_tol = dscfg.get("azi_tol", 0.5) ele_tol = dscfg.get("ele_tol", 0.5) rng_tol = dscfg.get("rng_tol", 50.0) ( rad1_ray_ind, rad1_rng_ind, rad2_ray_ind, rad2_rng_ind, ) = find_colocated_indexes( radar1, radar2, dscfg["global_data"]["rad1_ele"], dscfg["global_data"]["rad1_azi"], dscfg["global_data"]["rad1_rng"], dscfg["global_data"]["rad2_ele"], dscfg["global_data"]["rad2_azi"], dscfg["global_data"]["rad2_rng"], ele_tol=ele_tol, azi_tol=azi_tol, rng_tol=rng_tol, ) else: rad1_ray_ind = deepcopy(dscfg["global_data"]["rad1_ray_ind"]) rad1_rng_ind = deepcopy(dscfg["global_data"]["rad1_rng_ind"]) rad2_ray_ind = deepcopy(dscfg["global_data"]["rad2_ray_ind"]) rad2_rng_ind = deepcopy(dscfg["global_data"]["rad2_rng_ind"]) # keep only indices of valid gates val1_vec = rad1_field[rad1_ray_ind, rad1_rng_ind] val2_vec = rad2_field[rad1_ray_ind, rad1_rng_ind] mask_val1 = np.ma.getmaskarray(val1_vec) mask_val2 = np.ma.getmaskarray(val2_vec) isvalid = np.logical_not(np.logical_or(mask_val1, mask_val2)) rad1_ray_ind = rad1_ray_ind[isvalid] rad1_rng_ind = rad1_rng_ind[isvalid] rad2_ray_ind = rad2_ray_ind[isvalid] rad2_rng_ind = rad2_rng_ind[isvalid] # if averaging required loop over valid gates and average if avg_rad1: ngates_valid = len(rad1_ray_ind) val1_vec = np.ma.masked_all(ngates_valid, dtype=float) is_valid_avg = np.zeros(ngates_valid, dtype=bool) for i in range(ngates_valid): if rad1_rng_ind[i] + avg_rad_lim[1] >= radar1.ngates: continue if rad1_rng_ind[i] + avg_rad_lim[0] < 0: continue ind_rng = list( range( rad1_rng_ind[i] + avg_rad_lim[0], rad1_rng_ind[i] + avg_rad_lim[1] + 1, ) ) if np.any(np.ma.getmaskarray(rad1_field[rad1_ray_ind[i], ind_rng])): continue val1_vec[i] = np.ma.asarray( np.ma.mean(rad1_field[rad1_ray_ind[i], ind_rng]) ) is_valid_avg[i] = True rad1_ray_ind = rad1_ray_ind[is_valid_avg] rad1_rng_ind = rad1_rng_ind[is_valid_avg] rad2_ray_ind = rad2_ray_ind[is_valid_avg] rad2_rng_ind = rad2_rng_ind[is_valid_avg] val1_vec = val1_vec[is_valid_avg] val2_vec = rad2_field[rad2_ray_ind, rad2_rng_ind] elif avg_rad2: ngates_valid = len(rad2_ray_ind) val2_vec = np.ma.masked_all(ngates_valid, dtype=float) is_valid_avg = np.zeros(ngates_valid, dtype=bool) for i in range(ngates_valid): if rad2_rng_ind[i] + avg_rad_lim[1] >= radar2.ngates: continue if rad2_rng_ind[i] + avg_rad_lim[0] < 0: continue ind_rng = list( range( rad2_rng_ind[i] + avg_rad_lim[0], rad2_rng_ind[i] + avg_rad_lim[1] + 1, ) ) if np.any(np.ma.getmaskarray(rad2_field[rad2_ray_ind[i], ind_rng])): continue val2_vec[i] = np.ma.asarray( np.ma.mean(rad2_field[rad2_ray_ind[i], ind_rng]) ) is_valid_avg[i] = True rad1_ray_ind = rad1_ray_ind[is_valid_avg] rad1_rng_ind = rad1_rng_ind[is_valid_avg] rad2_ray_ind = rad2_ray_ind[is_valid_avg] rad2_rng_ind = rad2_rng_ind[is_valid_avg] val2_vec = val2_vec[is_valid_avg] val1_vec = rad1_field[rad1_ray_ind, rad1_rng_ind] else: val1_vec = val1_vec[isvalid] val2_vec = val2_vec[isvalid] intercomp_dict["rad1_time"] = num2date( radar1.time["data"][rad1_ray_ind], radar1.time["units"], radar1.time["calendar"], ) intercomp_dict["rad1_ray_ind"] = rad1_ray_ind intercomp_dict["rad1_rng_ind"] = rad1_rng_ind intercomp_dict["rad1_ele"] = radar1.elevation["data"][rad1_ray_ind] intercomp_dict["rad1_azi"] = radar1.azimuth["data"][rad1_ray_ind] intercomp_dict["rad1_rng"] = radar1.range["data"][rad1_rng_ind] intercomp_dict["rad1_val"] = val1_vec intercomp_dict["rad2_time"] = num2date( radar2.time["data"][rad2_ray_ind], radar2.time["units"], radar2.time["calendar"], ) intercomp_dict["rad2_ray_ind"] = rad2_ray_ind intercomp_dict["rad2_rng_ind"] = rad2_rng_ind intercomp_dict["rad2_ele"] = radar2.elevation["data"][rad2_ray_ind] intercomp_dict["rad2_azi"] = radar2.azimuth["data"][rad2_ray_ind] intercomp_dict["rad2_rng"] = radar2.range["data"][rad2_rng_ind] intercomp_dict["rad2_val"] = val2_vec new_dataset = { "intercomp_dict": intercomp_dict, "timeinfo": dscfg["global_data"]["timeinfo"], "final": False, } return new_dataset, None if procstatus == 2: tseries_prod = [ prod for prod in dscfg["products"] if "WRITE_INTERCOMP" in dscfg["products"][prod]["type"] ][0] savedir = get_save_dir( dscfg["basepath"], dscfg["procname"], dscfg["dsname"], tseries_prod, timeinfo=dscfg["global_data"]["timeinfo"], create_dir=False, ) fname = make_filename( "colocated_data", dscfg["type"], datatype, ["csv"], timeinfo=dscfg["global_data"]["timeinfo"], timeformat="%Y%m%d", ) fname = savedir + fname[0] coloc_data = read_colocated_data(fname) intercomp_dict = { "rad1_name": dscfg["global_data"]["rad1_name"], "rad1_time": coloc_data[0], "rad1_ray_ind": coloc_data[1], "rad1_rng_ind": coloc_data[2], "rad1_ele": coloc_data[3], "rad1_azi": coloc_data[4], "rad1_rng": coloc_data[5], "rad1_val": coloc_data[6], "rad2_name": dscfg["global_data"]["rad2_name"], "rad2_time": coloc_data[7], "rad2_ray_ind": coloc_data[8], "rad2_rng_ind": coloc_data[9], "rad2_ele": coloc_data[10], "rad2_azi": coloc_data[11], "rad2_rng": coloc_data[12], "rad2_val": coloc_data[13], } new_dataset = { "intercomp_dict": intercomp_dict, "timeinfo": dscfg["global_data"]["timeinfo"], "final": True, } return new_dataset, None
[docs] def process_intercomp_time_avg(procstatus, dscfg, radar_list=None): """ intercomparison between the average reflectivity of two radars Parameters ---------- procstatus : int Processing status: 0 initializing, 1 processing volume, 2 post-processing dscfg : dictionary of dictionaries data set configuration. Accepted Configuration Keywords:: datatype : list of string. Dataset keyword The input data types, must contain dBZ" or "dBZc" or "dBuZ" or "dBZv" or "dBZvc" or "dBuZv, and, "PhiDP" or "PhiDPc", and, "time_avg_flag" for the two radars colocgatespath : string. base path to the file containing the coordinates of the co-located gates coloc_data_dir : string. Dataset keyword name of the directory containing the csv file with colocated data coloc_radars_name : string. Dataset keyword string identifying the radar names rays_are_indexed : bool. Dataset keyword If True it is considered that the rays are indexed and that the data can be selected simply looking at the ray number. Default false azi_tol : float. Dataset keyword azimuth tolerance between the two radars. Default 0.5 deg ele_tol : float. Dataset keyword elevation tolerance between the two radars. Default 0.5 deg rng_tol : float. Dataset keyword range tolerance between the two radars. Default 50 m clt_max : int. Dataset keyword maximum number of samples that can be clutter contaminated. Default 100 i.e. all phi_excess_max : int. Dataset keyword maximum number of samples that can have excess instantaneous PhiDP. Default 100 i.e. all non_rain_max : int. Dataset keyword maximum number of samples that can be no rain. Default 100 i.e. all phi_avg_max : float. Dataset keyword maximum average PhiDP allowed. Default 600 deg i.e. any radar_list : list of Radar objects Optional. list of radar objects Returns ------- new_dataset : dict dictionary containing a dictionary with intercomparison data and the key "final" which contains a boolean that is true when all volumes have been processed ind_rad : int radar index """ if procstatus == 0: savedir = dscfg["colocgatespath"] + dscfg["coloc_radars_name"] + "/" prdtype = "info" if "prdtype" in dscfg: prdtype = dscfg["prdtype"] fname = make_filename( prdtype, "COLOCATED_GATES", dscfg["coloc_radars_name"], ["csv"], timeinfo=None, )[0] ( rad1_ray_ind, rad1_rng_ind, rad1_ele, rad1_azi, rad1_rng, rad2_ray_ind, rad2_rng_ind, rad2_ele, rad2_azi, rad2_rng, ) = read_colocated_gates(savedir + fname) if rad1_ele is None: raise ValueError( "Unable to intercompare radars. " + "Missing colocated gates file" ) dscfg["global_data"] = { "rad1_ray_ind": rad1_ray_ind, "rad1_rng_ind": rad1_rng_ind, "rad1_ele": rad1_ele, "rad1_azi": rad1_azi, "rad1_rng": rad1_rng, "rad2_ray_ind": rad2_ray_ind, "rad2_rng_ind": rad2_rng_ind, "rad2_ele": rad2_ele, "rad2_azi": rad2_azi, "rad2_rng": rad2_rng, } return None, None if procstatus == 1: # check how many radars are there ind_radar_list = set() for datatypedescr in dscfg["datatype"]: radarnr = datatypedescr.split(":")[0] ind_radar_list.add(int(radarnr[5:8]) - 1) ind_radar_list = list(ind_radar_list) if (len(ind_radar_list) != 2) or (len(radar_list) < 2): warn("Intercomparison requires data from two different radars") return None, None radarnr_list = [ "RADAR" + "{:03d}".format(ind_radar_list[0] + 1), "RADAR" + "{:03d}".format(ind_radar_list[1] + 1), ] # get field names for datatypedescr in dscfg["datatype"]: radarnr, _, datatype, _, _ = get_datatype_fields(datatypedescr) if radarnr == radarnr_list[0]: if datatype in ("dBZ", "dBZc", "dBuZ", "dBZv", "dBZvc", "dBuZv"): rad1_refl_field = get_fieldname_pyart(datatype) elif datatype in ("PhiDP", "PhiDPc"): rad1_phidp_field = get_fieldname_pyart(datatype) elif datatype == "time_avg_flag": rad1_flag_field = get_fieldname_pyart(datatype) elif radarnr == radarnr_list[1]: if datatype in ("dBZ", "dBZc", "dBuZ", "dBZv", "dBZvc", "dBuZv"): rad2_refl_field = get_fieldname_pyart(datatype) elif datatype in ("PhiDP", "PhiDPc"): rad2_phidp_field = get_fieldname_pyart(datatype) elif datatype == "time_avg_flag": rad2_flag_field = get_fieldname_pyart(datatype) radar1 = radar_list[ind_radar_list[0]] radar2 = radar_list[ind_radar_list[1]] dscfg["global_data"].update({"timeinfo": dscfg["timeinfo"]}) if radar1 is None or radar2 is None: warn("Unable to inter-compare radars. Missing radar") return None, None if ( (rad1_refl_field not in radar1.fields) or (rad1_phidp_field not in radar1.fields) or (rad1_flag_field not in radar1.fields) or (rad2_refl_field not in radar2.fields) or (rad2_phidp_field not in radar2.fields) or (rad2_flag_field not in radar2.fields) ): warn("Unable to compare radar time avg fields. " + "Fields missing") return None, None if not dscfg["initialized"]: dscfg["global_data"].update( {"rad1_name": dscfg["RadarName"][ind_radar_list[0]]} ) dscfg["global_data"].update( {"rad2_name": dscfg["RadarName"][ind_radar_list[1]]} ) dscfg["initialized"] = 1 refl1 = radar1.fields[rad1_refl_field]["data"] refl2 = radar2.fields[rad2_refl_field]["data"] phidp1 = radar1.fields[rad1_phidp_field]["data"] phidp2 = radar2.fields[rad2_phidp_field]["data"] flag1 = radar1.fields[rad1_flag_field]["data"] flag2 = radar2.fields[rad2_flag_field]["data"] intercomp_dict = { "rad1_time": [], "rad1_ray_ind": [], "rad1_rng_ind": [], "rad1_ele": [], "rad1_azi": [], "rad1_rng": [], "rad1_dBZavg": [], "rad1_PhiDPavg": [], "rad1_Flagavg": [], "rad2_time": [], "rad2_ray_ind": [], "rad2_rng_ind": [], "rad2_ele": [], "rad2_azi": [], "rad2_rng": [], "rad2_dBZavg": [], "rad2_PhiDPavg": [], "rad2_Flagavg": [], "rad1_name": dscfg["global_data"]["rad1_name"], "rad2_name": dscfg["global_data"]["rad2_name"], } # determine if radar data has to be averaged avg_rad1, avg_rad2, avg_rad_lim = get_range_bins_to_avg( radar1.range["data"], radar2.range["data"] ) # rays are indexed to regular grid rays_are_indexed = dscfg.get("rays_are_indexed", False) # get current radars gates indices if not rays_are_indexed: azi_tol = dscfg.get("azi_tol", 0.5) ele_tol = dscfg.get("ele_tol", 0.5) rng_tol = dscfg.get("rng_tol", 50.0) ( rad1_ray_ind, rad1_rng_ind, rad2_ray_ind, rad2_rng_ind, ) = find_colocated_indexes( radar1, radar2, dscfg["global_data"]["rad1_ele"], dscfg["global_data"]["rad1_azi"], dscfg["global_data"]["rad1_rng"], dscfg["global_data"]["rad2_ele"], dscfg["global_data"]["rad2_azi"], dscfg["global_data"]["rad2_rng"], ele_tol=ele_tol, azi_tol=azi_tol, rng_tol=rng_tol, ) else: rad1_ray_ind = deepcopy(dscfg["global_data"]["rad1_ray_ind"]) rad1_rng_ind = deepcopy(dscfg["global_data"]["rad1_rng_ind"]) rad2_ray_ind = deepcopy(dscfg["global_data"]["rad2_ray_ind"]) rad2_rng_ind = deepcopy(dscfg["global_data"]["rad2_rng_ind"]) # keep only indices and data of valid gates refl1_vec = refl1[rad1_ray_ind, rad1_rng_ind] phidp1_vec = phidp1[rad1_ray_ind, rad1_rng_ind] flag1_vec = flag1[rad1_ray_ind, rad1_rng_ind] refl2_vec = refl2[rad2_ray_ind, rad2_rng_ind] phidp2_vec = phidp2[rad2_ray_ind, rad2_rng_ind] flag2_vec = flag2[rad2_ray_ind, rad2_rng_ind] mask_refl1 = np.ma.getmaskarray(refl1_vec) mask_phidp1 = np.ma.getmaskarray(phidp1_vec) mask_refl2 = np.ma.getmaskarray(refl2_vec) mask_phidp2 = np.ma.getmaskarray(phidp2_vec) isvalid = np.logical_not( np.logical_or( np.logical_or(mask_refl1, mask_refl2), np.logical_or(mask_phidp1, mask_phidp2), ) ) rad1_ray_ind = rad1_ray_ind[isvalid] rad1_rng_ind = rad1_rng_ind[isvalid] rad2_ray_ind = rad2_ray_ind[isvalid] rad2_rng_ind = rad2_rng_ind[isvalid] # if averaging required loop over valid gates and average # only if all gates valid if avg_rad1: ngates_valid = len(rad1_ray_ind) refl1_vec = np.ma.masked_all(ngates_valid, dtype=float) phidp1_vec = np.ma.masked_all(ngates_valid, dtype=float) flag1_vec = np.ma.masked_all(ngates_valid, dtype=int) is_valid_avg = np.zeros(ngates_valid, dtype=bool) for i in range(ngates_valid): if rad1_rng_ind[i] + avg_rad_lim[1] >= radar1.ngates: continue if rad1_rng_ind[i] + avg_rad_lim[0] < 0: continue ind_rng = list( range( rad1_rng_ind[i] + avg_rad_lim[0], rad1_rng_ind[i] + avg_rad_lim[1] + 1, ) ) if np.any(np.ma.getmaskarray(refl1[rad1_ray_ind[i], ind_rng])): continue if np.any(np.ma.getmaskarray(phidp1[rad1_ray_ind[i], ind_rng])): continue refl1_vec[i] = np.ma.asarray( np.ma.mean(refl1[rad1_ray_ind[i], ind_rng]) ) phidp1_vec[i] = np.ma.asarray( np.ma.mean(phidp1[rad1_ray_ind[i], ind_rng]) ) rad1_flag = flag1[rad1_ray_ind[i], ind_rng] rad1_excess_phi = rad1_flag % 100 rad1_clt = ((rad1_flag - rad1_excess_phi) % 10000) / 100 rad1_prec = ( (rad1_flag - rad1_clt * 100 - rad1_excess_phi) % 1000000 ) / 10000 flag1_vec[i] = int( 10000 * np.max(rad1_prec) + 100 * np.max(rad1_clt) + np.max(rad1_excess_phi) ) is_valid_avg[i] = True rad1_ray_ind = rad1_ray_ind[is_valid_avg] rad1_rng_ind = rad1_rng_ind[is_valid_avg] rad2_ray_ind = rad2_ray_ind[is_valid_avg] rad2_rng_ind = rad2_rng_ind[is_valid_avg] refl1_vec = refl1_vec[is_valid_avg] phidp1_vec = phidp1_vec[is_valid_avg] flag1_vec = flag1_vec[is_valid_avg] refl2_vec = refl2[rad2_ray_ind, rad2_rng_ind] phidp2_vec = phidp2[rad2_ray_ind, rad2_rng_ind] flag2_vec = flag2[rad2_ray_ind, rad2_rng_ind] elif avg_rad2: ngates_valid = len(rad2_ray_ind) refl2_vec = np.ma.masked_all(ngates_valid, dtype=float) phidp2_vec = np.ma.masked_all(ngates_valid, dtype=float) flag2_vec = np.ma.masked_all(ngates_valid, dtype=int) is_valid_avg = np.zeros(ngates_valid, dtype=bool) for i in range(ngates_valid): if rad2_rng_ind[i] + avg_rad_lim[1] >= radar2.ngates: continue if rad2_rng_ind[i] + avg_rad_lim[0] < 0: continue ind_rng = list( range( rad2_rng_ind[i] + avg_rad_lim[0], rad2_rng_ind[i] + avg_rad_lim[1] + 1, ) ) if np.any(np.ma.getmaskarray(refl2[rad2_ray_ind[i], ind_rng])): continue if np.any(np.ma.getmaskarray(phidp2[rad2_ray_ind[i], ind_rng])): continue refl2_vec[i] = np.ma.asarray( np.ma.mean(refl2[rad2_ray_ind[i], ind_rng]) ) phidp2_vec[i] = np.ma.asarray( np.ma.mean(phidp2[rad2_ray_ind[i], ind_rng]) ) rad2_flag = flag2[rad2_ray_ind[i], ind_rng] rad2_excess_phi = rad2_flag % 100 rad2_clt = ((rad2_flag - rad2_excess_phi) % 10000) / 100 rad2_prec = ( (rad2_flag - rad2_clt * 100 - rad2_excess_phi) % 1000000 ) / 10000 flag2_vec[i] = int( 10000 * np.max(rad2_prec) + 100 * np.max(rad2_clt) + np.max(rad2_excess_phi) ) is_valid_avg[i] = True rad1_ray_ind = rad1_ray_ind[is_valid_avg] rad1_rng_ind = rad1_rng_ind[is_valid_avg] rad2_ray_ind = rad2_ray_ind[is_valid_avg] rad2_rng_ind = rad2_rng_ind[is_valid_avg] refl2_vec = refl2_vec[is_valid_avg] phidp2_vec = phidp2_vec[is_valid_avg] flag2_vec = flag2_vec[is_valid_avg] refl1_vec = refl1[rad1_ray_ind, rad1_rng_ind] phidp1_vec = phidp1[rad1_ray_ind, rad1_rng_ind] flag1_vec = flag1[rad1_ray_ind, rad1_rng_ind] else: refl1_vec = refl1_vec[isvalid] phidp1_vec = phidp1_vec[isvalid] flag1_vec = flag1_vec[isvalid] refl2_vec = refl2_vec[isvalid] phidp2_vec = phidp2_vec[isvalid] flag2_vec = flag2_vec[isvalid] intercomp_dict["rad1_time"] = np.empty( len(rad1_ray_ind), dtype=datetime.datetime ) intercomp_dict["rad1_time"][:] = dscfg["timeinfo"] intercomp_dict["rad1_ray_ind"] = rad1_ray_ind intercomp_dict["rad1_rng_ind"] = rad1_rng_ind intercomp_dict["rad1_ele"] = radar1.elevation["data"][rad1_ray_ind] intercomp_dict["rad1_azi"] = radar1.azimuth["data"][rad1_ray_ind] intercomp_dict["rad1_rng"] = radar1.range["data"][rad1_rng_ind] intercomp_dict["rad1_dBZavg"] = refl1_vec intercomp_dict["rad1_PhiDPavg"] = phidp1_vec intercomp_dict["rad1_Flagavg"] = flag1_vec intercomp_dict["rad2_time"] = deepcopy(intercomp_dict["rad1_time"]) intercomp_dict["rad2_ray_ind"] = rad2_ray_ind intercomp_dict["rad2_rng_ind"] = rad2_rng_ind intercomp_dict["rad2_ele"] = radar2.elevation["data"][rad2_ray_ind] intercomp_dict["rad2_azi"] = radar2.azimuth["data"][rad2_ray_ind] intercomp_dict["rad2_rng"] = radar2.range["data"][rad2_rng_ind] intercomp_dict["rad2_dBZavg"] = refl2_vec intercomp_dict["rad2_PhiDPavg"] = phidp2_vec intercomp_dict["rad2_Flagavg"] = flag2_vec new_dataset = { "intercomp_dict": intercomp_dict, "timeinfo": dscfg["global_data"]["timeinfo"], "final": False, } return new_dataset, None if procstatus == 2: # get field name refl_type = None for datatypedescr in dscfg["datatype"]: radarnr, _, datatype, _, _ = get_datatype_fields(datatypedescr) if datatype in ("dBZ", "dBZc", "dBuZ", "dBZv", "dBZvc", "dBuZv"): refl_type = datatype break if refl_type is None: warn("Unknown reflectivity type") return None, None tseries_prod = [ prod for prod in dscfg["products"] if "WRITE_INTERCOMP" in dscfg["products"][prod]["type"] ][0] savedir = get_save_dir( dscfg["basepath"], dscfg["procname"], dscfg["dsname"], tseries_prod, timeinfo=dscfg["global_data"]["timeinfo"], create_dir=False, ) fname = make_filename( "colocated_data", dscfg["type"], refl_type, ["csv"], timeinfo=dscfg["global_data"]["timeinfo"], timeformat="%Y%m%d", ) fname = savedir + fname[0] ( rad1_time, rad1_ray_ind, rad1_rng_ind, rad1_ele, rad1_azi, rad1_rng, rad1_dBZ, rad1_phi, rad1_flag, rad2_time, rad2_ray_ind, rad2_rng_ind, rad2_ele, rad2_azi, rad2_rng, rad2_dBZ, rad2_phi, rad2_flag, ) = read_colocated_data_time_avg(fname) if rad1_time is None: return None, None rad1_excess_phi = (rad1_flag % 100).astype(int) rad2_excess_phi = (rad2_flag % 100).astype(int) rad1_clt = (((rad1_flag - rad1_excess_phi) % 10000) / 100).astype(int) rad2_clt = (((rad2_flag - rad2_excess_phi) % 10000) / 100).astype(int) rad1_non_rain = ( ((rad1_flag - rad1_clt * 100 - rad1_excess_phi) % 1000000) / 10000 ).astype(int) rad2_non_rain = ( ((rad2_flag - rad2_clt * 100 - rad2_excess_phi) % 1000000) / 10000 ).astype(int) clt_max = dscfg.get("clt_max", 100) phi_excess_max = dscfg.get("phi_excess_max", 100) non_rain_max = dscfg.get("non_rain_max", 100) phi_avg_max = dscfg.get("phi_avg_max", 600.0) # filter out invalid data ind_val = np.where( np.logical_and.reduce( ( rad1_clt <= clt_max, rad2_clt <= clt_max, rad1_excess_phi <= phi_excess_max, rad2_excess_phi <= phi_excess_max, rad1_non_rain <= non_rain_max, rad2_non_rain <= non_rain_max, rad1_phi <= phi_avg_max, rad2_phi <= phi_avg_max, ) ) )[0] intercomp_dict = { "rad1_name": dscfg["global_data"]["rad1_name"], "rad1_time": rad1_time[ind_val], "rad1_ray_ind": rad1_ray_ind[ind_val], "rad1_rng_ind": rad1_rng_ind[ind_val], "rad1_ele": rad1_ele[ind_val], "rad1_azi": rad1_azi[ind_val], "rad1_rng": rad1_rng[ind_val], "rad1_val": rad1_dBZ[ind_val], "rad2_name": dscfg["global_data"]["rad2_name"], "rad2_time": rad2_time[ind_val], "rad2_ray_ind": rad1_ray_ind[ind_val], "rad2_rng_ind": rad1_rng_ind[ind_val], "rad2_ele": rad2_ele[ind_val], "rad2_azi": rad2_azi[ind_val], "rad2_rng": rad2_rng[ind_val], "rad2_val": rad2_dBZ[ind_val], } new_dataset = { "intercomp_dict": intercomp_dict, "timeinfo": dscfg["global_data"]["timeinfo"], "final": True, } return new_dataset, None
[docs] def process_fields_diff(procstatus, dscfg, radar_list=None): """ Computes the field difference between RADAR001 and radar002, i.e. RADAR001-RADAR002. Assumes both radars have the same geometry Parameters ---------- procstatus : int Processing status: 0 initializing, 1 processing volume, 2 post-processing dscfg : dictionary of dictionaries data set configuration. Accepted Configuration Keywords:: datatype : list of string. Dataset keyword The input data types for each radar, Any datatype supported by pyrad is supported radar_list : list of Radar objects Optional. list of radar objects Returns ------- new_dataset : dict dictionary containing a radar object containing the field differences ind_rad : int radar index """ if procstatus != 1: return None, None # check how many radars are there radarnr_dict = dict() ind_radar_list = set() for datatypedescr in dscfg["datatype"]: radarnr = datatypedescr.split(":")[0] radarnr_dict.update({radarnr: []}) ind_radar_list.add(int(radarnr[5:8]) - 1) ind_radar_list = list(ind_radar_list) if (len(radarnr_dict) != 2) or (len(radar_list) < 2): warn("Intercomparison requires data from two different radars") return None, None # create the list of data types for each radar radarnr, _, datatype, _, _ = get_datatype_fields(dscfg["datatype"][0]) field_name_1 = get_fieldname_pyart(datatype) radarnr, _, datatype, _, _ = get_datatype_fields(dscfg["datatype"][1]) field_name_2 = get_fieldname_pyart(datatype) radar1 = radar_list[ind_radar_list[0]] radar2 = radar_list[ind_radar_list[1]] if radar1 is None or radar2 is None: warn("Unable to inter-compare radars. Missing radar") return None, None if (field_name_1 not in radar1.fields) or (field_name_2 not in radar2.fields): warn( "Unable to compare fields " + field_name_1 + "and " + field_name_2 + ". Field missing in one of the radars" ) return None, None field_diff = pyart.config.get_metadata("fields_difference") field_diff["data"] = ( radar1.fields[field_name_1]["data"] - radar2.fields[field_name_2]["data"] ) field_diff["long_name"] = field_name_1 + " - " + field_name_2 rad_diff = deepcopy(radar1) rad_diff.fields = dict() rad_diff.add_field("fields_difference", field_diff) new_dataset = {"radar_out": rad_diff} return new_dataset, None
[docs] def process_intercomp_fields(procstatus, dscfg, radar_list=None): """ intercomparison between two radars Parameters ---------- procstatus : int Processing status: 0 initializing, 1 processing volume, 2 post-processing dscfg : dictionary of dictionaries data set configuration. Accepted Configuration Keywords:: datatype : list of string. Dataset keyword The input data types for each radar, Any datatype supported by pyrad and available in both radars is supported radar_list : list of Radar objects Optional. list of radar objects Returns ------- new_dataset : dict dictionary containing a dictionary with intercomparison data ind_rad : int radar index """ if procstatus != 1: return None, None # check how many radars are there radarnr_dict = dict() ind_radar_list = set() for datatypedescr in dscfg["datatype"]: radarnr = datatypedescr.split(":")[0] radarnr_dict.update({radarnr: []}) ind_radar_list.add(int(radarnr[5:8]) - 1) ind_radar_list = list(ind_radar_list) if (len(radarnr_dict) != 2) or (len(radar_list) < 2): warn("Intercomparison requires data from two different radars") return None, None # create the list of data types for each radar radarnr, _, datatype, _, _ = get_datatype_fields(dscfg["datatype"][0]) field_name_1 = get_fieldname_pyart(datatype) radarnr, _, datatype, _, _ = get_datatype_fields(dscfg["datatype"][1]) field_name_2 = get_fieldname_pyart(datatype) radar1 = radar_list[ind_radar_list[0]] radar2 = radar_list[ind_radar_list[1]] if radar1 is None or radar2 is None: warn("Unable to inter-compare radars. Missing radar") return None, None if (field_name_1 not in radar1.fields) or (field_name_2 not in radar2.fields): warn( "Unable to compare fields " + field_name_1 + " and " + field_name_2 + ". Field missing in one of the radars" ) return None, None data1 = deepcopy(radar1.fields[field_name_1]["data"]) data2 = deepcopy(radar2.fields[field_name_2]["data"]) mask1 = np.ma.getmaskarray(data1) mask2 = np.ma.getmaskarray(data2) data1[mask2] = np.ma.masked data2[mask1] = np.ma.masked intercomp_dict = { "rad1_name": dscfg["RadarName"][ind_radar_list[0]], "rad1_val": data1.compressed(), "rad2_name": dscfg["RadarName"][ind_radar_list[1]], "rad2_val": data2.compressed(), } new_dataset = { "intercomp_dict": intercomp_dict, "timeinfo": dscfg["timeinfo"], "final": False, } return new_dataset, None