-
Notifications
You must be signed in to change notification settings - Fork 0
/
monitor_lynis.py
217 lines (198 loc) · 9.25 KB
/
monitor_lynis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
from monitor import Monitor
import configparser
import sys
from configparser import DuplicateOptionError, DuplicateSectionError, MissingSectionHeaderError
from time import sleep
import shutil
import os
from subprocess import call, DEVNULL
class MonitorLynis(Monitor):
    """Monitor plugin that periodically publishes a Lynis audit summary."""

    def __init__(self):
        super().__init__()
        self.name = "lynis"
        self.type = "info"
        # Helper object that runs the audit and parses its report file.
        self.lynis = Lynis()

    def get_point(self):
        """Yield one parsed Lynis report per hour, forever.

        Each iteration triggers a fresh audit through ``self.lynis`` and
        hands the resulting dictionary to the consumer; the generator then
        blocks for an hour before producing the next one.
        """
        while True:
            yield self.lynis.get_report()
            # One audit per hour (seconds * minutes).
            sleep(60 * 60)
class Lynis(object):
    """Helpers for locating the ``lynis`` tool, running it and reading its report."""

    @staticmethod
    def installed():
        """Return True when a lynis executable can be found.

        The PATH is searched first, then the two fixed locations
        ``/usr/bin/lynis`` and ``/opt/lynis/lynis``.
        """
        # shutil.which needs Python 3.3+, the same minimum as the
        # subprocess.DEVNULL import this module already relies on, so no
        # hasattr() guard is required.
        return (
            shutil.which("lynis") is not None
            or os.path.isfile("/usr/bin/lynis")
            or os.path.isfile("/opt/lynis/lynis")
        )

    @staticmethod
    def _parse_entries(raw, report_format):
        """Turn the raw value of a ``warning[]``/``suggestion[]`` key into a dict.

        Args:
            raw: The value as returned by the report parser: a single string
                when the key occurs once, a tuple of strings when it occurs
                several times, or None/empty when there is nothing to parse.
            report_format: ``"new"`` for ``name|description|...`` records,
                anything else for ``name|prio|description|...`` records.

        Returns:
            A dict mapping each entry name to
            ``{"prio": ..., "description": ...}``. Entries sharing a name
            overwrite each other, the last one wins.
        """
        if raw is None:
            return {}
        # A key that occurs only once comes back as a bare string; iterating
        # that directly would walk it character by character.
        entries = (raw,) if isinstance(raw, str) else raw

        parsed = {}
        for entry in entries:
            if not entry:
                continue
            fields = entry.split("|")
            if report_format == "new":
                # Pad so a truncated record cannot break the unpacking.
                fields += [""] * (2 - len(fields))
                name, description = fields[0], fields[1]
                # "new" records carry no priority column; "U" is the
                # placeholder used instead (presumably "unknown").
                prio = "U"
            else:
                fields += [""] * (3 - len(fields))
                name, prio, description = fields[0], fields[1], fields[2]
            parsed[name] = {
                "prio": prio,
                "description": description
            }
        return parsed

    @staticmethod
    def get_report():
        """Run a Lynis audit and return its warnings and suggestions.

        The copy in ``/opt/lynis`` is preferred (and executed from that
        directory); otherwise ``lynis`` is looked up on the PATH. The
        command's standard output is discarded and its exit status is not
        inspected. Afterwards ``/var/log/lynis-report.dat`` is parsed.

        Returns:
            A dict with the keys ``suggestions`` and ``warnings`` (each a
            dict keyed by entry name) plus ``suggestion_count`` and
            ``warning_count``.

        Raises:
            OSError: If the lynis executable or the report file cannot be
                opened.
        """
        if os.path.isfile("/opt/lynis/lynis"):
            call(["/opt/lynis/lynis", "--auditor", "Brixmond", "-Q"], cwd="/opt/lynis", stdout=DEVNULL)
        else:
            call(["lynis", "--auditor", "Brixmond", "-Q"], stdout=DEVNULL)

        with open("/var/log/lynis-report.dat") as report:
            report_string = report.read()

        report_format = "old"
        if "[General]" not in report_string:
            # Reports without a section header are treated as the "new"
            # layout; a header is prepended so the INI parser accepts them.
            report_format = "new"
            report_string = "[General]\n" + report_string

        report_file = ConfigParserMultiOpt()
        report_file.read_string(report_string)

        report_warnings = {}
        report_suggestions = {}
        for section in report_file.sections():
            if report_file.has_option(section, "warning[]"):
                report_warnings.update(
                    Lynis._parse_entries(report_file[section]["warning[]"], report_format)
                )
            if report_file.has_option(section, "suggestion[]"):
                report_suggestions.update(
                    Lynis._parse_entries(report_file[section]["suggestion[]"], report_format)
                )

        return {
            "suggestions": report_suggestions,
            "warnings": report_warnings,
            "warning_count": len(report_warnings),
            "suggestion_count": len(report_suggestions)
        }
# Code from http://stackoverflow.com/questions/13921323/handling-duplicate-keys-with-configparser
class ConfigParserMultiOpt(configparser.RawConfigParser):
    """RawConfigParser variant that keeps every value of a repeated option.

    An option that occurs once in a section is returned as a plain string,
    just like the stock parser does. An option that occurs several times is
    returned as a tuple with one string per occurrence, in file order.
    """

    # The comment prefixes are pinned on the class (and handed to the base
    # constructor) instead of being read back from private attributes of the
    # base class, which are not part of its documented interface.
    _FULL_LINE_COMMENT_PREFIXES = ("#", ";")
    _INLINE_COMMENT_PREFIXES = ()

    def __init__(self):
        configparser.RawConfigParser.__init__(
            self,
            comment_prefixes=self._FULL_LINE_COMMENT_PREFIXES,
            inline_comment_prefixes=self._INLINE_COMMENT_PREFIXES,
            empty_lines_in_values=False,
            strict=False,
        )

    @staticmethod
    def _as_value_tuple(existing):
        """Return the values stored so far for an option as a tuple of strings."""
        if isinstance(existing, tuple):
            return existing
        if isinstance(existing, list):
            # Lines of a single (possibly multi-line) value collected during
            # the current read: fold them into one string.
            return ("\n".join(existing).rstrip(),)
        # Already-joined string left behind by an earlier read_*() call.
        return (existing,)

    @staticmethod
    def _append_continuation(section, option, text):
        """Attach a continuation line to the most recent value of *option*."""
        current = section[option]
        if isinstance(current, tuple):
            section[option] = current[:-1] + (current[-1] + "\n" + text,)
        else:
            current.append(text)

    @staticmethod
    def _note_parse_error(error, fpname, lineno, line):
        """Record a bogus line on *error*, creating the exception on first use."""
        if error is None:
            error = configparser.ParsingError(fpname)
        error.append(lineno, repr(line))
        return error

    def _read(self, fp, fpname):
        """Parse a sectioned configuration file.

        Each section in a configuration file contains a header, indicated by
        a name in square brackets (`[]'), plus key/value options, indicated by
        `name' and `value' delimited with `=' or `:'.

        Values can span multiple lines, as long as the continuation lines are
        indented deeper than the first line of the value. A blank or
        comment-only line always ends the current value, because this parser
        is constructed with ``empty_lines_in_values=False``.

        Lines whose first non-blank character is `#' or `;' are comments.

        Args:
            fp: Iterable yielding the lines of the configuration source.
            fpname: Name of the source, used in error messages.

        Raises:
            configparser.MissingSectionHeaderError: An option line appears
                before any section header.
            configparser.ParsingError: One or more lines are neither a header,
                an option nor a continuation; raised after the whole source
                has been scanned and listing every such line.
        """
        elements_added = set()
        cursect = None  # None, or the dict of the section being filled
        sectname = None
        optname = None
        indent_level = 0
        error = None  # None, or the ParsingError being accumulated

        for lineno, line in enumerate(fp, start=1):
            comment_start = None
            # strip inline comments
            for prefix in self._INLINE_COMMENT_PREFIXES:
                index = line.find(prefix)
                if index == 0 or (index > 0 and line[index - 1].isspace()):
                    comment_start = index
                    break
            # strip full line comments
            if line.strip().startswith(self._FULL_LINE_COMMENT_PREFIXES):
                comment_start = 0

            value = line[:comment_start].strip()
            if not value:
                # empty line marks end of value: nothing can be indented
                # deeper than sys.maxsize, so no continuation will match
                indent_level = sys.maxsize
                continue

            first_nonspace = self.NONSPACECRE.search(line)
            cur_indent_level = first_nonspace.start() if first_nonspace else 0

            # continuation line?
            if (cursect is not None and optname and
                    cur_indent_level > indent_level):
                self._append_continuation(cursect, optname, value)
                continue

            indent_level = cur_indent_level

            # is it a section header?
            header = self.SECTCRE.match(value)
            if header:
                sectname = header.group('header')
                if sectname in self._sections:
                    if self._strict and sectname in elements_added:
                        raise configparser.DuplicateSectionError(
                            sectname, fpname, lineno)
                    cursect = self._sections[sectname]
                    elements_added.add(sectname)
                elif sectname == self.default_section:
                    cursect = self._defaults
                else:
                    cursect = self._dict()
                    self._sections[sectname] = cursect
                    self._proxies[sectname] = configparser.SectionProxy(self, sectname)
                    elements_added.add(sectname)
                # So sections can't start with a continuation line
                optname = None
                continue

            # no section header in the file?
            if cursect is None:
                raise configparser.MissingSectionHeaderError(fpname, lineno, line)

            # an option line?
            option = self._optcre.match(value)
            if not option:
                # a non-fatal parsing error occurred. remember the line but
                # keep going; the exception is raised at the end of the file
                # and lists all bogus lines
                error = self._note_parse_error(error, fpname, lineno, line)
                continue

            optname, optval = option.group('option', 'value')
            if not optname:
                error = self._note_parse_error(error, fpname, lineno, line)
            optname = self.optionxform(optname.rstrip())
            if self._strict and (sectname, optname) in elements_added:
                raise configparser.DuplicateOptionError(
                    sectname, optname, fpname, lineno)
            elements_added.add((sectname, optname))

            if optval is None:
                # valueless option handling
                cursect[optname] = None
                continue

            optval = optval.strip()
            previous = cursect.get(optname)
            if previous is None:
                # First occurrence: keep the stock list-of-lines form so the
                # base class joins it into a single string afterwards.
                cursect[optname] = [optval]
            else:
                # Repeated option: promote to a tuple of complete values.
                cursect[optname] = self._as_value_tuple(previous) + (optval,)

        # if any parsing errors occurred, raise an exception
        if error is not None:
            raise error
        self._join_multiline_values()