From 59a1fb41ada953de8cdda37910c715fd3778a481 Mon Sep 17 00:00:00 2001 From: ramsrivatsak Date: Fri, 13 Oct 2023 09:45:13 -0700 Subject: [PATCH 1/2] supporting multiple regions or zones as inputs to current instance types (+5 squashed commits) Squashed commits: [aa15bd9] joey changes on new types [592def6] consolidate current instance type (str), count (interval), cpu% (interval) into a new model object [82dd4c5] required cluster type parameter is now per zone [0ce2064] Moved instance type definition inside interface (joey comments addressed) [352a776] Add cpu utilization as a parameter in extra model arguments; If you have cpu usage already use it directly --- service_capacity_modeling/capacity_planner.py | 8 ++ service_capacity_modeling/interface.py | 31 ++++- .../models/org/netflix/cassandra.py | 115 ++++++++++-------- tests/netflix/test_cassandra.py | 107 ++++++++++++---- tests/netflix/test_cassandra_uncertain.py | 20 +-- 5 files changed, 187 insertions(+), 94 deletions(-) diff --git a/service_capacity_modeling/capacity_planner.py b/service_capacity_modeling/capacity_planner.py index ce64ed3..39026e6 100644 --- a/service_capacity_modeling/capacity_planner.py +++ b/service_capacity_modeling/capacity_planner.py @@ -575,6 +575,14 @@ def generate_scenarios( if len(allowed_drives) == 0: allowed_drives.update(hardware.drives.keys()) + # Get current instance object if exists + if desires.current_clusters: + for zonal_cluster_capacity in desires.current_clusters.zonal: + zonal_cluster_capacity.cluster_instance = hardware.instances[zonal_cluster_capacity.cluster_instance_name] + for regional_cluster_capacity in desires.current_clusters.regional: + regional_cluster_capacity.cluster_instance = hardware.instances[regional_cluster_capacity.cluster_instance_name] + + plans = [] if model.run_hardware_simulation(): for instance in hardware.instances.values(): if not _allow_instance( diff --git a/service_capacity_modeling/interface.py b/service_capacity_modeling/interface.py index 81c56bb..2fac80f 100644 --- a/service_capacity_modeling/interface.py +++ b/service_capacity_modeling/interface.py @@ -18,7 +18,6 @@ from pydantic import BaseModel from pydantic import Field - GIB_IN_BYTES = 1024 * 1024 * 1024 MIB_IN_BYTES = 1024 * 1024 MEGABIT_IN_BYTES = (1000 * 1000) / 8 @@ -377,7 +376,7 @@ def annual_cost_gib(self, data_gib: float = 0): break if transfer_cost[0] > 0: annual_cost += ( - min(_annual_data, transfer_cost[0]) * transfer_cost[1] + min(_annual_data, transfer_cost[0]) * transfer_cost[1] ) _annual_data -= transfer_cost[0] else: @@ -621,6 +620,29 @@ class DataShape(ExcludeUnsetModel): ) +class CurrentClusterCapacity(ExcludeUnsetModel): + cluster_instance_name: str + cluster_instance: Optional[Instance] = None + cluster_instance_count: Interval + cpu_utilization: Interval + + +# For services that are provisioned by zone (e.g. Cassandra, EVCache) +class CurrentZoneClusterCapacity(CurrentClusterCapacity): + pass + + +# For services that are provisioned regionally (e.g. Java services, RDS, etc ..) +class CurrentRegionClusterCapacity(CurrentClusterCapacity): + pass + + +class CurrentClusters(ExcludeUnsetModel): + zonal: Sequence[CurrentZoneClusterCapacity] = [] + regional: Sequence[CurrentRegionClusterCapacity] = [] + services: Sequence[ServiceCapacity] = [] + + class CapacityDesires(ExcludeUnsetModel): # How critical is this cluster, impacts how much "extra" we provision # 0 = Critical to the product (Product does not function) @@ -635,6 +657,9 @@ class CapacityDesires(ExcludeUnsetModel): # What will the state look like data_shape: DataShape = DataShape() + # What is the current microarchitectural/system configuration of the system + current_clusters: Optional[CurrentClusters] = None + # When users are providing latency estimates, what is the typical # instance core frequency we are comparing to. Databases use i3s a lot # hence this default @@ -733,7 +758,7 @@ class Requirements(ExcludeUnsetModel): # pylint: disable=unused-argument @staticmethod def regret( - name: str, optimal_plan: "CapacityPlan", proposed_plan: "CapacityPlan" + name: str, optimal_plan: "CapacityPlan", proposed_plan: "CapacityPlan" ) -> float: return 0.0 diff --git a/service_capacity_modeling/models/org/netflix/cassandra.py b/service_capacity_modeling/models/org/netflix/cassandra.py index 28c584f..a5dd303 100644 --- a/service_capacity_modeling/models/org/netflix/cassandra.py +++ b/service_capacity_modeling/models/org/netflix/cassandra.py @@ -40,44 +40,51 @@ def _write_buffer_gib_zone( - desires: CapacityDesires, zones_per_region: int, flushes_before_compaction: int = 4 + desires: CapacityDesires, zones_per_region: int, flushes_before_compaction: int = 4 ) -> float: # Cassandra has to buffer writes before flushing to disk, and assuming # we will compact every 4 flushes and we want no more than 2 redundant # compactions in an hour, we want <= 4**2 = 16 flushes per hour # or a flush of data every 3600 / 16 = 225 seconds write_bytes_per_second = ( - desires.query_pattern.estimated_write_per_second.mid - * desires.query_pattern.estimated_mean_write_size_bytes.mid + desires.query_pattern.estimated_write_per_second.mid + * desires.query_pattern.estimated_mean_write_size_bytes.mid ) compactions_per_hour = 2 hour_in_seconds = 60 * 60 write_buffer_gib = ( - (write_bytes_per_second * hour_in_seconds) - / (flushes_before_compaction**compactions_per_hour) - ) / (1 << 30) + (write_bytes_per_second * hour_in_seconds) + / (flushes_before_compaction ** compactions_per_hour) + ) / (1 << 30) return float(write_buffer_gib) / zones_per_region def _estimate_cassandra_requirement( - instance: Instance, - desires: CapacityDesires, - working_set: float, - reads_per_second: float, - max_rps_to_disk: int, - zones_per_region: int = 3, - copies_per_region: int = 3, + instance: Instance, + desires: CapacityDesires, + working_set: float, + reads_per_second: float, + max_rps_to_disk: int, + required_cluster_size: Optional[int] = None, + zones_per_region: int = 3, + copies_per_region: int = 3, ) -> CapacityRequirement: """Estimate the capacity required for one zone given a regional desire The input desires should be the **regional** desire, and this function will return the zonal capacity requirement """ - # Keep half of the cores free for background work (compaction, backup, repair) - needed_cores = sqrt_staffed_cores(desires) * 2 + current_capacity = None if desires.current_clusters is None else desires.current_clusters.zonal[0] if len(desires.current_clusters.zonal) else desires.current_clusters.regional[0] + # Keep half of the cores free for background work (compaction, backup, repair). Currently, zones and regions are + # configured in a homogeneous manner. Hence, we just take any one of the current cluster configuration + if current_capacity and current_capacity.cluster_instance and required_cluster_size is not None: + needed_cores = (current_capacity.cluster_instance.cpu * required_cluster_size * + zones_per_region) * (current_capacity.cpu_utilization.high / 20) + else: + needed_cores = sqrt_staffed_cores(desires) * 2 # Keep half of the bandwidth available for backup needed_network_mbps = simple_network_mbps(desires) * 2 @@ -168,22 +175,21 @@ def _upsert_params(cluster, params): # pylint: disable=too-many-return-statements # flake8: noqa: C901 def _estimate_cassandra_cluster_zonal( - instance: Instance, - drive: Drive, - context: RegionContext, - desires: CapacityDesires, - zones_per_region: int = 3, - copies_per_region: int = 3, - require_local_disks: bool = False, - require_attached_disks: bool = False, - required_cluster_size: Optional[int] = None, - max_rps_to_disk: int = 500, - max_local_disk_gib: int = 2048, - max_regional_size: int = 96, - max_write_buffer_percent: float = 0.25, - max_table_buffer_percent: float = 0.11, + instance: Instance, + drive: Drive, + context: RegionContext, + desires: CapacityDesires, + zones_per_region: int = 3, + copies_per_region: int = 3, + require_local_disks: bool = False, + require_attached_disks: bool = False, + required_cluster_size: Optional[int] = None, + max_rps_to_disk: int = 500, + max_local_disk_gib: int = 2048, + max_regional_size: int = 96, + max_write_buffer_percent: float = 0.25, + max_table_buffer_percent: float = 0.11, ) -> Optional[CapacityPlan]: - # Netflix Cassandra doesn't like to deploy on really small instances if instance.cpu < 2 or instance.ram_gib < 14: return None @@ -202,7 +208,7 @@ def _estimate_cassandra_cluster_zonal( rps = desires.query_pattern.estimated_read_per_second.mid // zones_per_region write_per_sec = ( - desires.query_pattern.estimated_write_per_second.mid // zones_per_region + desires.query_pattern.estimated_write_per_second.mid // zones_per_region ) write_bytes_per_sec = round( write_per_sec * desires.query_pattern.estimated_mean_write_size_bytes.mid @@ -238,6 +244,7 @@ def _estimate_cassandra_cluster_zonal( working_set=working_set, reads_per_second=rps, max_rps_to_disk=max_rps_to_disk, + required_cluster_size=required_cluster_size, zones_per_region=zones_per_region, copies_per_region=copies_per_region, ) @@ -250,8 +257,8 @@ def _estimate_cassandra_cluster_zonal( min_count = 2 base_mem = ( - desires.data_shape.reserved_instance_app_mem_gib - + desires.data_shape.reserved_instance_system_mem_gib + desires.data_shape.reserved_instance_app_mem_gib + + desires.data_shape.reserved_instance_system_mem_gib ) heap_fn = _cass_heap_for_write_buffer( @@ -334,7 +341,7 @@ def _estimate_cassandra_cluster_zonal( annual_cost=blob.annual_cost_gib(requirement.disk_gib.mid), service_params={ "nines_required": ( - 1 - 1.0 / desires.data_shape.durability_slo_order.mid + 1 - 1.0 / desires.data_shape.durability_slo_order.mid ) }, ) @@ -377,15 +384,15 @@ def _cass_io_per_read(node_size_gib, sstable_size_mb=160): def _cass_heap_for_write_buffer( - instance: Instance, - write_buffer_gib: float, - max_zonal_size: int, - buffer_percent: float, + instance: Instance, + write_buffer_gib: float, + max_zonal_size: int, + buffer_percent: float, ) -> Callable[[float], float]: # If there is no way we can get enough heap with the max zonal size, try # letting max heap grow to 31 GiB per node to get more write buffer if write_buffer_gib > ( - max_zonal_size * _cass_heap(instance.ram_gib) * buffer_percent + max_zonal_size * _cass_heap(instance.ram_gib) * buffer_percent ): return lambda x: _cass_heap(x, max_heap_gib=30) else: @@ -410,9 +417,9 @@ def _target_rf(desires: CapacityDesires, user_copies: Optional[int]) -> int: # run with RF=2 consistency = desires.query_pattern.access_consistency.same_region if ( - desires.data_shape.durability_slo_order.mid < 1000 - and consistency is not None - and consistency.target_consistency != AccessConsistency.read_your_writes + desires.data_shape.durability_slo_order.mid < 1000 + and consistency is not None + and consistency.target_consistency != AccessConsistency.read_your_writes ): return 2 return 3 @@ -422,7 +429,7 @@ class NflxCassandraArguments(BaseModel): copies_per_region: int = Field( default=3, description="How many copies of the data will exist e.g. RF=3. If unsupplied" - " this will be deduced from durability and consistency desires", + " this will be deduced from durability and consistency desires", ) require_local_disks: bool = Field( default=False, @@ -451,25 +458,25 @@ class NflxCassandraArguments(BaseModel): max_write_buffer_percent: float = Field( default=0.25, description="The amount of heap memory that can be used to buffer writes. " - "Note that if there are more than 100k writes this will " - "automatically adjust to 0.5", + "Note that if there are more than 100k writes this will " + "automatically adjust to 0.5", ) max_table_buffer_percent: float = Field( default=0.11, description="How much of heap memory can be used for a single table. " - "Note that if there are more than 100k writes this will " - "automatically adjust to 0.2", + "Note that if there are more than 100k writes this will " + "automatically adjust to 0.2", ) class NflxCassandraCapacityModel(CapacityModel): @staticmethod def capacity_plan( - instance: Instance, - drive: Drive, - context: RegionContext, - desires: CapacityDesires, - extra_model_arguments: Dict[str, Any], + instance: Instance, + drive: Drive, + context: RegionContext, + desires: CapacityDesires, + extra_model_arguments: Dict[str, Any], ) -> Optional[CapacityPlan]: # Use durabiliy and consistency to compute RF. copies_per_region = _target_rf( @@ -496,8 +503,8 @@ def capacity_plan( # Adjust heap defaults for high write clusters if ( - desires.query_pattern.estimated_write_per_second.mid >= 100_000 - and desires.data_shape.estimated_state_size_gib.mid >= 100 + desires.query_pattern.estimated_write_per_second.mid >= 100_000 + and desires.data_shape.estimated_state_size_gib.mid >= 100 ): max_write_buffer_percent = max(0.5, max_write_buffer_percent) max_table_buffer_percent = max(0.2, max_table_buffer_percent) diff --git a/tests/netflix/test_cassandra.py b/tests/netflix/test_cassandra.py index d85e618..3beac86 100644 --- a/tests/netflix/test_cassandra.py +++ b/tests/netflix/test_cassandra.py @@ -1,5 +1,5 @@ from service_capacity_modeling.capacity_planner import planner -from service_capacity_modeling.interface import AccessConsistency +from service_capacity_modeling.interface import AccessConsistency, CurrentClusterCapacity, CurrentClusters from service_capacity_modeling.interface import CapacityDesires from service_capacity_modeling.interface import certain_float from service_capacity_modeling.interface import certain_int @@ -8,7 +8,8 @@ from service_capacity_modeling.interface import FixedInterval from service_capacity_modeling.interface import GlobalConsistency from service_capacity_modeling.interface import QueryPattern - +from service_capacity_modeling.interface import Interval +from service_capacity_modeling.interface import AccessPattern small_but_high_qps = CapacityDesires( service_tier=1, @@ -66,9 +67,9 @@ def test_capacity_small_fast(): # with lots of ebs_gp2 to handle the read IOs if small_result.attached_drives: assert ( - small_result.count - * sum(d.size_gib for d in small_result.attached_drives) - > 1000 + small_result.count + * sum(d.size_gib for d in small_result.attached_drives) + > 1000 ) assert small_result.cluster_params["cassandra.heap.write.percent"] == 0.25 @@ -156,12 +157,12 @@ def test_capacity_high_writes(): assert 30 <= num_cpus <= 128 if high_writes_result.attached_drives: assert ( - high_writes_result.count * high_writes_result.attached_drives[0].size_gib - >= 400 + high_writes_result.count * high_writes_result.attached_drives[0].size_gib + >= 400 ) elif high_writes_result.instance.drive is not None: assert ( - high_writes_result.count * high_writes_result.instance.drive.size_gib >= 400 + high_writes_result.count * high_writes_result.instance.drive.size_gib >= 400 ) else: raise AssertionError("Should have drives") @@ -194,9 +195,9 @@ def test_high_write_throughput(): assert high_writes_result.attached_drives[0].size_gib >= 400 assert ( - 300_000 - > high_writes_result.count * high_writes_result.attached_drives[0].size_gib - >= 100_000 + 300_000 + > high_writes_result.count * high_writes_result.attached_drives[0].size_gib + >= 100_000 ) cluster_cost = cap_plan.candidate_clusters.annual_costs["cassandra.zonal-clusters"] @@ -225,7 +226,7 @@ def test_capacity_large_footprint(): assert large_footprint_result.cluster_params["cassandra.heap.write.percent"] == 0.25 assert large_footprint_result.cluster_params["cassandra.heap.table.percent"] == 0.11 assert ( - large_footprint_result.cluster_params["cassandra.compaction.min_threshold"] == 4 + large_footprint_result.cluster_params["cassandra.compaction.min_threshold"] == 4 ) @@ -269,7 +270,7 @@ def test_reduced_durability(): )[0] assert cheap_plan.candidate_clusters.total_annual_cost < ( - 0.7 * float(expensive_plan.candidate_clusters.total_annual_cost) + 0.7 * float(expensive_plan.candidate_clusters.total_annual_cost) ) # The reduced durability and consistency requirement let's us # use less compute @@ -279,25 +280,77 @@ def test_reduced_durability(): # Due to high writes both should have high heap write buffering for plan in (expensive_plan, cheap_plan): assert ( - plan.candidate_clusters.zonal[0].cluster_params[ - "cassandra.heap.write.percent" - ] - == 0.5 + plan.candidate_clusters.zonal[0].cluster_params[ + "cassandra.heap.write.percent" + ] + == 0.5 ) assert ( - plan.candidate_clusters.zonal[0].cluster_params[ - "cassandra.heap.table.percent" - ] - == 0.2 + plan.candidate_clusters.zonal[0].cluster_params[ + "cassandra.heap.table.percent" + ] + == 0.2 ) assert ( - plan.candidate_clusters.zonal[0].cluster_params[ - "cassandra.compaction.min_threshold" - ] - == 8 + plan.candidate_clusters.zonal[0].cluster_params[ + "cassandra.compaction.min_threshold" + ] + == 8 ) assert ( - cheap_plan.candidate_clusters.zonal[0].cluster_params["cassandra.keyspace.rf"] - == 2 + cheap_plan.candidate_clusters.zonal[0].cluster_params["cassandra.keyspace.rf"] + == 2 + ) + + +def test_plan_certain(): + """ + Use cpu utilization to determine instance types directly as supposed to extrapolating it from the Data Shape + """ + cluster_capacity = CurrentClusterCapacity( + cluster_instance_name="i4i.8xlarge", + cluster_instance_count=Interval(low=8, mid=8, high=8, confidence=1), + cpu_utilization=Interval( + low=10.12, mid=13.2, high=14.194801291058118, confidence=1 + ), + ) + + worn_desire = CapacityDesires( + service_tier=1, + current_clusters=CurrentClusters(zonal=[cluster_capacity]), + query_pattern=QueryPattern( + access_pattern=AccessPattern( + AccessPattern.latency + ), + estimated_read_per_second=Interval( + low=234248, mid=351854, high=485906, confidence=0.98 + ), + estimated_write_per_second=Interval( + low=19841, mid=31198, high=37307, confidence=0.98 + ), + ), + # We think we're going to have around 200 TiB of data + data_shape=DataShape( + estimated_state_size_gib=Interval( + low=2006.083, mid=2252.5, high=2480.41, confidence=0.98 + ), + estimated_compression_ratio=Interval( + low=1, mid=1, high=1, confidence=1 + ), + ), + ) + cap_plan = planner.plan_certain( + model_name="org.netflix.cassandra", + region="us-east-1", + num_results=3, + num_regions=4, + desires=worn_desire, + extra_model_arguments={ + "required_cluster_size": 8, + }, ) + + lr_clusters = cap_plan[0].candidate_clusters.zonal[0] + assert lr_clusters.count == 8 + assert lr_clusters.instance.cpu == 16 diff --git a/tests/netflix/test_cassandra_uncertain.py b/tests/netflix/test_cassandra_uncertain.py index eedf91c..a3392b6 100644 --- a/tests/netflix/test_cassandra_uncertain.py +++ b/tests/netflix/test_cassandra_uncertain.py @@ -1,10 +1,10 @@ + from service_capacity_modeling.capacity_planner import planner from service_capacity_modeling.interface import CapacityDesires from service_capacity_modeling.interface import DataShape from service_capacity_modeling.interface import Interval from service_capacity_modeling.interface import QueryPattern - uncertain_mid = CapacityDesires( service_tier=1, query_pattern=QueryPattern( @@ -42,14 +42,14 @@ def test_uncertain_planning(): lr_cluster = lr.candidate_clusters.zonal[0] assert 8 <= lr_cluster.count * lr_cluster.instance.cpu <= 64 assert ( - 5_000 <= lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 45_000 + 5_000 <= lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 45_000 ) sr = mid_plan.least_regret[1] sr_cluster = sr.candidate_clusters.zonal[0] assert 8 <= sr_cluster.count * sr_cluster.instance.cpu <= 64 assert ( - 5_000 <= sr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 45_000 + 5_000 <= sr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 45_000 ) tiny_plan = planner.plan( @@ -61,7 +61,7 @@ def test_uncertain_planning(): lr_cluster = lr.candidate_clusters.zonal[0] assert 2 <= lr_cluster.count * lr_cluster.instance.cpu < 16 assert ( - 1_000 < lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 6_000 + 1_000 < lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 6_000 ) @@ -155,9 +155,9 @@ def test_worn_dataset(): lr_cluster = lr.candidate_clusters.zonal[0] assert 128 <= lr_cluster.count * lr_cluster.instance.cpu <= 512 assert ( - 250_000 - <= lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] - < 1_000_000 + 250_000 + <= lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] + < 1_000_000 ) assert lr_cluster.instance.name.startswith( "m5." @@ -193,9 +193,9 @@ def test_very_small_has_disk(): lr_cluster = lr.candidate_clusters.zonal[0] assert 2 <= lr_cluster.count * lr_cluster.instance.cpu < 16 assert ( - 1_000 - < lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] - < 6_000 + 1_000 + < lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] + < 6_000 ) if lr_cluster.instance.drive is None: assert sum(dr.size_gib for dr in lr_cluster.attached_drives) > 10 From de475e0186250ca65bfe27889352052854706743 Mon Sep 17 00:00:00 2001 From: Ram Srivatsa Kannan Date: Wed, 25 Oct 2023 16:54:01 -0700 Subject: [PATCH 2/2] fixing linting/style errors --- service_capacity_modeling/capacity_planner.py | 25 +++- service_capacity_modeling/interface.py | 4 +- .../models/org/netflix/cassandra.py | 130 ++++++++++-------- tests/netflix/test_cassandra.py | 69 +++++----- tests/netflix/test_cassandra_uncertain.py | 19 ++- 5 files changed, 135 insertions(+), 112 deletions(-) diff --git a/service_capacity_modeling/capacity_planner.py b/service_capacity_modeling/capacity_planner.py index 39026e6..a5318ae 100644 --- a/service_capacity_modeling/capacity_planner.py +++ b/service_capacity_modeling/capacity_planner.py @@ -24,6 +24,7 @@ from service_capacity_modeling.interface import certain_float from service_capacity_modeling.interface import DataShape from service_capacity_modeling.interface import Drive +from service_capacity_modeling.interface import Hardware from service_capacity_modeling.interface import Instance from service_capacity_modeling.interface import Interval from service_capacity_modeling.interface import interval @@ -182,6 +183,21 @@ def model_desires_percentiles( return results, d +def _set_instance_objects( + desires: CapacityDesires, + hardware: Hardware, +): + if desires.current_clusters: + for zonal_cluster_capacity in desires.current_clusters.zonal: + zonal_cluster_capacity.cluster_instance = hardware.instances[ + zonal_cluster_capacity.cluster_instance_name + ] + for regional_cluster_capacity in desires.current_clusters.regional: + regional_cluster_capacity.cluster_instance = hardware.instances[ + regional_cluster_capacity.cluster_instance_name + ] + + def _allow_instance( instance: Instance, allowed_names: Sequence[str], @@ -575,14 +591,9 @@ def generate_scenarios( if len(allowed_drives) == 0: allowed_drives.update(hardware.drives.keys()) - # Get current instance object if exists - if desires.current_clusters: - for zonal_cluster_capacity in desires.current_clusters.zonal: - zonal_cluster_capacity.cluster_instance = hardware.instances[zonal_cluster_capacity.cluster_instance_name] - for regional_cluster_capacity in desires.current_clusters.regional: - regional_cluster_capacity.cluster_instance = hardware.instances[regional_cluster_capacity.cluster_instance_name] + # Set current instance object if exists + _set_instance_objects(desires, hardware) - plans = [] if model.run_hardware_simulation(): for instance in hardware.instances.values(): if not _allow_instance( diff --git a/service_capacity_modeling/interface.py b/service_capacity_modeling/interface.py index 2fac80f..fc71d67 100644 --- a/service_capacity_modeling/interface.py +++ b/service_capacity_modeling/interface.py @@ -376,7 +376,7 @@ def annual_cost_gib(self, data_gib: float = 0): break if transfer_cost[0] > 0: annual_cost += ( - min(_annual_data, transfer_cost[0]) * transfer_cost[1] + min(_annual_data, transfer_cost[0]) * transfer_cost[1] ) _annual_data -= transfer_cost[0] else: @@ -758,7 +758,7 @@ class Requirements(ExcludeUnsetModel): # pylint: disable=unused-argument @staticmethod def regret( - name: str, optimal_plan: "CapacityPlan", proposed_plan: "CapacityPlan" + name: str, optimal_plan: "CapacityPlan", proposed_plan: "CapacityPlan" ) -> float: return 0.0 diff --git a/service_capacity_modeling/models/org/netflix/cassandra.py b/service_capacity_modeling/models/org/netflix/cassandra.py index a5dd303..a778293 100644 --- a/service_capacity_modeling/models/org/netflix/cassandra.py +++ b/service_capacity_modeling/models/org/netflix/cassandra.py @@ -40,49 +40,63 @@ def _write_buffer_gib_zone( - desires: CapacityDesires, zones_per_region: int, flushes_before_compaction: int = 4 + desires: CapacityDesires, zones_per_region: int, flushes_before_compaction: int = 4 ) -> float: # Cassandra has to buffer writes before flushing to disk, and assuming # we will compact every 4 flushes and we want no more than 2 redundant # compactions in an hour, we want <= 4**2 = 16 flushes per hour # or a flush of data every 3600 / 16 = 225 seconds write_bytes_per_second = ( - desires.query_pattern.estimated_write_per_second.mid - * desires.query_pattern.estimated_mean_write_size_bytes.mid + desires.query_pattern.estimated_write_per_second.mid + * desires.query_pattern.estimated_mean_write_size_bytes.mid ) compactions_per_hour = 2 hour_in_seconds = 60 * 60 write_buffer_gib = ( - (write_bytes_per_second * hour_in_seconds) - / (flushes_before_compaction ** compactions_per_hour) - ) / (1 << 30) + (write_bytes_per_second * hour_in_seconds) + / (flushes_before_compaction**compactions_per_hour) + ) / (1 << 30) return float(write_buffer_gib) / zones_per_region def _estimate_cassandra_requirement( - instance: Instance, - desires: CapacityDesires, - working_set: float, - reads_per_second: float, - max_rps_to_disk: int, - required_cluster_size: Optional[int] = None, - zones_per_region: int = 3, - copies_per_region: int = 3, + instance: Instance, + desires: CapacityDesires, + working_set: float, + reads_per_second: float, + max_rps_to_disk: int, + required_cluster_size: Optional[int] = None, + zones_per_region: int = 3, + copies_per_region: int = 3, ) -> CapacityRequirement: """Estimate the capacity required for one zone given a regional desire The input desires should be the **regional** desire, and this function will return the zonal capacity requirement """ - current_capacity = None if desires.current_clusters is None else desires.current_clusters.zonal[0] if len(desires.current_clusters.zonal) else desires.current_clusters.regional[0] - # Keep half of the cores free for background work (compaction, backup, repair). Currently, zones and regions are - # configured in a homogeneous manner. Hence, we just take any one of the current cluster configuration - if current_capacity and current_capacity.cluster_instance and required_cluster_size is not None: - needed_cores = (current_capacity.cluster_instance.cpu * required_cluster_size * - zones_per_region) * (current_capacity.cpu_utilization.high / 20) + current_capacity = ( + None + if desires.current_clusters is None + else desires.current_clusters.zonal[0] + if len(desires.current_clusters.zonal) + else desires.current_clusters.regional[0] + ) + # Keep half of the cores free for background work (compaction, backup, repair). + # Currently, zones and regions are configured in a homogeneous manner. Hence, + # we just take any one of the current cluster configuration + if ( + current_capacity + and current_capacity.cluster_instance + and required_cluster_size is not None + ): + needed_cores = ( + current_capacity.cluster_instance.cpu + * required_cluster_size + * zones_per_region + ) * (current_capacity.cpu_utilization.high / 20) else: needed_cores = sqrt_staffed_cores(desires) * 2 # Keep half of the bandwidth available for backup @@ -175,20 +189,20 @@ def _upsert_params(cluster, params): # pylint: disable=too-many-return-statements # flake8: noqa: C901 def _estimate_cassandra_cluster_zonal( - instance: Instance, - drive: Drive, - context: RegionContext, - desires: CapacityDesires, - zones_per_region: int = 3, - copies_per_region: int = 3, - require_local_disks: bool = False, - require_attached_disks: bool = False, - required_cluster_size: Optional[int] = None, - max_rps_to_disk: int = 500, - max_local_disk_gib: int = 2048, - max_regional_size: int = 96, - max_write_buffer_percent: float = 0.25, - max_table_buffer_percent: float = 0.11, + instance: Instance, + drive: Drive, + context: RegionContext, + desires: CapacityDesires, + zones_per_region: int = 3, + copies_per_region: int = 3, + require_local_disks: bool = False, + require_attached_disks: bool = False, + required_cluster_size: Optional[int] = None, + max_rps_to_disk: int = 500, + max_local_disk_gib: int = 2048, + max_regional_size: int = 96, + max_write_buffer_percent: float = 0.25, + max_table_buffer_percent: float = 0.11, ) -> Optional[CapacityPlan]: # Netflix Cassandra doesn't like to deploy on really small instances if instance.cpu < 2 or instance.ram_gib < 14: @@ -208,7 +222,7 @@ def _estimate_cassandra_cluster_zonal( rps = desires.query_pattern.estimated_read_per_second.mid // zones_per_region write_per_sec = ( - desires.query_pattern.estimated_write_per_second.mid // zones_per_region + desires.query_pattern.estimated_write_per_second.mid // zones_per_region ) write_bytes_per_sec = round( write_per_sec * desires.query_pattern.estimated_mean_write_size_bytes.mid @@ -257,8 +271,8 @@ def _estimate_cassandra_cluster_zonal( min_count = 2 base_mem = ( - desires.data_shape.reserved_instance_app_mem_gib - + desires.data_shape.reserved_instance_system_mem_gib + desires.data_shape.reserved_instance_app_mem_gib + + desires.data_shape.reserved_instance_system_mem_gib ) heap_fn = _cass_heap_for_write_buffer( @@ -341,7 +355,7 @@ def _estimate_cassandra_cluster_zonal( annual_cost=blob.annual_cost_gib(requirement.disk_gib.mid), service_params={ "nines_required": ( - 1 - 1.0 / desires.data_shape.durability_slo_order.mid + 1 - 1.0 / desires.data_shape.durability_slo_order.mid ) }, ) @@ -384,15 +398,15 @@ def _cass_io_per_read(node_size_gib, sstable_size_mb=160): def _cass_heap_for_write_buffer( - instance: Instance, - write_buffer_gib: float, - max_zonal_size: int, - buffer_percent: float, + instance: Instance, + write_buffer_gib: float, + max_zonal_size: int, + buffer_percent: float, ) -> Callable[[float], float]: # If there is no way we can get enough heap with the max zonal size, try # letting max heap grow to 31 GiB per node to get more write buffer if write_buffer_gib > ( - max_zonal_size * _cass_heap(instance.ram_gib) * buffer_percent + max_zonal_size * _cass_heap(instance.ram_gib) * buffer_percent ): return lambda x: _cass_heap(x, max_heap_gib=30) else: @@ -417,9 +431,9 @@ def _target_rf(desires: CapacityDesires, user_copies: Optional[int]) -> int: # run with RF=2 consistency = desires.query_pattern.access_consistency.same_region if ( - desires.data_shape.durability_slo_order.mid < 1000 - and consistency is not None - and consistency.target_consistency != AccessConsistency.read_your_writes + desires.data_shape.durability_slo_order.mid < 1000 + and consistency is not None + and consistency.target_consistency != AccessConsistency.read_your_writes ): return 2 return 3 @@ -429,7 +443,7 @@ class NflxCassandraArguments(BaseModel): copies_per_region: int = Field( default=3, description="How many copies of the data will exist e.g. RF=3. If unsupplied" - " this will be deduced from durability and consistency desires", + " this will be deduced from durability and consistency desires", ) require_local_disks: bool = Field( default=False, @@ -458,25 +472,25 @@ class NflxCassandraArguments(BaseModel): max_write_buffer_percent: float = Field( default=0.25, description="The amount of heap memory that can be used to buffer writes. " - "Note that if there are more than 100k writes this will " - "automatically adjust to 0.5", + "Note that if there are more than 100k writes this will " + "automatically adjust to 0.5", ) max_table_buffer_percent: float = Field( default=0.11, description="How much of heap memory can be used for a single table. " - "Note that if there are more than 100k writes this will " - "automatically adjust to 0.2", + "Note that if there are more than 100k writes this will " + "automatically adjust to 0.2", ) class NflxCassandraCapacityModel(CapacityModel): @staticmethod def capacity_plan( - instance: Instance, - drive: Drive, - context: RegionContext, - desires: CapacityDesires, - extra_model_arguments: Dict[str, Any], + instance: Instance, + drive: Drive, + context: RegionContext, + desires: CapacityDesires, + extra_model_arguments: Dict[str, Any], ) -> Optional[CapacityPlan]: # Use durabiliy and consistency to compute RF. copies_per_region = _target_rf( @@ -503,8 +517,8 @@ def capacity_plan( # Adjust heap defaults for high write clusters if ( - desires.query_pattern.estimated_write_per_second.mid >= 100_000 - and desires.data_shape.estimated_state_size_gib.mid >= 100 + desires.query_pattern.estimated_write_per_second.mid >= 100_000 + and desires.data_shape.estimated_state_size_gib.mid >= 100 ): max_write_buffer_percent = max(0.5, max_write_buffer_percent) max_table_buffer_percent = max(0.2, max_table_buffer_percent) diff --git a/tests/netflix/test_cassandra.py b/tests/netflix/test_cassandra.py index 3beac86..ba20089 100644 --- a/tests/netflix/test_cassandra.py +++ b/tests/netflix/test_cassandra.py @@ -1,15 +1,17 @@ from service_capacity_modeling.capacity_planner import planner -from service_capacity_modeling.interface import AccessConsistency, CurrentClusterCapacity, CurrentClusters +from service_capacity_modeling.interface import AccessConsistency +from service_capacity_modeling.interface import AccessPattern from service_capacity_modeling.interface import CapacityDesires from service_capacity_modeling.interface import certain_float from service_capacity_modeling.interface import certain_int from service_capacity_modeling.interface import Consistency +from service_capacity_modeling.interface import CurrentClusterCapacity +from service_capacity_modeling.interface import CurrentClusters from service_capacity_modeling.interface import DataShape from service_capacity_modeling.interface import FixedInterval from service_capacity_modeling.interface import GlobalConsistency -from service_capacity_modeling.interface import QueryPattern from service_capacity_modeling.interface import Interval -from service_capacity_modeling.interface import AccessPattern +from service_capacity_modeling.interface import QueryPattern small_but_high_qps = CapacityDesires( service_tier=1, @@ -67,9 +69,9 @@ def test_capacity_small_fast(): # with lots of ebs_gp2 to handle the read IOs if small_result.attached_drives: assert ( - small_result.count - * sum(d.size_gib for d in small_result.attached_drives) - > 1000 + small_result.count + * sum(d.size_gib for d in small_result.attached_drives) + > 1000 ) assert small_result.cluster_params["cassandra.heap.write.percent"] == 0.25 @@ -157,12 +159,12 @@ def test_capacity_high_writes(): assert 30 <= num_cpus <= 128 if high_writes_result.attached_drives: assert ( - high_writes_result.count * high_writes_result.attached_drives[0].size_gib - >= 400 + high_writes_result.count * high_writes_result.attached_drives[0].size_gib + >= 400 ) elif high_writes_result.instance.drive is not None: assert ( - high_writes_result.count * high_writes_result.instance.drive.size_gib >= 400 + high_writes_result.count * high_writes_result.instance.drive.size_gib >= 400 ) else: raise AssertionError("Should have drives") @@ -195,9 +197,9 @@ def test_high_write_throughput(): assert high_writes_result.attached_drives[0].size_gib >= 400 assert ( - 300_000 - > high_writes_result.count * high_writes_result.attached_drives[0].size_gib - >= 100_000 + 300_000 + > high_writes_result.count * high_writes_result.attached_drives[0].size_gib + >= 100_000 ) cluster_cost = cap_plan.candidate_clusters.annual_costs["cassandra.zonal-clusters"] @@ -226,7 +228,7 @@ def test_capacity_large_footprint(): assert large_footprint_result.cluster_params["cassandra.heap.write.percent"] == 0.25 assert large_footprint_result.cluster_params["cassandra.heap.table.percent"] == 0.11 assert ( - large_footprint_result.cluster_params["cassandra.compaction.min_threshold"] == 4 + large_footprint_result.cluster_params["cassandra.compaction.min_threshold"] == 4 ) @@ -270,7 +272,7 @@ def test_reduced_durability(): )[0] assert cheap_plan.candidate_clusters.total_annual_cost < ( - 0.7 * float(expensive_plan.candidate_clusters.total_annual_cost) + 0.7 * float(expensive_plan.candidate_clusters.total_annual_cost) ) # The reduced durability and consistency requirement let's us # use less compute @@ -280,33 +282,34 @@ def test_reduced_durability(): # Due to high writes both should have high heap write buffering for plan in (expensive_plan, cheap_plan): assert ( - plan.candidate_clusters.zonal[0].cluster_params[ - "cassandra.heap.write.percent" - ] - == 0.5 + plan.candidate_clusters.zonal[0].cluster_params[ + "cassandra.heap.write.percent" + ] + == 0.5 ) assert ( - plan.candidate_clusters.zonal[0].cluster_params[ - "cassandra.heap.table.percent" - ] - == 0.2 + plan.candidate_clusters.zonal[0].cluster_params[ + "cassandra.heap.table.percent" + ] + == 0.2 ) assert ( - plan.candidate_clusters.zonal[0].cluster_params[ - "cassandra.compaction.min_threshold" - ] - == 8 + plan.candidate_clusters.zonal[0].cluster_params[ + "cassandra.compaction.min_threshold" + ] + == 8 ) assert ( - cheap_plan.candidate_clusters.zonal[0].cluster_params["cassandra.keyspace.rf"] - == 2 + cheap_plan.candidate_clusters.zonal[0].cluster_params["cassandra.keyspace.rf"] + == 2 ) def test_plan_certain(): """ - Use cpu utilization to determine instance types directly as supposed to extrapolating it from the Data Shape + Use cpu utilization to determine instance types directly as supposed to + extrapolating it from the Data Shape """ cluster_capacity = CurrentClusterCapacity( cluster_instance_name="i4i.8xlarge", @@ -320,9 +323,7 @@ def test_plan_certain(): service_tier=1, current_clusters=CurrentClusters(zonal=[cluster_capacity]), query_pattern=QueryPattern( - access_pattern=AccessPattern( - AccessPattern.latency - ), + access_pattern=AccessPattern(AccessPattern.latency), estimated_read_per_second=Interval( low=234248, mid=351854, high=485906, confidence=0.98 ), @@ -335,9 +336,7 @@ def test_plan_certain(): estimated_state_size_gib=Interval( low=2006.083, mid=2252.5, high=2480.41, confidence=0.98 ), - estimated_compression_ratio=Interval( - low=1, mid=1, high=1, confidence=1 - ), + estimated_compression_ratio=Interval(low=1, mid=1, high=1, confidence=1), ), ) cap_plan = planner.plan_certain( diff --git a/tests/netflix/test_cassandra_uncertain.py b/tests/netflix/test_cassandra_uncertain.py index a3392b6..ce93dfb 100644 --- a/tests/netflix/test_cassandra_uncertain.py +++ b/tests/netflix/test_cassandra_uncertain.py @@ -1,4 +1,3 @@ - from service_capacity_modeling.capacity_planner import planner from service_capacity_modeling.interface import CapacityDesires from service_capacity_modeling.interface import DataShape @@ -42,14 +41,14 @@ def test_uncertain_planning(): lr_cluster = lr.candidate_clusters.zonal[0] assert 8 <= lr_cluster.count * lr_cluster.instance.cpu <= 64 assert ( - 5_000 <= lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 45_000 + 5_000 <= lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 45_000 ) sr = mid_plan.least_regret[1] sr_cluster = sr.candidate_clusters.zonal[0] assert 8 <= sr_cluster.count * sr_cluster.instance.cpu <= 64 assert ( - 5_000 <= sr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 45_000 + 5_000 <= sr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 45_000 ) tiny_plan = planner.plan( @@ -61,7 +60,7 @@ def test_uncertain_planning(): lr_cluster = lr.candidate_clusters.zonal[0] assert 2 <= lr_cluster.count * lr_cluster.instance.cpu < 16 assert ( - 1_000 < lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 6_000 + 1_000 < lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 6_000 ) @@ -155,9 +154,9 @@ def test_worn_dataset(): lr_cluster = lr.candidate_clusters.zonal[0] assert 128 <= lr_cluster.count * lr_cluster.instance.cpu <= 512 assert ( - 250_000 - <= lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] - < 1_000_000 + 250_000 + <= lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] + < 1_000_000 ) assert lr_cluster.instance.name.startswith( "m5." @@ -193,9 +192,9 @@ def test_very_small_has_disk(): lr_cluster = lr.candidate_clusters.zonal[0] assert 2 <= lr_cluster.count * lr_cluster.instance.cpu < 16 assert ( - 1_000 - < lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] - < 6_000 + 1_000 + < lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] + < 6_000 ) if lr_cluster.instance.drive is None: assert sum(dr.size_gib for dr in lr_cluster.attached_drives) > 10