diff --git a/api/share/utils.py b/api/share/utils.py
index 1178adfa85d..5083220ce7f 100644
--- a/api/share/utils.py
+++ b/api/share/utils.py
@@ -2,14 +2,9 @@
 SHARE/Trove accepts metadata records as "indexcards" in turtle format:
 https://www.w3.org/TR/turtle/
 """
-from functools import partial
 from http import HTTPStatus
 import logging
-import random
-from urllib.parse import urljoin
-import uuid
 
-from celery.exceptions import Retry
 from django.apps import apps
 import requests
 
@@ -17,7 +12,6 @@
 from framework.celery_tasks.handlers import enqueue_task
 from framework.encryption import ensure_bytes
 from framework.sentry import log_exception
-from osf import models as osf_db
 from osf.metadata.osf_gathering import (
     OsfmapPartition,
     pls_get_magic_metadata_basket,
@@ -33,10 +27,6 @@ def shtrove_ingest_url():
     return f'{settings.SHARE_URL}trove/ingest'
 
 
-def sharev2_push_url():
-    return f'{settings.SHARE_URL}api/v2/normalizeddata/'
-
-
 def is_qa_resource(resource):
     """
     QA puts tags and special titles on their project to stop them from appearing in the search results. This function
@@ -70,8 +60,6 @@ def _enqueue_update_share(osfresource):
         logger.warning(f'update_share skipping resource that has no guids: {osfresource}')
         return
     enqueue_task(task__update_share.s(_osfguid_value))
-    if isinstance(osfresource, (osf_db.AbstractNode, osf_db.Preprint)):
-        enqueue_task(async_update_resource_share.s(_osfguid_value))
 
 
 @celery_app.task(
@@ -212,398 +200,3 @@ def _next_osfmap_partition(partition: OsfmapPartition) -> OsfmapPartition | None
             return OsfmapPartition.MONTHLY_SUPPLEMENT
         case _:
             return None
-
-
-###
-# BEGIN soon-to-be-deleted (🤞) legacy sharev2 push
-# (until dust has settled on iri-centric (rdf-based) search)
-"""Utilities for pushing metadata to SHARE
-
-SHARE uses a specific dialect of JSON-LD that could/should have been
-an internal implementation detail, but for historical reasons OSF must
-be aware of it in order to push metadata updates to SHARE -- hopefully,
-that awareness is contained entirely within this file.
-
-WARNING: In this context, "graph node" does NOT have anything to do with
-OSF's `Node` model, but instead refers to a "node object" within a JSON-LD
-graph, as defined at https://www.w3.org/TR/json-ld11/#dfn-node-object
-
-Each graph node must contain '@id' and '@type', plus other key/value pairs
-according to the "SHARE schema":
-https://github.com/CenterForOpenScience/SHARE/blob/develop/share/schema/schema-spec.yaml
-
-In this case, '@id' will always be a "blank" identifier, which begins with '_:'
-and is used only to define relationships between nodes in the graph -- nodes
-may reference each other with @id/@type pairs --
-e.g. {'@id': '...', '@type': '...'}
-
-Example serialization: The following SHARE-style JSON-LD document represents a
-preprint with one "creator" and one identifier -- the graph contains nodes for
-the preprint, person, and identifier, plus another node representing the
-"creator" relationship between the preprint and person:
-```
-{
-    'central_node_id': '_:foo',
-    '@graph': [
-        {
-            '@id': '_:foo',
-            '@type': 'preprint',
-            'title': 'This is a preprint!',
-        },
-        {
-            '@id': '_:bar',
-            '@type': 'workidentifier',
-            'uri': 'https://osf.io/foobar/',
-            'creative_work': {'@id': '_:foo', '@type': 'preprint'}
-        },
-        {
-            '@id': '_:baz',
-            '@type': 'person',
-            'name': 'Magpie Jones'
-        },
-        {
-            '@id': '_:qux',
-            '@type': 'creator',
-            'creative_work': {'@id': '_:foo', '@type': 'preprint'},
-            'agent': {'@id': '_:baz', '@type': 'person'}
-        }
-    ]
-}
-```
-"""
-
-
-class GraphNode:
-    """Utility class for building a JSON-LD graph suitable for pushing to SHARE
-
-    WARNING: In this context, "graph node" does NOT have anything to do with
-    OSF's `Node` model, but instead refers to a "node object" within a JSON-LD
-    graph, as defined at https://www.w3.org/TR/json-ld11/#dfn-node-object
-    """
-
-    @staticmethod
-    def serialize_graph(central_graph_node, all_graph_nodes):
-        """Serialize the mess of GraphNodes to a JSON-friendly dictionary
-        :param central_graph_node: the GraphNode for the preprint/node/registration
-            this graph is most "about"
-        :param all_graph_nodes: list of GraphNodes to include -- will also recursively
-            look for and include GraphNodes contained in attrs
-        """
-        to_visit = [central_graph_node, *all_graph_nodes]  # make a copy of the list
-        visited = set()
-        while to_visit:
-            n = to_visit.pop(0)
-            if n not in visited:
-                visited.add(n)
-                to_visit.extend(n.get_related())
-
-        return {
-            'central_node_id': central_graph_node.id,
-            '@graph': [node.serialize() for node in visited],
-        }
-
-    @property
-    def ref(self):
-        return {'@id': self.id, '@type': self.type}
-
-    def __init__(self, type_, **attrs):
-        self.id = f'_:{uuid.uuid4()}'
-        self.type = type_.lower()
-        self.attrs = attrs
-
-    def get_related(self):
-        for value in self.attrs.values():
-            if isinstance(value, GraphNode):
-                yield value
-            elif isinstance(value, list):
-                yield from value
-
-    def serialize(self):
-        ser = {}
-        for key, value in self.attrs.items():
-            if isinstance(value, GraphNode):
-                ser[key] = value.ref
-            elif isinstance(value, list) or value in (None, '', {}):
-                continue
-            else:
-                ser[key] = value
-
-        return dict(self.ref, **ser)
-
-
-def format_user(user):
-    person = GraphNode(
-        'person', **{
-            'name': user.fullname,
-            'suffix': user.suffix,
-            'given_name': user.given_name,
-            'family_name': user.family_name,
-            'additional_name': user.middle_names,
-        },
-    )
-
-    person.attrs['identifiers'] = [GraphNode('agentidentifier', agent=person, uri=user.absolute_url)]
-
-    if user.external_identity.get('ORCID') and list(user.external_identity['ORCID'].values())[0] == 'VERIFIED':
-        person.attrs['identifiers'].append(GraphNode('agentidentifier', agent=person, uri=list(user.external_identity['ORCID'].keys())[0]))
-
-    person.attrs['related_agents'] = [GraphNode('isaffiliatedwith', subject=person, related=GraphNode('institution', name=institution.name)) for institution in user.get_affiliated_institutions()]
-
-    return person
-
-
-def format_bibliographic_contributor(work_node, user, index):
-    return GraphNode(
-        'creator',
-        agent=format_user(user),
-        order_cited=index,
-        creative_work=work_node,
-        cited_as=user.fullname,
-    )
-
-
-def format_subject(subject, context=None):
-    if context is None:
-        context = {}
-    if subject is None:
-        return None
-    if subject.id in context:
-        return context[subject.id]
-    context[subject.id] = GraphNode(
-        'subject',
-        name=subject.text,
-        uri=subject.absolute_api_v2_url,
-    )
-    context[subject.id].attrs['parent'] = format_subject(subject.parent, context)
-    context[subject.id].attrs['central_synonym'] = format_subject(subject.bepress_subject, context)
-    return context[subject.id]
-
-
-def send_share_json(resource, data):
-    """POST metadata to SHARE, using the provider for the given resource.
-    """
-    if getattr(resource, 'provider') and resource.provider.access_token:
-        access_token = resource.provider.access_token
-    else:
-        access_token = settings.SHARE_API_TOKEN
-
-    return requests.post(
-        sharev2_push_url(),
-        json=data,
-        headers={
-            'Authorization': f'Bearer {access_token}',
-            'Content-Type': 'application/vnd.api+json',
-        },
-    )
-
-
-def serialize_share_data(resource, old_subjects=None):
-    """Build a request payload to send Node/Preprint/Registration metadata to SHARE.
-    :param resource: either a Node, Preprint or Registration
-    :param old_subjects:
-
-    :return: JSON-serializable dictionary of the resource's metadata, good for POSTing to SHARE
-    """
-    from osf.models import (
-        Node,
-        DraftNode,
-        Preprint,
-        Registration,
-    )
-    suid = None
-    if isinstance(resource, Preprint):
-        # old_subjects is only used for preprints and should be removed as soon as SHARE
-        # is fully switched over to the non-mergy pipeline (see ENG-2098)
-        serializer = partial(serialize_preprint, old_subjects=old_subjects)
-        suid = resource.get_guid()._id
-    elif isinstance(resource, Node):
-        serializer = serialize_osf_node
-    elif isinstance(resource, Registration):
-        serializer = serialize_registration
-    elif isinstance(resource, DraftNode):
-        return {}
-    else:
-        raise NotImplementedError()
-
-    return {
-        'data': {
-            'type': 'NormalizedData',
-            'attributes': {
-                'tasks': [],
-                'raw': None,
-                'suid': resource._id if not suid else suid,
-                'data': serializer(resource),
-            },
-        },
-    }
-
-
-def serialize_preprint(preprint, old_subjects=None):
-    if old_subjects is None:
-        old_subjects = []
-    from osf.models import Subject
-    old_subjects = [Subject.objects.get(id=s) for s in old_subjects]
-    preprint_graph = GraphNode(
-        preprint.provider.share_publish_type, **{
-            'title': preprint.title,
-            'description': preprint.description or '',
-            'is_deleted': (
-                (not preprint.verified_publishable and not preprint.is_retracted)
-                or preprint.is_spam
-                or preprint.is_deleted
-                or is_qa_resource(preprint)
-            ),
-            'date_updated': preprint.modified.isoformat(),
-            'date_published': preprint.date_published.isoformat() if preprint.date_published else None,
-        },
-    )
-    to_visit = [
-        preprint_graph,
-        GraphNode('workidentifier', creative_work=preprint_graph, uri=urljoin(settings.DOMAIN, preprint._id + '/')),
-    ]
-
-    doi = preprint.get_identifier_value('doi')
-    if doi:
-        to_visit.append(GraphNode('workidentifier', creative_work=preprint_graph, uri=f'{settings.DOI_URL_PREFIX}{doi}'))
-
-    if preprint.provider.domain_redirect_enabled:
-        to_visit.append(GraphNode('workidentifier', creative_work=preprint_graph, uri=preprint.absolute_url))
-
-    if preprint.article_doi:
-        # Article DOI refers to a clone of this preprint on another system and therefore does not qualify as an identifier for this preprint
-        related_work = GraphNode('creativework')
-        to_visit.append(GraphNode('workrelation', subject=preprint_graph, related=related_work))
-        to_visit.append(GraphNode('workidentifier', creative_work=related_work, uri=f'{settings.DOI_URL_PREFIX}{preprint.article_doi}'))
-
-    preprint_graph.attrs['tags'] = [
-        GraphNode('throughtags', creative_work=preprint_graph, tag=GraphNode('tag', name=tag))
-        for tag in preprint.tags.values_list('name', flat=True) if tag
-    ]
-
-    current_subjects = [
-        GraphNode('throughsubjects', creative_work=preprint_graph, is_deleted=False, subject=format_subject(s))
-        for s in preprint.subjects.all()
-    ]
-    deleted_subjects = [
-        GraphNode('throughsubjects', creative_work=preprint_graph, is_deleted=True, subject=format_subject(s))
-        for s in old_subjects if not preprint.subjects.filter(id=s.id).exists()
-    ]
-    preprint_graph.attrs['subjects'] = current_subjects + deleted_subjects
-
-    to_visit.extend(format_bibliographic_contributor(preprint_graph, user, i) for i, user in enumerate(preprint.visible_contributors))
-
-    return GraphNode.serialize_graph(preprint_graph, to_visit)
-
-def format_node_lineage(child_osf_node, child_graph_node):
-    parent_osf_node = child_osf_node.parent_node
-    if not parent_osf_node:
-        return []
-    parent_graph_node = GraphNode('registration', title=parent_osf_node.title)
-    return [
-        parent_graph_node,
-        GraphNode('workidentifier', creative_work=parent_graph_node, uri=urljoin(settings.DOMAIN, parent_osf_node.url)),
-        GraphNode('ispartof', subject=child_graph_node, related=parent_graph_node),
-        *format_node_lineage(parent_osf_node, parent_graph_node),
-    ]
-
-def serialize_registration(registration):
-    return serialize_osf_node(
-        registration,
-        additional_attrs={
-            'date_published': registration.registered_date.isoformat() if registration.registered_date else None,
-            'registration_type': registration.registered_schema.first().name if registration.registered_schema.exists() else None,
-            'justification': registration.retraction.justification if registration.retraction else None,
-            'withdrawn': registration.is_retracted,
-            'extra': {'osf_related_resource_types': _get_osf_related_resource_types(registration)},
-        },
-    )
-
-def _get_osf_related_resource_types(registration):
-    from osf.models import OutcomeArtifact
-    from osf.utils.outcomes import ArtifactTypes
-    artifacts = OutcomeArtifact.objects.for_registration(registration).filter(finalized=True, deleted__isnull=True)
-    return {
-        artifact_type.name.lower(): artifacts.filter(artifact_type=artifact_type).exists()
-        for artifact_type in ArtifactTypes.public_types()
-    }
-
-def serialize_osf_node(osf_node, additional_attrs=None):
-    if osf_node.provider:
-        share_publish_type = osf_node.provider.share_publish_type
-    else:
-        share_publish_type = 'project'
-
-    graph_node = GraphNode(
-        share_publish_type, **{
-            'title': osf_node.title,
-            'description': osf_node.description or '',
-            'is_deleted': (
-                not osf_node.is_public
-                or osf_node.is_deleted
-                or osf_node.is_spam
-                or is_qa_resource(osf_node)
-            ),
-            **(additional_attrs or {}),
-        },
-    )
-
-    to_visit = [
-        graph_node,
-        GraphNode('workidentifier', creative_work=graph_node, uri=urljoin(settings.DOMAIN, osf_node.url)),
-    ]
-
-    doi = osf_node.get_identifier_value('doi')
-    if doi:
-        to_visit.append(GraphNode('workidentifier', creative_work=graph_node, uri=f'{settings.DOI_URL_PREFIX}{doi}'))
-
-    graph_node.attrs['tags'] = [
-        GraphNode('throughtags', creative_work=graph_node, tag=GraphNode('tag', name=tag._id))
-        for tag in osf_node.tags.all()
-    ]
-
-    graph_node.attrs['subjects'] = [
-        GraphNode('throughsubjects', creative_work=graph_node, subject=format_subject(s))
-        for s in osf_node.subjects.all()
-    ]
-
-    to_visit.extend(format_bibliographic_contributor(graph_node, user, i) for i, user in enumerate(osf_node.visible_contributors))
-    to_visit.extend(GraphNode('AgentWorkRelation', creative_work=graph_node, agent=GraphNode('institution', name=institution.name)) for institution in osf_node.affiliated_institutions.all())
-
-    to_visit.extend(format_node_lineage(osf_node, graph_node))
-
-    return GraphNode.serialize_graph(graph_node, to_visit)
-
-
-@celery_app.task(bind=True, max_retries=4, acks_late=True)
-def async_update_resource_share(self, guid, old_subjects=None):
-    """
-    This function updates share takes Preprints, Projects and Registrations.
-    :param self:
-    :param guid:
-    :return:
-    """
-    AbstractNode = apps.get_model('osf.AbstractNode')
-    resource = AbstractNode.load(guid)
-    if not resource:
-        Preprint = apps.get_model('osf.Preprint')
-        resource = Preprint.load(guid)
-
-    data = serialize_share_data(resource, old_subjects)
-    resp = send_share_json(resource, data)
-    try:
-        resp.raise_for_status()
-    except Exception as e:
-        if self.request.retries == self.max_retries:
-            log_exception(e)
-        elif resp.status_code >= 500:
-            try:
-                self.retry(
-                    exc=e,
-                    countdown=(random.random() + 1) * min(60 + settings.CELERY_RETRY_BACKOFF_BASE ** self.request.retries, 60 * 10),
-                )
-            except Retry as e:  # Retry is only raise after > 5 retries
-                log_exception(e)
-        else:
-            log_exception(e)
-
-    return resp
diff --git a/api_tests/share/_utils.py b/api_tests/share/_utils.py
index 2d974b75ccf..4fde322fccc 100644
--- a/api_tests/share/_utils.py
+++ b/api_tests/share/_utils.py
@@ -11,7 +11,7 @@
     postcommit_queue,
 )
 from website import settings as website_settings
-from api.share.utils import shtrove_ingest_url, sharev2_push_url
+from api.share.utils import shtrove_ingest_url
 from osf.metadata.osf_gathering import OsfmapPartition
 
 
@@ -28,8 +28,6 @@ def mock_share_responses():
         _ingest_url = shtrove_ingest_url()
         _rsps.add(responses.POST, _ingest_url, status=200)
         _rsps.add(responses.DELETE, _ingest_url, status=200)
-        # for legacy sharev2 support:
-        _rsps.add(responses.POST, sharev2_push_url(), status=200)
         yield _rsps
 
 
@@ -44,7 +42,6 @@ def mock_update_share():
 def expect_ingest_request(mock_share_responses, osfguid, *, token=None, delete=False, count=1, error_response=False):
     mock_share_responses._calls.reset()
     yield
-    _legacy_count_per_item = 1
     _trove_main_count_per_item = 1
     _trove_supplementary_count_per_item = (
         0
@@ -52,8 +49,7 @@ def expect_ingest_request(mock_share_responses, osfguid, *, token=None, delete=F
         else (len(OsfmapPartition) - 1)
     )
     _total_count = count * (
-        _legacy_count_per_item
-        + _trove_main_count_per_item
+        _trove_main_count_per_item
         + _trove_supplementary_count_per_item
    )
     assert len(mock_share_responses.calls) == _total_count, (
@@ -61,24 +57,18 @@
     )
     _trove_ingest_calls = []
     _trove_supp_ingest_calls = []
-    _legacy_push_calls = []
     for _call in mock_share_responses.calls:
         if _call.request.url.startswith(shtrove_ingest_url()):
             if 'is_supplementary' in _call.request.url:
                 _trove_supp_ingest_calls.append(_call)
             else:
                 _trove_ingest_calls.append(_call)
-        else:
-            _legacy_push_calls.append(_call)
     assert len(_trove_ingest_calls) == count
     assert len(_trove_supp_ingest_calls) == count * _trove_supplementary_count_per_item
-    assert len(_legacy_push_calls) == count
     for _call in _trove_ingest_calls:
         assert_ingest_request(_call.request, osfguid, token=token, delete=delete)
     for _call in _trove_supp_ingest_calls:
         assert_ingest_request(_call.request, osfguid, token=token, delete=delete, supp=True)
-    for _call in _legacy_push_calls:
-        assert _call.request.url.startswith(sharev2_push_url())
 
 
 def assert_ingest_request(request, expected_osfguid, *, token=None, delete=False, supp=False):
diff --git a/api_tests/share/test_share_node.py b/api_tests/share/test_share_node.py
index 089611f2512..791e7d0099a 100644
--- a/api_tests/share/test_share_node.py
+++ b/api_tests/share/test_share_node.py
@@ -20,7 +20,7 @@
 from website.project.tasks import on_node_updated
 from framework.auth.core import Auth
 
-from api.share.utils import shtrove_ingest_url, sharev2_push_url
+from api.share.utils import shtrove_ingest_url
 
 from ._utils import expect_ingest_request
 
@@ -189,8 +189,6 @@ def test_call_async_update_on_500_retry(self, mock_share_responses, node, user):
         """This is meant to simulate a temporary outage, so the retry mechanism should kick in and complete it."""
         mock_share_responses.replace(responses.POST, shtrove_ingest_url(), status=500)
         mock_share_responses.add(responses.POST, shtrove_ingest_url(), status=200)
-        mock_share_responses.replace(responses.POST, sharev2_push_url(), status=500)
-        mock_share_responses.add(responses.POST, sharev2_push_url(), status=200)
         with expect_ingest_request(mock_share_responses, node._id, count=2):
             on_node_updated(node._id, user._id, False, {'is_public'})
 
@@ -198,13 +196,11 @@ def test_call_async_update_on_500_failure(self, mock_share_responses, node, user):
         """This is meant to simulate a total outage, so the retry mechanism should try X number of times and quit."""
         mock_share_responses.replace(responses.POST, shtrove_ingest_url(), status=500)
-        mock_share_responses.replace(responses.POST, sharev2_push_url(), status=500)
         with expect_ingest_request(mock_share_responses, node._id, count=5):  # tries five times
             on_node_updated(node._id, user._id, False, {'is_public'})
 
     @mark.skip('Synchronous retries not supported if celery >=5.0')
     def test_no_call_async_update_on_400_failure(self, mock_share_responses, node, user):
         mock_share_responses.replace(responses.POST, shtrove_ingest_url(), status=400)
-        mock_share_responses.replace(responses.POST, sharev2_push_url(), status=400)
         with expect_ingest_request(mock_share_responses, node._id):
             on_node_updated(node._id, user._id, False, {'is_public'})
 
diff --git a/api_tests/share/test_share_preprint.py b/api_tests/share/test_share_preprint.py
index 4ab47963bc8..cf0c8a3d92d 100644
--- a/api_tests/share/test_share_preprint.py
+++ b/api_tests/share/test_share_preprint.py
@@ -4,7 +4,7 @@
 import pytest
 import responses
 
-from api.share.utils import shtrove_ingest_url, sharev2_push_url
+from api.share.utils import shtrove_ingest_url
 from framework.auth.core import Auth
 from osf.models.spam import SpamStatus
 from osf.utils.permissions import READ, WRITE, ADMIN
@@ -124,14 +124,12 @@ def test_preprint_contributor_changes_updates_preprints_share(self, mock_share_r
     @pytest.mark.skip('Synchronous retries not supported if celery >=5.0')
     def test_call_async_update_on_500_failure(self, mock_share_responses, preprint, auth):
         mock_share_responses.replace(responses.POST, shtrove_ingest_url(), status=500)
-        mock_share_responses.replace(responses.POST, sharev2_push_url(), status=500)
         preprint.set_published(True, auth=auth, save=True)
         with expect_preprint_ingest_request(mock_share_responses, preprint, count=5):
             preprint.update_search()
 
     def test_no_call_async_update_on_400_failure(self, mock_share_responses, preprint, auth):
         mock_share_responses.replace(responses.POST, shtrove_ingest_url(), status=400)
-        mock_share_responses.replace(responses.POST, sharev2_push_url(), status=400)
         preprint.set_published(True, auth=auth, save=True)
         with expect_preprint_ingest_request(mock_share_responses, preprint, count=1, error_response=True):
             preprint.update_search()
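
For orientation: once the legacy `sharev2_push_url()` JSON-LD push is gone, the only outbound metadata path is the Trove ingest endpoint (`{settings.SHARE_URL}trove/ingest`), which the test fixtures above stub with POST (ingest) and DELETE (removal) responders. Below is a minimal sketch of that surviving flow, not code from this patch: the helper name `push_indexcard`, the Bearer-token header, the `text/turtle` content type, and the query-parameter names are illustrative assumptions, except that `is_supplementary` is known to appear in supplementary ingest URLs (the `expect_ingest_request` helper greps for that substring).

```python
import requests

SHARE_URL = 'https://share.example/'  # stand-in for settings.SHARE_URL


def shtrove_ingest_url() -> str:
    # mirrors api/share/utils.py: f'{settings.SHARE_URL}trove/ingest'
    return f'{SHARE_URL}trove/ingest'


def push_indexcard(turtle_record: str, focus_iri: str, access_token: str,
                   is_supplementary: bool = False) -> requests.Response:
    """POST one turtle-serialized indexcard to SHARE/Trove (hypothetical helper)."""
    params = {'focus_iri': focus_iri}  # illustrative parameter name
    if is_supplementary:
        # the test helper above distinguishes supplementary ingests by this
        # substring appearing in the request URL
        params['is_supplementary'] = True
    return requests.post(
        shtrove_ingest_url(),
        params=params,
        data=turtle_record.encode(),
        headers={
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'text/turtle',
        },
    )
```

Record removal mirrors ingest: the fixtures stub a DELETE responder on the same URL, so deletion is a DELETE against `shtrove_ingest_url()` rather than a second endpoint.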