# Source code listing (the module path was lost when this page was extracted).

#
#  Copyright (c) 2020 IBM Corp.
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

"""
This module of Text Extensions for Pandas includes I/O functions related to the
Watson Natural Language Understanding service on the IBM Cloud.
This service provides analysis of text features through a request/response API.
See the IBM Cloud documentation for Watson Natural Language Understanding
for information on getting started with the service. Details of the API and
available features can be found in the service's API reference.
For convenience, a Python SDK is available
that can be used to authenticate and make requests to the service.
"""

from typing import *
import warnings

import numpy as np
import pandas as pd
import pyarrow as pa

from text_extensions_for_pandas.array.span import SpanArray, Span
from text_extensions_for_pandas.array.token_span import TokenSpanArray
from import util
from text_extensions_for_pandas.spanner import contain_join

# Standard Schemas for Response Data
# (column name, Arrow type name) pairs describing the "entities" DataFrame.
_entities_schema = [
    ("type", "string"),
    ("text", "string"),
    ("sentiment.label", "string"),
    ("sentiment.score", "double"),
    ("relevance", "double"),
    ("emotion.sadness", "double"),
    # NOTE(review): this name was blank in the extracted source; restored as the
    #  remaining Watson NLU emotion score -- confirm against the upstream file.
    ("emotion.joy", "double"),
    ("emotion.fear", "double"),
    ("emotion.disgust", "double"),
    ("emotion.anger", "double"),
    ("count", "int64"),
    ("confidence", "double"),
    ("disambiguation.subtype", "string"),
    # NOTE(review): this name was blank in the extracted source; restored as the
    #  remaining Watson NLU disambiguation field -- confirm against upstream.
    ("disambiguation.name", "string"),
    ("disambiguation.dbpedia_resource", "string"),
]

# (column name, Arrow type name) pairs describing the "entity_mentions" DataFrame.
_entity_mentions_schema = [
    ("type", "string"),
    ("text", "string"),
    ("span", "ArrowSpanType"),  # NOTE: Renamed from "location"
    ("confidence", "double"),
]

# (column name, Arrow type name) pairs describing the "keywords" DataFrame.
_keywords_schema = [
    ("text", "string"),
    ("sentiment.label", "string"),
    ("sentiment.score", "double"),
    ("relevance", "double"),
    ("emotion.sadness", "double"),
    # NOTE(review): this name was blank in the extracted source; restored as the
    #  remaining Watson NLU emotion score -- confirm against the upstream file.
    ("emotion.joy", "double"),
    ("emotion.fear", "double"),
    ("emotion.disgust", "double"),
    ("emotion.anger", "double"),
    ("count", "int64"),
]

# (column name, Arrow type name) pairs describing the "semantic_roles" DataFrame.
_semantic_roles_schema = [
    ("subject.text", "string"),
    ("sentence", "string"),
    ("object.text", "string"),
    ("action.verb.text", "string"),
    ("action.verb.tense", "string"),
    ("action.text", "string"),
    ("action.normalized", "string"),
]

# (column name, Arrow type name) pairs describing the "syntax" DataFrame.
_syntax_schema = [
    ("span", "ArrowSpanType"),
    ("part_of_speech", "string"),
    ("lemma", "string"),
    # Type name normalized: the stray leading space made it differ from every
    # other "ArrowTokenSpanType" entry in this module.
    ("sentence", "ArrowTokenSpanType"),
]

# (column name, Arrow type name) pairs describing the "relations" DataFrame.
_relations_schema = [
    ("type", "string"),
    ("sentence_span", "ArrowTokenSpanType"),
    ("score", "double"),
    ("arguments.0.span", "ArrowTokenSpanType"),
    ("arguments.1.span", "ArrowTokenSpanType"),
    ("arguments.0.entities.type", "string"),
    ("arguments.1.entities.type", "string"),
    ("arguments.0.entities.text", "string"),
    ("arguments.1.entities.text", "string"),
    ("arguments.0.entities.disambiguation.subtype", "string"),
    ("arguments.1.entities.disambiguation.subtype", "string"),
    # NOTE(review): these two names were blank (and therefore duplicated) in the
    #  extracted source; restored by analogy with the "subtype" entries above
    #  and the Watson NLU disambiguation fields -- confirm against upstream.
    ("arguments.0.entities.disambiguation.name", "string"),
    ("arguments.1.entities.disambiguation.name", "string"),
    ("arguments.0.disambiguation.dbpedia_resource", "string"),
    ("arguments.1.disambiguation.dbpedia_resource", "string"),
]

def _make_syntax_dataframes(syntax_response, original_text):
    """
    Create the token and sentence DataFrames from the "syntax" section of a
    response.

    :param syntax_response: Dictionary whose optional "tokens" and "sentences"
     entries are converted; a missing entry is treated as empty.
    :param original_text: Text of the document, passed through to
     `util.make_char_span` when building the span columns.
    :return: Tuple `(token_df, sentence_df)`. Each element is an empty DataFrame
     when the corresponding entry is missing or empty. The sentence DataFrame
     only receives its "span" and "sentence_span" columns when tokens were
     present, because sentence spans are aligned to the token spans.
    """
    tokens = syntax_response.get("tokens", [])
    sentence = syntax_response.get("sentences", [])

    if tokens:
        token_table = util.make_table(tokens)
        location_col, location_name = util.find_column(token_table, "location")
        text_col, text_name = util.find_column(token_table, "text")
        char_span = util.make_char_span(location_col, text_col, original_text)

        # Drop location, text columns that are duplicated in char_span
        token_table = token_table.drop([location_name, text_name])

        # Add the span column to the DataFrame
        token_df = token_table.to_pandas()
        token_df['span'] = char_span
    else:
        # No tokens: nothing to align sentences against
        char_span = None
        token_df = pd.DataFrame()

    if sentence:
        sentence_table = util.make_table(sentence)
        sentence_df = sentence_table.to_pandas()
        if char_span is not None:
            location_col, _ = util.find_column(sentence_table, "location")
            text_col, _ = util.find_column(sentence_table, "text")
            sentence_char_span = util.make_char_span(location_col, text_col, original_text)
            sentence_span = TokenSpanArray.align_to_tokens(char_span, sentence_char_span)
            sentence_df['span'] = sentence_char_span
            sentence_df['sentence_span'] = sentence_span
    else:
        sentence_df = pd.DataFrame()

    return token_df, sentence_df

def _merge_syntax_dataframes(token_df, sentence_series):
    """
    Attach the containing sentence to every token row.

    :param token_df: Token DataFrame with a "span" column.
    :param sentence_series: Series of sentence spans.
    :return: `token_df` outer-merged with the (sentence, span) containment pairs.
    """
    # NOTE(review): the argument of `merge` was lost in the extracted source.
    #  It is reconstructed here from the module's otherwise-unused `contain_join`
    #  import and the "sentence"/"span" column names of the syntax schema;
    #  confirm the keyword names against the `contain_join` signature.
    sentence_token_pairs = contain_join(
        sentence_series,
        token_df['span'],
        first_name="sentence",
        second_name="span",
    )

    df = token_df.merge(sentence_token_pairs, how="outer")

    return df

def _find_sentence_index(sentence_span_series, arg_begin, arg_end):
    """
    Find the position of the sentence span that overlaps an argument's range.

    Starts in the middle of the series and steps one sentence at a time toward
    the argument. This relies on the sentences being ordered by position
    (assumption carried over from the original search -- TODO confirm).

    :param sentence_span_series: Series of sentence spans, indexed 0..n-1.
    :param arg_begin: Begin offset of the argument span.
    :param arg_end: End offset of the argument span.
    :return: Index label of the overlapping sentence.
    :raises ValueError: If no sentence overlaps the range. The search stops as
     soon as it would reverse direction or leave the series, instead of looping
     forever or failing with a lookup error.
    """
    num_sentences = len(sentence_span_series)
    j = num_sentences // 2
    last_step = 0
    while 0 <= j < num_sentences:
        sentence_span = sentence_span_series[j]
        if arg_begin >= sentence_span.end:
            step = 1
        elif arg_end <= sentence_span.begin:
            step = -1
        else:
            return j
        if step == -last_step:
            # The range falls in a gap between two adjacent sentences
            break
        last_step = step
        j += step
    raise ValueError(
        f"No sentence span found for argument range [{arg_begin}, {arg_end})")


def _make_relations_dataframe(relations, original_text, sentence_span_series):
    """
    Create the relations DataFrame.

    Each list-valued "arguments.*" column is split into one column per argument
    position ("arguments.<i>.*"). Argument location/text columns are replaced by
    span columns, and each relation is matched to the sentence span containing
    its first argument.

    :param relations: The "relations" section of a parsed NLU response.
    :param original_text: Text of the document, passed through to
     `util.make_char_span`.
    :param sentence_span_series: Series of sentence spans, or None when the
     response carried no sentence information. When it is None or empty, or no
     argument spans could be built, a warning is issued and no "sentence_span"
     column is added.
    :return: The relations DataFrame, empty when `relations` is empty.
    """
    if len(relations) == 0:
        return pd.DataFrame()

    table = util.make_table(relations)

    # Argument position -> (location array, "arguments.<position>" name prefix)
    location_cols = {}

    # Separate each argument into a column
    flattened_arguments = []
    drop_cols = []
    for name in table.column_names:
        if not name.lower().startswith("arguments"):
            continue

        col = pa.concat_arrays(table.column(name).iterchunks())
        assert pa.types.is_list(col.type)

        name_split = name.split('.', maxsplit=1)
        num_arguments = len(col[0])

        value_series = col.values.to_pandas()

        # Separate the arguments into individual columns
        for i in range(num_arguments):
            arg_name = "{}.{}.{}".format(name_split[0], i, name_split[1])
            arg_series = value_series[i::num_arguments]

            arg_array = pa.array(arg_series)

            # If list array is fixed length with 1 element, it can be flattened
            temp = arg_array
            while pa.types.is_list(temp.type):
                temp = temp.flatten()
                if len(temp) == len(arg_array):
                    # TODO also need to verify each offset inc by 1?
                    arg_array = temp

            if name.lower().endswith("location"):
                location_cols[i] = (arg_array, "{}.{}".format(name_split[0], i))

            flattened_arguments.append((arg_array, arg_name))

        # The list-valued column is superseded by its per-argument columns
        drop_cols.append(name)

    # Add the flattened argument columns
    for arg_array, arg_name in flattened_arguments:
        table = table.append_column(arg_name, arg_array)

    # Replace argument location and text columns with spans
    arg_span_cols = {}
    for location_col, arg_prefix in location_cols.values():
        text_col, text_name = util.find_column(table, "{}.text".format(arg_prefix))
        arg_span_cols["{}.span".format(arg_prefix)] = util.make_char_span(
            location_col, text_col, original_text)
        drop_cols.extend(["{}.location".format(arg_prefix), text_name])

    add_cols = arg_span_cols.copy()

    # Build the sentence span and drop plain text sentence col
    sentence_col, sentence_name = util.find_column(table, "sentence")
    arg_col_names = list(arg_span_cols.keys())
    have_sentences = (sentence_span_series is not None
                      and len(sentence_span_series) > 0)
    if len(arg_col_names) > 0 and have_sentences:
        first_arg_span_array = arg_span_cols[arg_col_names[0]]

        sentence_matches = []
        for i, arg_span in enumerate(first_arg_span_array):
            j = _find_sentence_index(sentence_span_series,
                                     arg_span.begin, arg_span.end)
            sentence_span = sentence_span_series[j]

            # Cross-check the match against every argument span and against the
            # plain sentence text reported with the relation.
            contains = [sentence_span.contains(a[i]) for a in arg_span_cols.values()]
            text_matches = sentence_span.covered_text == sentence_col[i].as_py()
            if not (all(contains) and text_matches):
                msg = f"Mismatched sentence span for: {sentence_span}"
                if not all(contains):
                    msg += f"\nContains Args: {all(contains)}"
                if not text_matches:
                    msg += f"\nSpanText: '{sentence_span.covered_text}'" \
                           f"\nSentence: '{sentence_col[i]}'"
                # NOTE(review): the statement that consumed `msg` was lost in the
                #  extracted source; a warning is used so a mismatch stays
                #  non-fatal -- confirm against upstream.
                warnings.warn(msg)
            sentence_matches.append(j)

        relations_sentence = sentence_span_series[sentence_matches]
        add_cols["sentence_span"] = relations_sentence.reset_index(drop=True)
        if sentence_name is not None:
            # The plain-text sentence is now represented by "sentence_span"
            drop_cols.append(sentence_name)
    else:
        warnings.warn("Could not make sentence span column for Re")

    # Drop columns that have been flattened or replaced by spans
    # (de-duplicated, keeping first-seen order)
    table = table.drop(list(dict.fromkeys(drop_cols)))

    df = table.to_pandas()

    # Insert additional columns
    for col_name, col in add_cols.items():
        df[col_name] = col

    return df

def _make_relations_dataframe_zero_copy(relations):
    """
    Create the relations DataFrame by slicing the flattened Arrow value buffers
    rather than converting through pandas.

    Each list-valued "arguments.*" column is split into one column per argument
    position ("arguments.<i>.*"). Columns containing nulls, and argument
    properties whose nested lists vary in length, are not flattened (see TODOs).

    :param relations: The "relations" section of a parsed NLU response.
    :return: The relations DataFrame, empty when `relations` is empty.
    """
    if len(relations) == 0:
        return pd.DataFrame()

    table = util.make_table(relations)

    # Separate each argument into a column
    flattened_arguments = []
    drop_cols = []
    for name in table.column_names:
        if not name.lower().startswith("arguments"):
            continue

        col = pa.concat_arrays(table.column(name).iterchunks())
        assert pa.types.is_list(col.type)
        is_nested_list = pa.types.is_list(col.type.value_type)

        name_split = name.split('.', maxsplit=1)
        first_list = col[0]
        num_arguments = len(first_list)

        null_count = 0

        # Get the flattened raw values, keeping the offsets of every list level
        raw = col
        offset_arrays = []
        while pa.types.is_list(raw.type):
            # NOTE(review): this append was missing from the extracted source,
            #  which left `offsets_list.pop()` below operating on an empty list.
            offset_arrays.append(raw.offsets)
            null_count += raw.null_count
            raw = raw.flatten()

        # TODO handle lists with null values
        if null_count > 0:
            # NOTE(review): the body of this branch was lost in the extracted
            #  source; the column is left un-flattened -- confirm upstream.
            continue

        # Convert values to numpy
        values = raw.to_numpy(zero_copy_only=False)  # string might copy
        offsets_list = [o.to_numpy() for o in offset_arrays]

        # Compute the length of each list in the array
        value_offsets = offsets_list.pop()
        value_lengths = value_offsets[1:] - value_offsets[:-1]

        # Separate the arguments into individual columns
        fully_flattened = True
        for i in range(num_arguments):
            arg_name = "{}.{}.{}".format(name_split[0], i, name_split[1])
            arg_lengths = value_lengths[i::num_arguments]

            # Only fixed length arrays can be sliced
            if is_nested_list and len(np.unique(arg_lengths)) != 1:
                # TODO Argument properties with variable length arrays not currently
                #  supported
                fully_flattened = False
                continue

            num_elements = len(first_list[i]) if is_nested_list else 1

            if not is_nested_list or num_elements == 1:
                # Only 1 element so leave in primitive array
                arg_values = values[i::num_arguments]
                arg_array = pa.array(arg_values)
            else:
                # Multiple elements so put back in a list array. After the
                # reshape each row holds one argument's elements, so argument
                # `i` is every `num_arguments`-th row (the original stride of
                # `num_elements` was only right when the two counts matched).
                arg_values = values.reshape([len(col) * num_arguments, num_elements])
                arg_values = arg_values[i::num_arguments]
                arg_values = arg_values.flatten()
                arg_offsets = np.cumsum(arg_lengths)
                arg_offsets = np.insert(arg_offsets, 0, 0)
                arg_array = pa.ListArray.from_arrays(arg_offsets, arg_values)

            flattened_arguments.append((arg_array, arg_name))

        # Only drop the list-valued column when every argument was flattened
        if fully_flattened:
            drop_cols.append(name)

    # Add the flattened argument columns
    for arg_array, arg_name in flattened_arguments:
        table = table.append_column(arg_name, arg_array)

    # Drop columns that have been flattened
    table = table.drop(drop_cols)

    return table.to_pandas()

def _make_entity_dataframes(entities: List,
                            original_text: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Create the entities and entity_mentions DataFrames.

    :param entities: The "entities" section of a parsed NLU response
    :param original_text: Text of the document.  This argument must be provided if there
     are entity mention spans.
    :return: Tuple `(entities_df, entity_mentions_df)`. Both are empty when
     `entities` is empty; the mentions DataFrame is also empty when the response
     contains no "mentions" columns.
    :raises ValueError: If mentions are present but `original_text` is None.
    """
    if len(entities) == 0:
        return pd.DataFrame(), pd.DataFrame()

    table = util.make_table(entities)

    # Check if response includes entity mentions
    mention_name_cols = [(name, table.column(name)) for name in table.column_names
                         if name.lower().startswith("mentions")]

    # Make entities and entity mentions (optional) DataFrames
    if len(mention_name_cols) > 0:
        mention_names, mention_cols = zip(*mention_name_cols)
        mention_names = list(mention_names)

        # Create the entities DataFrame with mention arrays dropped
        table = table.drop(mention_names)
        pdf = table.to_pandas()

        # Flatten the mention arrays to be put in separate table
        mention_arrays = [pa.concat_arrays(col.iterchunks()) for col in mention_cols]
        flat_mention_arrays = [a.flatten() for a in mention_arrays]
        table_mentions = pa.Table.from_arrays(flat_mention_arrays, names=mention_names)

        # Convert location/text columns to span
        location_col, location_name = util.find_column(table_mentions, "location")
        text_col, text_name = util.find_column(table_mentions, "text")
        if original_text is None:
            raise ValueError(
                "Unable to construct target text for converting entity mentions to spans")

        char_span = util.make_char_span(location_col, text_col, original_text)
        table_mentions = table_mentions.drop([location_name, text_name])

        # Create the entity_mentions DataFrame
        pdf_mentions = table_mentions.to_pandas()
        pdf_mentions["span"] = char_span

        # Align index of parent entities DataFrame with flattened DataFrame and ffill
        # values: each entity row is keyed by the offset of its first mention, so
        # forward-filling repeats the entity's values across all of its mentions.
        mention_offsets = mention_arrays[0].offsets.to_numpy()
        pdf_parent = pdf.set_index(mention_offsets[:-1])
        pdf_parent = pdf_parent.reindex(pdf_mentions.index, method="ffill")

        # Add columns from entities parent DataFrame
        pdf_mentions["text"] = pdf_parent["text"]
        pdf_mentions["type"] = pdf_parent["type"]

        # Remove "mentions" from column names
        pdf_mentions = pdf_mentions.rename(
            columns={c: c.split("mentions.")[-1] for c in pdf_mentions.columns})
    else:
        pdf = table.to_pandas()
        pdf_mentions = pd.DataFrame()

    return pdf, pdf_mentions

def parse_response(response: Dict[str, Any],
                   original_text: str = None,
                   apply_standard_schema: bool = False) -> Dict[str, pd.DataFrame]:
    r"""
    Parse a Watson NLU response as a decoded JSON string, e.g. dictionary containing
    requested features and convert into a dict of Pandas DataFrames. The following
    features in the response will be converted:

        * entities
        * entity_mentions (elements of the "mentions" field of `response["entities"]`)
        * keywords
        * relations
        * semantic_roles
        * syntax

    For information on getting started with Watson Natural Language Understanding
    on IBM Cloud, see the service's getting-started documentation. A Python SDK for
    authentication and making requests to the service is also provided by IBM.
    Details on the supported features and available options when making the
    request can be found in the service's API reference.

    .. note:: Additional feature data in response will not be processed

    >>> response = natural_language_understanding.analyze(
    ...     url="<URL of the document to analyze>",
    ...     return_analyzed_text=True,
    ...     features=Features(
    ...         entities=EntitiesOptions(sentiment=True),
    ...         keywords=KeywordsOptions(sentiment=True, emotion=True),
    ...         relations=RelationsOptions(),
    ...         semantic_roles=SemanticRolesOptions(),
    ...         syntax=SyntaxOptions(sentences=True,
    ...                              tokens=SyntaxOptionsTokens(lemma=True,
    ...                                                         part_of_speech=True))
    ...     )).get_result()
    >>> dfs = parse_response(response)
    >>> dfs.keys()
    dict_keys(['syntax', 'entities', 'entity_mentions', 'keywords', 'relations', 'semantic_roles'])
    >>> dfs["syntax"].head()
                    span part_of_speech   lemma  \
    0    [0, 5): 'Monty'          PROPN    None
    1  [6, 12): 'Python'          PROPN  python
    <BLANKLINE>
                                                sentence
    0  [0, 273): 'Monty Python and the Holy Grail is ...
    1  [0, 273): 'Monty Python and the Holy Grail is ...

    :param response: A dictionary of features from the IBM Watson NLU response
    :param original_text: Optional original text sent in request, if None will
                          look for "analyzed_text" keyword in response
    :param apply_standard_schema: Return DataFrames with a set schema, whether data
                                  was present in the response or not
    :return: A dictionary mapping feature name to a Pandas DataFrame
    """
    dfs = {}

    if original_text is None and "analyzed_text" in response:
        original_text = response["analyzed_text"]

    # Create the syntax DataFrame
    syntax_response = response.get("syntax", {})
    token_df, sentence_df = _make_syntax_dataframes(syntax_response, original_text)
    sentence_series = sentence_df.get("sentence_span")
    if sentence_series is not None:
        syntax_df = _merge_syntax_dataframes(token_df, sentence_series)
    else:
        syntax_df = pd.concat([token_df, sentence_df], axis=1)
    dfs["syntax"] = util.apply_schema(syntax_df, _syntax_schema, apply_standard_schema)

    if original_text is None and "span" in dfs["syntax"].columns:
        # Inspect the column's backing array: a pandas Series is never itself a
        # SpanArray, so testing the Series would always take the warning branch.
        # NOTE(review): assumes SpanArray exposes `target_text`, as the original
        #  attribute access implied -- confirm.
        char_span = dfs["syntax"]["span"].array
        if isinstance(char_span, SpanArray):
            original_text = char_span.target_text
        else:
            warnings.warn("Did not receive and could not build original text")

    # Create the entities DataFrames
    entities = response.get("entities", [])
    entities_df, entity_mentions_df = _make_entity_dataframes(entities, original_text)
    dfs["entities"] = util.apply_schema(entities_df, _entities_schema,
                                        apply_standard_schema)
    dfs["entity_mentions"] = util.apply_schema(entity_mentions_df,
                                               _entity_mentions_schema,
                                               apply_standard_schema)

    # Create the keywords DataFrame
    keywords = response.get("keywords", [])
    keywords_df = util.make_dataframe(keywords)
    dfs["keywords"] = util.apply_schema(keywords_df, _keywords_schema,
                                        apply_standard_schema)

    # Create the relations DataFrame
    relations = response.get("relations", [])
    relations_df = _make_relations_dataframe(relations, original_text, sentence_series)
    dfs["relations"] = util.apply_schema(relations_df, _relations_schema,
                                         apply_standard_schema)

    # Create the semantic roles DataFrame
    semantic_roles = response.get("semantic_roles", [])
    semantic_roles_df = util.make_dataframe(semantic_roles)
    dfs["semantic_roles"] = util.apply_schema(semantic_roles_df,
                                              _semantic_roles_schema,
                                              apply_standard_schema)

    if "warnings" in response:
        # TODO: check structure of warnings and improve message
        warnings.warn(str(response["warnings"]))

    return dfs
def make_span_from_entities(char_span: SpanArray,
                            entities_frame: pd.DataFrame,
                            entity_col: str = "text") -> TokenSpanArray:
    """
    Create a token span array for entity text from the entities DataFrame, and an
    existing char span array with tokens from the entire analyzed text.

    Starting at each token, the candidate text is extended one token at a time
    (joined with a single space) for as long as some entity text starts with it;
    the longest candidate that equals an entity text in full becomes a match, and
    scanning resumes after the matched tokens.

    :param char_span: Parsed tokens
    :param entities_frame: Entities DataFrame from `parse_response`
    :param entity_col: Column name for the entity text
    :return: TokenSpanArray for matching entities
    """
    entities = entities_frame[entity_col]
    entities_len = entities.str.len()
    num_spans = len(char_span)
    begins = []
    ends = []

    i = 0
    while i < num_spans:
        text = char_span[i].covered_text
        end = i
        num_tokens = 1
        stop = False
        while not stop:
            stop = True
            # na=False: a missing entity text is "no match" rather than a NaN,
            # which would count as truthy and break the boolean mask below.
            starts_with = entities.str.startswith(text, na=False)
            if starts_with.any():
                # Have a complete match, advance the end index
                if (entities_len[starts_with] == len(text)).any():
                    end = i + num_tokens

                # Try the next token
                if i + num_tokens < num_spans:
                    text = text + " " + char_span[i + num_tokens].covered_text
                    num_tokens += 1
                    stop = False

        if i != end:
            begins.append(i)
            ends.append(end)
            i = end
        else:
            i += 1

    return TokenSpanArray(char_span, begins, ends)