Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
anton-vinogradov committed Jul 29, 2024
1 parent f81cde2 commit 1798ee4
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,18 @@ public void testResendConflictVersion() throws Exception {
assertTrue(GridTestUtils.waitForCondition(() -> cnsmr.events().size() == 3, 10_000, 100));

CdcEvent ev0 = cnsmr.events().get(0);
assertEquals(0, ev0.unwrappedKey());
assertEquals(0, ev0.unwrappedValue());
assertEquals(0, ev0.key());
assertEquals(0, ev0.value());
assertNull(ev0.version().otherClusterVersion());

CdcEvent ev1 = cnsmr.events().get(1);
assertEquals(0, ev1.unwrappedKey());
assertEquals(1, ev1.unwrappedValue());
assertEquals(0, ev1.key());
assertEquals(1, ev1.value());
assertEquals(conflict, ev1.version().otherClusterVersion());

CdcEvent ev2 = cnsmr.events().get(2);
assertEquals(0, ev2.unwrappedKey());
assertEquals(1, ev2.unwrappedValue());
assertEquals(0, ev2.key());
assertEquals(1, ev2.value());
assertNull(ev2.version().otherClusterVersion());
}

Expand Down
12 changes: 6 additions & 6 deletions modules/core/src/main/java/org/apache/ignite/cdc/CdcEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,32 +41,32 @@ public interface CdcEvent extends Serializable {
/**
* @return Key for the changed entry.
*/
public KeyCacheObject key();
public KeyCacheObject keyCacheObject();

/**
* @return Value for the changed entry or {@code null} in case of entry removal.
*/
@Nullable public CacheObject value();
@Nullable public CacheObject valueCacheObject();

/**
* @return Previous entry state metadata if expected.
*/
@Nullable public CacheObject previousStateMetadata();
@Nullable public CacheObject previousStateMetadataCacheObject();

/**
* @return Key which was placed into cache. Or null if failed to convert.
*/
public Object unwrappedKey();
public Object key();

/**
* @return Value which was placed into cache. Or null for delete operation or for failure.
*/
public Object unwrappedValue();
public Object value();

/**
* @return Previous entry state metadata.
*/
public Object unwrappedPreviousStateMetadata();
public Object previousStateMetadata();

/**
* @return {@code True} if event fired on primary node for partition containing this entry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,32 +49,32 @@ public CdcEventImpl(DataEntry entry) {
}

/** {@inheritDoc} */
@Override public Object unwrappedKey() {
@Override public Object key() {
return ((UnwrapDataEntry)(entry)).unwrappedKey();
}

/** {@inheritDoc} */
@Override public Object unwrappedValue() {
@Override public Object value() {
return ((UnwrapDataEntry)(entry)).unwrappedValue();
}

/** {@inheritDoc} */
@Override public Object unwrappedPreviousStateMetadata() {
@Override public Object previousStateMetadata() {
return ((UnwrapDataEntry)(entry)).unwrappedPreviousStateMetadata();
}

/** {@inheritDoc} */
@Override public KeyCacheObject key() {
@Override public KeyCacheObject keyCacheObject() {
return entry.key();
}

/** {@inheritDoc} */
@Override public CacheObject value() {
@Override public CacheObject valueCacheObject() {
return entry.value();
}

/** {@inheritDoc} */
@Override public @Nullable CacheObject previousStateMetadata() {
@Override public @Nullable CacheObject previousStateMetadataCacheObject() {
return entry.previousStateMetadata();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public abstract static class TestCdcConsumer<T> implements CdcConsumer {
return;

data.computeIfAbsent(
F.t(evt.value() == null ? DELETE : UPDATE, evt.cacheId()),
F.t(evt.valueCacheObject() == null ? DELETE : UPDATE, evt.cacheId()),
k -> new ArrayList<>()).add(extract(evt));

assertTrue(caches.containsKey(evt.cacheId()));
Expand Down Expand Up @@ -375,18 +375,18 @@ public static class UserCdcConsumer extends TestCdcConsumer<Integer> {
assertTrue(userTypeFound);
assertNull(evt.version().otherClusterVersion());

if (evt.value() == null)
if (evt.valueCacheObject() == null)
return;

User user = (User)evt.unwrappedValue();
User user = (User)evt.value();

assertTrue(user.getName().startsWith(JOHN));
assertTrue(user.getAge() >= 42);
}

/** {@inheritDoc} */
@Override public Integer extract(CdcEvent evt) {
return (Integer)evt.unwrappedKey();
return (Integer)evt.key();
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public void testPreviousStateMetadataWritten() throws Exception {
try {
readAll(new UserCdcConsumer() {
@Override public void checkEvent(CdcEvent evt) {
assertEquals(evt.unwrappedKey(), evt.unwrappedPreviousStateMetadata());
assertEquals(evt.key(), evt.previousStateMetadata());
}
}, true);
}
Expand Down Expand Up @@ -292,9 +292,9 @@ public void testReadExpireTime() throws Exception {
@Override public void checkEvent(CdcEvent evt) {
super.checkEvent(evt);

Integer key = (Integer)evt.unwrappedKey();
Integer key = (Integer)evt.key();

if (evt.value() == null || key % 2 != 0) {
if (evt.valueCacheObject() == null || key % 2 != 0) {
assertEquals("Expire time must not be set [key=" + key + ']', CU.EXPIRE_TIME_ETERNAL, evt.expireTime());

return;
Expand Down Expand Up @@ -443,7 +443,7 @@ public void testReadOneByOneForBackup() throws Exception {
if (!firstEvt.get())
throw new RuntimeException("Expected fail.");

data.add((Integer)evts.next().unwrappedKey());
data.add((Integer)evts.next().key());

firstEvt.set(false);

Expand Down Expand Up @@ -527,7 +527,7 @@ public void testReadFromNextEntry() throws Exception {

CdcEvent evt = evts.next();

assertEquals(expKey.get(), evt.unwrappedKey());
assertEquals(expKey.get(), evt.key());

expKey.incrementAndGet();

Expand Down Expand Up @@ -819,8 +819,8 @@ public void testReReadWhenStateWasNotStored() throws Exception {
continue;

data.computeIfAbsent(
F.t(evt.value() == null ? DELETE : UPDATE, evt.cacheId()),
k -> new ArrayList<>()).add((Integer)evt.unwrappedKey()
F.t(evt.valueCacheObject() == null ? DELETE : UPDATE, evt.cacheId()),
k -> new ArrayList<>()).add((Integer)evt.key()
);

if (consumeHalf.get())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,26 +229,26 @@ public static class BinaryCdcConsumer extends TestCdcConsumer<CdcEvent> {
assertTrue(userValType);
assertTrue(cityValType);

if (evt.value() == null)
if (evt.valueCacheObject() == null)
return;

if (evt.cacheId() == cacheId(USER)) {
int id = ((BinaryObject)evt.unwrappedKey()).field(ID);
int cityId = ((BinaryObject)evt.unwrappedKey()).field(CITY_ID);
int id = ((BinaryObject)evt.key()).field(ID);
int cityId = ((BinaryObject)evt.key()).field(CITY_ID);

assertEquals(42 * id, cityId);

String name = ((BinaryObject)evt.unwrappedValue()).field(NAME);
String name = ((BinaryObject)evt.value()).field(NAME);

if (id % 2 == 0)
assertTrue(name.startsWith(JOHN));
else
assertTrue(name.startsWith(SARAH));
}
else {
int id = (Integer)evt.unwrappedKey();
String name = ((BinaryObject)evt.unwrappedValue()).field(NAME);
String zipCode = ((BinaryObject)evt.unwrappedValue()).field(ZIP_CODE);
int id = (Integer)evt.key();
String name = ((BinaryObject)evt.value()).field(NAME);
String zipCode = ((BinaryObject)evt.value()).field(ZIP_CODE);

assertEquals(Integer.toString(127000 + id), zipCode);

Expand Down

0 comments on commit 1798ee4

Please sign in to comment.