From 0d010356f19b8e663f3aa691c135288064d5d6a2 Mon Sep 17 00:00:00 2001 From: Victor Nazzaro Date: Wed, 11 Dec 2024 16:09:06 -0800 Subject: [PATCH] Add gRPC authenticator for exporter --- .../exporter/internal/auth/Authenticator.java | 2 +- .../internal/grpc/GrpcExporterBuilder.java | 10 ++++++- .../internal/grpc/GrpcSenderProvider.java | 4 ++- .../internal/auth/AuthenticatorTest.java | 22 +++++++++----- .../otlp/trace/OltpExporterBenchmark.java | 1 + .../internal/UpstreamGrpcSenderProvider.java | 29 +++++++++++++++++-- .../okhttp/internal/OkHttpGrpcSender.java | 20 +++++++++++-- .../internal/OkHttpGrpcSenderProvider.java | 7 +++-- .../internal/OkHttpGrpcSuppressionTest.java | 2 +- 9 files changed, 80 insertions(+), 17 deletions(-) diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/auth/Authenticator.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/auth/Authenticator.java index 7ad2547ee20..d86e3ab1fa6 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/auth/Authenticator.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/auth/Authenticator.java @@ -41,7 +41,7 @@ static void setAuthenticatorOnDelegate(Object builder, Authenticator authenticat field.setAccessible(true); Object value = field.get(builder); if (value instanceof GrpcExporterBuilder) { - throw new IllegalArgumentException("GrpcExporterBuilder not supported yet."); + ((GrpcExporterBuilder) value).setAuthenticator(authenticator); } else if (value instanceof HttpExporterBuilder) { ((HttpExporterBuilder) value).setAuthenticator(authenticator); } else { diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java index c5b04fd8db8..a98e77f95f2 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java @@ -12,6 +12,7 @@ import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.exporter.internal.ExporterBuilderUtil; import io.opentelemetry.exporter.internal.TlsConfigHelper; +import io.opentelemetry.exporter.internal.auth.Authenticator; import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.common.export.RetryPolicy; @@ -62,6 +63,7 @@ public class GrpcExporterBuilder { private TlsConfigHelper tlsConfigHelper = new TlsConfigHelper(); @Nullable private RetryPolicy retryPolicy = RetryPolicy.getDefault(); private Supplier meterProviderSupplier = GlobalOpenTelemetry::getMeterProvider; + @Nullable private Authenticator authenticator; // Use Object type since gRPC may not be on the classpath. @Nullable private Object grpcChannel; @@ -147,6 +149,11 @@ public GrpcExporterBuilder setMeterProvider(Supplier meterProv return this; } + public GrpcExporterBuilder setAuthenticator(Authenticator authenticator) { + this.authenticator = authenticator; + return this; + } + @SuppressWarnings("BuilderReturnThis") public GrpcExporterBuilder copy() { GrpcExporterBuilder copy = @@ -209,7 +216,8 @@ public GrpcExporter build() { grpcStubFactory, retryPolicy, isPlainHttp ? null : tlsConfigHelper.getSslContext(), - isPlainHttp ? null : tlsConfigHelper.getTrustManager()); + isPlainHttp ? null : tlsConfigHelper.getTrustManager(), + authenticator); LOGGER.log(Level.FINE, "Using GrpcSender: " + grpcSender.getClass().getName()); return new GrpcExporter<>(exporterName, type, grpcSender, meterProviderSupplier); diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSenderProvider.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSenderProvider.java index 48f0e927d26..10916e2546b 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSenderProvider.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSenderProvider.java @@ -6,6 +6,7 @@ package io.opentelemetry.exporter.internal.grpc; import io.grpc.Channel; +import io.opentelemetry.exporter.internal.auth.Authenticator; import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.common.export.RetryPolicy; @@ -40,5 +41,6 @@ GrpcSender createSender( Supplier>> stubFactory, @Nullable RetryPolicy retryPolicy, @Nullable SSLContext sslContext, - @Nullable X509TrustManager trustManager); + @Nullable X509TrustManager trustManager, + @Nullable Authenticator authenticator); } diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/auth/AuthenticatorTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/auth/AuthenticatorTest.java index 93e07e12d41..a6fd4ae2d9a 100644 --- a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/auth/AuthenticatorTest.java +++ b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/auth/AuthenticatorTest.java @@ -10,7 +10,9 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.opentelemetry.exporter.internal.grpc.GrpcExporter; +import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; +import java.net.URI; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -31,16 +33,22 @@ void getHeaders() { @Test void setAuthenticatorOnDelegate_Success() { - HttpExporterBuilder builder = + // For HTTP exporter + HttpExporterBuilder httpBuilder = new HttpExporterBuilder<>("otlp", "test", "http://localhost:4318/test"); - - assertThat(builder).extracting("authenticator").isNull(); - + assertThat(httpBuilder).extracting("authenticator").isNull(); Authenticator authenticator = Collections::emptyMap; + Authenticator.setAuthenticatorOnDelegate(new WithDelegate(httpBuilder), authenticator); + assertThat(httpBuilder) + .extracting("authenticator", as(InstanceOfAssertFactories.type(Authenticator.class))) + .isSameAs(authenticator); - Authenticator.setAuthenticatorOnDelegate(new WithDelegate(builder), authenticator); - - assertThat(builder) + // For GRPC exporter + GrpcExporterBuilder grpcBuilder = + new GrpcExporterBuilder<>("otlp", "test", 60, URI.create("test"), null, "/test"); + assertThat(grpcBuilder).extracting("authenticator").isNull(); + Authenticator.setAuthenticatorOnDelegate(new WithDelegate(grpcBuilder), authenticator); + assertThat(grpcBuilder) .extracting("authenticator", as(InstanceOfAssertFactories.type(Authenticator.class))) .isSameAs(authenticator); } diff --git a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java index 5dc9f2107c8..b7f631806f5 100644 --- a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java +++ b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java @@ -105,6 +105,7 @@ public void setUp() { Collections::emptyMap, null, null, + null, null), MeterProvider::noop); diff --git a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java index d07bc4ed614..94f49147885 100644 --- a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java +++ b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java @@ -5,11 +5,16 @@ package io.opentelemetry.exporter.sender.grpc.managedchannel.internal; +import static io.grpc.Metadata.ASCII_STRING_MARSHALLER; + +import io.grpc.CallCredentials; import io.grpc.Channel; import io.grpc.Codec; import io.grpc.CompressorRegistry; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.opentelemetry.exporter.internal.auth.Authenticator; import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.grpc.GrpcSender; import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider; @@ -21,6 +26,7 @@ import java.net.URI; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import java.util.function.BiFunction; import java.util.function.Supplier; import javax.annotation.Nullable; @@ -47,7 +53,8 @@ public GrpcSender createSender( Supplier>> stubFactory, @Nullable RetryPolicy retryPolicy, @Nullable SSLContext sslContext, - @Nullable X509TrustManager trustManager) { + @Nullable X509TrustManager trustManager, + @Nullable Authenticator authenticator) { boolean shutdownChannel = false; if (managedChannel == null) { // Shutdown the channel as part of the exporter shutdown sequence if @@ -83,11 +90,29 @@ public OutputStream compress(OutputStream os) throws IOException { compression = compressor.getEncoding(); } + CallCredentials cred = + new CallCredentials() { + @Override + public void applyRequestMetadata( + RequestInfo requestInfo, Executor executor, MetadataApplier metadataApplier) { + Metadata headers = new Metadata(); + if (authenticator != null) { + // For each header provided in the authenticator, put it in the header of the + // Metadata. + for (Map.Entry e : authenticator.getHeaders().entrySet()) { + headers.put(Metadata.Key.of(e.getKey(), ASCII_STRING_MARSHALLER), e.getValue()); + } + } + metadataApplier.apply(headers); + } + }; + MarshalerServiceStub stub = stubFactory .get() .apply((Channel) managedChannel, authorityOverride) - .withCompression(compression); + .withCompression(compression) + .withCallCredentials(cred); return new UpstreamGrpcSender<>(stub, shutdownChannel, timeoutNanos, headersSupplier); } diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java index f8b4996b90e..94093df79ed 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java @@ -25,6 +25,7 @@ import io.opentelemetry.api.internal.InstrumentationUtil; import io.opentelemetry.exporter.internal.RetryUtil; +import io.opentelemetry.exporter.internal.auth.Authenticator; import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil; import io.opentelemetry.exporter.internal.grpc.GrpcResponse; @@ -42,6 +43,7 @@ import java.util.Map; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.net.ssl.SSLContext; import javax.net.ssl.X509TrustManager; @@ -80,7 +82,8 @@ public OkHttpGrpcSender( Supplier>> headersSupplier, @Nullable RetryPolicy retryPolicy, @Nullable SSLContext sslContext, - @Nullable X509TrustManager trustManager) { + @Nullable X509TrustManager trustManager, + @Nullable Authenticator authenticator) { OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder() .dispatcher(OkHttpUtil.newDispatcher()) @@ -90,7 +93,6 @@ public OkHttpGrpcSender( clientBuilder.addInterceptor( new RetryInterceptor(retryPolicy, OkHttpGrpcSender::isRetryable)); } - boolean isPlainHttp = endpoint.startsWith("http://"); if (isPlainHttp) { clientBuilder.connectionSpecs(Collections.singletonList(ConnectionSpec.CLEARTEXT)); @@ -102,6 +104,20 @@ public OkHttpGrpcSender( } } + if (authenticator != null) { + Map> headers = headersSupplier.get(); + // Convert the auth header type of Map to the expected type of + // OkHttpGrpcSender of Map> + Map> authHeaders = + authenticator.getHeaders().entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, e -> Collections.singletonList(e.getValue()))); + // The authenticator headers will override the default headers if there are duplicate keys. + headers.putAll(authHeaders); + headersSupplier = () -> headers; + } + this.client = clientBuilder.build(); this.headersSupplier = headersSupplier; this.url = HttpUrl.get(endpoint); diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java index b6595ee2866..78844660bbe 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java @@ -6,6 +6,7 @@ package io.opentelemetry.exporter.sender.okhttp.internal; import io.grpc.Channel; +import io.opentelemetry.exporter.internal.auth.Authenticator; import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.grpc.GrpcSender; import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider; @@ -41,7 +42,8 @@ public GrpcSender createSender( Supplier>> stubFactory, @Nullable RetryPolicy retryPolicy, @Nullable SSLContext sslContext, - @Nullable X509TrustManager trustManager) { + @Nullable X509TrustManager trustManager, + @Nullable Authenticator authenticator) { return new OkHttpGrpcSender<>( endpoint.resolve(endpointPath).toString(), compressor, @@ -50,6 +52,7 @@ public GrpcSender createSender( headersSupplier, retryPolicy, sslContext, - trustManager); + trustManager, + authenticator); } } diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java index 6eb0a4393b3..39457f4e2de 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java @@ -21,7 +21,7 @@ void send(OkHttpGrpcSender sender, Runnable onSuccess, Runnable @Override OkHttpGrpcSender createSender(String endpoint) { return new OkHttpGrpcSender<>( - "https://localhost", null, 10L, 10L, Collections::emptyMap, null, null, null); + "https://localhost", null, 10L, 10L, Collections::emptyMap, null, null, null, null); } protected static class DummyMarshaler extends MarshalerWithSize {