Skip to content

Commit

Permalink
Ny consummer for import av virksomheter (fra Brreg)
Browse files Browse the repository at this point in the history
Co-authored-by: Christian Klem <christian.klem@nav.no>
  • Loading branch information
thomasdufourd and klechr committed Aug 23, 2023
1 parent 50d5135 commit b1b5d93
Show file tree
Hide file tree
Showing 9 changed files with 404 additions and 168 deletions.
2 changes: 2 additions & 0 deletions .nais/nais.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ spec:
value: pia.ia-sak-leveranse-v1
- name: BRREG_OPPDATERING_TOPIC
value: pia.brreg-oppdatering
- name: BRREG_ALLE_VIRKSOMHETER_TOPIC
value: pia.brreg-alle-virksomheter
- name: STATISTIKK_METADATA_VIRKSOMHET_TOPIC
value: arbeidsgiver.sykefravarsstatistikk-metadata-virksomhet-v1
- name: STATISTIKK_LAND_TOPIC
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ services:
IA_SAK_STATUS_TOPIC: pia.ia-sak-status-v1
IA_SAK_LEVERANSE_TOPIC: pia.ia-sak-leveranse-v1
BRREG_OPPDATERING_TOPIC: pia.brreg-oppdatering
BRREG_ALLE_VIRKSOMHETER_TOPIC: pia.brreg-alle-virksomheter
# Backend spesifikk env
BRREG_UNDERENHET_URL: /brregmock/enhetsregisteret/api/underenheter/lastned
CONSUMER_LOOP_DELAY: 1
Expand Down
12 changes: 12 additions & 0 deletions src/main/kotlin/no/nav/lydia/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import no.nav.lydia.sykefraversstatistikk.import.BrregOppdateringConsumer
import no.nav.lydia.sykefraversstatistikk.import.StatistikkPerKategoriConsumer
import no.nav.lydia.integrasjoner.azure.AzureService
import no.nav.lydia.integrasjoner.azure.navEnhet
import no.nav.lydia.sykefraversstatistikk.import.BrregAlleVirksomheterConsumer
import no.nav.lydia.sykefraversstatistikk.import.StatistikkMetadataVirksomhetConsumer
import no.nav.lydia.virksomhet.VirksomhetRepository
import no.nav.lydia.virksomhet.VirksomhetService
Expand Down Expand Up @@ -80,6 +81,7 @@ fun startLydiaBackend() {
)

brregConsumer(naisEnv = naisEnv, dataSource = dataSource)
brregAlleVirksomheterConsumer(naisEnv = naisEnv, dataSource = dataSource)

StatistikkPerKategoriConsumer.apply {
create(kafka = naisEnv.kafka, sykefraværsstatistikkService = sykefraværsstatistikkService)
Expand Down Expand Up @@ -114,6 +116,16 @@ private fun brregConsumer(naisEnv: NaisEnvironment, dataSource: DataSource) {
}
}

private fun brregAlleVirksomheterConsumer(naisEnv: NaisEnvironment, dataSource: DataSource) {
BrregAlleVirksomheterConsumer.apply {
create(
kafka = naisEnv.kafka,
repository = VirksomhetRepository(dataSource)
)
run()
}
}

fun Application.lydiaRestApi(
naisEnvironment: NaisEnvironment,
dataSource: DataSource,
Expand Down
3 changes: 2 additions & 1 deletion src/main/kotlin/no/nav/lydia/NaisEnvironment.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class NaisEnvironment(
}

fun hentMiljø(cluster: String) =
Environment.values().find { it.name.lowercase() == cluster } ?: throw IllegalStateException("Ukjent miljø ${cluster}")
Environment.entries.find { it.name.lowercase() == cluster } ?: throw IllegalStateException("Ukjent miljø ${cluster}")

const val APP_NAVN = "lydia-api"
}
Expand Down Expand Up @@ -79,6 +79,7 @@ class Kafka(
val iaSakStatusTopic: String = getEnvVar("IA_SAK_STATUS_TOPIC"),
val iaSakLeveranseTopic: String = getEnvVar("IA_SAK_LEVERANSE_TOPIC"),
val brregOppdateringTopic: String = getEnvVar("BRREG_OPPDATERING_TOPIC"),
val brregAlleVirksomheterTopic: String = getEnvVar("BRREG_ALLE_VIRKSOMHETER_TOPIC"),
val statistikkMetadataVirksomhetTopic: String = getEnvVar("STATISTIKK_METADATA_VIRKSOMHET_TOPIC"),
val statistikkLandTopic: String = getEnvVar("STATISTIKK_LAND_TOPIC"),
val statistikkSektorTopic: String = getEnvVar("STATISTIKK_SEKTOR_TOPIC"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package no.nav.lydia.sykefraversstatistikk.import

import kotlinx.coroutines.*
import kotlinx.serialization.json.Json
import no.nav.lydia.Kafka
import no.nav.lydia.exceptions.UgyldigAdresseException
import no.nav.lydia.integrasjoner.brreg.BrregVirksomhetDto
import no.nav.lydia.integrasjoner.brreg.tilVirksomhet
import no.nav.lydia.virksomhet.VirksomhetRepository
import no.nav.lydia.virksomhet.domene.VirksomhetStatus
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.errors.RetriableException
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Duration
import kotlin.coroutines.CoroutineContext

object BrregAlleVirksomheterConsumer : CoroutineScope {
private val logger: Logger = LoggerFactory.getLogger(this::class.java)
lateinit var job: Job
lateinit var kafka: Kafka

lateinit var virksomhetRepository: VirksomhetRepository
private lateinit var kafkaConsumer: KafkaConsumer<String, String>

override val coroutineContext: CoroutineContext
get() = Dispatchers.IO + job

init {
Runtime.getRuntime().addShutdownHook(Thread(BrregAlleVirksomheterConsumer::cancel))
}

fun create(kafka: Kafka, repository: VirksomhetRepository) {
val topicNavn = kafka.brregAlleVirksomheterTopic
val consumerGroupNavn = Kafka.brregConsumerGroupId

logger.info("Creating kafka consumer job for $topicNavn i group $consumerGroupNavn")
this.job = Job()
this.kafka = kafka
this.virksomhetRepository = repository
kafkaConsumer = KafkaConsumer(
BrregAlleVirksomheterConsumer.kafka.consumerProperties(consumerGroupId = Kafka.brregConsumerGroupId),
StringDeserializer(),
StringDeserializer()
)
logger.info("Created kafka consumer job for $topicNavn i group $consumerGroupNavn")
}

fun run() {
val topicNavn = kafka.brregAlleVirksomheterTopic
launch {
kafkaConsumer.use { consumer ->
try {
consumer.subscribe(listOf(topicNavn))
logger.info("Kafka consumer subscribed to $topicNavn")
while (job.isActive) {
try {
val records = consumer.poll(Duration.ofSeconds(1))
val antallMeldinger = records.count()
if (antallMeldinger < 1) continue
var antallIrrelevanteBedrifter = 0
logger.info("Fant $antallMeldinger nye meldinger for $topicNavn}")
records.forEach { record ->
val brregVirksomhet = Json.decodeFromString<BrregVirksomhetDto>(record.value())

try {
val virksomhet = brregVirksomhet.tilVirksomhet(
status = VirksomhetStatus.AKTIV,
oppdateringsId = null
)
virksomhetRepository.insert(virksomhet)
virksomhetRepository.insertNæringsundergrupper(virksomhet)
} catch (e: UgyldigAdresseException) {
antallIrrelevanteBedrifter += 1
}
}
logger.info("Lagret $antallMeldinger meldinger for $topicNavn")
if (antallIrrelevanteBedrifter > 0) {
logger.info("Fant $antallIrrelevanteBedrifter irrelevante bedrifter.")
}
consumer.commitSync()
} catch (e: RetriableException) {
logger.warn("Had a retriable exception for $topicNavn, retrying", e)
}
delay(kafka.consumerLoopDelay)
}
} catch (e: WakeupException) {
logger.info("BrregAlleVirksomheterConsumer is shutting down...")
} catch (e: Exception) {
logger.error("Exception is shutting down kafka listner for $topicNavn", e)
throw e
}
}
}
}

fun cancel() = runBlocking {
logger.info("Stopping kafka consumer job for ${kafka.brregAlleVirksomheterTopic}")
kafkaConsumer.wakeup()
job.cancelAndJoin()
logger.info("Stopped kafka consumer job for ${kafka.brregAlleVirksomheterTopic}")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ object BrregOppdateringConsumer : CoroutineScope {
oppdateringsId = oppdateringVirksomhet.oppdateringsid
)
repository.insert(virksomhet)
repository.simpleInsert(virksomhet)
repository.insertNæringsundergrupper(virksomhet)
} catch (e: UgyldigAdresseException) {
antallIrrelevanteBedrifter += 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class VirksomhetRepository(val dataSource: DataSource) {
}
}

fun simpleInsert(virksomhet: VirksomhetLagringDao) {
fun insertNæringsundergrupper(virksomhet: VirksomhetLagringDao) {
using(sessionOf(dataSource)) { session ->
session.transaction { tx ->
val insertSql = """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.kotest.matchers.shouldBe
import io.kotest.matchers.shouldNotBe
import io.ktor.http.HttpStatusCode
import kotlinx.datetime.Clock
import no.nav.lydia.helper.IntegrationsHelper
import no.nav.lydia.helper.PiaBrregOppdateringTestData.Companion.endredeVirksomheter
import no.nav.lydia.helper.PiaBrregOppdateringTestData.Companion.fjernedeVirksomheter
import no.nav.lydia.helper.PiaBrregOppdateringTestData.Companion.nyeVirksomheter
Expand All @@ -21,8 +22,10 @@ import no.nav.lydia.helper.TestVirksomhet.Companion.nyVirksomhet
import no.nav.lydia.helper.VirksomhetHelper
import no.nav.lydia.helper.genererEndretNavn
import no.nav.lydia.integrasjoner.brreg.Beliggenhetsadresse
import no.nav.lydia.integrasjoner.ssb.NæringsDownloader
import no.nav.lydia.virksomhet.api.VirksomhetDto
import no.nav.lydia.virksomhet.domene.Næringsgruppe
import no.nav.lydia.virksomhet.domene.Sektor
import no.nav.lydia.virksomhet.domene.VirksomhetStatus
import java.sql.Timestamp
import kotlin.test.Test
Expand All @@ -34,6 +37,51 @@ import kotlin.test.Test
class VirksomhetOppdateringTest {
private val token = TestContainerHelper.oauth2ServerContainer.superbruker1.token

@Test
fun `vi oppdaterer næringsgrupper til en bedrift når vi importerer ALLE bedrifter`() {
val nyVirksomhet = nyVirksomhet(
beliggenhet = Beliggenhetsadresse(
land = "NORGE",
landkode = "NO",
postnummer = "0100",
poststed = "OSLO",
adresse = listOf("Tertitten 1"),
kommune = "OSLO",
kommunenummer = "0300",
), næringer = listOf(
Næringsgruppe(
"Barnehager", "88.911"
),
Næringsgruppe(
"Dyrking av ettårige vekster ellers", "01.190"
)
)
)
NæringsDownloader(
url = IntegrationsHelper.mockKallMotSsbNæringer(
httpMock = TestContainerHelper.httpMock,
testData = TestData.fraVirksomhet(nyVirksomhet, sektor = Sektor.STATLIG, perioder = listOf())
),
næringsRepository = TestContainerHelper.næringsRepository
).lastNedNæringer()

TestContainerHelper.kafkaContainerHelper.sendBrregAlleVirksomheter(listOf(nyVirksomhet))

val virksomhetId = TestContainerHelper.postgresContainer.hentEnkelKolonne<Int>(
"""select id from virksomhet
where orgnr='${nyVirksomhet.orgnr}'
""".trimIndent()
)

val næringskode1 = TestContainerHelper.postgresContainer.hentEnkelKolonne<String>(
"""select naeringskode1 from virksomhet_naringsundergrupper
where virksomhet= $virksomhetId
""".trimIndent()
)

næringskode1 shouldBe "88.911"
}

@Test
fun `vi oppdaterer næringsgrupper til en bedrift`() {
val nyVirksomhet = nyVirksomhet(
Expand Down
Loading

0 comments on commit b1b5d93

Please sign in to comment.