Skip to content

Commit

Permalink
feat(assets): gRPC endpoint to register assets
Browse files Browse the repository at this point in the history
  • Loading branch information
robfletcher committed Sep 19, 2018
1 parent c23ed3b commit a1d6694
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 0 deletions.
32 changes: 32 additions & 0 deletions keel-api/src/main/proto/AssetRegistry.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
syntax = "proto3";

package spinnaker.keel.api;

option java_multiple_files = true;
option java_package = "com.netflix.spinnaker.keel.api";

import "Asset.proto";

service AssetRegistry {
rpc upsertAsset (UpsertAssetRequest) returns (UpsertAssetResponse);
// TODO: will need a delete asset
}

message UpsertAssetRequest {
AssetContainer asset = 1;
}

message UpsertAssetResponse {
repeated UpsertAssetResult result = 1;
}

message UpsertAssetResult {
AssetId id = 1;
UpsertAssetStatus status = 2;
}

enum UpsertAssetStatus {
INSERTED = 0;
UPDATED = 1;
FAILED = 2;
}
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.netflix.spinnaker.keel.grpc

import com.netflix.spinnaker.keel.api.AssetRegistryGrpc
import com.netflix.spinnaker.keel.api.UpsertAssetRequest
import com.netflix.spinnaker.keel.api.UpsertAssetResponse
import com.netflix.spinnaker.keel.api.UpsertAssetResult
import com.netflix.spinnaker.keel.api.UpsertAssetStatus.INSERTED
import com.netflix.spinnaker.keel.persistence.AssetRepository
import io.grpc.stub.StreamObserver
import org.lognet.springboot.grpc.GRpcService
import org.slf4j.LoggerFactory

@GRpcService
class GrpcAssetRegistry(
private val assetRepository: AssetRepository
) : AssetRegistryGrpc.AssetRegistryImplBase() {
override fun upsertAsset(
request: UpsertAssetRequest,
responseObserver: StreamObserver<UpsertAssetResponse>
) {
with(request.asset) {
log.info("Upserting asset {}", asset.id)

val upserted = listOf(asset.fromProto()) + partialAssetsList.map { it.fromProto() }
upserted.forEach(assetRepository::store)

with(responseObserver) {
onNext(UpsertAssetResponse.newBuilder().apply {
upserted.forEach {
addResult(UpsertAssetResult.newBuilder().apply {
status = INSERTED
id = it.id.toProto()
})
}
}.build())
onCompleted()
}
}
}

private val log by lazy { LoggerFactory.getLogger(javaClass) }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package com.netflix.spinnaker.keel.grpc

import com.netflix.spinnaker.keel.api.AssetRegistryGrpc
import com.netflix.spinnaker.keel.api.GrpcStubManager
import com.netflix.spinnaker.keel.api.UpsertAssetRequest
import com.netflix.spinnaker.keel.api.UpsertAssetStatus.INSERTED
import com.netflix.spinnaker.keel.model.Asset
import com.netflix.spinnaker.keel.model.AssetId
import com.netflix.spinnaker.keel.model.PartialAsset
import com.netflix.spinnaker.keel.persistence.AssetRepository
import com.netflix.spinnaker.keel.processing.randomBytes
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.reset
import com.nhaarman.mockito_kotlin.verify
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import strikt.api.expectThat
import strikt.assertions.all
import strikt.assertions.containsExactlyInAnyOrder
import strikt.assertions.first
import strikt.assertions.hasSize
import strikt.assertions.isEqualTo

internal object GrpcAssetRegistrySpec : Spek({

val assetRepository: AssetRepository = mock()
val subject = GrpcAssetRegistry(assetRepository)

val grpc = GrpcStubManager(AssetRegistryGrpc::newBlockingStub)

beforeGroup {
grpc.startServer {
addService(subject)
}
}

afterGroup {
grpc.stopServer()
}

describe("upserting an asset") {
given("no previous version of the asset exists") {
given("a simple asset") {
val asset = Asset(
id = AssetId("aws:securityGroup:keel:keel-frontend:mgmt:us-west-2"),
kind = "SecurityGroup",
spec = randomBytes()
)
val request = UpsertAssetRequest.newBuilder()
.apply {
assetBuilder.asset = asset.toProto()
}
.build()

afterGroup {
reset(assetRepository)
}

on("upserting the asset") {
grpc.withChannel { stub ->
stub.upsertAsset(request)
.also { response ->
expectThat(response.resultList)
.hasSize(1)
.first()
.and {
map { it.status }.isEqualTo(INSERTED)
}
.and {
map { it.id.value }.isEqualTo(asset.id.value)
}
}
}
}

it("stores the asset") {
verify(assetRepository).store(asset)
}
}

given("an asset with associated partials") {
val asset = Asset(
id = AssetId("aws:securityGroup:keel:keel-frontend:mgmt:us-west-2"),
kind = "SecurityGroup",
spec = randomBytes()
)
val partial1 = PartialAsset(
id = AssetId("aws:securityGroup:keel:keel-frontend:mgmt:us-west-2:ingress1"),
root = asset.id,
kind = "SecurityGroupRule",
spec = randomBytes()
)
val partial2 = PartialAsset(
id = AssetId("aws:securityGroup:keel:keel-frontend:mgmt:us-west-2:ingress2"),
root = asset.id,
kind = "SecurityGroupRule",
spec = randomBytes()
)
val request = UpsertAssetRequest.newBuilder()
.apply {
assetBuilder.asset = asset.toProto()
assetBuilder.addPartialAssets(partial1.toProto())
assetBuilder.addPartialAssets(partial2.toProto())
}
.build()

afterGroup {
reset(assetRepository)
}

on("upserting the asset") {
grpc.withChannel { stub ->
stub.upsertAsset(request)
.also { response ->
expectThat(response.resultList)
.hasSize(3)
.and {
map {
it.map { it.status }
}.all { isEqualTo(INSERTED) }
}
.and {
map {
it.map { it.id.value }
}.containsExactlyInAnyOrder(asset.id.value, partial1.id.value, partial2.id.value)
}
}
}
}

it("stores the asset and its partials") {
verify(assetRepository).store(asset)
verify(assetRepository).store(partial1)
verify(assetRepository).store(partial2)
}
}
}
}
})

0 comments on commit a1d6694

Please sign in to comment.