Source code for text_extensions_for_pandas.array.tensor

#
#  Copyright (c) 2020 IBM Corp.
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

#
# tensor.py
#
# Part of text_extensions_for_pandas
#
# Pandas extensions to support columns of N-dimensional tensors of equal shape.
#

from distutils.version import LooseVersion
import numbers
import os
from typing import *

import numpy as np
import pandas as pd
from pandas.compat import set_function_name
from pandas.core.dtypes.generic import ABCDataFrame, ABCSeries
try:
    from pandas.core.dtypes.generic import ABCIndex
except ImportError:
    # ABCIndexClass changed to ABCIndex in Pandas 1.3
    # noinspection PyUnresolvedReferences
    from pandas.core.dtypes.generic import ABCIndexClass as ABCIndex
from pandas.core.indexers import check_array_indexer, validate_indices

""" Begin Patching of ExtensionArrayFormatter """
from pandas.io.formats.format import ExtensionArrayFormatter


def _format_strings_patched(self) -> List[str]:
    from pandas.core.construction import extract_array
    from pandas.io.formats.format import format_array

    if not isinstance(self.values, TensorArray):
        return self._format_strings_orig()

    values = extract_array(self.values, extract_numpy=True)
    array = np.asarray(values)

    if array.ndim == 1:
        return self._format_strings_orig()

    def format_array_wrap(array_, formatter_):
        fmt_values = format_array(
            array_,
            formatter_,
            float_format=self.float_format,
            na_rep=self.na_rep,
            digits=self.digits,
            space=self.space,
            justify=self.justify,
            decimal=self.decimal,
            leading_space=self.leading_space,
            quoting=self.quoting,
        )
        return fmt_values

    flat_formatter = self.formatter
    if flat_formatter is None:
        flat_formatter = values._formatter(boxed=True)

    # Flatten array, call function, reshape (use ravel_compat in v1.3.0)
    flat_array = array.ravel("K")
    fmt_flat_array = np.asarray(
        format_array_wrap(flat_array, flat_formatter))
    order = "F" if array.flags.f_contiguous else "C"
    fmt_array = fmt_flat_array.reshape(array.shape, order=order)

    # Format the array of nested strings, use default formatter
    return format_array_wrap(fmt_array, None)


def _format_strings_patched_v1_0_0(self) -> List[str]:
    from functools import partial
    from pandas.core.construction import extract_array
    from pandas.io.formats.format import format_array
    from pandas.io.formats.printing import pprint_thing

    if not isinstance(self.values, TensorArray):
        return self._format_strings_orig()

    values = extract_array(self.values, extract_numpy=True)
    array = np.asarray(values)

    if array.ndim == 1:
        return self._format_strings_orig()

    def format_array_wrap(array_, formatter_):
        fmt_values = format_array(
            array_,
            formatter_,
            float_format=self.float_format,
            na_rep=self.na_rep,
            digits=self.digits,
            space=self.space,
            justify=self.justify,
            decimal=self.decimal,
            leading_space=self.leading_space,
        )
        return fmt_values

    flat_formatter = self.formatter
    if flat_formatter is None:
        flat_formatter = values._formatter(boxed=True)

    # Flatten array, call function, reshape (use ravel_compat in v1.3.0)
    flat_array = array.ravel("K")
    fmt_flat_array = np.asarray(
        format_array_wrap(flat_array, flat_formatter))
    order = "F" if array.flags.f_contiguous else "C"
    fmt_array = fmt_flat_array.reshape(array.shape, order=order)

    # Slimmed down version of GenericArrayFormatter due to pandas-dev GH#33770
    def format_strings_slim(array_, leading_space):
        formatter = partial(
            pprint_thing,
            escape_chars=("\t", "\r", "\n"),
        )

        def _format(x):
            return str(formatter(x))

        fmt_values = []
        for v in array_:
            tpl = "{v}" if leading_space is False else " {v}"
            fmt_values.append(tpl.format(v=_format(v)))
        return fmt_values

    return format_strings_slim(fmt_array, self.leading_space)


_FORMATTER_ENABLED_KEY = "TEXT_EXTENSIONS_FOR_PANDAS_FORMATTER_ENABLED"

if os.getenv(_FORMATTER_ENABLED_KEY, "true").lower() == "true":
    ExtensionArrayFormatter._format_strings_orig = \
        ExtensionArrayFormatter._format_strings
    if LooseVersion("1.1.0") <= LooseVersion(pd.__version__) < LooseVersion("1.3.0"):
        ExtensionArrayFormatter._format_strings = _format_strings_patched
    else:
        ExtensionArrayFormatter._format_strings = _format_strings_patched_v1_0_0
    ExtensionArrayFormatter._patched_by_text_extensions_for_pandas = True
""" End Patching of ExtensionArrayFormatter """


[docs]@pd.api.extensions.register_extension_dtype class TensorDtype(pd.api.extensions.ExtensionDtype): """ Pandas data type for a column of tensors with the same shape. """ base = None @property def type(self): """The type for a single row of a TensorArray column.""" return TensorElement @property def name(self) -> str: """A string representation of the dtype.""" return "TensorDtype" @classmethod def construct_from_string(cls, string: str): """ See docstring in :class:`ExtensionDType` class in ``pandas/core/dtypes/base.py`` for information about this method. """ if not isinstance(string, str): raise TypeError( f"'construct_from_string' expects a string, got {type(string)}" ) # Upstream code uses exceptions as part of its normal control flow and # will pass this method bogus class names. if string == cls.__name__: return cls() else: raise TypeError( f"Cannot construct a '{cls.__name__}' from '{string}'") @classmethod def construct_array_type(cls): """ See docstring in :class:`ExtensionDType` class in ``pandas/core/dtypes/base.py`` for information about this method. """ return TensorArray def __from_arrow__(self, extension_array): from text_extensions_for_pandas.array.arrow_conversion import arrow_to_tensor_array return arrow_to_tensor_array(extension_array)
class TensorOpsMixin(pd.api.extensions.ExtensionScalarOpsMixin): """ Mixin to provide operators on underlying ndarray. TODO: would be better to derive from ExtensionOpsMixin, but not available """ @classmethod def _create_method(cls, op, coerce_to_dtype=True, result_dtype=None): # NOTE: this overrides, but coerce_to_dtype, result_dtype might not be needed def _binop(self, other): lvalues = self._tensor if isinstance(other, (ABCDataFrame, ABCSeries, ABCIndex)): # Rely on pandas to unbox and dispatch to us. return NotImplemented # divmod returns a tuple if op_name in ["__divmod__", "__rdivmod__"]: # TODO: return tuple # div, mod = result raise NotImplementedError if isinstance(other, (TensorArray, TensorElement)): rvalues = other._tensor else: rvalues = other result = op(lvalues, rvalues) # Force a TensorArray if rvalue is not a scalar if isinstance(self, TensorElement) and \ (not isinstance(other, TensorElement) or not np.isscalar(other)): result_wrapped = TensorArray(result) else: result_wrapped = cls(result) return result_wrapped op_name = f"__{op.__name__}__" return set_function_name(_binop, op_name, cls)
[docs]class TensorElement(TensorOpsMixin): """ Class representing a single element in a TensorArray, or row in a Pandas column of dtype TensorDtype. This is a light wrapper over a numpy.ndarray """ def __init__(self, values: np.ndarray): """ Construct a TensorElement from an numpy.ndarray. :param values: tensor values for this instance. """ self._tensor = values def __repr__(self): return self._tensor.__repr__() def __str__(self): return self._tensor.__str__()
[docs] def to_numpy(self): """ Return the values of this element as a numpy.ndarray :return: numpy.ndarray """ return np.asarray(self._tensor)
def __array__(self): return np.asarray(self._tensor)
[docs]class TensorArray(pd.api.extensions.ExtensionArray, TensorOpsMixin): """ A Pandas :class:`ExtensionArray` that represents a column of :class:`numpy.ndarray` objects, or tensors, where the outer dimension is the count of tensors in the column. Each tensor must have the same shape. """ def __init__(self, values: Union[np.ndarray, Sequence[Union[np.ndarray, TensorElement]], TensorElement, Any]): """ :param values: A :class:`numpy.ndarray` or sequence of :class:`numpy.ndarray` objects of equal shape. """ if isinstance(values, np.ndarray): if values.dtype.type is np.object_ and len(values) > 0 and \ isinstance(values[0], TensorElement): self._tensor = np.array([np.asarray(v) for v in values]) else: self._tensor = values elif isinstance(values, Sequence): if len(values) == 0: self._tensor = np.array([]) else: self._tensor = np.stack([np.asarray(v) for v in values], axis=0) elif isinstance(values, TensorElement): self._tensor = np.array([np.asarray(values)]) elif np.isscalar(values): # `values` is a single element: pd.Series(np.nan, index=[1, 2, 3], dtype=TensorDtype()) self._tensor = np.array([values]) elif isinstance(values, TensorArray): raise TypeError("Use the copy() method to create a copy of a TensorArray") else: raise TypeError(f"Expected a numpy.ndarray or sequence of numpy.ndarray, " f"but received {values} " f"of type '{type(values)}' instead.") @classmethod def _from_sequence(cls, scalars, dtype=None, copy=False): """ See docstring in :class:`ExtensionArray` class in ``pandas/core/arrays/base.py`` for information about this method. """ if copy and isinstance(scalars, np.ndarray): scalars = scalars.copy() elif isinstance(scalars, TensorArray): scalars = scalars._tensor.copy() if copy else scalars._tensor return TensorArray(scalars) @classmethod def _from_factorized(cls, values, original): """ See docstring in :class:`ExtensionArray` class in ``pandas/core/arrays/base.py`` for information about this method. """ raise NotImplementedError @classmethod def _concat_same_type( cls, to_concat: Sequence["TensorArray"] ) -> "TensorArray": """ See docstring in :class:`ExtensionArray` class in ``pandas/core/arrays/base.py`` for information about this method. """ return TensorArray(np.concatenate([a._tensor for a in to_concat]))
[docs] def isna(self) -> np.array: """ See docstring in :class:`ExtensionArray` class in ``pandas/core/arrays/base.py`` for information about this method. """ if self._tensor.dtype.type is np.object_: # Avoid comparing with __eq__ because the elements of the tensor may do # something funny with that operation. result_list = [ self._tensor[i] is None for i in range(len(self)) ] return np.array(result_list, dtype=bool) elif self._tensor.dtype.type is np.str_: return np.all(self._tensor == "", axis=-1) else: return np.all(np.isnan(self._tensor), axis=-1)
[docs] def copy(self) -> "TensorArray": """ See docstring in :class:`ExtensionArray` class in ``pandas/core/arrays/base.py`` for information about this method. """ ret = TensorArray( self._tensor.copy(), ) # TODO: Copy cached properties too return ret
[docs] def take( self, indices: Sequence[int], allow_fill: bool = False, fill_value: Any = None ) -> "TensorArray": """ See docstring in :class:`ExtensionArray` class in ``pandas/core/arrays/base.py`` for information about this method. """ if allow_fill: # From API docs: "[If allow_fill == True, then] negative values in # `indices` indicate missing values and are set to `fill_value` indices = np.asarray(indices, dtype=np.intp) validate_indices(indices, len(self._tensor)) # Check if there are missing indices to fill, if not can use numpy take below has_missing = np.any(indices < 0) if has_missing: if fill_value is None: fill_value = np.nan # Create an array populated with fill value values = np.full((len(indices),) + self._tensor.shape[1:], fill_value) # Iterate over each index and set non-missing elements for i, idx in enumerate(indices): if idx >= 0: values[i] = self._tensor[idx] return TensorArray(values) # Delegate take to numpy array values = self._tensor.take(indices, axis=0) return TensorArray(values)
@property def dtype(self) -> pd.api.extensions.ExtensionDtype: """ See docstring in :class:`ExtensionArray` class in ``pandas/core/arrays/base.py`` for information about this method. """ return TensorDtype() @property def inferred_type(self) -> str: """ Return string describing type of TensorArray. Delegates to :func:`pandas.api.types.infer_dtype`. See docstring for more information. :return: string describing numpy type of this TensorArray """ return pd.api.types.infer_dtype(self._tensor) @property def nbytes(self) -> int: """ See docstring in :class:`ExtensionArray` class in ``pandas/core/arrays/base.py`` for information about this method. """ return self._tensor.nbytes
[docs] def to_numpy(self, dtype=None, copy=False, na_value=pd.api.extensions.no_default): """ See docstring in :class:`ExtensionArray` class in ``pandas/core/arrays/base.py`` for information about this method. """ if dtype is not None: dtype = pd.api.types.pandas_dtype(dtype) if copy: values = np.array(self._tensor, dtype=dtype, copy=True) else: values = self._tensor.astype(dtype) elif copy: values = self._tensor.copy() else: values = self._tensor return values
@property def numpy_dtype(self): """ Get the dtype of the tensor. :return: The numpy dtype of the backing ndarray """ return self._tensor.dtype @property def numpy_ndim(self): """ Get the number of tensor dimensions. :return: integer for the number of dimensions """ return self._tensor.ndim @property def numpy_shape(self): """ Get the shape of the tensor. :return: A tuple of integers for the numpy shape of the backing ndarray """ return self._tensor.shape
[docs] def astype(self, dtype, copy=True): """ See docstring in :class:`ExtensionArray` class in ``pandas/core/arrays/base.py`` for information about this method. """ dtype = pd.api.types.pandas_dtype(dtype) if isinstance(dtype, TensorDtype): values = TensorArray(self._tensor.copy()) if copy else self elif not pd.api.types.is_object_dtype(dtype) and \ pd.api.types.is_string_dtype(dtype): values = np.array([str(t) for t in self._tensor]) if isinstance(dtype, pd.StringDtype): return dtype.construct_array_type()._from_sequence(values, copy=False) else: return values elif pd.api.types.is_object_dtype(dtype): # Interpret astype(object) as "cast to an array of numpy arrays" values = np.empty(len(self), dtype=object) for i in range(len(self)): values[i] = self._tensor[i] else: values = self._tensor.astype(dtype, copy=copy) return values
[docs] def any(self, axis=None, out=None, keepdims=False): """ Test whether any array element along a given axis evaluates to ``True``. See numpy.any() documentation for more information https://numpy.org/doc/stable/reference/generated/numpy.any.html#numpy.any :param axis: Axis or axes along which a logical OR reduction is performed. :param out: Alternate output array in which to place the result. :param keepdims: If this is set to True, the axes which are reduced are left in the result as dimensions with size one. :return: single boolean unless ``axis``is not ``None``; else :class:`TensorArray` """ result = self._tensor.any(axis=axis, out=out, keepdims=keepdims) return result if axis is None else TensorArray(result)
[docs] def all(self, axis=None, out=None, keepdims=False): """ Test whether all array elements along a given axis evaluate to ``True``. :param axis: Axis or axes along which a logical AND reduction is performed. :param out: Alternate output array in which to place the result. :param keepdims: If this is set to True, the axes which are reduced are left in the result as dimensions with size one. :return: single boolean unless ``axis`` is not ``None``; else :class:`TensorArray` """ result = self._tensor.all(axis=axis, out=out, keepdims=keepdims) return result if axis is None else TensorArray(result)
def __len__(self) -> int: return len(self._tensor) def __getitem__(self, item) -> Union["TensorArray", "TensorElement"]: """ See docstring in :class:`ExtensionArray` class in ``pandas/core/arrays/base.py`` for information about this method. """ # Return scalar if single value is selected, a TensorElement for single array # element, or TensorArray for slice if isinstance(item, int): value = self._tensor[item] if np.isscalar(value): return value else: return TensorElement(value) else: # BEGIN workaround for Pandas issue #42430 if isinstance(item, tuple) and len(item) > 1 and item[0] == Ellipsis: if len(item) > 2: # Hopefully this case is not possible, but can't be sure raise ValueError(f"Workaround Pandas issue #42430 not implemented " f"for tuple length > 2") item = item[1] # END workaround for issue #42430 if isinstance(item, TensorArray): item = np.asarray(item) item = check_array_indexer(self, item) return TensorArray(self._tensor[item]) def __setitem__(self, key: Union[int, np.ndarray], value: Any) -> None: """ See docstring in :class:`ExtensionArray` class in ``pandas/core/arrays/base.py`` for information about this method. """ key = check_array_indexer(self, key) if isinstance(value, TensorElement) or np.isscalar(value): value = np.asarray(value) if isinstance(value, list): value = [np.asarray(v) if isinstance(v, TensorElement) else v for v in value] if isinstance(value, ABCSeries) and isinstance(value.dtype, TensorDtype): value = value.values if value is None or isinstance(value, Sequence) and len(value) == 0: nan_fill = np.full_like(self._tensor[key], np.nan) self._tensor[key] = nan_fill elif isinstance(key, (int, slice, np.ndarray)): self._tensor[key] = value else: raise NotImplementedError(f"__setitem__ with key type '{type(key)}' " f"not implemented") def __contains__(self, item) -> bool: if isinstance(item, TensorElement): npitem = np.asarray(item) if npitem.size == 1 and np.isnan(npitem).all(): return self.isna().any() return super().__contains__(item) def __repr__(self): """ See docstring in :class:`ExtensionArray` class in ``pandas/core/arrays/base.py`` for information about this method. """ return self._tensor.__repr__() def __str__(self): return self._tensor.__str__() def _values_for_factorize(self) -> Tuple[np.ndarray, Any]: """ See docstring in :class:`ExtensionArray` class in ``pandas/core/arrays/base.py`` for information about this method. """ # TODO return self._tensor, np.nan raise NotImplementedError def _reduce(self, name, skipna=True, **kwargs): """ See docstring in :class:`ExtensionArray` class in ``pandas/core/arrays/base.py`` for information about this method. """ if name in ("sum", "prod", "mean", "std", "var", "min", "max", "argmin", "argmax", "median", "any", "all"): # Standard Numpy aggregates. Retrieve the eponymous Numpy # function and call it on our tensor along axis 0. numpy_agg_func = getattr(np, name) return TensorElement(numpy_agg_func(self._tensor, axis=0)) else: raise NotImplementedError(f"'{name}' aggregate not implemented.") def __array__(self, dtype=None): """ Interface to return the backing tensor as a numpy array with optional dtype. If dtype is not None, then the tensor will be cast to that type, otherwise this is a no-op. """ return np.asarray(self._tensor, dtype=dtype) def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): """ Interface to handle numpy ufuncs that will accept TensorArray as input, and wrap the output back as another TensorArray. """ out = kwargs.get('out', ()) for x in inputs + out: if not isinstance(x, (TensorArray, np.ndarray, numbers.Number)): return NotImplemented # Defer to the implementation of the ufunc on unwrapped values. inputs = tuple(x._tensor if isinstance(x, TensorArray) else x for x in inputs) if out: kwargs['out'] = tuple( x._tensor if isinstance(x, TensorArray) else x for x in out) result = getattr(ufunc, method)(*inputs, **kwargs) if type(result) is tuple: # multiple return values return tuple(type(self)(x) for x in result) elif method == 'at': # no return value return None else: # one return value return type(self)(result) def __arrow_array__(self, type=None): from text_extensions_for_pandas.array.arrow_conversion import ArrowTensorArray return ArrowTensorArray.from_numpy(self._tensor)
# Add operators from the mixin to the class TensorElement._add_arithmetic_ops() TensorElement._add_comparison_ops() TensorArray._add_arithmetic_ops() TensorArray._add_comparison_ops()