From 5f8447b0d3346572d18e35bb5caa2219dfa03d59 Mon Sep 17 00:00:00 2001 From: Ivan Greguric Ortolan Date: Fri, 14 May 2021 20:58:53 +0200 Subject: [PATCH 1/3] Directory support first draft --- .../io/conduktor/ksm/AclSynchronizer.scala | 69 ++++++++++--------- .../ksm/source/BitbucketCloudSourceAcl.scala | 10 ++- .../ksm/source/BitbucketServerSourceAcl.scala | 9 +-- .../conduktor/ksm/source/FileSourceAcl.scala | 34 ++++++--- .../ksm/source/GitHubSourceAcl.scala | 13 ++-- .../ksm/source/GitLabSourceAcl.scala | 10 +-- .../io/conduktor/ksm/source/NoSourceAcl.scala | 2 +- .../io/conduktor/ksm/source/S3SourceAcl.scala | 16 ++--- .../io/conduktor/ksm/source/SourceAcl.scala | 6 +- 9 files changed, 98 insertions(+), 71 deletions(-) diff --git a/src/main/scala/io/conduktor/ksm/AclSynchronizer.scala b/src/main/scala/io/conduktor/ksm/AclSynchronizer.scala index dccc56f..6cb1f75 100644 --- a/src/main/scala/io/conduktor/ksm/AclSynchronizer.scala +++ b/src/main/scala/io/conduktor/ksm/AclSynchronizer.scala @@ -6,6 +6,7 @@ import io.conduktor.ksm.source.{ParsingContext, SourceAcl} import kafka.security.auth.{Acl, Authorizer, Resource} import org.slf4j.{Logger, LoggerFactory} +import scala.collection.mutable import scala.util.{Failure, Success, Try} object AclSynchronizer { @@ -66,7 +67,8 @@ class AclSynchronizer( import AclSynchronizer._ - private var sourceAclsCache: Set[(Resource, Acl)] = _ + private val sourceAclsCache: mutable.Map[String, Set[(Resource, Acl)]] = + mutable.Map() private var failedRefreshes: Int = 0 if (readOnly) { @@ -84,50 +86,49 @@ class AclSynchronizer( Try(sourceAcl.refresh()) match { case Success(result) => failedRefreshes = 0 - result match { - // the source has not changed - case None => - if (sourceAclsCache != null) { - // the Kafka Acls may have changed so we check against the last known correct SourceAcl that we cached + result.foreach { context: ParsingContext => + context match { + case ParsingContext(resourceKey, aclParser, reader, true) => + val sourceAclResult = aclParser.aclsFromReader(reader) + reader.close() + sourceAclResult.result match { + // the source has changed + case Right(ksmAcls) => + // we have a new result, so we cache it + sourceAclsCache + (resourceKey -> ksmAcls) + applySourceAcls( + sourceAclsCache(resourceKey), + getKafkaAcls, + notification, + authorizer + ) + case Left(parsingExceptions: List[Exception]) => + // parsing exceptions we want to notify + log.error( + "Exceptions while refreshing ACL source:", + parsingExceptions.map(e => e.toString).mkString("\n") + ) + // ugly but for now this will do + notification.notifyErrors( + parsingExceptions.map(e => Try(throw e)) + ) + } + case ParsingContext(resourceKey, _, _, false) => + // the source does not need updating, reapply the permission applySourceAcls( - sourceAclsCache, + sourceAclsCache.getOrElse(resourceKey, Set()), getKafkaAcls, notification, authorizer ) - } - case Some(ParsingContext(parser, reader)) => - val sourceAclResult = parser.aclsFromReader(reader) - reader.close() - sourceAclResult.result match { - // the source has changed - case Right(ksmAcls) => - // we have a new result, so we cache it - sourceAclsCache = ksmAcls - applySourceAcls( - sourceAclsCache, - getKafkaAcls, - notification, - authorizer - ) - case Left(parsingExceptions: List[Exception]) => - // parsing exceptions we want to notify - log.error( - "Exceptions while refreshing ACL source:", - parsingExceptions.map(e => e.toString).mkString("\n") - ) - // ugly but for now this will do - notification.notifyErrors( - parsingExceptions.map(e => Try(throw e)) - ) - } + } } case Failure(e) => // errors such as HTTP exceptions when refreshing failedRefreshes += 1 try { log.error("Exceptions while refreshing ACL source:", e) - if(failedRefreshes >= numFailedRefreshesBeforeNotification){ + if (failedRefreshes >= numFailedRefreshesBeforeNotification) { notification.notifyErrors(List(Try(e))) failedRefreshes = 0 } diff --git a/src/main/scala/io/conduktor/ksm/source/BitbucketCloudSourceAcl.scala b/src/main/scala/io/conduktor/ksm/source/BitbucketCloudSourceAcl.scala index a89a220..97263ed 100644 --- a/src/main/scala/io/conduktor/ksm/source/BitbucketCloudSourceAcl.scala +++ b/src/main/scala/io/conduktor/ksm/source/BitbucketCloudSourceAcl.scala @@ -45,7 +45,7 @@ class BitbucketCloudSourceAcl(parserRegistry: AclParserRegistry) password = config.getString(AUTH_PASSWORD_CONFIG) } - override def refresh(): Option[ParsingContext] = { + override def refresh(): List[ParsingContext] = { // get the latest file val url = s"$apiurl/repositories/$organization/$repo/src/master/$filePath" val request: Request = new Request(url) @@ -63,7 +63,13 @@ class BitbucketCloudSourceAcl(parserRegistry: AclParserRegistry) case 200 => // we receive a valid response val reader = new BufferedReader(new StringReader(response.textBody)) - Some(source.ParsingContext(parserRegistry.getParserByFilename(filePath), reader)) + List( + ParsingContext( + filePath, + parserRegistry.getParserByFilename(filePath), + reader + ) + ) case _ => // uncaught error log.warn(response.asString) diff --git a/src/main/scala/io/conduktor/ksm/source/BitbucketServerSourceAcl.scala b/src/main/scala/io/conduktor/ksm/source/BitbucketServerSourceAcl.scala index 8dc46df..115c498 100644 --- a/src/main/scala/io/conduktor/ksm/source/BitbucketServerSourceAcl.scala +++ b/src/main/scala/io/conduktor/ksm/source/BitbucketServerSourceAcl.scala @@ -63,7 +63,7 @@ class BitbucketServerSourceAcl(parserRegistry: AclParserRegistry) }) } - override def refresh(): Option[ParsingContext] = { + override def refresh(): List[ParsingContext] = { // get changes since last commit val url = s"$protocol://$hostname:$port/rest/api/1.0/projects/$project/repos/$repo/commits" @@ -103,8 +103,9 @@ class BitbucketServerSourceAcl(parserRegistry: AclParserRegistry) // update the last commit id lastCommit = Some(values.get(0).get("id").asText()) val data = fileResponse.textBody - Some( - source.ParsingContext( + List( + ParsingContext( + filePath, parserRegistry.getParserByFilename(filePath), new StringReader(data) ) @@ -115,7 +116,7 @@ class BitbucketServerSourceAcl(parserRegistry: AclParserRegistry) throw HTTPException(Some(response.asString), response) } } else { - None + List() } case _ => // uncaught error diff --git a/src/main/scala/io/conduktor/ksm/source/FileSourceAcl.scala b/src/main/scala/io/conduktor/ksm/source/FileSourceAcl.scala index 22744b0..1d89053 100644 --- a/src/main/scala/io/conduktor/ksm/source/FileSourceAcl.scala +++ b/src/main/scala/io/conduktor/ksm/source/FileSourceAcl.scala @@ -2,9 +2,9 @@ package io.conduktor.ksm.source import com.typesafe.config.Config import io.conduktor.ksm.parser.AclParserRegistry -import io.conduktor.ksm.source import java.io.{File, FileReader} +import scala.util.Try class FileSourceAcl(parserRegistry: AclParserRegistry) extends SourceAcl(parserRegistry) { @@ -14,6 +14,7 @@ class FileSourceAcl(parserRegistry: AclParserRegistry) var lastModified: Long = -1 var filename: String = _ + val modifiedMap: Map[String, Long] = Map[String, Long]() /** * internal config definition for the module @@ -29,14 +30,31 @@ class FileSourceAcl(parserRegistry: AclParserRegistry) * Uses a CSV parser on the file afterwards * @return */ - override def refresh(): Option[ParsingContext] = { - val file = new File(filename) - if (file.lastModified() > lastModified) { - val reader = new FileReader(file) - lastModified = file.lastModified() - Some(source.ParsingContext(parserRegistry.getParserByFilename(filename), reader)) + override def refresh(): List[ParsingContext] = { + + val path = new File(filename) + if (path.exists()) { + val files = + if (path.isFile) + List(path) + else + path.listFiles.filter(_.isFile).toList + + files.map(file => + ParsingContext( + file.getName, + parserRegistry.getParserByFilename(file.getName), + new FileReader(file), + file.lastModified >= (modifiedMap.get(file.getName) match { + case None => + modifiedMap + (file.getName -> file.lastModified) + 0L + case Some(value) => value + }) + ) + ) } else { - None + List() } } diff --git a/src/main/scala/io/conduktor/ksm/source/GitHubSourceAcl.scala b/src/main/scala/io/conduktor/ksm/source/GitHubSourceAcl.scala index 603fec1..e0d7806 100644 --- a/src/main/scala/io/conduktor/ksm/source/GitHubSourceAcl.scala +++ b/src/main/scala/io/conduktor/ksm/source/GitHubSourceAcl.scala @@ -3,7 +3,6 @@ package io.conduktor.ksm.source import com.fasterxml.jackson.databind.ObjectMapper import com.typesafe.config.Config import io.conduktor.ksm.parser.AclParserRegistry -import io.conduktor.ksm.source import org.slf4j.LoggerFactory import skinny.http.{HTTP, HTTPException, Request, Response} @@ -49,7 +48,7 @@ class GitHubSourceAcl(parserRegistry: AclParserRegistry) tokenOpt = Try(config.getString(AUTH_TOKEN_CONFIG)).toOption } - override def refresh(): Option[ParsingContext] = { + override def refresh(): List[ParsingContext] = { val url = s"https://$hostname/repos/$user/$repo/contents/$filepath?ref=$branch" val request: Request = new Request(url) @@ -80,11 +79,15 @@ class GitHubSourceAcl(parserRegistry: AclParserRegistry) Charset.forName("UTF-8") ) // use the CSV Parser - Some( - source.ParsingContext(parserRegistry.getParserByFilename(filepath), new StringReader(data)) + List( + ParsingContext( + filepath, + parserRegistry.getParserByFilename(filepath), + new StringReader(data) + ) ) case 304 => - None + List() case _ => // we got an http error so we propagate it log.warn(response.asString) diff --git a/src/main/scala/io/conduktor/ksm/source/GitLabSourceAcl.scala b/src/main/scala/io/conduktor/ksm/source/GitLabSourceAcl.scala index 1f1fb0c..57cefc0 100644 --- a/src/main/scala/io/conduktor/ksm/source/GitLabSourceAcl.scala +++ b/src/main/scala/io/conduktor/ksm/source/GitLabSourceAcl.scala @@ -3,7 +3,6 @@ package io.conduktor.ksm.source import com.fasterxml.jackson.databind.ObjectMapper import com.typesafe.config.Config import io.conduktor.ksm.parser.AclParserRegistry -import io.conduktor.ksm.source import org.slf4j.LoggerFactory import skinny.http.{HTTP, HTTPException, Request, Response} @@ -42,7 +41,7 @@ class GitLabSourceAcl(parserRegistry: AclParserRegistry) accessToken = config.getString(ACCESSTOKEN_CONFIG) } - override def refresh(): Option[ParsingContext] = { + override def refresh(): List[ParsingContext] = { val url = s"https://$hostname/api/v4/projects/$repoid/repository/files/$filepath?ref=$branch" val request: Request = new Request(url) @@ -62,7 +61,7 @@ class GitLabSourceAcl(parserRegistry: AclParserRegistry) log.info( s"No changes were detected in the ACL file ${filepath}. Skipping .... " ) - None + List() case _ => val response: Response = HTTP.get(request) response.status match { @@ -76,8 +75,9 @@ class GitLabSourceAcl(parserRegistry: AclParserRegistry) Charset.forName("UTF-8") ) // use the CSV Parser - Some( - source.ParsingContext( + List( + ParsingContext( + filepath, parserRegistry.getParserByFilename(filepath), new StringReader(data) ) diff --git a/src/main/scala/io/conduktor/ksm/source/NoSourceAcl.scala b/src/main/scala/io/conduktor/ksm/source/NoSourceAcl.scala index d0bf10a..f83045f 100644 --- a/src/main/scala/io/conduktor/ksm/source/NoSourceAcl.scala +++ b/src/main/scala/io/conduktor/ksm/source/NoSourceAcl.scala @@ -26,7 +26,7 @@ class NoSourceAcl(parserRegistry: AclParserRegistry) extends SourceAcl(parserReg * * @return */ - override def refresh(): Option[ParsingContext] = None + override def refresh(): List[ParsingContext] = List() /** * Close all the necessary underlying objects or connections belonging to this instance diff --git a/src/main/scala/io/conduktor/ksm/source/S3SourceAcl.scala b/src/main/scala/io/conduktor/ksm/source/S3SourceAcl.scala index 52a23ec..71d0caa 100644 --- a/src/main/scala/io/conduktor/ksm/source/S3SourceAcl.scala +++ b/src/main/scala/io/conduktor/ksm/source/S3SourceAcl.scala @@ -1,16 +1,15 @@ package io.conduktor.ksm.source -import java.io._ -import java.util.Date import com.amazonaws.regions.Regions import com.amazonaws.services.s3._ import com.amazonaws.services.s3.model._ -import io.conduktor.ksm.parser.AclParserRegistry import com.typesafe.config.Config import io.conduktor.ksm.parser.AclParserRegistry -import io.conduktor.ksm.source import org.slf4j.LoggerFactory +import java.io._ +import java.util.Date + class S3SourceAcl(parserRegistry: AclParserRegistry) extends SourceAcl(parserRegistry) { @@ -59,7 +58,7 @@ class S3SourceAcl(parserRegistry: AclParserRegistry) * * @return */ - override def refresh(): Option[ParsingContext] = { + override def refresh(): List[ParsingContext] = { val s3 = s3Client() val s3object = Option( s3.getObject( @@ -84,13 +83,14 @@ class S3SourceAcl(parserRegistry: AclParserRegistry) reader.close() bucket.close() - Some( - source.ParsingContext( + List( + ParsingContext( + key, parserRegistry.getParserByFilename(key), new BufferedReader(new StringReader(content)) ) ) - case None => None + case None => List() } } diff --git a/src/main/scala/io/conduktor/ksm/source/SourceAcl.scala b/src/main/scala/io/conduktor/ksm/source/SourceAcl.scala index e11da07..2383207 100644 --- a/src/main/scala/io/conduktor/ksm/source/SourceAcl.scala +++ b/src/main/scala/io/conduktor/ksm/source/SourceAcl.scala @@ -1,12 +1,10 @@ package io.conduktor.ksm.source -import io.conduktor.ksm.parser.AclParserRegistry - import java.io.Reader import com.typesafe.config.Config import io.conduktor.ksm.parser.{AclParser, AclParserRegistry} -case class ParsingContext(aclParser: AclParser, reader: Reader) +case class ParsingContext(resourceKey: String, aclParser: AclParser, reader: Reader, shouldUpdate: Boolean = true) abstract class SourceAcl(val parserRegistry: AclParserRegistry) { @@ -30,7 +28,7 @@ abstract class SourceAcl(val parserRegistry: AclParserRegistry) { * Kafka Security Manager will not update Acls in Kafka until there are no errors in the result * @return */ - def refresh(): Option[ParsingContext] + def refresh(): List[ParsingContext] /** * Close all the necessary underlying objects or connections belonging to this instance From b61f3dbcf9ae52875a0ae9403fa1aa68f5da900a Mon Sep 17 00:00:00 2001 From: Ivan Greguric Ortolan Date: Tue, 29 Jun 2021 20:52:24 +0200 Subject: [PATCH 2/3] Remove unused content, fix minor quirks --- src/main/scala/io/conduktor/ksm/AclSynchronizer.scala | 9 +++------ .../scala/io/conduktor/ksm/source/FileSourceAcl.scala | 7 +++++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/scala/io/conduktor/ksm/AclSynchronizer.scala b/src/main/scala/io/conduktor/ksm/AclSynchronizer.scala index 6cb1f75..2256717 100644 --- a/src/main/scala/io/conduktor/ksm/AclSynchronizer.scala +++ b/src/main/scala/io/conduktor/ksm/AclSynchronizer.scala @@ -1,12 +1,10 @@ package io.conduktor.ksm -import io.conduktor.ksm.source.SourceAcl import io.conduktor.ksm.notification.Notification import io.conduktor.ksm.source.{ParsingContext, SourceAcl} import kafka.security.auth.{Acl, Authorizer, Resource} import org.slf4j.{Logger, LoggerFactory} -import scala.collection.mutable import scala.util.{Failure, Success, Try} object AclSynchronizer { @@ -67,8 +65,7 @@ class AclSynchronizer( import AclSynchronizer._ - private val sourceAclsCache: mutable.Map[String, Set[(Resource, Acl)]] = - mutable.Map() + private var sourceAclsCache: Map[String, Set[(Resource, Acl)]] = Map() private var failedRefreshes: Int = 0 if (readOnly) { @@ -95,7 +92,7 @@ class AclSynchronizer( // the source has changed case Right(ksmAcls) => // we have a new result, so we cache it - sourceAclsCache + (resourceKey -> ksmAcls) + sourceAclsCache += (resourceKey -> ksmAcls) applySourceAcls( sourceAclsCache(resourceKey), getKafkaAcls, @@ -110,7 +107,7 @@ class AclSynchronizer( ) // ugly but for now this will do notification.notifyErrors( - parsingExceptions.map(e => Try(throw e)) + parsingExceptions.map(e => Failure(e)) ) } case ParsingContext(resourceKey, _, _, false) => diff --git a/src/main/scala/io/conduktor/ksm/source/FileSourceAcl.scala b/src/main/scala/io/conduktor/ksm/source/FileSourceAcl.scala index 1d89053..bf353a9 100644 --- a/src/main/scala/io/conduktor/ksm/source/FileSourceAcl.scala +++ b/src/main/scala/io/conduktor/ksm/source/FileSourceAcl.scala @@ -2,17 +2,19 @@ package io.conduktor.ksm.source import com.typesafe.config.Config import io.conduktor.ksm.parser.AclParserRegistry +import org.slf4j.{Logger, LoggerFactory} import java.io.{File, FileReader} -import scala.util.Try class FileSourceAcl(parserRegistry: AclParserRegistry) extends SourceAcl(parserRegistry) { + private val log: Logger = + LoggerFactory.getLogger(classOf[FileSourceAcl].getSimpleName) + override val CONFIG_PREFIX: String = "file" final val FILENAME_CONFIG = "filename" - var lastModified: Long = -1 var filename: String = _ val modifiedMap: Map[String, Long] = Map[String, Long]() @@ -54,6 +56,7 @@ class FileSourceAcl(parserRegistry: AclParserRegistry) ) ) } else { + log.error(s"The provided path does not exist: $path") List() } } From 7d3901e6018d96f1699cffd331931501fba26d1b Mon Sep 17 00:00:00 2001 From: Ivan Greguric Ortolan Date: Tue, 29 Jun 2021 23:48:55 +0200 Subject: [PATCH 3/3] Fix tests and application code I realized that the code was completely wrong, I need to call the applySourceAcls with all values not for every parsing context. So what I do now is, first I collect all acls either from the reader or cache and then apply on the result --- .../io/conduktor/ksm/AclSynchronizer.scala | 85 ++++++++------ .../conduktor/ksm/source/FileSourceAcl.scala | 11 +- .../conduktor/ksm/AclSynchronizerTest.scala | 7 -- .../source/BitbucketServerSourceAclTest.scala | 111 ++++++++++++------ .../conduktor/ksm/source/DummySourceAcl.scala | 36 ++++-- .../ksm/source/FileSourceAclTest.scala | 14 +-- .../ksm/source/S3SourceAclTest.scala | 20 ++-- 7 files changed, 169 insertions(+), 115 deletions(-) diff --git a/src/main/scala/io/conduktor/ksm/AclSynchronizer.scala b/src/main/scala/io/conduktor/ksm/AclSynchronizer.scala index 2256717..b36163f 100644 --- a/src/main/scala/io/conduktor/ksm/AclSynchronizer.scala +++ b/src/main/scala/io/conduktor/ksm/AclSynchronizer.scala @@ -1,7 +1,8 @@ package io.conduktor.ksm +import cats.implicits._ import io.conduktor.ksm.notification.Notification -import io.conduktor.ksm.source.{ParsingContext, SourceAcl} +import io.conduktor.ksm.source.{ParsingContext, SourceAcl, SourceAclResult} import kafka.security.auth.{Acl, Authorizer, Resource} import org.slf4j.{Logger, LoggerFactory} @@ -83,42 +84,56 @@ class AclSynchronizer( Try(sourceAcl.refresh()) match { case Success(result) => failedRefreshes = 0 - result.foreach { context: ParsingContext => - context match { - case ParsingContext(resourceKey, aclParser, reader, true) => - val sourceAclResult = aclParser.aclsFromReader(reader) - reader.close() - sourceAclResult.result match { - // the source has changed - case Right(ksmAcls) => - // we have a new result, so we cache it - sourceAclsCache += (resourceKey -> ksmAcls) - applySourceAcls( - sourceAclsCache(resourceKey), - getKafkaAcls, - notification, - authorizer + result + .map { context: ParsingContext => + context match { + // in case there is an update + case ParsingContext(resourceKey, aclParser, reader, true) => + val sourceAclResult = aclParser.aclsFromReader(reader) + reader.close() + // add to the cache if successful + sourceAclResult.result match { + case Right(kafkaAcls) => + sourceAclsCache += (resourceKey -> kafkaAcls) + } + sourceAclResult + case ParsingContext(resourceKey, _, _, false) => + // no update necessary, fetch from cache + SourceAclResult( + Right( + sourceAclsCache + .getOrElse( + resourceKey, + // this should never happen, sources should set shouldUpdate to true the first time + throw new RuntimeException( + s"The resource '$resourceKey' does not exist in the cache." + ) + ) ) - case Left(parsingExceptions: List[Exception]) => - // parsing exceptions we want to notify - log.error( - "Exceptions while refreshing ACL source:", - parsingExceptions.map(e => e.toString).mkString("\n") - ) - // ugly but for now this will do - notification.notifyErrors( - parsingExceptions.map(e => Failure(e)) - ) - } - case ParsingContext(resourceKey, _, _, false) => - // the source does not need updating, reapply the permission - applySourceAcls( - sourceAclsCache.getOrElse(resourceKey, Set()), - getKafkaAcls, - notification, - authorizer - ) + ) + } } + .combineAll + .result match { + // the source has changed + case Right(ksmAcls) => + // we have a new result, so we cache it + applySourceAcls( + ksmAcls, + getKafkaAcls, + notification, + authorizer + ) + case Left(parsingExceptions: List[Exception]) => + // parsing exceptions we want to notify + log.error( + "Exceptions while refreshing ACL source:", + parsingExceptions.map(e => e.toString).mkString("\n") + ) + // ugly but for now this will do + notification.notifyErrors( + parsingExceptions.map(e => Failure(e)) + ) } case Failure(e) => // errors such as HTTP exceptions when refreshing diff --git a/src/main/scala/io/conduktor/ksm/source/FileSourceAcl.scala b/src/main/scala/io/conduktor/ksm/source/FileSourceAcl.scala index bf353a9..5397c48 100644 --- a/src/main/scala/io/conduktor/ksm/source/FileSourceAcl.scala +++ b/src/main/scala/io/conduktor/ksm/source/FileSourceAcl.scala @@ -2,16 +2,12 @@ package io.conduktor.ksm.source import com.typesafe.config.Config import io.conduktor.ksm.parser.AclParserRegistry -import org.slf4j.{Logger, LoggerFactory} -import java.io.{File, FileReader} +import java.io.{File, FileNotFoundException, FileReader} class FileSourceAcl(parserRegistry: AclParserRegistry) extends SourceAcl(parserRegistry) { - private val log: Logger = - LoggerFactory.getLogger(classOf[FileSourceAcl].getSimpleName) - override val CONFIG_PREFIX: String = "file" final val FILENAME_CONFIG = "filename" @@ -56,8 +52,9 @@ class FileSourceAcl(parserRegistry: AclParserRegistry) ) ) } else { - log.error(s"The provided path does not exist: $path") - List() + throw new FileNotFoundException( + s"The provided file does not exist: $filename" + ) } } diff --git a/src/test/scala/io/conduktor/ksm/AclSynchronizerTest.scala b/src/test/scala/io/conduktor/ksm/AclSynchronizerTest.scala index b69a100..6519763 100644 --- a/src/test/scala/io/conduktor/ksm/AclSynchronizerTest.scala +++ b/src/test/scala/io/conduktor/ksm/AclSynchronizerTest.scala @@ -1,9 +1,5 @@ package io.conduktor.ksm -import io.conduktor.ksm.notification.{ConsoleNotification, DummyNotification} -import io.conduktor.ksm.parser.AclParserRegistry -import io.conduktor.ksm.source.SourceAcl -import com.typesafe.config.Config import io.conduktor.ksm.notification.{ConsoleNotification, DummyNotification} import io.conduktor.ksm.parser.AclParserRegistry import io.conduktor.ksm.parser.csv.CsvAclParser @@ -14,7 +10,6 @@ import org.scalamock.scalatest.MockFactory import org.scalatest.concurrent.Eventually import org.scalatest.{FlatSpec, Matchers} -import java.io.Reader import scala.collection.JavaConverters._ import scala.concurrent.duration._ @@ -142,8 +137,6 @@ class AclSynchronizerTest val dummySourceAcl = new DummySourceAcl(aclParserRegistryMock) - val aclParser = new CsvAclParser() - val aclSynchronizer: AclSynchronizer = new AclSynchronizer( simpleAclAuthorizer, dummySourceAcl, diff --git a/src/test/scala/io/conduktor/ksm/source/BitbucketServerSourceAclTest.scala b/src/test/scala/io/conduktor/ksm/source/BitbucketServerSourceAclTest.scala index ecdcd7e..9e53084 100644 --- a/src/test/scala/io/conduktor/ksm/source/BitbucketServerSourceAclTest.scala +++ b/src/test/scala/io/conduktor/ksm/source/BitbucketServerSourceAclTest.scala @@ -10,21 +10,30 @@ import java.io.{BufferedReader, Reader} import java.util.Base64 import java.util.stream.Collectors - -class BitbucketServerSourceAclTest extends FlatSpec with Matchers with MixedMockFactory { +class BitbucketServerSourceAclTest + extends FlatSpec + with Matchers + with MixedMockFactory { val csvlAclParser = new CsvAclParser() val aclParserRegistryMock: AclParserRegistry = stub[AclParserRegistry] (aclParserRegistryMock.getParserByFilename _).when(*).returns(csvlAclParser) "Test" should "Successfully return body for specific branch" in { - val bitbucketServerSoureAcl = new BitbucketServerSourceAcl(aclParserRegistryMock) - val dummyHttp = new DummyHttp(Response(200, body = DummyHttp.commitsContent.getBytes)) + val bitbucketServerSoureAcl = + new BitbucketServerSourceAcl(aclParserRegistryMock) + val dummyHttp = + new DummyHttp(Response(200, body = DummyHttp.commitsContent.getBytes)) dummyHttp.commitMatcher = req => { - req.url.endsWith("commits") && req.queryParams.exists(q => q.name == "until" && q.value == "ref/feature-F1") && req.queryParams.length == 2 + req.url.endsWith("commits") && req.queryParams.exists(q => + q.name == "until" && q.value == "ref/feature-F1" + ) && req.queryParams.length == 2 } - - dummyHttp.browseMatcher = req => req.url.endsWith("browse/testFile?raw") && req.queryParams.exists(q => q.name == "at" && q.value == "ref/feature-F1") && req.queryParams.length == 1 + + dummyHttp.browseMatcher = req => + req.url.endsWith("browse/testFile?raw") && req.queryParams.exists(q => + q.name == "at" && q.value == "ref/feature-F1" + ) && req.queryParams.length == 1 populateSourceAcl(bitbucketServerSoureAcl, branch = "ref/feature-F1") bitbucketServerSoureAcl.http = dummyHttp @@ -32,69 +41,85 @@ class BitbucketServerSourceAclTest extends FlatSpec with Matchers with MixedMock val response = bitbucketServerSoureAcl.refresh() response.isEmpty shouldBe false - readAllLines(response.get.reader) shouldBe DummyHttp.browseFile + readAllLines(response.head.reader) shouldBe DummyHttp.browseFile } "Test" should "Successfully return body for acl" in { - val bitbucketServerSoureAcl = new BitbucketServerSourceAcl(aclParserRegistryMock) - val dummyHttp = new DummyHttp(Response(200, body = DummyHttp.commitsContent.getBytes)) + val bitbucketServerSoureAcl = + new BitbucketServerSourceAcl(aclParserRegistryMock) + val dummyHttp = + new DummyHttp(Response(200, body = DummyHttp.commitsContent.getBytes)) populateSourceAcl(bitbucketServerSoureAcl) bitbucketServerSoureAcl.http = dummyHttp - + val response = bitbucketServerSoureAcl.refresh() - + response.isEmpty shouldBe false - readAllLines(response.get.reader) shouldBe DummyHttp.browseFile + readAllLines(response.head.reader) shouldBe DummyHttp.browseFile } "Test" should "Pass base64 auth to bitbucket" in { - val bitbucketServerSoureAcl = new BitbucketServerSourceAcl(aclParserRegistryMock) - val dummyHttp = new DummyHttp(Response(200, body = DummyHttp.commitsContent.getBytes)) - val expected = "Basic " + Base64.getEncoder.encodeToString("test:pwd".getBytes) + val bitbucketServerSoureAcl = + new BitbucketServerSourceAcl(aclParserRegistryMock) + val dummyHttp = + new DummyHttp(Response(200, body = DummyHttp.commitsContent.getBytes)) + val expected = "Basic " + Base64.getEncoder.encodeToString( + "test:pwd".getBytes + ) populateSourceAcl(bitbucketServerSoureAcl) bitbucketServerSoureAcl.http = dummyHttp - dummyHttp.commitMatcher = req => req.header("Authorization").get == expected && req.url.endsWith("commits") - dummyHttp.browseMatcher = req => req.header("Authorization").get == expected && req.url.endsWith("browse/testFile?raw") - + dummyHttp.commitMatcher = req => + req.header("Authorization").get == expected && req.url.endsWith("commits") + dummyHttp.browseMatcher = req => + req.header("Authorization").get == expected && req.url.endsWith( + "browse/testFile?raw" + ) val response = bitbucketServerSoureAcl.refresh() - response.isEmpty shouldBe false - readAllLines(response.get.reader) shouldBe DummyHttp.browseFile + readAllLines(response.head.reader) shouldBe DummyHttp.browseFile } "Test" should "Successfully not return body if acl do not changed since last call" in { - val bitbucketServerSoureAcl = new BitbucketServerSourceAcl(aclParserRegistryMock) - val dummyHttp = new DummyHttp(Response(200, body = DummyHttp.commitsContent.getBytes)) + val bitbucketServerSoureAcl = + new BitbucketServerSourceAcl(aclParserRegistryMock) + val dummyHttp = + new DummyHttp(Response(200, body = DummyHttp.commitsContent.getBytes)) populateSourceAcl(bitbucketServerSoureAcl) bitbucketServerSoureAcl.http = dummyHttp val firstResponse = bitbucketServerSoureAcl.refresh() - dummyHttp.commitMatcher = req => req.queryParams.exists(q => q.value == "c22287a15f6bada0b3b121b838a13dc3fad613cc") - dummyHttp.commitsResponse = Response(200, body = DummyHttp.commitsEmptyResponse.getBytes) + dummyHttp.commitMatcher = req => + req.queryParams.exists(q => + q.value == "c22287a15f6bada0b3b121b838a13dc3fad613cc" + ) + dummyHttp.commitsResponse = + Response(200, body = DummyHttp.commitsEmptyResponse.getBytes) dummyHttp.browseResponse = Response(500) val response = bitbucketServerSoureAcl.refresh() - firstResponse.isEmpty shouldBe false response.isEmpty shouldBe true } "Test" should "Successfully return body if acl changed since last call" in { - val bitbucketServerSoureAcl = new BitbucketServerSourceAcl(aclParserRegistryMock) - val dummyHttp = new DummyHttp(Response(200, body = DummyHttp.commitsContentFirst.getBytes)) + val bitbucketServerSoureAcl = + new BitbucketServerSourceAcl(aclParserRegistryMock) + val dummyHttp = new DummyHttp( + Response(200, body = DummyHttp.commitsContentFirst.getBytes) + ) populateSourceAcl(bitbucketServerSoureAcl) bitbucketServerSoureAcl.http = dummyHttp val firstResponse = bitbucketServerSoureAcl.refresh() val firstCommit = bitbucketServerSoureAcl.lastCommit.get - dummyHttp.commitMatcher = req => req.queryParams.exists(q => q.value == "somefirsthash") - dummyHttp.commitsResponse = Response(200, body = DummyHttp.commitsContent.getBytes) - + dummyHttp.commitMatcher = + req => req.queryParams.exists(q => q.value == "somefirsthash") + dummyHttp.commitsResponse = + Response(200, body = DummyHttp.commitsContent.getBytes) val response = bitbucketServerSoureAcl.refresh() - firstResponse.isEmpty shouldBe false response.isEmpty shouldBe false firstCommit shouldBe "somefirsthash" @@ -106,7 +131,11 @@ class BitbucketServerSourceAclTest extends FlatSpec with Matchers with MixedMock buffReader.lines().collect(Collectors.joining("\n")) } - def populateSourceAcl(source: BitbucketServerSourceAcl, filePath: String = "testFile", branch: String = null): Unit = { + def populateSourceAcl( + source: BitbucketServerSourceAcl, + filePath: String = "testFile", + branch: String = null + ): Unit = { source.hostname = "example" source.port = "8888" source.protocol = "http" @@ -119,15 +148,23 @@ class BitbucketServerSourceAclTest extends FlatSpec with Matchers with MixedMock } } -class DummyHttp(var commitsResponse: Response, var browseResponse: Response = Response(200, body = DummyHttp.browseFile.getBytes)) extends HTTP { +class DummyHttp( + var commitsResponse: Response, + var browseResponse: Response = + Response(200, body = DummyHttp.browseFile.getBytes) +) extends HTTP { var commitMatcher: Request => Boolean = req => { - req.url.endsWith("commits") && req.queryParams.exists(q => q.name == "path" - && q.value == "testFile") && !req.queryParams.exists(q => q.name == "until") + req.url.endsWith("commits") && req.queryParams.exists(q => + q.name == "path" + && q.value == "testFile" + ) && !req.queryParams.exists(q => q.name == "until") } var browseMatcher: Request => Boolean = req => { - req.url.endsWith("browse/testFile?raw") && !req.queryParams.exists(q => q.name == "at") + req.url.endsWith("browse/testFile?raw") && !req.queryParams.exists(q => + q.name == "at" + ) } override def get(req: Request): Response = { diff --git a/src/test/scala/io/conduktor/ksm/source/DummySourceAcl.scala b/src/test/scala/io/conduktor/ksm/source/DummySourceAcl.scala index a9b3161..e57f9ab 100644 --- a/src/test/scala/io/conduktor/ksm/source/DummySourceAcl.scala +++ b/src/test/scala/io/conduktor/ksm/source/DummySourceAcl.scala @@ -1,9 +1,11 @@ package io.conduktor.ksm.source -import io.conduktor.ksm.TestFixtures._ import com.typesafe.config.Config +import io.conduktor.ksm.TestFixtures._ import io.conduktor.ksm.parser.AclParserRegistry import io.conduktor.ksm.parser.csv.CsvAclParser +import kafka.security.auth +import kafka.security.auth.Acl import java.io.StringReader @@ -36,21 +38,29 @@ class DummySourceAcl(parserRegistry: AclParserRegistry) // a states iterator, shifting its position changes current state private val sarsIterator = sars.iterator - override def refresh(): Option[ParsingContext] = { - if (noneNext) { - noneNext = false - None - } else if (errorNext) { + var current: List[_ <: (auth.Resource, Acl)] = List() + + override def refresh(): List[ParsingContext] = { + if (errorNext) { errorNext = false throw new RuntimeException("triggered error") - } else { - Some( - ParsingContext( - csvAclParser, - new StringReader(csvAclParser.formatAcls(sarsIterator.next().toList)) - ) - ) } + + if (!noneNext) { + current = sarsIterator.next().toList + } + + val res = List( + ParsingContext( + "fileName", + csvAclParser, + new StringReader(csvAclParser.formatAcls(current)), + !noneNext + ) + ) + + noneNext = false + res } def setNoneNext(): Unit = { diff --git a/src/test/scala/io/conduktor/ksm/source/FileSourceAclTest.scala b/src/test/scala/io/conduktor/ksm/source/FileSourceAclTest.scala index b03e9ab..a4fac36 100644 --- a/src/test/scala/io/conduktor/ksm/source/FileSourceAclTest.scala +++ b/src/test/scala/io/conduktor/ksm/source/FileSourceAclTest.scala @@ -2,16 +2,16 @@ package io.conduktor.ksm.source import io.conduktor.ksm.parser.AclParserRegistry import io.conduktor.ksm.parser.csv.CsvAclParser - -import java.io.{File, Reader} -import java.nio.charset.StandardCharsets -import java.nio.file.{Files, Paths} import kafka.security.auth._ import org.apache.kafka.common.resource.PatternType import org.apache.kafka.common.utils.SecurityUtils import org.scalamock.scalatest.MockFactory import org.scalatest.{FlatSpec, Matchers} +import java.io.{File, Reader} +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Paths} + class FileSourceAclTest extends FlatSpec with Matchers with MockFactory { val csvlAclParser = new CsvAclParser() @@ -52,7 +52,7 @@ class FileSourceAclTest extends FlatSpec with Matchers with MockFactory { val res2 = Resource(Group, "bar", PatternType.PREFIXED) val res3 = Resource(Cluster, "kafka-cluster", PatternType.LITERAL) - val parsingContext = fileSourceAcl.refresh().get + val parsingContext = fileSourceAcl.refresh().head csvlAclParser.aclsFromReader(parsingContext.reader).result shouldBe Right( Set(res1 -> acl1, res2 -> acl2, res3 -> acl3) ) @@ -93,7 +93,7 @@ class FileSourceAclTest extends FlatSpec with Matchers with MockFactory { val res2 = Resource(Group, "bar", PatternType.PREFIXED) val res3 = Resource(Cluster, "kafka-cluster", PatternType.LITERAL) - val reader1: Reader = fileSourceAcl.refresh().get.reader + val reader1: Reader = fileSourceAcl.refresh().head.reader csvlAclParser.aclsFromReader(reader1).result shouldBe Right( Set(res1 -> acl1, res2 -> acl2, res3 -> acl3) ) @@ -111,7 +111,7 @@ class FileSourceAclTest extends FlatSpec with Matchers with MockFactory { // we force the modification of the time of the file so that the test passes file.setLastModified(System.currentTimeMillis() + 10000) - val reader2 = fileSourceAcl.refresh().get.reader + val reader2 = fileSourceAcl.refresh().head.reader csvlAclParser.aclsFromReader(reader2).result shouldBe Right( Set(res1 -> acl1) ) diff --git a/src/test/scala/io/conduktor/ksm/source/S3SourceAclTest.scala b/src/test/scala/io/conduktor/ksm/source/S3SourceAclTest.scala index 48522ed..94f1f5b 100644 --- a/src/test/scala/io/conduktor/ksm/source/S3SourceAclTest.scala +++ b/src/test/scala/io/conduktor/ksm/source/S3SourceAclTest.scala @@ -1,13 +1,12 @@ package io.conduktor.ksm.source -import io.conduktor.ksm.parser.AclParserRegistry -import io.conduktor.ksm.parser.{AclParser, AclParserRegistry} import io.conduktor.ksm.parser.csv.CsvAclParser +import io.conduktor.ksm.parser.{AclParser, AclParserRegistry} import org.scalamock.scalatest.MockFactory +import org.scalatest.{FlatSpec, Matchers} import java.io.BufferedReader import java.util.UUID -import org.scalatest.{FlatSpec, Matchers} class S3SourceAclTest extends FlatSpec with Matchers with MockFactory { @@ -37,14 +36,17 @@ class S3SourceAclTest extends FlatSpec with Matchers with MockFactory { s3SourceAcl.configure(bucket, key, region) - val reader = s3SourceAcl.refresh() - - reader match { - case None => fail() // didn't read - case Some(ParsingContext(_: AclParser, x: BufferedReader)) => - val read = Stream.continually(x.readLine()).takeWhile(Option(_).nonEmpty).map(_.concat("\n")).mkString + val res = s3SourceAcl.refresh() + res.head match { + case ParsingContext(_, _: AclParser, x: BufferedReader, _) => + val read = Stream + .continually(x.readLine()) + .takeWhile(Option(_).nonEmpty) + .map(_.concat("\n")) + .mkString content shouldBe read + case _ => fail() } s3SourceAcl.api.shutdown // kills the underlying actor system. Use api.stop() to just unbind the port.