# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# (C) British Crown Copyright 2017-2019 Met Office.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
""" Provides support utilities."""
import copy
import cartopy.crs as ccrs
import iris
import numpy as np
import scipy.ndimage
from iris.coords import CellMethod
from iris.cube import Cube, CubeList
from improver import BasePlugin
from improver.utilities.cube_checker import check_cube_coordinates
# Maximum radius of the neighbourhood width in grid cells.
MAX_DISTANCE_IN_GRID_CELLS = 500
[docs]def check_if_grid_is_equal_area(cube, require_equal_xy_spacing=True):
"""
Identify whether the grid is an equal area grid, by checking whether points
are equally spaced along each of the x- and y-axes. By default this
function also checks whether the grid spacing is the same in both spatial
dimensions.
Args:
cube (iris.cube.Cube):
Cube with coordinates that will be checked.
require_equal_spacing (bool):
Flag to require the grid is equally spaced in the two spatial
dimensions (not strictly required for equal-area criterion).
Raises:
ValueError: If coordinate points are not equally spaced along either
axis (from calculate_grid_spacing)
ValueError: If point spacing is not equal for the two spatial axes
"""
xdiff = calculate_grid_spacing(cube, 'metres', axis='x')
ydiff = calculate_grid_spacing(cube, 'metres', axis='y')
if require_equal_xy_spacing and not np.isclose(xdiff, ydiff):
raise ValueError(
"Grid does not have equal spacing in x and y dimensions")
[docs]def calculate_grid_spacing(cube, units, axis='x'):
"""
Returns the grid spacing of a given spatial axis
Args:
cube (iris.cube.Cube):
Cube of data on equal area grid
units (str or cf_units.Unit):
Unit in which the grid spacing is required
axis (str):
Axis ('x' or 'y') to use in determining grid spacing
Returns:
float:
Grid spacing in required unit
Raises:
ValueError: If points are not equally spaced
"""
coord = cube.coord(axis=axis).copy()
coord.convert_units(units)
diffs = np.unique(np.diff(coord.points))
if len(diffs) > 1:
raise ValueError(
'Coordinate {} points are not equally spaced'.format(coord.name()))
return diffs[0]
[docs]def convert_distance_into_number_of_grid_cells(
cube, distance, axis='x', max_distance_in_grid_cells=None,
int_grid_cells=True):
"""
Return the number of grid cells in the x and y direction based on the
input distance in metres. Requires an equal-area grid on which the spacing
is equal in the x- and y- directions.
Args:
cube (iris.cube.Cube):
Cube containing the x and y coordinates, which will be used for
calculating the number of grid cells in the x and y direction,
which equates to the requested distance in the x and y direction.
distance (float):
Distance in metres. Must be positive.
axis (str):
Axis ('x' or 'y') to use in determining grid spacing
max_distance_in_grid_cells (int or None):
Maximum distance in grid cells. Defaults to None, which bypasses
the check.
int_grid_cells (bool):
If true only integer number of grid_cells are returned, rounded
down. If false the number of grid_cells returned will be a float.
Returns:
int or float:
Number of grid cells along the specified (x or y) axis equal to the
requested distance in metres.
Raises:
ValueError: If a negative distance is provided
ValueError: If the projection is not equal-area
ValueError:
If the distance in grid cells is larger than the maximum dimension
of the rectangular domain (measured across the diagonal). Needed
for neighbourhood processing.
ValueError: If the distance in grid cells is zero.
Value Error:
If max_distance_in_grid_cells is set and the distance in grid cells
exceeds this value. Needed for neighbourhood processing.
"""
d_error = "Distance of {}m".format(distance)
zero_distance_error = ("{} gives zero cell extent".format(d_error))
if distance == 0:
raise ValueError(zero_distance_error)
if distance < 0:
raise ValueError("Please specify a positive distance in metres")
# calculate grid spacing along chosen axis
grid_spacing_metres = calculate_grid_spacing(cube, 'metres', axis=axis)
# check required distance isn't greater than the size of the domain
# (note: this implicitly assumes equal x- and y-spacing)
def calculate_domain_extent(coord):
"""Calculates the coordinate extent in metres"""
new_coord = coord.copy()
new_coord.convert_units('metres')
return max(new_coord.points) - min(new_coord.points)
x_extent_metres = calculate_domain_extent(cube.coord(axis='x'))
y_extent_metres = calculate_domain_extent(cube.coord(axis='y'))
max_distance_of_domain = np.sqrt(x_extent_metres**2 + y_extent_metres**2)
if distance > max_distance_of_domain:
raise ValueError(
"{} exceeds max domain distance of {}m".format(
d_error, max_distance_of_domain))
# calculate distance in grid squares
grid_cells = distance / abs(grid_spacing_metres)
if int_grid_cells:
grid_cells = int(grid_cells)
if grid_cells == 0:
raise ValueError(zero_distance_error)
if max_distance_in_grid_cells is not None:
if grid_cells > max_distance_in_grid_cells:
raise ValueError(
"{} exceeds maximum permitted grid cell extent".format(
d_error))
return grid_cells
[docs]def convert_number_of_grid_cells_into_distance(cube, grid_points):
"""
Calculate distance in metres equal to the given number of gridpoints
based on the coordinates on an input cube.
Args:
cube (iris.cube.Cube):
Cube for which the distance is to be calculated.
grid_points (int):
Number of grid points to convert.
Returns:
float:
The radius in metres.
"""
check_if_grid_is_equal_area(cube, require_equal_xy_spacing=True)
spacing = calculate_grid_spacing(cube, 'metres')
radius_in_metres = spacing * grid_points
return radius_in_metres
[docs]class DifferenceBetweenAdjacentGridSquares(BasePlugin):
"""
Calculate the difference between adjacent grid squares within
a cube. The difference is calculated along the x and y axis
individually.
"""
[docs] def __init__(self, gradient=False):
"""
Initialise class.
"""
self.is_gradient = gradient
[docs] def create_difference_cube(self, cube, coord_name, diff_along_axis):
"""
Put the difference array into a cube with the appropriate
metadata.
Args:
cube (iris.cube.Cube):
Cube from which the differences have been calculated.
coord_name (str):
The name of the coordinate over which the difference
have been calculated.
diff_along_axis (numpy.ndarray):
Array containing the differences.
Returns:
iris.cube.Cube:
Cube containing the differences calculated along the
specified axis.
"""
points = cube.coord(coord_name).points
mean_points = (points[1:] + points[:-1]) / 2
# Copy cube metadata and coordinates into a new cube.
# Create a new coordinate for the coordinate along which the
# difference has been calculated.
metadata_dict = copy.deepcopy(cube.metadata._asdict())
diff_cube = Cube(diff_along_axis, **metadata_dict)
for coord in cube.dim_coords:
dims = cube.coord_dims(coord)
if coord.name() in [coord_name]:
coord = coord.copy(points=mean_points)
diff_cube.add_dim_coord(coord.copy(), dims)
for coord in cube.aux_coords:
dims = cube.coord_dims(coord)
diff_cube.add_aux_coord(coord.copy(), dims)
for coord in cube.derived_coords:
dims = cube.coord_dims(coord)
diff_cube.add_aux_coord(coord.copy(), dims)
# Add metadata to indicate that a difference has been calculated.
# TODO: update metadata for difference and add metadata for gradient
# when proper conventions have been agreed upon.
if not self.is_gradient:
cell_method = CellMethod("difference", coords=[coord_name],
intervals='1 grid length')
diff_cube.add_cell_method(cell_method)
diff_cube.attributes["form_of_difference"] = (
"forward_difference")
diff_cube.rename('difference_of_' + cube.name())
return diff_cube
[docs] def calculate_difference(self, cube, coord_axis):
"""
Calculate the difference along the axis specified by the
coordinate.
Args:
cube (iris.cube.Cube):
Cube from which the differences will be calculated.
coord_axis (str):
Short-hand reference for the x or y coordinate, as allowed by
iris.util.guess_coord_axis.
Returns:
iris.cube.Cube:
Cube after the differences have been calculated along the
specified axis.
"""
coord_name = cube.coord(axis=coord_axis).name()
diff_axis = cube.coord_dims(coord_name)[0]
diff_along_axis = np.diff(cube.data, axis=diff_axis)
diff_cube = self.create_difference_cube(
cube, coord_name, diff_along_axis)
return diff_cube
[docs] @staticmethod
def gradient_from_diff(diff_cube, ref_cube, coord_axis):
"""
Calculate the gradient along the x or y axis from differences between
adjacent grid squares.
Args:
diff_cube (iris.cube.Cube):
Cube containing differences along the x or y axis
ref_cube (iris.cube.Cube):
Cube with correct output dimensions
coord_axis (str):
Short-hand reference for the x or y coordinate, as allowed by
iris.util.guess_coord_axis.
Returns:
iris.cube.Cube:
A cube of the gradients in the coordinate direction specified.
"""
grid_spacing = np.diff(diff_cube.coord(axis=coord_axis).points)[0]
gradient = diff_cube.copy(data=(diff_cube.data) / grid_spacing)
gradient = gradient.regrid(ref_cube, iris.analysis.Linear())
gradient.rename(diff_cube.name().replace('difference_', 'gradient_'))
return gradient
[docs] def process(self, cube):
"""
Calculate the difference along the x and y axes and return
the result in separate cubes. The difference along each axis is
calculated using numpy.diff.
Args:
cube (iris.cube.Cube):
Cube from which the differences will be calculated.
Returns:
(tuple): tuple containing:
**diff_along_y_cube** (iris.cube.Cube):
Cube after the differences have been calculated along the
y axis.
**diff_along_x_cube** (iris.cube.Cube):
Cube after the differences have been calculated along the
x axis.
"""
diff_along_y_cube = self.calculate_difference(cube, "y")
diff_along_x_cube = self.calculate_difference(cube, "x")
if self.is_gradient:
diff_along_y_cube = self.gradient_from_diff(diff_along_y_cube,
cube, "y")
diff_along_x_cube = self.gradient_from_diff(diff_along_x_cube,
cube, "x")
return diff_along_x_cube, diff_along_y_cube
[docs]class OccurrenceWithinVicinity:
"""Calculate whether a phenomenon occurs within the specified distance."""
[docs] def __init__(self, distance):
"""
Initialise the class.
Args:
distance (float):
Distance in metres used to define the vicinity within which to
search for an occurrence.
"""
self.distance = distance
def __repr__(self):
"""Represent the configured plugin instance as a string."""
result = ('<OccurrenceWithinVicinity: distance: {}>')
return result.format(self.distance)
[docs] def maximum_within_vicinity(self, cube):
"""
Find grid points where a phenomenon occurs within a defined distance.
The occurrences within this vicinity are maximised, such that all
grid points within the vicinity are recorded as having an occurrence.
For non-binary fields, if the vicinity of two occurrences overlap,
the maximum value within the vicinity is chosen.
Args:
cube (iris.cube.Cube):
Thresholded cube.
Returns:
iris.cube.Cube:
Cube where the occurrences have been spatially spread, so that
they're equally likely to have occurred anywhere within the
vicinity defined using the specified distance.
"""
grid_spacing = (
convert_distance_into_number_of_grid_cells(
cube, self.distance,
max_distance_in_grid_cells=MAX_DISTANCE_IN_GRID_CELLS))
# Convert the number of grid points (i.e. grid_spacing) represented
# by self.distance, e.g. where grid_spacing=1 is an increment to
# a central point, into grid_cells which is the total number of points
# within the defined vicinity along the y axis e.g grid_cells=3.
grid_cells = (2 * grid_spacing) + 1
max_cube = cube.copy()
unmasked_cube_data = cube.data.copy()
if np.ma.is_masked(cube.data):
unmasked_cube_data = cube.data.data.copy()
unmasked_cube_data[cube.data.mask] = np.nan
# The following command finds the maximum value for each grid point
# from within a square of length "size"
max_data = (
scipy.ndimage.filters.maximum_filter(unmasked_cube_data,
size=grid_cells))
if np.ma.is_masked(cube.data):
# Update only the unmasked values
max_cube.data.data[~cube.data.mask] = max_data[~cube.data.mask]
else:
max_cube.data = max_data
return max_cube
[docs] def process(self, cube):
"""
Ensure that the cube passed to the maximum_within_vicinity method is
2d and subsequently merged back together.
Args:
cube (iris.cube.Cube):
Thresholded cube.
Returns:
Iris.cube.Cube
Cube containing the occurrences within a vicinity for each
xy 2d slice, which have been merged back together.
"""
max_cubes = CubeList([])
for cube_slice in cube.slices([cube.coord(axis='y'),
cube.coord(axis='x')]):
max_cubes.append(self.maximum_within_vicinity(cube_slice))
result_cube = max_cubes.merge_cube()
# Put dimensions back if they were there before.
result_cube = check_cube_coordinates(cube, result_cube)
return result_cube
[docs]def lat_lon_determine(cube):
"""
Test whether a diagnostic cube is on a latitude/longitude grid or uses an
alternative projection.
Args:
cube (iris.cube.Cube):
A diagnostic cube to examine for coordinate system.
Returns:
cartopy.crs.CRS or None:
Coordinate system of the diagnostic cube in a cartopy format unless
it is already a latitude/longitude grid, in which case None is
returned.
"""
trg_crs = None
if (not cube.coord(axis='x').name() == 'longitude' or
not cube.coord(axis='y').name() == 'latitude'):
trg_crs = cube.coord_system().as_cartopy_crs()
return trg_crs