import itertools
import numba
import numpy as np
import scipy.sparse
from ..compatibility import range, zip, zip_longest
from ..utils import isscalar, equivalent, _zero_of_dtype
[docs]def elemwise(func, *args, **kwargs):
"""
Apply a function to any number of arguments.
Parameters
----------
func : Callable
The function to apply. Must support broadcasting.
args : tuple, optional
The arguments to the function. Can be :obj:`SparseArray` objects
or :obj:`scipy.sparse.spmatrix` objects.
kwargs : dict, optional
Any additional arguments to pass to the function.
Returns
-------
COO
The result of applying the function.
Raises
------
ValueError
If the operation would result in a dense matrix, or if the operands
don't have broadcastable shapes.
See Also
--------
:obj:`numpy.ufunc` : A similar Numpy construct. Note that any :code:`ufunc` can be used
as the :code:`func` input to this function.
Notes
-----
Previously, operations with Numpy arrays were sometimes supported. Now,
it is necessary to convert Numpy arrays to :obj:`COO` objects.
"""
return _Elemwise(func, *args, **kwargs).get_result()
@numba.jit(nopython=True, nogil=True)
def _match_arrays(a, b): # pragma: no cover
"""
Finds all indexes into a and b such that a[i] = b[j]. The outputs are sorted
in lexographical order.
Parameters
----------
a, b : np.ndarray
The input 1-D arrays to match. If matching of multiple fields is
needed, use np.recarrays. These two arrays must be sorted.
Returns
-------
a_idx, b_idx : np.ndarray
The output indices of every possible pair of matching elements.
"""
if len(a) == 0 or len(b) == 0:
return np.empty(0, dtype=np.uintp), np.empty(0, dtype=np.uintp)
a_ind, b_ind = [], []
nb = len(b)
ib = 0
match = 0
for ia, j in enumerate(a):
if j == b[match]:
ib = match
while ib < nb and j >= b[ib]:
if j == b[ib]:
a_ind.append(ia)
b_ind.append(ib)
if b[match] < b[ib]:
match = ib
ib += 1
return np.array(a_ind, dtype=np.uintp), np.array(b_ind, dtype=np.uintp)
def _get_nary_broadcast_shape(*shapes):
"""
Broadcast any number of shapes to a result shape.
Parameters
----------
shapes : tuple[tuple[int]]
The shapes to broadcast.
Returns
-------
tuple[int]
The output shape.
Raises
------
ValueError
If the input shapes cannot be broadcast to a single shape.
"""
result_shape = ()
for shape in shapes:
try:
result_shape = _get_broadcast_shape(shape, result_shape)
except ValueError:
shapes_str = ', '.join(str(shape) for shape in shapes)
raise ValueError('operands could not be broadcast together with shapes %s'
% shapes_str)
return result_shape
def _get_broadcast_shape(shape1, shape2, is_result=False):
"""
Get the overall broadcasted shape.
Parameters
----------
shape1, shape2 : tuple[int]
The input shapes to broadcast together.
is_result : bool
Whether or not shape2 is also the result shape.
Returns
-------
result_shape : tuple[int]
The overall shape of the result.
Raises
------
ValueError
If the two shapes cannot be broadcast together.
"""
# https://stackoverflow.com/a/47244284/774273
if not all((l1 == l2) or (l1 == 1) or ((l2 == 1) and not is_result) for l1, l2 in
zip(shape1[::-1], shape2[::-1])):
raise ValueError('operands could not be broadcast together with shapes %s, %s' %
(shape1, shape2))
result_shape = tuple(l1 if l1 != 1 else l2 for l1, l2 in
zip_longest(shape1[::-1], shape2[::-1], fillvalue=1))[::-1]
return result_shape
def _get_broadcast_parameters(shape, broadcast_shape):
"""
Get the broadcast parameters.
Parameters
----------
shape : tuple[int]
The input shape.
broadcast_shape
The shape to broadcast to.
Returns
-------
params : list
A list containing None if the dimension isn't in the original array, False if
it needs to be broadcast, and True if it doesn't.
"""
params = [None if l1 is None else l1 == l2 for l1, l2
in zip_longest(shape[::-1], broadcast_shape[::-1], fillvalue=None)][::-1]
return params
def _get_reduced_coords(coords, params):
"""
Gets only those dimensions of the coordinates that don't need to be broadcast.
Parameters
----------
coords : np.ndarray
The coordinates to reduce.
params : list
The params from which to check which dimensions to get.
Returns
-------
reduced_coords : np.ndarray
The reduced coordinates.
"""
reduced_params = [bool(param) for param in params]
return coords[reduced_params]
def _get_reduced_shape(shape, params):
"""
Gets only those dimensions of the coordinates that don't need to be broadcast.
Parameters
----------
coords : np.ndarray
The coordinates to reduce.
params : list
The params from which to check which dimensions to get.
Returns
-------
reduced_coords : np.ndarray
The reduced coordinates.
"""
reduced_shape = tuple(l for l, p in zip(shape, params) if p)
return reduced_shape
def _get_expanded_coords_data(coords, data, params, broadcast_shape):
"""
Expand coordinates/data to broadcast_shape. Does most of the heavy lifting for broadcast_to.
Produces sorted output for sorted inputs.
Parameters
----------
coords : np.ndarray
The coordinates to expand.
data : np.ndarray
The data corresponding to the coordinates.
params : list
The broadcast parameters.
broadcast_shape : tuple[int]
The shape to broadcast to.
Returns
-------
expanded_coords : np.ndarray
List of 1-D arrays. Each item in the list has one dimension of coordinates.
expanded_data : np.ndarray
The data corresponding to expanded_coords.
"""
first_dim = -1
expand_shapes = []
for d, p, l in zip(range(len(broadcast_shape)), params, broadcast_shape):
if p and first_dim == -1:
expand_shapes.append(coords.shape[1])
first_dim = d
if not p:
expand_shapes.append(l)
all_idx = _cartesian_product(*(np.arange(d, dtype=np.intp) for d in expand_shapes))
false_dim = 0
dim = 0
expanded_coords = np.empty((len(broadcast_shape), all_idx.shape[1]), dtype=np.intp)
expanded_data = data[all_idx[first_dim]]
for d, p, l in zip(range(len(broadcast_shape)), params, broadcast_shape):
if p:
expanded_coords[d] = coords[dim, all_idx[first_dim]]
else:
expanded_coords[d] = all_idx[false_dim + (d > first_dim)]
false_dim += 1
if p is not None:
dim += 1
return np.asarray(expanded_coords), np.asarray(expanded_data)
# (c) senderle
# Taken from https://stackoverflow.com/a/11146645/774273
# License: https://creativecommons.org/licenses/by-sa/3.0/
def _cartesian_product(*arrays):
"""
Get the cartesian product of a number of arrays.
Parameters
----------
arrays : Tuple[np.ndarray]
The arrays to get a cartesian product of. Always sorted with respect
to the original array.
Returns
-------
out : np.ndarray
The overall cartesian product of all the input arrays.
"""
broadcastable = np.ix_(*arrays)
broadcasted = np.broadcast_arrays(*broadcastable)
rows, cols = np.prod(broadcasted[0].shape), len(broadcasted)
dtype = np.result_type(*arrays)
out = np.empty(rows * cols, dtype=dtype)
start, end = 0, rows
for a in broadcasted:
out[start:end] = a.reshape(-1)
start, end = end, end + rows
return out.reshape(cols, rows)
def _get_matching_coords(coords, params):
"""
Get the matching coords across a number of broadcast operands.
Parameters
----------
coords : list[numpy.ndarray]
The input coordinates.
params : list[Union[bool, none]]
The broadcast parameters.
Returns
-------
numpy.ndarray
The broacasted coordinates
"""
matching_coords = []
dims = np.zeros(len(coords), dtype=np.uint8)
for p_all in zip(*params):
for i, p in enumerate(p_all):
if p:
matching_coords.append(coords[i][dims[i]])
break
else:
matching_coords.append(coords[dims[0]])
for i, p in enumerate(p_all):
if p is not None:
dims[i] += 1
return np.asarray(matching_coords, dtype=np.intp)
def broadcast_to(x, shape):
"""
Performs the equivalent of :obj:`numpy.broadcast_to` for :obj:`COO`. Note that
this function returns a new array instead of a view.
Parameters
----------
shape : tuple[int]
The shape to broadcast the data to.
Returns
-------
COO
The broadcasted sparse array.
Raises
------
ValueError
If the operand cannot be broadcast to the given shape.
See also
--------
:obj:`numpy.broadcast_to` : NumPy equivalent function
"""
from .core import COO
if shape == x.shape:
return x
result_shape = _get_broadcast_shape(x.shape, shape, is_result=True)
params = _get_broadcast_parameters(x.shape, result_shape)
coords, data = _get_expanded_coords_data(x.coords, x.data, params, result_shape)
# Check if all the non-broadcast axes are next to each other
nonbroadcast_idx = [idx for idx, p in enumerate(params) if p]
diff_nonbroadcast_idx = [a - b for a, b in zip(nonbroadcast_idx[1:], nonbroadcast_idx[:-1])]
sorted = all(d == 1 for d in diff_nonbroadcast_idx)
return COO(coords, data, shape=result_shape, has_duplicates=False,
sorted=sorted, fill_value=x.fill_value)
class _Elemwise(object):
def __init__(self, func, *args, **kwargs):
"""
Initialize the element-wise function calculator.
Parameters
----------
func : types.Callable
The function to compute
args : tuple[Union[SparseArray, ndarray, scipy.sparse.spmatrix]]
The arguments to compute the function on.
kwargs : dict
Extra arguments to pass to the function.
"""
from .core import COO
from ..sparse_array import SparseArray
processed_args = []
for arg in args:
if isinstance(arg, scipy.sparse.spmatrix):
processed_args.append(COO.from_scipy_sparse(arg))
elif isscalar(arg) or isinstance(arg, np.ndarray):
# Faster and more reliable to pass ()-shaped ndarrays as scalars.
processed_args.append(np.asarray(arg))
elif isinstance(arg, SparseArray) and not isinstance(arg, COO):
processed_args.append(COO(arg))
elif not isinstance(arg, COO):
self.args = None
return
else:
processed_args.append(arg)
self.args = tuple(processed_args)
self.func = func
self.dtype = kwargs.pop('dtype', None)
self.kwargs = kwargs
self.cache = {}
self._get_fill_value()
self._check_broadcast()
def get_result(self):
from .core import COO
if self.args is None:
return NotImplemented
if any(s == 0 for s in self.shape):
data = np.empty((0,), dtype=self.fill_value.dtype)
coords = np.empty((0, len(self.shape)), dtype=np.intp)
return COO(coords, data, shape=self.shape, has_duplicates=False, fill_value=self.fill_value)
data_list = []
coords_list = []
for mask in itertools.product(*[[True, False] if isinstance(arg, COO)
else [None] for arg in self.args]):
if not any(mask):
continue
r = self._get_func_coords_data(mask)
if r is not None:
coords_list.append(r[0])
data_list.append(r[1])
# Concatenate matches and mismatches
data = np.concatenate(data_list) if len(data_list) else np.empty((0,), dtype=self.fill_value.dtype)
coords = np.concatenate(coords_list, axis=1) if len(coords_list) else \
np.empty((0, len(self.shape)), dtype=np.intp)
return COO(coords, data, shape=self.shape, has_duplicates=False, fill_value=self.fill_value)
def _get_fill_value(self):
"""
A function that finds and returns the fill-value.
Raises
------
ValueError
If the fill-value is inconsistent.
"""
from .core import COO
zero_args = tuple(arg.fill_value[...] if isinstance(arg, COO) else arg for arg in self.args)
# Some elemwise functions require a dtype argument, some abhorr it.
try:
fill_value_array = self.func(*zero_args, dtype=self.dtype, **self.kwargs)
self.dtype = None
except TypeError:
fill_value_array = self.func(*zero_args, **self.kwargs)
try:
fill_value = fill_value_array[(0,) * fill_value_array.ndim]
except IndexError:
zero_args = tuple(
arg.fill_value if isinstance(arg, COO) else _zero_of_dtype(arg.dtype) for arg in self.args)
fill_value = self.func(*zero_args, **self.kwargs)[()]
if not equivalent(fill_value, fill_value_array).all():
raise ValueError('Inconsistent fill-values in the result array: operating on the ndarray with'
' fill-values produces inconsistent results.')
# Store dtype separately if needed.
if self.dtype is not None:
fill_value = fill_value.astype(self.dtype)
self.fill_value = fill_value
self.dtype = self.fill_value.dtype
def _check_broadcast(self):
"""
Checks if adding the ndarrays changes the broadcast shape.
Raises
------
ValueError
If the check fails.
"""
from .core import COO
full_shape = _get_nary_broadcast_shape(*tuple(arg.shape for arg in self.args))
non_ndarray_shape = _get_nary_broadcast_shape(
*tuple(arg.shape for arg in self.args if isinstance(arg, COO))
)
if full_shape != non_ndarray_shape:
raise ValueError('All ndarrays must be broadcastable to the shape without ndarrays {}'
.format(non_ndarray_shape))
self.shape = full_shape
def _get_func_coords_data(self, mask):
"""
Gets the coords/data for a certain mask
Parameters
----------
mask : tuple[Union[bool, NoneType]]
The mask determining whether to match or unmatch.
Returns
-------
None or tuple
The coords/data tuple for the given mask.
"""
from .core import COO
matched_args = [arg for arg, m in zip(self.args, mask) if m is not None and m]
unmatched_args = [arg for arg, m in zip(self.args, mask) if m is not None and not m]
ndarray_args = [arg for arg, m in zip(self.args, mask) if m is None]
matched_broadcast_shape = _get_nary_broadcast_shape(
*tuple(arg.shape for arg in itertools.chain(matched_args, ndarray_args))
)
matched_arrays = self._match_coo(*matched_args,
cache=self.cache,
broadcast_shape=matched_broadcast_shape)
func_args = []
m_arg = 0
for arg, m in zip(self.args, mask):
if m is None:
func_args.append(np.broadcast_to(arg, matched_broadcast_shape)[tuple(matched_arrays[0].coords)])
continue
if m:
func_args.append(matched_arrays[m_arg].data)
m_arg += 1
else:
func_args.append(arg.fill_value)
# Try our best to preserve the output dtype.
try:
func_data = self.func(*func_args, dtype=self.dtype, **self.kwargs)
except TypeError:
try:
func_args = np.broadcast_arrays(*func_args)
out = np.empty(func_args[0].shape, dtype=self.dtype)
func_data = self.func(*func_args, out=out, **self.kwargs)
except TypeError:
func_data = self.func(*func_args, **self.kwargs).astype(self.dtype)
unmatched_mask = ~equivalent(func_data, self.fill_value)
if not unmatched_mask.any():
return None
func_coords = matched_arrays[0].coords[:, unmatched_mask]
func_data = func_data[unmatched_mask]
if matched_arrays[0].shape != self.shape:
params = _get_broadcast_parameters(matched_arrays[0].shape, self.shape)
func_coords, func_data = _get_expanded_coords_data(func_coords, func_data, params, self.shape)
if all(m is None or m for m in mask):
return func_coords, func_data
# Not really sorted but we need the sortedness.
func_array = COO(func_coords, func_data, self.shape, has_duplicates=False, sorted=True)
unmatched_mask = np.ones(func_array.nnz, dtype=np.bool)
for arg in unmatched_args:
matched_idx = self._match_coo(func_array, arg, return_midx=True)[0]
unmatched_mask[matched_idx] = False
coords = np.asarray(func_array.coords[:, unmatched_mask], order='C')
data = np.asarray(func_array.data[unmatched_mask], order='C')
return coords, data
@staticmethod
def _match_coo(*args, **kwargs):
"""
Matches the coordinates for any number of input :obj:`COO` arrays.
Equivalent to "sparse" broadcasting for all arrays.
Parameters
----------
args : Tuple[COO]
The input :obj:`COO` arrays.
return_midx : bool
Whether to return matched indices or matched arrays. Matching
only supported for two arrays. ``False`` by default.
cache : dict
Cache of things already matched. No cache by default.
Returns
-------
matched_idx : List[ndarray]
The indices of matched elements in the original arrays. Only returned if
``return_midx`` is ``True``.
matched_arrays : List[COO]
The expanded, matched :obj:`COO` objects. Only returned if
``return_midx`` is ``False``.
"""
from .core import COO
from .common import linear_loc
cache = kwargs.pop('cache', None)
return_midx = kwargs.pop('return_midx', False)
broadcast_shape = kwargs.pop('broadcast_shape', None)
if kwargs:
raise ValueError('Unknown kwargs: {}'.format(kwargs.keys()))
if return_midx and (len(args) != 2 or cache is not None):
raise NotImplementedError('Matching indices only supported for two args, and no cache.')
matched_arrays = [args[0]]
cache_key = [id(args[0])]
for arg2 in args[1:]:
cache_key.append(id(arg2))
key = tuple(cache_key)
if cache is not None and key in cache:
matched_arrays = cache[key]
continue
cargs = [matched_arrays[0], arg2]
current_shape = _get_broadcast_shape(matched_arrays[0].shape, arg2.shape)
params = [_get_broadcast_parameters(arg.shape, current_shape) for arg in cargs]
reduced_params = [all(p) for p in zip(*params)]
reduced_shape = _get_reduced_shape(arg2.shape,
reduced_params[-arg2.ndim:])
reduced_coords = [_get_reduced_coords(arg.coords, reduced_params[-arg.ndim:])
for arg in cargs]
linear = [linear_loc(rc, reduced_shape) for rc in reduced_coords]
sorted_idx = [np.argsort(idx) for idx in linear]
linear = [idx[s] for idx, s in zip(linear, sorted_idx)]
matched_idx = _match_arrays(*linear)
if return_midx:
matched_idx = [sidx[midx] for sidx, midx in zip(sorted_idx, matched_idx)]
return matched_idx
coords = [arg.coords[:, s] for arg, s in zip(cargs, sorted_idx)]
mcoords = [c[:, idx] for c, idx in zip(coords, matched_idx)]
mcoords = _get_matching_coords(mcoords, params)
mdata = [arg.data[sorted_idx[0]][matched_idx[0]] for arg in matched_arrays]
mdata.append(arg2.data[sorted_idx[1]][matched_idx[1]])
# The coords aren't truly sorted, but we don't need them, so it's
# best to avoid the extra cost.
matched_arrays = [
COO(mcoords, md, shape=current_shape, sorted=True, has_duplicates=False)
for md in mdata]
if cache is not None:
cache[key] = matched_arrays
if broadcast_shape is not None and matched_arrays[0].shape != broadcast_shape:
params = _get_broadcast_parameters(matched_arrays[0].shape, broadcast_shape)
coords, idx = _get_expanded_coords_data(
matched_arrays[0].coords, np.arange(matched_arrays[0].nnz), params, broadcast_shape
)
matched_arrays = [
COO(coords, arr.data[idx], shape=broadcast_shape, sorted=True, has_duplicates=False)
for arr in matched_arrays
]
return matched_arrays