Skip to content

Commit

Permalink
fix(Gate): not updating output data from golden record updates
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoprow committed May 14, 2024
1 parent 18dcfd6 commit e055422
Show file tree
Hide file tree
Showing 17 changed files with 505 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.eclipse.tractusx.bpdm.gate.config

import jakarta.annotation.PostConstruct
import org.eclipse.tractusx.bpdm.gate.service.GoldenRecordTaskService
import org.eclipse.tractusx.bpdm.gate.service.GoldenRecordUpdateService
import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.TaskScheduler
import org.springframework.scheduling.support.CronTrigger
Expand All @@ -29,23 +30,24 @@ import org.springframework.scheduling.support.CronTrigger
class GoldenRecordTaskConfiguration(
private val configProperties: GoldenRecordTaskConfigProperties,
private val taskScheduler: TaskScheduler,
private val service: GoldenRecordTaskService
private val taskService: GoldenRecordTaskService,
private val updateService: GoldenRecordUpdateService
) {

@PostConstruct
fun scheduleGoldenRecordTasks() {
taskScheduler.scheduleIfEnabled(
{ service.createTasksForReadyBusinessPartners() },
{ taskService.createTasksForReadyBusinessPartners() },
configProperties.creation.fromSharingMember.cron
)

taskScheduler.scheduleIfEnabled(
{ service.createTasksForGoldenRecordUpdates() },
{ updateService.updateOutputOnGoldenRecordChange() },
configProperties.creation.fromPool.cron
)

taskScheduler.scheduleIfEnabled(
{ service.resolvePendingTasks() },
{ taskService.resolvePendingTasks() },
configProperties.check.cron
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ import java.time.LocalDateTime
@Table(name = "confidence_criteria")
class ConfidenceCriteriaDb(
@Column(name = "shared_by_owner", nullable = false)
val sharedByOwner: Boolean,
var sharedByOwner: Boolean,
@Column(name = "checked_by_external_data_source", nullable = false)
val checkedByExternalDataSource: Boolean,
var checkedByExternalDataSource: Boolean,
@Column(name = "number_of_business_partners", nullable = false)
val numberOfBusinessPartners: Int,
var numberOfBusinessPartners: Int,
@Column(name = "last_confidence_check_at", nullable = false)
val lastConfidenceCheckAt: LocalDateTime,
var lastConfidenceCheckAt: LocalDateTime,
@Column(name = "next_confidence_check_at", nullable = false)
val nextConfidenceCheckAt: LocalDateTime,
var nextConfidenceCheckAt: LocalDateTime,
@Column(name = "confidence_level", nullable = false)
val confidenceLevel: Int,
var confidenceLevel: Int,
) : BaseEntity()
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ package org.eclipse.tractusx.bpdm.gate.entity.generic

import jakarta.persistence.Column
import jakarta.persistence.Embeddable
import jakarta.persistence.EnumType
import jakarta.persistence.Enumerated
import org.eclipse.tractusx.bpdm.common.dto.BusinessPartnerType


@Embeddable
Expand All @@ -33,14 +36,19 @@ data class IdentifierDb(
var value: String,

@Column(name = "issuing_body")
var issuingBody: String?
var issuingBody: String?,

@Enumerated(EnumType.STRING)
@Column(name = "business_partner_type", nullable = false)
var businessPartnerType: BusinessPartnerType

) : Comparable<IdentifierDb> {

// Natural order by "type", "value", "issuingBody"
override fun compareTo(other: IdentifierDb) = compareBy(
IdentifierDb::type,
IdentifierDb::value,
IdentifierDb::issuingBody
IdentifierDb::issuingBody,
IdentifierDb::businessPartnerType
).compare(this, other)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.springframework.data.jpa.repository.Query
import org.springframework.data.repository.CrudRepository
import org.springframework.data.repository.query.Param
import org.springframework.stereotype.Repository
import java.time.Instant


@Repository
Expand All @@ -50,6 +51,10 @@ interface BusinessPartnerRepository : JpaRepository<BusinessPartnerDb, Long>, Cr
@Query("SELECT b.stage as stage, COUNT(b.stage) as count FROM BusinessPartnerDb AS b GROUP BY b.stage")
fun countPerStages(): List<PartnersPerStageCount>

fun findByStageAndBpnLIn(stage: StageType, bpnL: Collection<String>): Set<BusinessPartnerDb>
fun findByStageAndBpnSIn(stage: StageType, bpnS: Collection<String>): Set<BusinessPartnerDb>
fun findByStageAndBpnAIn(stage: StageType, bpnA: Collection<String>): Set<BusinessPartnerDb>

interface PartnersPerStageCount {
val stage: StageType
val count: Int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class BusinessPartnerMappings {
externalId = dto.externalId,
nameParts = dto.nameParts.toMutableList(),
roles = dto.roles.toSortedSet(),
identifiers = dto.identifiers.mapNotNull(::toIdentifier).toSortedSet(),
identifiers = dto.identifiers.mapNotNull{toIdentifier(it, BusinessPartnerType.GENERIC)}.toSortedSet(),
states = dto.states.asSequence().mapNotNull{toState(it, BusinessPartnerType.GENERIC)}
.plus(dto.legalEntity.states.mapNotNull { toState(it, BusinessPartnerType.LEGAL_ENTITY) })
.plus(dto.site.states.mapNotNull { toState(it, BusinessPartnerType.SITE) })
Expand Down Expand Up @@ -284,10 +284,10 @@ class BusinessPartnerMappings {
private fun toIdentifierDto(entity: IdentifierDb) =
BusinessPartnerIdentifierDto(type = entity.type, value = entity.value, issuingBody = entity.issuingBody)

private fun toIdentifier(dto: BusinessPartnerIdentifierDto) =
private fun toIdentifier(dto: BusinessPartnerIdentifierDto, businessPartnerType: BusinessPartnerType) =
dto.type?.let { type ->
dto.value?.let { value ->
IdentifierDb(type = type, value = value, issuingBody = dto.issuingBody)
IdentifierDb(type = type, value = value, issuingBody = dto.issuingBody, businessPartnerType)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import org.eclipse.tractusx.bpdm.gate.exception.BpdmMissingStageException
import org.eclipse.tractusx.bpdm.gate.repository.ChangelogRepository
import org.eclipse.tractusx.bpdm.gate.repository.SharingStateRepository
import org.eclipse.tractusx.bpdm.gate.repository.generic.BusinessPartnerRepository
import org.eclipse.tractusx.bpdm.gate.util.BusinessPartnerComparisonUtil
import org.eclipse.tractusx.bpdm.gate.util.BusinessPartnerCopyUtil
import org.eclipse.tractusx.orchestrator.api.client.OrchestrationApiClient
import org.eclipse.tractusx.orchestrator.api.model.TaskCreateRequest
import org.eclipse.tractusx.orchestrator.api.model.TaskCreateResponse
Expand All @@ -53,6 +55,8 @@ class BusinessPartnerService(
private val sharingStateService: SharingStateService,
private val changelogRepository: ChangelogRepository,
private val sharingStateRepository: SharingStateRepository,
private val copyUtil: BusinessPartnerCopyUtil,
private val compareUtil: BusinessPartnerComparisonUtil
) {
private val logger = KotlinLogging.logger { }

Expand Down Expand Up @@ -182,38 +186,10 @@ class BusinessPartnerService(
val matchingBusinessPartner = persistedBusinessPartnerMap[entity.externalId]
val hasErrorSharingState = sharingStatesMap[entity.externalId]?.sharingStateType == SharingStateType.Error

matchingBusinessPartner?.let { hasChanges(entity, it) } ?: true || hasErrorSharingState //If there are difference return true, else returns false
matchingBusinessPartner?.let { compareUtil.hasChanges(entity, it) } ?: true || hasErrorSharingState //If there are difference return true, else returns false
}
}

private fun hasChanges(entity: BusinessPartnerDb, persistedBP: BusinessPartnerDb): Boolean {

return entity.nameParts != persistedBP.nameParts ||
entity.roles != persistedBP.roles ||
entity.shortName != persistedBP.shortName ||
entity.legalName != persistedBP.legalName ||
entity.siteName != persistedBP.siteName ||
entity.addressName != persistedBP.addressName ||
entity.legalForm != persistedBP.legalForm ||
entity.isOwnCompanyData != persistedBP.isOwnCompanyData ||
entity.bpnL != persistedBP.bpnL ||
entity.bpnS != persistedBP.bpnS ||
entity.bpnA != persistedBP.bpnA ||
entity.stage != persistedBP.stage ||
entity.parentId != persistedBP.parentId ||
entity.parentType != persistedBP.parentType ||
entity.identifiers != persistedBP.identifiers ||
entity.states != persistedBP.states ||
entity.classifications != persistedBP.classifications ||
postalAddressHasChanges(entity.postalAddress, persistedBP.postalAddress)
}

private fun postalAddressHasChanges(entityPostalAddress: PostalAddressDb, persistedPostalAddress: PostalAddressDb): Boolean {
return (entityPostalAddress.addressType != persistedPostalAddress.addressType) ||
(entityPostalAddress.alternativePostalAddress != persistedPostalAddress.alternativePostalAddress) ||
(entityPostalAddress.physicalPostalAddress != persistedPostalAddress.physicalPostalAddress)
}

/**
* Resolve all [entityCandidates] by looking for existing business partner data in the given [stage]
*
Expand All @@ -227,70 +203,12 @@ class BusinessPartnerService(
return entityCandidates.map { candidate ->
val existingEntity = existingPartnersByExternalId[candidate.externalId]
if (existingEntity != null)
ResolutionResult(copyValues(candidate, existingEntity), true)
ResolutionResult(copyUtil.copyValues(candidate, existingEntity), true)
else
ResolutionResult(candidate, false)
}
}

private fun copyValues(fromPartner: BusinessPartnerDb, toPartner: BusinessPartnerDb): BusinessPartnerDb {
return toPartner.apply {
stage = fromPartner.stage
shortName = fromPartner.shortName
legalName = fromPartner.legalName
siteName = fromPartner.siteName
addressName = fromPartner.addressName
legalForm = fromPartner.legalForm
isOwnCompanyData = fromPartner.isOwnCompanyData
bpnL = fromPartner.bpnL
bpnS = fromPartner.bpnS
bpnA = fromPartner.bpnA
parentId = fromPartner.parentId
parentType = fromPartner.parentType
legalEntityConfidence = fromPartner.legalEntityConfidence
siteConfidence = fromPartner.siteConfidence
addressConfidence = fromPartner.addressConfidence

nameParts.replace(fromPartner.nameParts)
roles.replace(fromPartner.roles)

states.copyAndSync(fromPartner.states, ::copyValues)
classifications.copyAndSync(fromPartner.classifications, ::copyValues)
identifiers.copyAndSync(fromPartner.identifiers, ::copyValues)

copyValues(fromPartner.postalAddress, postalAddress)
}
}

private fun copyValues(fromState: StateDb, toState: StateDb) =
toState.apply {
validFrom = fromState.validFrom
validTo = fromState.validTo
type = fromState.type
}

private fun copyValues(fromClassification: ClassificationDb, toClassification: ClassificationDb) =
toClassification.apply {
value = fromClassification.value
type = fromClassification.type
code = fromClassification.code
}

private fun copyValues(fromIdentifier: IdentifierDb, toIdentifier: IdentifierDb) =
toIdentifier.apply {
type = fromIdentifier.type
value = fromIdentifier.value
issuingBody = fromIdentifier.issuingBody
}

private fun copyValues(fromPostalAddress: PostalAddressDb, toPostalAddress: PostalAddressDb) =
toPostalAddress.apply {
addressType = fromPostalAddress.addressType
physicalPostalAddress = fromPostalAddress.physicalPostalAddress
alternativePostalAddress = fromPostalAddress.alternativePostalAddress
}


}

data class ResolutionResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ class GoldenRecordTaskService(
private val businessPartnerService: BusinessPartnerService,
private val orchestratorMappings: OrchestratorMappings,
private val orchestrationApiClient: OrchestrationApiClient,
private val properties: GoldenRecordTaskConfigProperties,
private val syncRecordService: SyncRecordService,
private val poolClient: PoolApiClient
private val properties: GoldenRecordTaskConfigProperties
) {
private val logger = KotlinLogging.logger { }

Expand Down Expand Up @@ -102,50 +100,6 @@ class GoldenRecordTaskService(
}
}

@Transactional
fun createTasksForGoldenRecordUpdates() {
logger.info { "Started scheduled task to create golden record tasks from Pool updates" }

val syncRecord = syncRecordService.getOrCreateRecord(SyncTypeDb.POOL_TO_GATE_OUTPUT)

val pageRequest = PaginationRequest(0, properties.creation.fromPool.batchSize)
val changelogSearchRequest = ChangelogSearchRequest(syncRecord.finishedAt)
val poolChangelogEntries = poolClient.changelogs.getChangelogEntries(changelogSearchRequest, pageRequest)

val poolUpdatedEntries = poolChangelogEntries.content.filter { it.changelogType == ChangelogType.UPDATE }

val bpnA =
poolUpdatedEntries.filter { it.businessPartnerType == BusinessPartnerType.ADDRESS }.map { it.bpn }
val bpnL = poolUpdatedEntries.filter { it.businessPartnerType == BusinessPartnerType.LEGAL_ENTITY }
.map { it.bpn }
val bpnS =
poolUpdatedEntries.filter { it.businessPartnerType == BusinessPartnerType.SITE }.map { it.bpn }

val gateOutputEntries = businessPartnerRepository.findByStageAndBpnLInOrBpnSInOrBpnAIn(StageType.Output, bpnL, bpnS, bpnA)

val businessPartnerGenericDtoList = gateOutputEntries.map { bp ->
orchestratorMappings.toOrchestratorDto(bp)
}

val tasks = createGoldenRecordTasks(TaskMode.UpdateFromPool, businessPartnerGenericDtoList)

val pendingRequests = gateOutputEntries.zip(tasks)
.map { (partner, task) ->
SharingStateService.PendingRequest(
partner.externalId,
task.taskId
)
}
sharingStateService.setPending(pendingRequests, null)

if (poolUpdatedEntries.isNotEmpty()) {
syncRecordService.setSynchronizationStart(SyncTypeDb.POOL_TO_GATE_OUTPUT)
syncRecordService.setSynchronizationSuccess(SyncTypeDb.POOL_TO_GATE_OUTPUT, poolUpdatedEntries.last().timestamp)
}

logger.info { "Created ${tasks.size} new golden record tasks from pool updates" }
}

private fun createGoldenRecordTasks(mode: TaskMode, orchestratorBusinessPartnersDto: List<BusinessPartner>): List<TaskClientStateDto> {
if (orchestratorBusinessPartnersDto.isEmpty())
return emptyList()
Expand Down
Loading

0 comments on commit e055422

Please sign in to comment.