diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java index 216fbf72d6279..e96d76d72c29f 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java @@ -17,9 +17,12 @@ package org.apache.ignite.internal.ducktest.tests; +import java.io.ByteArrayOutputStream; import java.util.ArrayList; +import java.util.Base64; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.ThreadLocalRandom; import com.fasterxml.jackson.databind.JsonNode; import org.apache.ignite.IgniteCache; @@ -54,6 +57,13 @@ public class DataGenerationApplication extends IgniteAwareApplication { if (jsonNode.has("indexCount")) idxCnt = jsonNode.get("indexCount").asInt(); + byte[] dataPattern = null; + + if (jsonNode.has("dataPatternBase64")) + dataPattern = Optional.ofNullable(jsonNode.get("dataPatternBase64").asText(null)) + .map(b64 -> Base64.getDecoder().decode(b64)) + .orElse(null); + markInitialized(); for (int i = 1; i <= cacheCnt; i++) { @@ -82,7 +92,7 @@ public class DataGenerationApplication extends IgniteAwareApplication { IgniteCache cache = ignite.getOrCreateCache(ccfg); - generateCacheData(cache.getName(), entrySize, from, to, idxCnt); + generateCacheData(cache.getName(), entrySize, from, to, idxCnt, dataPattern); } markFinished(); @@ -93,16 +103,38 @@ public class DataGenerationApplication extends IgniteAwareApplication { * @param entrySize Entry size. * @param from From key. * @param to To key. + * @param dataPattern If not-null pattern is used to fill the entry data field. + * It is filled with random data otherwise. */ - private void generateCacheData(String cacheName, int entrySize, int from, int to, int idxCnt) { + private void generateCacheData(String cacheName, int entrySize, int from, int to, int idxCnt, byte[] dataPattern) { int flushEach = MAX_STREAMER_DATA_SIZE / entrySize + (MAX_STREAMER_DATA_SIZE % entrySize == 0 ? 0 : 1); int logEach = (to - from) / 10; BinaryObjectBuilder builder = ignite.binary().builder(VAL_TYPE); - byte[] data = new byte[entrySize]; + byte[] data; + + if (dataPattern != null) { + ByteArrayOutputStream buf = new ByteArrayOutputStream(entrySize); + + int curPos = 0; + + while (curPos < entrySize) { + int len = Math.min(dataPattern.length, entrySize - curPos); + + buf.write(dataPattern, 0, len); - ThreadLocalRandom.current().nextBytes(data); + curPos += len; + } + + assert buf.size() == entrySize; + + data = buf.toByteArray(); + } + else { + data = new byte[entrySize]; + ThreadLocalRandom.current().nextBytes(data); + } try (IgniteDataStreamer stmr = ignite.dataStreamer(cacheName)) { for (int i = from; i < to; i++) { diff --git a/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py index d1213d65ffda2..9ad55243ce13a 100644 --- a/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py +++ b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py @@ -21,14 +21,13 @@ from ignitetest.services.ignite import IgniteService from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster from ignitetest.tests.rebalance.util import start_ignite, get_result, TriggerEvent, NUM_NODES, \ - await_rebalance_start, RebalanceParams -from ignitetest.tests.util import preload_data, DataGenerationParams + await_rebalance_start, BaseRebalanceTest +from ignitetest.tests.util import preload_data from ignitetest.utils import cluster, ignite_versions -from ignitetest.utils.ignite_test import IgniteTest from ignitetest.utils.version import DEV_BRANCH, LATEST -class RebalanceInMemoryTest(IgniteTest): +class RebalanceInMemoryTest(BaseRebalanceTest): """ Tests rebalance scenarios in in-memory mode. """ @@ -81,12 +80,12 @@ def __run(self, ignite_version, trigger_event, :param throttle: rebalanceThrottle config property. :return: Rebalance and data preload stats. """ - reb_params = RebalanceParams(trigger_event=trigger_event, thread_pool_size=thread_pool_size, - batch_size=batch_size, batches_prefetch_count=batches_prefetch_count, - throttle=throttle) + reb_params = self.get_reb_params(trigger_event=trigger_event, thread_pool_size=thread_pool_size, + batch_size=batch_size, batches_prefetch_count=batches_prefetch_count, + throttle=throttle) - data_gen_params = DataGenerationParams(backups=backups, cache_count=cache_count, entry_count=entry_count, - entry_size=entry_size, preloaders=preloaders) + data_gen_params = self.get_data_gen_params(backups=backups, cache_count=cache_count, entry_count=entry_count, + entry_size=entry_size, preloaders=preloaders) ignites = start_ignite(self.test_context, ignite_version, reb_params, data_gen_params) @@ -100,7 +99,8 @@ def __run(self, ignite_version, trigger_event, rebalance_nodes = ignites.nodes[:-1] else: ignite = IgniteService(self.test_context, - ignites.config._replace(discovery_spi=from_ignite_cluster(ignites)), num_nodes=1) + ignites.config._replace(discovery_spi=from_ignite_cluster(ignites)), num_nodes=1, + modules=reb_params.modules) ignite.start() rebalance_nodes = ignite.nodes diff --git a/modules/ducktests/tests/ignitetest/tests/rebalance/persistent_test.py b/modules/ducktests/tests/ignitetest/tests/rebalance/persistent_test.py index 1aba0d71e3f80..0599316d322d4 100644 --- a/modules/ducktests/tests/ignitetest/tests/rebalance/persistent_test.py +++ b/modules/ducktests/tests/ignitetest/tests/rebalance/persistent_test.py @@ -24,14 +24,13 @@ from ignitetest.services.utils.ignite_aware import IgniteAwareService from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster from ignitetest.tests.rebalance.util import NUM_NODES, start_ignite, TriggerEvent, \ - get_result, check_type_of_rebalancing, await_rebalance_start, RebalanceParams -from ignitetest.tests.util import preload_data, DataGenerationParams + get_result, check_type_of_rebalancing, await_rebalance_start, BaseRebalanceTest +from ignitetest.tests.util import preload_data from ignitetest.utils import cluster, ignite_versions -from ignitetest.utils.ignite_test import IgniteTest from ignitetest.utils.version import DEV_BRANCH, LATEST -class RebalancePersistentTest(IgniteTest): +class RebalancePersistentTest(BaseRebalanceTest): """ Tests rebalance scenarios in persistent mode. """ @@ -45,12 +44,12 @@ def test_node_join(self, ignite_version, backups, cache_count, entry_count, entr Tests rebalance on node join. """ - reb_params = RebalanceParams(trigger_event=TriggerEvent.NODE_JOIN, thread_pool_size=thread_pool_size, - batch_size=batch_size, batches_prefetch_count=batches_prefetch_count, - throttle=throttle, persistent=True) + reb_params = self.get_reb_params(trigger_event=TriggerEvent.NODE_JOIN, thread_pool_size=thread_pool_size, + batch_size=batch_size, batches_prefetch_count=batches_prefetch_count, + throttle=throttle) - data_gen_params = DataGenerationParams(backups=backups, cache_count=cache_count, entry_count=entry_count, - entry_size=entry_size, preloaders=preloaders) + data_gen_params = self.get_data_gen_params(backups=backups, cache_count=cache_count, entry_count=entry_count, + entry_size=entry_size, preloaders=preloaders) ignites = start_ignite(self.test_context, ignite_version, reb_params, data_gen_params) @@ -64,7 +63,7 @@ def test_node_join(self, ignite_version, backups, cache_count, entry_count, entr data_gen_params=data_gen_params) new_node = IgniteService(self.test_context, ignites.config._replace(discovery_spi=from_ignite_cluster(ignites)), - num_nodes=1) + num_nodes=1, modules=reb_params.modules) new_node.start() control_utility.add_to_baseline(new_node.nodes) @@ -93,12 +92,12 @@ def test_node_left(self, ignite_version, backups, cache_count, entry_count, entr Tests rebalance on node left. """ - reb_params = RebalanceParams(trigger_event=TriggerEvent.NODE_LEFT, thread_pool_size=thread_pool_size, - batch_size=batch_size, batches_prefetch_count=batches_prefetch_count, - throttle=throttle, persistent=True) + reb_params = self.get_reb_params(trigger_event=TriggerEvent.NODE_LEFT, thread_pool_size=thread_pool_size, + batch_size=batch_size, batches_prefetch_count=batches_prefetch_count, + throttle=throttle) - data_gen_params = DataGenerationParams(backups=backups, cache_count=cache_count, entry_count=entry_count, - entry_size=entry_size, preloaders=preloaders) + data_gen_params = self.get_data_gen_params(backups=backups, cache_count=cache_count, entry_count=entry_count, + entry_size=entry_size, preloaders=preloaders) ignites = start_ignite(self.test_context, ignite_version, reb_params, data_gen_params) @@ -142,15 +141,15 @@ def node_join_historical_test(self, ignite_version, backups, cache_count, entry_ preload_entries = 10_000 - reb_params = RebalanceParams(trigger_event=TriggerEvent.NODE_JOIN, thread_pool_size=thread_pool_size, - batch_size=batch_size, batches_prefetch_count=batches_prefetch_count, - throttle=throttle, persistent=True, - jvm_opts=['-DIGNITE_PDS_WAL_REBALANCE_THRESHOLD=0', - '-DIGNITE_PREFER_WAL_REBALANCE=true'] - ) + reb_params = self.get_reb_params(trigger_event=TriggerEvent.NODE_JOIN, thread_pool_size=thread_pool_size, + batch_size=batch_size, batches_prefetch_count=batches_prefetch_count, + throttle=throttle, + jvm_opts=['-DIGNITE_PDS_WAL_REBALANCE_THRESHOLD=0', + '-DIGNITE_PREFER_WAL_REBALANCE=true'] + ) - data_gen_params = DataGenerationParams(backups=backups, cache_count=cache_count, entry_count=entry_count, - entry_size=entry_size, preloaders=preloaders) + data_gen_params = self.get_data_gen_params(backups=backups, cache_count=cache_count, entry_count=entry_count, + entry_size=entry_size, preloaders=preloaders) ignites = start_ignite(self.test_context, ignite_version, reb_params, data_gen_params) @@ -163,7 +162,9 @@ def node_join_historical_test(self, ignite_version, backups, cache_count, entry_ self.test_context, preloader_config, java_class_name="org.apache.ignite.internal.ducktest.tests.DataGenerationApplication", - params={"backups": 1, "cacheCount": 1, "entrySize": 1, "from": 0, "to": preload_entries} + modules=data_gen_params.modules, + params={"backups": 1, "cacheCount": 1, "entrySize": 1, "from": 0, "to": preload_entries, + "dataPatternBase64": data_gen_params.data_pattern_base64} ) preloader.run() @@ -202,6 +203,11 @@ def node_join_historical_test(self, ignite_version, backups, cache_count, entry_ return result + def get_reb_params(self, **kwargs): + return super().get_reb_params(**kwargs)._replace( + persistent=True + ) + def await_and_check_rebalance(service: IgniteService, rebalance_nodes: list = None, is_full: bool = True): """ diff --git a/modules/ducktests/tests/ignitetest/tests/rebalance/util.py b/modules/ducktests/tests/ignitetest/tests/rebalance/util.py index 18ebb173c5a85..9a5351dd2388e 100644 --- a/modules/ducktests/tests/ignitetest/tests/rebalance/util.py +++ b/modules/ducktests/tests/ignitetest/tests/rebalance/util.py @@ -29,6 +29,7 @@ from ignitetest.services.utils.ignite_configuration.data_storage import DataRegionConfiguration from ignitetest.tests.util import DataGenerationParams from ignitetest.utils.enum import constructible +from ignitetest.utils.ignite_test import IgniteTest from ignitetest.utils.version import IgniteVersion NUM_NODES = 4 @@ -54,6 +55,8 @@ class RebalanceParams(NamedTuple): throttle: int = None persistent: bool = False jvm_opts: list = None + modules: list = [] + plugins: list = [] class RebalanceMetrics(NamedTuple): @@ -102,9 +105,13 @@ def start_ignite(test_context, ignite_version: str, rebalance_params: RebalanceP rebalance_batches_prefetch_count=rebalance_params.batches_prefetch_count, rebalance_throttle=rebalance_params.throttle) + if rebalance_params.plugins: + node_config = node_config._replace(plugins=[*node_config.plugins, *rebalance_params.plugins]) + ignites = IgniteService(test_context, config=node_config, num_nodes=node_count if rebalance_params.trigger_event else node_count - 1, - jvm_opts=rebalance_params.jvm_opts) + jvm_opts=rebalance_params.jvm_opts, + modules=rebalance_params.modules) ignites.start() return ignites @@ -247,3 +254,24 @@ def check_type_of_rebalancing(rebalance_nodes: list, is_full: bool = True): assert msg in i, i return output + + +class BaseRebalanceTest(IgniteTest): + """ + Base class for rebalance tests. + """ + def get_reb_params(self, **kwargs): + """ + Create rebalance parameters. + :param kwargs: RebalanceParams cstor parameters. + :return: instance of RebalanceParams. + """ + return RebalanceParams(**kwargs) + + def get_data_gen_params(self, **kwargs): + """ + Create parameters for data generation application. + :param kwargs: DataGenerationParams cstor parameters. + :return: instance of DataGenerationParams. + """ + return DataGenerationParams(**kwargs) diff --git a/modules/ducktests/tests/ignitetest/tests/util.py b/modules/ducktests/tests/ignitetest/tests/util.py index b8192a3443b5f..bc40cf860a4a7 100644 --- a/modules/ducktests/tests/ignitetest/tests/util.py +++ b/modules/ducktests/tests/ignitetest/tests/util.py @@ -35,6 +35,8 @@ class DataGenerationParams(NamedTuple): entry_size: int = 50_000 preloaders: int = 1 index_count: int = 0 + data_pattern_base64: str = None + modules: list = [] @property def data_region_max_size(self): @@ -79,8 +81,10 @@ def start_app(_from, _to): "entrySize": data_gen_params.entry_size, "from": _from, "to": _to, - "indexCount": data_gen_params.index_count + "indexCount": data_gen_params.index_count, + "dataPatternBase64": data_gen_params.data_pattern_base64 }, + modules=data_gen_params.modules, shutdown_timeout_sec=timeout) app.start_async()