From 5025bf5dc5269757cbe82711aaf80c4a4b88b99f Mon Sep 17 00:00:00 2001 From: Michael Kalish Date: Tue, 8 Oct 2024 11:18:24 -0400 Subject: [PATCH] 15223: generate empty in convert (#16137) * 15223: generate empty report for garbled convert data * fixup! 15223: generate empty report for garbled convert data * fixup! 15223: generate empty report for garbled convert data * fixup! 15223: generate empty report for garbled convert data * fixup! 15223: generate empty report for garbled convert data * fixup! 15223: generate empty report for garbled convert data --- .../src/main/kotlin/azure/ActionHistory.kt | 16 +++++ .../kotlin/fhirengine/engine/FHIRConverter.kt | 33 +-------- .../common/UniversalPipelineTestUtils.kt | 47 +++++++++---- .../azure/FHIRConverterIntegrationTests.kt | 67 +++++++++++++++++-- .../FHIRDestinationFilterIntegrationTests.kt | 2 +- 5 files changed, 116 insertions(+), 49 deletions(-) diff --git a/prime-router/src/main/kotlin/azure/ActionHistory.kt b/prime-router/src/main/kotlin/azure/ActionHistory.kt index 53c139c788b..3f620279a36 100644 --- a/prime-router/src/main/kotlin/azure/ActionHistory.kt +++ b/prime-router/src/main/kotlin/azure/ActionHistory.kt @@ -448,6 +448,22 @@ class ActionHistory( } } + /** + * Allows tracking of an empty report regardless of where it is generated in the pipeline + * @param report the details of the report + */ + fun trackEmptyReport(report: Report) { + generatingEmptyReport = true + val reportFile = ReportFile() + reportFile.reportId = report.id + reportFile.schemaTopic = report.schema.topic + reportFile.schemaName = "None" + reportFile.itemCount = 0 + reportFile.bodyFormat = report.bodyFormat.toString() + reportFile.nextAction = TaskAction.none + reportsOut[reportFile.reportId] = reportFile + } + /** * Use this to record history info about a newly generated empty [report] for sending to [receiver] that * has requested an empty batch. The [event] will be batch or send. diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt index a831daa3b7d..46565172885 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt @@ -33,7 +33,6 @@ import gov.cdc.prime.router.azure.LookupTableConditionMapper import gov.cdc.prime.router.azure.ProcessEvent import gov.cdc.prime.router.azure.db.Tables import gov.cdc.prime.router.azure.db.enums.TaskAction -import gov.cdc.prime.router.azure.db.tables.pojos.ItemLineage import gov.cdc.prime.router.azure.observability.bundleDigest.BundleDigestExtractor import gov.cdc.prime.router.azure.observability.bundleDigest.FhirPathBundleDigestLabResultExtractorStrategy import gov.cdc.prime.router.azure.observability.context.MDCUtils @@ -277,41 +276,15 @@ class FHIRConverter( }.collect(Collectors.toList()).filterNotNull() } } else { - val nextEvent = ProcessEvent( - Event.EventAction.NONE, - queueMessage.reportId, - Options.None, - emptyMap(), - emptyList() - ) - - // TODO: https://github.com/CDCgov/prime-reportstream/issues/15223 val report = Report( - MimeFormat.FHIR, + format, emptyList(), - 1, + 0, metadata = this.metadata, topic = queueMessage.topic, nextAction = TaskAction.none ) - - // create item lineage - report.itemLineages = listOf( - ItemLineage( - null, - queueMessage.reportId, - 1, - report.id, - 1, - null, - null, - null, - report.getItemHashForRow(1) - ) - ) - - // ensure tracking is set - actionHistory.trackCreatedReport(nextEvent, report) + actionHistory.trackEmptyReport(report) reportEventService.sendReportProcessingError( ReportStreamEventName.REPORT_NOT_PROCESSABLE, report, diff --git a/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt b/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt index 7a40dd6272b..f158a08993b 100644 --- a/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt +++ b/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt @@ -127,6 +127,24 @@ OBX|4|CWE|95421-4^Resides in a congregate care setting^LN^^^^2.69||N^No^HL70136| OBX|5|CWE|95419-8^Has symptoms related to condition of interest^LN^^^^2.69||N^No^HL70136||||||F|||202102090000-0600|||||||||||||||QST SPM|1|0cba76f5-35e0-4a28-803a-2f31308aae9b||258500001^Nasopharyngeal swab^SCT||||71836000^Nasopharyngeal structure (body structure)^SCT^^^^2020-09-01|||||||||202102090000-0600|202102090000-0600""" +// This report is trying to contain two items, but the HL7 is garbled, the first is missing an MSH segment and the second +// has a typo in its MSH segment +@Suppress("ktlint:standard:max-line-length") +const val garbledHL7Record = + """FT|Centers for Disease Control and Prevention|0.1-SNAPSHOT|PRIME ReportStream|0.1-SNAPSHOT||20210210 +PID|1||2a14112c-ece1-4f82-915c-7b3a8d152eda^^^Avante at Ormond Beach^PI||Buckridge^Kareem^Millie^^^^L||19580810|F||2106-3^White^HL70005^^^^2.5.1|688 Leighann Inlet^^South Rodneychester^TX^67071^^^^48077||7275555555:1:^PRN^^roscoe.wilkinson@email.com^1^211^2240784|||||||||U^Unknown^HL70189||||||||N +ORC|RE|73a6e9bd-aaec-418e-813a-0ad33366ca85^6^7^8&F^9|73a6e9bd-aaec-418e-813a-0ad33366ca85|||||||||1629082607^Eddin^Husam^^^^^^CMS&2.16.840.1.113883.3.249&ISO^^^^NPI||^WPN^^^1^386^6825220|20210209||||||Avante at Ormond Beach|170 North King Road^^Ormond Beach^FL^32174^^^^12127|^WPN^^jbrush@avantecenters.com^1^407^7397506|^^^^32174 +OBR|1|73a6e9bd-aaec-418e-813a-0ad33366ca85|0cba76f5-35e0-4a28-803a-2f31308aae9b|94558-4^SARS-CoV-2 (COVID-19) Ag [Presence] in Respiratory specimen by Rapid immunoassay^LN|||202102090000-0600|202102090000-0600||||||||1629082607^Eddin^Husam^^^^^^CMS&2.16.840.1.113883.3.249&ISO^^^^NPI|^WPN^^^1^386^6825220|||||202102090000-0600|||F +OBX|1|CWE|94558-4^SARS-CoV-2 (COVID-19) Ag [Presence] in Respiratory specimen by Rapid immunoassay^LN||260415000^Not detected^SCT|||N^Normal (applies to non-numeric results)^HL70078|||F|||202102090000-0600|||CareStart COVID-19 Antigen test_Access Bio, Inc._EUA^^99ELR||202102090000-0600||||Avante at Ormond Beach^^^^^CLIA&2.16.840.1.113883.4.7&ISO^^^^10D0876999^CLIA|170 North King Road^^Ormond Beach^FL^32174^^^^12127 +OBX|2|CWE|95418-0^Whether patient is employed in a healthcare setting^LN^^^^2.69||Y^Yes^HL70136||||||F|||202102090000-0600|||||||||||||||QST +OBX|3|CWE|95417-2^First test for condition of interest^LN^^^^2.69||Y^Yes^HL70136||||||F|||202102090000-0600|||||||||||||||QST +OBX|4|CWE|95421-4^Resides in a congregate care setting^LN^^^^2.69||N^No^HL70136||||||F|||202102090000-0600|||||||||||||||QST +OBX|5|CWE|95419-8^Has symptoms related to condition of interest^LN^^^^2.69||N^No^HL70136||||||F|||202102090000-0600|||||||||||||||QST +SPM|1|0cba76f5-35e0-4a28-803a-2f31308aae9b||258500001^Nasopharyngeal swab^SCT||||71836000^Nasopharyngeal structure (body structure)^SCT^^^^2020-09-01|||||||||202102090000-0600|202102090000-0600 +SH|^~\&#!|CDC PRIME - Atlanta, Georgia (Dekalb)^2.16.840.1.114222.4.1.237821^ISO|Avante at Ormond Beach^10D0876999^CLIA|PRIME_DOH|Prime ReportStream|20210210170737||ORU^R01^ORU_R01|371784|P|2.5.1|||NE|NE|USA||||PHLabReportNoAck^ELR_Receiver^2.16.840.1.113883.9.99^ISO +SFT|Centers for Disease Control and Prevention|0.1-SNAPSHOT|PRIME ReportStream|0.1-SNAPSHOT||20210210 +PID|1||2a14112c-ece1-4f82-915c-7b3a8d152eda^^^Avante at Ormond Beach^PI||Buckridge^Kareem^Millie^^^^L||19580810|F||2106-3^White^HL70005^^^^2.5.1|688 Leighann Inlet^^South Rodneychester^TX^67071^^^^48077||7275555555:1:^PRN^^roscoe.wilkinson@email.com^1^211^2240784|||||||||U^Unknown^HL70189||||||||N""" + @Suppress("ktlint:standard:max-line-length") const val validRadxMarsHL7Message = """MSH|^~\&|MMTC.PROD^2.16.840.1.113883.3.8589.4.2.106.1^ISO|CAREEVOLUTION^00Z0000024^CLIA|AIMS.INTEGRATION.PRD^2.16.840.1.114222.4.3.15.1^ISO|AIMS.PLATFORM^2.16.840.1.114222.4.1.217446^ISO|20240403205305+0000||ORU^R01^ORU_R01|20240403205305_dba7572cc6334f1ea0744c5f235c823e|P|2.5.1|||NE|NE|||||PHLabReport-NoAck^ELR251R1_Rcvr_Prof^2.16.840.1.113883.9.11^ISO @@ -257,12 +275,13 @@ object UniversalPipelineTestUtils { ) /** - * fetch child reports associated with a [parent] report and ensure we find an [expected] number of children + * fetch child reports associated with a [parent] report and ensure we find an [expectedItems] number of children */ fun fetchChildReports( parent: Report, txn: DataAccessTransaction, - expected: Int? = null, + expectedItems: Int? = null, + expectedReports: Int = 1, ): List { val itemLineages = DSL .using(txn) @@ -271,15 +290,15 @@ object UniversalPipelineTestUtils { .where(ItemLineage.ITEM_LINEAGE.PARENT_REPORT_ID.eq(parent.id)) .fetchInto(gov.cdc.prime.router.azure.db.tables.pojos.ItemLineage::class.java) - if (expected != null) { - assertThat(itemLineages).hasSize(expected) - assertThat(itemLineages.map { it.childIndex }).isEqualTo(MutableList(expected) { 1 }) + if (expectedItems != null) { + assertThat(itemLineages).hasSize(expectedItems) + assertThat(itemLineages.map { it.childIndex }).isEqualTo(MutableList(expectedItems) { 1 }) // itemCount is on the report created by the test. It will not be null. if (parent.itemCount > 1) { - assertThat(itemLineages.map { it.parentIndex }).isEqualTo((1..expected).toList()) + assertThat(itemLineages.map { it.parentIndex }).isEqualTo((1..expectedItems).toList()) } else { - assertThat(itemLineages.map { it.parentIndex }).isEqualTo(MutableList(expected) { 1 }) + assertThat(itemLineages.map { it.parentIndex }).isEqualTo(MutableList(expectedItems) { 1 }) } } @@ -290,9 +309,7 @@ object UniversalPipelineTestUtils { .where(ReportLineage.REPORT_LINEAGE.PARENT_REPORT_ID.eq(parent.id)) .fetchInto(gov.cdc.prime.router.azure.db.tables.pojos.ReportLineage::class.java) - if (expected != null) { - assertThat(reportLineages).hasSize(expected) - } + assertThat(reportLineages).hasSize(expectedReports) val childReportIds = reportLineages.map { it.childReportId @@ -307,11 +324,13 @@ object UniversalPipelineTestUtils { ) ) .fetchInto(ReportFile::class.java) - if (expected != null) { - assertThat(reportFiles).hasSize(expected) + + assertThat(reportFiles).hasSize(expectedReports) + + if (expectedItems != 0) { + assertThat(itemLineages).transform { lineages -> lineages.map { it.childReportId }.sorted() } + .isEqualTo(reportFiles.map { it.reportId }.sorted()) } - assertThat(itemLineages).transform { lineages -> lineages.map { it.childReportId }.sorted() } - .isEqualTo(reportFiles.map { it.reportId }.sorted()) return reportFiles } diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt index 69b6dbe7182..772dcec6796 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt @@ -49,6 +49,7 @@ import gov.cdc.prime.router.common.cleanHL7Record import gov.cdc.prime.router.common.cleanHL7RecordConverted import gov.cdc.prime.router.common.cleanHL7RecordConvertedAndTransformed import gov.cdc.prime.router.common.conditionCodedValidFHIRRecord1 +import gov.cdc.prime.router.common.garbledHL7Record import gov.cdc.prime.router.common.invalidEmptyFHIRRecord import gov.cdc.prime.router.common.invalidHL7Record import gov.cdc.prime.router.common.invalidHL7RecordConverted @@ -257,7 +258,7 @@ class FHIRConverterIntegrationTests { ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> val (routedReports, unroutedReports) = fetchChildReports( - receiveReport, txn, 4 + receiveReport, txn, 4, 4 ).partition { it.nextAction != TaskAction.none } assertThat(routedReports).hasSize(2) routedReports.forEach { @@ -441,7 +442,7 @@ class FHIRConverterIntegrationTests { ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> val (routedReports, unroutedReports) = fetchChildReports( - receiveReport, txn, 4 + receiveReport, txn, 4, 4 ).partition { it.nextAction != TaskAction.none } assertThat(routedReports).hasSize(2) routedReports.forEach { @@ -584,7 +585,7 @@ class FHIRConverterIntegrationTests { ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> val (routedReports, notRouted) = fetchChildReports( - receiveReport, txn, 2 + receiveReport, txn, 2, 2 ).partition { it.nextAction != TaskAction.none } with(routedReports.single()) { @@ -711,7 +712,7 @@ class FHIRConverterIntegrationTests { fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> - val routedReports = fetchChildReports(receiveReport, txn, 2) + val routedReports = fetchChildReports(receiveReport, txn, 2, 2) routedReports.forEach { assertThat(it.nextAction).isEqualTo(TaskAction.destination_filter) assertThat(it.receivingOrg).isEqualTo(null) @@ -804,4 +805,62 @@ class FHIRConverterIntegrationTests { assertThat(report.bodyFormat).isEqualTo("FHIR") } } + + @Test + fun `test should gracefully handle a case where number of items is unknown`() { + val receivedReportContents = garbledHL7Record + val receiveBlobUrl = BlobAccess.uploadBlob( + "receive/happy-path.hl7", + receivedReportContents.toByteArray(), + getBlobContainerMetadata() + ) + + val receiveReport = setupConvertStep(MimeFormat.HL7, hl7Sender, receiveBlobUrl, 1) + val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7Sender) + val fhirFunctions = createFHIRFunctionsInstance() + + fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) + + verify(exactly = 0) { + QueueAccess.sendMessage(any(), any()) + } + ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> + val report = fetchChildReports(receiveReport, txn, 0).single() + assertThat(report.nextAction).isEqualTo(TaskAction.none) + assertThat(report.receivingOrg).isEqualTo(null) + assertThat(report.receivingOrgSvc).isEqualTo(null) + assertThat(report.schemaName).isEqualTo("None") + assertThat(report.schemaTopic).isEqualTo(Topic.FULL_ELR) + assertThat(report.bodyFormat).isEqualTo("HL7") + } + } + + @Test + fun `test should gracefully handle a case with an empty contents`() { + val receivedReportContents = " " + val receiveBlobUrl = BlobAccess.uploadBlob( + "receive/happy-path.hl7", + receivedReportContents.toByteArray(), + getBlobContainerMetadata() + ) + + val receiveReport = setupConvertStep(MimeFormat.HL7, hl7Sender, receiveBlobUrl, 1) + val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7Sender) + val fhirFunctions = createFHIRFunctionsInstance() + + fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert)) + + verify(exactly = 0) { + QueueAccess.sendMessage(any(), any()) + } + ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> + val report = fetchChildReports(receiveReport, txn, 0, 1).single() + assertThat(report.nextAction).isEqualTo(TaskAction.none) + assertThat(report.receivingOrg).isEqualTo(null) + assertThat(report.receivingOrgSvc).isEqualTo(null) + assertThat(report.schemaName).isEqualTo("None") + assertThat(report.schemaTopic).isEqualTo(Topic.FULL_ELR) + assertThat(report.bodyFormat).isEqualTo("HL7") + } + } } \ No newline at end of file diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt index 76d52f0b353..f86cc54a2bf 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt @@ -194,7 +194,7 @@ class FHIRDestinationFilterIntegrationTests : Logging { // check results ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> - val routedReports = fetchChildReports(report, txn, 2) + val routedReports = fetchChildReports(report, txn, 2, 2) with(routedReports.first()) { assertThat(this.nextAction).isEqualTo(TaskAction.receiver_filter) assertThat(this.receivingOrg).isEqualTo("phd")