# Source code for text_extensions_for_pandas.spanner.join

#
#  Copyright (c) 2020 IBM Corp.
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

#
# join.py
#
# Span-specific join operators.
#

import numpy as np
import pandas as pd

from text_extensions_for_pandas import (Span, SpanArray, SpanDtype, TokenSpanDtype)


def adjacent_join(
    first_series: pd.Series,
    second_series: pd.Series,
    first_name: str = "first",
    second_name: str = "second",
    min_gap: int = 0,
    max_gap: int = 0,
):
    """
    Compute the join of two series of spans, where a pair of spans is
    considered to match if they are adjacent to each other in the text.

    :param first_series: Spans that appear earlier. dtype must be TokenSpanDtype.
    :param second_series: Spans that come after. dtype must be TokenSpanDtype.
    :param first_name: Name to give the column in the returned dataframe that
     is derived from `first_series`.
    :param second_name: Column name for spans from `second_series` in the
     returned DataFrame.
    :param min_gap: Minimum number of spans allowed between matching pairs of
     spans, inclusive.
    :param max_gap: Maximum number of spans allowed between matching pairs of
     spans, inclusive.

    :returns: a new `pd.DataFrame` containing all pairs of spans that match
     the join predicate. Columns of the DataFrame will be named according to
     the `first_name` and `second_name` arguments.
    """
    # Python type checking doesn't enforce Pandas series dtypes, so validate
    # explicitly (consistent with overlap_join()); the join logic below relies
    # on token offsets, which only TokenSpanArrays provide.
    if (not isinstance(first_series.dtype, TokenSpanDtype)
            or not isinstance(second_series.dtype, TokenSpanDtype)):
        raise ValueError(f"Series must be of dtype {TokenSpanDtype.name}. "
                         f"Dtypes received were "
                         f"{first_series.dtype} and {second_series.dtype}")

    # For now we always make the first series the outer.
    # TODO: Make the larger series the outer and adjust the join logic
    #  below accordingly.
    outer = pd.DataFrame(
        {"outer_span": first_series, "outer_end": first_series.values.end_token}
    )

    # Inner series gets replicated for every possible offset so we can use
    # Pandas' high-performance equijoin
    inner_span_list = [second_series] * (max_gap - min_gap + 1)
    outer_end_list = [
        # Outer comes first, so join predicate is:
        #     outer_span.end + gap == inner_span.begin
        # or equivalently:
        #     outer_span.end == inner_span.begin - gap
        second_series.values.begin_token - gap
        for gap in range(min_gap, max_gap + 1)
    ]
    inner = pd.DataFrame(
        {
            "inner_span": pd.concat(inner_span_list),
            "outer_end": np.concatenate(outer_end_list),
        }
    )
    joined = outer.merge(inner)

    # Now we have a DataFrame with the schema
    # [outer_span, outer_end, inner_span]
    return pd.DataFrame(
        {first_name: joined["outer_span"], second_name: joined["inner_span"]}
    )
def overlap_join(
    first_series: pd.Series,
    second_series: pd.Series,
    first_name: str = "first",
    second_name: str = "second",
):
    """
    Compute the join of two series of spans, where a pair of spans is
    considered to match if they overlap.

    :param first_series: First set of spans to join, wrapped in a `pd.Series`
    :param second_series: Second set of spans to join.
    :param first_name: Name to give the column in the returned dataframe that
     is derived from `first_series`.
    :param second_name: Column name for spans from `second_series` in the
     returned DataFrame.

    :returns: a new `pd.DataFrame` containing all pairs of spans that match
     the join predicate. Columns of the DataFrame will be named according to
     the `first_name` and `second_name` arguments.
    """
    # Python type checking doesn't enforce Pandas series dtypes.
    if (not isinstance(first_series.dtype, (SpanDtype, TokenSpanDtype))
            or not isinstance(second_series.dtype, (SpanDtype, TokenSpanDtype))):
        raise ValueError(f"Series must be of dtype {SpanDtype.name} "
                         f"or {TokenSpanDtype.name}. Dtypes received were "
                         f"{first_series.dtype} and {second_series.dtype}")

    # For now we always use character offsets.
    # TODO: Use token offsets if both sides of the join are TokenSpanArrays
    def _get_char_offsets(s: pd.Series):
        # noinspection PyUnresolvedReferences
        return s.array.begin, s.array.end

    first_begins, first_ends = _get_char_offsets(first_series)
    second_begins, second_ends = _get_char_offsets(second_series)

    # The algorithm here is what is known in the ER literature as "blocking".
    # First evaluate a looser predicate that can be translated to an equijoin,
    # then filter using the actual join predicate.

    # Compute average span length to determine blocking factor
    # TODO: Is average the right aggregate to use here?
    total_len = np.sum(first_ends - first_begins) + np.sum(second_ends - second_begins)
    num_spans = len(first_series.index) + len(second_series.index)
    # Guard against division by zero when both inputs are empty.
    average_len = total_len / num_spans if num_spans > 0 else 0.
    blocking_factor = max(1, int(np.floor(average_len)))

    # Generate a table of which blocks each row of the input participates in.
    # Use primary key (index) values because inputs can have duplicate spans.
    def _make_table(name, index, begins, ends):
        # TODO: Vectorize this part.
        indexes = []
        blocks = []
        for i, b, e in zip(index, begins, ends):
            for block in range(b // blocking_factor, e // blocking_factor + 1):
                indexes.append(i)
                blocks.append(block)
        return pd.DataFrame({name: indexes, "block": blocks})

    first_table = _make_table("first", first_series.index, first_begins, first_ends)
    second_table = _make_table(
        "second", second_series.index, second_begins, second_ends
    )

    # Do an equijoin on block ID and remove duplicates from the resulting
    # <first key, second key> relation.
    merged_table = pd.merge(first_table, second_table)
    key_pairs = merged_table.groupby(["first", "second"]).aggregate(
        {"first": "first", "second": "first"}
    )

    # Join the keys back with the original series to form the result, plus
    # some extra values due to blocking.
    block_result = pd.DataFrame(
        {
            first_name: first_series.loc[key_pairs["first"]].array,
            second_name: second_series.loc[key_pairs["second"]].array,
        }
    )

    # Filter out extra values from blocking
    mask = block_result[first_name].array.overlaps(block_result[second_name].array)
    return block_result[mask].reset_index(drop=True)
def contain_join(
    first_series: pd.Series,
    second_series: pd.Series,
    first_name: str = "first",
    second_name: str = "second",
):
    """
    Compute the join of two series of spans, where a pair of spans is
    considered to match if the second span is contained within the first.

    :param first_series: First set of spans to join, wrapped in a `pd.Series`
    :param second_series: Second set of spans to join. These are the ones that
     are contained within the first set where the join predicate is satisfied.
    :param first_name: Name to give the column in the returned dataframe that
     is derived from `first_series`.
    :param second_name: Column name for spans from `second_series` in the
     returned DataFrame.

    :returns: a new `pd.DataFrame` containing all pairs of spans that match
     the join predicate. Columns of the DataFrame will be named according to
     the `first_name` and `second_name` arguments.
    """
    # Containment implies overlap, so we start from the overlap join and keep
    # only the candidate pairs where the second span lies inside the first.
    # TODO: Factor out the blocking code so that we can avoid filtering
    #  and regenerating the index twice.
    candidates = overlap_join(first_series, second_series, first_name, second_name)
    containment_mask = candidates[first_name].values.contains(
        candidates[second_name].values
    )
    return candidates[containment_mask].reset_index(drop=True)
def unpack_semijoin(target_region: Span, model_results: pd.DataFrame) -> pd.DataFrame:
    """
    Unpack the results of evaluating an extraction model, such as dependency
    parsing or named entity recognition, using a semijoin strategy to reduce
    the amount of text over which the model is applied.

    To use :func:`unpack_semijoin`, first identify regions of the text that
    you wish to run the model. Then run the model over the text of those
    regions to produce spans whose begin and end offsets are relative to the
    text of each distinct target region. Then you can pass the spans and the
    model results to this function to produce result spans whose begin and end
    offsets are relative to the original document text.

    :param target_region: Span indicating a section of the original document
     text over which the model was applied.
    :param model_results: Results from running your model over
     ``target_region``, as a :class:`pd.DataFrame`.

    :returns: A :class:`pd.DataFrame` with the same schema as
     ``model_results``, but with all spans converted from spans over the
     target text of ``target_region`` to spans over the original document
     text.
    """
    doc_text = target_region.target_text
    region_offset = target_region.begin

    # Make a copy of the DataFrame, then modify span columns of
    # the copy in place.
    result = model_results.copy()
    # Iterate (column, dtype) pairs directly instead of indexing
    # `dtypes[i]` with an integer, which is positional indexing on a
    # label-indexed Series and is deprecated in recent versions of Pandas.
    for column_name, dtype in model_results.dtypes.items():
        if isinstance(dtype, SpanDtype):
            raw_spans = result[column_name].array
            # Shift the offsets so the spans point into the original
            # document text rather than the region's target text.
            result[column_name] = SpanArray(
                doc_text,
                raw_spans.begin + region_offset,
                raw_spans.end + region_offset)
    return result