Skip to content

Commit

Permalink
Fixed and merged master with threading and scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
Qmando committed Nov 9, 2018
1 parent 6517af5 commit df481d0
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 122 deletions.
3 changes: 3 additions & 0 deletions elastalert/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def get_module(module_name):
""" Loads a module and returns a specific object.
module_name should 'module.file.object'.
Returns object or raises EAException on error. """
sys.path.append(os.getcwd())
try:
module_path, module_class = module_name.rsplit('.', 1)
base_module = __import__(module_path, globals(), locals(), [module_class])
Expand Down Expand Up @@ -195,6 +196,8 @@ def load_options(rule, conf, filename, args=None):
rule['query_delay'] = datetime.timedelta(**rule['query_delay'])
if 'buffer_time' in rule:
rule['buffer_time'] = datetime.timedelta(**rule['buffer_time'])
if 'run_every' in rule:
rule['run_every'] = datetime.timedelta(**rule['run_every'])
if 'bucket_interval' in rule:
rule['bucket_interval_timedelta'] = datetime.timedelta(**rule['bucket_interval'])
if 'exponential_realert' in rule:
Expand Down
2 changes: 2 additions & 0 deletions elastalert/create_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ def main():
filename = 'config.yaml'
elif os.path.isfile(args.config):
filename = args.config
elif os.path.isfile('../config.yaml'):
filename = '../config.yaml'
else:
filename = ''

Expand Down
229 changes: 149 additions & 80 deletions elastalert/elastalert.py

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions elastalert/enhancements.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
from util import pretty_ts


class BaseEnhancement(object):
Expand All @@ -14,6 +15,11 @@ def process(self, match):
raise NotImplementedError()


class TimeEnhancement(BaseEnhancement):
    """Enhancement that rewrites each match's @timestamp into a
    human-readable string via util.pretty_ts."""

    def process(self, match):
        # Replace the raw timestamp value in place with its pretty form.
        raw_ts = match['@timestamp']
        match['@timestamp'] = pretty_ts(raw_ts)


class DropMatchException(Exception):
""" ElastAlert will drop a match if this exception type is raised by an enhancement """
pass
13 changes: 11 additions & 2 deletions elastalert/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import os

import dateutil.parser
import dateutil.tz
import pytz
from auth import Auth
from elasticsearch import RequestsHttpConnection
from elasticsearch.client import Elasticsearch
Expand Down Expand Up @@ -112,7 +112,7 @@ def ts_to_dt(timestamp):
dt = dateutil.parser.parse(timestamp)
# Implicitly convert local timestamps to UTC
if dt.tzinfo is None:
dt = dt.replace(tzinfo=dateutil.tz.tzutc())
dt = dt.replace(tzinfo=pytz.utc)
return dt


Expand Down Expand Up @@ -365,6 +365,15 @@ def build_es_conn_config(conf):
return parsed_conf


def pytzfy(dt):
    """Return *dt* with its tzinfo replaced by an equivalent pytz timezone.

    apscheduler requires pytz timezone objects, while timestamps parsed by
    ts_to_dt carry dateutil (or other stdlib) tzinfo instances.  Naive
    datetimes are returned unchanged.

    :param dt: a ``datetime.datetime``, naive or aware.
    :return: the same instant with a pytz tzinfo, or *dt* untouched if naive.
    """
    if dt.tzinfo is not None:
        # tzname() takes the datetime itself as its argument (per the
        # datetime.tzinfo contract), not an arbitrary string.
        new_tz = pytz.timezone(dt.tzinfo.tzname(dt))
        return dt.replace(tzinfo=new_tz)
    return dt


def parse_duration(value):
"""Convert ``unit=num`` spec into a ``timedelta`` object."""
unit, num = value.split('=')
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
-r requirements.txt
coverage
flake8
pre-commit
Expand Down
7 changes: 4 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
apscheduler>=3.3.0
aws-requests-auth>=0.3.0
blist>=1.3.6
boto3>=1.4.4
cffi>=1.11.5
configparser>=3.5.0
croniter>=0.3.16
elasticsearch
Expand All @@ -11,11 +13,10 @@ jsonschema>=2.6.0
mock>=2.0.0
PyStaticConfiguration>=0.10.3
python-dateutil>=2.6.0,<2.7.0
python-magic>=0.4.15
PyYAML>=3.12
requests>=2.0.0
stomp.py>=4.1.17
texttable>=0.8.8
twilio==6.0.0
thehive4py>=1.4.4
python-magic>=0.4.15
cffi>=1.11.5
twilio==6.0.0
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
packages=find_packages(),
package_data={'elastalert': ['schema.yaml']},
install_requires=[
'apscheduler>=3.3.0'
'aws-requests-auth>=0.3.0',
'blist>=1.3.6',
'boto3>=1.4.4',
Expand Down
65 changes: 33 additions & 32 deletions tests/base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,19 @@ def test_init_rule(ea):


def test_query(ea):
ea.current_es.search.return_value = {'hits': {'total': 0, 'hits': []}}
ea.thread_data.current_es.search.return_value = {'hits': {'total': 0, 'hits': []}}
ea.run_query(ea.rules[0], START, END)
ea.current_es.search.assert_called_with(body={
ea.thread_data.current_es.search.assert_called_with(body={
'query': {'filtered': {'filter': {'bool': {'must': [{'range': {'@timestamp': {'lte': END_TIMESTAMP, 'gt': START_TIMESTAMP}}}]}}}},
'sort': [{'@timestamp': {'order': 'asc'}}]}, index='idx', _source_include=['@timestamp'], ignore_unavailable=True,
size=ea.rules[0]['max_query_size'], scroll=ea.conf['scroll_keepalive'])


def test_query_with_fields(ea):
ea.rules[0]['_source_enabled'] = False
ea.current_es.search.return_value = {'hits': {'total': 0, 'hits': []}}
ea.thread_data.current_es.search.return_value = {'hits': {'total': 0, 'hits': []}}
ea.run_query(ea.rules[0], START, END)
ea.current_es.search.assert_called_with(body={
ea.thread_data.current_es.search.assert_called_with(body={
'query': {'filtered': {'filter': {'bool': {'must': [{'range': {'@timestamp': {'lte': END_TIMESTAMP, 'gt': START_TIMESTAMP}}}]}}}},
'sort': [{'@timestamp': {'order': 'asc'}}], 'fields': ['@timestamp']}, index='idx', ignore_unavailable=True,
size=ea.rules[0]['max_query_size'], scroll=ea.conf['scroll_keepalive'])
Expand All @@ -109,11 +109,11 @@ def test_query_with_fields(ea):
def test_query_with_unix(ea):
ea.rules[0]['timestamp_type'] = 'unix'
ea.rules[0]['dt_to_ts'] = dt_to_unix
ea.current_es.search.return_value = {'hits': {'total': 0, 'hits': []}}
ea.thread_data.current_es.search.return_value = {'hits': {'total': 0, 'hits': []}}
ea.run_query(ea.rules[0], START, END)
start_unix = dt_to_unix(START)
end_unix = dt_to_unix(END)
ea.current_es.search.assert_called_with(
ea.thread_data.current_es.search.assert_called_with(
body={'query': {'filtered': {'filter': {'bool': {'must': [{'range': {'@timestamp': {'lte': end_unix, 'gt': start_unix}}}]}}}},
'sort': [{'@timestamp': {'order': 'asc'}}]}, index='idx', _source_include=['@timestamp'], ignore_unavailable=True,
size=ea.rules[0]['max_query_size'], scroll=ea.conf['scroll_keepalive'])
Expand All @@ -122,18 +122,18 @@ def test_query_with_unix(ea):
def test_query_with_unixms(ea):
ea.rules[0]['timestamp_type'] = 'unixms'
ea.rules[0]['dt_to_ts'] = dt_to_unixms
ea.current_es.search.return_value = {'hits': {'total': 0, 'hits': []}}
ea.thread_data.current_es.search.return_value = {'hits': {'total': 0, 'hits': []}}
ea.run_query(ea.rules[0], START, END)
start_unix = dt_to_unixms(START)
end_unix = dt_to_unixms(END)
ea.current_es.search.assert_called_with(
ea.thread_data.current_es.search.assert_called_with(
body={'query': {'filtered': {'filter': {'bool': {'must': [{'range': {'@timestamp': {'lte': end_unix, 'gt': start_unix}}}]}}}},
'sort': [{'@timestamp': {'order': 'asc'}}]}, index='idx', _source_include=['@timestamp'], ignore_unavailable=True,
size=ea.rules[0]['max_query_size'], scroll=ea.conf['scroll_keepalive'])


def test_no_hits(ea):
ea.current_es.search.return_value = {'hits': {'total': 0, 'hits': []}}
ea.thread_data.current_es.search.return_value = {'hits': {'total': 0, 'hits': []}}
ea.run_query(ea.rules[0], START, END)
assert ea.rules[0]['type'].add_data.call_count == 0

Expand All @@ -142,15 +142,15 @@ def test_no_terms_hits(ea):
ea.rules[0]['use_terms_query'] = True
ea.rules[0]['query_key'] = 'QWERTY'
ea.rules[0]['doc_type'] = 'uiop'
ea.current_es.search.return_value = {'hits': {'total': 0, 'hits': []}}
ea.thread_data.current_es.search.return_value = {'hits': {'total': 0, 'hits': []}}
ea.run_query(ea.rules[0], START, END)
assert ea.rules[0]['type'].add_terms_data.call_count == 0


def test_some_hits(ea):
hits = generate_hits([START_TIMESTAMP, END_TIMESTAMP])
hits_dt = generate_hits([START, END])
ea.current_es.search.return_value = hits
ea.thread_data.current_es.search.return_value = hits
ea.run_query(ea.rules[0], START, END)
assert ea.rules[0]['type'].add_data.call_count == 1
ea.rules[0]['type'].add_data.assert_called_with([x['_source'] for x in hits_dt['hits']['hits']])
Expand All @@ -162,7 +162,7 @@ def test_some_hits_unix(ea):
ea.rules[0]['ts_to_dt'] = unix_to_dt
hits = generate_hits([dt_to_unix(START), dt_to_unix(END)])
hits_dt = generate_hits([START, END])
ea.current_es.search.return_value = copy.deepcopy(hits)
ea.thread_data.current_es.search.return_value = copy.deepcopy(hits)
ea.run_query(ea.rules[0], START, END)
assert ea.rules[0]['type'].add_data.call_count == 1
ea.rules[0]['type'].add_data.assert_called_with([x['_source'] for x in hits_dt['hits']['hits']])
Expand All @@ -176,7 +176,7 @@ def _duplicate_hits_generator(timestamps, **kwargs):


def test_duplicate_timestamps(ea):
ea.current_es.search.side_effect = _duplicate_hits_generator([START_TIMESTAMP] * 3, blah='duplicate')
ea.thread_data.current_es.search.side_effect = _duplicate_hits_generator([START_TIMESTAMP] * 3, blah='duplicate')
ea.run_query(ea.rules[0], START, ts_to_dt('2014-01-01T00:00:00Z'))

assert len(ea.rules[0]['type'].add_data.call_args_list[0][0][0]) == 3
Expand All @@ -189,7 +189,7 @@ def test_duplicate_timestamps(ea):

def test_match(ea):
hits = generate_hits([START_TIMESTAMP, END_TIMESTAMP])
ea.current_es.search.return_value = hits
ea.thread_data.current_es.search.return_value = hits
ea.rules[0]['type'].matches = [{'@timestamp': END}]
with mock.patch('elastalert.elastalert.elasticsearch_client'):
ea.run_rule(ea.rules[0], END, START)
Expand Down Expand Up @@ -280,7 +280,7 @@ def test_match_with_module_with_agg(ea):
ea.rules[0]['match_enhancements'] = [mod]
ea.rules[0]['aggregation'] = datetime.timedelta(minutes=15)
hits = generate_hits([START_TIMESTAMP, END_TIMESTAMP])
ea.current_es.search.return_value = hits
ea.thread_data.current_es.search.return_value = hits
ea.rules[0]['type'].matches = [{'@timestamp': END}]
with mock.patch('elastalert.elastalert.elasticsearch_client'):
ea.run_rule(ea.rules[0], END, START)
Expand All @@ -294,7 +294,7 @@ def test_match_with_enhancements_first(ea):
ea.rules[0]['aggregation'] = datetime.timedelta(minutes=15)
ea.rules[0]['run_enhancements_first'] = True
hits = generate_hits([START_TIMESTAMP, END_TIMESTAMP])
ea.current_es.search.return_value = hits
ea.thread_data.current_es.search.return_value = hits
ea.rules[0]['type'].matches = [{'@timestamp': END}]
with mock.patch('elastalert.elastalert.elasticsearch_client'):
with mock.patch.object(ea, 'add_aggregated_alert') as add_alert:
Expand All @@ -317,7 +317,7 @@ def test_agg_matchtime(ea):
hits_timestamps = ['2014-09-26T12:34:45', '2014-09-26T12:40:45', '2014-09-26T12:47:45']
alerttime1 = dt_to_ts(ts_to_dt(hits_timestamps[0]) + datetime.timedelta(minutes=10))
hits = generate_hits(hits_timestamps)
ea.current_es.search.return_value = hits
ea.thread_data.current_es.search.return_value = hits
with mock.patch('elastalert.elastalert.elasticsearch_client'):
# Aggregate first two, query over full range
ea.rules[0]['aggregate_by_match_time'] = True
Expand Down Expand Up @@ -373,7 +373,7 @@ def test_agg_not_matchtime(ea):
hits_timestamps = ['2014-09-26T12:34:45', '2014-09-26T12:40:45', '2014-09-26T12:47:45']
match_time = ts_to_dt('2014-09-26T12:55:00Z')
hits = generate_hits(hits_timestamps)
ea.current_es.search.return_value = hits
ea.thread_data.current_es.search.return_value = hits
with mock.patch('elastalert.elastalert.elasticsearch_client'):
with mock.patch('elastalert.elastalert.ts_now', return_value=match_time):
ea.rules[0]['aggregation'] = datetime.timedelta(minutes=10)
Expand Down Expand Up @@ -402,7 +402,7 @@ def test_agg_cron(ea):
ea.max_aggregation = 1337
hits_timestamps = ['2014-09-26T12:34:45', '2014-09-26T12:40:45', '2014-09-26T12:47:45']
hits = generate_hits(hits_timestamps)
ea.current_es.search.return_value = hits
ea.thread_data.current_es.search.return_value = hits
alerttime1 = dt_to_ts(ts_to_dt('2014-09-26T12:46:00'))
alerttime2 = dt_to_ts(ts_to_dt('2014-09-26T13:04:00'))

Expand Down Expand Up @@ -439,7 +439,7 @@ def test_agg_no_writeback_connectivity(ea):
run again, that they will be passed again to add_aggregated_alert """
hit1, hit2, hit3 = '2014-09-26T12:34:45', '2014-09-26T12:40:45', '2014-09-26T12:47:45'
hits = generate_hits([hit1, hit2, hit3])
ea.current_es.search.return_value = hits
ea.thread_data.current_es.search.return_value = hits
ea.rules[0]['aggregation'] = datetime.timedelta(minutes=10)
ea.rules[0]['type'].matches = [{'@timestamp': hit1},
{'@timestamp': hit2},
Expand All @@ -453,7 +453,7 @@ def test_agg_no_writeback_connectivity(ea):
{'@timestamp': hit2, 'num_hits': 0, 'num_matches': 3},
{'@timestamp': hit3, 'num_hits': 0, 'num_matches': 3}]

ea.current_es.search.return_value = {'hits': {'total': 0, 'hits': []}}
ea.thread_data.current_es.search.return_value = {'hits': {'total': 0, 'hits': []}}
ea.add_aggregated_alert = mock.Mock()

with mock.patch('elastalert.elastalert.elasticsearch_client'):
Expand All @@ -469,7 +469,7 @@ def test_agg_with_aggregation_key(ea):
hits_timestamps = ['2014-09-26T12:34:45', '2014-09-26T12:40:45', '2014-09-26T12:43:45']
match_time = ts_to_dt('2014-09-26T12:45:00Z')
hits = generate_hits(hits_timestamps)
ea.current_es.search.return_value = hits
ea.thread_data.current_es.search.return_value = hits
with mock.patch('elastalert.elastalert.elasticsearch_client'):
with mock.patch('elastalert.elastalert.ts_now', return_value=match_time):
ea.rules[0]['aggregation'] = datetime.timedelta(minutes=10)
Expand Down Expand Up @@ -562,7 +562,7 @@ def test_compound_query_key(ea):
ea.rules[0]['query_key'] = 'this,that,those'
ea.rules[0]['compound_query_key'] = ['this', 'that', 'those']
hits = generate_hits([START_TIMESTAMP, END_TIMESTAMP], this='abc', that=u'☃', those=4)
ea.current_es.search.return_value = hits
ea.thread_data.current_es.search.return_value = hits
ea.run_query(ea.rules[0], START, END)
call_args = ea.rules[0]['type'].add_data.call_args_list[0]
assert 'this,that,those' in call_args[0][0][0]
Expand Down Expand Up @@ -604,7 +604,7 @@ def test_silence_query_key(ea):
def test_realert(ea):
hits = ['2014-09-26T12:35:%sZ' % (x) for x in range(60)]
matches = [{'@timestamp': x} for x in hits]
ea.current_es.search.return_value = hits
ea.thread_data.current_es.search.return_value = hits
with mock.patch('elastalert.elastalert.elasticsearch_client'):
ea.rules[0]['realert'] = datetime.timedelta(seconds=50)
ea.rules[0]['type'].matches = matches
Expand Down Expand Up @@ -703,7 +703,7 @@ def test_count(ea):
query['query']['filtered']['filter']['bool']['must'][0]['range']['@timestamp']['lte'] = dt_to_ts(end)
query['query']['filtered']['filter']['bool']['must'][0]['range']['@timestamp']['gt'] = dt_to_ts(start)
start = start + ea.run_every
ea.current_es.count.assert_any_call(body=query, doc_type='doctype', index='idx', ignore_unavailable=True)
ea.thread_data.current_es.count.assert_any_call(body=query, doc_type='doctype', index='idx', ignore_unavailable=True)


def run_and_assert_segmented_queries(ea, start, end, segment_size):
Expand All @@ -727,8 +727,8 @@ def run_and_assert_segmented_queries(ea, start, end, segment_size):
def test_query_segmenting_reset_num_hits(ea):
# Tests that num_hits gets reset every time run_query is run
def assert_num_hits_reset():
assert ea.num_hits == 0
ea.num_hits += 10
assert ea.thread_data.num_hits == 0
ea.thread_data.num_hits += 10
with mock.patch.object(ea, 'run_query') as mock_run_query:
mock_run_query.side_effect = assert_num_hits_reset()
ea.run_rule(ea.rules[0], END, START)
Expand Down Expand Up @@ -915,6 +915,7 @@ def test_kibana_dashboard(ea):


def test_rule_changes(ea):
re = datetime.timedelta(minutes=10)
ea.rule_hashes = {'rules/rule1.yaml': 'ABC',
'rules/rule2.yaml': 'DEF'}
ea.rules = [ea.init_rule(rule, True) for rule in [{'rule_file': 'rules/rule1.yaml', 'name': 'rule1', 'filter': []},
Expand All @@ -926,8 +927,8 @@ def test_rule_changes(ea):

with mock.patch('elastalert.elastalert.get_rule_hashes') as mock_hashes:
with mock.patch('elastalert.elastalert.load_configuration') as mock_load:
mock_load.side_effect = [{'filter': [], 'name': 'rule2', 'rule_file': 'rules/rule2.yaml'},
{'filter': [], 'name': 'rule3', 'rule_file': 'rules/rule3.yaml'}]
mock_load.side_effect = [{'filter': [], 'name': 'rule2', 'rule_file': 'rules/rule2.yaml', 'run_every': re},
{'filter': [], 'name': 'rule3', 'rule_file': 'rules/rule3.yaml', 'run_every': re}]
mock_hashes.return_value = new_hashes
ea.load_rule_changes()

Expand Down Expand Up @@ -1004,9 +1005,9 @@ def test_count_keys(ea):
ea.rules[0]['doc_type'] = 'blah'
buckets = [{'aggregations': {'filtered': {'counts': {'buckets': [{'key': 'a', 'doc_count': 10}, {'key': 'b', 'doc_count': 5}]}}}},
{'aggregations': {'filtered': {'counts': {'buckets': [{'key': 'd', 'doc_count': 10}, {'key': 'c', 'doc_count': 12}]}}}}]
ea.current_es.search.side_effect = buckets
ea.thread_data.current_es.search.side_effect = buckets
counts = ea.get_top_counts(ea.rules[0], START, END, ['this', 'that'])
calls = ea.current_es.search.call_args_list
calls = ea.thread_data.current_es.search.call_args_list
assert calls[0][1]['search_type'] == 'count'
assert calls[0][1]['body']['aggs']['filtered']['aggs']['counts']['terms'] == {'field': 'this', 'size': 5}
assert counts['top_events_this'] == {'a': 10, 'b': 5}
Expand Down Expand Up @@ -1131,7 +1132,7 @@ def mock_loop():
ea.stop()

with mock.patch.object(ea, 'sleep_for', return_value=None):
with mock.patch.object(ea, 'run_all_rules') as mock_run:
with mock.patch.object(ea, 'sleep_for') as mock_run:
mock_run.side_effect = mock_loop()
start_thread = threading.Thread(target=ea.start)
# Set as daemon to prevent a failed test from blocking exit
Expand Down
Loading

0 comments on commit df481d0

Please sign in to comment.