Skip to content

Commit

Permalink
fixing linting/style errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Ram Srivatsa Kannan committed Oct 26, 2023
1 parent 59a1fb4 commit de475e0
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 112 deletions.
25 changes: 18 additions & 7 deletions service_capacity_modeling/capacity_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from service_capacity_modeling.interface import certain_float
from service_capacity_modeling.interface import DataShape
from service_capacity_modeling.interface import Drive
from service_capacity_modeling.interface import Hardware
from service_capacity_modeling.interface import Instance
from service_capacity_modeling.interface import Interval
from service_capacity_modeling.interface import interval
Expand Down Expand Up @@ -182,6 +183,21 @@ def model_desires_percentiles(
return results, d


def _set_instance_objects(
desires: CapacityDesires,
hardware: Hardware,
):
if desires.current_clusters:
for zonal_cluster_capacity in desires.current_clusters.zonal:
zonal_cluster_capacity.cluster_instance = hardware.instances[
zonal_cluster_capacity.cluster_instance_name
]
for regional_cluster_capacity in desires.current_clusters.regional:
regional_cluster_capacity.cluster_instance = hardware.instances[
regional_cluster_capacity.cluster_instance_name
]


def _allow_instance(
instance: Instance,
allowed_names: Sequence[str],
Expand Down Expand Up @@ -575,14 +591,9 @@ def generate_scenarios(
if len(allowed_drives) == 0:
allowed_drives.update(hardware.drives.keys())

# Get current instance object if exists
if desires.current_clusters:
for zonal_cluster_capacity in desires.current_clusters.zonal:
zonal_cluster_capacity.cluster_instance = hardware.instances[zonal_cluster_capacity.cluster_instance_name]
for regional_cluster_capacity in desires.current_clusters.regional:
regional_cluster_capacity.cluster_instance = hardware.instances[regional_cluster_capacity.cluster_instance_name]
# Set current instance object if exists
_set_instance_objects(desires, hardware)

plans = []
if model.run_hardware_simulation():
for instance in hardware.instances.values():
if not _allow_instance(
Expand Down
4 changes: 2 additions & 2 deletions service_capacity_modeling/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ def annual_cost_gib(self, data_gib: float = 0):
break
if transfer_cost[0] > 0:
annual_cost += (
min(_annual_data, transfer_cost[0]) * transfer_cost[1]
min(_annual_data, transfer_cost[0]) * transfer_cost[1]
)
_annual_data -= transfer_cost[0]
else:
Expand Down Expand Up @@ -758,7 +758,7 @@ class Requirements(ExcludeUnsetModel):
# pylint: disable=unused-argument
@staticmethod
def regret(
name: str, optimal_plan: "CapacityPlan", proposed_plan: "CapacityPlan"
name: str, optimal_plan: "CapacityPlan", proposed_plan: "CapacityPlan"
) -> float:
return 0.0

Expand Down
130 changes: 72 additions & 58 deletions service_capacity_modeling/models/org/netflix/cassandra.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,49 +40,63 @@


def _write_buffer_gib_zone(
desires: CapacityDesires, zones_per_region: int, flushes_before_compaction: int = 4
desires: CapacityDesires, zones_per_region: int, flushes_before_compaction: int = 4
) -> float:
# Cassandra has to buffer writes before flushing to disk, and assuming
# we will compact every 4 flushes and we want no more than 2 redundant
# compactions in an hour, we want <= 4**2 = 16 flushes per hour
# or a flush of data every 3600 / 16 = 225 seconds
write_bytes_per_second = (
desires.query_pattern.estimated_write_per_second.mid
* desires.query_pattern.estimated_mean_write_size_bytes.mid
desires.query_pattern.estimated_write_per_second.mid
* desires.query_pattern.estimated_mean_write_size_bytes.mid
)

compactions_per_hour = 2
hour_in_seconds = 60 * 60

write_buffer_gib = (
(write_bytes_per_second * hour_in_seconds)
/ (flushes_before_compaction ** compactions_per_hour)
) / (1 << 30)
(write_bytes_per_second * hour_in_seconds)
/ (flushes_before_compaction**compactions_per_hour)
) / (1 << 30)

return float(write_buffer_gib) / zones_per_region


def _estimate_cassandra_requirement(
instance: Instance,
desires: CapacityDesires,
working_set: float,
reads_per_second: float,
max_rps_to_disk: int,
required_cluster_size: Optional[int] = None,
zones_per_region: int = 3,
copies_per_region: int = 3,
instance: Instance,
desires: CapacityDesires,
working_set: float,
reads_per_second: float,
max_rps_to_disk: int,
required_cluster_size: Optional[int] = None,
zones_per_region: int = 3,
copies_per_region: int = 3,
) -> CapacityRequirement:
"""Estimate the capacity required for one zone given a regional desire
The input desires should be the **regional** desire, and this function will
return the zonal capacity requirement
"""
current_capacity = None if desires.current_clusters is None else desires.current_clusters.zonal[0] if len(desires.current_clusters.zonal) else desires.current_clusters.regional[0]
# Keep half of the cores free for background work (compaction, backup, repair). Currently, zones and regions are
# configured in a homogeneous manner. Hence, we just take any one of the current cluster configuration
if current_capacity and current_capacity.cluster_instance and required_cluster_size is not None:
needed_cores = (current_capacity.cluster_instance.cpu * required_cluster_size *
zones_per_region) * (current_capacity.cpu_utilization.high / 20)
current_capacity = (
None
if desires.current_clusters is None
else desires.current_clusters.zonal[0]
if len(desires.current_clusters.zonal)
else desires.current_clusters.regional[0]
)
# Keep half of the cores free for background work (compaction, backup, repair).
# Currently, zones and regions are configured in a homogeneous manner. Hence,
# we just take any one of the current cluster configuration
if (
current_capacity
and current_capacity.cluster_instance
and required_cluster_size is not None
):
needed_cores = (
current_capacity.cluster_instance.cpu
* required_cluster_size
* zones_per_region
) * (current_capacity.cpu_utilization.high / 20)
else:
needed_cores = sqrt_staffed_cores(desires) * 2
# Keep half of the bandwidth available for backup
Expand Down Expand Up @@ -175,20 +189,20 @@ def _upsert_params(cluster, params):
# pylint: disable=too-many-return-statements
# flake8: noqa: C901
def _estimate_cassandra_cluster_zonal(
instance: Instance,
drive: Drive,
context: RegionContext,
desires: CapacityDesires,
zones_per_region: int = 3,
copies_per_region: int = 3,
require_local_disks: bool = False,
require_attached_disks: bool = False,
required_cluster_size: Optional[int] = None,
max_rps_to_disk: int = 500,
max_local_disk_gib: int = 2048,
max_regional_size: int = 96,
max_write_buffer_percent: float = 0.25,
max_table_buffer_percent: float = 0.11,
instance: Instance,
drive: Drive,
context: RegionContext,
desires: CapacityDesires,
zones_per_region: int = 3,
copies_per_region: int = 3,
require_local_disks: bool = False,
require_attached_disks: bool = False,
required_cluster_size: Optional[int] = None,
max_rps_to_disk: int = 500,
max_local_disk_gib: int = 2048,
max_regional_size: int = 96,
max_write_buffer_percent: float = 0.25,
max_table_buffer_percent: float = 0.11,
) -> Optional[CapacityPlan]:
# Netflix Cassandra doesn't like to deploy on really small instances
if instance.cpu < 2 or instance.ram_gib < 14:
Expand All @@ -208,7 +222,7 @@ def _estimate_cassandra_cluster_zonal(

rps = desires.query_pattern.estimated_read_per_second.mid // zones_per_region
write_per_sec = (
desires.query_pattern.estimated_write_per_second.mid // zones_per_region
desires.query_pattern.estimated_write_per_second.mid // zones_per_region
)
write_bytes_per_sec = round(
write_per_sec * desires.query_pattern.estimated_mean_write_size_bytes.mid
Expand Down Expand Up @@ -257,8 +271,8 @@ def _estimate_cassandra_cluster_zonal(
min_count = 2

base_mem = (
desires.data_shape.reserved_instance_app_mem_gib
+ desires.data_shape.reserved_instance_system_mem_gib
desires.data_shape.reserved_instance_app_mem_gib
+ desires.data_shape.reserved_instance_system_mem_gib
)

heap_fn = _cass_heap_for_write_buffer(
Expand Down Expand Up @@ -341,7 +355,7 @@ def _estimate_cassandra_cluster_zonal(
annual_cost=blob.annual_cost_gib(requirement.disk_gib.mid),
service_params={
"nines_required": (
1 - 1.0 / desires.data_shape.durability_slo_order.mid
1 - 1.0 / desires.data_shape.durability_slo_order.mid
)
},
)
Expand Down Expand Up @@ -384,15 +398,15 @@ def _cass_io_per_read(node_size_gib, sstable_size_mb=160):


def _cass_heap_for_write_buffer(
instance: Instance,
write_buffer_gib: float,
max_zonal_size: int,
buffer_percent: float,
instance: Instance,
write_buffer_gib: float,
max_zonal_size: int,
buffer_percent: float,
) -> Callable[[float], float]:
# If there is no way we can get enough heap with the max zonal size, try
# letting max heap grow to 31 GiB per node to get more write buffer
if write_buffer_gib > (
max_zonal_size * _cass_heap(instance.ram_gib) * buffer_percent
max_zonal_size * _cass_heap(instance.ram_gib) * buffer_percent
):
return lambda x: _cass_heap(x, max_heap_gib=30)
else:
Expand All @@ -417,9 +431,9 @@ def _target_rf(desires: CapacityDesires, user_copies: Optional[int]) -> int:
# run with RF=2
consistency = desires.query_pattern.access_consistency.same_region
if (
desires.data_shape.durability_slo_order.mid < 1000
and consistency is not None
and consistency.target_consistency != AccessConsistency.read_your_writes
desires.data_shape.durability_slo_order.mid < 1000
and consistency is not None
and consistency.target_consistency != AccessConsistency.read_your_writes
):
return 2
return 3
Expand All @@ -429,7 +443,7 @@ class NflxCassandraArguments(BaseModel):
copies_per_region: int = Field(
default=3,
description="How many copies of the data will exist e.g. RF=3. If unsupplied"
" this will be deduced from durability and consistency desires",
" this will be deduced from durability and consistency desires",
)
require_local_disks: bool = Field(
default=False,
Expand Down Expand Up @@ -458,25 +472,25 @@ class NflxCassandraArguments(BaseModel):
max_write_buffer_percent: float = Field(
default=0.25,
description="The amount of heap memory that can be used to buffer writes. "
"Note that if there are more than 100k writes this will "
"automatically adjust to 0.5",
"Note that if there are more than 100k writes this will "
"automatically adjust to 0.5",
)
max_table_buffer_percent: float = Field(
default=0.11,
description="How much of heap memory can be used for a single table. "
"Note that if there are more than 100k writes this will "
"automatically adjust to 0.2",
"Note that if there are more than 100k writes this will "
"automatically adjust to 0.2",
)


class NflxCassandraCapacityModel(CapacityModel):
@staticmethod
def capacity_plan(
instance: Instance,
drive: Drive,
context: RegionContext,
desires: CapacityDesires,
extra_model_arguments: Dict[str, Any],
instance: Instance,
drive: Drive,
context: RegionContext,
desires: CapacityDesires,
extra_model_arguments: Dict[str, Any],
) -> Optional[CapacityPlan]:
# Use durabiliy and consistency to compute RF.
copies_per_region = _target_rf(
Expand All @@ -503,8 +517,8 @@ def capacity_plan(

# Adjust heap defaults for high write clusters
if (
desires.query_pattern.estimated_write_per_second.mid >= 100_000
and desires.data_shape.estimated_state_size_gib.mid >= 100
desires.query_pattern.estimated_write_per_second.mid >= 100_000
and desires.data_shape.estimated_state_size_gib.mid >= 100
):
max_write_buffer_percent = max(0.5, max_write_buffer_percent)
max_table_buffer_percent = max(0.2, max_table_buffer_percent)
Expand Down
Loading

0 comments on commit de475e0

Please sign in to comment.