#
# Copyright (c) 2020 IBM Corp.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# token_span.py
#
# Part of text_extensions_for_pandas
#
# Pandas extensions to support columns of spans with token offsets.
#
import collections.abc
import textwrap
from typing import *
import numpy as np
import pandas as pd
from memoized_property import memoized_property
# noinspection PyProtectedMember
from pandas.api.types import is_bool_dtype
from pandas.core.dtypes.generic import ABCDataFrame, ABCSeries
try:
from pandas.core.dtypes.generic import ABCIndex
except ImportError:
# ABCIndexClass changed to ABCIndex in Pandas 1.3
# noinspection PyUnresolvedReferences
from pandas.core.dtypes.generic import ABCIndexClass as ABCIndex
from pandas.core.indexers import check_array_indexer
from text_extensions_for_pandas.array.span import (
Span,
SpanArray,
SpanDtype,
SpanOpMixin,
)
# Internal imports
from text_extensions_for_pandas.array.token_table import TokenTable
from text_extensions_for_pandas.util import to_int_array
def _check_same_tokens(obj1, obj2):
    """
    Verify that two TokenSpan/TokenSpanArray operands are over the same
    tokenization before a binary operation combines them.

    :param obj1: First operand; TokenSpan or TokenSpanArray
    :param obj2: Second operand; TokenSpan or TokenSpanArray
    :return: For the scalar/scalar case, whether the tokens match; otherwise
     returns nothing and raises ValueError when tokenizations differ.
    """
    if isinstance(obj1, TokenSpan) and isinstance(obj2, TokenSpan):
        # Scalar vs. scalar: direct comparison of the two token arrays.
        return obj1.tokens.equals(obj2.tokens)
    if not (isinstance(obj1, TokenSpanArray) or isinstance(obj2, TokenSpanArray)):
        raise TypeError(f"Expected some combination of TokenSpan and TokenSpanArray, "
                        f"but received {type(obj1)} and {type(obj2)}")
    # At least one operand is an array; let the array side compute the mask.
    if isinstance(obj1, TokenSpanArray):
        same_tokens_mask = obj1.same_tokens(obj2)
    else:
        same_tokens_mask = obj2.same_tokens(obj1)
    if not np.all(same_tokens_mask):
        raise ValueError(
            f"TokenSpanArrays are over different sets of tokens "
            f"(got {obj1.tokens} and {obj2.tokens})\n"
            f"Comparison result: {same_tokens_mask}"
        )
class TokenSpanOpMixin(SpanOpMixin):
    """
    Mixin class to define common operations between TokenSpan and TokenSpanArray.
    """

    def __add__(self, other) -> Union[Span, "TokenSpan", SpanArray, "TokenSpanArray"]:
        """
        Add a pair of spans and/or span arrays.

        span1 + span2 == minimal span that covers both spans

        :param other: TokenSpan, Span, TokenSpanArray, or SpanArray
        :return: minimal span (or array of spans) that covers both inputs.
        """
        self_is_scalar = isinstance(self, TokenSpan)
        other_is_scalar = isinstance(other, TokenSpan)
        if self_is_scalar and other_is_scalar:
            # TokenSpan + TokenSpan = TokenSpan
            _check_same_tokens(self, other)
            return TokenSpan(self.tokens,
                             min(self.begin_token, other.begin_token),
                             max(self.end_token, other.end_token))
        if (isinstance(self, (TokenSpan, TokenSpanArray))
                and isinstance(other, (TokenSpan, TokenSpanArray))):
            # At least one array operand --> elementwise minimal covering spans.
            _check_same_tokens(self, other)
            return TokenSpanArray(
                self.tokens,
                np.minimum(self.begin_token, other.begin_token),
                np.maximum(self.end_token, other.end_token))
        # Mixed token/character operands: defer to the character-based mixin.
        return super().__add__(other)
class TokenSpan(Span, TokenSpanOpMixin):
    """
    Python object representation of a single span with token offsets; that
    is, a single row of a `TokenSpanArray`.

    This class is also a subclass of `Span` and can return character-level
    information.

    An offset of `TokenSpan.NULL_OFFSET_VALUE` (currently -1) indicates
    "not a span" in the sense that NaN is "not a number".
    """

    def __init__(self, tokens: Any, begin_token: int, end_token: int):
        """
        :param tokens: Tokenization information about the document, including
         the target text. Must be a type that :func:`SpanArray.make_array()`
         can convert to a `SpanArray`.
        :param begin_token: Begin offset (inclusive) within the tokenized text,
        :param end_token: End offset; exclusive, one past the last token
        """
        tokens = SpanArray.make_array(tokens)
        if TokenSpan.NULL_OFFSET_VALUE != begin_token and begin_token < 0:
            # Bug fix: the message previously said "greater than zero", but a
            # begin offset of zero is valid (the condition only rejects < 0).
            raise ValueError(
                f"Begin token offset must be NULL_OFFSET_VALUE or "
                f"greater than or equal to zero (got {begin_token})"
            )
        if TokenSpan.NULL_OFFSET_VALUE != begin_token and end_token < begin_token:
            # Bug fix: the original message was missing its closing parenthesis.
            raise ValueError(
                f"End must be >= begin (got {begin_token} and {end_token})"
            )
        if begin_token > len(tokens):
            raise ValueError(
                f"Begin token offset of {begin_token} larger than "
                f"number of tokens ({len(tokens)})"
            )
        if end_token > len(tokens) + 1:
            raise ValueError(
                f"End token offset of {end_token} larger than "
                f"number of tokens + 1 ({len(tokens)} + 1)"
            )
        if len(tokens) == 0 and begin_token != TokenSpan.NULL_OFFSET_VALUE:
            raise ValueError(
                "Tried to create a non-null TokenSpan over an empty list of tokens."
            )
        if TokenSpan.NULL_OFFSET_VALUE == begin_token:
            if TokenSpan.NULL_OFFSET_VALUE != end_token:
                # Bug fix: the original passed the offsets as extra ValueError
                # arguments without ever formatting them into the message.
                raise ValueError(
                    f"Begin offset with special 'null' value "
                    f"{TokenSpan.NULL_OFFSET_VALUE} must be paired with an end "
                    f"offset of {TokenSpan.NULL_OFFSET_VALUE}"
                )
            begin_char_off = end_char_off = Span.NULL_OFFSET_VALUE
        else:
            begin_char_off = tokens.begin[begin_token]
            # A zero-length token span [i, i) maps to a zero-length character
            # span at the begin offset.
            end_char_off = (
                begin_char_off
                if begin_token == end_token
                else tokens.end[end_token - 1]
            )
        if len(tokens) == 0:
            doc_text = None
        elif not tokens.is_single_document:
            raise ValueError("Tokens must be from exactly one document.")
        else:
            doc_text = tokens.document_text
        super().__init__(doc_text, begin_char_off, end_char_off)
        self._tokens = tokens
        self._begin_token = begin_token
        self._end_token = end_token

    @classmethod
    def make_null(cls, tokens):
        """
        Convenience method for building null spans.

        :param tokens: Tokens of the target string
        :return: A null span over the indicated tokens
        """
        return TokenSpan(
            tokens, TokenSpan.NULL_OFFSET_VALUE, TokenSpan.NULL_OFFSET_VALUE
        )

    # Set this flag to True to use offsets in tokens, not characters, in the
    # string representation of TokenSpans globally.
    USE_TOKEN_OFFSETS_IN_REPR = False

    def __repr__(self) -> str:
        if TokenSpan.NULL_OFFSET_VALUE == self._begin_token:
            return "NA"
        elif TokenSpan.USE_TOKEN_OFFSETS_IN_REPR:
            return "[{}, {}): '{}'".format(
                self.begin_token, self.end_token, textwrap.shorten(self.covered_text, 80)
            )
        else:
            return "[{}, {}): '{}'".format(
                self.begin, self.end, textwrap.shorten(self.covered_text, 80)
            )

    def __eq__(self, other):
        if isinstance(other, TokenSpan) and self.tokens.equals(other.tokens):
            # Same tokenization: token offsets fully determine equality.
            return (
                self.begin_token == other.begin_token
                and self.end_token == other.end_token)
        else:
            # Different tokens, or no tokens, or not a span ==> Fall back on superclass
            return Span.__eq__(self, other)

    def __hash__(self):
        # Use superclass hash function so that hash and __eq__ are consistent
        return Span.__hash__(self)

    def __lt__(self, other):
        """
        span1 < span2 if span1.end <= span2.begin
        """
        if isinstance(other, TokenSpan):
            # Use token offsets when available
            return self.end_token <= other.begin_token
        else:
            return Span.__lt__(self, other)

    @property
    def tokens(self):
        # SpanArray describing the document's tokenization.
        return self._tokens

    @property
    def begin_token(self):
        # Index (inclusive) of the first token in the span.
        return self._begin_token

    @property
    def end_token(self):
        # One past the index of the last token in the span.
        return self._end_token
# Module-level singletons shared by TokenSpanDtype.na_value, take(), and
# _from_sequence() below.
_EMPTY_SPAN_ARRAY_SINGLETON = SpanArray("", [], [])
# The canonical "not a span" value: null token offsets over zero tokens.
_NULL_TOKEN_SPAN_SINGLETON = TokenSpan(_EMPTY_SPAN_ARRAY_SINGLETON,
                                       Span.NULL_OFFSET_VALUE, Span.NULL_OFFSET_VALUE)
# Bug fix: removed the "[docs]" Sphinx-HTML artifact that had been fused onto
# the decorator line, which made this definition syntactically invalid.
@pd.api.extensions.register_extension_dtype
class TokenSpanDtype(SpanDtype):
    """
    Pandas datatype for a span that represents a range of tokens within a
    target string.
    """

    @property
    def type(self):
        # The type for a single row of a column of type TokenSpan
        return TokenSpan

    @property
    def name(self) -> str:
        """:return: A string representation of the dtype."""
        return "TokenSpanDtype"

    @property
    def na_value(self) -> object:
        """
        See docstring in `ExtensionDType` class in `pandas/core/dtypes/base.py`
        for information about this method.
        """
        return _NULL_TOKEN_SPAN_SINGLETON

    @classmethod
    def construct_array_type(cls):
        """
        See docstring in `ExtensionDType` class in `pandas/core/dtypes/base.py`
        for information about this method.
        """
        return TokenSpanArray

    def __from_arrow__(self, extension_array):
        """
        Convert the given extension array of type ArrowTokenSpanType to a
        TokenSpanArray.
        """
        # Deferred import; presumably avoids a circular module dependency —
        # TODO confirm against arrow_conversion's imports.
        from text_extensions_for_pandas.array.arrow_conversion import arrow_to_token_span
        return arrow_to_token_span(extension_array)
# Placeholder "document text" that TokenSpanArray.__init__ passes to the
# SpanArray superclass constructor; the subclass computes real character
# offsets from its tokens instead of using these values.
_NOT_A_DOCUMENT_TEXT = "This string is not the text of a document."
# Zero-length integer array reused for the superclass's begin/end offsets.
_EMPTY_INT_ARRAY = np.zeros(0, dtype=int)
# Singleton instance of the SpanArray value that corresponds to NA for tokens
# NULL_TOKENS_VALUE = SpanArray("", [], [])
# Bug fix: removed the "[docs]" Sphinx-HTML artifact fused onto the class line,
# which made this definition syntactically invalid.
class TokenSpanArray(SpanArray, TokenSpanOpMixin):
    """
    A Pandas :class:`ExtensionArray` that represents a column of token-based spans
    over a single target text.

    Spans are represented internally as ``[begin_token, end_token)`` intervals, where
    the properties ``begin_token`` and ``end_token`` are *token* offsets into the target
    text. As with the parent class :class:`SpanArray`, the properties ``begin`` and
    ``end`` of a :class:`TokenSpanArray` return *character* offsets.

    Null values are encoded with begin and end offsets of
    ``TokenSpan.NULL_OFFSET_VALUE``.

    Fields:

    * ``self._tokens``: Reference to the target string's tokens as a
      `SpanArray`. For now, references to different `SpanArray`
      objects are treated as different even if the arrays have the same
      contents.
    * ``self._begin_tokens``: Numpy array of integer offsets in tokens. An offset
      of TokenSpan.NULL_OFFSET_VALUE here indicates a null value.
    * ``self._end_tokens``: Numpy array of end offsets (1 + last token in span).
    """

    def __init__(self, tokens: Union[SpanArray, Sequence[SpanArray]],
                 begin_tokens: Union[pd.Series, np.ndarray, Sequence[int]] = None,
                 end_tokens: Union[pd.Series, np.ndarray, Sequence[int]] = None):
        """
        :param tokens: Character-level span information about the underlying
         tokens. Can be a single set of tokens, covering all spans, or a separate
         `SpanArray` pointer for every span.
        :param begin_tokens: Array of begin offsets measured in tokens
        :param end_tokens: Array of end offsets measured in tokens
        """
        # Superclass constructor expects values for things that the subclass doesn't
        # use.
        super().__init__(_NOT_A_DOCUMENT_TEXT, _EMPTY_INT_ARRAY, _EMPTY_INT_ARRAY)
        if not isinstance(begin_tokens, (pd.Series, np.ndarray, list)):
            raise TypeError(f"begin_tokens is of unsupported type {type(begin_tokens)}. "
                            f"Supported types are Series, ndarray and List[int].")
        if not isinstance(end_tokens, (pd.Series, np.ndarray, list)):
            raise TypeError(f"end_tokens is of unsupported type {type(end_tokens)}. "
                            f"Supported types are Series, ndarray and List[int].")
        if isinstance(tokens, SpanArray):
            # One shared tokenization for all rows.
            if not tokens.is_single_document:
                raise ValueError("Token spans come from more than one document.")
            # Can't just pass a SpanArray to np.full() or np.array(), because Numpy will
            # interpret it as an array-like of Span values.
            tokens_array = np.empty(len(begin_tokens), dtype=object)
            for i in range(len(begin_tokens)):
                tokens_array[i] = tokens
            tokens = tokens_array
        elif isinstance(tokens, collections.abc.Sequence):
            # One tokenization per row.
            if len(tokens) != len(begin_tokens):
                raise ValueError(f"Received {len(tokens)} arrays of tokens and "
                                 f"{len(begin_tokens)} begin offsets. "
                                 f"Lengths should be equal.")
            # Can't just pass a SpanArray to np.array(), because Numpy will interpret it
            # as an array-like of Span values.
            tokens_array = np.empty(len(begin_tokens), dtype=object)
            for i in range(len(begin_tokens)):
                tokens_array[i] = tokens[i]
            tokens = tokens_array
        elif isinstance(tokens, np.ndarray):
            # Already an object array of SpanArrays; spot-check the first row.
            if len(tokens) != len(begin_tokens):
                raise ValueError(f"Received {len(tokens)} arrays of tokens and "
                                 f"{len(begin_tokens)} begin offsets. "
                                 f"Lengths should be equal.")
            if (len(tokens) > 0
                    and tokens[0] is not None
                    and not isinstance(tokens[0], SpanArray)):
                raise TypeError(f"Tokens object for row 0 is of unexpected type "
                                f"{type(tokens[0])}. Type should be SpanArray.")
        else:
            raise TypeError(f"Expected SpanArray or list of SpanArray as tokens "
                            f"but got {type(tokens)}")
        self._tokens = tokens
        self._begin_tokens = to_int_array(begin_tokens)
        self._end_tokens = to_int_array(end_tokens)
[docs] @staticmethod
def from_char_offsets(tokens: Any) -> "TokenSpanArray":
"""
Convenience factory method for wrapping the character-level spans of a
series of tokens into single-token token-based spans.
:param tokens: character-based offsets of the tokens, as any type that
:func:`SpanArray.make_array` understands.
:return: A :class:`TokenSpanArray` containing single-token spans for each of the
tokens in ``tokens``.
"""
begin_tokens = np.arange(len(tokens))
tokens_array = SpanArray.make_array(tokens)
return TokenSpanArray(tokens_array, begin_tokens, begin_tokens + 1)
    ##########################################
    # Overrides of superclass methods go here.

    @property
    def dtype(self) -> pd.api.extensions.ExtensionDtype:
        """:return: the pandas extension dtype for elements of this array."""
        return TokenSpanDtype()
[docs] def astype(self, dtype, copy=True):
"""
See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
for information about this method.
"""
dtype = pd.api.types.pandas_dtype(dtype)
if isinstance(dtype, SpanDtype):
data = self.copy() if copy else self
elif isinstance(dtype, pd.StringDtype):
# noinspection PyProtectedMember
return dtype.construct_array_type()._from_sequence(self, copy=False)
else:
data = self.to_numpy(dtype=dtype, copy=copy,
na_value=_NULL_TOKEN_SPAN_SINGLETON)
return data
@property
def nbytes(self) -> int:
"""
See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
for information about this method.
"""
table, _ = TokenTable.merge_things(self.tokens)
return (self._begin_tokens.nbytes + self._end_tokens.nbytes +
table.nbytes())
def __len__(self) -> int:
return len(self._begin_tokens)
def __getitem__(self, item) -> Union[TokenSpan, "TokenSpanArray"]:
"""
See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
for information about this method.
"""
if isinstance(item, int):
return TokenSpan(
self.tokens[item], int(self._begin_tokens[item]),
int(self._end_tokens[item])
)
else:
# item not an int --> assume it's a numpy-compatible index
item = check_array_indexer(self, item)
return TokenSpanArray(
self.tokens[item], self.begin_token[item], self.end_token[item]
)
    def __setitem__(self, key: Union[int, np.ndarray, list, slice], value: Any) -> None:
        """
        See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
        for information about this method.

        Supported combinations: null/empty value (writes NA), a single
        TokenSpan broadcast to the keyed positions, a TokenSpanArray written
        through a slice or boolean mask, and parallel position/span arrays.
        """
        key = check_array_indexer(self, key)
        if isinstance(value, ABCSeries) and isinstance(value.dtype, SpanDtype):
            # Unwrap a Series of spans down to its backing extension array.
            value = value.values
        if value is None or isinstance(value, Sequence) and len(value) == 0:
            # None or empty sequence --> write null spans at the target positions.
            self._begin_tokens[key] = TokenSpan.NULL_OFFSET_VALUE
            self._end_tokens[key] = TokenSpan.NULL_OFFSET_VALUE
        elif isinstance(value, TokenSpan):
            # Single input span --> one or more target positions
            self._begin_tokens[key] = value.begin_token
            self._end_tokens[key] = value.end_token
            # We'd like to do self._tokens[key] = value.tokens, but NumPy interprets
            # value.tokens as an array and gets very confused if you try that.
            mask = np.full(len(self._tokens), False, dtype=bool)
            mask[key] = True
            for i in range(len(self._tokens)):
                if mask[i]:
                    self._tokens[i] = value.tokens
        elif ((isinstance(key, slice) or
               (isinstance(key, np.ndarray) and is_bool_dtype(key.dtype)))
              and isinstance(value, TokenSpanArray)):
            # x spans -> x target positions
            self._tokens[key] = value.tokens
            self._begin_tokens[key] = value.begin_token
            self._end_tokens[key] = value.end_token
        elif (isinstance(key, np.ndarray) and len(value) > 0 and len(value) == len(key)
              and
              ((isinstance(value, Sequence) and isinstance(value[0], TokenSpan)) or
               isinstance(value, TokenSpanArray))):
            # Parallel arrays of positions and spans: copy element by element
            # (per-row token references prevent a vectorized write).
            for k, v in zip(key, value):
                self._tokens[k] = v.tokens
                self._begin_tokens[k] = v.begin_token
                self._end_tokens[k] = v.end_token
        else:
            raise ValueError(
                f"Attempted to set element of TokenSpanArray with "
                f"an object of type {type(value)}; current set of "
                f"allowed types is {(TokenSpan, TokenSpanArray)}"
            )
        # Derived properties (begin, end, covered_text, ...) are now stale.
        self._clear_cached_properties()
def __eq__(self, other):
"""
Pandas/Numpy-style array/series comparison function.
:param other: Second operand of a Pandas "==" comparison with the series
that wraps this TokenSpanArray.
:return: Returns a boolean mask indicating which rows match `other`.
"""
if isinstance(other, (ABCDataFrame, ABCSeries, ABCIndex)):
# Rely on pandas to unbox and dispatch to us.
return NotImplemented
elif (isinstance(other, TokenSpanArray) and len(self) == len(other)
and self.same_tokens(other)):
return np.logical_and(
self.begin_token == other.begin_token, self.end_token == other.end_token
)
else:
# Different tokens, no tokens, unexpected type ==> fall back on superclass
return SpanArray.__eq__(self, other)
    def __hash__(self):
        """Hash of this array; computed lazily and cached in ``self._hash``."""
        if self._hash is None:
            # Use superclass hash function so that hash() and == are consistent
            # across type.
            self._hash = SpanArray.__hash__(self)
        return self._hash
def __contains__(self, item) -> bool:
"""
Return true if scalar item exists in this TokenSpanArray.
:param item: scalar TokenSpan value.
:return: true if item exists in this TokenSpanArray.
"""
if isinstance(item, TokenSpan) and \
item.begin == TokenSpan.NULL_OFFSET_VALUE:
return TokenSpan.NULL_OFFSET_VALUE in self._begin_tokens
return super().__contains__(item)
    def __le__(self, other):
        """Deliberately unimplemented; array ordering semantics are undecided."""
        # TODO: Figure out what the semantics of this operation should be.
        raise NotImplementedError()

    def __ge__(self, other):
        """Deliberately unimplemented; array ordering semantics are undecided."""
        # TODO: Figure out what the semantics of this operation should be.
        raise NotImplementedError()
@classmethod
def _concat_same_type(
cls, to_concat: Sequence[pd.api.extensions.ExtensionArray]
) -> "TokenSpanArray":
"""
See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
for information about this method.
"""
if len(to_concat) == 0:
raise ValueError("Can't concatenate zero TokenSpanArrays")
arrays_to_concat = [] # type: List[TokenSpanArray]
for c in to_concat:
if not isinstance(c, TokenSpanArray):
raise TypeError(f"Tried to concatenate {type(c)} to TokenSpanArray")
arrays_to_concat.append(c)
tokens = np.concatenate([a.tokens for a in arrays_to_concat])
begin_tokens = np.concatenate([a.begin_token for a in arrays_to_concat])
end_tokens = np.concatenate([a.end_token for a in arrays_to_concat])
return cls(tokens, begin_tokens, end_tokens)
@classmethod
def _from_factorized(cls, values, original):
"""
See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
for information about this method.
"""
# Because we don't currently override the factorize() class method, the
# "values" input to _from_factorized is a ndarray of TokenSpan objects.
# TODO: Faster implementation of factorize/_from_factorized
# Can't pass SpanArrays to np.array() because SpanArrays are array-like.
begin_tokens = np.array([v.begin_token for v in values], dtype=np.int32)
end_tokens = np.array([v.end_token for v in values], dtype=np.int32)
tokens = np.empty(len(begin_tokens), dtype=object)
i = 0
for v in values:
tokens[i] = v.tokens
i += 1
return cls(tokens, begin_tokens, end_tokens)
@classmethod
def _from_sequence(cls, scalars, dtype=None, copy=False):
"""
See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
for information about this method.
"""
if isinstance(scalars, TokenSpan):
scalars = [scalars]
# noinspection PyTypeChecker
tokens = np.empty(len(scalars), object)
begin_tokens = np.empty(len(scalars), np.int32)
end_tokens = np.empty(len(scalars), np.int32)
i = 0
for s in scalars:
if not isinstance(s, TokenSpan):
# TODO: Temporary fix for np.nan values, pandas-dev GH#38980
if np.isnan(s):
s = _NULL_TOKEN_SPAN_SINGLETON
else:
raise ValueError(
f"Can only convert a sequence of TokenSpan "
f"objects to a TokenSpanArray. Found an "
f"object of type {type(s)}"
)
tokens[i] = s.tokens
begin_tokens[i] = s.begin_token
end_tokens[i] = s.end_token
i += 1
return TokenSpanArray(tokens, begin_tokens, end_tokens)
[docs] def isna(self) -> np.array:
"""
See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
for information about this method.
"""
# isna() of an ExtensionArray must return a copy that the caller can scribble on.
return self.nulls_mask.copy()
[docs] def copy(self) -> "TokenSpanArray":
"""
See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
for information about this method.
"""
ret = TokenSpanArray(
self.tokens, self.begin_token.copy(), self.end_token.copy()
)
# TODO: Copy cached properties
return ret
[docs] def take(
self, indices: Sequence[int], allow_fill: bool = False, fill_value: Any = None
) -> "TokenSpanArray":
"""
See docstring in `ExtensionArray` class in `pandas/core/arrays/base.py`
for information about this method.
"""
# From API docs: "[If allow_fill == True, then] negative values in
# `indices` indicate missing values. These values are set to
# `fill_value`.
if fill_value is None or \
(np.isscalar(fill_value) and np.isnan(fill_value)):
# Replace with a "nan span"
fill_value = _NULL_TOKEN_SPAN_SINGLETON
elif not isinstance(fill_value, TokenSpan):
raise ValueError(
"Fill value must be Null, nan, or a TokenSpan "
"(was {})".format(fill_value)
)
# Pandas' internal implementation of take() does most of the heavy
# lifting.
tokens = pd.api.extensions.take(
self.tokens,
indices,
allow_fill=allow_fill,
fill_value=fill_value.tokens,
)
begin_tokens = pd.api.extensions.take(
self.begin_token,
indices,
allow_fill=allow_fill,
fill_value=fill_value.begin_token,
)
end_tokens = pd.api.extensions.take(
self.end_token,
indices,
allow_fill=allow_fill,
fill_value=fill_value.end_token,
)
return TokenSpanArray(
tokens,
begin_tokens,
end_tokens
)
####################################################
# Methods that don't override the superclass go here
[docs] @classmethod
def make_array(cls, o) -> "TokenSpanArray":
"""
Make a :class:`TokenSpanArray` object out of any of several types of input.
:param o: a :class:`TokenSpanArray` object represented as a :class:`pd.Series`,
a list of :class:`TokenSpan` objects, or an actual :class:`TokenSpanArray`
object.
:return: :class:`TokenSpanArray` version of ``o``, which may be a pointer to ``o`` or
one of its fields.
"""
if isinstance(o, TokenSpanArray):
return o
elif isinstance(o, pd.Series):
return cls.make_array(o.values)
elif isinstance(o, Sequence):
return cls._from_sequence(o)
elif isinstance(o, Iterable):
return cls._from_sequence([e for e in o])
[docs] @classmethod
def align_to_tokens(cls, tokens: Any, spans: Any):
"""
Align a set of character or token-based spans to a specified
tokenization, producing a `TokenSpanArray` of token-based spans.
:param tokens: The tokens to align to, as any type that
:func:`SpanArray.make_array` accepts.
:param spans: The spans to align. These spans must all target the same text
as ``tokens``.
:return: An array of :class:`TokenSpan` objects aligned to the tokens of
``tokens``.
Raises :class:`ValueError` if any of the spans in ``spans`` doesn't start and
end on a token boundary.
"""
tokens = SpanArray.make_array(tokens)
spans = SpanArray.make_array(spans)
if not tokens.is_single_document:
raise ValueError(f"Tokens cover more than one document (tokens are {tokens})")
if not spans.is_single_document:
raise ValueError(f"Spans cover more than one document (spans are {spans})")
# Create and join temporary dataframes
tokens_df = pd.DataFrame({
"token_index": np.arange(len(tokens)),
"token_begin": tokens.begin,
"token_end": tokens.end
})
spans_df = pd.DataFrame({
"span_index": np.arange(len(spans)),
"span_begin": spans.begin,
"span_end": spans.end
})
# Ignore zero-length tokens
# TODO: Is this the right thing to do?
tokens_df = tokens_df[tokens_df["token_begin"] != tokens_df["token_end"]]
begin_matches = pd.merge(tokens_df, spans_df,
left_on="token_begin",
right_on="span_begin",
how="right", indicator=True)
mismatched = begin_matches[begin_matches["_merge"] == "right_only"]
if len(mismatched.index) > 0:
raise ValueError(
f"The following span(s) did not align with the begin offset\n"
f"of any token:\n"
f"{mismatched[['span_index', 'span_begin', 'span_end']]}")
end_matches = pd.merge(tokens_df, spans_df,
left_on="token_end",
right_on="span_end",
how="right", indicator=True)
mismatched = end_matches[end_matches["_merge"] == "right_only"]
if len(mismatched.index) > 0:
raise ValueError(
f"The following span(s) did not align with the end offset\n"
f"of any token:\n"
f"{mismatched[['span_index', 'span_begin', 'span_end']]}")
# Join on span index to get (begin, end) pairs.
begins_and_ends = pd.merge(
begin_matches[["token_index", "span_index"]],
end_matches[["token_index", "span_index"]],
on="span_index", suffixes=("_begin", "_end"),
sort=True)
return TokenSpanArray(tokens,
begins_and_ends["token_index_begin"],
begins_and_ends["token_index_end"] + 1)
    @property
    def tokens(self) -> np.ndarray:
        """
        :return: The tokens over which each TokenSpan in this array are defined as
         an ndarray of object.
        """
        # One SpanArray reference per row; rows frequently share the same
        # object (see __init__'s single-SpanArray code path).
        return self._tokens
@memoized_property
def target_text(self) -> np.ndarray:
"""
:return: "document" texts that the spans in this array reference, as opposed to
the regions of these documents that the spans cover.
"""
# Note that this property overrides the eponymous property in SpanArray
texts = [
None if self.nulls_mask[i]
else self.tokens[i].document_text
for i in range(len(self))
]
return np.array(texts, dtype=object)
@memoized_property
def document_text(self) -> Union[str, None]:
"""
:return: if all spans in this array cover the same document, text of that
document.
Raises a :class:`ValueError` if the array is empty or if the Spans in this
array cover more than one document.
"""
# Checks for zero-length array and multiple docs are in document_tokens()
return self.document_tokens.document_text
@memoized_property
def document_tokens(self) -> Union[SpanArray, None]:
"""
:return: if all spans in this array cover the same tokenization of a single
document, tokens of that document.
Raises a `ValueError` if the array is empty or if the Spans in this
array cover more than one document.
"""
if len(self.tokens) == 0:
raise ValueError("An empty array has no document tokens")
elif not self.is_single_document:
raise ValueError("Spans in array cover more than one document")
else:
return self.tokens[0]
@memoized_property
def nulls_mask(self) -> np.ndarray:
"""
:return: A boolean mask indicating which rows are nulls
"""
return self._begin_tokens == TokenSpan.NULL_OFFSET_VALUE
@memoized_property
def begin(self) -> np.ndarray:
"""
:return: the *character* offsets of the span begins.
"""
result = np.empty_like(self.begin_token, dtype=np.int32)
for i in range(len(self)):
begin_token_ix = self.begin_token[i]
if begin_token_ix == TokenSpan.NULL_OFFSET_VALUE:
result[i] = Span.NULL_OFFSET_VALUE
else:
result[i] = self.tokens[i].begin[begin_token_ix]
return result
@memoized_property
def end(self) -> np.ndarray:
"""
:return: the *character* offsets of the span ends.
"""
# Start out with the end of the last token in each span.
result = np.empty_like(self.begin_token, dtype=np.int32)
for i in range(len(self)):
begin_token_ix = self.begin_token[i]
end_token_ix = self.end_token[i]
if begin_token_ix == TokenSpan.NULL_OFFSET_VALUE:
result[i] = Span.NULL_OFFSET_VALUE
elif begin_token_ix == end_token_ix:
# Zero-length span
result[i] = self.begin[i]
else:
result[i] = self.tokens[i].end[end_token_ix - 1]
return result
    @property
    def begin_token(self) -> np.ndarray:
        """
        :return: Token offsets of the span begins; that is, the index of the
         first token in each span.
        """
        # Entries equal to TokenSpan.NULL_OFFSET_VALUE denote null spans.
        return self._begin_tokens

    @property
    def end_token(self) -> np.ndarray:
        """
        :return: Token offsets of the span ends. That is, 1 + last token
         present in the span, for each span in the column.
        """
        return self._end_tokens
[docs] def as_tuples(self) -> np.ndarray:
"""
Returns (begin, end) pairs as an array of tuples
"""
return np.concatenate(
(self.begin.reshape((-1, 1)), self.end.reshape((-1, 1))), axis=1
)
[docs] def increment_version(self):
"""
Override parent class's version of this function to also clear out data cached
in the subclass.
"""
super().increment_version()
@memoized_property
def covered_text(self) -> np.ndarray:
"""
Returns an array of the substrings of `target_text` corresponding to
the spans in this array.
"""
texts = [
None if self.nulls_mask[i]
else self.target_text[i][self.begin[i]:self.end[i]]
for i in range(len(self))
]
return np.array(texts, dtype=object)
[docs] def as_frame(self) -> pd.DataFrame:
"""
Returns a dataframe representation of this column based on Python
atomic types.
"""
return pd.DataFrame(
{
"begin": self.begin,
"end": self.end,
"begin_token": self.begin_token,
"end_token": self.end_token,
"covered_text": self.covered_text,
}
)
[docs] def same_target_text(self, other: Union["SpanArray", Span]):
"""
:param other: Either a single span or an array of spans of the same
length as this one
:return: Numpy array containing a boolean mask of all entries that
have the same target text.
Two spans with target text of None are considered to have the same
target text.
"""
if isinstance(other, (Span, SpanArray)):
return self.target_text == other.target_text
else:
raise TypeError(f"same_target_text not defined for input type "
f"{type(other)}")
[docs] def same_tokens(self, other: Union["TokenSpanArray", TokenSpan]):
"""
:param other: Either a single span or an array of spans of the same
length as this one. Must be token-based.
:return: Numpy array containing a boolean mask of all entries that
are over the same tokenization of the same target text.
Two spans with target text of None are considered to have the same
target text.
"""
if not isinstance(other, (TokenSpan, TokenSpanArray)):
raise TypeError(f"same_tokens not defined for input type "
f"{type(other)}")
if self.is_single_tokenization:
# Fast path for common case of one set of tokens
other_tokens = (other.tokens if isinstance(other, TokenSpan)
else other.document_tokens)
return self.document_tokens.equals(other_tokens)
# Slow path: Compare each element.
if isinstance(other, TokenSpan):
return np.array([t.equals(other.tokens) for t in self.tokens], dtype=bool)
else: # isinstance(other, TokenSpanArray)
return np.array([self.tokens[i].equals(other.tokens[i])
for i in range(len(self.tokens))], dtype=bool)
    @memoized_property
    def is_single_document(self) -> bool:
        """
        :return: True if every span in this array is over the same target text
         or if there are zero spans in this array.
        """
        # NOTE: For legacy reasons, this method is currently inconsistent with the method
        # by the same name in SpanArray. TokenSpanArray.is_single_document() returns
        # True on an empty array, while SpanArray.is_single_document() returns False.
        if len(self) == 0:
            # If there are zero spans, we consider there to be one document with the
            # document text being whatever is the document text for our tokens.
            return True
        else:
            # More than one tokenization and at least one span. Check whether
            # every span has the same text.
            # Find the first span that is not NA
            first_target_text = None
            for b, t in zip(self._begin_tokens, self.target_text):
                if b != Span.NULL_OFFSET_VALUE:
                    first_target_text = t
                    break
            if first_target_text is None:
                # Special case: All NAs --> Zero documents
                return True
            # Single-document iff no row is simultaneously non-null and over a
            # text different from the first non-null row's text.
            return not np.any(np.logical_and(
                # Row is not null...
                np.not_equal(self._begin_tokens, Span.NULL_OFFSET_VALUE),
                # ...and is over a different text than the first row's text ID
                np.not_equal(self.target_text, first_target_text)))
[docs] def split_by_document(self) -> List["SpanArray"]:
"""
:return: A list of slices of this `SpanArray` that cover single documents.
"""
if self.is_single_document:
return [self]
# For now, treat each tokenization as a different document to avoid O(n^2)
# behavior.
# TODO: Consider a more in-depth comparison to capture mixtures of different
# tokenizations of the same document.
token_table, token_ids = TokenTable.merge_things(self.tokens)
result = []
for tokens_id in token_table.ids:
mask = token_ids == tokens_id
if np.any(mask):
result.append(self[mask])
return result
@memoized_property
def is_single_tokenization(self) -> bool:
"""
:return: True if every span in this array is over the same tokenization
of the same target text or if there are zero spans in this array.
"""
if len(self) == 0:
# If there are zero spans, we consider there to be one document with the
# document text being whatever is the first element of the StringTable.
return True
else:
first_t = self.tokens[0]
for t in self.tokens:
if not t.equals(first_t):
return False
return True
##########################################
# Keep private and protected methods here.
def _cached_property_names(self) -> List[str]:
"""
:return: names of cached properties whose values are computed on demand
and invalidated when the set of spans change.
"""
# Superclass has its own list.
return super()._cached_property_names() + [
"nulls_mask", "have_nulls", "begin", "end", "target_text",
"covered_text", "document_tokens"
]
def __arrow_array__(self, type=None):
"""
Conversion of this Array to a pyarrow.ExtensionArray.
:param type: Optional type passed to arrow for conversion, not used
:return: pyarrow.ExtensionArray of type ArrowTokenSpanType
"""
from text_extensions_for_pandas.array.arrow_conversion import token_span_to_arrow
return token_span_to_arrow(self)