diff --git a/perfkitbenchmarker/linux_benchmarks/cassandra_stress_benchmark.py b/perfkitbenchmarker/linux_benchmarks/cassandra_stress_benchmark.py
index ed6157be0..7f611478c 100644
--- a/perfkitbenchmarker/linux_benchmarks/cassandra_stress_benchmark.py
+++ b/perfkitbenchmarker/linux_benchmarks/cassandra_stress_benchmark.py
@@ -12,11 +12,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""Runs cassandra.
+"""Runs cassandra-stress on a cluster of Cassandra servers.
 
+Cassandra is a distributed, open source, NoSQL database management system.
 Cassandra homepage: http://cassandra.apache.org
 cassandra-stress tool page:
 http://docs.datastax.com/en/cassandra/2.1/cassandra/tools/toolsCStress_t.html
+
+We use cassandra-stress to test the performance of the Cassandra cluster
+under different workloads at varied load. The benchmark runs in two phases:
+1. Preload phase: the Cassandra cluster is preloaded with a set of keys.
+2. Run phase: the cluster is exercised with the specified workload, which
+can be read only, write only, or mixed read and write. The benchmark can be
+configured to repeat the test up to --cassandra_stress_max_runs times,
+adjusting the number of threads between runs to find the maximum op rate.
 """
 
 import collections
@@ -25,6 +35,7 @@
 import logging
 import math
 import posixpath
+import re
 import time
 from typing import Union
@@ -35,14 +46,16 @@
 from perfkitbenchmarker import errors
 from perfkitbenchmarker import regex_util
 from perfkitbenchmarker import sample
+from perfkitbenchmarker import units
 from perfkitbenchmarker import vm_util
 from perfkitbenchmarker.linux_packages import cassandra
 
 NUM_KEYS_PER_CORE = 2000000
 # Adding wait between prefill and the test workload to give some time
-# for the data to propage and for the cluster to stabilize.
+# for the data to propagate and for the cluster to stabilize.
 PROPAGATION_WAIT_TIME = 720
+WAIT_BETWEEN_COMPACTION_TASKS_CHECK = 720
 
 # cassandra-stress command
 WRITE_COMMAND = 'write'
@@ -81,12 +94,12 @@
 )
 
 # Options for cassandra-stress
-flags.DEFINE_integer(
-    'num_keys',
+CASSANDRA_STRESS_NUM_KEYS = flags.DEFINE_integer(
+    'cassandra_stress_num_keys',
     0,
-    'Number of keys used in cassandra-stress tool across '
-    'all loader vms. If unset, this benchmark will use '
-    '%s * NumCpusForBenchmark() on data nodes as the value.'
+    'Number of keys used in cassandra-stress tool across all loader vms. If'
+    ' unset, this benchmark will use %s * NumCpusForBenchmark() on data nodes'
+    ' as the value. Ignored if --cassandra_stress_run_duration is set.'
    % NUM_KEYS_PER_CORE,
 )
@@ -113,6 +126,17 @@
     'Number of retries when error encountered during stress.',
 )
 
+CASSANDRA_STRESS_PRELOAD_THREADS = flags.DEFINE_integer(
+    'cassandra_stress_preload_thread_count',
+    300,
+    'Number of threads to use for preloading.',
+)
+CASSANDRA_STRESS_RUN_DURATION = flags.DEFINE_string(
+    'cassandra_stress_run_duration',
+    None,
+    'Duration of the cassandra-stress run. Use m, s and h as units. Overrides'
+    ' --cassandra_stress_num_keys.',
+)
 IS_ROW_CACHE_ENABLED = flags.DEFINE_bool(
     'is_row_cache_enabled',
     False,
@@ -137,6 +161,16 @@
     [],
     'zones to launch the clients for the benchmark in. ',
 )
+CASSANDRA_CPU_UTILIZATION_LIMIT = flags.DEFINE_integer(
+    'cassandra_cpu_utilization_limit',
+    70,
+    'Maximum cpu utilization percentage for the benchmark.',
+)
+CASSANDRA_STRESS_MAX_RUNS = flags.DEFINE_integer(
+    'cassandra_stress_max_runs',
+    6,
+    'Max number of times to run cassandra-stress to optimize performance.',
+)
 # Use "./cassandra-stress help -pop" to get more details.
 # [dist=DIST(?)]: Seeds are selected from this distribution
 #  EXP(min..max):
@@ -290,8 +324,17 @@
     'Total partitions',
     'Total errors',
 })
-# Maximum value will be choisen between client vms.
+# Maximum value will be chosen between client vms.
 MAXIMUM_METRICS = {'latency max'}
+THREAD_INCREMENT_COUNT = 50
+MAX_MEDIAN_LATENCY_MS = 20
+MAX_ACCEPTED_COMPACTION_TIME = 30
+STARTING_THREAD_COUNT = 25
+SAR_CPU_UTILIZATION_INTERVAL = 10
+
+
+class CassandraCompactionNotCompletedError(Exception):
+  """Raised when cassandra compaction has not completed in time."""
 
 
 def GetConfig(user_config):
@@ -399,12 +442,12 @@ def GenerateMetadataFromFlags(benchmark_spec, cassandra_vms, client_vms):
   """
   vm_dict = benchmark_spec.vm_groups
   metadata = {}
-  if not FLAGS.num_keys:
+  if not CASSANDRA_STRESS_NUM_KEYS.value:
     metadata['num_keys'] = (
         NUM_KEYS_PER_CORE * vm_dict[CASSANDRA_GROUP][0].NumCpusForBenchmark()
     )
   else:
-    metadata['num_keys'] = FLAGS.num_keys
+    metadata['num_keys'] = CASSANDRA_STRESS_NUM_KEYS.value
 
   if FLAGS['cassandra_stress_preload_num_keys'].present:
     metadata['num_preload_keys'] = FLAGS.cassandra_stress_preload_num_keys
@@ -419,6 +462,9 @@ def GenerateMetadataFromFlags(benchmark_spec, cassandra_vms, client_vms):
       'num_data_nodes': len(cassandra_vms),
       'num_loader_nodes': len(client_vms),
       'num_cassandra_stress_threads': FLAGS.num_cassandra_stress_threads,
+      'num_cassandra_stress_preload_threads': (
+          CASSANDRA_STRESS_PRELOAD_THREADS.value
+      ),
       'command': FLAGS.cassandra_stress_command,
       'consistency_level': FLAGS.cassandra_stress_consistency_level,
       'retries': FLAGS.cassandra_stress_retries,
@@ -431,6 +477,7 @@ def GenerateMetadataFromFlags(benchmark_spec, cassandra_vms, client_vms):
       ),
       'is_row_cache_enabled': FLAGS.is_row_cache_enabled,
       'row_cache_size': FLAGS.row_cache_size,
+      'duration': CASSANDRA_STRESS_RUN_DURATION.value,
   })
 
   if FLAGS.cassandra_stress_command == USER_COMMAND:
@@ -473,6 +520,8 @@ def PreloadCassandraServer(cassandra_vms, client_vms, metadata):
       client_vms,
       metadata['num_preload_keys'],
       cassandra_stress_command,
+      CASSANDRA_STRESS_PRELOAD_THREADS.value,
+      is_preload=True,
   )
   logging.info('Waiting %s for keyspace to propagate.', PROPAGATION_WAIT_TIME)
   time.sleep(PROPAGATION_WAIT_TIME)
@@ -511,6 +560,10 @@ def Prepare(benchmark_spec):
   background_tasks.RunThreaded(
       lambda vm: vm.Install('cassandra_stress'), client_vms
   )
+  # Needed to run sar for cpu utilization collection.
+  background_tasks.RunThreaded(
+      lambda vm: vm.InstallPackages('sysstat'), cassandra_vms + client_vms
+  )
   seed_vm = cassandra_vms[0]
   configure = functools.partial(cassandra.Configure, seed_vms=[seed_vm])
   background_tasks.RunThreaded(configure, cassandra_vms)
@@ -527,6 +580,7 @@ def Prepare(benchmark_spec):
       seed_vm, replication_factor=FLAGS.cassandra_replication_factor
   )
   PreloadCassandraServer(cassandra_vms, client_vms, metadata)
+  WaitForCompactionTasks(cassandra_vms)
 
 
 def _ResultFilePath(vm):
@@ -543,6 +597,8 @@ def RunTestOnLoader(
     population_per_vm,
     population_dist,
     population_params,
+    thread_count,
+    is_preload,
 ):
   """Run Cassandra-stress test on loader node.
 
@@ -557,6 +613,8 @@ def RunTestOnLoader(
     population_per_vm: integer. Population per loader vm.
     population_dist: string. The population distribution.
     population_params: string. Representing additional population parameters.
+    thread_count: integer. The number of threads to use for cassandra-stress.
+    is_preload: boolean. Whether this is a preload run.
   """
   if command == USER_COMMAND:
     command += r' profile={profile} ops\({ops}\)'.format(
@@ -590,21 +648,31 @@ def RunTestOnLoader(
     )
   else:
     population_dist = '-pop seq=%s' % population_params
+  duration = ''
+  num_keys_parameter = 'n={num_keys}'.format(num_keys=operations_per_vm)
+  # Duration specifies how long to run the test. If it is set, we don't need
+  # to specify the number of keys.
+  if not is_preload and CASSANDRA_STRESS_RUN_DURATION.value:
+    duration = r'duration={duration}'.format(
+        duration=CASSANDRA_STRESS_RUN_DURATION.value
+    )
+    num_keys_parameter = ''
   vm.RobustRemoteCommand(
-      'sudo {cassandra_stress_command} {command} cl={consistency_level}'
-      ' n={num_keys} -node {nodes} {schema} {population_dist} -log'
-      ' file={result_file} -rate threads={threads} -errors retries={retries}'
-      .format(
+      'sudo {cassandra_stress_command} {command} {duration}'
+      ' cl={consistency_level} {num_keys_parameter} -node {nodes} {schema}'
+      ' {population_dist} -log file={result_file} -rate threads={threads}'
+      ' -errors retries={retries}'.format(
           cassandra_stress_command=cassandra.GetCassandraStressPath(vm),
           command=command,
           consistency_level=FLAGS.cassandra_stress_consistency_level,
-          num_keys=operations_per_vm,
+          num_keys_parameter=num_keys_parameter,
           nodes=','.join(data_node_ips),
           schema=schema_option,
           population_dist=population_dist,
           result_file=_ResultFilePath(vm),
           retries=FLAGS.cassandra_stress_retries,
-          threads=FLAGS.num_cassandra_stress_threads,
+          threads=int(thread_count),
+          duration=duration,
       )
   )
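Depending on whether --cassandra_stress_run_duration is set, the command assembled in RunTestOnLoader takes one of two shapes. Below is a minimal sketch of the rendered command lines (schema and population options omitted for brevity; the binary path, node IPs, log path and counts are invented placeholders, not PKB output):

# Sketch only: every concrete value below is a placeholder.
template = (
    'sudo {stress} {command} {duration} cl={cl} {num_keys} -node {nodes}'
    ' -log file={log} -rate threads={threads} -errors retries={retries}'
)

# Duration unset: a fixed number of operations is requested via n=.
print(template.format(
    stress='/opt/pkb/cassandra/tools/bin/cassandra-stress', command='mixed',
    duration='', cl='QUORUM', num_keys='n=2000000',
    nodes='10.0.0.2,10.0.0.3', log='/tmp/pkb/run.log', threads=25,
    retries=1000))

# --cassandra_stress_run_duration=10m: duration= replaces n= entirely.
print(template.format(
    stress='/opt/pkb/cassandra/tools/bin/cassandra-stress', command='mixed',
    duration='duration=10m', cl='QUORUM', num_keys='',
    nodes='10.0.0.2,10.0.0.3', log='/tmp/pkb/run.log', threads=25,
    retries=1000))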
""" num_loaders = len(loader_vms) data_node_ips = [vm.internal_ip for vm in cassandra_vms] @@ -645,24 +720,72 @@ def RunCassandraStressTest( operations_per_vm, ) logging.info('Executing the benchmark.') - args = [ - ( - ( - loader_vms[i], - i, - operations_per_vm, - data_node_ips, - command, - profile_operations, - population_per_vm, - population_dist, - population_params, - ), - {}, - ) - for i in range(0, num_loaders) - ] - background_tasks.RunThreaded(RunTestOnLoader, args) + tasks = [] + for i in range(0, len(loader_vms)): + tasks.append(( + RunTestOnLoader, + [ + loader_vms[i], + i, + operations_per_vm, + data_node_ips, + command, + profile_operations, + population_per_vm, + population_dist, + population_params, + thread_count, + is_preload, + ], + {}, + )) + if not is_preload: + for vm in cassandra_vms: + tasks.append((CPUUtilizationReporting, [vm], {})) + background_tasks.RunParallelThreads(tasks, max_concurrency=10) + return background_tasks.RunThreaded(GetLoadAverage, cassandra_vms) + + +def CPUUtilizationReporting(vm): + # command : sar -u + # we are collection the data every SAR_CPU_UTILIZATION_INTERVAL seconds for + # the duration of the test + vm.RobustRemoteCommand( + f'sar -u {SAR_CPU_UTILIZATION_INTERVAL}' + f' {CalculateNumberOfSarRequestsFromDuration(CASSANDRA_STRESS_RUN_DURATION.value, SAR_CPU_UTILIZATION_INTERVAL)}' + f' > {GenerateCpuUtilizationFileName(vm)}' + ) + + +def ParseAverageCpuUtilization(output) -> float: + """Parses the output of the sar command.""" + average_cpu_utilization = re.findall( + r'^Average.*$', output, flags=re.MULTILINE + ) + if not average_cpu_utilization: + logging.error('No average cpu utilization found in sar output.') + return 0 + per_process_cpu_utilization = re.sub( + ' +', ' ', average_cpu_utilization[0] + ).split(' ') + return float(per_process_cpu_utilization[2].strip()) + + +def CalculateNumberOfSarRequestsFromDuration(duration, freq): + """Calculates the number of sar requests to be sent from the duration of the test.""" + if duration is None: + # If duration is not set, we don't need to send sar requests. + return 0 + duration = duration.replace('m', 'min') # In units, m is meter + quantity = units.ParseExpression(duration) + seconds = quantity.m_as(units.second) + return int(seconds / freq) + + +def GetLoadAverage(vm): + stdout, _ = vm.RemoteCommand('uptime') + load_last_5m = ParseUptimeOutput(stdout) + return load_last_5m def ParseResp(resp) -> dict[str, Union[float, int]]: @@ -769,6 +892,10 @@ def CollectResults(loader_vms, metadata): return results +def GenerateCpuUtilizationFileName(vm): + return f'{vm_util.VM_TMP_DIR}/{vm.name}-cpu_utilization.log' + + def Run(benchmark_spec): """Run Cassandra on target vms. @@ -786,17 +913,170 @@ def Run(benchmark_spec): metadata['cassandra_version'] = cassandra.GetCassandraVersion( benchmark_spec.vm_groups[CASSANDRA_GROUP][0] ) - RunCassandraStressTest( - cassandra_vms, - client_vms, - metadata['num_keys'], - metadata['command'], - metadata.get('operations'), - metadata['population_size'], - metadata['population_dist'], - metadata['population_parameters'], + return RunTestNTimes(client_vms, cassandra_vms, metadata) + + +def RunTestNTimes(client_vms, cassandra_vms, metadata): + """Run the cassandra stress test max_allowed_runs times. + + Args: + client_vms: client vms. + cassandra_vms: cassandra server vms. + metadata: dict. Contains metadata for this benchmark. + + Returns: + A list of sample.Sample objects. + + Running cassandra stress test with different thread counts. 
@@ -769,6 +892,10 @@ def CollectResults(loader_vms, metadata):
   return results
 
 
+def GenerateCpuUtilizationFileName(vm):
+  """Returns the path of the sar log file for the vm."""
+  return f'{vm_util.VM_TMP_DIR}/{vm.name}-cpu_utilization.log'
+
+
 def Run(benchmark_spec):
   """Run Cassandra on target vms.
 
@@ -786,17 +913,170 @@ def Run(benchmark_spec):
   metadata['cassandra_version'] = cassandra.GetCassandraVersion(
       benchmark_spec.vm_groups[CASSANDRA_GROUP][0]
   )
-  RunCassandraStressTest(
-      cassandra_vms,
-      client_vms,
-      metadata['num_keys'],
-      metadata['command'],
-      metadata.get('operations'),
-      metadata['population_size'],
-      metadata['population_dist'],
-      metadata['population_parameters'],
-  )
-  return CollectResults(client_vms, metadata)
+  return RunTestNTimes(client_vms, cassandra_vms, metadata)
+
+
+def RunTestNTimes(client_vms, cassandra_vms, metadata):
+  """Runs the cassandra-stress test up to --cassandra_stress_max_runs times.
+
+  Each run uses a different thread count:
+  - The thread count is increased by THREAD_INCREMENT_COUNT as long as the
+    op rate keeps improving.
+  - The thread count is decreased by THREAD_INCREMENT_COUNT/2 when the op
+    rate drops below the previous run's, or when median latency or server
+    cpu utilization exceeds its limit.
+
+  Args:
+    client_vms: client vms.
+    cassandra_vms: cassandra server vms.
+    metadata: dict. Contains metadata for this benchmark.
+
+  Returns:
+    A list of sample.Sample objects.
+  """
+  samples = []
+  last_operation_rate = 0
+  run_count = 0
+  thread_count = STARTING_THREAD_COUNT
+  max_op_rate = 0
+  max_op_rate_metadata = None
+  while run_count < CASSANDRA_STRESS_MAX_RUNS.value:
+    logging.info('Running with thread count: %s', thread_count)
+    run_count += 1
+    cpu_loads = RunCassandraStressTest(
+        cassandra_vms,
+        client_vms,
+        metadata['num_keys'],
+        metadata['command'],
+        thread_count,
+        metadata.get('operations'),
+        metadata['population_size'],
+        metadata['population_dist'],
+        metadata['population_parameters'],
+        is_preload=False,
+    )
+    cpu_utilization = GetCpuUtilization(cassandra_vms)
+    metadata['server_load_cpu_utilization'] = [
+        cpu_loads[i] / cassandra_vms[i].num_cpus * 100
+        for i in range(len(cassandra_vms))
+    ]
+    metadata['server_cpu_utilization'] = cpu_utilization
+    metadata['num_cassandra_stress_threads'] = thread_count
+    # Tracking disk and memory usage for auditing and debugging purposes.
+    for vm in cassandra_vms:
+      vm.RemoteCommand('df -H')
+      vm.RemoteCommand('free -h')
+    current_samples = CollectResults(client_vms, copy.deepcopy(metadata))
+    samples.extend(current_samples)
+    latest_operation_rate = GetOperationRate(current_samples)
+    median_latency = GetMedianLatency(current_samples)
+    max_cpu_usage = max(cpu_utilization) if cpu_utilization else 0
+
+    if max_op_rate < latest_operation_rate:
+      max_op_rate = latest_operation_rate
+      max_op_rate_metadata = copy.deepcopy(metadata)
+
+    if (
+        int(median_latency) >= MAX_MEDIAN_LATENCY_MS
+        or max_cpu_usage > CASSANDRA_CPU_UTILIZATION_LIMIT.value
+    ):
+      next_thread_count = thread_count - THREAD_INCREMENT_COUNT / 2
+    elif latest_operation_rate > last_operation_rate:
+      next_thread_count = thread_count + THREAD_INCREMENT_COUNT
+    else:
+      next_thread_count = thread_count - THREAD_INCREMENT_COUNT / 2
+
+    last_operation_rate = latest_operation_rate
+    thread_count = next_thread_count
+    WaitForCompactionTasks(cassandra_vms)
+  samples.append(
+      sample.Sample(
+          'max_op_rate',
+          max_op_rate,
+          'operations per second',
+          max_op_rate_metadata,
+      )
+  )
+  return samples
+
+
+@vm_util.Retry(
+    max_retries=10,
+    retryable_exceptions=(CassandraCompactionNotCompletedError,),
+    poll_interval=WAIT_BETWEEN_COMPACTION_TASKS_CHECK,
+)
+def WaitForCompactionTasks(cassandra_vms):
+  """Waits for cassandra's pending compaction tasks to be completed."""
+  pending_compaction_tasks = cassandra.GetPendingTaskCountFromCompactionStats(
+      cassandra_vms
+  )
+  max_pending_compaction_tasks = max(pending_compaction_tasks)
+  logging.info(
+      'Remaining compaction tasks: %s',
+      max_pending_compaction_tasks,
+  )
+  if max_pending_compaction_tasks > 0:
+    raise CassandraCompactionNotCompletedError(
+        f'{max_pending_compaction_tasks} compaction tasks not completed'
+    )
+
+
+def GetCpuUtilization(cassandra_vms):
+  """Gets cpu utilization during the test from the sar output.
+
+  Args:
+    cassandra_vms: cassandra server vms.
+
+  Returns:
+    A list of cpu utilization percentages, one per cassandra node; empty if
+    no sar samples were collected.
+  """
+  cpu_utilization = []
+  if (
+      CalculateNumberOfSarRequestsFromDuration(
+          CASSANDRA_STRESS_RUN_DURATION.value, SAR_CPU_UTILIZATION_INTERVAL
+      )
+      == 0
+  ):
+    return cpu_utilization
+  for vm in cassandra_vms:
+    stdout, _ = vm.RobustRemoteCommand(
+        f'cat {GenerateCpuUtilizationFileName(vm)}'
+    )
+    cpu_utilization.append(ParseAverageCpuUtilization(stdout))
+  return cpu_utilization
+
+
+def ParseUptimeOutput(uptime_output) -> float:
+  """Parses the output of the uptime command.
+
+  Sample uptime output:
+    20:11:37 up 172 days, 22 min, 4 users, load average: 0.23, 0.54, 0.31
+  The three load-average values are the 1-, 5- and 15-minute averages.
+
+  Args:
+    uptime_output: The output of the uptime command.
+
+  Returns:
+    The load average over the last 5 minutes.
+  """
+  load_average_str = re.sub('.*(load average: )(.*)', r'\2', uptime_output)
+  return float(load_average_str.split(', ')[1])
+
+
+def GetOperationRate(samples):
+  """Returns the op rate from the samples, or 0 if not found."""
+  for s in samples:
+    if s.metric == 'op rate':
+      return s.value
+  return 0
+
+
+def GetMedianLatency(samples):
+  """Returns the median latency from the samples, or 0 if not found."""
+  for s in samples:
+    if s.metric == 'latency median':
+      return s.value
+  return 0
 
 
 def Cleanup(benchmark_spec):
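The thread-count search in RunTestNTimes boils down to one decision per run. A pure-function sketch of that decision using the constants from this diff (the op rates and latencies passed in below are fabricated; the real loop reads them from the parsed samples and sar logs):

THREAD_INCREMENT_COUNT = 50
MAX_MEDIAN_LATENCY_MS = 20
CPU_UTILIZATION_LIMIT = 70

def NextThreadCount(threads, op_rate, last_op_rate, median_latency, max_cpu):
  # Back off by half an increment when a limit is breached or throughput
  # regressed; otherwise keep climbing by a full increment.
  if median_latency >= MAX_MEDIAN_LATENCY_MS or max_cpu > CPU_UTILIZATION_LIMIT:
    return threads - THREAD_INCREMENT_COUNT / 2
  if op_rate > last_op_rate:
    return threads + THREAD_INCREMENT_COUNT
  return threads - THREAD_INCREMENT_COUNT / 2

print(NextThreadCount(25, 12000, 0, 8, 55))      # -> 75: still scaling up
print(NextThreadCount(75, 11000, 12000, 9, 60))  # -> 50.0: throughput regressed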
+ """ + cpu_utilization = [] + if ( + CalculateNumberOfSarRequestsFromDuration( + CASSANDRA_STRESS_RUN_DURATION.value, SAR_CPU_UTILIZATION_INTERVAL + ) + == 0 + ): + return cpu_utilization + for vm in cassandra_vms: + stdout, _ = vm.RobustRemoteCommand( + f'cat {GenerateCpuUtilizationFileName(vm)}' + ) + cpu_utilization.append(ParseAverageCpuUtilization(stdout)) + return cpu_utilization + + +def ParseUptimeOutput(uptime_output) -> float: + """Parses the output of the uptime command. + + Args: + uptime_output: The output of the uptime command. + + Returns: + The load average of the last 5 minutes. + + Sample uptime command output: + 20:11:37 up 172 days, 22 min, 4 users, load average: 0.23, 0.54, 0.31 + load average displays the load average of the last 1 minute, 5 minutes and 15 + minutes. + """ + load_average_str = re.sub('.*(load average: )(.*)', r'\2', uptime_output) + return float(load_average_str.split(', ')[1]) + + +def GetOperationRate(samples): + for s in samples: + if s.metric == 'op rate': + return s.value + return 0 + + +def GetMedianLatency(samples): + for s in samples: + if s.metric == 'latency median': + return s.value + return 0 def Cleanup(benchmark_spec): diff --git a/perfkitbenchmarker/linux_packages/cassandra.py b/perfkitbenchmarker/linux_packages/cassandra.py index da43d1c82..6a9c039bb 100644 --- a/perfkitbenchmarker/linux_packages/cassandra.py +++ b/perfkitbenchmarker/linux_packages/cassandra.py @@ -23,6 +23,7 @@ import logging import os import posixpath +import re import time from absl import flags from perfkitbenchmarker import background_tasks @@ -255,6 +256,46 @@ def IsRunning(vm): return False +def GetCompactionStats(vm): + """Returns compaction stats for the given VM. + + Args: + vm: VirtualMachine. The VM to get compaction stats from. + + Sample Output of compaction stats: + pending tasks: 5 + - keyspace1.standard1: 5 + + id compaction type keyspace table completed total unit progress + c69 Compaction keyspace1 standard1 437886277 20868111340 bytes 2.10% + Active compaction remaining time : 0h05m33s + """ + stdout, _ = vm.RemoteCommand(f'{GetNodetoolPath()} compactionstats') + return stdout + + +def GetPendingTaskCountFromCompactionStats(cassandra_vms): + """Parses the compaction stats for the given VMs and returns the pending task count. + + Args: + cassandra_vms: List of VirtualMachine. The Cassandra VMs to get compaction + stats from. + + Returns: + List of int. The pending task count for each VM. + """ + compaction_stats = background_tasks.RunThreaded( + GetCompactionStats, cassandra_vms + ) + pending_tasks = [] + for stats in compaction_stats: + line = re.search(r'pending tasks: *(\d*)', stats) + if line: + value = re.sub(r'pending tasks: *(\d*)', r'\1', line.group()) + pending_tasks.append(int(value)) + return pending_tasks + + def CleanNode(vm): """Remove Cassandra data from 'vm'. 
diff --git a/tests/linux_benchmarks/cassandra_stress_benchmark_test.py b/tests/linux_benchmarks/cassandra_stress_benchmark_test.py index f3fed8d07..47370b46e 100644 --- a/tests/linux_benchmarks/cassandra_stress_benchmark_test.py +++ b/tests/linux_benchmarks/cassandra_stress_benchmark_test.py @@ -8,6 +8,24 @@ class CassandraStressBenchmarkTest( pkb_common_test_case.PkbCommonTestCase, test_util.SamplesTestMixin ): + def testSarRequestCount(self): + count = cassandra_stress_benchmark.CalculateNumberOfSarRequestsFromDuration( + '10m', 10 + ) + self.assertEqual(count, 60) + count = cassandra_stress_benchmark.CalculateNumberOfSarRequestsFromDuration( + '10s', 10 + ) + self.assertEqual(count, 1) + count = cassandra_stress_benchmark.CalculateNumberOfSarRequestsFromDuration( + '10h', 10 + ) + self.assertEqual(count, 3600) + count = cassandra_stress_benchmark.CalculateNumberOfSarRequestsFromDuration( + None, 10 + ) + self.assertEqual(count, 0) + def testCassandraStressResponseParsing(self): sample_results = """ Results: