Skip to content

Commit

Permalink
add reset and rename SubmissionEntity
Browse files Browse the repository at this point in the history
  • Loading branch information
brick-green committed Sep 4, 2024
1 parent 3dd4caf commit 67894da
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 115 deletions.
92 changes: 58 additions & 34 deletions prime-router/src/main/kotlin/azure/SubmissionTableService.kt
Original file line number Diff line number Diff line change
@@ -1,75 +1,99 @@
package gov.cdc.prime.router.azure

import gov.cdc.prime.reportstream.shared.SubmissionEntity
import gov.cdc.prime.reportstream.shared.Submission
import org.apache.logging.log4j.kotlin.Logging

/**
* Service class responsible for handling operations related to the "submission" table in Azure Table Storage.
*
* This service uses the TableAccess singleton to interact with Azure Table Storage.
* This service uses the TableAccess singleton to interact with Azure Table Storage for inserting and retrieving
* submissions in the "submission" table.
*
* It allows inserting `Submission` objects as table entities and retrieving them based on the submission ID and status.
*/
class SubmissionTableService private constructor() : Logging {

companion object {
/**
* The singleton instance of SubmissionService, initialized lazily.
* The singleton instance of SubmissionTableService, initialized lazily.
*
* This ensures that the instance is created only when it is first accessed,
* providing thread-safe, lazy initialization.
*/
val instance: SubmissionTableService by lazy { SubmissionTableService() }
}

private val tableName = "submission"

/**
* Lazily initialized TableClient to interact with the "submission" table.
* The client is created once and reused for all operations on the "submission" table.
* The `TableAccess` object used to interact with Azure Table Storage.
*
* This is marked as `@Volatile` to ensure thread visibility. It is initialized lazily
* and resettable to allow refreshing the connection if necessary.
*/
@Volatile
private var tableAccess: TableAccess = TableAccess()

/**
* Resets the `TableAccess` instance used to interact with the "submission" table.
*
* This method ensures thread safety by synchronizing access to `tableAccess`, preventing multiple
* threads from resetting the instance at the same time.
*/
private val tableAccess: TableAccess by lazy { TableAccess.instance }
fun reset() {
synchronized(this) {
tableAccess = TableAccess() // Re-initialize the TableAccess instance
}
}

/**
* Inserts a SubmissionEntity into the "submission" table.
* Inserts a SubmissionEntity into the "submission" table in Azure Table Storage.
*
* Converts the provided SubmissionEntity into a TableEntity and attempts to insert it into the table.
* Logs success or failure of the operation.
* Converts the provided `Submission` object into a `TableEntity` and inserts it into the table.
* Logs the outcome of the operation (success or failure).
*
* @param submission The SubmissionEntity to be inserted into the table.
* @param submission The `Submission` object to be inserted into the table.
*/
fun insertSubmission(submission: SubmissionEntity) {
fun insertSubmission(submission: Submission) {
try {
// Convert Submission to TableEntity and insert into the table
val entity = submission.toTableEntity()
tableAccess.insertEntity(tableName, entity)
logger
.info(
logger.info(
"Submission entity insert succeeded: ${submission.submissionId} with status ${submission.status}"
)
)
} catch (e: Exception) {
logger
.error(
// Log the error if insertion fails
logger.error(
"Submission entity insert failed: ${submission.submissionId} with status ${submission.status}",
e
)
)
}
}

/**
* Reads a submission entity from the "submission" table.
* Retrieves a SubmissionEntity from the "submission" table based on submission ID and status.
*
* Fetches the corresponding table entity from Azure Table Storage using the provided partition key
* (submission ID) and row key (status). If the entity is found, it is converted back into a `Submission` object.
*
* @param partitionKey The partition key of the entity.
* @param rowKey The row key of the entity.
* @return The SubmissionEntity if found, otherwise null.
* If the entity is not found, or if an error occurs during retrieval, the method returns `null` and logs the error.
*
* @param submissionID The partition key representing the submission ID.
* @param status The row key representing the status of the submission.
* @return The `Submission` object if found, otherwise `null`.
*/
fun getSubmissionEntity(partitionKey: String, rowKey: String): SubmissionEntity? = try {
val tableEntity = tableAccess.getEntity(tableName, partitionKey, rowKey)
if (tableEntity != null) {
SubmissionEntity.fromTableEntity(tableEntity)
} else {
null
}
} catch (e: Exception) {
logger
.error(
"Failed to read submission entity: $partitionKey with status $rowKey",
e
)
fun getSubmission(submissionID: String, status: String): Submission? = try {
// Retrieve the TableEntity and convert it back to a Submission object if found
val tableEntity = tableAccess.getEntity(tableName, submissionID, status)
if (tableEntity != null) {
Submission.fromTableEntity(tableEntity)
} else {
null
}
} catch (e: Exception) {
// Log the error if retrieval fails and return null
logger.error("Failed to read submission entity: $submissionID with status $status", e)
null
}
}
76 changes: 12 additions & 64 deletions prime-router/src/main/kotlin/azure/TableAccess.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ import org.apache.logging.log4j.kotlin.Logging
class TableAccess : Logging {

companion object {
/**
* Singleton instance of TableAccess, initialized lazily.
* Ensures that the instance is created only when it is first accessed.
*/
val instance: TableAccess by lazy { TableAccess() }

/**
* The environment variable that stores the connection string.
Expand All @@ -36,37 +31,9 @@ class TableAccess : Logging {
fun getConnectionString(): String = System.getenv(defaultEnvVar)
}

/**
* The TableServiceClient used to interact with Azure Table Storage.
* It is volatile to ensure visibility across threads and lazily initialized when accessed.
*/
@Volatile
private var tableServiceClient: TableServiceClient? = null

/**
* Lazily retrieves the TableServiceClient, initializing it if necessary.
*
* The method is synchronized to ensure thread safety and avoid race conditions during initialization.
*
* @return The initialized TableServiceClient instance.
*/
private fun getTableServiceClient(): TableServiceClient {
return synchronized(this) {
tableServiceClient ?: initializeClient(getConnectionString()).also { tableServiceClient = it }
}
}

/**
* Initializes the TableServiceClient using the provided connection string.
*
* @param connectionString The connection string used to create the TableServiceClient.
* @return The initialized TableServiceClient instance.
*/
private fun initializeClient(connectionString: String): TableServiceClient {
return TableServiceClientBuilder()
.connectionString(connectionString)
.buildClient()
}
private val tableServiceClient: TableServiceClient = TableServiceClientBuilder()
.connectionString(getConnectionString())
.buildClient()

/**
* Inserts a TableEntity into the specified table.
Expand Down Expand Up @@ -118,17 +85,11 @@ class TableAccess : Logging {
* @return A TableClient for interacting with the specified table, or null if the table does not exist or an error occurs.
*/
private fun getTableClient(tableName: String): TableClient? {
try {
val tableExists = getTableServiceClient().listTables().any { it.name == tableName }
return if (tableExists) {
getTableServiceClient().getTableClient(tableName)
} else {
null
}
} catch (e: Exception) {
logger.error("Error getting table service client", e)
handleClientFailure()
return retryGetTableClient(tableName)
val tableExists = tableServiceClient.listTables().any { it.name == tableName }
return if (tableExists) {
tableServiceClient.getTableClient(tableName)
} else {
null
}
}

Expand All @@ -139,9 +100,9 @@ class TableAccess : Logging {
* @return A TableClient for interacting with the specified table, or null if the table does not exist.
*/
private fun retryGetTableClient(tableName: String): TableClient? {
val tableExists = getTableServiceClient().listTables().any { it.name == tableName }
val tableExists = tableServiceClient.listTables().any { it.name == tableName }
return if (tableExists) {
getTableServiceClient().getTableClient(tableName)
tableServiceClient.getTableClient(tableName)
} else {
null
}
Expand All @@ -158,21 +119,8 @@ class TableAccess : Logging {
private fun getOrCreateTableClient(tableName: String): TableClient {
val tableClient = getTableClient(tableName)
return tableClient ?: run {
getTableServiceClient().createTable(tableName)
getTableServiceClient().getTableClient(tableName)
}
}

/**
* Handles client failures by reinitializing the TableServiceClient.
*
* This method is invoked when an error occurs during table operations. It ensures that the TableServiceClient
* is reinitialized to recover from connection or channel-related failures.
*/
private fun handleClientFailure() {
logger.warn("Detected client failure, reinitializing TableServiceClient...")
synchronized(this) {
tableServiceClient = initializeClient(getConnectionString()) // Reinitialize the client
tableServiceClient.createTable(tableName)
tableServiceClient.getTableClient(tableName)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package gov.cdc.prime.router.fhirengine.engine
import ca.uhn.hl7v2.model.Message
import com.microsoft.azure.functions.HttpStatus
import gov.cdc.prime.reportstream.shared.QueueMessage
import gov.cdc.prime.reportstream.shared.SubmissionEntity
import gov.cdc.prime.reportstream.shared.Submission
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.ClientSource
import gov.cdc.prime.router.CustomerStatus
Expand Down Expand Up @@ -163,7 +163,7 @@ class FHIRReceiver(

// Insert the rejection into the submissions table
val tableEntity =
SubmissionEntity(
Submission(
queueMessage.reportId.toString(), "Rejected",
queueMessage.blobURL,
"Sender not found matching client_id: ${queueMessage.headers[clientIdHeader]}"
Expand Down Expand Up @@ -239,7 +239,7 @@ class FHIRReceiver(
}

// Insert the acceptance into the submissions table
val tableEntity = SubmissionEntity(
val tableEntity = Submission(
queueMessage.reportId.toString(),
"Accepted",
queueMessage.blobURL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class FHIRReceiverIntegrationTests {
settings,
ReportStreamTestDatabaseContainer.testDatabaseAccess,
azureEventService = azureEventService,
// submissionTableService = submissionTableService
submissionTableService = submissionTableService
)
}

Expand Down Expand Up @@ -128,6 +128,7 @@ class FHIRReceiverIntegrationTests {
every { TableAccess.getConnectionString() } returns getConnString()

submissionTableService = SubmissionTableService.instance
submissionTableService.reset()
}

@AfterEach
Expand Down Expand Up @@ -205,7 +206,7 @@ class FHIRReceiverIntegrationTests {
QueueAccess.sendMessage(any(), any())
}

val tableRow = submissionTableService.getSubmissionEntity(reportId.toString(), "Accepted")
val tableRow = submissionTableService.getSubmission(reportId.toString(), "Accepted")

assertNotNull(tableRow)
assertThat(tableRow.detail).isEqualTo(
Expand Down Expand Up @@ -315,7 +316,7 @@ class FHIRReceiverIntegrationTests {
QueueAccess.sendMessage(any(), any())
}

val tableRow = submissionTableService.getSubmissionEntity(
val tableRow = submissionTableService.getSubmission(
reportId.toString(),
"Rejected"
)
Expand Down Expand Up @@ -407,7 +408,7 @@ class FHIRReceiverIntegrationTests {
QueueAccess.sendMessage(any(), any())
}

val tableRow = submissionTableService.getSubmissionEntity(
val tableRow = submissionTableService.getSubmission(
reportId.toString(),
"Accepted"
)
Expand Down Expand Up @@ -500,7 +501,7 @@ class FHIRReceiverIntegrationTests {
QueueAccess.sendMessage(any(), any())
}

val tableRow = submissionTableService.getSubmissionEntity(
val tableRow = submissionTableService.getSubmission(
reportId.toString(),
"Accepted"
)
Expand Down Expand Up @@ -595,7 +596,7 @@ class FHIRReceiverIntegrationTests {
QueueAccess.sendMessage(any(), any())
}

val tableRow = submissionTableService.getSubmissionEntity(
val tableRow = submissionTableService.getSubmission(
reportId.toString(),
"Accepted"
)
Expand Down Expand Up @@ -689,7 +690,7 @@ class FHIRReceiverIntegrationTests {
QueueAccess.sendMessage(any(), any())
}

val tableRow = submissionTableService.getSubmissionEntity(
val tableRow = submissionTableService.getSubmission(
reportId.toString(),
"Accepted"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import assertk.assertThat
import assertk.assertions.hasSize
import assertk.assertions.isEqualTo
import com.microsoft.azure.functions.HttpStatus
import gov.cdc.prime.reportstream.shared.SubmissionEntity
import gov.cdc.prime.reportstream.shared.Submission
import gov.cdc.prime.router.ActionLog
import gov.cdc.prime.router.ActionLogDetail
import gov.cdc.prime.router.ActionLogger
Expand Down Expand Up @@ -168,7 +168,7 @@ class FHIRReceiverTest {
val reportId = queueMessage.reportId.toString()
val blobURL = queueMessage.blobURL
verify(exactly = 1) {
SubmissionEntity(
Submission(
reportId,
"Rejected",
blobURL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.azure.data.tables.models.TableEntity
* @property bodyURL The URL pointing to the body of the submission.
* @property detail Optional additional details about the submission.
*/
data class SubmissionEntity(
data class Submission(
val submissionId: String,
val status: String,
val bodyURL: String,
Expand All @@ -23,8 +23,8 @@ data class SubmissionEntity(
* @param tableEntity The TableEntity to convert.
* @return The corresponding SubmissionEntity.
*/
fun fromTableEntity(tableEntity: TableEntity): SubmissionEntity {
return SubmissionEntity(
fun fromTableEntity(tableEntity: TableEntity): Submission {
return Submission(
submissionId = tableEntity.partitionKey,
status = tableEntity.rowKey,
bodyURL = tableEntity.getProperty("body_url") as String,
Expand Down
Loading

0 comments on commit 67894da

Please sign in to comment.