# Source code for meteodatalab.data_cache

"""Helper class to cache data.

The helper is intended to create a cache of grib files with a layout that is
suitable for reading by fieldextra.

"""

# Standard library
import dataclasses as dc
from itertools import product
from pathlib import Path

# Local
from . import data_source

# Default mapping from a file label to its path pattern, relative to the
# cache directory. "<mmm>" and "<ddhh>" are placeholders that DataCache
# substitutes when it generates the concrete file names.
DEFAULT_FILES: dict[str, str] = dict(
    inputi="<mmm>/lfff<ddhh>0000",
    inputc="<mmm>/lfff00000000c",
)


[docs] @dc.dataclass class DataCache: cache_dir: Path fields: dict[str, list] files: dict[str, str] = dc.field(default_factory=lambda: DEFAULT_FILES) steps: list[int] = dc.field(default_factory=lambda: [0]) numbers: list[int] = dc.field(default_factory=lambda: [0]) _populated: list[Path] = dc.field(default_factory=list, init=False) def __post_init__(self): if not self.fields.keys() <= self.files.keys(): raise ValueError("fields keys must be a subset of files keys") @property def populated_files(self) -> list[Path]: return self._populated @property def conf_files(self) -> dict[str, Path]: return { label: self.cache_dir / pattern for label, pattern in self.files.items() } def _iter_files(self): # support more patterns ? # https://github.com/COSMO-ORG/fieldextra/blob/develop/documentation/README.user#L2797 patterns = ( ("<mmm>", "{mmm:03d}"), ("<ddhh>", "{dd:02d}{hh:02d}"), ) for label, name in self.files.items(): name = name.lower() numbers = self.numbers if "<mmm>" in name else [None] steps = self.steps if "<ddhh>" in name else [None] for src, dst in patterns: name = name.replace(src, dst) for number, step in product(numbers, steps): dd = step // 24 if step is not None else None hh = step % 24 if step is not None else None yield label, name.format(mmm=number, dd=dd, hh=hh), number, step def _iter_requests(self, label: str, number: int | None, step: int | None): param_map: dict[str, list[str]] = {} for param, levtype in self.fields[label]: param_map.setdefault(levtype, []).append(param) for levtype, params in param_map.items(): req = {"param": params, "levtype": levtype} if number is not None: req["number"] = number if step is not None: req["step"] = step yield req def populate(self, source: data_source.DataSource): for label, rel_path, number, step in self._iter_files(): path = self.cache_dir / rel_path path.parent.mkdir(parents=True, exist_ok=True) with path.open("ba") as f: for req in self._iter_requests(label, number, step): for field in source.retrieve(req): 
f.write(field.message()) self._populated.append(path) def clear(self): for path in self._populated: path.unlink()