diff --git a/src/cchdo/hydro/core.py b/src/cchdo/hydro/core.py index 8492616..498ce21 100644 --- a/src/cchdo/hydro/core.py +++ b/src/cchdo/hydro/core.py @@ -1,6 +1,7 @@ """Core operations on a CCHDO CF/netCDF file.""" from collections.abc import Hashable +from typing import Any import numpy as np import numpy.typing as npt @@ -20,7 +21,8 @@ ExchangeFlag, ExchangeSampleFlag, ) -from cchdo.hydro.types import FileType +from cchdo.hydro.rename import rename_with_bookkeeping +from cchdo.hydro.types import FileType, PrecisionSourceType from cchdo.hydro.utils import ( add_cdom_coordinate, add_geometry_var, @@ -40,6 +42,27 @@ } +def _filter_nones(dict_: dict[Any, Any]) -> dict[Any, Any]: + return {k: v for k, v in dict_.items() if v is not None} + + +def var_to_whpnames(var: xr.Variable) -> list[WHPName]: + params: list[WHPName] = [] + param = var.attrs.get("whp_name") + unit = var.attrs.get("whp_unit") + if isinstance(param, list): + params.extend(WHPNames[(combined, unit)] for combined in param) + else: + try: + error = WHPNames[(param, unit)] + if error.error_col: + return [] + except KeyError: + pass + params.append(WHPNames[(param, unit)]) + return params + + def dataarray_factory( param: WHPName, ctype="data", @@ -290,6 +313,73 @@ def add_param( return _ds +def change_params( + ds: xr.Dataset, + params: dict[WHPName | str, WHPName | str], + *, + precision_source: PrecisionSourceType | None = None, +) -> xr.Dataset: + """Change the parameter identity of variables""" + whp_params: dict[WHPName, WHPName] = {} + for key, value in params.items(): + key_ = WHPNames[key] + value_ = WHPNames[value] + if key_.scope != value_.scope: + raise ValueError( + f"From and to params must have the same scope: {key_} and {value_} have scopes {key_.scope} and {value_.scope}" + ) + if key_.flag_w != value_.flag_w: + raise NotImplementedError( + "Cannot change ctd to discrete types (or reversed) yet" + ) + + whp_params[key_] = value_ + + # TODO: this list needs to be deterministic or somehow guaranteed by the CCHDO data model + # There is interplay between hydro and the params module + # We do want to pass though basically anything _unknown_. + retain_attrs = { + "C_format": None, + "C_format_source": None, + "ancillary_variables": None, + } + drop_attrs = { + "whp_name": None, + "standard_name": None, + "units": None, + "whp_units": None, + } + + rename_vars = {} + new_attrs = {} + for old, new in whp_params.items(): + # TODO make a get_by_whpname method on cchdo accessor? + if old.full_nc_name not in ds: + continue + old_var = ds[old.full_nc_name] + existing_attrs = _filter_nones({**old_var.attrs, **drop_attrs}) + incoming_attrs = _filter_nones({**new.get_nc_attrs(), **retain_attrs}) + updated_attrs = {**existing_attrs, **incoming_attrs} + rename_vars[old.full_nc_name] = new.full_nc_name + new_attrs[old.full_nc_name] = updated_attrs + + # I just cannot seem to think of a better/generic way of doing this... + if old.nc_name_error in old_var.attrs.get("ancillary_variables", ""): + pass + if old.nc_name_flag in old_var.attrs.get("ancillary_variables", ""): + rename_vars[old.nc_name_flag] = new.nc_name_flag + # TODO: Flags attrs... + + _ds = ds.copy() + for varname, attrs in new_attrs.items(): + _ds[varname].attrs = attrs + + _ds = rename_with_bookkeeping(_ds, rename_vars, ["ancillary_variables"]) + + check_ancillary_variables(_ds) + return _ds + + def add_profile_level(ds: xr.Dataset, idx, levels) -> xr.Dataset: return ds diff --git a/src/cchdo/hydro/tests/test_core_ops.py b/src/cchdo/hydro/tests/test_core_ops.py index b246fac..222cfdc 100644 --- a/src/cchdo/hydro/tests/test_core_ops.py +++ b/src/cchdo/hydro/tests/test_core_ops.py @@ -350,3 +350,34 @@ def test_add_param_cdom(): testing_ds_param = core.add_param(ds_param_325, WHPNames["CDOM300 [/METER]"]) xr.testing.assert_identical(ds_param_cdom, testing_ds_param) + + +def test_change_param(): + params = ( + "PH", + "PH_FLAG_W", + "PH_TMP", + ) + new_params = ( + "PH_TOT", + "PH_TOT_FLAG_W", + "PH_TMP", + ) + units = ("", "", "DEG C") + data = ( + "7.1234", + "2", + "20.0", + ) + ds = read_exchange( + io.BytesIO(simple_bottle_exchange(params=params, units=units, data=data)), + precision_source="database", + ) + ds_expected = read_exchange( + io.BytesIO(simple_bottle_exchange(params=new_params, units=units, data=data)), + precision_source="database", + ) + + result = core.change_params(ds, {"PH": "PH_TOT"}) + + xr.testing.assert_identical(result, ds_expected)