Skip to content

Commit

Permalink
[common][consistency][specs] Add in diff comparison utility functions…
Browse files Browse the repository at this point in the history
… and leap frog specs (#1277)

This code has been committed in internal repos previously, and has been part of unsubmitted PR's in the past, but now we're finally getting around to using it in main OS.  So committing this part of the code as we advance on that project.
  • Loading branch information
ZacAttack authored Nov 4, 2024
1 parent af00080 commit e4fac90
Show file tree
Hide file tree
Showing 5 changed files with 746 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.linkedin.venice.utils.consistency;

import java.util.List;


public final class DiffValidationUtils {
private DiffValidationUtils() {
// not called
}

/**
* This method determines if a given record in a snapshot has been lost. It determines this by looking at the records
* ReplicationMetadata and comparing it to the highwatermark of the colo which did not have this record. If the colo
* which does not have this record has advanced beyond the offset vector held by the individual record, then we know that
* this record has been passed by.
*
* This function should not be used on any record which holds a tombstone (as the colo which is farther ahead will have
* purged out a record which currently holds a tombstone).
* @param existingValueOffsetVector
* @param nonExistentValuePartitionOffsetWatermark
* @return True if the record seems to be missing.
*/
static public boolean isRecordMissing(
List<Long> existingValueOffsetVector,
List<Long> nonExistentValuePartitionOffsetWatermark) {
if (!hasOffsetAdvanced(existingValueOffsetVector, nonExistentValuePartitionOffsetWatermark)) {
return false;
}
return true;
}

/**
* This method determines that if given the values for two equal keys in a snapshot of venice data, have the records diverged.
* It does this, by determining if each colo has received all data which is pertinent to coming to the current resolution
* held by each snapshot. In other words, does each colo have all the same information to come to the same conclusion,
* and do these conclusions diverge?
*
* This method DOES NOT catch divergence for 'missing' records. Missing records should be determined with {
* @link #isRecordMissing(List, List)}
*
* @param firstValueChecksum A checksum or hash that represents the value of the first record in this comparison.
* The firstValueChecksum should correspond to the secondValueChecksum with the same key.
* @param secondValueChecksum A checksum or hash that represents the value of the second record in this comparison.
* The secondValueChecksum should correspond to the firstValueChecksum with the same key.
* @param firstValuePartitionOffsetWatermark A list of offsets which give the highwatermark of remote consumption for the
* snapshot of the partition which held the firstValueChecksum. The list
* should be ordered with offsets that correspond to colo id's exactly as it's
* presented in the ReplicationMetadata of venice records.
* @param secondValuePartitionOffsetWatermark A list of offsets which give the highwatermark of remote consumption for the
* snapshot of the partition which held the secondValueChecksum. The list
* should be ordered with offsets that correspond to colo id's exactly as it's
* presented in the ReplicationMetadata of venice records.
* @param firstValueOffsetVector A list of offsets pulled from the ReplicationMetadata of the first record
* @param secondValueOffsetVector A list of offsets pulled from the ReplicationMetadata of the second record
* @return True if the data seems to have diverged
*/
static public boolean doRecordsDiverge(
String firstValueChecksum,
String secondValueChecksum,
List<Long> firstValuePartitionOffsetWatermark,
List<Long> secondValuePartitionOffsetWatermark,
List<Long> firstValueOffsetVector,
List<Long> secondValueOffsetVector) {

// Sanity check, it's possible metadata wasn't emitted, so give this a pass if any of these are blank
if (firstValuePartitionOffsetWatermark.isEmpty() || secondValuePartitionOffsetWatermark.isEmpty()
|| firstValueOffsetVector.isEmpty() || secondValueOffsetVector.isEmpty()) {
return false;
}

// If the values are the same, then these records do not diverge
if (firstValueChecksum.equals(secondValueChecksum)) {
return false;
}

// First, we need to determine if enough information has been broad casted. That is, the PartitionOffsetWatermarks
// for both values need to be greater then all component vector parts of the individual value offsets. E.g.:
// all entries in secondValuePartitionOffsetWatermark must be greater then all entries in firstValueOffsetVector and
// all the entries in the firstValuePartitionOffsetWatermark must be greater then the secondValueOffsetVector.
// we need not compare secondValueOffsetVector to secondValuePartitionOffsetWatermark
if (!hasOffsetAdvanced(firstValueOffsetVector, secondValuePartitionOffsetWatermark)) {
return false;
}

if (!hasOffsetAdvanced(secondValueOffsetVector, firstValuePartitionOffsetWatermark)) {
return false;
}

// At this time we know the following
// 1) the values are different
// 2) Both colos have received enough information to have seen all the pertinent events that caused
// the rows to converge in this way.
// These records should have converged, but have not. Therefore, they have diverged!

return true;
}

/**
* Checks to see if an offset vector has advanced completely beyond some base offset vector or not.
*
* @param baseOffset The vector to compare against.
* @param advancedOffset The vector has should be advanced along.
* @return True if the advancedOffset vector has grown beyond the baseOffset
*/
static public boolean hasOffsetAdvanced(List<Long> baseOffset, List<Long> advancedOffset) {
if (baseOffset.size() > advancedOffset.size()) {
// the baseoffset has more entries then the advanced one, meaning that it's seen entries from more colos
// meaning that it's automatically further along then the second argument. We break early to avoid any
// array out of bounds exception
return false;
}
for (int i = 0; i < baseOffset.size(); i++) {
if (advancedOffset.get(i) < baseOffset.get(i)) {
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package com.linkedin.venice.utils.consistency;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.testng.Assert;
import org.testng.annotations.Test;


public class DiffValidationUtilsTest {
@Test
public void testDoRecordsDiverge() {
List<Long> firstValueOffsetRecord = new ArrayList<>();
List<Long> secondValueOffsetRecord = new ArrayList<>();
List<Long> firstPartitionHighWaterMark = new ArrayList<>();
List<Long> secondPartitionHighWatermark = new ArrayList<>();

// metadata isn't populated in both colo's
Assert.assertFalse(
DiffValidationUtils.doRecordsDiverge(
"foo",
"bar",
Collections.EMPTY_LIST,
Collections.EMPTY_LIST,
Collections.EMPTY_LIST,
Collections.EMPTY_LIST));

// metadata isn't populated in first colo
Collections.addAll(secondPartitionHighWatermark, 10L, 20L, 1500L);
Collections.addAll(secondValueOffsetRecord, 3L, 0L);
Assert.assertFalse(
DiffValidationUtils.doRecordsDiverge(
"foo",
"bar",
Collections.EMPTY_LIST,
secondPartitionHighWatermark,
Collections.EMPTY_LIST,
secondValueOffsetRecord));

// metadata isn't populated in second colo
Collections.addAll(firstPartitionHighWaterMark, 10L, 20L, 1500L);
Collections.addAll(firstValueOffsetRecord, 3L, 0L);
Assert.assertFalse(
DiffValidationUtils.doRecordsDiverge(
"foo",
"bar",
firstPartitionHighWaterMark,
Collections.EMPTY_LIST,
firstValueOffsetRecord,
Collections.EMPTY_LIST));

// values are the same
Assert.assertFalse(
DiffValidationUtils.doRecordsDiverge(
"foo",
"foo",
firstPartitionHighWaterMark,
secondPartitionHighWatermark,
firstValueOffsetRecord,
secondValueOffsetRecord));

// Clean up
firstPartitionHighWaterMark.clear();
firstValueOffsetRecord.clear();
secondPartitionHighWatermark.clear();
firstPartitionHighWaterMark.clear();

// first colo is ahead completely
Collections.addAll(firstPartitionHighWaterMark, 20L, 40L, 1600L);
Collections.addAll(firstValueOffsetRecord, 20L, 40L);
Collections.addAll(secondPartitionHighWatermark, 10L, 20L, 1500L);
Collections.addAll(secondValueOffsetRecord, 3L, 0L);
Assert.assertFalse(
DiffValidationUtils.doRecordsDiverge(
"foo",
"bar",
firstPartitionHighWaterMark,
secondPartitionHighWatermark,
firstValueOffsetRecord,
secondValueOffsetRecord));
firstPartitionHighWaterMark.clear();
firstValueOffsetRecord.clear();
secondPartitionHighWatermark.clear();
secondValueOffsetRecord.clear();

// second colo is ahead completely
Collections.addAll(firstPartitionHighWaterMark, 10L, 20L, 1500L);
Collections.addAll(firstValueOffsetRecord, 3L, 0L);
Collections.addAll(secondPartitionHighWatermark, 20L, 40L, 1600L);
Collections.addAll(secondValueOffsetRecord, 20L, 39L);
Assert.assertFalse(
DiffValidationUtils.doRecordsDiverge(
"foo",
"bar",
firstPartitionHighWaterMark,
secondPartitionHighWatermark,
firstValueOffsetRecord,
secondValueOffsetRecord));
firstPartitionHighWaterMark.clear();
firstValueOffsetRecord.clear();
secondPartitionHighWatermark.clear();
secondValueOffsetRecord.clear();

// fist colo has a lagging colo
Collections.addAll(firstPartitionHighWaterMark, 10L, 20L, 1500L);
Collections.addAll(firstValueOffsetRecord, 3L, 0L);
Collections.addAll(secondPartitionHighWatermark, 10L, 40L, 1500L);
Collections.addAll(secondValueOffsetRecord, 10L, 25L);
Assert.assertFalse(
DiffValidationUtils.doRecordsDiverge(
"foo",
"bar",
firstPartitionHighWaterMark,
secondPartitionHighWatermark,
firstValueOffsetRecord,
secondValueOffsetRecord));
firstPartitionHighWaterMark.clear();
firstValueOffsetRecord.clear();
secondPartitionHighWatermark.clear();
secondValueOffsetRecord.clear();

// second colo has a lagging colo
Collections.addAll(firstPartitionHighWaterMark, 10L, 40L, 1500L);
Collections.addAll(firstValueOffsetRecord, 3L, 25L);
Collections.addAll(secondPartitionHighWatermark, 10L, 20L, 1500L);
Collections.addAll(secondValueOffsetRecord, 10L, 19L);
Assert.assertFalse(
DiffValidationUtils.doRecordsDiverge(
"foo",
"bar",
firstPartitionHighWaterMark,
secondPartitionHighWatermark,
firstValueOffsetRecord,
secondValueOffsetRecord));
firstPartitionHighWaterMark.clear();
firstValueOffsetRecord.clear();
secondPartitionHighWatermark.clear();
secondValueOffsetRecord.clear();

// records diverge
Collections.addAll(firstPartitionHighWaterMark, 10L, 40L, 1505L);
Collections.addAll(firstValueOffsetRecord, 3L, 25L);
Collections.addAll(secondPartitionHighWatermark, 10L, 40L, 1500L);
Collections.addAll(secondValueOffsetRecord, 10L, 19L);
Assert.assertTrue(
DiffValidationUtils.doRecordsDiverge(
"foo",
"bar",
firstPartitionHighWaterMark,
secondPartitionHighWatermark,
firstValueOffsetRecord,
secondValueOffsetRecord));

}
}
14 changes: 14 additions & 0 deletions specs/TLA+/LeapFrog/MCleapfrog.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
SPECIFICATION Spec
CONSTRAINTS
TerminateComparison
NoSuccessiveControlMessages
CONSTANTS
COLOS = {"lor1", "ltx1"}
KEYS = {"key1", "key2"}
VALUES = {"squirrel", "elephant"}
MAX_WRITES = 3

INVARIANT MaxDiameter
PROPERTIES
Safe
Live
17 changes: 17 additions & 0 deletions specs/TLA+/LeapFrog/MCleapfrog.tla
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
----------- MODULE MCleapfrog ----
EXTENDS TLC, leapfrog

(* Do not explore states that have more writes then whats been configured *)
TerminateComparison ==
\/ WRITES <= MAX_WRITES

(* Do not explore states where we're just running comparisons infinitely *)
NoSuccessiveControlMessages ==
/\ \A i,j \in 1..Len(WALs.coloA):
(j = i + 1 /\ WALs.coloA[i].key = "controlKey") => WALs.coloA[j].key # WALs.coloA[i].key
/\ \A i,j \in 1..Len(WALs.coloB):
(j = i + 1 /\ WALs.coloB[i].key = "controlKey") => WALs.coloB[j].key # WALs.coloB[i].key

(* INVARIANT meant to police state explosion (possible bug) *)
MaxDiameter == TLCGet("level") < 50
====
Loading

0 comments on commit e4fac90

Please sign in to comment.