# Source code for sparse.coo.umath

import itertools

import numba
import numpy as np
import scipy.sparse

from itertools import zip_longest

from ..utils import isscalar, equivalent, _zero_of_dtype

[docs]def elemwise(func, *args, **kwargs):
"""
Apply a function to any number of arguments.

Parameters
----------
func : Callable
The function to apply. Must support broadcasting.
args : tuple, optional
The arguments to the function. Can be :obj:SparseArray objects
or :obj:scipy.sparse.spmatrix objects.
kwargs : dict, optional
Any additional arguments to pass to the function.

Returns
-------
COO
The result of applying the function.

Raises
------
ValueError
If the operation would result in a dense matrix, or if the operands

--------
:obj:numpy.ufunc : A similar Numpy construct. Note that any :code:ufunc can be used
as the :code:func input to this function.

Notes
-----
Previously, operations with Numpy arrays were sometimes supported. Now,
it is necessary to convert Numpy arrays to :obj:COO objects.
"""

return _Elemwise(func, *args, **kwargs).get_result()

@numba.jit(nopython=True, nogil=True)
def _match_arrays(a, b):  # pragma: no cover
"""
Finds all indexes into a and b such that a[i] = b[j]. The outputs are sorted
in lexographical order.

Parameters
----------
a, b : np.ndarray
The input 1-D arrays to match. If matching of multiple fields is
needed, use np.recarrays. These two arrays must be sorted.

Returns
-------
a_idx, b_idx : np.ndarray
The output indices of every possible pair of matching elements.
"""
if len(a) == 0 or len(b) == 0:
return np.empty(0, dtype=np.uintp), np.empty(0, dtype=np.uintp)

a_ind, b_ind = [], []
nb = len(b)
ib = 0
match = 0

for ia, j in enumerate(a):
if j == b[match]:
ib = match

while ib < nb and j >= b[ib]:
if j == b[ib]:
a_ind.append(ia)
b_ind.append(ib)

if b[match] < b[ib]:
match = ib

ib += 1

return np.array(a_ind, dtype=np.uintp), np.array(b_ind, dtype=np.uintp)

"""
Broadcast any number of shapes to a result shape.

Parameters
----------
shapes : tuple[tuple[int]]

Returns
-------
tuple[int]
The output shape.

Raises
------
ValueError
If the input shapes cannot be broadcast to a single shape.
"""
result_shape = ()

for shape in shapes:
try:
except ValueError:
shapes_str = ', '.join(str(shape) for shape in shapes)
raise ValueError('operands could not be broadcast together with shapes %s'
% shapes_str)

return result_shape

"""

Parameters
----------
shape1, shape2 : tuple[int]
The input shapes to broadcast together.
is_result : bool
Whether or not shape2 is also the result shape.

Returns
-------
result_shape : tuple[int]
The overall shape of the result.

Raises
------
ValueError
If the two shapes cannot be broadcast together.
"""
# https://stackoverflow.com/a/47244284/774273
if not all((l1 == l2) or (l1 == 1) or ((l2 == 1) and not is_result) for l1, l2 in
zip(shape1[::-1], shape2[::-1])):
raise ValueError('operands could not be broadcast together with shapes %s, %s' %
(shape1, shape2))

result_shape = tuple(l1 if l1 != 1 else l2 for l1, l2 in
zip_longest(shape1[::-1], shape2[::-1], fillvalue=1))[::-1]

return result_shape

"""

Parameters
----------
shape : tuple[int]
The input shape.

Returns
-------
params : list
A list containing None if the dimension isn't in the original array, False if
it needs to be broadcast, and True if it doesn't.
"""
params = [None if l1 is None else l1 == l2 for l1, l2

return params

def _get_reduced_coords(coords, params):
"""
Gets only those dimensions of the coordinates that don't need to be broadcast.

Parameters
----------
coords : np.ndarray
The coordinates to reduce.
params : list
The params from which to check which dimensions to get.

Returns
-------
reduced_coords : np.ndarray
The reduced coordinates.
"""

reduced_params = [bool(param) for param in params]

return coords[reduced_params]

def _get_reduced_shape(shape, params):
"""
Gets only those dimensions of the coordinates that don't need to be broadcast.

Parameters
----------
coords : np.ndarray
The coordinates to reduce.
params : list
The params from which to check which dimensions to get.

Returns
-------
reduced_coords : np.ndarray
The reduced coordinates.
"""
reduced_shape = tuple(l for l, p in zip(shape, params) if p)

return reduced_shape

"""
Expand coordinates/data to broadcast_shape. Does most of the heavy lifting for broadcast_to.
Produces sorted output for sorted inputs.

Parameters
----------
coords : np.ndarray
The coordinates to expand.
data : np.ndarray
The data corresponding to the coordinates.
params : list

Returns
-------
expanded_coords : np.ndarray
List of 1-D arrays. Each item in the list has one dimension of coordinates.
expanded_data : np.ndarray
The data corresponding to expanded_coords.
"""
first_dim = -1
expand_shapes = []
if p and first_dim == -1:
expand_shapes.append(coords.shape[1])
first_dim = d

if not p:
expand_shapes.append(l)

all_idx = _cartesian_product(*(np.arange(d, dtype=np.intp) for d in expand_shapes))

false_dim = 0
dim = 0

expanded_data = data[all_idx[first_dim]]

if p:
expanded_coords[d] = coords[dim, all_idx[first_dim]]
else:
expanded_coords[d] = all_idx[false_dim + (d > first_dim)]
false_dim += 1

if p is not None:
dim += 1

return np.asarray(expanded_coords), np.asarray(expanded_data)

# (c) senderle
# Taken from https://stackoverflow.com/a/11146645/774273
def _cartesian_product(*arrays):
"""
Get the cartesian product of a number of arrays.

Parameters
----------
arrays : Tuple[np.ndarray]
The arrays to get a cartesian product of. Always sorted with respect
to the original array.

Returns
-------
out : np.ndarray
The overall cartesian product of all the input arrays.
"""
dtype = np.result_type(*arrays)
out = np.empty(rows * cols, dtype=dtype)
start, end = 0, rows
out[start:end] = a.reshape(-1)
start, end = end, end + rows
return out.reshape(cols, rows)

def _get_matching_coords(coords, params):
"""
Get the matching coords across a number of broadcast operands.

Parameters
----------
coords : list[numpy.ndarray]
The input coordinates.
params : list[Union[bool, none]]
Returns
-------
numpy.ndarray
The broacasted coordinates
"""
matching_coords = []
dims = np.zeros(len(coords), dtype=np.uint8)

for p_all in zip(*params):
for i, p in enumerate(p_all):
if p:
matching_coords.append(coords[i][dims[i]])
break
else:
matching_coords.append(coords[dims[0]])

for i, p in enumerate(p_all):
if p is not None:
dims[i] += 1

return np.asarray(matching_coords, dtype=np.intp)

"""
Performs the equivalent of :obj:numpy.broadcast_to for :obj:COO. Note that
this function returns a new array instead of a view.

Parameters
----------
shape : tuple[int]
The shape to broadcast the data to.

Returns
-------
COO

Raises
------
ValueError
If the operand cannot be broadcast to the given shape.

--------
:obj:numpy.broadcast_to : NumPy equivalent function
"""
from .core import COO

if shape == x.shape:
return x

coords, data = _get_expanded_coords_data(x.coords, x.data, params, result_shape)

# Check if all the non-broadcast axes are next to each other
nonbroadcast_idx = [idx for idx, p in enumerate(params) if p]
sorted = all(d == 1 for d in diff_nonbroadcast_idx)

return COO(coords, data, shape=result_shape, has_duplicates=False,
sorted=sorted, fill_value=x.fill_value)

class _Elemwise:
def __init__(self, func, *args, **kwargs):
"""
Initialize the element-wise function calculator.

Parameters
----------
func : types.Callable
The function to compute
args : tuple[Union[SparseArray, ndarray, scipy.sparse.spmatrix]]
The arguments to compute the function on.
kwargs : dict
Extra arguments to pass to the function.
"""

from .core import COO
from ..sparse_array import SparseArray

processed_args = []

for arg in args:
if isinstance(arg, scipy.sparse.spmatrix):
processed_args.append(COO.from_scipy_sparse(arg))
elif isscalar(arg) or isinstance(arg, np.ndarray):
# Faster and more reliable to pass ()-shaped ndarrays as scalars.
processed_args.append(np.asarray(arg))
elif isinstance(arg, SparseArray) and not isinstance(arg, COO):
processed_args.append(COO(arg))
elif not isinstance(arg, COO):
self.args = None
return
else:
processed_args.append(arg)

self.args = tuple(processed_args)
self.func = func
self.dtype = kwargs.pop('dtype', None)
self.kwargs = kwargs
self.cache = {}

self._get_fill_value()

def get_result(self):
from .core import COO
if self.args is None:
return NotImplemented

if any(s == 0 for s in self.shape):
data = np.empty((0,), dtype=self.fill_value.dtype)
coords = np.empty((0, len(self.shape)), dtype=np.intp)
return COO(coords, data, shape=self.shape, has_duplicates=False, fill_value=self.fill_value)

data_list = []
coords_list = []

for mask in itertools.product(*[[True, False] if isinstance(arg, COO)
else [None] for arg in self.args]):
continue

if r is not None:
coords_list.append(r[0])
data_list.append(r[1])

# Concatenate matches and mismatches
data = np.concatenate(data_list) if len(data_list) else np.empty((0,), dtype=self.fill_value.dtype)
coords = np.concatenate(coords_list, axis=1) if len(coords_list) else \
np.empty((0, len(self.shape)), dtype=np.intp)

return COO(coords, data, shape=self.shape, has_duplicates=False, fill_value=self.fill_value)

def _get_fill_value(self):
"""
A function that finds and returns the fill-value.

Raises
------
ValueError
If the fill-value is inconsistent.
"""
from .core import COO

zero_args = tuple(arg.fill_value[...] if isinstance(arg, COO) else arg for arg in self.args)

# Some elemwise functions require a dtype argument, some abhorr it.
try:
fill_value_array = self.func(*zero_args, dtype=self.dtype, **self.kwargs)
self.dtype = None
except TypeError:
fill_value_array = self.func(*zero_args, **self.kwargs)

try:
fill_value = fill_value_array[(0,) * fill_value_array.ndim]
except IndexError:
zero_args = tuple(
arg.fill_value if isinstance(arg, COO) else _zero_of_dtype(arg.dtype) for arg in self.args)
fill_value = self.func(*zero_args, **self.kwargs)[()]

if not equivalent(fill_value, fill_value_array).all():
raise ValueError('Performing a mixed sparse-dense operation that would result in a dense array. '
'Please make sure that func(sparse_fill_values, ndarrays) is a constant array.')

# Store dtype separately if needed.
if self.dtype is not None:
fill_value = fill_value.astype(self.dtype)

self.fill_value = fill_value
self.dtype = self.fill_value.dtype

"""

Raises
------
ValueError
If the check fails.
"""
from .core import COO
full_shape = _get_nary_broadcast_shape(*tuple(arg.shape for arg in self.args))
*tuple(arg.shape for arg in self.args if isinstance(arg, COO))
)

if full_shape != non_ndarray_shape:
raise ValueError('Please make sure that the broadcast shape of just the sparse arrays is '
'the same as the broadcast shape of all the operands.')

self.shape = full_shape

"""
Gets the coords/data for a certain mask

Parameters
----------
The mask determining whether to match or unmatch.

Returns
-------
None or tuple
The coords/data tuple for the given mask.
"""
from .core import COO

matched_args = [arg for arg, m in zip(self.args, mask) if m is not None and m]
unmatched_args = [arg for arg, m in zip(self.args, mask) if m is not None and not m]
ndarray_args = [arg for arg, m in zip(self.args, mask) if m is None]

*tuple(arg.shape for arg in itertools.chain(matched_args, ndarray_args))
)

matched_arrays = self._match_coo(*matched_args,
cache=self.cache,

func_args = []

m_arg = 0
for arg, m in zip(self.args, mask):
if m is None:
continue

if m:
func_args.append(matched_arrays[m_arg].data)
m_arg += 1
else:
func_args.append(arg.fill_value)

# Try our best to preserve the output dtype.
try:
func_data = self.func(*func_args, dtype=self.dtype, **self.kwargs)
except TypeError:
try:
out = np.empty(func_args[0].shape, dtype=self.dtype)
func_data = self.func(*func_args, out=out, **self.kwargs)
except TypeError:
func_data = self.func(*func_args, **self.kwargs).astype(self.dtype)

return None

if matched_arrays[0].shape != self.shape:
func_coords, func_data = _get_expanded_coords_data(func_coords, func_data, params, self.shape)

if all(m is None or m for m in mask):
return func_coords, func_data

# Not really sorted but we need the sortedness.
func_array = COO(func_coords, func_data, self.shape, has_duplicates=False, sorted=True)

for arg in unmatched_args:
matched_idx = self._match_coo(func_array, arg, return_midx=True)[0]

return coords, data

@staticmethod
def _match_coo(*args, **kwargs):
"""
Matches the coordinates for any number of input :obj:COO arrays.
Equivalent to "sparse" broadcasting for all arrays.

Parameters
----------
args : Tuple[COO]
The input :obj:COO arrays.
return_midx : bool
Whether to return matched indices or matched arrays. Matching
only supported for two arrays. False by default.
cache : dict
Cache of things already matched. No cache by default.

Returns
-------
matched_idx : List[ndarray]
The indices of matched elements in the original arrays. Only returned if
return_midx is True.
matched_arrays : List[COO]
The expanded, matched :obj:COO objects. Only returned if
return_midx is False.
"""
from .core import COO
from .common import linear_loc

cache = kwargs.pop('cache', None)
return_midx = kwargs.pop('return_midx', False)

if kwargs:
raise ValueError('Unknown kwargs: {}'.format(kwargs.keys()))

if return_midx and (len(args) != 2 or cache is not None):
raise NotImplementedError('Matching indices only supported for two args, and no cache.')

matched_arrays = [args[0]]
cache_key = [id(args[0])]
for arg2 in args[1:]:
cache_key.append(id(arg2))
key = tuple(cache_key)
if cache is not None and key in cache:
matched_arrays = cache[key]
continue

cargs = [matched_arrays[0], arg2]
params = [_get_broadcast_parameters(arg.shape, current_shape) for arg in cargs]
reduced_params = [all(p) for p in zip(*params)]
reduced_shape = _get_reduced_shape(arg2.shape,
reduced_params[-arg2.ndim:])

reduced_coords = [_get_reduced_coords(arg.coords, reduced_params[-arg.ndim:])
for arg in cargs]

linear = [linear_loc(rc, reduced_shape) for rc in reduced_coords]
sorted_idx = [np.argsort(idx) for idx in linear]
linear = [idx[s] for idx, s in zip(linear, sorted_idx)]
matched_idx = _match_arrays(*linear)

if return_midx:
matched_idx = [sidx[midx] for sidx, midx in zip(sorted_idx, matched_idx)]
return matched_idx

coords = [arg.coords[:, s] for arg, s in zip(cargs, sorted_idx)]
mcoords = [c[:, idx] for c, idx in zip(coords, matched_idx)]
mcoords = _get_matching_coords(mcoords, params)
mdata = [arg.data[sorted_idx[0]][matched_idx[0]] for arg in matched_arrays]
mdata.append(arg2.data[sorted_idx[1]][matched_idx[1]])
# The coords aren't truly sorted, but we don't need them, so it's
# best to avoid the extra cost.
matched_arrays = [
COO(mcoords, md, shape=current_shape, sorted=True, has_duplicates=False)
for md in mdata]

if cache is not None:
cache[key] = matched_arrays