Skip to content

Commit

Permalink
Improve consume sykmeldinger (#175)
Browse files Browse the repository at this point in the history
* Skip old sykmeldinger, and read 100 at a time
  • Loading branch information
AudunSorheim authored Jul 5, 2024
1 parent 971c291 commit e24da3e
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class SendtSykmeldingAivenConsumer(
"org.apache.kafka.common.serialization.StringDeserializer"
)
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100")
}
kafkaListener = KafkaConsumer(kafkaConfig)
kafkaListener.subscribe(listOf(SENDT_SYKMELDING_TOPIC))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,12 @@ package no.nav.syfo.sykmelding.database
import no.nav.syfo.application.database.DatabaseInterface
import no.nav.syfo.application.database.toList
import no.nav.syfo.sykmelding.domain.Sykmeldingsperiode
import no.nav.syfo.sykmelding.domain.SykmeldingsperiodeAGDTO
import java.sql.ResultSet
import java.sql.Timestamp
import java.time.LocalDate
import java.time.LocalDateTime
import java.util.*

fun DatabaseInterface.persistSykmeldingsperioder(
sykmeldingId: String,
orgnumber: String,
employeeIdentificationNumber: String,
sykmeldingsperioder: List<SykmeldingsperiodeAGDTO>
) {
sykmeldingsperioder.forEach { sykmeldingsperiode ->
persistSykmeldingsperiode(
sykmeldingId,
orgnumber,
employeeIdentificationNumber,
sykmeldingsperiode.fom,
sykmeldingsperiode.tom
)
}
}

fun DatabaseInterface.persistSykmeldingsperiode(
sykmeldingId: String,
orgnummer: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import no.nav.syfo.application.database.DatabaseInterface
import no.nav.syfo.sykmelding.database.deleteSykmeldingsperioder
import no.nav.syfo.sykmelding.database.getSykmeldingsperioder
import no.nav.syfo.sykmelding.database.hasActiveSentSykmelding
import no.nav.syfo.sykmelding.database.persistSykmeldingsperioder
import no.nav.syfo.sykmelding.database.persistSykmeldingsperiode
import no.nav.syfo.sykmelding.domain.Sykmeldingsperiode
import no.nav.syfo.sykmelding.domain.SykmeldingsperiodeAGDTO
import java.time.LocalDate

class SendtSykmeldingService(private val database: DatabaseInterface) {
fun persistSykmeldingsperioder(
Expand All @@ -15,12 +16,20 @@ class SendtSykmeldingService(private val database: DatabaseInterface) {
employeeIdentificationNumber: String,
sykmeldingsperioder: List<SykmeldingsperiodeAGDTO>
) {
database.persistSykmeldingsperioder(
sykmeldingId = sykmeldingId,
orgnumber = orgnumber,
employeeIdentificationNumber = employeeIdentificationNumber,
sykmeldingsperioder = sykmeldingsperioder
)
val activeSykmeldingsPerioder = sykmeldingsperioder.filter {
!it.tom.isBefore(
LocalDate.now()
)
}
activeSykmeldingsPerioder.forEach { sykmeldingsperiode ->
database.persistSykmeldingsperiode(
sykmeldingId = sykmeldingId,
orgnummer = orgnumber,
employeeIdentificationNumber = employeeIdentificationNumber,
fom = sykmeldingsperiode.fom,
tom = sykmeldingsperiode.tom
)
}
}

fun deleteSykmeldingsperioder(sykmeldingId: String) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ class SendtSykmeldingServiceTest : DescribeSpec({
}

describe("Sykmeldingsperioder") {
it("Should persist sykmeldingsperioder") {
it("Should persist active sykmeldingsperioder") {
val sykmeldingId = "123"
val orgnumber = "456"
val employeeIdentificationNumber = "789"
val activeSykmeldingFom = LocalDate.now().minusWeeks(10)
val activeSykmeldingTom = LocalDate.now().plusWeeks(10)

val sykmeldingsperioder = listOf(
SykmeldingsperiodeAGDTO(
fom = LocalDate.now().minusWeeks(10),
tom = LocalDate.now().plusDays(10),
fom = activeSykmeldingFom,
tom = activeSykmeldingTom,
),
SykmeldingsperiodeAGDTO(
fom = LocalDate.now().minusWeeks(20),
Expand All @@ -43,7 +46,9 @@ class SendtSykmeldingServiceTest : DescribeSpec({
val storedSykmeldingsperioder =
sendtSykmeldingService.getSykmeldingsperioder(orgnumber, employeeIdentificationNumber)

storedSykmeldingsperioder.size shouldBe 2
storedSykmeldingsperioder.size shouldBe 1
storedSykmeldingsperioder[0].fom shouldBe activeSykmeldingFom
storedSykmeldingsperioder[0].tom shouldBe activeSykmeldingTom
}

it("Should delete tombstone records") {
Expand Down Expand Up @@ -71,7 +76,7 @@ class SendtSykmeldingServiceTest : DescribeSpec({
val storedSykmeldingsperioder =
sendtSykmeldingService.getSykmeldingsperioder(orgnumber, employeeIdentificationNumber)

storedSykmeldingsperioder.size shouldBe 2
storedSykmeldingsperioder.size shouldBe 1

sendtSykmeldingService.deleteSykmeldingsperioder(sykmeldingId)

Expand Down

0 comments on commit e24da3e

Please sign in to comment.