
#
#  Copyright (c) 2020 IBM Corp.
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

#
# extract.py
#
# Variants of the Extract operator from spanner algebra. The Extract operator
# returns sub-spans of a parent span that match a predicate.
#
import collections
import re

import numpy as np
import pandas as pd
import regex

from typing import *

# Internal imports
from text_extensions_for_pandas.array.span import SpanArray, Span
from text_extensions_for_pandas.array.token_span import TokenSpanArray
from text_extensions_for_pandas.io.spacy import simple_tokenizer

# Set to True to use sparse storage for tokens 2-n of n-token dictionary
# entries. First token is always stored dense, of course.
# Currently set to False to avoid spurious Pandas API warnings about conversion
# from sparse to dense.
# TODO: Turn this back on when Pandas fixes the issue with the warning.
_SPARSE_DICT_ENTRIES = False


def load_dict(file_name: str, tokenizer: "spacy.tokenizer.Tokenizer" = None):
    """
    Load a SystemT-format dictionary file. File format is one entry per line.

    Tokenizes and normalizes the dictionary entries.

    :param file_name: Path to dictionary file

    :param tokenizer: Preconfigured tokenizer object for tokenizing
     dictionary entries. **Must be the same configuration as the tokenizer
     used on the target text!**
     If None, this method will use SpaCy's default English tokenizer.

    :return: :class:`pd.DataFrame` with the normalized, tokenized dictionary entries.
    """
    with open(file_name, "r") as f:
        lines = [
            line.strip()
            for line in f.readlines()
            if len(line.strip()) > 0 and line[0] != "#"
        ]
    return create_dict(lines, tokenizer)
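
# Usage sketch (not part of the library): "first_names.dict" is a hypothetical
# SystemT-format file with one entry per line; comment lines start with "#".
# Assumes the default tokenizer matches the one used on the target text.
#
#   names_dict = load_dict("first_names.dict")
#   names_dict.head()   # columns toks_0, toks_1, ... of normalized tokens
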
def create_dict(
    entries: Iterable[str], tokenizer: "spacy.tokenizer.Tokenizer" = None
) -> pd.DataFrame:
    """
    Create a dictionary from a list of entries, where each entry is expressed
    as a single string.

    Tokenizes and normalizes the dictionary entries.

    :param entries: Iterable of strings, one string per dictionary entry.

    :param tokenizer: Preconfigured tokenizer object for tokenizing
     dictionary entries. **Must always tokenize the same way as the tokenizer
     used on the target text!**
     If None, this method will use the tokenizer returned by
     :func:`text_extensions_for_pandas.io.spacy.simple_tokenizer()`.

    :return: :class:`pd.DataFrame` with the normalized, tokenized dictionary entries.
    """
    if tokenizer is None:
        tokenizer = simple_tokenizer()

    # Tokenize with SpaCy. Produces a SpaCy document object per line.
    tokenized_entries = [tokenizer(entry.lower()) for entry in entries]

    # Determine the number of tokens in the longest dictionary entry.
    max_num_toks = max([len(e) for e in tokenized_entries])

    # Generate a column for each token. Go one past the max number of tokens so
    # that every dictionary entry ends up None-terminated.
    cols_dict = {}
    for i in range(max_num_toks + 1):
        # Extract token i from every entry that has a token i
        toks_list = [e[i].text if len(e) > i else None for e in tokenized_entries]
        cols_dict["toks_{}".format(i)] = (
            # Sparse storage for tokens 2 and onward
            toks_list
            if i == 0 or not _SPARSE_DICT_ENTRIES
            else pd.SparseArray(toks_list)
        )
    return pd.DataFrame(cols_dict)
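
# Usage sketch (illustrative only): builds a small dictionary in memory. With
# the default tokenizer, multi-token entries fill toks_0, toks_1, ..., and
# every entry is None-terminated one column past its last token.
#
#   color_dict = create_dict(["dark red", "blue"])
#   # toks_0    toks_1    toks_2
#   # "dark"    "red"     None
#   # "blue"    None      None
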
def extract_dict(
    tokens: Union[SpanArray, pd.Series],
    dictionary: pd.DataFrame,
    output_col_name: str = "match",
):
    """
    Identify all matches of a dictionary on a sequence of tokens.

    :param tokens: :class:`SpanArray` of token information, optionally wrapped in a
     :class:`pd.Series`. **These tokens must come from the same tokenizer that
     tokenized the entries of ``dictionary``.** To tokenize with SpaCy, use
     :func:`text_extensions_for_pandas.io.spacy.make_tokens()`.

    :param dictionary: The dictionary to match, encoded as a :class:`pd.DataFrame`
     in the format returned by :func:`load_dict()`

    :param output_col_name: (optional) name of the column of matching spans in the
     returned DataFrame

    :return: a single-column DataFrame of token ID spans of dictionary matches
    """
    # Box tokens into a pd.Series if not already boxed.
    if isinstance(tokens, SpanArray):
        tokens = pd.Series(tokens)

    # Wrap the important parts of the tokens series in a temporary dataframe.
    # noinspection PyUnresolvedReferences
    toks_tmp = pd.DataFrame(
        {
            "token_id": tokens.index,
            "normalized_text": tokens.array.normalized_covered_text,
        }
    )

    # Start by matching the first token.
    matches = pd.merge(
        dictionary, toks_tmp, left_on="toks_0", right_on="normalized_text"
    )
    matches.rename(columns={"token_id": "begin_token_id"}, inplace=True)
    matches_col_names = list(matches.columns)  # We'll need this later

    # Check against remaining elements of matching dictionary entries and
    # accumulate the full set of matches as lists of begin and end token offsets.
    begins_list = []
    ends_list = []
    max_entry_len = len(dictionary.columns)
    for match_len in range(1, max_entry_len):
        # print("Match len: {}".format(match_len))
        # Find matches of length match_len. Dictionary entries of this length
        # will have None in the column "toks_<match_len>".
        match_locs = pd.isna(matches["toks_{}".format(match_len)])
        # print("Completed matches:\n{}".format(matches[match_locs]))
        match_begins = matches[match_locs]["begin_token_id"].to_numpy()
        match_ends = match_begins + match_len
        begins_list.append(match_begins)
        ends_list.append(match_ends)

        # For the remaining partial matches against longer dictionary entries,
        # check the next token by merging with the tokens dataframe.
        potential_matches = matches[~match_locs].copy()
        # print("Raw potential matches:\n{}".format(potential_matches))
        potential_matches.drop("normalized_text", axis=1, inplace=True)
        potential_matches["next_token_id"] = (
            potential_matches["begin_token_id"] + match_len
        )
        potential_matches = pd.merge(
            potential_matches, toks_tmp, left_on="next_token_id", right_on="token_id"
        )
        # print("Filtered potential matches:\n{}".format(potential_matches))
        potential_matches = potential_matches[
            potential_matches["normalized_text"]
            == potential_matches["toks_{}".format(match_len)]
        ]
        # The result of the join has some extra columns that we don't need.
        matches = potential_matches[matches_col_names]

    # Gather together all the sets of matches and wrap in a dataframe.
    begins = np.concatenate(begins_list)
    ends = np.concatenate(ends_list)
    result = pd.DataFrame(
        {output_col_name: TokenSpanArray(tokens.values, begins, ends)}
    )
    # Results are sorted by number of tokens; sort by location instead.
    result["__begin"] = result[output_col_name].values.begin
    return result.sort_values("__begin")[[output_col_name]]
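
# Usage sketch (hypothetical variable names; the exact make_tokens() signature
# is assumed here, not confirmed by this module): `doc_text` is a document
# string and `tokenizer` is the same SpaCy tokenizer used to build the
# dictionary.
#
#   from text_extensions_for_pandas.io.spacy import make_tokens
#   tokens = make_tokens(doc_text, tokenizer)
#   matches_df = extract_dict(tokens, names_dict, output_col_name="name")
#   # matches_df["name"] holds token spans of dictionary matches in
#   # document order.
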
def extract_regex_tok(
    tokens: Union[SpanArray, pd.Series],
    compiled_regex: regex.Regex,
    min_len=1,
    max_len=1,
    output_col_name: str = "match",
):
    """
    Identify all (possibly overlapping) matches of a regular expression that
    start and end on token boundaries.

    :param tokens: :class:`SpanArray` of token information, optionally wrapped in a
     `pd.Series`.

    :param compiled_regex: Regular expression to evaluate.

    :param min_len: Minimum match length in tokens

    :param max_len: Maximum match length (inclusive) in tokens

    :param output_col_name: (optional) name of column of matching spans in the
     returned DataFrame

    :returns: A single-column DataFrame containing a span for each match of the
     regex.
    """
    tokens = SpanArray.make_array(tokens)
    num_tokens = len(tokens)
    matches_regex_f = np.vectorize(lambda s: compiled_regex.fullmatch(s) is not None)

    # The built-in regex functionality of Pandas/Python does not have
    # an optimized single-pass RegexTok, so generate all the places
    # where there might be a match and run them through regex.fullmatch().
    # Note that this approach is asymptotically inefficient if max_len is large.
    # TODO: Performance tuning for both small and large max_len
    matches_list = []
    for cur_len in range(min_len, max_len + 1):
        window_begin_toks = np.arange(0, num_tokens - cur_len + 1)
        window_end_toks = window_begin_toks + cur_len
        window_tok_spans = TokenSpanArray(tokens, window_begin_toks, window_end_toks)
        matches_list.append(
            pd.Series(
                window_tok_spans[matches_regex_f(window_tok_spans.covered_text)]
            )
        )
    return pd.DataFrame({output_col_name: pd.concat(matches_list)})
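
# Usage sketch (illustrative pattern; `tokens` as in the example above): find
# capitalized phrases of one to three tokens. Because fullmatch() is applied to
# each candidate window, the whole window must match the pattern.
#
#   cap_regex = regex.compile(r"[A-Z][a-z]*(\s+[A-Z][a-z]*)*")
#   caps_df = extract_regex_tok(tokens, cap_regex, min_len=1, max_len=3)
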
def extract_regex(
    doc_text: str,
    compiled_regex: "re.Pattern"  # Double quotes for Python 3.6 compatibility
):
    """
    Identify all non-overlapping matches of a regular expression, as returned by
    :func:`re.Pattern.finditer()`, and return those locations as an array of spans.

    :param doc_text: Text of the document; will be the target text of the returned
     spans.

    :param compiled_regex: Regular expression to evaluate, compiled with either the
     ``re`` or the ``regex`` package.

    :returns: :class:`SpanArray` containing a span for each match of the regex.
    """
    begins = []
    ends = []
    for a in compiled_regex.finditer(doc_text):
        begins.append(a.start())
        ends.append(a.end())
    return SpanArray(doc_text, begins, ends)
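
# Usage sketch: character-based matching that ignores token boundaries
# entirely (hypothetical pattern).
#
#   year_spans = extract_regex(doc_text, re.compile(r"\b(19|20)\d{2}\b"))
#   # year_spans is a SpanArray over doc_text, one span per four-digit year.
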
def extract_split(
    doc_text: str, split_points: Union[Sequence[int], np.ndarray, SpanArray]
) -> SpanArray:
    """
    Split a document into spans along a specified set of split points.

    :param doc_text: Text of the document; will be the target text of the returned
     spans.

    :param split_points: A series of offsets into ``doc_text``, expressed as either:

     * A sequence of integers (split at the indicated locations and return a set of
       splits that covers every character in the document), as a list or 1-d Numpy
       array
     * A sequence of spans (split around the indicated locations, but discard the
       parts of the document that are within a split point)

    :returns: :class:`SpanArray` that splits the document in the specified way.
    """
    if isinstance(split_points, (collections.abc.Sequence, np.ndarray)):
        # Single-integer split points ==> zero-length spans
        split_points = SpanArray(doc_text, split_points, split_points)
    elif not isinstance(split_points, SpanArray):
        raise TypeError(
            f"Split points are of type {type(split_points)}. Expected a "
            f"sequence of integers or a SpanArray."
        )

    # Make sure split points are in order
    sorted_indices = split_points.argsort()
    sorted_split_points = split_points[sorted_indices]

    # Break out the split points.
    split_begins = sorted_split_points.begin.tolist()  # type: List[int]
    split_ends = sorted_split_points.end.tolist()  # type: List[int]

    # Tack on an additional split point at the very end to simplify the logic below.
    split_begins.append(len(doc_text))
    split_ends.append(len(doc_text))

    # Walk through the document, generating the begin and end offsets of spans
    begins = []
    ends = []
    begin = 0
    for split_begin, split_end in zip(split_begins, split_ends):
        end = split_begin
        if end > begin:  # Ignore zero-length and negative-length chunks
            begins.append(begin)
            ends.append(end)
        begin = split_end
    return SpanArray(doc_text, begins, ends)
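
# Usage sketch: two ways to split `doc_text` (hypothetical inputs, assuming the
# document is longer than 200 characters in the first case).
#
#   # 1. Integer split points: zero-length splits, so the result covers every
#   #    character, e.g. [0, 100), [100, 200), [200, len(doc_text)).
#   chunks = extract_split(doc_text, [0, 100, 200])
#
#   # 2. Span split points from extract_regex(): split around newline runs and
#   #    discard the newlines themselves.
#   lines = extract_split(doc_text, extract_regex(doc_text, re.compile(r"\n+")))
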