Skip to content
This repository has been archived by the owner on Oct 3, 2020. It is now read-only.

Commit

Permalink
Merge pull request #93 from hjacobs/cluster-registry
Browse files Browse the repository at this point in the history
#73 implement --cluster-registry-url option
  • Loading branch information
hjacobs authored Jan 14, 2017
2 parents aabb42d + c647f15 commit 484d3e9
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 25 deletions.
2 changes: 2 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ The following environment variables are supported:
Optional token endpoint URL for the OAuth 2 Authorization Code Grant flow.
``CLUSTERS``
Comma separated list of Kubernetes API server URLs. It defaults to ``http://localhost:8001/`` (default endpoint of ``kubectl proxy``).
``CLUSTER_REGISTRY_URL``
URL to cluster registry returning list of Kubernetes clusters.
``CREDENTIALS_DIR``
Directory to read (OAuth) credentials from --- these credentials are only used for non-localhost cluster URLs.
``DEBUG``
Expand Down
26 changes: 25 additions & 1 deletion docs/multiple-clusters.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,28 @@ Multiple Clusters

Set the ``CLUSTERS`` environment variable to a comma separated list of Kubernetes API server URLs.

TODO: how to configure authentication, registry, ..
Cluster Registry
================

Clusters can be dynamically discovered by providing one HTTP endpoint as the cluster registry.
Set either the ``CLUSTER_REGISTRY_URL`` environment variable or the ``--cluster-registry-url`` option to an URL conforming to:

.. code-block:: bash
$ curl -H 'Authorization: Bearer mytoken' $CLUSTER_REGISTRY_URL/kubernetes-clusters
{
"items": [
{
"id": "my-cluster-id",
"api_server_url": "https://my-cluster.example.org"
}
]
}
The cluster registry will be queryied with an OAuth Bearer token, the token can be statically set via the ``OAUTH2_ACCESS_TOKENS`` environment variable.
Example:

.. code-block:: bash
$ token=mysecrettoken
$ docker run -it -p 8080:8080 -e OAUTH2_ACCESS_TOKENS=read-only=$token hjacobs/kube-ops-view --cluster-registry-url=https://cluster-registry.example.org
60 changes: 56 additions & 4 deletions kube_ops_view/cluster_discovery.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,30 @@
import time
from urllib.parse import urljoin

import kubernetes.client
import kubernetes.config
import logging
import re
import requests
import tokens
from requests.auth import AuthBase

DEFAULT_CLUSTERS = 'http://localhost:8001/'
CLUSTER_ID_INVALID_CHARS = re.compile('[^a-z0-9:-]')

logger = logging.getLogger(__name__)

tokens.configure(from_file_only=True)


def generate_cluster_id(url: str):
'''Generate some "cluster ID" from given API server URL'''
for prefix in ('https://', 'http://'):
if url.startswith(prefix):
url = url[len(prefix):]
return CLUSTER_ID_INVALID_CHARS.sub('-', url.lower()).strip('-')


class StaticTokenAuth(AuthBase):
def __init__(self, token):
self.token = token
Expand All @@ -29,25 +46,29 @@ def __call__(self, request):


class Cluster:
def __init__(self, api_server_url, ssl_ca_cert=None, auth=None):
def __init__(self, id, api_server_url, ssl_ca_cert=None, auth=None):
self.id = id
self.api_server_url = api_server_url
self.ssl_ca_cert = ssl_ca_cert
self.auth = auth


class StaticClusterDiscoverer:

def __init__(self, api_server_urls):
def __init__(self, api_server_urls: list):
self._clusters = []

if not api_server_urls:
try:
kubernetes.config.load_incluster_config()
except kubernetes.config.ConfigException:
cluster = Cluster('http://localhost:8001')
# we are not running inside a cluster
# => assume default kubectl proxy URL
cluster = Cluster(generate_cluster_id(DEFAULT_CLUSTERS), DEFAULT_CLUSTERS)
else:
config = kubernetes.client.configuration
cluster = Cluster(
generate_cluster_id(config.host),
config.host,
ssl_ca_cert=config.ssl_ca_cert,
auth=StaticTokenAuth(config.api_key['authorization'].split(' ', 1)[-1]))
Expand All @@ -60,7 +81,38 @@ def __init__(self, api_server_urls):
auth = OAuthTokenAuth('read-only')
else:
auth = None
self._clusters.append(Cluster(api_server_url, auth=auth))
self._clusters.append(Cluster(generate_cluster_id(api_server_url), api_server_url, auth=auth))

def get_clusters(self):
return self._clusters


class ClusterRegistryDiscoverer:

def __init__(self, cluster_registry_url: str, cache_lifetime=60):
self._url = cluster_registry_url
self._cache_lifetime = cache_lifetime
self._last_cache_refresh = 0
self._clusters = []
self._session = requests.Session()
self._session.auth = OAuthTokenAuth('read-only')

def refresh(self):
try:
response = self._session.get(urljoin(self._url, '/kubernetes-clusters'), timeout=10)
response.raise_for_status()
clusters = []
for row in response.json()['items']:
# only consider "ready" clusters
if row.get('lifecycle_status', 'ready') == 'ready':
clusters.append(Cluster(row['id'], row['api_server_url'], auth=OAuthTokenAuth('read-only')))
self._clusters = clusters
self._last_cache_refresh = time.time()
except:
logger.exception('Failed to refresh from cluster registry {}'.format(self._url))

def get_clusters(self):
now = time.time()
if now - self._last_cache_refresh > self._cache_lifetime:
self.refresh()
return self._clusters
21 changes: 6 additions & 15 deletions kube_ops_view/kubernetes.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
import datetime
import logging
import re
from urllib.parse import urljoin

import requests

CLUSTER_ID_INVALID_CHARS = re.compile('[^a-z0-9:-]')
logger = logging.getLogger(__name__)

session = requests.Session()


def generate_cluster_id(url: str):
'''Generate some "cluster ID" from given API server URL'''
for prefix in ('https://', 'http://'):
if url.startswith(prefix):
url = url[len(prefix):]
return CLUSTER_ID_INVALID_CHARS.sub('-', url.lower()).strip('-')


def map_node_status(status: dict):
return {
'addresses': status.get('addresses'),
Expand Down Expand Up @@ -62,8 +53,8 @@ def request(cluster, path, **kwargs):

def get_kubernetes_clusters(cluster_discoverer):
for cluster in cluster_discoverer.get_clusters():
cluster_id = cluster.id
api_server_url = cluster.api_server_url
cluster_id = generate_cluster_id(api_server_url)
response = request(cluster, '/api/v1/nodes')
response.raise_for_status()
nodes = {}
Expand Down Expand Up @@ -94,18 +85,18 @@ def get_kubernetes_clusters(cluster_discoverer):
response.raise_for_status()
data = response.json()
if not data.get('items'):
logging.info('Heapster node metrics not available (yet)')
logger.info('Heapster node metrics not available (yet)')
else:
for metrics in data['items']:
nodes[metrics['metadata']['name']]['usage'] = metrics['usage']
except:
logging.exception('Failed to get node metrics')
logger.exception('Failed to get node metrics')
try:
response = request(cluster, '/api/v1/namespaces/kube-system/services/heapster/proxy/apis/metrics/v1alpha1/pods')
response.raise_for_status()
data = response.json()
if not data.get('items'):
logging.info('Heapster pod metrics not available (yet)')
logger.info('Heapster pod metrics not available (yet)')
else:
for metrics in data['items']:
pod = pods_by_namespace_name.get((metrics['metadata']['namespace'], metrics['metadata']['name']))
Expand All @@ -115,5 +106,5 @@ def get_kubernetes_clusters(cluster_discoverer):
if container['name'] == container_metrics['name']:
container['resources']['usage'] = container_metrics['usage']
except:
logging.exception('Failed to get pod metrics')
logger.exception('Failed to get pod metrics')
yield {'id': cluster_id, 'api_server_url': api_server_url, 'nodes': nodes, 'unassigned_pods': unassigned_pods}
14 changes: 10 additions & 4 deletions kube_ops_view/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from .mock import get_mock_clusters
from .kubernetes import get_kubernetes_clusters
from .stores import MemoryStore, RedisStore
from .cluster_discovery import DEFAULT_CLUSTERS, StaticClusterDiscoverer
from .cluster_discovery import DEFAULT_CLUSTERS, StaticClusterDiscoverer, ClusterRegistryDiscoverer


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -216,7 +216,8 @@ def print_version(ctx, param, value):
@click.option('--redis-url', help='Redis URL to use for pub/sub and job locking', envvar='REDIS_URL')
@click.option('--clusters', help='Comma separated list of Kubernetes API server URLs (default: {})'.format(DEFAULT_CLUSTERS),
envvar='CLUSTERS')
def main(port, debug, mock, secret_key, redis_url, clusters):
@click.option('--cluster-registry-url', help='URL to cluster registry', envvar='CLUSTER_REGISTRY_URL')
def main(port, debug, mock, secret_key, redis_url, clusters, cluster_registry_url):
logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)

store = RedisStore(redis_url) if redis_url else MemoryStore()
Expand All @@ -225,8 +226,13 @@ def main(port, debug, mock, secret_key, redis_url, clusters):
app.secret_key = secret_key
app.store = store

api_server_urls = clusters.split(',') if clusters else []
gevent.spawn(update, cluster_discoverer=StaticClusterDiscoverer(api_server_urls), store=store, mock=mock)
if cluster_registry_url:
discoverer = ClusterRegistryDiscoverer(cluster_registry_url)
else:
api_server_urls = clusters.split(',') if clusters else []
discoverer = StaticClusterDiscoverer(api_server_urls)

gevent.spawn(update, cluster_discoverer=discoverer, store=store, mock=mock)

signal.signal(signal.SIGTERM, exit_gracefully)
http_server = gevent.wsgi.WSGIServer(('0.0.0.0', port), app)
Expand Down
4 changes: 3 additions & 1 deletion kube_ops_view/stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from redlock import Redlock
from queue import Queue

logger = logging.getLogger(__name__)

ONE_YEAR = 3600 * 24 * 365


Expand Down Expand Up @@ -87,7 +89,7 @@ class RedisStore:
'''Redis-based backend for deployments with replicas > 1'''

def __init__(self, url: str):
logging.info('Connecting to Redis on {}..'.format(url))
logger.info('Connecting to Redis on {}..'.format(url))
self._redis = redis.StrictRedis.from_url(url)
self._redlock = Redlock([url])

Expand Down

0 comments on commit 484d3e9

Please sign in to comment.