"""The annotator module"""
import os
import tempfile
from importlib import resources
from json import loads, decoder, load
from typing import Union
from requests import get, exceptions
import pandas as pd
from lxml import etree
from daiquiri import getLogger
# FIXME: Refactor to use geoenv (https://github.com/clnsmth/geoenv)
# from geoenvo.resolver import Resolver
# from geoenvo.geometry import Geometry
from spinneret.workbook import (
delete_annotations,
initialize_workbook_row,
get_package_id,
get_package_url,
get_subject_and_context,
get_description,
delete_duplicate_annotations,
)
from spinneret.utilities import (
load_eml,
load_workbook,
write_workbook,
write_eml,
expand_curie,
get_elements_for_predicate,
get_template_for_predicate,
get_predicate_id_for_predicate,
)
# FIXME: Refactor to use geoenv (https://github.com/clnsmth/geoenv)
from spinneret.eml import get_geographic_coverage
logger = getLogger(__name__)
# pylint: disable=too-many-lines
# pylint: disable=too-many-locals
# pylint: disable=too-many-positional-arguments
[docs]def get_bioportal_annotation(
text: str,
api_key: str,
ontologies: str,
semantic_types: str = "",
expand_semantic_types_hierarchy: str = "false",
expand_class_hierarchy: str = "false",
class_hierarchy_max_level: int = 0,
expand_mappings: str = "false",
stop_words: str = "",
minimum_match_length: int = 3,
exclude_numbers: str = "false",
whole_word_only: str = "true",
exclude_synonyms: str = "false",
longest_only: str = "false",
) -> Union[list, None]:
"""Get an annotation from the BioPortal API
:param text: The text to be annotated.
:param api_key: The BioPortal API key.
:param ontologies: The ontologies to use for annotation.
:param semantic_types: The semantic types to use for annotation.
:param expand_semantic_types_hierarchy: true means to use the semantic
types passed in the "semantic_types" parameter as well as all their
immediate children. false means to use ONLY the semantic types passed
in the "semantic_types" parameter.
:param expand_class_hierarchy: used only in conjunction with
"class_hierarchy_max_level" parameter; determines whether or not to
include ancestors of the given class when performing an annotation.
:param class_hierarchy_max_level: the depth of the hierarchy to use when
performing an annotation.
:param expand_mappings: true means that the following manual mappings will
be used in annotation: UMLS, REST, CUI, OBOXREF.
:param stop_words: a comma-separated list of words to ignore in the text.
:param minimum_match_length: the minimum number of characters in a term
that must be matched in the text.
:param exclude_numbers: true means to exclude numbers from annotation.
:param whole_word_only: true means to match whole words only.
:param exclude_synonyms: true means to exclude synonyms from annotation.
:param longest_only: true means that only the longest match for a given
phrase will be returned.
:returns: A list of dictionaries, each with the annotation keys `label`
and `uri`, corresponding to the preferred label and URI of the
annotated concept. None if the request fails.
:notes: This function is a wrapper for the BioPortal API. The BioPortal API
is a repository of biomedical ontologies with a RESTful API that allows
users to annotate text with ontology concepts. The API is documented at
https://data.bioontology.org/documentation#nav_annotator.
This function requires an API key from BioPortal. To obtain an API key,
users must register at https://bioportal.bioontology.org/account. The
key can be loaded as an environment variable from the configuration
file (see `utilities.load_configuration`).
"""
logger.info(f"Text contains {len(text.split())} words")
# Construct the query
url = "https://data.bioontology.org/annotator"
payload = {
"text": text,
"apikey": api_key,
"ontologies": ontologies,
"semantic_types": semantic_types,
"expand_semantic_types_hierarchy": expand_semantic_types_hierarchy,
"expand_class_hierarchy": expand_class_hierarchy,
"class_hierarchy_max_level": class_hierarchy_max_level,
"expand_mappings": expand_mappings,
"stop_words": stop_words,
"minimum_match_length": minimum_match_length,
"exclude_numbers": exclude_numbers,
"whole_word_only": whole_word_only,
"exclude_synonyms": exclude_synonyms,
"longest_only": longest_only,
"page_size": 100, # to circumvent pagination
"format": "json", # being explicit here, even though it's the default
}
# Get annotations
try:
r = get(url, params=payload, timeout=10)
r.raise_for_status()
except exceptions.RequestException as e:
logger.error(f"Error calling https://data.bioontology.org/annotator: {e}")
return None
# Parse the results
annotations = []
for item in r.json():
self_link = item.get("annotatedClass", {}).get("links").get("self", None)
try:
r = get(self_link, params={"apikey": api_key}, timeout=10)
r.raise_for_status()
except exceptions.RequestException as e:
logger.error(f"Error calling {self_link}: {e}")
return None
uri = r.json().get("@id", None)
label = r.json().get("prefLabel", None)
annotations.append({"label": label, "uri": uri})
return annotations
# pylint: disable=too-many-positional-arguments
[docs]def annotate_workbook(
workbook_path: str,
eml_path: str,
output_path: str,
local_model: str = None,
temperature: Union[float, None] = None,
return_ungrounded: bool = False,
sample_size: int = 1,
) -> None:
"""Annotate a workbook with automated annotation
:param workbook_path: The path to the workbook to be annotated
corresponding to the EML file.
:param eml_path: The path to the EML file corresponding to the workbook.
:param output_path: The path to write the annotated workbook.
:param local_model: See `get_ontogpt_annotation` documentation for details.
:param temperature: The temperature parameter for the model. If None, the
OntoGPT default will be used.
:param return_ungrounded: See `get_ontogpt_annotation` documentation for
details.
:param sample_size: Executes multiple replicates of the annotation request
to reduce variability of outputs. Variability is inherent in OntoGPT.
:returns: None
:notes: The workbook is annotated by annotators best suited for the XPaths
in the EML file. The annotated workbook is written back to the same
path as the original workbook.
"""
logger.info(f"Annotating workbook {workbook_path}")
# Ensure the workbook and eml file match to avoid errors
pid = os.path.basename(workbook_path).split("_")[0]
eml_file = pid + ".xml"
if eml_file not in eml_path:
logger.warning(f"EML file {eml_file} does not match workbook {workbook_path}")
return None
# Load the workbook and EML for processing
wb = load_workbook(workbook_path)
eml = load_eml(eml_path)
# Run workbook annotator, results of one are used as input for the next
predicates = [
"contains measurements of type",
"contains process",
"env_broad_scale",
"env_local_scale",
"environmental material",
"research topic",
"usesMethod",
]
for p in predicates:
wb = add_predicate_annotations_to_workbook(
predicate=p,
workbook=wb,
eml=eml,
local_model=local_model,
temperature=temperature,
return_ungrounded=return_ungrounded,
sample_size=sample_size,
)
wb = add_qudt_annotations_to_workbook(wb, eml)
write_workbook(wb, output_path)
return None
[docs]def annotate_eml(
eml: Union[str, etree._ElementTree],
workbook: Union[str, pd.core.frame.DataFrame],
output_path: str = None,
) -> etree._ElementTree:
"""Annotate an EML file with terms from the corresponding workbook
:param eml: Either the path to the EML file corresponding to the
`workbook`, or the EML file itself as an lxml etree.
:param workbook: Either the path to the workbook corresponding to the
`eml`, or the workbook itself as a pandas DataFrame.
:param output_path: The path to write the annotated EML file.
:returns: The annotated EML file as an lxml etree.
:notes: The EML file is annotated with terms from the corresponding
workbook. Terms from the workbook are added even if they are already
present in the EML file.
"""
# Load the EML and workbook for processing
eml = load_eml(eml)
wb = load_workbook(workbook)
# Iterate over workbook rows and annotate the EML
for _, row in wb.iterrows():
# Only annotate if required components are present
if (
not pd.isnull(row["predicate"])
and not pd.isnull(row["predicate_id"])
and not pd.isnull(row["object"])
and not pd.isnull(row["object_id"])
):
# Skip if the object_id is an ungrounded concept from OntoGPT.
# These are not valid annotations.
if row["object_id"].startswith("AUTO:"):
continue
# Create the annotation element
annotation = create_annotation_element(
predicate_label=row["predicate"],
predicate_id=row["predicate_id"],
object_label=row["object"],
object_id=row["object_id"],
)
# Insert the annotation
if row["element"] == "dataset":
# Insert the annotation before the required contact element,
# and any optional elements preceding the contact element, to
# correctly locate dataset level annotations according to the
# EML schema.
root = eml.getroot()
dataset = root.find(".//dataset")
if dataset.find("purpose"):
reference_element = dataset.find("purpose")
elif dataset.find("introduction"):
reference_element = dataset.find("introduction")
elif dataset.find("gettingStarted"):
reference_element = dataset.find("gettingStarted")
elif dataset.find("acknowledgements"):
reference_element = dataset.find("acknowledgements")
elif dataset.find("maintenance"):
reference_element = dataset.find("maintenance")
else:
reference_element = dataset.find("contact")
dataset.insert(dataset.index(reference_element), annotation)
elif row["element"] == "attribute":
# Convert absolute XPath to relative path to avoid errors
attribute_xpath = row["element_xpath"].replace("/eml:eml", "./")
# Insert the annotation at the end of the attribute list.
root = eml.getroot()
attribute = root.find(attribute_xpath)
attribute.insert(len(attribute) + 1, annotation)
if output_path:
write_eml(eml, output_path)
return eml
[docs]def create_annotation_element(predicate_label, predicate_id, object_label, object_id):
"""Create an EML annotation element
:param predicate_label: The predicate label of the annotation.
:param predicate_id: The URI of the predicate.
:param object_label: The object label of the annotation.
:param object_id: The URI of the object.
"""
annotation_elem = etree.Element("annotation")
property_uri_elem = etree.SubElement(annotation_elem, "propertyURI")
property_uri_elem.attrib["label"] = predicate_label
property_uri_elem.text = predicate_id
value_uri_elem = etree.SubElement(annotation_elem, "valueURI")
value_uri_elem.attrib["label"] = object_label
value_uri_elem.text = object_id
return annotation_elem
[docs]def get_qudt_annotation(text: str) -> Union[list, None]:
"""Get an annotation from the QUDT API
:param text: The text to be annotated. This should be the value from the
EML `standardUnit` or `customUnit` element.
:returns: A list of dictionaries, each with the annotation keys `label`
and `uri`, corresponding to the preferred label and URI of the
annotated concept. None if the request fails.
:notes: This function queries the Unit Annotations Service
https://vocab.lternet.edu/unitsws.html, developed by the EDI and LTER
units working group, for a match of the input `text` to a QUDT unit via
the service mapping.
"""
url = (
f"https://vocab.lternet.edu/webservice/unitsws.php?rawunit={text}&"
f"returntype=json"
)
try:
r = get(url, timeout=10)
r.raise_for_status()
except exceptions.RequestException as e:
logger.error(f"Error calling {url}: {e}")
return None
if r.text == "No_Match":
return None
try: # the service has a few JSON encoding bugs
json = loads(r.text)
except decoder.JSONDecodeError as e:
logger.error(f"Error decoding JSON from {url}: {e}")
return None
label = json["qudtLabel"]
uri = json["qudtURI"]
return [{"label": label, "uri": uri}]
[docs]def add_qudt_annotations_to_workbook(
workbook: Union[str, pd.core.frame.DataFrame],
eml: Union[str, etree._ElementTree],
output_path: str = None,
overwrite: bool = False,
) -> pd.core.frame.DataFrame:
"""
:param workbook: Either the path to the workbook to be annotated, or the
workbook itself as a pandas DataFrame.
:param eml: Either the path to the EML file corresponding to the workbook,
or the EML file itself as an lxml etree.
:param output_path: The path to write the annotated workbook.
:param overwrite: If True, overwrite existing `QUDT` annotations in the
`workbook, so a fresh set may be created.
:returns: Workbook with QUDT annotations.
"""
logger.info("Annotating units")
# Parameters for the function
predicate = "uses standard"
# Load the workbook and EML for processing
wb = load_workbook(workbook)
eml = load_eml(eml)
# Remove existing QUDT annotations if overwrite is True, using a set of
# criteria that accurately define the annotations to remove.
if overwrite:
wb = delete_annotations(
workbook=wb,
criteria={
"element": "attribute",
"object_id": "http://qudt.org/vocab/unit/",
"author": "spinneret.annotator.get_qudt_annotation",
},
)
# Iterate over EML units and add QUDT annotations to the workbook
units = eml.xpath("//standardUnit") + eml.xpath("//customUnit")
for unit in units:
attribute_element = unit.xpath("ancestor::attribute[1]")[0]
attribute_xpath = eml.getpath(attribute_element)
attribute_description = get_description(attribute_element)
# Skip if this element already has an annotation in the workbook, to
# prevent duplicate annotations from being added.
if has_annotation(wb, attribute_xpath, predicate):
return wb
# Reuse existing annotations for elements with identical tag names,
# descriptions, and predicate labels, to reduce redundant processing.
# Note this assumes semantic equivalence between elements with matching
# tags and descriptions.
annotations = get_annotation_from_workbook(
workbook=wb,
element=attribute_element.tag,
description=attribute_description,
predicate=predicate,
)
if annotations is None:
# Get the QUDT annotation
annotations = get_qudt_annotation(unit.text)
if annotations is not None:
for annotation in annotations:
row = initialize_workbook_row()
row["package_id"] = get_package_id(eml)
row["url"] = get_package_url(eml)
row["element"] = attribute_element.tag
if "id" in attribute_element.attrib:
row["element_id"] = attribute_element.attrib["id"]
else:
row["element_id"] = pd.NA
row["element_xpath"] = attribute_xpath
row["context"] = get_subject_and_context(attribute_element)["context"]
row["description"] = attribute_description
row["subject"] = get_subject_and_context(attribute_element)["subject"]
row["predicate"] = predicate
row["predicate_id"] = (
"http://ecoinformatics.org/oboe/oboe.1.2/oboe-core.owl#usesStandard"
)
row["object"] = annotation["label"]
row["object_id"] = annotation["uri"]
row["author"] = "spinneret.annotator.get_qudt_annotation"
row["date"] = pd.Timestamp.now()
row = pd.DataFrame([row], dtype=str)
wb = pd.concat([wb, row], ignore_index=True)
wb = delete_duplicate_annotations(wb)
if output_path:
write_workbook(wb, output_path)
return wb
[docs]def get_ontogpt_annotation(
text: str,
template: str,
local_model: str = None,
temperature: Union[float, None] = None,
return_ungrounded: bool = False,
) -> Union[list, None]:
"""
:param text: The text to be annotated.
:param template: Name of OntoGPT template to use for grounding. Available
templates are in src/data/ontogpt/templates. Omit the file extension.
:param local_model: The local language model to use (e.g. `llama3.2`). This
should be one of the options available from `ollama` (see
https://ollama.com/library) and should be installed locally. If `None`,
the configured remote model will be used. See the OntoGPT documentation
for more information.
:param temperature: The temperature parameter for the model. If `None`, the
OntoGPT default will be used.
:param return_ungrounded: If True, return ungrounded annotations. These
may be useful in identifying potential concepts to add to a vocabulary,
or to identify concepts that a human curator may be capable of
grounding.
:returns: A list of dictionaries, each with the annotation keys `label`
and `uri`. None if the request fails or no annotations are found.
:notes: This function is a wrapper for the OntoGPT API. Set up of OntoGPT
is required to use this function. For more information, see:
https://monarch-initiative.github.io/ontogpt/.
"""
logger.info(f"Text contains {len(text.split())} words")
# OntoGPT transacts in files, so we write the input text to a temporary
# file and receive the results as a JSON file. Once the results are parsed
# we can discard the files.
with tempfile.TemporaryDirectory() as temp_dir:
input_file = os.path.join(temp_dir, "input.txt")
with open(input_file, "w", encoding="utf-8") as f:
f.write(text)
template_file = resources.files("spinneret.data.ontogpt.templates").joinpath(
f"{template}.yaml"
)
output_file = os.path.join(temp_dir, "output.json")
# Call OntoGPT
cmd = (
f"ontogpt extract -i {input_file} -t {template_file} "
f"--output-format json -o {output_file}"
)
if local_model is not None:
cmd += f" -m ollama_chat/{local_model}"
if temperature is not None:
cmd += f" --temperature {temperature}"
try:
# Clear the cache so that the model can derive new annotations
cache_path = os.getcwd() + "/.litellm_cache"
os.system(f"rm -rf {cache_path}")
os.system(cmd)
except Exception as e: # pylint: disable=broad-exception-caught
logger.error(f"Error calling OntoGPT: {e}")
return None
# Parse the results
try: # Occasionally, no file is returned. This is a bug in OntoGPT.
with open(output_file, "r", encoding="utf-8") as f:
r = load(f)
except FileNotFoundError as e:
logger.error(f"Error reading OntoGPT output file: {e}")
return None
named_entities = r.get("named_entities")
if named_entities is None: # OntoGPT couldn't find any annotations
return None
annotations = []
for item in named_entities:
uri = item.get("id")
label = item.get("label")
ungrounded = uri.startswith("AUTO:")
if ungrounded and not return_ungrounded:
continue
uri = expand_curie(uri)
annotations.append({"label": label, "uri": uri})
return annotations
# pylint: disable=too-many-positional-arguments
[docs]def add_predicate_annotations_to_workbook(
predicate: str,
workbook: Union[str, pd.core.frame.DataFrame],
eml: Union[str, etree._ElementTree],
output_path: str = None,
overwrite: bool = False,
local_model: str = None,
temperature: Union[float, None] = None,
return_ungrounded: bool = False,
sample_size: int = 1,
) -> pd.core.frame.DataFrame:
"""
:param predicate: The predicate label for the annotation. This guides the
annotation process with which OntoGPT template to use. The options are:
`contains measurements of type`, `contains process`, `env_broad_scale`,
`env_local_scale`, `environmental material`, `research topic`,
`usesMethod`, `uses standard`.
:param workbook: Either the path to the workbook to be annotated, or the
workbook itself as a pandas DataFrame.
:param eml: Either the path to the EML file corresponding to the workbook,
or the EML file itself as an lxml etree.
:param output_path: The path to write the annotated workbook.
:param overwrite: If True, overwrite existing annotations in the workbook,
so a fresh set may be created. Only annotations with the same predicate
as the `predicate` input will be removed.
:param local_model: See `get_ontogpt_annotation` documentation for details.
:param temperature: The temperature parameter for the model. If `None`, the
OntoGPT default will be used.
:param return_ungrounded: See `get_ontogpt_annotation` documentation for
details.
:param sample_size: Executes multiple replicates of the annotation request
to reduce variability of outputs. Variability is inherent in OntoGPT.
:returns: Workbook with predicate annotations.
:notes: This function retrieves annotations using OntoGPT, except for the
`uses standard` which uses a deterministic method. OntoGPT requires
setup and configuration described in the `get_ontogpt_annotation`
function.
"""
# Load the workbook and EML for processing
wb = load_workbook(workbook)
eml = load_eml(eml)
# Annotate for each element in the set that matches the predicate
elements = get_elements_for_predicate(eml, predicate)
for element in elements:
logger.info(f"Annotating {predicate}")
# Parameters for use below
element_tag = element.tag
element_description = get_description(element)
element_xpath = eml.getpath(element)
template = get_template_for_predicate(predicate)
predicate_id = get_predicate_id_for_predicate(predicate)
author = "spinneret.annotator.get_ontogpt_annotation"
# Remove existing annotations if instructed to do so
if overwrite:
wb = delete_annotations(
workbook=wb,
criteria={
"element": element_tag,
"element_xpath": element_xpath,
"predicate": predicate,
"author": author,
},
)
# Skip if this element already has an annotation in the workbook, to:
# prevent duplicate annotations, and to allow for resuming annotation
# of a partially annotated workbook.
if has_annotation(wb, element_xpath, predicate):
return wb
# Reuse existing annotations for elements with identical tag names,
# descriptions, and predicate labels, to reduce redundant processing.
# Note this assumes semantic equivalence between elements with matching
# tags and descriptions, which is generally true.
annotations = get_annotation_from_workbook(
workbook=wb,
element=element_tag,
description=element_description,
predicate=predicate,
)
if annotations is None:
# Get the annotations
annotations = []
for _ in range(sample_size):
res = get_ontogpt_annotation(
text=element_description,
template=template,
local_model=local_model,
temperature=temperature,
return_ungrounded=return_ungrounded,
)
if res is not None:
annotations.extend(res)
if len(annotations) == 0:
annotations = None
# Add annotations to the workbook
if annotations is not None:
for annotation in annotations:
row = initialize_workbook_row()
row["package_id"] = get_package_id(eml)
row["url"] = get_package_url(eml)
row["element"] = element_tag
if "id" in element.attrib:
row["element_id"] = element.attrib["id"]
else:
row["element_id"] = pd.NA
row["element_xpath"] = eml.getpath(element)
row["context"] = get_subject_and_context(element)["context"]
row["description"] = element_description
row["subject"] = get_subject_and_context(element)["subject"]
row["predicate"] = predicate
row["predicate_id"] = predicate_id
row["object"] = annotation["label"]
row["object_id"] = annotation["uri"]
row["author"] = author
row["date"] = pd.Timestamp.now()
row = pd.DataFrame([row], dtype=str)
wb = pd.concat([wb, row], ignore_index=True)
wb = delete_duplicate_annotations(wb)
if output_path:
write_workbook(wb, output_path)
return wb
[docs]def get_annotation_from_workbook(
workbook: Union[str, pd.core.frame.DataFrame],
element: str,
description: str,
predicate: str,
) -> Union[list, None]:
"""
:param workbook: Either the path to the workbook to be annotated, or the
workbook itself as a pandas DataFrame.
:param element: The element to retrieve annotations for.
:param description: The description of the element to retrieve annotations
for.
:param predicate: The predicate to retrieve annotations for.
:returns: A list of dictionaries, each with the annotation keys
`label` (same as `object` column in workbook), `uri` (same as
`object_id` column in workbook). None if no annotations are found for
the given element name.
:notes: This function returns existing annotations from the workbook if
the `element`, `description`, and `predicate` match, and the `object`
and `object_id` are not empty. This is useful when one or more data
entities have several attributes of different names but the same
meaning.
"""
wb = load_workbook(workbook)
matching_rows = (
(wb["element"] == element)
& (wb["description"] == description)
& (wb["predicate"] == predicate)
& (wb["object"].notna())
& (wb["object_id"].notna())
)
rows = wb[matching_rows].to_dict(orient="records")
res = []
if rows:
for row in rows:
row = {k: row[k] for k in ["object", "object_id"]}
# Currently, workbook annotators reference the object as "label"
# and the object_id as "uri", so we rename them here.
row["label"] = row.pop("object")
row["uri"] = row.pop("object_id")
res.append(row)
return res
return None
[docs]def has_annotation(
workbook: Union[str, pd.core.frame.DataFrame], element_xpath: str, predicate: str
) -> bool:
"""
:param workbook: Either the path to the workbook to be annotated, or the
workbook itself as a pandas DataFrame.
:param element_xpath: The XPath of the element to check for annotations.
:param predicate: The predicate to check for annotations.
:returns: True if the `workbook` contains an `element_xpath` that has an
annotation for the given `predicate`. False otherwise.
"""
wb = load_workbook(workbook)
matching_rows = (
(wb["element_xpath"] == element_xpath)
& (wb["predicate"] == predicate)
& wb["predicate_id"].notna()
& wb["object"].notna()
& wb["object_id"].notna()
)
return bool(matching_rows.any())
# FIXME: Refactor to use geoenv (https://github.com/clnsmth/geoenv)
# def get_geoenv_response_data(eml: str, data_sources: list) -> List[dict]:
# """
# Get `geoenvo` response data for each Geographic Coverage in an EML file. The
# data is the raw JSON response from the `geoenvo` resolver, which includes
# environmental properties and the data source used to resolve them. This
# raw data can be further processed to extract specific properties of
# interest.
#
# :param eml: Path to the EML metadata document in XML format.
# :param data_sources: A list of geoenvo data sources to use for resolution.
# :return: A list of JSON values returned by the geoenvo.Resolver.resolve
# method.
# """
# # Initialize the resolver
# resolver = Resolver(data_sources)
#
# # Get the list of GeographicCoverage objects
# geographic_coverages = get_geographic_coverage(eml)
# identifier = get_package_id(load_eml(eml))
#
# # Resolve the environments
# environments = []
# if geographic_coverages:
# for gc in geographic_coverages:
# geojson = gc.to_geojson_geometry()
# if geojson is None: # geographicCoverage has ID references
# continue
# geometry = Geometry(loads(geojson))
# response = resolver.resolve(
# geometry, identifier=identifier, description=gc.description()
# )
# environments.append(response.data)
# return environments