Source code for pyrad.io.write_data

"""
pyrad.io.write_data
====================

Functions for writing pyrad output data

.. autosummary::
    :toctree: generated/

    write_to_s3
    write_vol_csv
    write_vol_kml
    write_centroids
    write_proc_periods
    write_fixed_angle
    write_ts_lightning
    send_msg
    write_alarm_msg
    write_last_state
    write_smn
    write_timeseries_point
    write_trt_info
    write_trt_cell_data
    write_trt_thundertracking_data
    write_trt_cell_scores
    write_trt_cell_lightning
    write_trt_rpc
    write_vpr_theo_params
    write_vpr_info
    write_rhi_profile
    write_field_coverage
    write_cdf
    write_histogram
    write_quantiles
    write_multiple_points
    write_multiple_points_grid
    write_ts_polar_data
    write_ts_grid_data
    write_ts_ml
    write_ts_stats
    write_ts_cum
    write_monitoring_ts
    write_excess_gates
    write_intercomp_scores_ts
    write_colocated_gates
    write_colocated_data
    write_colocated_data_time_avg
    write_sun_hits
    write_sun_retrieval

"""

from __future__ import print_function
import glob
import csv
import os

from ..util import warn
import smtplib
from email.message import EmailMessage

import datetime

import numpy as np
import pandas as pd

from pyart.config import get_fillvalue, get_field_colormap, get_field_limits
from pyart.core import antenna_to_cartesian, cartesian_to_geographic_aeqd
from cmweather.cm import cmap_d

from .io_aux import generate_field_name_str
from .flock_utils import lock_file, unlock_file

try:
    import simplekml

    _SIMPLEKML_AVAILABLE = True
except ImportError:
    _SIMPLEKML_AVAILABLE = False

try:
    import boto3

    _BOTO3_AVAILABLE = True
except ImportError:
    warn(
        "boto3 is not installed, no copy to S3 bucket will be performed!",
        use_debug=False,
    )
    _BOTO3_AVAILABLE = False


def write_to_s3(
    fname,
    basepath,
    s3endpoint,
    s3bucket,
    s3key,
    s3secret,
    s3path="",
    s3accesspolicy=None,
    s3splitext=False,
    s3verify=True,
    s3certificates="",
):
    """
    Copies a locally stored product to a S3 bucket

    Parameters
    ----------
    fname : str
        filename of the product on the local storage
    basepath : str
        basepath of pyrad products as provided with datapath key in
        the main conf file. It is stripped from fname to build the object
        name on the bucket
    s3endpoint : str
        s3 endpoint URL
        for example
        fr-par-1.linodeobjects.com/ or
        https://eu-central-1.linodeobjects.com/ or
        https://s3.us-east-1.amazonaws.com/
        https:// prefix is optional!
    s3bucket : str
        name of the s3 bucket to use to access the data
        the URL that will be polled is
        https://s3bucket.s3endpoint
    s3key: str
        AWS API access key for the specified bucket
    s3secret: str
        AWS API secret for the corresponding s3key
    s3path : str
        name of the path where to store the data on the s3 bucket
        the files will have the following url
        https://s3bucket.s3endpoint/s3path/filename
        Optional: default is empty string (place them at the root of the bucket)
    s3accesspolicy : str
        S3 access policy can be either private or public-read
        Default is to use nothing which will inherit the policy of the bucket
    s3splitext: bool
        If True, the extension will be added before the filename in the s3 name
        For example folderA/folderB/folderC/out.csv will become folderA/folderB/folderC/csv/out.csv
        on the S3.
    s3verify: bool
        If False, ssl verification will be disabled when writing data to S3. Beware this can
        be dangerous
    s3certificates: str
        Path of the certificates used for ssl verification. Only required if
        you use custom certificates.

    Returns
    -------
    None
        Nothing is returned. If boto3 is not available a warning is issued
        and no upload is attempted.
    """
    if not _BOTO3_AVAILABLE:
        warn("boto3 not installed, aborting writing to s3")
        return

    if not s3endpoint.startswith("https://"):  # add prefix
        s3endpoint = f"https://{s3endpoint}"
    # Host part only (no scheme, no trailing slash): used solely to build the
    # URL shown in the log message, so that it does not contain "//"
    endpoint_raw = s3endpoint.replace("https://", "").rstrip("/")

    # Normalise the bucket path so that it is either empty or ends with "/"
    if not s3path:
        s3path = ""
    elif not s3path.endswith("/"):
        s3path += "/"

    if s3splitext:
        # Insert the extension (without the dot) as an extra folder level
        ext = os.path.splitext(fname)[1][1:]
        fname_bis = os.path.join(os.path.dirname(fname), ext, os.path.basename(fname))
    else:
        fname_bis = fname

    s3fname = s3path + fname_bis.replace(basepath, "")
    if s3fname.startswith("/"):  # Remove leading /
        s3fname = s3fname[1:]

    s3_client_config = {
        "aws_access_key_id": s3key,
        "endpoint_url": s3endpoint,
        "aws_secret_access_key": s3secret,
    }
    # A certificate path, when given, takes precedence over the s3verify flag
    if s3certificates:
        s3verify = s3certificates

    s3_client = boto3.client("s3", **s3_client_config, verify=s3verify)
    extra_args = {"ACL": s3accesspolicy} if s3accesspolicy else None
    s3copypath = f"https://{s3bucket}.{endpoint_raw}/{s3fname}"
    s3_client.upload_file(fname, s3bucket, s3fname, ExtraArgs=extra_args)
    print(f"----- copying {fname} to {s3copypath}")


def write_vol_csv(fname, radar, field_name, ignore_masked=False):
    """
    Creates a csv file with the radar volume data

    Only the first ray of the gate coordinates and of the field is written
    (index 0 along the first dimension).

    Parameters
    ----------
    fname : str
        csv file name
    radar : radar object
        radar object containing the data
    field_name : str
        field name to convert
    ignore_masked : bool
        if True the rows containing missing values will be dropped

    Returns
    -------
    fname : str
        the name of the file containing the content

    """
    df = pd.DataFrame()
    df["lon"] = radar.gate_longitude["data"][0, :]
    df["lat"] = radar.gate_latitude["data"][0, :]
    df["alt"] = radar.gate_altitude["data"][0, :]

    # Antenna coordinates are added only when they have one value per row;
    # each one is checked on its own so that a size mismatch cannot raise
    nrows = df.shape[0]
    antenna_coords = (
        ("azi", radar.azimuth),
        ("ele", radar.elevation),
        ("rng", radar.range),
    )
    for column, coord in antenna_coords:
        if coord["data"].size == nrows:
            df[column] = coord["data"]

    df[field_name] = radar.fields[field_name]["data"][0, :]
    if ignore_masked:
        df = df.dropna()
    df.to_csv(fname, index=False)

    return fname
def write_vol_kml(
    fname, radar, field_name, ignore_masked=False, rng_res_km=0.24, azi_res=0.5
):
    """
    Creates a kml file with the radar volume data

    Each gate of the first ray is drawn as a polygon whose four corners are
    located half a range resolution and half an azimuth resolution away from
    the gate centre. The polygon colour is taken from the colormap and the
    limits configured for the field.

    Parameters
    ----------
    fname : str
        kml file name
    radar : radar object
        radar object containing the data
    field_name : str
        field name to convert
    ignore_masked : bool
        if True the rows containing missing values will be dropped
    rng_res_km : float
        range resolution in km
    azi_res : float
        azimuth resolution (deg)

    Returns
    -------
    fname : str or None
        the name of the file containing the content. None if the simplekml
        module is not available

    """
    if not _SIMPLEKML_AVAILABLE:
        warn("simplekml module required to write kml files")
        return None

    df = pd.DataFrame()
    df["lon"] = radar.gate_longitude["data"][0, :]
    df["lat"] = radar.gate_latitude["data"][0, :]
    df["alt"] = radar.gate_altitude["data"][0, :]

    # Antenna coordinates are added only when they have one value per row
    nrows = df.shape[0]
    antenna_coords = (
        ("azi", radar.azimuth),
        ("ele", radar.elevation),
        ("rng", radar.range),
    )
    for column, coord in antenna_coords:
        if coord["data"].size == nrows:
            df[column] = coord["data"]

    df[field_name] = radar.fields[field_name]["data"][0, :]
    if ignore_masked:
        df = df.dropna()

    rng_km = df["rng"].values / 1000.0
    azi = df["azi"].values
    ele = df["ele"].values
    half_rng = rng_res_km / 2.0
    half_azi = azi_res / 2
    radar_lon = radar.longitude["data"][0]
    radar_lat = radar.latitude["data"][0]

    # (range offset, azimuth offset) of the polygon corners 1 to 4, listed in
    # the order in which the polygon boundary is drawn
    corner_offsets = (
        (-half_rng, -half_azi),
        (-half_rng, half_azi),
        (half_rng, half_azi),
        (half_rng, -half_azi),
    )
    corner_ids = (1, 2, 3, 4)
    for corner, (rng_off, azi_off) in zip(corner_ids, corner_offsets):
        x, y, z = antenna_to_cartesian(rng_km + rng_off, azi + azi_off, ele)
        lon, lat = cartesian_to_geographic_aeqd(x, y, radar_lon, radar_lat)
        df[f"lon{corner}"] = lon
        df[f"lat{corner}"] = lat
        df[f"alt{corner}"] = z

    kml = simplekml.Kml()
    for _, gate in df.iterrows():
        ring = [
            (gate[f"lon{corner}"], gate[f"lat{corner}"], gate[f"alt{corner}"])
            for corner in corner_ids
        ]
        ring.append(ring[0])  # close the polygon on its first corner
        kml.newpolygon(
            name=f'azi={gate["azi"]}, ele={gate["ele"]}, rng={gate["rng"]}',
            outerboundaryis=ring,
            altitudemode="absolute",
        )

    # color value
    cmap = cmap_d[get_field_colormap(field_name).split("_")[1]]
    vmin, vmax = get_field_limits(field_name)
    col_number = (df[field_name].values - vmin) / (vmax - vmin)
    # kml.features holds the polygons in creation order, i.e. in row order
    for ind, pol in enumerate(kml.features):
        r, g, b, _ = cmap(col_number[ind], bytes=True)
        pol.style.polystyle.color = simplekml.Color.rgb(r, g, b)
    kml.save(fname)

    return fname
def write_centroids(fname, centroids_dict, var_names):
    """
    writes the centroids of a semi-supervised hydrometeor classification

    Parameters
    ----------
    fname : str
        file name
    centroids_dict : dict
        dictionary containing the centroids. Each key is an hydrometeor type
        and each value a sequence with one centroid value per variable, in
        the same order as var_names
    var_names : tuple
        List of variables

    Returns
    -------
    fname : str
        the name of the file containing the content

    """
    field_names = ["hydro_type", *var_names]
    with open(fname, "w", newline="") as csvfile:
        csvfile.write("# Centroids file\n")
        csvfile.write("# Header lines with comments are preceded by '#'\n")
        csvfile.write("#\n")

        writer = csv.DictWriter(csvfile, field_names)
        writer.writeheader()
        for hydro_type, centroid in centroids_dict.items():
            dict_row = {"hydro_type": hydro_type}
            dict_row.update(
                {var_name: centroid[ivar] for ivar, var_name in enumerate(var_names)}
            )
            writer.writerow(dict_row)

    return fname
def write_proc_periods(start_times, end_times, fname):
    """
    writes a csv file listing the start and end time of each period to
    process

    Parameters
    ----------
    start_times, end_times : sequences of datetime objects
        The starting and ending times of the periods. They are paired
        element by element
    fname : str
        The name of the file where to write

    Returns
    -------
    fname : str
        the name of the file containing the content

    """
    time_fmt = "%Y%m%d%H%M%S"
    columns = ["start_time", "end_time"]
    with open(fname, "w", newline="") as outfile:
        writer = csv.DictWriter(outfile, columns)
        writer.writeheader()
        # one row per period, written as the pairs are consumed
        writer.writerows(
            {
                "start_time": period_start.strftime(time_fmt),
                "end_time": period_end.strftime(time_fmt),
            }
            for period_start, period_end in zip(start_times, end_times)
        )

    return fname
def write_fixed_angle(time_data, fixed_angle, rad_lat, rad_lon, rad_alt, fname):
    """
    writes an output file with the fixed angle data

    If the file does not exist yet it is created with a header line,
    otherwise the new row is appended to it.

    Parameters
    ----------
    time_data : datetime object
        The scan time
    fixed_angle : float
        The first fixed angle in the scan
    rad_lat, rad_lon, rad_alt : float
        Latitude, longitude [deg] and altitude [m MSL] of the radar
    fname : str
        The name of the file where to write

    Returns
    -------
    fname : str
        the name of the file containing the content

    """
    fieldnames = ["date_time", "rad_lat", "rad_lon", "rad_alt", "fixed_angle"]
    row = {
        "date_time": time_data.strftime("%Y%m%d%H%M%S"),
        "rad_lat": rad_lat,
        "rad_lon": rad_lon,
        "rad_alt": rad_alt,
        "fixed_angle": fixed_angle,
    }

    # Plain existence test: the file name must not be interpreted as a glob
    # pattern, otherwise names containing e.g. "[" would never be found and
    # the file would be overwritten at each call
    new_file = not os.path.isfile(fname)
    with open(fname, "w" if new_file else "a", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames)
        if new_file:
            writer.writeheader()
        writer.writerow(row)

    return fname
def write_ts_lightning(
    flashnr,
    time_data,
    time_in_flash,
    lat,
    lon,
    alt,
    dBm,
    vals_list,
    fname,
    pol_vals_labels,
):
    """
    writes the LMA sources data and the value of the colocated polarimetric
    variables

    Parameters
    ----------
    flashnr : array of ints
        flash number of each source
    time_data : array of datetime objects
        flash source time
    time_in_flash : array of floats
        seconds since start of flash
    lat, lon, alt : array of floats
        latitude, longitude [deg] and altitude [m MSL] of the flash source
    dBm : array of floats
        flash power
    vals_list : list of masked arrays
        List containing the data for each polarimetric variable. Masked
        values are written as the fill value
    fname : str
        the name of the file where to write the content
    pol_vals_labels : list of strings
        List containing strings identifying each polarimetric variable

    Returns
    -------
    fname : str
        the name of the file containing the content

    """
    fill_value = get_fillvalue()
    vals_list_aux = [
        vals_list[j].filled(fill_value=fill_value)
        for j, _ in enumerate(pol_vals_labels)
    ]

    field_names = [
        "flashnr",
        "time_data",
        "time_in_flash",
        "lat",
        "lon",
        "alt",
        "dBm",
    ]
    field_names.extend(pol_vals_labels)

    with open(fname, "w", newline="") as csvfile:
        csvfile.write("# Weather radar timeseries data file\n")
        csvfile.write("# Project: MALSplus\n")
        # start and end times can only be reported when there are sources
        if time_data.size > 0:
            csvfile.write(
                "# Start : %s UTC\n" % time_data[0].strftime("%Y-%m-%d %H:%M:%S")
            )
            csvfile.write(
                "# End : %s UTC\n" % time_data[-1].strftime("%Y-%m-%d %H:%M:%S")
            )
        csvfile.write("# Header lines with comments are preceded by '#'\n")
        csvfile.write("#\n")

        writer = csv.DictWriter(csvfile, field_names)
        writer.writeheader()

        if flashnr.size == 0:
            # the file is left with its header only
            warn("No data to write in file " + fname)
            return fname

        for i, flash in enumerate(flashnr):
            dict_row = {
                "flashnr": int(flash),
                "time_data": time_data[i].strftime("%Y-%m-%d %H:%M:%S.%f"),
                "time_in_flash": time_in_flash[i],
                "lat": lat[i],
                "lon": lon[i],
                "alt": alt[i],
                "dBm": dBm[i],
            }
            for j, label in enumerate(pol_vals_labels):
                dict_row[label] = vals_list_aux[j][i]
            writer.writerow(dict_row)

    return fname
def send_msg(sender, receiver_list, subject, fname):
    """
    emails the content of a text file through the SMTP server running on
    localhost

    Parameters
    ----------
    sender : str
        the email address of the sender
    receiver_list : list of string
        list with the email addresses of the receiver
    subject : str
        the subject of the email
    fname : str
        name of the file containing the content of the email message

    Returns
    -------
    fname : str
        the name of the file containing the content

    """
    # The body of the email is the plain text stored in fname
    with open(fname) as text_file:
        body = text_file.read()

    # Build the message: headers first, then the text/plain content
    email = EmailMessage()
    for header, value in (
        ("Subject", subject),
        ("From", sender),
        ("To", ", ".join(receiver_list)),
    ):
        email[header] = value
    email.set_content(body)

    # Hand the message over to the local SMTP server
    with smtplib.SMTP("localhost") as server:
        server.send_message(email)

    return fname
def write_alarm_msg(
    radar_name,
    param_name_unit,
    date_last,
    target,
    tol_abs,
    np_trend,
    value_trend,
    tol_trend,
    nevents,
    np_last,
    value_last,
    fname,
):
    """
    writes a text file describing a monitoring alarm

    Parameters
    ----------
    radar_name : str or None
        Name of the radar being controlled. If None the radar name line is
        omitted
    param_name_unit : str
        Parameter and units
    date_last : datetime object
        date of the current event
    target, tol_abs : float
        Target value and tolerance
    np_trend : int
        Total number of points in trend. If it is not larger than 0 the trend
        information is reported as NA
    value_trend, tol_trend : float
        Trend value and tolerance
    nevents: int
        Number of events in trend
    np_last : int
        Number of points in the current event
    value_last : float
        Value of the current event
    fname : str
        Name of file where to store the alarm information

    Returns
    -------
    fname : str
        the name of the file where data has written

    """
    with open(fname, "w", newline="") as alarm_file:
        lines = ["Weather radar polarimetric parameters monitoring alarm\n"]
        if radar_name is not None:
            lines.append("Radar name: " + radar_name + "\n")
        lines.append("Parameter [Unit]: " + param_name_unit + "\n")
        lines.append("Date : " + date_last.strftime("%Y%m%d") + "\n")
        lines.append(f"Target value: {target!s} +/- {tol_abs!s}\n")

        if np_trend > 0:
            lines.append(
                f"Number of points in trend: {np_trend!s} in {nevents!s} events\n"
            )
            lines.append(
                f"Trend value: {value_trend!s} (tolerance: +/- {tol_trend!s})\n"
            )
        else:
            lines.append("Number of points in trend: NA\n")
            lines.append("Trend value: NA\n")

        lines.append(f"Number of points: {np_last!s}\n")
        lines.append(f"Value: {value_last!s}\n")
        alarm_file.writelines(lines)

    return fname
def write_last_state(datetime_last, fname):
    """
    writes the date and time of the last state in a text file, in format
    YYYYMMDDhhmmss

    Parameters
    ----------
    datetime_last : datetime object
        date and time of the last state
    fname : str
        file name where to store the data

    Returns
    -------
    fname : str or None
        the name of the file where data has written. None if the file could
        not be written

    """
    # Format before opening the file so that an invalid datetime_last does not
    # leave the previous state file truncated
    last_state = datetime_last.strftime("%Y%m%d%H%M%S")
    try:
        with open(fname, "w", newline="") as txtfile:
            txtfile.write(last_state)
    except OSError:
        warn("Unable to write on file " + fname)
        return None

    return fname
def write_smn(datetime_vec, value_avg_vec, value_std_vec, fname):
    """
    writes SwissMetNet data as a csv file with columns datetime, avg and std

    Parameters
    ----------
    datetime_vec : datetime array
        array containing the measurement time
    value_avg_vec : float array
        array containing the average value
    value_std_vec : float array
        array containing the standard deviation
    fname : str
        file name where to store the data

    Returns
    -------
    fname : str
        the name of the file where data has written

    """
    columns = ["datetime", "avg", "std"]
    with open(fname, "w", newline="") as outfile:
        writer = csv.DictWriter(outfile, columns)
        writer.writeheader()
        # the average vector drives the number of rows written
        writer.writerows(
            {
                "datetime": datetime_vec[idx].strftime("%Y%m%d%H%M%S"),
                "avg": avg,
                "std": value_std_vec[idx],
            }
            for idx, avg in enumerate(value_avg_vec)
        )

    return fname
def write_timeseries_point(fname, data, dstype, text, timeformat=None, timeinfo=None):
    """
    Write one timesample of a time series to a file

    If the file does not exist it is created and a commented header is
    written before the sample, otherwise the sample is appended to the file.

    Parameters
    ----------
    fname : str
        name of the file where to write the data
    data : dict
        time series sample with the keys:
            time : datetime object with the time of the sample
            label : list with the header of each value column
            value : list with the data value of each column
            unit : str with the unit of the data
    dstype : str
        name of the data type. Written in the header of the file
    text : list of str
        Text with description of the data in the file. Each string is written
        to a new header line preceded by the comment symbol
    timeformat : str or None
        strftime format used for the date time column. If None the format is
        '%Y-%m-%d %H:%M:%S'
    timeinfo : any
        Not used. Kept for backwards compatibility

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    sample_time = data["time"]
    start_str = sample_time.strftime("%Y-%m-%d %H:%M:%S")
    unit = data["unit"]

    if timeformat is None:
        time_label = "YYYY-MM-DD hh:mm:ss"
        time_str = start_str
    else:
        # format the datetime object itself, not its string representation
        time_label = timeformat
        time_str = sample_time.strftime(timeformat)

    sample_line = (
        ", ".join([time_str] + ["%.4f" % value for value in data["value"]]) + "\n"
    )

    print("----- Write timeseries ", fname)

    if os.path.isfile(fname):
        with open(fname, "a", newline="") as csvfile:
            csvfile.write(sample_line)
        return fname

    # File does not exist. Open it and fill in header info.
    header = [
        "# Weather radar timeseries data file\n",
        "# Project: MALSplus\n",
        "# Data/Unit : " + dstype + " [" + unit + "]\n",
        "# Start : " + start_str + " UTC\n",
        "# Header lines with comments are preceded by '#'\n",
    ]
    header.extend("# " + line + "\n" for line in text)
    header.append("#\n")
    header.append(", ".join(["# Date [" + time_label + "]", *data["label"]]) + "\n")

    with open(fname, "w", newline="") as csvfile:
        csvfile.writelines(header)
        csvfile.write(sample_line)

    return fname
def write_trt_info(ids, max_rank, nscans, time_start, time_end, fname):
    """
    writes a csv file with the TRT information of the thundertracking: one
    row per cell

    Parameters
    ----------
    ids, max_rank, nscans, time_start, time_end: array
        the cell parameters. time_start and time_end contain datetime
        objects
    fname : str
        file name where to store the data

    Returns
    -------
    fname : str
        the name of the file where data has written

    """
    time_fmt = "%Y%m%d%H%M"
    columns = ["id", "max_rank", "nscans_Xband", "time_start", "time_end"]
    with open(fname, "w", newline="") as outfile:
        writer = csv.DictWriter(outfile, columns)
        writer.writeheader()
        writer.writerows(
            {
                "id": cell_id,
                "max_rank": max_rank[icell],
                "nscans_Xband": nscans[icell],
                "time_start": time_start[icell].strftime(time_fmt),
                "time_end": time_end[icell].strftime(time_fmt),
            }
            for icell, cell_id in enumerate(ids)
        )

    return fname
def write_trt_cell_data(
    traj_ID,
    yyyymmddHHMM,
    lon,
    lat,
    ell_L,
    ell_S,
    ell_or,
    area,
    vel_x,
    vel_y,
    det,
    RANKr,
    CG_n,
    CG_p,
    CG,
    CG_percent_p,
    ET45,
    ET45m,
    ET15,
    ET15m,
    VIL,
    maxH,
    maxHm,
    POH,
    RANK,
    Dvel_x,
    Dvel_y,
    cell_contour,
    fname,
):
    """
    writes TRT cell data

    Parameters
    ----------
    traj_ID, yyyymmddHHMM, lon, lat, ell_L, ell_S, ell_or, area, vel_x, vel_y,
    det, RANKr, CG_n, CG_p, CG, CG_percent_p, ET45, ET45m, ET15, ET15m, VIL,
    maxH, maxHm, POH, RANK, Dvel_x, Dvel_y, cell_contour:
        the cell parameters, one element per cell time step. yyyymmddHHMM
        contains datetime objects. Each element of cell_contour is a dict
        with keys lon and lat containing the contour points, which are
        written as a single space separated string lon0 lat0 lon1 lat1 ...
        (empty if the contour has no points)
    fname : str
        file name where to store the data

    Returns
    -------
    fname : str or None
        the name of the file where data has written. None if the file could
        not be written

    """
    # csv column name -> per-cell values, for the columns written unchanged
    columns = {
        "lon": lon,
        "lat": lat,
        "ell_L": ell_L,
        "ell_S": ell_S,
        "ell_or": ell_or,
        "area": area,
        "vel_x": vel_x,
        "vel_y": vel_y,
        "det": det,
        "RANKr": RANKr,
        "CG-": CG_n,
        "CG+": CG_p,
        "CG": CG,
        "%CG+": CG_percent_p,
        "ET45": ET45,
        "ET45m": ET45m,
        "ET15": ET15,
        "ET15m": ET15m,
        "VIL": VIL,
        "maxH": maxH,
        "maxHm": maxHm,
        "POH": POH,
        "RANK": RANK,
        "Dvel_x": Dvel_x,
        "Dvel_y": Dvel_y,
    }
    fieldnames = ["traj_ID", "yyyymmddHHMM", *columns, "cell_contour_lon-lat"]

    try:
        with open(fname, "w", newline="") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames)
            writer.writeheader()
            for i, traj_ID_el in enumerate(traj_ID):
                # interleave the contour as lon0, lat0, lon1, lat1, ...
                cell_contour_aux = cell_contour[i]
                npoints_contour = len(cell_contour_aux["lon"])
                cell_contour_arr = np.empty(2 * npoints_contour, dtype=float)
                cell_contour_arr[0::2] = cell_contour_aux["lon"]
                cell_contour_arr[1::2] = cell_contour_aux["lat"]

                dict_row = {name: values[i] for name, values in columns.items()}
                dict_row["traj_ID"] = traj_ID_el
                dict_row["yyyymmddHHMM"] = yyyymmddHHMM[i].strftime("%Y%m%d%H%M")
                dict_row["cell_contour_lon-lat"] = " ".join(
                    str(coord) for coord in cell_contour_arr
                )
                writer.writerow(dict_row)

        return fname
    except EnvironmentError:
        warn("Unable to write on file " + fname)
        return None
def write_trt_thundertracking_data(
    traj_ID,
    scan_ordered_time,
    scan_time,
    azi,
    rng,
    yyyymmddHHMM,
    lon,
    lat,
    ell_L,
    ell_S,
    ell_or,
    area,
    vel_x,
    vel_y,
    det,
    RANKr,
    CG_n,
    CG_p,
    CG,
    CG_percent_p,
    ET45,
    ET45m,
    ET15,
    ET15m,
    VIL,
    maxH,
    maxHm,
    POH,
    RANK,
    Dvel_x,
    Dvel_y,
    cell_contour,
    fname,
):
    """
    writes TRT cell data of the thundertracking scan

    Parameters
    ----------
    traj_ID, scan_ordered_time, scan_time, azi, rng, yyyymmddHHMM, lon, lat,
    ell_L, ell_S, ell_or, area, vel_x, vel_y, det, RANKr, CG_n, CG_p, CG,
    CG_percent_p, ET45, ET45m, ET15, ET15m, VIL, maxH, maxHm, POH, RANK,
    Dvel_x, Dvel_y, cell_contour:
        the cell parameters, one element per cell time step.
        scan_ordered_time, scan_time and yyyymmddHHMM contain datetime
        objects. Masked elements of scan_time are written as the fill value.
        Each element of cell_contour is a dict with keys lon and lat
        containing the contour points, which are written as a single space
        separated string lon0 lat0 lon1 lat1 ... (empty if the contour has no
        points)
    fname : str
        file name where to store the data

    Returns
    -------
    fname : str or None
        the name of the file where data has written. None if the file could
        not be written

    """
    # csv column name -> per-cell values, for the columns written unchanged
    scan_columns = {"azi": azi, "rng": rng}
    cell_columns = {
        "lon": lon,
        "lat": lat,
        "ell_L": ell_L,
        "ell_S": ell_S,
        "ell_or": ell_or,
        "area": area,
        "vel_x": vel_x,
        "vel_y": vel_y,
        "det": det,
        "RANKr": RANKr,
        "CG-": CG_n,
        "CG+": CG_p,
        "CG": CG,
        "%CG+": CG_percent_p,
        "ET45": ET45,
        "ET45m": ET45m,
        "ET15": ET15,
        "ET15m": ET15m,
        "VIL": VIL,
        "maxH": maxH,
        "maxHm": maxHm,
        "POH": POH,
        "RANK": RANK,
        "Dvel_x": Dvel_x,
        "Dvel_y": Dvel_y,
    }
    fieldnames = [
        "traj_ID",
        "scan_ordered_time",
        "scan_time",
        *scan_columns,
        "yyyymmddHHMM",
        *cell_columns,
        "cell_contour_lon-lat",
    ]

    try:
        with open(fname, "w", newline="") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames)
            writer.writeheader()
            for i, traj_ID_el in enumerate(traj_ID):
                # interleave the contour as lon0, lat0, lon1, lat1, ...
                cell_contour_aux = cell_contour[i]
                npoints_contour = len(cell_contour_aux["lon"])
                cell_contour_arr = np.empty(2 * npoints_contour, dtype=float)
                cell_contour_arr[0::2] = cell_contour_aux["lon"]
                cell_contour_arr[1::2] = cell_contour_aux["lat"]

                if np.ma.is_masked(scan_time[i]):
                    scan_time_str = get_fillvalue()
                else:
                    scan_time_str = scan_time[i].strftime("%Y%m%d%H%M%S")

                dict_row = {name: values[i] for name, values in scan_columns.items()}
                dict_row.update(
                    {name: values[i] for name, values in cell_columns.items()}
                )
                dict_row["traj_ID"] = traj_ID_el
                dict_row["scan_ordered_time"] = scan_ordered_time[i].strftime(
                    "%Y%m%d%H%M%S.%f"
                )
                dict_row["scan_time"] = scan_time_str
                dict_row["yyyymmddHHMM"] = yyyymmddHHMM[i].strftime("%Y%m%d%H%M")
                dict_row["cell_contour_lon-lat"] = " ".join(
                    str(coord) for coord in cell_contour_arr
                )
                writer.writerow(dict_row)

        return fname
    except EnvironmentError:
        warn("Unable to write on file " + fname)
        return None
def write_trt_cell_scores(
    traj_ID,
    flash_density_max_time,
    flash_density_max_rank,
    nflashes_max_list,
    area_flash_max_list,
    flash_density_max,
    rank_max_time,
    rank_max,
    fname,
):
    """
    writes a csv file with the scores of the TRT cells: one row per cell

    Parameters
    ----------
    traj_ID : array of ints
        The ID of the cells
    flash_density_max_time : array of date times
        The time at which the maximum flash density was reached for each cell
    flash_density_max_rank : array of floats
        The rank when the maximum flash density was reached for each cell
    nflashes_max_list : array of ints
        the number of flashes when the max flash density was reached
    area_flash_max_list : array of floats
        The area when the max flash density was reached
    flash_density_max : array of floats
        The maximum flash density for each cell
    rank_max_time : array of datetime
        the time at which the maximum rank of each cell was reached
    rank_max : array of float
        the rank when the maximum rank of each cell was reached
    fname : str
        file name where to store the data

    Returns
    -------
    fname : str
        the name of the file where data has written

    """
    time_fmt = "%Y-%m-%d %H:%M:%S"
    columns = [
        "traj ID",
        "max flash density time",
        "max flash density rank",
        "max flash density flashes",
        "max flash density area",
        "max flash density",
        "max rank time",
        "max rank",
    ]
    with open(fname, "w", newline="") as outfile:
        writer = csv.DictWriter(outfile, columns)
        writer.writeheader()
        writer.writerows(
            {
                "traj ID": cell_id,
                "max flash density time": flash_density_max_time[icell].strftime(
                    time_fmt
                ),
                "max flash density rank": flash_density_max_rank[icell],
                "max flash density flashes": nflashes_max_list[icell],
                "max flash density area": area_flash_max_list[icell],
                "max flash density": flash_density_max[icell],
                "max rank time": rank_max_time[icell].strftime(time_fmt),
                "max rank": rank_max[icell],
            }
            for icell, cell_id in enumerate(traj_ID)
        )

    return fname
def write_trt_cell_lightning(
    cell_ID,
    cell_time,
    lon,
    lat,
    area,
    rank,
    nflash,
    flash_density,
    fname,
    timeformat="%Y%m%d%H%M",
):
    """
    writes a csv file with the lightning data of each TRT cell time step

    Parameters
    ----------
    cell_ID : array of ints
        the cell ID
    cell_time : array of datetime
        the time step
    lon, lat : array of floats
        the longitude and latitude of the center of the cell
    area : array of floats
        the area of the cell
    rank : array of floats
        the rank of the cell
    nflash : masked array of ints
        the number of flashes/sources within the cell. Masked values are
        written as the fill value
    flash_density : masked array of floats
        the flash/source density. Masked values are written as the fill
        value
    fname : str
        file name where to store the data
    timeformat : str
        strftime format used to write the time step

    Returns
    -------
    fname : str
        the name of the file where data has written

    """
    nflash_filled = nflash.filled(fill_value=get_fillvalue())
    density_filled = flash_density.filled(fill_value=get_fillvalue())

    columns = [
        "traj_ID",
        "yyyymmddHHMM",
        "lon",
        "lat",
        "area",
        "RANKr",
        "nflashes",
        "flash_dens",
    ]
    with open(fname, "w", newline="") as outfile:
        writer = csv.DictWriter(outfile, columns)
        writer.writeheader()
        writer.writerows(
            {
                "traj_ID": cell_id,
                "yyyymmddHHMM": cell_time[icell].strftime(timeformat),
                "lon": lon[icell],
                "lat": lat[icell],
                "area": area[icell],
                "RANKr": rank[icell],
                "nflashes": nflash_filled[icell],
                "flash_dens": density_filled[icell],
            }
            for icell, cell_id in enumerate(cell_ID)
        )

    return fname
def write_trt_rpc(
    cell_ID,
    cell_time,
    lon,
    lat,
    area,
    rank,
    hmin,
    hmax,
    freq,
    fname,
    timeformat="%Y%m%d%H%M",
):
    """
    writes a csv file with the rimed particles column data of a TRT cell

    Parameters
    ----------
    cell_ID : array of ints
        the cell ID
    cell_time : array of datetime
        the time step
    lon, lat : array of floats
        the longitude and latitude of the center of the cell
    area : array of floats
        the area of the cell
    rank : array of floats
        the rank of the cell
    hmin, hmax : masked array of floats
        Minimum and maximum altitude of the rimed particle column. Masked
        values are written as the fill value
    freq : masked array of floats
        Frequency of the species constituting the rime particle column within
        the limits of it. Masked values are written as the fill value
    fname : str
        file name where to store the data
    timeformat : str
        strftime format used to write the time step

    Returns
    -------
    fname : str
        the name of the file where data has written

    """
    hmin_filled = hmin.filled(fill_value=get_fillvalue())
    hmax_filled = hmax.filled(fill_value=get_fillvalue())
    freq_filled = freq.filled(fill_value=get_fillvalue())

    columns = [
        "traj_ID",
        "yyyymmddHHMM",
        "lon",
        "lat",
        "area",
        "RANKr",
        "hmin",
        "hmax",
        "freq",
    ]
    with open(fname, "w", newline="") as outfile:
        writer = csv.DictWriter(outfile, columns)
        writer.writeheader()
        writer.writerows(
            {
                "traj_ID": cell_id,
                "yyyymmddHHMM": cell_time[icell].strftime(timeformat),
                "lon": lon[icell],
                "lat": lat[icell],
                "area": area[icell],
                "RANKr": rank[icell],
                "hmin": hmin_filled[icell],
                "hmax": hmax_filled[icell],
                "freq": freq_filled[icell],
            }
            for icell, cell_id in enumerate(cell_ID)
        )

    return fname
def write_vpr_theo_params(
    vpr_theo_dict, fname, labels=("ml_top", "ml_bottom", "val_ml_peak", "val_dr")
):
    """
    writes the parameters defining a theoretical VPR profile into a csv file
    made of a header line and a single data row

    Parameters
    ----------
    vpr_theo_dict : dict
        dictionary containing the parameters of the VPR profile
    fname : str
        file name where to store the data
    labels : list of str
        names of the parameters to write in file. All of them must be keys of
        vpr_theo_dict

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    with open(fname, "w", newline="") as outfile:
        writer = csv.DictWriter(outfile, labels)
        writer.writeheader()
        # keep only the requested parameters
        selected = {label: vpr_theo_dict[label] for label in labels}
        writer.writerow(selected)

    return fname
def write_vpr_info(vpr_info_dict, fname):
    """
    Writes a dictionary containing relevant parameters of the VPR profile
    retrieval into a csv file: the keys form the header line and the values
    a single data row

    Parameters
    ----------
    vpr_info_dict : dict
        dictionary containing the parameters of the VPR profile
    fname : str
        file name where to store the data

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    columns = list(vpr_info_dict)
    with open(fname, "w", newline="") as outfile:
        writer = csv.DictWriter(outfile, columns)
        writer.writeheader()
        writer.writerow(vpr_info_dict)

    return fname
def write_rhi_profile(
    hvec, data, nvalid_vec, labels, fname, datatype=None, timeinfo=None, sector=None
):
    """
    writes the values of an RHI profile in a text file: a commented header
    followed by one csv row per altitude

    Parameters
    ----------
    hvec : float array
        array containing the altitude in m MSL
    data : list of masked float array
        the quantities at each altitude. Masked values are written as the
        fill value
    nvalid_vec : int array or None
        number of valid data points used to compute the quantiles. If None
        the corresponding column is not written
    labels : list of strings
        label specifying the quantities in data
    fname : str
        file name where to store the data
    datatype : str
        the data type
    timeinfo : datetime object
        time of the rhi profile
    sector : dict
        dictionary specifying the sector limits. The keys az, rmin and rmax
        are written in the header

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    filled_profiles = [
        profile.filled(fill_value=get_fillvalue()) for profile in data
    ]
    with_nvalid = nvalid_vec is not None

    with open(fname, "w", newline="") as outfile:
        header = ["# Height Profile\n", "# ==============\n", "#\n"]
        if datatype is not None:
            header.append("# Datatype (Unit) : " + datatype + "\n")
        else:
            header.append("# Datatype (Unit) : Not specified\n")
        header.append("# Fill value : " + str(get_fillvalue()) + "\n")
        if timeinfo is not None:
            header.append(
                "# Time : " + timeinfo.strftime("%Y-%m-%d %H:%M:%S") + "\n"
            )
        else:
            header.append("# Time : Not specified\n")
        if sector is not None:
            header.append("# Sector specification:\n")
            header.append("# Azimuth : " + str(sector["az"]) + " deg\n")
            header.append("# Range start : " + str(sector["rmin"]) + " m\n")
            header.append("# Range stop : " + str(sector["rmax"]) + " m\n")
        else:
            header.append("# Sector specification: None\n")
        outfile.writelines(header)

        columns = ["Altitude [m MSL]", *labels]
        if with_nvalid:
            columns.append("N valid")
        writer = csv.DictWriter(outfile, columns)
        writer.writeheader()

        for ialt, height in enumerate(hvec):
            row = {columns[0]: height}
            # data columns follow the altitude column in label order
            for ivar, _ in enumerate(labels):
                row[columns[ivar + 1]] = filled_profiles[ivar][ialt]
            if with_nvalid:
                row[columns[-1]] = nvalid_vec[ialt]
            writer.writerow(row)

    return fname
def write_field_coverage(
    quantiles,
    values,
    ele_start,
    ele_stop,
    azi_start,
    azi_stop,
    threshold,
    nvalid_min,
    datatype,
    timeinfo,
    fname,
):
    """
    Writes the quantiles of the field coverage on a particular sector into
    a commented CSV file

    Parameters
    ----------
    quantiles : array
        array containing the quantiles computed. Written in the
        "Quantile [%]" column
    values : float array
        value of each quantile. Written in the "Rain extension [m]" column
    ele_start, ele_stop, azi_start, azi_stop : float
        The limits of the sector
    threshold : float
        The minimum value to consider the data valid
    nvalid_min : int
        the minimum number of points to consider that there are values in a
        ray
    datatype : str
        data type and units
    timeinfo : datetime object
        the time stamp of the data
    fname : str
        name of the file where to write the data

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    quantile_col = "Quantile [%]"
    value_col = "Rain extension [m]"

    with open(fname, "w", newline="") as outfile:
        outfile.writelines(
            [
                "# Quantiles of the field coverage\n",
                "".join(("# Datatype (Unit) : ", datatype, "\n")),
                "".join(
                    ("# Time : ", timeinfo.strftime("%Y-%m-%d %H:%M:%S"), "\n")
                ),
                "# Sector specification:\n",
                f"# Azimuth start : {azi_start!s} deg\n",
                f"# Azimuth stop : {azi_stop!s} deg\n",
                f"# Elevation start : {ele_start!s} deg\n",
                f"# Elevation stop : {ele_stop!s} deg\n",
                f"# Threshold : {threshold!s}\n",
                f"# Minimum number of valid gates per ray : {nvalid_min!s}\n",
            ]
        )

        writer = csv.DictWriter(outfile, [quantile_col, value_col])
        writer.writeheader()
        writer.writerows(
            {quantile_col: quant, value_col: values[ind]}
            for ind, quant in enumerate(quantiles)
        )

    return fname
def write_cdf(
    quantiles,
    values,
    ntot,
    nnan,
    nclut,
    nblocked,
    nprec_filter,
    noutliers,
    ncdf,
    fname,
    use_nans=False,
    nan_value=0.0,
    filterprec=None,
    vismin=None,
    sector=None,
    datatype=None,
    timeinfo=None,
):
    """
    Writes a text report of a cumulative distribution function: the
    bookkeeping of the gates used and the value of each quantile

    Parameters
    ----------
    quantiles : array
        the quantiles. Each one is converted to int to build the
        "Quantile_XX" label
    values : float array
        the value of each quantile
    ntot : int
        total number of gates in the sector
    nnan : int
        number of gates with no value
    nclut : int
        number of clutter contaminated gates. -1 is reported as
        "Not checked"
    nblocked : int
        number of blocked gates. -1 is reported as "Not checked"
    nprec_filter : int
        number of filtered precipitation gates. -1 is reported as "None"
    noutliers : int
        number of outliers
    ncdf : int
        number of gates used for the histogram
    fname : str
        file name where to store the data
    use_nans : bool
        if True the report states that NaNs are set to nan_value, otherwise
        that they are ignored
    nan_value : float
        the value reported for the NaNs when use_nans is True
    filterprec : list of int or None
        indices of the filtered precipitation types in the list
        ["NC", "DS", "CR", "LR", "GR", "RN", "VI", "WS", "MH", "IH/HDG"].
        None is equivalent to an empty list. Only reported if
        nprec_filter is not -1
    vismin : float or None
        the visibility threshold reported along the blocked gates. If None
        "Not specified" is reported
    sector : dict or None
        dictionary specifying the sector limits. When given it must contain
        the keys "rmin", "rmax", "azmin", "azmax", "elmin", "elmax", "hmin"
        and "hmax". Limits set to None are reported as "Not specified"
    datatype : str or None
        the data type and units
    timeinfo : datetime object or None
        the time stamp of the data

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    hydrotype_list = ["NC", "DS", "CR", "LR", "GR", "RN", "VI", "WS", "MH", "IH/HDG"]

    # None replaces a mutable default argument shared between calls
    if filterprec is None:
        filterprec = []

    # (label, sector key, unit) in the order they appear in the report
    sector_fields = (
        (" Range start : ", "rmin", " m"),
        (" Range stop : ", "rmax", " m"),
        (" Azimuth start : ", "azmin", " deg"),
        (" Azimuth stop : ", "azmax", " deg"),
        (" Elevation start : ", "elmin", " deg"),
        (" Elevation stop : ", "elmax", " deg"),
        (" Height start : ", "hmin", " m"),
        (" Height stop : ", "hmax", " m"),
    )

    with open(fname, "w", newline="") as txtfile:
        txtfile.write("Statistical analysis\n")
        txtfile.write("====================\n\n")

        if datatype is None:
            txtfile.write("Datatype (Unit) : Not specified\n")
        else:
            txtfile.write("Datatype (Unit) : " + datatype + "\n")

        if timeinfo is None:
            txtfile.write("Time : Not specified\n")
        else:
            txtfile.write("Time : " + timeinfo.strftime("%Y-%m-%d %H:%M:%S") + "\n")

        if sector is None:
            txtfile.write("Sector specification: None\n")
        else:
            txtfile.write("Sector specification:\n")
            for label, key, unit in sector_fields:
                limit = sector[key]
                if limit is None:
                    txtfile.write(label + "Not specified\n")
                else:
                    txtfile.write(label + str(limit) + unit + "\n")

        txtfile.write("Total number of gates in sector : " + str(ntot) + "\n")
        txtfile.write("Number of gates with no value (NaNs) : " + str(nnan) + "\n")
        if use_nans:
            txtfile.write(" NaNs are set to : " + str(nan_value) + "\n")
        else:
            txtfile.write(" NaNs are ignored!\n")

        if nclut == -1:
            txtfile.write("Clutter contaminated gates : " + "Not checked\n")
        else:
            txtfile.write("Clutter contaminated gates : " + str(nclut) + "\n")

        if nblocked == -1:
            txtfile.write("Blocked gates : " + "Not checked\n")
        else:
            # vismin defaults to None, which int() cannot convert
            if vismin is None:
                vismin_txt = "Not specified"
            else:
                vismin_txt = str(int(vismin))
            txtfile.write(
                "Blocked gates (vismin = "
                + vismin_txt
                + ") : "
                + str(nblocked)
                + "\n"
            )

        if nprec_filter == -1:
            txtfile.write("Filtered precipitation gates : None\n")
        else:
            txtfile.write(
                "Filtered precipitation gates : " + str(nprec_filter) + "\n"
            )
            txtfile.write(" precipitation types filtered: ")
            for ind_hydro in filterprec:
                txtfile.write(hydrotype_list[ind_hydro] + " ")
            txtfile.write("\n")

        txtfile.write("Number of outliers : " + str(noutliers) + "\n")

        # an empty sector (ntot == 0) would make the percentage undefined
        percent_used = int(ncdf * 100.0 / ntot) if ntot else 0
        txtfile.write(
            "Number of gates used for histogram : "
            + str(ncdf)
            + " ("
            + str(percent_used)
            + "%)\n\n"
        )

        txtfile.write("Quantiles\n")
        txtfile.write("=========\n")
        for i, value in enumerate(values):
            txtfile.write(
                " Quantile_"
                + str(int(quantiles[i]))
                + " = "
                + "{:5.1f}".format(value)
                + "\n"
            )

    return fname
def write_histogram(bin_edges, values, fname, datatype="undefined", step=0):
    """
    Writes a histogram into a commented CSV file. Each row contains the left
    and right edges of a bin and the number of points in it

    Parameters
    ----------
    bin_edges : float array
        array containing the histogram bin edges. It must have at least one
        more element than values
    values : int array
        array containing the number of points in each bin
    fname : str
        file name
    datatype : str
        The data type. If "undefined" the data type is written as is,
        otherwise it is converted by generate_field_name_str
    step : float
        The bin step. Only written in the file header

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    columns = ["bin_edge_left", "bin_edge_right", "value"]

    with open(fname, "w", newline="") as outfile:
        data_label = (
            generate_field_name_str(datatype)
            if datatype != "undefined"
            else "undefined"
        )
        outfile.write(
            "".join(
                (
                    "# Weather radar data histogram file\n",
                    '# Comment lines are preceded by "#"\n',
                    "# Description:\n",
                    "# Histogram of weather radar data.\n",
                    "# Data: ",
                    data_label,
                    "\n",
                    "# Step: ",
                    str(step),
                    "\n",
                    "#\n",
                )
            )
        )

        writer = csv.DictWriter(outfile, columns)
        writer.writeheader()
        writer.writerows(
            {
                "bin_edge_left": bin_edges[ind],
                "bin_edge_right": bin_edges[ind + 1],
                "value": count,
            }
            for ind, count in enumerate(values)
        )

    return fname
def write_quantiles(quantiles, values, fname, datatype="undefined"):
    """
    Writes quantiles and their values into a commented CSV file

    Parameters
    ----------
    quantiles : float array
        array containing the quantiles to write
    values : float array
        array containing the value of each quantile. It must provide a
        ``filled`` method (e.g. a numpy masked array); masked entries are
        replaced by the fill value before writing
    fname : str
        file name
    datatype : str
        The data type. Converted by generate_field_name_str for the header

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    filled_values = values.filled(fill_value=get_fillvalue())

    with open(fname, "w", newline="") as outfile:
        # NOTE: the header text mentions a histogram although quantiles are
        # written. It is part of the file format and kept untouched
        outfile.write(
            "".join(
                (
                    "# Weather radar data histogram file\n",
                    '# Comment lines are preceded by "#"\n',
                    "# Description:\n",
                    "# Histogram of weather radar data.\n",
                    "# Data: ",
                    generate_field_name_str(datatype),
                    "\n",
                    "#\n",
                )
            )
        )

        writer = csv.DictWriter(outfile, ["quantile", "value"])
        writer.writeheader()
        writer.writerows(
            {"quantile": quant, "value": filled_values[ind]}
            for ind, quant in enumerate(quantiles)
        )

    return fname
def write_multiple_points(dataset, fname):
    """
    Writes radar data obtained at multiple points into a commented CSV
    file, one row per point

    Parameters
    ----------
    dataset : dict
        dictionary containing the data. The keys read are "value",
        "datatype", "point_id", "time", "antenna_coordinates_az",
        "antenna_coordinates_el", "antenna_coordinates_r",
        "point_coordinates_WGS84_lon", "point_coordinates_WGS84_lat",
        "point_coordinates_WGS84_alt", "used_point_coordinates_WGS84_lon",
        "used_point_coordinates_WGS84_lat" and
        "used_point_coordinates_WGS84_alt". The value and the used
        coordinates must provide a ``filled`` method (e.g. numpy masked
        arrays)
    fname : str
        file name

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """

    def _filled(key):
        # replace the masked entries by the fill value
        return dataset[key].filled(fill_value=get_fillvalue())

    values = _filled("value")
    ref_lon = _filled("used_point_coordinates_WGS84_lon")
    ref_lat = _filled("used_point_coordinates_WGS84_lat")
    ref_alt = _filled("used_point_coordinates_WGS84_alt")

    # columns copied as is from the dataset: (column name, dataset key)
    passthrough = (
        ("azi", "antenna_coordinates_az"),
        ("ele", "antenna_coordinates_el"),
        ("rng", "antenna_coordinates_r"),
        ("lon", "point_coordinates_WGS84_lon"),
        ("lat", "point_coordinates_WGS84_lat"),
        ("alt", "point_coordinates_WGS84_alt"),
    )
    columns = [
        "point_ID",
        "time",
        "azi",
        "ele",
        "rng",
        "lon",
        "lat",
        "alt",
        "lon_ref",
        "lat_ref",
        "alt_ref",
        "val",
    ]

    with open(fname, "w", newline="") as outfile:
        outfile.write(
            "".join(
                (
                    "# Weather radar data at multiple points file\n",
                    '# Comment lines are preceded by "#"\n',
                    "# Description:\n",
                    "# weather radar data at points of interest.\n",
                    "# Data: ",
                    generate_field_name_str(dataset["datatype"]),
                    "\n",
                    "#\n",
                )
            )
        )

        writer = csv.DictWriter(outfile, columns)
        writer.writeheader()
        for ind in range(values.size):
            stamp = dataset["time"][ind]
            # masked time stamps are written as NA
            if np.ma.is_masked(stamp):
                stamp_txt = "NA"
            else:
                stamp_txt = stamp.strftime("%Y%m%d%H%M%S")

            row = {"point_ID": dataset["point_id"][ind], "time": stamp_txt}
            row.update((column, dataset[key][ind]) for column, key in passthrough)
            row["lon_ref"] = ref_lon[ind]
            row["lat_ref"] = ref_lat[ind]
            row["alt_ref"] = ref_alt[ind]
            row["val"] = values[ind]
            writer.writerow(row)

    return fname
def write_multiple_points_grid(dataset, fname):
    """
    Writes data obtained at multiple points from a grid into a commented
    CSV file, one row per point

    Parameters
    ----------
    dataset : dict
        dictionary containing the data. The keys read are "value",
        "datatype", "point_id", "time", "grid_point_ix", "grid_point_iy",
        "point_coordinates_WGS84_lon", "point_coordinates_WGS84_lat",
        "used_point_coordinates_WGS84_lon" and
        "used_point_coordinates_WGS84_lat". The value and the used
        coordinates must provide a ``filled`` method (e.g. numpy masked
        arrays)
    fname : str
        file name

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """

    def _filled(key):
        # replace the masked entries by the fill value
        return dataset[key].filled(fill_value=get_fillvalue())

    values = _filled("value")
    ref_lon = _filled("used_point_coordinates_WGS84_lon")
    ref_lat = _filled("used_point_coordinates_WGS84_lat")

    # columns copied as is from the dataset: (column name, dataset key)
    passthrough = (
        ("ix", "grid_point_ix"),
        ("iy", "grid_point_iy"),
        ("lon", "point_coordinates_WGS84_lon"),
        ("lat", "point_coordinates_WGS84_lat"),
    )
    columns = [
        "point_ID",
        "time",
        "ix",
        "iy",
        "lon",
        "lat",
        "lon_ref",
        "lat_ref",
        "val",
    ]

    with open(fname, "w", newline="") as outfile:
        outfile.write(
            "".join(
                (
                    "# Gridded data at multiple points file\n",
                    '# Comment lines are preceded by "#"\n',
                    "# Description:\n",
                    "# Gridded data at points of interest.\n",
                    "# Data: ",
                    generate_field_name_str(dataset["datatype"]),
                    "\n",
                    "#\n",
                )
            )
        )

        writer = csv.DictWriter(outfile, columns)
        writer.writeheader()
        for ind in range(values.size):
            stamp = dataset["time"][ind]
            # masked time stamps are written as NA
            if np.ma.is_masked(stamp):
                stamp_txt = "NA"
            else:
                stamp_txt = stamp.strftime("%Y%m%d%H%M%S")

            row = {"point_ID": dataset["point_id"][ind], "time": stamp_txt}
            row.update((column, dataset[key][ind]) for column, key in passthrough)
            row["lon_ref"] = ref_lon[ind]
            row["lat_ref"] = ref_lat[ind]
            row["val"] = values[ind]
            writer.writerow(row)

    return fname
def write_ts_polar_data(dataset, fname):
    """
    Writes time series of polar data over a fixed location. If the file does
    not exist yet it is created with a commented header and the column
    names, otherwise the new samples are appended to it

    Parameters
    ----------
    dataset : dict
        dictionary containing the time series parameters. The keys
        "used_antenna_coordinates_az_el_r", "time" and "value" are always
        read. When the file is created
        "point_coordinates_WGS84_lon_lat_alt",
        "antenna_coordinates_az_el_r" and "datatype" are read as well. If
        there is a single sample "time" and "value" are written as they
        are, otherwise they are indexed by sample
    fname : str
        file name where to store the data

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    file_exists = bool(glob.glob(fname))
    nsamples = len(dataset["used_antenna_coordinates_az_el_r"][0])
    columns = ["date", "az", "el", "r", "value"]

    def _sample_rows():
        # one dictionary per sample to write
        if nsamples == 1:
            yield {
                "date": dataset["time"],
                "az": dataset["used_antenna_coordinates_az_el_r"][0][0],
                "el": dataset["used_antenna_coordinates_az_el_r"][1][0],
                "r": dataset["used_antenna_coordinates_az_el_r"][2][0],
                "value": dataset["value"],
            }
            return
        for ind in range(nsamples):
            yield {
                "date": dataset["time"][ind],
                "az": dataset["used_antenna_coordinates_az_el_r"][0][ind],
                "el": dataset["used_antenna_coordinates_az_el_r"][1][ind],
                "r": dataset["used_antenna_coordinates_az_el_r"][2][ind],
                "value": dataset["value"][ind],
            }

    with open(fname, "a" if file_exists else "w", newline="") as outfile:
        writer = csv.DictWriter(outfile, columns)

        if not file_exists:
            # the header is only written when the file is created
            start_time = dataset["time"][0] if nsamples > 1 else dataset["time"]

            outfile.write("# Weather radar timeseries data file\n")
            outfile.write('# Comment lines are preceded by "#"\n')
            outfile.write("# Description: \n")
            outfile.write(
                "# Time series of a weather radar data over a fixed location.\n"
            )
            outfile.write(
                "# Location [lon, lat, alt]: "
                + str(dataset["point_coordinates_WGS84_lon_lat_alt"])
                + "\n"
            )
            outfile.write(
                "# Nominal antenna coordinates used [az, el, r]: "
                + str(dataset["antenna_coordinates_az_el_r"])
                + "\n"
            )
            outfile.write(
                "# Data: " + generate_field_name_str(dataset["datatype"]) + "\n"
            )
            outfile.write("# Fill Value: " + str(get_fillvalue()) + "\n")
            outfile.write(
                "# Start: " + start_time.strftime("%Y-%m-%d %H:%M:%S UTC") + "\n"
            )
            outfile.write("#\n")
            writer.writeheader()

        writer.writerows(_sample_rows())

    return fname
def write_ts_grid_data(dataset, fname):
    """
    Writes time series of gridded data over a fixed location. If the file
    does not exist yet it is created with a commented header and the column
    names, otherwise the new sample is appended to it

    Parameters
    ----------
    dataset : dict
        dictionary containing the time series parameters. The keys "time"
        (datetime object), "used_coordinates_WGS84_lon_lat_alt" and "value"
        are always read. When the file is created
        "point_coordinates_WGS84_lon_lat_alt", "grid_points_iz_iy_ix" and
        "datatype" are read as well
    fname : str
        file name where to store the data

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    file_exists = bool(glob.glob(fname))
    columns = ["date", "lon", "lat", "alt", "value"]

    with open(fname, "a" if file_exists else "w", newline="") as outfile:
        writer = csv.DictWriter(outfile, columns)

        if not file_exists:
            # the header is only written when the file is created
            outfile.write("# Gridded data timeseries data file\n")
            outfile.write('# Comment lines are preceded by "#"\n')
            outfile.write("# Description: \n")
            outfile.write("# Time series of a gridded data over a fixed location.\n")
            outfile.write(
                "# Nominal location [lon, lat, alt]: "
                + str(dataset["point_coordinates_WGS84_lon_lat_alt"])
                + "\n"
            )
            outfile.write(
                "# Grid points used [iz, iy, ix]: "
                + str(dataset["grid_points_iz_iy_ix"])
                + "\n"
            )
            outfile.write(
                "# Data: " + generate_field_name_str(dataset["datatype"]) + "\n"
            )
            outfile.write("# Fill Value: " + str(get_fillvalue()) + "\n")
            outfile.write(
                "# Start: "
                + dataset["time"].strftime("%Y-%m-%d %H:%M:%S UTC")
                + "\n"
            )
            outfile.write("#\n")
            writer.writeheader()

        writer.writerow(
            {
                "date": dataset["time"].strftime("%Y-%m-%d %H:%M:%S.%f"),
                "lon": dataset["used_coordinates_WGS84_lon_lat_alt"][0],
                "lat": dataset["used_coordinates_WGS84_lon_lat_alt"][1],
                "alt": dataset["used_coordinates_WGS84_lon_lat_alt"][2],
                "value": dataset["value"],
            }
        )

    return fname
def write_ts_ml(
    dt_ml, ml_top_avg, ml_top_std, thick_avg, thick_std, nrays_valid, nrays_total, fname
):
    """
    Writes one sample of a melting layer time series to a csv file

    When no file matches fname a new file is created with a comment header,
    the column names and the sample. Otherwise the sample is appended to the
    existing file.

    Parameters
    ----------
    dt_ml : datetime object
        time of the sample (its strftime method is used)
    ml_top_avg, ml_top_std : masked arrays
        the average and the standard deviation of the melting layer top
        height. Masked values are replaced by the fill value
    thick_avg, thick_std : masked arrays
        the average and the standard deviation of the melting layer
        thickness. Masked values are replaced by the fill value
    nrays_valid, nrays_total : int
        written to the columns "N valid rays" and "rays total"
    fname : str
        file name where to store the data

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    columns = [
        "date-time [UTC]",
        "mean ml top height [m MSL]",
        "std ml top height [m MSL]",
        "mean ml thickness [m]",
        "std ml thickness [m]",
        "N valid rays",
        "rays total",
    ]
    create = not glob.glob(fname)

    with open(fname, "w" if create else "a", newline="") as csvfile:
        if create:
            csvfile.write(
                "".join(
                    (
                        "# Weather radar detected melting layer data file\n",
                        '# Comment lines are preceded by "#"\n',
                        "# Description: \n",
                        "# Time series of melting layer data detected by "
                        "weather radar.\n",
                        "# Fill Value: ",
                        str(get_fillvalue()),
                        "\n",
                        "# Start: ",
                        dt_ml.strftime("%Y-%m-%d %H:%M:%S UTC"),
                        "\n",
                        "#\n",
                    )
                )
            )

        writer = csv.DictWriter(csvfile, columns)
        if create:
            writer.writeheader()

        row = {"date-time [UTC]": dt_ml.strftime("%Y-%m-%d %H:%M:%S")}
        # columns 1 to 4 hold the masked statistics, in this order
        masked_stats = (ml_top_avg, ml_top_std, thick_avg, thick_std)
        for name, stat in zip(columns[1:5], masked_stats):
            row[name] = stat.filled(fill_value=get_fillvalue())
        row["N valid rays"] = nrays_valid
        row["rays total"] = nrays_total
        writer.writerow(row)

    return fname
def write_ts_stats(dt, value, fname, stat="mean"):
    """
    Writes one sample of a time series of a statistic to a csv file

    When no file matches fname a new file is created with a comment header,
    the column names and the sample. Otherwise the sample is appended to the
    existing file.

    Parameters
    ----------
    dt : datetime object
        time of the sample (its strftime method is used)
    value : masked array
        the statistic. Masked values are replaced by the fill value and the
        first element is written
    fname : str
        file name where to store the data
    stat : str
        name of the statistic, written in the file header

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    fieldnames = ["date-time [UTC]", "value"]
    create = not glob.glob(fname)

    with open(fname, "w" if create else "a", newline="") as csvfile:
        if create:
            # The time stamp must come from the dt argument: calling
            # datetime.datetime.strftime on the format string raises TypeError
            csvfile.write(
                "# Statistics\n"
                + '# Comment lines are preceded by "#"\n'
                + "# Description: \n"
                + "# Time series of "
                + stat
                + ".\n"
                + "# Fill Value: "
                + str(get_fillvalue())
                + "\n"
                + "# Start: "
                + dt.strftime("%Y-%m-%d %H:%M:%S UTC")
                + "\n"
                + "#\n"
            )

        writer = csv.DictWriter(csvfile, fieldnames)
        if create:
            writer.writeheader()
        writer.writerow(
            {
                "date-time [UTC]": dt.strftime("%Y-%m-%d %H:%M:%S"),
                "value": value.filled(fill_value=get_fillvalue())[0],
            }
        )

    return fname
def write_ts_cum(dataset, fname):
    """
    Writes a time series of accumulated data of a radar and another sensor

    The file is always created anew (an existing file is overwritten).

    Parameters
    ----------
    dataset : dict
        dictionary containing the time series parameters. The keys read are
        "radar_value" and "sensor_value" (masked arrays), "np_radar",
        "np_sensor", "time", "point_coordinates_WGS84_lon_lat_alt",
        "sensor", "sensorid", "cum_time" and, if present,
        "antenna_coordinates_az_el_r"
    fname : str
        file name where to store the data

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    radar_acc = dataset["radar_value"].filled(fill_value=get_fillvalue())
    sensor_acc = dataset["sensor_value"].filled(fill_value=get_fillvalue())
    radar_count = dataset["np_radar"]
    sensor_count = dataset["np_sensor"]

    columns = ["date", "np_radar", "radar_value", "np_sensor", "sensor_value"]

    with open(fname, "w", newline="") as csvfile:
        put = csvfile.write
        put("# Precipitation accumulation data file\n")
        put('# Comment lines are preceded by "#"\n')
        put("# Description: \n")
        put(
            "# Time series of a precipitation accumulation of "
            "weather radar data and another sensor over a "
            "fixed location.\n"
        )
        put(
            "# Location [lon, lat, alt]: "
            f"{dataset['point_coordinates_WGS84_lon_lat_alt']}\n"
        )
        # the antenna coordinates are optional
        if "antenna_coordinates_az_el_r" in dataset:
            put(
                "# Nominal antenna coordinates used [az, el, r]: "
                f"{dataset['antenna_coordinates_az_el_r']}\n"
            )
        put("".join(("# sensor type: ", dataset["sensor"], "\n")))
        put("".join(("# sensor ID: ", dataset["sensorid"], "\n")))
        put(f"# Data: Precipitation accumulation over {dataset['cum_time']} s\n")
        put(f"# Fill Value: {get_fillvalue()}\n")
        put(
            "".join(
                (
                    "# Start: ",
                    dataset["time"][0].strftime("%Y-%m-%d %H:%M:%S UTC"),
                    "\n",
                )
            )
        )
        put("#\n")

        writer = csv.DictWriter(csvfile, columns)
        writer.writeheader()
        writer.writerows(
            {
                "date": dataset["time"][i],
                "np_radar": radar_count[i],
                "radar_value": radar_sample,
                "np_sensor": sensor_count[i],
                "sensor_value": sensor_acc[i],
            }
            for i, radar_sample in enumerate(radar_acc)
        )

    return fname
def write_monitoring_ts(
    start_time, np_t, values, quantiles, mean_list, datatype, fname, rewrite=False
):
    """
    Writes time series of monitoring data

    A new file (comment header plus data) is written when the file does not
    exist or rewrite is True. Otherwise the existing table is read, the new
    samples are merged into it (for repeated dates the last sample is kept)
    and the whole file is written again.

    Parameters
    ----------
    start_time : datetime object or array of datetime objects
        the time of the monitoring samples
    np_t : int or array of ints
        written to the column "NP"
    values : masked array
        the quantile values. Masked values are replaced by the fill value.
        When there are several time steps column i is written as the
        quantile quantiles[i]
    quantiles : iterable
        the quantiles. Used to name the columns "quantile_<q>"
    mean_list : sequence
        element 0 is written to the column "geometric_mean_dB" and
        element 1 to the column "linear_mean_dB"
    datatype : str
        the data type, passed to generate_field_name_str for the header
    fname : str
        file name where to store the data
    rewrite : bool
        if True a new file is created even if one exists

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    nvalues = np.size(start_time)
    single = nvalues == 1

    # a single sample is wrapped so that it can be handled as a series
    start_time_aux = np.asarray([start_time]) if single else np.asarray(start_time)
    np_t_aux = np.asarray([np_t]) if single else np_t
    values_aux = (
        np.asarray([values.filled(get_fillvalue())])
        if single
        else values.filled(get_fillvalue())
    )

    data = {
        "date": [t.strftime("%Y%m%d%H%M%S") for t in start_time_aux],
        "NP": np_t_aux,
        "geometric_mean_dB": mean_list[0],
        "linear_mean_dB": mean_list[1],
    }
    for i, q in enumerate(quantiles):
        data[f"quantile_{q}"] = values_aux[:, i]
    df = pd.DataFrame(data)

    file_exists = os.path.exists(fname)
    with open(fname, "r+" if file_exists and not rewrite else "w") as f:
        lock_file(f)
        try:
            if rewrite or not file_exists:
                header = (
                    "# Weather radar monitoring timeseries data file\n"
                    '# Comment lines are preceded by "#"\n'
                    "# Description: \n"
                    "# Time series of a monitoring of weather radar data.\n"
                    f"# Quantiles: {', '.join(map(str, quantiles))} percent.\n"
                    f"# Data: {generate_field_name_str(datatype)}\n"
                    f"# Fill Value: {get_fillvalue()}\n"
                    f"# Start: {start_time_aux[0].strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
                    "#\n"
                )
                f.write(header)
                df.to_csv(f, index=False)
            else:
                # keep the existing comment header and merge the tables
                header = [line.strip() for line in f if line.startswith("#")]
                f.seek(0)
                existing_df = pd.read_csv(f, comment="#")
                # dates are read back as numbers; compare them as strings
                existing_df["date"] = existing_df["date"].astype(str)
                combined_df = pd.concat([existing_df, df], ignore_index=True)
                combined_df.drop_duplicates(subset=["date"], keep="last", inplace=True)

                f.seek(0)
                f.write("\n".join(header) + "\n")
                combined_df.to_csv(f, index=False)
                # the file is overwritten in place: cut whatever is left of
                # the old content when the new content is shorter
                f.truncate()
            # push buffered data to the file before the lock is released
            f.flush()
        finally:
            unlock_file(f)

    return fname
def write_excess_gates(excess_dict, fname):
    """
    Writes the position and values of gates that have a frequency of
    occurrence higher than a particular threshold

    The file is always created anew (an existing file is overwritten).

    Parameters
    ----------
    excess_dict : dict
        dictionary containing the gates parameters. The keys read are
        "quant_min", "starttime", "endtime" and the per-gate sequences
        "ray_ind", "rng_ind", "ele", "azi", "rng", "nsamples", "occurrence"
        and "freq_occu"
    fname : str
        file name where to store the data

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    columns = [
        "ray_ind",
        "rng_ind",
        "ele",
        "azi",
        "rng",
        "nsamples",
        "occurrence",
        "freq_occu",
    ]
    time_format = "%Y-%m-%d %H:%M:%S UTC"
    ngates = len(excess_dict["ray_ind"])

    with open(fname, "w", newline="") as csvfile:
        put = csvfile.write
        put(f"# Gates exceeding {excess_dict['quant_min']} percentile data file\n")
        put('# Comment lines are preceded by "#"\n')
        put(
            "".join(
                (
                    "# Data collection start time: ",
                    excess_dict["starttime"].strftime(time_format),
                    "\n",
                )
            )
        )
        put(
            "".join(
                (
                    "# Data collection end time: ",
                    excess_dict["endtime"].strftime(time_format),
                    "\n",
                )
            )
        )
        put(f"# Number of gates in file: {ngates}\n")
        put("#\n")

        writer = csv.DictWriter(csvfile, columns)
        writer.writeheader()
        for i, ray_ind in enumerate(excess_dict["ray_ind"]):
            # one row per gate; the other columns are read at the same index
            row = {name: excess_dict[name][i] for name in columns[1:]}
            row["ray_ind"] = ray_ind
            writer.writerow(row)

    return fname
def write_intercomp_scores_ts(
    start_time,
    stats,
    field_name,
    fname,
    rad1_name="RADAR001",
    rad2_name="RADAR002",
    rewrite=False,
):
    """
    Writes time series of radar intercomparison scores

    A new file (comment header, column names and data) is written when
    rewrite is True or no file matches fname. Otherwise the data is appended
    to the existing file. The file is locked while it is written.

    Parameters
    ----------
    start_time : datetime object or array of date time objects
        the time of the intercomparison
    stats : dict
        dictionary containing the statistics. The keys read are "meanbias",
        "medianbias", "quant25bias", "quant75bias", "modebias", "corr",
        "slope", "intercep" and "intercep_slope_1" (masked arrays) and
        "npoints"
    field_name : str
        The name of the field
    fname : str
        file name where to store the data
    rad1_name, rad2_name : str
        Name of the radars intercompared
    rewrite : bool
        if True a new file is created

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    # (csv column, key in stats) of the scores stored as masked arrays
    score_columns = (
        ("mean_bias", "meanbias"),
        ("median_bias", "medianbias"),
        ("quant25_bias", "quant25bias"),
        ("quant75_bias", "quant75bias"),
        ("mode_bias", "modebias"),
        ("corr", "corr"),
        ("slope_of_linear_regression", "slope"),
        ("intercep_of_linear_regression", "intercep"),
        ("intercep_of_linear_regression_of_slope_1", "intercep_slope_1"),
    )
    columns = ["date", "NP"] + [column for column, _ in score_columns]

    nvalues = np.size(start_time)
    scores = {
        column: stats[key].filled(fill_value=get_fillvalue())
        for column, key in score_columns
    }
    if nvalues == 1:
        # a single sample is wrapped so that it can be handled as a series
        start_time_aux = np.asarray([start_time])
        scores = {column: np.asarray([score]) for column, score in scores.items()}
        np_t = np.asarray([stats["npoints"]])
    else:
        start_time_aux = np.asarray(start_time)
        np_t = stats["npoints"]

    append = not rewrite and bool(glob.glob(fname))

    with open(fname, "a" if append else "w", newline="") as csvfile:
        lock_file(csvfile)
        if not append:
            csvfile.write(
                "".join(
                    (
                        "# Weather radar intercomparison scores timeseries file\n",
                        '# Comment lines are preceded by "#"\n',
                        "# Description: \n",
                        "# Time series of the intercomparison between two radars.\n",
                        "# Radar 1: ",
                        rad1_name,
                        "\n",
                        "# Radar 2: ",
                        rad2_name,
                        "\n",
                        "# Field name: ",
                        field_name,
                        "\n",
                        "# Fill Value: ",
                        str(get_fillvalue()),
                        "\n",
                        "# Start: ",
                        start_time_aux[0].strftime("%Y-%m-%d %H:%M:%S UTC"),
                        "\n",
                        "#\n",
                    )
                )
            )

        writer = csv.DictWriter(csvfile, columns)
        if not append:
            writer.writeheader()

        for i, dtime in enumerate(start_time_aux):
            row = {"date": dtime.strftime("%Y%m%d%H%M%S"), "NP": np_t[i]}
            row.update((column, score[i]) for column, score in scores.items())
            writer.writerow(row)
        unlock_file(csvfile)

    return fname
def write_colocated_gates(coloc_gates, fname):
    """
    Writes the position of gates colocated with two radars

    The file is always created anew (an existing file is overwritten).

    Parameters
    ----------
    coloc_gates : dict
        dictionary containing the colocated gates parameters. For each
        radar prefix ("rad1_", "rad2_") the sequences "ray_ind", "rng_ind",
        "ele", "azi" and "rng" are read
    fname : str
        file name where to store the data

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    columns = [
        f"{radar}_{quantity}"
        for radar in ("rad1", "rad2")
        for quantity in ("ray_ind", "rng_ind", "ele", "azi", "rng")
    ]

    with open(fname, "w", newline="") as csvfile:
        csvfile.writelines(
            (
                "# Colocated radar gates data file\n",
                '# Comment lines are preceded by "#"\n',
                "#\n",
            )
        )

        writer = csv.DictWriter(csvfile, columns)
        writer.writeheader()
        for i, rad1_ray_ind in enumerate(coloc_gates["rad1_ray_ind"]):
            # one row per gate pair; the other columns share the index
            row = {name: coloc_gates[name][i] for name in columns[1:]}
            row["rad1_ray_ind"] = rad1_ray_ind
            writer.writerow(row)

    return fname
def write_colocated_data(coloc_data, fname):
    """
    Writes the data of gates colocated with two radars

    When no file matches fname a new file is created with a comment header,
    the column names and the data. Otherwise the data is appended to the
    existing file.

    Parameters
    ----------
    coloc_data : dict
        dictionary containing the colocated data parameters. For each radar
        prefix ("rad1_", "rad2_") the sequences "time" (datetime objects),
        "ray_ind", "rng_ind", "ele", "azi", "rng" and "val" are read
    fname : str
        file name where to store the data

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    columns = [
        f"{radar}_{quantity}"
        for radar in ("rad1", "rad2")
        for quantity in ("time", "ray_ind", "rng_ind", "ele", "azi", "rng", "val")
    ]
    time_format = "%Y%m%d%H%M%S"
    create = not glob.glob(fname)

    with open(fname, "w" if create else "a", newline="") as csvfile:
        if create:
            csvfile.writelines(
                (
                    "# Colocated radar gates data file\n",
                    '# Comment lines are preceded by "#"\n',
                    "#\n",
                )
            )

        writer = csv.DictWriter(csvfile, columns)
        if create:
            writer.writeheader()

        for i, rad1_time in enumerate(coloc_data["rad1_time"]):
            row = {"rad1_time": rad1_time.strftime(time_format)}
            for name in columns[1:]:
                cell = coloc_data[name][i]
                # the second radar time is the only other formatted column
                row[name] = cell.strftime(time_format) if name == "rad2_time" else cell
            writer.writerow(row)

    return fname
def write_colocated_data_time_avg(coloc_data, fname):
    """
    Writes the time averaged data of gates colocated with two radars

    When no file matches fname a new file is created with a comment header,
    the column names and the data. Otherwise the data is appended to the
    existing file.

    Parameters
    ----------
    coloc_data : dict
        dictionary containing the colocated data parameters. For each radar
        prefix ("rad1_", "rad2_") the sequences "time" (datetime objects),
        "ray_ind", "rng_ind", "ele", "azi", "rng", "dBZavg", "PhiDPavg" and
        "Flagavg" are read
    fname : str
        file name where to store the data

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    quantities = (
        "time",
        "ray_ind",
        "rng_ind",
        "ele",
        "azi",
        "rng",
        "dBZavg",
        "PhiDPavg",
        "Flagavg",
    )
    columns = [
        f"{radar}_{quantity}" for radar in ("rad1", "rad2") for quantity in quantities
    ]
    time_format = "%Y%m%d%H%M%S"
    create = not glob.glob(fname)

    with open(fname, "w" if create else "a", newline="") as csvfile:
        if create:
            csvfile.writelines(
                (
                    "# Colocated radar gates data file\n",
                    '# Comment lines are preceded by "#"\n',
                    "#\n",
                )
            )

        writer = csv.DictWriter(csvfile, columns)
        if create:
            writer.writeheader()

        for i, rad1_time in enumerate(coloc_data["rad1_time"]):
            row = {"rad1_time": rad1_time.strftime(time_format)}
            for name in columns[1:]:
                cell = coloc_data[name][i]
                # the second radar time is the only other formatted column
                row[name] = cell.strftime(time_format) if name == "rad2_time" else cell
            writer.writerow(row)

    return fname
def write_sun_hits(sun_hits, fname):
    """
    Writes sun hits data.

    When no file matches fname a new file is created with a comment header,
    the column names and the data. Otherwise the data is appended to the
    existing file.

    Parameters
    ----------
    sun_hits : dict
        dictionary containing the sun hits parameters. "time" is a sequence
        of datetime objects. The power and ZDR estimates and their standard
        deviations are masked arrays whose masked values are replaced by
        the fill value. The remaining columns are written as they are
    fname : str
        file name where to store the data

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    columns = [
        "time",
        "ray",
        "NPrng",
        "rad_el",
        "rad_az",
        "sun_el",
        "sun_az",
        "dBm_sun_hit",
        "std(dBm_sun_hit)",
        "NPh",
        "NPhval",
        "dBmv_sun_hit",
        "std(dBmv_sun_hit)",
        "NPv",
        "NPvval",
        "ZDR_sun_hit",
        "std(ZDR_sun_hit)",
        "NPzdr",
        "NPzdrval",
    ]
    masked_columns = (
        "dBm_sun_hit",
        "std(dBm_sun_hit)",
        "dBmv_sun_hit",
        "std(dBmv_sun_hit)",
        "ZDR_sun_hit",
        "std(ZDR_sun_hit)",
    )
    filled = {
        name: sun_hits[name].filled(fill_value=get_fillvalue())
        for name in masked_columns
    }

    create = not glob.glob(fname)
    with open(fname, "w" if create else "a", newline="") as csvfile:
        if create:
            csvfile.write("# Weather radar sun hits data file\n")
            csvfile.write('# Comment lines are preceded by "#"\n')
            csvfile.write(f"# Fill Value: {get_fillvalue()}\n")
            csvfile.write("#\n")

        writer = csv.DictWriter(csvfile, columns)
        if create:
            writer.writeheader()

        for i, sh_time in enumerate(sun_hits["time"]):
            row = {"time": sh_time.strftime("%Y-%m-%d %H:%M:%S.%f")}
            for name in columns[1:]:
                # masked quantities come from their filled copy
                column = filled[name] if name in filled else sun_hits[name]
                row[name] = column[i]
            writer.writerow(row)

    return fname
def write_sun_retrieval(sun_retrieval, fname):
    """
    Writes sun retrieval data.

    When no file matches fname a new file is created with a comment header,
    the column names and one row of data. Otherwise the row is appended to
    the existing file.

    Parameters
    ----------
    sun_retrieval : dict
        dictionary containing the sun retrieval parameters.
        "first_hit_time" and "last_hit_time" are datetime objects,
        "ref_time" is a datetime object or None (written as "None"),
        "nhits_h", "nhits_v" and "nhits_zdr" are written as they are and
        all other columns are masked arrays whose masked values are
        replaced by the fill value
    fname : str
        file name where to store the data

    Returns
    -------
    fname : str
        the name of the file where data has been written

    """
    columns = [
        "first_hit_time",
        "last_hit_time",
        "nhits_h",
        "el_width_h",
        "az_width_h",
        "el_bias_h",
        "az_bias_h",
        "dBm_sun_est",
        "std(dBm_sun_est)",
        "sf_h",
        "nhits_v",
        "el_width_v",
        "az_width_v",
        "el_bias_v",
        "az_bias_v",
        "dBmv_sun_est",
        "std(dBmv_sun_est)",
        "sf_v",
        "nhits_zdr",
        "ZDR_sun_est",
        "std(ZDR_sun_est)",
        "sf_ref",
        "ref_time",
    ]
    time_format = "%Y%m%d%H%M%S"
    count_columns = ("nhits_h", "nhits_v", "nhits_zdr")
    time_columns = ("first_hit_time", "last_hit_time", "ref_time")

    row = {
        "first_hit_time": sun_retrieval["first_hit_time"].strftime(time_format),
        "last_hit_time": sun_retrieval["last_hit_time"].strftime(time_format),
    }
    # every column that is neither a time nor a hit count is a masked array
    for name in columns:
        if name in count_columns or name in time_columns:
            continue
        row[name] = sun_retrieval[name].filled(fill_value=get_fillvalue())

    reference = sun_retrieval["ref_time"]
    row["ref_time"] = "None" if reference is None else reference.strftime(time_format)

    create = not glob.glob(fname)
    with open(fname, "w" if create else "a", newline="") as csvfile:
        if create:
            csvfile.write("# Weather radar sun retrievals data file\n")
            csvfile.write('# Comment lines are preceded by "#"\n')
            csvfile.write(f"# Fill Value: {get_fillvalue()}\n")
            csvfile.write("#\n")

        writer = csv.DictWriter(csvfile, columns)
        if create:
            writer.writeheader()

        for name in count_columns:
            row[name] = sun_retrieval[name]
        writer.writerow(row)

    return fname