Skip to content

Commit

Permalink
adding support for other clients
Browse files Browse the repository at this point in the history
  • Loading branch information
thugrock7 committed Aug 11, 2024
1 parent 8dabf28 commit dc60684
Show file tree
Hide file tree
Showing 46 changed files with 1,180 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.zip.GZIPInputStream;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
Expand Down Expand Up @@ -127,6 +127,51 @@ public void postJsonNonRepeatableEntity()
postJsonEntity(entity);
}

@Disabled("This is flaky !!")
@Test
public void getGzipResponse()
throws ExecutionException, InterruptedException, TimeoutException, IOException {
HttpGet getRequest =
new HttpGet(String.format("http://localhost:%s/gzip", testHttpServer.port()));
getRequest.addHeader("foo", "bar");
Future<HttpResponse> futureResponse = client.execute(getRequest, new NoopFutureCallback());

HttpResponse response = futureResponse.get();
Assertions.assertEquals(200, response.getStatusLine().getStatusCode());
try (InputStream gzipStream = new GZIPInputStream(response.getEntity().getContent())) {
String responseBody = readInputStream(gzipStream);
Assertions.assertEquals(TestHttpServer.GzipHandler.RESPONSE_BODY, responseBody);
}

TEST_WRITER.waitForTraces(1);
// exclude server spans
List<List<Span>> traces =
TEST_WRITER.waitForSpans(2, span -> span.getKind().equals(Span.SpanKind.SPAN_KIND_SERVER));
Assertions.assertEquals(1, traces.size());
Assertions.assertEquals(2, traces.get(0).size());
Span clientSpan = traces.get(0).get(1);
Span responseBodySpan = traces.get(0).get(0);
if (traces.get(0).get(0).getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT)) {
clientSpan = traces.get(0).get(0);
responseBodySpan = traces.get(0).get(1);
}

Assertions.assertEquals(
"test-value",
TEST_WRITER
.getAttributesMap(clientSpan)
.get("http.response.header.test-response-header")
.getStringValue());
Assertions.assertEquals(
"bar",
TEST_WRITER.getAttributesMap(clientSpan).get("http.request.header.foo").getStringValue());
Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body"));

Assertions.assertEquals(
TestHttpServer.GzipHandler.RESPONSE_BODY,
TEST_WRITER.getAttributesMap(responseBodySpan).get("http.response.body").getStringValue());
}

public void postJsonEntity(HttpEntity entity)
throws TimeoutException, InterruptedException, IOException, ExecutionException {
HttpPost postRequest = new HttpPost();
Expand Down Expand Up @@ -165,8 +210,7 @@ private static String readInputStream(InputStream inputStream) throws IOExceptio
StringBuilder textBuilder = new StringBuilder();

try (BufferedReader reader =
new BufferedReader(
new InputStreamReader(inputStream, Charset.forName(StandardCharsets.UTF_8.name())))) {
new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
int c;
while ((c = reader.read()) != -1) {
textBuilder.append((char) c);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.javaagent.instrumentation.hypertrace.apachehttpclient.v4_0.ApacheHttpClientObjectRegistry.SpanAndAttributeKey;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.util.function.Function;
import java.util.zip.GZIPInputStream;
import org.apache.http.Header;
import org.apache.http.HeaderIterator;
import org.apache.http.HttpEntity;
Expand Down Expand Up @@ -100,26 +103,32 @@ public static void traceEntity(
if (contentType == null || !ContentTypeUtils.shouldCapture(contentType.getValue())) {
return;
}

String charsetStr = ContentTypeUtils.parseCharset(contentType.getValue());
Charset charset = ContentTypeCharsetUtils.toCharset(charsetStr);

// Get the content encoding header and check if it's gzip
Header contentEncoding = entity.getContentEncoding();
boolean isGzipEncoded =
contentEncoding != null
&& contentEncoding.getValue() != null
&& contentEncoding.getValue().toLowerCase().contains("gzip");
if (entity.isRepeatable()) {
try {
BoundedByteArrayOutputStream byteArrayOutputStream =
BoundedBuffersFactory.createStream(charset);
entity.writeTo(byteArrayOutputStream);

try {
String body = byteArrayOutputStream.toStringWithSuppliedCharset();
span.setAttribute(bodyAttributeKey, body);
} catch (UnsupportedEncodingException e) {
log.error("Could not parse charset from encoding {}", charsetStr, e);
InputStream contentStream = entity.getContent();
if (isGzipEncoded) {
try {
contentStream = new GZIPInputStream(contentStream);
} catch (IOException e) {
log.error("Failed to create GZIPInputStream", e);
return;
}
}

String body = readInputStream(contentStream, charset);
span.setAttribute(bodyAttributeKey, body);

} catch (IOException e) {
log.error("Could not read request input stream from repeatable request entity/body", e);
throw new RuntimeException(e);
}

return;
}

Expand All @@ -133,4 +142,18 @@ public static void traceEntity(
ApacheHttpClientObjectRegistry.entityToSpan.put(
entity, new SpanAndAttributeKey(span, bodyAttributeKey));
}

public static String readInputStream(InputStream inputStream, Charset charset)
throws IOException {
BoundedByteArrayOutputStream outputStream = BoundedBuffersFactory.createStream(charset);
try (InputStreamReader reader = new InputStreamReader(inputStream, charset);
OutputStreamWriter writer = new OutputStreamWriter(outputStream, charset)) {
int c;
while ((c = reader.read()) != -1) {
writer.write(c);
}
writer.flush();
}
return outputStream.toStringWithSuppliedCharset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,18 @@ public static void exit(@Advice.This HttpEntity thizz, @Advice.Return InputStrea
}
Charset charset = ContentTypeCharsetUtils.toCharset(charsetStr);

String contentEncoding = null;
Header contentEncodingHeader = thizz.getContentEncoding();
if (contentEncodingHeader != null) {
contentEncoding = contentEncodingHeader.getValue();
}
SpanAndBuffer spanAndBuffer =
new SpanAndBuffer(
clientSpan.span,
BoundedBuffersFactory.createStream((int) contentSize, charset),
clientSpan.attributeKey,
charset);
charset,
contentEncoding);
VirtualField.find(InputStream.class, SpanAndBuffer.class).set(inputStream, spanAndBuffer);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ public static void exit(@Advice.This InputStream thizz, @Advice.Return int avail
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
contextStore.set(thizz, null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,22 @@
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.util.zip.GZIPInputStream;
import org.hypertrace.agent.core.instrumentation.HypertraceCallDepthThreadLocalMap;
import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes;
import org.hypertrace.agent.core.instrumentation.SpanAndBuffer;
import org.hypertrace.agent.core.instrumentation.buffer.BoundedBuffersFactory;
import org.hypertrace.agent.core.instrumentation.buffer.BoundedByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -85,6 +91,13 @@ public static void addAttribute(Span span, AttributeKey<String> attributeKey, St
spanBuilder.setAttribute(
"http.response.header.content-type", (String) resContentType);
}
Object resContentEncoding =
getAttribute.invoke(
span, HypertraceSemanticAttributes.HTTP_RESPONSE_HEADER_CONTENT_ENCODING);
if (resContentEncoding != null) {
spanBuilder.setAttribute(
"http.response.header.content-encoding", (String) resContentEncoding);
}
}
} catch (IllegalAccessException | InvocationTargetException e) {
// ignore and continue
Expand All @@ -100,12 +113,31 @@ public static void addAttribute(Span span, AttributeKey<String> attributeKey, St
}

public static void addBody(
Span span, AttributeKey<String> attributeKey, ByteArrayOutputStream buffer, Charset charset) {
Span span,
AttributeKey<String> attributeKey,
ByteArrayOutputStream buffer,
Charset charset,
String contentEncoding) {
try {
String body = buffer.toString(charset.name());
InputStreamUtils.addAttribute(span, attributeKey, body);
byte[] data = buffer.toByteArray();

// if content-encoding is gzip,
if (contentEncoding != null && contentEncoding.toLowerCase().contains("gzip")) {
try (GZIPInputStream gzipInputStream =
new GZIPInputStream(new ByteArrayInputStream(data))) {
InputStreamReader reader = new InputStreamReader(gzipInputStream, charset);
String body = readInputStream(reader, charset);
InputStreamUtils.addAttribute(span, attributeKey, body);
}
} else {
// No decompression needed, convert directly to string
String body = new String(data, charset);
InputStreamUtils.addAttribute(span, attributeKey, body);
}
} catch (UnsupportedEncodingException e) {
log.error("Failed to parse encofing from charset {}", charset, e);
log.error("Failed to parse encoding from charset {}", charset, e);
} catch (IOException e) {
log.error("Failed to read or decompress data", e);
}
}

Expand All @@ -132,7 +164,8 @@ public static void read(
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
contextStore.set(inputStream, null);
}
}
Expand All @@ -146,7 +179,8 @@ public static void read(
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
VirtualField.find(InputStream.class, SpanAndBuffer.class).set(inputStream, null);
}
}
Expand All @@ -166,7 +200,8 @@ public static void read(
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
contextStore.set(inputStream, null);
}
}
Expand Down Expand Up @@ -194,10 +229,24 @@ public static void readNBytes(
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
contextStore.set(inputStream, null);
} else {
spanAndBuffer.byteArrayBuffer.write(b, off, read);
}
}

public static String readInputStream(InputStreamReader inputReader, Charset charset)
throws IOException {
BoundedByteArrayOutputStream outputStream = BoundedBuffersFactory.createStream(charset);
try (OutputStreamWriter writer = new OutputStreamWriter(outputStream, charset)) {
int c;
while ((c = inputReader.read()) != -1) {
writer.write(c);
}
writer.flush();
}
return outputStream.toStringWithSuppliedCharset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.micronaut.http.client.annotation.Client;
import io.micronaut.test.annotation.MicronautTest;
import io.opentelemetry.proto.trace.v1.Span;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
Expand Down Expand Up @@ -135,4 +136,41 @@ public void post() throws InterruptedException, TimeoutException {
TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body").getStringValue());
Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.response.body"));
}

@Test
public void getGzipResponse() throws TimeoutException, InterruptedException, IOException {
String retrieve =
client
.toBlocking()
.retrieve(
HttpRequest.GET(String.format("http://localhost:%d/gzip", testHttpServer.port()))
.header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE));
Assertions.assertEquals("{\"message\": \"hello\"}", retrieve);

TEST_WRITER.waitForTraces(1);
List<List<Span>> traces =
TEST_WRITER.waitForSpans(
1,
span ->
!span.getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT)
|| span.getAttributesList().stream()
.noneMatch(
keyValue ->
keyValue.getKey().equals("http.url")
&& keyValue.getValue().getStringValue().contains("/gzip")));
Assertions.assertEquals(1, traces.size());
Assertions.assertEquals(1, traces.get(0).size());
Span clientSpan = traces.get(0).get(0);
Assertions.assertEquals(
REQUEST_HEADER_VALUE,
TEST_WRITER
.getAttributesMap(clientSpan)
.get("http.request.header." + REQUEST_HEADER_NAME)
.getStringValue());
Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body"));

String respBodyCapturedInSpan =
TEST_WRITER.getAttributesMap(clientSpan).get("http.response.body").getStringValue();
Assertions.assertEquals("{\"message\": \"hello\"}", respBodyCapturedInSpan);
}
}
Loading

0 comments on commit dc60684

Please sign in to comment.