Source code for pyrad.io.trajectory

"""
pyrad.io.trajectory
===================
Trajectory class implementation for reading trajectory file.
Converting to different coordinate systems.
.. autosummary::
    :toctree: generated/
    Trajectory
    _Radar_Trajectory
"""

import sys
import re
import datetime
import locale
from warnings import warn
from copy import deepcopy

import numpy as np
import pandas as pd

import pyart

from ..io.read_data_sensor import read_lightning, read_trt_traj_data
from ..io.read_data_sensor import read_trt_thundertracking_traj_data


[docs] class Trajectory: """ A class for reading and handling trajectory data from a file. Attributes ---------- filename : str Path and name of the trajectory definition file starttime : datetime Start time of trajectory processing. endtime : datetime End time of trajectory processing. trajtype : str Type of trajectory. Can be 'plane' or 'lightning' time_vector : Array of datetime objects Array containing the trajectory time samples wgs84_lat_deg : Array of floats WGS84 latitude samples in deg wgs84_lon_deg : Array of floats WGS84 longitude samples in deg wgs84_alt_m : Array of floats WGS84 altitude samples in m nsamples : int Number of samples in the trajectory _swiss_grid_done : Bool Indicates that convertion to Swiss coordinates has been performed swiss_chy, swiss_chx, swiss_chh : Array of floats Swiss coordinates in m radar_list : list List of radars for which trajectories are going to be computed flashnr : int For 'lightning' only. Number of flash for which trajectory data is going to be computed. If 0 all all flashes are going to be considered. time_in_flash : array of floats For 'lightning' only. Time within flash (sec) flashnr_vec : array of ints For 'lightning' only. Flash number of each data sample dBm : array of floats For 'lightning' only. Lightning power (dBm) Methods: -------- add_radar : Add a radar calculate_velocities : Computes velocities get_start_time : Return time of first trajectory sample get_end_time : Return time of last trajectory sample get_samples_in_period : Get indices of samples within period _convert_traj_to_swissgrid : convert data from WGS84 to Swiss coordinates _read_traj : Read plane trajectory from file _read_traj_flores : Read FLORAKO plane trajectory from file _read_traj_trt : Read TRT trajectory from file _read_traj_lightning : Read lightning trajectory from file _get_total_seconds : Get the total time of the trajectory in seconds """ def __init__( self, filename, starttime=None, endtime=None, trajtype="plane", flashnr=0 ): """ Initalize the object. Parameters ---------- filename : str Filename containing the trajectory samples. starttime : datetime Start time of trajectory processing. If not given, use the time of the first trajectory sample. endtime : datetime End time of trajectory processing. If not given, use the time of the last trajectory sample. trajtype : str type of trajectory. Can be plane or lightning flashnr : int If type of trajectory is lightning, the flash number to check the trajectory. 0 means all flash numbers included """ self.filename = filename self.starttime = starttime self.endtime = endtime self.trajtype = trajtype self.time_vector = np.array([], dtype=datetime.datetime) self.wgs84_lat_deg = np.array([], dtype=float) self.wgs84_lon_deg = np.array([], dtype=float) self.wgs84_alt_m = np.array([], dtype=float) self.nsamples = None self._swiss_grid_done = False self.swiss_chy = None self.swiss_chx = None self.swiss_chh = None self.radar_list = [] if self.trajtype == "lightning": self.flashnr = flashnr self.time_in_flash = np.array([], dtype=datetime.datetime) self.dBm = np.array([], dtype=float) self.flashnr_vec = np.array([], dtype=float) elif self.trajtype == "trt": self.cell_contour = np.array([]) try: if self.trajtype == "lightning": self._read_traj_lightning(flashnr) elif self.trajtype == "trt": self._read_traj_trt() elif self.trajtype == "plane_flores": self._read_traj_flores() else: self._read_traj() except Exception as ee: warn(str(ee)) raise Exception( "ERROR: Could not load trajectory data from file '" + filename + "' into Trajectory object" )
[docs] def add_radar(self, radar): """ Add the coordinates (WGS84 longitude, latitude and non WGS84 altitude) of a radar to the radar_list. Parameters ---------- radar : pyart radar object containing the radar coordinates Return ------ Radar object """ # Check if radar location is already in the radar list for rad in self.radar_list: if rad.location_is_equal( radar.latitude["data"][0], radar.longitude["data"][0], radar.altitude["data"][0], ): # warn("WARNING: Tried to add the same radar twice to the" # " radar list") return rad rad = _Radar_Trajectory( radar.latitude["data"][0], radar.longitude["data"][0], radar.altitude["data"][0], ) self.radar_list.append(rad) # Convert trajectory WGS84 points to polara radar coordinates rad.convert_radpos_to_swissgrid() self._convert_traj_to_swissgrid() # Note: Earth curvature not considered yet! (rvec, azvec, elvec) = pyart.core.cartesian_to_antenna( self.swiss_chy - rad.ch_y, self.swiss_chx - rad.ch_x, self.swiss_chh - rad.ch_alt, ) rad.assign_trajectory(elvec, azvec, rvec) return rad
[docs] def calculate_velocities(self, radar): """ Calculate velocities. """ if not radar.traj_assigned: raise Exception("ERROR: No trajectory assigned to radar object") dt_secs = np.vectorize(self._get_total_seconds) dt = dt_secs(self.time_vector[2:] - self.time_vector[:-2]) v_r = np.empty(self.nsamples, dtype=float) v_r[0] = v_r[-1] = np.nan v_r[1:-1] = (radar.range_vec[2:] - radar.range_vec[:-2]) / dt v_az = np.empty(self.nsamples, dtype=float) v_az[0] = v_az[-1] = np.nan daz = radar.azimuth_vec[2:] - radar.azimuth_vec[:-2] daz[daz > 180.0] -= 360.0 daz[daz < -180.0] += 360.0 v_az[1:-1] = daz / dt v_el = np.empty(self.nsamples, dtype=float) v_el[0] = v_el[-1] = np.nan v_el[1:-1] = (radar.elevation_vec[2:] - radar.elevation_vec[:-2]) / dt v_abs = np.empty(self.nsamples, dtype=float) v_abs[0] = v_abs[-1] = np.nan self._convert_traj_to_swissgrid() dx = self.swiss_chy[2:] - self.swiss_chy[:-2] dy = self.swiss_chx[2:] - self.swiss_chx[:-2] dz = self.swiss_chh[2:] - self.swiss_chh[:-2] v_abs[1:-1] = np.sqrt(dx**2 + dy**2 + dz**2) / dt radar.assign_velocity_vecs(v_abs, v_r, v_el, v_az)
[docs] def get_samples_in_period(self, start=None, end=None): """ " Get indices of samples of the trajectory within given time period. """ if (start is None) and (end is None): raise Exception("ERROR: Either start or end must be defined") if start is None: return np.where(self.time_vector < end) if end is None: return np.where(self.time_vector >= start) return np.where((self.time_vector >= start) & (self.time_vector < end))
[docs] def get_start_time(self): """ Get time of first trajectory sample. Return ------ datetime object """ return self.time_vector[0]
[docs] def get_end_time(self): """ Get time of last trajectory sample. Return ------ datetime object """ return self.time_vector[-1]
def _convert_traj_to_swissgrid(self): """ Convert trajectory samples from WGS84 to Swiss CH1903 coordinates """ if self._swiss_grid_done: return (self.swiss_chy, self.swiss_chx, self.swiss_chh) = ( pyart.core.wgs84_to_swissCH1903( self.wgs84_lon_deg, self.wgs84_lat_deg, self.wgs84_alt_m ) ) self._swiss_grid_done = True def _read_traj(self): """ Read trajectory from file File format ----------- Comment symbol: '#' Columns: 1. Day (UTC) format: DD-MMM-YYYY 2. Seconds since midnight (UTC) 3. WGS84 latitude [radians] 4. WGS84 longitude [radians] 5. WGS84 altitude [m] """ # check if the file can be read try: tfile = open(self.filename, "r") except Exception: raise Exception( "ERROR: Could not find|open trajectory file '" + self.filename + "'" ) repat = re.compile( "(\\d+\\-[A-Za-z]+\\-\\d+)\\s+([\\d\\.]+)\\s+" "([\\-\\d\\.]+)\\s+([\\-\\d\\.]+)\\s+([\\-\\d\\.]+)" ) try: loc_set = False loc = locale.getlocale() # get current locale if loc[0] != "en_US": try: locale.setlocale(locale.LC_ALL, ("en_US", "UTF-8")) except Exception as ee: raise Exception(f"ERROR: Cannot set local 'en_US': {ee}") loc_set = True recording_started = True if self.starttime is not None: recording_started = False recording_check_stop = False if self.endtime is not None: recording_check_stop = True for line in tfile: line = line.strip() if not line: continue # ignore comments if line.startswith("#"): continue line = line.partition("#")[0] # Remove comments line = line.strip() mm = repat.match(line) if not mm: print( f"WARNING: Format error in trajectory file" f" '{self.filename}' on line '{line}'", file=sys.stderr, ) continue # Get time stamp try: sday = datetime.datetime.strptime(mm.group(1), "%d-%b-%Y") except Exception as ee: print(datetime.datetime.utcnow().strftime("%d-%b-%Y")) raise Exception( f"ERROR: Format error in traj file '{self.filename}' " f"on line '{line}' ({str(ee)})" ) sday += datetime.timedelta(seconds=float(mm.group(2))) if not recording_started: if sday < self.starttime: continue recording_started = True if recording_check_stop: if sday > self.endtime: break self.time_vector = np.append(self.time_vector, [sday]) self.wgs84_lat_deg = np.append( self.wgs84_lat_deg, [float(mm.group(3)) * 180.0 / np.pi] ) self.wgs84_lon_deg = np.append( self.wgs84_lon_deg, [float(mm.group(4)) * 180.0 / np.pi] ) self.wgs84_alt_m = np.append(self.wgs84_alt_m, [float(mm.group(5))]) except Exception: raise finally: tfile.close() if loc_set: locale.setlocale(locale.LC_ALL, loc) # restore saved locale self.nsamples = len(self.time_vector) def _read_traj_flores(self): """ Read trajectory from FLORAKO file File format ----------- Column/Variable Contents, Units and Description: 01: UTCDate Year-Month-Day Date of epoch or feature (UTC time) 02: UTCTime HH:MM:SS.SS Time of epoch or feature - Receiver time frame (UTC) 03: Latitude Decimal Degrees (signed) North/South Geographic coordinate 04: Longitude Decimal Degrees (signed) East/West Geographic coordinate 05: H-MSL Metres Height above the geoid 06: H-Ell Metres Height above the current ellipsoid 07: Undulation Metres Height of the ellipsoid above the geoid 08: PDOP Position Dilution of Precision, which is a measure of X, Y, Z position geometry 09: HDOP Horizontal Position Dilution of Precision, which is a measure of X, Y position geometry 10: VDOP Vertical Position Dilution of Precision, which is a measure of Z position geometry 11: DopRms Root mean square of all L1 Doppler measurement residuals """ # check if the file can be read try: tfile = open(self.filename, "r") except Exception: raise Exception( "ERROR: Could not find|open trajectory file '" + self.filename + "'" ) tfile.close() try: data = pd.read_csv(self.filename, skiprows=28, delim_whitespace=True) data = data.drop(0) times = [ datetime.datetime.strptime(d + "_" + h + "0000", "%Y-%m-%d_%H:%M:%S.%f") for d, h in zip(data["UTCDate"], data["UTCTime"]) ] self.time_vector = np.array(times) self.wgs84_lat_deg = np.array(pd.to_numeric(data["Latitude"])) self.wgs84_lon_deg = np.array(pd.to_numeric(data["Longitude"])) self.wgs84_alt_m = np.array(pd.to_numeric(data["H-MSL"])) except Exception: raise self.nsamples = len(self.time_vector) def _read_traj_lightning(self, flashnr=0): """ Read trajectory from lightning file File format ----------- Columns: 1. flashnr (0=noise) 2. Seconds since midnight (UTC) 2. Time in flash (s) 3. WGS84 latitude [deg] 4. WGS84 longitude [deg] 5. WGS84 altitude [m] 6. Power [dBm] Parameters ---------- flashnr : int the flash number to keep. If 0 data from all flashes will be kept """ flashnr_vec, time, time_in_flash, lat, lon, alt, dBm = read_lightning( self.filename ) if flashnr_vec is None: raise Exception( "ERROR: Could not find|open trajectory file '" + self.filename + "'" ) recording_started = True if self.starttime is not None: recording_started = False recording_check_stop = False if self.endtime is not None: recording_check_stop = True if flashnr > 0: flashnr_vec_aux = deepcopy(flashnr_vec) flashnr_vec = flashnr_vec[flashnr_vec_aux == flashnr] time = time[flashnr_vec_aux == flashnr] time_in_flash = time_in_flash[flashnr_vec_aux == flashnr] lat = lat[flashnr_vec_aux == flashnr] lon = lon[flashnr_vec_aux == flashnr] alt = alt[flashnr_vec_aux == flashnr] dBm = dBm[flashnr_vec_aux == flashnr] for i, dBm_val in enumerate(dBm): if not recording_started: if time[i] < self.starttime: continue recording_started = True if recording_check_stop: if time[i] > self.endtime: break self.flashnr_vec = np.append(self.flashnr_vec, [flashnr_vec[i]]) self.time_vector = np.append(self.time_vector, [time[i]]) self.time_in_flash = np.append(self.time_in_flash, [time_in_flash[i]]) self.wgs84_lat_deg = np.append(self.wgs84_lat_deg, [lat[i]]) self.wgs84_lon_deg = np.append(self.wgs84_lon_deg, [lon[i]]) self.wgs84_alt_m = np.append(self.wgs84_alt_m, [alt[i]]) self.dBm = np.append(self.dBm, [dBm_val]) self.nsamples = len(self.time_vector) def _read_traj_trt(self): """ Read trajectory from TRT file File format ----------- Columns: 1. traj_ID 2. yyyymmddHHMM (UTC) 2. lon (deg) 3. lat [deg] 4. ell_L 5. ell_S 6. ell_or 7. area 8. vel_x 9. vel_y 10. det 11. RANKr 12. CG_n 13. CG_p 14. CG 15. CG_percent_p 16. ET45 17. ET45m 18. ET15 19. ET15m 20. VIL 21. maxH 22. maxHm 23. POH 24. RANK 25. Dvel_x 26. Dvel_y 27. cell_contours Parameters ---------- """ if "_tt.trt" in self.filename: ( traj_ID, _, yyyymmddHHMM, _, _, _, lon, lat, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, cell_contours, ) = read_trt_thundertracking_traj_data(self.filename) if traj_ID is None: raise Exception( "ERROR: Could not find|open trajectory file '" + self.filename + "'" ) valid = np.logical_not(np.ma.getmaskarray(yyyymmddHHMM)) yyyymmddHHMM = yyyymmddHHMM[valid] traj_ID = traj_ID[valid] lon = lon[valid] lat = lat[valid] cell_contours = cell_contours[valid] if traj_ID.size == 0: raise Exception( "ERROR: No valid data in trajectory file '" + self.filename + "'" ) else: ( traj_ID, yyyymmddHHMM, lon, lat, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, cell_contours, ) = read_trt_traj_data(self.filename) if traj_ID is None: raise Exception( "ERROR: Could not find|open trajectory file '" + self.filename + "'" ) recording_started = True if self.starttime is not None: recording_started = False recording_check_stop = False if self.endtime is not None: recording_check_stop = True for i, cell_contour in enumerate(cell_contours): if not recording_started: if yyyymmddHHMM[i] < self.starttime: continue recording_started = True if recording_check_stop: if yyyymmddHHMM[i] > self.endtime: break self.time_vector = np.append(self.time_vector, [yyyymmddHHMM[i]]) self.wgs84_lat_deg = np.append(self.wgs84_lat_deg, [lat[i]]) self.wgs84_lon_deg = np.append(self.wgs84_lon_deg, [lon[i]]) self.wgs84_alt_m = np.append(self.wgs84_alt_m, 0.0) self.cell_contour = np.append(self.cell_contour, [cell_contour]) self.nsamples = len(self.time_vector) def _get_total_seconds(self, x): """Return total seconds of timedelta object""" return x.total_seconds()
class _Radar_Trajectory: """ A class for holding the trajectory data assigned to a radar. Attributes ---------- latitude : float WGS84 radar latitude [deg] longitude : float WGS84 radar longitude [deg] altitude : float radar altitude [m] (non WGS84) ch_y, ch_x, ch_alt : float radar coordinates in swiss CH1903 coordinates elevation_vec : float list Elevation values of the trajectory samples azimuth_vec : float list Azimuth values of the trajectory samples range_vec : float list Range values of the trajectory samples v_abs, v_r, v_el, v_az : array-like Velocity vectors of the absolute [m/s], radial [m/s], elevation [deg/s] and azimuth [deg/s] velocities Methods: -------- location_is_equal assign_trajectory convert_radpos_to_swissgrid assign_velocity_vecs """ def __init__(self, lat, lon, alt): """ Initalize the object. Parameters ---------- lat, lon , alt : radar location coordinates nsamps : number of samples """ # radar position in lat, lon self.latitude = lat self.longitude = lon self.altitude = alt # radar position in swiss CH1903 coordinates self._swiss_coords_done = False self.ch_y = None self.ch_x = None self.ch_alt = None # trajectory in polar radar coordinates self.traj_assigned = False self.elevation_vec = None self.azimuth_vec = None self.range_vec = None # Velocity vectors self._velocity_vecs_assigned = False self.v_abs = None self.v_r = None self.v_el = None self.v_az = None def location_is_equal(self, lat, lon, alt): """ Check if the given coordinates are the same. Parameters ---------- lat, lon , alt : radar location coordinates Return ------ True if the radar location is equal, False otherwise """ lat_tol = 0.002 # [deg] lon_tol = 0.002 # [deg] alt_tol = 2.0 # [m] if ( (np.abs(self.latitude - lat) > lat_tol) or (np.abs(self.longitude - lon) > lon_tol) or (np.abs(self.altitude - alt) > alt_tol) ): return False return True def assign_trajectory(self, el, az, rr): """ Assign a trajectory to the radar in polar radar coordinates. Parameters ---------- el, az, rr : array-like elevation, azimuth and range vector """ if self.traj_assigned: warn("WARNING: Trajectory already assigned") return self.elevation_vec = el self.azimuth_vec = az self.range_vec = rr self.traj_assigned = True def convert_radpos_to_swissgrid(self): """ Convert the radar location (in WGS84 coordinates) to swiss CH1903 coordinates. """ if self._swiss_coords_done: return (self.ch_y, self.ch_x, self.ch_alt) = pyart.core.wgs84_to_swissCH1903( self.longitude, self.latitude, self.altitude, no_altitude_transform=True ) self._swiss_coords_done = True def assign_velocity_vecs(self, v_abs, v_r, v_el, v_az): """ Assign velocity vectors to the radar. """ if self._velocity_vecs_assigned: warn("WARNING: Trajectory velocity vectors already assigned") return self.v_abs = v_abs self.v_r = v_r self.v_az = v_az self.v_el = v_el self._velocity_vecs_assigned = True