Skip to content

Commit

Permalink
Merge pull request #82 from alexiteki/main
Browse files Browse the repository at this point in the history
Updates to Async and Side Output based on Eng Feedback
  • Loading branch information
jeremyber-aws authored Dec 16, 2024
2 parents a64439e + db9cbcb commit 336a970
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 41 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ env/
venv/
.java-version
/pyflink/
/.run/

Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,20 @@
public class ProcessedEvent {

final String message;
public String processed;

@Override
public String toString() {
return "ProcessedEvent{" +
", message='" + message + '\'' +
", processed='" + processed + '\'' +
'}';
}

public ProcessedEvent(String message, String processed) {
public ProcessedEvent(String message) {
this.message = message;
this.processed = processed;
}

// Getter methods
public String getMessage() {
return message;
}

public String getProcessed() {
return processed;
}

// Setter method for processed field
public void setProcessed(String processed) {
this.processed = processed;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.*;

public class ProcessingFunction extends RichAsyncFunction<IncomingEvent, ProcessedEvent> {
private static final Logger LOG = LoggerFactory.getLogger(ProcessingFunction.class);

private final String apiUrl;
private final String apiKey;
private static ExecutorService executorService;


private transient AsyncHttpClient client;

Expand All @@ -43,41 +43,52 @@ public ProcessingFunction(String apiUrl, String apiKey) {
public void open(Configuration parameters) throws Exception {
DefaultAsyncHttpClientConfig.Builder clientBuilder = Dsl.config().setConnectTimeout(Duration.ofSeconds(10));
client = Dsl.asyncHttpClient(clientBuilder);

int numCores = Runtime.getRuntime().availableProcessors(); // get num cores on node for thread count
executorService = Executors.newFixedThreadPool(numCores);

}

@Override
public void close() throws Exception
{
client.close();
executorService.shutdown();
}

@Override
public void asyncInvoke(IncomingEvent incomingEvent, ResultFuture<ProcessedEvent> resultFuture) {

// Create a new ProcessedEvent instance
ProcessedEvent processedEvent = new ProcessedEvent(incomingEvent.getMessage(), null);
ProcessedEvent processedEvent = new ProcessedEvent(incomingEvent.getMessage());
LOG.debug("New request: {}", incomingEvent);

// Note: The Async Client used must return a Future object or equivalent
Future<Response> future = client.prepareGet(apiUrl)
.setHeader("x-api-key", apiKey)
.execute();

// Asynchronously calling API and handling response via Completable Future
CompletableFuture.supplyAsync(() -> {
try {
LOG.debug("Trying to get response for {}", incomingEvent.getId());
Response response = future.get();
return response.getStatusCode();
} catch (InterruptedException | ExecutionException e) {
LOG.error("Error during async HTTP call: {}", e.getMessage());
return -1;
}
}).thenAccept(statusCode -> {
// Process the request via a Completable Future, in order to not block request synchronously
// Notice we are passing executor service for thread management
CompletableFuture.supplyAsync(() ->
{
try {
LOG.debug("Trying to get response for {}", incomingEvent.getId());
Response response = future.get();
return response.getStatusCode();
} catch (InterruptedException | ExecutionException e) {
LOG.error("Error during async HTTP call: {}", e.getMessage());
return -1;
}
}, executorService).thenAccept(statusCode -> {
if (statusCode == 200) {
processedEvent.setProcessed("SUCCESS");
LOG.debug("Success! {}", incomingEvent.getId());
resultFuture.complete(Collections.singleton(processedEvent));
} else if (statusCode == 500) { // Retryable error
LOG.error("Status code 500, retrying shortly...");
processedEvent.setProcessed("FAIL");
resultFuture.completeExceptionally(new Throwable(statusCode.toString()));
} else {
LOG.error("Unexpected status code: {}", statusCode);
processedEvent.setProcessed("FAIL");
resultFuture.completeExceptionally(new Throwable(statusCode.toString()));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static void main(String[] args) throws Exception {
Preconditions.checkArgument(!outputStreamArn.isEmpty(), "Output stream ARN must not be empty");

processedStream
.map(value -> String.format("%s,%s", value.message, value.processed))
.map(value -> String.format("%s", value.message))
.sinkTo(createSink(outputProperties));

LOGGER.debug("Starting flink job: {}", "Async I/O Retries");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ public IncomingEvent(String message) {





}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.amazonaws.services.msf;

public enum ProcessingOutcome {
SUCCESS("SUCCESS"),
ERROR("ERROR");

private final String text;

ProcessingOutcome(final String text) {
this.text = text;
}

@Override
public String toString() {
return text;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,20 @@ public static void main(String[] args) throws Exception {
.setParallelism(1);

// Validate stream for invalid messages
SingleOutputStreamOperator<Tuple2<IncomingEvent, Boolean>> validatedStream = source
SingleOutputStreamOperator<Tuple2<IncomingEvent, ProcessingOutcome>> validatedStream = source
.map(incomingEvent -> {
boolean isPoisoned = "Poison".equals(incomingEvent.message);
return Tuple2.of(incomingEvent, isPoisoned);
}, TypeInformation.of(new TypeHint<Tuple2<IncomingEvent, Boolean>>() {
ProcessingOutcome result = "Poison".equals(incomingEvent.message)?ProcessingOutcome.ERROR: ProcessingOutcome.SUCCESS;
return Tuple2.of(incomingEvent, result);
}, TypeInformation.of(new TypeHint<Tuple2<IncomingEvent, ProcessingOutcome>>() {
}));

// Split the stream based on validation
SingleOutputStreamOperator<IncomingEvent> mainStream = validatedStream
.process(new ProcessFunction<Tuple2<IncomingEvent, Boolean>, IncomingEvent>() {
.process(new ProcessFunction<Tuple2<IncomingEvent, ProcessingOutcome>, IncomingEvent>() {
@Override
public void processElement(Tuple2<IncomingEvent, Boolean> value, Context ctx,
public void processElement(Tuple2<IncomingEvent, ProcessingOutcome> value, Context ctx,
Collector<IncomingEvent> out) throws Exception {
if (value.f1) {
if (value.f1.equals(ProcessingOutcome.ERROR)) {
// Invalid event (true), send to DLQ sink
ctx.output(invalidEventsTag, value.f0);
} else {
Expand Down

0 comments on commit 336a970

Please sign in to comment.