Skip to content

Commit

Permalink
[FSTORE-1628][APPEND] don't instantiate hudi when not necessary (#437)
Browse files Browse the repository at this point in the history
Co-authored-by: Fabio Buso <fabio@hopsworks.ai>
Co-authored-by: Fabio Buso <dev.siroibaf@gmail.com>
  • Loading branch information
3 people authored Dec 16, 2024
1 parent d937310 commit 9ecb873
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@

public class FeatureGroupEngine extends FeatureGroupEngineBase {

private HudiEngine hudiEngine = new HudiEngine();

/**
* Create the metadata and write the data to the online/offline feature store.
*
Expand Down Expand Up @@ -457,8 +455,8 @@ public FeatureGroupCommit commitDelete(FeatureGroupBase featureGroupBase, Datase
throw new FeatureStoreException("delete function is only valid for "
+ "time travel enabled feature group");
}
return hudiEngine.deleteRecord(SparkEngine.getInstance().getSparkSession(), featureGroupBase, genericDataset,
writeOptions);
return HudiEngine.getInstance().deleteRecord(SparkEngine.getInstance().getSparkSession(), featureGroupBase,
genericDataset, writeOptions);
}

public ExternalFeatureGroup saveExternalFeatureGroup(ExternalFeatureGroup externalFeatureGroup)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ public static void setInstance(SparkEngine sparkEngine) {
@Getter
private SparkSession sparkSession;

private HudiEngine hudiEngine = new HudiEngine();

private SparkEngine() {
sparkSession = SparkSession.builder()
.enableHiveSupport()
Expand Down Expand Up @@ -247,7 +245,7 @@ private Map<String, String> getOnDemandOptions(ExternalFeatureGroup externalFeat

public void registerHudiTemporaryTable(FeatureGroupAlias featureGroupAlias, Map<String, String> readOptions)
throws FeatureStoreException {
Map<String, String> hudiArgs = hudiEngine.setupHudiReadOpts(
Map<String, String> hudiArgs = HudiEngine.getInstance().setupHudiReadOpts(
featureGroupAlias.getLeftFeatureGroupStartTimestamp(),
featureGroupAlias.getLeftFeatureGroupEndTimestamp(),
readOptions);
Expand All @@ -258,7 +256,7 @@ public void registerHudiTemporaryTable(FeatureGroupAlias featureGroupAlias, Map<
.load(featureGroupAlias.getFeatureGroup().getLocation())
.createOrReplaceTempView(featureGroupAlias.getAlias());

hudiEngine.reconcileHudiSchema(sparkSession, featureGroupAlias, hudiArgs);
HudiEngine.getInstance().reconcileHudiSchema(sparkSession, featureGroupAlias, hudiArgs);
}

/**
Expand Down Expand Up @@ -661,7 +659,8 @@ public void writeOfflineDataframe(FeatureGroupBase featureGroup, Dataset<Row> da
throws IOException, FeatureStoreException, ParseException {

if (featureGroup.getTimeTravelFormat() == TimeTravelFormat.HUDI) {
hudiEngine.saveHudiFeatureGroup(sparkSession, featureGroup, dataset, operation, writeOptions, validationId);
HudiEngine.getInstance().saveHudiFeatureGroup(sparkSession, featureGroup, dataset, operation,
writeOptions, validationId);
} else {
writeSparkDataset(featureGroup, dataset, writeOptions);
}
Expand Down Expand Up @@ -790,7 +789,7 @@ private void setupAdlsConnectorHadoopConf(StorageConnector.AdlsConnector storage
public void streamToHudiTable(StreamFeatureGroup streamFeatureGroup, Map<String, String> writeOptions)
throws Exception {
writeOptions = getKafkaConfig(streamFeatureGroup, writeOptions);
hudiEngine.streamToHoodieTable(sparkSession, streamFeatureGroup, writeOptions);
HudiEngine.getInstance().streamToHoodieTable(sparkSession, streamFeatureGroup, writeOptions);
}

public List<Feature> parseFeatureGroupSchema(Dataset<Row> dataset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
import com.logicalclocks.hsfs.metadata.FeatureGroupApi;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.metadata.KafkaApi;

import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
import com.logicalclocks.hsfs.spark.FeatureGroup;
import com.logicalclocks.hsfs.spark.FeatureStore;
import com.logicalclocks.hsfs.spark.StreamFeatureGroup;
Expand Down Expand Up @@ -146,8 +144,19 @@ public class HudiEngine {
private FeatureGroupApi featureGroupApi = new FeatureGroupApi();
private FeatureGroupCommit fgCommitMetadata = new FeatureGroupCommit();
private DeltaStreamerConfig deltaStreamerConfig = new DeltaStreamerConfig();
private KafkaApi kafkaApi = new KafkaApi();
private StorageConnectorApi storageConnectorApi = new StorageConnectorApi();

protected static HudiEngine INSTANCE = null;

public static synchronized HudiEngine getInstance() {
if (INSTANCE == null) {
INSTANCE = new HudiEngine();
}
return INSTANCE;
}

  // Private constructor: forces all callers through getInstance() so the
  // engine is only instantiated when actually needed (lazy singleton).
  private HudiEngine() {
  }

public void saveHudiFeatureGroup(SparkSession sparkSession, FeatureGroupBase featureGroup,
Dataset<Row> dataset, HudiOperationType operation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void testSparkSchemasMatch() throws Exception {
.add("name", "string")
.add("age", "int");

HudiEngine engine = new HudiEngine();
HudiEngine engine = HudiEngine.getInstance();
Assertions.assertTrue(engine.sparkSchemasMatch(schema1.fieldNames(), schema2.fieldNames()));
}

Expand All @@ -49,7 +49,7 @@ void testSparkSchemasMatchFeatureMissing() throws Exception {
.add("id", "int")
.add("name", "string");

HudiEngine engine = new HudiEngine();
HudiEngine engine = HudiEngine.getInstance();
Assertions.assertFalse(engine.sparkSchemasMatch(schema1.fieldNames(), schema2.fieldNames()));
}

Expand All @@ -64,7 +64,7 @@ void testSparkSchemasMatchDifferentOrder() throws Exception {
.add("age", "int")
.add("name", "string");

HudiEngine engine = new HudiEngine();
HudiEngine engine = HudiEngine.getInstance();
Assertions.assertTrue(engine.sparkSchemasMatch(schema1.fieldNames(), schema2.fieldNames()));
}

Expand Down

0 comments on commit 9ecb873

Please sign in to comment.