Skip to content

Commit

Permalink
[BugFix] Solve issues-3470: Yarn webui fails to obtain task status wh…
Browse files Browse the repository at this point in the history
…en submitting a Flink task after turning on Kerberos authentication. (#4020)

Co-authored-by: ze.miao <ze.miao@pcitc.com>
Co-authored-by: GH Action - Upstream Sync <action@github.com>
  • Loading branch information
3 people authored Dec 9, 2024
1 parent 3c84748 commit 4a2dfde
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.gateway.utils;

import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.config.Lookup;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.impl.auth.SPNegoSchemeFactory;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;

import java.io.IOException;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;

import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RequestKerberosUrlUtils {
public static Logger logger = LoggerFactory.getLogger(RequestKerberosUrlUtils.class);
private String principal;
private String keyTabLocation;

public RequestKerberosUrlUtils() {}

public RequestKerberosUrlUtils(String principal, String keyTabLocation) {
this.principal = principal;
this.keyTabLocation = keyTabLocation;
}

public RequestKerberosUrlUtils(String principal, String keyTabLocation, boolean isDebug) {
this(principal, keyTabLocation);
if (isDebug) {
System.setProperty("sun.security.spnego.debug", "true");
System.setProperty("sun.security.krb5.debug", "true");
}
}

public RequestKerberosUrlUtils(String principal, String keyTabLocation, String krb5Location, boolean isDebug) {
this(principal, keyTabLocation, isDebug);
// System.setProperty("java.security.krb5.conf", krb5Location);
}

private static HttpClient buildSpengoHttpClient() {

Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
.register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(true))
.build();

BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(new AuthScope(null, -1, null), new Credentials() {
@Override
public Principal getUserPrincipal() {
return null;
}

@Override
public String getPassword() {
return null;
}
});

CloseableHttpClient httpClient = HttpClientBuilder.create()
.setDefaultAuthSchemeRegistry(authSchemeRegistry)
.setDefaultCredentialsProvider(credentialsProvider)
.build();
return httpClient;
}

public HttpResponse callRestUrl(final String url, final String userId) {
// logger.warn(String.format("Calling KerberosHttpClient %s %s %s", this.principal, this.keyTabLocation,
// url));
Configuration config = new Configuration() {
@Override
public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
HashMap<String, Object> options = new HashMap<String, Object>() {
{
put("useTicketCache", "false");
put("useKeyTab", "true");
put("keyTab", keyTabLocation);
// Krb5 in GSS API needs to be refreshed so it does not throw the error
// Specified version of key is not available
put("refreshKrb5Config", "true");
put("principal", principal);
put("storeKey", "true");
put("doNotPrompt", "true");
put("isInitiator", "true");
put("debug", "true");
}
};
return new AppConfigurationEntry[] {
new AppConfigurationEntry(
"com.sun.security.auth.module.Krb5LoginModule",
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
options)
};
}
};
Set<Principal> princ = new HashSet<Principal>(1);
princ.add(new KerberosPrincipal(userId));
Subject sub = new Subject(false, princ, new HashSet<Object>(), new HashSet<Object>());
try {
// auth module:Krb5Login
LoginContext lc = new LoginContext("Krb5Login", sub, null, config);
lc.login();
Subject serviceSubject = lc.getSubject();
return Subject.doAs(serviceSubject, new PrivilegedAction<HttpResponse>() {
HttpResponse httpResponse = null;

@Override
public HttpResponse run() {
try {
HttpClient spnegoHttpClient = buildSpengoHttpClient();
httpResponse = spnegoHttpClient.execute(new HttpGet(url));
return httpResponse;
} catch (IOException ioe) {
ioe.printStackTrace();
}
return httpResponse;
}
});
} catch (Exception le) {
le.printStackTrace();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.dinky.gateway.result.SavePointResult;
import org.dinky.gateway.result.TestResult;
import org.dinky.gateway.result.YarnResult;
import org.dinky.gateway.utils.RequestKerberosUrlUtils;
import org.dinky.utils.FlinkJsonUtil;
import org.dinky.utils.ThreadUtil;

Expand Down Expand Up @@ -73,9 +74,12 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.zookeeper.ZooKeeper;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.net.URI;
import java.util.ArrayList;
Expand Down Expand Up @@ -404,6 +408,39 @@ protected String getWebUrl(ClusterClient<ApplicationId> clusterClient, YarnResul
+ JobsOverviewHeaders.URL.substring(1);

String json = HttpUtil.get(url);

// 增加判断访问Flink WebUI如果认证失败,尝试使用Kerberos认证
if (HttpUtil.createGet(url).execute().getStatus() == 401) {
logger.info("yarn application api url:" + url);
logger.info(
"HTTP API return code 401, try to authenticate using the Kerberos get yarn application state.");
org.apache.http.HttpResponse httpResponse = null;
String principal = configuration.get(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
String keytab = configuration.get(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
logger.info("get principal:" + principal + "||keytab:" + keytab);
BufferedReader in = null;
try {
RequestKerberosUrlUtils restTest = new RequestKerberosUrlUtils(principal, keytab, null, false);
httpResponse = restTest.callRestUrl(url, principal);
InputStream inputStream = httpResponse.getEntity().getContent();
in = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
String str = null;
while ((str = in.readLine()) != null) {
logger.info("yarn application state api content:" + str);
json = str;
}
if (httpResponse.getStatusLine().getStatusCode() != 200) {
throw new RuntimeException(String.format(
"Failed to get job details, please check yarn cluster status. Web URL is: %s the job tracking url is: %s",
webUrl, url));
}
} catch (Exception e) {
logger.info("Failed to kerberos authentication:" + e.getMessage());
e.printStackTrace();
}
logger.info("kerberos authentication login successfully and start to get job details");
}

try {
MultipleJobsDetails jobsDetails = FlinkJsonUtil.toBean(json, JobsOverviewHeaders.getInstance());
jobDetailsList.addAll(jobsDetails.getJobs());
Expand Down

0 comments on commit 4a2dfde

Please sign in to comment.