Skip to content

Commit

Permalink
Merge branch 'main' into FSTORE-1627
Browse files Browse the repository at this point in the history
  • Loading branch information
bubriks authored Dec 2, 2024
2 parents 0de2a3a + db70457 commit 1d53353
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HopsworksConnectionBase;
import com.logicalclocks.hsfs.SecretStore;
import com.logicalclocks.hsfs.flink.engine.FlinkEngine;
import com.logicalclocks.hsfs.metadata.Credentials;
import com.logicalclocks.hsfs.metadata.HopsworksClient;

import com.logicalclocks.hsfs.metadata.HopsworksHttpClient;
Expand All @@ -30,14 +30,17 @@
import software.amazon.awssdk.regions.Region;

import java.io.IOException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;

public class HopsworksConnection extends HopsworksConnectionBase {

@Builder
public HopsworksConnection(String host, int port, String project, Region region, SecretStore secretStore,
boolean hostnameVerification, String trustStorePath,
String certPath, String apiKeyFilePath, String apiKeyValue)
throws IOException, FeatureStoreException {
throws IOException, FeatureStoreException, KeyStoreException, CertificateException, NoSuchAlgorithmException {
this.host = host;
this.port = port;
this.project = getProjectName(project);
Expand All @@ -54,10 +57,11 @@ public HopsworksConnection(String host, int port, String project, Region region,
this.projectObj = getProject();
HopsworksClient.getInstance().setProject(this.projectObj);
if (!System.getProperties().containsKey(HopsworksInternalClient.REST_ENDPOINT_SYS)) {
HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient();
hopsworksHttpClient.setTrustStorePath(FlinkEngine.getInstance().getTrustStorePath());
hopsworksHttpClient.setKeyStorePath(FlinkEngine.getInstance().getKeyStorePath());
hopsworksHttpClient.setCertKey(HopsworksHttpClient.readCertKey(FlinkEngine.getInstance().getCertKey()));
Credentials credentials = HopsworksClient.getInstance().getCredentials();
HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient();
hopsworksHttpClient.setTrustStorePath(credentials.gettStore());
hopsworksHttpClient.setKeyStorePath(credentials.getkStore());
hopsworksHttpClient.setCertKey(credentials.getPassword());
HopsworksClient.getInstance().setHopsworksHttpClient(hopsworksHttpClient);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@
import lombok.Getter;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.core.fs.Path;
Expand All @@ -45,8 +42,6 @@
import java.util.Map;
import java.util.Properties;

import static org.apache.flink.configuration.ConfigOptions.key;

public class FlinkEngine extends EngineBase {
private static FlinkEngine INSTANCE = null;

Expand All @@ -60,23 +55,6 @@ public static synchronized FlinkEngine getInstance() throws FeatureStoreExceptio
@Getter
private StreamExecutionEnvironment streamExecutionEnvironment;

private final Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
private final ConfigOption<String> keyStorePath =
key("flink.hadoop.hops.ssl.keystore.name")
.stringType()
.defaultValue("trustStore.jks")
.withDescription("path to keyStore.jks");
private final ConfigOption<String> trustStorePath =
key("flink.hadoop.hops.ssl.truststore.name")
.stringType()
.defaultValue("trustStore.jks")
.withDescription("path to trustStore.jks");
private final ConfigOption<String> materialPasswdPath =
key("flink.hadoop.hops.ssl.keystores.passwd.name")
.stringType()
.defaultValue("material_passwd")
.withDescription("path to material_passwd");

private FlinkEngine() throws FeatureStoreException {
streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure the streamExecutionEnvironment
Expand Down Expand Up @@ -148,16 +126,4 @@ public Map<String, String> getKafkaConfig(FeatureGroupBase featureGroup, Map<Str
config.put("enable.idempotence", "false");
return config;
}

public String getTrustStorePath() {
return flinkConfig.getString(trustStorePath);
}

public String getKeyStorePath() {
return flinkConfig.getString(keyStorePath);
}

public String getCertKey() {
return flinkConfig.getString(materialPasswdPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
import software.amazon.awssdk.regions.Region;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
Expand Down Expand Up @@ -118,21 +120,39 @@ public Credentials getCredentials() throws FeatureStoreException, IOException,
KeyStore ks = KeyStore.getInstance("JKS");
ks.load(new ByteArrayInputStream(Base64.getDecoder().decode(credentials.getkStore())),
credentials.getPassword().toCharArray());
String keyStorePath = System.getProperty("java.io.tmpdir") + "/keyStore.jks";
ks.store(new FileOutputStream(keyStorePath), credentials.getPassword().toCharArray());
File keyStore = createTempFile("keyStore.jks");
ks.store(new FileOutputStream(keyStore), credentials.getPassword().toCharArray());

KeyStore ts = KeyStore.getInstance("JKS");
ts.load(new ByteArrayInputStream(Base64.getDecoder().decode(credentials.gettStore())),
credentials.getPassword().toCharArray());
String trustStorePath = System.getProperty("java.io.tmpdir") + "/trustStore.jks";
ts.store(new FileOutputStream(trustStorePath), credentials.getPassword().toCharArray());
File trustStore = createTempFile("trustStore.jks");
ts.store(new FileOutputStream(trustStore), credentials.getPassword().toCharArray());

credentials.setkStore(keyStorePath);
credentials.settStore(trustStorePath);
credentials.setkStore(keyStore.getAbsolutePath());
credentials.settStore(trustStore.getAbsolutePath());

return credentials;
}

private File createTempFile(String fileName) throws FeatureStoreException {
HopsworksClient hopsworksClient = getInstance();

// Create a File object
File file = Paths.get(
System.getProperty("java.io.tmpdir"),
hopsworksClient.getProject().getProjectName(),
fileName).toFile();

// Ensure parent directories exist
File parentDir = file.getParentFile();
if (parentDir != null && !parentDir.exists()) {
parentDir.mkdirs();
}

return file;
}

@Getter
@Setter
protected HopsworksHttpClient hopsworksHttpClient;
Expand Down
5 changes: 5 additions & 0 deletions python/hsml/engine/model_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,11 @@ def update_download_progress(n_dirs, n_files, done=False):
projects_index = from_hdfs_model_path.find("/Projects", 0)
from_hdfs_model_path = from_hdfs_model_path[projects_index:]

if not self._dataset_api.path_exists(from_hdfs_model_path):
# if Files directory doesn't exist, download files from the model version
# directory for backwards compatibility with the old model file structure
from_hdfs_model_path = model_instance.version_path

self._download_model_from_hopsfs(
from_hdfs_model_path=from_hdfs_model_path,
to_local_path=local_path,
Expand Down

0 comments on commit 1d53353

Please sign in to comment.