Source code for bluesearch.server.mining_server

"""The mining server."""

# Blue Brain Search is a text mining toolbox focused on scientific use cases.
#
# Copyright (C) 2020  Blue Brain Project, EPFL.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import io
from typing import Any, Dict, Iterable, Tuple

import pandas as pd
import spacy
from flask import Flask, jsonify, request

import bluesearch

from ..mining import SPECS, run_pipeline
from ..sql import retrieve_articles, retrieve_mining_cache, retrieve_paragraph
from ..utils import load_spacy_model


class MiningServer(Flask):
    """The BBS mining server.

    Parameters
    ----------
    models_libs : dict of str
        Dictionary mapping each type of extraction ('ee' for entities,
        're' for relations, 'ae' for attributes) to the list of paths of
        available models.
    connection : sqlalchemy.engine.Engine
        The database connection.
    """

    def __init__(self, models_libs, connection):
        package_name, *_ = __name__.partition(".")
        super().__init__(import_name=package_name)

        self.version = bluesearch.__version__
        self.server_name = "MiningServer"

        self.logger.info("Initializing the server")
        self.logger.info(f"Name: {self.server_name}")
        self.logger.info(f"Version: {self.version}")

        self.logger.info("Loading the model libraries")
        self.models_libs = models_libs

        self.logger.info("Loading the NER models")
        self.ee_models: Dict[str, spacy.language.Language] = {}
        self.logger.debug(f"EE models available:\n{str(self.models_libs['ee'])}")
        for entity_type, model_path in models_libs["ee"].items():
            self.logger.info(f"Entity type {entity_type}: loading model {model_path}")
            self.ee_models[entity_type] = load_spacy_model(model_path)

        self.connection = connection

        self.add_url_rule("/text", view_func=self.pipeline_text, methods=["POST"])
        self.add_url_rule(
            "/database", view_func=self.pipeline_database, methods=["POST"]
        )
        self.add_url_rule("/help", view_func=self.help, methods=["POST"])

        self.logger.info("Initialization done.")
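
    # A minimal usage sketch (not part of the original module): how one might
    # construct and run this server from a separate script. The model paths,
    # entity types, and database URL below are hypothetical placeholders.
    #
    #   from sqlalchemy import create_engine
    #   from bluesearch.server.mining_server import MiningServer
    #
    #   models_libs = {
    #       "ee": {
    #           "CHEMICAL": "/models/ner/model-chemical",  # hypothetical path
    #           "DISEASE": "/models/ner/model-disease",    # hypothetical path
    #       }
    #   }
    #   engine = create_engine("sqlite:///cord19.db")  # hypothetical database URL
    #   app = MiningServer(models_libs=models_libs, connection=engine)
    #   app.run(host="0.0.0.0", port=8080)  # or serve through a WSGI server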

    def help(self):
        """Help the user by sending information about the server."""
        self.logger.info("Help called")

        response = {
            "name": self.server_name,
            "version": self.version,
            "database": self.connection.url.database,
            "description": "Run the BBS text mining pipeline on a given text.",
            "POST": {
                "/help": {
                    "description": "Get this help.",
                    "response_content_type": "application/json",
                },
                "/text": {
                    "description": "Mine a given text according to a given schema.",
                    "response_content_type": "application/json",
                    "required_fields": {"text": [], "schema": []},
                    "accepted_fields": {"debug": [True, False]},
                },
                "/database": {
                    "description": "Mine paragraphs from the database "
                    "according to a given schema.",
                    "response_content_type": "application/json",
                    "required_fields": {
                        "identifiers": [
                            ("article_id_1", "paragraph_id_1"),
                        ],
                        "schema": [],
                    },
                    "accepted_fields": {
                        "debug": [True, False],
                        "use_cache": [True, False],
                    },
                },
            },
        }

        return jsonify(response)
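
    # A hedged example of querying the /help endpoint; note that all routes are
    # registered for POST only in ``__init__``. The host and port are assumptions.
    #
    #   curl -X POST http://localhost:8080/help
    #
    # The JSON reply mirrors the ``response`` dictionary built above.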

    def get_available_etypes(self, schema_df):
        """Find the entity extraction models for the requested entity types.

        Parameters
        ----------
        schema_df : pd.DataFrame
            Schema of the client request.

        Returns
        -------
        etypes : set[str]
            Entity types requested and available on the server.
        etypes_na : set[str]
            Entity types requested but not available on the server.
        """
        detected_etypes = set(self.ee_models.keys())
        requested_etypes = set(schema_df["entity_type"])

        etypes = requested_etypes & detected_etypes
        etypes_na = requested_etypes - detected_etypes

        return etypes, etypes_na

    def pipeline_database(self):
        """Respond to a query on specific paragraphs in the database."""
        self.logger.info("Query for mining of articles received")
        if request.is_json:
            json_request = request.get_json()
            identifiers = json_request.get("identifiers")
            schema_str = json_request.get("schema")
            debug = json_request.get("debug", False)
            use_cache = json_request.get("use_cache", True)

            self.logger.info("Mining parameters:")
            self.logger.info(f"identifiers : {identifiers}")
            self.logger.info(f"schema : {schema_str}")
            self.logger.info(f"debug : {debug}")
            self.logger.info(f"use_cache : {use_cache}")

            self.logger.info("Mining starting...")
            args_err_response = self.check_args_not_null(
                identifiers=identifiers, schema=schema_str
            )
            if args_err_response:
                return args_err_response

            schema_df = self.read_df_from_str(schema_str)
            self.logger.debug("schema_df:")
            self.logger.debug(str(schema_df))

            if use_cache:
                self.logger.info("Using cache")

                # determine which models are necessary
                etypes, etypes_na = self.get_available_etypes(schema_df)
                self.logger.debug(f"etypes = {etypes}")
                self.logger.debug(f"etypes_na = {etypes_na}")

                # get cached results
                df_all = retrieve_mining_cache(identifiers, etypes, self.connection)
                self.logger.debug(f"cached results, df_all =\n{str(df_all)}")

                # append the ontology source column
                df_all = self.add_ontology_column(df_all, schema_df)
                self.logger.debug(
                    f"appended ontology source column, df_all =\n{str(df_all)}"
                )

                # apply specs if not debug
                if not debug:
                    df_all = pd.DataFrame(df_all, columns=SPECS)
                    self.logger.debug(f"applied column specs, df_all =\n{str(df_all)}")
            else:
                self.logger.info("Not using the cache")

                all_article_ids = []
                all_paragraphs = pd.DataFrame()
                for (article_id, paragraph_pos) in identifiers:
                    if paragraph_pos == -1:
                        all_article_ids += [article_id]
                    else:
                        paragraph = retrieve_paragraph(
                            article_id, paragraph_pos, engine=self.connection
                        )
                        all_paragraphs = all_paragraphs.append(paragraph)
                if all_article_ids:
                    articles = retrieve_articles(
                        article_ids=all_article_ids, engine=self.connection
                    )
                    all_paragraphs = all_paragraphs.append(articles)

                texts = [
                    (
                        row["text"],
                        {
                            "paper_id": f'{row["article_id"]}:{row["section_name"]}'
                            f':{row["paragraph_pos_in_article"]}'
                        },
                    )
                    for _, row in all_paragraphs.iterrows()
                ]

                df_all, etypes_na = self.mine_texts(
                    texts=texts, schema_df=schema_df, debug=debug
                )

            response = self.create_response(df_all, etypes_na)
            self.logger.info(f"Mining completed, extracted {len(df_all)} elements.")
        else:
            self.logger.info("Request is not JSON. Not processing.")
            response = self.create_error_response(
                "The request has to be a JSON object."
            )

        return response
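
    # A sketch of a /database request, assuming the server runs on
    # localhost:8080. Each identifier is an (article_id, paragraph_pos) pair;
    # per the loop above, a paragraph_pos of -1 requests the whole article.
    # The identifier and schema values below are hypothetical.
    #
    #   import requests
    #
    #   payload = {
    #       "identifiers": [["article-1", -1], ["article-2", 3]],
    #       "schema": "entity_type,ontology_source\nCHEMICAL,NCIT\nDISEASE,NCIT\n",
    #       "use_cache": True,
    #   }
    #   resp = requests.post("http://localhost:8080/database", json=payload)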

    def pipeline_text(self):
        """Respond to a custom text query."""
        self.logger.info("Query for mining of raw text received")
        if request.is_json:
            self.logger.info("Request is JSON. Processing.")
            json_request = request.get_json()
            text = json_request.get("text")
            schema_str = json_request.get("schema")
            debug = json_request.get("debug", False)

            self.logger.info("Mining parameters:")
            self.logger.info(f"text : {text}")
            self.logger.info(f"schema : {schema_str}")
            self.logger.info(f"debug : {debug}")

            args_err_response = self.check_args_not_null(text=text, schema=schema_str)
            if args_err_response:
                return args_err_response

            schema_df = self.read_df_from_str(schema_str)

            texts: Iterable[Tuple[str, Dict[Any, Any]]] = [(text, {})]
            df_all, etypes_na = self.mine_texts(
                texts=texts, schema_df=schema_df, debug=debug
            )
            response = self.create_response(df_all, etypes_na)
        else:
            self.logger.info("Request is not JSON. Not processing.")
            response = self.create_error_response(
                "The request has to be a JSON object."
            )

        return response
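
    # A sketch of a /text request, again assuming localhost:8080. The schema is
    # a CSV string with at least the ``entity_type`` and ``ontology_source``
    # columns used by this class; the text and schema values are hypothetical.
    #
    #   import requests
    #
    #   payload = {
    #       "text": "Glucose is a risk factor for diabetes.",
    #       "schema": "entity_type,ontology_source\nCHEMICAL,NCIT\nDISEASE,NCIT\n",
    #       "debug": False,
    #   }
    #   resp = requests.post("http://localhost:8080/text", json=payload)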

    def mine_texts(self, texts, schema_df, debug):
        """Run the mining pipeline on a given list of texts."""
        self.logger.info("Running the mining pipeline...")
        etypes, etypes_na = self.get_available_etypes(schema_df)

        df_all = pd.DataFrame()
        for etype in etypes:
            ee_model = self.ee_models[etype]
            df = run_pipeline(
                texts=texts, model_entities=ee_model, models_relations={}, debug=debug
            )
            df_all = df_all.append(df)

        df_all = self.add_ontology_column(df_all, schema_df)
        self.logger.info(f"Mining completed. Mined {len(df_all)} items.")

        return (
            df_all.sort_values(by=["paper_id", "start_char"], ignore_index=True),
            etypes_na,
        )

    def check_args_not_null(self, **kwargs):
        """Sanity check that the provided arguments are not null.

        Returns False if all is good.
        """
        for k, v in kwargs.items():
            if v is None:
                self.logger.info(f'No "{k}" was provided. Stopping.')
                return self.create_error_response(f'The request "{k}" is missing.')

        return False

    @staticmethod
    def add_ontology_column(df_all, schema_df):
        """Add ontology column to dataframe."""
        os_mapping = {
            et: os
            for _, (et, os) in schema_df[["entity_type", "ontology_source"]].iterrows()
        }
        df_all["ontology_source"] = df_all["entity_type"].apply(lambda x: os_mapping[x])

        return df_all

    @staticmethod
    def read_df_from_str(df_str, drop_duplicates=True):
        """Read a csv file from a string into a pd.DataFrame."""
        with io.StringIO(df_str) as sio:
            schema_df = pd.read_csv(sio)

        if drop_duplicates:
            schema_df = schema_df.drop_duplicates(keep="first", ignore_index=True)

        return schema_df
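
    # A small illustration of this helper (hypothetical content): duplicate
    # schema rows are dropped after parsing.
    #
    #   raw = "entity_type,ontology_source\nDISEASE,NCIT\nDISEASE,NCIT\n"
    #   MiningServer.read_df_from_str(raw)
    #   # -> a single row: entity_type="DISEASE", ontology_source="NCIT"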

    @staticmethod
    def create_error_response(error_message):
        """Create the response sent when an error occurs during processing.

        Parameters
        ----------
        error_message : str
            Error message to send if there is an issue.

        Returns
        -------
        response : flask.Response
            JSON response containing the error message.
        status_code : int
            The HTTP status code 400 (Bad Request).
        """
        response = jsonify(error=error_message)

        return response, 400

    @staticmethod
    def create_response(df_extractions, etypes_na):
        """Create the response from the extractions dataframe.

        Parameters
        ----------
        df_extractions : pd.DataFrame
            DataFrame containing all the elements extracted by text mining.
        etypes_na : Iterable[str]
            Entity types found in the request CSV file for which no available
            model was found in the library.

        Returns
        -------
        response : flask.Response
            JSON response containing the dataframe converted to a CSV table
            and a list of warnings.
        status_code : int
            The HTTP status code 200 (OK).
        """
        csv_extractions = df_extractions.to_csv(index=False)
        warnings = [
            f'No text mining model was found in the library for "{etype}".'
            for etype in etypes_na
        ]

        return jsonify(csv_extractions=csv_extractions, warnings=warnings), 200
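
# On the client side, the extractions can be recovered from the JSON body of a
# successful reply; a sketch, assuming ``resp`` holds a response obtained from
# the /text or /database endpoint:
#
#   import io
#   import pandas as pd
#
#   body = resp.json()
#   df_extractions = pd.read_csv(io.StringIO(body["csv_extractions"]))
#   print(body["warnings"])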