# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# (C) British Crown Copyright 2017-2019 Met Office.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
This module defines all the "plugins" specific for ensemble calibration.
"""
import datetime
import warnings
import iris
import numpy as np
from iris.exceptions import CoordinateNotFoundError
from scipy import stats
from scipy.optimize import minimize
from scipy.stats import norm
from improver import BasePlugin
from improver.ensemble_calibration.utilities import (
check_predictor_of_mean_flag, convert_cube_data_to_2d,
flatten_ignoring_masked_data)
from improver.metadata.utilities import create_new_diagnostic_cube
from improver.utilities.cube_checker import time_coords_match
from improver.utilities.cube_manipulation import enforce_coordinate_ordering
from improver.utilities.temporal import (
cycletime_to_datetime, datetime_to_iris_time, iris_time_to_datetime)
[docs]class ContinuousRankedProbabilityScoreMinimisers:
"""
Minimise the Continuous Ranked Probability Score (CRPS)
Calculate the optimised coefficients for minimising the CRPS based on
assuming a particular probability distribution for the phenomenon being
minimised.
The number of coefficients that will be optimised depend upon the initial
guess.
Minimisation is performed using the Nelder-Mead algorithm for 200
iterations to limit the computational expense.
Note that the BFGS algorithm was initially trialled but had a bug
in comparison to comparative results generated in R.
"""
# The tolerated percentage change for the final iteration when
# performing the minimisation.
TOLERATED_PERCENTAGE_CHANGE = 5
# An arbitrary value set if an infinite value is detected
# as part of the minimisation.
BAD_VALUE = np.float64(999999)
[docs] def __init__(self, tolerance=0.01, max_iterations=1000):
"""
Initialise class for performing minimisation of the Continuous
Ranked Probability Score (CRPS).
Args:
tolerance (float):
The tolerance for the Continuous Ranked Probability
Score (CRPS) calculated by the minimisation. The CRPS is in
the units of the variable being calibrated. The tolerance is
therefore representative of how close to the actual value are
we aiming to forecast for a particular variable. Once multiple
iterations result in a CRPS equal to the same value within the
specified tolerance, the minimisation will terminate.
max_iterations (int):
The maximum number of iterations allowed until the
minimisation has converged to a stable solution. If the
maximum number of iterations is reached, but the minimisation
has not yet converged to a stable solution, then the available
solution is used anyway, and a warning is raised. If the
predictor_of_mean is "realizations", then the number of
iterations may require increasing, as there will be
more coefficients to solve for.
"""
# Dictionary containing the functions that will be minimised,
# depending upon the distribution requested.
self.minimisation_dict = {
"gaussian": self.calculate_normal_crps,
"truncated_gaussian": self.calculate_truncated_normal_crps}
self.tolerance = tolerance
# Maximum iterations for minimisation using Nelder-Mead.
self.max_iterations = max_iterations
def __repr__(self):
"""Represent the configured plugin instance as a string."""
result = ('<ContinuousRankedProbabilityScoreMinimisers: '
'minimisation_dict: {}; tolerance: {}; max_iterations: {}>')
print_dict = {}
for key in self.minimisation_dict:
print_dict.update({key: self.minimisation_dict[key].__name__})
return result.format(print_dict, self.tolerance, self.max_iterations)
[docs] def process(
self, initial_guess, forecast_predictor, truth, forecast_var,
predictor_of_mean_flag, distribution):
"""
Function to pass a given function to the scipy minimize
function to estimate optimised values for the coefficients.
If the predictor_of_mean_flag is the ensemble mean, this function
estimates values for alpha, beta, gamma and delta based on the
equation:
N(a + b * ensemble_mean, c + d * ensemble_variance),
where N is a chosen distribution and values of a, b, c and d are
solved in the format of alpha, beta, gamma and delta, see the equations
below.
.. math::
a = \\alpha
.. math::
b = \\beta
.. math::
c = \\gamma^2
.. math::
d = \\delta^2
The gamma and delta values are squared to ensure c and d are positive
and therefore more interpretable.
If the predictor_of_mean_flag is the ensemble realizations, this
function estimates values for alpha, beta, gamma and delta based on the
equation:
.. math::
N(a + b_0 \\times realization0 + b_1 \\times realization1,
c + d \\times ensemble\\_variance)
where N is a chosen distribution, the values of a, b, c and d relate
to alpha, beta, gamma and delta through the equations above with
the exception that :math:`b=\\beta^2`, and the number of beta terms
depends on the number of realizations provided. The beta, gamma, and
delta values are squared to ensure that b, c and d are positive values
and therefore are more easily interpretable. Specifically for the b
term, the squaring ensures that the the b values can be interpreted
as a weighting for each realization.
Args:
initial_guess (list):
List of optimised coefficients.
Order of coefficients is [gamma, delta, alpha, beta].
forecast_predictor (iris.cube.Cube):
Cube containing the fields to be used as the predictor,
either the ensemble mean or the ensemble realizations.
truth (iris.cube.Cube):
Cube containing the field, which will be used as truth.
forecast_var (iris.cube.Cube):
Cube containg the field containing the ensemble variance.
predictor_of_mean_flag (str):
String to specify the input to calculate the calibrated mean.
Currently the ensemble mean ("mean") and the ensemble
realizations ("realizations") are supported as the predictors.
distribution (str):
String used to access the appropriate function for use in the
minimisation within self.minimisation_dict.
Returns:
list of float:
List of optimised coefficients.
Order of coefficients is [gamma, delta, alpha, beta].
Raises:
KeyError: If the distribution is not supported.
Warns:
Warning: If the minimisation did not converge.
"""
def calculate_percentage_change_in_last_iteration(allvecs):
"""
Calculate the percentage change that has occurred within
the last iteration of the minimisation. If the percentage change
between the last iteration and the last-but-one iteration exceeds
the threshold, a warning message is printed.
Args:
allvecs (list):
List of numpy arrays containing the optimised coefficients,
after each iteration.
Warns:
Warning: If a satisfactory minimisation has not been achieved.
"""
last_iteration_percentage_change = np.absolute(
(allvecs[-1] - allvecs[-2]) / allvecs[-2])*100
if (np.any(last_iteration_percentage_change >
self.TOLERATED_PERCENTAGE_CHANGE)):
np.set_printoptions(suppress=True)
msg = ("The final iteration resulted in a percentage change "
"that is greater than the accepted threshold of 5% "
"i.e. {}. "
"\nA satisfactory minimisation has not been achieved. "
"\nLast iteration: {}, "
"\nLast-but-one iteration: {}"
"\nAbsolute difference: {}\n").format(
last_iteration_percentage_change, allvecs[-1],
allvecs[-2], np.absolute(allvecs[-2]-allvecs[-1]))
warnings.warn(msg)
try:
minimisation_function = self.minimisation_dict[distribution]
except KeyError as err:
msg = ("Distribution requested {} is not supported in {}"
"Error message is {}".format(
distribution, self.minimisation_dict, err))
raise KeyError(msg)
# Ensure predictor_of_mean_flag is valid.
check_predictor_of_mean_flag(predictor_of_mean_flag)
# Flatten the data arrays and remove any missing data.
truth_data = flatten_ignoring_masked_data(truth.data)
forecast_var_data = flatten_ignoring_masked_data(forecast_var.data)
if predictor_of_mean_flag.lower() == "mean":
forecast_predictor_data = flatten_ignoring_masked_data(
forecast_predictor.data)
elif predictor_of_mean_flag.lower() == "realizations":
enforce_coordinate_ordering(forecast_predictor, "realization")
# Need to transpose this array so there are columns for each
# ensemble member rather than rows.
forecast_predictor_data = flatten_ignoring_masked_data(
forecast_predictor.data, preserve_leading_dimension=True).T
# Increased precision is needed for stable coefficient calculation.
# The resulting coefficients are cast to float32 prior to output.
initial_guess = np.array(initial_guess, dtype=np.float64)
forecast_predictor_data = forecast_predictor_data.astype(np.float64)
forecast_var_data = forecast_var_data.astype(np.float64)
truth_data = truth_data.astype(np.float64)
sqrt_pi = np.sqrt(np.pi).astype(np.float64)
optimised_coeffs = minimize(
minimisation_function, initial_guess,
args=(forecast_predictor_data, truth_data,
forecast_var_data, sqrt_pi, predictor_of_mean_flag),
method="Nelder-Mead", tol=self.tolerance,
options={"maxiter": self.max_iterations, "return_all": True})
if not optimised_coeffs.success:
msg = ("Minimisation did not result in convergence after "
"{} iterations. \n{}".format(
self.max_iterations, optimised_coeffs.message))
warnings.warn(msg)
calculate_percentage_change_in_last_iteration(optimised_coeffs.allvecs)
return optimised_coeffs.x.astype(np.float32)
[docs] def calculate_normal_crps(
self, initial_guess, forecast_predictor, truth, forecast_var,
sqrt_pi, predictor_of_mean_flag):
"""
Calculate the CRPS for a normal distribution.
Scientific Reference:
Gneiting, T. et al., 2005.
Calibrated Probabilistic Forecasting Using Ensemble Model Output
Statistics and Minimum CRPS Estimation.
Monthly Weather Review, 133(5), pp.1098-1118.
Args:
initial_guess (list):
List of optimised coefficients.
Order of coefficients is [gamma, delta, alpha, beta].
forecast_predictor (numpy.ndarray):
Data to be used as the predictor,
either the ensemble mean or the ensemble realizations.
truth (numpy.ndarray):
Data to be used as truth.
forecast_var (numpy.ndarray):
Ensemble variance data.
sqrt_pi (numpy.ndarray):
Square root of Pi
predictor_of_mean_flag (str):
String to specify the input to calculate the calibrated mean.
Currently the ensemble mean ("mean") and the ensemble
realizations ("realizations") are supported as the predictors.
Returns:
float:
CRPS for the current set of coefficients. This CRPS is a mean
value across all points.
"""
if predictor_of_mean_flag.lower() == "mean":
beta = initial_guess[2:]
elif predictor_of_mean_flag.lower() == "realizations":
beta = np.array(
[initial_guess[2]]+(initial_guess[3:]**2).tolist(),
dtype=np.float32
)
new_col = np.ones(truth.shape, dtype=np.float32)
all_data = np.column_stack((new_col, forecast_predictor))
mu = np.dot(all_data, beta)
sigma = np.sqrt(
initial_guess[0]**2 + initial_guess[1]**2 * forecast_var)
xz = (truth - mu) / sigma
normal_cdf = norm.cdf(xz)
normal_pdf = norm.pdf(xz)
if np.isfinite(np.min(mu/sigma)):
result = np.nanmean(
sigma * (
xz * (2 * normal_cdf - 1) + 2 * normal_pdf - 1 / sqrt_pi))
else:
result = self.BAD_VALUE
return result
[docs] def calculate_truncated_normal_crps(
self, initial_guess, forecast_predictor, truth, forecast_var,
sqrt_pi, predictor_of_mean_flag):
"""
Calculate the CRPS for a truncated normal distribution with zero
as the lower bound.
Scientific Reference:
Thorarinsdottir, T.L. & Gneiting, T., 2010.
Probabilistic forecasts of wind speed: Ensemble model
output statistics by using heteroscedastic censored regression.
Journal of the Royal Statistical Society.
Series A: Statistics in Society, 173(2), pp.371-388.
Args:
initial_guess (list):
List of optimised coefficients.
Order of coefficients is [gamma, delta, alpha, beta].
forecast_predictor (numpy.ndarray):
Data to be used as the predictor,
either the ensemble mean or the ensemble realizations.
truth (numpy.ndarray):
Data to be used as truth.
forecast_var (numpy.ndarray):
Ensemble variance data.
sqrt_pi (numpy.ndarray):
Square root of Pi
predictor_of_mean_flag (str):
String to specify the input to calculate the calibrated mean.
Currently the ensemble mean ("mean") and the ensemble
realizations ("realizations") are supported as the predictors.
Returns:
float:
CRPS for the current set of coefficients. This CRPS is a mean
value across all points.
"""
if predictor_of_mean_flag.lower() == "mean":
beta = initial_guess[2:]
elif predictor_of_mean_flag.lower() == "realizations":
beta = np.array(
[initial_guess[2]]+(initial_guess[3:]**2).tolist(),
dtype=np.float32
)
new_col = np.ones(truth.shape, dtype=np.float32)
all_data = np.column_stack((new_col, forecast_predictor))
mu = np.dot(all_data, beta)
sigma = np.sqrt(
initial_guess[0]**2 + initial_guess[1]**2 * forecast_var)
xz = (truth - mu) / sigma
normal_cdf = norm.cdf(xz)
normal_pdf = norm.pdf(xz)
x0 = mu / sigma
normal_cdf_0 = norm.cdf(x0)
normal_cdf_root_two = norm.cdf(np.sqrt(2) * x0)
if np.isfinite(np.min(mu / sigma)) or (np.min(mu / sigma) >= -3):
result = np.nanmean(
(sigma / normal_cdf_0**2) *
(xz * normal_cdf_0 * (2 * normal_cdf + normal_cdf_0 - 2) +
2 * normal_pdf * normal_cdf_0 -
normal_cdf_root_two / sqrt_pi))
else:
result = self.BAD_VALUE
return result
[docs]class EstimateCoefficientsForEnsembleCalibration(BasePlugin):
"""
Class focussing on estimating the optimised coefficients for ensemble
calibration.
"""
# Logical flag for whether initial guess estimates for the coefficients
# will be estimated using linear regression i.e.
# ESTIMATE_COEFFICIENTS_FROM_LINEAR_MODEL_FLAG = True, or whether default
# values will be used instead i.e.
# ESTIMATE_COEFFICIENTS_FROM_LINEAR_MODEL_FLAG = False.
ESTIMATE_COEFFICIENTS_FROM_LINEAR_MODEL_FLAG = True
[docs] def __init__(self, distribution, current_cycle, desired_units=None,
predictor_of_mean_flag="mean", tolerance=0.01,
max_iterations=1000):
"""
Create an ensemble calibration plugin that, for Nonhomogeneous Gaussian
Regression, calculates coefficients based on historical forecasts and
applies the coefficients to the current forecast.
Args:
distribution (str):
Name of distribution. Assume that the current forecast can be
represented using this distribution.
current_cycle (str):
The current cycle in YYYYMMDDTHHMMZ format e.g. 20171122T0100Z.
This is used to create a forecast_reference_time coordinate
on the resulting EMOS coefficients cube.
desired_units (str or cf_units.Unit):
The unit that you would like the calibration to be undertaken
in. The current forecast, historical forecast and truth will be
converted as required.
predictor_of_mean_flag (str):
String to specify the input to calculate the calibrated mean.
Currently the ensemble mean ("mean") and the ensemble
realizations ("realizations") are supported as the predictors.
tolerance (float):
The tolerance for the Continuous Ranked Probability
Score (CRPS) calculated by the minimisation. The CRPS is in
the units of the variable being calibrated. The tolerance is
therefore representative of how close to the actual value are
we aiming to forecast for a particular variable. Once multiple
iterations result in a CRPS equal to the same value within the
specified tolerance, the minimisation will terminate.
max_iterations (int):
The maximum number of iterations allowed until the
minimisation has converged to a stable solution. If the
maximum number of iterations is reached, but the minimisation
has not yet converged to a stable solution, then the available
solution is used anyway, and a warning is raised. If the
predictor_of_mean is "realizations", then the number of
iterations may require increasing, as there will be
more coefficients to solve for.
Raises:
ValueError: If the given distribution is not valid.
Warns:
ImportWarning: If the statsmodels module can't be imported.
"""
valid_distributions = (ContinuousRankedProbabilityScoreMinimisers().
minimisation_dict.keys())
if distribution not in valid_distributions:
msg = ("Given distribution {} not available. Available "
"distributions are {}".format(
distribution, valid_distributions))
raise ValueError(msg)
self.distribution = distribution
self.current_cycle = current_cycle
self.desired_units = desired_units
# Ensure predictor_of_mean_flag is valid.
check_predictor_of_mean_flag(predictor_of_mean_flag)
self.predictor_of_mean_flag = predictor_of_mean_flag
self.tolerance = tolerance
self.max_iterations = max_iterations
self.minimiser = ContinuousRankedProbabilityScoreMinimisers(
tolerance=self.tolerance, max_iterations=self.max_iterations)
# Setting default values for coeff_names. Beta is the final
# coefficient name in the list, as there can potentially be
# multiple beta coefficients if the ensemble realizations, rather
# than the ensemble mean, are provided as the predictor.
self.coeff_names = ["gamma", "delta", "alpha", "beta"]
import imp
try:
imp.find_module('statsmodels')
except ImportError:
statsmodels_found = False
if predictor_of_mean_flag.lower() == "realizations":
msg = (
"The statsmodels can not be imported. "
"Will not be able to calculate an initial guess from "
"the individual ensemble realizations. "
"A default initial guess will be used without "
"estimating coefficients from a linear model.")
warnings.warn(msg, ImportWarning)
else:
statsmodels_found = True
import statsmodels.api as sm
self.sm = sm
self.statsmodels_found = statsmodels_found
def __repr__(self):
"""Represent the configured plugin instance as a string."""
result = ('<EstimateCoefficientsForEnsembleCalibration: '
'distribution: {}; '
'current_cycle: {}; '
'desired_units: {}; '
'predictor_of_mean_flag: {}; '
'minimiser: {}; '
'coeff_names: {}; '
'tolerance: {}; '
'max_iterations: {}>')
return result.format(
self.distribution, self.current_cycle, self.desired_units,
self.predictor_of_mean_flag, self.minimiser.__class__,
self.coeff_names, self.tolerance, self.max_iterations)
[docs] def create_coefficients_cube(
self, optimised_coeffs, historic_forecast):
"""Create a cube for storing the coefficients computed using EMOS.
.. See the documentation for examples of these cubes.
.. include:: extended_documentation/ensemble_calibration/
ensemble_calibration/create_coefficients_cube.rst
Args:
optimised_coeffs (list):
List of optimised coefficients.
Order of coefficients is [gamma, delta, alpha, beta].
historic_forecast (iris.cube.Cube):
The cube containing the historic forecast.
Returns:
iris.cube.Cube:
Cube constructed using the coefficients provided and using
metadata from the historic_forecast cube. The cube contains
a coefficient_index dimension coordinate where the points
of the coordinate are integer values and a
coefficient_name auxiliary coordinate where the points of
the coordinate are e.g. gamma, delta, alpha, beta.
Raises:
ValueError: If the number of coefficients in the optimised_coeffs
does not match the expected number.
"""
if self.predictor_of_mean_flag.lower() == "realizations":
realization_coeffs = []
for realization in historic_forecast.coord("realization").points:
realization_coeffs.append(
"{}{}".format(self.coeff_names[-1], np.int32(realization)))
coeff_names = self.coeff_names[:-1] + realization_coeffs
else:
coeff_names = self.coeff_names
if len(optimised_coeffs) != len(coeff_names):
msg = ("The number of coefficients in {} must equal the "
"number of coefficient names {}.".format(
optimised_coeffs, coeff_names))
raise ValueError(msg)
coefficient_index = iris.coords.DimCoord(
np.arange(len(optimised_coeffs), dtype=np.int32),
long_name="coefficient_index", units="1")
coefficient_name = iris.coords.AuxCoord(
coeff_names, long_name="coefficient_name", units="no_unit")
dim_coords_and_dims = [(coefficient_index, 0)]
aux_coords_and_dims = [(coefficient_name, 0)]
# Create a forecast_reference_time coordinate.
frt_point = cycletime_to_datetime(self.current_cycle)
try:
frt_coord = (
historic_forecast.coord("forecast_reference_time").copy(
datetime_to_iris_time(frt_point)))
except CoordinateNotFoundError:
pass
else:
aux_coords_and_dims.append((frt_coord, None))
# Create forecast period and time coordinates.
try:
fp_point = (
np.unique(historic_forecast.coord("forecast_period").points))
fp_coord = (
historic_forecast.coord("forecast_period").copy(fp_point))
except CoordinateNotFoundError:
pass
else:
aux_coords_and_dims.append((fp_coord, None))
if historic_forecast.coords("time"):
# Ensure that the fp_point is determined with units of seconds.
copy_of_fp_coord = (
historic_forecast.coord("forecast_period").copy())
copy_of_fp_coord.convert_units("seconds")
fp_point, = np.unique(copy_of_fp_coord.points)
time_point = (
frt_point + datetime.timedelta(seconds=float(fp_point)))
time_point = datetime_to_iris_time(time_point)
time_coord = historic_forecast.coord("time").copy(time_point)
aux_coords_and_dims.append((time_coord, None))
# Create x and y coordinates
for axis in ["x", "y"]:
historic_coord_points = historic_forecast.coord(axis=axis).points
coord_point = np.median(historic_coord_points)
coord_bounds = [historic_coord_points[0],
historic_coord_points[-1]]
new_coord = historic_forecast.coord(axis=axis).copy(
points=coord_point, bounds=coord_bounds)
aux_coords_and_dims.append((new_coord, None))
attributes = {"diagnostic_standard_name": historic_forecast.name()}
for attribute in historic_forecast.attributes.keys():
if attribute.endswith("model_configuration"):
attributes[attribute] = (
historic_forecast.attributes[attribute])
cube = iris.cube.Cube(
optimised_coeffs, long_name="emos_coefficients", units="1",
dim_coords_and_dims=dim_coords_and_dims,
aux_coords_and_dims=aux_coords_and_dims, attributes=attributes)
return cube
[docs] def compute_initial_guess(
self, truth, forecast_predictor, predictor_of_mean_flag,
estimate_coefficients_from_linear_model_flag,
no_of_realizations=None):
"""
Function to compute initial guess of the alpha, beta, gamma
and delta components of the EMOS coefficients by linear regression
of the forecast predictor and the truth, if requested. Otherwise,
default values for the coefficients will be used.
If the predictor_of_mean_flag is "mean", then the order of
the initial_guess is [gamma, delta, alpha, beta]. Otherwise, if the
predictor_of_mean_flag is "realizations" then the order of the
initial_guess is [gamma, delta, alpha, beta0, beta1, beta2], where
the number of beta variables will correspond to the number of
realizations. In this example initial guess with three beta
variables, there will correspondingly be three realizations.
The coefficients relate to adjustments to the ensemble mean or the
ensemble realizations, and adjustments to the ensemble variance:
.. math::
alpha + beta * ensemble\\_mean
or
.. math::
alpha + beta0 * realization0 + beta1 * realization1
.. math::
gamma + delta * ensemble\\_variance
The default values for the initial guesses are in
[gamma, delta, alpha, beta] ordering:
* For the ensemble mean, the default initial guess: [0, 1, 0, 1]
assumes that the raw forecast is skilful and the expected adjustments
are small.
* For the ensemble realizations, the default initial guess is
effectively: [0, 1, 0, 1/3., 1/3., 1/3.], such that
each realization is assumed to have equal weight.
If linear regression is enabled, the alpha and beta coefficients
associated with the ensemble mean or ensemble realizations are
modified based on the results from the linear regression fit.
Args:
truth (iris.cube.Cube):
Cube containing the field, which will be used as truth.
forecast_predictor (iris.cube.Cube):
Cube containing the fields to be used as the predictor,
either the ensemble mean or the ensemble realizations.
predictor_of_mean_flag (str):
String to specify the input to calculate the calibrated mean.
Currently the ensemble mean ("mean") and the ensemble
realizations ("realizations") are supported as the predictors.
estimate_coefficients_from_linear_model_flag (bool):
Flag whether coefficients should be estimated from
the linear regression, or static estimates should be used.
no_of_realizations (int):
Number of realizations, if ensemble realizations are to be
used as predictors. Default is None.
Returns:
list of float:
List of coefficients to be used as initial guess.
Order of coefficients is [gamma, delta, alpha, beta].
"""
if (predictor_of_mean_flag.lower() == "mean" and
not estimate_coefficients_from_linear_model_flag):
initial_guess = [0, 1, 0, 1]
elif (predictor_of_mean_flag.lower() == "realizations" and
not estimate_coefficients_from_linear_model_flag):
initial_guess = [0, 1, 0] + np.repeat(
np.sqrt(1. / no_of_realizations), no_of_realizations).tolist()
elif estimate_coefficients_from_linear_model_flag:
truth_flattened = flatten_ignoring_masked_data(truth.data)
if predictor_of_mean_flag.lower() == "mean":
forecast_predictor_flattened = flatten_ignoring_masked_data(
forecast_predictor.data)
if (truth_flattened.size == 0) or (
forecast_predictor_flattened.size == 0):
gradient, intercept = ([np.nan, np.nan])
else:
gradient, intercept, _, _, _ = (
stats.linregress(
forecast_predictor_flattened, truth_flattened))
initial_guess = [0, 1, intercept, gradient]
elif predictor_of_mean_flag.lower() == "realizations":
if self.statsmodels_found:
enforce_coordinate_ordering(
forecast_predictor, "realization")
forecast_predictor_flattened = (
flatten_ignoring_masked_data(
forecast_predictor.data,
preserve_leading_dimension=True))
val = self.sm.add_constant(forecast_predictor_flattened.T)
est = self.sm.OLS(truth_flattened, val).fit()
intercept = est.params[0]
gradient = est.params[1:]
initial_guess = [0, 1, intercept]+gradient.tolist()
else:
initial_guess = (
[0, 1, 0] +
np.repeat(np.sqrt(1./no_of_realizations),
no_of_realizations).tolist())
return np.array(initial_guess, dtype=np.float32)
[docs] @staticmethod
def _filter_non_matching_cubes(historic_forecast, truth):
"""
Provide filtering for the historic forecast and truth to make sure
that these contain matching validity times. This ensures that any
mismatch between the historic forecasts and truth is dealt with.
Args:
historic_forecast (iris.cube.Cube):
Cube of historic forecasts that potentially contains
a mismatch compared to the truth.
truth (iris.cube.Cube):
Cube of truth that potentially contains a mismatch
compared to the historic forecasts.
Returns:
(tuple): tuple containing:
**matching_historic_forecasts** (iris.cube.Cube):
Cube of historic forecasts where any mismatches with
the truth cube have been removed.
**matching_truths** (iris.cube.Cube):
Cube of truths where any mismatches with
the historic_forecasts cube have been removed.
Raises:
ValueError: The filtering has found no matches in validity time
between the historic forecasts and the truths.
"""
matching_historic_forecasts = iris.cube.CubeList([])
matching_truths = iris.cube.CubeList([])
for hf_slice in historic_forecast.slices_over("time"):
if hf_slice.coord("time").has_bounds():
point = iris_time_to_datetime(hf_slice.coord("time"),
point_or_bound="point")
bounds, = iris_time_to_datetime(
hf_slice.coord("time"), point_or_bound="bound")
coord_values = (
{"time": lambda cell: point[0] == cell.point and
bounds[0] == cell.bound[0] and
bounds[1] == cell.bound[1]})
else:
coord_values = (
{"time": iris_time_to_datetime(
hf_slice.coord("time"), point_or_bound="point")})
constr = iris.Constraint(coord_values=coord_values)
truth_slice = truth.extract(constr)
if truth_slice:
matching_historic_forecasts.append(hf_slice)
matching_truths.append(truth_slice)
if not matching_historic_forecasts and not matching_truths:
msg = ("The filtering has found no matches in validity time "
"between the historic forecasts and the truths.")
raise ValueError(msg)
return (matching_historic_forecasts.merge_cube(),
matching_truths.merge_cube())
[docs] @staticmethod
def mask_cube(cube, landsea_mask):
"""
Mask the input cube using the given landsea_mask. Sea points are
filled with nans and masked.
Args:
cube (iris.cube.Cube):
A cube to be masked, on the same grid as the landsea_mask.
The last two dimensions on this cube must match the dimensions
in the landsea_mask cube.
landsea_mask(iris.cube.Cube):
A cube containing a land-sea mask. Within the
land-sea mask cube land points should be specified as ones,
and sea points as zeros.
Raises:
IndexError: if the cube and landsea_mask shapes are not compatible.
"""
try:
cube.data[..., ~landsea_mask.data.astype(np.bool)] = np.nan
except IndexError as err:
msg = (
"Cube and landsea_mask shapes are not compatible. {}".format(
err))
raise IndexError(msg)
else:
cube.data = np.ma.masked_invalid(cube.data)
[docs] def process(self, historic_forecast, truth, landsea_mask=None):
"""
Using Nonhomogeneous Gaussian Regression/Ensemble Model Output
Statistics, estimate the required coefficients from historical
forecasts.
The main contents of this method is:
1. Check that the predictor_of_mean_flag is valid.
2. Filter the historic forecasts and truth to ensure that these
inputs match in validity time.
3. Apply unit conversion to ensure that the historic forecasts and
truth have the desired units for calibration.
4. Calculate the variance of the historic forecasts. If the chosen
predictor is the mean, also calculate the mean of the historic
forecasts.
5. If a land-sea mask is provided then mask out sea points in the truth
and predictor from the historic forecasts.
6. Calculate initial guess at coefficient values by performing a
linear regression, if requested, otherwise default values are
used.
7. Perform minimisation.
Args:
historic_forecast (iris.cube.Cube):
The cube containing the historical forecasts used
for calibration.
truth (iris.cube.Cube):
The cube containing the truth used for calibration.
landsea_mask (iris.cube.Cube):
The optional cube containing a land-sea mask. If provided, only
land points are used to calculate the coefficients. Within the
land-sea mask cube land points should be specified as ones,
and sea points as zeros.
Returns:
iris.cube.Cube:
Cube containing the coefficients estimated using EMOS.
The cube contains a coefficient_index dimension coordinate
and a coefficient_name auxiliary coordinate.
Raises:
ValueError: If either the historic_forecast or truth cubes were not
passed in.
ValueError: If the units of the historic and truth cubes do not
match.
"""
if not (historic_forecast and truth):
raise ValueError("historic_forecast and truth cubes must be "
"provided.")
# Ensure predictor_of_mean_flag is valid.
check_predictor_of_mean_flag(self.predictor_of_mean_flag)
historic_forecast, truth = (
self._filter_non_matching_cubes(historic_forecast, truth))
# Make sure inputs have the same units.
if self.desired_units:
historic_forecast.convert_units(self.desired_units)
truth.convert_units(self.desired_units)
if historic_forecast.units != truth.units:
msg = ("The historic forecast units of {} do not match "
"the truth units {}. These units must match, so that "
"the coefficients can be estimated.")
raise ValueError(msg)
if self.predictor_of_mean_flag.lower() == "mean":
no_of_realizations = None
forecast_predictor = historic_forecast.collapsed(
"realization", iris.analysis.MEAN)
elif self.predictor_of_mean_flag.lower() == "realizations":
no_of_realizations = len(
historic_forecast.coord("realization").points)
forecast_predictor = historic_forecast
forecast_var = historic_forecast.collapsed(
"realization", iris.analysis.VARIANCE)
# If a landsea_mask is provided mask out the sea points
if landsea_mask:
self.mask_cube(forecast_predictor, landsea_mask)
self.mask_cube(forecast_var, landsea_mask)
self.mask_cube(truth, landsea_mask)
# Computing initial guess for EMOS coefficients
initial_guess = self.compute_initial_guess(
truth, forecast_predictor, self.predictor_of_mean_flag,
self.ESTIMATE_COEFFICIENTS_FROM_LINEAR_MODEL_FLAG,
no_of_realizations=no_of_realizations)
# Calculate coefficients if there are no nans in the initial guess.
if np.any(np.isnan(initial_guess)):
optimised_coeffs = initial_guess
else:
optimised_coeffs = (
self.minimiser.process(
initial_guess, forecast_predictor,
truth, forecast_var,
self.predictor_of_mean_flag,
self.distribution.lower()))
coefficients_cube = (
self.create_coefficients_cube(optimised_coeffs, historic_forecast))
return coefficients_cube
[docs]class ApplyCoefficientsFromEnsembleCalibration(BasePlugin):
"""
Class to apply the optimised EMOS coefficients to the current forecast.
"""
[docs] def __init__(self, predictor="mean"):
"""
Create a plugin that uses the coefficients created using EMOS from
historical forecasts and corresponding truths and applies these
coefficients to the current forecast to generate a location and scale
parameter that represents the calibrated distribution.
Args:
predictor (str):
String to specify the form of the predictor used to calculate
the location parameter when estimating the EMOS coefficients.
Currently the ensemble mean ("mean") and the ensemble
realizations ("realizations") are supported as the predictors.
"""
check_predictor_of_mean_flag(predictor)
self.predictor = predictor
def __repr__(self):
"""Represent the configured plugin instance as a string."""
result = ('<ApplyCoefficientsFromEnsembleCalibration: predictor: {}>')
return result.format(self.predictor)
[docs] @staticmethod
def _merge_calibrated_and_uncalibrated_regions(
original_data, calibrated_data, mask):
"""
If a mask has been provided to this plugin, this function acts to
combine calibrated data and uncalibrated data. Those regions where the
mask=0 will be populated with uncalibrated data. Those regions where
the mask=1 will retain calibrated data. The calibrated data cube will
be modified in situ.
Note that this can be achieved straightforwardly with fancy indexing
but there is a need to slice the data to avoid overflowing available
memory.
Args:
original_data (numpy.ndarray):
The uncalibrated predictor or variance that will populate
regions in which the mask=0.
calibrated_data (numpy.ndarray):
The calibrated predictor or variance data array that will be
modified in situ. Those regions of the array that correspond
with indices at which the mask=0 will be replaced with data
from the original_data array.
mask (numpy.ndarray):
A mask determining which regions should be returned with
calibrated data (1) and which regions should be returned with
uncalibrated data (0).
"""
mask = np.broadcast_to(mask, calibrated_data.shape)
all_indices = np.split(mask == 0, mask.shape[-1], axis=-1)
original_data = np.split(original_data, original_data.shape[-1],
axis=-1)
calibrated_data = np.split(calibrated_data, calibrated_data.shape[-1],
axis=-1)
iterator = zip(original_data, calibrated_data, all_indices)
for original, calibrated, indices in iterator:
calibrated[indices] = original[indices]
calibrated_data = np.squeeze(np.stack(calibrated_data, axis=-1))
[docs] def _spatial_domain_match(self):
"""
Check that the domain of the current forecast and coefficients cube
match.
Raises:
ValueError: If the domain information of the current_forecast and
coefficients_cube do not match.
"""
msg = ("The domain along the {} axis given by the current forecast {} "
"does not match the domain given by the coefficients cube {}.")
for axis in ["x", "y"]:
current_forecast_points = [
self.current_forecast.coord(axis=axis).points[0],
self.current_forecast.coord(axis=axis).points[-1]]
if not np.allclose(current_forecast_points,
self.coefficients_cube.coord(axis=axis).bounds):
raise ValueError(
msg.format(axis, current_forecast_points,
self.coefficients_cube.coord(axis=axis).bounds))
[docs] def _calculate_location_parameter_from_mean(self, optimised_coeffs):
"""
Function to calculate the location parameter when the ensemble mean at
each grid point is the predictor.
Please see the equations below:
.. math::
location\\_parameter = a + b \\times ensemble\\_mean
.. math::
a = \\alpha
.. math::
b = \\beta
Args:
optimised_coeffs (dict):
A dictionary containing the calibration coefficient names as
keys with their corresponding values.
Returns:
numpy.ndarray:
Location parameter calculated using the ensemble mean as the
predictor.
"""
forecast_predictor = self.current_forecast.collapsed(
"realization", iris.analysis.MEAN)
# Calculate location parameter = a + b*X, where X is the
# raw ensemble mean. In this case, b = beta.
location_parameter = (
optimised_coeffs["alpha"] +
optimised_coeffs["beta"] * forecast_predictor.data).astype(
np.float32)
return location_parameter
[docs] def _calculate_location_parameter_from_realizations(
self, optimised_coeffs):
"""
Function to calculate the location parameter when the ensemble
realizations are the predictor.
Please see the equations below:
.. math::
location\\_parameter = a + b_0 \\times realization_0 +
... + b_n \\times realization_n
.. math::
a = \\alpha
.. math::
b = \\beta^2
Args:
optimised_coeffs (dict):
A dictionary containing the calibration coefficient names as
keys with their corresponding values.
Returns:
numpy.ndarray:
Location parameter calculated using the ensemble realizations
as the predictor.
"""
forecast_predictor = self.current_forecast
# Calculate location parameter = a + b1*X1 .... + bn*Xn, where X is the
# ensemble realizations. The number of b and X terms depends upon the
# number of ensemble realizations. In this case, b = beta^2.
beta_values = np.array([], dtype=np.float32)
for key in optimised_coeffs.keys():
if key.startswith("beta"):
beta_values = np.append(beta_values, optimised_coeffs[key])
a_and_b = np.append(optimised_coeffs["alpha"], beta_values**2)
forecast_predictor_flat = (
convert_cube_data_to_2d(forecast_predictor))
xy_shape = next(forecast_predictor.slices_over("realization")).shape
col_of_ones = np.ones(np.prod(xy_shape), dtype=np.float32)
ones_and_predictor = (
np.column_stack((col_of_ones, forecast_predictor_flat)))
location_parameter = (
np.dot(ones_and_predictor, a_and_b).reshape(xy_shape).astype(
np.float32))
return location_parameter
[docs] def _calculate_scale_parameter(self, optimised_coeffs):
"""
Calculation of the scale parameter using the ensemble variance
adjusted using the gamma and delta coefficients calculated by EMOS.
This follows the equations below, where :math:`S^2` is the ensemble
variance and :math:`\\sigma^2` is the scale parameter:
.. math::
\\sigma^2 = c + d \\times S^2
.. math::
c = \\gamma^2
.. math::
d = \\delta^2
Args:
optimised_coeffs (dict):
A dictionary containing the calibration coefficient names as
keys with their corresponding values.
Returns:
numpy.ndarray:
Scale parameter for defining the distribution of the calibrated
forecast.
"""
forecast_var = self.current_forecast.collapsed(
"realization", iris.analysis.VARIANCE)
# Calculating the scale parameter, based on the raw variance S^2,
# where predicted variance = c + dS^2, where c = (gamma)^2 and
# d = (delta)^2
scale_parameter = (
optimised_coeffs["gamma"]**2 +
optimised_coeffs["delta"]**2 * forecast_var.data).astype(
np.float32)
return scale_parameter
[docs] def _create_output_cubes(
self, location_parameter, scale_parameter):
"""
Creation of output cubes containing the location and scale parameters.
Args:
location_parameter (numpy.ndarray):
Location parameter of the calibrated distribution.
scale_parameter (numpy.ndarray):
Scale parameter of the calibrated distribution.
Returns:
(tuple): tuple containing:
**location_parameter_cube** (iris.cube.Cube):
Location parameter of the calibrated distribution with
associated metadata.
**scale_parameter_cube** (iris.cube.Cube):
Scale parameter of the calibrated distribution with
associated metadata.
"""
template_cube = next(self.current_forecast.slices_over("realization"))
template_cube.remove_coord("realization")
location_parameter_cube = create_new_diagnostic_cube(
"location_parameter", template_cube.units, template_cube,
template_cube.attributes, data=location_parameter)
scale_parameter_cube = create_new_diagnostic_cube(
"scale_parameter", f"({template_cube.units})^2",
template_cube, template_cube.attributes, data=scale_parameter)
return location_parameter_cube, scale_parameter_cube
[docs] def process(self, current_forecast, coefficients_cube, landsea_mask=None):
"""
Apply the EMOS coefficients to the current forecast, in order to
generate location and scale parameters for creating the calibrated
distribution.
Args:
current_forecast (iris.cube.Cube):
The cube containing the current forecast.
coefficients_cube (iris.cube.Cube):
Cube containing the coefficients estimated using EMOS.
The cube contains a coefficient_index dimension coordinate
where the points of the coordinate are integer values and a
coefficient_name auxiliary coordinate where the points of
the coordinate are e.g. gamma, delta, alpha, beta.
landsea_mask (iris.cube.Cube or None):
The optional cube containing a land-sea mask. If provided, only
land points are calibrated using the provided coefficients.
Returns:
(tuple): tuple containing:
**location_parameter_cube** (iris.cube.Cube):
Cube containing the location parameter of the calibrated
distribution calculated using either the ensemble mean or
the ensemble realizations.
**scale_parameter_cube** (iris.cube.Cube):
Cube containing the scale parameter of the calibrated
distribution calculated using either the ensemble mean or
the ensemble realizations.
"""
self.current_forecast = current_forecast
self.coefficients_cube = coefficients_cube
# Check coefficients_cube and forecast cube are compatible.
time_coords_match(self.current_forecast, self.coefficients_cube)
self._spatial_domain_match()
optimised_coeffs = (
dict(zip(self.coefficients_cube.coord("coefficient_name").points,
self.coefficients_cube.data)))
if self.predictor.lower() == "mean":
location_parameter = (
self._calculate_location_parameter_from_mean(optimised_coeffs))
else:
location_parameter = (
self._calculate_location_parameter_from_realizations(
optimised_coeffs))
scale_parameter = self._calculate_scale_parameter(optimised_coeffs)
location_parameter_cube, scale_parameter_cube = (
self._create_output_cubes(location_parameter, scale_parameter))
# Use a mask to confine calibration to regions in which the mask=1.
if landsea_mask:
# Assume that the ensemble mean and the ensemble variance provide
# an estimate of the uncalibrated location and scale parameter.
# TODO: Improve the handling of the uncalibrated region, so that
# the ensemble mean and ensemble variance are not assumed to be
# the location and scale parameter, respectively.
uncalibrated_location_parameter = (
self.current_forecast.collapsed(
"realization", iris.analysis.MEAN))
uncalibrated_scale_parameter = (
self.current_forecast.collapsed(
"realization", iris.analysis.VARIANCE))
self._merge_calibrated_and_uncalibrated_regions(
uncalibrated_location_parameter.data,
location_parameter_cube.data,
landsea_mask.data)
self._merge_calibrated_and_uncalibrated_regions(
uncalibrated_scale_parameter.data,
scale_parameter_cube.data,
landsea_mask.data)
return location_parameter_cube, scale_parameter_cube