Source code for clisops.ops.average

"""Average operations for xarray datasets."""

from collections.abc import Sequence
from pathlib import Path

import geopandas as gpd
import xarray as xr

from clisops.core import average
from clisops.exceptions import InvalidParameterValue
from clisops.ops.base_operation import Operation
from clisops.parameter import DimensionParameter
from clisops.utils.dataset_utils import convert_coord_to_axis
from clisops.utils.file_namers import get_file_namer

__all__ = ["average_over_dims", "average_shape", "average_time"]


class Average(Operation):
    """Operation that averages an xarray dataset over named dimensions."""

    def _resolve_params(self, **params):
        """
        Collect the averaging parameters into ``self.params``.

        Parameters
        ----------
        **params
            Keyword arguments given to the operation. Only ``dims`` and
            ``ignore_undetected_dims`` are read here. ``dims`` is wrapped in
            ``DimensionParameter`` and its ``.value`` is what gets stored;
            ``ignore_undetected_dims`` falls back to False when absent.
        """
        self.params = {
            "dims": DimensionParameter(params.get("dims", None)).value,
            "ignore_undetected_dims": params.get("ignore_undetected_dims", False),
        }

    def _get_file_namer(self):
        """
        Create the file namer used for the outputs of this operation.

        Returns
        -------
        object
            The namer built by ``get_file_namer(self._file_namer)``. Its ``extra``
            suffix is ``"_avg-"`` followed by the sorted, concatenated results of
            ``convert_coord_to_axis`` for every stored dimension, or an empty
            string when no dimensions are stored.
        """
        requested = self.params.get("dims", None)
        if not requested:
            suffix = ""
        else:
            # Sorting keeps the suffix independent of the order the dims were given in.
            axes = sorted(convert_coord_to_axis(name) for name in requested)
            suffix = f"_avg-{''.join(axes)}"

        return get_file_namer(self._file_namer)(extra=suffix)

    def _calculate(self):
        """
        Average ``self.ds`` with ``average.average_over_dims``.

        Returns
        -------
        object
            Whatever ``average.average_over_dims`` returns for ``self.ds``, the
            stored ``dims`` and the stored ``ignore_undetected_dims`` flag.
        """
        dims = self.params.get("dims", None)
        ignore_undetected = self.params.get("ignore_undetected_dims", None)
        return average.average_over_dims(self.ds, dims, ignore_undetected)


def average_over_dims(
    ds: xr.Dataset | str,
    dims: Sequence[str] | DimensionParameter | None = None,
    ignore_undetected_dims: bool = False,
    output_dir: str | Path | None = None,
    output_type: str = "netcdf",
    split_method: str = "time:auto",
    file_namer: str = "standard",
) -> list[xr.Dataset | str]:
    """
    Calculate an average over given dimensions.

    Parameters
    ----------
    ds : xr.Dataset or str
        Xarray dataset, or a string referring to one (see Examples).
    dims : Sequence of str or DimensionParameter, optional
        The dimensions over which to apply the average.
        Dimensions must be one of ["time", "level", "latitude", "longitude"].
        NOTE(review): how ``None`` is handled is decided by
        ``clisops.core.average.average_over_dims`` - confirm there.
    ignore_undetected_dims : bool
        Controls what happens when a requested dimension cannot be found in the
        dataset; forwarded unchanged to ``clisops.core.average.average_over_dims``.
        NOTE(review): going by its name, True should suppress the exception and
        average over the remaining dimensions - confirm against the core routine.
        Default = False.
    output_dir : str or Path, optional
        The directory where the output files will be saved.
        If None, the output will not be saved to disk.
    output_type : {"netcdf", "nc", "zarr", "xarray"}
        The format of the output files. If "xarray", the output will be an xarray Dataset.
        If "netcdf", "nc", or "zarr", the output will be saved to disk in the specified format.
    split_method : {"time:auto"}
        The method to split the output files. Currently only "time:auto" is supported,
        which will automatically split the output files based on time.
    file_namer : {"standard", "simple"}
        The file namer to use for generating output file names.
        "standard" uses a more descriptive naming convention, while "simple" uses a numbered sequence.

    Returns
    -------
    list of xr.Dataset or str
        A list of the outputs in the format selected;
        str corresponds to file paths if the output format selected is a file.

    Examples
    --------
    | ds: xarray Dataset or "cmip5.output1.MOHC.HadGEM2-ES.rcp85.mon.atmos.Amon.r1i1p1.latest.tas"
    | dims: ['latitude', 'longitude']
    | ignore_undetected_dims: False
    | output_dir: "/cache/wps/procs/req0111"
    | output_type: "netcdf"
    | split_method: "time:auto"
    | file_namer: "standard"
    """
    # Every argument is forwarded by name to the operation class.
    operation = Average(
        ds=ds,
        dims=dims,
        ignore_undetected_dims=ignore_undetected_dims,
        output_dir=output_dir,
        output_type=output_type,
        split_method=split_method,
        file_namer=file_namer,
    )
    return operation.process()


class AverageShape(Operation):
    """Average operation for xarray datasets over a given shape."""

    def _resolve_params(self, **params):
        """
        Store and validate the shape-averaging parameters.

        Parameters
        ----------
        **params
            Keyword arguments given to the operation. Only ``shape`` and
            ``variable`` are read here; both are stored on ``self.params``.

        Raises
        ------
        InvalidParameterValue
            If ``shape`` is missing or falsy (e.g. None or an empty string),
            or if it is an empty GeoDataFrame.
        """
        shape = params.get("shape")
        variable = params.get("variable", None)

        self.params = {"shape": shape, "variable": variable}

        # pandas/geopandas frames refuse truth-testing ("The truth value of a
        # DataFrame is ambiguous"), so a bare `not shape` would raise ValueError
        # for a GeoDataFrame. Use `.empty` for frames and plain falsiness otherwise.
        if isinstance(shape, gpd.GeoDataFrame):
            shape_missing = shape.empty
        else:
            shape_missing = not shape

        if shape_missing:
            raise InvalidParameterValue("At least one area for averaging must be provided")

    def _get_file_namer(self):
        """
        Create the file namer used for the outputs of this operation.

        Returns
        -------
        object
            The namer built by ``get_file_namer(self._file_namer)`` with the
            fixed ``extra`` suffix ``"_avg-shape"``.
        """
        extra = "_avg-shape"

        namer = get_file_namer(self._file_namer)(extra=extra)

        return namer

    def _calculate(self):
        """
        Average ``self.ds`` with ``average.average_shape``.

        Returns
        -------
        object
            Whatever ``average.average_shape`` returns for ``self.ds``, the
            stored ``shape`` and the stored ``variable``.
        """
        avg_ds = average.average_shape(
            self.ds,
            self.params.get("shape", None),
            self.params.get("variable", None),
        )

        return avg_ds


def average_shape(
    ds: xr.Dataset | Path | str,
    shape: str | Path | gpd.GeoDataFrame,
    variable: str | Sequence[str] | None = None,
    output_dir: str | Path | None = None,
    output_type: str = "netcdf",
    split_method: str = "time:auto",
    file_namer: str = "standard",
) -> list[xr.Dataset | str]:
    """
    Calculate a spatial average over a given shape.

    Parameters
    ----------
    ds : xr.Dataset or str or Path
        Xarray dataset, or a string/path referring to one (see Examples).
    shape : str, Path, or gpd.GeoDataFrame
        Path to shape file, or directly a GeoDataFrame.
        Supports formats compatible with geopandas.
        Will be converted to EPSG:4326 if needed.
    variable : str or sequence of str, optional
        Variables to average. If None, average over all data variables.
    output_dir : str or Path, optional
        The directory where the output files will be saved.
        If None, the output will not be saved to disk.
    output_type : {"netcdf", "nc", "zarr", "xarray"}
        The format of the output files. If "xarray", the output will be an xarray Dataset.
    split_method : {"time:auto"}
        The method to split the output files. Currently only "time:auto" is supported,
        which will automatically split the output files based on time.
    file_namer : {"standard", "simple"}
        The file namer to use for generating output file names.
        "standard" uses a more descriptive naming convention, while "simple" uses a numbered sequence.

    Returns
    -------
    list of xr.Dataset or str
        A list of the outputs in the format selected.
        str corresponds to file paths if the output format selected is a file.

    Examples
    --------
    | ds: xarray Dataset or "cmip5.output1.MOHC.HadGEM2-ES.rcp85.mon.atmos.Amon.r1i1p1.latest.tas"
    | shape: "/path/to/shape.geojson" or a GeoDataFrame
    | variable: "tas"
    | output_dir: "/cache/wps/procs/req0111"
    | output_type: "netcdf"
    | split_method: "time:auto"
    | file_namer: "standard"
    """
    # Every argument is forwarded by name to the operation class.
    operation = AverageShape(
        ds=ds,
        shape=shape,
        variable=variable,
        output_dir=output_dir,
        output_type=output_type,
        split_method=split_method,
        file_namer=file_namer,
    )
    return operation.process()


class AverageTime(Operation):
    """Operation that averages an xarray dataset over a time frequency."""

    def _resolve_params(self, **params):
        """
        Validate the requested frequency and store it on ``self.params``.

        Parameters
        ----------
        **params
            Keyword arguments given to the operation. Only ``freq`` is read here.

        Raises
        ------
        InvalidParameterValue
            If ``freq`` is missing or falsy, or is not one of the keys of
            ``average.freqs``.
        """
        freq = params.get("freq", None)
        if not freq:
            raise InvalidParameterValue("At least one frequency for averaging must be provided")

        # A list (not the dict itself) is used for the membership test and for the message.
        allowed = list(average.freqs.keys())
        if freq not in allowed:
            raise InvalidParameterValue(f"Time frequency for averaging must be one of {allowed}.")

        self.params = {"freq": freq}

    def _get_file_namer(self):
        """
        Create the file namer used for the outputs of this operation.

        Returns
        -------
        object
            The namer built by ``get_file_namer(self._file_namer)`` with an
            ``extra`` suffix of ``"_avg-"`` followed by the stored frequency.
        """
        suffix = f"_avg-{self.params.get('freq')}"
        return get_file_namer(self._file_namer)(extra=suffix)

    def _calculate(self):
        """
        Average ``self.ds`` with ``average.average_time``.

        Returns
        -------
        object
            Whatever ``average.average_time`` returns for ``self.ds`` and the
            stored frequency.
        """
        return average.average_time(self.ds, self.params.get("freq", None))


def average_time(
    ds: xr.Dataset | str,
    freq: str,
    output_dir: str | Path | None = None,
    output_type: str = "netcdf",
    split_method: str = "time:auto",
    file_namer: str = "standard",
) -> list[xr.Dataset | str]:
    """
    Calculate an average over time for a given frequency.

    Parameters
    ----------
    ds : xr.Dataset or str
        Xarray dataset, or a string referring to one (see Examples).
    freq : str
        The frequency to average over, e.g. "month" or "year".
        ``AverageTime`` checks it against the keys of ``clisops.core.average.freqs``.
    output_dir : str or Path, optional
        The directory where the output files will be saved.
        If None, the output will not be saved to disk.
    output_type : {"netcdf", "nc", "zarr", "xarray"}
        The format of the output files. If "xarray", the output will be an xarray Dataset.
    split_method : {"time:auto"}
        The method to split the output files. Currently only "time:auto" is supported,
        which will automatically split the output files based on time.
    file_namer : {"standard", "simple"}
        The file namer to use for generating output file names.
        "standard" uses a more descriptive naming convention, while "simple" uses a numbered sequence.

    Returns
    -------
    list of xr.Dataset or str
        A list of the outputs in the format selected.
        str corresponds to file paths if the output format selected is a file.

    Examples
    --------
    | ds: xarray Dataset or "cmip5.output1.MOHC.HadGEM2-ES.rcp85.mon.atmos.Amon.r1i1p1.latest.tas"
    | freq: "month"
    | output_dir: "/cache/wps/procs/req0111"
    | output_type: "netcdf"
    | split_method: "time:auto"
    | file_namer: "standard"
    """
    # Every argument is forwarded by name to the operation class.
    operation = AverageTime(
        ds=ds,
        freq=freq,
        output_dir=output_dir,
        output_type=output_type,
        split_method=split_method,
        file_namer=file_namer,
    )
    return operation.process()