From a25017e6855d266140a8841b637d47789da7d7eb Mon Sep 17 00:00:00 2001 From: Bastiaan Date: Thu, 18 Apr 2024 15:20:51 +0200 Subject: [PATCH 01/18] Add snyk workflow to github actions --- .github/workflows/snyk.yaml | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 .github/workflows/snyk.yaml diff --git a/.github/workflows/snyk.yaml b/.github/workflows/snyk.yaml new file mode 100644 index 00000000..54b592d6 --- /dev/null +++ b/.github/workflows/snyk.yaml @@ -0,0 +1,34 @@ +name: Snyk test + +on: + - pull_request + +jobs: + security: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - uses: snyk/actions/setup@master + with: + snyk-version: v1.1032.0 + + - uses: actions/setup-java@v3 + with: + distribution: temurin + java-version: 17 + + - name: Setup Gradle + uses: gradle/gradle-build-action@v2 + + - name: Run Snyk to check for vulnerabilities + env: + SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }} + run: > + snyk test + --all-projects + --configuration-matching="^runtimeClasspath$" + --fail-on=upgradable + --org=radar-base + --policy-path=.snyk + --severity-threshold=high From 4f266302f76893d7f65a9463ddf495e96874ce0f Mon Sep 17 00:00:00 2001 From: Bastiaan Date: Thu, 18 Apr 2024 16:07:33 +0200 Subject: [PATCH 02/18] Update the versions of our dependencies to the latest available --- buildSrc/src/main/kotlin/Versions.kt | 20 +++++------ .../radarbase/connect/rest/RestTaskTest.java | 35 ++++++++++--------- 2 files changed, 27 insertions(+), 28 deletions(-) diff --git a/buildSrc/src/main/kotlin/Versions.kt b/buildSrc/src/main/kotlin/Versions.kt index 0109e945..6c1d7e80 100644 --- a/buildSrc/src/main/kotlin/Versions.kt +++ b/buildSrc/src/main/kotlin/Versions.kt @@ -3,7 +3,7 @@ object Versions { const val project = "0.4.2-SNAPSHOT" const val java = 17 - const val kotlin = "1.9.10" + const val kotlin = "1.9.22" const val wrapper = "8.4" const val radarCommons = "1.1.2" @@ -14,20 +14,18 @@ object Versions { const val managementPortal = "2.0.0" // From image - const val jackson = "2.16.1" + const val jackson = "2.17.0" - const val log4j2 = "2.20.0" - const val slf4j = "2.0.9" + const val log4j2 = "2.23.1" + const val slf4j = "2.0.13" const val okhttp = "4.12.0" - const val firebaseAdmin = "9.1.0" + const val firebaseAdmin = "9.2.0" const val radarSchemas = "0.8.7-hotfix" - const val ktor = "2.3.5" + const val ktor = "2.3.10" - const val junit = "5.9.3" - const val wiremock = "2.27.2" - const val mockito = "5.3.1" - - const val kotlinVersion = "1.9.10" + const val junit = "5.10.2" + const val wiremock = "3.0.1" + const val mockito = "5.11.0" } diff --git a/kafka-connect-rest-source/src/test/java/org/radarbase/connect/rest/RestTaskTest.java b/kafka-connect-rest-source/src/test/java/org/radarbase/connect/rest/RestTaskTest.java index d4868e6a..885fedec 100644 --- a/kafka-connect-rest-source/src/test/java/org/radarbase/connect/rest/RestTaskTest.java +++ b/kafka-connect-rest-source/src/test/java/org/radarbase/connect/rest/RestTaskTest.java @@ -17,27 +17,11 @@ package org.radarbase.connect.rest; -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; -import static com.github.tomakehurst.wiremock.client.WireMock.matching; -import static com.github.tomakehurst.wiremock.client.WireMock.post; -import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; -import static com.github.tomakehurst.wiremock.client.WireMock.resetAllRequests; -import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; -import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; -import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching; -import static com.github.tomakehurst.wiremock.client.WireMock.verify; -import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; -import static org.junit.jupiter.api.Assertions.assertEquals; - import com.github.tomakehurst.wiremock.WireMockServer; import com.github.tomakehurst.wiremock.client.VerificationException; import com.github.tomakehurst.wiremock.client.WireMock; import com.github.tomakehurst.wiremock.verification.LoggedRequest; import com.github.tomakehurst.wiremock.verification.NearMiss; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTaskContext; import org.apache.kafka.connect.storage.OffsetStorageReader; @@ -56,6 +40,23 @@ import org.radarbase.connect.rest.single.SingleRestSourceConnector; import org.radarbase.connect.rest.single.SingleRestSourceConnectorConfig; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.matching; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.resetAllRequests; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching; +import static com.github.tomakehurst.wiremock.client.WireMock.verify; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; +import static org.junit.jupiter.api.Assertions.assertEquals; + @ExtendWith(WireMockRule.class) public class RestTaskTest { @@ -167,7 +168,7 @@ public void afterEach(ExtensionContext context) { } } - private void checkForUnmatchedRequests() { + public void checkForUnmatchedRequests() { List unmatchedRequests = findAllUnmatchedRequests(); if (!unmatchedRequests.isEmpty()) { List nearMisses = findNearMissesForAllUnmatchedRequests(); From f72531a31749c1a27c3f999de0349b8a90b3a2da Mon Sep 17 00:00:00 2001 From: Pauline Conde Date: Wed, 8 May 2024 12:44:48 +0100 Subject: [PATCH 03/18] Update Versions.kt --- buildSrc/src/main/kotlin/Versions.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildSrc/src/main/kotlin/Versions.kt b/buildSrc/src/main/kotlin/Versions.kt index c2463f20..f399654f 100644 --- a/buildSrc/src/main/kotlin/Versions.kt +++ b/buildSrc/src/main/kotlin/Versions.kt @@ -1,6 +1,6 @@ @Suppress("ConstPropertyName", "MemberVisibilityCanBePrivate") object Versions { - const val project = "0.5.2" + const val project = "0.5.3-SNAPSHOT" const val java = 17 const val kotlin = "1.9.10" From 7908a49413be758c7108832739e91f5942ef3393 Mon Sep 17 00:00:00 2001 From: Pauline Date: Tue, 28 May 2024 23:33:18 +0800 Subject: [PATCH 04/18] Update Oura request backoff times --- .../kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt index d7729270..c1d89c01 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt @@ -242,8 +242,8 @@ constructor( private val BACK_OFF_TIME = Duration.ofMinutes(10L) private val ONE_DAY = 1L private val TIME_AFTER_REQUEST = Duration.ofDays(30) - private val USER_BACK_OFF_TIME = Duration.ofMinutes(2L) - private val SUCCESS_BACK_OFF_TIME = Duration.ofSeconds(3L) + private val USER_BACK_OFF_TIME = Duration.ofHours(12L) + private val SUCCESS_BACK_OFF_TIME = Duration.ofMinutes(1L) private val USER_MAX_REQUESTS = 20 val JSON_FACTORY = JsonFactory() val JSON_READER = ObjectMapper(JSON_FACTORY).registerModule(JavaTimeModule()).reader() From 0430499d6b9f350cb4ac2990d7e51558f7239d05 Mon Sep 17 00:00:00 2001 From: Bastiaan Date: Tue, 18 Jun 2024 15:36:00 +0200 Subject: [PATCH 05/18] Uninstall java 11 and install java 17 in the base docker images from kafka, as per https://github.com/confluentinc/common-docker/issues/179#issuecomment-1743995773 --- kafka-connect-fitbit-source/Dockerfile | 7 +++++++ kafka-connect-oura-source/Dockerfile | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/kafka-connect-fitbit-source/Dockerfile b/kafka-connect-fitbit-source/Dockerfile index d4c7ff01..3e1ec53f 100644 --- a/kafka-connect-fitbit-source/Dockerfile +++ b/kafka-connect-fitbit-source/Dockerfile @@ -34,6 +34,13 @@ RUN gradle jar FROM confluentinc/cp-kafka-connect-base:7.5.0 +USER root + +RUN yum remove -y zulu11-ca-jdk-headless && yum remove -y zulu11-ca-jre-headless +RUN yum install -y zulu17-ca-jdk-headless && yum install -y zulu17-ca-jre-headless + +USER appuser + MAINTAINER Joris Borgdorff LABEL description="Kafka REST API Source connector" diff --git a/kafka-connect-oura-source/Dockerfile b/kafka-connect-oura-source/Dockerfile index 41972797..2675cdba 100644 --- a/kafka-connect-oura-source/Dockerfile +++ b/kafka-connect-oura-source/Dockerfile @@ -34,6 +34,13 @@ RUN gradle jar FROM confluentinc/cp-kafka-connect-base:7.5.0 +USER root + +RUN yum remove -y zulu11-ca-jdk-headless && yum remove -y zulu11-ca-jre-headless +RUN yum install -y zulu17-ca-jdk-headless && yum install -y zulu17-ca-jre-headless + +USER appuser + MAINTAINER Pauline Conde LABEL description="Kafka Oura REST API Source connector" From 78c20fe6cd28783e7f04db30e4b456e93629311a Mon Sep 17 00:00:00 2001 From: Bastiaan Date: Wed, 19 Jun 2024 14:53:05 +0200 Subject: [PATCH 06/18] Check if transfer-Encoding header is set, if so the response has a body --- .../connect/rest/fitbit/user/ServiceUserRepository.kt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt index c26c2af5..9c93a576 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt @@ -240,7 +240,10 @@ class ServiceUserRepository : UserRepository { ): T = withContext(Dispatchers.IO) { val response = client.request(builder) val contentLength = response.contentLength() - val hasBody = contentLength != null && contentLength > 0 + // if Transfer-Encoding: chunked, then the request has data but contentLength will be null. + val transferEncoding = response.headers["Transfer-Encoding"] + val hasBody = (contentLength != null && contentLength > 0) || + (transferEncoding != null && transferEncoding.contains("chunked")) if (response.status == HttpStatusCode.NotFound) { throw NoSuchElementException("URL " + response.request.url + " does not exist") } else if (!response.status.isSuccess() || !hasBody) { From 9971d00859b9e201637246614aa253452190d537 Mon Sep 17 00:00:00 2001 From: Pauline Date: Wed, 10 Jul 2024 10:54:18 +0100 Subject: [PATCH 07/18] Fix initialisation of kakfa offset manager --- .../radarbase/connect/rest/oura/offset/KafkaOffsetManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java index fbb4329b..341978b2 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java @@ -36,7 +36,7 @@ public void initialize(List> partitions) { .filter(e -> e.getValue() != null && e.getValue().containsKey(TIMESTAMP_OFFSET_KEY)) .collect(Collectors.toMap( e -> (String) e.getKey().get("user") + "-" + e.getKey().get("route"), - e -> Instant.ofEpochMilli(((Number) e.getValue().get(TIMESTAMP_OFFSET_KEY)).longValue()))); + e -> Instant.ofEpochSecond(((Number) e.getValue().get(TIMESTAMP_OFFSET_KEY)).longValue()))); } else { logger.warn("Offset storage reader is null, will resume from an empty state."); } From 5b32e996af759d92260c016ebff17594de497b7d Mon Sep 17 00:00:00 2001 From: Pauline Date: Wed, 10 Jul 2024 10:54:39 +0100 Subject: [PATCH 08/18] Add backoff time when no records are found --- .../kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt index c1d89c01..9a2517ce 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt @@ -145,11 +145,13 @@ constructor( ?: nextRequestTime } else { if (request.startDate.plus(TIME_AFTER_REQUEST).isBefore(Instant.now())) { + logger.info("No records found, updating offsets to end date..") ouraOffsetManager.updateOffsets( request.route, request.user, request.endDate, ) + userNextRequest[request.user.versionedId] = Instant.now().plus(BACK_OFF_TIME) } } return records @@ -241,7 +243,7 @@ constructor( private val logger = LoggerFactory.getLogger(OuraRequestGenerator::class.java) private val BACK_OFF_TIME = Duration.ofMinutes(10L) private val ONE_DAY = 1L - private val TIME_AFTER_REQUEST = Duration.ofDays(30) + private val TIME_AFTER_REQUEST = Duration.ofDays(10) private val USER_BACK_OFF_TIME = Duration.ofHours(12L) private val SUCCESS_BACK_OFF_TIME = Duration.ofMinutes(1L) private val USER_MAX_REQUESTS = 20 From c24c89e076fab120a6daab8f6aebe15ab4b763b1 Mon Sep 17 00:00:00 2001 From: Pauline Date: Wed, 10 Jul 2024 14:33:14 +0100 Subject: [PATCH 09/18] UUpdate back off times --- .../org/radarbase/oura/request/OuraRequestGenerator.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt index 9a2517ce..d948d874 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt @@ -90,7 +90,7 @@ constructor( offsetTime.coerceAtLeast(startDate) } val endDate = if (user.endDate >= Instant.now()) Instant.now() else user.endDate - if (Duration.between(startOffset, endDate).toDays() <= ONE_DAY) { + if (Duration.between(startOffset, endDate) <= ONE_DAY) { logger.info("Interval between dates is too short. Backing off..") userNextRequest[user.versionedId] = Instant.now().plus(USER_BACK_OFF_TIME) return emptySequence() @@ -242,8 +242,8 @@ constructor( companion object { private val logger = LoggerFactory.getLogger(OuraRequestGenerator::class.java) private val BACK_OFF_TIME = Duration.ofMinutes(10L) - private val ONE_DAY = 1L - private val TIME_AFTER_REQUEST = Duration.ofDays(10) + private val ONE_DAY = Duration.ofDays(1L) + private val TIME_AFTER_REQUEST = Duration.ofDays(30) private val USER_BACK_OFF_TIME = Duration.ofHours(12L) private val SUCCESS_BACK_OFF_TIME = Duration.ofMinutes(1L) private val USER_MAX_REQUESTS = 20 From 593f50d16043602e643617da0b89b8a3b66e6dba Mon Sep 17 00:00:00 2001 From: Pauline Conde Date: Wed, 10 Jul 2024 15:06:54 +0100 Subject: [PATCH 10/18] Bump version --- buildSrc/src/main/kotlin/Versions.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildSrc/src/main/kotlin/Versions.kt b/buildSrc/src/main/kotlin/Versions.kt index 3f84278c..6b5a3557 100644 --- a/buildSrc/src/main/kotlin/Versions.kt +++ b/buildSrc/src/main/kotlin/Versions.kt @@ -1,6 +1,6 @@ @Suppress("ConstPropertyName", "MemberVisibilityCanBePrivate") object Versions { - const val project = "0.5.3-SNAPSHOT" + const val project = "0.5.3" const val java = 17 const val kotlin = "1.9.22" From 2e0cd88f828c68d166212be9f5a777e0eb1dc8bc Mon Sep 17 00:00:00 2001 From: Pauline Date: Wed, 10 Jul 2024 15:59:46 +0100 Subject: [PATCH 11/18] Allow null end dates for continuous pull of data --- .../radarbase/oura/request/OuraRequestGenerator.kt | 12 +++++++++++- .../main/kotlin/org/radarbase/oura/user/OuraUser.kt | 2 +- .../src/main/kotlin/org/radarbase/oura/user/User.kt | 2 +- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt index d948d874..07ca1624 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt @@ -89,7 +89,17 @@ constructor( logger.info("Offsets found in persistence: " + offsetTime.toString()) offsetTime.coerceAtLeast(startDate) } - val endDate = if (user.endDate >= Instant.now()) Instant.now() else user.endDate + val endDate = user.endDate!!.let { + if (user.endDate == null) { + Instant.now() + } + else { + if (it >= Instant.now()) { + Instant.now() + } + it + } + } if (Duration.between(startOffset, endDate) <= ONE_DAY) { logger.info("Interval between dates is too short. Backing off..") userNextRequest[user.versionedId] = Instant.now().plus(USER_BACK_OFF_TIME) diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/user/OuraUser.kt b/oura-library/src/main/kotlin/org/radarbase/oura/user/OuraUser.kt index 2b33c3d8..e7156d13 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/user/OuraUser.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/user/OuraUser.kt @@ -16,7 +16,7 @@ data class OuraUser( @JsonProperty("externalId") override val externalId: String?, @JsonProperty("isAuthorized") override val isAuthorized: Boolean, @JsonProperty("startDate") override val startDate: Instant, - @JsonProperty("endDate") override val endDate: Instant, + @JsonProperty("endDate") override val endDate: Instant? = null, @JsonProperty("version") override val version: String? = null, @JsonProperty("serviceUserId") override val serviceUserId: String? = null, ) : User { diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/user/User.kt b/oura-library/src/main/kotlin/org/radarbase/oura/user/User.kt index ee6fd496..b2247354 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/user/User.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/user/User.kt @@ -10,7 +10,7 @@ interface User { val sourceId: String val externalId: String? val startDate: Instant - val endDate: Instant + val endDate: Instant? val createdAt: Instant val humanReadableUserId: String? val serviceUserId: String? From 67af96456a5ae472b650fc7cc3a86d7ba4b42ebe Mon Sep 17 00:00:00 2001 From: Pauline Date: Wed, 10 Jul 2024 16:12:23 +0100 Subject: [PATCH 12/18] Update back off time for no records to ONE_DAY --- .../kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt index d948d874..eb807efc 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt @@ -151,7 +151,7 @@ constructor( request.user, request.endDate, ) - userNextRequest[request.user.versionedId] = Instant.now().plus(BACK_OFF_TIME) + userNextRequest[request.user.versionedId] = Instant.now().plus(ONE_DAY) } } return records From 5d87b76e049dcc0ea2e5cd029564b15de18eec3b Mon Sep 17 00:00:00 2001 From: Pauline Conde Date: Wed, 10 Jul 2024 16:18:09 +0100 Subject: [PATCH 13/18] Refactor checking of end date for brevity Co-authored-by: Yatharth Ranjan --- .../radarbase/oura/request/OuraRequestGenerator.kt | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt index 07ca1624..39633acb 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt @@ -89,17 +89,7 @@ constructor( logger.info("Offsets found in persistence: " + offsetTime.toString()) offsetTime.coerceAtLeast(startDate) } - val endDate = user.endDate!!.let { - if (user.endDate == null) { - Instant.now() - } - else { - if (it >= Instant.now()) { - Instant.now() - } - it - } - } + val endDate = user.endDate?.coerceAtMost(Instant.now()) ?: Instant.now() if (Duration.between(startOffset, endDate) <= ONE_DAY) { logger.info("Interval between dates is too short. Backing off..") userNextRequest[user.versionedId] = Instant.now().plus(USER_BACK_OFF_TIME) From f7d3f330d8c03caa83becd5373d1d97db66fb05d Mon Sep 17 00:00:00 2001 From: Pauline Date: Fri, 12 Jul 2024 15:45:14 +0100 Subject: [PATCH 14/18] Fix isComplete check for User --- .../src/main/kotlin/org/radarbase/oura/user/OuraUser.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/user/OuraUser.kt b/oura-library/src/main/kotlin/org/radarbase/oura/user/OuraUser.kt index e7156d13..e3a7034d 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/user/OuraUser.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/user/OuraUser.kt @@ -23,5 +23,5 @@ data class OuraUser( override val observationKey: ObservationKey = ObservationKey(projectId, userId, sourceId) override val versionedId: String = "$id${version?.let { "#$it" } ?: ""}" - fun isComplete() = isAuthorized && startDate.isBefore(endDate) && serviceUserId != null + fun isComplete() = isAuthorized && (endDate == null || startDate.isBefore(endDate)) && serviceUserId != null } From dbcd7dcb8dbc6d1b057171c583c6b2844906327c Mon Sep 17 00:00:00 2001 From: Pauline Date: Mon, 15 Jul 2024 11:51:13 +0100 Subject: [PATCH 15/18] Fix lint errors --- .../src/main/kotlin/org/radarbase/oura/user/OuraUser.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/user/OuraUser.kt b/oura-library/src/main/kotlin/org/radarbase/oura/user/OuraUser.kt index e3a7034d..57905f8e 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/user/OuraUser.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/user/OuraUser.kt @@ -23,5 +23,6 @@ data class OuraUser( override val observationKey: ObservationKey = ObservationKey(projectId, userId, sourceId) override val versionedId: String = "$id${version?.let { "#$it" } ?: ""}" - fun isComplete() = isAuthorized && (endDate == null || startDate.isBefore(endDate)) && serviceUserId != null + fun isComplete() = + isAuthorized && (endDate == null || startDate.isBefore(endDate)) && serviceUserId != null } From 7f0dcb1d441a5dd9988901fcb9f3ce24cb758411 Mon Sep 17 00:00:00 2001 From: Pauline Date: Fri, 26 Jul 2024 14:22:50 +0100 Subject: [PATCH 16/18] Fix Oura offset times --- .../connect/rest/oura/OuraSourceTask.java | 3 +++ .../oura/request/OuraRequestGenerator.kt | 22 +++++++------------ 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java index 293a792f..39ad35e7 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java @@ -63,6 +63,7 @@ public class OuraSourceTask extends SourceTask { private AvroData avroData = new AvroData(20); private KafkaOffsetManager offsetManager; String TIMESTAMP_OFFSET_KEY = "timestamp"; + long TIMEOUT = 60000L; public void initialize(OuraRestSourceConnectorConfig config, OffsetStorageReader offsetStorageReader) { OuraRestSourceConnectorConfig ouraConfig = (OuraRestSourceConnectorConfig) config; @@ -144,6 +145,8 @@ public List poll() throws InterruptedException { List sourceRecords = Collections.emptyList(); do { + Thread.sleep(TIMEOUT); + Map configs = context.configs(); Iterator requestIterator = this.requests() .iterator(); diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt index fba8fa26..6d35132e 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt @@ -91,8 +91,7 @@ constructor( } val endDate = user.endDate?.coerceAtMost(Instant.now()) ?: Instant.now() if (Duration.between(startOffset, endDate) <= ONE_DAY) { - logger.info("Interval between dates is too short. Backing off..") - userNextRequest[user.versionedId] = Instant.now().plus(USER_BACK_OFF_TIME) + logger.info("Interval between dates is too short. Not requesting..") return emptySequence() } val endTime = (startOffset + defaultQueryRange).coerceAtMost(endDate) @@ -130,19 +129,12 @@ constructor( ouraOffsetManager.updateOffsets( request.route, request.user, - Instant.ofEpochSecond(offset).plus(Duration.ofMillis(500)), + Instant.ofEpochSecond(offset).plus(ONE_DAY), ) - val currentNextRequestTime = userNextRequest[request.user.versionedId] val nextRequestTime = Instant.now().plus(SUCCESS_BACK_OFF_TIME) - userNextRequest[request.user.versionedId] = - currentNextRequestTime?.let { - if (currentNextRequestTime > nextRequestTime) { - currentNextRequestTime - } else { - nextRequestTime - } - } - ?: nextRequestTime + userNextRequest[request.user.versionedId] = userNextRequest[request.user.versionedId]?.let { + if (it > nextRequestTime) it else nextRequestTime + } ?: nextRequestTime } else { if (request.startDate.plus(TIME_AFTER_REQUEST).isBefore(Instant.now())) { logger.info("No records found, updating offsets to end date..") @@ -151,7 +143,9 @@ constructor( request.user, request.endDate, ) - userNextRequest[request.user.versionedId] = Instant.now().plus(ONE_DAY) + userNextRequest[request.user.versionedId] = Instant.now().plus(BACK_OFF_TIME) + } else { + userNextRequest[request.user.versionedId] = Instant.now().plus(BACK_OFF_TIME) } } return records From 8544aa99f34c7fea9765af9b3d1d2a2dc0e15743 Mon Sep 17 00:00:00 2001 From: Pauline Date: Fri, 9 Aug 2024 13:35:32 +0100 Subject: [PATCH 17/18] Fix lint errors --- .../org/radarbase/oura/request/OuraRequestGenerator.kt | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt index 6d35132e..79cb30b3 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt @@ -132,9 +132,10 @@ constructor( Instant.ofEpochSecond(offset).plus(ONE_DAY), ) val nextRequestTime = Instant.now().plus(SUCCESS_BACK_OFF_TIME) - userNextRequest[request.user.versionedId] = userNextRequest[request.user.versionedId]?.let { - if (it > nextRequestTime) it else nextRequestTime - } ?: nextRequestTime + userNextRequest[request.user.versionedId] = + userNextRequest[request.user.versionedId]?.let { + if (it > nextRequestTime) it else nextRequestTime + } ?: nextRequestTime } else { if (request.startDate.plus(TIME_AFTER_REQUEST).isBefore(Instant.now())) { logger.info("No records found, updating offsets to end date..") From 27e5a196cd0281e5d02f8597a9a362cbfdb7597f Mon Sep 17 00:00:00 2001 From: Pauline Date: Mon, 19 Aug 2024 14:06:16 +0100 Subject: [PATCH 18/18] Bump vulnerable deps --- buildSrc/src/main/kotlin/Versions.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/buildSrc/src/main/kotlin/Versions.kt b/buildSrc/src/main/kotlin/Versions.kt index 6b5a3557..6abf27b1 100644 --- a/buildSrc/src/main/kotlin/Versions.kt +++ b/buildSrc/src/main/kotlin/Versions.kt @@ -7,9 +7,9 @@ object Versions { const val wrapper = "8.4" const val radarCommons = "1.1.2" - const val confluent = "7.6.0" + const val confluent = "7.7.0" const val kafka = "$confluent-ce" - const val avro = "1.11.3" + const val avro = "1.12.0" const val managementPortal = "2.0.0"