From fac3418ccaf0d17cc26bae2eabf4eefc3a8ba87e Mon Sep 17 00:00:00 2001 From: Ram Srivatsa Kannan Date: Wed, 25 Oct 2023 16:54:01 -0700 Subject: [PATCH] fixing linting errors --- service_capacity_modeling/capacity_planner.py | 8 +- service_capacity_modeling/interface.py | 4 +- .../models/org/netflix/cassandra.py | 130 ++++++++++-------- tests/netflix/test_cassandra.py | 64 ++++----- tests/netflix/test_cassandra_uncertain.py | 19 ++- 5 files changed, 121 insertions(+), 104 deletions(-) diff --git a/service_capacity_modeling/capacity_planner.py b/service_capacity_modeling/capacity_planner.py index 39026e6..58f1fba 100644 --- a/service_capacity_modeling/capacity_planner.py +++ b/service_capacity_modeling/capacity_planner.py @@ -578,9 +578,13 @@ def generate_scenarios( # Get current instance object if exists if desires.current_clusters: for zonal_cluster_capacity in desires.current_clusters.zonal: - zonal_cluster_capacity.cluster_instance = hardware.instances[zonal_cluster_capacity.cluster_instance_name] + zonal_cluster_capacity.cluster_instance = hardware.instances[ + zonal_cluster_capacity.cluster_instance_name + ] for regional_cluster_capacity in desires.current_clusters.regional: - regional_cluster_capacity.cluster_instance = hardware.instances[regional_cluster_capacity.cluster_instance_name] + regional_cluster_capacity.cluster_instance = hardware.instances[ + regional_cluster_capacity.cluster_instance_name + ] plans = [] if model.run_hardware_simulation(): diff --git a/service_capacity_modeling/interface.py b/service_capacity_modeling/interface.py index 2fac80f..fc71d67 100644 --- a/service_capacity_modeling/interface.py +++ b/service_capacity_modeling/interface.py @@ -376,7 +376,7 @@ def annual_cost_gib(self, data_gib: float = 0): break if transfer_cost[0] > 0: annual_cost += ( - min(_annual_data, transfer_cost[0]) * transfer_cost[1] + min(_annual_data, transfer_cost[0]) * transfer_cost[1] ) _annual_data -= transfer_cost[0] else: @@ -758,7 +758,7 @@ class Requirements(ExcludeUnsetModel): # pylint: disable=unused-argument @staticmethod def regret( - name: str, optimal_plan: "CapacityPlan", proposed_plan: "CapacityPlan" + name: str, optimal_plan: "CapacityPlan", proposed_plan: "CapacityPlan" ) -> float: return 0.0 diff --git a/service_capacity_modeling/models/org/netflix/cassandra.py b/service_capacity_modeling/models/org/netflix/cassandra.py index a5dd303..a778293 100644 --- a/service_capacity_modeling/models/org/netflix/cassandra.py +++ b/service_capacity_modeling/models/org/netflix/cassandra.py @@ -40,49 +40,63 @@ def _write_buffer_gib_zone( - desires: CapacityDesires, zones_per_region: int, flushes_before_compaction: int = 4 + desires: CapacityDesires, zones_per_region: int, flushes_before_compaction: int = 4 ) -> float: # Cassandra has to buffer writes before flushing to disk, and assuming # we will compact every 4 flushes and we want no more than 2 redundant # compactions in an hour, we want <= 4**2 = 16 flushes per hour # or a flush of data every 3600 / 16 = 225 seconds write_bytes_per_second = ( - desires.query_pattern.estimated_write_per_second.mid - * desires.query_pattern.estimated_mean_write_size_bytes.mid + desires.query_pattern.estimated_write_per_second.mid + * desires.query_pattern.estimated_mean_write_size_bytes.mid ) compactions_per_hour = 2 hour_in_seconds = 60 * 60 write_buffer_gib = ( - (write_bytes_per_second * hour_in_seconds) - / (flushes_before_compaction ** compactions_per_hour) - ) / (1 << 30) + (write_bytes_per_second * hour_in_seconds) + / (flushes_before_compaction**compactions_per_hour) + ) / (1 << 30) return float(write_buffer_gib) / zones_per_region def _estimate_cassandra_requirement( - instance: Instance, - desires: CapacityDesires, - working_set: float, - reads_per_second: float, - max_rps_to_disk: int, - required_cluster_size: Optional[int] = None, - zones_per_region: int = 3, - copies_per_region: int = 3, + instance: Instance, + desires: CapacityDesires, + working_set: float, + reads_per_second: float, + max_rps_to_disk: int, + required_cluster_size: Optional[int] = None, + zones_per_region: int = 3, + copies_per_region: int = 3, ) -> CapacityRequirement: """Estimate the capacity required for one zone given a regional desire The input desires should be the **regional** desire, and this function will return the zonal capacity requirement """ - current_capacity = None if desires.current_clusters is None else desires.current_clusters.zonal[0] if len(desires.current_clusters.zonal) else desires.current_clusters.regional[0] - # Keep half of the cores free for background work (compaction, backup, repair). Currently, zones and regions are - # configured in a homogeneous manner. Hence, we just take any one of the current cluster configuration - if current_capacity and current_capacity.cluster_instance and required_cluster_size is not None: - needed_cores = (current_capacity.cluster_instance.cpu * required_cluster_size * - zones_per_region) * (current_capacity.cpu_utilization.high / 20) + current_capacity = ( + None + if desires.current_clusters is None + else desires.current_clusters.zonal[0] + if len(desires.current_clusters.zonal) + else desires.current_clusters.regional[0] + ) + # Keep half of the cores free for background work (compaction, backup, repair). + # Currently, zones and regions are configured in a homogeneous manner. Hence, + # we just take any one of the current cluster configuration + if ( + current_capacity + and current_capacity.cluster_instance + and required_cluster_size is not None + ): + needed_cores = ( + current_capacity.cluster_instance.cpu + * required_cluster_size + * zones_per_region + ) * (current_capacity.cpu_utilization.high / 20) else: needed_cores = sqrt_staffed_cores(desires) * 2 # Keep half of the bandwidth available for backup @@ -175,20 +189,20 @@ def _upsert_params(cluster, params): # pylint: disable=too-many-return-statements # flake8: noqa: C901 def _estimate_cassandra_cluster_zonal( - instance: Instance, - drive: Drive, - context: RegionContext, - desires: CapacityDesires, - zones_per_region: int = 3, - copies_per_region: int = 3, - require_local_disks: bool = False, - require_attached_disks: bool = False, - required_cluster_size: Optional[int] = None, - max_rps_to_disk: int = 500, - max_local_disk_gib: int = 2048, - max_regional_size: int = 96, - max_write_buffer_percent: float = 0.25, - max_table_buffer_percent: float = 0.11, + instance: Instance, + drive: Drive, + context: RegionContext, + desires: CapacityDesires, + zones_per_region: int = 3, + copies_per_region: int = 3, + require_local_disks: bool = False, + require_attached_disks: bool = False, + required_cluster_size: Optional[int] = None, + max_rps_to_disk: int = 500, + max_local_disk_gib: int = 2048, + max_regional_size: int = 96, + max_write_buffer_percent: float = 0.25, + max_table_buffer_percent: float = 0.11, ) -> Optional[CapacityPlan]: # Netflix Cassandra doesn't like to deploy on really small instances if instance.cpu < 2 or instance.ram_gib < 14: @@ -208,7 +222,7 @@ def _estimate_cassandra_cluster_zonal( rps = desires.query_pattern.estimated_read_per_second.mid // zones_per_region write_per_sec = ( - desires.query_pattern.estimated_write_per_second.mid // zones_per_region + desires.query_pattern.estimated_write_per_second.mid // zones_per_region ) write_bytes_per_sec = round( write_per_sec * desires.query_pattern.estimated_mean_write_size_bytes.mid @@ -257,8 +271,8 @@ def _estimate_cassandra_cluster_zonal( min_count = 2 base_mem = ( - desires.data_shape.reserved_instance_app_mem_gib - + desires.data_shape.reserved_instance_system_mem_gib + desires.data_shape.reserved_instance_app_mem_gib + + desires.data_shape.reserved_instance_system_mem_gib ) heap_fn = _cass_heap_for_write_buffer( @@ -341,7 +355,7 @@ def _estimate_cassandra_cluster_zonal( annual_cost=blob.annual_cost_gib(requirement.disk_gib.mid), service_params={ "nines_required": ( - 1 - 1.0 / desires.data_shape.durability_slo_order.mid + 1 - 1.0 / desires.data_shape.durability_slo_order.mid ) }, ) @@ -384,15 +398,15 @@ def _cass_io_per_read(node_size_gib, sstable_size_mb=160): def _cass_heap_for_write_buffer( - instance: Instance, - write_buffer_gib: float, - max_zonal_size: int, - buffer_percent: float, + instance: Instance, + write_buffer_gib: float, + max_zonal_size: int, + buffer_percent: float, ) -> Callable[[float], float]: # If there is no way we can get enough heap with the max zonal size, try # letting max heap grow to 31 GiB per node to get more write buffer if write_buffer_gib > ( - max_zonal_size * _cass_heap(instance.ram_gib) * buffer_percent + max_zonal_size * _cass_heap(instance.ram_gib) * buffer_percent ): return lambda x: _cass_heap(x, max_heap_gib=30) else: @@ -417,9 +431,9 @@ def _target_rf(desires: CapacityDesires, user_copies: Optional[int]) -> int: # run with RF=2 consistency = desires.query_pattern.access_consistency.same_region if ( - desires.data_shape.durability_slo_order.mid < 1000 - and consistency is not None - and consistency.target_consistency != AccessConsistency.read_your_writes + desires.data_shape.durability_slo_order.mid < 1000 + and consistency is not None + and consistency.target_consistency != AccessConsistency.read_your_writes ): return 2 return 3 @@ -429,7 +443,7 @@ class NflxCassandraArguments(BaseModel): copies_per_region: int = Field( default=3, description="How many copies of the data will exist e.g. RF=3. If unsupplied" - " this will be deduced from durability and consistency desires", + " this will be deduced from durability and consistency desires", ) require_local_disks: bool = Field( default=False, @@ -458,25 +472,25 @@ class NflxCassandraArguments(BaseModel): max_write_buffer_percent: float = Field( default=0.25, description="The amount of heap memory that can be used to buffer writes. " - "Note that if there are more than 100k writes this will " - "automatically adjust to 0.5", + "Note that if there are more than 100k writes this will " + "automatically adjust to 0.5", ) max_table_buffer_percent: float = Field( default=0.11, description="How much of heap memory can be used for a single table. " - "Note that if there are more than 100k writes this will " - "automatically adjust to 0.2", + "Note that if there are more than 100k writes this will " + "automatically adjust to 0.2", ) class NflxCassandraCapacityModel(CapacityModel): @staticmethod def capacity_plan( - instance: Instance, - drive: Drive, - context: RegionContext, - desires: CapacityDesires, - extra_model_arguments: Dict[str, Any], + instance: Instance, + drive: Drive, + context: RegionContext, + desires: CapacityDesires, + extra_model_arguments: Dict[str, Any], ) -> Optional[CapacityPlan]: # Use durabiliy and consistency to compute RF. copies_per_region = _target_rf( @@ -503,8 +517,8 @@ def capacity_plan( # Adjust heap defaults for high write clusters if ( - desires.query_pattern.estimated_write_per_second.mid >= 100_000 - and desires.data_shape.estimated_state_size_gib.mid >= 100 + desires.query_pattern.estimated_write_per_second.mid >= 100_000 + and desires.data_shape.estimated_state_size_gib.mid >= 100 ): max_write_buffer_percent = max(0.5, max_write_buffer_percent) max_table_buffer_percent = max(0.2, max_table_buffer_percent) diff --git a/tests/netflix/test_cassandra.py b/tests/netflix/test_cassandra.py index 3beac86..061cf0e 100644 --- a/tests/netflix/test_cassandra.py +++ b/tests/netflix/test_cassandra.py @@ -1,5 +1,9 @@ from service_capacity_modeling.capacity_planner import planner -from service_capacity_modeling.interface import AccessConsistency, CurrentClusterCapacity, CurrentClusters +from service_capacity_modeling.interface import ( + AccessConsistency, + CurrentClusterCapacity, + CurrentClusters, +) from service_capacity_modeling.interface import CapacityDesires from service_capacity_modeling.interface import certain_float from service_capacity_modeling.interface import certain_int @@ -67,9 +71,9 @@ def test_capacity_small_fast(): # with lots of ebs_gp2 to handle the read IOs if small_result.attached_drives: assert ( - small_result.count - * sum(d.size_gib for d in small_result.attached_drives) - > 1000 + small_result.count + * sum(d.size_gib for d in small_result.attached_drives) + > 1000 ) assert small_result.cluster_params["cassandra.heap.write.percent"] == 0.25 @@ -157,12 +161,12 @@ def test_capacity_high_writes(): assert 30 <= num_cpus <= 128 if high_writes_result.attached_drives: assert ( - high_writes_result.count * high_writes_result.attached_drives[0].size_gib - >= 400 + high_writes_result.count * high_writes_result.attached_drives[0].size_gib + >= 400 ) elif high_writes_result.instance.drive is not None: assert ( - high_writes_result.count * high_writes_result.instance.drive.size_gib >= 400 + high_writes_result.count * high_writes_result.instance.drive.size_gib >= 400 ) else: raise AssertionError("Should have drives") @@ -195,9 +199,9 @@ def test_high_write_throughput(): assert high_writes_result.attached_drives[0].size_gib >= 400 assert ( - 300_000 - > high_writes_result.count * high_writes_result.attached_drives[0].size_gib - >= 100_000 + 300_000 + > high_writes_result.count * high_writes_result.attached_drives[0].size_gib + >= 100_000 ) cluster_cost = cap_plan.candidate_clusters.annual_costs["cassandra.zonal-clusters"] @@ -226,7 +230,7 @@ def test_capacity_large_footprint(): assert large_footprint_result.cluster_params["cassandra.heap.write.percent"] == 0.25 assert large_footprint_result.cluster_params["cassandra.heap.table.percent"] == 0.11 assert ( - large_footprint_result.cluster_params["cassandra.compaction.min_threshold"] == 4 + large_footprint_result.cluster_params["cassandra.compaction.min_threshold"] == 4 ) @@ -270,7 +274,7 @@ def test_reduced_durability(): )[0] assert cheap_plan.candidate_clusters.total_annual_cost < ( - 0.7 * float(expensive_plan.candidate_clusters.total_annual_cost) + 0.7 * float(expensive_plan.candidate_clusters.total_annual_cost) ) # The reduced durability and consistency requirement let's us # use less compute @@ -280,27 +284,27 @@ def test_reduced_durability(): # Due to high writes both should have high heap write buffering for plan in (expensive_plan, cheap_plan): assert ( - plan.candidate_clusters.zonal[0].cluster_params[ - "cassandra.heap.write.percent" - ] - == 0.5 + plan.candidate_clusters.zonal[0].cluster_params[ + "cassandra.heap.write.percent" + ] + == 0.5 ) assert ( - plan.candidate_clusters.zonal[0].cluster_params[ - "cassandra.heap.table.percent" - ] - == 0.2 + plan.candidate_clusters.zonal[0].cluster_params[ + "cassandra.heap.table.percent" + ] + == 0.2 ) assert ( - plan.candidate_clusters.zonal[0].cluster_params[ - "cassandra.compaction.min_threshold" - ] - == 8 + plan.candidate_clusters.zonal[0].cluster_params[ + "cassandra.compaction.min_threshold" + ] + == 8 ) assert ( - cheap_plan.candidate_clusters.zonal[0].cluster_params["cassandra.keyspace.rf"] - == 2 + cheap_plan.candidate_clusters.zonal[0].cluster_params["cassandra.keyspace.rf"] + == 2 ) @@ -320,9 +324,7 @@ def test_plan_certain(): service_tier=1, current_clusters=CurrentClusters(zonal=[cluster_capacity]), query_pattern=QueryPattern( - access_pattern=AccessPattern( - AccessPattern.latency - ), + access_pattern=AccessPattern(AccessPattern.latency), estimated_read_per_second=Interval( low=234248, mid=351854, high=485906, confidence=0.98 ), @@ -335,9 +337,7 @@ def test_plan_certain(): estimated_state_size_gib=Interval( low=2006.083, mid=2252.5, high=2480.41, confidence=0.98 ), - estimated_compression_ratio=Interval( - low=1, mid=1, high=1, confidence=1 - ), + estimated_compression_ratio=Interval(low=1, mid=1, high=1, confidence=1), ), ) cap_plan = planner.plan_certain( diff --git a/tests/netflix/test_cassandra_uncertain.py b/tests/netflix/test_cassandra_uncertain.py index a3392b6..ce93dfb 100644 --- a/tests/netflix/test_cassandra_uncertain.py +++ b/tests/netflix/test_cassandra_uncertain.py @@ -1,4 +1,3 @@ - from service_capacity_modeling.capacity_planner import planner from service_capacity_modeling.interface import CapacityDesires from service_capacity_modeling.interface import DataShape @@ -42,14 +41,14 @@ def test_uncertain_planning(): lr_cluster = lr.candidate_clusters.zonal[0] assert 8 <= lr_cluster.count * lr_cluster.instance.cpu <= 64 assert ( - 5_000 <= lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 45_000 + 5_000 <= lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 45_000 ) sr = mid_plan.least_regret[1] sr_cluster = sr.candidate_clusters.zonal[0] assert 8 <= sr_cluster.count * sr_cluster.instance.cpu <= 64 assert ( - 5_000 <= sr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 45_000 + 5_000 <= sr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 45_000 ) tiny_plan = planner.plan( @@ -61,7 +60,7 @@ def test_uncertain_planning(): lr_cluster = lr.candidate_clusters.zonal[0] assert 2 <= lr_cluster.count * lr_cluster.instance.cpu < 16 assert ( - 1_000 < lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 6_000 + 1_000 < lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] < 6_000 ) @@ -155,9 +154,9 @@ def test_worn_dataset(): lr_cluster = lr.candidate_clusters.zonal[0] assert 128 <= lr_cluster.count * lr_cluster.instance.cpu <= 512 assert ( - 250_000 - <= lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] - < 1_000_000 + 250_000 + <= lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] + < 1_000_000 ) assert lr_cluster.instance.name.startswith( "m5." @@ -193,9 +192,9 @@ def test_very_small_has_disk(): lr_cluster = lr.candidate_clusters.zonal[0] assert 2 <= lr_cluster.count * lr_cluster.instance.cpu < 16 assert ( - 1_000 - < lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] - < 6_000 + 1_000 + < lr.candidate_clusters.annual_costs["cassandra.zonal-clusters"] + < 6_000 ) if lr_cluster.instance.drive is None: assert sum(dr.size_gib for dr in lr_cluster.attached_drives) > 10