Source code for text_extensions_for_pandas.array.span

#
#  Copyright (c) 2020 IBM Corp.
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

#
# span.py
#
# Part of text_extensions_for_pandas
#
# Pandas extensions to support columns of spans with character offsets.
#

import collections.abc
import textwrap
from typing import *

import numpy as np
import pandas as pd
from memoized_property import memoized_property
# noinspection PyProtectedMember
from pandas.api.types import is_bool_dtype
from pandas.core.dtypes.generic import ABCDataFrame, ABCSeries
try:
    from pandas.core.dtypes.generic import ABCIndex
except ImportError:
    # ABCIndexClass changed to ABCIndex in Pandas 1.3
    # noinspection PyUnresolvedReferences
    from pandas.core.dtypes.generic import ABCIndexClass as ABCIndex
from pandas.core.indexers import check_array_indexer

# Internal imports
import text_extensions_for_pandas.jupyter as jupyter
from text_extensions_for_pandas.array.string_table import StringTable
from text_extensions_for_pandas.util import to_int_array


def _check_same_text(obj1, obj2):
    if isinstance(obj1, Span) and isinstance(obj2, Span):
        if obj1.target_text != obj2.target_text:
            raise ValueError(
                f"Spans are over different target text "
                f"(got {obj1.target_text} and {obj2.target_text})"
            )
        return
    if not (isinstance(obj1, SpanArray) or isinstance(obj2, SpanArray)):
        raise TypeError(f"Expected some combination of Span and SpanArray, "
                        f"but received {type(obj1)} and {type(obj2)}")

    same_text_mask = (
        obj1.same_target_text(obj2) if isinstance(obj1, SpanArray)
        else obj2.same_target_text(obj1))
    if not np.all(same_text_mask):
        raise ValueError(
            f"SpanArrays are over different target text "
            f"(got {obj1.target_text} and {obj2.target_text})\n"
            f"Comparison result: {same_text_mask}"
        )


class SpanOpMixin:
    """
    Mixin class to define common operations between Span and SpanArray.
    """

    def __add__(self, other) -> Union["Span", "SpanArray"]:
        """
        Add a pair of spans and/or span arrays.

        span1 + span2 == minimal span that covers both spans
        :param other: Span or SpanArray
        :return: minimal span (or array of spans) that covers both inputs.
        """
        if isinstance(other, (ABCDataFrame, ABCSeries, ABCIndex)):
            # Rely on pandas to unbox and dispatch to us.
            return NotImplemented

        if isinstance(self, Span) and isinstance(other, Span):
            # Span + *Span* = Span
            _check_same_text(self, other)
            return Span(self.target_text, min(self.begin, other.begin),
                        max(self.end, other.end))
        elif isinstance(self, (Span, SpanArray)) and isinstance(other, (Span, SpanArray)):
            # SpanArray + *Span* = SpanArray
            _check_same_text(self, other)
            return SpanArray(self.target_text,
                             np.minimum(self.begin, other.begin),
                             np.maximum(self.end, other.end))
        else:
            raise TypeError(f"Unexpected combination of span types for add operation: "
                            f"{type(self)} and {type(other)}")



class Span(SpanOpMixin):
    """
    Python object representation of a single span with character offsets; that
    is, a single row of a `SpanArray`.

    An offset of `Span.NULL_OFFSET_VALUE` (currently -1) indicates "not a span"
    in the sense that NaN is "not a number".

    Most of the methods and properties of this class are single-span versions of
    the eponymous methods in :class:`SpanArray`. See that class for API
    documentation.
    """

    # Begin/end value that indicates "not a span" in the sense that NaN is
    # "not a number".
    NULL_OFFSET_VALUE = -1  # Type: int

    def __init__(self, text: str, begin: int, end: int):
        """
        Args:
            text: target document text on which the span is defined
            begin: Begin offset (inclusive) within `text`
            end: End offset (exclusive, one past the last char) within `text`
        """
        if text is not None and not isinstance(text, str):
            raise TypeError(f"Text must be a string. Got {text} of type {type(text)}.")
        if Span.NULL_OFFSET_VALUE == begin:
            if Span.NULL_OFFSET_VALUE != end:
                raise ValueError("Begin offset with special 'null' value {} "
                                 "must be paired with an end offset of {}"
                                 .format(Span.NULL_OFFSET_VALUE,
                                         Span.NULL_OFFSET_VALUE))
        elif begin < 0:
            raise ValueError("begin must be >= 0")
        elif end < 0:
            raise ValueError("end must be >= 0")
        elif end > len(text):
            raise ValueError(f"end must not be greater than the length of the "
                             f"target string ({end} > {len(text)})")
        self._text = text
        self._begin = begin
        self._end = end

    def __repr__(self) -> str:
        if self.begin == Span.NULL_OFFSET_VALUE:
            return "NA"
        elif self.target_text is None:
            return f"[{self.begin}, {self.end}): None"
        else:
            return f"[{self.begin}, {self.end}): " \
                   f"'{textwrap.shorten(self.covered_text, 80)}'"

    def __eq__(self, other):
        if isinstance(other, Span):
            return (
                # All NAs considered equal
                (self.begin == Span.NULL_OFFSET_VALUE
                 and other.begin == Span.NULL_OFFSET_VALUE)
                or (self.begin == other.begin
                    and self.end == other.end
                    and self.target_text == other.target_text))
        elif isinstance(other, SpanArray):
            return other == self
        else:
            # Different type ==> not equal
            return False

    def __hash__(self):
        result = hash((self.target_text, self.begin, self.end))
        return result

    def __lt__(self, other):
        """
        span1 < span2 if span1.end <= span2.begin and both spans are over the
        same target text
        """
        if not isinstance(other, (Span, SpanArray)):
            raise ValueError(f"Less-than relationship not defined for {self} and {other} "
                             f"of types {type(self)} and {type(other)}.")
        elif isinstance(other, Span) and self.target_text != other.target_text:
            raise ValueError(f"Less-than relationship undefined for different target "
                             f"texts.")
        elif isinstance(other, SpanArray) and np.any(self.target_text != other.target_text):
            raise ValueError(f"Less-than relationship undefined for different target "
                             f"texts. Indexes that differ are "
                             f"{np.where(self.target_text != other.target_text)[0]}.")
        else:
            return self.end <= other.begin

    def __gt__(self, other):
        return other < self

    def __le__(self, other):
        return self < other or self == other

    def __ge__(self, other):
        return other <= self

    @property
    def begin(self):
        return self._begin

    @property
    def end(self):
        return self._end

    @property
    def target_text(self):
        return self._text

    @memoized_property
    def covered_text(self):
        """
        Returns the substring of `self.target_text` that this `Span`
        represents.
        """
        if Span.NULL_OFFSET_VALUE == self._begin:
            return None
        else:
            return self.target_text[self.begin:self.end]

    def overlaps(self, other: "Span"):
        """
        :param other: Another Span or TokenSpan
        :return: True if the two spans overlap. Also True if a zero-length
            span is contained within the other.
        """
        if self.target_text != other.target_text:
            return False
        elif self.begin == other.begin and self.end == other.end:
            # Ensure that pairs of identical zero-length spans overlap.
            return True
        elif other.begin >= self.end:
            return False  # other completely to the right of self
        elif other.end <= self.begin:
            return False  # other completely to the left of self
        else:  # other.begin < self.end and other.end > self.begin
            return True

    def contains(self, other: "Span"):
        """
        :param other: Another Span or TokenSpan
        :return: True if `other` is entirely within the bounds of this span. Also
            True if a zero-length span is contained within the other.
        """
        if self.target_text != other.target_text:
            return False
        return other.begin >= self.begin and other.end <= self.end

    def context(self, num_chars: int = 40) -> str:
        """
        Show the location of this span in the context of the target string.

        :param num_chars: How many characters on either side to display
        :return: A string in the form:
            ```<text before>[<text inside>]<text after>```
            describing the text within and around the span.
        """
        before_text = self.target_text[max(0, self.begin - num_chars):self.begin]
        after_text = self.target_text[self.end:self.end + num_chars]
        if self.begin > num_chars:
            before_text = "..." + before_text
        if self.end + num_chars < len(self.target_text):
            after_text = after_text + "..."
        return f"{before_text}[{self.covered_text}]{after_text}"
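

# Usage sketch (illustrative only, not part of the library's API surface):
# exercises the `Span` behaviors defined above. The text and offsets below are
# made-up values chosen for the example.
def _example_span_usage() -> None:
    text = "Hello, world!"
    hello = Span(text, 0, 5)   # covers "Hello"
    world = Span(text, 7, 12)  # covers "world"

    assert hello.covered_text == "Hello"
    assert not hello.overlaps(world)          # disjoint spans
    assert Span(text, 0, 12).contains(world)  # [0, 12) encloses [7, 12)
    assert hello < world                      # hello ends before world begins

    # SpanOpMixin.__add__: the minimal span covering both operands
    combined = hello + world
    assert combined.covered_text == "Hello, world"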


@pd.api.extensions.register_extension_dtype
class SpanDtype(pd.api.extensions.ExtensionDtype):
    """
    Pandas datatype for a span that represents a range of characters within a
    target string.
    """

    @property
    def type(self):
        # The type for a single row of a column of type Span
        return Span

    @property
    def name(self) -> str:
        """A string representation of the dtype."""
        return "SpanDtype"

    @classmethod
    def construct_from_string(cls, string: str):
        """
        See docstring in `ExtensionDtype` class in `pandas/core/dtypes/base.py`
        for information about this method.
        """
        if not isinstance(string, str):
            raise TypeError(
                f"'construct_from_string' expects a string, got {type(string)}"
            )
        # Upstream code uses exceptions as part of its normal control flow and
        # will pass this method bogus class names.
        if string == cls.__name__:
            return cls()
        else:
            raise TypeError(
                f"Cannot construct a '{cls.__name__}' from '{string}'")

    @classmethod
    def construct_array_type(cls):
        """
        See docstring in `ExtensionDtype` class in `pandas/core/dtypes/base.py`
        for information about this method.
        """
        return SpanArray

    @property
    def na_value(self) -> object:
        """
        See docstring in `ExtensionDtype` class in `pandas/core/dtypes/base.py`
        for information about this method.
        """
        return _NULL_SPAN_SINGLETON

    def __from_arrow__(self, extension_array):
        """
        Convert the given extension array of type ArrowSpanType to a
        SpanArray.
        """
        from text_extensions_for_pandas.array.arrow_conversion import arrow_to_span
        return arrow_to_span(extension_array)
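

# Usage sketch (illustrative only): because `SpanDtype` is registered as a
# Pandas extension dtype above, a Series of spans can be requested by the
# dtype's string name. The example data is assumed, not drawn from the library.
def _example_span_dtype() -> None:
    text = "Hello, world!"
    spans = [Span(text, 0, 5), Span(text, 7, 12)]
    series = pd.Series(spans, dtype="SpanDtype")
    assert isinstance(series.dtype, SpanDtype)
    assert series.dtype.na_value.begin == Span.NULL_OFFSET_VALUE
    assert list(series.array.covered_text) == ["Hello", "world"]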


_NULL_SPAN_SINGLETON = Span("", Span.NULL_OFFSET_VALUE, Span.NULL_OFFSET_VALUE)

_EMPTY_INT_ARRAY = np.zeros(0, dtype=int)
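

# Usage sketch (illustrative only): how the "not a span" sentinel behaves.
# Any span whose offsets are NULL_OFFSET_VALUE compares equal to any other
# such span, regardless of target text; the example string is made up.
def _example_null_span() -> None:
    na_span = Span("some text", Span.NULL_OFFSET_VALUE, Span.NULL_OFFSET_VALUE)
    assert na_span == _NULL_SPAN_SINGLETON
    assert na_span.covered_text is None
    assert repr(na_span) == "NA"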


class SpanArray(pd.api.extensions.ExtensionArray, SpanOpMixin):
    """
    A Pandas `ExtensionArray` that represents a column of character-based spans
    over one or more target texts.

    Spans are represented as `[begin, end)` intervals, where `begin` and `end`
    are character offsets into the target text.
    """

    def __init__(self,
                 text: Union[str, Sequence[str], np.ndarray,
                             Tuple[StringTable, np.ndarray]],
                 begins: Union[pd.Series, np.ndarray, Sequence[int]],
                 ends: Union[pd.Series, np.ndarray, Sequence[int]]):
        """
        Construct a new `SpanArray`.

        :param text: Target text from which the spans of this array are drawn,
            or a sequence of texts if different spans can have different targets
        :param begins: Begin offsets of spans (closed)
        :param ends: End offsets (open)
        """
        if not isinstance(begins, (pd.Series, np.ndarray, list)):
            raise TypeError(f"begins is of unsupported type {type(begins)}. "
                            f"Supported types are Series, ndarray and List[int].")
        if not isinstance(ends, (pd.Series, np.ndarray, list)):
            raise TypeError(f"ends is of unsupported type {type(ends)}. "
                            f"Supported types are Series, ndarray and List[int].")
        if len(begins) != len(ends):
            raise ValueError(f"Received {len(begins)} begin offsets and {len(ends)} "
                             f"end offsets. Lengths should be equal.")

        begins = to_int_array(begins)
        ends = to_int_array(ends)

        if isinstance(text, str):
            # With a single string, every row gets string ID 0
            string_table = StringTable.create_single(text)  # type: StringTable
            text_ids = np.zeros_like(begins)  # type: np.ndarray
        elif isinstance(text, tuple):
            # INTERNAL USE ONLY: String table specified directly.
            # Note that this branch MUST come before the branch that checks for
            # sequences of strings, because tuples are sequences.
            string_table, text_ids = text
        elif isinstance(text, (collections.abc.Sequence, np.ndarray)):
            if len(text) != len(begins):
                # Checked len(begins) == len(ends) earlier
                raise ValueError(f"Received {len(text)} target text values and "
                                 f"{len(begins)} begin offsets. Lengths should be equal.")
            string_table, text_ids = StringTable.merge_things(text)
        else:
            raise TypeError(f"Text argument is of unsupported type {type(text)}")

        # Begin and end offsets in characters
        self._begins = begins  # type: np.ndarray
        self._ends = ends  # type: np.ndarray

        self._string_table = string_table  # type: Union[StringTable, None]
        self._text_ids = text_ids

        # Cached list of other SpanArrays that are exactly the same as this
        # one. Each element is the result of calling id()
        self._equivalent_arrays = []  # type: List[int]

        # Version numbers of elements in self._equivalent_arrays, to ensure that
        # a change hasn't made the arrays no longer equal
        self._equiv_array_versions = []  # type: List[int]

        # Monotonically increasing version number for tracking changes and
        # invalidating caches
        self._version = 0

        # Flag that tells whether to display details of offsets in Jupyter notebooks
        self._repr_html_show_offsets = True  # type: bool

    ##########################################
    # Overrides of superclass methods go here.

    @property
    def dtype(self) -> pd.api.extensions.ExtensionDtype:
        """
        See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
        for information about this method.
        """
        return SpanDtype()

    def astype(self, dtype, copy=True):
        """
        See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
        for information about this method.
        """
        dtype = pd.api.types.pandas_dtype(dtype)

        if isinstance(dtype, SpanDtype):
            data = self.copy() if copy else self
        elif isinstance(dtype, pd.StringDtype):
            # noinspection PyProtectedMember
            return dtype.construct_array_type()._from_sequence(self, copy=False)
        else:
            data = self.to_numpy(dtype=dtype, copy=copy, na_value=_NULL_SPAN_SINGLETON)
        return data

    @property
    def nbytes(self) -> int:
        """
        See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
        for information about this method.
        """
        return (
            self._begins.nbytes + self._ends.nbytes + self._text_ids.nbytes
            + self._string_table.nbytes()
        )

    def __len__(self) -> int:
        return len(self._begins)

    def __getitem__(self, item) -> Union[Span, "SpanArray"]:
        """
        See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
        for information about this method.
        """
        if isinstance(item, int):
            return Span(self.target_text[item], int(self._begins[item]),
                        int(self._ends[item]))
        else:
            # item not an int --> assume it's a numpy-compatible index
            item = check_array_indexer(self, item)
            return SpanArray((self._string_table, self._text_ids[item]),
                             self._begins[item], self._ends[item])

    def __setitem__(self, key: Union[int, np.ndarray], value: Any) -> None:
        """
        See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
        for information about this method.
        """

        # Subroutine of the if-else sequence below
        def _is_sequence_of_spans(seq: Any):
            if isinstance(seq, SpanArray):
                return True
            if not isinstance(seq, (collections.abc.Sequence, np.ndarray)):
                return False
            else:
                # For other sequences, check for everything being Span or None
                return all(elem is None or isinstance(elem, Span) for elem in seq)

        key = check_array_indexer(self, key)
        if isinstance(value, ABCSeries) and isinstance(value.dtype, SpanDtype):
            value = value.values

        if value is None or (isinstance(value, collections.abc.Sequence)
                             and len(value) == 0):
            self._begins[key] = Span.NULL_OFFSET_VALUE
            self._ends[key] = Span.NULL_OFFSET_VALUE
            self._text_ids[key] = StringTable.NONE_ID
        elif isinstance(value, Span):
            self._begins[key] = value.begin
            self._ends[key] = value.end
            self._text_ids[key] = self._string_table.maybe_add_thing(value.target_text)
        elif ((isinstance(key, slice)
               or (isinstance(key, np.ndarray) and is_bool_dtype(key.dtype)))
              and isinstance(value, SpanArray)):
            self._begins[key] = value.begin
            self._ends[key] = value.end
            self._text_ids[key] = self._string_table.maybe_add_things(value.target_text)
        elif (isinstance(key, np.ndarray) and len(value) > 0
              and len(value) == len(key)
              and _is_sequence_of_spans(value)):
            for k, v in zip(key, value):
                self._begins[k] = v.begin
                self._ends[k] = v.end
                self._text_ids[k] = self._string_table.maybe_add_thing(v.target_text)
        else:
            raise ValueError(
                f"Attempted to set element {key} (type {type(key)}) of a SpanArray with "
                f"an object of type {type(value)}")

        # We just changed the contents of this array, so invalidate any cached
        # results computed from those contents.
        self.increment_version()

    def __eq__(self, other):
        """
        Pandas/Numpy-style array/series comparison function.

        :param other: Second operand of a Pandas "==" comparison with the series
            that wraps this SpanArray.

        :return: Returns a boolean mask indicating which rows match `other`.
        """
        if isinstance(other, (ABCDataFrame, ABCSeries, ABCIndex)):
            # Rely on pandas to unbox and dispatch to us.
            return NotImplemented
        if isinstance(other, Span):
            mask = np.full(len(self), True, dtype=bool)
            mask[self.target_text != other.target_text] = False
            mask[self.begin != other.begin] = False
            mask[self.end != other.end] = False
            return mask
        elif isinstance(other, SpanArray):
            if len(self) != len(other):
                raise ValueError("Can't compare arrays of differing lengths "
                                 "{} and {}".format(len(self), len(other)))
            return np.logical_and(
                self.target_text == other.target_text,
                np.logical_and(
                    self.begin == other.begin,
                    self.end == other.end
                )
            )
        else:
            # TODO: Return False here once we're sure that this
            #  function is catching all the comparisons that really matter.
            raise ValueError("Don't know how to compare objects of type "
                             "'{}' and '{}'".format(type(self), type(other)))

    def __ne__(self, other):
        if isinstance(other, (ABCDataFrame, ABCSeries, ABCIndex)):
            # Rely on pandas to unbox and dispatch to us.
            return NotImplemented
        return ~(self == other)

    def __hash__(self):
        return self._hash

    def __contains__(self, item) -> bool:
        """
        Return true if scalar item exists in this SpanArray.

        :param item: scalar Span value.
        :return: true if item exists in this SpanArray.
        """
        if isinstance(item, Span) and \
                item.begin == Span.NULL_OFFSET_VALUE:
            return Span.NULL_OFFSET_VALUE in self._begins
        return super().__contains__(item)

    def equals(self, other: "SpanArray"):
        """
        :param other: A second :class:`SpanArray`
        :return: ``True`` if both arrays have the same target texts (can be a
            different string object with the same contents) and the same spans
            in the same order.
        """
        if not isinstance(other, SpanArray):
            raise TypeError(f"equals() not defined for arguments of type "
                            f"{type(other)}")
        if self is other:
            return True

        # Check for cached result
        if id(other) in self._equivalent_arrays:
            cache_ix = self._equivalent_arrays.index(id(other))
        else:
            cache_ix = -1

        if (cache_ix >= 0
                and other.version == self._equiv_array_versions[cache_ix]):
            # Cached "equal" result
            return True
        elif (not np.array_equal(self.target_text, other.target_text)
              or not np.array_equal(self.begin, other.begin)
              or not np.array_equal(self.end, other.end)):
            # "Not equal" result from slow path
            if cache_ix >= 0:
                del self._equivalent_arrays[cache_ix]
                del self._equiv_array_versions[cache_ix]
            return False
        else:
            # If we get here, self and other are equal, and we had to expend
            # quite a bit of effort to figure that out.
            # Cache the result so we don't have to do that again.
            if cache_ix >= 0:
                self._equiv_array_versions[cache_ix] = other.version
            else:
                self._equivalent_arrays.append(id(other))
                self._equiv_array_versions.append(other.version)
            return True

    @classmethod
    def _concat_same_type(
        cls, to_concat: Sequence[pd.api.extensions.ExtensionArray]
    ) -> pd.api.extensions.ExtensionArray:
        """
        See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
        for information about this method.
        """
        span_arrays = []  # type: List["SpanArray"]
        for tc in to_concat:
            if not isinstance(tc, SpanArray):
                raise ValueError(f"Attempted to concatenate a sequence containing a "
                                 f"non-SpanArray object via SpanArray._concat_same_type."
                                 f" Types are: {[type(t) for t in to_concat]})")
            span_arrays.append(tc)
        string_table, text_ids_list = StringTable.merge_tables_and_ids(
            [s._string_table for s in span_arrays],
            [s._text_ids for s in span_arrays]
        )
        text_ids = np.concatenate(text_ids_list)
        begins = np.concatenate([a.begin for a in span_arrays])
        ends = np.concatenate([a.end for a in span_arrays])
        return SpanArray((string_table, text_ids), begins, ends)

    @classmethod
    def _from_sequence(cls, scalars, dtype=None, copy=False):
        """
        See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
        for information about this method.
        """
        if isinstance(scalars, Span):
            scalars = [scalars]
        if isinstance(scalars, SpanArray):
            # Fast path for no-op
            scalars_as_span_array = scalars  # type: SpanArray
            if copy:
                return scalars_as_span_array.copy()
            else:
                return scalars_as_span_array
        begins = np.empty(len(scalars), dtype=int)
        ends = np.empty(len(scalars), dtype=int)
        target_texts = np.empty(len(scalars), dtype=object)
        i = 0
        for s in scalars:
            if not isinstance(s, Span):
                # TODO: Temporary fix for np.nan values, pandas-dev GH#38980
                try:
                    if np.isnan(s):  # May throw TypeError
                        s = _NULL_SPAN_SINGLETON
                    else:
                        raise TypeError()
                except TypeError:
                    raise ValueError(f"Can only convert a sequence of Span "
                                     f"objects to a SpanArray. Found an "
                                     f"object of type {type(s)}")
            begins[i] = s.begin
            ends[i] = s.end
            target_texts[i] = s.target_text
            i += 1
        string_table, text_ids = StringTable.merge_things(target_texts)
        return SpanArray((string_table, text_ids), begins, ends)

    @classmethod
    def _from_factorized(cls, values, original):
        """
        See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
        for information about this method.
        """
        return cls._from_sequence(values)

    def _values_for_factorize(self) -> Tuple[np.ndarray, Any]:
        """
        See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
        for information about this method.
        """
        return self.astype(object), _NULL_SPAN_SINGLETON

    def isna(self) -> np.ndarray:
        """
        See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
        for information about this method.
        """
        return np.equal(self._begins, Span.NULL_OFFSET_VALUE)

    def copy(self) -> "SpanArray":
        """
        See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
        for information about this method.
        """
        # StringTables are append-only, so shallow copying should be safe
        copy_str_table = self._string_table
        return SpanArray((copy_str_table, self._text_ids.copy()),
                         self.begin.copy(),
                         self.end.copy())

    def take(
        self, indices: Sequence[int], allow_fill: bool = False,
        fill_value: Any = None
    ) -> "SpanArray":
        """
        See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
        for information about this method.
        """
        if allow_fill:
            # From API docs: "[If allow_fill == True, then] negative values in
            # `indices` indicate missing values. These values are set to
            # `fill_value`. Any other negative values raise a ``ValueError``."
            if fill_value is None or \
                    (np.isscalar(fill_value) and np.isnan(fill_value)):
                fill_value = _NULL_SPAN_SINGLETON
            elif not isinstance(fill_value, Span):
                raise ValueError("Fill value must be None, nan, or a Span "
                                 "(was {})".format(fill_value))
        else:
            # Dummy fill value to keep code below happy
            fill_value = _NULL_SPAN_SINGLETON

        # Pandas' internal implementation of take() does most of the heavy
        # lifting.
        begins = pd.api.extensions.take(
            self.begin, indices, allow_fill=allow_fill, fill_value=fill_value.begin
        )
        ends = pd.api.extensions.take(
            self.end, indices, allow_fill=allow_fill, fill_value=fill_value.end
        )
        text_ids = pd.api.extensions.take(
            self._text_ids, indices, allow_fill=allow_fill,
            fill_value=self._string_table.maybe_add_thing(fill_value.target_text)
        )

        # StringTables are append-only, so should be safe to share
        return SpanArray((self._string_table, text_ids), begins, ends)

    def __lt__(self, other):
        """
        Pandas-style array/series comparison function.

        :param other: Second operand of a Pandas "<" comparison with the series
            that wraps this SpanArray.

        :return: Returns a boolean mask indicating which rows are less than
            `other`. span1 < span2 if span1.end <= span2.begin and both spans
            are over the same target text.
        """
        if isinstance(other, (ABCDataFrame, ABCSeries, ABCIndex)):
            # Rely on pandas to unbox and dispatch to us.
            return NotImplemented
        elif not isinstance(other, (Span, SpanArray)):
            raise ValueError(f"'<' relationship not defined for {self} and {other} "
                             f"of types {type(self)} and {type(other)}.")
        else:
            offsets_mask = self.end <= other.begin
            text_mask = self.same_target_text(other)
            return np.logical_and(offsets_mask, text_mask)

    def __gt__(self, other):
        if isinstance(other, (ABCDataFrame, ABCSeries, ABCIndex)):
            # Rely on pandas to unbox and dispatch to us.
            return NotImplemented
        if isinstance(other, (SpanArray, Span)):
            return other.__lt__(self)
        else:
            raise ValueError("'>' relationship not defined for {} and {} "
                             "of types {} and {}"
                             "".format(self, other, type(self), type(other)))

    def __le__(self, other):
        # TODO: Figure out what the semantics of this operation should be.
        raise NotImplementedError()

    def __ge__(self, other):
        # TODO: Figure out what the semantics of this operation should be.
        raise NotImplementedError()

    def _reduce(self, name, skipna=True, **kwargs):
        """
        See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
        for information about this method.
        """
        if 0 == len(self):
            # Special case: Empty array
            # For all aggregates defined so far, we should return NA for this case.
            return _NULL_SPAN_SINGLETON
        if name == "sum":
            # Sum ==> combine, i.e. return the smallest span that contains all
            #         spans in the series
            if not self.is_single_document:
                raise ValueError(f"Sum of spans not defined for different target texts.")
            first_target_text = self.target_text[0]
            return Span(first_target_text, np.min(self.begin), np.max(self.end))
        elif name == "first":
            return self[0]  # return Span(first_target_text, self.begin[0], self.end[0])
        else:
            raise TypeError(f"'{name}' aggregation not supported on a series "
                            f"backed by a SpanArray")

    ####################################################
    # Methods that don't override the superclass go here

    @classmethod
    def make_array(cls, o) -> "SpanArray":
        """
        Make a :class:`SpanArray` object out of any of several types of input.

        :param o: a :class:`SpanArray` object represented as a :class:`pd.Series`,
            a list of :class:`Span` objects, or maybe just an actual
            :class:`SpanArray` (or :class:`TokenSpanArray`) object.

        :return: :class:`SpanArray` version of ``o``, which may be a pointer to ``o``
            or one of its fields.
        """
        if isinstance(o, SpanArray):
            return o
        elif isinstance(o, pd.Series):
            return cls.make_array(o.values)
        elif isinstance(o, Sequence):
            return cls._from_sequence(o)
        elif isinstance(o, Iterable):
            return cls._from_sequence([e for e in o])
        else:
            raise TypeError(f"Cannot make a SpanArray out of an object of type {type(o)}")

    @memoized_property
    def target_text(self) -> np.ndarray:
        """
        :return: "document" texts that the spans in this array reference, as opposed
            to the regions of these documents that the spans cover.
        """
        return self._string_table.ids_to_things(self._text_ids)

    @memoized_property
    def document_text(self) -> Union[str, None]:
        """
        :return: if all spans in this array cover the same document, the text of
            that document.
            Raises a :class:`ValueError` if the array is empty or if the spans in
            this array cover more than one document.
        """
        if len(self._text_ids) == 0:
            raise ValueError("An empty array has no document text")
        if not self.is_single_document:
            raise ValueError("Spans in array cover more than one document")
        else:
            # Look up the first text directly so we don't materialize the target_text
            # property when it's not needed.
            return self._string_table.id_to_thing(self._text_ids[0])

    @memoized_property
    def is_single_document(self) -> bool:
        """
        :return: True if there is at least one span in the array and every span is
            over the same target text.
        """
        # NOTE: For legacy reasons, this method is currently inconsistent with the method
        # by the same name in TokenSpanArray. TokenSpanArray.is_single_document() returns
        # True on an empty array, while SpanArray.is_single_document() returns False.
        if len(self) == 0:
            # If there are zero spans, then there are zero documents.
            return False
        elif self._string_table.num_things == 1:
            # Only one string; make sure that this array has a non-null value
            for b in self._begins:
                if b != Span.NULL_OFFSET_VALUE:
                    return True
            # All nulls --> zero spans
            return False
        else:
            # More than one string in the StringTable and at least one span.
            return self._is_single_document_slow_path()

    def _is_single_document_slow_path(self) -> bool:
        # Slow but reliable way to test whether everything in this SpanArray is from
        # the same document.
        # Checks whether every span has the same text ID.
        # Ignores NAs when making this comparison.

        # First we need to find the first text ID that is not NA
        first_text_id = None
        for b, t in zip(self._begins, self._text_ids):
            if b != Span.NULL_OFFSET_VALUE:
                first_text_id = t
                break
        if first_text_id is None:
            # Special case: All NAs --> Zero documents
            return False
        return not np.any(
            np.logical_and(
                # Row is not null...
                np.not_equal(self._begins, Span.NULL_OFFSET_VALUE),
                # ...and is over a different text than the first row's text ID
                np.not_equal(self._text_ids, first_text_id),
            )
        )

    def split_by_document(self) -> List["SpanArray"]:
        """
        :return: A list of slices of this `SpanArray` that cover single documents.
        """
        if self.is_single_document:
            return [self]
        slices = []
        for text_id in self._string_table.ids:
            mask = self._text_ids == text_id
            if np.any(mask):
                slices.append(self[mask])
        return slices

    @property
    def begin(self) -> np.ndarray:
        return self._begins

    @property
    def end(self) -> np.ndarray:
        return self._ends

    @property
    def version(self) -> int:
        """
        :return: Monotonically increasing version number that changes every time this
            array is modified. **NOTE:** This number might not change if a caller
            obtains a pointer to an internal array and modifies it. Callers who
            perform such modifications should call `increment_version()`.
        """
        return self._version

    def increment_version(self):
        """
        Manually increase the version counter of this array to indicate that
        the array's contents have changed. Also invalidates any internal cached
        data derived from the array's state.
        """
        # Invalidate cached computation
        self._clear_cached_properties()
        self._equivalent_arrays = []
        self._equiv_array_versions = []

        # Increment the counter
        self._version += 1

    def as_tuples(self) -> np.ndarray:
        """
        :return: (begin, end) pairs as an array of tuples
        """
        return np.concatenate(
            (self.begin.reshape((-1, 1)), self.end.reshape((-1, 1))),
            axis=1)

    @property
    def covered_text(self) -> np.ndarray:
        """
        :return: an array of the substrings of `target_text` corresponding to the
            spans in this array.
        """
        # TODO: Vectorized version of this
        texts = self.target_text
        # Need dtype=object so we can return nulls
        result = np.empty(len(self), dtype=object)
        for i in range(len(self)):
            if self._begins[i] == Span.NULL_OFFSET_VALUE:
                # Null value at this index
                result[i] = None
            elif texts[i] is None:
                # Null text and non-null begin/end. Shouldn't happen in normal use but
                # does occur in some parts of the Pandas regression suite.
                result[i] = None
            else:
                result[i] = texts[i][self._begins[i]:self._ends[i]]
        return result

    @memoized_property
    def normalized_covered_text(self) -> np.ndarray:
        """
        :return: A normalized version of the covered text of the spans in this
            array. Currently "normalized" means "lowercase".
        """
        # Currently we can't use np.char.lower directly because
        # self.covered_text needs to be an object array, not a numpy string
        # array, to allow for null values.
        covered_text = self.covered_text
        result = np.empty_like(covered_text)
        for i in range(len(result)):
            result[i] = None if covered_text[i] is None else covered_text[i].lower()
        return result

    def as_frame(self) -> pd.DataFrame:
        """
        Returns a dataframe representation of this column based on Python
        atomic types.
        """
        return pd.DataFrame({
            "begin": self.begin,
            "end": self.end,
            "covered_text": self.covered_text
        })

    def same_target_text(self, other: Union["SpanArray", Span]):
        """
        :param other: Either a single span or an array of spans of the same
            length as this one
        :return: Numpy array containing a boolean mask of all entries that
            have the same target text.
            Two spans with target text of None are considered to have the same
            target text.
        """
        if isinstance(other, Span):
            other_id = self._string_table.thing_to_id(other.target_text)
            return self._text_ids == other_id
        elif isinstance(other, SpanArray):
            other_ids = self._string_table.things_to_ids(other.target_text)
            return self._text_ids == other_ids
        else:
            raise TypeError(f"same_target_text not defined for input type "
                            f"{type(other)}")

    def overlaps(self, other: Union["SpanArray", Span]):
        """
        :param other: Either a single span or an array of spans of the same
            length as this one
        :return: Numpy array containing a boolean mask of all entries that
            overlap the corresponding element of `other`
        """
        if not isinstance(other, (Span, SpanArray)):
            raise TypeError(f"overlaps not defined for input type "
                            f"{type(other)}")

        # Replicate the logic in Span.overlaps() with boolean masks
        same_text_mask = self.same_target_text(other)
        exact_equal_mask = np.logical_and(self.begin == other.begin,
                                          self.end == other.end)
        begin_ge_end_mask = other.begin >= self.end
        end_le_begin_mask = other.end <= self.begin

        # (self.target_text == other.target_text) and (
        #     (self.begin == other.begin and self.end == other.end)
        #     or not (other.begin >= self.end or other.end <= self.begin)
        # )
        return (
            np.logical_and(
                same_text_mask,
                np.logical_or(
                    exact_equal_mask,
                    np.logical_not(
                        np.logical_or(begin_ge_end_mask, end_le_begin_mask)
                    )
                )
            )
        )

    def contains(self, other: Union["SpanArray", Span]):
        """
        :param other: Either a single span or an array of spans of the same
            length as this one
        :return: Numpy array containing a boolean mask of all entries that
            contain the corresponding element of `other`
        """
        if not isinstance(other, (Span, SpanArray)):
            raise TypeError(f"contains not defined for input type "
                            f"{type(other)}")

        # Replicate the logic in Span.contains() with boolean masks
        same_text_mask = self.same_target_text(other)
        begin_ge_begin_mask = other.begin >= self.begin
        end_le_end_mask = other.end <= self.end
        return (
            np.logical_and(
                same_text_mask,
                np.logical_and(begin_ge_begin_mask, end_le_end_mask)
            )
        )

    def _repr_html_(self) -> str:
        """
        HTML pretty-printing of a series of spans for Jupyter notebooks.
        """
        return jupyter.pretty_print_html(self, self._repr_html_show_offsets)

    @property
    def repr_html_show_offsets(self):
        """
        :return: Whether the HTML/Jupyter notebook representation of this array will
            contain a table of span offsets in addition to the marked-up target text.
        """
        return self._repr_html_show_offsets

    @repr_html_show_offsets.setter
    def repr_html_show_offsets(self, show_offsets: bool):
        self._repr_html_show_offsets = show_offsets

    ##########################################
    # Keep private and protected methods here.

    @memoized_property
    def _hash(self):
        return hash((self.target_text.tobytes(), self._begins.tobytes(),
                     self._ends.tobytes()))

    # noinspection PyMethodMayBeStatic
    def _cached_property_names(self) -> List[str]:
        """
        :return: names of cached properties whose values are computed on demand
            and invalidated when the set of spans change.
        """
        return ["_hash", "is_single_document", "target_text",
                "normalized_covered_text", "document_text"]

    def _clear_cached_properties(self) -> None:
        """
        Remove cached values of memoized properties to reflect changes to the
        data on which they are based.
        """
        for n in self._cached_property_names():
            attr_name = "_{0}".format(n)
            if hasattr(self, attr_name):
                delattr(self, attr_name)

    def __arrow_array__(self, type=None):
        """
        Conversion of this Array to a pyarrow.ExtensionArray.

        :param type: Optional type passed to arrow for conversion, not used
        :return: pyarrow.ExtensionArray of type ArrowSpanType
        """
        from text_extensions_for_pandas.array.arrow_conversion import span_to_arrow
        return span_to_arrow(self)
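

# Usage sketch (illustrative only): a walk through the `SpanArray` API defined
# above. All texts, offsets, and variable names here are examples invented for
# this sketch, not fixtures from the library.
def _example_span_array_usage() -> None:
    text = "Hello, world!"
    arr = SpanArray(text, begins=[0, 7], ends=[5, 12])

    # Vectorized accessors
    assert list(arr.covered_text) == ["Hello", "world"]
    assert arr.document_text == text
    assert arr.is_single_document

    # Element-wise comparison returns a boolean mask; equals() returns a bool.
    assert list(arr == Span(text, 0, 5)) == [True, False]
    assert arr.equals(SpanArray(text, [0, 7], [5, 12]))

    # Vectorized span relationships against a single query span
    assert list(arr.overlaps(Span(text, 0, 3))) == [True, False]
    assert list(arr.contains(Span(text, 8, 11))) == [False, True]

    # take() with allow_fill inserts NA spans at negative indices.
    filled = arr.take([0, -1], allow_fill=True)
    assert list(filled.isna()) == [False, True]

    # Reduction: Series.sum() over a span column dispatches to _reduce("sum")
    # and yields the minimal covering span.
    series = pd.Series(arr)
    assert series.sum() == Span(text, 0, 12)

    # as_frame() flattens the array into begin/end/covered_text columns.
    assert list(arr.as_frame().columns) == ["begin", "end", "covered_text"]

    # Arrays over several documents can be split back into per-document slices.
    multi = SpanArray(["doc one", "doc one", "doc two"],
                      begins=[0, 4, 0], ends=[3, 7, 3])
    assert len(multi.split_by_document()) == 2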