Skip to content

Commit

Permalink
Fix linter warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
laurigates committed Sep 19, 2023
1 parent 0b3a0b1 commit 68c7ce9
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions kafka2influxdb.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
import logging
import os
from pprint import pformat
Expand All @@ -13,7 +12,8 @@
"""
Consume parsed data from Kafka and save it to InfluxDB V2 database.
Parsed data is expected to be in the following "well-known" "JSON timeseries"-like format:
Parsed data is expected to be in the following "well-known" "JSON
timeseries"-like format:
parsed_data = {
"header": {
Expand Down Expand Up @@ -46,7 +46,10 @@
"device": {
"device_id": "B81758FFFE031234",
# FIXME: this is incorrect format for device_metadata now
"device_metadata": {"name": "Elsys ERS CO2 A81758FFFE035729", "parser_module": "fvhiot.parsers.elsys"},
"device_metadata": {
"name": "Elsys ERS CO2 A81758FFFE035729",
"parser_module": "fvhiot.parsers.elsys"
},
"device_state": {"state data": "is here"},
},
"meta": {
Expand Down Expand Up @@ -77,14 +80,16 @@


def parsed_data_to_influxdb_format(
measurement_name, device_id, data: dict, extra_fields: dict = None, extra_tags: dict = None
measurement_name, device_id, data: dict, extra_fields: dict = None,
extra_tags: dict = None
) -> list:
"""
Convert parsed data to InfluxDB datapoints format, see example above.
:param measurement_name: name of the measurement, e.g. "elsys"
:param device_id: device ID, e.g. "B81758FFFE031234"
:param data: parsed data in "well-known" "JSON timeseries"-like format
:param extra_fields: extra fields to add to each datapoint, e.g. {"rssi": rssi_value}
:param extra_fields: extra fields to add to each datapoint, e.g. {"rssi":
rssi_value}
:param extra_tags: extra tags to add to each datapoint, e.g. {"dev-type": "sensor"}
"""
influxdb_points = []
Expand Down Expand Up @@ -129,7 +134,8 @@ def main():
logging.debug(pformat(data, width=120))
measurement_name = data["device"]["parser_module"].split(".")[-1]
device_id = data["device"]["device_id"]
influxdb_datapoints = parsed_data_to_influxdb_format(measurement_name, device_id, data)
influxdb_datapoints = parsed_data_to_influxdb_format(measurement_name,
device_id, data)
with client.write_api(write_options=SYNCHRONOUS) as write_api:
write_api.write(bucket, org, influxdb_datapoints)
logging.info(f"Saved {len(influxdb_datapoints)} datapoints to InfluxDB")
Expand Down

0 comments on commit 68c7ce9

Please sign in to comment.