diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/dto/Vaccination.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/dto/Vaccination.java index 07275335d..18aa08da3 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/dto/Vaccination.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/dto/Vaccination.java @@ -88,7 +88,7 @@ public class Vaccination { private Long organizationUid; @Column(name="PHC_UID") - private Long phcUid; + private String phcUid; @Column(name="PATIENT_UID") private Long patientUid; diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/VaccinationReporting.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/VaccinationReporting.java index 7e22adb75..1cb7fffbb 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/VaccinationReporting.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/VaccinationReporting.java @@ -36,7 +36,7 @@ public class VaccinationReporting { private String electronicInd; private Long providerUid; private Long organizationUid; - private Long phcUid; + private String phcUid; private Long patientUid; private String materialCd; } diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java index 133248931..1accba246 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java @@ -783,4 +783,57 @@ void testProcessMissingOrInvalidCaseManagement() { expected.setDatamartColumnNm("TOTAL_COUNT_50_TO_64"); return expected; } + + @Test + void testVaccinationWithMultiplePHCUIDs() throws JsonProcessingException { + Vaccination vaccination = constructVaccination(VACCINATION_UID); + vaccination.setPhcUid("123456,789012,345678"); + transformer.setVaccinationOutputTopicName(VACCINATION_TOPIC); + VaccinationReportingKey expectedKey = new VaccinationReportingKey(); + expectedKey.setVaccinationUid(VACCINATION_UID); + when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null)); + transformer.processVaccination(vaccination); + Awaitility.await() + .atMost(1, TimeUnit.SECONDS) + .untilAsserted(() -> + verify(kafkaTemplate, times(1)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()) + ); + var actualVacValue = objectMapper.readValue( + objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(), VaccinationReporting.class); + assertEquals("123456,789012,345678", actualVacValue.getPhcUid()); + } + + @Test + void testVaccinationWithSinglePHCUID() throws JsonProcessingException { + Vaccination vaccination = constructVaccination(VACCINATION_UID); + vaccination.setPhcUid("123456"); + transformer.setVaccinationOutputTopicName(VACCINATION_TOPIC); + when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null)); + transformer.processVaccination(vaccination); + Awaitility.await() + .atMost(1, TimeUnit.SECONDS) + .untilAsserted(() -> + verify(kafkaTemplate, times(1)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()) + ); + var actualVacValue = objectMapper.readValue( + objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(), VaccinationReporting.class); + assertEquals("123456", actualVacValue.getPhcUid()); + } + + @Test + void testVaccinationWithNullPHCUID() throws JsonProcessingException { + Vaccination vaccination = constructVaccination(VACCINATION_UID); + vaccination.setPhcUid(null); + transformer.setVaccinationOutputTopicName(VACCINATION_TOPIC); + when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null)); + transformer.processVaccination(vaccination); + Awaitility.await() + .atMost(1, TimeUnit.SECONDS) + .untilAsserted(() -> + verify(kafkaTemplate, times(1)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()) + ); + var actualVacValue = objectMapper.readValue( + objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(), VaccinationReporting.class); + assertNull(actualVacValue.getPhcUid()); + } } diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/utils/TestUtils.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/utils/TestUtils.java index 0d1201c67..7efe7f2e0 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/utils/TestUtils.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/utils/TestUtils.java @@ -347,6 +347,8 @@ public static Vaccination constructVaccination(Long vaccinationUid) { vaccination.setVaccinationAnatomicalSite(""); vaccination.setVaccineManufacturerNm("test"); vaccination.setMaterialCd("102"); + //NEW: TEST comma-separated PHC UIDs + vaccination.setPhcUid("12345,78902,34566"); return vaccination; } @@ -372,6 +374,8 @@ public static VaccinationReporting constructVaccinationReporting(Long vaccinatio vaccinationReporting.setVaccinationAnatomicalSite(""); vaccinationReporting.setVaccineManufacturerNm("test"); vaccinationReporting.setMaterialCd("102"); + //NEW: TEST comma-separated PHC UIDs + vaccinationReporting.setPhcUid("12345,78902,34566"); return vaccinationReporting; } diff --git a/liquibase-service/src/main/resources/db/003-odse/routines/027-sp_vaccination_event-001.sql b/liquibase-service/src/main/resources/db/003-odse/routines/027-sp_vaccination_event-001.sql index 38db6d703..73d3caf5b 100644 --- a/liquibase-service/src/main/resources/db/003-odse/routines/027-sp_vaccination_event-001.sql +++ b/liquibase-service/src/main/resources/db/003-odse/routines/027-sp_vaccination_event-001.sql @@ -1160,11 +1160,12 @@ BEGIN , CASE_INFO as ( select src.VACCINATION_UID, - actrel.TARGET_ACT_UID as PHC_UID + STRING_AGG(actrel.TARGET_ACT_UID, ',') as PHC_UID from #TMP_VACCINATION_INIT src inner join NBS_ODSE.dbo.ACT_RELATIONSHIP actrel with (nolock) ON actrel.SOURCE_ACT_UID = src.VACCINATION_UID where TYPE_CD='1180' + GROUP BY src.VACCINATION_UID ) SELECT ix.ADD_TIME , diff --git a/liquibase-service/src/main/resources/db/005-rdb_modern/routines/046-sp_f_vaccination_postprocessing-001.sql b/liquibase-service/src/main/resources/db/005-rdb_modern/routines/046-sp_f_vaccination_postprocessing-001.sql index a4443b705..189d34e5a 100644 --- a/liquibase-service/src/main/resources/db/005-rdb_modern/routines/046-sp_f_vaccination_postprocessing-001.sql +++ b/liquibase-service/src/main/resources/db/005-rdb_modern/routines/046-sp_f_vaccination_postprocessing-001.sql @@ -68,36 +68,46 @@ BEGIN SET @PROC_STEP_NO = @PROC_STEP_NO + 1; SET @PROC_STEP_NAME = ' GENERATING #F_VAC_INIT'; + -- Expand comma-separated PHC_UID into individual rows + WITH PHC_EXPANDED AS ( + SELECT nc.VACCINATION_UID, + nc.PATIENT_UID, + nc.PROVIDER_UID, + nc.ORGANIZATION_UID, + CAST(TRIM(phc_split.value) AS BIGINT) as PHC_UID_INDIVIDUAL + FROM (SELECT * FROM dbo.NRT_VACCINATION + WHERE VACCINATION_UID IN (SELECT value FROM STRING_SPLIT(@vac_uids, ','))) nc + CROSS APPLY STRING_SPLIT(nc.PHC_UID, ',') phc_split + WHERE TRIM(phc_split.value) != '' AND TRIM(phc_split.value) IS NOT NULL + ) SELECT dim.D_VACCINATION_KEY, - nc.PATIENT_UID, + exp.PATIENT_UID, coalesce(pt1.PATIENT_KEY, 1) as PATIENT_KEY, - nc.PROVIDER_UID, + exp.PROVIDER_UID, coalesce(pv1.PROVIDER_KEY, 1) as VACCINE_GIVEN_BY_KEY, - nc.ORGANIZATION_UID, + exp.ORGANIZATION_UID, coalesce(org.ORGANIZATION_KEY, 1) as VACCINE_GIVEN_BY_ORG_KEY, 1 as D_VACCINATION_REPEAT_KEY, - nc.PHC_UID, + exp.PHC_UID_INDIVIDUAL as PHC_UID, coalesce(inv1.INVESTIGATION_KEY, 1) as INVESTIGATION_KEY INTO #F_VAC_INIT - FROM (SELECT * - FROM dbo.NRT_VACCINATION - WHERE VACCINATION_UID IN (SELECT value FROM STRING_SPLIT(@vac_uids, ','))) nc + FROM PHC_EXPANDED exp LEFT JOIN - dbo.D_VACCINATION dim with (nolock) on dim.VACCINATION_UID = nc.VACCINATION_UID + dbo.D_VACCINATION dim with (nolock) on dim.VACCINATION_UID = exp.VACCINATION_UID LEFT JOIN - dbo.D_ORGANIZATION org with (nolock) on org.ORGANIZATION_UID = nc.ORGANIZATION_UID + dbo.D_ORGANIZATION org with (nolock) on org.ORGANIZATION_UID = exp.ORGANIZATION_UID LEFT JOIN - dbo.D_PROVIDER pv1 with (nolock) on pv1.PROVIDER_UID = nc.PROVIDER_UID + dbo.D_PROVIDER pv1 with (nolock) on pv1.PROVIDER_UID = exp.PROVIDER_UID LEFT JOIN - dbo.D_PATIENT pt1 with (nolock) on pt1.PATIENT_UID = nc.PATIENT_UID + dbo.D_PATIENT pt1 with (nolock) on pt1.PATIENT_UID = exp.PATIENT_UID LEFT JOIN - dbo.INVESTIGATION inv1 with (nolock) on inv1.CASE_UID = nc.PHC_UID; + dbo.INVESTIGATION inv1 with (nolock) on inv1.CASE_UID = exp.PHC_UID_INDIVIDUAL; SELECT @ROWCOUNT_NO = @@ROWCOUNT; @@ -123,7 +133,7 @@ BEGIN INTO #F_VAC_INIT_NEW FROM #F_VAC_INIT init LEFT OUTER JOIN - dbo.F_VACCINATION fact with (nolock) ON fact.D_VACCINATION_KEY = init.D_VACCINATION_KEY + dbo.F_VACCINATION fact with (nolock) ON fact.D_VACCINATION_KEY = init.D_VACCINATION_KEY AND fact.INVESTIGATION_KEY = init.INVESTIGATION_KEY WHERE fact.D_VACCINATION_KEY is NULL; if @@ -181,8 +191,13 @@ BEGIN FROM dbo.F_VACCINATION fact INNER JOIN (SELECT * FROM #F_VAC_INIT - WHERE D_VACCINATION_KEY NOT IN (SELECT D_VACCINATION_KEY FROM #F_VAC_INIT_NEW)) src - ON src.D_VACCINATION_KEY = fact.D_VACCINATION_KEY; + WHERE NOT EXISTS ( + SELECT 1 FROM #F_VAC_INIT_NEW new_rec + WHERE new_rec.D_VACCINATION_KEY = #F_VAC_INIT.D_VACCINATION_KEY + AND new_rec.INVESTIGATION_KEY = #F_VAC_INIT.INVESTIGATION_KEY + )) src + ON src.D_VACCINATION_KEY = fact.D_VACCINATION_KEY + AND src.INVESTIGATION_KEY = fact.INVESTIGATION_KEY; SELECT @RowCount_no = @@ROWCOUNT; diff --git a/liquibase-service/src/main/resources/db/005-rdb_modern/tables/051-create_nrt_vaccination-001.sql b/liquibase-service/src/main/resources/db/005-rdb_modern/tables/051-create_nrt_vaccination-001.sql index 33ed3969e..45ff26e13 100644 --- a/liquibase-service/src/main/resources/db/005-rdb_modern/tables/051-create_nrt_vaccination-001.sql +++ b/liquibase-service/src/main/resources/db/005-rdb_modern/tables/051-create_nrt_vaccination-001.sql @@ -80,6 +80,15 @@ IF EXISTS (SELECT 1 ALTER TABLE dbo.nrt_vaccination ADD material_cd varchar(20); END; + -- CNDE - 3045 + IF EXISTS(SELECT 1 + FROM sys.columns + WHERE name = N'phc_uid' + AND Object_ID = Object_ID(N'nrt_vaccination')) + BEGIN + ALTER TABLE dbo.nrt_vaccination + ALTER COLUMN phc_uid NVARCHAR(MAX); + END; END;