Skip to content

Commit

Permalink
Check for DVRT compatibility on recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
elijahgrimaldi committed Nov 4, 2024
1 parent 8889ab3 commit 73bf831
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,6 @@ public DaVinciBackend(
}

if (backendConfig.isBlobTransferManagerEnabled()) {
if (recordTransformerFunction != null) {
throw new VeniceException("DaVinciRecordTransformer doesn't support blob transfer.");
}

blobTransferManager = BlobTransferUtil.getP2PBlobTransferManagerForDVCAndStart(
configLoader.getVeniceServerConfig().getDvcP2pBlobTransferServerPort(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.AbstractStorageIterator;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.exceptions.StorageInitializationException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.serializer.AvroGenericDeserializer;
import com.linkedin.venice.serializer.AvroSerializer;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Optional;
import org.apache.avro.Schema;


Expand Down Expand Up @@ -74,24 +74,20 @@ public final ByteBuffer prependSchemaIdToHeader(ByteBuffer valueBytes, int schem
/**
* @return true if transformer logic has changed since the last time the class was loaded
*/
public boolean hasTransformerLogicChanged(int classHash) {
public boolean hasTransformerLogicChanged(AbstractStorageEngine storageEngine, int partitionId, int classHash) {
try {
String classHashPath = String.format("./classHash-%d.txt", recordTransformer.getStoreVersion());
File f = new File(classHashPath);
if (f.exists()) {
try (BufferedReader br = new BufferedReader(new FileReader(classHashPath))) {
int storedClassHash = Integer.parseInt(br.readLine());
if (storedClassHash == classHash) {
return false;
}
}
}

try (FileWriter fw = new FileWriter(classHashPath)) {
fw.write(String.valueOf(classHash));
Optional<OffsetRecord> offsetRecord = storageEngine.getPartitionOffset(partitionId);
Integer offsetRecordClassHash = offsetRecord.map(OffsetRecord::getTransformerClassHash).orElse(null);
if (!Objects.equals(offsetRecordClassHash, classHash)) {
offsetRecord.ifPresent(record -> {
record.setTransformerClassHash(classHash);
storageEngine.putPartitionOffset(partitionId, record);
});
return true;
} else {
return false;
}
return true;
} catch (IOException e) {
} catch (IllegalArgumentException | StorageInitializationException e) {
throw new VeniceException("Failed to check if transformation logic has changed", e);
}
}
Expand All @@ -101,18 +97,17 @@ public boolean hasTransformerLogicChanged(int classHash) {
*/
public final void onRecovery(
AbstractStorageEngine storageEngine,
Integer partition,
int partitionId,
Lazy<VeniceCompressor> compressor) {
// ToDo: Store class hash in RocksDB to support blob transfer
int classHash = recordTransformer.getClassHash();
boolean transformerLogicChanged = hasTransformerLogicChanged(classHash);
boolean transformerLogicChanged = hasTransformerLogicChanged(storageEngine, partitionId, classHash);

if (!recordTransformer.getStoreRecordsInDaVinci() || transformerLogicChanged) {
// Bootstrap from VT
storageEngine.clearPartitionOffset(partition);
storageEngine.clearPartitionOffset(partitionId);
} else {
// Bootstrap from local storage
AbstractStorageIterator iterator = storageEngine.getIterator(partition);
AbstractStorageIterator iterator = storageEngine.getIterator(partitionId);
for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
byte[] keyBytes = iterator.key();
byte[] valueBytes = iterator.value();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.linkedin.davinci.transformer;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -18,8 +18,10 @@
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.AbstractStorageIterator;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.File;
import java.util.Optional;
import org.apache.avro.Schema;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -61,13 +63,22 @@ public void testRecordTransformer() {
recordTransformer.processDelete(lazyKey);

assertFalse(recordTransformer.getStoreRecordsInDaVinci());

AbstractStorageEngine storageEngine = mock(AbstractStorageEngine.class);
int classHash = recordTransformer.getClassHash();
int partitionNumber = 1;
OffsetRecord nullOffsetRecord = mock(OffsetRecord.class);
when(nullOffsetRecord.getTransformerClassHash()).thenReturn(null);

OffsetRecord matchingOffsetRecord = mock(OffsetRecord.class);
when(matchingOffsetRecord.getTransformerClassHash()).thenReturn(classHash);

DaVinciRecordTransformerUtility<Integer, String> recordTransformerUtility =
recordTransformer.getRecordTransformerUtility();
assertTrue(recordTransformerUtility.hasTransformerLogicChanged(classHash));
assertFalse(recordTransformerUtility.hasTransformerLogicChanged(classHash));
when(storageEngine.getPartitionOffset(partitionNumber)).thenReturn(Optional.of(nullOffsetRecord));
assertTrue(recordTransformerUtility.hasTransformerLogicChanged(storageEngine, partitionNumber, classHash));
verify(storageEngine, times(1)).putPartitionOffset(eq(partitionNumber), any(OffsetRecord.class));
when(storageEngine.getPartitionOffset(partitionNumber)).thenReturn(Optional.of(matchingOffsetRecord));
assertFalse(recordTransformerUtility.hasTransformerLogicChanged(storageEngine, partitionNumber, classHash));
}

@Test
Expand All @@ -86,15 +97,6 @@ public void testOnRecovery() {
int partitionNumber = 1;
recordTransformer.onRecovery(storageEngine, partitionNumber, compressor);
verify(storageEngine, times(1)).clearPartitionOffset(partitionNumber);

// Reset the mock to clear previous interactions
reset(storageEngine);

// Execute the onRecovery method again to test the case where the classHash file exists
when(storageEngine.getIterator(partitionNumber)).thenReturn(iterator);
recordTransformer.onRecovery(storageEngine, partitionNumber, compressor);
verify(storageEngine, never()).clearPartitionOffset(partitionNumber);
verify(storageEngine, times(1)).getIterator(partitionNumber);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,22 @@ CharSequence guidToUtf8(GUID guid) {
return new Utf8(GuidUtils.getCharSequenceFromGuid(guid));
}

public Integer getTransformerClassHash() {
return partitionState.getTransformerClassHash();
}

public void setTransformerClassHash(Integer classHash) {
this.partitionState.transformerClassHash = classHash;
}

@Override
public String toString() {
return "OffsetRecord{" + "localVersionTopicOffset=" + getLocalVersionTopicOffset() + ", upstreamOffset="
+ getPartitionUpstreamOffsetString() + ", leaderTopic=" + getLeaderTopic() + ", offsetLag=" + getOffsetLag()
+ ", eventTimeEpochMs=" + getMaxMessageTimeInMs() + ", latestProducerProcessingTimeInMs="
+ getLatestProducerProcessingTimeInMs() + ", isEndOfPushReceived=" + isEndOfPushReceived() + ", databaseInfo="
+ getDatabaseInfo() + ", realTimeProducerState=" + getRealTimeProducerState() + '}';
+ getDatabaseInfo() + ", realTimeProducerState=" + getRealTimeProducerState() + ", transformerClassHash="
+ getTransformerClassHash() + '}';
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@
"avro.java.string": "String"
},
"default": {}
},
{
"name": "transformerClassHash",
"doc": "An integer hash code of the DaVinci record transformer for compatibility checks during BLOB transfer",
"type": ["null", "int"],
"default": null
}
]
}

0 comments on commit 73bf831

Please sign in to comment.