From 8c8c919220dd910a6891dfd0795cef63b56974b6 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 30 Oct 2023 12:38:13 +0300 Subject: [PATCH] IGNITE-20746 Cache objects transformation never happen on TcpIgniteClient.putAllConflict() --- ...CacheObjectCompressionConsumptionTest.java | 26 ++++++++++++++++--- .../platform/client/ClientMessageParser.java | 4 +-- .../ClientCachePutAllConflictRequest.java | 4 +-- .../ClientCacheRemoveAllConflictRequest.java | 2 +- .../ClientDataStreamerAddDataRequest.java | 5 ++-- .../streamer/ClientDataStreamerReader.java | 7 ++--- .../ClientDataStreamerStartRequest.java | 5 ++-- .../platform/utils/PlatformUtils.java | 8 ++++-- 8 files changed, 42 insertions(+), 19 deletions(-) diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java index 38ce530e5308c..5aabe2baf2b75 100644 --- a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java +++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java @@ -19,7 +19,9 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.Function; import org.apache.commons.io.FileUtils; import org.apache.ignite.DataRegionMetrics; @@ -34,8 +36,11 @@ import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.client.thin.TcpClientCache; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.metric.LongMetric; @@ -285,7 +290,7 @@ private Consumption doTest(int cnt, Function keyGen, Function keyGen, Function> data = new HashMap<>(); + + GridCacheVersion otherVer = new GridCacheVersion(1, 1, 1, 0); + + data.put(key, new T3<>(val, otherVer, 0L)); + + ((TcpClientCache)cache).putAllConflict(data); + } assertEqualsArraysAware(cache.get(key), val); } @@ -335,7 +352,7 @@ private Consumption doTest(int cnt, Function keyGen, FunctionfindMetric(SENT_BYTES_METRIC_NAME).value(); clNet += reg.findMetric(RECEIVED_BYTES_METRIC_NAME).value(); - if (mode != ConsumptionTestMode.THIN_CLIENT) + if (mode != ConsumptionTestMode.THIN_CLIENT && mode != ConsumptionTestMode.THIN_CLIENT_INTERNAL_API) assertEquals(0, clNet); net += clNet; @@ -406,6 +423,9 @@ private enum ConsumptionTestMode { /** Thin client. */ THIN_CLIENT, + /** Thin client uses internal API. */ + THIN_CLIENT_INTERNAL_API, + /** Node + Persistent. */ PERSISTENT } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java index dbd62bc0e28ed..22cc257e660c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java @@ -627,10 +627,10 @@ public ClientListenerRequest decode(BinaryReaderExImpl reader) { return new ClientServiceGetDescriptorRequest(reader); case OP_DATA_STREAMER_START: - return new ClientDataStreamerStartRequest(reader); + return new ClientDataStreamerStartRequest(reader, ctx); case OP_DATA_STREAMER_ADD_DATA: - return new ClientDataStreamerAddDataRequest(reader); + return new ClientDataStreamerAddDataRequest(reader, ctx); case OP_ATOMIC_LONG_CREATE: return new ClientAtomicLongCreateRequest(reader); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java index ac3182b751b78..4701f654e6614 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java @@ -60,8 +60,8 @@ public ClientCachePutAllConflictRequest(BinaryReaderExImpl reader, ClientConnect map = new LinkedHashMap<>(cnt); for (int i = 0; i < cnt; i++) { - KeyCacheObject key = readCacheObject(reader, true); - CacheObject val = readCacheObject(reader, false); + KeyCacheObject key = readCacheObject(reader, true, ctx); + CacheObject val = readCacheObject(reader, false, ctx); GridCacheVersion ver = (GridCacheVersion)reader.readObjectDetached(); long expireTime = reader.readLong(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java index c6e1a35f1d25b..168585bb0992f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java @@ -51,7 +51,7 @@ public ClientCacheRemoveAllConflictRequest(BinaryReaderExImpl reader) { map = new LinkedHashMap<>(cnt); for (int i = 0; i < cnt; i++) { - KeyCacheObject key = readCacheObject(reader, true); + KeyCacheObject key = readCacheObject(reader, true, null); GridCacheVersion ver = (GridCacheVersion)reader.readObjectDetached(); map.put(key, ver); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerAddDataRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerAddDataRequest.java index 8e17a51a7eab8..9eaaec9e2e974 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerAddDataRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerAddDataRequest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.platform.client.streamer; import java.util.Collection; - import org.apache.ignite.internal.binary.BinaryReaderExImpl; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -48,12 +47,12 @@ public class ClientDataStreamerAddDataRequest extends ClientDataStreamerRequest * * @param reader Data reader. */ - public ClientDataStreamerAddDataRequest(BinaryReaderExImpl reader) { + public ClientDataStreamerAddDataRequest(BinaryReaderExImpl reader, ClientConnectionContext ctx) { super(reader); streamerId = reader.readLong(); flags = reader.readByte(); - entries = ClientDataStreamerReader.read(reader); + entries = ClientDataStreamerReader.read(reader, ctx); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java index 39a973df3c52b..da1ea65bef8cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java @@ -21,6 +21,7 @@ import java.util.Collection; import org.apache.ignite.internal.binary.BinaryReaderExImpl; import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; +import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; import static org.apache.ignite.internal.processors.platform.utils.PlatformUtils.readCacheObject; @@ -34,7 +35,7 @@ class ClientDataStreamerReader { * @param reader Data reader. * @return Streamer entry. */ - public static Collection read(BinaryReaderExImpl reader) { + public static Collection read(BinaryReaderExImpl reader, ClientConnectionContext ctx) { int entriesCnt = reader.readInt(); if (entriesCnt == 0) @@ -43,8 +44,8 @@ public static Collection read(BinaryReaderExImpl reader) { Collection entries = new ArrayList<>(entriesCnt); for (int i = 0; i < entriesCnt; i++) { - entries.add(new DataStreamerEntry(readCacheObject(reader, true), - readCacheObject(reader, false))); + entries.add(new DataStreamerEntry(readCacheObject(reader, true, ctx), + readCacheObject(reader, false, ctx))); } return entries; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java index b8cd231aeaa9d..7c96f423e6953 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.platform.client.streamer; import java.util.Collection; - import org.apache.ignite.IgniteException; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.internal.GridKernalContext; @@ -73,7 +72,7 @@ public class ClientDataStreamerStartRequest extends ClientDataStreamerRequest { * * @param reader Data reader. */ - public ClientDataStreamerStartRequest(BinaryReaderExImpl reader) { + public ClientDataStreamerStartRequest(BinaryReaderExImpl reader, ClientConnectionContext ctx) { super(reader); cacheId = reader.readInt(); @@ -82,7 +81,7 @@ public ClientDataStreamerStartRequest(BinaryReaderExImpl reader) { perThreadBufferSize = reader.readInt(); receiverObj = reader.readObjectDetached(); receiverPlatform = receiverObj == null ? 0 : reader.readByte(); - entries = ClientDataStreamerReader.read(reader); + entries = ClientDataStreamerReader.read(reader, ctx); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java index 8462a5d24feb0..94b99943d89e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java @@ -68,6 +68,7 @@ import org.apache.ignite.internal.processors.platform.PlatformExtendedException; import org.apache.ignite.internal.processors.platform.PlatformNativeException; import org.apache.ignite.internal.processors.platform.PlatformProcessor; +import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetServiceImpl; import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; @@ -1373,8 +1374,9 @@ else if (svcCfg instanceof ServiceConfiguration && svcCfg.getService() instanceo * * @param reader Reader. * @param isKey {@code True} if object is a key. + * @param ctx Client connection context. */ - public static T readCacheObject(BinaryReaderExImpl reader, boolean isKey) { + public static T readCacheObject(BinaryReaderExImpl reader, boolean isKey, ClientConnectionContext ctx) { BinaryInputStream in = reader.in(); int pos0 = in.position(); @@ -1393,7 +1395,9 @@ public static T readCacheObject(BinaryReaderExImpl reade byte[] objBytes = in.readByteArray(pos1 - pos0); - return isKey ? (T)new KeyCacheObjectImpl(obj, objBytes, -1) : (T)new CacheObjectImpl(obj, objBytes); + return isKey ? + (T)new KeyCacheObjectImpl(obj, objBytes, -1) : + (T)new CacheObjectImpl(obj, ctx.kernalContext().transformer() == null ? objBytes : null); } /**