"""Data source helper class."""
# Standard library
import dataclasses as dc
import sys
import typing
from abc import ABC, abstractmethod
from collections.abc import Iterator
from contextlib import contextmanager, nullcontext
from functools import singledispatchmethod
from pathlib import Path
# Third-party
import earthkit.data as ekd # type: ignore
import eccodes # type: ignore
import polytope # type: ignore
# Local
from . import config, mars
@contextmanager
def cosmo_grib_defs():
    """Enable COSMO GRIB definitions.

    Context manager that puts the COSMO definitions directory, looked up under
    ``<sys.prefix>/share/eccodes-cosmo-resources/definitions``, in front of the
    current ecCodes definitions path and restores the previous path on exit.

    The current definitions path is handled as a ``:``-separated list of
    directories. This allows the context to be entered while it is already
    active (e.g. two data source iterators being consumed at the same time)
    or while more than one definitions directory is configured.

    Raises
    ------
    RuntimeError
        If any directory of the resulting definitions path does not exist.

    """
    restore = eccodes.codes_definition_path()
    root_dir = Path(sys.prefix) / "share"
    cosmo_defs = root_dir / "eccodes-cosmo-resources/definitions"
    # Check every entry on its own: the path taken as a whole is not an
    # existing directory as soon as it lists more than one location.
    current = [Path(entry) for entry in restore.split(":") if entry]
    # dict.fromkeys removes duplicates while keeping the search order, so
    # re-entering the context does not make the path grow.
    paths = tuple(dict.fromkeys((cosmo_defs, *current)))
    for path in paths:
        if not path.exists():
            raise RuntimeError(f"{path} does not exist")
    eccodes.codes_set_definitions_path(":".join(map(str, paths)))
    try:
        yield
    finally:
        # Always hand back exactly the value that was found on entry.
        eccodes.codes_set_definitions_path(restore)
def grib_def_ctx(grib_def: str):
    """Return the context manager matching the given GRIB definitions name.

    Parameters
    ----------
    grib_def : str
        Name of the GRIB definitions to enable. Only ``"cosmo"`` selects a
        dedicated context; any other value yields a no-op context.

    Returns
    -------
    context manager
        ``cosmo_grib_defs()`` for ``"cosmo"``, otherwise ``nullcontext()``.

    """
    wants_cosmo = grib_def == "cosmo"
    return cosmo_grib_defs() if wants_cosmo else nullcontext()
@dc.dataclass
class DataSource(ABC):
    """Abstract base class of the data sources.

    The public entry point is `retrieve`, which dispatches on the type of the
    request and funnels every supported form into a plain ``dict`` request.
    Concrete sources implement `_retrieve` to produce the fields.
    """

    # Default key/value pairs of the requests (see `retrieve`).
    request_template: dict[str, typing.Any] = dc.field(default_factory=dict)

    @singledispatchmethod
    def retrieve(
        self, request: dict[str, typing.Any] | str | tuple[str, str] | mars.Request
    ) -> Iterator:
        """Stream GRIB fields from files or FDB.

        Request for data from the source in the mars language.

        Simple strings are interpreted as `param` filters and pairs of strings
        are interpreted as `param` and `levtype` filters.

        Key value pairs from the `request_template` attribute are used as default
        values. Note that the default values in the mars request passed as an input
        will take precedence on the template values.

        Parameters
        ----------
        request : dict | str | tuple[str, str] | meteodatalab.mars.Request
            Request for data from the source in the mars language.

        Yields
        ------
        GribField
            GribField instances containing the requested data.

        Raises
        ------
        NotImplementedError
            If no overload is registered for the type of `request`.

        """
        # Fallback of the dispatch: reached only for unregistered types.
        raise NotImplementedError(f"request of type {type(request)} not supported.")

    @retrieve.register
    def _(self, request: dict) -> Iterator:
        # This overload is a generator on purpose: the GRIB definitions
        # context is entered on the first iteration and stays active until
        # the iterator over the data source is exhausted.
        scope = config.get("data_scope", "cosmo")
        definitions = grib_def_ctx(scope)
        with definitions:
            yield from self._retrieve(request)

    @retrieve.register
    def _(self, request: mars.Request) -> Iterator:
        # Convert the request object and go through the dict overload.
        as_dict = request.dump()
        yield from self.retrieve(as_dict)

    @retrieve.register
    def _(self, request: str) -> Iterator:
        # A bare string is a `param` filter.
        by_param = {"param": request}
        yield from self.retrieve(by_param)

    @retrieve.register
    def _(self, request: tuple) -> Iterator:
        # A pair of strings is a (`param`, `levtype`) filter.
        (param, levtype) = request
        by_param_and_levtype = {"param": param, "levtype": levtype}
        yield from self.retrieve(by_param_and_levtype)

    @abstractmethod
    def _retrieve(self, request: dict):
        """Produce the fields matching the dict request (source specific)."""
@dc.dataclass
class FDBDataSource(DataSource):
    """Data source backed by the earthkit-data ``"fdb"`` source."""

    def _retrieve(self, request: dict):
        """Yield the fields matching the request.

        The request is laid over `request_template`, so that keys given in
        the request win over the template defaults.
        """
        merged = self.request_template | request
        fdb_request = mars.Request(**merged).to_fdb()
        yield from ekd.from_source("fdb", fdb_request)
@dc.dataclass
class FileDataSource(DataSource):
    """Data source backed by the earthkit-data ``"file"`` source."""

    # Paths handed over to the earthkit-data "file" source.
    datafiles: list[str] | None = None

    def _retrieve(self, request: dict):
        """Yield the fields of `datafiles` selected by the request.

        The request is laid over `request_template`, so that keys given in
        the request win over the template defaults.
        """
        selection = self.request_template | request
        # The request object is built and discarded: only the construction
        # matters here, the selection itself uses the merged dict.
        mars.Request(**selection)
        fields = ekd.from_source("file", self.datafiles)
        yield from fields.sel(selection)
@dc.dataclass
class PolytopeDataSource(DataSource):
    """Data source retrieving the fields through a polytope client."""

    # Collection passed to the polytope client on retrieval.
    polytope_collection: str | None = None
    # Not an init argument: the client is created in __post_init__.
    polytope_client: polytope.api.Client = dc.field(init=False)

    def __post_init__(self):
        """Create the polytope client used for the retrievals."""
        self.polytope_client = polytope.api.Client()

    def _retrieve(self, request: dict):
        """Yield the fields matching the request.

        The request is laid over `request_template`, so that keys given in
        the request win over the template defaults. The client is asked for
        pointers and the data is then streamed from their locations.
        """
        merged = self.request_template | request
        mars_request = mars.Request(**merged)
        results = self.polytope_client.retrieve(
            self.polytope_collection,
            mars_request.to_polytope(),
            pointer=True,
            asynchronous=False,
        )
        locations = [result["location"] for result in results]
        yield from ekd.from_source("url", locations, stream=True)
@dc.dataclass
class URLDataSource(DataSource):
    """Data source backed by the earthkit-data ``"url"`` source."""

    # URLs handed over to the earthkit-data "url" source.
    urls: list[str] | None = None

    def _retrieve(self, request: dict):
        """Yield the fields of `urls` selected by the request.

        The request is laid over `request_template`, so that keys given in
        the request win over the template defaults. The merged keys are
        passed to the selection as keyword arguments.
        """
        selection = self.request_template | request
        fields = ekd.from_source("url", self.urls)
        yield from fields.sel(**selection)