Skip to content

Commit

Permalink
Merge pull request #142 from RADAR-base/release-0.5.3
Browse files Browse the repository at this point in the history
Release 0.5.3
  • Loading branch information
mpgxvii committed Aug 19, 2024
2 parents 361ae2b + 27e5a19 commit 6e29fd5
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 53 deletions.
34 changes: 34 additions & 0 deletions .github/workflows/snyk.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
name: Snyk test

on:
- pull_request

jobs:
security:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- uses: snyk/actions/setup@master
with:
snyk-version: v1.1032.0

- uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 17

- name: Setup Gradle
uses: gradle/gradle-build-action@v2

- name: Run Snyk to check for vulnerabilities
env:
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
run: >
snyk test
--all-projects
--configuration-matching="^runtimeClasspath$"
--fail-on=upgradable
--org=radar-base
--policy-path=.snyk
--severity-threshold=high
26 changes: 12 additions & 14 deletions buildSrc/src/main/kotlin/Versions.kt
Original file line number Diff line number Diff line change
@@ -1,33 +1,31 @@
@Suppress("ConstPropertyName", "MemberVisibilityCanBePrivate")
object Versions {
const val project = "0.5.2"
const val project = "0.5.3"

const val java = 17
const val kotlin = "1.9.10"
const val kotlin = "1.9.22"
const val wrapper = "8.4"

const val radarCommons = "1.1.2"
const val confluent = "7.6.0"
const val confluent = "7.7.0"
const val kafka = "$confluent-ce"
const val avro = "1.11.3"
const val avro = "1.12.0"

const val managementPortal = "2.0.0"

// From image
const val jackson = "2.16.1"
const val jackson = "2.17.0"

const val log4j2 = "2.20.0"
const val slf4j = "2.0.9"
const val log4j2 = "2.23.1"
const val slf4j = "2.0.13"

const val okhttp = "4.12.0"

const val firebaseAdmin = "9.1.0"
const val firebaseAdmin = "9.2.0"
const val radarSchemas = "0.8.7-hotfix"
const val ktor = "2.3.5"
const val ktor = "2.3.10"

const val junit = "5.9.3"
const val wiremock = "2.27.2"
const val mockito = "5.3.1"

const val kotlinVersion = "1.9.10"
const val junit = "5.10.2"
const val wiremock = "3.0.1"
const val mockito = "5.11.0"
}
7 changes: 7 additions & 0 deletions kafka-connect-fitbit-source/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ RUN gradle jar

FROM confluentinc/cp-kafka-connect-base:7.5.0

USER root

RUN yum remove -y zulu11-ca-jdk-headless && yum remove -y zulu11-ca-jre-headless
RUN yum install -y zulu17-ca-jdk-headless && yum install -y zulu17-ca-jre-headless

USER appuser

MAINTAINER Joris Borgdorff <joris@thehyve.nl>

LABEL description="Kafka REST API Source connector"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,10 @@ class ServiceUserRepository : UserRepository {
): T = withContext(Dispatchers.IO) {
val response = client.request(builder)
val contentLength = response.contentLength()
val hasBody = contentLength != null && contentLength > 0
// if Transfer-Encoding: chunked, then the request has data but contentLength will be null.
val transferEncoding = response.headers["Transfer-Encoding"]
val hasBody = (contentLength != null && contentLength > 0) ||
(transferEncoding != null && transferEncoding.contains("chunked"))
if (response.status == HttpStatusCode.NotFound) {
throw NoSuchElementException("URL " + response.request.url + " does not exist")
} else if (!response.status.isSuccess() || !hasBody) {
Expand Down
7 changes: 7 additions & 0 deletions kafka-connect-oura-source/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ RUN gradle jar

FROM confluentinc/cp-kafka-connect-base:7.5.0

USER root

RUN yum remove -y zulu11-ca-jdk-headless && yum remove -y zulu11-ca-jre-headless
RUN yum install -y zulu17-ca-jdk-headless && yum install -y zulu17-ca-jre-headless

USER appuser

MAINTAINER Pauline Conde <pauline.conde@kcl.ac.uk>

LABEL description="Kafka Oura REST API Source connector"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class OuraSourceTask extends SourceTask {
private AvroData avroData = new AvroData(20);
private KafkaOffsetManager offsetManager;
String TIMESTAMP_OFFSET_KEY = "timestamp";
long TIMEOUT = 60000L;

public void initialize(OuraRestSourceConnectorConfig config, OffsetStorageReader offsetStorageReader) {
OuraRestSourceConnectorConfig ouraConfig = (OuraRestSourceConnectorConfig) config;
Expand Down Expand Up @@ -144,6 +145,8 @@ public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> sourceRecords = Collections.emptyList();

do {
Thread.sleep(TIMEOUT);

Map<String, String> configs = context.configs();
Iterator<? extends RestRequest> requestIterator = this.requests()
.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void initialize(List<Map<String, Object>> partitions) {
.filter(e -> e.getValue() != null && e.getValue().containsKey(TIMESTAMP_OFFSET_KEY))
.collect(Collectors.toMap(
e -> (String) e.getKey().get("user") + "-" + e.getKey().get("route"),
e -> Instant.ofEpochMilli(((Number) e.getValue().get(TIMESTAMP_OFFSET_KEY)).longValue())));
e -> Instant.ofEpochSecond(((Number) e.getValue().get(TIMESTAMP_OFFSET_KEY)).longValue())));
} else {
logger.warn("Offset storage reader is null, will resume from an empty state.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,11 @@

package org.radarbase.connect.rest;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
import static com.github.tomakehurst.wiremock.client.WireMock.matching;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.resetAllRequests;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.VerificationException;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.verification.LoggedRequest;
import com.github.tomakehurst.wiremock.verification.NearMiss;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
Expand All @@ -56,6 +40,23 @@
import org.radarbase.connect.rest.single.SingleRestSourceConnector;
import org.radarbase.connect.rest.single.SingleRestSourceConnectorConfig;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
import static com.github.tomakehurst.wiremock.client.WireMock.matching;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.resetAllRequests;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static org.junit.jupiter.api.Assertions.assertEquals;

@ExtendWith(WireMockRule.class)
public class RestTaskTest {

Expand Down Expand Up @@ -167,7 +168,7 @@ public void afterEach(ExtensionContext context) {
}
}

private void checkForUnmatchedRequests() {
public void checkForUnmatchedRequests() {
List<LoggedRequest> unmatchedRequests = findAllUnmatchedRequests();
if (!unmatchedRequests.isEmpty()) {
List<NearMiss> nearMisses = findNearMissesForAllUnmatchedRequests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,9 @@ constructor(
logger.info("Offsets found in persistence: " + offsetTime.toString())
offsetTime.coerceAtLeast(startDate)
}
val endDate = if (user.endDate >= Instant.now()) Instant.now() else user.endDate
if (Duration.between(startOffset, endDate).toDays() <= ONE_DAY) {
logger.info("Interval between dates is too short. Backing off..")
userNextRequest[user.versionedId] = Instant.now().plus(USER_BACK_OFF_TIME)
val endDate = user.endDate?.coerceAtMost(Instant.now()) ?: Instant.now()
if (Duration.between(startOffset, endDate) <= ONE_DAY) {
logger.info("Interval between dates is too short. Not requesting..")
return emptySequence()
}
val endTime = (startOffset + defaultQueryRange).coerceAtMost(endDate)
Expand Down Expand Up @@ -130,26 +129,24 @@ constructor(
ouraOffsetManager.updateOffsets(
request.route,
request.user,
Instant.ofEpochSecond(offset).plus(Duration.ofMillis(500)),
Instant.ofEpochSecond(offset).plus(ONE_DAY),
)
val currentNextRequestTime = userNextRequest[request.user.versionedId]
val nextRequestTime = Instant.now().plus(SUCCESS_BACK_OFF_TIME)
userNextRequest[request.user.versionedId] =
currentNextRequestTime?.let {
if (currentNextRequestTime > nextRequestTime) {
currentNextRequestTime
} else {
nextRequestTime
}
}
?: nextRequestTime
userNextRequest[request.user.versionedId]?.let {
if (it > nextRequestTime) it else nextRequestTime
} ?: nextRequestTime
} else {
if (request.startDate.plus(TIME_AFTER_REQUEST).isBefore(Instant.now())) {
logger.info("No records found, updating offsets to end date..")
ouraOffsetManager.updateOffsets(
request.route,
request.user,
request.endDate,
)
userNextRequest[request.user.versionedId] = Instant.now().plus(BACK_OFF_TIME)
} else {
userNextRequest[request.user.versionedId] = Instant.now().plus(BACK_OFF_TIME)
}
}
return records
Expand Down Expand Up @@ -240,10 +237,10 @@ constructor(
companion object {
private val logger = LoggerFactory.getLogger(OuraRequestGenerator::class.java)
private val BACK_OFF_TIME = Duration.ofMinutes(10L)
private val ONE_DAY = 1L
private val ONE_DAY = Duration.ofDays(1L)
private val TIME_AFTER_REQUEST = Duration.ofDays(30)
private val USER_BACK_OFF_TIME = Duration.ofMinutes(2L)
private val SUCCESS_BACK_OFF_TIME = Duration.ofSeconds(3L)
private val USER_BACK_OFF_TIME = Duration.ofHours(12L)
private val SUCCESS_BACK_OFF_TIME = Duration.ofMinutes(1L)
private val USER_MAX_REQUESTS = 20
val JSON_FACTORY = JsonFactory()
val JSON_READER = ObjectMapper(JSON_FACTORY).registerModule(JavaTimeModule()).reader()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ data class OuraUser(
@JsonProperty("externalId") override val externalId: String?,
@JsonProperty("isAuthorized") override val isAuthorized: Boolean,
@JsonProperty("startDate") override val startDate: Instant,
@JsonProperty("endDate") override val endDate: Instant,
@JsonProperty("endDate") override val endDate: Instant? = null,
@JsonProperty("version") override val version: String? = null,
@JsonProperty("serviceUserId") override val serviceUserId: String? = null,
) : User {
override val observationKey: ObservationKey = ObservationKey(projectId, userId, sourceId)
override val versionedId: String = "$id${version?.let { "#$it" } ?: ""}"

fun isComplete() = isAuthorized && startDate.isBefore(endDate) && serviceUserId != null
fun isComplete() =
isAuthorized && (endDate == null || startDate.isBefore(endDate)) && serviceUserId != null
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ interface User {
val sourceId: String
val externalId: String?
val startDate: Instant
val endDate: Instant
val endDate: Instant?
val createdAt: Instant
val humanReadableUserId: String?
val serviceUserId: String?
Expand Down

0 comments on commit 6e29fd5

Please sign in to comment.