diff --git a/kafka2influxdb.py b/kafka2influxdb.py index 45cfec6..ef3f1e7 100644 --- a/kafka2influxdb.py +++ b/kafka2influxdb.py @@ -1,4 +1,3 @@ -import json import logging import os from pprint import pformat @@ -13,7 +12,8 @@ """ Consume parsed data from Kafka and save it to InfluxDB V2 database. -Parsed data is expected to be in the following "well-known" "JSON timeseries"-like format: +Parsed data is expected to be in the following "well-known" "JSON +timeseries"-like format: parsed_data = { "header": { @@ -46,7 +46,10 @@ "device": { "device_id": "B81758FFFE031234", # FIXME: this is incorrect format for device_metadata now - "device_metadata": {"name": "Elsys ERS CO2 A81758FFFE035729", "parser_module": "fvhiot.parsers.elsys"}, + "device_metadata": { + "name": "Elsys ERS CO2 A81758FFFE035729", + "parser_module": "fvhiot.parsers.elsys" + }, "device_state": {"state data": "is here"}, }, "meta": { @@ -77,14 +80,16 @@ def parsed_data_to_influxdb_format( - measurement_name, device_id, data: dict, extra_fields: dict = None, extra_tags: dict = None + measurement_name, device_id, data: dict, extra_fields: dict = None, + extra_tags: dict = None ) -> list: """ Convert parsed data to InfluxDB datapoints format, see example above. :param measurement_name: name of the measurement, e.g. "elsys" :param device_id: device ID, e.g. "B81758FFFE031234" :param data: parsed data in "well-known" "JSON timeseries"-like format - :param extra_fields: extra fields to add to each datapoint, e.g. {"rssi": rssi_value} + :param extra_fields: extra fields to add to each datapoint, e.g. {"rssi": + rssi_value} :param extra_tags: extra tags to add to each datapoint, e.g. {"dev-type": "sensor"} """ influxdb_points = [] @@ -129,7 +134,8 @@ def main(): logging.debug(pformat(data, width=120)) measurement_name = data["device"]["parser_module"].split(".")[-1] device_id = data["device"]["device_id"] - influxdb_datapoints = parsed_data_to_influxdb_format(measurement_name, device_id, data) + influxdb_datapoints = parsed_data_to_influxdb_format(measurement_name, + device_id, data) with client.write_api(write_options=SYNCHRONOUS) as write_api: write_api.write(bucket, org, influxdb_datapoints) logging.info(f"Saved {len(influxdb_datapoints)} datapoints to InfluxDB")