From 3893b8298cb7abf40873732a6dd49aaf7e79e92c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20de=20la=20R=C3=BAa=20Mart=C3=ADnez?= Date: Fri, 29 Nov 2024 11:57:01 +0100 Subject: [PATCH 1/2] [HWORKS-1737][Append] Download model files from version dir if Files dir not found (#418) --- python/hsml/engine/model_engine.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/hsml/engine/model_engine.py b/python/hsml/engine/model_engine.py index d1f21365f..1cf0ba99f 100644 --- a/python/hsml/engine/model_engine.py +++ b/python/hsml/engine/model_engine.py @@ -455,6 +455,11 @@ def update_download_progress(n_dirs, n_files, done=False): projects_index = from_hdfs_model_path.find("/Projects", 0) from_hdfs_model_path = from_hdfs_model_path[projects_index:] + if not self._dataset_api.path_exists(from_hdfs_model_path): + # if Files directory doesn't exist, download files from the model version + # directory for backwards compatibility with the old model file structure + from_hdfs_model_path = model_instance.version_path + self._download_model_from_hopsfs( from_hdfs_model_path=from_hdfs_model_path, to_local_path=local_path, From db70457712033ac52141e2fe2b6edb25a65551a2 Mon Sep 17 00:00:00 2001 From: Ralf Date: Fri, 29 Nov 2024 15:04:14 +0200 Subject: [PATCH 2/2] [FSTORE-1619] Update Flink certificate management (#414) --- .../hsfs/flink/HopsworksConnection.java | 16 +++++---- .../hsfs/flink/engine/FlinkEngine.java | 34 ------------------- .../hsfs/metadata/HopsworksClient.java | 32 +++++++++++++---- 3 files changed, 36 insertions(+), 46 deletions(-) diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/HopsworksConnection.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/HopsworksConnection.java index 3d8d71d0f..68195c77f 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/HopsworksConnection.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/HopsworksConnection.java @@ -20,7 +20,7 @@ import com.logicalclocks.hsfs.FeatureStoreException; import com.logicalclocks.hsfs.HopsworksConnectionBase; import com.logicalclocks.hsfs.SecretStore; -import com.logicalclocks.hsfs.flink.engine.FlinkEngine; +import com.logicalclocks.hsfs.metadata.Credentials; import com.logicalclocks.hsfs.metadata.HopsworksClient; import com.logicalclocks.hsfs.metadata.HopsworksHttpClient; @@ -30,6 +30,9 @@ import software.amazon.awssdk.regions.Region; import java.io.IOException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; public class HopsworksConnection extends HopsworksConnectionBase { @@ -37,7 +40,7 @@ public class HopsworksConnection extends HopsworksConnectionBase { public HopsworksConnection(String host, int port, String project, Region region, SecretStore secretStore, boolean hostnameVerification, String trustStorePath, String certPath, String apiKeyFilePath, String apiKeyValue) - throws IOException, FeatureStoreException { + throws IOException, FeatureStoreException, KeyStoreException, CertificateException, NoSuchAlgorithmException { this.host = host; this.port = port; this.project = getProjectName(project); @@ -54,10 +57,11 @@ public HopsworksConnection(String host, int port, String project, Region region, this.projectObj = getProject(); HopsworksClient.getInstance().setProject(this.projectObj); if (!System.getProperties().containsKey(HopsworksInternalClient.REST_ENDPOINT_SYS)) { - HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient(); - hopsworksHttpClient.setTrustStorePath(FlinkEngine.getInstance().getTrustStorePath()); - hopsworksHttpClient.setKeyStorePath(FlinkEngine.getInstance().getKeyStorePath()); - hopsworksHttpClient.setCertKey(HopsworksHttpClient.readCertKey(FlinkEngine.getInstance().getCertKey())); + Credentials credentials = HopsworksClient.getInstance().getCredentials(); + HopsworksHttpClient hopsworksHttpClient = HopsworksClient.getInstance().getHopsworksHttpClient(); + hopsworksHttpClient.setTrustStorePath(credentials.gettStore()); + hopsworksHttpClient.setKeyStorePath(credentials.getkStore()); + hopsworksHttpClient.setCertKey(credentials.getPassword()); HopsworksClient.getInstance().setHopsworksHttpClient(hopsworksHttpClient); } } diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java index 9e0645e96..aa6b07537 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java @@ -28,9 +28,6 @@ import lombok.Getter; import org.apache.avro.generic.GenericRecord; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.core.fs.Path; @@ -45,8 +42,6 @@ import java.util.Map; import java.util.Properties; -import static org.apache.flink.configuration.ConfigOptions.key; - public class FlinkEngine extends EngineBase { private static FlinkEngine INSTANCE = null; @@ -60,23 +55,6 @@ public static synchronized FlinkEngine getInstance() throws FeatureStoreExceptio @Getter private StreamExecutionEnvironment streamExecutionEnvironment; - private final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(); - private final ConfigOption keyStorePath = - key("flink.hadoop.hops.ssl.keystore.name") - .stringType() - .defaultValue("trustStore.jks") - .withDescription("path to keyStore.jks"); - private final ConfigOption trustStorePath = - key("flink.hadoop.hops.ssl.truststore.name") - .stringType() - .defaultValue("trustStore.jks") - .withDescription("path to trustStore.jks"); - private final ConfigOption materialPasswdPath = - key("flink.hadoop.hops.ssl.keystores.passwd.name") - .stringType() - .defaultValue("material_passwd") - .withDescription("path to material_passwd"); - private FlinkEngine() throws FeatureStoreException { streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); // Configure the streamExecutionEnvironment @@ -148,16 +126,4 @@ public Map getKafkaConfig(FeatureGroupBase featureGroup, Map