Skip to content

Commit

Permalink
feat(registration): Hit Keel at a known (configured) address
Browse files Browse the repository at this point in the history
  • Loading branch information
robfletcher committed Aug 29, 2018
1 parent 6acc7a6 commit 33644f2
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 41 deletions.
4 changes: 2 additions & 2 deletions keel-api/src/main/proto/Registry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ service AssetPluginRegistry {
}

message RegisterAssetPluginRequest {
string name = 1;
string vipAddress = 1;
repeated TypeMetadata types = 2;
}

Expand All @@ -25,7 +25,7 @@ service VetoPluginRegistry {
}

message RegisterVetoPluginRequest {
string name = 1;
string vipAddress = 1;
}

message RegisterVetoPluginResponse {
Expand Down
1 change: 0 additions & 1 deletion keel-plugin/keel-plugin.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
apply plugin: "java-library"
apply plugin: "kotlin-spring"
apply from: "$rootDir/gradle/grpc.gradle"
apply from: "$rootDir/gradle/spek2.gradle"
apply from: "$rootDir/gradle/junit5.gradle"

dependencies {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
package com.netflix.spinnaker.keel.plugin

import com.netflix.spinnaker.keel.api.TypeMetadata
import com.netflix.spinnaker.keel.api.engine.RegisterAssetPluginRequest
import com.netflix.spinnaker.keel.api.plugin.AssetPluginGrpc.AssetPluginImplBase

abstract class AssetPlugin : AssetPluginImplBase() {
abstract val supportedTypes: Iterable<TypeMetadata>
}

val AssetPlugin.registrationRequest: RegisterAssetPluginRequest
get() = RegisterAssetPluginRequest
.newBuilder()
.addAllTypes(supportedTypes)
.build()
Original file line number Diff line number Diff line change
@@ -1,49 +1,67 @@
package com.netflix.spinnaker.keel.plugin

import com.netflix.appinfo.InstanceInfo
import com.netflix.appinfo.InstanceInfo.InstanceStatus.UP
import com.netflix.discovery.EurekaClient
import com.netflix.spinnaker.keel.api.engine.AssetPluginRegistryGrpc
import com.netflix.spinnaker.keel.api.engine.AssetPluginRegistryGrpc.AssetPluginRegistryBlockingStub
import com.netflix.spinnaker.keel.api.engine.RegisterAssetPluginRequest
import com.netflix.spinnaker.kork.eureka.RemoteStatusChangedEvent
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.ApplicationListener
import org.springframework.stereotype.Component
import javax.annotation.PostConstruct

@Component
class PluginRegistrar(
private val eurekaClient: EurekaClient,
private val plugins: List<AssetPlugin>
) {
private val log = LoggerFactory.getLogger(javaClass)

@PostConstruct
fun registerPlugins() {
registry().let { registry ->
plugins.forEach { plugin ->
plugin.register(registry)
}
private val plugins: List<AssetPlugin>,
@Value("\${keel.registry.address:keel-test.us-west-2.spinnaker.netflix.net}") private val keelRegistryAddress: String,
private val instanceInfo: InstanceInfo
) : ApplicationListener<RemoteStatusChangedEvent> {

private val log by lazy { LoggerFactory.getLogger(javaClass) }

override fun onApplicationEvent(event: RemoteStatusChangedEvent) {
if (event.source.status == UP) {
onDiscoveryUp()
}
// TODO: should deregister as well
}

private fun AssetPlugin.register(registry: AssetPluginRegistryBlockingStub) {
registry.register(registrationRequest).let { response ->
fun onDiscoveryUp() {
plugins.forEach { plugin ->
plugin.registerWith(registry)
}
}

private fun AssetPlugin.registerWith(registry: AssetPluginRegistryBlockingStub) {
val request = RegisterAssetPluginRequest
.newBuilder()
.apply {
vipAddress = instanceInfo.vipAddress
addAllTypes(supportedTypes)
}
.build()
registry.register(request).let { response ->
if (response.succeeded) {
log.info("Successfully registered {} with Keel", javaClass.simpleName)
}
}
}

private fun registry(): AssetPluginRegistryBlockingStub =
eurekaClient
.getNextServerFromEureka("keel", false)
private val registry: AssetPluginRegistryBlockingStub by lazy {
eurekaClient.getNextServerFromEureka(keelRegistryAddress, false)
?.let(::createChannelTo)
?.let(AssetPluginRegistryGrpc::newBlockingStub)
?: throw IllegalStateException("Can't find keel in Eureka")
}

fun createChannelTo(instance: InstanceInfo): ManagedChannel =
fun createChannelTo(it: InstanceInfo): ManagedChannel =
ManagedChannelBuilder
.forAddress(instance.ipAddr, instance.port)
.forAddress(it.ipAddr, it.port)
.usePlaintext()
.build()
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.netflix.spinnaker.keel.plugin

import com.netflix.appinfo.InstanceInfo
import com.netflix.discovery.EurekaClient
import com.netflix.spinnaker.keel.api.GrpcStubManager
import com.netflix.spinnaker.keel.api.TypeMetadata
Expand Down Expand Up @@ -35,16 +36,28 @@ internal class PluginRegistrationTest {
}
}
}
val keelRegistryAddress = "keel-test"
val eurekaClient: EurekaClient = mock()
val amazonAssetPlugin = mock<AssetPlugin>()
val registrar = PluginRegistrar(eurekaClient, listOf(amazonAssetPlugin))
val instanceInfo = InstanceInfo.Builder.newBuilder().run {
setAppName("keelplugins")
setASGName("keelplugins-test-aws-v005")
setVIPAddress("keelplugins-test-aws")
build()
}
val registrar = PluginRegistrar(
eurekaClient,
listOf(amazonAssetPlugin),
keelRegistryAddress,
instanceInfo
)

@BeforeEach
fun startRegistry() {
grpc.startServer {
addService(registry)
}
whenever(eurekaClient.getNextServerFromEureka("keel", false)) doReturn grpc.instanceInfo
whenever(eurekaClient.getNextServerFromEureka(keelRegistryAddress, false)) doReturn grpc.instanceInfo
whenever(amazonAssetPlugin.supportedTypes) doReturn listOf(
"aws.SecurityGroup",
"aws.ClassicLoadBalancer"
Expand All @@ -59,7 +72,7 @@ internal class PluginRegistrationTest {

@Test
fun `registers plugins on startup`() {
registrar.registerPlugins()
registrar.onDiscoveryUp()

expect(registeredTypes)
.containsExactlyInAnyOrder(
Expand All @@ -69,6 +82,5 @@ internal class PluginRegistrationTest {
}
}


fun typeMetadataForKind(it: String) =
TypeMetadata.newBuilder().setKind(it).build()
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class GrpcAssetPluginRegistry(
request
.typesList
.forEach { type ->
assetPlugins[type] = request.name
log.info("Registered asset plugin \"${request.name}\" supporting $type")
assetPlugins[type] = request.vipAddress
log.info("Registered asset plugin at \"${request.vipAddress}\" supporting $type")
}
responseObserver.apply {
onNext(registerAssetPluginSuccessResponse)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class GrpcVetoPluginRegistry(
request: RegisterVetoPluginRequest,
responseObserver: StreamObserver<RegisterVetoPluginResponse>
) {
vetoPlugins.add(request.name)
log.info("Registered veto plugin \"${request.name}\"")
vetoPlugins.add(request.vipAddress)
log.info("Registered veto plugin at \"${request.vipAddress}\"")
responseObserver.apply {
onNext(
RegisterVetoPluginResponse.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ internal class GrpcAssetServiceTests {
addService(plugin)
}

val pluginName = "aws-plugin"
whenever(eureka.getNextServerFromEureka(pluginName, false)) doReturn grpc.instanceInfo
val pluginAddress = "aws-plugin"
whenever(eureka.getNextServerFromEureka(pluginAddress, false)) doReturn grpc.instanceInfo

val responseCallback: StreamObserver<RegisterAssetPluginResponse> = mock()
registry.register(RegisterAssetPluginRequest.newBuilder().also {
it.name = pluginName
it.vipAddress = pluginAddress
it.addTypes(asset.toTypeMetaData())
}.build(), responseCallback)
verify(responseCallback).onNext(argWhere {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ internal object PluginRegistrationSpec : Spek({
RegisterAssetPluginRequest
.newBuilder()
.apply {
name = "aws-asset-plugin"
vipAddress = "aws-asset-plugin"
addTypes(type)
}
.build(),
Expand Down Expand Up @@ -135,7 +135,7 @@ internal object PluginRegistrationSpec : Spek({
given("plugins were registered") {
sequenceOf("execution-window", "cloud-capacity").forEach {
subject.register(
RegisterVetoPluginRequest.newBuilder().setName(it).build(),
RegisterVetoPluginRequest.newBuilder().setVipAddress(it).build(),
responseHandler
)
}
Expand Down

0 comments on commit 33644f2

Please sign in to comment.