diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/FeatureGroupEngine.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/FeatureGroupEngine.java index f791d8bcd..3fb73bd7c 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/FeatureGroupEngine.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/FeatureGroupEngine.java @@ -52,8 +52,6 @@ public class FeatureGroupEngine extends FeatureGroupEngineBase { - private HudiEngine hudiEngine = new HudiEngine(); - /** * Create the metadata and write the data to the online/offline feature store. * @@ -457,8 +455,8 @@ public FeatureGroupCommit commitDelete(FeatureGroupBase featureGroupBase, Datase throw new FeatureStoreException("delete function is only valid for " + "time travel enabled feature group"); } - return hudiEngine.deleteRecord(SparkEngine.getInstance().getSparkSession(), featureGroupBase, genericDataset, - writeOptions); + return HudiEngine.getInstance().deleteRecord(SparkEngine.getInstance().getSparkSession(), featureGroupBase, + genericDataset, writeOptions); } public ExternalFeatureGroup saveExternalFeatureGroup(ExternalFeatureGroup externalFeatureGroup) diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java index 1aed9a650..7c784d8d8 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java @@ -142,8 +142,6 @@ public static void setInstance(SparkEngine sparkEngine) { @Getter private SparkSession sparkSession; - private HudiEngine hudiEngine = new HudiEngine(); - private SparkEngine() { sparkSession = SparkSession.builder() .enableHiveSupport() @@ -247,7 +245,7 @@ private Map getOnDemandOptions(ExternalFeatureGroup externalFeat public void registerHudiTemporaryTable(FeatureGroupAlias featureGroupAlias, Map readOptions) throws FeatureStoreException { - Map hudiArgs = hudiEngine.setupHudiReadOpts( + Map hudiArgs = HudiEngine.getInstance().setupHudiReadOpts( featureGroupAlias.getLeftFeatureGroupStartTimestamp(), featureGroupAlias.getLeftFeatureGroupEndTimestamp(), readOptions); @@ -258,7 +256,7 @@ public void registerHudiTemporaryTable(FeatureGroupAlias featureGroupAlias, Map< .load(featureGroupAlias.getFeatureGroup().getLocation()) .createOrReplaceTempView(featureGroupAlias.getAlias()); - hudiEngine.reconcileHudiSchema(sparkSession, featureGroupAlias, hudiArgs); + HudiEngine.getInstance().reconcileHudiSchema(sparkSession, featureGroupAlias, hudiArgs); } /** @@ -661,7 +659,8 @@ public void writeOfflineDataframe(FeatureGroupBase featureGroup, Dataset da throws IOException, FeatureStoreException, ParseException { if (featureGroup.getTimeTravelFormat() == TimeTravelFormat.HUDI) { - hudiEngine.saveHudiFeatureGroup(sparkSession, featureGroup, dataset, operation, writeOptions, validationId); + HudiEngine.getInstance().saveHudiFeatureGroup(sparkSession, featureGroup, dataset, operation, + writeOptions, validationId); } else { writeSparkDataset(featureGroup, dataset, writeOptions); } @@ -790,7 +789,7 @@ private void setupAdlsConnectorHadoopConf(StorageConnector.AdlsConnector storage public void streamToHudiTable(StreamFeatureGroup streamFeatureGroup, Map writeOptions) throws Exception { writeOptions = getKafkaConfig(streamFeatureGroup, writeOptions); - hudiEngine.streamToHoodieTable(sparkSession, streamFeatureGroup, writeOptions); + HudiEngine.getInstance().streamToHoodieTable(sparkSession, streamFeatureGroup, writeOptions); } public List parseFeatureGroupSchema(Dataset dataset, diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/HudiEngine.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/HudiEngine.java index ee4a19c2b..224a66a84 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/HudiEngine.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/HudiEngine.java @@ -25,9 +25,7 @@ import com.logicalclocks.hsfs.engine.FeatureGroupUtils; import com.logicalclocks.hsfs.metadata.FeatureGroupApi; import com.logicalclocks.hsfs.FeatureGroupBase; -import com.logicalclocks.hsfs.metadata.KafkaApi; -import com.logicalclocks.hsfs.metadata.StorageConnectorApi; import com.logicalclocks.hsfs.spark.FeatureGroup; import com.logicalclocks.hsfs.spark.FeatureStore; import com.logicalclocks.hsfs.spark.StreamFeatureGroup; @@ -146,8 +144,19 @@ public class HudiEngine { private FeatureGroupApi featureGroupApi = new FeatureGroupApi(); private FeatureGroupCommit fgCommitMetadata = new FeatureGroupCommit(); private DeltaStreamerConfig deltaStreamerConfig = new DeltaStreamerConfig(); - private KafkaApi kafkaApi = new KafkaApi(); - private StorageConnectorApi storageConnectorApi = new StorageConnectorApi(); + + protected static HudiEngine INSTANCE = null; + + public static synchronized HudiEngine getInstance() { + if (INSTANCE == null) { + INSTANCE = new HudiEngine(); + } + return INSTANCE; + } + + // To make sure everyone uses getInstance + private HudiEngine() { + } public void saveHudiFeatureGroup(SparkSession sparkSession, FeatureGroupBase featureGroup, Dataset dataset, HudiOperationType operation, diff --git a/java/spark/src/test/java/com/logicalclocks/hsfs/spark/engine/TestHudiEngine.java b/java/spark/src/test/java/com/logicalclocks/hsfs/spark/engine/TestHudiEngine.java index f3f09efb5..8659b552b 100644 --- a/java/spark/src/test/java/com/logicalclocks/hsfs/spark/engine/TestHudiEngine.java +++ b/java/spark/src/test/java/com/logicalclocks/hsfs/spark/engine/TestHudiEngine.java @@ -35,7 +35,7 @@ void testSparkSchemasMatch() throws Exception { .add("name", "string") .add("age", "int"); - HudiEngine engine = new HudiEngine(); + HudiEngine engine = HudiEngine.getInstance(); Assertions.assertTrue(engine.sparkSchemasMatch(schema1.fieldNames(), schema2.fieldNames())); } @@ -49,7 +49,7 @@ void testSparkSchemasMatchFeatureMissing() throws Exception { .add("id", "int") .add("name", "string"); - HudiEngine engine = new HudiEngine(); + HudiEngine engine = HudiEngine.getInstance(); Assertions.assertFalse(engine.sparkSchemasMatch(schema1.fieldNames(), schema2.fieldNames())); } @@ -64,7 +64,7 @@ void testSparkSchemasMatchDifferentOrder() throws Exception { .add("age", "int") .add("name", "string"); - HudiEngine engine = new HudiEngine(); + HudiEngine engine = HudiEngine.getInstance(); Assertions.assertTrue(engine.sparkSchemasMatch(schema1.fieldNames(), schema2.fieldNames())); }