Source code for bluesearch.mining.attribute

"""Classes and functions for attribute extraction."""

# Blue Brain Search is a text mining toolbox focused on scientific use cases.
#
# Copyright (C) 2020  Blue Brain Project, EPFL.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import collections
import json
import logging
import textwrap
import warnings

import ipywidgets as widgets
import pandas as pd
import requests
from IPython.display import HTML, display
from spacy import displacy

from .._css import style

logger = logging.getLogger(__name__)


class AttributeExtractor:
    """Extract and analyze attributes in a given text."""

    def __init__(self, core_nlp_url, grobid_quantities_url, ee_model):
        """Initialize the class.

        Parameters
        ----------
        core_nlp_url : str
            The URL of the CoreNLP server.
        grobid_quantities_url : str
            The URL of the Grobid Quantities server.
        ee_model : spacy.language.Language
            The spacy model for named entity extraction.
        """
        logger.debug(f"{self.__class__.__name__} init")
        logger.debug(f"CoreNLP URL: {core_nlp_url}")
        logger.debug(f"Grobid Quantities URL: {grobid_quantities_url}")
        logger.debug(f"Entity Extraction Model:\n{ee_model.meta}")

        self.core_nlp_url = core_nlp_url
        self.grobid_quantities_url = grobid_quantities_url
        self.ee_model = ee_model

    @staticmethod
    def get_quantity_type(quantity):
        """Get the type of a Grobid quantity.

        The top-level Grobid object is a measurement. A measurement can
        contain one or more quantities. Some Grobid quantities have a
        type attached to them, e.g. "mass", "concentration", etc. This
        is the type that is returned. For quantities without a type an
        empty string is returned.

        Parameters
        ----------
        quantity : dict
            A Grobid quantity.

        Returns
        -------
        quantity_type : str
            The type of the quantity.
        """
        try:
            quantity_type = quantity["rawUnit"]["type"]
        except KeyError:
            try:
                quantity_type = quantity["normalizedUnit"]["type"]
            except KeyError:
                quantity_type = ""

        return quantity_type

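    # For reference, a minimal Grobid quantity might look like the dict
    # below. This is an illustrative shape only (field values are made
    # up), included to show which keys the lookup above relies on:
    #
    #     {
    #         "type": "value",
    #         "rawValue": "5",
    #         "offsetStart": 10,
    #         "offsetEnd": 11,
    #         "rawUnit": {
    #             "name": "mg",
    #             "type": "mass",
    #             "offsetStart": 12,
    #             "offsetEnd": 14,
    #         },
    #     }
    #
    # For this input `get_quantity_type` would return "mass"; without
    # both unit type keys it falls through to the empty string.
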
    def get_measurement_type(self, measurement):
        """Get the type of a Grobid measurement.

        For measurements with multiple quantities the most common type
        is returned. In case of ties the empty type always loses.

        Parameters
        ----------
        measurement : dict
            A Grobid measurement.

        Returns
        -------
        measurement_type : str
            The type of the Grobid measurement.
        """
        logger.debug("get_measurement_type")
        logger.debug(f"measurement:\n{measurement}")
        quantity_types = [
            self.get_quantity_type(quantity)
            for quantity in self.iter_quantities(measurement)
        ]
        logger.debug(f"quantity_types: {quantity_types}")
        quantity_type_counts = collections.Counter(quantity_types)
        most_common_quantity_types = sorted(
            quantity_type_counts.most_common(),
            key=lambda t_cnt: (-t_cnt[1], int(t_cnt[0] == "")),
        )
        measurement_type = most_common_quantity_types[0][0]

        return measurement_type

    def count_measurement_types(self, measurements):
        """Count types of all given measurements.

        Parameters
        ----------
        measurements : list
            A list of Grobid measurements.

        Returns
        -------
        all_type_counts : collections.Counter
            The counts of all measurement types.
        """
        all_types = [self.get_measurement_type(m) for m in measurements]
        all_type_counts = collections.Counter(all_types)

        return all_type_counts

    def get_grobid_measurements(self, text):
        """Get measurements for text from the Grobid server.

        Parameters
        ----------
        text : str
            The text for the query.

        Returns
        -------
        measurements : list_like
            All Grobid measurements extracted from the given text.
        """
        response = requests.post(self.grobid_quantities_url, files={"text": text})
        measurements = []

        if response.status_code != 200:
            msg = f"GROBID request problem. Code: {response.status_code}"
            warnings.warn(msg)
        else:
            response_json = json.loads(response.text)
            if "measurements" in response_json:
                measurements = response_json["measurements"]

        return measurements

    @staticmethod
    def annotate_quantities(text, measurements):
        """Annotate measurements in text using HTML/CSS styles.

        Parameters
        ----------
        text : str
            The text to annotate.
        measurements : list
            The Grobid measurements for the text. It is assumed that
            these measurements were obtained by calling
            `get_grobid_measurements(text)`.

        Returns
        -------
        output : IPython.core.display.HTML
            The annotated text.
        """

        def annotate_quantity(quantity):
            annotations = []
            start = quantity["offsetStart"]
            end = quantity["offsetEnd"]
            formatted_text = f'<span class="number">{text[start:end]}</span>'
            quantity_type = AttributeExtractor.get_quantity_type(quantity)
            if quantity_type:
                formatted_text += f'<span class="quantityType">[{quantity_type}]</span>'
            annotations.append([start, end, formatted_text])

            if "rawUnit" in quantity:
                start = quantity["rawUnit"]["offsetStart"]
                end = quantity["rawUnit"]["offsetEnd"]
                annotations.append(
                    [start, end, f'<span class="unit">{text[start:end]}</span>']
                )

            return annotations

        annotations = []
        for measurement in measurements:
            for quantity in AttributeExtractor.iter_quantities(measurement):
                annotations += annotate_quantity(quantity)
        annotations = sorted(annotations, key=lambda x: x[0])

        annotated_text = ""
        last_idx = 0
        for start, end, quantity in annotations:
            if start >= last_idx:
                annotated_text += text[last_idx:start] + quantity
                last_idx = end
        annotated_text += text[last_idx:]

        css_style = style.get_css_style()
        html = f"""<style>
        {css_style}
        </style>
        <div class="fixedWidth">
        {annotated_text}
        </div>"""
        output = HTML(html)

        return output

    @staticmethod
    def get_overlapping_token_ids(start, end, tokens):
        """Find tokens intersecting the interval [start, end).

        CoreNLP breaks a given text down into sentences, and each
        sentence is broken down into tokens. These can be accessed by
        `response['sentences'][sentence_id]['tokens']`. Each token
        corresponds to a position in the original text. This method
        determines which tokens would intersect a given slice of this
        text.

        Parameters
        ----------
        start : int
            The left boundary of the interval.
        end : int
            The right boundary of the interval.
        tokens : list
            The CoreNLP sentence tokens.

        Returns
        -------
        ids : list
            A list of token indices that overlap with the given interval.
        """
        ids = []
        for token in tokens:
            overlap_start = max(start, token["characterOffsetBegin"])
            overlap_end = min(end, token["characterOffsetEnd"])
            if overlap_start < overlap_end:
                ids.append(token["index"])

        return ids

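    # Illustrative call (token offsets and indices are made up):
    #
    #     tokens = [
    #         {"index": 1, "characterOffsetBegin": 0, "characterOffsetEnd": 3},
    #         {"index": 2, "characterOffsetBegin": 4, "characterOffsetEnd": 6},
    #     ]
    #     AttributeExtractor.get_overlapping_token_ids(2, 5, tokens)
    #     # returns [1, 2] -- the interval [2, 5) overlaps both tokens
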
    @staticmethod
    def iter_quantities(measurement):
        """Iterate over quantities in a Grobid measurement.

        Parameters
        ----------
        measurement : dict
            A Grobid measurement.

        Yields
        ------
        quantity : dict
            A Grobid quantity in the given measurement.
        """
        if "quantity" in measurement:
            yield measurement["quantity"]
        elif "quantities" in measurement:
            yield from measurement["quantities"]
        elif "quantityMost" in measurement or "quantityLeast" in measurement:
            if "quantityLeast" in measurement:
                yield measurement["quantityLeast"]
            if "quantityMost" in measurement:
                yield measurement["quantityMost"]
        elif "quantityBase" in measurement or "quantityRange" in measurement:
            if "quantityBase" in measurement:
                yield measurement["quantityBase"]
            if "quantityRange" in measurement:
                yield measurement["quantityRange"]
        else:
            warnings.warn("no quantity in measurement")
            return

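    # For reference, the four measurement layouts handled above are
    # (shapes only, values elided): a single value {"quantity": {...}},
    # a list {"quantities": [{...}, ...]}, an interval
    # {"quantityLeast": {...}, "quantityMost": {...}}, and a base/range
    # pair {"quantityBase": {...}, "quantityRange": {...}}.
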
    def get_quantity_tokens(self, quantity, tokens):
        """Associate a Grobid quantity to CoreNLP tokens.

        Both the quantity and the tokens should originate from exactly
        the same text. A quantity may be composed of multiple parts,
        e.g. a number and a unit, and therefore correspond to multiple
        CoreNLP tokens.

        Parameters
        ----------
        quantity : dict
            A Grobid quantity.
        tokens : list
            CoreNLP tokens.

        Returns
        -------
        ids : list
            A list of CoreNLP token IDs corresponding to the given
            quantity.
        """
        value_start = quantity["offsetStart"]
        value_end = quantity["offsetEnd"]
        ids = self.get_overlapping_token_ids(value_start, value_end, tokens)

        if "rawUnit" in quantity:
            unit_start = quantity["rawUnit"]["offsetStart"]
            unit_end = quantity["rawUnit"]["offsetEnd"]
            ids += self.get_overlapping_token_ids(unit_start, unit_end, tokens)

        return ids

    def get_measurement_tokens(self, measurement, tokens):
        """Associate a Grobid measurement to CoreNLP tokens.

        See `get_quantity_tokens` for more details.

        Parameters
        ----------
        measurement : dict
            A Grobid measurement.
        tokens : list
            CoreNLP tokens.

        Returns
        -------
        ids : list
            A list of CoreNLP token IDs corresponding to the given
            measurement.
        """
        ids = []
        for quantity in self.iter_quantities(measurement):
            ids += self.get_quantity_tokens(quantity, tokens)

        return ids

    def get_entity_tokens(self, entity, tokens):
        """Associate a spacy entity to CoreNLP tokens.

        Parameters
        ----------
        entity : spacy.tokens.Span
            A spacy entity extracted from the text. See
            `extract_attributes` for more details.
        tokens : list
            CoreNLP tokens.

        Returns
        -------
        ids : list
            A list of CoreNLP token IDs corresponding to the given
            entity.
        """
        return self.get_overlapping_token_ids(
            entity.start_char, entity.end_char, tokens
        )

    @staticmethod
    def iter_parents(dependencies, token_idx):
        """Iterate over all parents of a token.

        It seems that each node has at most one parent, and that
        `parent == 0` means no parent.

        Parameters
        ----------
        dependencies : list
            CoreNLP dependencies found in
            `response['sentences'][idx]['basicDependencies']`.
        token_idx : int
            The index of the token for which parents need to be iterated.

        Yields
        ------
        parent_idx : int
            The index of a parent token.
        """
        for link in dependencies:
            if link["dependent"] == token_idx:
                parent = link["governor"]
                if parent != 0:
                    yield parent

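    # For reference, a CoreNLP basic dependency entry has roughly this
    # shape (glosses and indices are illustrative):
    #
    #     {"dep": "nsubj", "governor": 2, "governorGloss": "ran",
    #      "dependent": 1, "dependentGloss": "dog"}
    #
    # Here `iter_parents(dependencies, 1)` would yield 2; the ROOT
    # link, which has governor 0, is skipped.
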
    def find_nn_parents(self, dependencies, tokens_d, token_idx):
        """Parse CoreNLP dependencies to find parents of a token.

        To link named entities to attributes, parents for both entity
        tokens and attribute tokens need to be extracted. See
        `extract_attributes` for more information.

        This is one possible strategy for finding parents of a given
        token: ascend the dependency tree until a parent of type "NN"
        is found. Do this for all parents. If, as it seems, each node
        has at most one parent, then the result will be either one
        index or no indices.

        Parameters
        ----------
        dependencies : list
            CoreNLP dependencies found in
            `response['sentences'][idx]['basicDependencies']`.
        tokens_d : dict
            CoreNLP token dictionary mapping token indices to tokens.
            See `extract_attributes`.
        token_idx : int
            The index of the token for which parents need to be found.

        Returns
        -------
        parents : list
            A list of parents.
        """

        def get_nn(idx):
            if tokens_d[idx]["pos"].startswith("NN"):
                return [idx]
            else:
                nn_parents = []
                for new_idx in self.iter_parents(dependencies, idx):
                    nn_parents += get_nn(new_idx)
                return nn_parents

        results = []
        if tokens_d[token_idx]["pos"].startswith("NN"):
            results.append(token_idx)
        for parent_idx in self.iter_parents(dependencies, token_idx):
            results += get_nn(parent_idx)

        return results

    def find_all_parents(self, dependencies, tokens_d, tokens, parent_fn=None):
        """Find all parents of a given CoreNLP token.

        Parameters
        ----------
        dependencies : list
            CoreNLP dependencies found in
            `response['sentences'][idx]['basicDependencies']`.
        tokens_d : dict
            CoreNLP token dictionary mapping token indices to tokens.
            See `extract_attributes`.
        tokens : list
            List of token indices for which parents need to be found.
        parent_fn : function
            An implementation of a parent finding strategy. Currently
            the available strategies are `find_compound_parents` and
            `find_nn_parents`. The latter seems to perform better.

        Returns
        -------
        parent_ids : list
            A list of all parents found under the given strategy for
            the tokens provided.
        """
        if parent_fn is None:
            parent_fn = self.find_nn_parents
        parent_ids = []
        for token_idx in tokens:
            parent_ids += parent_fn(dependencies, tokens_d, token_idx)

        return parent_ids

    @staticmethod
    def quantity_to_str(quantity):
        """Convert a Grobid quantity to string.

        Parameters
        ----------
        quantity : dict
            A Grobid quantity.

        Returns
        -------
        result : str
            A string representation of the quantity.
        """
        result = str(quantity["rawValue"])
        if "rawUnit" in quantity:
            result += " " + quantity["rawUnit"]["name"]

        return result

    def measurement_to_str(self, measurement):
        """Convert a Grobid measurement to string.

        Parameters
        ----------
        measurement : dict
            A Grobid measurement.

        Returns
        -------
        quantities : list or str
            String representations of quantities in a measurement. If
            the measurement contains only one quantity then its string
            representation is returned as is. Otherwise a list of
            string representations of quantities is returned.
        """
        quantities = [
            self.quantity_to_str(quantity)
            for quantity in self.iter_quantities(measurement)
        ]
        if len(quantities) == 1:
            quantities = quantities[0]

        return quantities

    def process_raw_annotation_df(self, df, copy=True):
        """Add standard columns to attribute data frame.

        Parameters
        ----------
        df : pd.DataFrame
            A data frame with measurements in a raw format. This can be
            obtained by calling `extract_attributes` with the parameter
            `raw_attributes=True`.
        copy : bool
            If true then it is guaranteed that the original data frame
            won't be modified.

        Returns
        -------
        df : pd.DataFrame
            A modified data frame with the raw attribute column
            replaced by a number of more explicit columns using the
            standard nomenclature.
        """
        if copy:
            df = df.copy()
        if "attribute" not in df.columns:
            return df

        def get_property(attribute):
            m_type = self.get_measurement_type(attribute)
            if len(m_type) > 0:
                return f"has {m_type} {attribute['type']}"
            else:
                return f"has {attribute['type']}"

        df["property"] = df["attribute"].apply(get_property)
        df["property_type"] = "attribute"
        df["property_value"] = df["attribute"].apply(self.measurement_to_str)
        df["property_value_type"] = "int"

        return df.drop(columns="attribute")

    def get_core_nlp_analysis(self, text):
        """Send a CoreNLP query and return the result.

        Parameters
        ----------
        text : str
            The text to analyze with CoreNLP.

        Returns
        -------
        response_json : dict
            The CoreNLP response.
        """
        response_json = None
        try:
            request_data = text.encode("utf-8")
            request_params = '?properties={"annotators":"depparse"}'
            response = requests.post(
                self.core_nlp_url + request_params, data=request_data
            )
            response.raise_for_status()
            response_json = json.loads(response.text)
        except requests.exceptions.RequestException:
            warnings.warn("There was a problem contacting the CoreNLP server.")
        except AssertionError:
            warnings.warn("Reply by CoreNLP was not OK.")
        except json.JSONDecodeError:
            warnings.warn("Could not parse the CoreNLP response JSON.")
        finally:
            if response_json is None:
                response_json = {"sentences": []}

        return response_json

    def are_linked(self, measurement, entity, core_nlp_sentence):
        """Determine if a measurement and an entity are linked.

        Parameters
        ----------
        measurement : dict
            A Grobid measurement.
        entity : spacy.tokens.Span
            A spacy named entity.
        core_nlp_sentence : dict
            A CoreNLP sentence. The CoreNLP sentences can be obtained
            from `core_nlp_response["sentences"]`.

        Returns
        -------
        have_common_parents : bool
            Whether or not the entity is linked to the measurement.
        """
        tokens = core_nlp_sentence["tokens"]
        dependencies = core_nlp_sentence["basicDependencies"]
        tokens_d = {token["index"]: token for token in tokens}

        measurement_ids = self.get_measurement_tokens(measurement, tokens)
        ne_ids = self.get_entity_tokens(entity, tokens)

        measurement_parents = self.find_all_parents(
            dependencies, tokens_d, measurement_ids
        )
        ne_parents = self.find_all_parents(dependencies, tokens_d, ne_ids)

        measurement_parents = set(measurement_parents)
        ne_parents = set(ne_parents)
        have_common_parents = len(measurement_parents & ne_parents) > 0

        return have_common_parents

    def extract_attributes(
        self, text, linked_attributes_only=True, raw_attributes=False
    ):
        """Extract attributes from text.

        Parameters
        ----------
        text : str
            The text for attribute extraction.
        linked_attributes_only : bool
            If true then only those attributes will be recorded for
            which there is an associated named entity.
        raw_attributes : bool
            If true then the resulting data frame will contain all
            attribute information in one single column with raw Grobid
            measurements. If false then the raw data frame will be
            processed using `process_raw_annotation_df`.

        Returns
        -------
        df : pd.DataFrame
            A pandas data frame with extracted attributes.
        """
        # NER
        doc = self.ee_model(text)
        logger.info("{} entities detected: {}".format(len(doc.ents), doc.ents))

        # Grobid Quantities
        measurements = self.get_grobid_measurements(text)
        logger.info("{} measurements detected".format(len(measurements)))

        # CoreNLP
        logger.info("Sending CoreNLP query...")
        response_json = self.get_core_nlp_analysis(text)
        logger.info(
            "CoreNLP found {} sentences".format(len(response_json["sentences"]))
        )

        # Analysis
        columns = ["entity", "entity_type", "attribute"]
        rows = []
        recorded_measurements = set()
        for entity in doc.ents:
            for i, measurement in enumerate(measurements):
                for core_nlp_sentence in response_json["sentences"]:
                    have_link = self.are_linked(measurement, entity, core_nlp_sentence)
                    if have_link:
                        row = {
                            "entity": entity.text,
                            "entity_type": entity.label_,
                            "attribute": measurement,
                        }
                        rows.append(row)
                        recorded_measurements.add(i)
        if not linked_attributes_only:
            for i, measurement in enumerate(measurements):
                if i not in recorded_measurements:
                    row = {"attribute": measurement}
                    rows.append(row)

        df_attributes = pd.DataFrame(rows, columns=columns)
        if raw_attributes:
            return df_attributes
        else:
            return self.process_raw_annotation_df(df_attributes)

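# Example usage of `AttributeExtractor` (a minimal sketch; the server URLs
# and the spacy model name below are assumptions for a typical local setup,
# not part of this module):
#
#     import spacy
#
#     extractor = AttributeExtractor(
#         core_nlp_url="http://localhost:9000",
#         grobid_quantities_url="http://localhost:8060/service/processQuantityText",
#         ee_model=spacy.load("en_core_web_sm"),
#     )
#     df = extractor.extract_attributes("The mice were 8 weeks old.")
#     # df has columns: entity, entity_type, property, property_type,
#     # property_value, property_value_type
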
class AttributeAnnotationTab(widgets.Tab):
    """A tab widget for displaying attribute extractions.

    It is a subclass of the `ipywidgets.Tab` class and contains the
    following four tabs:

    - Raw Text
    - Named Entities
    - Attributes
    - Table
    """

    def __init__(self, attribute_extractor, ee_model, text=None):
        """Initialize class instance.

        Parameters
        ----------
        attribute_extractor : AttributeExtractor
            An instance of an attribute extractor.
        ee_model : spacy.language.Language
            A spacy model for named entity extraction.
        text : str, optional
            A text to initialize the widget with. Can be set or changed
            later with `set_text`.
        """
        super().__init__()
        self.attribute_extractor = attribute_extractor
        self.ee_model = ee_model
        self._init_ui()
        if text is not None:
            self.set_text(text)

    def _init_ui(self):
        self.outputs = collections.OrderedDict()
        self.outputs["Raw Text"] = widgets.Output()
        self.outputs["Named Entities"] = widgets.Output(
            layout=widgets.Layout(width="80ch")
        )
        self.outputs["Attributes"] = widgets.Output()
        self.outputs["Table"] = widgets.Output()

        self.children = list(self.outputs.values())
        for i, name in enumerate(self.outputs):
            self.set_title(i, name)

    def set_text(self, text):
        """Set the text for the widget.

        Parameters
        ----------
        text : str
            The text to assign to this widget.
        """
        text = textwrap.dedent(text).strip()
        df = self.attribute_extractor.extract_attributes(
            text, linked_attributes_only=False
        )
        doc = self.ee_model(text)
        measurements = self.attribute_extractor.get_grobid_measurements(text)

        for canvas in self.outputs.values():
            canvas.clear_output()

        with self.outputs["Raw Text"]:
            print(textwrap.fill(text, 80))
        with self.outputs["Named Entities"]:
            displacy_out = displacy.render(doc, style="ent")
            if displacy_out is not None:
                display(HTML(displacy_out))
        with self.outputs["Attributes"]:
            annotated = self.attribute_extractor.annotate_quantities(
                text, measurements
            )
            display(annotated)
        with self.outputs["Table"]:
            display(df)

class TextCollectionWidget(widgets.VBox):
    """A widget displaying annotations for a number of texts.

    The text can be selected using a slider and the annotation results
    will be displayed in an `AttributeAnnotationTab` widget.
    """

    def __init__(self, texts, attribute_extractor, ee_model):
        """Initialize class instance.

        Parameters
        ----------
        texts : list_like
            A list of strings with texts to be annotated.
        attribute_extractor : AttributeExtractor
            An instance of an attribute extractor.
        ee_model : spacy.language.Language
            A spacy model for named entity extraction.
        """
        super().__init__()
        if not texts:
            raise TypeError("texts must be a non-empty list.")
        self.texts = texts

        self.idx_slider = widgets.IntSlider(
            description="Text ID",
            value=0,
            min=0,
            max=len(texts) - 1,
            continuous_update=False,
        )
        self.idx_slider.observe(self._on_idx_change, names="value")

        self.tab = AttributeAnnotationTab(attribute_extractor, ee_model)
        text = self.texts[self.idx_slider.value]
        self.tab.set_text(text)

        self.children = [
            self.idx_slider,
            self.tab,
        ]

    def _on_idx_change(self, event):
        idx = event["new"]
        text = self.texts[idx]
        self.tab.set_text(text)

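# In a Jupyter notebook the widgets above can be combined as follows,
# reusing an `extractor` built as in the sketch after `AttributeExtractor`
# (the sample texts are made up):
#
#     texts = [
#         "Mice were sacrificed at 8 weeks of age.",
#         "The sample was incubated for 30 minutes.",
#     ]
#     widget = TextCollectionWidget(texts, extractor, extractor.ee_model)
#     display(widget)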