Skip to content

Commit

Permalink
fixing pooling policy
Browse files Browse the repository at this point in the history
  • Loading branch information
matheusmota committed Sep 7, 2020
1 parent 37de144 commit 33a1db3
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Config(object):

HARENA_LOGGER_KAFKA_BROKERS = os.environ.get('HARENA_LOGGER_KAFKA_BROKERS', 'kafka1:19092')
HARENA_LOGGER_KAFKA_TOPIC = os.environ.get('HARENA_LOGGER_KAFKA_TOPIC', 'harena-logs')
HARENA_LOGGER_INTERVAL_S = os.environ.get('HARENA_LOGGER_INTERVAL_S', 10)
HARENA_LOGGER_INTERVAL_S = int(os.environ.get('HARENA_LOGGER_INTERVAL_S', 10))


# LOGGING SETTINGS
Expand Down
33 changes: 5 additions & 28 deletions src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,43 +43,20 @@ def run(self):
mongodb_db = mongodb_client[self.mongodb_database]
mongodb_collection = mongodb_db[self.mongodb_collection]

print("Checking for newly streamed messages...")
print("Checking for newly streamed messages during at least {} seconds...".format(self.delay))

for message in self.kafka_consumer:
#print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
print("Found {} events inside the message ".format(len(message.value['harena-log-stream'])))
print("Found {} events inside a message ".format(len(message.value['harena-log-stream'])))

for event in message.value['harena-log-stream']:
mongodb_collection.insert_one(event)



print("Message stream timed out. Waiting {} seconds until next verification...".format(self.delay))
print("Waiting time ({} seconds) for new messages ended.".format(self.delay))
mongodb_client.close()
time.sleep(self.delay)

# mongodb_client = pymongo.MongoClient(web_app.config['HARENA_LOGGER_MONGODB_URL'])
# mongodb_db = mongodb_client[web_app.config['HARENA_LOGGER_MONGODB_DB']]
# mongodb_collection = mongodb_db[web_app.config['HARENA_LOGGER_MONGODB_COLLECTION']]


# while True:




# def connect_to_mongodb(threadName, counter, delay):

# def disconnect_from_mongodb(threadName, counter, delay):

# def consume_from_kafka_and_append_to_mongodb(kafka_brokers, delay):

# def get_num_of_messages(kafka_brokers, delay):







class IndexResource(Resource):

Expand Down Expand Up @@ -153,7 +130,7 @@ def get(self):
kafka_consumer = KafkaConsumer(Config.HARENA_LOGGER_KAFKA_TOPIC, group_id='harena-logger-consumer',
bootstrap_servers=Config.HARENA_LOGGER_KAFKA_BROKERS,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
consumer_timeout_ms=10000)
consumer_timeout_ms=Config.HARENA_LOGGER_INTERVAL_S*1000)
break
except:
pass
Expand Down

0 comments on commit 33a1db3

Please sign in to comment.