diff --git a/src/hpc-server/hpc-app-service-api/src/main/java/gov/nih/nci/hpc/service/HpcDataMigrationService.java b/src/hpc-server/hpc-app-service-api/src/main/java/gov/nih/nci/hpc/service/HpcDataMigrationService.java index 9b50ae5fa0..5c289dd699 100644 --- a/src/hpc-server/hpc-app-service-api/src/main/java/gov/nih/nci/hpc/service/HpcDataMigrationService.java +++ b/src/hpc-server/hpc-app-service-api/src/main/java/gov/nih/nci/hpc/service/HpcDataMigrationService.java @@ -16,6 +16,7 @@ import gov.nih.nci.hpc.domain.model.HpcDataMigrationTask; import gov.nih.nci.hpc.domain.model.HpcDataMigrationTaskResult; import gov.nih.nci.hpc.domain.model.HpcDataMigrationTaskStatus; +import gov.nih.nci.hpc.domain.model.HpcStagedMetadataAttribute; import gov.nih.nci.hpc.exception.HpcException; /** @@ -286,4 +287,43 @@ public HpcDataMigrationTask createMetadataMigrationTask(String fromS3ArchiveConf String toS3ArchiveConfigurationId, String fromArchiveFileContainerId, String toArchiveFileContainerId, String archiveFileIdPattern, String userId) throws HpcException; + /** + * Get a list of staged metadata attributes to be processed. + * + * @return A List of staged metadata entries. + * @throws HpcException on service failure. + */ + public List getStagedMetadataAttributes() throws HpcException; + + + /** + * Add staged metadata attribute to path. + * + * @param stagedMetadataAttribute staged metadata entry to add + * @param isCollection true if path is a collection path + * @param configurationId The configuration ID. + * @param collectionType The collection type. + * @throws HpcException on service failure. + */ + public void addStagedMetadataAttribute(HpcStagedMetadataAttribute stagedMetadataAttribute, boolean isCollection, + String configurationId, String collectionType) throws HpcException; + + /** + * Claim a staged metadata attribute for processing, so that in a multi-node + * deployment only one server processes each row. + * + * @param stagedMetadataAttribute staged metadata entry to claim + * @return true if successfully claimed, false if another node already claimed it. + * @throws HpcException on service failure. + */ + public boolean claimStagedMetadataAttribute(HpcStagedMetadataAttribute stagedMetadataAttribute) + throws HpcException; + + /** + * Clean processed staged metadata attribute. + * + * @param stagedMetadataAttribute staged metadata entry to clean up + * @throws HpcException on service failure. + */ + public void cleanupStagedMetadataAttribute(HpcStagedMetadataAttribute stagedMetadataAttribute) throws HpcException; } diff --git a/src/hpc-server/hpc-app-service-api/src/main/java/gov/nih/nci/hpc/service/HpcMetadataService.java b/src/hpc-server/hpc-app-service-api/src/main/java/gov/nih/nci/hpc/service/HpcMetadataService.java index 2b6d41cb87..804f45d0b9 100644 --- a/src/hpc-server/hpc-app-service-api/src/main/java/gov/nih/nci/hpc/service/HpcMetadataService.java +++ b/src/hpc-server/hpc-app-service-api/src/main/java/gov/nih/nci/hpc/service/HpcMetadataService.java @@ -55,9 +55,10 @@ public void addMetadataToCollection(String path, List metadata * @param metadataEntries The metadata entries to update. * @param configurationId The configuration to apply validation rules. Metadata * validation rules are configuration specific. + * @param allowSystemMetadata True if system metadata update is allowed. * @throws HpcException on service failure. */ - public void updateCollectionMetadata(String path, List metadataEntries, String configurationId) + public void updateCollectionMetadata(String path, List metadataEntries, String configurationId, boolean allowSystemMetadata) throws HpcException; /** @@ -310,10 +311,11 @@ public void updateDataObjectSystemGeneratedMetadata(String path, HpcFileLocation * @param extractedMetadata True if the data object is updated w/ extracted * metadata (this is optionally performed during data * object registration) + * @param allowSystemMetadata True if system metadata update is allowed. * @throws HpcException on service failure. */ public void updateDataObjectMetadata(String path, List metadataEntries, String configurationId, - String collectionType, boolean extractedMetadata) throws HpcException; + String collectionType, boolean extractedMetadata, boolean allowSystemMetadata) throws HpcException; /** * Get metadata of a data object. diff --git a/src/hpc-server/hpc-app-service-impl/src/main/java/gov/nih/nci/hpc/service/impl/HpcDataMigrationServiceImpl.java b/src/hpc-server/hpc-app-service-impl/src/main/java/gov/nih/nci/hpc/service/impl/HpcDataMigrationServiceImpl.java index ad9763b7a5..d65bfcea8f 100644 --- a/src/hpc-server/hpc-app-service-impl/src/main/java/gov/nih/nci/hpc/service/impl/HpcDataMigrationServiceImpl.java +++ b/src/hpc-server/hpc-app-service-impl/src/main/java/gov/nih/nci/hpc/service/impl/HpcDataMigrationServiceImpl.java @@ -8,6 +8,7 @@ */ package gov.nih.nci.hpc.service.impl; +import java.util.ArrayList; import java.util.Calendar; import java.util.Iterator; import java.util.List; @@ -24,6 +25,7 @@ import com.google.common.collect.Iterables; import gov.nih.nci.hpc.dao.HpcDataMigrationDAO; +import gov.nih.nci.hpc.domain.datamanagement.HpcAuditRequestType; import gov.nih.nci.hpc.domain.datamigration.HpcDataMigrationResult; import gov.nih.nci.hpc.domain.datamigration.HpcDataMigrationStatus; import gov.nih.nci.hpc.domain.datamigration.HpcDataMigrationType; @@ -32,15 +34,19 @@ import gov.nih.nci.hpc.domain.datatransfer.HpcDeepArchiveStatus; import gov.nih.nci.hpc.domain.datatransfer.HpcStreamingUploadSource; import gov.nih.nci.hpc.domain.error.HpcErrorType; +import gov.nih.nci.hpc.domain.metadata.HpcMetadataEntries; +import gov.nih.nci.hpc.domain.metadata.HpcMetadataEntry; import gov.nih.nci.hpc.domain.model.HpcDataMigrationTask; import gov.nih.nci.hpc.domain.model.HpcDataMigrationTaskResult; import gov.nih.nci.hpc.domain.model.HpcDataMigrationTaskStatus; import gov.nih.nci.hpc.domain.model.HpcDataObjectUploadRequest; import gov.nih.nci.hpc.domain.model.HpcDataTransferConfiguration; +import gov.nih.nci.hpc.domain.model.HpcStagedMetadataAttribute; import gov.nih.nci.hpc.domain.model.HpcSystemGeneratedMetadata; import gov.nih.nci.hpc.domain.user.HpcIntegratedSystemAccount; import gov.nih.nci.hpc.exception.HpcException; import gov.nih.nci.hpc.integration.HpcDataTransferProxy; +import gov.nih.nci.hpc.service.HpcDataManagementService; import gov.nih.nci.hpc.service.HpcDataMigrationService; import gov.nih.nci.hpc.service.HpcDataTransferService; import gov.nih.nci.hpc.service.HpcMetadataService; @@ -85,6 +91,10 @@ public class HpcDataMigrationServiceImpl implements HpcDataMigrationService { @Autowired private HpcSecurityService securityService = null; + // The Data Management Application Service Instance. + @Autowired + private HpcDataManagementService dataManagementService = null; + // A configured ID representing the server performing a migration task. @Value("${hpc.service.serverId}") private String serverId = null; @@ -441,7 +451,9 @@ public void resetMigrationTasksInProcess() throws HpcException { int taskCount = dataMigrationDAO.cleanupDataMigrationTasks(); logger.info("Cleaned up {} data migration tasks", taskCount); - + + // Reset any in-process metadata attribute migration + dataMigrationDAO.resetStagedMetadataAttribute(); } @Override @@ -632,4 +644,87 @@ public HpcDataMigrationTask createMetadataMigrationTask(String fromS3ArchiveConf dataMigrationDAO.upsertDataMigrationTask(migrationTask); return migrationTask; } + + @Override + public List getStagedMetadataAttributes() + throws HpcException { + + List stagedMetadataEntries = dataMigrationDAO.getStagedMetadataAttributes(); + logger.info("{} staged metadata attributes retrieved.", stagedMetadataEntries.size()); + + return stagedMetadataEntries; + } + + @Override + public void addStagedMetadataAttribute(HpcStagedMetadataAttribute stagedMetadataAttribute, boolean isCollection, + String configurationId, String collectionType) throws HpcException { + + // Construct metadata entry to add. + List metadataEntries = new ArrayList<>(); + HpcMetadataEntry entry = new HpcMetadataEntry(); + entry.setAttribute(stagedMetadataAttribute.getAttribute()); + entry.setValue(stagedMetadataAttribute.getValue()); + metadataEntries.add(entry); + + // Call addMetadata method to add the staged metadata entry to the relevant data object or collection + // with allowSystemMetadata set to true to allow adding metadata with reserved attributes + if (isCollection) { + // Get the metadata for this collection. + HpcMetadataEntries metadataBefore = metadataService.getCollectionMetadataEntries(stagedMetadataAttribute.getPath()); + + // Update the metadata. + boolean updated = true; + String message = null; + try { + metadataService.updateCollectionMetadata(stagedMetadataAttribute.getPath(), metadataEntries, + configurationId, true); + } catch (HpcException e) { + // Collection metadata update failed. Capture this in the audit record. + updated = false; + message = e.getMessage(); + } finally { + // Add an audit record of this update collection attempt. + dataManagementService.addAuditRecord(stagedMetadataAttribute.getPath(), HpcAuditRequestType.UPDATE_COLLECTION, metadataBefore, + metadataService.getCollectionMetadataEntries(stagedMetadataAttribute.getPath()), null, updated, null, message, "add-staged-metadata-task", null, + null); + } + + } else { + // Get the metadata for this data object. + HpcMetadataEntries metadataBefore = metadataService.getDataObjectMetadataEntries(stagedMetadataAttribute.getPath(), true); + + // Update the metadata. + boolean updated = true; + String message = null; + try { + metadataService.updateDataObjectMetadata(stagedMetadataAttribute.getPath(), metadataEntries, + configurationId, collectionType, false, true); + } catch (HpcException e) { + // Data object metadata update failed. Capture this in the audit record. + updated = false; + message = e.getMessage(); + } finally { + // Add an audit record of this update collection attempt. + dataManagementService.addAuditRecord(stagedMetadataAttribute.getPath(), HpcAuditRequestType.UPDATE_DATA_OBJECT, metadataBefore, + metadataService.getDataObjectMetadataEntries(stagedMetadataAttribute.getPath(), true), null, updated, null, message, "add-staged-metadata-task", null, + null); + } + } + + } + + @Override + public boolean claimStagedMetadataAttribute(HpcStagedMetadataAttribute stagedMetadataAttribute) + throws HpcException { + + return dataMigrationDAO.claimStagedMetadataAttribute(stagedMetadataAttribute); + } + + @Override + public void cleanupStagedMetadataAttribute(HpcStagedMetadataAttribute stagedMetadataAttribute) throws HpcException { + + int count = dataMigrationDAO.cleanupStagedMetadataAttribute(stagedMetadataAttribute); + logger.info("{} staged metadata attribute cleaned up.", count); + + } } diff --git a/src/hpc-server/hpc-app-service-impl/src/main/java/gov/nih/nci/hpc/service/impl/HpcMetadataServiceImpl.java b/src/hpc-server/hpc-app-service-impl/src/main/java/gov/nih/nci/hpc/service/impl/HpcMetadataServiceImpl.java index 68eb535bd6..22a5981dd3 100644 --- a/src/hpc-server/hpc-app-service-impl/src/main/java/gov/nih/nci/hpc/service/impl/HpcMetadataServiceImpl.java +++ b/src/hpc-server/hpc-app-service-impl/src/main/java/gov/nih/nci/hpc/service/impl/HpcMetadataServiceImpl.java @@ -205,7 +205,7 @@ public void addMetadataToCollection(String path, List metadata } // Validate Metadata. - metadataValidator.validateCollectionMetadata(configurationId, null, metadataEntries); + metadataValidator.validateCollectionMetadata(configurationId, null, metadataEntries, false); // Add Metadata to the DM system. dataManagementProxy.addMetadataToCollection(dataManagementAuthenticator.getAuthenticatedToken(), path, @@ -213,7 +213,7 @@ public void addMetadataToCollection(String path, List metadata } @Override - public void updateCollectionMetadata(String path, List metadataEntries, String configurationId) + public void updateCollectionMetadata(String path, List metadataEntries, String configurationId, boolean allowSystemMetadata) throws HpcException { // Input validation. if (path == null) { @@ -234,7 +234,7 @@ public void updateCollectionMetadata(String path, List metadat validateCollectionTypeUpdate(existingMetadataEntries, metadataEntries); // Validate the metadata. - metadataValidator.validateCollectionMetadata(configurationId, existingMetadataEntries, metadataEntries); + metadataValidator.validateCollectionMetadata(configurationId, existingMetadataEntries, metadataEntries, allowSystemMetadata); // Update the 'metadata updated' system-metadata to record the time of this // metadata update. @@ -541,7 +541,7 @@ public HpcMetadataEntry addMetadataToDataObject(String path, List String configurationId, String collectionType) throws HpcException { // Update the data object's metadata. updateDataObjectMetadata(path, new ArrayList(extractedMetadataEntries), configurationId, - collectionType, true); + collectionType, true, false); // Set the extracted-metadata-attributes system generated metadata to have all // the attributes @@ -884,7 +884,7 @@ public void updateDataObjectSystemGeneratedMetadata(String path, HpcFileLocation @Override public void updateDataObjectMetadata(String path, List metadataEntries, String configurationId, - String collectionType, boolean extractedMetadata) throws HpcException { + String collectionType, boolean extractedMetadata, boolean allowSystemMetadata) throws HpcException { // Input validation. if (path == null) { throw new HpcException(INVALID_PATH_MSG, HpcErrorType.INVALID_REQUEST_INPUT); @@ -901,7 +901,7 @@ public void updateDataObjectMetadata(String path, List metadat // Validate the metadata. metadataValidator.validateDataObjectMetadata(configurationId, metadataRetriever.getDataObjectMetadata(path), - metadataEntries, collectionType); + metadataEntries, collectionType, allowSystemMetadata); // Update the 'metadata updated' system-metadata to record the time of this // metadata update. This is skipped for updated the data object w/ extracted diff --git a/src/hpc-server/hpc-app-service-impl/src/main/java/gov/nih/nci/hpc/service/impl/HpcMetadataValidator.java b/src/hpc-server/hpc-app-service-impl/src/main/java/gov/nih/nci/hpc/service/impl/HpcMetadataValidator.java index d58cf0e2be..332a687a8b 100644 --- a/src/hpc-server/hpc-app-service-impl/src/main/java/gov/nih/nci/hpc/service/impl/HpcMetadataValidator.java +++ b/src/hpc-server/hpc-app-service-impl/src/main/java/gov/nih/nci/hpc/service/impl/HpcMetadataValidator.java @@ -156,7 +156,7 @@ private HpcMetadataValidator() { * @throws HpcException If the metadata is invalid. */ public void validateCollectionMetadata(String configurationId, List existingMetadataEntries, - List addUpdateMetadataEntries) throws HpcException { + List addUpdateMetadataEntries, boolean allowSystemMetadata) throws HpcException { HpcDataManagementConfiguration dataManagementConfiguration = dataManagementConfigurationLocator .get(configurationId); if (dataManagementConfiguration == null) { @@ -165,7 +165,7 @@ public void validateCollectionMetadata(String configurationId, List existingMetadataEntries, - List addUpdateMetadataEntries, String collectionType) throws HpcException { + List addUpdateMetadataEntries, String collectionType, boolean allowSystemMetadata) throws HpcException { HpcDataManagementConfiguration dataManagementConfiguration = dataManagementConfigurationLocator .get(configurationId); if (dataManagementConfiguration == null) { @@ -194,7 +194,8 @@ public void validateDataObjectMetadata(String configurationId, List getDataObjectSystemGeneratedMetadataAttributeNames() { */ private void validateMetadata(List existingMetadataEntries, List addUpdateMetadataEntries, List metadataValidationRules, - String collectionType, Boolean restrictMetadata) throws HpcException { + String collectionType, Boolean restrictMetadata, boolean allowSystemMetadata) throws HpcException { // Crate a metadata map. Put existing entries first. Map metadataEntriesMap = new HashMap<>(); if (existingMetadataEntries != null) { @@ -298,7 +299,8 @@ private void validateMetadata(List existingMetadataEntries, // If the restrict_metadata flag is set in the database, then ensure // that the add/update metadata entry defined in the validation rules // i.e. it is a mandatory or optional metadata for the applicable DOC - if (restrictMetadata) { + if (restrictMetadata && !(allowSystemMetadata + && systemGeneratedMetadataAttributes.contains(metadataEntry.getAttribute()))) { // Do this check only for new files or collections so that fixing // of existing ones is not necessitated if (CollectionUtils.isNullOrEmpty(existingMetadataEntries)) { @@ -318,12 +320,15 @@ private void validateMetadata(List existingMetadataEntries, } } - // Validate the add/update metadata entries don't include reserved system - // generated metadata. - for (String metadataAttribue : systemGeneratedMetadataAttributes) { - if (addUpdateMetadataEntriesMap.containsKey(metadataAttribue)) { - throw new HpcException("System generated metadata cannot be set/changed: " + metadataAttribue, - HpcErrorType.INVALID_REQUEST_INPUT); + // If the allowSystemMetadata flag is not set, validate that add/update metadata entries don't include reserved system generated metadata. + if (!allowSystemMetadata) { + // Validate the add/update metadata entries don't include reserved system + // generated metadata. + for (String metadataAttribue : systemGeneratedMetadataAttributes) { + if (addUpdateMetadataEntriesMap.containsKey(metadataAttribue)) { + throw new HpcException("System generated metadata cannot be set/changed: " + metadataAttribue, + HpcErrorType.INVALID_REQUEST_INPUT); + } } } diff --git a/src/hpc-server/hpc-bus-service-api/src/main/java/gov/nih/nci/hpc/bus/HpcDataMigrationBusService.java b/src/hpc-server/hpc-bus-service-api/src/main/java/gov/nih/nci/hpc/bus/HpcDataMigrationBusService.java index d88282031c..8931855e0b 100644 --- a/src/hpc-server/hpc-bus-service-api/src/main/java/gov/nih/nci/hpc/bus/HpcDataMigrationBusService.java +++ b/src/hpc-server/hpc-bus-service-api/src/main/java/gov/nih/nci/hpc/bus/HpcDataMigrationBusService.java @@ -172,4 +172,12 @@ public HpcMigrationResponseDTO migrateMetadata(HpcMetadataMigrationRequestDTO me * @throws HpcException on service failure. */ public void restartDataMigrationTasks() throws HpcException; + + /** + * Process staged metadata attributes. + * + * @throws HpcException on service failure. + */ + public void processStagedMetadataAttributes() throws HpcException; + } diff --git a/src/hpc-server/hpc-bus-service-impl/src/main/java/gov/nih/nci/hpc/bus/impl/HpcDataManagementBusServiceImpl.java b/src/hpc-server/hpc-bus-service-impl/src/main/java/gov/nih/nci/hpc/bus/impl/HpcDataManagementBusServiceImpl.java index 894f62673b..ab5e62c97b 100644 --- a/src/hpc-server/hpc-bus-service-impl/src/main/java/gov/nih/nci/hpc/bus/impl/HpcDataManagementBusServiceImpl.java +++ b/src/hpc-server/hpc-bus-service-impl/src/main/java/gov/nih/nci/hpc/bus/impl/HpcDataManagementBusServiceImpl.java @@ -3676,7 +3676,7 @@ private void updateCollection(String path, List metadataEntrie if (!metadataContained(metadataEntries, metadataBefore.getSelfMetadataEntries())) { synchronized (this) { metadataService.updateCollectionMetadata(path, metadataEntries, - systemGeneratedMetadata.getConfigurationId()); + systemGeneratedMetadata.getConfigurationId(), false); } } else { logger.info( @@ -3740,7 +3740,7 @@ private HpcDataObjectUploadResponse updateDataObject(String path, List { + try { + // Claim the row via an atomic conditional UPDATE (IN_PROCESS = 0 → 1). + // Oracle's row-level locking ensures that if two nodes execute this + // UPDATE concurrently on the same row, only one succeeds; the other + // finds IN_PROCESS already set and updates 0 rows. + if (!dataMigrationService.claimStagedMetadataAttribute(stagedMetadataAttribute)) { + logger.debug("Staged metadata {}:{} for path - {} already claimed by another node, skipping.", + stagedMetadataAttribute.getAttribute(), stagedMetadataAttribute.getValue(), + stagedMetadataAttribute.getPath()); + return; + } + + logger.info("Adding staged metadata {}:{} to path - {}", stagedMetadataAttribute.getAttribute(), + stagedMetadataAttribute.getValue(), + stagedMetadataAttribute.getPath()); + + boolean isCollection = dataManagementService.isPathCollection(stagedMetadataAttribute.getPath()); + String configurationId = dataManagementService.findDataManagementConfigurationId(stagedMetadataAttribute.getPath()); + // Get the collection type containing the data object. + String collectionPath = isCollection ? null : stagedMetadataAttribute.getPath().substring(0, stagedMetadataAttribute.getPath().lastIndexOf('/')); + String collectionType = isCollection ? null : dataManagementService.getCollectionType(collectionPath); + + // Add the metadata + dataMigrationService.addStagedMetadataAttribute(stagedMetadataAttribute, isCollection, configurationId, collectionType); + + // Delete the staged metadata after it's added to the data object/collection. + dataMigrationService.cleanupStagedMetadataAttribute(stagedMetadataAttribute); + + } catch (HpcException e) { + logger.error("Failed to add staged metadata {}:{} to path - {}. Error: {}", stagedMetadataAttribute.getAttribute(), + stagedMetadataAttribute.getValue(), stagedMetadataAttribute.getPath(), e.getMessage(), e); + } + }); + } @Override @HpcExecuteAsSystemAccount diff --git a/src/hpc-server/hpc-dao-api/src/main/java/gov/nih/nci/hpc/dao/HpcDataMigrationDAO.java b/src/hpc-server/hpc-dao-api/src/main/java/gov/nih/nci/hpc/dao/HpcDataMigrationDAO.java index 90898f106d..1b4e55e26f 100644 --- a/src/hpc-server/hpc-dao-api/src/main/java/gov/nih/nci/hpc/dao/HpcDataMigrationDAO.java +++ b/src/hpc-server/hpc-dao-api/src/main/java/gov/nih/nci/hpc/dao/HpcDataMigrationDAO.java @@ -19,6 +19,7 @@ import gov.nih.nci.hpc.domain.datamigration.HpcDataMigrationType; import gov.nih.nci.hpc.domain.model.HpcDataMigrationTask; import gov.nih.nci.hpc.domain.model.HpcDataMigrationTaskResult; +import gov.nih.nci.hpc.domain.model.HpcStagedMetadataAttribute; import gov.nih.nci.hpc.exception.HpcException; /** @@ -199,4 +200,40 @@ public Map getCollectionMigrationResultCount(St * @throws HpcException on database error. */ public int cleanupDataMigrationTasks() throws HpcException; + + /** + * Get staged metadata attributes for processing. + * + * @return A list of staged metadata attribute. + * @throws HpcException on database error. + */ + public List getStagedMetadataAttributes() throws HpcException; + + /** + * Claim a staged metadata attribute for processing. Uses an atomic update so + * that only one server node claims each row in a multi-node deployment. + * + * @param stagedMetadataAttribute The staged metadata entry to claim. + * @return true if the row was successfully claimed, false if another node + * already claimed it. + * @throws HpcException on database error. + */ + public boolean claimStagedMetadataAttribute(HpcStagedMetadataAttribute stagedMetadataAttribute) + throws HpcException; + + /** + * Cleanup staged metadata attributes processed. + * + * @param stagedMetadataAttribute The staged metadata to be cleaned up + * @return The number of records cleaned up. + * @throws HpcException on database error. + */ + public int cleanupStagedMetadataAttribute(HpcStagedMetadataAttribute stagedMetadataAttribute) throws HpcException; + + /** + * Reset in-process staged metadata attributes. + * + * @throws HpcException on database error. + */ + public void resetStagedMetadataAttribute() throws HpcException; } diff --git a/src/hpc-server/hpc-dao-impl/src/main/java/gov/nih/nci/hpc/dao/oracle/impl/HpcDataMigrationDAOImpl.java b/src/hpc-server/hpc-dao-impl/src/main/java/gov/nih/nci/hpc/dao/oracle/impl/HpcDataMigrationDAOImpl.java index aacb7458d6..de1462ddfc 100644 --- a/src/hpc-server/hpc-dao-impl/src/main/java/gov/nih/nci/hpc/dao/oracle/impl/HpcDataMigrationDAOImpl.java +++ b/src/hpc-server/hpc-dao-impl/src/main/java/gov/nih/nci/hpc/dao/oracle/impl/HpcDataMigrationDAOImpl.java @@ -33,6 +33,7 @@ import org.springframework.jdbc.core.support.SqlLobValue; import org.springframework.jdbc.support.lob.DefaultLobHandler; import org.springframework.jdbc.support.lob.LobHandler; +import org.springframework.transaction.annotation.Transactional; import gov.nih.nci.hpc.dao.HpcDataMigrationDAO; import gov.nih.nci.hpc.domain.datamigration.HpcDataMigrationResult; @@ -42,6 +43,7 @@ import gov.nih.nci.hpc.domain.error.HpcErrorType; import gov.nih.nci.hpc.domain.model.HpcDataMigrationTask; import gov.nih.nci.hpc.domain.model.HpcDataMigrationTaskResult; +import gov.nih.nci.hpc.domain.model.HpcStagedMetadataAttribute; import gov.nih.nci.hpc.domain.user.HpcIntegratedSystem; import gov.nih.nci.hpc.exception.HpcException; @@ -56,6 +58,9 @@ public class HpcDataMigrationDAOImpl implements HpcDataMigrationDAO { // ---------------------------------------------------------------------// // SQL Queries. + + // The following queries are used for upserting and retrieving data migration tasks and results. + private static final String UPSERT_DATA_MIGRATION_TASK_SQL = "merge into HPC_DATA_MIGRATION_TASK using dual on (ID = ?) " + "when matched then update set PARENT_ID = ?, USER_ID = ?, PATH = ?, CONFIGURATION_ID = ?, FROM_S3_ARCHIVE_CONFIGURATION_ID = ?, " + "TO_S3_ARCHIVE_CONFIGURATION_ID = ?, TYPE = ?, STATUS = ?, CREATED = ?, ALIGN_ARCHIVE_PATH = ?, DATA_SIZE = ?, PERCENT_COMPLETE = ?, SERVER_ID = ?, " @@ -117,6 +122,19 @@ public class HpcDataMigrationDAOImpl implements HpcDataMigrationDAO { + "(select COALESCE(sum(DATA_SIZE), 0) as total, COALESCE(sum(DATA_SIZE), 0) as transferred from HPC_DATA_MIGRATION_TASK_RESULT where PARENT_ID = ? " + "union all select COALESCE(sum(DATA_SIZE), 0) as total, COALESCE(sum(PERCENT_COMPLETE / 100 * DATA_SIZE), 0) as transferred from HPC_DATA_MIGRATION_TASK where PARENT_ID = ?)) " + "where ID = ? and STATUS = ? and TYPE != ?"; + + // The following queries are used for retrieving staged metadata attributes for processing and tracking migrated attributes. + + private static final String GET_STAGED_METADATA_ATTRIBUTES_SQL = "select * from HPC_STAGED_METADATA_ATTRIBUTES where IN_PROCESS = 0"; + + private static final String CLAIM_STAGED_METADATA_ATTRIBUTE_SQL = "update HPC_STAGED_METADATA_ATTRIBUTES set IN_PROCESS = 1 where PATH = ? and META_ATTR_NAME = ? and IN_PROCESS = 0"; + + private static final String CLEANUP_STAGED_METADATA_ATTRIBUTE_SQL = "delete from HPC_STAGED_METADATA_ATTRIBUTES where path = ? and meta_attr_name = ? "; + + private static final String INSERT_MIGRATED_METADATA_ATTRIBUTE_SQL = "insert into HPC_MIGRATED_METADATA_ATTRIBUTES ( " + + "PATH, META_ATTR_NAME, META_ATTR_VALUE, COMPLETED) values (?, ?, ?, ?)"; + + private static final String RESET_STAGED_METADATA_ATTRIBUTE_SQL = "update HPC_STAGED_METADATA_ATTRIBUTES set IN_PROCESS = 0 where IN_PROCESS = 1"; // ---------------------------------------------------------------------// // Instance members @@ -222,6 +240,16 @@ public class HpcDataMigrationDAOImpl implements HpcDataMigrationDAO { return new AbstractMap.SimpleEntry( HpcDataMigrationResult.fromValue(rs.getString("RESULT")), rs.getInt("COUNT")); }; + + // Mapper to get staged metadata entries for processing + private RowMapper stagedMetadataAttributesRowMapper = (rs, rowNum) -> { + HpcStagedMetadataAttribute metadataEntry = new HpcStagedMetadataAttribute(); + metadataEntry.setPath(rs.getString("PATH")); + metadataEntry.setAttribute(rs.getString("META_ATTR_NAME")); + metadataEntry.setValue(rs.getString("META_ATTR_VALUE")); + + return metadataEntry; + }; // The Logger instance. private final Logger logger = LoggerFactory.getLogger(this.getClass().getName()); @@ -549,4 +577,54 @@ public int cleanupDataMigrationTasks() throws HpcException { } } + + @Override + public List getStagedMetadataAttributes() throws HpcException { + try { + return jdbcTemplate.query(GET_STAGED_METADATA_ATTRIBUTES_SQL, stagedMetadataAttributesRowMapper); + + } catch (DataAccessException e) { + throw new HpcException("Failed to get staged metadata attributes: " + e.getMessage(), HpcErrorType.DATABASE_ERROR, + HpcIntegratedSystem.ORACLE, e); + } + } + + @Override + public boolean claimStagedMetadataAttribute(HpcStagedMetadataAttribute stagedMetadataAttribute) + throws HpcException { + try { + return jdbcTemplate.update(CLAIM_STAGED_METADATA_ATTRIBUTE_SQL, stagedMetadataAttribute.getPath(), + stagedMetadataAttribute.getAttribute()) > 0; + + } catch (DataAccessException e) { + throw new HpcException("Failed to claim staged metadata attribute: " + e.getMessage(), + HpcErrorType.DATABASE_ERROR, HpcIntegratedSystem.ORACLE, e); + } + } + + @Override + @Transactional + public int cleanupStagedMetadataAttribute(HpcStagedMetadataAttribute stagedMetadataAttribute) throws HpcException { + try { + jdbcTemplate.update(INSERT_MIGRATED_METADATA_ATTRIBUTE_SQL, stagedMetadataAttribute.getPath(), stagedMetadataAttribute.getAttribute(), + stagedMetadataAttribute.getValue(), Calendar.getInstance()); + + return jdbcTemplate.update(CLEANUP_STAGED_METADATA_ATTRIBUTE_SQL, stagedMetadataAttribute.getPath(), stagedMetadataAttribute.getAttribute()); + + } catch (DataAccessException e) { + throw new HpcException("Failed to cleanup staged metadata attribute: " + e.getMessage(), HpcErrorType.DATABASE_ERROR, + HpcIntegratedSystem.ORACLE, e); + } + } + + @Override + public void resetStagedMetadataAttribute() throws HpcException { + try { + jdbcTemplate.update(RESET_STAGED_METADATA_ATTRIBUTE_SQL); + + } catch (DataAccessException e) { + throw new HpcException("Failed to reset in process staged metadata attribute: " + e.getMessage(), + HpcErrorType.DATABASE_ERROR, HpcIntegratedSystem.ORACLE, e); + } + } } diff --git a/src/hpc-server/hpc-dao-impl/src/main/scripts/migration/release-3.24.0/hpc_staged_metadata_attributes.sql b/src/hpc-server/hpc-dao-impl/src/main/scripts/migration/release-3.24.0/hpc_staged_metadata_attributes.sql new file mode 100644 index 0000000000..1774cdd31b --- /dev/null +++ b/src/hpc-server/hpc-dao-impl/src/main/scripts/migration/release-3.24.0/hpc_staged_metadata_attributes.sql @@ -0,0 +1,28 @@ +-- +-- hpc_staged_metadata_attributes.sql +-- +-- Copyright Leidos Biomedical Research, Inc +-- +-- Distributed under the OSI-approved BSD 3-Clause License. +-- See http://ncip.github.com/HPC/LICENSE.txt for details. +-- +-- @author Yuri Dinh +-- @version $Id$ +-- + +create table irods.hpc_staged_metadata_attributes +( + PATH VARCHAR2(2700), + META_ATTR_NAME VARCHAR2(2700) not null, + META_ATTR_VALUE VARCHAR2(2700) not null, + IN_PROCESS NUMBER(1) default 0 not null +); + +create table irods.hpc_migrated_metadata_attributes +( + PATH VARCHAR2(2700), + META_ATTR_NAME VARCHAR2(2700) not null, + META_ATTR_VALUE VARCHAR2(2700) not null, + COMPLETED TIMESTAMP(6) +); + diff --git a/src/hpc-server/hpc-domain-model/src/main/resources/schema/HpcDataMigration.xsd b/src/hpc-server/hpc-domain-model/src/main/resources/schema/HpcDataMigration.xsd index 83164b6674..122d91de7d 100644 --- a/src/hpc-server/hpc-domain-model/src/main/resources/schema/HpcDataMigration.xsd +++ b/src/hpc-server/hpc-domain-model/src/main/resources/schema/HpcDataMigration.xsd @@ -112,5 +112,13 @@ + + + + + + + + diff --git a/src/hpc-server/hpc-scheduler-migration/src/main/java/gov/nih/nci/hpc/scheduler/migration/impl/HpcScheduledTasksImpl.java b/src/hpc-server/hpc-scheduler-migration/src/main/java/gov/nih/nci/hpc/scheduler/migration/impl/HpcScheduledTasksImpl.java index 5ebe7aa5ef..86d4c84901 100644 --- a/src/hpc-server/hpc-scheduler-migration/src/main/java/gov/nih/nci/hpc/scheduler/migration/impl/HpcScheduledTasksImpl.java +++ b/src/hpc-server/hpc-scheduler-migration/src/main/java/gov/nih/nci/hpc/scheduler/migration/impl/HpcScheduledTasksImpl.java @@ -101,6 +101,13 @@ private void processDataObjectMetadataUpdatetMigrationReceivedTask() { execute("processDataObjectMetadataUpdateMigrationReceived()", dataMigrationBusService::processDataObjectMetadataUpdateMigrationReceived, logger); } + + /** Process staged metadata attributes. */ + @Scheduled(cron = "${hpc.scheduler.migration.cron.processStagedMetadataAttributes.delay}") + private void processStagedMetadataAttributes() { + execute("processStagedMetadataAttributes()", + dataMigrationBusService::processStagedMetadataAttributes, logger); + } /** * Called by Spring dependency injection. Reset all active S3 upload/download in diff --git a/src/hpc-server/hpc-scheduler-migration/src/main/resources/WEB-INF/spring/hpc-scheduler-migration.properties b/src/hpc-server/hpc-scheduler-migration/src/main/resources/WEB-INF/spring/hpc-scheduler-migration.properties index 2edc3db681..7dd19268d4 100644 --- a/src/hpc-server/hpc-scheduler-migration/src/main/resources/WEB-INF/spring/hpc-scheduler-migration.properties +++ b/src/hpc-server/hpc-scheduler-migration/src/main/resources/WEB-INF/spring/hpc-scheduler-migration.properties @@ -326,4 +326,5 @@ hpc.scheduler.migration.cron.processDataObjectListMigrationReceived.delay=25 0/1 hpc.scheduler.migration.cron.processCollectionListMigrationReceived.delay=35 0/1 * * * ? hpc.scheduler.migration.cron.processBulkMetadataUpdatetMigrationReceived.delay=32 0/1 * * * ? hpc.scheduler.migration.cron.processDataObjectMetadataUpdatetMigrationReceived.delay=38 0/1 * * * ? +hpc.scheduler.migration.cron.processStagedMetadataAttributes.delay=0 0 4 * * *