Skip to content

Commit

Permalink
Merge pull request #22 from harena-lab/development
Browse files Browse the repository at this point in the history
Development
  • Loading branch information
marcosfmmota authored Sep 25, 2020
2 parents 96b5ae7 + 638e984 commit 09faabc
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 17 deletions.
5 changes: 3 additions & 2 deletions readme.md → README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ flask run # running the application
##### System dependencies

* flask
* flask-restful
* flask-cors
* paho-mqtt
* flask-restful
* pymongo
* kafka-python
* coloredlogs

## Configuration

Expand Down
24 changes: 24 additions & 0 deletions src/dto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import enums

class ArenaLoggerDtoValidator():
@staticmethod
def validateDto(message):
if 'harena-log-stream-version' not in message :
message['harena-log-stream-version'] = str(enums.StreamVersionSpecialCodes.NOT_IDENTIFIED.value)

#TODO: create an exception handler with custom exceptions
if 'harena-log-stream' not in message:
raise ValueError('Message does not contains harena-log-stream')

for harena_stream_log_value in message['harena-log-stream']:
if 'topic' not in harena_stream_log_value:
raise ValueError('One or more harena-log-stream items does not contains topic')
break

if 'payload' not in harena_stream_log_value:
raise ValueError("One or more harena-log-stream items does not contains payload")
break

if not bool(harena_stream_log_value['payload']):
raise ValueError("One or more harena-log-stream items does not contains key-value pair")

5 changes: 5 additions & 0 deletions src/enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import enum

class StreamVersionSpecialCodes(enum.Enum):
NOT_IDENTIFIED = 99
ERROR = 100
28 changes: 13 additions & 15 deletions src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
import coloredlogs

from dto import ArenaLoggerDtoValidator
from flask import Flask, request, jsonify
from flask_restful import Resource, Api
from config import Config
Expand Down Expand Up @@ -44,6 +45,7 @@ def run(self):
mongodb_collection = mongodb_db[self.mongodb_collection]

print("Checking for newly streamed messages during at least {} seconds...".format(self.delay))

try:
for message in self.kafka_consumer:
print()
Expand All @@ -52,10 +54,9 @@ def run(self):
for event in message.value['harena-log-stream']:
mongodb_collection.insert_one(event)
print()
except:
except Exception as e:
print("Object is not a JSON or does not have an 'harena-log-stream' array")
print()

print("Exception trace message: "+str(e))

print("Waiting time ({} seconds) for new messages ended.".format(self.delay))
mongodb_client.close()
Expand All @@ -69,7 +70,6 @@ def __init__(self, kafka_producer):

LOGGER.debug("IndexResource initialized")


def get(self):
message = {"message": "Welcome to the Harena Logger module",
"kafka_bootstrap_connected" : self.kafka_producer.bootstrap_connected()
Expand All @@ -84,25 +84,27 @@ def __init__(self, kafka_producer, target_topic):
self.target_topic=target_topic



@cross_origin(origin='*')
def post(self):

try:
# To do: properly evaluate message body parsing
message = request.get_json()
message['server_timestamp'] = "{}".format(int(round(time.time() * 1000)))


ArenaLoggerDtoValidator.validateDto(message)
# Asynchronous by default
future = self.kafka_producer.send(self.target_topic, json.dumps(message).encode('utf-8'))

# Block for 'synchronous' sends
record_metadata = future.get(timeout=10)
except KafkaError:
except KafkaError as ke:
# Decide what to do if produce request failed...
log.exception()
except:
logging.exception(ke)
except Exception as e:
print("could not validate the json")
print("Exception trace message: " + str(e))
logging.exception(e)

# Successful result returns assigned partition and offset
# print(future)
Expand All @@ -120,14 +122,11 @@ def get(self):
}
return message


if __name__ == '__main__':


kafka_producer = None
kafka_consumer = None


while True:
try:
kafka_producer = KafkaProducer(bootstrap_servers=Config.HARENA_LOGGER_KAFKA_BROKERS)
Expand All @@ -137,15 +136,14 @@ def get(self):
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
consumer_timeout_ms=Config.HARENA_LOGGER_INTERVAL_S*1000)
break
except:
except Exception as e:
print("Exception trace message: "+ str(e))
pass

print("Could not exchange metadata with Kafka bootstrap servers for the first time. Retrying...")
time.sleep(1)




consumer_thread = KafkaMongodbAppender(mongodb_server_url=Config.HARENA_LOGGER_MONGODB_URL,
mongodb_database=Config.HARENA_LOGGER_MONGODB_DB,
mongodb_collection=Config.HARENA_LOGGER_MONGODB_COLLECTION,
Expand Down

0 comments on commit 09faabc

Please sign in to comment.