Source code for ocean_model_skill_assessor.utils

"""
Utility functions.
"""

import json
import logging
import pathlib
import sys

from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple, Union

import cf_pandas as cfp
import cf_xarray
import extract_model as em
import intake
import numpy as np
import pandas as pd
import pyproj
import xarray as xr

from cf_pandas import Vocab, always_iterable, astype, merge
from intake.catalog import Catalog
from shapely.geometry import Polygon
from xarray import DataArray, Dataset

from .paths import Paths


[docs] def read_model_file( fname_processed_model: Path, no_Z: bool, dsm: xr.Dataset ) -> xr.Dataset: """_summary_ Parameters ---------- fname_processed_model : Path Model file path no_Z : bool _description_ dsm : Dataset Returns ------- Processed model output (Dataset) """ model = xr.open_dataset(fname_processed_model).cf.guess_coord_axis() try: check_dataset(model, no_Z=no_Z) except KeyError: # see if I can fix it model = fix_dataset(model, dsm) check_dataset(model, no_Z=no_Z) return model
[docs] def read_processed_data_file( fname_processed_data: Path, no_Z: bool ) -> Union[xr.Dataset, pd.DataFrame]: """_summary_ Parameters ---------- fname_processed_data : Path Data file path no_Z : bool _description_ Returns ------- Processed data (DataFrame or Dataset) """ # read in from newly made file to make sure output is loaded if ".csv" in str(fname_processed_data): obs = pd.read_csv(fname_processed_data) obs = check_dataframe(obs, no_Z) elif ".nc" in str(fname_processed_data): obs = xr.open_dataset(fname_processed_data).cf.guess_coord_axis() check_dataset(obs, is_model=False, no_Z=no_Z) else: raise TypeError("object is neither DataFrame nor Dataset.") return obs
[docs] def save_processed_files( dfd: Union[xr.Dataset, pd.DataFrame], fname_processed_data: Path, model_var: xr.Dataset, fname_processed_model: Path, ): """Save processed data and model output into files. Parameters ---------- dfd : Union[xr.Dataset, pd.DataFrame] Processed data fname_processed_data : Path Data file path model_var : xr.Dataset Processed model output fname_processed_model : Path Model file path """ if isinstance(dfd, pd.DataFrame): # make sure datetimes will be recognized when reread # actually seems to work without this dfd = dfd.rename(columns={dfd.cf["T"].name: "time_UTC"}) dfd.to_csv(fname_processed_data, index=False) elif isinstance(dfd, xr.Dataset): dfd.to_netcdf(fname_processed_data) dfd.close() else: raise TypeError("object is neither DataFrame nor Dataset.") if fname_processed_model.is_file(): pathlib.Path.unlink(fname_processed_model) model_var.to_netcdf(fname_processed_model, mode="w") model_var.close()
[docs] def fix_dataset( model_var: Union[xr.DataArray, xr.Dataset], ds: Union[xr.DataArray, xr.Dataset] ) -> Union[xr.DataArray, xr.Dataset]: """Fill in info necessary to pass `check_dataset()` if possible. Right now it is only for converting horizontal indices to lon/lat but conceivably could do more in the future. Looks for lon/lat being 2D coords. Parameters ---------- model_var : Union[xr.DataArray,xr.Dataset] xarray object that needs some more info filled in ds : Union[xr.DataArray,xr.Dataset] xarray object that has info that can be used to fill in model_var Returns ------- Union[xr.DataArray,xr.Dataset] model_var with more information included, hopefully """ # see if lon/lat are in model_var as data_vars instead of as coordinates if ( "longitude" not in model_var.cf.coordinates and "longitude" in model_var.cf ) or ("latitude" not in model_var.cf.coordinates and "latitude" in model_var.cf): lonkey, latkey = model_var.cf["longitude"].name, model_var.cf["latitude"].name model_var = model_var.assign_coords( {lonkey: model_var[lonkey], latkey: model_var[latkey]} ) # if we have X/Y indices in model_var but not their equivalent lon/lat, get them from ds elif ( "longitude" not in model_var.cf.coordinates and "X" in model_var.cf and "longitude" in ds.cf.coordinates # and ds.cf["longitude"].ndim == 2 and ds[cf_xarray.accessor._get_all(ds, "longitude")[0]].ndim == 2 and "latitude" not in model_var.cf.coordinates and "Y" in model_var.cf and "latitude" in ds.cf # and ds.cf["latitude"].ndim == 2 and ds[cf_xarray.accessor._get_all(ds, "latitude")[0]].ndim == 2 ): lonkey, latkey = ds.cf["longitude"].name, ds.cf["latitude"].name X, Y = model_var.cf["X"], model_var.cf["Y"] # model_var[lonkey] = ds.cf["longitude"].isel({Y.name: Y, X.name: X}) # model_var[lonkey].attrs = ds[lonkey].attrs model_var = model_var.assign_coords( { lonkey: ds.cf["longitude"].isel({Y.name: Y, X.name: X}), latkey: ds.cf["latitude"].isel({Y.name: Y, X.name: X}), } ) # see if Z is in variables but not in coords # can't figure out how to catch this case but generalize yet if "Z" not in model_var.cf.coordinates and "s_rho" in model_var.variables: model_var = model_var.assign_coords({"s_rho": model_var["s_rho"]}) return model_var
[docs] def check_dataset( ds: Union[xr.DataArray, xr.Dataset], is_model: bool = True, no_Z: bool = False ): """Check xarray datasets (usually model output) for necessary cf-xarray dims/coords. If Dataset is model output (`is_model=True`), must have T, Z, vertical, latitude, longitude, and "positive" attribute must be associated with Z or vertical. But, if `no_Z=True`, neither Z, vertical, nor positive attribute need to be present. If Dataset is not model output (is_model=False), must have T, Z, latitude, longitude. But, if `no_Z=True`, Z does not need to be present. """ if "T" not in ds.cf: raise KeyError( "a variable of datetimes needs to be identifiable by `cf-xarray` in dataset. Ways to address this include: variable name has the word 'time' in it; variable contains datetime objects; variable has an attribute of `'axis': 'T'`. See `cf-xarray` docs for more information." ) if not no_Z: if is_model: if "Z" not in ds.cf or "vertical" not in ds.cf: raise KeyError( "a variable of depths needs to be identifiable by `cf-xarray` in dataset for both axis 'Z' and coordinate 'vertical'. Ways to address this include: variable name has the word 'depth' in it; for axis 'Z' variable has an attribute of `'axis': 'Z'`. See `cf-xarray` docs for more information." ) if ( "positive" not in ds[ds.cf.axes["Z"][0]].attrs and "positive" not in ds[ds.cf.coordinates["vertical"][0]].attrs ): raise KeyError( "ds.cf['Z'] or ds.cf['vertical'] needs to have an attribute stating `'positive': 'up'` or `'positive': 'down'`." ) else: if "Z" not in ds.cf: raise KeyError( "a variable of depths needs to be identifiable by `cf-xarray` in dataset for axis 'Z'. Ways to address this include: variable name has the word 'depth' in it; variable has an attribute of `'axis': 'Z'`. See `cf-xarray` docs for more information." ) if "longitude" not in ds.cf.coordinates or "latitude" not in ds.cf.coordinates: raise KeyError( "A variable containing longitudes and a variable containing latitudes must each be identifiable. One way to address this is to make sure the variable names start with 'lon' and 'lat' respectively. See `cf-xarray` docs for more information." )
[docs] def check_dataframe(dfd: pd.DataFrame, no_Z: bool) -> pd.DataFrame: """Check dataframe for T, Z, lon, lat; reset indices; parse dates.""" # drop index if it is just the default range index, otherwise return to columns if ( isinstance(dfd.index, pd.core.indexes.range.RangeIndex) and dfd.index.start == 0 and dfd.index.stop == len(dfd.index) ): drop = True else: drop = False dfd = dfd.reset_index(drop=drop) # check for presence of required axis/coord information # in the future relax these requirements depending on featuretype is instead is in # catalog metadata if "T" not in dfd.cf: raise KeyError( "a column of datetimes needs to be identifiable by `cf-pandas` in dataset. One way to address this is to make sure the name of the column has the word 'time' in it." ) if "Z" not in dfd.cf and not no_Z: raise KeyError( "a column of depths (even if the same value) needs to be identifiable by `cf-pandas` in dataset. If there is no concept of depth for this dataset, you can instead set `no_Z=True`. If a depth-column is present, make sure it has 'depth' in the column name." ) if "longitude" not in dfd.cf or "latitude" not in dfd.cf: raise KeyError( "A column containing longitudes and a column containing latitudes must each be identifiable by `cf-pandas`. If they are present make sure the column name includes 'lon' and 'lat', respectively." ) dfd[dfd.cf["T"].name] = pd.to_datetime(dfd.cf["T"]) return dfd
[docs] def check_catalog( cat: Catalog, source_names: Optional[list] = None, skip_strings: Optional[list] = None, ): """Check a catalog for required keys. Parameters ---------- catalogs : Catalog Catalog object source_names : list Use these source_names instead of list(cat) if entered, for checking. skip_strings : list of strings, optional If provided, source_names in catalog will only be checked for goodness if they do not contain one of skip_strings. For example, if `skip_strings=["_base"]` then any source in the catalog whose name contains that string will be skipped. """ required_keys = { "minLongitude", "maxLongitude", "minLatitude", "maxLatitude", "minTime", "maxTime", "featuretype", "maptype", } skip_strings = skip_strings or [] if source_names is None: source_names = list(cat) for skip_string in skip_strings: source_names = [ source_name for source_name in source_names if skip_string not in source_name ] for source_name in source_names: missing_keys = set(required_keys) - set(cat[source_name].metadata.keys()) if len(missing_keys) > 0: cat_name = cat.name or cat.metadata["name"] raise KeyError( f"In catalog {cat_name} and dataset {source_name}, missing required keys {missing_keys}." ) allowed_featuretypes = [ "timeSeries", "profile", "trajectoryProfile", "timeSeriesProfile", "grid", ] future_featuretypes = ["trajectory"] if cat[source_name].metadata["featuretype"] in future_featuretypes: raise KeyError( f"featuretype {cat[source_name].metadata['featuretype']} is not available yet." ) elif cat[source_name].metadata["featuretype"] not in allowed_featuretypes: raise KeyError( f"featuretype in metadata must be one of {allowed_featuretypes} but instead is {cat[source_name].metadata['featuretype']}." ) allowed_maptypes = ["point", "line", "box"] if cat[source_name].metadata["maptype"] not in allowed_maptypes: raise KeyError( f"maptype in metadata must be one of {allowed_maptypes} but instead is {cat[source_name].metadata['maptype']}." )
[docs] def open_catalogs( catalogs: Union[str, Catalog, Sequence], paths: Optional[Paths] = None, skip_check: bool = False, skip_strings: Optional[list] = None, ) -> List[Catalog]: """Initialize catalog objects from inputs. Parameters ---------- catalogs : Union[str, Catalog, Sequence] Catalog name(s) or list of names, or catalog object or list of catalog objects. paths : Paths, optional Paths object for finding paths to use. Required if any catalog is a string referencing paths. skip_check : bool If True, do not check catalogs. Use this for testing as needed. Default is False. skip_strings : list of strings, optional If provided, source_names in catalog will only be checked for goodness if they do not contain one of skip_strings. For example, if `skip_strings=["_base"]` then any source in the catalog whose name contains that string will be skipped. Returns ------- list[Catalog] Catalogs, ready to use. """ catalogs = always_iterable(catalogs) cats = [] for catalog in catalogs: if isinstance(catalog, str): if paths is None: raise KeyError("if any catalog is a string, need to input `paths`.") cat = intake.open_catalog(paths.CAT_PATH(catalog)) elif isinstance(catalog, (Catalog, intake.readers.entry.Catalog)): cat = catalog else: raise ValueError( "Catalog(s) should be input as string paths or Catalog objects or Sequence thereof." ) if not skip_check: check_catalog(cat, skip_strings=skip_strings) cats.append(cat) return cats
[docs] def open_vocabs( vocabs: Union[str, Vocab, Sequence, Path], paths: Optional[Paths] = None ) -> Vocab: """Open vocabularies, can input mix of forms. Parameters ---------- vocabs : Union[str, Vocab, Sequence, Path] Criteria to use to map from variable to attributes describing the variable. This is to be used with a key representing what variable to search for. This input is for the name of one or more existing vocabularies which are stored in a user application cache. paths : Paths, optional Paths object for finding paths to use. Required if any input vocab is a str referencing paths. Returns ------- Vocab Single Vocab object with vocab stored in vocab.vocab """ vocab_objects = [] vocabs = always_iterable(vocabs) for vocab in vocabs: # convert to Vocab object if isinstance(vocab, str): if paths is None: raise KeyError("if any vocab is a string, need to input `paths`.") vocab = Vocab(paths.VOCAB_PATH(vocab)) elif isinstance(vocab, Path): vocab = Vocab(vocab) elif isinstance(vocab, Vocab): vocab = vocab else: raise ValueError( "Vocab(s) should be input as string, paths or Vocab objects or Sequence thereof." ) vocab_objects.append(vocab) vocab = merge(vocab_objects) return vocab
[docs] def open_vocab_labels( vocab_labels: Union[str, dict, Path], paths: Optional[Paths] = None, ) -> dict: """Open dict of vocab_labels if needed Parameters ---------- vocab_labels : Union[str, Vocab, Sequence, Path], optional Criteria to use to map from variable to attributes describing the variable. This is to be used with a key representing what variable to search for. This input is for the name of one or more existing vocabularies which are stored in a user application cache. paths : Paths, optional Paths object for finding paths to use. Returns ------- dict dict of vocab_labels for plotting """ if isinstance(vocab_labels, str): assert ( paths is not None ), "need to input `paths` to `open_vocab_labels()` if inputting string." vocab_labels = json.loads( open( pathlib.Path(paths.VOCAB_PATH(vocab_labels)).with_suffix(".json"), "r", ).read() ) elif isinstance(vocab_labels, Path): vocab_labels = json.loads(open(vocab_labels.with_suffix(".json"), "r").read()) elif isinstance(vocab_labels, dict): vocab_labels = vocab_labels else: raise ValueError("vocab_labels should be input as string, Path, or dict.") assert isinstance(vocab_labels, dict) return vocab_labels
[docs] def coords1Dto2D(dam: DataArray) -> DataArray: """expand 1D coordinates to 2D Parameters ---------- dam : DataArray Model output variable to work on. Returns ------- DataArray Model output but with 2D coordinates in place of 1D coordinates, if applicable. Otherwise same as input. """ if dam.cf["longitude"].ndim == 1: logging.info("Lon/lat coordinates are 1D and are being changed to 2D.") # need to meshgrid lon/lat lon2, lat2 = np.meshgrid(dam.cf["longitude"], dam.cf["latitude"]) lonkey, latkey = dam.cf["longitude"].name, dam.cf["latitude"].name # 2D coord names lonkey2, latkey2 = f"{lonkey}2", f"{latkey}2" # dam = dam.assign_coords({lonkey2: ((dam.cf["Y"].name, dam.cf["X"].name), lon2, dam.cf["Longitude"].attrs), # latkey2: ((dam.cf["Y"].name, dam.cf["X"].name), lat2, dam.cf["Latitude"].attrs)}) dam[lonkey2] = ( (dam.cf["Y"].name, dam.cf["X"].name), lon2, dam.cf["Longitude"].attrs, ) dam[latkey2] = ( (dam.cf["Y"].name, dam.cf["X"].name), lat2, dam.cf["Latitude"].attrs, ) # remove attributes from 1D lon/lats that are interpreted for coordinates (but not for axes) if "standard_name" in dam[lonkey].attrs: del dam[lonkey].attrs["standard_name"] if "units" in dam[lonkey].attrs: del dam[lonkey].attrs["units"] if "standard_name" in dam[latkey].attrs: del dam[latkey].attrs["standard_name"] if "units" in dam[latkey].attrs: del dam[latkey].attrs["units"] # modify coordinates if "_CoordinateAxes" in dam.attrs: dam.attrs["_CoordinateAxes"] = dam.attrs["_CoordinateAxes"].replace( lonkey, lonkey2 ) dam.attrs["_CoordinateAxes"] = dam.attrs["_CoordinateAxes"].replace( latkey, latkey2 ) elif "coordinates" in dam.encoding: dam.encoding["coordinates"] = dam.encoding["coordinates"].replace( lonkey, lonkey2 ) dam.encoding["coordinates"] = dam.encoding["coordinates"].replace( latkey, latkey2 ) return dam
[docs] def set_up_logging(verbose, paths: Paths, mode: str = "w", testing: bool = False): """set up logging""" if not testing: logging.captureWarnings(True) file_handler = logging.FileHandler(filename=paths.LOG_PATH, mode=mode) handlers: List[Union[logging.StreamHandler, logging.FileHandler]] = [file_handler] if verbose: stdout_handler = logging.StreamHandler(stream=sys.stdout) handlers.append(stdout_handler) logging.basicConfig( level=logging.INFO, format="[%(asctime)s] {%(pathname)s:%(lineno)d}\n%(levelname)s - %(message)s\n", handlers=handlers, ) # logger = logging.getLogger('OMSA log') logger = logging.getLogger(__name__) return logger
[docs] def get_mask( dsm: Dataset, varname: str, wetdry: bool = False ) -> Union[DataArray, None]: """Return mask that matches x/y coords of var. If no mask can be identified with `.filter_by_attrs(flag_meanings="land water")`, instead will make one of non-nans for 1 horizontal grid cross-section of varname. Parameters ---------- dsm : Dataset Model output varname : str Name of variable in dsm. wetdry : bool If True, selected mask must include "wetdry" in name and will use first year of time steps to get best approximation of mask. Returns ------- DataArray mask associated with varname in dsm """ logging.info("Retrieving mask") # if not varname in dsm.data_vars: # raise KeyError( # f"varname {varname} needs to be a data variable in dsm but is not found." # ) # include matching static mask if present masks = dsm.filter_by_attrs(flag_meanings="land water") # dask or something messes up the flags? just in case: if len(masks.data_vars) == 0: masks = dsm.filter_by_attrs(option_0="land") if wetdry: mask_names = [mask for mask in masks if "wetdry" in mask] masks = dsm[mask_names] if (len(masks.data_vars) > 0) and (varname in dsm.data_vars): if wetdry: mask_name = [ mask for mask in masks.data_vars if dsm[mask].encoding["coordinates"] in dsm[varname].encoding["coordinates"] or dsm[mask].cf.isel(T=0).shape == dsm[varname].shape ][0] # mask = dsm[mask_name].cf.isel(T=0) year = pd.Timestamp(dsm.cf["T"][0].values).year mask = dsm[mask_name].cf.sel(T=f"{year}").min("ocean_time").load() else: mask_name = [ mask for mask in masks.data_vars if dsm[mask].encoding["coordinates"] in dsm[varname].encoding["coordinates"] or dsm[mask].shape == dsm[varname].shape ][0] mask = dsm[mask_name] elif (len(masks.data_vars) > 0) and (varname in dsm.coords): if wetdry: mask_name = [ mask for mask in masks.data_vars if dsm[mask].cf.isel(T=0).shape == dsm[varname].shape ][0] # mask = dsm[mask_name].cf.isel(T=0) year = pd.Timestamp(dsm.cf["T"][0].values).year mask = dsm[mask_name].cf.sel(T=f"{year}").min("ocean_time").load() else: mask_name = [ mask for mask in masks.data_vars if dsm[mask].shape == dsm[varname].shape ][0] mask = dsm[mask_name] else: # want just X and Y to make mask. Just use first time and surface depth value, as needed. dam = dsm[varname] if "T" in dam.cf.axes: dam = dam.cf.isel(T=0) if "Z" in dam.cf.axes: dam = dam.cf.sel(Z=0, method="nearest") mask = dam.notnull().load().astype(int) msg = "Generated mask for model using 1 horizontal cross section of model output and searching for nans." logging.info(msg) # if dask-backed, read into memory if mask.chunks is not None: mask = mask.load() return mask
[docs] def find_bbox( ds: xr.DataArray, paths: Optional[Paths] = None, mask: Optional[DataArray] = None, dd: int = 1, alpha: int = 5, save: bool = False, ) -> tuple: """Determine bounds and boundary of model. This does not know how to handle a rectilinear 1D lon/lat model with a mask Parameters ---------- ds: DataArray xarray Dataset containing model output. paths : Paths Paths object for finding paths to use. mask : DataArray, optional Mask with 1's for active locations and 0's for masked. dd: int, optional Number to decimate model output lon/lat, as a stride. alpha: int, optional Number for alphashape to determine what counts as the convex hull. Larger number is more detailed, 1 is a good starting point. save : bool, optional Input True to save. Returns ------- List Contains the name of the longitude and latitude variables for ds, geographic bounding box of model output (`[min_lon, min_lat, max_lon, max_lat]`), low res and high res wkt representation of model boundary. Notes ----- This was originally from the package ``model_catalogs``. """ try: lon = ds.cf["longitude"].values lat = ds.cf["latitude"].values lonkey = ds.cf["longitude"].name latkey = ds.cf["latitude"].name except KeyError: if "lon_rho" in ds: lonkey = "lon_rho" latkey = "lat_rho" else: lonkey = ds.cf.coordinates["longitude"][0] # need to make sure latkey matches lonkey grid latkey = f"lat{lonkey[3:]}" # In case there are multiple grids, just take first one; # they are close enough lon = ds[lonkey].values lat = ds[latkey].values # This is structured, rectilinear # GFS, RTOFS, HYCOM if (lon.ndim == 1) and ("nele" not in ds.dims): nlon, nlat = ds[lonkey].size, ds[latkey].size lonb = np.concatenate(([lon[0]] * nlat, lon[:], [lon[-1]] * nlat, lon[::-1])) latb = np.concatenate((lat[:], [lat[-1]] * nlon, lat[::-1], [lat[0]] * nlon)) # boundary = np.vstack((lonb, latb)).T p = Polygon(zip(lonb, latb)) p0 = p.simplify(1) # Now using the more simplified version because all of these models are boxes p1 = p0 if mask is not None: raise NotImplemented else: if mask is not None: if mask.ndim == 2 and lon.ndim == 1: # # need to meshgrid lon/lat # lon, lat = np.meshgrid(lon, lat) # This shouldn't happen anymore, so make note if it does msg = "1D coordinates were found for this model but that should not be possible anymore." raise ValueError(msg) lon = lon[np.where(mask == 1)] lon = lon[~np.isnan(lon)].flatten() lat = lat[np.where(mask == 1)] lat = lat[~np.isnan(lat)].flatten() else: lon = lon.flatten() lat = lat.flatten() assertion = ( "dd and alpha need to be defined in the catalog metadata for this model." ) assert dd is not None and alpha is not None, assertion # this leads to a circular import error if read in at top level bc of other packages brought in. import alphashape lon, lat = lon[::dd], lat[::dd] pts = list(zip(lon, lat)) # need to calculate concave hull or alphashape of grid # low res, same as convex hull p0 = alphashape.alphashape(pts, 0.0) # downsample a bit to save time, still should clearly see shape of domain # import pdb; pdb.set_trace() # pts = shapely.geometry.MultiPoint(list(zip(lon, lat))) p1 = alphashape.alphashape(pts, alpha) if save: if paths is None: words = "To save the model boundary, you need to input `paths`." raise ValueError(words) with open(paths.ALPHA_PATH, "w") as text_file: text_file.write(p1.wkt) # useful things to look at: p.wkt #shapely.geometry.mapping(p) return lonkey, latkey, list(p0.bounds), p1
[docs] def shift_longitudes(dam: Union[DataArray, Dataset]) -> Union[DataArray, Dataset]: """Shift longitudes from 0 to 360 to -180 to 180 if necessary. Parameters ---------- dam : Union[DataArray,Dataset] Object with model output to check Returns ------- Union[DataArray,Dataset] Return model output with shifted longitudes, if it was necessary. """ if dam.cf["longitude"].max() > 180: lkey, xkey = dam.cf["longitude"].name, dam.cf["X"].name nlon = int((dam[lkey] >= 180).sum()) # number of longitudes to roll by dam = dam.assign_coords({lkey: (((dam[lkey] + 180) % 360) - 180)}) # rotate arrays so that the locations and values are -180 to 180 # instead of 0 to 180 to -180 to 0 dam = dam.roll({xkey: nlon}, roll_coords=True) logging.warning( "Longitudes are being shifted because they look like they are not -180 to 180." ) return dam
[docs] def kwargs_search_from_model( kwargs_search: Dict[str, Union[str, float]], paths: Paths ) -> dict: """Adds spatial and/or temporal range from model output to dict. Examines model output and uses the bounding box of the model as the search spatial range if needed, and the time range of the model as the search time search if needed. They are added into `kwargs_search` and the dict is returned. Parameters ---------- kwargs_search : dict Keyword arguments to input to search on the server before making the catalog. paths : Paths Paths object for finding paths to use. Returns ------- dict `kwargs_search` but with modifications if relevant. Raises ------ KeyError If all of `max_lon`, `min_lon`, `max_lat`, `min_lat` and `min_time`, `max_time` are already specified along with `model_name`. """ # if model_name input, use it to select the search kwargs if "model_name" in kwargs_search: if kwargs_search.keys() >= { "max_lon", "min_lon", "min_lat", "max_lat", "min_time", "max_time", }: raise KeyError( "Can input `model_name` to `kwargs_search` to determine the spatial and/or temporal search box OR specify `max_lon`, `min_lon`, `max_lat`, `min_lat` and `min_time`, `max_time`. Can also do a combination of the two." ) # read in model output if isinstance(kwargs_search["model_name"], str): model_cat = intake.open_catalog(paths.CAT_PATH(kwargs_search["model_name"])) elif isinstance(kwargs_search["model_name"], Catalog): model_cat = kwargs_search["model_name"] else: raise ValueError( "model_name should be input as string path or Catalog object." ) dsm = model_cat[list(model_cat)[0]].to_dask() kwargs_search.pop("model_name") kwargs_search.pop("project_name") # if none of these present, read from model output if kwargs_search.keys().isdisjoint( { "max_lon", "min_lon", "min_lat", "max_lat", } ): min_lon, max_lon = float( dsm[dsm.cf.coordinates["longitude"][0]].min() ), float(dsm[dsm.cf.coordinates["longitude"][0]].max()) min_lat, max_lat = float( dsm[dsm.cf.coordinates["latitude"][0]].min() ), float(dsm[dsm.cf.coordinates["latitude"][0]].max()) if abs(min_lon) > 180 or abs(max_lon) > 180: min_lon -= 360 max_lon -= 360 kwargs_search.update( { "min_lon": min_lon, "max_lon": max_lon, "min_lat": min_lat, "max_lat": max_lat, } ) if kwargs_search.keys().isdisjoint( { "max_time", "min_time", } ): min_time, max_time = str(dsm.cf["T"].min().values), str( dsm.cf["T"].max().values ) kwargs_search.update( { "min_time": min_time, "max_time": max_time, } ) return kwargs_search
[docs] def calculate_anomaly( dd_in: Union[pd.Series, pd.DataFrame, xr.DataArray], monthly_mean, varname=None, ) -> pd.Series: """Given monthly mean that is indexed by month of year, subtract it from time series to get anomaly. Should work with both pd.Series/pd.DataFrame and xr. DataArray. Assume that variable in monthly_mean is the same as in the input time series. The way it works for DataArrays is by changing it to a DataFrame. Assumes this is a time series. Returns dd as the type as DataFrame it is came in as Series and Dataset if it came in DataArray. It is pd.Series in the middle so this probably won't work well for datasets that are more complex than time series. """ if varname is None: varname = dd_in.name else: varname = dd_in.cf[ varname ].name # translate from key_variable alias to actual variable name varname_mean = f"{varname}_mean" varname_anomaly = f"{varname}_anomaly" # if monthly_mean is None: # monthly_mean = dd[varname].groupby(dd.cf["T"].dt.month).mean() # in_type = type(dd) # if isinstance(dd, xr.DataArray): # dd = dd.squeeze().to_dataframe() if isinstance(dd_in, pd.Series): dd_in = dd_in.to_frame() # this changes dd into a DataFrame # import pdb; pdb.set_trace() dd = pd.DataFrame() dd["time"] = dd_in.cf["T"].values # dd["time"] = dd_in.index.values # save times dd = dd.set_index(dd["time"].dt.month) dd[varname_mean] = monthly_mean dd = dd.set_index(dd["time"].name) # this shifts the mean for the first and last month so they are a bit off since they aren't interpolated # using the month before and month after, but the middle months are good. # generally this sets the mean to the 15th of the month rather than the beginning or end inan = (dd.index.day != 15) * (dd.index > dd.index[0]) * (dd.index < dd.index[-1]) dd.loc[inan, varname_mean] = pd.NA inan = dd[varname_mean] == dd[varname_mean].shift(1) dd.loc[inan, varname_mean] = pd.NA dd[varname_mean] = dd[varname_mean].interpolate() dd[varname_anomaly] = dd_in[varname].squeeze() - dd[varname_mean] # return in original container if isinstance(dd_in, (xr.DataArray, xr.Dataset)): dd_out = xr.DataArray( coords={dd_in.cf["T"].name: dd.index.values}, data=dd[varname_anomaly].values, ).broadcast_like(dd_in[varname]) if len(dd_in[varname].coords) > len(dd_out.coords): coordstoadd = list(set(dd_in[varname].coords) - set(dd_out.coords)) for coord in coordstoadd: dd_out[coord] = dd_in[varname][coord] dd_out.attrs = dd_in[varname].attrs dd_out.name = dd_in[varname].name elif isinstance(dd_in, (pd.Series, pd.DataFrame)): dd_out = pd.DataFrame() for key in ["T", "Z", "latitude", "longitude"]: dd_out[dd_in.cf[key].name] = dd_in.cf[key] dd_out[varname_anomaly] = dd[varname_anomaly] return dd_out
[docs] def calculate_distance(lons, lats): """Calculate distance (km), esp for transects.""" G = pyproj.Geod(ellps="WGS84") distance = G.inv( lons[:-1], lats[:-1], lons[1:], lats[1:], )[2] distance = np.hstack((np.array([0]), distance)) distance = distance.cumsum() / 1000 # km return distance