# Source code for clisops.ops.subset

from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

import xarray as xr
from loguru import logger
from roocs_utils.parameter import parameterise
from roocs_utils.parameter.area_parameter import AreaParameter
from roocs_utils.parameter.level_parameter import LevelParameter
from roocs_utils.parameter.param_utils import Interval, Series, TimeComponents
from roocs_utils.parameter.time_components_parameter import TimeComponentsParameter
from roocs_utils.parameter.time_parameter import TimeParameter

from clisops.core import (
    subset_bbox,
    subset_level,
    subset_level_by_values,
    subset_time,
    subset_time_by_components,
    subset_time_by_values,
)
from clisops.core.subset import assign_bounds, get_lat, get_lon  # noqa
from clisops.ops.base_operation import Operation
from clisops.utils.dataset_utils import cf_convert_between_lon_frames

__all__ = ["Subset", "subset"]


class Subset(Operation):
    """Operation that subsets ``self.ds`` by area, time, level and time components.

    ``_resolve_params`` turns the user-supplied keyword arguments into a flat
    dictionary stored on ``self.params``; ``_calculate`` reads that dictionary
    and dispatches to the matching ``clisops.core`` subset functions.
    """

    def _resolve_params(self, **params):
        """Generate the dictionary of subset parameters and store it in ``self.params``.

        Parameters
        ----------
        **params
            Keyword arguments; only ``time``, ``area``, ``level`` and
            ``time_components`` are read here. Missing keys default to ``None``.
        """
        time = params.get("time", None)
        area = params.get("area", None)
        level = params.get("level", None)
        time_comps = params.get("time_components", None)

        logger.debug(
            f"Mapping parameters: time: {time}, area: {area}, "
            f"level: {level}, time_components: {time_comps}."
        )

        parameters = parameterise(
            collection=self.ds,
            time=time,
            area=area,
            level=level,
            time_components=time_comps,
        )

        # Set up args dictionary to be used by `self._calculate()`.
        # Every parameter that carries a value contributes its dictionary
        # representation to the flat argument mapping.
        args = {}
        for param_name in ["time", "area", "level", "time_components"]:
            param_value = parameters.get(param_name)
            if param_value.value is not None:
                args.update(param_value.asdict())

        # Rename start_time and end_time to start_date and end_date to
        # match clisops.core.subset function parameters.
        if "start_time" in args:
            args["start_date"] = args.pop("start_time")

        if "end_time" in args:
            args["end_date"] = args.pop("end_time")

        self.params = args

    def _calculate(self):
        """Apply the subsetting described by ``self.params`` to ``self.ds``.

        When both longitude and latitude bounds are present, a bounding-box
        subset is performed (time and level arguments are forwarded to
        ``subset_bbox``). Otherwise time and level subsetting are applied
        separately. Time components, if any, are applied last in both cases.

        Returns
        -------
        The subsetted dataset (or ``self.ds`` itself when nothing was requested).

        Raises
        ------
        Exception
            If ``subset_bbox`` raises ``NotImplementedError``; the message
            reports the longitude range of the dataset.
        """
        # NOTE: the original condition was `"lon_bnds" and "lat_bnds" in ...`,
        # which only ever tested "lat_bnds" because a non-empty string literal
        # is always truthy. Both keys are required by the code below.
        if "lon_bnds" in self.params and "lat_bnds" in self.params:
            lon = get_lon(self.ds)
            lat = get_lat(self.ds)

            # ensure lat/lon bounds are in the same order as data, before trying to roll
            # if descending in dataset, they will be flipped in subset_bbox
            self.params["lon_bnds"], self.params["lat_bnds"] = (
                assign_bounds(self.params.get("lon_bnds"), self.ds[lon.name]),
                assign_bounds(self.params.get("lat_bnds"), self.ds[lat.name]),
            )

            # subset with space and optionally time and level
            logger.debug(f"subset_bbox with parameters: {self.params}")
            # bounds are always ascending, so if lon is descending rolling will not work.
            ds, lb, ub = cf_convert_between_lon_frames(
                self.ds, self.params.get("lon_bnds")
            )
            self.params["lon_bnds"] = (lb, ub)

            valid_args = [
                "lon_bnds",
                "lat_bnds",
                "start_date",
                "end_date",
                "first_level",
                "last_level",
                "time_values",
                "level_values",
            ]
            kwargs = {arg: self.params.get(arg, None) for arg in valid_args}

            try:
                result = subset_bbox(ds, **kwargs)
            except NotImplementedError as err:
                lon_min, lon_max = lon.values.min(), lon.values.max()
                # Chain the original error so the underlying cause stays visible.
                raise Exception(
                    f"The requested longitude subset {self.params.get('lon_bnds')} is not within the longitude bounds "
                    "of this dataset and the data could not be converted to this longitude frame successfully. "
                    "Please re-run your request with longitudes within the bounds of the dataset: "
                    f"({lon_min:.2f}, {lon_max:.2f})"
                ) from err

        else:
            kwargs = {
                arg: self.params.get(arg, None) for arg in ["start_date", "end_date"]
            }

            # Subset over time interval if requested
            if any(kwargs.values()):
                logger.debug(f"subset_time with parameters: {kwargs}")
                result = subset_time(self.ds, **kwargs)
            # Subset a series of time values if requested
            elif self.params.get("time_values"):
                result = subset_time_by_values(
                    self.ds, time_values=self.params["time_values"]
                )
            else:
                result = self.ds

            # Now test for level subsetting
            first = self.params.get("first_level", None)
            last = self.params.get("last_level", None)

            # Subset with level only
            if any((first, last)):
                # Ensure bounds are ascending. The swap must happen BEFORE the
                # keyword arguments are built, otherwise subset_level receives
                # the unswapped values. Skip the comparison when one bound is
                # open (None), which would otherwise raise TypeError.
                if first is not None and last is not None and first > last:
                    first, last = last, first
                    self.params["first_level"] = first
                    self.params["last_level"] = last

                kwargs = {"first_level": first, "last_level": last}
                logger.debug(f"subset_level with parameters: {kwargs}")
                result = subset_level(result, **kwargs)

            elif self.params.get("level_values", None):
                kwargs = {"level_values": self.params["level_values"]}
                logger.debug(f"subset_level_by_values with parameters: {kwargs}")
                result = subset_level_by_values(result, **kwargs)

        # Now apply time components if specified
        time_comps = self.params.get("time_components")
        if time_comps:
            logger.debug(f"subset_by_time_components with parameters: {time_comps}")
            result = subset_time_by_components(result, time_components=time_comps)

        return result
def subset(
    ds: Union[xr.Dataset, str, Path],
    *,
    time: Optional[Union[str, Tuple[str, str], TimeParameter, Series, Interval]] = None,
    area: Optional[
        Union[
            str,
            Tuple[
                Union[int, float, str],
                Union[int, float, str],
                Union[int, float, str],
                Union[int, float, str],
            ],
            AreaParameter,
        ]
    ] = None,
    level: Optional[
        Union[
            str,
            Tuple[Union[int, float, str], Union[int, float, str]],
            LevelParameter,
            Interval,
        ]
    ] = None,
    time_components: Optional[
        Union[str, Dict, TimeComponents, TimeComponentsParameter]
    ] = None,
    output_dir: Optional[Union[str, Path]] = None,
    output_type="netcdf",
    split_method="time:auto",
    file_namer="standard",
) -> List[Union[xr.Dataset, str]]:
    """Subset a dataset by time, area, level and/or time components.

    Builds a :class:`Subset` operation from the given arguments and returns
    the result of its ``process()`` method.

    Parameters
    ----------
    ds : xr.Dataset or str or Path
        The dataset to subset.
    time : str or Tuple[str, str] or TimeParameter or Series or Interval, optional
        Time selection.
    area : str or AreaParameter or Tuple of four int/float/str, optional
        Area selection, ordered as lon_0, lat_0, lon_1, lat_1.
    level : str or Tuple of two int/float/str or LevelParameter or Interval, optional
        Level selection.
    time_components : str or Dict or TimeComponents or TimeComponentsParameter, optional
        Time components selection.
    output_dir : str or Path, optional
        Output directory.
    output_type : {"netcdf", "nc", "zarr", "xarray"}
        Output format.
    split_method : {"time:auto"}
        Method used to split the output.
    file_namer : {"standard", "simple"}
        File naming scheme.

    Returns
    -------
    List[Union[xr.Dataset, str]]
        A list of the subsetted outputs in the format selected; str corresponds
        to file paths if the output format selected is a file.

    Examples
    --------
    | ds: xarray Dataset or "cmip5.output1.MOHC.HadGEM2-ES.rcp85.mon.atmos.Amon.r1i1p1.latest.tas"
    | time: ("1999-01-01T00:00:00", "2100-12-30T00:00:00") or "2085-01-01T12:00:00Z/2120-12-30T12:00:00Z"
    | area: (-5.,49.,10.,65) or "0.,49.,10.,65" or [0, 49.5, 10, 65] with the order being lon_0, lat_0, lon_1, lat_1
    | level: (1000.,) or "1000/2000" or ("1000.50", "2000.60")
    | time_components: "year:2000,2004,2008|month:01,02" or {"year": (2000, 2004, 2008), "months": (1, 2)}
    | output_dir: "/cache/wps/procs/req0111"
    | output_type: "netcdf"
    | split_method: "time:auto"
    | file_namer: "standard"

    Note
    ----
    If you request a selection range (such as level, latitude or longitude)
    that specifies the lower and upper bounds in the opposite direction to the
    actual coordinate values then clisops.ops.subset will detect this issue
    and reverse your selection before returning the data subset.
    """
    # Forward every argument explicitly by keyword to the operation.
    operation = Subset(
        ds=ds,
        time=time,
        area=area,
        level=level,
        time_components=time_components,
        output_dir=output_dir,
        output_type=output_type,
        split_method=split_method,
        file_namer=file_namer,
    )
    return operation.process()