Source code for bluesearch.sql

"""SQL Related functions."""

# Blue Brain Search is a text mining toolbox focused on scientific use cases.
#
# Copyright (C) 2020  Blue Brain Project, EPFL.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import logging
from typing import cast

import numpy as np
import pandas as pd
import sqlalchemy.sql as sql


def get_titles(article_ids, engine):
    """Get article titles from the SQL database.

    Parameters
    ----------
    article_ids : iterable of int
        An iterable of article IDs. Any iterable is accepted (list, tuple,
        numpy array, generator, ...).
    engine : sqlalchemy.engine.Engine
        SQLAlchemy Engine connected to the database.

    Returns
    -------
    titles : dict
        Dictionary mapping article IDs to the article titles. Empty if no
        article IDs were given.
    """
    # Materialize the input once: a general iterable (e.g. a generator) has
    # no len(), and some DB-API drivers (e.g. sqlite3) cannot bind numpy
    # integer types, so convert everything to plain Python ints.
    article_ids = [int(id_) for id_ in article_ids]
    if not article_ids:
        return {}

    query = sql.text(
        """SELECT article_id, title
        FROM articles
        WHERE article_id IN :article_ids
        """
    )
    query = query.bindparams(sql.bindparam("article_ids", expanding=True))

    with engine.begin() as connection:
        response = connection.execute(query, {"article_ids": article_ids}).fetchall()

    titles = {article_id: title for article_id, title in response}
    return titles
def retrieve_article_ids(engine):
    """Retrieve all articles_id from sentences table.

    Parameters
    ----------
    engine : sqlalchemy.engine.Engine
        SQLAlchemy Engine connected to the database.

    Returns
    -------
    article_id_dict : dict
        Dictionary giving the corresponding article_id for a given
        sentence_id.
    """
    # Execute through an explicit connection and a `text()` clause, like the
    # other helpers in this module: connectionless `Engine.execute` with a
    # raw string is deprecated in SQLAlchemy 1.4 and removed in 2.0.
    query = sql.text("SELECT sentence_id, article_id FROM sentences")
    with engine.begin() as connection:
        rows = connection.execute(query).fetchall()

    article_id_dict = {sentence_id: article_id for sentence_id, article_id in rows}
    return article_id_dict
def retrieve_sentences_from_sentence_ids(sentence_ids, engine, keep_order=False):
    """Retrieve sentences given sentence ids.

    Parameters
    ----------
    sentence_ids : iterable of int
        Sentence ids for which need to retrieve the text. Any iterable is
        accepted, including one-shot iterators such as generators.
    engine : sqlalchemy.engine.Engine
        SQLAlchemy Engine connected to the database.
    keep_order : bool, optional
        If True, the rows of the result follow the order of `sentence_ids`;
        IDs that were not found in the database are skipped. Note that the
        default value is `False`.

    Returns
    -------
    df_sentences : pd.DataFrame
        Pandas DataFrame containing all sentences and their corresponding
        metadata: article_id, sentence_id, section_name, text,
        paragraph_pos_in_article.
    """
    # Materialize once: the IDs are used twice below (query parameters and,
    # with `keep_order`, the row ordering). A generator would otherwise be
    # exhausted by the first use and silently yield an empty ordering.
    sentence_ids = [int(id_) for id_ in sentence_ids]

    sql_query = sql.text(
        """
        SELECT article_id, sentence_id, section_name, text, paragraph_pos_in_article
        FROM sentences
        WHERE sentence_id IN :sentence_ids
        """
    )
    sql_query = sql_query.bindparams(sql.bindparam("sentence_ids", expanding=True))

    with engine.begin() as connection:
        df_sentences = pd.read_sql(
            sql_query,
            params={"sentence_ids": sentence_ids},
            con=connection,
        )

    if keep_order:
        # Remove sentence IDs that were not found, otherwise df.loc will fail.
        found_sentence_ids = set(df_sentences["sentence_id"])
        ordered_ids = [id_ for id_ in sentence_ids if id_ in found_sentence_ids]
        # Sort the dataframe by sentence_ids
        df_sentences = (
            df_sentences.set_index("sentence_id").loc[ordered_ids].reset_index()
        )

    return df_sentences
def retrieve_paragraph_from_sentence_id(sentence_id, engine):
    """Retrieve the paragraph that contains a given sentence.

    Parameters
    ----------
    sentence_id : int
        ID of the sentence whose enclosing paragraph should be retrieved.
    engine : sqlalchemy.engine.Engine
        SQLAlchemy Engine connected to the database.

    Returns
    -------
    paragraph : str or None
        If ``str``, the texts of all sentences of the paragraph joined by a
        single space, ordered by `sentence_pos_in_paragraph`. If None, the
        `sentence_id` was not found in the sentences table.
    """
    # Select every sentence sharing the article and paragraph position of
    # the requested sentence.
    paragraph_query = sql.text(
        "SELECT text FROM sentences "
        "WHERE article_id = (SELECT article_id FROM sentences "
        "WHERE sentence_id = :sentence_id ) "
        "AND paragraph_pos_in_article = (SELECT paragraph_pos_in_article FROM sentences "
        "WHERE sentence_id = :sentence_id ) "
        "ORDER BY sentence_pos_in_paragraph ASC"
    )
    query_params = {"sentence_id": int(sentence_id)}
    df_text = pd.read_sql(paragraph_query, engine, params=query_params)
    sentence_texts = df_text["text"].to_list()

    return " ".join(sentence_texts) if sentence_texts else None
def retrieve_paragraph(article_id, paragraph_pos_in_article, engine):
    """Retrieve a paragraph given its (article_id, paragraph_pos_in_article).

    Parameters
    ----------
    article_id : int
        Article id.
    paragraph_pos_in_article : int
        Relative position of a paragraph in an article. Note that the
        numbering starts from 0.
    engine : sqlalchemy.engine.Engine
        SQLAlchemy Engine connected to the database.

    Returns
    -------
    paragraph : pd.DataFrame
        DataFrame with the columns article_id, text, section_name and
        paragraph_pos_in_article. It has one row when the paragraph exists
        and no rows otherwise. The section name is taken from the first
        sentence of the paragraph.
    """
    paragraph_query = sql.text(
        "SELECT section_name, text FROM sentences "
        "WHERE article_id = :article_id "
        "AND paragraph_pos_in_article = :paragraph_pos_in_article "
        "ORDER BY sentence_pos_in_paragraph ASC"
    )
    query_params = {
        "article_id": int(article_id),
        "paragraph_pos_in_article": int(paragraph_pos_in_article),
    }
    df_sentences = pd.read_sql(paragraph_query, engine, params=query_params)

    if df_sentences.empty:
        return pd.DataFrame(
            columns=["article_id", "text", "section_name", "paragraph_pos_in_article"]
        )

    record = {
        "article_id": article_id,
        "text": " ".join(df_sentences["text"].to_list()),
        "section_name": df_sentences["section_name"].iloc[0],
        "paragraph_pos_in_article": paragraph_pos_in_article,
    }
    return pd.DataFrame([record])
def retrieve_article_metadata_from_article_id(article_id, engine):
    """Retrieve the metadata row of a single article.

    Parameters
    ----------
    article_id : int
        Article id for which need to retrieve the article metadata.
    engine : sqlalchemy.engine.Engine
        SQLAlchemy Engine connected to the database.

    Returns
    -------
    article : pd.DataFrame
        DataFrame with all the columns of the `articles` table (``SELECT *``)
        for the matching row. The columns are 'article_id', 'cord_uid',
        'sha', 'source_x', 'title', 'doi', 'pmcid', 'pubmed_id', 'license',
        'abstract', 'publish_time', 'authors', 'journal', 'mag_id',
        'who_covidence_id', 'arxiv_id', 'pdf_json_files', 'pmc_json_files',
        'url', 's2_id'.
    """
    metadata_query = sql.text("""SELECT * FROM articles WHERE article_id = :article_id""")
    query_params = {"article_id": int(article_id)}
    return pd.read_sql(metadata_query, engine, params=query_params)
def retrieve_articles(article_ids, engine):
    """Retrieve whole articles, split into paragraphs, for several article ids.

    Parameters
    ----------
    article_ids : list of int
        List of Article id for which need to retrieve the entire text
        article.
    engine : sqlalchemy.engine.Engine
        SQLAlchemy Engine connected to the database.

    Returns
    -------
    articles : pd.DataFrame
        DataFrame with one row per paragraph. The columns are 'article_id',
        'paragraph_pos_in_article', 'text', 'section_name'. The paragraph
        text is the space-joined text of its sentences, and the section
        name is that of the paragraph's first sentence.
    """
    ids = [int(id_) for id_ in article_ids]

    sentences_query = sql.text(
        "SELECT * FROM sentences "
        "WHERE article_id IN :articles_ids "
        "ORDER BY article_id ASC, paragraph_pos_in_article ASC, "
        "sentence_pos_in_paragraph ASC"
    ).bindparams(sql.bindparam("articles_ids", expanding=True))
    df_sentences = pd.read_sql(sentences_query, engine, params={"articles_ids": ids})

    def _join_texts(texts):
        # Sentences arrive sorted by position thanks to the ORDER BY above.
        return " ".join(texts)

    def _first_of(values):
        return values[0]

    per_paragraph = df_sentences.groupby(by=["article_id", "paragraph_pos_in_article"])
    paragraph_texts = per_paragraph["text"].apply(_join_texts)
    paragraph_sections = per_paragraph["section_name"].unique().apply(_first_of)

    combined = pd.DataFrame({"text": paragraph_texts, "section_name": paragraph_sections})
    return combined.reset_index()
def retrieve_mining_cache(identifiers, etypes, engine):
    """Retrieve cached mining results.

    Parameters
    ----------
    identifiers : list of tuple
        Tuples of form (article_id, paragraph_pos_in_article). Note that
        if `paragraph_pos_in_article` is -1 then we are considering all
        the paragraphs.
    etypes : list
        List of entity types to consider. Duplicates are removed
        automatically.
    engine : sqlalchemy.engine.Engine
        SQLAlchemy Engine connected to the database.

    Returns
    -------
    result : pd.DataFrame
        Selected rows of the `mining_cache` table. Rows selected through
        specific paragraphs come first, followed by rows selected through
        whole articles (`paragraph_pos_in_article == -1`).
    """
    logger = logging.getLogger("retrieve_mining_cache")

    logger.debug("parameters:")
    logger.debug(f"identifiers = {identifiers}")
    logger.debug(f"etypes = {etypes}")
    logger.debug(f"engine = {engine}")

    etypes = tuple(set(etypes))

    identifiers_arts = [int(a) for a, p in identifiers if p == -1]
    if identifiers_arts:
        query_arts = sql.text(
            """
            SELECT *
            FROM mining_cache
            WHERE article_id IN :identifiers_arts AND entity_type IN :etypes
            ORDER BY article_id, paragraph_pos_in_article, start_char
            """
        )
        query_arts = query_arts.bindparams(
            sql.bindparam("identifiers_arts", expanding=True),
            sql.bindparam("etypes", expanding=True),
        )
        df_arts = pd.read_sql(
            query_arts,
            con=engine,
            params={"identifiers_arts": identifiers_arts, "etypes": etypes},
        )
    else:
        logger.debug("setting df_arts to emtpy because `not identifiers_arts == True`")
        df_arts = pd.DataFrame()

    # Cast to int so that the values can be safely inlined into the SQL below.
    identifiers_pars = [(int(a), int(p)) for a, p in identifiers if p != -1]
    if identifiers_pars:
        # Remarks
        # 1. Conditions are mutually exclusive, so several `UNION`s are
        #    equivalent to several `OR`s.
        # 2. `UNION` is considerably faster than `OR` in this case.
        # 3. If `len(identifiers_pars)` is too large, we may have a too long
        #    SQL statement which overflows the max length. So we break it down.
        # 4. The entity types are passed as an expanding bound parameter
        #    (like for `query_arts`), which handles any number of entity
        #    types, including one or zero, and any quoting.
        batch_size = 1000
        dfs_pars = []
        for start in range(0, len(identifiers_pars), batch_size):
            batch = identifiers_pars[start : start + batch_size]
            # Reformatted due to this bandit bug in python3.8:
            # https://github.com/PyCQA/bandit/issues/658
            query_pars = " UNION ".join(  # nosec
                "SELECT * FROM mining_cache "
                f"WHERE (article_id = {a} AND paragraph_pos_in_article = {p})"
                for a, p in batch
            )
            # Reformatted due to this bandit bug in python3.8:
            # https://github.com/PyCQA/bandit/issues/658
            query_pars = sql.text(  # nosec
                f"SELECT * FROM ({query_pars}) tt "
                "WHERE tt.entity_type IN :etypes"
            ).bindparams(sql.bindparam("etypes", expanding=True))
            dfs_pars.append(
                pd.read_sql(query_pars, con=engine, params={"etypes": etypes})
            )
        df_pars = pd.concat(dfs_pars)
        # cast() to tell mypy that sort_values() doesn't return None here.
        df_pars = cast(
            pd.DataFrame,
            df_pars.sort_values(
                by=["article_id", "paragraph_pos_in_article", "start_char"]
            ),
        )
    else:
        logger.debug("setting df_pars to emtpy because `not identifiers_pars == True`")
        df_pars = pd.DataFrame()

    # `DataFrame.append` was removed in pandas 2.0; `concat` is equivalent.
    return pd.concat([df_pars, df_arts], ignore_index=True)
class SentenceFilter:
    """Filter sentence IDs by applying conditions.

    Instantiate this class and apply different filters by calling the
    corresponding filtering methods in any order. Finally, call either the
    `run()` or the `iterate()` method to obtain the filtered sentence IDs.

    Example
    -------
    .. code-block:: python

        import sqlalchemy

        connection = sqlalchemy.create_engine("...")
        filtered_sentence_ids = (
            SentenceFilter(connection)
            .only_with_journal()
            .restrict_sentences_ids_to([1, 2, 3, 4, 5])
            .date_range((2010, 2020))
            .exclude_strings(["virus", "disease"])
            .run()
        )

    When the `run()` or the `iterate()` method is called an SQL query is
    constructed and executed internally. For the example above, on a
    non-MySQL database, it would have approximately the following form.
    The user-provided strings are sent as bound parameters rather than
    being inlined into the SQL text.

    .. code-block:: SQL

        SELECT sentence_id
        FROM sentences
        WHERE article_id IN (
            SELECT article_id
            FROM articles
            WHERE journal IS NOT NULL
            AND publish_time BETWEEN '2010-01-01' AND '2020-12-31'
        )
        AND sentence_id IN (1, 2, 3, 4, 5)
        AND text NOT LIKE '%virus%'
        AND text NOT LIKE '%disease%'

    Parameters
    ----------
    connection : sqlalchemy.engine.Engine
        Connection to the database that contains the `articles` and
        `sentences` tables.
    """

    # Driver names for which the MySQL-specific SQL is generated.
    _MYSQL_DRIVERS = frozenset({"mysql+mysqldb", "mysql+pymysql"})

    def __init__(self, connection):
        self.connection = connection
        self.logger = logging.getLogger(self.__class__.__name__)
        self.only_english_flag = False
        self.only_with_journal_flag = False
        self.discard_bad_sentences_flag = False
        self.year_from = None
        self.year_to = None
        self.string_exclusions = []
        self.string_inclusions = []
        self.restricted_sentence_ids = None

    def discard_bad_sentences(self, flag=True):
        """Discard sentences that are flagged as bad.

        Parameters
        ----------
        flag : bool
            If True, then all sentences with `True` in the `is_bad`
            column are discarded.

        Returns
        -------
        self : SentenceFilter
            The instance of `SentenceFilter` itself. Useful for chained
            applications of filters.
        """
        self.logger.info(f"Discard bad: {flag}")
        self.discard_bad_sentences_flag = flag
        return self

    def only_english(self, flag=True):
        """Only select articles that are in English.

        Parameters
        ----------
        flag : bool
            If True, then only articles that are in English will be
            selected.

        Returns
        -------
        self : SentenceFilter
            The instance of `SentenceFilter` itself. Useful for chained
            applications of filters.
        """
        self.logger.info(f"Only in English: {flag}")
        self.only_english_flag = flag
        return self

    def only_with_journal(self, flag=True):
        """Only select articles with a journal.

        Parameters
        ----------
        flag : bool
            If True, then only articles for which a journal was specified
            will be selected.

        Returns
        -------
        self : SentenceFilter
            The instance of `SentenceFilter` itself. Useful for chained
            applications of filters.
        """
        self.logger.info(f"Only with journal: {flag}")
        self.only_with_journal_flag = flag
        return self

    def date_range(self, date_range=None):
        """Restrict to articles in a given date range.

        Parameters
        ----------
        date_range : tuple or None
            A tuple with two elements of the form `(start_year, end_year)`.
            If None, then the previously set date range (if any) is kept.

        Returns
        -------
        self : SentenceFilter
            The instance of `SentenceFilter` itself. Useful for chained
            applications of filters.
        """
        self.logger.info(f"Date range: {date_range}")
        if date_range is not None:
            self.year_from, self.year_to = date_range
        return self

    def include_strings(self, strings):
        """Include only sentences containing all of the given strings.

        Parameters
        ----------
        strings : list_like
            The strings to include. They are lower-cased and empty strings
            are ignored.

        Returns
        -------
        self : SentenceFilter
            The instance of `SentenceFilter` itself. Useful for chained
            applications of filters.
        """
        self.logger.info(f"Include strings: {strings}")
        strings = map(lambda s: s.lower(), strings)
        strings = filter(lambda s: len(s) > 0, strings)
        self.string_inclusions.extend(strings)
        return self

    def exclude_strings(self, strings):
        """Exclude sentences containing any of the given strings.

        Parameters
        ----------
        strings : list_like
            The strings to exclude. They are lower-cased and empty strings
            are ignored.

        Returns
        -------
        self : SentenceFilter
            The instance of `SentenceFilter` itself. Useful for chained
            applications of filters.
        """
        self.logger.info(f"Exclude strings: {strings}")
        strings = map(lambda s: s.lower(), strings)
        strings = filter(lambda s: len(s) > 0, strings)
        self.string_exclusions.extend(strings)
        return self

    def restrict_sentences_ids_to(self, sentence_ids):
        """Restrict sentence IDs to the given ones.

        Parameters
        ----------
        sentence_ids : list_like
            The sentence IDs to restrict to. Any iterable is accepted
            (list, tuple, set, numpy array, generator, ...).

        Returns
        -------
        self : SentenceFilter
            The instance of `SentenceFilter` itself. Useful for chained
            applications of filters.
        """
        # Materialize first: sets and generators support neither len() nor
        # slicing, both of which are needed for the log message.
        sentence_ids = tuple(sentence_ids)

        # For logging
        if len(sentence_ids) > 5:
            ids_str = f"[{', '.join(map(str, sentence_ids[:5]))} ..."
        else:
            ids_str = f"[{', '.join(map(str, sentence_ids))}]"
        self.logger.info(f"Restricting to sentencs IDs: {ids_str}")

        # The actual restriction
        self.restricted_sentence_ids = sentence_ids
        return self

    def _build_query(self):
        """Build the SQL query corresponding to the configured filters.

        Returns
        -------
        query : sqlalchemy.sql.expression.TextClause
            Query selecting `sentence_id` from the `sentences` table. The
            user-provided include/exclude strings are attached to it as
            bound parameters, so they never become part of the SQL text.
        """
        is_mysql = self.connection.url.drivername in self._MYSQL_DRIVERS

        article_conditions = []
        sentence_conditions = []
        params = {}

        # Discard bad condition
        if self.discard_bad_sentences_flag:
            sentence_conditions.append("is_bad = 0")

        # In English condition
        if self.only_english_flag:
            article_conditions.append("is_english = 1")

        # Journal condition
        if self.only_with_journal_flag:
            article_conditions.append("journal IS NOT NULL")

        # Date range condition
        if self.year_from is not None and self.year_to is not None:
            from_date = f"{self.year_from:04d}-01-01"
            to_date = f"{self.year_to:04d}-12-31"
            article_conditions.append(
                f"publish_time BETWEEN '{from_date}' AND '{to_date}'"
            )

        # Add article conditions to sentence conditions
        if article_conditions:
            # Reformatted due to this bandit bug in python3.8:
            # https://github.com/PyCQA/bandit/issues/658
            article_condition_query = (  # nosec
                "article_id IN ( "
                "  SELECT article_id "
                "  FROM articles "
                f'  WHERE {" AND ".join(article_conditions)} '
                ")"
            ).strip()  # nosec
            sentence_conditions.append(article_condition_query)

        # Restricted sentence IDs. They are cast to int and inlined (not
        # bound) so that a long list of IDs does not run into driver limits
        # on the number of bound parameters.
        if self.restricted_sentence_ids is not None:
            sentence_ids_s = ", ".join(
                str(int(x)) for x in self.restricted_sentence_ids
            )
            if not sentence_ids_s and is_mysql:
                # Presumably because MySQL rejects an empty `IN ()` list.
                sentence_ids_s = "NULL"
            sentence_conditions.append(f"sentence_id IN ({sentence_ids_s})")

        # Inclusion and Exclusion Text (always passed as bound parameters)
        if is_mysql:
            if self.string_inclusions:
                inclusions = " ".join(
                    f'+"{string}"' if len(string.split(" ")) > 1 else f"+{string}"
                    for string in self.string_inclusions
                )
                exclusions = " ".join(
                    f'-"{string}"' if len(string.split(" ")) > 1 else f"-{string}"
                    for string in self.string_exclusions
                )
                params["fulltext_condition"] = f"{inclusions} {exclusions}".strip()
                sentence_conditions.append(
                    "MATCH(text) AGAINST (:fulltext_condition IN BOOLEAN MODE)"
                )
            elif self.string_exclusions:
                # Only exclusions, no inclusions: a boolean-mode search that
                # contains only terms preceded by `-` returns an empty result
                # (https://dev.mysql.com/doc/refman/8.0/en/fulltext-boolean.html),
                # so fall back to INSTR for each excluded string.
                for i, text in enumerate(self.string_exclusions):
                    name = f"exclusion_{i}"
                    params[name] = text
                    sentence_conditions.append(f"INSTR(text, :{name}) = 0")
        else:
            for i, text in enumerate(self.string_exclusions):
                name = f"exclusion_{i}"
                params[name] = f"%{text}%"
                sentence_conditions.append(f"text NOT LIKE :{name}")
            for i, text in enumerate(self.string_inclusions):
                name = f"inclusion_{i}"
                params[name] = f"%{text}%"
                sentence_conditions.append(f"text LIKE :{name}")

        # Build the query
        query = "SELECT sentence_id FROM sentences"
        if sentence_conditions:
            query = f"{query} WHERE {' AND '.join(sentence_conditions)}"

        text_clause = sql.text(query)
        if params:
            text_clause = text_clause.bindparams(**params)
        return text_clause

    def iterate(self, chunk_size):
        """Run the filtering query and iterate over restricted sentence IDs.

        Parameters
        ----------
        chunk_size : int
            The size of the batches of sentence IDs that are yielded.

        Yields
        ------
        result_arr : np.ndarray
            A 1-dimensional numpy array with the filtered sentence IDs.
            Its length will be at most equal to `chunk_size`.
        """
        self.logger.info(f"Iterating filtering with chunk size {chunk_size}")
        query = self._build_query()
        for df_results in pd.read_sql(query, self.connection, chunksize=chunk_size):
            result_arr = df_results["sentence_id"].to_numpy()
            yield result_arr

    def run(self):
        """Run the filtering query to find restricted sentence IDs.

        Returns
        -------
        result_arr : np.ndarray
            A 1-dimensional numpy array with the filtered sentence IDs.
        """
        self.logger.info("Running the filtering query")
        query = self._build_query()
        self.logger.debug("Running pd.read_sql")
        results = [row[0] for row in self.connection.execute(query).fetchall()]
        self.logger.info(f"Filtering gave {len(results)} results")
        return np.array(results)