"""Module for the Database Creation."""
# Blue Brain Search is a text mining toolbox focused on scientific use cases.
#
# Copyright (C) 2020 Blue Brain Project, EPFL.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import json
import logging
import pathlib
import time

import langdetect
import langdetect.lang_detect_exception
import pandas as pd
import sqlalchemy

from bluesearch.database.article import Article, CORD19ArticleParser
from bluesearch.utils import load_spacy_model

logger = logging.getLogger(__name__)


def mark_bad_sentences(engine, sentences_table_name):
"""Flag bad sentences in SQL database.
Parameters
----------
engine : sqlalchemy.engine.Engine
The connection to an SQL database.
sentences_table_name : str
The table with sentences.
Raises
------
RuntimeError
If the column "is_bad" is missing in the table provided.
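
    Examples
    --------
    A minimal usage sketch, assuming a local SQLite database whose
    "sentences" table already has an "is_bad" column (the URL and table
    name below are illustrative, not fixed by this module):

    >>> import sqlalchemy  # doctest: +SKIP
    >>> engine = sqlalchemy.create_engine("sqlite:///cord19.db")  # doctest: +SKIP
    >>> mark_bad_sentences(engine, "sentences")  # doctest: +SKIP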
"""
logger.info('Verifying the column "is_bad" is present')
with engine.begin() as connection:
inspector = sqlalchemy.inspect(connection)
columns = inspector.get_columns(sentences_table_name)
        column_names = [col["name"] for col in columns]
        if "is_bad" not in column_names:
            raise RuntimeError('Column "is_bad" not found in the given table')
logger.info("Getting all sentences")
with engine.begin() as connection:
query = f"SELECT sentence_id, text FROM {sentences_table_name}" # nosec
df_sentences = pd.read_sql(query, connection)
logger.info("Computing text lengths")
text_length = df_sentences["text"].str.len()
logger.info("Checking for LaTeX")
    # A LaTeX command followed by an opening brace (e.g. "\frac{") is a
    # strong sign of unparsed markup; the brace is escaped explicitly.
    has_latex = df_sentences["text"].str.contains(r"\\[a-z]+\{")
logger.info("Checking for minimal length")
too_short = text_length < 20
logger.info("Checking for maximal length")
too_long = text_length > 2000
df_sentences["is_bad"] = has_latex | too_short | too_long
n_bad = df_sentences["is_bad"].sum()
n_total = len(df_sentences)
bad_percent = n_bad / n_total * 100
logger.info(f"{n_bad} of {n_total} found to be bad ({bad_percent:.2f}%)")
bad_sentence_ids = df_sentences["sentence_id"][df_sentences["is_bad"]]
if len(bad_sentence_ids) > 0:
logger.info("Writing results to database")
bad_sentence_ids = ", ".join(str(id_) for id_ in bad_sentence_ids)
with engine.begin() as connection:
query = f"""
UPDATE {sentences_table_name}
SET is_bad = 1
WHERE sentence_id in ({bad_sentence_ids})
"""
connection.execute(query)
else:
logger.info("Nothing to write to database")


class CORD19DatabaseCreation:
"""Create SQL database from a specified dataset.
Parameters
----------
data_path : str or pathlib.Path
Directory to the dataset where metadata.csv and all jsons file
are located.
engine : SQLAlchemy.Engine
Engine linked to the database.
Attributes
----------
max_text_length : int
Max length of values in MySQL column of type TEXT. We have to
constraint our text values to be smaller than this value
(especially articles.abstract and sentences.text).
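
    Examples
    --------
    A minimal usage sketch; the data path and database URL below are
    assumptions made for illustration:

    >>> import sqlalchemy  # doctest: +SKIP
    >>> engine = sqlalchemy.create_engine("sqlite:///cord19.db")  # doctest: +SKIP
    >>> creation = CORD19DatabaseCreation("/path/to/CORD-19", engine)  # doctest: +SKIP
    >>> creation.construct()  # doctest: +SKIP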
"""

    def __init__(self, data_path, engine):
self.data_path = pathlib.Path(data_path)
if not self.data_path.exists():
            raise NotADirectoryError(
                f"The data directory {self.data_path} does not exist"
            )
self.metadata = pd.read_csv(self.data_path / "metadata.csv")
self.is_constructed = False
self.engine = engine
self.logger = logging.getLogger(self.__class__.__name__)
self.max_text_length = 60000

    def construct(self):
"""Construct the database."""
if not self.is_constructed:
self._schema_creation()
self.logger.info("Schemas of the tables are created.")
self._articles_table()
self.logger.info("Articles table is created.")
self._sentences_table()
self.logger.info("Sentences table is created.")
self.is_constructed = True
else:
raise ValueError("This database is already constructed!")

    def _schema_creation(self):
"""Create the schemas of the different tables in the database."""
metadata = sqlalchemy.MetaData()
self.articles_table = sqlalchemy.Table(
"articles",
metadata,
sqlalchemy.Column(
"article_id", sqlalchemy.Integer(), primary_key=True, autoincrement=True
),
sqlalchemy.Column("cord_uid", sqlalchemy.String(8), nullable=False),
sqlalchemy.Column("sha", sqlalchemy.Text()),
sqlalchemy.Column("source_x", sqlalchemy.Text()),
sqlalchemy.Column("title", sqlalchemy.Text()),
sqlalchemy.Column("doi", sqlalchemy.Text()),
sqlalchemy.Column("pmcid", sqlalchemy.Text()),
sqlalchemy.Column("pubmed_id", sqlalchemy.Text()),
sqlalchemy.Column("license", sqlalchemy.Text()),
sqlalchemy.Column("abstract", sqlalchemy.Text()),
sqlalchemy.Column("publish_time", sqlalchemy.Date()),
sqlalchemy.Column("authors", sqlalchemy.Text()),
sqlalchemy.Column("journal", sqlalchemy.Text()),
sqlalchemy.Column("mag_id", sqlalchemy.Text()),
sqlalchemy.Column("who_covidence_id", sqlalchemy.Text()),
sqlalchemy.Column("arxiv_id", sqlalchemy.Text()),
sqlalchemy.Column("pdf_json_files", sqlalchemy.Text()),
sqlalchemy.Column("pmc_json_files", sqlalchemy.Text()),
sqlalchemy.Column("url", sqlalchemy.Text()),
sqlalchemy.Column("s2_id", sqlalchemy.Text()),
sqlalchemy.Column("is_english", sqlalchemy.Boolean()),
)
self.sentences_table = sqlalchemy.Table(
"sentences",
metadata,
sqlalchemy.Column(
"sentence_id",
sqlalchemy.Integer(),
primary_key=True,
autoincrement=True,
),
sqlalchemy.Column("section_name", sqlalchemy.Text()),
sqlalchemy.Column(
"article_id",
sqlalchemy.Integer(),
sqlalchemy.ForeignKey("articles.article_id"),
nullable=False,
),
sqlalchemy.Column("text", sqlalchemy.Text()),
sqlalchemy.Column(
"paragraph_pos_in_article", sqlalchemy.Integer(), nullable=False
),
sqlalchemy.Column(
"sentence_pos_in_paragraph", sqlalchemy.Integer(), nullable=False
),
sqlalchemy.UniqueConstraint(
"article_id",
"paragraph_pos_in_article",
"sentence_pos_in_paragraph",
name="sentence_unique_identifier",
),
sqlalchemy.Column("is_bad", sqlalchemy.Boolean(), server_default="0"),
)
with self.engine.begin() as connection:
metadata.create_all(connection)

    def _articles_table(self):
"""Fill the Article Table thanks to 'metadata.csv'.
The articles table has all the metadata.csv columns
expect the 'sha'. Moreover, the columns are renamed
(cfr. _rename_columns).
"""
rejected_articles = []
        # Copy to avoid mutating a view of `self.metadata` below
        df = self.metadata.drop_duplicates("cord_uid", keep="first").copy()
        df["publish_time"] = pd.to_datetime(df["publish_time"])
for index, article in df.iterrows():
try:
if (
isinstance(article["abstract"], str)
and len(article["abstract"]) > self.max_text_length
):
article["abstract"] = article["abstract"][: self.max_text_length]
self.logger.warning(
f"The abstract of article {index} has a length >"
f" {self.max_text_length} and was cut off for the "
f"database."
)
with self.engine.begin() as con:
article.to_frame().transpose().to_sql(
name="articles", con=con, index=False, if_exists="append"
)
except Exception as e:
rejected_articles += [index]
self.logger.error(
f"Number of articles rejected: {len(rejected_articles)}"
)
self.logger.error(f"Last rejected: {rejected_articles[-1]}")
self.logger.error(str(e))
if index % 1000 == 0:
self.logger.info(f"Number of articles saved: {index}")

    def _process_article_sentences(self, article, nlp):
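        """Segment a single article into sentences and store them.

        The sentences are written to the sentences table and the
        `is_english` flag of the article is updated.

        Parameters
        ----------
        article : pd.Series
            A row of the articles table with at least the columns
            `article_id`, `title`, `abstract`, `pmc_json_files` and
            `pdf_json_files`.
        nlp : spacy.language.Language
            SpaCy pipeline applying sentence segmentation.

        Returns
        -------
        pmc_json : bool
            Whether the article has at least one pmc_json file.
        pdf_json : bool
            Whether the article has no pmc_json file but at least one
            pdf_json file.
        """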
paragraphs = []
article_id = int(article["article_id"])
paragraph_pos_in_article = 0
pmc_json = pdf_json = False
# Read title and abstract
if article["title"] is not None:
text = article["title"]
meta = {
"section_name": "Title",
"article_id": article_id,
"paragraph_pos_in_article": paragraph_pos_in_article,
}
paragraphs += [(text, meta)]
paragraph_pos_in_article += 1
if article["abstract"] is not None:
text = article["abstract"]
meta = {
"section_name": "Abstract",
"article_id": article_id,
"paragraph_pos_in_article": paragraph_pos_in_article,
}
paragraphs += [(text, meta)]
paragraph_pos_in_article += 1
# Find files linked to articles
if article["pmc_json_files"] is not None:
pmc_json = True
jsons_path = article["pmc_json_files"].split("; ")
elif article["pdf_json_files"] is not None:
pdf_json = True
jsons_path = article["pdf_json_files"].split("; ")
else:
jsons_path = []
# Load json
for json_path in jsons_path:
with open(self.data_path / json_path.strip()) as fp:
json_file_data = json.load(fp)
            parser = CORD19ArticleParser(json_file_data)
            # Use a distinct name so the metadata row `article` is not shadowed
            parsed_article = Article.parse(parser)
            for section_title, text in parsed_article.iter_paragraphs():
metadata = {
"section_name": section_title,
"article_id": article_id,
"paragraph_pos_in_article": paragraph_pos_in_article,
}
paragraphs.append((text, metadata))
paragraph_pos_in_article += 1
sentences = self.segment(nlp, paragraphs)
sentences_df = pd.DataFrame(
sentences,
columns=[
"sentence_id",
"section_name",
"article_id",
"text",
"paragraph_pos_in_article",
"sentence_pos_in_paragraph",
],
)
# Consider first n sentences in paper to quickly determine
# if it is in English
n_sents_language = 10
is_english = self.check_is_english(
" ".join(sentences_df[:n_sents_language]["text"])
)
update_stmt = """
UPDATE articles
SET is_english = :is_english
WHERE article_id = :article_id
"""
with self.engine.begin() as con:
sentences_df.to_sql(
name="sentences", con=con, index=False, if_exists="append"
)
            con.execute(
                sqlalchemy.sql.text(update_stmt),
                {"is_english": is_english, "article_id": article_id},
            )
return pmc_json, pdf_json

    def _sentences_table(self, model_name="en_core_sci_lg"):
"""Fill the sentences table thanks to all the json files.
For each paragraph, all sentences are extracted and populate
the sentences table.
Parameters
----------
model_name : str, optional
SpaCy model used to parse the text into sentences.
Returns
-------
pmc : int
Number of articles with at least one pmc_json.
pdf : int
Number of articles that does not have pmc_json file but
at least one pdf_json.
rejected_articles : list of int
Article_id of the articles that raises an error during
the parsing.
"""
nlp = load_spacy_model(model_name, disable=["tagger", "ner"])
articles_table = pd.read_sql(
"""
SELECT article_id, title, abstract, pmc_json_files, pdf_json_files
FROM articles
WHERE (abstract IS NOT NULL) OR (title IS NOT NULL)
""",
con=self.engine,
)
pdf = 0
pmc = 0
rejected_articles = []
num_articles = 0
start = time.perf_counter()
for _, article in articles_table.iterrows():
try:
pmc_json, pdf_json = self._process_article_sentences(article, nlp)
if pmc_json:
pmc += 1
if pdf_json:
pdf += 1
except Exception as e:
rejected_articles += [int(article["article_id"])]
                self.logger.error(
                    f"Rejected articles: {len(rejected_articles)}; last "
                    f"rejected: {rejected_articles[-1]}"
                )
self.logger.error(str(e))
num_articles += 1
if num_articles % 1000 == 0:
self.logger.info(
f"Number of articles: {num_articles} in "
f"{time.perf_counter() - start:.1f} seconds"
)
        # Create an index on article_id
        article_id_index = sqlalchemy.Index(
            "article_id_index", self.sentences_table.c.article_id
        )
        article_id_index.create(bind=self.engine)
        # Create a composite index on (article_id, is_bad)
        sqlalchemy.Index(
            "is_bad_article_id_index",
            self.sentences_table.c.article_id,
            self.sentences_table.c.is_bad,
        ).create(bind=self.engine)
# Create FULLTEXT INDEX
if self.engine.url.drivername.startswith("mysql"):
with self.engine.begin() as connection:
self.logger.info(
"Start creating FULLTEXT INDEX on sentences (column text)"
)
                connection.execute(
                    sqlalchemy.text(
                        "CREATE FULLTEXT INDEX fulltext_text ON sentences(text)"
                    )
                )
self.logger.info("Ended creating FULLTEXT INDEX")
return pmc, pdf, rejected_articles

    def segment(self, nlp, paragraphs):
"""Segment a paragraph/article into sentences.
Parameters
----------
nlp : spacy.language.Language
Spacy pipeline applying sentence segmentation.
paragraphs : List of tuples (text, metadata)
List of Paragraph/Article in raw text to segment into sentences.
[(text, metadata), ].
Returns
-------
all_sentences : list of dict
List of all the sentences extracted from the paragraph.
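
        Examples
        --------
        An illustrative call on an instance `creation`; the model name
        and metadata values are assumptions:

        >>> import spacy  # doctest: +SKIP
        >>> nlp = spacy.load("en_core_sci_lg")  # doctest: +SKIP
        >>> meta = {"article_id": 1, "paragraph_pos_in_article": 0}  # doctest: +SKIP
        >>> creation.segment(nlp, [("One sentence. Another one.", meta)])  # doctest: +SKIP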
"""
        if isinstance(paragraphs, str):
            # A bare string has no metadata; `nlp.pipe(..., as_tuples=True)`
            # expects (text, context) pairs, so wrap it accordingly.
            paragraphs = [(paragraphs, {})]
all_sentences = []
for paragraph, metadata in nlp.pipe(paragraphs, as_tuples=True):
for pos, sent in enumerate(paragraph.sents):
text = str(sent)
if len(text) > self.max_text_length:
text = text[: self.max_text_length]
                    self.logger.warning(
                        f'One sentence (article {metadata["article_id"]}, '
                        f'paragraph {metadata["paragraph_pos_in_article"]}, '
                        f"sentence pos {pos}) has a length > "
                        f"{self.max_text_length} and was cut off for the "
                        f"database."
                    )
all_sentences += [
{"text": text, "sentence_pos_in_paragraph": pos, **metadata}
]
return all_sentences

    def check_is_english(self, text):
"""Check if the given text is English.
Note the algorithm seems to be non-deterministic,
as mentioned in https://github.com/Mimino666/langdetect#basic-usage.
This is the reason of using `langdetect.DetectorFactory.seed = 0`
Parameters
----------
text : str
Text to analyze.
Returns
-------
lang : bool or None
Whether the language of the provided `text` is in English or not. If
the input `text` is an empty string, `None` is returned.
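
        Examples
        --------
        An illustrative call on an instance `creation` (the expected
        output is an assumption):

        >>> creation.check_is_english("This is an English sentence.")  # doctest: +SKIP
        True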
"""
langdetect.DetectorFactory.seed = 0
lang = None
if isinstance(text, str):
try:
lang = str(langdetect.detect(text))
except langdetect.lang_detect_exception.LangDetectException as e:
self.logger.info(e)
is_english = lang == "en"
return is_english