Source code for clisops.ops.average

from pathlib import Path
from typing import List, Optional, Sequence, Tuple, Union

import geopandas as gpd
import xarray as xr
from roocs_utils.exceptions import InvalidParameterValue
from roocs_utils.parameter.dimension_parameter import DimensionParameter
from roocs_utils.xarray_utils.xarray_utils import convert_coord_to_axis

from clisops.core import average
from clisops.ops.base_operation import Operation
from clisops.utils.file_namers import get_file_namer

__all__ = ["average_over_dims", "average_time", "average_shape"]


class Average(Operation):
    def _resolve_params(self, **params):
        dims = DimensionParameter(params.get("dims", None)).value
        ignore_undetected_dims = params.get("ignore_undetected_dims", False)

        self.params = {"dims": dims, "ignore_undetected_dims": ignore_undetected_dims}

    def _get_file_namer(self):
        if self.params.get("dims", None):
            dims = [convert_coord_to_axis(dim) for dim in self.params["dims"]]
            extra = f"_avg-{''.join(sorted(dims))}"
        else:
            extra = ""

        namer = get_file_namer(self._file_namer)(extra=extra)

        return namer

    def _calculate(self):
        avg_ds = average.average_over_dims(
            self.ds,
            self.params.get("dims", None),
            self.params.get("ignore_undetected_dims", None),
        )

        return avg_ds


[docs] def average_over_dims( ds: Union[xr.Dataset, str], dims: Optional[Union[Sequence[str], DimensionParameter]] = None, ignore_undetected_dims: bool = False, output_dir: Optional[Union[str, Path]] = None, output_type: str = "netcdf", split_method: str = "time:auto", file_namer: str = "standard", ) -> List[Union[xr.Dataset, str]]: """Calculate an average over given dimensions. Parameters ---------- ds : Union[xr.Dataset, str] Xarray dataset. dims : Optional[Union[Sequence[{"time", "level", "latitude", "longitude"}], DimensionParameter]] The dimensions over which to apply the average. If None, none of the dimensions are averaged over. Dimensions must be one of ["time", "level", "latitude", "longitude"]. ignore_undetected_dims : bool If the dimensions specified are not found in the dataset, an Exception will be raised if set to True. If False, an exception will not be raised and the other dimensions will be averaged over. Default = False output_dir : Optional[Union[str, Path]] output_type : {"netcdf", "nc", "zarr", "xarray"} split_method : {"time:auto"} file_namer : {"standard", "simple"} Returns ------- List[Union[xr.Dataset, str]] A list of the outputs in the format selected; str corresponds to file paths if the output format selected is a file. Examples -------- | ds: xarray Dataset or "cmip5.output1.MOHC.HadGEM2-ES.rcp85.mon.atmos.Amon.r1i1p1.latest.tas" | dims: ['latitude', 'longitude'] | ignore_undetected_dims: False | output_dir: "/cache/wps/procs/req0111" | output_type: "netcdf" | split_method: "time:auto" | file_namer: "standard" """ op = Average(**locals()) return op.process()
class AverageShape(Operation): def _resolve_params(self, **params): shape = params.get("shape") variable = params.get("variable", None) self.params = {"shape": shape, "variable": variable} if not shape: raise InvalidParameterValue( "At least one area for averaging must be provided" ) def _get_file_namer(self): extra = "_avg-shape" namer = get_file_namer(self._file_namer)(extra=extra) return namer def _calculate(self): avg_ds = average.average_shape( self.ds, self.params.get("shape", None), self.params.get("variable", None), ) return avg_ds
[docs] def average_shape( ds: Union[xr.Dataset, Path, str], shape: Union[str, Path, gpd.GeoDataFrame], variable: Optional[Union[str, Sequence[str]]] = None, output_dir: Optional[Union[str, Path]] = None, output_type: str = "netcdf", split_method: str = "time:auto", file_namer: str = "standard", ) -> List[Union[xr.Dataset, str]]: """Calculate a spatial average over a given shape. Parameters ---------- ds : Union[xr.Dataset, str] Xarray dataset. shape : Union[str, Path, gpd.GeoDataFrame] Path to shape file, or directly a GeoDataFrame. Supports formats compatible with geopandas. Will be converted to EPSG:4326 if needed. variable : Optional[Union[str, Sequence[str], None]] Variables to average. If None, average over all data variables. output_dir : Optional[Union[str, Path]] output_type : {"netcdf", "nc", "zarr", "xarray"} split_method : {"time:auto"} file_namer : {"standard", "simple"} Returns ------- List[Union[xr.Dataset, str]] A list of the outputs in the format selected; str corresponds to file paths if the output format selected is a file. Examples -------- | ds: xarray Dataset or "cmip5.output1.MOHC.HadGEM2-ES.rcp85.mon.atmos.Amon.r1i1p1.latest.tas" | dims: ['latitude', 'longitude'] | ignore_undetected_dims: False | output_dir: "/cache/wps/procs/req0111" | output_type: "netcdf" | split_method: "time:auto" | file_namer: "standard" """ op = AverageShape(**locals()) return op.process()
class AverageTime(Operation): def _resolve_params(self, **params): freq = params.get("freq", None) if not freq: raise InvalidParameterValue( "At least one frequency for averaging must be provided" ) if freq not in list(average.freqs.keys()): raise InvalidParameterValue( f"Time frequency for averaging must be one of {list(average.freqs.keys())}." ) self.params = {"freq": freq} def _get_file_namer(self): extra = f"_avg-{self.params.get('freq')}" namer = get_file_namer(self._file_namer)(extra=extra) return namer def _calculate(self): avg_ds = average.average_time( self.ds, self.params.get("freq", None), ) return avg_ds
[docs] def average_time( ds: Union[xr.Dataset, str], freq: str, output_dir: Optional[Union[str, Path]] = None, output_type: str = "netcdf", split_method: str = "time:auto", file_namer: str = "standard", ) -> List[Union[xr.Dataset, str]]: """ Parameters ---------- ds : Union[xr.Dataset, str] Xarray dataset. freq : str The frequency to average over. One of "month", "year". output_dir : Optional[Union[str, Path]] output_type : {"netcdf", "nc", "zarr", "xarray"} split_method : {"time:auto"} file_namer: {"standard", "simple"} Returns ------- List[Union[xr.Dataset, str]] A list of the outputs in the format selected; str corresponds to file paths if the output format selected is a file. Examples -------- | ds: xarray Dataset or "cmip5.output1.MOHC.HadGEM2-ES.rcp85.mon.atmos.Amon.r1i1p1.latest.tas" | dims: ['latitude', 'longitude'] | ignore_undetected_dims: False | output_dir: "/cache/wps/procs/req0111" | output_type: "netcdf" | split_method: "time:auto" | file_namer: "standard" """ op = AverageTime(**locals()) return op.process()