Skip to content

Commit

Permalink
[FLINK-34546] Emit span with failure labels on failure.
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanRRichter committed Mar 1, 2024
1 parent 5f06ce7 commit 7c8e3f5
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,20 @@ public class TraceOptions {
+ " any of the names in the list will be started. Otherwise, all reporters that could be found in"
+ " the configuration will be started.");

/**
* Temporary option to report events as spans. This option will be removed once we support
* reporting events.
*
* <p>Configured under the key {@code traces.report-events-as-spans}; the default value is
* {@code false}, so events are not reported as spans unless this is explicitly enabled.
*
* @deprecated This is a temporary switch that is only in place until reporting events is
*     supported; it will be removed afterwards.
*/
@Deprecated
public static final ConfigOption<Boolean> REPORT_EVENTS_AS_SPANS =
key("traces.report-events-as-spans")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to report events as spans. This is a temporary parameter that "
+ "is in place until we have support for reporting events. "
+ "In the meantime, this can be activated to report them as spans instead.");

/**
* Returns a view over the given configuration via which options can be set/retrieved for the
* given reporter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.apache.flink.runtime.executiongraph.failover;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TraceOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.failure.FailureEnricher.Context;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.Execution;
Expand All @@ -28,6 +31,8 @@
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.throwable.ThrowableClassifier;
import org.apache.flink.runtime.throwable.ThrowableType;
import org.apache.flink.traces.Span;
import org.apache.flink.traces.SpanBuilder;
import org.apache.flink.util.IterableUtils;

import javax.annotation.Nullable;
Expand All @@ -47,6 +52,8 @@
*/
public class ExecutionFailureHandler {

public static final String FAILURE_LABEL_ATTRIBUTE_PREFIX = "failureLabel.";

private final SchedulingTopology schedulingTopology;

/** Strategy to judge which tasks should be restarted. */
Expand All @@ -62,6 +69,9 @@ public class ExecutionFailureHandler {
private final Context globalFailureCtx;
private final Collection<FailureEnricher> failureEnrichers;
private final ComponentMainThreadExecutor mainThreadExecutor;
private final MetricGroup metricGroup;

private final boolean reportEventsAsSpans;

/**
* Creates the handler to deal with task failures.
Expand All @@ -76,13 +86,15 @@ public class ExecutionFailureHandler {
* @param globalFailureCtx Global failure Context used by FailureEnrichers
*/
public ExecutionFailureHandler(
final Configuration jobMasterConfig,
final SchedulingTopology schedulingTopology,
final FailoverStrategy failoverStrategy,
final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
final ComponentMainThreadExecutor mainThreadExecutor,
final Collection<FailureEnricher> failureEnrichers,
final Context taskFailureCtx,
final Context globalFailureCtx) {
final Context globalFailureCtx,
final MetricGroup metricGroup) {

this.schedulingTopology = checkNotNull(schedulingTopology);
this.failoverStrategy = checkNotNull(failoverStrategy);
Expand All @@ -91,6 +103,8 @@ public ExecutionFailureHandler(
this.failureEnrichers = checkNotNull(failureEnrichers);
this.taskFailureCtx = taskFailureCtx;
this.globalFailureCtx = globalFailureCtx;
this.metricGroup = metricGroup;
this.reportEventsAsSpans = jobMasterConfig.get(TraceOptions.REPORT_EVENTS_AS_SPANS);
}

/**
Expand All @@ -104,7 +118,7 @@ public ExecutionFailureHandler(
*/
public FailureHandlingResult getFailureHandlingResult(
Execution failedExecution, Throwable cause, long timestamp) {
return handleFailure(
return handleFailureAndReport(
failedExecution,
cause,
timestamp,
Expand All @@ -123,7 +137,7 @@ public FailureHandlingResult getFailureHandlingResult(
*/
public FailureHandlingResult getGlobalFailureHandlingResult(
final Throwable cause, long timestamp) {
return handleFailure(
return handleFailureAndReport(
null,
cause,
timestamp,
Expand All @@ -141,6 +155,51 @@ private CompletableFuture<Map<String, String>> labelFailure(Throwable cause, boo
return FailureEnricherUtils.labelFailure(cause, ctx, mainThreadExecutor, failureEnrichers);
}

/**
 * Delegates to {@code handleFailure} and, when reporting events as spans is enabled,
 * registers a callback that reports the failure once its labels are available.
 *
 * @param failedExecution the failed execution, or {@code null} if there is none
 * @param cause the failure cause
 * @param timestamp the failure timestamp
 * @param verticesToRestart the vertices to restart
 * @param globalFailure whether the failure is a global failure
 * @return the result produced by {@code handleFailure}
 */
private FailureHandlingResult handleFailureAndReport(
        @Nullable final Execution failedExecution,
        final Throwable cause,
        long timestamp,
        final Set<ExecutionVertexID> verticesToRestart,
        final boolean globalFailure) {

    final FailureHandlingResult result =
            handleFailure(failedExecution, cause, timestamp, verticesToRestart, globalFailure);

    // Nothing to report unless span reporting was switched on in the configuration.
    if (!reportEventsAsSpans) {
        return result;
    }

    // TODO: replace with reporting as event once events are supported.
    // The labels are computed asynchronously, so the span is emitted from a callback that
    // is scheduled on the main thread executor when the labeling future completes.
    result.getFailureLabels()
            .thenAcceptAsync(
                    failureLabels -> reportFailureHandling(result, failureLabels),
                    mainThreadExecutor);

    return result;
}

/**
 * Builds a "JobFailure" span for the given failure handling result and adds it to the metric
 * group.
 *
 * <p>The span starts and ends at the result's timestamp and carries the attributes
 * {@code canRestart} and {@code isGlobalFailure}, plus one attribute per failure label whose
 * name is the label key prefixed with {@link #FAILURE_LABEL_ATTRIBUTE_PREFIX}.
 *
 * @param failureHandlingResult the result describing the handled failure
 * @param failureLabels the labels to attach to the span
 */
private void reportFailureHandling(
        FailureHandlingResult failureHandlingResult, Map<String, String> failureLabels) {

    // Start and end share one timestamp: the span marks a point in time, not a duration.
    final long failureTimestamp = failureHandlingResult.getTimestamp();
    final String canRestart = String.valueOf(failureHandlingResult.canRestart());
    final String isGlobalFailure = String.valueOf(failureHandlingResult.isGlobalFailure());

    // Base attributes
    final SpanBuilder jobFailureSpan =
            Span.builder(ExecutionFailureHandler.class, "JobFailure")
                    .setStartTsMillis(failureTimestamp)
                    .setEndTsMillis(failureTimestamp)
                    .setAttribute("canRestart", canRestart)
                    .setAttribute("isGlobalFailure", isGlobalFailure);

    // One prefixed attribute per failure label
    failureLabels.forEach(
            (labelKey, labelValue) ->
                    jobFailureSpan.setAttribute(
                            FAILURE_LABEL_ATTRIBUTE_PREFIX + labelKey, labelValue));

    metricGroup.addSpan(jobFailureSpan);
}

private FailureHandlingResult handleFailure(
@Nullable final Execution failedExecution,
final Throwable cause,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,15 @@ protected DefaultScheduler(

this.executionFailureHandler =
new ExecutionFailureHandler(
jobMasterConfiguration,
getSchedulingTopology(),
failoverStrategy,
restartBackoffTimeStrategy,
mainThreadExecutor,
failureEnrichers,
taskFailureCtx,
globalFailureCtx);
globalFailureCtx,
jobManagerJobMetricGroup);

this.schedulingStrategy =
schedulingStrategyFactory.createInstance(this, getSchedulingTopology());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

package org.apache.flink.runtime.executiongraph.failover;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TraceOptions;
import org.apache.flink.core.failure.TestingFailureEnricher;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.Execution;
Expand All @@ -29,14 +32,19 @@
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.traces.Span;
import org.apache.flink.traces.SpanBuilder;
import org.apache.flink.util.IterableUtils;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -66,6 +74,8 @@ class ExecutionFailureHandlerTest {

private TestingFailureEnricher testingFailureEnricher;

private List<Span> spanCollector;

@BeforeEach
void setUp() {
TestingSchedulingTopology topology = new TestingSchedulingTopology();
Expand All @@ -77,15 +87,25 @@ void setUp() {
isNewAttempt = new AtomicBoolean(true);
backoffTimeStrategy =
new TestRestartBackoffTimeStrategy(true, RESTART_DELAY_MS, isNewAttempt::get);
spanCollector = new CopyOnWriteArrayList<>();
Configuration configuration = new Configuration();
configuration.set(TraceOptions.REPORT_EVENTS_AS_SPANS, Boolean.TRUE);
executionFailureHandler =
new ExecutionFailureHandler(
configuration,
schedulingTopology,
failoverStrategy,
backoffTimeStrategy,
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
Collections.singleton(testingFailureEnricher),
null,
null);
null,
new UnregisteredMetricsGroup() {
@Override
public void addSpan(SpanBuilder spanBuilder) {
spanCollector.add(spanBuilder.build());
}
});
}

/** Tests the case that task restarting is accepted. */
Expand Down Expand Up @@ -115,6 +135,7 @@ void testNormalFailureHandling() throws Exception {
assertThat(result.getFailureLabels().get())
.isEqualTo(testingFailureEnricher.getFailureLabels());
assertThat(executionFailureHandler.getNumberOfRestarts()).isOne();
checkMetrics(spanCollector, false, true);
}

/** Tests the case that task restarting is suppressed. */
Expand Down Expand Up @@ -151,6 +172,7 @@ void testRestartingSuppressedFailureHandlingResult() throws Exception {
.isInstanceOf(IllegalStateException.class);

assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
checkMetrics(spanCollector, false, false);
}

/** Tests the case that the failure is non-recoverable type. */
Expand Down Expand Up @@ -192,6 +214,7 @@ void testNonRecoverableFailureHandlingResult() throws Exception {
.isInstanceOf(IllegalStateException.class);

assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
checkMetrics(spanCollector, false, false);
}

@Test
Expand All @@ -217,6 +240,7 @@ void testNewAttemptAndNumberOfRestarts() throws Exception {
isNewAttempt.set(false);
testHandlingConcurrentException(execution, error);
testHandlingConcurrentException(execution, error);
checkMetrics(spanCollector, false, true);
}

private void testHandlingRootException(Execution execution, Throwable error) {
Expand Down Expand Up @@ -283,6 +307,7 @@ void testGlobalFailureHandling() throws ExecutionException, InterruptedException
assertThat(testingFailureEnricher.getSeenThrowables()).containsExactly(error);
assertThat(result.getFailureLabels().get())
.isEqualTo(testingFailureEnricher.getFailureLabels());
checkMetrics(spanCollector, true, true);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -310,4 +335,16 @@ public Set<ExecutionVertexID> getTasksNeedingRestart(
return tasksToRestart;
}
}

/**
 * Asserts that at least one span was collected and that every collected span is a
 * "JobFailure" span scoped to {@link ExecutionFailureHandler} with the expected attributes.
 *
 * @param results the collected spans
 * @param global expected value of the {@code isGlobalFailure} attribute
 * @param canRestart expected value of the {@code canRestart} attribute
 */
private void checkMetrics(List<Span> results, boolean global, boolean canRestart) {
    final String expectedScope = ExecutionFailureHandler.class.getCanonicalName();
    final String expectedCanRestart = String.valueOf(canRestart);
    final String expectedGlobal = String.valueOf(global);

    assertThat(results).isNotEmpty();
    for (Span reported : results) {
        assertThat(reported.getScope()).isEqualTo(expectedScope);
        assertThat(reported.getName()).isEqualTo("JobFailure");

        Map<String, Object> spanAttributes = reported.getAttributes();
        assertThat(spanAttributes)
                .containsEntry("failureLabel.failKey", "failValue")
                .containsEntry("canRestart", expectedCanRestart)
                .containsEntry("isGlobalFailure", expectedGlobal);
    }
}
}

0 comments on commit 7c8e3f5

Please sign in to comment.