Skip to content

Commit

Permalink
[FLINK-20090][rest] Expose slot sharing group info in REST API
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborgsomogyi authored Feb 29, 2024
1 parent 06fdc01 commit 34a7734
Show file tree
Hide file tree
Showing 15 changed files with 231 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.io.network.partition.DataSetMetaInfo;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
Expand Down Expand Up @@ -71,6 +74,7 @@
import org.apache.flink.runtime.rest.messages.JobCancellationHeaders;
import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
Expand All @@ -86,6 +90,8 @@
import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetIdPathParameter;
import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetListHeaders;
import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetListResponseBody;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders;
Expand All @@ -96,6 +102,7 @@
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters;
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
Expand Down Expand Up @@ -1236,6 +1243,58 @@ void testNotShowSuspendedJobStatus() throws Exception {
}
}

@Test
void testJobDetailsContainsSlotSharingGroupId() throws Exception {
final IOMetricsInfo jobVertexMetrics =
new IOMetricsInfo(0, false, 0, false, 0, false, 0, false, 0, 0, 0);
SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
final Collection<JobDetailsInfo.JobVertexDetailsInfo> jobVertexDetailsInfos =
Collections.singletonList(
new JobDetailsInfo.JobVertexDetailsInfo(
new JobVertexID(),
slotSharingGroupId,
"jobVertex1",
2,
1,
ExecutionState.RUNNING,
1,
2,
1,
Collections.singletonMap(ExecutionState.RUNNING, 0),
jobVertexMetrics));
final JobDetailsInfo jobDetailsInfo =
new JobDetailsInfo(
jobId,
"foobar",
false,
JobStatus.RUNNING,
1,
2,
1,
2,
10,
Collections.singletonMap(JobStatus.RUNNING, 1L),
jobVertexDetailsInfos,
Collections.singletonMap(ExecutionState.RUNNING, 1),
new JobPlanInfo.RawJson("{\"id\":\"1234\"}"));
final TestJobDetailsInfoHandler jobDetailsInfoHandler =
new TestJobDetailsInfoHandler(jobDetailsInfo);

try (TestRestServerEndpoint restServerEndpoint =
createRestServerEndpoint(jobDetailsInfoHandler)) {
try (RestClusterClient<?> restClusterClient =
createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
final CompletableFuture<JobDetailsInfo> jobDetailsInfoFuture =
restClusterClient.getJobDetails(jobId);
Collection<JobDetailsInfo.JobVertexDetailsInfo> jobVertexInfos =
jobDetailsInfoFuture.get().getJobVertexInfos();
assertThat(jobVertexInfos).hasSize(1);
assertThat(jobVertexInfos.iterator().next().getSlotSharingGroupId())
.isEqualTo(slotSharingGroupId);
}
}
}

private class TestClientCoordinationHandler
extends TestHandler<
ClientCoordinationRequestBody,
Expand Down Expand Up @@ -1402,6 +1461,25 @@ protected CompletableFuture<JobStatusInfo> handleRequest(
}
}

private class TestJobDetailsInfoHandler
extends TestHandler<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> {

private final JobDetailsInfo jobDetailsInfo;

private TestJobDetailsInfoHandler(@Nonnull JobDetailsInfo jobDetailsInfo) {
super(JobDetailsHeaders.getInstance());
this.jobDetailsInfo = checkNotNull(jobDetailsInfo);
}

@Override
protected CompletableFuture<JobDetailsInfo> handleRequest(
@Nonnull HandlerRequest<EmptyRequestBody> request,
@Nonnull DispatcherGateway gateway)
throws RestHandlerException {
return CompletableFuture.completedFuture(jobDetailsInfo);
}
}

private abstract class TestHandler<
R extends RequestBody, P extends ResponseBody, M extends MessageParameters>
extends AbstractRestHandler<DispatcherGateway, R, P, M> {
Expand Down
3 changes: 3 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,9 @@
"id" : {
"type" : "any"
},
"slotSharingGroupId" : {
"type" : "any"
},
"name" : {
"type" : "string"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;

/**
* Common interface for the runtime {@link ExecutionJobVertex} and {@link
Expand Down Expand Up @@ -48,6 +49,13 @@ public interface AccessExecutionJobVertex {
*/
int getMaxParallelism();

/**
* Returns the slot sharing group for this job vertex.
*
* @return slot sharing group for this job vertex.
*/
SlotSharingGroup getSlotSharingGroup();

/**
* Returns the resource profile for this job vertex.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ public static ArchivedExecutionGraph createSparseArchivedExecutionGraphWithJobVe
jobVertex.getName(),
parallelismInfo.getParallelism(),
parallelismInfo.getMaxParallelism(),
jobVertex.getSlotSharingGroup(),
ResourceProfile.fromResourceSpec(
jobVertex.getMinResources(), MemorySize.ZERO),
new StringifiedAccumulatorResult[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;

import java.io.Serializable;

Expand All @@ -39,6 +40,8 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser

private final int maxParallelism;

private final SlotSharingGroup slotSharingGroup;

private final ResourceProfile resourceProfile;

private final StringifiedAccumulatorResult[] archivedUserAccumulators;
Expand All @@ -55,6 +58,7 @@ public ArchivedExecutionJobVertex(ExecutionJobVertex jobVertex) {
this.name = jobVertex.getJobVertex().getName();
this.parallelism = jobVertex.getParallelism();
this.maxParallelism = jobVertex.getMaxParallelism();
this.slotSharingGroup = jobVertex.getSlotSharingGroup();
this.resourceProfile = jobVertex.getResourceProfile();
}

Expand All @@ -64,13 +68,15 @@ public ArchivedExecutionJobVertex(
String name,
int parallelism,
int maxParallelism,
SlotSharingGroup slotSharingGroup,
ResourceProfile resourceProfile,
StringifiedAccumulatorResult[] archivedUserAccumulators) {
this.taskVertices = taskVertices;
this.id = id;
this.name = name;
this.parallelism = parallelism;
this.maxParallelism = maxParallelism;
this.slotSharingGroup = slotSharingGroup;
this.resourceProfile = resourceProfile;
this.archivedUserAccumulators = archivedUserAccumulators;
}
Expand All @@ -94,6 +100,11 @@ public int getMaxParallelism() {
return maxParallelism;
}

@Override
public SlotSharingGroup getSlotSharingGroup() {
return slotSharingGroup;
}

@Override
public ResourceProfile getResourceProfile() {
return resourceProfile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ public InputSplitAssigner getSplitAssigner() {
return splitAssigner;
}

@Override
public SlotSharingGroup getSlotSharingGroup() {
return slotSharingGroup;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,24 @@
package org.apache.flink.runtime.instance;

import org.apache.flink.util.AbstractID;
import org.apache.flink.util.StringUtils;

public class SlotSharingGroupId extends AbstractID {
private static final long serialVersionUID = 8837647978345422042L;

public SlotSharingGroupId() {
super();
}

public SlotSharingGroupId(long lowerPart, long upperPart) {
super(lowerPart, upperPart);
}

public SlotSharingGroupId() {}
public SlotSharingGroupId(byte[] bytes) {
super(bytes);
}

public static SlotSharingGroupId fromHexString(String hexString) {
return new SlotSharingGroupId(StringUtils.hexStringToByte(hexString));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(

return new JobDetailsInfo.JobVertexDetailsInfo(
ejv.getJobVertexId(),
ejv.getSlotSharingGroup().getSlotSharingGroupId(),
ejv.getName(),
ejv.getMaxParallelism(),
ejv.getParallelism(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.runtime.rest.messages.ResponseBody;
Expand All @@ -29,6 +30,8 @@
import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
import org.apache.flink.runtime.rest.messages.json.SlotSharingGroupIDDeserializer;
import org.apache.flink.runtime.rest.messages.json.SlotSharingGroupIDSerializer;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
Expand Down Expand Up @@ -263,6 +266,8 @@ public static final class JobVertexDetailsInfo {

public static final String FIELD_NAME_JOB_VERTEX_ID = "id";

public static final String FIELD_NAME_SLOT_SHARING_GROUP_ID = "slotSharingGroupId";

public static final String FIELD_NAME_JOB_VERTEX_NAME = "name";

public static final String FIELD_NAME_MAX_PARALLELISM = "maxParallelism";
Expand All @@ -285,6 +290,10 @@ public static final class JobVertexDetailsInfo {
@JsonSerialize(using = JobVertexIDSerializer.class)
private final JobVertexID jobVertexID;

@JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_ID)
@JsonSerialize(using = SlotSharingGroupIDSerializer.class)
private final SlotSharingGroupId slotSharingGroupId;

@JsonProperty(FIELD_NAME_JOB_VERTEX_NAME)
private final String name;

Expand Down Expand Up @@ -317,6 +326,9 @@ public JobVertexDetailsInfo(
@JsonDeserialize(using = JobVertexIDDeserializer.class)
@JsonProperty(FIELD_NAME_JOB_VERTEX_ID)
JobVertexID jobVertexID,
@JsonDeserialize(using = SlotSharingGroupIDDeserializer.class)
@JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_ID)
SlotSharingGroupId slotSharingGroupId,
@JsonProperty(FIELD_NAME_JOB_VERTEX_NAME) String name,
@JsonProperty(FIELD_NAME_MAX_PARALLELISM) int maxParallelism,
@JsonProperty(FIELD_NAME_PARALLELISM) int parallelism,
Expand All @@ -328,6 +340,7 @@ public JobVertexDetailsInfo(
Map<ExecutionState, Integer> tasksPerState,
@JsonProperty(FIELD_NAME_JOB_VERTEX_METRICS) IOMetricsInfo jobVertexMetrics) {
this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId);
this.name = Preconditions.checkNotNull(name);
this.maxParallelism = maxParallelism;
this.parallelism = parallelism;
Expand All @@ -344,6 +357,11 @@ public JobVertexID getJobVertexID() {
return jobVertexID;
}

@JsonIgnore
public SlotSharingGroupId getSlotSharingGroupId() {
return slotSharingGroupId;
}

@JsonIgnore
public String getName() {
return name;
Expand Down Expand Up @@ -404,6 +422,7 @@ public boolean equals(Object o) {
&& endTime == that.endTime
&& duration == that.duration
&& Objects.equals(jobVertexID, that.jobVertexID)
&& Objects.equals(slotSharingGroupId, that.slotSharingGroupId)
&& Objects.equals(name, that.name)
&& executionState == that.executionState
&& Objects.equals(tasksPerState, that.tasksPerState)
Expand All @@ -414,6 +433,7 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(
jobVertexID,
slotSharingGroupId,
name,
maxParallelism,
parallelism,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages.json;

import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;

import java.io.IOException;

/** Jackson deserializer for {@link SlotSharingGroupId}. */
public class SlotSharingGroupIDDeserializer extends StdDeserializer<SlotSharingGroupId> {

private static final long serialVersionUID = -2908308366715321301L;

protected SlotSharingGroupIDDeserializer() {
super(JobVertexID.class);
}

@Override
public SlotSharingGroupId deserialize(JsonParser p, DeserializationContext ctxt)
throws IOException {
return SlotSharingGroupId.fromHexString(p.getValueAsString());
}
}
Loading

0 comments on commit 34a7734

Please sign in to comment.