Skip to content

Commit

Permalink
Merge pull request Aiven-Open#555 from aiven/health-endpoint
Browse files Browse the repository at this point in the history
Implement health status endpoint
  • Loading branch information
jjaakola-aiven authored Mar 17, 2023
2 parents 8727294 + f9c1f02 commit e28b2e1
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 8 deletions.
8 changes: 8 additions & 0 deletions karapace/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import base64
import hashlib
import logging
import os
import re
import secrets
import sys
Expand Down Expand Up @@ -77,11 +78,16 @@ class ACLEntry:
class HTTPAuthorizer:
def __init__(self, filename: str) -> None:
    """Initialize the authorizer and eagerly load the ACL file.

    Raises InvalidConfiguration (via _load_authfile) when the file is
    missing or malformed, so misconfiguration fails fast at startup.
    """
    self._auth_filename: str = filename
    # Sentinel meaning "never loaded"; replaced with the file's st_mtime
    # once _load_authfile succeeds.
    self._auth_mtime: float = -1
    # Background refresh bookkeeping: the task is created later by
    # start_refresh_task and is signalled to stop via the event below.
    self._refresh_auth_awatch_stop_event = asyncio.Event()
    self._refresh_auth_task: Optional[asyncio.Task] = None
    # Load immediately so an invalid auth file aborts construction.
    self._load_authfile()

@property
def authfile_last_modified(self) -> float:
    """st_mtime of the auth file at the last successful load; -1 if never loaded."""
    return self._auth_mtime

async def start_refresh_task(self, stats: StatsClient) -> None:
"""Start authfile refresher task"""

Expand Down Expand Up @@ -116,6 +122,7 @@ async def close(self) -> None:

def _load_authfile(self) -> None:
try:
statinfo = os.stat(self._auth_filename)
with open(self._auth_filename) as authfile:
authdata = json_decode(authfile)

Expand All @@ -142,6 +149,7 @@ def _load_authfile(self) -> None:
"Loaded schema registry access control rules: %s",
[(entry.username, entry.operation.value, entry.resource.pattern) for entry in permissions],
)
self._auth_mtime = statinfo.st_mtime
except Exception as ex:
raise InvalidConfiguration("Failed to load auth file") from ex

Expand Down
19 changes: 19 additions & 0 deletions karapace/karapace.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,22 @@
from http import HTTPStatus
from karapace.config import Config
from karapace.rapu import HTTPRequest, HTTPResponse, RestApp
from karapace.utils import json_encode
from typing import Callable, NoReturn, Optional, Union

import aiohttp.web
import time


class KarapaceBase(RestApp):
def __init__(self, config: Config, not_ready_handler: Optional[Callable[[HTTPRequest], None]] = None) -> None:
    """Base REST application shared by Karapace services.

    :param config: service configuration, passed through to RestApp.
    :param not_ready_handler: optional callback invoked by rapu for
        requests that arrive before the application is ready.
    """
    super().__init__(app_name="karapace", config=config, not_ready_handler=not_ready_handler)

    # Monotonic start time; /_health reports uptime relative to this.
    self._process_start_time = time.monotonic()
    # Async callables returning dicts merged into the /_health payload.
    self.health_hooks = []
    # Do not use rapu's etag, readiness and other wrapping
    self.app.router.add_route("GET", "/_health", self.health)

    # Default timeout (seconds) for Kafka operations — TODO confirm unit.
    self.kafka_timeout = 10
    self.route("/", callback=self.root_get, method="GET")
    self.log.info("Karapace initialized")
Expand Down Expand Up @@ -62,5 +71,15 @@ def not_found(message: str, sub_code: int, content_type: str) -> NoReturn:
async def root_get(self) -> NoReturn:
    """GET / — reply with an empty JSON object (self.r raises, hence NoReturn)."""
    self.r({}, "application/json")

async def health(self, _request) -> aiohttp.web.Response:
    """GET /_health — merge base process uptime with every registered hook.

    Registered directly on the aiohttp router on purpose, bypassing
    rapu's etag/readiness wrapping, so it answers even before the app
    is ready.
    """
    uptime_sec = int(time.monotonic() - self._process_start_time)
    payload = {"process_uptime_sec": uptime_sec}
    # Each hook is an async callable returning a dict of status fields.
    for health_hook in self.health_hooks:
        payload.update(await health_hook())
    body = json_encode(payload, binary=True, compact=True)
    return aiohttp.web.Response(
        body=body,
        status=HTTPStatus.OK.value,
        headers={"Content-Type": "application/json"},
    )


empty_response = partial(KarapaceBase.r, body={}, status=HTTPStatus.NO_CONTENT, content_type="application/json")
20 changes: 20 additions & 0 deletions karapace/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from dataclasses import dataclass
from kafka import KafkaConsumer
from kafka.coordinator.base import BaseCoordinator
from kafka.errors import NoBrokersAvailable, NodeNotReadyError
Expand Down Expand Up @@ -40,6 +41,15 @@ def get_member_configuration(*, host: str, port: int, scheme: str, master_eligib
}


@dataclass
class SchemaCoordinatorStatus:
    """Point-in-time snapshot of the schema coordinator, for the health endpoint."""
    # None when the coordinator has not been initialized yet.
    is_primary: Optional[bool]
    is_primary_eligible: bool
    # URL of the current primary; None when unknown / coordinator missing.
    primary_url: Optional[str]
    is_running: bool
    # -1 when no group generation is known yet.
    group_generation_id: int


class SchemaCoordinator(BaseCoordinator):
are_we_master: Optional[bool] = None
master_url: Optional[str] = None
Expand Down Expand Up @@ -212,6 +222,16 @@ def init_schema_coordinator(self) -> None:
)
self.schema_coordinator_ready.set()

def get_coordinator_status(self) -> SchemaCoordinatorStatus:
    """Return a consistent status snapshot for the health endpoint.

    ``self.sc`` is read exactly once into a local: this object runs as a
    thread (see ``self.is_alive()``), so ``self.sc`` may presumably be
    replaced or still be None while this executes — repeated attribute
    reads could mix fields from different coordinator states.
    """
    sc = self.sc
    generation = sc.generation() if sc is not None else None
    return SchemaCoordinatorStatus(
        is_primary=sc.are_we_master if sc is not None else None,
        is_primary_eligible=cast(bool, self.config["master_eligibility"]),
        primary_url=sc.master_url if sc is not None else None,
        is_running=self.is_alive(),
        # -1 is the sentinel for "no generation known yet".
        group_generation_id=generation.generation_id if generation is not None else -1,
    )

def get_master_info(self) -> Tuple[Optional[bool], Optional[str]]:
"""Return whether we're the master, and the actual master url that can be used if we're not"""
self.schema_coordinator_ready.wait()
Expand Down
3 changes: 0 additions & 3 deletions karapace/rapu.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,3 @@ def run(self) -> None:
access_log_class=self.config["access_log_class"],
access_log_format='%Tfs %{x-client-ip}i "%r" %s "%{user-agent}i" response=%bb request_body=%{content-length}ib',
)

def add_routes(self):
pass # Override in sub-classes
19 changes: 14 additions & 5 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ def __init__(self) -> None:
self._condition = Condition()
self._greatest_offset = -1 # Would fail if initially this is 0 as it will be first offset ever.

def greatest_offset(self) -> int:
    """Highest offset passed to offset_seen so far; -1 before any offset is seen."""
    return self._greatest_offset

def offset_seen(self, new_offset: int) -> None:
    """Record a consumed offset; the tracked maximum only ever increases."""
    with self._condition:
        self._greatest_offset = max(self._greatest_offset, new_offset)
Expand Down Expand Up @@ -155,13 +158,16 @@ def __init__(
# consumed the produced messages. This makes the REST APIs more
# consistent (e.g. a request to a schema that was just produced will
# return the correct data.)
# - highest_offset's last seen value is stored by the REST API to
# expose reader loading progress for health status endpoint.
# - ready is used by the REST API to wait until this thread has
# synchronized with data in the schema topic. This prevents the server
# from returning stale data (e.g. a schemas has been soft deleted, but
# the topic has not been compacted yet, waiting allows this to consume
# the soft delete message and return the correct data instead of the
# old stale version that has not been deleted yet.)
self.offset = OFFSET_UNINITIALIZED
self._highest_offset = OFFSET_UNINITIALIZED
self.ready = False

# This event controls when the Reader should stop running, it will be
Expand Down Expand Up @@ -216,7 +222,7 @@ def run(self) -> None:
schema_topic_create = [schema_topic]
while not self._stop.is_set() and not schema_topic_exists:
try:
LOG.info("[Schema Topic] Creating %r", schema_topic)
LOG.info("[Schema Topic] Creating %r", schema_topic.name)
self.admin_client.create_topics(schema_topic_create, timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS)
LOG.info("[Schema Topic] Successfully created %r", schema_topic.name)
schema_topic_exists = True
Expand Down Expand Up @@ -272,19 +278,22 @@ def _is_ready(self) -> bool:
return False
# Offset in the response is the offset for the next upcoming message.
# Reduce by one for actual highest offset.
highest_offset = list(offsets.values())[0] - 1
self._highest_offset = list(offsets.values())[0] - 1
cur_time = time.monotonic()
time_from_last_check = cur_time - self.last_check
progress_pct = 0 if not highest_offset else round((self.offset / highest_offset) * 100, 2)
progress_pct = 0 if not self._highest_offset else round((self.offset / self._highest_offset) * 100, 2)
LOG.info(
"Replay progress (%s): %s/%s (%s %%)",
round(time_from_last_check, 2),
self.offset,
highest_offset,
self._highest_offset,
progress_pct,
)
self.last_check = cur_time
return self.offset >= highest_offset
return self.offset >= self._highest_offset

def highest_offset(self) -> int:
    """Best-known highest offset in the schemas topic.

    Takes the larger of the end offset observed during the startup
    replay check and the greatest offset the offset watcher has seen.
    """
    replay_end = self._highest_offset
    watched = self.offset_watcher.greatest_offset()
    return replay_end if replay_end >= watched else watched

def handle_messages(self) -> None:
assert self.consumer is not None, "Thread must be started"
Expand Down
20 changes: 20 additions & 0 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,26 @@ def __init__(self, config: Config) -> None:

self._forward_client = None
self.app.on_startup.append(self._create_forward_client)
self.health_hooks.append(self.schema_registry_health)

async def schema_registry_health(self) -> dict:
    """Health hook merged into /_health: auth file age, reader progress, coordinator state."""
    reader = self.schema_registry.schema_reader
    health = {}
    if self._auth is not None:
        health["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified
    health["schema_registry_ready"] = reader.ready
    if reader.ready:
        # Startup duration is only meaningful once the replay has finished.
        health["schema_registry_startup_time_sec"] = reader.last_check - self._process_start_time
    health["schema_registry_reader_current_offset"] = reader.offset
    health["schema_registry_reader_highest_offset"] = reader.highest_offset()
    coordinator_status = self.schema_registry.mc.get_coordinator_status()
    health.update(
        {
            "schema_registry_is_primary": coordinator_status.is_primary,
            "schema_registry_is_primary_eligible": coordinator_status.is_primary_eligible,
            "schema_registry_primary_url": coordinator_status.primary_url,
            "schema_registry_coordinator_running": coordinator_status.is_running,
            "schema_registry_coordinator_generation_id": coordinator_status.group_generation_id,
        }
    )
    return health

async def _create_forward_client(self, app: aiohttp.web.Application) -> None: # pylint: disable=unused-argument
"""Callback for aiohttp.Application.on_startup"""
Expand Down

0 comments on commit e28b2e1

Please sign in to comment.