"""Module for the Database Creation."""
# Blue Brain Search is a text mining toolbox focused on scientific use cases.
#
# Copyright (C) 2020 Blue Brain Project, EPFL.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import io
import logging
import multiprocessing as mp
import queue
import traceback
from typing import Dict, List
import sqlalchemy
from ..mining.pipeline import run_pipeline
from ..sql import retrieve_articles
from ..utils import load_spacy_model
class Miner:
"""Multiprocessing worker class for mining named entities.
Parameters
----------
database_url : str
URL of a database already containing tables `articles` and `sentences`.
The URL should indicate the database dialect and connection arguments, e.g.
`database_url = "postgresql://scott:tiger@localhost/test"`.
model_path : str
The path to the spaCy model that will perform the
named entity extraction.
target_table : str
The target table name for the mining results.
task_queue : multiprocessing.Queue
The queue with tasks for this worker.
can_finish : multiprocessing.Event
A flag to indicate that the worker can stop waiting for new
tasks. Unless this flag is set, the worker will continue
polling the task queue for new tasks.
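
Examples
--------
Workers are meant to run in separate processes, typically with
`Miner.create_and_mine` as the process target (see
`CreateMiningCache.do_mining`). The snippet below is only an
illustrative sketch; the database URL, model path, and table name
are placeholders.

>>> import multiprocessing as mp
>>> task_queue, can_finish = mp.Queue(), mp.Event()
>>> worker = mp.Process(
...     target=Miner.create_and_mine,
...     kwargs={
...         "database_url": "postgresql://scott:tiger@localhost/test",
...         "model_path": "/path/to/spacy/model",
...         "target_table": "mining_cache",
...         "task_queue": task_queue,
...         "can_finish": can_finish,
...     },
... )
>>> worker.start()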
"""
def __init__(
self,
database_url,
model_path,
target_table,
task_queue,
can_finish,
):
self.name = mp.current_process().name
self.engine = sqlalchemy.create_engine(database_url)
self.model_path = model_path
self.target_table_name = target_table
self.task_queue = task_queue
self.can_finish = can_finish
self.n_tasks_done = 0
self.logger = logging.getLogger(str(self))
self.logger.info("Disposing of existing connections in engine")
# This is important for multiprocessing
self.engine.dispose()
self.logger.info("Loading the NLP model")
self.model = load_spacy_model(self.model_path)
@classmethod
def create_and_mine(
cls,
database_url,
model_path,
target_table,
task_queue,
can_finish,
):
"""Create a miner instance and start the mining loop.
Parameters
----------
database_url : str
URL of a database already containing tables `articles` and `sentences`.
The URL should indicate the database dialect and connection arguments, e.g.
`database_url = "postgresql://scott:tiger@localhost/test"`.
model_path : str
The path to the spaCy model that will perform the
named entity extraction.
target_table : str
The target table name for the mining results.
task_queue : multiprocessing.Queue
The queue with tasks for this worker.
can_finish : multiprocessing.Event
A flag to indicate that the worker can stop waiting for new
tasks. Unless this flag is set, the worker will continue
polling the task queue for new tasks.
"""
miner = cls(
database_url=database_url,
model_path=model_path,
target_table=target_table,
task_queue=task_queue,
can_finish=can_finish,
)
miner.work_loop()
miner.clean_up()
def _log_exception(self, article_id):
"""Log any unhandled exception raised during mining."""
error_trace = io.StringIO()
traceback.print_exc(file=error_trace)
error_message = f"\nArticle ID: {article_id}\n" + error_trace.getvalue()
self.logger.error(error_message)
def work_loop(self):
"""Do the mining work loop."""
self.logger.info("Starting mining")
while not self.can_finish.is_set():
# Just get new tasks until the main thread sets `can_finish`
try:
article_id = self.task_queue.get(timeout=1.0)
except queue.Empty:
# queue.Empty is raised when queue.get() times out; it does not
# necessarily mean that the queue is actually empty.
self.logger.debug("queue.Empty raised")
else:
try:
self._mine(article_id)
self.n_tasks_done += 1
except Exception:
self._log_exception(article_id)
def _generate_texts_with_metadata(self, article_ids):
"""Return a generator of (text, metadata_dict) for nlp.pipe.
Parameters
----------
article_ids : int or list of int
Article(s) to mine.
Yields
------
text : str
The text to mine.
metadata : dict
The metadata for the text.
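
Examples
--------
An illustrative sketch, assuming `miner` is an existing `Miner`
instance and the first paragraph of article 7 lies in a section
named "Abstract" (placeholder values):

>>> text, metadata = next(miner._generate_texts_with_metadata(7))
>>> metadata
{'article_id': 7, 'paragraph_pos_in_article': 0, 'paper_id': '7:Abstract:0'}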
"""
if isinstance(article_ids, int):
article_ids = [article_ids]
df_articles = retrieve_articles(article_ids, self.engine)
for _, row in df_articles.iterrows():
text = row["text"]
article_id = row["article_id"]
section_name = row["section_name"]
paragraph_pos = row["paragraph_pos_in_article"]
metadata = {
"article_id": article_id,
"paragraph_pos_in_article": paragraph_pos,
"paper_id": f"{article_id}:{section_name}:{paragraph_pos}",
}
yield text, metadata
def _mine(self, article_id):
"""Perform one mining task.
Parameters
----------
article_id : int
The article ID for mining.
"""
self.logger.info(f"Processing article_id = {article_id}")
self.logger.debug("Getting all texts for the article")
texts = self._generate_texts_with_metadata(article_id)
self.logger.debug("Running the pipeline")
df_results = run_pipeline(
texts=texts, model_entities=self.model, models_relations={}, debug=True
)
df_results["mining_model_version"] = self.model.meta["version"]
df_results["spacy_version"] = self.model.meta["spacy_version"]
self.logger.debug("Writing results to the SQL database")
with self.engine.begin() as con:
df_results.to_sql(
self.target_table_name, con=con, if_exists="append", index=False
)
self.logger.info(f"Mined {len(df_results)} entities.")
def clean_up(self):
"""Clean up after task processing has been finished."""
self.logger.info("Finished mining, cleaning up")
self.logger.info(f"I'm proud to have done {self.n_tasks_done} tasks!")
def __str__(self):
"""Represent self as string.
Returns
-------
str
The string representation of self.
"""
return f"{self.__class__.__name__}[{self.name}]"
class CreateMiningCache:
"""Create SQL database to save results of mining into a cache.
Parameters
----------
database_engine : sqlalchemy.engine.Engine
Connection to the CORD-19 database.
ee_models_paths : dict[str, pathlib.Path]
Dictionary mapping each entity type to the path of the model that detects it.
target_table_name : str
The target table name for the mining results.
workers_per_model : int, optional
Maximum number of processes to spawn per model for running text
mining and table population in parallel.
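
Examples
--------
A minimal sketch of the intended usage; the engine URL, model paths,
entity type, and table name are placeholders.

>>> import pathlib
>>> import sqlalchemy
>>> engine = sqlalchemy.create_engine("postgresql://scott:tiger@localhost/test")
>>> cache_creator = CreateMiningCache(
...     database_engine=engine,
...     ee_models_paths={"CHEMICAL": pathlib.Path("/path/to/chemical_model")},
...     target_table_name="mining_cache",
...     workers_per_model=2,
... )
>>> cache_creator.construct()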
"""
def __init__(
self,
database_engine,
ee_models_paths,
target_table_name,
workers_per_model=1,
):
self.logger = logging.getLogger(self.__class__.__name__)
required_tables = ["articles", "sentences"]
for table_name in required_tables:
if not database_engine.has_table(table_name):
raise ValueError(
f"Database at {database_engine.url} does not "
f"contain required table {table_name}."
)
self.engine = database_engine
self.target_table = target_table_name
self.ee_models_paths = ee_models_paths
self.workers_per_model = workers_per_model
def construct(self):
"""Construct and populate the cache of mined results."""
self.logger.info("Creating target table schema")
self._schema_creation()
self.logger.info("Deleting rows that will be re-populated")
self._delete_rows()
self.logger.info("Starting mining")
self.do_mining()
self.logger.info("Mining complete")
def _delete_rows(self):
"""Delete rows in the target table that will be re-populated."""
for etype in self.ee_models_paths:
# Reformatted due to this bandit bug in python3.8:
# https://github.com/PyCQA/bandit/issues/658
query = ( # nosec
f"DELETE FROM {self.target_table} WHERE entity_type = :etype"
)
self.engine.execute(
sqlalchemy.sql.text(query),
etype=etype,
)
def _schema_creation(self):
"""Create the schemas of the different tables in the database."""
metadata = sqlalchemy.MetaData()
if self.engine.has_table(self.target_table):
self.mining_cache_table = sqlalchemy.Table(
self.target_table, metadata, autoload=True, autoload_with=self.engine
)
return
articles_table = sqlalchemy.Table(
"articles", metadata, autoload=True, autoload_with=self.engine
)
self.mining_cache_table = sqlalchemy.Table(
self.target_table,
metadata,
sqlalchemy.Column("entity", sqlalchemy.Text()),
sqlalchemy.Column("entity_type", sqlalchemy.Text()),
sqlalchemy.Column("property", sqlalchemy.Text()),
sqlalchemy.Column("property_value", sqlalchemy.Text()),
sqlalchemy.Column("property_type", sqlalchemy.Text()),
sqlalchemy.Column("property_value_type", sqlalchemy.Text()),
sqlalchemy.Column("ontology_source", sqlalchemy.Text()),
sqlalchemy.Column("paper_id", sqlalchemy.Text()),
sqlalchemy.Column("start_char", sqlalchemy.Integer()),
sqlalchemy.Column("end_char", sqlalchemy.Integer()),
sqlalchemy.Column(
"article_id",
sqlalchemy.Integer(),
sqlalchemy.ForeignKey(articles_table.c.article_id),
nullable=False,
),
sqlalchemy.Column(
"paragraph_pos_in_article", sqlalchemy.Integer(), nullable=False
),
sqlalchemy.Column(
"mining_model_version", sqlalchemy.Text(), nullable=False
),
sqlalchemy.Column("spacy_version", sqlalchemy.Text(), nullable=False),
)
with self.engine.begin() as connection:
metadata.create_all(connection)
def create_tasks(self, task_queues, workers_by_queue):
"""Create tasks for the mining workers.
Parameters
----------
task_queues : dict[str, multiprocessing.Queue]
Task queues for the different models. The keys are the entity types
(one queue per model) and the values are the corresponding queues.
workers_by_queue : dict[str, list of multiprocessing.Process]
All worker processes working on tasks from a given queue.
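
Examples
--------
A minimal sketch, assuming `cache_creator` is a `CreateMiningCache`
instance and `worker` is a running `multiprocessing.Process`
(placeholder names):

>>> import multiprocessing as mp
>>> task_queues = {"CHEMICAL": mp.Queue()}
>>> workers_by_queue = {"CHEMICAL": [worker]}
>>> cache_creator.create_tasks(task_queues, workers_by_queue)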
"""
self.logger.info("Getting all article IDs")
result_proxy = self.engine.execute(
"SELECT article_id FROM articles ORDER BY article_id"
)
all_article_ids = [row["article_id"] for row in result_proxy]
current_task_ids = {queue_name: 0 for queue_name in task_queues}
# Put the tasks into the task queues.
self.logger.info("Adding new tasks")
# As long as there are tasks left, keep trying to add them to the queues.
while any(
task_idx < len(all_article_ids) for task_idx in current_task_ids.values()
):
for queue_name, task_queue in task_queues.items():
# Check whether there are still tasks available for the current queue
current_task_idx = current_task_ids[queue_name]
if current_task_idx == len(all_article_ids):
self.logger.debug(
f"All tasks for the {queue_name} queue have already been added."
)
continue
# Check if there are still workers working on this queue
if not any(
worker.is_alive() for worker in workers_by_queue[queue_name]
):
self.logger.debug("No workers left working on this queue")
current_task_ids[queue_name] = len(all_article_ids)
continue
# Try adding the task to the queue.
article_id = all_article_ids[current_task_idx]
self.logger.debug(
f"Adding article ID {article_id} to the {queue_name} queue"
)
try:
task_queue.put(article_id, timeout=0.5)
current_task_ids[queue_name] += 1
except queue.Full:
self.logger.debug("Queue full, will try next time")
def do_mining(self):
"""Do the parallelized mining."""
self.logger.info(
f"Starting mining with {self.workers_per_model} workers per model."
)
# Flags to let the workers know there won't be any new tasks.
can_finish: Dict[str, mp.synchronize.Event] = {
etype: mp.Event() for etype in self.ee_models_paths
}
# Prepare the task queues for the workers - one task queue per model.
task_queues: Dict[str, mp.Queue] = {
etype: mp.Queue() for etype in self.ee_models_paths
}
# Spawn the workers according to `workers_per_model`.
self.logger.info("Spawning the worker processes")
worker_processes = []
workers_by_queue: Dict[str, List[mp.Process]] = {
queue_name: [] for queue_name in task_queues
}
for etype, model_path in self.ee_models_paths.items():
for i in range(self.workers_per_model):
worker_name = f"{etype}_{i}"
worker_process = mp.Process(
name=worker_name,
target=Miner.create_and_mine,
kwargs={
"database_url": self.engine.url,
"model_path": model_path,
"target_table": self.target_table,
"task_queue": task_queues[etype],
"can_finish": can_finish[etype],
},
)
worker_process.start()
worker_processes.append(worker_process)
workers_by_queue[etype].append(worker_process)
# Create tasks
self.logger.info("Creating tasks")
self.create_tasks(task_queues, workers_by_queue)
# Monitor the queues and the workers to decide when we're finished.
# For a given model the work is finished when the corresponding queue
# is empty. But it can be that all workers stop/crash before all
# tasks are done. Therefore we also check whether any workers are still
# working on a given queue; if none are, we empty that queue ourselves.
while not all(flag.is_set() for flag in can_finish.values()):
for queue_name, task_queue in task_queues.items():
if can_finish[queue_name].is_set():
# This queue is already empty and we've let the workers know
continue
if not any(
worker.is_alive() for worker in workers_by_queue[queue_name]
):
self.logger.debug(f"Emptying the {queue_name} queue")
while not task_queue.empty():
article_id = task_queue.get(timeout=1)
self.logger.debug(f"Got non-done task {article_id}")
if task_queue.empty():
self.logger.debug(
f"Setting the can finish flag for the {queue_name} queue."
)
can_finish[queue_name].set()
self.logger.info("Closing all task queues")
for queue_name, task_queue in task_queues.items():
self.logger.debug(f"Closing the reading end of the queue {queue_name}")
# Note that this is only safe when the queue is empty. This is
# because there's a background thread putting buffered data
# in the queue. If the queue is not empty it might be that the
# background thread is still transferring the data from the
# buffer. Closing the reading end of the internal pipe actually
# also closes the writing end, and therefore the background
# thread will throw a BrokenPipeError as it will fail to write
# to the closed pipe.
task_queue.close()
self.logger.debug(f"Joining the buffering thread of queue {queue_name}")
task_queue.join_thread()
# Wait for the processes to finish.
self.logger.info("No more new tasks, just waiting for the workers to finish")
# We'll transfer finished workers from `worker_processes`
# to `finished_workers`. We're done when `worker_processes` is empty.
finished_workers: List[mp.Process] = []
while len(worker_processes) > 0:
self.logger.debug(
f"Status: {len(worker_processes)} workers still alive, "
f"{len(finished_workers)} finished."
)
# Loop through all living workers and try to join
for process in worker_processes:
# No need to wait forever - other workers might finish sooner
process.join(timeout=1.0)
# If the current process has finished, put it in the
# `finished_workers` list and do some cleaning up.
if not process.is_alive():
self.logger.info(f"Worker {process.name} finished.")
finished_workers.append(process)
if process.exitcode != 0:
self.logger.error(
f"Worker {process.name} terminated with exit "
f"code {process.exitcode}!"
)
# Remove all workers that are already in the `finished_workers`
# list from the `worker_processes` list.
for process in finished_workers:
if process in worker_processes:
worker_processes.remove(process)
self.logger.info("Finished mining.")
# Create index on (article_id, paragraph_pos_in_article, start_char)
# to speed up ORDER BY clause.
self.logger.info("Start creating index on (par, art, char)...")
sqlalchemy.Index(
"index_art_par_char",
self.mining_cache_table.c.article_id,
self.mining_cache_table.c.paragraph_pos_in_article,
self.mining_cache_table.c.start_char,
).create(bind=self.engine)
self.logger.info("Done creating index on (par, art, char).")