diff --git a/.dockerignore b/.dockerignore index d3cc21fe..98eae927 100644 --- a/.dockerignore +++ b/.dockerignore @@ -3,8 +3,6 @@ .idea out build -*/out */src/test -*/build *.iml .gradletasknamecache diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 00000000..2001dadf --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,143 @@ +# Continuous integration, including test and integration test +name: Main test + +# Run in master and dev branches and in all pull requests to those branches +on: + push: + branches: [ master, dev ] + pull_request: + branches: [ master, dev ] + +env: + DOCKER_IMAGE: radarbase/kafka-connect-rest-fitbit-source + +jobs: + # Build and test the code + kotlin: + # The type of runner that the job will run on + runs-on: ubuntu-latest + + # Steps represent a sequence of tasks that will be executed as part of the job + steps: + # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it + - uses: actions/checkout@v2 + + - uses: actions/setup-java@v1 + with: + java-version: 11 + + - name: Cache + uses: actions/cache@v2 + with: + # Cache gradle directories + path: | + ~/.gradle/caches + ~/.gradle/wrapper + # Key for restoring and saving the cache + key: ${{ runner.os }}-gradle-${{ hashFiles('gradlew', '**/*.gradle', 'gradle.properties', 'gradle/**') }} + restore-keys: | + ${{ runner.os }}-gradle- + + # Compile the code + - name: Compile code + run: ./gradlew assemble + + # Gradle check + - name: Check + run: ./gradlew check + + docker: + # The type of runner that the job will run on + runs-on: ubuntu-latest + + # Steps represent a sequence of tasks that will be executed as part of the job + steps: + # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it + - uses: actions/checkout@v2 + + - name: Docker build parameters + id: docker_params + run: | + echo "::set-output name=has_docker_login::${{ secrets.DOCKERHUB_USERNAME != '' && secrets.DOCKERHUB_TOKEN != '' }}" + if [ "${{ github.event_name == 'pull_request' }}" = "true" ]; then + echo "::set-output name=push::false" + echo "::set-output name=load::true" + echo "::set-output name=platforms::linux/amd64" + else + echo "::set-output name=push::true" + echo "::set-output name=load::false" + echo "::set-output name=platforms::linux/amd64,linux/arm64" + fi + + - name: Cache Docker layers + id: cache_buildx + uses: actions/cache@v2 + with: + path: /tmp/.buildx-cache + key: ${{ runner.os }}-buildx-${{ steps.docker_params.outputs.push }}-${{ hashFiles('**/Dockerfile', '**/*.gradle', 'gradle.properties', '.dockerignore', '*/src/main/**', 'docker/**') }} + restore-keys: | + ${{ runner.os }}-buildx-${{ steps.docker_params.outputs.push }}- + ${{ runner.os }}-buildx- + + - name: Login to Docker Hub + if: steps.docker_params.outputs.has_docker_login == 'true' + uses: docker/login-action@v1 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + # Add Docker labels and tags + - name: Docker meta + id: docker_meta + uses: crazy-max/ghaction-docker-meta@v2 + with: + images: ${{ env.DOCKER_IMAGE }} + + # Setup docker build environment + - name: Set up QEMU + uses: docker/setup-qemu-action@v1 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v1 + + - name: Cache parameters + id: cache-parameters + run: | + if [ "${{ steps.cache_buildx.outputs.cache-hit }}" = "true" ]; then + echo "::set-output name=cache-to::" + else + echo "::set-output name=cache-to::type=local,dest=/tmp/.buildx-cache-new,mode=max" + fi + + - name: Build docker + uses: docker/build-push-action@v2 + with: + cache-from: type=local,src=/tmp/.buildx-cache + cache-to: ${{ steps.cache-parameters.outputs.cache-to }} + platforms: ${{ steps.docker_params.outputs.platforms }} + load: ${{ steps.docker_params.outputs.load }} + push: ${{ steps.docker_params.outputs.push }} + tags: ${{ steps.docker_meta.outputs.tags }} + # Use runtime labels from docker_meta as well as fixed labels + labels: | + ${{ steps.docker_meta.outputs.labels }} + maintainer=Joris Borgdorff , Nivethika Mahasivam , Pauline Conde + org.opencontainers.image.description=RADAR-base upload connector backend application + org.opencontainers.image.authors=Joris Borgdorff , Nivethika Mahasivam , Pauline Conde + org.opencontainers.image.vendor=RADAR-base + org.opencontainers.image.licenses=Apache-2.0 + + - name: Pull docker image + if: steps.docker_params.outputs.load == 'false' + run: docker pull ${{ env.DOCKER_IMAGE }}:${{ steps.docker_meta.outputs.version }} + + - name: Inspect docker image + run: | + docker image inspect ${{ env.DOCKER_IMAGE }}:${{ steps.docker_meta.outputs.version }} + docker run --rm ${{ env.DOCKER_IMAGE }}:${{ steps.docker_meta.outputs.version }} curl --help + + - name: Move docker build cache + if: steps.cache_buildx.outputs.cache-hit != 'true' + run: | + rm -rf /tmp/.buildx-cache + mv /tmp/.buildx-cache-new /tmp/.buildx-cache diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 00000000..af6b748e --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,96 @@ +# Create release files +name: Release + +on: + release: + types: [ published ] + +env: + DOCKER_IMAGE: radarbase/kafka-connect-rest-fitbit-source + +jobs: + uploadBackend: + # The type of runner that the job will run on + runs-on: ubuntu-latest + + # Steps represent a sequence of tasks that will be executed as part of the job + steps: + # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it + - uses: actions/checkout@v2 + - uses: actions/setup-java@v1 + with: + java-version: 11 + + - name: Gradle cache + uses: actions/cache@v2 + with: + # Cache gradle directories + path: | + ~/.gradle/caches + ~/.gradle/wrapper + # An explicit key for restoring and saving the cache + key: ${{ runner.os }}-gradle-${{ hashFiles('gradlew', '**/*.gradle', 'gradle.properties', 'gradle/**') }} + restore-keys: | + ${{ runner.os }}-gradle- + + # Compile code + - name: Compile code + run: ./gradlew jar + + # Upload it to GitHub + - name: Upload to GitHub + uses: AButler/upload-release-assets@v2.0 + with: + files: "*/build/libs/*" + repo-token: ${{ secrets.GITHUB_TOKEN }} + + # Build and push tagged release docker image + docker: + # The type of runner that the job will run on + runs-on: ubuntu-latest + + # Steps represent a sequence of tasks that will be executed as part of the job + steps: + - uses: actions/checkout@v2 + + # Setup docker build environment + - name: Set up QEMU + uses: docker/setup-qemu-action@v1 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v1 + + - name: Login to DockerHub + uses: docker/login-action@v1 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + # Add Docker labels and tags + - name: Docker meta + id: docker_meta + uses: crazy-max/ghaction-docker-meta@v2 + with: + images: ${{ env.DOCKER_IMAGE }} + tags: | + type=semver,pattern={{version}} + type=semver,pattern={{major}}.{{minor}} + + - name: Build docker + uses: docker/build-push-action@v2 + with: + platforms: linux/amd64,linux/arm64 + push: true + tags: ${{ steps.docker_meta.outputs.tags }} + # Use runtime labels from docker_meta_backend as well as fixed labels + labels: | + ${{ steps.docker_meta.outputs.labels }} + maintainer=Joris Borgdorff , Nivethika Mahasivam , Pauline Conde + org.opencontainers.image.description=RADAR-base upload connector backend application + org.opencontainers.image.authors=Joris Borgdorff , Nivethika Mahasivam , Pauline Conde + org.opencontainers.image.vendor=RADAR-base + org.opencontainers.image.licenses=Apache-2.0 + + - name: Inspect image + run: | + docker pull ${{ env.DOCKER_IMAGE }}:${{ steps.docker_meta.outputs.version }} + docker image inspect ${{ env.DOCKER_IMAGE }}:${{ steps.docker_meta.outputs.version }} diff --git a/Dockerfile b/Dockerfile index f1f346a2..15de33df 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,41 +12,32 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM openjdk:8-alpine as builder +FROM gradle:7.2-jdk11 as builder RUN mkdir /code WORKDIR /code -ENV GRADLE_OPTS -Dorg.gradle.daemon=false +ENV GRADLE_USER_HOME=/code/.gradlecache \ + GRADLE_OPTS="-Dorg.gradle.vfs.watch=false" -COPY ./gradle/wrapper /code/gradle/wrapper -COPY ./gradlew /code/ -RUN ./gradlew --version - -COPY ./build.gradle ./settings.gradle /code/ +COPY ./build.gradle ./settings.gradle ./gradle.properties /code/ COPY kafka-connect-rest-source/build.gradle /code/kafka-connect-rest-source/ - -RUN ./gradlew downloadDependencies copyDependencies - COPY kafka-connect-fitbit-source/build.gradle /code/kafka-connect-fitbit-source/ -RUN ./gradlew downloadDependencies copyDependencies +RUN gradle downloadDependencies copyDependencies COPY ./kafka-connect-rest-source/src/ /code/kafka-connect-rest-source/src - -RUN ./gradlew jar - COPY ./kafka-connect-fitbit-source/src/ /code/kafka-connect-fitbit-source/src -RUN ./gradlew jar +RUN gradle jar -FROM confluentinc/cp-kafka-connect-base:5.5.2 +FROM confluentinc/cp-kafka-connect-base:6.2.0-3-ubi8 MAINTAINER Joris Borgdorff LABEL description="Kafka REST API Source connector" -ENV CONNECT_PLUGIN_PATH /usr/share/java/kafka-connect/plugins +ENV CONNECT_PLUGIN_PATH=/usr/share/java/kafka-connect/plugins # To isolate the classpath from the plugin path as recommended COPY --from=builder /code/kafka-connect-rest-source/build/third-party/*.jar ${CONNECT_PLUGIN_PATH}/kafka-connect-rest-source/ diff --git a/README.md b/README.md index 82379590..b186e677 100644 --- a/README.md +++ b/README.md @@ -8,10 +8,12 @@ Fitbit in particular. The documentation of the Kafka Connect REST source still n ### Installation This repository relies on a recent version of docker and docker-compose as well as an installation -of Java 8 or later. +of Java 11 or later. ### Usage +Generally, this component is installed with [RADAR-Kubernetes](https://github.com/RADAR-base/RADAR-Kubernetes). It uses Docker image [radarbase/kafka-connect-rest-fitbit-source](https://hub.docker.com/r/radarbase/kafka-connect-rest-fitbit-source). + First, [register a Fitbit App](https://dev.fitbit.com/apps) with Fitbit. It should be either a server app, for multiple users, or a personal app for a single user. With the server app, you need to [request access to intraday API data](https://dev.fitbit.com/build/reference/web-api/help/). @@ -35,57 +37,57 @@ your Fitbit App client ID and client secret. The following tables shows the poss Importance -rest.source.poll.interval.msHow often to poll the source URL.long60000low +rest.source.poll.interval.msHow often to poll the source URL.long60000low -rest.source.base.urlBase URL for REST source connector.stringhigh +rest.source.base.urlBase URL for REST source connector.stringhigh -rest.source.destination.topicsThe list of destination topics for the REST source connector.list""high +rest.source.destination.topicsThe list of destination topics for the REST source connector.list""high -rest.source.topic.selectorThe topic selector class for REST source connector.classorg.radarbase.connect.rest.selector.SimpleTopicSelectorClass extending org.radarbase.connect.rest.selector.TopicSelectorhigh +rest.source.topic.selectorThe topic selector class for REST source connector.classorg.radarbase.connect.rest.selector.SimpleTopicSelectorClass extending org.radarbase.connect.rest.selector.TopicSelectorhigh -rest.source.payload.converter.classClass to be used to convert messages from REST calls to SourceRecordsclassorg.radarbase.connect.rest.converter.StringPayloadConverterClass extending org.radarbase.connect.rest.converter.PayloadToSourceRecordConverterlow +rest.source.payload.converter.classClass to be used to convert messages from REST calls to SourceRecordsclassorg.radarbase.connect.rest.converter.StringPayloadConverterClass extending org.radarbase.connect.rest.converter.PayloadToSourceRecordConverterlow -rest.source.request.generator.classClass to be used to generate REST requestsclassorg.radarbase.connect.rest.single.SingleRequestGeneratorClass extending org.radarbase.connect.rest.request.RequestGeneratorlow +rest.source.request.generator.classClass to be used to generate REST requestsclassorg.radarbase.connect.rest.single.SingleRequestGeneratorClass extending org.radarbase.connect.rest.request.RequestGeneratorlow -fitbit.usersThe user ID of Fitbit users to include in polling, separated by commas. Non existing user names will be ignored. If empty, all users in the user directory will be used.list""high +fitbit.usersThe user ID of Fitbit users to include in polling, separated by commas. Non existing user names will be ignored. If empty, all users in the user directory will be used.list""high -fitbit.api.clientClient ID for the Fitbit APIstringnon-empty stringhigh +fitbit.api.clientClient ID for the Fitbit APIstringnon-empty stringhigh -fitbit.api.secretSecret for the Fitbit API client set in fitbit.api.client.passwordhigh +fitbit.api.secretSecret for the Fitbit API client set in fitbit.api.client.passwordhigh -fitbit.user.poll.intervalPolling interval per Fitbit user per request route in seconds.int150medium +fitbit.user.poll.intervalPolling interval per Fitbit user per request route in seconds.int150medium -fitbit.api.intradaySet to true if the client has permissions to Fitbit Intraday API, false otherwise.booleanfalsemedium +fitbit.api.intradaySet to true if the client has permissions to Fitbit Intraday API, false otherwise.booleanfalsemedium -fitbit.user.repository.classClass for managing users and authentication.classorg.radarbase.connect.rest.fitbit.user.YamlUserRepositoryClass extending org.radarbase.connect.rest.fitbit.user.UserRepositorymedium +fitbit.user.repository.classClass for managing users and authentication.classorg.radarbase.connect.rest.fitbit.user.YamlUserRepositoryClass extending org.radarbase.connect.rest.fitbit.user.UserRepositorymedium -fitbit.user.dirDirectory containing Fitbit user information and credentials. Only used if a file-based user repository is configured.string/var/lib/kafka-connect-fitbit-source/userslow +fitbit.user.dirDirectory containing Fitbit user information and credentials. Only used if a file-based user repository is configured.string/var/lib/kafka-connect-fitbit-source/userslow -fitbit.user.repository.urlURL for webservice containing user credentials. Only used if a webservice-based user repository is configured.string""low +fitbit.user.repository.urlURL for webservice containing user credentials. Only used if a webservice-based user repository is configured.string""low -fitbit.user.repository.client.idClient ID for connecting to the service repository.string""medium +fitbit.user.repository.client.idClient ID for connecting to the service repository.string""medium -fitbit.user.repository.client.secretClient secret for connecting to the service repository.string""medium +fitbit.user.repository.client.secretClient secret for connecting to the service repository.string""medium -fitbit.user.repository.oauth2.token.urlOAuth 2.0 token url for retrieving client credentials.string""medium +fitbit.user.repository.oauth2.token.urlOAuth 2.0 token url for retrieving client credentials.string""medium -fitbit.intraday.steps.topicTopic for Fitbit intraday stepsstringconnect_fitbit_intraday_stepsnon-empty string without control characterslow +fitbit.intraday.steps.topicTopic for Fitbit intraday stepsstringconnect_fitbit_intraday_stepsnon-empty string without control characterslow -fitbit.intraday.heart.rate.topicTopic for Fitbit intraday heart_ratestringconnect_fitbit_intraday_heart_ratenon-empty string without control characterslow +fitbit.intraday.heart.rate.topicTopic for Fitbit intraday heart_ratestringconnect_fitbit_intraday_heart_ratenon-empty string without control characterslow -fitbit.sleep.stages.topicTopic for Fitbit sleep stagesstringconnect_fitbit_sleep_stagesnon-empty string without control characterslow +fitbit.sleep.stages.topicTopic for Fitbit sleep stagesstringconnect_fitbit_sleep_stagesnon-empty string without control characterslow -fitbit.sleep.classic.topicTopic for Fitbit sleep classic datastringconnect_fitbit_sleep_classicnon-empty string without control characterslow +fitbit.sleep.classic.topicTopic for Fitbit sleep classic datastringconnect_fitbit_sleep_classicnon-empty string without control characterslow -fitbit.time.zone.topicTopic for Fitbit profile time zonestringconnect_fitbit_time_zonenon-empty string without control characterslow +fitbit.time.zone.topicTopic for Fitbit profile time zonestringconnect_fitbit_time_zonenon-empty string without control characterslow -fitbit.activity.log.topicTopic for Fitbit activity log.stringconnect_fitbit_activity_lognon-empty string without control characterslow +fitbit.activity.log.topicTopic for Fitbit activity log.stringconnect_fitbit_activity_lognon-empty string without control characterslow -fitbit.intraday.calories.topicTopic for Fitbit intraday caloriesstringconnect_fitbit_intraday_caloriesnon-empty string without control characterslow +fitbit.intraday.calories.topicTopic for Fitbit intraday caloriesstringconnect_fitbit_intraday_caloriesnon-empty string without control characterslow -fitbit.user.firebase.collection.fitbit.nameFirestore Collection for retrieving Fitbit Auth details. Only used when a Firebase based user repository is used.stringfitbitlow +fitbit.user.firebase.collection.fitbit.nameFirestore Collection for retrieving Fitbit Auth details. Only used when a Firebase based user repository is used.stringfitbitlow -fitbit.user.firebase.collection.user.nameFirestore Collection for retrieving User details. Only used when a Firebase based user repository is used.stringuserslow +fitbit.user.firebase.collection.user.nameFirestore Collection for retrieving User details. Only used when a Firebase based user repository is used.stringuserslow If the ManagementPortal is used to authenticate against the user repository, please add an OAuth client to ManagementPortal with the following properties: diff --git a/build.gradle b/build.gradle index e7f408b0..4322e22f 100644 --- a/build.gradle +++ b/build.gradle @@ -1,33 +1,38 @@ -description = 'kafka-connect-rest-source' - -subprojects { - ext { - kafkaVersion = '2.5.1' - confluentVersion = '5.5.2' - jacksonVersion = '2.11.3' - } +import com.github.benmanes.gradle.versions.updates.DependencyUpdatesTask - apply plugin: 'java' - apply plugin: 'java-library' +plugins { + id("com.github.ben-manes.versions") version "0.39.0" +} - group = 'org.radarbase' - version = '0.3.3' +description = 'kafka-connect-rest-source' - sourceCompatibility = 1.8 - targetCompatibility = 1.8 +allprojects { + group = "org.radarbase" + version = "0.4.0" repositories { mavenCentral() maven { url "https://packages.confluent.io/maven/" } maven { url "https://repo.maven.apache.org/maven2" } - jcenter() - maven { url "https://dl.bintray.com/radar-cns/org.radarcns" } - maven { url 'https://oss.jfrog.org/artifactory/oss-snapshot-local/' } } } +subprojects { + ext { + kafkaVersion = '2.8.0' + confluentVersion = '6.2.0' + jacksonVersion = '2.12.5' + } + + apply plugin: 'java' + apply plugin: 'java-library' + + sourceCompatibility = 11 + targetCompatibility = 11 +} + wrapper { - gradleVersion '6.6.1' + gradleVersion '7.2' } evaluationDependsOnChildren() @@ -42,3 +47,16 @@ task downloadDependencies { println 'Downloaded REST code dependencies' } } + + +def isNonStable = { String version -> + def stableKeyword = ["RELEASE", "FINAL", "GA"].any { version.toUpperCase().contains(it) } + def regex = /^[0-9,.v-]+(-r)?$/ + return !stableKeyword && !(version ==~ regex) +} + +tasks.named("dependencyUpdates").configure { + rejectVersionIf { + isNonStable(it.candidate.version) + } +} diff --git a/docker-compose.yml b/docker-compose.yml index cc5a01c4..dc5238a7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,7 +9,7 @@ services: # Zookeeper Cluster # #---------------------------------------------------------------------------# zookeeper-1: - image: confluentinc/cp-zookeeper:5.5.1 + image: confluentinc/cp-zookeeper:6.2.0 environment: ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_CLIENT_PORT: 2181 @@ -19,7 +19,7 @@ services: ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888 zookeeper-2: - image: confluentinc/cp-zookeeper:5.5.1 + image: confluentinc/cp-zookeeper:6.2.0 environment: ZOOKEEPER_SERVER_ID: 2 ZOOKEEPER_CLIENT_PORT: 2181 @@ -29,7 +29,7 @@ services: ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888 zookeeper-3: - image: confluentinc/cp-zookeeper:5.5.1 + image: confluentinc/cp-zookeeper:6.2.0 environment: ZOOKEEPER_SERVER_ID: 3 ZOOKEEPER_CLIENT_PORT: 2181 @@ -42,7 +42,7 @@ services: # Kafka Cluster # #---------------------------------------------------------------------------# kafka-1: - image: confluentinc/cp-kafka:5.5.1 + image: confluentinc/cp-kafka:6.2.0 depends_on: - zookeeper-1 - zookeeper-2 @@ -61,7 +61,7 @@ services: KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false" kafka-2: - image: confluentinc/cp-kafka:5.5.1 + image: confluentinc/cp-kafka:6.2.0 depends_on: - zookeeper-1 - zookeeper-2 @@ -80,7 +80,7 @@ services: KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false" kafka-3: - image: confluentinc/cp-kafka:5.5.1 + image: confluentinc/cp-kafka:6.2.0 depends_on: - zookeeper-1 - zookeeper-2 @@ -102,11 +102,8 @@ services: # Schema Registry # #---------------------------------------------------------------------------# schema-registry-1: - image: confluentinc/cp-schema-registry:5.5.1 + image: confluentinc/cp-schema-registry:6.2.0 depends_on: - - zookeeper-1 - - zookeeper-2 - - zookeeper-3 - kafka-1 - kafka-2 - kafka-3 @@ -114,7 +111,7 @@ services: ports: - "8081:8081" environment: - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181 + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-1:9092,PLAINTEXT://kafka-2:9092,PLAINTEXT://kafka-3:9092 SCHEMA_REGISTRY_HOST_NAME: schema-registry-1 SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 SCHEMA_REGISTRY_AVRO_COMPATIBILITY_LEVEL: none @@ -124,11 +121,8 @@ services: # REST proxy # #---------------------------------------------------------------------------# rest-proxy-1: - image: confluentinc/cp-kafka-rest:5.5.1 + image: confluentinc/cp-kafka-rest:6.2.0 depends_on: - - zookeeper-1 - - zookeeper-2 - - zookeeper-3 - kafka-1 - kafka-2 - kafka-3 @@ -136,6 +130,7 @@ services: ports: - "8082:8082" environment: + KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-1:9092,PLAINTEXT://kafka-2:9092,PLAINTEXT://kafka-3:9092 KAFKA_REST_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181 KAFKA_REST_LISTENERS: http://0.0.0.0:8082 KAFKA_REST_SCHEMA_REGISTRY_URL: http://schema-registry-1:8081 diff --git a/docker/launch b/docker/launch index a9ba474b..6acc8510 100755 --- a/docker/launch +++ b/docker/launch @@ -46,7 +46,7 @@ echo "===> Launching ${COMPONENT} ..." # Add our jar to the classpath so that the custom classes can be loaded first. # And this also makes sure that the CLASSPATH does not start with ":/etc/..." # other jars are loaded via the plugin path -export CLASSPATH="/etc/${COMPONENT}/kafka-connect-mongodb-sink/*" +#export CLASSPATH="/etc/${COMPONENT}/kafka-connect-mongodb-sink/*" # execute connector in standalone mode -exec connect-standalone /etc/"${COMPONENT}"/"${COMPONENT}".properties $(find /etc/"${COMPONENT}"/ -type f -name "${CONNECTOR_PROPERTY_FILE_PREFIX}*.properties") +exec connect-standalone /etc/"${COMPONENT}"/"${COMPONENT}".properties /etc/"${COMPONENT}"/"${CONNECTOR_PROPERTY_FILE_PREFIX}"*.properties diff --git a/gradle.properties b/gradle.properties new file mode 100644 index 00000000..e69de29b diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index e708b1c0..7454180f 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 12d38de6..ffed3a25 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.6.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.2-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index 4f906e0c..744e882e 100755 --- a/gradlew +++ b/gradlew @@ -72,7 +72,7 @@ case "`uname`" in Darwin* ) darwin=true ;; - MINGW* ) + MSYS* | MINGW* ) msys=true ;; NONSTOP* ) diff --git a/kafka-connect-fitbit-source/build.gradle b/kafka-connect-fitbit-source/build.gradle index 79071e29..be07a1c0 100644 --- a/kafka-connect-fitbit-source/build.gradle +++ b/kafka-connect-fitbit-source/build.gradle @@ -1,21 +1,21 @@ dependencies { api project(':kafka-connect-rest-source') api group: 'io.confluent', name: 'kafka-connect-avro-converter', version: confluentVersion - api group: 'org.radarcns', name: 'radar-schemas-commons', version: '0.5.14' + api group: 'org.radarbase', name: 'radar-schemas-commons', version: '0.7.3' - implementation group: 'org.radarcns', name: 'oauth-client-util', version: '0.6.0' + implementation group: 'org.radarbase', name: 'oauth-client-util', version: '0.8.0' implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: jacksonVersion implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: jacksonVersion - implementation 'com.google.firebase:firebase-admin:6.16.0' + implementation 'com.google.firebase:firebase-admin:8.0.1' // Included in connector runtime compileOnly group: 'org.apache.kafka', name: 'connect-api', version: kafkaVersion compileOnly group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: jacksonVersion - testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: '5.6.2' - testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: '5.6.2' - testRuntimeOnly group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30' + testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: '5.7.2' + testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: '5.7.2' + testRuntimeOnly group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.32' testImplementation group: 'org.apache.kafka', name: 'connect-api', version: kafkaVersion } diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitActivityLogAvroConverter.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitActivityLogAvroConverter.java index f441301c..254bb786 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitActivityLogAvroConverter.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitActivityLogAvroConverter.java @@ -156,7 +156,7 @@ private FitbitActivityHeartRate getHeartRate(JsonNode activity) { Optional mean = optInt(activity, "averageHeartRate"); Optional> zones = optArray(activity, "heartRateZones"); - if (!mean.isPresent() && !zones.isPresent()) { + if (mean.isEmpty() && zones.isEmpty()) { return null; } diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitSleepAvroConverter.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitSleepAvroConverter.java index a6164a34..976a348b 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitSleepAvroConverter.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitSleepAvroConverter.java @@ -127,6 +127,10 @@ protected Stream processRecords( }) .collect(Collectors.toList()); + if (allRecords.isEmpty()) { + return Stream.empty(); + } + // The final group gets the actual offset, to ensure that the group does not get queried // again. allRecords.get(allRecords.size() - 1).sourceOffset = startTime; diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/FitbitRequestGenerator.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/FitbitRequestGenerator.java index b4b0a98f..8d062831 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/FitbitRequestGenerator.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/FitbitRequestGenerator.java @@ -24,7 +24,6 @@ import io.confluent.connect.avro.AvroData; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayStepsRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayStepsRoute.java index 0ebc3d84..867e36fe 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayStepsRoute.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayStepsRoute.java @@ -20,7 +20,6 @@ import static java.time.temporal.ChronoUnit.MINUTES; import io.confluent.connect.avro.AvroData; -import java.time.temporal.TemporalAmount; import java.util.stream.Stream; import org.radarbase.connect.rest.fitbit.converter.FitbitIntradayStepsAvroConverter; import org.radarbase.connect.rest.fitbit.request.FitbitRequestGenerator; diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/LocalUser.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/LocalUser.java index 0412d577..2bc3ed67 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/LocalUser.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/LocalUser.java @@ -95,6 +95,10 @@ public void setOauth2Credentials(OAuth2UserCredentials oauth2Credentials) { this.oauth2Credentials = oauth2Credentials; } + public void setIsAuthorized(Boolean isAuthorized) { + this.isAuthorized = isAuthorized; + } + @JsonSetter("fitbitUserId") public void setFitbitUserId(String id) { this.serviceUserId = id; diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java index 626c5562..a2761411 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java @@ -22,8 +22,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectReader; -import io.confluent.common.config.ConfigException; import java.io.IOException; +import java.net.ProtocolException; import java.net.URL; import java.time.Duration; import java.time.Instant; @@ -44,10 +44,11 @@ import okhttp3.RequestBody; import okhttp3.Response; import okhttp3.ResponseBody; +import org.apache.kafka.common.config.ConfigException; import org.radarbase.connect.rest.RestSourceConnectorConfig; import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig; -import org.radarcns.exception.TokenException; -import org.radarcns.oauth.OAuth2Client; +import org.radarbase.exception.TokenException; +import org.radarbase.oauth.OAuth2Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +62,8 @@ public class ServiceUserRepository implements UserRepository { private static final RequestBody EMPTY_BODY = RequestBody.create("", MediaType.parse("application/json; charset=utf-8")); private static final Duration FETCH_THRESHOLD = Duration.ofMinutes(1L); + private static final Duration CONNECTION_TIMEOUT = Duration.ofSeconds(60); + private static final Duration CONNECTION_READ_TIMEOUT = Duration.ofSeconds(90); private final OkHttpClient client; private final Map cachedCredentials; @@ -73,7 +76,10 @@ public class ServiceUserRepository implements UserRepository { private String basicCredentials; public ServiceUserRepository() { - this.client = new OkHttpClient(); + this.client = new OkHttpClient.Builder() + .connectTimeout(CONNECTION_TIMEOUT) + .readTimeout(CONNECTION_READ_TIMEOUT) + .build(); this.cachedCredentials = new HashMap<>(); this.containedUsers = new HashSet<>(); } @@ -115,29 +121,52 @@ public Stream stream() { try { applyPendingUpdates(); } catch (IOException ex) { - logger.error("Failed to initially get users from repository"); + logger.error("Failed to initially get users from repository", ex); } } - return this.timedCachedUsers.stream(); + return this.timedCachedUsers.stream() + .filter(User::isComplete); } @Override public String getAccessToken(User user) throws IOException, NotAuthorizedException { + if (!user.isAuthorized()) { + throw new NotAuthorizedException("User is not authorized"); + } OAuth2UserCredentials credentials = cachedCredentials.get(user.getId()); if (credentials == null || credentials.isAccessTokenExpired()) { - Request request = requestFor("users/" + user.getId() + "/token").build(); - credentials = makeRequest(request, OAUTH_READER); - cachedCredentials.put(user.getId(), credentials); + try { + Request request = requestFor("users/" + user.getId() + "/token").build(); + credentials = makeRequest(request, OAUTH_READER); + cachedCredentials.put(user.getId(), credentials); + } catch (NotAuthorizedException ex) { + cachedCredentials.remove(user.getId()); + if (user instanceof LocalUser) { + ((LocalUser) user).setIsAuthorized(false); + } + throw ex; + } } return credentials.getAccessToken(); } @Override public String refreshAccessToken(User user) throws IOException, NotAuthorizedException { + if (!user.isAuthorized()) { + throw new NotAuthorizedException("User is not authorized"); + } Request request = requestFor("users/" + user.getId() + "/token").post(EMPTY_BODY).build(); - OAuth2UserCredentials credentials = makeRequest(request, OAUTH_READER); - cachedCredentials.put(user.getId(), credentials); - return credentials.getAccessToken(); + try { + OAuth2UserCredentials credentials = makeRequest(request, OAUTH_READER); + cachedCredentials.put(user.getId(), credentials); + return credentials.getAccessToken(); + } catch (NotAuthorizedException ex) { + cachedCredentials.remove(user.getId()); + if (user instanceof LocalUser) { + ((LocalUser) user).setIsAuthorized(false); + } + throw ex; + } } @Override @@ -195,6 +224,8 @@ private T makeRequest(Request request, ObjectReader reader) throws IOExcepti if (response.code() == 404) { throw new NoSuchElementException("URL " + request.url() + " does not exist"); + } else if (response.code() == 407) { + throw new NotAuthorizedException("Refresh token cannot be retrieved for unauthorized user"); } else if (!response.isSuccessful() || body == null) { String message = "Failed to make request"; if (response.code() > 0) { @@ -209,9 +240,11 @@ private T makeRequest(Request request, ObjectReader reader) throws IOExcepti try { return reader.readValue(bodyString); } catch (JsonProcessingException ex) { - logger.error("Failed to parse JSON: {}\n{}", ex.toString(), bodyString); + logger.error("Failed to parse JSON: {}\n{}", ex, bodyString); throw ex; } + } catch (ProtocolException ex) { + throw new NotAuthorizedException("Refresh token cannot be retrieved for unauthorized user"); } } } diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/CovidCollabFirebaseUserRepository.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/CovidCollabFirebaseUserRepository.java index cb8ab697..89b9e8f7 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/CovidCollabFirebaseUserRepository.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/CovidCollabFirebaseUserRepository.java @@ -122,7 +122,7 @@ public void initialize(RestSourceConnectorConfig config) { fitbitConfig.getFitbitClientSecret(), FITBIT_TOKEN_ENDPOINT); - /** + /* * Currently, we only listen for the fitbit collection, as it contains most information while * the user collection only contains project Id which is not supposed to change. The user * document is pulled every time the corresponding fitbit document is pulled, so it will be diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUserRepository.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUserRepository.java index 198dccce..771ae9c9 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUserRepository.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUserRepository.java @@ -34,10 +34,9 @@ public void initialize(RestSourceConnectorConfig config) { // See https://firebase.google.com/docs/admin/setup#initialize-sdk for more details. FirebaseOptions options; try { - options = - new FirebaseOptions.Builder() - .setCredentials(GoogleCredentials.getApplicationDefault()) - .build(); + options = FirebaseOptions.builder() + .setCredentials(GoogleCredentials.getApplicationDefault()) + .build(); } catch (IOException exc) { logger.error("Failed to get credentials for Firebase app.", exc); throw new IllegalStateException(exc); diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FitbitTokenService.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FitbitTokenService.java index 97f3311d..bfba3002 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FitbitTokenService.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FitbitTokenService.java @@ -19,9 +19,9 @@ public class FitbitTokenService { private static final Logger logger = LoggerFactory.getLogger(FitbitTokenService.class); private final OkHttpClient client; private final ObjectMapper mapper; - private String clientId; - private String clientSecret; - private String tokenEndpoint; + private final String clientId; + private final String clientSecret; + private final String tokenEndpoint; public FitbitTokenService(String clientId, String clientSecret, String tokenEndpoint) { this.clientId = clientId; diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/util/DateRange.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/util/DateRange.java index b799f1bc..d5f61565 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/util/DateRange.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/util/DateRange.java @@ -18,7 +18,6 @@ package org.radarbase.connect.rest.fitbit.util; import java.time.ZonedDateTime; -import java.time.temporal.TemporalAmount; import java.util.Objects; public class DateRange { @@ -30,11 +29,6 @@ public DateRange(ZonedDateTime start, ZonedDateTime end) { this.end = end; } - public DateRange(ZonedDateTime start, TemporalAmount duration) { - this.start = start; - this.end = start.plus(duration); - } - public ZonedDateTime start() { return start; } diff --git a/kafka-connect-rest-source/build.gradle b/kafka-connect-rest-source/build.gradle index 97c085e8..58cbba9b 100644 --- a/kafka-connect-rest-source/build.gradle +++ b/kafka-connect-rest-source/build.gradle @@ -1,14 +1,14 @@ dependencies { - api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.0' + api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.1' // Included in connector runtime compileOnly group: 'org.apache.kafka', name: 'connect-api', version: kafkaVersion - testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3' - testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: '5.6.2' - testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: '5.6.2' - testImplementation group: 'org.mockito', name: 'mockito-core', version: '2.27.0' - testImplementation group: 'com.github.tomakehurst', name: 'wiremock', version: '2.23.2' + testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.5' + testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: '5.7.2' + testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: '5.7.2' + testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4' + testImplementation group: 'com.github.tomakehurst', name: 'wiremock', version: '2.27.2' testImplementation group: 'org.apache.kafka', name: 'connect-api', version: kafkaVersion } diff --git a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/RestSourceTask.java b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/RestSourceTask.java index b957496a..a425f16a 100644 --- a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/RestSourceTask.java +++ b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/RestSourceTask.java @@ -18,20 +18,15 @@ package org.radarbase.connect.rest; import static java.time.temporal.ChronoUnit.MILLIS; -import static org.radarbase.connect.rest.util.ThrowingFunction.tryOrNull; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; import java.time.Instant; -import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.concurrent.atomic.LongAdder; -import java.util.function.Function; import java.util.stream.Collectors; +import javax.ws.rs.NotAuthorizedException; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; @@ -42,7 +37,7 @@ import org.slf4j.LoggerFactory; public class RestSourceTask extends SourceTask { - private static Logger logger = LoggerFactory.getLogger(RestSourceTask.class); + private static final Logger logger = LoggerFactory.getLogger(RestSourceTask.class); private RequestGenerator requestGenerator; @@ -92,7 +87,7 @@ public List poll() throws InterruptedException { try { requests = request.handleRequest() .collect(Collectors.toList()); - } catch (IOException ex) { + } catch (IOException | NotAuthorizedException ex) { logger.warn("Failed to make request: {}", ex.toString()); } } diff --git a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/BytesPayloadConverter.java b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/BytesPayloadConverter.java index 891ae478..69262954 100644 --- a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/BytesPayloadConverter.java +++ b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/BytesPayloadConverter.java @@ -19,13 +19,10 @@ import static java.lang.System.currentTimeMillis; -import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Map; import okhttp3.Headers; -import okhttp3.Response; -import okhttp3.ResponseBody; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.source.SourceRecord; import org.radarbase.connect.rest.RestSourceConnectorConfig; diff --git a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/request/RestRequest.java b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/request/RestRequest.java index 0abc06d2..8e7d5ce2 100644 --- a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/request/RestRequest.java +++ b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/request/RestRequest.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.function.Predicate; import java.util.stream.Stream; +import javax.ws.rs.NotAuthorizedException; import okhttp3.Headers; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -95,7 +96,7 @@ public Stream handleRequest() throws IOException { headers = response.headers(); ResponseBody body = response.body(); data = body != null ? body.bytes() : null; - } catch (IOException ex) { + } catch (IOException | NotAuthorizedException ex) { route.requestFailed(this, null); throw ex; } diff --git a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/single/SingleRequestGenerator.java b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/single/SingleRequestGenerator.java index 4cff9842..a6298526 100644 --- a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/single/SingleRequestGenerator.java +++ b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/single/SingleRequestGenerator.java @@ -17,6 +17,7 @@ package org.radarbase.connect.rest.single; +import static java.util.Objects.requireNonNullElse; import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.MIN_INSTANT; import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.TIMESTAMP_OFFSET_KEY; import static org.radarbase.connect.rest.request.PollingRequestRoute.max; @@ -25,6 +26,7 @@ import java.time.Instant; import java.util.Collections; import java.util.Map; +import java.util.Objects; import java.util.stream.Stream; import okhttp3.Headers; import okhttp3.HttpUrl; @@ -75,12 +77,8 @@ public void initialize(RestSourceConnectorConfig config) { if (singleConfig.getData() != null && !singleConfig.getData().isEmpty()) { String contentType = headers.get("Content-Type"); - MediaType mediaType; - if (contentType == null) { - mediaType = MediaType.parse("text/plain; charset=utf-8"); - } else { - mediaType = MediaType.parse(contentType); - } + MediaType mediaType = MediaType.parse( + requireNonNullElse(contentType, "text/plain; charset=utf-8")); body = RequestBody.create(singleConfig.getData(), mediaType); } diff --git a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/single/SingleRestSourceConnectorConfig.java b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/single/SingleRestSourceConnectorConfig.java index e3c901d2..842a5dab 100644 --- a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/single/SingleRestSourceConnectorConfig.java +++ b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/single/SingleRestSourceConnectorConfig.java @@ -47,7 +47,6 @@ public class SingleRestSourceConnectorConfig extends RestSourceConnectorConfig { private final Map requestProperties; - @SuppressWarnings("unchecked") private SingleRestSourceConnectorConfig(ConfigDef config, Map parsedConfig, boolean doLog) { super(config, parsedConfig, doLog); diff --git a/kafka-connect-rest-source/src/test/java/org/radarbase/connect/rest/RestTaskTest.java b/kafka-connect-rest-source/src/test/java/org/radarbase/connect/rest/RestTaskTest.java index ef5f8d37..d4868e6a 100644 --- a/kafka-connect-rest-source/src/test/java/org/radarbase/connect/rest/RestTaskTest.java +++ b/kafka-connect-rest-source/src/test/java/org/radarbase/connect/rest/RestTaskTest.java @@ -28,7 +28,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching; import static com.github.tomakehurst.wiremock.client.WireMock.verify; import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; import com.github.tomakehurst.wiremock.WireMockServer; import com.github.tomakehurst.wiremock.client.VerificationException; @@ -118,10 +118,10 @@ public OffsetStorageReader offsetStorageReader() { sourceTask.start(props); messages = sourceTask.poll(); - assertEquals("Message count: ", 1, messages.size()); - assertEquals("Response class: ", String.class, messages.get(0).value().getClass()); - assertEquals("Response body: ", RESPONSE_BODY, messages.get(0).value()); - assertEquals("Topic: ", TOPIC, messages.get(0).topic()); + assertEquals(1, messages.size(), "Message count: "); + assertEquals(String.class, messages.get(0).value().getClass(), "Response class: "); + assertEquals(RESPONSE_BODY, messages.get(0).value(), "Response body: "); + assertEquals(TOPIC, messages.get(0).topic(), "Topic: "); verify(postRequestedFor(urlMatching(PATH)) .withRequestBody(equalTo(DATA)) @@ -134,10 +134,10 @@ public OffsetStorageReader offsetStorageReader() { sourceTask.start(props); messages = sourceTask.poll(); - assertEquals("Message count: ", 1, messages.size()); - assertEquals("Response class: ", byte[].class, messages.get(0).value().getClass()); - assertEquals("Response body: ", RESPONSE_BODY, new String((byte[]) messages.get(0).value())); - assertEquals("Topic: ", TOPIC, messages.get(0).topic()); + assertEquals(1, messages.size(), "Message count: "); + assertEquals(byte[].class, messages.get(0).value().getClass(), "Response class: "); + assertEquals(RESPONSE_BODY, new String((byte[]) messages.get(0).value()), "Response body: "); + assertEquals(TOPIC, messages.get(0).topic(), "Topic: "); verify(postRequestedFor(urlMatching(PATH)) .withRequestBody(equalTo(DATA))