diff --git a/src/main/kotlin/no/nav/syfo/Bootstrap.kt b/src/main/kotlin/no/nav/syfo/Bootstrap.kt index 34ec606..b0d20ca 100644 --- a/src/main/kotlin/no/nav/syfo/Bootstrap.kt +++ b/src/main/kotlin/no/nav/syfo/Bootstrap.kt @@ -200,12 +200,12 @@ fun main() { val sykmeldingService = SykmeldingService( smregisterClient = - SmregisterClient( - smregisterEndpointURL = env.smregisterEndpointURL, - accessTokenClientV2 = accessTokenClientV2, - scope = env.smregisterAudience, - httpClient = httpClient, - ), + SmregisterClient( + smregisterEndpointURL = env.smregisterEndpointURL, + accessTokenClientV2 = accessTokenClientV2, + scope = env.smregisterAudience, + httpClient = httpClient, + ), ) val behandlingsutfallService = BehandlingsutfallService( @@ -363,8 +363,10 @@ suspend fun blockingApplicationLogic( applicationState, kafkaAivenConsumerReceivedSykmelding, mottattSykmeldingService, - 0 + 0, ) + kafkaAivenConsumerReceivedSykmelding.unsubscribe() + log.info("Stopper kafkaAivenConsumerReceivedSykmelding") runKafkaConsumer( infotrygdOppdateringProducer, @@ -373,8 +375,10 @@ suspend fun blockingApplicationLogic( applicationState, kafkaAivenConsumerReceivedSykmeldingRetry, mottattSykmeldingService, - 10000 + 10000, ) + kafkaAivenConsumerReceivedSykmeldingRetry.unsubscribe() + log.info("Stopper kafkaAivenConsumerReceivedSykmeldingRetry") } delay(100) } @@ -390,30 +394,20 @@ private suspend fun runKafkaConsumer( pollTimeMs: Long, ) { while (applicationState.ready && shouldRun(getCurrentTime())) { - try { - kafkaAivenConsumerReceivedSykmelding - .poll(Duration.ofMillis(pollTimeMs)) - .mapNotNull { it.value() } - .forEach { receivedSykmeldingString -> - handleRecords( - session, - infotrygdOppdateringProducer, - infotrygdSporringProducer, - mottattSykmeldingService, - receivedSykmeldingString, - ) - } - delay(100) - } catch (ex: Exception) { - log.error("error running sykmelding-consumer", ex) - } finally { - kafkaAivenConsumerReceivedSykmelding.unsubscribe() - log.info( - "Unsubscribed from topic and waiting for 10 seconds before trying again", - ) - delay(10_000) - } + kafkaAivenConsumerReceivedSykmelding + .poll(Duration.ofMillis(pollTimeMs)) + .mapNotNull { it.value() } + .forEach { receivedSykmeldingString -> + handleRecords( + session, + infotrygdOppdateringProducer, + infotrygdSporringProducer, + mottattSykmeldingService, + receivedSykmeldingString, + ) + } } + delay(100) } @WithSpan