Skip to content

Commit

Permalink
IGNITE-20682 [ducktests] Add the extention point to the rebalance tes…
Browse files Browse the repository at this point in the history
…ts (#11002)
  • Loading branch information
skorotkov authored Oct 23, 2023
1 parent 655eb28 commit a1c13de
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package org.apache.ignite.internal.ducktest.tests;

import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.ignite.IgniteCache;
Expand Down Expand Up @@ -54,6 +57,13 @@ public class DataGenerationApplication extends IgniteAwareApplication {
if (jsonNode.has("indexCount"))
idxCnt = jsonNode.get("indexCount").asInt();

byte[] dataPattern = null;

if (jsonNode.has("dataPatternBase64"))
dataPattern = Optional.ofNullable(jsonNode.get("dataPatternBase64").asText(null))
.map(b64 -> Base64.getDecoder().decode(b64))
.orElse(null);

markInitialized();

for (int i = 1; i <= cacheCnt; i++) {
Expand Down Expand Up @@ -82,7 +92,7 @@ public class DataGenerationApplication extends IgniteAwareApplication {

IgniteCache<Integer, BinaryObject> cache = ignite.getOrCreateCache(ccfg);

generateCacheData(cache.getName(), entrySize, from, to, idxCnt);
generateCacheData(cache.getName(), entrySize, from, to, idxCnt, dataPattern);
}

markFinished();
Expand All @@ -93,16 +103,38 @@ public class DataGenerationApplication extends IgniteAwareApplication {
* @param entrySize Entry size.
* @param from From key.
* @param to To key.
* @param dataPattern If not-null pattern is used to fill the entry data field.
* It is filled with random data otherwise.
*/
private void generateCacheData(String cacheName, int entrySize, int from, int to, int idxCnt) {
private void generateCacheData(String cacheName, int entrySize, int from, int to, int idxCnt, byte[] dataPattern) {
int flushEach = MAX_STREAMER_DATA_SIZE / entrySize + (MAX_STREAMER_DATA_SIZE % entrySize == 0 ? 0 : 1);
int logEach = (to - from) / 10;

BinaryObjectBuilder builder = ignite.binary().builder(VAL_TYPE);

byte[] data = new byte[entrySize];
byte[] data;

if (dataPattern != null) {
ByteArrayOutputStream buf = new ByteArrayOutputStream(entrySize);

int curPos = 0;

while (curPos < entrySize) {
int len = Math.min(dataPattern.length, entrySize - curPos);

buf.write(dataPattern, 0, len);

ThreadLocalRandom.current().nextBytes(data);
curPos += len;
}

assert buf.size() == entrySize;

data = buf.toByteArray();
}
else {
data = new byte[entrySize];
ThreadLocalRandom.current().nextBytes(data);
}

try (IgniteDataStreamer<Integer, BinaryObject> stmr = ignite.dataStreamer(cacheName)) {
for (int i = from; i < to; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@
from ignitetest.services.ignite import IgniteService
from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster
from ignitetest.tests.rebalance.util import start_ignite, get_result, TriggerEvent, NUM_NODES, \
await_rebalance_start, RebalanceParams
from ignitetest.tests.util import preload_data, DataGenerationParams
await_rebalance_start, BaseRebalanceTest
from ignitetest.tests.util import preload_data
from ignitetest.utils import cluster, ignite_versions
from ignitetest.utils.ignite_test import IgniteTest
from ignitetest.utils.version import DEV_BRANCH, LATEST


class RebalanceInMemoryTest(IgniteTest):
class RebalanceInMemoryTest(BaseRebalanceTest):
"""
Tests rebalance scenarios in in-memory mode.
"""
Expand Down Expand Up @@ -81,12 +80,12 @@ def __run(self, ignite_version, trigger_event,
:param throttle: rebalanceThrottle config property.
:return: Rebalance and data preload stats.
"""
reb_params = RebalanceParams(trigger_event=trigger_event, thread_pool_size=thread_pool_size,
batch_size=batch_size, batches_prefetch_count=batches_prefetch_count,
throttle=throttle)
reb_params = self.get_reb_params(trigger_event=trigger_event, thread_pool_size=thread_pool_size,
batch_size=batch_size, batches_prefetch_count=batches_prefetch_count,
throttle=throttle)

data_gen_params = DataGenerationParams(backups=backups, cache_count=cache_count, entry_count=entry_count,
entry_size=entry_size, preloaders=preloaders)
data_gen_params = self.get_data_gen_params(backups=backups, cache_count=cache_count, entry_count=entry_count,
entry_size=entry_size, preloaders=preloaders)

ignites = start_ignite(self.test_context, ignite_version, reb_params, data_gen_params)

Expand All @@ -100,7 +99,8 @@ def __run(self, ignite_version, trigger_event,
rebalance_nodes = ignites.nodes[:-1]
else:
ignite = IgniteService(self.test_context,
ignites.config._replace(discovery_spi=from_ignite_cluster(ignites)), num_nodes=1)
ignites.config._replace(discovery_spi=from_ignite_cluster(ignites)), num_nodes=1,
modules=reb_params.modules)
ignite.start()
rebalance_nodes = ignite.nodes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@
from ignitetest.services.utils.ignite_aware import IgniteAwareService
from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster
from ignitetest.tests.rebalance.util import NUM_NODES, start_ignite, TriggerEvent, \
get_result, check_type_of_rebalancing, await_rebalance_start, RebalanceParams
from ignitetest.tests.util import preload_data, DataGenerationParams
get_result, check_type_of_rebalancing, await_rebalance_start, BaseRebalanceTest
from ignitetest.tests.util import preload_data
from ignitetest.utils import cluster, ignite_versions
from ignitetest.utils.ignite_test import IgniteTest
from ignitetest.utils.version import DEV_BRANCH, LATEST


class RebalancePersistentTest(IgniteTest):
class RebalancePersistentTest(BaseRebalanceTest):
"""
Tests rebalance scenarios in persistent mode.
"""
Expand All @@ -45,12 +44,12 @@ def test_node_join(self, ignite_version, backups, cache_count, entry_count, entr
Tests rebalance on node join.
"""

reb_params = RebalanceParams(trigger_event=TriggerEvent.NODE_JOIN, thread_pool_size=thread_pool_size,
batch_size=batch_size, batches_prefetch_count=batches_prefetch_count,
throttle=throttle, persistent=True)
reb_params = self.get_reb_params(trigger_event=TriggerEvent.NODE_JOIN, thread_pool_size=thread_pool_size,
batch_size=batch_size, batches_prefetch_count=batches_prefetch_count,
throttle=throttle)

data_gen_params = DataGenerationParams(backups=backups, cache_count=cache_count, entry_count=entry_count,
entry_size=entry_size, preloaders=preloaders)
data_gen_params = self.get_data_gen_params(backups=backups, cache_count=cache_count, entry_count=entry_count,
entry_size=entry_size, preloaders=preloaders)

ignites = start_ignite(self.test_context, ignite_version, reb_params, data_gen_params)

Expand All @@ -64,7 +63,7 @@ def test_node_join(self, ignite_version, backups, cache_count, entry_count, entr
data_gen_params=data_gen_params)

new_node = IgniteService(self.test_context, ignites.config._replace(discovery_spi=from_ignite_cluster(ignites)),
num_nodes=1)
num_nodes=1, modules=reb_params.modules)
new_node.start()

control_utility.add_to_baseline(new_node.nodes)
Expand Down Expand Up @@ -93,12 +92,12 @@ def test_node_left(self, ignite_version, backups, cache_count, entry_count, entr
Tests rebalance on node left.
"""

reb_params = RebalanceParams(trigger_event=TriggerEvent.NODE_LEFT, thread_pool_size=thread_pool_size,
batch_size=batch_size, batches_prefetch_count=batches_prefetch_count,
throttle=throttle, persistent=True)
reb_params = self.get_reb_params(trigger_event=TriggerEvent.NODE_LEFT, thread_pool_size=thread_pool_size,
batch_size=batch_size, batches_prefetch_count=batches_prefetch_count,
throttle=throttle)

data_gen_params = DataGenerationParams(backups=backups, cache_count=cache_count, entry_count=entry_count,
entry_size=entry_size, preloaders=preloaders)
data_gen_params = self.get_data_gen_params(backups=backups, cache_count=cache_count, entry_count=entry_count,
entry_size=entry_size, preloaders=preloaders)

ignites = start_ignite(self.test_context, ignite_version, reb_params, data_gen_params)

Expand Down Expand Up @@ -142,15 +141,15 @@ def node_join_historical_test(self, ignite_version, backups, cache_count, entry_

preload_entries = 10_000

reb_params = RebalanceParams(trigger_event=TriggerEvent.NODE_JOIN, thread_pool_size=thread_pool_size,
batch_size=batch_size, batches_prefetch_count=batches_prefetch_count,
throttle=throttle, persistent=True,
jvm_opts=['-DIGNITE_PDS_WAL_REBALANCE_THRESHOLD=0',
'-DIGNITE_PREFER_WAL_REBALANCE=true']
)
reb_params = self.get_reb_params(trigger_event=TriggerEvent.NODE_JOIN, thread_pool_size=thread_pool_size,
batch_size=batch_size, batches_prefetch_count=batches_prefetch_count,
throttle=throttle,
jvm_opts=['-DIGNITE_PDS_WAL_REBALANCE_THRESHOLD=0',
'-DIGNITE_PREFER_WAL_REBALANCE=true']
)

data_gen_params = DataGenerationParams(backups=backups, cache_count=cache_count, entry_count=entry_count,
entry_size=entry_size, preloaders=preloaders)
data_gen_params = self.get_data_gen_params(backups=backups, cache_count=cache_count, entry_count=entry_count,
entry_size=entry_size, preloaders=preloaders)

ignites = start_ignite(self.test_context, ignite_version, reb_params, data_gen_params)

Expand All @@ -163,7 +162,9 @@ def node_join_historical_test(self, ignite_version, backups, cache_count, entry_
self.test_context,
preloader_config,
java_class_name="org.apache.ignite.internal.ducktest.tests.DataGenerationApplication",
params={"backups": 1, "cacheCount": 1, "entrySize": 1, "from": 0, "to": preload_entries}
modules=data_gen_params.modules,
params={"backups": 1, "cacheCount": 1, "entrySize": 1, "from": 0, "to": preload_entries,
"dataPatternBase64": data_gen_params.data_pattern_base64}
)

preloader.run()
Expand Down Expand Up @@ -202,6 +203,11 @@ def node_join_historical_test(self, ignite_version, backups, cache_count, entry_

return result

def get_reb_params(self, **kwargs):
return super().get_reb_params(**kwargs)._replace(
persistent=True
)


def await_and_check_rebalance(service: IgniteService, rebalance_nodes: list = None, is_full: bool = True):
"""
Expand Down
30 changes: 29 additions & 1 deletion modules/ducktests/tests/ignitetest/tests/rebalance/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from ignitetest.services.utils.ignite_configuration.data_storage import DataRegionConfiguration
from ignitetest.tests.util import DataGenerationParams
from ignitetest.utils.enum import constructible
from ignitetest.utils.ignite_test import IgniteTest
from ignitetest.utils.version import IgniteVersion

NUM_NODES = 4
Expand All @@ -54,6 +55,8 @@ class RebalanceParams(NamedTuple):
throttle: int = None
persistent: bool = False
jvm_opts: list = None
modules: list = []
plugins: list = []


class RebalanceMetrics(NamedTuple):
Expand Down Expand Up @@ -102,9 +105,13 @@ def start_ignite(test_context, ignite_version: str, rebalance_params: RebalanceP
rebalance_batches_prefetch_count=rebalance_params.batches_prefetch_count,
rebalance_throttle=rebalance_params.throttle)

if rebalance_params.plugins:
node_config = node_config._replace(plugins=[*node_config.plugins, *rebalance_params.plugins])

ignites = IgniteService(test_context, config=node_config,
num_nodes=node_count if rebalance_params.trigger_event else node_count - 1,
jvm_opts=rebalance_params.jvm_opts)
jvm_opts=rebalance_params.jvm_opts,
modules=rebalance_params.modules)
ignites.start()

return ignites
Expand Down Expand Up @@ -247,3 +254,24 @@ def check_type_of_rebalancing(rebalance_nodes: list, is_full: bool = True):
assert msg in i, i

return output


class BaseRebalanceTest(IgniteTest):
"""
Base class for rebalance tests.
"""
def get_reb_params(self, **kwargs):
"""
Create rebalance parameters.
:param kwargs: RebalanceParams cstor parameters.
:return: instance of RebalanceParams.
"""
return RebalanceParams(**kwargs)

def get_data_gen_params(self, **kwargs):
"""
Create parameters for data generation application.
:param kwargs: DataGenerationParams cstor parameters.
:return: instance of DataGenerationParams.
"""
return DataGenerationParams(**kwargs)
6 changes: 5 additions & 1 deletion modules/ducktests/tests/ignitetest/tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class DataGenerationParams(NamedTuple):
entry_size: int = 50_000
preloaders: int = 1
index_count: int = 0
data_pattern_base64: str = None
modules: list = []

@property
def data_region_max_size(self):
Expand Down Expand Up @@ -79,8 +81,10 @@ def start_app(_from, _to):
"entrySize": data_gen_params.entry_size,
"from": _from,
"to": _to,
"indexCount": data_gen_params.index_count
"indexCount": data_gen_params.index_count,
"dataPatternBase64": data_gen_params.data_pattern_base64
},
modules=data_gen_params.modules,
shutdown_timeout_sec=timeout)
app.start_async()

Expand Down

0 comments on commit a1c13de

Please sign in to comment.