Source code for improver.nbhood.recursive_filter

# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# (C) British Crown Copyright 2017-2019 Met Office.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
#   list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""Module to apply a recursive filter to neighbourhooded data."""
import warnings

import iris
import numpy as np

from improver import BasePlugin
from improver.nbhood.square_kernel import SquareNeighbourhood
from improver.utilities.cube_checker import check_cube_coordinates
from improver.utilities.pad_spatial import (
    pad_cube_with_halo, remove_halo_from_cube)


[docs]class RecursiveFilter(BasePlugin): """ Apply a recursive filter to the input cube. """
[docs] def __init__(self, alpha_x=None, alpha_y=None, iterations=None, edge_width=1, re_mask=False): """ Initialise the class. The alpha values determine how much "value" of a cell undergoing filtering is comprised of the current value at that cell and how much comes from the adjacent cell preceding it in the direction in which filtering is being applied. Args: alpha_x (float or None): Filter parameter: A constant used to weight the recursive filter along the x-axis. Defined such that 0 < alpha_x < 1.0 alpha_y (float or None): Filter parameter: A constant used to weight the recursive filter along the y-axis. Defined such that 0 < alpha_y < 1.0 iterations (int or None): The number of iterations of the recursive filter. edge_width (int): Half the width of the padding halo applied before recursive filtering. re_mask (bool): If re_mask is True, the original un-recursively filtered mask is applied to mask out the recursively filtered cube. If re_mask is False, the original un-recursively filtered mask is not applied. Therefore, the recursive filtering may result in values being present in areas that were originally masked. Raises: ValueError: If alpha_x is not set such that 0 < alpha_x <= 0.5 ValueError: If alpha_y is not set such that 0 < alpha_y <= 0.5 ValueError: If number of iterations is not None and is set such that iterations is less than 1. Warns: UserWarning: If iterations is higher than 2. """ alpha_error = ("alpha must be less than 0.5. A large alpha value" "leads to poor conservation of probabilities: ") for k, alpha in {'x': alpha_x, 'y': alpha_y}.items(): if alpha is not None and not 0 < alpha <= 0.5: message = alpha_error if alpha > 0.5 else '' message += "Invalid alpha_{}: must be > 0 and <= 0.5: {}" raise ValueError(message.format(k, alpha)) if iterations is not None: if iterations < 1: raise ValueError( "Invalid number of iterations: must be >= 1: {}".format( iterations)) if iterations > 2: warnings.warn( "More than two iterations degrades the conservation" "of probability assumption.") self.alpha_x = alpha_x self.alpha_y = alpha_y self.iterations = iterations self.edge_width = edge_width self.re_mask = re_mask
def __repr__(self): """Represent the configured plugin instance as a string.""" result = ('<RecursiveFilter: alpha_x: {}, alpha_y: {}, iterations: {},' ' edge_width: {}') return result.format(self.alpha_x, self.alpha_y, self.iterations, self.edge_width)
[docs] @staticmethod def _recurse_forward(grid, alphas, axis): """ Method to run the recursive filter in the forward direction. In the forward direction: Recursive filtering is calculated as: Bi = ((1-alpha) * Ai) + (alpha * Bi-1) Progressing from gridpoint i-1 to i: Bi = new value at gridpoint i, Ai = Old value at gridpoint i Bi-1 = New value at gridpoint i-1 Args: grid (numpy.ndarray): 2D array containing the input data to which the recursive filter will be applied. alphas (numpy.ndarray): Matching 2D array of alpha values that will be used when applying the recursive filter along the specified axis. axis (int): Index of the spatial axis (0 or 1) over which to recurse. Returns: numpy.ndarray: 2D array containing the smoothed field after the recursive filter method has been applied to the input array in the forward direction along the specified axis. """ lim = grid.shape[axis] for i in range(1, lim): if axis == 0: grid[i, :] = ((1. - alphas[i, :]) * grid[i, :] + alphas[i, :] * grid[i-1, :]) if axis == 1: grid[:, i] = ((1. - alphas[:, i]) * grid[:, i] + alphas[:, i] * grid[:, i-1]) return grid
[docs] @staticmethod def _recurse_backward(grid, alphas, axis): """ Method to run the recursive filter in the backwards direction. In the backwards direction: Recursive filtering is calculated as: Bi = ((1-alpha) * Ai) + (alpha * Bi+1) Progressing from gridpoint i+1 to i:. Bi = new value at gridpoint i, Ai = Old value at gridpoint i Bi+1 = New value at gridpoint i+1 Args: grid (numpy.ndarray): 2D array containing the input data to which the recursive filter will be applied. alphas (numpy.ndarray): Matching 2D array of alpha values that will be used when applying the recursive filter along the specified axis. axis (int): Index of the spatial axis (0 or 1) over which to recurse. Returns: numpy.ndarray: 2D array containing the smoothed field after the recursive filter method has been applied to the input array in the backwards direction along the specified axis. """ lim = grid.shape[axis] for i in range(lim-2, -1, -1): if axis == 0: grid[i, :] = ((1. - alphas[i, :]) * grid[i, :] + alphas[i, :] * grid[i+1, :]) if axis == 1: grid[:, i] = ((1. - alphas[:, i]) * grid[:, i] + alphas[:, i] * grid[:, i+1]) return grid
[docs] @staticmethod def _run_recursion(cube, alphas_x, alphas_y, iterations): """ Method to run the recursive filter. Args: cube (iris.cube.Cube): 2D cube containing the input data to which the recursive filter will be applied. alphas_x (iris.cube.Cube): 2D cube containing array of alpha values that will be used when applying the recursive filter along the x-axis. alphas_y (iris.cube.Cube): 2D cube containing array of alpha values that will be used when applying the recursive filter along the y-axis. iterations (int): The number of iterations of the recursive filter Returns: iris.cube.Cube: Cube containing the smoothed field after the recursive filter method has been applied to the input cube. """ x_index, = cube.coord_dims(cube.coord(axis="x").name()) y_index, = cube.coord_dims(cube.coord(axis="y").name()) output = cube.data for _ in range(iterations): output = RecursiveFilter._recurse_forward(output, alphas_x.data, x_index) output = RecursiveFilter._recurse_backward(output, alphas_x.data, x_index) output = RecursiveFilter._recurse_forward(output, alphas_y.data, y_index) output = RecursiveFilter._recurse_backward(output, alphas_y.data, y_index) cube.data = output return cube
[docs] def _set_alphas(self, cube, alpha, alphas_cube): """ Set up the alpha parameter. Args: cube (iris.cube.Cube): 2D cube containing the input data to which the recursive filter will be applied. alpha (float): The constant used to weight the recursive filter in that direction: Defined such that 0.0 < alpha < 1.0 alphas_cube (iris.cube.Cube or None): Cube containing array of alpha values that will be used when applying the recursive filter in a specific direction. Raises: ValueError: If both alphas_cube and alpha are provided. ValueError: If alpha and alphas_cube are both set to None ValueError: If dimension of alphas array is less than dimension of data array ValueError: If dimension of alphas array is greater than dimension of data array Returns: iris.cube.Cube: Cube containing a padded array of alpha values for the specified direction. """ if alpha is not None and alphas_cube is not None: emsg = ("A cube of alpha values and a single float value for alpha" " have both been provided. Only one of these options can" " be set.") raise ValueError(emsg) if alphas_cube is None: if alpha is None: emsg = ("A value for alpha must be set if alphas_cube is " "set to None: alpha is currently set as: {}") raise ValueError(emsg.format(alpha)) alphas_cube = cube.copy( data=np.ones(cube.data.shape) * alpha) if alphas_cube is not None: if alphas_cube.data.shape != cube.data.shape: emsg = ("Dimensions of alphas array do not match dimensions " "of data array: {} < {}") raise ValueError(emsg.format(alphas_cube.data.shape, cube.data.shape)) alphas_cube = pad_cube_with_halo( alphas_cube, 2*self.edge_width, 2*self.edge_width) return alphas_cube
[docs] def process(self, cube, alphas_x=None, alphas_y=None, mask_cube=None): """ Set up the alpha parameters and run the recursive filter. The steps undertaken are: 1. Split the input cube into slices determined by the co-ordinates in the x and y directions. 2. Construct an array of filter parameters (alphas_x and alphas_y) for each cube slice that are used to weight the recursive filter in the x- and y-directions. 3. Pad each cube slice with a square-neighbourhood halo and apply the recursive filter for the required number of iterations. 4. Remove the halo from the cube slice and append the recursed cube slice to a 'recursed cube'. 5. Merge all the cube slices in the 'recursed cube' into a 'new cube'. 6. Modify the 'new cube' so that its scalar dimension co-ordinates are consistent with those in the original input cube. 7. Return the 'new cube' which now contains the recursively filtered values for the original input cube. Args: cube (iris.cube.Cube): Cube containing the input data to which the recursive filter will be applied. alphas_x (iris.cube.Cube or None): Cube containing array of alpha values that will be used when applying the recursive filter along the x-axis. alphas_y (iris.cube.Cube or None): Cube containing array of alpha values that will be used when applying the recursive filter along the y-axis. mask_cube (iris.cube.Cube or None): Cube containing an external mask to apply to the cube before applying the recursive filter. Returns: iris.cube.Cube: Cube containing the smoothed field after the recursive filter method has been applied. Raises: ValueError: If any alpha cube value is over 0.5 """ for alpha in (alphas_x, alphas_y): if alpha is not None and (alpha.data > 0.5).any(): raise ValueError( "All alpha values must be less than 0.5. A large alpha" "value leads to poor conservation of probabilities") cube_format = next(cube.slices([cube.coord(axis='y'), cube.coord(axis='x')])) alphas_x = self._set_alphas(cube_format, self.alpha_x, alphas_x) alphas_y = self._set_alphas(cube_format, self.alpha_y, alphas_y) recursed_cube = iris.cube.CubeList() for output in cube.slices([cube.coord(axis='y'), cube.coord(axis='x')]): # Setup cube and mask for processing. # This should set up a mask full of 1.0 if None is provided # and set the data 0.0 where mask is 0.0 or the data is NaN output, mask, nan_array = ( SquareNeighbourhood().set_up_cubes_to_be_neighbourhooded( output, mask_cube)) mask = mask.data.squeeze() padded_cube = pad_cube_with_halo( output, 2*self.edge_width, 2*self.edge_width) new_cube = self._run_recursion(padded_cube, alphas_x, alphas_y, self.iterations) new_cube = remove_halo_from_cube( new_cube, 2*self.edge_width, 2*self.edge_width) if self.re_mask: new_cube.data[nan_array.astype(bool)] = np.nan new_cube.data = np.ma.masked_array(new_cube.data, mask=np.logical_not(mask)) recursed_cube.append(new_cube) new_cube = recursed_cube.merge_cube() new_cube = check_cube_coordinates(cube, new_cube) return new_cube