diff --git a/service_capacity_modeling/capacity_planner.py b/service_capacity_modeling/capacity_planner.py
index 81e6ef6..29cf8d8 100644
--- a/service_capacity_modeling/capacity_planner.py
+++ b/service_capacity_modeling/capacity_planner.py
@@ -503,12 +503,11 @@ def _plan_certain(
         if len(allowed_drives) == 0:
             allowed_drives.update(hardware.drives.keys())
 
-        # Get current instance type if exists
-        current_instance_name: str = extra_model_arguments.get("current_instance_name", None)
-        if current_instance_name is not None:
+        # Get the current instance object if one exists
+        if desires.current_instance_type != "":
             for instance in hardware.instances.values():
-                if instance.name == current_instance_name:
-                    extra_model_arguments["current_instance_name"] = instance
+                if instance.name == desires.current_instance_type:
+                    desires.current_instance = instance
 
         plans = []
         if model.run_hardware_simulation():
diff --git a/service_capacity_modeling/interface.py b/service_capacity_modeling/interface.py
index 9a065f8..9c47d85 100644
--- a/service_capacity_modeling/interface.py
+++ b/service_capacity_modeling/interface.py
@@ -17,7 +17,6 @@
 from pydantic import BaseModel
 from pydantic import Field
 
-
 GIB_IN_BYTES = 1024 * 1024 * 1024
 MIB_IN_BYTES = 1024 * 1024
 MEGABIT_IN_BYTES = (1000 * 1000) / 8
@@ -376,7 +375,7 @@ def annual_cost_gib(self, data_gib: float = 0):
                 break
             if transfer_cost[0] > 0:
                 annual_cost += (
-                    min(_annual_data, transfer_cost[0]) * transfer_cost[1]
+                        min(_annual_data, transfer_cost[0]) * transfer_cost[1]
                 )
                 _annual_data -= transfer_cost[0]
             else:
@@ -639,6 +638,9 @@ class CapacityDesires(ExcludeUnsetModel):
     # hence this default
     core_reference_ghz: float = 2.3
 
+    current_instance_type: str = ""
+    current_instance: Optional[Instance] = None
+
     def merge_with(self, defaults: "CapacityDesires") -> "CapacityDesires":
         # Now merge with the models default
         desires_dict = self.dict(exclude_unset=True)
@@ -732,7 +734,7 @@ class Requirements(ExcludeUnsetModel):
     # pylint: disable=unused-argument
     @staticmethod
     def regret(
-        name: str, optimal_plan: "CapacityPlan", proposed_plan: "CapacityPlan"
+            name: str, optimal_plan: "CapacityPlan", proposed_plan: "CapacityPlan"
     ) -> float:
         return 0.0
 
diff --git a/service_capacity_modeling/models/org/netflix/cassandra.py b/service_capacity_modeling/models/org/netflix/cassandra.py
index 7133f4f..0968aed 100644
--- a/service_capacity_modeling/models/org/netflix/cassandra.py
+++ b/service_capacity_modeling/models/org/netflix/cassandra.py
@@ -40,39 +40,39 @@
 
 
 def _write_buffer_gib_zone(
-    desires: CapacityDesires, zones_per_region: int, flushes_before_compaction: int = 4
+        desires: CapacityDesires, zones_per_region: int, flushes_before_compaction: int = 4
 ) -> float:
     # Cassandra has to buffer writes before flushing to disk, and assuming
     # we will compact every 4 flushes and we want no more than 2 redundant
     # compactions in an hour, we want <= 4**2 = 16 flushes per hour
     # or a flush of data every 3600 / 16 = 225 seconds
     write_bytes_per_second = (
-        desires.query_pattern.estimated_write_per_second.mid
-        * desires.query_pattern.estimated_mean_write_size_bytes.mid
+            desires.query_pattern.estimated_write_per_second.mid
+            * desires.query_pattern.estimated_mean_write_size_bytes.mid
     )
 
     compactions_per_hour = 2
     hour_in_seconds = 60 * 60
     write_buffer_gib = (
-        (write_bytes_per_second * hour_in_seconds)
-        / (flushes_before_compaction**compactions_per_hour)
-    ) / (1 << 30)
+            (write_bytes_per_second * hour_in_seconds)
+            / (flushes_before_compaction ** compactions_per_hour)
+        ) / (1 << 30)
 
     return float(write_buffer_gib) / zones_per_region
 
 
 def _estimate_cassandra_requirement(
-    instance: Instance,
-    current_instance: Instance,
-    required_cluster_size: Optional[int],
-    desires: CapacityDesires,
-    working_set: float,
-    reads_per_second: float,
-    max_rps_to_disk: int,
-    zones_per_region: int = 3,
-    copies_per_region: int = 3,
-    max_cpu_utilization: float = None,
+        instance: Instance,
+        max_cpu_utilization: Optional[float],
+        required_cluster_size: Optional[int],
+        current_instance: Optional[Instance],
+        desires: CapacityDesires,
+        working_set: float,
+        reads_per_second: float,
+        max_rps_to_disk: int,
+        zones_per_region: int = 3,
+        copies_per_region: int = 3,
 ) -> CapacityRequirement:
     """Estimate the capacity required for one zone given a regional desire
 
@@ -80,7 +80,7 @@ def _estimate_cassandra_requirement(
     return the zonal capacity requirement
     """
     # Keep half of the cores free for background work (compaction, backup, repair)
-    if all([max_cpu_utilization, current_instance, required_cluster_size]):
+    if max_cpu_utilization is not None and current_instance is not None and required_cluster_size is not None:
         needed_cores = (current_instance.cpu * required_cluster_size) * (max_cpu_utilization / 20)
     else:
         needed_cores = sqrt_staffed_cores(desires) * 2
@@ -174,24 +174,22 @@ def _upsert_params(cluster, params):
 # pylint: disable=too-many-return-statements
 # flake8: noqa: C901
 def _estimate_cassandra_cluster_zonal(
-    instance: Instance,
-    current_instance: Instance,
-    drive: Drive,
-    context: RegionContext,
-    desires: CapacityDesires,
-    zones_per_region: int = 3,
-    copies_per_region: int = 3,
-    require_local_disks: bool = False,
-    require_attached_disks: bool = False,
-    required_cluster_size: Optional[int] = None,
-    max_rps_to_disk: int = 500,
-    max_local_disk_gib: int = 2048,
-    max_regional_size: int = 96,
-    max_write_buffer_percent: float = 0.25,
-    max_table_buffer_percent: float = 0.11,
-    max_cpu_utilization: float = None,
+        instance: Instance,
+        max_cpu_utilization: Optional[float],
+        drive: Drive,
+        context: RegionContext,
+        desires: CapacityDesires,
+        zones_per_region: int = 3,
+        copies_per_region: int = 3,
+        require_local_disks: bool = False,
+        require_attached_disks: bool = False,
+        required_cluster_size: Optional[int] = None,
+        max_rps_to_disk: int = 500,
+        max_local_disk_gib: int = 2048,
+        max_regional_size: int = 96,
+        max_write_buffer_percent: float = 0.25,
+        max_table_buffer_percent: float = 0.11,
 ) -> Optional[CapacityPlan]:
-
     # Netflix Cassandra doesn't like to deploy on really small instances
     if instance.cpu < 2 or instance.ram_gib < 14:
         return None
@@ -210,7 +208,7 @@ def _estimate_cassandra_cluster_zonal(
 
     rps = desires.query_pattern.estimated_read_per_second.mid // zones_per_region
     write_per_sec = (
-        desires.query_pattern.estimated_write_per_second.mid // zones_per_region
+            desires.query_pattern.estimated_write_per_second.mid // zones_per_region
     )
     write_bytes_per_sec = round(
         write_per_sec * desires.query_pattern.estimated_mean_write_size_bytes.mid
@@ -242,15 +240,15 @@
 
     requirement = _estimate_cassandra_requirement(
         instance=instance,
-        current_instance=current_instance,
+        max_cpu_utilization=max_cpu_utilization,
         required_cluster_size=required_cluster_size,
+        current_instance=desires.current_instance,
         desires=desires,
         working_set=working_set,
         reads_per_second=rps,
         max_rps_to_disk=max_rps_to_disk,
         zones_per_region=zones_per_region,
         copies_per_region=copies_per_region,
-        max_cpu_utilization=max_cpu_utilization,
     )
 
     # Cassandra clusters should aim to be at least 2 nodes per zone to start
@@ -261,8 +259,8 @@
         min_count = 2
 
     base_mem = (
-        desires.data_shape.reserved_instance_app_mem_gib
-        + desires.data_shape.reserved_instance_system_mem_gib
+            desires.data_shape.reserved_instance_app_mem_gib
+            + desires.data_shape.reserved_instance_system_mem_gib
     )
 
     heap_fn = _cass_heap_for_write_buffer(
@@ -345,7 +343,7 @@
             annual_cost=blob.annual_cost_gib(requirement.disk_gib.mid),
             service_params={
                 "nines_required": (
-                    1 - 1.0 / desires.data_shape.durability_slo_order.mid
+                        1 - 1.0 / desires.data_shape.durability_slo_order.mid
                 )
             },
         )
@@ -388,15 +386,15 @@ def _cass_io_per_read(node_size_gib, sstable_size_mb=160):
 
 
 def _cass_heap_for_write_buffer(
-    instance: Instance,
-    write_buffer_gib: float,
-    max_zonal_size: int,
-    buffer_percent: float,
+        instance: Instance,
+        write_buffer_gib: float,
+        max_zonal_size: int,
+        buffer_percent: float,
 ) -> Callable[[float], float]:
     # If there is no way we can get enough heap with the max zonal size, try
     # letting max heap grow to 31 GiB per node to get more write buffer
     if write_buffer_gib > (
-        max_zonal_size * _cass_heap(instance.ram_gib) * buffer_percent
+            max_zonal_size * _cass_heap(instance.ram_gib) * buffer_percent
     ):
         return lambda x: _cass_heap(x, max_heap_gib=30)
     else:
@@ -421,9 +419,9 @@ def _target_rf(desires: CapacityDesires, user_copies: Optional[int]) -> int:
     # run with RF=2
     consistency = desires.query_pattern.access_consistency.same_region
     if (
-        desires.data_shape.durability_slo_order.mid < 1000
-        and consistency is not None
-        and consistency.target_consistency != AccessConsistency.read_your_writes
+            desires.data_shape.durability_slo_order.mid < 1000
+            and consistency is not None
+            and consistency.target_consistency != AccessConsistency.read_your_writes
     ):
         return 2
     return 3
@@ -433,7 +431,7 @@ class NflxCassandraArguments(BaseModel):
     copies_per_region: int = Field(
         default=3,
         description="How many copies of the data will exist e.g. RF=3. If unsupplied"
-        " this will be deduced from durability and consistency desires",
+            " this will be deduced from durability and consistency desires",
     )
     require_local_disks: bool = Field(
         default=False,
@@ -462,25 +460,25 @@ class NflxCassandraArguments(BaseModel):
     max_write_buffer_percent: float = Field(
         default=0.25,
         description="The amount of heap memory that can be used to buffer writes. "
-        "Note that if there are more than 100k writes this will "
-        "automatically adjust to 0.5",
+            "Note that if there are more than 100k writes this will "
+            "automatically adjust to 0.5",
     )
     max_table_buffer_percent: float = Field(
         default=0.11,
         description="How much of heap memory can be used for a single table. "
-        "Note that if there are more than 100k writes this will "
-        "automatically adjust to 0.2",
+            "Note that if there are more than 100k writes this will "
+            "automatically adjust to 0.2",
     )
 
 
 class NflxCassandraCapacityModel(CapacityModel):
     @staticmethod
     def capacity_plan(
-        instance: Instance,
-        drive: Drive,
-        context: RegionContext,
-        desires: CapacityDesires,
-        extra_model_arguments: Dict[str, Any],
+            instance: Instance,
+            drive: Drive,
+            context: RegionContext,
+            desires: CapacityDesires,
+            extra_model_arguments: Dict[str, Any],
     ) -> Optional[CapacityPlan]:
         # Use durabiliy and consistency to compute RF.
         copies_per_region = _target_rf(
@@ -504,20 +502,19 @@ def capacity_plan(
         max_table_buffer_percent: float = min(
             0.5, extra_model_arguments.get("max_table_buffer_percent", 0.11)
         )
-        max_cpu_utilization: float = extra_model_arguments.get("max_cpu_utilization", None)
-        current_instance: Instance = extra_model_arguments.get("current_instance_name", None)
+        max_cpu_utilization: Optional[float] = extra_model_arguments.get("max_cpu_utilization", None)
 
         # Adjust heap defaults for high write clusters
         if (
-            desires.query_pattern.estimated_write_per_second.mid >= 100_000
-            and desires.data_shape.estimated_state_size_gib.mid >= 100
+                desires.query_pattern.estimated_write_per_second.mid >= 100_000
+                and desires.data_shape.estimated_state_size_gib.mid >= 100
         ):
             max_write_buffer_percent = max(0.5, max_write_buffer_percent)
             max_table_buffer_percent = max(0.2, max_table_buffer_percent)
 
         return _estimate_cassandra_cluster_zonal(
             instance=instance,
-            current_instance=current_instance,
+            max_cpu_utilization=max_cpu_utilization,
            drive=drive,
             context=context,
             desires=desires,
@@ -531,7 +528,6 @@ def capacity_plan(
             max_local_disk_gib=max_local_disk_gib,
             max_write_buffer_percent=max_write_buffer_percent,
             max_table_buffer_percent=max_table_buffer_percent,
-            max_cpu_utilization=max_cpu_utilization,
         )
 
     @staticmethod
diff --git a/tests/netflix/test_cassandra.py b/tests/netflix/test_cassandra.py
index d85e618..5a506c5 100644
--- a/tests/netflix/test_cassandra.py
+++ b/tests/netflix/test_cassandra.py
@@ -8,7 +8,8 @@
 from service_capacity_modeling.interface import FixedInterval
 from service_capacity_modeling.interface import GlobalConsistency
 from service_capacity_modeling.interface import QueryPattern
-
+from service_capacity_modeling.interface import Interval
+from service_capacity_modeling.interface import AccessPattern
 
 small_but_high_qps = CapacityDesires(
     service_tier=1,
@@ -66,9 +67,9 @@ def test_capacity_small_fast():
     # with lots of ebs_gp2 to handle the read IOs
     if small_result.attached_drives:
         assert (
-            small_result.count
-            * sum(d.size_gib for d in small_result.attached_drives)
-            > 1000
+                small_result.count
+                * sum(d.size_gib for d in small_result.attached_drives)
+                > 1000
         )
 
     assert small_result.cluster_params["cassandra.heap.write.percent"] == 0.25
@@ -156,12 +157,12 @@ def test_capacity_high_writes():
     assert 30 <= num_cpus <= 128
     if high_writes_result.attached_drives:
         assert (
-            high_writes_result.count * high_writes_result.attached_drives[0].size_gib
-            >= 400
+                high_writes_result.count * high_writes_result.attached_drives[0].size_gib
+                >= 400
         )
     elif high_writes_result.instance.drive is not None:
         assert (
-            high_writes_result.count * high_writes_result.instance.drive.size_gib >= 400
+                high_writes_result.count * high_writes_result.instance.drive.size_gib >= 400
         )
     else:
         raise AssertionError("Should have drives")
@@ -194,9 +195,9 @@ def test_high_write_throughput():
     assert high_writes_result.attached_drives[0].size_gib >= 400
 
     assert (
-        300_000
-        > high_writes_result.count * high_writes_result.attached_drives[0].size_gib
-        >= 100_000
+            300_000
+            > high_writes_result.count * high_writes_result.attached_drives[0].size_gib
+            >= 100_000
     )
 
     cluster_cost = cap_plan.candidate_clusters.annual_costs["cassandra.zonal-clusters"]
@@ -225,7 +226,7 @@ def test_capacity_large_footprint():
     assert large_footprint_result.cluster_params["cassandra.heap.write.percent"] == 0.25
     assert large_footprint_result.cluster_params["cassandra.heap.table.percent"] == 0.11
     assert (
-        large_footprint_result.cluster_params["cassandra.compaction.min_threshold"] == 4
+            large_footprint_result.cluster_params["cassandra.compaction.min_threshold"] == 4
     )
 
 
@@ -269,7 +270,7 @@ def test_reduced_durability():
     )[0]
 
     assert cheap_plan.candidate_clusters.total_annual_cost < (
-        0.7 * float(expensive_plan.candidate_clusters.total_annual_cost)
+            0.7 * float(expensive_plan.candidate_clusters.total_annual_cost)
     )
     # The reduced durability and consistency requirement let's us
     # use less compute
@@ -279,25 +280,70 @@
     # Due to high writes both should have high heap write buffering
     for plan in (expensive_plan, cheap_plan):
         assert (
-            plan.candidate_clusters.zonal[0].cluster_params[
-                "cassandra.heap.write.percent"
-            ]
-            == 0.5
+                plan.candidate_clusters.zonal[0].cluster_params[
+                    "cassandra.heap.write.percent"
+                ]
+                == 0.5
         )
         assert (
-            plan.candidate_clusters.zonal[0].cluster_params[
-                "cassandra.heap.table.percent"
-            ]
-            == 0.2
+                plan.candidate_clusters.zonal[0].cluster_params[
+                    "cassandra.heap.table.percent"
+                ]
+                == 0.2
         )
         assert (
-            plan.candidate_clusters.zonal[0].cluster_params[
-                "cassandra.compaction.min_threshold"
-            ]
-            == 8
+                plan.candidate_clusters.zonal[0].cluster_params[
+                    "cassandra.compaction.min_threshold"
+                ]
+                == 8
         )
 
     assert (
-        cheap_plan.candidate_clusters.zonal[0].cluster_params["cassandra.keyspace.rf"]
-        == 2
+            cheap_plan.candidate_clusters.zonal[0].cluster_params["cassandra.keyspace.rf"]
+            == 2
     )
+
+
+def test_plan_certain():
+    """
+    Use CPU utilization to determine instance types directly, as opposed to extrapolating it from the DataShape
+    """
+    worn_desire = CapacityDesires(
+        service_tier=1,
+        current_instance_type="i4i.8xlarge",
+        query_pattern=QueryPattern(
+            access_pattern=AccessPattern(
+                AccessPattern.latency
+            ),
+            estimated_read_per_second=Interval(
+                low=234248, mid=351854, high=485906, confidence=0.98
+            ),
+            estimated_write_per_second=Interval(
+                low=19841, mid=31198, high=37307, confidence=0.98
+            ),
+        ),
+        # We think we're going to have around 200 TiB of data
+        data_shape=DataShape(
+            estimated_state_size_gib=Interval(
+                low=2006.083, mid=2252.5, high=2480.41, confidence=0.98
+            ),
+            estimated_compression_ratio=Interval(
+                low=1, mid=1, high=1, confidence=1
+            ),
+        ),
+    )
+    cap_plan = planner.plan_certain(
+        model_name="org.netflix.cassandra",
+        region="us-east-1",
+        num_results=3,
+        num_regions=4,
+        desires=worn_desire,
+        extra_model_arguments={
+            "required_cluster_size": 24,
+            "max_cpu_utilization": 14.194801291058118,
+        },
+    )
+
+    lr_clusters = cap_plan[0].candidate_clusters.zonal[0]
+    assert lr_clusters.count == 24
+    assert lr_clusters.instance.cpu == 8
diff --git a/tests/netflix/test_cassandra_uncertain.py b/tests/netflix/test_cassandra_uncertain.py
index 2f1af42..a3392b6 100644
--- a/tests/netflix/test_cassandra_uncertain.py
+++ b/tests/netflix/test_cassandra_uncertain.py
@@ -1,11 +1,9 @@
-import json
 
 from service_capacity_modeling.capacity_planner import planner
 from service_capacity_modeling.interface import CapacityDesires
 from service_capacity_modeling.interface import DataShape
 from service_capacity_modeling.interface import Interval
 from service_capacity_modeling.interface import QueryPattern
-from service_capacity_modeling.interface import AccessPattern
 
 uncertain_mid = CapacityDesires(
     service_tier=1,
@@ -203,48 +201,3 @@ def test_very_small_has_disk():
         assert sum(dr.size_gib for dr in lr_cluster.attached_drives) > 10
     else:
         assert lr_cluster.instance.drive.size_gib > 10
-
-
-def test_plan_certain():
-    """
-    Use cpu utilization to determine instance types directly as supposed to extrapolating it from the Data Shape
-    """
-    worn_desire = CapacityDesires(
-        service_tier=1,
-        query_pattern=QueryPattern(
-            access_pattern=AccessPattern(
-                AccessPattern.latency
-            ),
-            estimated_read_per_second=Interval(
-                low=234248, mid=351854, high=485906, confidence=0.98
-            ),
-            estimated_write_per_second=Interval(
-                low=19841, mid=31198, high=37307, confidence=0.98
-            ),
-        ),
-        # We think we're going to have around 200 TiB of data
-        data_shape=DataShape(
-            estimated_state_size_gib=Interval(
-                low=2006.083, mid=2252.5, high=2480.41, confidence=0.98
-            ),
-            estimated_compression_ratio=Interval(
-                low=1, mid=1, high=1, confidence=1
-            ),
-        ),
-    )
-    cap_plan = planner.plan_certain(
-        model_name="org.netflix.cassandra",
-        region="us-east-1",
-        num_results=3,
-        num_regions=4,
-        desires=worn_desire,
-        extra_model_arguments={
-            "required_cluster_size": 24,
-            "current_instance_name": "i4i.8xlarge",
-            "max_cpu_utilization": 14.194801291058118,
-        },
-    )
-
-    lr_clusters = cap_plan[0].candidate_clusters.zonal[0]
-    assert lr_clusters.count == 24
-    assert lr_clusters.instance.cpu < 32
\ No newline at end of file