From 395a60266567e4f7acaa5c923ec98547a03a18ad Mon Sep 17 00:00:00 2001
From: Gennadiy Chuyeshov
Date: Thu, 26 Oct 2023 14:29:15 -0700
Subject: [PATCH] Additional tweaks to the Elasticsearch model, moving search
 nodes into a dedicated sub-model
---
 .../models/org/netflix/__init__.py      |   2 +
 .../models/org/netflix/elasticsearch.py | 150 +++++++++---------
 tests/netflix/test_elasticsearch.py     |  17 +-
 3 files changed, 90 insertions(+), 79 deletions(-)

diff --git a/service_capacity_modeling/models/org/netflix/__init__.py b/service_capacity_modeling/models/org/netflix/__init__.py
index ce207c0..5b049e1 100644
--- a/service_capacity_modeling/models/org/netflix/__init__.py
+++ b/service_capacity_modeling/models/org/netflix/__init__.py
@@ -6,6 +6,7 @@
 from .elasticsearch import nflx_elasticsearch_capacity_model
 from .elasticsearch import nflx_elasticsearch_data_capacity_model
 from .elasticsearch import nflx_elasticsearch_master_capacity_model
+from .elasticsearch import nflx_elasticsearch_search_capacity_model
 from .entity import nflx_entity_capacity_model
 from .evcache import nflx_evcache_capacity_model
 from .kafka import nflx_kafka_capacity_model
@@ -30,6 +31,7 @@ def models():
         "org.netflix.elasticsearch": nflx_elasticsearch_capacity_model,
         "org.netflix.elasticsearch.node": nflx_elasticsearch_data_capacity_model,
         "org.netflix.elasticsearch.master": nflx_elasticsearch_master_capacity_model,
+        "org.netflix.elasticsearch.search": nflx_elasticsearch_search_capacity_model,
         "org.netflix.entity": nflx_entity_capacity_model,
         "org.netflix.cockroachdb": nflx_cockroachdb_capacity_model,
         "org.netflix.aurora": nflx_aurora_capacity_model,
diff --git a/service_capacity_modeling/models/org/netflix/elasticsearch.py b/service_capacity_modeling/models/org/netflix/elasticsearch.py
index 2a44f4e..754f5cd 100644
--- a/service_capacity_modeling/models/org/netflix/elasticsearch.py
+++ b/service_capacity_modeling/models/org/netflix/elasticsearch.py
@@ -154,7 +154,7 @@ def _upsert_params(cluster, params):
 class NflxElasticsearchArguments(BaseModel):
     copies_per_region: int = Field(
         default=3,
-        description="How many copies of the data will exist e.g. RF=3. If unsupplied"
+        description="How many copies of the data will exist e.g. RF=3. If not supplied"
         " this will be deduced from durability and consistency desires",
     )
     max_regional_size: int = Field(
@@ -163,8 +163,8 @@ class NflxElasticsearchArguments(BaseModel):
         description="What is the maximum size of a cluster in this region",
     )
     max_local_disk_gib: int = Field(
-        # Nodes larger than 4 TiB are painful to recover
-        default=4096,
+        # Nodes larger than 8 TiB are painful to recover
+        default=8192,
         description="The maximum amount of data we store per machine",
     )
     max_rps_to_disk: int = Field(
@@ -183,15 +183,13 @@ def capacity_plan(
     extra_model_arguments: Dict[str, Any],
 ) -> Optional[CapacityPlan]:
 
-    # (FIXME): Need elasticsearch input
-    # TODO: Use durability requirements to compute RF.
     copies_per_region: int = _target_rf(
         desires, extra_model_arguments.get("copies_per_region", None)
     )
-    max_regional_size: int = extra_model_arguments.get("max_regional_size", 240)
+    max_regional_size: int = extra_model_arguments.get("max_regional_size", 120)
     max_rps_to_disk: int = extra_model_arguments.get("max_rps_to_disk", 1000)
     # Very large nodes are hard to recover
-    max_local_disk_gib: int = extra_model_arguments.get("max_local_disk_gib", 5000)
+    max_local_disk_gib: int = extra_model_arguments.get("max_local_disk_gib", 8192)
 
     # the ratio of traffic that should be handled by search nodes.
     # 0.0 = no search nodes, all searches handled by data nodes
@@ -213,7 +211,6 @@ def capacity_plan(
     _rps = desires.query_pattern.estimated_read_per_second.mid // zones_in_region
 
     data_rps = _rps / (search_to_data_rps_ratio + 1)
-    search_rps = search_to_data_rps_ratio * data_rps
 
     # Based on the disk latency and the read latency SLOs we adjust our
     # working set to keep more or less data in RAM. Faster drives need
@@ -278,12 +275,11 @@ def capacity_plan(
         # merging can make progress as long as there is some headroom
         required_disk_space=lambda x: x * 1.33,
         max_local_disk_gib=max_local_disk_gib,
-        # elasticsearch clusters can autobalance via shard placement
+        # Elasticsearch clusters can auto-balance via shard placement
         cluster_size=lambda x: x,
         min_count=1,
         # Sidecars/System takes away memory from Elasticsearch
-        # Elasticsearch uses half of available system max of 32 for compressed
-        # oops
+        # which uses half of available system memory (max 32 GiB) for compressed oops
         reserve_memory=lambda x: base_mem + max(32, x / 2),
         core_reference_ghz=data_requirement.core_reference_ghz,
     )
@@ -293,74 +289,28 @@ def capacity_plan(
     params = {"elasticsearch.copies": copies_per_region}
     _upsert_params(data_cluster, params)
 
-    # elasticsearch clusters generally should try to stay under some total number
+    # Elasticsearch clusters generally should try to stay under some total number
     # of nodes. Orgs do this for all kinds of reasons such as
-    # * Security group limits. Since you must have < 500 rules if you're
-    #   ingressing public ips)
-    # * Maintenance. If your restart script does one node at a time you want
-    #   smaller clusters so your restarts don't take months.
-    # * NxN network issues. Sometimes smaller clusters of bigger nodes
-    #   are better for network propagation
+    # * Maintenance. If your restart script does one node at a time you want
+    #   smaller clusters so your restarts don't take months.
+    # * NxN network issues. Sometimes smaller clusters of bigger nodes
+    #   are better for network propagation
     if data_cluster.count > (max_regional_size // zones_in_region):
         return None
 
-    data_requirements = [data_requirement] * zones_in_region
-    data_clusters = [data_cluster] * zones_in_region
     ec2_costs = {
         "elasticsearch-data.zonal-clusters": zones_in_region * data_cluster.annual_cost
     }
 
-    if search_rps > 0:
-        search_requirement = _estimate_elasticsearch_requirement(
-            node_type="search",
-            instance=instance,
-            desires=desires,
-            working_set=0.0,
-            reads_per_second=search_rps,
-            zones_in_region=zones_in_region,
-            copies_per_region=0,
-            max_rps_to_disk=1,  # to avoid divide by zero
-        )
-        search_cluster = compute_stateful_zone(
-            instance=instance,
-            drive=drive,
-            needed_cores=int(search_requirement.cpu_cores.mid),
-            needed_disk_gib=int(search_requirement.disk_gib.mid),
-            needed_memory_gib=int(search_requirement.mem_gib.mid),
-            needed_network_mbps=search_requirement.network_mbps.mid,
-            max_local_disk_gib=0,
-            # Elasticsearch clusters can autobalance via shard placement
-            cluster_size=lambda x: x,
-            min_count=1,
-            # Sidecars/System takes away memory from Elasticsearch
-            # Elasticsearch uses half of available system max of 32 for compressed
-            # oops
-            reserve_memory=lambda x: base_mem + max(32, x / 2),
-            core_reference_ghz=search_requirement.core_reference_ghz,
-        )
-        search_cluster.cluster_type = "elasticsearch-search"
-
-        if search_cluster.count > (max_regional_size // zones_in_region):
-            return None
-        ec2_costs["elasticsearch-search.zonal-clusters"] = (
-            zones_in_region * search_cluster.annual_cost
-        )
-
-        search_requirements = [search_requirement] * zones_in_region
-        search_clusters = [search_cluster] * zones_in_region
-    else:
-        search_requirements = []
-        search_clusters = []
-
     clusters = Clusters(
         annual_costs=ec2_costs,
-        zonal=data_clusters + search_clusters,
+        zonal=[data_cluster] * zones_in_region,
         regional=[],
     )
 
     plan = CapacityPlan(
-        requirements=Requirements(zonal=data_requirements + search_requirements),
+        requirements=Requirements(zonal=[data_requirement] * zones_in_region),
         candidate_clusters=clusters,
     )
 
@@ -376,7 +326,7 @@ def capacity_plan(
     desires: CapacityDesires,
     extra_model_arguments: Dict[str, Any],
 ) -> Optional[CapacityPlan]:
-    # only accept running on instances with a lot of RAM and a few CPUs
+    # Only accept running on instances with a lot of RAM and a few CPUs
     if instance.ram_gib <= 24:
         return None
     if instance.cpu <= 2:
         return None
@@ -413,6 +363,50 @@ def capacity_plan(
     )
 
 
+class NflxElasticsearchSearchCapacityModel(CapacityModel):
+    @staticmethod
+    def capacity_plan(
+        instance: Instance,
+        drive: Drive,
+        context: RegionContext,
+        desires: CapacityDesires,
+        extra_model_arguments: Dict[str, Any],
+    ) -> Optional[CapacityPlan]:
+        # Only accept running on instances with a lot of RAM and a few CPUs
+        if instance.ram_gib <= 24:
+            return None
+        if instance.cpu <= 2:
+            return None
+
+        zones_in_region = context.zones_in_region
+        requirement = CapacityRequirement(
+            requirement_type="elasticsearch-search-zonal",
+            core_reference_ghz=desires.core_reference_ghz,
+            cpu_cores=certain_int(2),
+            mem_gib=certain_int(24),
+            context={},
+        )
+
+        cluster = ZoneClusterCapacity(
+            cluster_type="elasticsearch-search",
+            count=1,
+            instance=instance,
+            attached_drives=[],
+            annual_cost=instance.annual_cost,
+        )
+
+        ec2_cost = zones_in_region * cluster.annual_cost
+        clusters = Clusters(
+            annual_costs={"elasticsearch-search.zonal-clusters": ec2_cost},
+            zonal=[cluster] * zones_in_region,
+        )
+
+        return CapacityPlan(
+            requirements=Requirements(zonal=[requirement] * zones_in_region),
+            candidate_clusters=clusters,
+        )
+
+
 class NflxElasticsearchCapacityModel(CapacityModel):
     @staticmethod
     def capacity_plan(
@@ -432,21 +426,22 @@ def description():
     def compose_with(
         user_desires: CapacityDesires, extra_model_arguments: Dict[str, Any]
     ) -> Tuple[Tuple[str, Callable[[CapacityDesires], CapacityDesires]], ...]:
-        def _modify_data_desires(
-            user_desires: CapacityDesires,
-        ) -> CapacityDesires:
-            # data node's model use the full desires
-            return user_desires
+        def _modify_data_desires(desires: CapacityDesires) -> CapacityDesires:
+            # data node's model uses the full desires
+            return desires
 
-        def _modify_master_desires(
-            user_desires: CapacityDesires,
-        ) -> CapacityDesires:
-            # master node's model doesn't use anything from the desires.
-            return user_desires
+        def _modify_master_desires(desires: CapacityDesires) -> CapacityDesires:
+            # master node's model doesn't use anything from the desires
+            return desires
+
+        def _modify_search_desires(desires: CapacityDesires) -> CapacityDesires:
+            # search node's model doesn't use anything from the desires
+            return desires
 
         return (
             ("org.netflix.elasticsearch.node", _modify_data_desires),
             ("org.netflix.elasticsearch.master", _modify_master_desires),
+            ("org.netflix.elasticsearch.search", _modify_search_desires),
         )
 
     @staticmethod
@@ -508,14 +503,12 @@ def default_desires(user_desires, extra_model_arguments: Dict[str, Any]):
                     confidence=0.98,
                 ),
             ),
-            # Most latency sensitive elasticsearch clusters are in the
-            # < 100GiB range
+            # Most latency-sensitive Elasticsearch clusters are in the < 100GiB range
             data_shape=DataShape(
                 estimated_state_size_gib=Interval(
                     low=10, mid=100, high=1000, confidence=0.98
                 ),
-                # Netflix Elasticsearch compresses with Deflate (gzip)
-                # by default
+                # Netflix Elasticsearch compresses with Deflate (gzip) by default
                 estimated_compression_ratio=Interval(
                     minimum_value=1.4,
                     maximum_value=8,
@@ -595,3 +588,4 @@ def default_desires(user_desires, extra_model_arguments: Dict[str, Any]):
 nflx_elasticsearch_capacity_model = NflxElasticsearchCapacityModel()
 nflx_elasticsearch_data_capacity_model = NflxElasticsearchDataCapacityModel()
 nflx_elasticsearch_master_capacity_model = NflxElasticsearchMasterCapacityModel()
+nflx_elasticsearch_search_capacity_model = NflxElasticsearchSearchCapacityModel()
diff --git a/tests/netflix/test_elasticsearch.py b/tests/netflix/test_elasticsearch.py
index 5a3634a..5e9e012 100644
--- a/tests/netflix/test_elasticsearch.py
+++ b/tests/netflix/test_elasticsearch.py
@@ -1,3 +1,4 @@
+from collections import Counter
 from collections import defaultdict
 
 from service_capacity_modeling.capacity_planner import planner
@@ -123,13 +124,26 @@ def test_es_simple_certain():
     )
 
     assert len(cap_plan) > 0, "Resulting cap_plan is empty"
-    assert all(plan for plan in cap_plan), "One or more plans is empty"
+
+    for plan in cap_plan:
+        assert plan, "One or more plans is empty"
+        assert plan.candidate_clusters, "candidate_clusters is empty"
+        assert plan.candidate_clusters.zonal, "candidate_clusters.zonal is empty"
+        assert len(plan.candidate_clusters.zonal) == 9, "Expecting 9 zonal clusters"
+
+        cluster_type_counts = Counter(zone.cluster_type for zone in plan.candidate_clusters.zonal)
+
+        assert len(cluster_type_counts) == 3, "Expecting 3 cluster types"
+        assert cluster_type_counts["elasticsearch-search"] == 3, "Expecting exactly 3 search clusters"
+        assert cluster_type_counts["elasticsearch-master"] == 3, "Expecting exactly 3 master clusters"
+        assert cluster_type_counts["elasticsearch-data"] >= 3, "Expecting at least 3 data clusters"
 
 
 def zonal_summary(zlr):
     zlr_cpu = zlr.count * zlr.instance.cpu
     zlr_cost = zlr.annual_cost
     zlr_family = zlr.instance.family
+    zlr_instance_name = zlr.instance.name
     zlr_drive_gib = sum(dr.size_gib for dr in zlr.attached_drives)
     if zlr.instance.drive is not None:
         zlr_drive_gib += zlr.instance.drive.size_gib
@@ -137,6 +151,7 @@ def zonal_summary(zlr):
 
     return (
         zlr_family,
+        zlr_instance_name,
         zlr.count,
         zlr_cpu,
         zlr_cost,