# Source code for text_extensions_for_pandas.io.bert

#
#  Copyright (c) 2020 IBM Corp.
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

"""
The ``io.bert`` module contains functions for working with transformer-based
language models such as BERT, including managing the special tokenization and windowing
that these models require.

This module uses the transformers_ library to implement tokenization and
embedding generation. You will need that library in your Python path to use the
functions in this module.

.. _transformers: https://github.com/huggingface/transformers
"""

################################################################################
# bert.py
#
# Functions for tokenization and embedding generation with BERT-style models

import numpy as np
import pandas as pd
from typing import Any, Dict, Tuple

from text_extensions_for_pandas.array.span import (
    SpanArray,
)
from text_extensions_for_pandas.array.token_span import (
    TokenSpanArray,
)
from text_extensions_for_pandas.array.tensor import (
    TensorArray,
)

from text_extensions_for_pandas.io import conll as conll
from text_extensions_for_pandas import spanner as spanner


def make_bert_tokens(target_text: str, tokenizer) -> pd.DataFrame:
    """
    Tokenize the indicated text for BERT embeddings and return a DataFrame
    with one row per token.

    :param target_text: string to tokenize
    :param tokenizer: A tokenizer that is a subclass of huggingface transformers
        ``PreTrainedTokenizerFast`` which supports ``encode_plus`` with
        ``return_offsets_mapping=True``.

    :returns: ``pd.DataFrame`` with the following columns:

     * "token_id": unique integer ID for each token
     * "span": span of the token (with offsets measured in characters)
     * "input_id": integer ID suitable for input to a BERT embedding model
     * "token_type_id": list of token type ids to be fed to a model
     * "attention_mask": list of indices specifying which tokens should be
       attended to by the model
     * "special_tokens_mask": ``True`` if the token is a zero-length special
       token such as "start of document"
    """
    # noinspection PyPackageRequirements
    from transformers import PreTrainedTokenizerFast

    if not isinstance(tokenizer, PreTrainedTokenizerFast):
        raise TypeError(
            "Tokenizer must be an instance of "
            "transformers.PreTrainedTokenizerFast that supports "
            "encode_plus with return_offsets_mapping=True."
        )
    tokenized_result = tokenizer.encode_plus(
        target_text, return_special_tokens_mask=True, return_offsets_mapping=True
    )

    # Get offset mapping from tokenizer
    offsets = tokenized_result["offset_mapping"]

    # Init any special tokens at beginning
    i = 0
    while offsets[i] is None:
        offsets[i] = (0, 0)
        i += 1

    # Make a DataFrame to unzip (begin, end) offsets
    offset_df = pd.DataFrame(offsets, columns=["begin", "end"])

    # Convert special tokens mask to boolean
    special_tokens_mask = pd.Series(tokenized_result["special_tokens_mask"]).astype(
        "bool"
    )

    # Fill remaining special tokens to zero-length spans
    ends = offset_df["end"].fillna(method="ffill").astype("int32")
    begins = offset_df["begin"].mask(special_tokens_mask, other=ends).astype("int32")

    spans = SpanArray(target_text, begins, ends)
    token_features = pd.DataFrame(
        {
            "token_id": special_tokens_mask.index,
            "span": spans,
            "input_id": tokenized_result["input_ids"],
            "token_type_id": tokenized_result["token_type_ids"],
            "attention_mask": tokenized_result["attention_mask"],
            "special_tokens_mask": special_tokens_mask,
        }
    )
    return token_features

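
# Hedged usage sketch for make_bert_tokens(). This helper is illustrative only
# and is not part of the original module; the "bert-base-uncased" checkpoint
# and the sample sentence are arbitrary choices, and any PreTrainedTokenizerFast
# subclass should work.
def _example_make_bert_tokens() -> pd.DataFrame:
    from transformers import BertTokenizerFast

    tokenizer = BertTokenizerFast.from_pretrained("bert-base-uncased")
    tokens_df = make_bert_tokens("Who let the dogs out?", tokenizer)
    # tokens_df has one row per BERT token, including zero-length spans for
    # special tokens such as [CLS] and [SEP].
    return tokens_df
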
def add_embeddings(
    df: pd.DataFrame, bert: Any, overlap: int = 32, non_overlap: int = 64
) -> pd.DataFrame:
    """
    Add BERT embeddings to a DataFrame of BERT tokens.

    :param df: DataFrame containing BERT tokens, as returned by
        :func:`make_bert_tokens`. Must contain a column ``input_id`` containing
        token IDs.
    :param bert: PyTorch-based BERT model from the ``transformers`` library
    :param overlap: (optional) how much overlap there should be between adjacent
        windows
    :param non_overlap: (optional) how much non-overlapping content there should
        be at the middle of each window, between the overlapping regions

    :returns: A copy of ``df`` with a new column, "embedding", containing BERT
        embeddings as a ``TensorArray``.

    .. note:: PyTorch must be installed to run this function.
    """
    # Import torch inline so that the rest of this library will function
    # without it.
    # noinspection PyPackageRequirements
    import torch

    flat_input_ids = df["input_id"].values
    windows = seq_to_windows(flat_input_ids, overlap, non_overlap)
    bert_result = bert(
        input_ids=torch.tensor(windows["input_ids"]),
        attention_mask=torch.tensor(windows["attention_masks"]),
    )
    hidden_states = windows_to_seq(
        flat_input_ids, bert_result[0].detach().numpy(), overlap, non_overlap
    )
    embeddings = TensorArray(hidden_states)
    ret = df.copy()
    ret["embedding"] = embeddings
    return ret

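
# Hedged usage sketch for add_embeddings(). Illustrative only; assumes PyTorch
# and the transformers library are installed, and uses the "bert-base-uncased"
# checkpoint purely as an example.
def _example_add_embeddings() -> pd.DataFrame:
    from transformers import BertModel, BertTokenizerFast

    tokenizer = BertTokenizerFast.from_pretrained("bert-base-uncased")
    bert = BertModel.from_pretrained("bert-base-uncased")
    tokens_df = make_bert_tokens("Who let the dogs out?", tokenizer)
    # Adds an "embedding" column of TensorArray dtype, one vector per token.
    return add_embeddings(tokens_df, bert)
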
def conll_to_bert(
    df: pd.DataFrame,
    tokenizer: Any,
    bert: Any,
    token_class_dtype: pd.CategoricalDtype,
    compute_embeddings: bool = True,
    overlap: int = 32,
    non_overlap: int = 64,
) -> pd.DataFrame:
    """
    :param df: One DataFrame from the :func:`conll_2003_to_dataframes` function,
        representing the tokens of a single document in the original
        tokenization.
    :param tokenizer: BERT tokenizer instance from the ``transformers`` library
    :param bert: PyTorch-based BERT model from the ``transformers`` library
    :param token_class_dtype: Pandas categorical type for representing token
        class labels, as returned by :func:`make_iob_tag_categories`
    :param compute_embeddings: ``True`` to generate BERT embeddings at each token
        position and add a column "embedding" to the returned DataFrame with the
        embeddings
    :param overlap: (optional) how much overlap there should be between adjacent
        windows for embeddings
    :param non_overlap: (optional) how much non-overlapping content there should
        be at the middle of each window, between the overlapping regions

    :returns: A version of the same DataFrame, but with BERT tokens, BERT
        embeddings for each token (if ``compute_embeddings`` is ``True``),
        and token class labels.
    """
    spans_df = conll.iob_to_spans(df)
    bert_toks_df = make_bert_tokens(df["span"].values[0].target_text, tokenizer)
    bert_token_spans = TokenSpanArray.align_to_tokens(
        bert_toks_df["span"], spans_df["span"]
    )
    bert_toks_df[["ent_iob", "ent_type"]] = conll.spans_to_iob(
        bert_token_spans, spans_df["ent_type"]
    )
    bert_toks_df = conll.add_token_classes(bert_toks_df, token_class_dtype)
    if compute_embeddings:
        bert_toks_df = add_embeddings(bert_toks_df, bert, overlap, non_overlap)
    return bert_toks_df

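
# Hedged end-to-end sketch for conll_to_bert(), illustrative only. "eng.testa"
# is a placeholder path to a CoNLL-2003 file, and the calls into the ``conll``
# module reflect assumed signatures; see text_extensions_for_pandas.io.conll
# for the authoritative definitions.
def _example_conll_to_bert():
    from transformers import BertModel, BertTokenizerFast

    tokenizer = BertTokenizerFast.from_pretrained("bert-base-uncased")
    bert = BertModel.from_pretrained("bert-base-uncased")
    docs = conll.conll_2003_to_dataframes(
        "eng.testa", ["pos", "phrase", "ent"], [False, True, True]
    )
    token_class_dtype, _, _ = conll.make_iob_tag_categories(
        ["LOC", "MISC", "ORG", "PER"]
    )
    return [
        conll_to_bert(doc, tokenizer, bert, token_class_dtype,
                      compute_embeddings=False)
        for doc in docs
    ]
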
def align_bert_tokens_to_corpus_tokens(
    spans_df: pd.DataFrame,
    corpus_toks_df: pd.DataFrame,
    spans_df_token_col: str = "span",
    corpus_df_token_col: str = "span",
    entity_type_col: str = "ent_type",
) -> pd.DataFrame:
    """
    Expand entity matches from a BERT-based model so that they align with the
    corpus's original tokenization.

    :param spans_df: DataFrame of extracted entities. Must contain two columns
        with span and entity type information, respectively. Other columns are
        ignored.
    :param corpus_toks_df: DataFrame of the corpus's original tokenization, one
        row per token. Must contain a column with character-based spans of the
        tokens.
    :param spans_df_token_col: the name of the column in ``spans_df`` containing
        its tokenization. ``'span'`` by default.
    :param corpus_df_token_col: the name of the column in ``corpus_toks_df`` that
        contains its tokenization. ``'span'`` by default.
    :param entity_type_col: the name of the column in ``spans_df`` that contains
        the entity types of the elements

    :returns: A new DataFrame with schema ``["span", "ent_type"]``, where the
        "span" column contains token-based spans based off the *corpus*
        tokenization in ``corpus_toks_df["span"]``.
    """
    if len(spans_df.index) == 0:
        return spans_df.copy()

    overlaps_df = spanner.overlap_join(
        spans_df[spans_df_token_col],
        corpus_toks_df[corpus_df_token_col],
        "span",
        "corpus_token",
    ).merge(spans_df, left_on="span", right_on=spans_df_token_col)
    agg_df = (
        overlaps_df.groupby("span")
        .aggregate({"corpus_token": "sum", entity_type_col: "first"})
        .reset_index()
    )
    cons_df = spanner.consolidate(agg_df, "corpus_token")[
        ["corpus_token", entity_type_col]
    ].rename(columns={"corpus_token": "span"})
    cons_df["span"] = TokenSpanArray.align_to_tokens(
        corpus_toks_df[corpus_df_token_col], cons_df["span"]
    )
    return cons_df

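
# Hedged usage sketch for align_bert_tokens_to_corpus_tokens(), illustrative
# only. The toy text, token offsets, and entity span below are made up for the
# example; an extracted span that stops partway through "Heights" should be
# expanded so that it covers whole corpus tokens.
def _example_align_bert_tokens_to_corpus_tokens() -> pd.DataFrame:
    text = "IBM Research is in Yorktown Heights"
    corpus_toks_df = pd.DataFrame(
        {"span": SpanArray(text, [0, 4, 13, 16, 19, 28], [3, 12, 15, 18, 27, 35])}
    )
    # An extracted entity whose end falls in the middle of "Heights".
    spans_df = pd.DataFrame(
        {"span": SpanArray(text, [19], [31]), "ent_type": ["LOC"]}
    )
    # The returned span should snap to the corpus tokens "Yorktown Heights".
    return align_bert_tokens_to_corpus_tokens(spans_df, corpus_toks_df)
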
def seq_to_windows(
    seq: np.ndarray, overlap: int, non_overlap: int
) -> Dict[str, np.ndarray]:
    """
    Convert a variable-length sequence into a set of fixed-length windows,
    adding padding as necessary.

    Usually this function is used to prepare batches of BERT tokens to feed to
    a BERT model.

    :param seq: Original variable-length sequence, as a 1D numpy array
    :param overlap: How much overlap there should be between adjacent windows
    :param non_overlap: How much non-overlapping content there should be at the
        middle of each window, between the overlapping regions

    :returns: Dictionary with the keys "input_ids" and "attention_masks" mapped
        to NumPy arrays as described below.

     * "input_ids": 2D ``np.ndarray`` of fixed-length windows
     * "attention_masks": 2D ``np.ndarray`` of attention masks (1 for tokens
       that are NOT masked, 0 for tokens that are masked) to feed into your
       favorite BERT-like embedding generator.
    """
    if len(seq.shape) != 1:
        raise ValueError(f"Input array must be 1D; got shape {seq.shape}")

    window_length, pre_padding, post_padding = _compute_padding(
        len(seq), overlap, non_overlap
    )

    # First generate the windows as padded flat arrays.
    padded_length = len(seq) + pre_padding + post_padding
    buf = np.zeros(shape=[padded_length], dtype=seq.dtype)
    buf[pre_padding : pre_padding + len(seq)] = seq

    mask_buf = np.zeros_like(buf, dtype=int)  # 0 == MASKED
    mask_buf[pre_padding : pre_padding + len(seq)] = 1  # 1 == NOT MASKED

    # Reshape the flat arrays into overlapping windows.
    num_windows = padded_length // (overlap + non_overlap)
    windows = np.zeros(shape=[num_windows, window_length], dtype=seq.dtype)
    masks = np.zeros(shape=[num_windows, window_length], dtype=int)
    for i in range(num_windows):
        start = i * (overlap + non_overlap)
        windows[i] = buf[start : start + window_length]
        masks[i] = mask_buf[start : start + window_length]

    return {"input_ids": windows, "attention_masks": masks}

def windows_to_seq(
    seq: np.ndarray, windows: np.ndarray, overlap: int, non_overlap: int
) -> np.ndarray:
    """
    Inverse of :func:`seq_to_windows`. Convert fixed-length windows with padding
    to a variable-length sequence that matches up with the original sequence
    from which the windows were computed.

    Usually this function is used to convert the outputs of a BERT model back to
    a format that aligns with the original tokens.

    :param seq: Original variable-length sequence to align with, as a 1D numpy
        array
    :param windows: Windowed data to align with the original sequence. Usually
        this data is the result of applying a transformation to the output of
        :func:`seq_to_windows`.
    :param overlap: How much overlap there is between adjacent windows
    :param non_overlap: How much non-overlapping content there is at the middle
        of each window, between the overlapping regions

    :returns: A 1D ``np.ndarray`` containing the contents of ``windows`` that
        correspond to the elements of ``seq``.
    """
    if len(seq.shape) != 1:
        raise ValueError(f"Input array must be 1D; got shape {seq.shape}")

    window_length, pre_padding, post_padding = _compute_padding(
        len(seq), overlap, non_overlap
    )

    # Input may be an array of n-dimensional tensors.
    result_shape = [len(seq)] + list(windows.shape[2:])
    result = np.zeros(shape=result_shape, dtype=windows.dtype)

    # Special-case the first and last windows.
    if len(seq) <= non_overlap + (overlap // 2):
        # Only one window, potentially a partial window.
        return windows[0][overlap : overlap + len(seq)]
    else:
        result[0 : non_overlap + (overlap // 2)] = windows[0][
            overlap : overlap + non_overlap + (overlap // 2)
        ]

    num_to_copy_from_last = overlap // 2 + overlap + non_overlap - post_padding
    if num_to_copy_from_last > 0:
        result[-num_to_copy_from_last:] = windows[-1][
            overlap // 2 : (overlap // 2) + num_to_copy_from_last
        ]

    # Remaining windows can be covered in a loop
    for i in range(1, len(windows) - 1):
        src_start = overlap // 2
        dest_start = overlap // 2 + non_overlap + (i - 1) * (overlap + non_overlap)
        num_to_copy = min(non_overlap + overlap, len(seq) - dest_start)
        result[dest_start : dest_start + num_to_copy] = windows[i][
            src_start : src_start + num_to_copy
        ]
    return result

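
# Hedged round-trip sketch for seq_to_windows() and windows_to_seq(),
# illustrative only. The sequence length and window parameters are arbitrary
# small values chosen so the windowing is easy to inspect by hand.
def _example_windowing_round_trip() -> None:
    seq = np.arange(10)
    windows = seq_to_windows(seq, overlap=2, non_overlap=4)
    # Two windows of length (2 * overlap) + non_overlap == 8, plus matching
    # attention masks marking the padded positions.
    assert windows["input_ids"].shape == (2, 8)
    # Dropping the padding and the duplicated overlap regions recovers the
    # original sequence.
    recovered = windows_to_seq(seq, windows["input_ids"], overlap=2, non_overlap=4)
    assert np.array_equal(recovered, seq)
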
def _compute_padding(
    seq_len: int, overlap: int, non_overlap: int
) -> Tuple[int, int, int]:
    """
    Shared padding computation for :func:`seq_to_windows` and
    :func:`windows_to_seq`.

    :param seq_len: Length of the original sequence
    :param overlap: How much overlap there should be between adjacent windows
    :param non_overlap: How much non-overlapping content there should be at the
        middle of each window, between the overlapping regions

    :returns: A tuple of (window_length, pre_padding, post_padding)
    """
    if 0 != overlap % 2:
        raise ValueError(f"Non-even overlaps not implemented; got {overlap}")

    # Each window has overlapping regions at the beginning and end
    window_length = (2 * overlap) + non_overlap

    # Account for the padding before the first window
    pre_padding = overlap

    # Account for the padding after the last window
    remainder = (seq_len + pre_padding) % (overlap + non_overlap)
    post_padding = window_length - remainder
    if post_padding == window_length:
        # Chop off empty last window
        post_padding -= overlap + non_overlap

    return window_length, pre_padding, post_padding

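
# Worked example for _compute_padding(), illustrative only. For a sequence of
# 10 elements with overlap=2 and non_overlap=4, each window holds
# (2 * 2) + 4 == 8 elements, overlap == 2 padding elements precede the first
# window, and 2 trailing padding elements fill out the last window:
#
#     _compute_padding(10, 2, 4) == (8, 2, 2)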