From 52e4024f74277d0f6acea86ceb586383957af838 Mon Sep 17 00:00:00 2001 From: Rodolfo Dalla Costa Date: Thu, 17 Sep 2020 00:50:37 -0300 Subject: [PATCH 1/2] cria validador para dto do kafka --- src/dto.py | 24 ++++++++++++++++++++++++ src/enums.py | 5 +++++ src/server.py | 28 +++++++++++++--------------- 3 files changed, 42 insertions(+), 15 deletions(-) create mode 100644 src/dto.py create mode 100644 src/enums.py diff --git a/src/dto.py b/src/dto.py new file mode 100644 index 0000000..fc89bd1 --- /dev/null +++ b/src/dto.py @@ -0,0 +1,24 @@ +import enums + +class ArenaLoggerDtoValidator(): + @staticmethod + def validateDto(message): + if 'harena-log-stream-version' not in message : + message['harena-log-stream-version'] = str(enums.StreamVersionSpecialCodes.NOT_IDENTIFIED.value) + + #TODO: create an exception handler with custom exceptions + if 'harena-log-stream' not in message: + raise ValueError('Message does not contains harena-log-stream') + + for harena_stream_log_value in message['harena-log-stream']: + if 'topic' not in harena_stream_log_value: + raise ValueError('One or more harena-log-stream items does not contains topic') + break + + if 'payload' not in harena_stream_log_value: + raise ValueError("One or more harena-log-stream items does not contains payload") + break + + if not bool(harena_stream_log_value['payload']): + raise ValueError("One or more harena-log-stream items does not contains key-value pair") + diff --git a/src/enums.py b/src/enums.py new file mode 100644 index 0000000..6bbc1ee --- /dev/null +++ b/src/enums.py @@ -0,0 +1,5 @@ +import enum + +class StreamVersionSpecialCodes(enum.Enum): + NOT_IDENTIFIED = 99 + ERROR = 100 \ No newline at end of file diff --git a/src/server.py b/src/server.py index 8e876f8..d5b62a9 100644 --- a/src/server.py +++ b/src/server.py @@ -8,6 +8,7 @@ import logging import coloredlogs +from dto import ArenaLoggerDtoValidator from flask import Flask, request, jsonify from flask_restful import Resource, Api from config import Config @@ -44,6 +45,7 @@ def run(self): mongodb_collection = mongodb_db[self.mongodb_collection] print("Checking for newly streamed messages during at least {} seconds...".format(self.delay)) + try: for message in self.kafka_consumer: print() @@ -52,10 +54,9 @@ def run(self): for event in message.value['harena-log-stream']: mongodb_collection.insert_one(event) print() - except: + except Exception as e: print("Object is not a JSON or does not have an 'harena-log-stream' array") - print() - + print("Exception trace message: "+str(e)) print("Waiting time ({} seconds) for new messages ended.".format(self.delay)) mongodb_client.close() @@ -69,7 +70,6 @@ def __init__(self, kafka_producer): LOGGER.debug("IndexResource initialized") - def get(self): message = {"message": "Welcome to the Harena Logger module", "kafka_bootstrap_connected" : self.kafka_producer.bootstrap_connected() @@ -84,7 +84,6 @@ def __init__(self, kafka_producer, target_topic): self.target_topic=target_topic - @cross_origin(origin='*') def post(self): @@ -92,17 +91,20 @@ def post(self): # To do: properly evaluate message body parsing message = request.get_json() message['server_timestamp'] = "{}".format(int(round(time.time() * 1000))) - + + ArenaLoggerDtoValidator.validateDto(message) # Asynchronous by default future = self.kafka_producer.send(self.target_topic, json.dumps(message).encode('utf-8')) # Block for 'synchronous' sends record_metadata = future.get(timeout=10) - except KafkaError: + except KafkaError as ke: # Decide what to do if produce request failed... - log.exception() - except: + logging.exception(ke) + except Exception as e: print("could not validate the json") + print("Exception trace message: " + str(e)) + logging.exception(e) # Successful result returns assigned partition and offset # print(future) @@ -120,14 +122,11 @@ def get(self): } return message - if __name__ == '__main__': - kafka_producer = None kafka_consumer = None - while True: try: kafka_producer = KafkaProducer(bootstrap_servers=Config.HARENA_LOGGER_KAFKA_BROKERS) @@ -137,15 +136,14 @@ def get(self): value_deserializer=lambda m: json.loads(m.decode('utf-8')), consumer_timeout_ms=Config.HARENA_LOGGER_INTERVAL_S*1000) break - except: + except Exception as e: + print("Exception trace message: "+ str(e)) pass print("Could not exchange metadata with Kafka bootstrap servers for the first time. Retrying...") time.sleep(1) - - consumer_thread = KafkaMongodbAppender(mongodb_server_url=Config.HARENA_LOGGER_MONGODB_URL, mongodb_database=Config.HARENA_LOGGER_MONGODB_DB, mongodb_collection=Config.HARENA_LOGGER_MONGODB_COLLECTION, From 638e98432f601c4912b10cced2f52c91962cbe85 Mon Sep 17 00:00:00 2001 From: marcosfmmota Date: Fri, 25 Sep 2020 12:14:53 -0300 Subject: [PATCH 2/2] update README --- readme.md => README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) rename readme.md => README.md (99%) diff --git a/readme.md b/README.md similarity index 99% rename from readme.md rename to README.md index 9747957..6ca80e6 100644 --- a/readme.md +++ b/README.md @@ -81,10 +81,11 @@ flask run # running the application ##### System dependencies * flask -* flask-restful * flask-cors -* paho-mqtt +* flask-restful * pymongo +* kafka-python +* coloredlogs ## Configuration