Skip to content

Commit

Permalink
[controller][grpc] Add ClusterAdminOpsGrpcService with handler and tests
Browse files Browse the repository at this point in the history
- Introduced `ClusterAdminOpsGrpcServiceImpl` for gRPC support of cluster admin
operations.

- Added `ClusterAdminOpsRequestHandler` for handling gRPC requests.

- Implemented all relevant methods in the service, including admin command status,
metadata handling, and execution ID retrieval.

- Provided comprehensive TestNG tests for the service and request handler.
  • Loading branch information
sushantmane committed Jan 18, 2025
1 parent 7442860 commit 0c69ab5
Show file tree
Hide file tree
Showing 15 changed files with 1,341 additions and 60 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
syntax = 'proto3';
package com.linkedin.venice.protocols.controller;


import "controller/ControllerGrpcRequestContext.proto";

option java_multiple_files = true;

service ClusterAdminOpsGrpcService {
// AdminCommandExecution
rpc getAdminCommandExecutionStatus(AdminCommandExecutionStatusGrpcRequest) returns (AdminCommandExecutionStatusGrpcResponse) {}
rpc getLastSuccessfulAdminCommandExecutionId(LastSuccessfulAdminCommandExecutionGrpcRequest) returns (LastSuccessfulAdminCommandExecutionGrpcResponse) {}

// AdminTopicMetadata
rpc getAdminTopicMetadata(AdminTopicMetadataGrpcRequest) returns (AdminTopicMetadataGrpcResponse) {}
rpc updateAdminTopicMetadata(UpdateAdminTopicMetadataGrpcRequest) returns (UpdateAdminTopicMetadataGrpcResponse) {}
}


message AdminCommandExecutionStatusGrpcRequest {
string clusterName = 1;
int64 adminCommandExecutionId = 2;
}

message AdminCommandExecutionStatusGrpcResponse {
string clusterName = 1;
int64 adminCommandExecutionId = 2;
string operation = 3;
string startTime = 4;
map<string, string> fabricToExecutionStatusMap = 5;
}

message LastSuccessfulAdminCommandExecutionGrpcRequest {
string clusterName = 1;
}

message LastSuccessfulAdminCommandExecutionGrpcResponse {
string clusterName = 1;
int64 lastSuccessfulAdminCommandExecutionId = 2;
}

message AdminTopicMetadataGrpcRequest {
string clusterName = 1;
optional string storeName = 2;
}

message AdminTopicMetadataGrpcResponse {
AdminTopicGrpcMetadata metadata = 1;
}

message UpdateAdminTopicMetadataGrpcResponse {
string clusterName = 1;
optional string storeName = 2;
}

message UpdateAdminTopicMetadataGrpcRequest {
AdminTopicGrpcMetadata metadata = 1;
}

message AdminTopicGrpcMetadata {
string clusterName = 1;
int64 executionId = 2;
optional string storeName = 3;
optional int64 offset = 4;
optional int64 upstreamOffset = 5;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;

Expand All @@ -15,13 +16,18 @@
import com.google.rpc.Status;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.controller.grpc.GrpcRequestResponseConverter;
import com.linkedin.venice.controllerapi.AdminCommandExecutionStatus;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo;
import com.linkedin.venice.protocols.controller.ControllerGrpcErrorType;
import com.linkedin.venice.protocols.controller.VeniceControllerGrpcErrorInfo;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.testng.annotations.Test;


Expand Down Expand Up @@ -145,4 +151,63 @@ public void testParseControllerGrpcErrorWithUnpackFailure() {

assertTrue(thrownException.getMessage().contains("Failed to unpack error details"));
}

@Test
public void testToExecutionStatusMapSuccess() {
Map<String, String> grpcMap = new HashMap<>();
grpcMap.put("fabric1", "COMPLETED");
grpcMap.put("fabric2", "PROCESSING");
grpcMap.put("fabric3", "ERROR");

ConcurrentHashMap<String, AdminCommandExecutionStatus> executionStatusMap =
GrpcRequestResponseConverter.toExecutionStatusMap(grpcMap);

assertNotNull(executionStatusMap);
assertEquals(executionStatusMap.size(), grpcMap.size());
assertEquals(executionStatusMap.get("fabric1"), AdminCommandExecutionStatus.COMPLETED);
assertEquals(executionStatusMap.get("fabric2"), AdminCommandExecutionStatus.PROCESSING);
assertEquals(executionStatusMap.get("fabric3"), AdminCommandExecutionStatus.ERROR);
}

@Test
public void testToExecutionStatusMapWithEmptyInput() {
Map<String, String> grpcMap = Collections.emptyMap();
ConcurrentHashMap<String, AdminCommandExecutionStatus> executionStatusMap =
GrpcRequestResponseConverter.toExecutionStatusMap(grpcMap);

assertNotNull(executionStatusMap);
assertEquals(executionStatusMap.size(), 0);
}

@Test
public void testToExecutionStatusMapWithInvalidStatus() {
Map<String, String> grpcMap = new HashMap<>();
grpcMap.put("fabric1", "INVALID_STATUS");
assertThrows(IllegalArgumentException.class, () -> GrpcRequestResponseConverter.toExecutionStatusMap(grpcMap));
}

@Test
public void testToGrpcExecutionStatusMapSuccess() {
ConcurrentHashMap<String, AdminCommandExecutionStatus> executionStatusMap = new ConcurrentHashMap<>();
executionStatusMap.put("fabric1", AdminCommandExecutionStatus.COMPLETED);
executionStatusMap.put("fabric2", AdminCommandExecutionStatus.PROCESSING);
executionStatusMap.put("fabric3", AdminCommandExecutionStatus.ERROR);

Map<String, String> grpcMap = GrpcRequestResponseConverter.toGrpcExecutionStatusMap(executionStatusMap);

assertNotNull(grpcMap);
assertEquals(grpcMap.size(), executionStatusMap.size());
assertEquals(grpcMap.get("fabric1"), "COMPLETED");
assertEquals(grpcMap.get("fabric2"), "PROCESSING");
assertEquals(grpcMap.get("fabric3"), "ERROR");
}

@Test
public void testToGrpcExecutionStatusMapWithEmptyInput() {
ConcurrentHashMap<String, AdminCommandExecutionStatus> executionStatusMap = new ConcurrentHashMap<>();
Map<String, String> grpcMap = GrpcRequestResponseConverter.toGrpcExecutionStatusMap(executionStatusMap);

assertNotNull(grpcMap);
assertEquals(grpcMap.size(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.linkedin.venice.authorization.AuthorizerService;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.controller.grpc.server.ClusterAdminOpsGrpcServiceImpl;
import com.linkedin.venice.controller.grpc.server.StoreGrpcServiceImpl;
import com.linkedin.venice.controller.grpc.server.interceptor.ControllerGrpcAuditLoggingInterceptor;
import com.linkedin.venice.controller.grpc.server.interceptor.ControllerGrpcSslSessionInterceptor;
Expand Down Expand Up @@ -286,9 +287,12 @@ private void initializeGrpcServer() {
interceptors.add(parentControllerRegionValidationInterceptor);

VeniceControllerGrpcServiceImpl grpcService = new VeniceControllerGrpcServiceImpl(unsecureRequestHandler);
StoreGrpcServiceImpl storeAclGrpcServiceGrpc = new StoreGrpcServiceImpl(
StoreGrpcServiceImpl storeGrpcServiceGrpc = new StoreGrpcServiceImpl(
unsecureRequestHandler.getStoreRequestHandler(),
unsecureRequestHandler.getControllerAccessManager());
ClusterAdminOpsGrpcServiceImpl clusterAdminOpsGrpcService = new ClusterAdminOpsGrpcServiceImpl(
unsecureRequestHandler.getClusterAdminOpsRequestHandler(),
unsecureRequestHandler.getControllerAccessManager());
grpcExecutor = ThreadPoolFactory.createThreadPool(
multiClusterConfigs.getGrpcServerThreadCount(),
CONTROLLER_GRPC_SERVER_THREAD_NAME,
Expand All @@ -298,7 +302,8 @@ private void initializeGrpcServer() {
adminGrpcServer = new VeniceGrpcServer(
new VeniceGrpcServerConfig.Builder().setPort(multiClusterConfigs.getAdminGrpcPort())
.addService(grpcService)
.addService(storeAclGrpcServiceGrpc)
.addService(storeGrpcServiceGrpc)
.addService(clusterAdminOpsGrpcService)
.setExecutor(grpcExecutor)
.setInterceptors(interceptors)
.build());
Expand All @@ -309,13 +314,17 @@ private void initializeGrpcServer() {
multiClusterConfigs.getSslConfig().get().getSslProperties(),
multiClusterConfigs.getSslFactoryClassName());
VeniceControllerGrpcServiceImpl secureGrpcService = new VeniceControllerGrpcServiceImpl(secureRequestHandler);
StoreGrpcServiceImpl secureStoreAclGrpcService = new StoreGrpcServiceImpl(
StoreGrpcServiceImpl secureStoreGrpcService = new StoreGrpcServiceImpl(
secureRequestHandler.getStoreRequestHandler(),
secureRequestHandler.getControllerAccessManager());
ClusterAdminOpsGrpcServiceImpl secureClusterAdminOpsGrpcService = new ClusterAdminOpsGrpcServiceImpl(
secureRequestHandler.getClusterAdminOpsRequestHandler(),
secureRequestHandler.getControllerAccessManager());
adminSecureGrpcServer = new VeniceGrpcServer(
new VeniceGrpcServerConfig.Builder().setPort(multiClusterConfigs.getAdminSecureGrpcPort())
.addService(secureGrpcService)
.addService(secureStoreAclGrpcService)
.addService(secureStoreGrpcService)
.addService(secureClusterAdminOpsGrpcService)
.setExecutor(grpcExecutor)
.setSslFactory(sslFactory)
.setInterceptors(interceptors)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.protobuf.Any;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.controllerapi.AdminCommandExecutionStatus;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo;
import com.linkedin.venice.protocols.controller.ControllerGrpcErrorType;
Expand All @@ -10,6 +11,8 @@
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


public class GrpcRequestResponseConverter {
Expand Down Expand Up @@ -140,4 +143,35 @@ public static VeniceControllerGrpcErrorInfo parseControllerGrpcError(StatusRunti
}
throw new VeniceClientException("An unknown gRPC error occurred. Error code: " + Code.UNKNOWN.name());
}

/**
* Converts a gRPC map with string statuses to a ConcurrentHashMap with AdminCommandExecutionStatus.
*
* @param grpcMap Map with keys as fabric names and values as string statuses.
* @return A ConcurrentHashMap with keys as fabric names and values as AdminCommandExecutionStatus.
*/
public static ConcurrentHashMap<String, AdminCommandExecutionStatus> toExecutionStatusMap(
Map<String, String> grpcMap) {
ConcurrentHashMap<String, AdminCommandExecutionStatus> executionStatusMap = new ConcurrentHashMap<>(grpcMap.size());
for (Map.Entry<String, String> entry: grpcMap.entrySet()) {
executionStatusMap.put(entry.getKey(), AdminCommandExecutionStatus.valueOf(entry.getValue()));
}
return executionStatusMap;
}

/**
* Converts a ConcurrentHashMap with AdminCommandExecutionStatus to a map with string statuses.
*
* @param executionStatusMap ConcurrentHashMap with keys as fabric names and values as AdminCommandExecutionStatus.
* @return A Map with keys as fabric names and values as string statuses.
*/
public static Map<String, String> toGrpcExecutionStatusMap(
Map<String, AdminCommandExecutionStatus> executionStatusMap) {
ConcurrentHashMap<String, String> grpcMap = new ConcurrentHashMap<>(executionStatusMap.size());
for (Map.Entry<String, AdminCommandExecutionStatus> entry: executionStatusMap.entrySet()) {
grpcMap.put(entry.getKey(), entry.getValue().name());
}
return grpcMap;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.linkedin.venice.controller.grpc.server;

import static com.linkedin.venice.controller.grpc.server.ControllerGrpcServerUtils.isAllowListUser;
import static com.linkedin.venice.controller.server.VeniceRouteHandler.ACL_CHECK_FAILURE_WARN_MESSAGE_PREFIX;
import static com.linkedin.venice.protocols.controller.ClusterAdminOpsGrpcServiceGrpc.*;

import com.linkedin.venice.controller.server.ClusterAdminOpsRequestHandler;
import com.linkedin.venice.controller.server.VeniceControllerAccessManager;
import com.linkedin.venice.exceptions.VeniceUnauthorizedAccessException;
import com.linkedin.venice.protocols.controller.AdminCommandExecutionStatusGrpcRequest;
import com.linkedin.venice.protocols.controller.AdminCommandExecutionStatusGrpcResponse;
import com.linkedin.venice.protocols.controller.AdminTopicGrpcMetadata;
import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcRequest;
import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcResponse;
import com.linkedin.venice.protocols.controller.ClusterAdminOpsGrpcServiceGrpc;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcResponse;
import io.grpc.Context;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


public class ClusterAdminOpsGrpcServiceImpl extends ClusterAdminOpsGrpcServiceImplBase {
private static final Logger LOGGER = LogManager.getLogger(ClusterAdminOpsGrpcServiceImpl.class);
private final ClusterAdminOpsRequestHandler requestHandler;
private final VeniceControllerAccessManager accessManager;

public ClusterAdminOpsGrpcServiceImpl(
ClusterAdminOpsRequestHandler requestHandler,
VeniceControllerAccessManager accessManager) {
this.requestHandler = requestHandler;
this.accessManager = accessManager;
}

@Override
public void getAdminCommandExecutionStatus(
AdminCommandExecutionStatusGrpcRequest request,
StreamObserver<AdminCommandExecutionStatusGrpcResponse> responseObserver) {
LOGGER.debug("Received getAdminCommandExecutionStatus request: {}", request);
ControllerGrpcServerUtils.handleRequest(
ClusterAdminOpsGrpcServiceGrpc.getGetAdminCommandExecutionStatusMethod(),
() -> requestHandler.getAdminCommandExecutionStatus(request),
responseObserver,
request.getClusterName(),
null);
}

@Override
public void getLastSuccessfulAdminCommandExecutionId(
LastSuccessfulAdminCommandExecutionGrpcRequest request,
StreamObserver<LastSuccessfulAdminCommandExecutionGrpcResponse> responseObserver) {
LOGGER.debug("Received getLastSuccessfulAdminCommandExecutionId request: {}", request);
ControllerGrpcServerUtils.handleRequest(
ClusterAdminOpsGrpcServiceGrpc.getGetLastSuccessfulAdminCommandExecutionIdMethod(),
() -> requestHandler.getLastSucceedExecutionId(request),
responseObserver,
request.getClusterName(),
null);
}

@Override
public void getAdminTopicMetadata(
AdminTopicMetadataGrpcRequest request,
StreamObserver<AdminTopicMetadataGrpcResponse> responseObserver) {
LOGGER.debug("Received getAdminTopicMetadata request: {}", request);
ControllerGrpcServerUtils.handleRequest(
ClusterAdminOpsGrpcServiceGrpc.getGetAdminTopicMetadataMethod(),
() -> requestHandler.getAdminTopicMetadata(request),
responseObserver,
request.getClusterName(),
request.hasStoreName() ? request.getStoreName() : null);
}

@Override
public void updateAdminTopicMetadata(
UpdateAdminTopicMetadataGrpcRequest request,
StreamObserver<UpdateAdminTopicMetadataGrpcResponse> responseObserver) {
LOGGER.debug("Received updateAdminTopicMetadata request: {}", request);
AdminTopicGrpcMetadata metadata = request.getMetadata();
ControllerGrpcServerUtils.handleRequest(ClusterAdminOpsGrpcServiceGrpc.getUpdateAdminTopicMetadataMethod(), () -> {
if (!isAllowListUser(accessManager, request.getMetadata().getStoreName(), Context.current())) {
throw new VeniceUnauthorizedAccessException(
ACL_CHECK_FAILURE_WARN_MESSAGE_PREFIX
+ ClusterAdminOpsGrpcServiceGrpc.getUpdateAdminTopicMetadataMethod().getFullMethodName());
}
return requestHandler.updateAdminTopicMetadata(request);
}, responseObserver, metadata.getClusterName(), metadata.hasStoreName() ? metadata.getStoreName() : null);
}
}
Loading

0 comments on commit 0c69ab5

Please sign in to comment.