Additional tweaks to the Elasticsearch model, moving search nodes into a dedicated sub-model #81

Merged 2 commits on Oct 27, 2023
2 changes: 2 additions & 0 deletions service_capacity_modeling/models/org/netflix/__init__.py
@@ -6,6 +6,7 @@
from .elasticsearch import nflx_elasticsearch_capacity_model
from .elasticsearch import nflx_elasticsearch_data_capacity_model
from .elasticsearch import nflx_elasticsearch_master_capacity_model
+from .elasticsearch import nflx_elasticsearch_search_capacity_model
from .entity import nflx_entity_capacity_model
from .evcache import nflx_evcache_capacity_model
from .kafka import nflx_kafka_capacity_model
@@ -30,6 +31,7 @@ def models():
"org.netflix.elasticsearch": nflx_elasticsearch_capacity_model,
"org.netflix.elasticsearch.node": nflx_elasticsearch_data_capacity_model,
"org.netflix.elasticsearch.master": nflx_elasticsearch_master_capacity_model,
"org.netflix.elasticsearch.search": nflx_elasticsearch_search_capacity_model,
"org.netflix.entity": nflx_entity_capacity_model,
"org.netflix.cockroachdb": nflx_cockroachdb_capacity_model,
"org.netflix.aurora": nflx_aurora_capacity_model,
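For orientation: `models()` returns the registry that maps these keys to model singletons, so the new search sub-model is addressable by name. A minimal lookup sketch, assuming the package exports `models()` as shown in this diff:

```python
from service_capacity_modeling.models.org.netflix import models

# Resolve the new sub-model by its registered key
search_model = models()["org.netflix.elasticsearch.search"]
```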
150 changes: 72 additions & 78 deletions service_capacity_modeling/models/org/netflix/elasticsearch.py
@@ -154,7 +154,7 @@ def _upsert_params(cluster, params):
class NflxElasticsearchArguments(BaseModel):
copies_per_region: int = Field(
default=3,
description="How many copies of the data will exist e.g. RF=3. If unsupplied"
description="How many copies of the data will exist e.g. RF=3. If not supplied"
" this will be deduced from durability and consistency desires",
)
max_regional_size: int = Field(
@@ -163,8 +163,8 @@ class NflxElasticsearchArguments(BaseModel):
description="What is the maximum size of a cluster in this region",
)
max_local_disk_gib: int = Field(
-        # Nodes larger than 4 TiB are painful to recover
-        default=4096,
+        # Nodes larger than 8 TiB are painful to recover
+        default=8192,
description="The maximum amount of data we store per machine",
)
max_rps_to_disk: int = Field(
@@ -183,15 +183,13 @@ def capacity_plan(
extra_model_arguments: Dict[str, Any],
) -> Optional[CapacityPlan]:

-        # (FIXME): Need elasticsearch input
-        # TODO: Use durability requirements to compute RF.
copies_per_region: int = _target_rf(
desires, extra_model_arguments.get("copies_per_region", None)
)
-        max_regional_size: int = extra_model_arguments.get("max_regional_size", 240)
+        max_regional_size: int = extra_model_arguments.get("max_regional_size", 120)
max_rps_to_disk: int = extra_model_arguments.get("max_rps_to_disk", 1000)
# Very large nodes are hard to recover
-        max_local_disk_gib: int = extra_model_arguments.get("max_local_disk_gib", 5000)
+        max_local_disk_gib: int = extra_model_arguments.get("max_local_disk_gib", 8192)

# the ratio of traffic that should be handled by search nodes.
# 0.0 = no search nodes, all searches handled by data nodes
@@ -213,7 +211,6 @@ def capacity_plan(
_rps = desires.query_pattern.estimated_read_per_second.mid // zones_in_region

data_rps = _rps / (search_to_data_rps_ratio + 1)
-        search_rps = search_to_data_rps_ratio * data_rps
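With `search_rps` removed here, this model now sizes only the data share of traffic; the search share is handled by the dedicated sub-model added below. An illustrative sketch of the split, with hypothetical numbers:

```python
# Hypothetical values, chosen only to illustrate the formula above
zones_in_region = 3
search_to_data_rps_ratio = 0.5  # assumed extra_model_arguments value
regional_rps = 9_000

_rps = regional_rps // zones_in_region            # 3000 reads/s per zone
data_rps = _rps / (search_to_data_rps_ratio + 1)  # 2000 rps go to data nodes
# The removed line would have given: 0.5 * 2000 = 1000 rps to search nodes
```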

# Based on the disk latency and the read latency SLOs we adjust our
# working set to keep more or less data in RAM. Faster drives need
@@ -278,12 +275,11 @@
# merging can make progress as long as there is some headroom
required_disk_space=lambda x: x * 1.33,
max_local_disk_gib=max_local_disk_gib,
-            # elasticsearch clusters can autobalance via shard placement
+            # Elasticsearch clusters can auto-balance via shard placement
cluster_size=lambda x: x,
min_count=1,
# Sidecars/System takes away memory from Elasticsearch
-            # Elasticsearch uses half of available system max of 32 for compressed
-            # oops
+            # which uses half of available system max of 32 for compressed oops
reserve_memory=lambda x: base_mem + max(32, x / 2),
core_reference_ghz=data_requirement.core_reference_ghz,
)
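The reservation lambda above is easy to sanity-check by hand; a small sketch, with a hypothetical `base_mem` standing in for the value defined earlier in this file:

```python
base_mem = 4  # GiB; assumed value, for illustration only

def reserve_memory(instance_ram_gib: float) -> float:
    # Mirrors the lambda above: system overhead plus at least 32 GiB
    # (or half of RAM, whichever is larger) kept away from Elasticsearch
    return base_mem + max(32, instance_ram_gib / 2)

print(reserve_memory(64))   # 36.0 GiB reserved on a 64 GiB instance
print(reserve_memory(128))  # 68.0 GiB reserved on a 128 GiB instance
```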
@@ -293,74 +289,28 @@
params = {"elasticsearch.copies": copies_per_region}
_upsert_params(data_cluster, params)

-        # elasticsearch clusters generally should try to stay under some total number
+        # Elasticsearch clusters generally should try to stay under some total number
# of nodes. Orgs do this for all kinds of reasons such as
# * Security group limits. Since you must have < 500 rules if you're
# ingressing public ips)
        # * Maintenance. If your restart script does one node at a time you want
        #   smaller clusters so your restarts don't take months.
        # * NxN network issues. Sometimes smaller clusters of bigger nodes
        #   are better for network propagation
if data_cluster.count > (max_regional_size // zones_in_region):
return None
-        data_requirements = [data_requirement] * zones_in_region
-        data_clusters = [data_cluster] * zones_in_region
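The cap above translates into a per-zone node budget; illustratively, with the new default and an assumed three-zone region:

```python
max_regional_size = 120  # new default from this diff
zones_in_region = 3      # assumed zone count
per_zone_budget = max_regional_size // zones_in_region  # 40 nodes per zonal cluster
```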

ec2_costs = {
"elasticsearch-data.zonal-clusters": zones_in_region
* data_cluster.annual_cost
}

-        if search_rps > 0:
-            search_requirement = _estimate_elasticsearch_requirement(
-                node_type="search",
-                instance=instance,
-                desires=desires,
-                working_set=0.0,
-                reads_per_second=search_rps,
-                zones_in_region=zones_in_region,
-                copies_per_region=0,
-                max_rps_to_disk=1,  # to avoid divide by zero
-            )
-            search_cluster = compute_stateful_zone(
-                instance=instance,
-                drive=drive,
-                needed_cores=int(search_requirement.cpu_cores.mid),
-                needed_disk_gib=int(search_requirement.disk_gib.mid),
-                needed_memory_gib=int(search_requirement.mem_gib.mid),
-                needed_network_mbps=search_requirement.network_mbps.mid,
-                max_local_disk_gib=0,
-                # Elasticsearch clusters can autobalance via shard placement
-                cluster_size=lambda x: x,
-                min_count=1,
-                # Sidecars/System takes away memory from Elasticsearch
-                # Elasticsearch uses half of available system max of 32 for compressed
-                # oops
-                reserve_memory=lambda x: base_mem + max(32, x / 2),
-                core_reference_ghz=search_requirement.core_reference_ghz,
-            )
-            search_cluster.cluster_type = "elasticsearch-search"
-
-            if search_cluster.count > (max_regional_size // zones_in_region):
-                return None
-            ec2_costs["elasticsearch-search.zonal-clusters"] = (
-                zones_in_region * search_cluster.annual_cost
-            )
-
-            search_requirements = [search_requirement] * zones_in_region
-            search_clusters = [search_cluster] * zones_in_region
-        else:
-            search_requirements = []
-            search_clusters = []

clusters = Clusters(
annual_costs=ec2_costs,
-            zonal=data_clusters + search_clusters,
+            zonal=[data_cluster] * zones_in_region,
regional=[],
)

plan = CapacityPlan(
-            requirements=Requirements(zonal=data_requirements + search_requirements),
+            requirements=Requirements(zonal=[data_requirement] * zones_in_region),
candidate_clusters=clusters,
)

@@ -376,7 +326,7 @@ def capacity_plan(
desires: CapacityDesires,
extra_model_arguments: Dict[str, Any],
) -> Optional[CapacityPlan]:
-        # only accept running on instances with a lot of RAM and a few CPUs
+        # Only accept running on instances with a lot of RAM and a few CPUs
if instance.ram_gib <= 24:
return None
if instance.cpu <= 2:
@@ -413,6 +363,50 @@
)


+class NflxElasticsearchSearchCapacityModel(CapacityModel):
+    @staticmethod
+    def capacity_plan(
+        instance: Instance,
+        drive: Drive,
+        context: RegionContext,
+        desires: CapacityDesires,
+        extra_model_arguments: Dict[str, Any],
+    ) -> Optional[CapacityPlan]:
+        # Only accept running on instances with a lot of RAM and a few CPUs
+        if instance.ram_gib <= 24:
+            return None
+        if instance.cpu <= 2:
+            return None
+
+        zones_in_region = context.zones_in_region
+        requirement = CapacityRequirement(
+            requirement_type="elasticsearch-search-zonal",
+            core_reference_ghz=desires.core_reference_ghz,
+            cpu_cores=certain_int(2),
+            mem_gib=certain_int(24),
+            context={},
+        )
+
+        cluster = ZoneClusterCapacity(
+            cluster_type="elasticsearch-search",
+            count=1,
+            instance=instance,
+            attached_drives=[],
+            annual_cost=instance.annual_cost,
+        )
+
+        ec2_cost = zones_in_region * cluster.annual_cost
+        clusters = Clusters(
+            annual_costs={"elasticsearch-search.zonal-clusters": ec2_cost},
+            zonal=[cluster] * zones_in_region,
+        )
+
+        return CapacityPlan(
+            requirements=Requirements(zonal=[requirement] * zones_in_region),
+            candidate_clusters=clusters,
+        )
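Once registered, the new sub-model can also be planned on its own; a hedged sketch mirroring the test setup below (the region name is illustrative and the empty `CapacityDesires()` is a placeholder for real query/data shapes):

```python
from service_capacity_modeling.capacity_planner import planner
from service_capacity_modeling.interface import CapacityDesires

# Placeholder desires; a real caller would specify query_pattern/data_shape
desires = CapacityDesires()
plans = planner.plan_certain(
    model_name="org.netflix.elasticsearch.search",
    region="us-east-1",
    desires=desires,
)
```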


class NflxElasticsearchCapacityModel(CapacityModel):
@staticmethod
def capacity_plan(
Expand All @@ -432,21 +426,22 @@ def description():
def compose_with(
user_desires: CapacityDesires, extra_model_arguments: Dict[str, Any]
) -> Tuple[Tuple[str, Callable[[CapacityDesires], CapacityDesires]], ...]:
-        def _modify_data_desires(
-            user_desires: CapacityDesires,
-        ) -> CapacityDesires:
+        def _modify_data_desires(desires: CapacityDesires) -> CapacityDesires:
            # data node's model use the full desires
-            return user_desires
+            return desires

-        def _modify_master_desires(
-            user_desires: CapacityDesires,
-        ) -> CapacityDesires:
-            # master node's model doesn't use anything from the desires.
-            return user_desires
+        def _modify_master_desires(desires: CapacityDesires) -> CapacityDesires:
+            # master node's model doesn't use anything from the desires
+            return desires

+        def _modify_search_desires(desires: CapacityDesires) -> CapacityDesires:
+            # search node's model doesn't use anything from the desires
+            return desires

return (
("org.netflix.elasticsearch.node", _modify_data_desires),
("org.netflix.elasticsearch.master", _modify_master_desires),
("org.netflix.elasticsearch.search", _modify_search_desires),
)
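For readers unfamiliar with composition: the planner fans one set of user desires out to each sub-model via the returned tuple. Roughly, as a sketch only (this is not the planner's actual code):

```python
# Sketch: how a caller might consume compose_with
for model_name, modify in NflxElasticsearchCapacityModel.compose_with(desires, {}):
    sub_desires = modify(desires)
    # ... each sub-model (data, master, search) is then planned with sub_desires
```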

@staticmethod
Expand Down Expand Up @@ -508,14 +503,12 @@ def default_desires(user_desires, extra_model_arguments: Dict[str, Any]):
confidence=0.98,
),
),
-            # Most latency sensitive elasticsearch clusters are in the
-            # < 100GiB range
+            # Most latency sensitive Elasticsearch clusters are in the <100GiB range
data_shape=DataShape(
estimated_state_size_gib=Interval(
low=10, mid=100, high=1000, confidence=0.98
),
-                # Netflix Elasticsearch compresses with Deflate (gzip)
-                # by default
+                # Netflix Elasticsearch compresses with Deflate (gzip) by default
estimated_compression_ratio=Interval(
minimum_value=1.4,
maximum_value=8,
@@ -595,3 +588,4 @@
nflx_elasticsearch_capacity_model = NflxElasticsearchCapacityModel()
nflx_elasticsearch_data_capacity_model = NflxElasticsearchDataCapacityModel()
nflx_elasticsearch_master_capacity_model = NflxElasticsearchMasterCapacityModel()
+nflx_elasticsearch_search_capacity_model = NflxElasticsearchSearchCapacityModel()
27 changes: 26 additions & 1 deletion tests/netflix/test_elasticsearch.py
@@ -1,3 +1,4 @@
+from collections import Counter
from collections import defaultdict

from service_capacity_modeling.capacity_planner import planner
@@ -123,20 +124,44 @@ def test_es_simple_certain():
)

assert len(cap_plan) > 0, "Resulting cap_plan is empty"
-    assert all(plan for plan in cap_plan), "One or more plans is empty"

+    for plan in cap_plan:
+        assert plan, "One or more plans is empty"
+        assert plan.candidate_clusters, "candidate_clusters is empty"
+        assert plan.candidate_clusters.zonal, "candidate_clusters.zonal is empty"
+        assert (
+            len(plan.candidate_clusters.zonal) == 9
+        ), "len(candidate_clusters.zonal) != 9"
+
+        cluster_type_counts = Counter(
+            zone.cluster_type for zone in plan.candidate_clusters.zonal
+        )
+
+        assert len(cluster_type_counts) == 3, "Expecting 3 cluster types"
+        assert (
+            cluster_type_counts["elasticsearch-search"] == 3
+        ), "Expecting exactly 3 search nodes"
+        assert (
+            cluster_type_counts["elasticsearch-master"] == 3
+        ), "Expecting exactly 3 master nodes"
+        assert (
+            cluster_type_counts["elasticsearch-data"] >= 3
+        ), "Expecting at least 3 data nodes"


def zonal_summary(zlr):
zlr_cpu = zlr.count * zlr.instance.cpu
zlr_cost = zlr.annual_cost
zlr_family = zlr.instance.family
+    zlr_instance_name = zlr.instance.name
zlr_drive_gib = sum(dr.size_gib for dr in zlr.attached_drives)
if zlr.instance.drive is not None:
zlr_drive_gib += zlr.instance.drive.size_gib
zlr_drive_gib *= zlr.count

return (
zlr_family,
+        zlr_instance_name,
zlr.count,
zlr_cpu,
zlr_cost,