"""Response Guided Dimensionality Reduction."""
import warnings
from os import linesep
from typing import Optional
from typing import TypeVar
from typing import Union
import matplotlib.pyplot as plt
import numpy as np
import xarray as xr
from matplotlib.collections import QuadMesh
from scipy.stats import pearsonr as _pearsonr
from sklearn.cluster import DBSCAN
from . import utils
# Total surface area of the Earth (km^2). `spherical_area` multiplies the
# unitless fraction of the sphere covered by a grid cell with this constant
# to obtain the cell's area in km^2.
SURFACE_AREA_EARTH_KM2 = 5.10072e8
# Generic type variable constrained to xarray containers: functions annotated
# with XrType accept either a DataArray or a Dataset and return the same kind
# of container they were given.
XrType = TypeVar("XrType", xr.DataArray, xr.Dataset)
def spherical_area(latitude: float, dlat: float, dlon: Optional[float] = None) -> float:
    """Approximate the area of a square grid cell on a spherical (!) earth.

    Returns the area in square kilometers of earth surface.

    Args:
        latitude (float): Latitude at the center of the grid cell (deg)
        dlat (float): Latitude grid resolution (deg)
        dlon (float): Longitude grid resolution (deg), optional in case of a
            square grid.

    Returns:
        float: Area of the grid cell (km^2)
    """
    if dlon is None:
        dlon = dlat  # square grid: same resolution in both directions

    # Convert all angles to radians before doing the spherical geometry.
    dlon_rad = np.radians(dlon)
    dlat_rad = np.radians(dlat)
    lat_rad = np.radians(latitude)

    # Height of the spherical segment between the cell's bounding latitudes.
    segment_height = np.sin(lat_rad + dlat_rad / 2) - np.sin(lat_rad - dlat_rad / 2)

    # Fraction of the full sphere covered by the cell, scaled to km^2.
    sphere_fraction = segment_height * dlon_rad / (4 * np.pi)
    return sphere_fraction * SURFACE_AREA_EARTH_KM2
def cluster_area(ds: XrType, cluster_label: float) -> float:
    """Determine the total area of a cluster.

    Requires the input dataset to have the variables `area` and
    `cluster_labels`.

    Args:
        ds (xr.Dataset or xr.DataArray): Dataset/DataArray containing the
            variables `area` and `cluster_labels`.
        cluster_label (float): The label (as float) for which the area should
            be calculated.

    Returns:
        float: Area of the cluster `cluster_label`.
    """
    # Mask out every grid cell that does not belong to the requested cluster,
    # then accumulate the remaining cell areas (NaNs from the mask are skipped).
    in_cluster = ds["cluster_labels"] == cluster_label
    total_area = ds["area"].where(in_cluster).sum(skipna=True)
    return total_area.values.item()
def remove_small_area_clusters(ds: XrType, min_area_km2: float) -> XrType:
    """Remove the clusters where the area is under the input threshold.

    Args:
        ds (xr.DataArray, xr.Dataset): Dataset containing `cluster_labels` and
            `area`.
        min_area_km2 (float): The minimum allowed area of each cluster

    Returns:
        xr.DataArray, xr.Dataset: The input dataset with the labels of the
            clusters set to 0 when the area of the cluster is under the
            `min_area_km2` threshold.
    """
    labels = np.unique(ds["cluster_labels"])
    # Keep only labels whose accumulated grid-cell area exceeds the threshold;
    # every other label is overwritten with 0 ("no cluster") below.
    large_enough = np.array(
        [label for label in labels if cluster_area(ds, label) > min_area_km2]
    )
    ds["cluster_labels"] = ds["cluster_labels"].where(
        np.isin(ds["cluster_labels"], large_enough), 0
    )
    return ds
def add_gridcell_area(data: xr.DataArray):
    """Add the area of each gridcell (latitude) in km2.

    Note: Assumes an even grid (in degrees)

    Args:
        data: Data containing lat, lon coordinates in degrees.

    Returns:
        Input data with an added coordinate "area".
    """
    # Infer the grid resolution from the first two coordinate values; this is
    # only valid for a regular (evenly spaced) grid.
    latitudes = data.latitude.values
    longitudes = data.longitude.values
    dlat = np.abs(latitudes[1] - latitudes[0])
    dlon = np.abs(longitudes[1] - longitudes[0])
    data["area"] = spherical_area(data.latitude, dlat, dlon)
    return data
def assert_clusters_present(data: xr.DataArray) -> None:
    """Assert that any (non-'0') clusters are present in the data."""
    # A single unique label means only the background ("no cluster") label is
    # left, i.e. nothing significant was found. Warn rather than raise.
    unique_labels = np.unique(data.cluster_labels)
    if unique_labels.size == 1:
        warnings.warn(
            "No significant clusters found in the input DataArray", stacklevel=2
        )
def _get_dbscan_clusters(
    data: xr.Dataset, coords: np.ndarray, dbscan_params: dict
) -> np.ndarray:
    """Generate the DBSCAN cluster labels based on the correlation and p-value.

    Args:
        data: DataArray of the precursor field, of only a single
            i_interval. Requires the 'latitude' and 'longitude' dimensions to be
            stacked into a "coords" dimension.
        coords: 2-D array containing the coordinates of each (lat, lon) grid
            point, in radians.
        dbscan_params: Dictionary containing the elements 'alpha', 'eps',
            'min_area_km2'. See the documentation of RGDR for more information.

    Returns:
        np.ndarray: 1-D array of the same length as `coords`, containing cluster
            labels for every coordinate.
    """
    # Mean Earth radius (km). sklearn's haversine metric works on coordinates
    # in radians and measures distances as central angles, so `eps` given in
    # km must be divided by the Earth's radius to become an angle.
    # Fix: the previous code referenced the undefined name RADIUS_EARTH_KM,
    # which raised a NameError at runtime.
    radius_earth_km = 6371.0

    labels = np.zeros(len(coords), dtype=int)
    # Cluster positively and negatively correlated cells separately, so a
    # cluster never mixes correlation signs.
    for sign, sign_mask in zip([1, -1], [data["corr"] >= 0, data["corr"] < 0]):
        # Only cells that are significant (p < alpha) take part in clustering.
        mask = np.logical_and(data["p_val"] < dbscan_params["alpha"], sign_mask)
        if np.sum(mask) > 0:  # Check if the mask contains any points to cluster
            db = DBSCAN(
                eps=dbscan_params["eps"] / radius_earth_km,
                min_samples=1,
                algorithm="auto",
                metric="haversine",
            ).fit(coords[mask])
            # Shift labels by 1 so 0 stays reserved for "no cluster"; negative
            # correlations get negative labels.
            labels[mask] = sign * (db.labels_ + 1)
    return labels
def _find_clusters(
    precursor: xr.DataArray,
    corr: xr.DataArray,
    p_val: xr.DataArray,
    dbscan_params: dict,
) -> xr.DataArray:
    """Compute clusters and adds their labels to the precursor dataset.

    For clustering the DBSCAN algorithm is used, with a Haversine distance
    metric.

    Args:
        precursor (xr.DataArray): DataArray of the precursor field, containing
            'latitude' and 'longitude' dimensions in degrees.
        corr (xr.DataArray): DataArray with the correlation values, generated
            by correlation_map()
        p_val (xr.DataArray): DataArray with the p-values, generated by
            correlation_map()
        dbscan_params (dict): Dictionary containing the elements 'alpha',
            'eps', 'min_area_km2'. See the documentation of RGDR for more
            information.

    Returns:
        xr.DataArray: The input precursor data, with as extra coordinate
            labelled clusters.
    """
    # Bundle precursor and correlation statistics into one dataset so that,
    # after stacking, every grid point carries its own corr/p-value.
    combined = precursor.to_dataset()
    combined["corr"] = corr
    combined["p_val"] = p_val

    # Collapse (latitude, longitude) into a single "coord" dimension: each
    # grid point becomes one DBSCAN sample.
    combined = combined.stack(coord=["latitude", "longitude"])

    # The haversine metric expects (lat, lon) pairs in radians.
    latlon_radians = np.radians(np.asarray(combined["coord"].values.tolist()))
    cluster_labels = _get_dbscan_clusters(combined, latlon_radians, dbscan_params)

    # Attach the labels to the (stacked) precursor and restore the original
    # 2-D latitude/longitude layout.
    stacked = precursor.stack(coord=["latitude", "longitude"])
    stacked["cluster_labels"] = ("coord", cluster_labels)
    stacked["cluster_labels"] = stacked["cluster_labels"].astype("int16")
    return stacked.unstack("coord")
def masked_spherical_dbscan(
    precursor: xr.DataArray,
    corr: xr.DataArray,
    p_val: xr.DataArray,
    dbscan_params: dict,
) -> xr.DataArray:
    """Determine the clusters based on sklearn's DBSCAN implementation.

    Alpha determines the mask based on the minimum p_value. Grouping can be
    adjusted using the `eps_km` parameter. Cluster labels are negative for
    areas with a negative correlation coefficient and positive for areas with
    a positive correlation coefficient. Areas without any significant
    correlation are put in the cluster labelled '0'.

    Args:
        precursor (xr.DataArray): DataArray of the precursor field, containing
            'latitude' and 'longitude' dimensions in degrees.
        corr (xr.DataArray): DataArray with the correlation values, generated
            by correlation_map()
        p_val (xr.DataArray): DataArray with the p-values, generated by
            correlation_map()
        dbscan_params (dict): Dictionary containing the elements 'alpha',
            'eps', 'min_area_km2'. See the documentation of RGDR for more
            information.

    Returns:
        xr.DataArray: Precursor data grouped by the DBSCAN clusters.
    """
    # Grid-cell areas are needed both for the minimum-area filter and by
    # later (weighted) reductions over clusters.
    precursor = add_gridcell_area(precursor)
    precursor = _find_clusters(precursor, corr, p_val, dbscan_params)

    min_area = dbscan_params["min_area"]
    if min_area is not None:
        precursor = remove_small_area_clusters(precursor, min_area)

    # Warn (rather than fail) when no significant clusters were found.
    assert_clusters_present(precursor)
    return precursor
def _pearsonr_nan(x: np.ndarray, y: np.ndarray) -> tuple[float, float]:
    """NaN friendly implementation of scipy.stats.pearsonr.

    Computes the Pearson correlation coefficient between two arrays together
    with the p-value of that correlation. Instead of raising an error when a
    NaN is encountered, both the coefficient and the p-value are reported as
    NaN.

    Args:
        x: 1-D array
        y: 1-D array

    Returns:
        r_coefficient
        p_value
    """
    contains_nan = bool(np.any(np.isnan(x))) or bool(np.any(np.isnan(y)))
    if contains_nan:
        return np.nan, np.nan
    return _pearsonr(x, y)
def correlation(
    field: xr.DataArray, target: xr.DataArray, corr_dim: str = "time"
) -> tuple[xr.DataArray, xr.DataArray]:
    """Calculate correlation maps.

    Args:
        field: Spatial data with a dimension named `corr_dim`, over which each
            location should have the Pearson correlation coefficient calculated
            with the target data.
        target: Data which has to be correlated with the spatial data. Requires
            a dimension named `corr_dim`.
        corr_dim: Dimension over which the correlation coefficient should be
            calculated.

    Returns:
        r_coefficient: DataArray filled with the correlation coefficient for
            each non-`corr_dim` coordinate.
        p_value: DataArray filled with the two-tailed p-values for each computed
            correlation coefficient.
    """
    # Input validation. Kept as asserts (callers may rely on AssertionError);
    # the previously garbled messages ("does not have contain") are fixed.
    assert (
        corr_dim in target.dims
    ), f"input target does not contain the '{corr_dim}' dimension"
    assert (
        corr_dim in field.dims
    ), f"input field does not contain the '{corr_dim}' dimension"
    assert np.all(
        [dim in field.dims for dim in target.dims]
    ), "Field and target dims do not match"
    # Vectorize the NaN-tolerant scalar pearsonr over every non-`corr_dim`
    # coordinate; yields one DataArray of r values and one of p-values.
    return xr.apply_ufunc(
        _pearsonr_nan,
        field,
        target,
        input_core_dims=[[corr_dim], [corr_dim]],
        vectorize=True,
        output_core_dims=[[], []],
    )
def partial_correlation(field, target, z):
    """Calculate partial correlation maps.

    Placeholder: this analysis has not been implemented yet.
    """
    raise NotImplementedError
def regression(field, target):
    """Regression analysis on entire maps.

    Methods include Linear, Ridge, Lasso.

    Placeholder: this analysis has not been implemented yet.
    """
    raise NotImplementedError
class RGDR:
    """Response Guided Dimensionality Reduction."""

    def __init__(  # noqa: PLR0913 (too-many-arguments)
        self,
        target_intervals: Union[int, list[int]],
        lag: int,
        eps_km: float,
        alpha: float,
        min_area_km2: Optional[float] = None,
    ) -> None:
        """Response Guided Dimensionality Reduction (RGDR).

        Dimensionality reduction based on the correlation between precursor
        field and target timeseries.

        Args:
            target_intervals: The target interval indices which should be
                correlated with the precursors. Input in the form of
                "[1, 2, 3]" or "1". The precursor intervals will be determined
                based on the `lag` kwarg.
            lag: The lag between the precursor and target intervals to compute
                the correlation, akin to lag in cross-correlation. E.g. if the
                target intervals are [1, 2], and lag is 2, the precursor
                intervals will be [-2, -1]
            eps_km (float): The maximum distance (in km) between two grid cells
                for them to be considered to be in the same cluster. This is
                not a maximum bound on the distances between grid cells within
                a cluster.
                The minimum appropriate value is equal to the maximum
                width/height of a grid cell (i.e. the width of the grid cell
                closest to the equator).
                The upper bound depends on when cells or clusters would still
                be considered to be part of the same climate signal (i.e.,
                precursor region). Higher values can lead to fewer clusters,
                but also clusters in which the cells of the same cluster are
                separated by large geographical distances.
            alpha (float): p-value below which the correlation is considered
                significant enough for a location to be included in a cluster.
            min_area_km2 (float): The minimum area of a cluster (in square km).
                Clusters smaller than this minimum area will be discarded.

        Attributes:
            corr_map: correlation coefficient map of given precursor field and
                target series.
            pval_map: p-values map of correlation
            cluster_map: cluster labels for precursor field masked by p-values
        """
        self._lag = lag
        # Clustering configuration shared by all DBSCAN-based helpers.
        self._dbscan_params = {"eps": eps_km, "alpha": alpha, "min_area": min_area_km2}
        # Normalize a single interval index to a one-element list.
        self._target_intervals = (
            [target_intervals]
            if isinstance(target_intervals, int)
            else target_intervals
        )
        # Precursor intervals are the target intervals shifted back by `lag`.
        self._precursor_intervals = utils.intervals_subtract(
            self._target_intervals, lag
        )
        # Results of .fit(); remain None until fit has been called.
        self._corr_map: Union[None, xr.DataArray] = None
        self._pval_map: Union[None, xr.DataArray] = None
        self._cluster_map: Union[None, xr.DataArray] = None
        self._area = None

    @property
    def target_intervals(self) -> list[int]:
        """Return target intervals."""
        return self._target_intervals

    @property
    def precursor_intervals(self) -> list[int]:
        """Return precursor intervals."""
        return self._precursor_intervals

    @property
    def cluster_map(self) -> xr.DataArray:
        """Return cluster map."""
        if self._cluster_map is None:
            raise ValueError(
                "No cluster map exists yet, .fit() has to be called first."
            )
        return self._cluster_map

    @property
    def pval_map(self) -> xr.DataArray:
        """Return p-value map."""
        if self._pval_map is None:
            raise ValueError(
                "No p-value map exists yet, .fit() has to be called first."
            )
        return self._pval_map

    @property
    def corr_map(self) -> xr.DataArray:
        """Return correlation map."""
        if self._corr_map is None:
            raise ValueError(
                "No correlation map exists yet, .fit() has to be called first."
            )
        return self._corr_map

    def get_correlation(
        self,
        precursor: xr.DataArray,
        target: xr.DataArray,
    ) -> tuple[xr.DataArray, xr.DataArray]:
        """Calculate the correlation and p-value between input precursor and target.

        Args:
            precursor: Precursor field data with the dimensions
                'latitude', 'longitude', and 'anchor_year'
            target: Timeseries data with only the dimension 'anchor_year'

        Returns:
            (correlation, p_value): DataArrays containing the correlation and
                p-value.
        """
        if not isinstance(precursor, xr.DataArray):
            raise ValueError("Please provide an xr.DataArray, not a dataset")
        p, t = stack_input_data(
            precursor, target, self._precursor_intervals, self._target_intervals
        )
        # Correlate along the stacked anchor-year/interval dimension.
        return correlation(p, t, corr_dim="anch_int")

    def get_clusters(
        self,
        precursor: xr.DataArray,
        target: xr.DataArray,
    ) -> xr.DataArray:
        """Generate clusters for the precursor data.

        Args:
            precursor: Precursor field data with the dimensions
                'latitude', 'longitude', 'anchor_year', and 'i_interval'
            target: Target timeseries data with only the dimensions
                'anchor_year' and 'i_interval'

        Returns:
            DataArray containing the clusters as masks.
        """
        corr, p_val = self.get_correlation(precursor, target)
        return masked_spherical_dbscan(precursor, corr, p_val, self._dbscan_params)

    def preview_correlation(  # noqa: PLR0913 (too-many-arguments)
        self,
        precursor: xr.DataArray,
        target: xr.DataArray,
        add_alpha_hatch: bool = True,
        ax1: Optional[plt.Axes] = None,
        ax2: Optional[plt.Axes] = None,
    ) -> list[QuadMesh]:
        """Preview correlation and p-value results with given inputs.

        Generate a figure showing the correlation and p-value results with the
        initiated RGDR class and input precursor field.

        Args:
            precursor: Precursor field data with the dimensions
                'latitude', 'longitude', 'anchor_year', and 'i_interval'
            target: Target timeseries data with only the dimensions
                'anchor_year' and 'i_interval'
            add_alpha_hatch: Adds a red hatching when the p-value is lower than
                the RGDR's 'alpha' value.
            ax1: a matplotlib axis handle to plot the correlation values into.
                If None, an axis handle will be created instead.
            ax2: a matplotlib axis handle to plot the p-values into. If None,
                an axis handle will be created instead.

        Returns:
            List of matplotlib QuadMesh artists.
        """
        corr, p_val = self.get_correlation(precursor, target)
        if (ax1 is None) and (ax2 is None):
            _, (ax1, ax2) = plt.subplots(ncols=2)
        elif (ax1 is None) or (ax2 is None):
            raise ValueError(
                "Either pass axis handles for both ax1 and ax2, or pass neither."
            )
        plot1 = corr.plot.pcolormesh(ax=ax1, cmap="coolwarm")  # type: ignore
        plot2 = p_val.plot.pcolormesh(ax=ax2, cmap="viridis_r", vmin=0, vmax=1)  # type: ignore
        if add_alpha_hatch:
            # Overlay an invisible pcolor carrying only the hatch pattern on
            # the significant (p < alpha) cells of the p-value plot.
            coords = plot2.get_coordinates()
            plt.rcParams["hatch.color"] = "r"
            plt.pcolor(
                coords[:, :, 0],
                coords[:, :, 1],
                p_val.where(p_val < self._dbscan_params["alpha"]).values,
                hatch="x",
                alpha=0.0,
            )
        ax1.set_title("correlation")
        ax2.set_title("p-value")
        return [plot1, plot2]

    def preview_clusters(
        self,
        precursor: xr.DataArray,
        target: xr.DataArray,
        ax: Optional[plt.Axes] = None,
        **kwargs,
    ) -> QuadMesh:
        """Preview clusters.

        Generates a figure showing the clusters resulting from the initiated
        RGDR class and input precursor field.

        Args:
            precursor: Precursor field data with the dimensions
                'latitude', 'longitude', 'anchor_year', and 'i_interval'
            target: Target timeseries data with only the dimensions
                'anchor_year' and 'i_interval'
            ax (plt.Axes, optional): a matplotlib axis handle to plot the
                clusters into. If None, an axis handle will be created instead.
            **kwargs: Keyword arguments that should be passed to QuadMesh.

        Returns:
            Matplotlib QuadMesh artist.
        """
        if ax is None:
            _, ax = plt.subplots()
        clusters = self.get_clusters(precursor, target)
        return clusters["cluster_labels"].plot(cmap="viridis", ax=ax, **kwargs)  # type: ignore

    def fit(self, precursor: xr.DataArray, target: xr.DataArray):
        """Fit RGDR clusters to precursor data.

        Performs DBSCAN clustering on a prepared DataArray, and then groups the
        data by their determined clusters, using a weighted mean. The weight is
        based on the area of each grid cell.

        Density-Based Spatial Clustering of Applications with Noise (DBSCAN)
        clusters gridcells together which are of the same sign and in proximity
        to each other using DBSCAN.

        Clusters labelled with a positive value represent a positive
        correlation with the target timeseries, the clusters labelled with a
        negative value represent a negative correlation. All locations not in
        a cluster are grouped together under the label '0'.

        Args:
            precursor: Precursor field data with the dimensions
                'latitude', 'longitude', 'anchor_year', and 'i_interval'
            target: Target timeseries data with only the dimensions
                'anchor_year' and 'i_interval', which will be correlated with
                the precursor field.

        Returns:
            None. The results are stored on the instance and exposed through
            the `corr_map`, `pval_map` and `cluster_map` properties.
        """
        corr, p_val = self.get_correlation(precursor, target)
        masked_data = masked_spherical_dbscan(
            precursor, corr, p_val, self._dbscan_params
        )
        self._corr_map = corr
        self._pval_map = p_val
        self._cluster_map = masked_data.cluster_labels
        self._area = masked_data.area

    def __repr__(self) -> str:
        """Represent the RGDR transformer with strings."""
        props = [
            ("target_intervals", repr(self.target_intervals)),
            ("lag", repr(self._lag)),
            ("eps_km", repr(self._dbscan_params["eps"])),
            ("alpha", repr(self._dbscan_params["alpha"])),
            ("min_area_km2", repr(self._dbscan_params["min_area"])),
        ]
        propstr = f"{linesep}\t" + f",{linesep}\t".join(
            [f"{k}={v}" for k, v in props]
        )  # sourcery skip: use-fstring-for-concatenation
        return f"{self.__class__.__name__}({propstr}{linesep})".replace("\t", " ")