Source code for meteodatalab.operators.vertical_interpolation

"""Vertical interpolation operators."""

# Standard library
from typing import Literal, Sequence

# Third-party
import numpy as np
import xarray as xr

# First-party
from meteodatalab.operators.support_operators import (
    TargetCoordinates,
    TargetCoordinatesAttrs,
    init_field_with_vcoord,
)


[docs] def interpolate_k2p( field: xr.DataArray, mode: Literal["linear_in_p", "linear_in_lnp", "nearest_sfc"], p_field: xr.DataArray, p_tc_values: Sequence[float], p_tc_units: Literal["Pa", "hPa"], ) -> xr.DataArray: """Interpolate a field from model (k) levels to pressure coordinates. Example for vertical interpolation to isosurfaces of a target field, which is strictly monotonically decreasing with height. Parameters ---------- field : xarray.DataArray field to interpolate (only typeOfLevel="generalVerticalLayer" is supported) mode : str interpolation algorithm, one of {"linear_in_p", "linear_in_lnp", "nearest_sfc"} p_field : xarray.DataArray pressure field on k levels in Pa (only typeOfLevel="generalVerticalLayer" is supported) p_tc_values : list of float pressure target coordinate values p_tc_units : str pressure target coordinate units Returns ------- field_on_tc : xarray.DataArray field on target (i.e., pressure) coordinates """ # TODO: check missing value consistency with GRIB2 (currently comparisons are # done with np.nan) # check that p_field is the pressure field, given in Pa (can only be done # if attributes are consequently set) # check that field and p_field are compatible (have the same # dimensions and sizes) # print warn message if result contains missing values # Initializations # ... supported interpolation modes interpolation_modes = ("linear_in_p", "linear_in_lnp", "nearest_sfc") if mode not in interpolation_modes: raise RuntimeError("interpolate_k2p: unknown mode", mode) # ... supported tc units and corresponding conversion factors to Pa p_tc_unit_conversions = dict(Pa=1.0, hPa=100.0) if p_tc_units not in p_tc_unit_conversions.keys(): raise RuntimeError( "interpolate_k2p: unsupported value of p_tc_units", p_tc_units ) # ... supported range of pressure tc values (in Pa) p_tc_min = 1.0 p_tc_max = 120000.0 # Define vertical target coordinates (tc) tc_factor = p_tc_unit_conversions[p_tc_units] tc_values = np.array(sorted(p_tc_values)) * tc_factor if np.any((tc_values < p_tc_min) | (tc_values > p_tc_max)): raise RuntimeError( "interpolate_k2p: target coordinate value out of range " f"(must be in interval [{p_tc_min}, {p_tc_max}]Pa)" ) tc = TargetCoordinates( type_of_level="isobaricInPa", values=tc_values.tolist(), attrs=TargetCoordinatesAttrs( units="Pa", positive="down", standard_name="air_pressure", long_name="pressure", ), ) # Check that typeOfLevel is supported and equal for both field and p_field if field.vcoord_type != "model_level": raise RuntimeError( "interpolate_k2p: field to interpolate must be defined on model levels" ) if p_field.vcoord_type != "model_level": raise RuntimeError( "interpolate_k2p: pressure field must be defined on model levels" ) # Check that dimensions are the same for field and p_field if field.origin_z != p_field.origin_z: raise RuntimeError( "interpolate_k2p: field and p_field must have equal vertical staggering" ) # Prepare output field field_on_tc on target coordinates field_on_tc = init_field_with_vcoord(field.broadcast_like(p_field), tc, np.nan) # Interpolate # ... prepare interpolation pkm1 = p_field.shift(z=1) fkm1 = field.shift(z=1) # ... loop through tc values for tc_idx, p0 in enumerate(tc_values): # ... find the 3d field where pressure is > p0 on level k # and was <= p0 on level k-1 p2 = p_field.where((p_field > p0) & (pkm1 <= p0)) # ... extract the index k of the vertical layer at which p2 adopts its minimum # (corresponds to search from top of atmosphere to bottom) # ... note that if the condition above is not fulfilled, minind will # be set to k_top minind = p2.fillna(p_tc_max).argmin(dim="z") # ... extract pressure and field at level k p2 = p2[{"z": minind}] f2 = field[{"z": minind}] # ... extract pressure and field at level k-1 # ... note that f1 and p1 are both undefined, if minind equals k_top f1 = fkm1[{"z": minind}] p1 = pkm1[{"z": minind}] # ... compute the interpolation weights if mode == "linear_in_p": # ... note that p1 is undefined, if minind equals k_top, so ratio will # be undefined ratio = (p0 - p1) / (p2 - p1) if mode == "linear_in_lnp": # ... note that p1 is undefined, if minind equals k_top, so ratio will # be undefined ratio = (np.log(p0) - np.log(p1)) / (np.log(p2) - np.log(p1)) if mode == "nearest_sfc": # ... note that by construction, p2 is always defined; # this operation sets ratio to 0 if p1 (and by construction also f1) # is undefined; therefore, the interpolation formula below works # correctly also in this case ratio = xr.where(np.abs(p0 - p1) >= np.abs(p0 - p2), 1.0, 0.0) # ... interpolate and update field_on_tc field_on_tc[{"z": tc_idx}] = (1.0 - ratio) * f1 + ratio * f2 return field_on_tc
[docs] def interpolate_k2theta( field: xr.DataArray, mode: Literal["low_fold", "high_fold", "undef_fold"], th_field: xr.DataArray, th_tc_values: Sequence[float], th_tc_units: Literal["K", "cK"], h_field: xr.DataArray, ) -> xr.DataArray: """Interpolate a field from model levels to potential temperature coordinates. Example for vertical interpolation to isosurfaces of a target field that is no monotonic function of height. Parameters ---------- field : xarray.DataArray field to interpolate (only typeOfLevel="generalVerticalLayer" is supported) mode : str interpolation algorithm, one of {"low_fold", "high_fold", "undef_fold"} th_field : xarray.DataArray potential temperature theta on k levels in K (only typeOfLevel="generalVerticalLayer" is supported) th_tc_values : list of float target coordinate values th_tc_units : str target coordinate units h_field : xarray.DataArray height on k levels (only typeOfLevel="generalVerticalLayer" is supported) Returns ------- field_on_tc : xarray.DataArray field on target (i.e., theta) coordinates """ # TODO: check missing value consistency with GRIB2 # (currently comparisons are done with np.nan) # check that th_field is the theta field, given in K # (can only be done if attributes are consequently set) # check that field, th_field, and h_field are compatible # print warn message if result contains missing values # ATTENTION: the attribute "positive" is not set for generalVerticalLayer # we know that for COSMO it would be defined as positive:"down"; # for the time being, # we explicitly use the height field on model mid layer # surfaces as auxiliary field # Parameters # ... supported folding modes folding_modes = ("low_fold", "high_fold", "undef_fold") if mode not in folding_modes: raise RuntimeError("interpolate_k2theta: unsupported mode", mode) # ... supported tc units and corresponding conversion factor to K # (i.e. to the same unit as theta); according to GRIB2 # isentropic surfaces are coded in K; fieldextra codes # them in cK for NetCDF (to be checked) th_tc_unit_conversions = dict(K=1.0, cK=0.01) if th_tc_units not in th_tc_unit_conversions.keys(): raise RuntimeError( "interpolate_k2theta: unsupported value of th_tc_units", th_tc_units ) # ... supported range of tc values (in K) th_tc_min = 1.0 th_tc_max = 1000.0 # ... tc values outside range of meaningful values of height, # used in tc interval search (in m amsl) h_min = -1000.0 h_max = 100000.0 # Define vertical target coordinates # Sorting cannot be exploited for optimizations, since theta is # not monotonous wrt to height tc values are stored in K tc_values = np.array(th_tc_values) * th_tc_unit_conversions[th_tc_units] if np.any((tc_values < th_tc_min) | (tc_values > th_tc_max)): raise RuntimeError( "interpolate_k2theta: target coordinate value " f"out of range (must be in interval [{th_tc_min}, {th_tc_max}]K)" ) tc = TargetCoordinates( type_of_level="theta", values=tc_values.tolist(), attrs=TargetCoordinatesAttrs( units="K", positive="up", standard_name="air_potential_temperature", long_name="potential temperature", ), ) # Check that typeOfLevel is supported and equal for field, th_field, and h_field if field.vcoord_type != "model_level" or field.origin_z != 0.0: raise RuntimeError( "interpolate_k2theta: field to interpolate must " "be defined on model_level layers" ) if th_field.vcoord_type != "model_level" or th_field.origin_z != 0.0: raise RuntimeError( "interpolate_k2theta: theta field must be defined on model_level layers" ) if h_field.vcoord_type != "model_level" or h_field.origin_z != 0.0: raise RuntimeError( "interpolate_k2theta: height field must be defined on model_level layers" ) # Prepare output field field_on_tc on target coordinates field_on_tc = init_field_with_vcoord(field.broadcast_like(th_field), tc, np.nan) # Interpolate # ... prepare interpolation thkm1 = th_field.shift(z=1) fkm1 = field.shift(z=1) # ... loop through tc values for tc_idx, th0 in enumerate(tc.values): folding_coord_exception = xr.full_like(h_field[{"z": 0}], False) # ... find the height field where theta is >= th0 on level k and was <= th0 # on level k-1 or where theta is <= th0 on level k # and was >= th0 on level k-1 h = h_field.where( ((th_field >= th0) & (thkm1 <= th0)) | ((th_field <= th0) & (thkm1 >= th0)) ) if mode == "undef_fold": # ... find condition where more than one interval is found, which # contains the target coordinate value folding_coord_exception = xr.where(h.notnull(), 1.0, 0.0).sum(dim=["z"]) folding_coord_exception = folding_coord_exception.where( folding_coord_exception > 1.0 ).notnull() if mode in ("low_fold", "undef_fold"): # ... extract the index k of the smallest height at which # the condition is fulfilled tcind = h.fillna(h_max).argmin(dim="z") if mode == "high_fold": # ... extract the index k of the largest height at which the condition # is fulfilled tcind = h.fillna(h_min).argmax(dim="z") # ... extract theta and field at level k th2 = th_field[{"z": tcind}] f2 = field[{"z": tcind}] # ... extract theta and field at level k-1 f1 = fkm1[{"z": tcind}] th1 = thkm1[{"z": tcind}] # ... compute the interpolation weights ratio = xr.where(np.abs(th2 - th1) > 0, (th0 - th1) / (th2 - th1), 0.0) # ... interpolate and update field_on_tc field_on_tc[{"z": tc_idx}] = xr.where( folding_coord_exception, np.nan, (1.0 - ratio) * f1 + ratio * f2 ) return field_on_tc
[docs] def interpolate_k2any( field: xr.DataArray, mode: Literal["low_fold", "high_fold"], tc_field: xr.DataArray, tc: TargetCoordinates, h_field: xr.DataArray, ) -> xr.DataArray: """Interpolate a field from model levels to coordinates w.r.t. an arbitrary field. Example for vertical interpolation to isosurfaces of a target field that is no monotonic function of height. Parameters ---------- field : xarray.DataArray field to interpolate (only typeOfLevel="generalVerticalLayer" is supported) mode : str interpolation algorithm, one of {"low_fold", "high_fold"} tc_field : xarray.DataArray target field (only typeOfLevel="generalVerticalLayer" is supported) tc : TargetCoordinates target coordinate definition h_field : xarray.DataArray height on k levels (only typeOfLevel="generalVerticalLayer" is supported) Returns ------- field_on_tc : xarray.DataArray field on target coordinates """ modes = ("low_fold", "high_fold") if mode not in modes: raise ValueError(f"Unsupported mode: {mode}") for f in (field, tc_field, h_field): if f.vcoord_type != "model_level" or f.origin_z != 0.0: raise ValueError("Input fields must be defined on full model levels") # ... tc values outside range of meaningful values of height, # used in tc interval search (in m amsl) h_min = -1000.0 h_max = 100000.0 # Prepare output field field_on_tc on target coordinates field_on_tc = init_field_with_vcoord(field.broadcast_like(tc_field), tc, np.nan) # Interpolate # ... prepare interpolation tckm1 = tc_field.shift(z=1) fkm1 = field.shift(z=1) for tc_idx, value in enumerate(tc.values): # ... find the height field where target is >= value on level k and was <= value # on level k-1 or where target is <= value on level k # and was >= value on level k-1 h = h_field.where( ((tc_field >= value) & (tckm1 <= value)) | ((tc_field <= value) & (tckm1 >= value)) ) if mode == "low_fold": # ... extract the index k of the smallest height at which # the condition is fulfilled tcind = h.fillna(h_max).argmin(dim="z") if mode == "high_fold": # ... extract the index k of the largest height at which the condition # is fulfilled tcind = h.fillna(h_min).argmax(dim="z") # ... extract target and field at level k t2 = tc_field[{"z": tcind}] f2 = field[{"z": tcind}] # ... extract target and field at level k-1 f1 = fkm1[{"z": tcind}] t1 = tckm1[{"z": tcind}] # ... compute the interpolation weights ratio = xr.where(np.abs(t2 - t1) > 0, (value - t1) / (t2 - t1), 0.0) # ... interpolate and update field_on_tc field_on_tc[{"z": tc_idx}] = (1.0 - ratio) * f1 + ratio * f2 return field_on_tc