# Source code for text_extensions_for_pandas.spanner.join

#
#  Copyright (c) 2020 IBM Corp.
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

#
# join.py
#
# Span-specific join operators.
#

import numpy as np
import pandas as pd

from text_extensions_for_pandas import SpanDtype, TokenSpanDtype


def adjacent_join(
    first_series: pd.Series,
    second_series: pd.Series,
    first_name: str = "first",
    second_name: str = "second",
    min_gap: int = 0,
    max_gap: int = 0,
):
    """
    Compute the join of two series of spans, where a pair of spans is
    considered to match if they are adjacent to each other in the text.

    :param first_series: Spans that appear earlier. dtype must be TokenSpanDtype.
    :param second_series: Spans that come after. dtype must be TokenSpanDtype.
    :param first_name: Name to give the column in the returned dataframe that
     is derived from `first_series`.
    :param second_name: Column name for spans from `second_series` in the
     returned DataFrame.
    :param min_gap: Minimum number of spans allowed between matching pairs of
     spans, inclusive.
    :param max_gap: Maximum number of spans allowed between matching pairs of
     spans, inclusive.

    :returns: a new `pd.DataFrame` containing all pairs of spans that match
     the join predicate. Columns of the DataFrame will be named according
     to the `first_name` and `second_name` arguments.
    """
    # For now we always make the first series the outer.
    # TODO: Make the larger series the outer and adjust the join logic
    #  below accordingly.
    outer = pd.DataFrame(
        {"outer_span": first_series, "outer_end": first_series.values.end_token}
    )

    # Inner series gets replicated once for every possible gap so we can use
    # Pandas' high-performance equijoin.
    inner_span_list = [second_series] * (max_gap - min_gap + 1)
    outer_end_list = [
        # Outer comes first, so the join predicate is:
        #     outer_span.end + gap == inner_span.begin
        # or equivalently:
        #     outer_span.end == inner_span.begin - gap
        second_series.values.begin_token - gap
        for gap in range(min_gap, max_gap + 1)
    ]
    inner = pd.DataFrame(
        {
            "inner_span": pd.concat(inner_span_list),
            "outer_end": np.concatenate(outer_end_list),
        }
    )
    # Equijoin on the shared "outer_end" column implements the adjacency
    # predicate for every gap value at once.
    joined = outer.merge(inner)

    # Now we have a DataFrame with the schema
    # [outer_span, outer_end, inner_span]
    return pd.DataFrame(
        {first_name: joined["outer_span"], second_name: joined["inner_span"]}
    )
def overlap_join(
    first_series: pd.Series,
    second_series: pd.Series,
    first_name: str = "first",
    second_name: str = "second",
):
    """
    Compute the join of two series of spans, where a pair of spans is
    considered to match if they overlap.

    :param first_series: First set of spans to join, wrapped in a `pd.Series`
    :param second_series: Second set of spans to join.
    :param first_name: Name to give the column in the returned dataframe that
     is derived from `first_series`.
    :param second_name: Column name for spans from `second_series` in the
     returned DataFrame.

    :returns: a new `pd.DataFrame` containing all pairs of spans that match
     the join predicate. Columns of the DataFrame will be named according
     to the `first_name` and `second_name` arguments.

    :raises ValueError: if either input series is not of a span dtype.
    """
    # Python type checking doesn't enforce Pandas series dtypes.
    if (not isinstance(first_series.dtype, (SpanDtype, TokenSpanDtype))
            or not isinstance(second_series.dtype, (SpanDtype, TokenSpanDtype))):
        raise ValueError(f"Series must be of dtype {SpanDtype.name} "
                         f"or {TokenSpanDtype.name}. Dtypes received were "
                         f"{first_series.dtype} and {second_series.dtype}")

    # For now we always use character offsets.
    # TODO: Use token offsets of both sides of the join are TokenSpanArrays

    def _get_char_offsets(s: pd.Series):
        # noinspection PyUnresolvedReferences
        return s.array.begin, s.array.end

    first_begins, first_ends = _get_char_offsets(first_series)
    second_begins, second_ends = _get_char_offsets(second_series)

    # The algorithm here is what is known in the ER literature as "blocking".
    # First evaluate a looser predicate that can be translated to an equijoin,
    # then filter using the actual join predicate.

    # Compute average span length to determine blocking factor
    # TODO: Is average the right aggregate to use here?
    total_len = np.sum(first_ends - first_begins) + np.sum(second_ends - second_begins)
    num_spans = len(first_series.index) + len(second_series.index)
    # Guard against division by zero when both inputs are empty.
    average_len = 0.0 if num_spans == 0 else total_len / num_spans
    blocking_factor = max(1, int(np.floor(average_len)))

    # Generate a table of which blocks each row of the input participates in.
    # Use primary key (index) values because inputs can have duplicate spans.
    def _make_table(name, index, begins, ends):
        # TODO: Vectorize this part.
        indexes = []
        blocks = []
        for i, b, e in zip(index, begins, ends):
            for block in range(b // blocking_factor, e // blocking_factor + 1):
                indexes.append(i)
                blocks.append(block)
        return pd.DataFrame({name: indexes, "block": blocks})

    first_table = _make_table("first", first_series.index, first_begins, first_ends)
    second_table = _make_table(
        "second", second_series.index, second_begins, second_ends
    )

    # Do an equijoin on block ID and remove duplicates from the resulting
    # <first key, second key> relation.
    merged_table = pd.merge(first_table, second_table)
    key_pairs = merged_table.groupby(["first", "second"]).aggregate(
        {"first": "first", "second": "first"}
    )

    # Join the keys back with the original series to form the result, plus
    # some extra values due to blocking.
    block_result = pd.DataFrame(
        {
            first_name: first_series.loc[key_pairs["first"]].array,
            second_name: second_series.loc[key_pairs["second"]].array,
        }
    )

    # Filter out extra values from blocking
    mask = block_result[first_name].array.overlaps(block_result[second_name].array)
    return block_result[mask].reset_index(drop=True)
def contain_join(
    first_series: pd.Series,
    second_series: pd.Series,
    first_name: str = "first",
    second_name: str = "second",
):
    """
    Compute the join of two series of spans, where a pair of spans is
    considered to match if the second span is contained within the first.

    :param first_series: First set of spans to join, wrapped in a `pd.Series`
    :param second_series: Second set of spans to join. These are the ones that
     are contained within the first set where the join predicate is satisfied.
    :param first_name: Name to give the column in the returned dataframe that
     is derived from `first_series`.
    :param second_name: Column name for spans from `second_series` in the
     returned DataFrame.

    :returns: a new `pd.DataFrame` containing all pairs of spans that match
     the join predicate. Columns of the DataFrame will be named according
     to the `first_name` and `second_name` arguments.
    """
    # For now we just run overlap_join() and filter the results.
    # TODO: Factor out the blocking code so that we can avoid filtering
    #  and regenerating the index twice.
    # Containment implies overlap, so overlap_join() is a superset of the
    # answer; filter it down with the exact containment predicate.
    overlap_result = overlap_join(first_series, second_series, first_name, second_name)
    mask = overlap_result[first_name].values.contains(
        overlap_result[second_name].values
    )
    return overlap_result[mask].reset_index(drop=True)