Merge pull request #12 from real-digital/INFRA-1006-hide-password-during-init

hash sensitive values before logging them
garrettthomaskth authored Feb 14, 2020
2 parents 99c7ee5 + 69883a2 commit 53fb922
Showing 12 changed files with 387 additions and 192 deletions.
1 change: 1 addition & 0 deletions Pipfile
@@ -7,6 +7,7 @@ verify_ssl = true
pyconnect = {path = ".",editable = true,extras = ["test", "dev"]}

[packages]
pycodestyle = "*"

[requires]
python_version = "3.6"
387 changes: 232 additions & 155 deletions Pipfile.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions setup.py
@@ -1,11 +1,12 @@
from setuptools import setup

required = ["confluent-kafka[avro]>=1.0", "pyaml>=3.13"]
# TODO remove the avro-python3 version pin once they fix the pycodestyle import
required = ["confluent-kafka[avro]>=1.0", "pyaml>=3.13", "avro-python3==1.9.1"]


setup(
name="pyconnect",
version="0.4.0",
version="0.4.1",
packages=["pyconnect"],
package_dir={"": "src"},
# minimal requirements to run pyconnect
13 changes: 12 additions & 1 deletion src/pyconnect/config.py
@@ -11,7 +11,6 @@
import re
from pathlib import Path
from typing import Any, Callable, Dict, List, Pattern, Type, Union

import yaml

from .core import PyConnectException
@@ -224,6 +223,10 @@ def _validate_ast_tree(tree: ast.AST) -> None:
raise ValueError(f"Illegal node found: {node}")


def bool_from_string_parser(string: str) -> bool:
return string.lower() == "true"


def csv_line_reader(separator=",", quoter='"', escaper="\\", strip_chars="\r\t\n ") -> Callable[[str], List[str]]:
"""
Creates a function that parses a **line** in csv format using the given parameters and returns a list of strings.
@@ -312,6 +315,12 @@ class BaseConfig(dict):
30 minutes. See :func:`pyconnect.config.timedelta_parser` for more info.
*Default is 30m*
**hash_sensitive_values**: bool
If true, sensitive values (e.g. sasl.password) from the kafka_opts configuration are
hashed and logged together with the hashing parameters, so that the values can be validated.
If false, the sensitive values are replaced by "****", offering no opportunity to validate.
Default is true.
**kafka_opts**: Dict[str, str]
Additional options that shall be passed to the underlying Kafka library.
See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for documentation.
@@ -326,6 +335,7 @@ class BaseConfig(dict):
__parsers = {
"bootstrap_servers": csv_line_reader(),
"offset_commit_interval": timedelta_parser,
"hash_sensitive_values": bool_from_string_parser,
"kafka_opts": json.loads,
}

@@ -334,6 +344,7 @@ def __init__(self, conf_dict: Dict[str, Any]) -> None:
self["bootstrap_servers"] = conf_dict.pop("bootstrap_servers")
self["schema_registry"] = conf_dict.pop("schema_registry")
self["offset_commit_interval"] = conf_dict.pop("offset_commit_interval", "30m")
self["hash_sensitive_values"] = conf_dict.pop("hash_sensitive_values", "true")
self["kafka_opts"] = conf_dict.pop("kafka_opts", {})

if len(conf_dict) != 0:
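For orientation, here is a quick sketch of how the new option's value is interpreted: the default is the string "true", and the parser registered above turns it into a boolean (assuming bool_from_string_parser is imported from pyconnect.config, where this diff defines it).

from pyconnect.config import bool_from_string_parser

# "hash_sensitive_values" defaults to the string "true"; the parser accepts any casing.
assert bool_from_string_parser("True") is True
assert bool_from_string_parser("false") is False
# Anything other than a spelling of "true" disables hashing and falls back to masking.
assert bool_from_string_parser("yes") is False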
35 changes: 34 additions & 1 deletion src/pyconnect/core.py
@@ -4,8 +4,10 @@

import logging
from abc import ABCMeta, abstractmethod
import hashlib
from enum import Enum
from typing import Any, Callable, Optional
import os
from typing import Any, Callable, Optional, Dict

from confluent_kafka import KafkaException
from confluent_kafka.cimpl import Message
@@ -26,6 +28,37 @@ def message_repr(msg: Message) -> str:
)


def hide_sensitive_values(
config: Dict[str, Any], algorithm: str = "sha256", iterations: int = 100000, hash_sensitive_values: bool = True
) -> Dict[str, Any]:
"""
This function takes a Kafka configuration dictionary and hashes all sensitive values that are present (i.e. any of
the keys "ssl.key.password", "ssl.keystore.password", "sasl.password", "ssl.key.pem", "ssl_key" found in the dictionary).
By default the hashed value is kept together with the hashing parameters, so that it can later be validated. If you do
not want this behavior and would rather have the sensitive values replaced by "****", set `hash_sensitive_values` to False.
Note: The original dictionary is not modified.
:param config: Kafka config dictionary.
:param algorithm: Hash algorithm.
:param iterations: Number of times to run the hashing algorithm.
:param hash_sensitive_values: Whether sensitive values should be hashed rather than masked. Set to `False` if you do
not need to be able to verify the secret values; they are then replaced with "****".
:return: a dictionary in which the sensitive values are hashed.
"""
SENSITIVE_KEYS = ["ssl.key.password", "ssl.keystore.password", "sasl.password", "ssl.key.pem", "ssl_key"]

config_copy = config.copy()
salt = os.urandom(32)
for key in SENSITIVE_KEYS:
if key in config_copy:
if hash_sensitive_values:
hashed_password = hashlib.pbkdf2_hmac(algorithm, config_copy[key].encode(), salt, iterations).hex()
config_copy[key] = f"$PBKDF2-HMAC-{algorithm.upper()}:{salt.hex()}:{iterations}${hashed_password}"
else:
config_copy[key] = "****"
return config_copy


class PyConnectException(Exception):
"""
Base Class for all exceptions raised by the PyConnect framework.
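To make the helper's output concrete, a minimal usage sketch with made-up values follows; the exact salt and digest differ on every call because a fresh salt is drawn from os.urandom.

from pyconnect.core import hide_sensitive_values

kafka_config = {"sasl.password": "hunter2", "bootstrap.servers": "broker:9092"}

hashed = hide_sensitive_values(kafka_config)
# e.g. "$PBKDF2-HMAC-SHA256:<salt as hex>:100000$<hex digest>"
print(hashed["sasl.password"])

masked = hide_sensitive_values(kafka_config, hash_sensitive_values=False)
print(masked["sasl.password"])  # "****"

# Non-sensitive keys pass through unchanged.
assert hashed["bootstrap.servers"] == "broker:9092"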
6 changes: 4 additions & 2 deletions src/pyconnect/pyconnectsink.py
@@ -10,7 +10,7 @@
from confluent_kafka.cimpl import KafkaError

from .config import SinkConfig
from .core import BaseConnector, Status, message_repr
from .core import BaseConnector, Status, message_repr, hide_sensitive_values

logger = logging.getLogger(__name__)

@@ -146,6 +146,7 @@ def __init__(self, config: SinkConfig) -> None:
self._consumer: RichAvroConsumer = self._make_consumer()

def _make_consumer(self) -> RichAvroConsumer:
hash_sensitive_values = self.config["hash_sensitive_values"]
config = {
"bootstrap.servers": ",".join(self.config["bootstrap_servers"]),
"group.id": self.config["group_id"],
@@ -161,7 +162,8 @@ def _make_consumer(self) -> RichAvroConsumer:
**self.config["kafka_opts"],
}
consumer = RichAvroConsumer(config)
logger.info(f"AvroConsumer created with config: {config}")
hidden_config = hide_sensitive_values(config, hash_sensitive_values=hash_sensitive_values)
logger.info(f"AvroConsumer created with config: {hidden_config}")
# noinspection PyArgumentList
consumer.subscribe(self.config["topics"], on_assign=self._on_assign, on_revoke=self._on_revoke)
return consumer
9 changes: 8 additions & 1 deletion src/pyconnect/pyconnectsource.py
@@ -1,3 +1,4 @@
import logging
from abc import ABCMeta, abstractmethod
from time import sleep
from typing import Any, Optional, Tuple
@@ -7,7 +8,9 @@

from .avroparser import to_key_schema, to_value_schema
from .config import SourceConfig
from .core import BaseConnector, PyConnectException, Status
from .core import BaseConnector, PyConnectException, Status, hide_sensitive_values

logger = logging.getLogger(__name__)


class PyConnectSource(BaseConnector, metaclass=ABCMeta):
@@ -32,10 +35,14 @@ def _make_producer(self) -> AvroProducer:
Creates the underlying instance of :class:`confluent_kafka.avro.AvroProducer` which is used to publish
messages and producer offsets.
"""
hash_sensitive_values = self.config["hash_sensitive_values"]
config = {
"bootstrap.servers": ",".join(self.config["bootstrap_servers"]),
"schema.registry.url": self.config["schema_registry"],
**self.config["kafka_opts"],
}
hidden_config = hide_sensitive_values(config, hash_sensitive_values=hash_sensitive_values)
logger.info(f"AvroProducer created with config: {hidden_config}")
return AvroProducer(config)

def _make_offset_consumer(self) -> AvroConsumer:
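As in the sink, the design choice here is that only the copy handed to the logger is sanitized; AvroProducer still receives the original config, so the real credentials reach the Kafka client. A small sketch of that contract, using only the helper from this diff and hypothetical values:

from pyconnect.core import hide_sensitive_values

config = {
    "bootstrap.servers": "broker:9092",
    "schema.registry.url": "http://schema-registry:8081",
    "sasl.password": "secret",  # e.g. supplied via kafka_opts
}

hidden_config = hide_sensitive_values(config, hash_sensitive_values=True)

# The original dictionary is untouched, so the producer still sees the real secret ...
assert config["sasl.password"] == "secret"
# ... while the logged copy only ever contains the salted PBKDF2 hash.
assert hidden_config["sasl.password"].startswith("$PBKDF2-HMAC-SHA256:")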
35 changes: 22 additions & 13 deletions test/conftest.py
@@ -86,7 +86,7 @@ def cluster_config() -> Dict[str, str]:


@pytest.fixture(scope="session")
def assert_cluster_running(cluster_config) -> None:
def assert_cluster_running(cluster_config: Dict[str, str]) -> None:
"""
Makes sure the kafka cluster is running by checking whether the rest-proxy service returns the topics
"""
@@ -98,7 +98,7 @@ def assert_cluster_running(cluster_config) -> None:


@pytest.fixture(scope="session")
def running_cluster_config(cluster_config, assert_cluster_running) -> Dict[str, str]:
def running_cluster_config(cluster_config: Dict[str, str], assert_cluster_running) -> Dict[str, str]:
"""
Reads the docker-compose.yml in order to determine the host names and ports of the different services necessary
for the kafka cluster.
@@ -109,7 +109,7 @@ def running_cluster_config(cluster_config, assert_cluster_running) -> Dict[str,


@pytest.fixture(params=[1, 2, 4], ids=["num_partitions=1", "num_partitions=2", "num_partitions=4"])
def topic(request, running_cluster_config) -> Iterable[Tuple[str, int]]:
def topic_and_partitions(request, running_cluster_config: Dict[str, str]) -> Iterable[Tuple[str, int]]:
"""
Creates a Kafka topic named after a random 5-character string and partitioned into 1, 2 or 4 partitions.
Then it yields the tuple (topic, n_partitions).
@@ -137,7 +137,7 @@ def topic(request, running_cluster_config) -> Iterable[Tuple[str, int]]:
).stdout.decode()
logger.info(creation_output)

yield (topic_id, partitions)
yield topic_id, partitions

description_output = subprocess.run(
[
@@ -154,11 +154,13 @@


@pytest.fixture
def plain_avro_producer(running_cluster_config, topic) -> confluent_avro.AvroProducer:
def plain_avro_producer(
running_cluster_config: Dict[str, str], topic_and_partitions: Tuple[str, int]
) -> confluent_avro.AvroProducer:
"""
Creates a plain `confluent_kafka.avro.AvroProducer` that can be used to publish messages.
"""
topic_id, partitions = topic
topic_id, partitions = topic_and_partitions
producer_config = {
"bootstrap.servers": running_cluster_config["broker"],
"schema.registry.url": running_cluster_config["schema-registry"],
@@ -169,14 +171,22 @@ def plain_avro_producer(running_cluster_config, topic) -> confluent_avro.AvroPro
return producer


Record = Tuple[Any, Any]
RecordList = List[Record]


@pytest.fixture
def produced_messages(
records, plain_avro_producer, topic, running_cluster_config, consume_all
records: RecordList,
plain_avro_producer,
topic_and_partitions: Tuple[str, int],
running_cluster_config: Dict[str, str],
consume_all,
) -> Iterable[List[Tuple[str, dict]]]:
"""
Creates 15 random messages, produces them to the currently active topic and then yields them for the test.
"""
topic_id, partitions = topic
topic_id, partitions = topic_and_partitions

key, value = records[0]
key_schema = to_key_schema(key)
@@ -256,8 +266,6 @@ def eof_message(error_message_factory) -> Message:
return error_message_factory(error_code=KafkaError._PARTITION_EOF)


Record = Tuple[Any, Any]
RecordList = List[Record]
ConsumeAll = Callable[..., RecordList]


@@ -270,11 +278,11 @@ def records() -> RecordList:


@pytest.fixture
def consume_all(topic, running_cluster_config) -> Iterable[ConsumeAll]:
def consume_all(topic_and_partitions: Tuple[str, int], running_cluster_config: Dict[str, str]) -> Iterable[ConsumeAll]:
"""
Creates a function that consumes and returns all messages for the current test's topic.
"""
topic_id, _ = topic
topic_id, _ = topic_and_partitions

consumer = AvroConsumer(
{
@@ -286,11 +294,12 @@ def consume_all(topic, running_cluster_config) -> Iterable[ConsumeAll]:
}
)
consumer.subscribe([topic_id])
consumer.list_topics()

def consume_all_() -> RecordList:
records = []
while True:
msg = consumer.poll(timeout=2)
msg = consumer.poll(timeout=10)
if msg is None:
break
if msg.error() is not None:
33 changes: 33 additions & 0 deletions test/test_core.py
@@ -0,0 +1,33 @@
import hashlib
import re

from pyconnect.core import hide_sensitive_values


def test_hide_sensitive_values_hashes():
config = {"sasl.password": "unhashed password", "regular_key": "regular value"}
hashed_config = hide_sensitive_values(config)
hash_pattern = r"\$PBKDF2-HMAC-(?P<algo>[^:]+):(?P<salt>[^:]+):(?P<iterations>\d+)\$(?P<hash>\w+)"
groups = re.match(hash_pattern, hashed_config["sasl.password"]).groupdict()
recomputed_hash = hashlib.pbkdf2_hmac(
groups["algo"].lower(), b"unhashed password", bytes.fromhex(groups["salt"]), int(groups["iterations"])
)

assert recomputed_hash.hex() == groups["hash"]
assert hashed_config["sasl.password"] != config["sasl.password"]
assert hashed_config["regular_key"] == config["regular_key"]


def test_hide_sensitive_values_obfuscates():
config = {"sasl.password": "unhashed password", "regular_key": "regular value"}
hidden_config = hide_sensitive_values(config, hash_sensitive_values=False)

assert hidden_config["sasl.password"] == "****"
assert hidden_config["regular_key"] == config["regular_key"]


def test_hide_sensitive_values_doesnt_hash_when_it_shouldnt():
config = {"not_sensitive_key": "not sensitive key", "regular_key": "regular value"}
hashed_config = hide_sensitive_values(config)

assert hashed_config == config
27 changes: 21 additions & 6 deletions test/test_examples.py
@@ -2,16 +2,18 @@
import pathlib
import shutil
import subprocess
from typing import Tuple, List, Dict

import pytest

from .conftest import RecordList, ConsumeAll
from .utils import ROOT_DIR, compare_lists_unordered

EXAMPLES_DIR = ROOT_DIR / "examples"


@pytest.fixture
def tmp_with_pyconnect(tmpdir):
def tmp_with_pyconnect(tmpdir: pathlib.Path) -> Tuple[pathlib.Path, pathlib.Path]:
tmpdir = pathlib.Path(tmpdir).absolute()
venv_name = ".test_venv"
venv_bin = tmpdir / venv_name / "bin"
@@ -23,14 +25,19 @@ def tmp_with_pyconnect(tmpdir):


@pytest.mark.e2e
def test_file_sink_example(running_cluster_config, topic, produced_messages, tmp_with_pyconnect):
def test_file_sink_example(
running_cluster_config: Dict[str, str],
topic_and_partitions: Tuple[str, int],
produced_messages: List[Tuple[str, dict]],
tmp_with_pyconnect: Tuple[pathlib.Path, pathlib.Path],
):
tmpdir, venv_bin = tmp_with_pyconnect
sinkfile = tmpdir / "sink_dir" / "sinkfile"

env_vars = {
"PYCONNECT_BOOTSTRAP_SERVERS": running_cluster_config["broker"],
"PYCONNECT_SCHEMA_REGISTRY": running_cluster_config["schema-registry"],
"PYCONNECT_TOPICS": topic[0],
"PYCONNECT_TOPICS": topic_and_partitions[0],
"PYCONNECT_GROUP_ID": "testgroup",
"PYCONNECT_SINK_DIRECTORY": sinkfile.parent,
"PYCONNECT_SINK_FILENAME": sinkfile.name,
@@ -54,15 +61,23 @@


@pytest.mark.e2e
def test_file_source_example(records, running_cluster_config, topic, consume_all, tmp_with_pyconnect):
def test_file_source_example(
records: RecordList,
running_cluster_config: Dict[str, str],
topic_and_partitions: Tuple[str, int],
consume_all: ConsumeAll,
tmp_with_pyconnect: Tuple[pathlib.Path, pathlib.Path],
):
tmpdir, venv_bin = tmp_with_pyconnect
source_file = tmpdir / "source_dir" / "sourcefile"

topic_id, _ = topic_and_partitions

env_vars = {
"PYCONNECT_BOOTSTRAP_SERVERS": running_cluster_config["broker"],
"PYCONNECT_SCHEMA_REGISTRY": running_cluster_config["schema-registry"],
"PYCONNECT_TOPIC": topic[0],
"PYCONNECT_OFFSET_TOPIC": topic[0] + "_offset_topic",
"PYCONNECT_TOPIC": topic_id,
"PYCONNECT_OFFSET_TOPIC": f"{topic_id}_offset_topic",
"PYCONNECT_SOURCE_DIRECTORY": source_file.parent,
"PYCONNECT_SOURCE_FILENAME": source_file.name,
}