Source code for improver.ensemble_copula_coupling.ensemble_copula_coupling

# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# (C) British Crown Copyright 2017-2019 Met Office.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
#   list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
This module defines the plugins required for Ensemble Copula Coupling.

"""
import warnings

import iris
import numpy as np
from iris.exceptions import CoordinateNotFoundError, InvalidCubeError
from scipy import stats

from improver import BasePlugin
from improver.ensemble_calibration.utilities import convert_cube_data_to_2d
from improver.ensemble_copula_coupling.utilities import (
    choose_set_of_percentiles, concatenate_2d_array_with_2d_array_endpoints,
    create_cube_with_percentiles, get_bounds_of_distribution,
    insert_lower_and_upper_endpoint_to_1d_array,
    restore_non_probabilistic_dimensions)
from improver.metadata.probabilistic import (
    find_percentile_coordinate, find_threshold_coordinate)
from improver.utilities.cube_checker import (
    check_cube_coordinates, check_for_x_and_y_axes)
from improver.utilities.cube_manipulation import (
    concatenate_cubes, enforce_coordinate_ordering)
from improver.utilities.indexing_operations import choose


[docs]class RebadgePercentilesAsRealizations(BasePlugin): """ Class to rebadge percentiles as ensemble realizations. This will allow the quantisation to percentiles to be completed, without a subsequent EnsembleReordering step to restore spatial correlations, if required. """
[docs] def __init__(self): """ Initialise the class. """ pass
[docs] @staticmethod def process(cube, ensemble_realization_numbers=None): """ Rebadge percentiles as ensemble realizations. The ensemble realization numbering will depend upon the number of percentiles in the input cube i.e. 0, 1, 2, 3, ..., n-1, if there are n percentiles. Args: cube (iris.cube.Cube): Cube containing a percentile coordinate, which will be rebadged as ensemble realization. ensemble_realization_numbers (numpy.ndarray): An array containing the ensemble numbers required in the output realization coordinate. Default is None, meaning the realization coordinate will be numbered 0, 1, 2 ... n-1 for n percentiles on the input cube. Raises: InvalidCubeError: If the realization coordinate already exists on the cube. """ percentile_coord_name = ( find_percentile_coordinate(cube).name()) if ensemble_realization_numbers is None: ensemble_realization_numbers = ( np.arange( len(cube.coord(percentile_coord_name).points), dtype=np.int32)) cube.coord(percentile_coord_name).points = ( ensemble_realization_numbers) # we can't rebadge if the realization coordinate already exists: try: realization_coord = cube.coord('realization') except CoordinateNotFoundError: realization_coord = None if realization_coord: raise InvalidCubeError( "Cannot rebadge percentile coordinate to realization " "coordinate because a realization coordinate already exists.") cube.coord(percentile_coord_name).rename("realization") cube.coord("realization").units = "1" cube.coord("realization").points = ( cube.coord("realization").points.astype(np.int32)) return cube
[docs]class ResamplePercentiles(BasePlugin): """ Class for resampling percentiles from an existing set of percentiles. In combination with the Ensemble Reordering plugin, this is a variant of Ensemble Copula Coupling. This class includes the ability to linearly interpolate from an input set of percentiles to a different output set of percentiles. """
[docs] def __init__(self, ecc_bounds_warning=False): """ Initialise the class. Args: ecc_bounds_warning (bool): If true and ECC bounds are exceeded by the percentile values, a warning will be generated rather than an exception. Default value is FALSE. """ self.ecc_bounds_warning = ecc_bounds_warning
[docs] def _add_bounds_to_percentiles_and_forecast_at_percentiles( self, percentiles, forecast_at_percentiles, bounds_pairing): """ Padding of the lower and upper bounds of the percentiles for a given phenomenon, and padding of forecast values using the constant lower and upper bounds. Args: percentiles (numpy.ndarray): Array of percentiles from a Cumulative Distribution Function. forecast_at_percentiles (numpy.ndarray): Array containing the underlying forecast values at each percentile. bounds_pairing (tuple): Lower and upper bound to be used as the ends of the cumulative distribution function. Raises: ValueError: If the percentile points are outside the ECC bounds and self.ecc_bounds_warning is False. ValueError: If the percentiles are not in ascending order. Warns: Warning: If the percentile points are outside the ECC bounds and self.ecc_bounds_warning is True. """ lower_bound, upper_bound = bounds_pairing percentiles = insert_lower_and_upper_endpoint_to_1d_array( percentiles, 0, 100) forecast_at_percentiles_with_endpoints = \ concatenate_2d_array_with_2d_array_endpoints( forecast_at_percentiles, lower_bound, upper_bound) if np.any(np.diff(forecast_at_percentiles_with_endpoints) < 0): msg = ("The end points added to the forecast at percentile " "values representing each percentile must result in " "an ascending order. " "In this case, the forecast at percentile values {} " "is outside the allowable range given by the " "bounds {}".format(forecast_at_percentiles, bounds_pairing)) if self.ecc_bounds_warning: warn_msg = msg + (" The percentile values that have " "exceeded the existing bounds will be used " "as new bounds.") warnings.warn(warn_msg) if upper_bound < forecast_at_percentiles_with_endpoints.max(): upper_bound = forecast_at_percentiles_with_endpoints.max() if lower_bound > forecast_at_percentiles_with_endpoints.min(): lower_bound = forecast_at_percentiles_with_endpoints.min() forecast_at_percentiles_with_endpoints = \ concatenate_2d_array_with_2d_array_endpoints( forecast_at_percentiles, lower_bound, upper_bound) else: raise ValueError(msg) if np.any(np.diff(percentiles) < 0): msg = ("The percentiles must be in ascending order." "The input percentiles were {}".format(percentiles)) raise ValueError(msg) return percentiles, forecast_at_percentiles_with_endpoints
[docs] def _interpolate_percentiles( self, forecast_at_percentiles, desired_percentiles, bounds_pairing, percentile_coord_name): """ Interpolation of forecast for a set of percentiles from an initial set of percentiles to a new set of percentiles. This is constructed by linearly interpolating between the original set of percentiles to a new set of percentiles. Args: forecast_at_percentiles (iris.cube.Cube): Cube containing a percentile coordinate. desired_percentiles (numpy.ndarray): Array of the desired percentiles. bounds_pairing (tuple): Lower and upper bound to be used as the ends of the cumulative distribution function. percentile_coord_name (str): Name of required percentile coordinate. Returns: iris.cube.Cube: Cube containing values for the required diagnostic e.g. air_temperature at the required percentiles. """ original_percentiles = ( forecast_at_percentiles.coord(percentile_coord_name).points) # Ensure that the percentile dimension is first, so that the # conversion to a 2d array produces data in the desired order. enforce_coordinate_ordering( forecast_at_percentiles, percentile_coord_name) forecast_at_reshaped_percentiles = convert_cube_data_to_2d( forecast_at_percentiles, coord=percentile_coord_name) original_percentiles, forecast_at_reshaped_percentiles = ( self._add_bounds_to_percentiles_and_forecast_at_percentiles( original_percentiles, forecast_at_reshaped_percentiles, bounds_pairing)) forecast_at_interpolated_percentiles = ( np.empty( (len(desired_percentiles), forecast_at_reshaped_percentiles.shape[0]), dtype=np.float32 ) ) for index in range(forecast_at_reshaped_percentiles.shape[0]): forecast_at_interpolated_percentiles[:, index] = np.interp( desired_percentiles, original_percentiles, forecast_at_reshaped_percentiles[index, :]) # Reshape forecast_at_percentiles, so the percentiles dimension is # first, and any other dimension coordinates follow. forecast_at_percentiles_data = ( restore_non_probabilistic_dimensions( forecast_at_interpolated_percentiles, forecast_at_percentiles, percentile_coord_name, len(desired_percentiles))) for template_cube in forecast_at_percentiles.slices_over( percentile_coord_name): template_cube.remove_coord(percentile_coord_name) break percentile_cube = create_cube_with_percentiles( desired_percentiles, template_cube, forecast_at_percentiles_data,) return percentile_cube
[docs] def process(self, forecast_at_percentiles, no_of_percentiles=None, sampling="quantile"): """ 1. Creates a list of percentiles. 2. Accesses the lower and upper bound pair of the forecast values, in order to specify lower and upper bounds for the percentiles. 3. Interpolate the percentile coordinate into an alternative set of percentiles using linear interpolation. Args: forecast_at_percentiles (iris.cube.Cube): Cube expected to contain a percentile coordinate. no_of_percentiles (int or None): Number of percentiles If None, the number of percentiles within the input forecast_at_percentiles cube is used as the number of percentiles. sampling (str): Type of sampling of the distribution to produce a set of percentiles e.g. quantile or random. Accepted options for sampling are: * Quantile: A regular set of equally-spaced percentiles aimed at dividing a Cumulative Distribution Function into blocks of equal probability. * Random: A random set of ordered percentiles. Returns: iris.cube.Cube: Cube with forecast values at the desired set of percentiles. The percentile coordinate is always the zeroth dimension. """ percentile_coord = find_percentile_coordinate(forecast_at_percentiles) if no_of_percentiles is None: no_of_percentiles = ( len(forecast_at_percentiles.coord( percentile_coord).points)) percentiles = choose_set_of_percentiles( no_of_percentiles, sampling=sampling) cube_units = forecast_at_percentiles.units bounds_pairing = ( get_bounds_of_distribution( forecast_at_percentiles.name(), cube_units)) forecast_at_percentiles = self._interpolate_percentiles( forecast_at_percentiles, percentiles, bounds_pairing, percentile_coord.name()) return forecast_at_percentiles
[docs]class GeneratePercentilesFromProbabilities(BasePlugin): """ Class for generating percentiles from probabilities. In combination with the Ensemble Reordering plugin, this is a variant Ensemble Copula Coupling. This class includes the ability to interpolate between probabilities specified using multiple thresholds in order to generate the percentiles, see Figure 1 from Flowerdew, 2014. Scientific Reference: Flowerdew, J., 2014. Calibrated ensemble reliability whilst preserving spatial structure. Tellus Series A, Dynamic Meteorology and Oceanography, 66, 22662. """
[docs] def __init__(self, ecc_bounds_warning=False): """ Initialise the class. Args: ecc_bounds_warning (bool): If true and ECC bounds are exceeded by the percentile values, a warning will be generated rather than an exception. Default value is FALSE. """ self.ecc_bounds_warning = ecc_bounds_warning
[docs] def _add_bounds_to_thresholds_and_probabilities( self, threshold_points, probabilities_for_cdf, bounds_pairing): """ Padding of the lower and upper bounds of the distribution for a given phenomenon for the threshold_points, and padding of probabilities of 0 and 1 to the forecast probabilities. Args: threshold_points (numpy.ndarray): Array of threshold values used to calculate the probabilities. probabilities_for_cdf (numpy.ndarray): Array containing the probabilities used for constructing an cumulative distribution function i.e. probabilities below threshold. bounds_pairing (tuple): Lower and upper bound to be used as the ends of the cumulative distribution function. Returns: (tuple): tuple containing: **threshold_points** (numpy.ndarray): Array of threshold values padded with the lower and upper bound of the distribution. **probabilities_for_cdf** (numpy.ndarray): Array containing the probabilities padded with 0 and 1 at each end. Raises: ValueError: If the thresholds exceed the ECC bounds for the diagnostic and self.ecc_bounds_warning is False. Warns: Warning: If the thresholds exceed the ECC bounds for the diagnostic and self.ecc_bounds_warning is True. """ lower_bound, upper_bound = bounds_pairing threshold_points_with_endpoints = \ insert_lower_and_upper_endpoint_to_1d_array( threshold_points, lower_bound, upper_bound) probabilities_for_cdf = concatenate_2d_array_with_2d_array_endpoints( probabilities_for_cdf, 0, 1) if np.any(np.diff(threshold_points_with_endpoints) < 0): msg = ("The calculated threshold values {} are not in ascending " "order as required for the cumulative distribution " "function (CDF). This is due to the threshold values " "exceeding the range given by the ECC bounds {}." .format(threshold_points_with_endpoints, bounds_pairing)) # If ecc_bounds_warning has been set, generate a warning message # rather than raising an exception so that subsequent processing # can continue. Then apply the new bounds as necessary to # ensure the threshold values and endpoints are in ascending # order and avoid problems further along the processing chain. if self.ecc_bounds_warning: warn_msg = msg + (" The threshold points that have " "exceeded the existing bounds will be used " "as new bounds.") warnings.warn(warn_msg) if upper_bound < max(threshold_points_with_endpoints): upper_bound = max(threshold_points_with_endpoints) if lower_bound > min(threshold_points_with_endpoints): lower_bound = min(threshold_points_with_endpoints) threshold_points_with_endpoints = \ insert_lower_and_upper_endpoint_to_1d_array( threshold_points, lower_bound, upper_bound) else: raise ValueError(msg) return threshold_points_with_endpoints, probabilities_for_cdf
[docs] def _probabilities_to_percentiles( self, forecast_probabilities, percentiles, bounds_pairing): """ Conversion of probabilities to percentiles through the construction of an cumulative distribution function. This is effectively constructed by linear interpolation from the probabilities associated with each threshold to a set of percentiles. Args: forecast_probabilities (iris.cube.Cube): Cube with a threshold coordinate. percentiles (numpy.ndarray): Array of percentiles, at which the corresponding values will be calculated. bounds_pairing (tuple): Lower and upper bound to be used as the ends of the cumulative distribution function. Returns: iris.cube.Cube: Cube containing values for the required diagnostic e.g. air_temperature at the required percentiles. Raises: NotImplementedError: If the threshold coordinate has an spp__relative_to_threshold attribute that is not either "above" or "below". Warns: Warning: If the probability values are not ascending, so the resulting cdf is not monotonically increasing. """ threshold_coord = find_threshold_coordinate(forecast_probabilities) threshold_unit = threshold_coord.units threshold_points = threshold_coord.points # Ensure that the percentile dimension is first, so that the # conversion to a 2d array produces data in the desired order. enforce_coordinate_ordering( forecast_probabilities, threshold_coord.name()) prob_slices = convert_cube_data_to_2d( forecast_probabilities, coord=threshold_coord.name()) # The requirement below for a monotonically changing probability # across thresholds can be thwarted by precision errors of order 1E-10, # as such, here we round to a precision of 9 decimal places. prob_slices = np.around(prob_slices, 9) # Invert probabilities for data thresholded above thresholds. relation = find_threshold_coordinate( forecast_probabilities).attributes['spp__relative_to_threshold'] if relation == 'above': probabilities_for_cdf = 1 - prob_slices elif relation == 'below': probabilities_for_cdf = prob_slices else: msg = ("Probabilities to percentiles only implemented for " "thresholds above or below a given value." "The relation to threshold is given as {}".format(relation)) raise NotImplementedError(msg) threshold_points, probabilities_for_cdf = ( self._add_bounds_to_thresholds_and_probabilities( threshold_points, probabilities_for_cdf, bounds_pairing)) if np.any(np.diff(probabilities_for_cdf) < 0): msg = ("The probability values used to construct the " "Cumulative Distribution Function (CDF) " "must be ascending i.e. in order to yield " "a monotonically increasing CDF." "The probabilities are {}".format(probabilities_for_cdf)) warnings.warn(msg) # Convert percentiles into fractions. percentiles_as_fractions = np.array( [x/100.0 for x in percentiles], dtype=np.float32) forecast_at_percentiles = ( np.empty((len(percentiles), probabilities_for_cdf.shape[0]), dtype=np.float32) ) for index in range(probabilities_for_cdf.shape[0]): forecast_at_percentiles[:, index] = np.interp( percentiles_as_fractions, probabilities_for_cdf[index, :], threshold_points) # Reshape forecast_at_percentiles, so the percentiles dimension is # first, and any other dimension coordinates follow. forecast_at_percentiles = ( restore_non_probabilistic_dimensions( forecast_at_percentiles, forecast_probabilities, threshold_coord.name(), len(percentiles))) for template_cube in forecast_probabilities.slices_over( threshold_coord.name()): template_cube.rename( template_cube.name().replace("probability_of_", "")) template_cube.rename( template_cube.name().replace( "_above_threshold", "").replace("_below_threshold", "")) template_cube.remove_coord(threshold_coord.name()) break percentile_cube = create_cube_with_percentiles( percentiles, template_cube, forecast_at_percentiles, cube_unit=threshold_unit) return percentile_cube
[docs] def process(self, forecast_probabilities, no_of_percentiles=None, percentiles=None, sampling="quantile"): """ 1. Concatenates cubes with a threshold coordinate. 2. Creates a list of percentiles. 3. Accesses the lower and upper bound pair to find the ends of the cumulative distribution function. 4. Convert the threshold coordinate into values at a set of percentiles using linear interpolation, see Figure 1 from Flowerdew, 2014. Args: forecast_probabilities (iris.cube.Cube): Cube containing a threshold coordinate. no_of_percentiles (int): Number of percentiles. If None and percentiles is not set, the number of thresholds within the input forecast_probabilities cube is used as the number of percentiles. This argument is mutually exclusive with percentiles. percentiles (list of float): The desired percentile values in the interval [0, 100]. This argument is mutually exclusive with no_of_percentiles. sampling (str): Type of sampling of the distribution to produce a set of percentiles e.g. quantile or random. Accepted options for sampling are: * Quantile: A regular set of equally-spaced percentiles aimed at dividing a Cumulative Distribution Function into blocks of equal probability. * Random: A random set of ordered percentiles. Returns: iris.cube.Cube: Cube with forecast values at the desired set of percentiles. The threshold coordinate is always the zeroth dimension. Raises: ValueError: If both no_of_percentiles and percentiles are provided """ if no_of_percentiles is not None and percentiles is not None: raise ValueError( "Cannot specify both no_of_percentiles and percentiles to " "GeneratePercentilesFromProbabilities") threshold_coord = find_threshold_coordinate(forecast_probabilities) phenom_name = ( forecast_probabilities.name().replace( "probability_of_", "").replace("_above_threshold", "").replace( "_below_threshold", "")) if no_of_percentiles is None: no_of_percentiles = ( len(forecast_probabilities.coord( threshold_coord.name()).points)) if percentiles is None: percentiles = choose_set_of_percentiles( no_of_percentiles, sampling=sampling) elif not isinstance(percentiles, (tuple, list)): percentiles = [percentiles] percentiles = np.array(percentiles, dtype=np.float32) cube_units = ( forecast_probabilities.coord(threshold_coord.name()).units) bounds_pairing = ( get_bounds_of_distribution( phenom_name, cube_units)) # If a cube still has multiple realizations, slice over these to reduce # the memory requirements into manageable chunks. try: slices_over_realization = forecast_probabilities.slices_over( "realization") except CoordinateNotFoundError: slices_over_realization = [forecast_probabilities] cubelist = iris.cube.CubeList([]) for cube_realization in slices_over_realization: cubelist.append(self._probabilities_to_percentiles( cube_realization, percentiles, bounds_pairing)) forecast_at_percentiles = cubelist.merge_cube() return forecast_at_percentiles
[docs]class FromMeanAndVariance(): """ Base Class to support the plugins that compute percentiles and probabilities from the mean and variance. """
[docs] def __init__(self, distribution="norm", shape_parameters=None): """ Initialise the class. Args: distribution (str): Name of a distribution supported by scipy.stats. shape_parameters (list or None): For use with distributions in scipy.stats (e.g. truncnorm) that require the specification of shape parameters to be able to define the shape of the distribution. For the truncated normal distribution, the shape parameters should be appropriate for distribution constructed from the mean and standard deviation provided. Please note that for use with :meth:`~improver.ensemble_calibration.ensemble_calibration.\ ContinuousRankedProbabilityScoreMinimisers.calculate_truncated_normal_crps`, the shape parameters for a truncated normal distribution with a lower bound of zero should be [0, np.inf]. """ try: self.distribution = getattr(stats, distribution) except AttributeError as err: msg = ("The distribution requested {} is not a valid distribution " "in scipy.stats. {}".format(distribution, err)) raise AttributeError(msg) if shape_parameters is None: shape_parameters = [] self.shape_parameters = shape_parameters
def __repr__(self): """Represent the configured plugin instance as a string.""" result = ('<FromMeanAndVariance: distribution: {}; ' 'shape_parameters: {}>') return result.format( self.distribution.name, self.shape_parameters)
[docs] def _rescale_shape_parameters(self, mean, std): """ Rescale the shape parameters for the desired mean and standard deviation for the truncated normal distribution. The shape parameters for any other distribution will remain unchanged. For the truncated normal distribution, if the shape parameters are not rescaled, then :data:`scipy.stats.truncnorm` will assume that the shape parameters are appropriate for a standard normal distribution. As the aim is to construct a distribution using specific values for the mean and standard deviation, the assumption of a standard normal distribution is not appropriate. Therefore the shape parameters are rescaled using the equations: .. math:: a\\_rescaled = (a - mean)/standard\\_deviation b\\_rescaled = (b - mean)/standard\\_deviation Please see :data:`scipy.stats.truncnorm` for some further information. Args: mean (numpy.ndarray): Mean to be used to scale the shape parameters. std (numpy.ndarray): Standard deviation to be used to scale the shape parameters. """ if self.distribution.name == "truncnorm": if self.shape_parameters: rescaled_values = [] for value in self.shape_parameters: rescaled_values.append((value - mean) / std) self.shape_parameters = rescaled_values else: msg = ("For the truncated normal distribution, " "shape parameters must be specified.") raise ValueError(msg)
[docs]class GeneratePercentilesFromMeanAndVariance(BasePlugin, FromMeanAndVariance): """ Plugin focussing on generating percentiles from mean and variance. In combination with the EnsembleReordering plugin, this is Ensemble Copula Coupling. """ def __repr__(self): """Represent the configured plugin instance as a string.""" result = ('<GeneratePercentilesFromMeanAndVariance: ' 'distribution: {}; shape_parameters: {}>') return result.format(self.distribution.name, self.shape_parameters)
[docs] def _mean_and_variance_to_percentiles( self, calibrated_forecast_predictor, calibrated_forecast_variance, template_cube, percentiles): """ Function returning percentiles based on the supplied mean and variance. The percentiles are created by assuming a Gaussian distribution and calculating the value of the phenomenon at specific points within the distribution. Args: calibrated_forecast_predictor (iris.cube.Cube): Predictor for the calibrated forecast i.e. the mean. calibrated_forecast_variance (iris.cube.Cube): Variance for the calibrated forecast. template_cube (iris.cube.Cube): Template cube containing either a percentile or realization coordinate. All coordinates apart from the percentile or realization coordinate will be copied from the template cube. Metadata will also be copied from this cube. percentiles (list): Percentiles at which to calculate the value of the phenomenon at. Returns: iris.cube.Cube: Cube containing the values for the phenomenon at each of the percentiles requested. Raises: ValueError: If any of the resulting percentile values are nans and these nans are not caused by a zero variance. """ calibrated_forecast_predictor_data = ( calibrated_forecast_predictor.data.flatten()) calibrated_forecast_variance_data = ( calibrated_forecast_variance.data.flatten()) # Convert percentiles into fractions. percentiles = np.array( [x/100.0 for x in percentiles], dtype=np.float32) result = np.zeros((len(percentiles), calibrated_forecast_predictor_data.shape[0]), dtype=np.float32) self._rescale_shape_parameters( calibrated_forecast_predictor_data, np.sqrt(calibrated_forecast_variance_data)) percentile_method = self.distribution( *self.shape_parameters, loc=calibrated_forecast_predictor_data, scale=np.sqrt(calibrated_forecast_variance_data)) # Loop over percentiles, and use the specified distribution with the # mean and variance to calculate the values at each percentile. for index, percentile in enumerate(percentiles): percentile_list = np.repeat( percentile, len(calibrated_forecast_predictor_data)) result[index, :] = percentile_method.ppf(percentile_list) # If percent point function (PPF) returns NaNs, fill in # mean instead of NaN values. NaN will only be generated if the # variance is zero. Therefore, if the variance is zero, the mean # value is used for all gridpoints with a NaN. if np.any(calibrated_forecast_variance_data == 0): nan_index = np.argwhere(np.isnan(result[index, :])) result[index, nan_index] = ( calibrated_forecast_predictor_data[nan_index]) if np.any(np.isnan(result)): msg = ("NaNs are present within the result for the {} " "percentile. Unable to calculate the percent point " "function.") raise ValueError(msg) # Convert percentiles back into percentages. percentiles = [x*100.0 for x in percentiles] # Reshape forecast_at_percentiles, so the percentiles dimension is # first, and any other dimension coordinates follow. result = result.reshape( (len(percentiles),) + calibrated_forecast_predictor.data.shape) for prob_coord_name in ["realization", "percentile"]: if template_cube.coords(prob_coord_name, dim_coords=True): prob_coord = template_cube.coord(prob_coord_name) template_slice = next(template_cube.slices_over(prob_coord)) template_slice.remove_coord(prob_coord) percentile_cube = create_cube_with_percentiles( percentiles, template_slice, result) # Remove cell methods associated with finding the ensemble mean percentile_cube.cell_methods = {} return percentile_cube
[docs] def process(self, calibrated_forecast_predictor, calibrated_forecast_variance, template_cube, no_of_percentiles=None, percentiles=None): """ Generate ensemble percentiles from the mean and variance. Args: calibrated_forecast_predictor (iris.cube.Cube): Cube containing the calibrated forecast predictor. calibrated_forecast_variance (iris.cube.Cube): Cube containing the calibrated forecast variance. template_cube (iris.cube.Cube): Template cube containing either a percentile or realization coordinate. All coordinates apart from the percentile or realization coordinate will be copied from the template cube. Metadata will also be copied from this cube. no_of_percentiles (int): Integer defining the number of percentiles that will be calculated from the mean and variance. percentiles (list): List of percentiles that will be generated from the mean and variance provided. Returns: iris.cube.Cube: Cube for calibrated percentiles. The percentile coordinate is always the zeroth dimension. Raises: ValueError: Ensure that it is not possible to supply "no_of_percentiles" and "percentiles" simultaneously as keyword arguments. """ if no_of_percentiles and percentiles: msg = ("Please specify either the number of percentiles or " "provide a list of percentiles. The number of percentiles " "provided was {} and the list of percentiles " "provided was {}".format(no_of_percentiles, percentiles)) raise ValueError(msg) if no_of_percentiles: percentiles = choose_set_of_percentiles(no_of_percentiles) calibrated_forecast_percentiles = ( self._mean_and_variance_to_percentiles( calibrated_forecast_predictor, calibrated_forecast_variance, template_cube, percentiles)) return calibrated_forecast_percentiles
[docs]class GenerateProbabilitiesFromMeanAndVariance( BasePlugin, FromMeanAndVariance): """ Plugin to generate probabilities relative to given thresholds from the mean and variance of a distribution. """ def __repr__(self): """Represent the configured plugin instance as a string.""" result = ('<GenerateProbabilitiesFromMeanAndVariance: ' 'distribution: {}; shape_parameters: {}>') return result.format(self.distribution.name, self.shape_parameters)
[docs] @staticmethod def _check_template_cube(cube): """ The template cube is expected to contain a leading threshold dimension followed by spatial (y/x) dimensions. This check raises an error if this is not the case. If the cube contains the expected dimensions, a threshold leading order is enforced. Args: cube (iris.cube.Cube): A cube whose dimensions are checked to ensure they match what is expected. Raises: ValueError: If cube is not of the expected dimensions. """ check_for_x_and_y_axes(cube, require_dim_coords=True) dim_coords = [coord.name() for coord in cube.coords(dim_coords=True)] msg = ('GenerateProbabilitiesFromMeanAndVariance expects a cube with ' 'only a leading threshold dimension, followed by spatial (y/x) ' 'dimensions. Got dimensions: {}'.format(dim_coords)) try: threshold_coord = find_threshold_coordinate(cube) except CoordinateNotFoundError: raise ValueError(msg) if len(dim_coords) < 4: enforce_coordinate_ordering(cube, threshold_coord.name()) return raise ValueError(msg)
[docs] @staticmethod def _check_unit_compatibility(mean_values, variance_values, probability_cube_template): """ The mean, variance, and threshold values come from three different cubes. They should all be in the same units, but this is a sanity check to ensure this is the case, converting units of the means and variances if possible. This has been written specifically for this plugin as we are comparing squared units in the case of the variance. Args: mean_values (iris.cube.Cube): Cube of mean values. variance_values (iris.cube.Cube): Cube of variance values. probability_cube_template (iris.cube.Cube): Cube containing threshold values. Raises: ValueError: If units of input cubes are not compatible. """ threshold_units = ( find_threshold_coordinate(probability_cube_template).units) try: mean_values.convert_units(threshold_units) variance_values.convert_units(threshold_units**2) except ValueError as err: msg = ('Error: {} This is likely because the mean ' 'variance and template cube threshold units are ' 'not equivalent/compatible.'.format(err)) raise ValueError(msg)
[docs] def _mean_and_variance_to_probabilities(self, mean_values, variance_values, probability_cube_template): """ Function returning probabilities relative to provided thresholds based on the supplied mean and variance. A Gaussian distribution is assumed. Args: mean_values (iris.cube.Cube): Predictor for the calibrated forecast i.e. the mean. variance_values (iris.cube.Cube): Variance for the calibrated forecast. probability_cube_template (iris.cube.Cube): A probability cube that has a threshold coordinate, where the probabilities are defined as above or below the threshold by the spp__relative_to_threshold attribute. This cube matches the desired output cube format. Returns: iris.cube.Cube: Cube containing the data expressed as probabilities relative to the provided thresholds in the way described by spp__relative_to_threshold. """ thresholds = ( find_threshold_coordinate(probability_cube_template).points) relative_to_threshold = find_threshold_coordinate( probability_cube_template).attributes['spp__relative_to_threshold'] self._rescale_shape_parameters( mean_values.data.flatten(), np.sqrt(variance_values.data).flatten()) # Loop over thresholds, and use the specified distribution with the # mean and variance to calculate the probabilities relative to each # threshold. probabilities = np.empty_like(probability_cube_template.data) distribution = self.distribution( *self.shape_parameters, loc=mean_values.data.flatten(), scale=np.sqrt(variance_values.data.flatten())) probability_method = distribution.cdf if relative_to_threshold == 'above': probability_method = distribution.sf for index, threshold in enumerate(thresholds): probabilities[index, ...] = np.reshape( probability_method(threshold), probabilities.shape[1:]) probability_cube = probability_cube_template.copy(data=probabilities) return probability_cube
[docs] def process(self, mean_values, variance_values, probability_cube_template): """ Generate probabilities from the mean and variance of distribution. Args: mean_values (iris.cube.Cube): Cube containing the distribution mean values of a diagnostic, e.g. the mean over realizations. variance_values (iris.cube.Cube): Cube containing the distribution variance values of a diagnostic, e.g. the variance across realizations. probability_cube_template (iris.cube.Cube): A probability cube that has a threshold coordinate, where the probabilities are defined as above or below the threshold by the spp__relative_to_threshold attribute. This cube matches the desired output cube format. Returns: iris.cube.Cube: A cube of diagnostic data expressed as probabilities relative to the thresholds found in the probability_cube_template. """ self._check_template_cube(probability_cube_template) self._check_unit_compatibility(mean_values, variance_values, probability_cube_template) probability_cube = self._mean_and_variance_to_probabilities( mean_values, variance_values, probability_cube_template) return probability_cube
[docs]class EnsembleReordering(BasePlugin): """ Plugin for applying the reordering step of Ensemble Copula Coupling, in order to generate ensemble realizations with multivariate structure from percentiles. The percentiles are assumed to be in ascending order. Reference: Schefzik, R., Thorarinsdottir, T.L. & Gneiting, T., 2013. Uncertainty Quantification in Complex Simulation Models Using Ensemble Copula Coupling. Statistical Science, 28(4), pp.616-640. """
[docs] @staticmethod def _recycle_raw_ensemble_realizations( post_processed_forecast_percentiles, raw_forecast_realizations, percentile_coord_name): """ Function to determine whether there is a mismatch between the number of percentiles and the number of raw forecast realizations. If more percentiles are requested than ensemble realizations, then the ensemble realizations are recycled. This assumes that the identity of the ensemble realizations within the raw ensemble forecast is random, such that the raw ensemble realizations are exchangeable. If fewer percentiles are requested than ensemble realizations, then only the first n ensemble realizations are used. Args: post_processed_forecast_percentiles (iris.cube.Cube): Cube for post-processed percentiles. The percentiles are assumed to be in ascending order. raw_forecast_realizations (iris.cube.Cube): Cube containing the raw (not post-processed) forecasts. percentile_coord_name (str): Name of required percentile coordinate. Returns: iris cube.Cube: Cube for the raw ensemble forecast, where the raw ensemble realizations have either been recycled or constrained, depending upon the number of percentiles present in the post-processed forecast cube. """ plen = len( post_processed_forecast_percentiles.coord( percentile_coord_name).points) mlen = len(raw_forecast_realizations.coord("realization").points) if plen == mlen: pass else: raw_forecast_realizations_extended = iris.cube.CubeList() realization_list = [] mpoints = raw_forecast_realizations.coord("realization").points # Loop over the number of percentiles and finding the # corresponding ensemble realization number. The ensemble # realization numbers are recycled e.g. 1, 2, 3, 1, 2, 3, etc. for index in range(plen): realization_list.append(mpoints[index % len(mpoints)]) # Assume that the ensemble realizations are ascending linearly. new_realization_numbers = realization_list[0] + list(range(plen)) # Extract the realizations required in the realization_list from # the raw_forecast_realizations. Edit the realization number as # appropriate and append to a cubelist containing rebadged # raw ensemble realizations. for realization, index in zip( realization_list, new_realization_numbers): constr = iris.Constraint(realization=realization) raw_forecast_realization = raw_forecast_realizations.extract( constr) raw_forecast_realization.coord("realization").points = index raw_forecast_realizations_extended.append( raw_forecast_realization) raw_forecast_realizations = concatenate_cubes( raw_forecast_realizations_extended, coords_to_slice_over=["realization", "time"]) return raw_forecast_realizations
[docs] @staticmethod def rank_ecc( post_processed_forecast_percentiles, raw_forecast_realizations, random_ordering=False, random_seed=None): """ Function to apply Ensemble Copula Coupling. This ranks the post-processed forecast realizations based on a ranking determined from the raw forecast realizations. Args: post_processed_forecast_percentiles (iris.cube.Cube): Cube for post-processed percentiles. The percentiles are assumed to be in ascending order. raw_forecast_realizations (iris.cube.Cube): Cube containing the raw (not post-processed) forecasts. The probabilistic dimension is assumed to be the zeroth dimension. random_ordering (bool): If random_ordering is True, the post-processed forecasts are reordered randomly, rather than using the ordering of the raw ensemble. random_seed (int or None): If random_seed is an integer, the integer value is used for the random seed. If random_seed is None, no random seed is set, so the random values generated are not reproducible. Returns: iris.cube.Cube: Cube for post-processed realizations where at a particular grid point, the ranking of the values within the ensemble matches the ranking from the raw ensemble. """ results = iris.cube.CubeList([]) for rawfc, calfc in zip( raw_forecast_realizations.slices_over("time"), post_processed_forecast_percentiles.slices_over("time")): if random_seed is not None: random_seed = int(random_seed) random_seed = np.random.RandomState(random_seed) random_data = random_seed.rand(*rawfc.data.shape) if random_ordering: # Returns the indices that would sort the array. # As these indices are from a random dataset, only an argsort # is used. ranking = np.argsort(random_data, axis=0) else: # Lexsort returns the indices sorted firstly by the # primary key, the raw forecast data (unless random_ordering # is enabled), and secondly by the secondary key, an array of # random data, in order to split tied values randomly. sorting_index = np.lexsort((random_data, rawfc.data), axis=0) # Returns the indices that would sort the array. ranking = np.argsort(sorting_index, axis=0) # Index the post-processed forecast data using the ranking array. # The following uses a custom choose function that reproduces the # required elements of the np.choose method without the limitation # of having < 32 arrays or a leading dimension < 32 in the # input data array. This function allows indexing of a 3d array # using a 3d array. calfc.data = choose(ranking, calfc.data) results.append(calfc) # Ensure we haven't lost any dimensional coordinates with only one # value in. results = results.merge_cube() results = check_cube_coordinates( post_processed_forecast_percentiles, results) return results
[docs] def process( self, post_processed_forecast, raw_forecast, random_ordering=False, random_seed=None): """ Reorder post-processed forecast using the ordering of the raw ensemble. Args: post_processed_forecast (iris.cube.Cube): The cube containing the post-processed forecast realizations. raw_forecast (iris.cube.Cube): The cube containing the raw (not post-processed) forecast. random_ordering (bool): If random_ordering is True, the post-processed forecasts are reordered randomly, rather than using the ordering of the raw ensemble. random_seed (int): If random_seed is an integer, the integer value is used for the random seed. If random_seed is None, no random seed is set, so the random values generated are not reproducible. Returns: iris.cube.Cube: Cube containing the new ensemble realizations where all points within the dataset have been reordered in comparison to the input percentiles. """ percentile_coord_name = ( find_percentile_coordinate(post_processed_forecast).name()) enforce_coordinate_ordering( post_processed_forecast, percentile_coord_name) enforce_coordinate_ordering(raw_forecast, "realization") raw_forecast = ( self._recycle_raw_ensemble_realizations( post_processed_forecast, raw_forecast, percentile_coord_name)) post_processed_forecast_realizations = self.rank_ecc( post_processed_forecast, raw_forecast, random_ordering=random_ordering, random_seed=random_seed) post_processed_forecast_realizations = ( RebadgePercentilesAsRealizations.process( post_processed_forecast_realizations)) enforce_coordinate_ordering( post_processed_forecast_realizations, "realization") return post_processed_forecast_realizations