Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi source support #101

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 52 additions & 39 deletions src/main/scala/io/conduktor/ksm/AclSynchronizer.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.conduktor.ksm

import io.conduktor.ksm.source.SourceAcl
import cats.implicits._
import io.conduktor.ksm.notification.Notification
import io.conduktor.ksm.source.{ParsingContext, SourceAcl}
import io.conduktor.ksm.source.{ParsingContext, SourceAcl, SourceAclResult}
import kafka.security.auth.{Acl, Authorizer, Resource}
import org.slf4j.{Logger, LoggerFactory}

Expand Down Expand Up @@ -66,7 +66,7 @@ class AclSynchronizer(

import AclSynchronizer._

private var sourceAclsCache: Set[(Resource, Acl)] = _
private var sourceAclsCache: Map[String, Set[(Resource, Acl)]] = Map()
private var failedRefreshes: Int = 0

if (readOnly) {
Expand All @@ -84,50 +84,63 @@ class AclSynchronizer(
Try(sourceAcl.refresh()) match {
case Success(result) =>
failedRefreshes = 0
result match {
// the source has not changed
case None =>
if (sourceAclsCache != null) {
// the Kafka Acls may have changed so we check against the last known correct SourceAcl that we cached
applySourceAcls(
sourceAclsCache,
getKafkaAcls,
notification,
authorizer
)
}
case Some(ParsingContext(parser, reader)) =>
val sourceAclResult = parser.aclsFromReader(reader)
reader.close()
sourceAclResult.result match {
// the source has changed
case Right(ksmAcls) =>
// we have a new result, so we cache it
sourceAclsCache = ksmAcls
applySourceAcls(
sourceAclsCache,
getKafkaAcls,
notification,
authorizer
)
case Left(parsingExceptions: List[Exception]) =>
// parsing exceptions we want to notify
log.error(
"Exceptions while refreshing ACL source:",
parsingExceptions.map(e => e.toString).mkString("\n")
)
// ugly but for now this will do
notification.notifyErrors(
parsingExceptions.map(e => Try(throw e))
result
.map { context: ParsingContext =>
context match {
// in case there is an update
case ParsingContext(resourceKey, aclParser, reader, true) =>
val sourceAclResult = aclParser.aclsFromReader(reader)
reader.close()
// add to the cache if successful
sourceAclResult.result match {
case Right(kafkaAcls) =>
sourceAclsCache += (resourceKey -> kafkaAcls)
}
sourceAclResult
case ParsingContext(resourceKey, _, _, false) =>
// no update necessary, fetch from cache
SourceAclResult(
Right(
sourceAclsCache
.getOrElse(
resourceKey,
// this should never happen, sources should set shouldUpdate to true the first time
throw new RuntimeException(
s"The resource '$resourceKey' does not exist in the cache."
)
)
)
)
}
}
.combineAll
.result match {
// the source has changed
case Right(ksmAcls) =>
// we have a new result, so we cache it
applySourceAcls(
ksmAcls,
getKafkaAcls,
notification,
authorizer
)
case Left(parsingExceptions: List[Exception]) =>
// parsing exceptions we want to notify
log.error(
"Exceptions while refreshing ACL source:",
parsingExceptions.map(e => e.toString).mkString("\n")
)
// ugly but for now this will do
notification.notifyErrors(
parsingExceptions.map(e => Failure(e))
)
}
case Failure(e) =>
// errors such as HTTP exceptions when refreshing
failedRefreshes += 1
try {
log.error("Exceptions while refreshing ACL source:", e)
if(failedRefreshes >= numFailedRefreshesBeforeNotification){
if (failedRefreshes >= numFailedRefreshesBeforeNotification) {
notification.notifyErrors(List(Try(e)))
failedRefreshes = 0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class BitbucketCloudSourceAcl(parserRegistry: AclParserRegistry)
password = config.getString(AUTH_PASSWORD_CONFIG)
}

override def refresh(): Option[ParsingContext] = {
override def refresh(): List[ParsingContext] = {
// get the latest file
val url = s"$apiurl/repositories/$organization/$repo/src/master/$filePath"
val request: Request = new Request(url)
Expand All @@ -63,7 +63,13 @@ class BitbucketCloudSourceAcl(parserRegistry: AclParserRegistry)
case 200 =>
// we receive a valid response
val reader = new BufferedReader(new StringReader(response.textBody))
Some(source.ParsingContext(parserRegistry.getParserByFilename(filePath), reader))
List(
ParsingContext(
filePath,
parserRegistry.getParserByFilename(filePath),
reader
)
)
case _ =>
// uncaught error
log.warn(response.asString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class BitbucketServerSourceAcl(parserRegistry: AclParserRegistry)
})
}

override def refresh(): Option[ParsingContext] = {
override def refresh(): List[ParsingContext] = {
// get changes since last commit
val url =
s"$protocol://$hostname:$port/rest/api/1.0/projects/$project/repos/$repo/commits"
Expand Down Expand Up @@ -103,8 +103,9 @@ class BitbucketServerSourceAcl(parserRegistry: AclParserRegistry)
// update the last commit id
lastCommit = Some(values.get(0).get("id").asText())
val data = fileResponse.textBody
Some(
source.ParsingContext(
List(
ParsingContext(
filePath,
parserRegistry.getParserByFilename(filePath),
new StringReader(data)
)
Expand All @@ -115,7 +116,7 @@ class BitbucketServerSourceAcl(parserRegistry: AclParserRegistry)
throw HTTPException(Some(response.asString), response)
}
} else {
None
List()
}
case _ =>
// uncaught error
Expand Down
38 changes: 28 additions & 10 deletions src/main/scala/io/conduktor/ksm/source/FileSourceAcl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@ package io.conduktor.ksm.source

import com.typesafe.config.Config
import io.conduktor.ksm.parser.AclParserRegistry
import io.conduktor.ksm.source

import java.io.{File, FileReader}
import java.io.{File, FileNotFoundException, FileReader}

class FileSourceAcl(parserRegistry: AclParserRegistry)
extends SourceAcl(parserRegistry) {

override val CONFIG_PREFIX: String = "file"
final val FILENAME_CONFIG = "filename"

var lastModified: Long = -1
var filename: String = _
val modifiedMap: Map[String, Long] = Map[String, Long]()

/**
* internal config definition for the module
Expand All @@ -29,14 +28,33 @@ class FileSourceAcl(parserRegistry: AclParserRegistry)
* Uses a CSV parser on the file afterwards
* @return
*/
override def refresh(): Option[ParsingContext] = {
val file = new File(filename)
if (file.lastModified() > lastModified) {
val reader = new FileReader(file)
lastModified = file.lastModified()
Some(source.ParsingContext(parserRegistry.getParserByFilename(filename), reader))
override def refresh(): List[ParsingContext] = {

val path = new File(filename)
if (path.exists()) {
val files =
if (path.isFile)
List(path)
else
path.listFiles.filter(_.isFile).toList

files.map(file =>
ParsingContext(
file.getName,
parserRegistry.getParserByFilename(file.getName),
new FileReader(file),
file.lastModified >= (modifiedMap.get(file.getName) match {
case None =>
modifiedMap + (file.getName -> file.lastModified)
0L
case Some(value) => value
})
)
)
} else {
None
throw new FileNotFoundException(
s"The provided file does not exist: $filename"
)
}
}

Expand Down
13 changes: 8 additions & 5 deletions src/main/scala/io/conduktor/ksm/source/GitHubSourceAcl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package io.conduktor.ksm.source
import com.fasterxml.jackson.databind.ObjectMapper
import com.typesafe.config.Config
import io.conduktor.ksm.parser.AclParserRegistry
import io.conduktor.ksm.source
import org.slf4j.LoggerFactory
import skinny.http.{HTTP, HTTPException, Request, Response}

Expand Down Expand Up @@ -49,7 +48,7 @@ class GitHubSourceAcl(parserRegistry: AclParserRegistry)
tokenOpt = Try(config.getString(AUTH_TOKEN_CONFIG)).toOption
}

override def refresh(): Option[ParsingContext] = {
override def refresh(): List[ParsingContext] = {
val url =
s"https://$hostname/repos/$user/$repo/contents/$filepath?ref=$branch"
val request: Request = new Request(url)
Expand Down Expand Up @@ -80,11 +79,15 @@ class GitHubSourceAcl(parserRegistry: AclParserRegistry)
Charset.forName("UTF-8")
)
// use the CSV Parser
Some(
source.ParsingContext(parserRegistry.getParserByFilename(filepath), new StringReader(data))
List(
ParsingContext(
filepath,
parserRegistry.getParserByFilename(filepath),
new StringReader(data)
)
)
case 304 =>
None
List()
case _ =>
// we got an http error so we propagate it
log.warn(response.asString)
Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/io/conduktor/ksm/source/GitLabSourceAcl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package io.conduktor.ksm.source
import com.fasterxml.jackson.databind.ObjectMapper
import com.typesafe.config.Config
import io.conduktor.ksm.parser.AclParserRegistry
import io.conduktor.ksm.source
import org.slf4j.LoggerFactory
import skinny.http.{HTTP, HTTPException, Request, Response}

Expand Down Expand Up @@ -42,7 +41,7 @@ class GitLabSourceAcl(parserRegistry: AclParserRegistry)
accessToken = config.getString(ACCESSTOKEN_CONFIG)
}

override def refresh(): Option[ParsingContext] = {
override def refresh(): List[ParsingContext] = {
val url =
s"https://$hostname/api/v4/projects/$repoid/repository/files/$filepath?ref=$branch"
val request: Request = new Request(url)
Expand All @@ -62,7 +61,7 @@ class GitLabSourceAcl(parserRegistry: AclParserRegistry)
log.info(
s"No changes were detected in the ACL file ${filepath}. Skipping .... "
)
None
List()
case _ =>
val response: Response = HTTP.get(request)
response.status match {
Expand All @@ -76,8 +75,9 @@ class GitLabSourceAcl(parserRegistry: AclParserRegistry)
Charset.forName("UTF-8")
)
// use the CSV Parser
Some(
source.ParsingContext(
List(
ParsingContext(
filepath,
parserRegistry.getParserByFilename(filepath),
new StringReader(data)
)
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/io/conduktor/ksm/source/NoSourceAcl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class NoSourceAcl(parserRegistry: AclParserRegistry) extends SourceAcl(parserReg
*
* @return
*/
override def refresh(): Option[ParsingContext] = None
override def refresh(): List[ParsingContext] = List()

/**
* Close all the necessary underlying objects or connections belonging to this instance
Expand Down
16 changes: 8 additions & 8 deletions src/main/scala/io/conduktor/ksm/source/S3SourceAcl.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package io.conduktor.ksm.source

import java.io._
import java.util.Date
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3._
import com.amazonaws.services.s3.model._
import io.conduktor.ksm.parser.AclParserRegistry
import com.typesafe.config.Config
import io.conduktor.ksm.parser.AclParserRegistry
import io.conduktor.ksm.source
import org.slf4j.LoggerFactory

import java.io._
import java.util.Date

class S3SourceAcl(parserRegistry: AclParserRegistry)
extends SourceAcl(parserRegistry) {

Expand Down Expand Up @@ -59,7 +58,7 @@ class S3SourceAcl(parserRegistry: AclParserRegistry)
*
* @return
*/
override def refresh(): Option[ParsingContext] = {
override def refresh(): List[ParsingContext] = {
val s3 = s3Client()
val s3object = Option(
s3.getObject(
Expand All @@ -84,13 +83,14 @@ class S3SourceAcl(parserRegistry: AclParserRegistry)

reader.close()
bucket.close()
Some(
source.ParsingContext(
List(
ParsingContext(
key,
parserRegistry.getParserByFilename(key),
new BufferedReader(new StringReader(content))
)
)
case None => None
case None => List()
}
}

Expand Down
6 changes: 2 additions & 4 deletions src/main/scala/io/conduktor/ksm/source/SourceAcl.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package io.conduktor.ksm.source

import io.conduktor.ksm.parser.AclParserRegistry

import java.io.Reader
import com.typesafe.config.Config
import io.conduktor.ksm.parser.{AclParser, AclParserRegistry}

case class ParsingContext(aclParser: AclParser, reader: Reader)
case class ParsingContext(resourceKey: String, aclParser: AclParser, reader: Reader, shouldUpdate: Boolean = true)

abstract class SourceAcl(val parserRegistry: AclParserRegistry) {

Expand All @@ -30,7 +28,7 @@ abstract class SourceAcl(val parserRegistry: AclParserRegistry) {
* Kafka Security Manager will not update Acls in Kafka until there are no errors in the result
* @return
*/
def refresh(): Option[ParsingContext]
def refresh(): List[ParsingContext]

/**
* Close all the necessary underlying objects or connections belonging to this instance
Expand Down
Loading
Loading