diff --git a/.gitignore b/.gitignore
index 83f222bb..fd7f38b0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,8 @@
build
dist
docs/_build
+docs/solnlib
+docs/index.rst
splunksolutionlib.egg-info
__pycache__
*.pyc
diff --git a/examples/main.py b/examples/main.py
index 2a664f17..3ea05916 100644
--- a/examples/main.py
+++ b/examples/main.py
@@ -41,6 +41,7 @@ def run_test():
import test_metadata
import test_acl
import test_credentials
+ import test_conf_manager
print 'check splunk environment...'
test_splunkenv.test_splunkenv()
@@ -54,6 +55,8 @@ def run_test():
test_acl.test_acl_manager()
print 'test credential manager...'
test_credentials.test_credential_manager()
+ print 'test conf manager...'
+ test_conf_manager.test_conf_manager()
if __name__ == '__main__':
teardown_environment()
diff --git a/examples/test_conf_manager.py b/examples/test_conf_manager.py
new file mode 100644
index 00000000..4096f59d
--- /dev/null
+++ b/examples/test_conf_manager.py
@@ -0,0 +1,39 @@
+import sys
+import os.path as op
+import pytest
+
+from splunklib import client
+
+sys.path.insert(0, op.dirname(op.dirname(op.abspath(__file__))))
+import solnlib.splunk_rest_client as rest_client
+from solnlib import credentials
+from solnlib import conf_manager
+import context
+
+
+def test_conf_manager():
+ session_key = credentials.get_session_key(
+ context.username, context.password, scheme=context.scheme,
+ host=context.host, port=context.port)
+
+ cfsm = rest_client.SplunkRestClient(
+ session_key, context.app, owner=context.owner,
+ scheme=context.scheme, host=context.host, port=context.port).confs
+ try:
+ cfsm.get('test')
+ except client.HTTPError:
+ cfsm.create('test')
+
+ cfm = conf_manager.ConfManager(
+ 'test', session_key, context.app, owner=context.owner,
+ scheme=context.scheme, host=context.host, port=context.port)
+ cfm.update('test_stanza', {'k1': 1, 'k2': 2}, ['k1'])
+ assert cfm.get('test_stanza')['k1'] == 1
+ assert int(cfm.get('test_stanza')['k2']) == 2
+ assert len(cfm.get_all()) == 1
+ cfm.delete('test_stanza')
+ with pytest.raises(conf_manager.ConfStanzaNotExistException):
+ cfm.get('test_stanza')
+ with pytest.raises(conf_manager.ConfStanzaNotExistException):
+ cfm.delete('test_stanza')
+ cfm.reload()
diff --git a/solnlib/conf_manager.py b/solnlib/conf_manager.py
new file mode 100755
index 00000000..38e32a31
--- /dev/null
+++ b/solnlib/conf_manager.py
@@ -0,0 +1,272 @@
+# Copyright 2016 Splunk, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the 'License'): you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+'''
+This modules contains simple interfaces for Splunk config file management.
+'''
+
+import json
+
+from splunklib import binding
+
+from solnlib.utils import retry
+from solnlib.credentials import CredNotExistException
+from solnlib.credentials import CredentialManager
+import solnlib.splunk_rest_client as rest_client
+
+__all__ = ['ConfManagerException',
+ 'ConfStanzaNotExistException',
+ 'ConfManager']
+
+
+class ConfManagerException(Exception):
+ pass
+
+
+class ConfStanzaNotExistException(Exception):
+ pass
+
+
+class ConfManager(object):
+ '''Configuration file manager.
+
+ :param conf_file: Configuration file.
+ :type conf_file: ``string``
+ :param session_key: Splunk access token.
+ :type session_key: ``string``
+ :param app: App name of namespace.
+ :type app: ``string``
+ :param owner: (optional) Owner of namespace, default is `nobody`.
+ :type owner: ``string``
+ :param realm: (optional) Realm of credential, default is None.
+ :type realm: ``string``
+ :param scheme: (optional) The access scheme, default is `https`.
+ :type scheme: ``string``
+ :param host: (optional) The host name, default is `localhost`.
+ :type host: ``string``
+ :param port: (optional) The port number, default is 8089.
+ :type port: ``integer``
+ :param context: Other configurations for Splunk rest client.
+ :type context: ``dict``
+
+ :raises ConfManagerException: If `conf_file` does not exist.
+
+ Usage::
+
+ >>> from solnlib import conf_manager
+ >>> cfm = conf_manager.ConfManager('test_conf',
+ session_key,
+ 'Splunk_TA_test')
+ '''
+
+ ENCRYPTED_TOKEN = '******'
+
+ reserved_keys = ('userName', 'appName')
+
+ def __init__(self, conf_file, session_key, app, owner='nobody',
+ scheme='https', host='localhost', port=8089, **context):
+ try:
+ self._conf_mgr = rest_client.SplunkRestClient(
+ session_key,
+ app,
+ owner=owner,
+ scheme=scheme,
+ host=host,
+ port=port,
+ **context).confs[conf_file]
+ except KeyError:
+ raise ConfManagerException(
+ 'Config file: %s does not exist.' % conf_file)
+ self._conf_file = conf_file
+ self._cred_mgr = CredentialManager(
+ session_key, app, owner=owner, realm=app,
+ scheme=scheme, host=host, port=port, **context)
+
+ def _filter_stanza(self, stanza):
+ for k in self.reserved_keys:
+ if k in stanza:
+ del stanza[k]
+
+ return stanza
+
+ def _encrypt_stanza(self, stanza_name, stanza, encrypt_keys):
+ if not encrypt_keys:
+ return stanza
+
+ encrypt_fields = {key: stanza[key] for key in encrypt_keys}
+ self._cred_mgr.set_password(stanza_name, json.dumps(encrypt_fields))
+
+ for key in encrypt_keys:
+ stanza[key] = self.ENCRYPTED_TOKEN
+
+ return stanza
+
+ def _decrypt_stanza(self, stanza_name, encrypted_stanza):
+ encrypted_keys = [key for key in encrypted_stanza if
+ encrypted_stanza[key] == self.ENCRYPTED_TOKEN]
+ if encrypted_keys:
+ encrypted_fields = json.loads(
+ self._cred_mgr.get_password(stanza_name))
+ for key in encrypted_keys:
+ encrypted_stanza[key] = encrypted_fields[key]
+
+ return encrypted_stanza
+
+ def _delete_stanza_creds(self, stanza_name):
+ self._cred_mgr.delete_password(stanza_name)
+
+ @retry(exceptions=[binding.HTTPError])
+ def get(self, stanza_name):
+ '''Get stanza from configuration file.
+
+ :param stanza_name: Stanza name.
+ :type stanza_name: ``string``
+ :returns: Stanza, like: {
+ 'disabled': '0',
+ 'eai:appName': 'solnlib_demo',
+ 'eai:userName': 'nobody',
+ 'k1': '1',
+ 'k2': '2'}
+ :rtype: ``dict``
+
+ Usage::
+
+ >>> from solnlib import conf_manager
+ >>> cfm = conf_manager.ConfManager('test_conf',
+ session_key,
+ 'Splunk_TA_test')
+ >>> cfm.get('test_stanza')
+ '''
+
+ try:
+ stanza_mgr = self._conf_mgr.list(name=stanza_name)[0]
+ except binding.HTTPError as e:
+ if e.status == 404:
+ raise ConfStanzaNotExistException(
+ 'Stanza: %s does not exist in %s.conf' %
+ (stanza_name, self._conf_file))
+ raise
+
+ stanza = self._decrypt_stanza(stanza_mgr.name, stanza_mgr.content)
+ return stanza
+
+ @retry(exceptions=[binding.HTTPError])
+ def get_all(self):
+ '''Get all stanzas from configuration file.
+
+ :returns: All stanzas, like: {'test': {
+ 'disabled': '0',
+ 'eai:appName': 'solnlib_demo',
+ 'eai:userName': 'nobody',
+ 'k1': '1',
+ 'k2': '2'}}
+ :rtype: ``dict``
+
+ :raises ConfStanzaNotExistException: If stanza does not exist.
+
+
+ Usage::
+
+ >>> from solnlib import conf_manager
+ >>> cfm = conf_manager.ConfManager('test_conf',
+ session_key,
+ 'Splunk_TA_test')
+ >>> cfm.get_all()
+ '''
+
+ stanza_mgrs = self._conf_mgr.list()
+ return {stanza_mgr.name: self._decrypt_stanza(
+ stanza_mgr.name, stanza_mgr.content) for stanza_mgr in stanza_mgrs}
+
+ @retry(exceptions=[binding.HTTPError])
+ def update(self, stanza_name, stanza, encrypt_keys=None):
+ '''Update stanza with some fields are encrypted.
+
+ :param stanza_name: Stanza name.
+ :type stanza_name: ``string``
+ :param stanza: Stanza to update, like: {
+ 'k1': 1,
+ 'k2': 2}.
+ :type stanza: ``dict``
+ :param encrypt_keys: Fields name to encrypt.
+ :type encrypt_keys: ``list``
+
+ Usage::
+
+ >>> from solnlib import conf_manager
+ >>> cfm = conf_manager.ConfManager('test_conf',
+ session_key,
+ 'Splunk_TA_test')
+ >>> cfm.get('test_stanza', {'k1': 1, 'k2': 2}, ['k1'])
+ '''
+
+ stanza = self._filter_stanza(stanza)
+ encrypted_stanza = self._encrypt_stanza(stanza_name,
+ stanza,
+ encrypt_keys)
+
+ try:
+ stanza_mgr = self._conf_mgr.list(name=stanza_name)[0]
+ except binding.HTTPError as e:
+ if e.status != 404:
+ raise
+
+ stanza_mgr = self._conf_mgr.create(stanza_name)
+
+ stanza_mgr.submit(encrypted_stanza)
+
+ @retry(exceptions=[binding.HTTPError])
+ def delete(self, stanza_name):
+ '''Delete stanza.
+
+ :param stanza_name: Stanza name to delete.
+ :type stanza_name: ``string``
+
+ :raises ConfStanzaNotExistException: If stanza does not exist.
+
+ Usage::
+
+ >>> from solnlib import conf_manager
+ >>> cfm = conf_manager.ConfManager('test_conf',
+ session_key,
+ 'Splunk_TA_test')
+ >>> cfm.delete('test_stanza')
+ '''
+
+ try:
+ self._conf_mgr.delete(stanza_name)
+ except KeyError:
+ raise ConfStanzaNotExistException(
+ 'Stanza: %s does not exist in %s.conf' %
+ (stanza_name, self._conf_file))
+
+ try:
+ self._cred_mgr.delete_password(stanza_name)
+ except CredNotExistException:
+ pass
+
+ @retry(exceptions=[binding.HTTPError])
+ def reload(self):
+ '''Reload configuration file.
+
+ Usage::
+
+ >>> from solnlib import conf_manager
+ >>> cfm = conf_manager.ConfManager('test_conf',
+ session_key,
+ 'Splunk_TA_test')
+ >>> cfm.reload()
+ '''
+
+ self._conf_mgr.get('_reload')
diff --git a/tests/common.py b/tests/common.py
index ec2ba6cb..8ae57d5c 100644
--- a/tests/common.py
+++ b/tests/common.py
@@ -17,7 +17,7 @@
def mock_splunkhome(monkeypatch):
- class _MockPopen(object):
+ class MockPopen(object):
def __init__(self, args, bufsize=0, executable=None,
stdin=None, stdout=None, stderr=None,
preexec_fn=None, close_fds=False, shell=False,
@@ -37,23 +37,23 @@ def communicate(self, input=None):
return fp.read(), None
monkeypatch.setenv('SPLUNK_HOME', op.join(cur_dir, 'data/mock_splunk/'))
- monkeypatch.setattr(subprocess, 'Popen', _MockPopen)
+ monkeypatch.setattr(subprocess, 'Popen', MockPopen)
def mock_serverinfo(monkeypatch):
- _mock_server_info_property = {'server_roles': ['cluster_search_head', 'search_head',
- 'kv_store', 'shc_captain'],
- 'version': '6.3.1511.2',
- 'serverName': 'unittestServer'}
+ mock_server_info_property = {'server_roles': ['cluster_search_head', 'search_head',
+ 'kv_store', 'shc_captain'],
+ 'version': '6.3.1511.2',
+ 'serverName': 'unittestServer'}
- monkeypatch.setattr(client.Service, 'info', _mock_server_info_property)
+ monkeypatch.setattr(client.Service, 'info', mock_server_info_property)
def mock_gethostname(monkeypatch):
- def _mock_gethostname():
+ def mock_gethostname():
return 'unittestServer'
- monkeypatch.setattr(socket, 'gethostname', _mock_gethostname)
+ monkeypatch.setattr(socket, 'gethostname', mock_gethostname)
def make_response_record(body, status=200):
diff --git a/tests/test_acl.py b/tests/test_acl.py
index 48a7e296..bc556fb4 100644
--- a/tests/test_acl.py
+++ b/tests/test_acl.py
@@ -10,56 +10,58 @@
sys.path.insert(0, op.dirname(op.dirname(op.abspath(__file__))))
from solnlib import acl
+_old_acl = '{"entry": [{"author": "nobody", "name": "transforms", "acl": {"sharing": "global", "perms": {"read": ["*"], "write": ["*"]}, "app": "unittest", "modifiable": true, "owner": "nobody", "can_change_perms": true, "can_share_global": true, "can_list": true, "can_share_user": false, "can_share_app": true, "removable": false, "can_write": true}}]}'
-class TestACLManager(object):
- _old_acl = '{"entry": [{"author": "nobody", "name": "transforms", "acl": {"sharing": "global", "perms": {"read": ["*"], "write": ["*"]}, "app": "unittest", "modifiable": true, "owner": "nobody", "can_change_perms": true, "can_share_global": true, "can_list": true, "can_share_user": false, "can_share_app": true, "removable": false, "can_write": true}}]}'
+_new_acl1 = '{"entry": [{"author": "nobody", "name": "transforms", "acl": {"sharing": "global", "perms": {"read": ["admin"], "write": ["admin"]}, "app": "unittest", "modifiable": true, "owner": "nobody", "can_change_perms": true, "can_share_global": true, "can_list": true, "can_share_user": false, "can_share_app": true, "removable": false, "can_write": true}}]}'
+
+_new_acl2 = '{"entry": [{"author": "nobody", "name": "transforms", "acl": {"sharing": "global", "perms": {"read": ["admin"], "write": ["*"]}, "app": "unittest", "modifiable": true, "owner": "nobody", "can_change_perms": true, "can_share_global": true, "can_list": true, "can_share_user": false, "can_share_app": true, "removable": false, "can_write": true}}]}'
- _new_acl1 = '{"entry": [{"author": "nobody", "name": "transforms", "acl": {"sharing": "global", "perms": {"read": ["admin"], "write": ["admin"]}, "app": "unittest", "modifiable": true, "owner": "nobody", "can_change_perms": true, "can_share_global": true, "can_list": true, "can_share_user": false, "can_share_app": true, "removable": false, "can_write": true}}]}'
+_new_acl3 = '{"entry": [{"author": "nobody", "name": "transforms", "acl": {"sharing": "global", "perms": {"read": ["*"], "write": ["admin"]}, "app": "unittest", "modifiable": true, "owner": "nobody", "can_change_perms": true, "can_share_global": true, "can_list": true, "can_share_user": false, "can_share_app": true, "removable": false, "can_write": true}}]}'
- _new_acl2 = '{"entry": [{"author": "nobody", "name": "transforms", "acl": {"sharing": "global", "perms": {"read": ["admin"], "write": ["*"]}, "app": "unittest", "modifiable": true, "owner": "nobody", "can_change_perms": true, "can_share_global": true, "can_list": true, "can_share_user": false, "can_share_app": true, "removable": false, "can_write": true}}]}'
- _new_acl3 = '{"entry": [{"author": "nobody", "name": "transforms", "acl": {"sharing": "global", "perms": {"read": ["*"], "write": ["admin"]}, "app": "unittest", "modifiable": true, "owner": "nobody", "can_change_perms": true, "can_share_global": true, "can_list": true, "can_share_user": false, "can_share_app": true, "removable": false, "can_write": true}}]}'
+def _mock_get(self, path_segment, owner=None, app=None, sharing=None, **query):
+ return common.make_response_record(_old_acl)
- def _mock_get(self, path_segment, owner=None, app=None, sharing=None, **query):
- return common.make_response_record(self._old_acl)
- def _mock_post(self, path_segment, owner=None, app=None, sharing=None, headers=None, **query):
- if 'perms.read=admin' in query['body'] and 'perms.write=admin' in query['body']:
- return common.make_response_record(self._new_acl1)
- elif 'perms.read=admin' in query['body']:
- return common.make_response_record(self._new_acl2)
- elif 'perms.write=admin' in query['body']:
- return common.make_response_record(self._new_acl3)
- else:
- return common.make_response_record(self._old_acl)
+def _mock_post(self, path_segment, owner=None, app=None, sharing=None, headers=None, **query):
+ if 'perms.read=admin' in query['body'] and 'perms.write=admin' in query['body']:
+ return common.make_response_record(_new_acl1)
+ elif 'perms.read=admin' in query['body']:
+ return common.make_response_record(_new_acl2)
+ elif 'perms.write=admin' in query['body']:
+ return common.make_response_record(_new_acl3)
+ else:
+ return common.make_response_record(_old_acl)
+
+class TestACLManager(object):
def test_get(self, monkeypatch):
- monkeypatch.setattr(binding.Context, 'get', self._mock_get)
+ monkeypatch.setattr(binding.Context, 'get', _mock_get)
aclm = acl.ACLManager(common.SESSION_KEY, common.app)
perms = aclm.get('data/transforms/extractions/_acl')
- assert perms == json.loads(self._old_acl)['entry'][0]['acl']
+ assert perms == json.loads(_old_acl)['entry'][0]['acl']
def test_update(self, monkeypatch):
- monkeypatch.setattr(binding.Context, 'get', self._mock_get)
- monkeypatch.setattr(binding.Context, 'post', self._mock_post)
+ monkeypatch.setattr(binding.Context, 'get', _mock_get)
+ monkeypatch.setattr(binding.Context, 'post', _mock_post)
aclm = acl.ACLManager(common.SESSION_KEY, common.app)
perms = aclm.update('data/transforms/extractions/_acl',
perms_read=['admin'], perms_write=['admin'])
- assert perms == json.loads(self._new_acl1)['entry'][0]['acl']
+ assert perms == json.loads(_new_acl1)['entry'][0]['acl']
perms = aclm.update('data/transforms/extractions/_acl',
perms_read=['admin'])
- assert perms == json.loads(self._new_acl2)['entry'][0]['acl']
+ assert perms == json.loads(_new_acl2)['entry'][0]['acl']
perms = aclm.update('data/transforms/extractions/_acl',
perms_write=['admin'])
- assert perms == json.loads(self._new_acl3)['entry'][0]['acl']
+ assert perms == json.loads(_new_acl3)['entry'][0]['acl']
perms = aclm.update('data/transforms/extractions/_acl')
- assert perms == json.loads(self._old_acl)['entry'][0]['acl']
+ assert perms == json.loads(_old_acl)['entry'][0]['acl']
with pytest.raises(acl.ACLException):
aclm.update('data/transforms/extractions', perms_write=['admin'])
diff --git a/tests/test_conf_manager.py b/tests/test_conf_manager.py
new file mode 100644
index 00000000..4a4a4ed3
--- /dev/null
+++ b/tests/test_conf_manager.py
@@ -0,0 +1,129 @@
+import sys
+import hashlib
+import pytest
+import os.path as op
+
+from splunklib import binding
+from splunklib import client
+from splunklib.data import record
+
+import common
+
+sys.path.insert(0, op.dirname(op.dirname(op.abspath(__file__))))
+from solnlib import conf_manager
+
+
+def test_conf_manager(monkeypatch):
+ credentials_store = {}
+ all_stanzas = {}
+
+ def mock_storage_passwords_list(self, count=None, **kwargs):
+ return credentials_store.values()
+
+ def mock_storage_passwords_create(self, password, username, realm=None):
+ title = '{}:{}:'.format(realm, username) if \
+ realm else ':{}:'.format(username)
+ password = client.StoragePassword(
+ None,
+ 'storage/passwords/{}'.format(title),
+ state=record({'content': {'clear_password': password,
+ 'encr_password': hashlib.md5(
+ password).digest(),
+ 'password': '********',
+ 'realm': realm,
+ 'username': username},
+ 'title': title}))
+ credentials_store[title] = password
+ return password
+
+ def mock_storage_passwords_delete(self, username, realm=None):
+ title = '{}:{}:'.format(realm, username) if \
+ realm else ':{}:'.format(username)
+ if title in credentials_store:
+ del credentials_store[title]
+ else:
+ raise KeyError('No such entity %s' % username)
+
+ def mock_configuration_get(
+ self, name='', owner=None, app=None, sharing=None, **query):
+ return common.make_response_record('')
+
+ def mock_configuration_file_list(self, count=None, **kwargs):
+ if not hasattr(mock_configuration_file_list, 'normal_mode'):
+ mock_configuration_file_list.normal_mode = True
+ raise binding.HTTPError(common.make_response_record('', status=404))
+ else:
+ if 'name' in kwargs:
+ if kwargs['name'] in all_stanzas:
+ stanza_mgr = client.Stanza(
+ self.service,
+ 'configs/conf-test/{0}/'.format(kwargs['name']),
+ skip_refresh=True)
+ stanza_mgr._state = common.record(
+ {'title': kwargs['name'],
+ 'content': all_stanzas[kwargs['name']]})
+ return [stanza_mgr]
+ else:
+ raise binding.HTTPError(
+ common.make_response_record('', status=404))
+ else:
+ stanza_mgrs = []
+ for stanza_name, stanza in all_stanzas.iteritems():
+ stanza_mgr = client.Stanza(
+ self.service,
+ 'configs/conf-test/{0}/'.format(stanza_name),
+ skip_refresh=True)
+ stanza_mgr._state = common.record({'title': stanza_name,
+ 'content': stanza})
+ stanza_mgrs.append(stanza_mgr)
+
+ return stanza_mgrs
+
+ def mock_configuration_file_get(
+ self, name="", owner=None, app=None, sharing=None, **query):
+ return common.make_response_record('')
+
+ def mock_configuration_file_create(self, name, **params):
+ stanza_mgr = client.Stanza(
+ self.service,
+ 'configs/conf-test/{0}/'.format(name),
+ skip_refresh=True)
+ stanza_mgr._state = common.record({'title': name,
+ 'content': {}})
+ return stanza_mgr
+
+ def mock_configuration_file_delete(self, name, **params):
+ del all_stanzas[name]
+
+ def mock_stanza_submit(self, stanza):
+ all_stanzas[self.name] = stanza
+
+ monkeypatch.setattr(client.StoragePasswords, 'list',
+ mock_storage_passwords_list)
+ monkeypatch.setattr(client.StoragePasswords, 'create',
+ mock_storage_passwords_create)
+ monkeypatch.setattr(client.StoragePasswords, 'delete',
+ mock_storage_passwords_delete)
+ monkeypatch.setattr(client.Configurations, 'get',
+ mock_configuration_get)
+ monkeypatch.setattr(client.ConfigurationFile, 'get',
+ mock_configuration_file_get)
+ monkeypatch.setattr(client.ConfigurationFile, 'list',
+ mock_configuration_file_list)
+ monkeypatch.setattr(client.ConfigurationFile, 'create',
+ mock_configuration_file_create)
+ monkeypatch.setattr(client.ConfigurationFile, 'delete',
+ mock_configuration_file_delete)
+ monkeypatch.setattr(client.Stanza, 'submit',
+ mock_stanza_submit)
+
+ cfm = conf_manager.ConfManager('test', common.SESSION_KEY, common.app)
+ cfm.update('test_stanza', {'k1': 1, 'k2': 2}, ['k1'])
+ assert cfm.get('test_stanza') == {'k2': 2, 'k1': 1}
+ assert cfm.get_all() == {'test_stanza': {'k2': 2, 'k1': 1}}
+ cfm.delete('test_stanza')
+ with pytest.raises(conf_manager.ConfStanzaNotExistException):
+ cfm.get('test_stanza')
+ with pytest.raises(conf_manager.ConfStanzaNotExistException):
+ cfm.delete('test_stanza')
+ cfm.reload()
diff --git a/tests/test_credentials.py b/tests/test_credentials.py
index 071a6fba..64920aa5 100644
--- a/tests/test_credentials.py
+++ b/tests/test_credentials.py
@@ -13,65 +13,67 @@
from solnlib import credentials
-class TestCredentialManager(object):
- _credentials_store = {}
+def test_credential_manager(monkeypatch):
+ credentials_store = {}
- def _mock_storage_passwords_list(self, count=None, **kwargs):
- return TestCredentialManager._credentials_store.values()
+ def mock_storage_passwords_list(self, count=None, **kwargs):
+ return credentials_store.values()
- def _mock_storage_passwords_create(self, password, username, realm=None):
- title = '{}:{}:'.format(realm, username) if realm else ':{}:'.format(username)
+ def mock_storage_passwords_create(self, password, username, realm=None):
+ title = '{}:{}:'.format(realm, username) if \
+ realm else ':{}:'.format(username)
password = client.StoragePassword(
None,
'storage/passwords/{}'.format(title),
- state=record({'content': {'clear_password': password,
- 'encr_password': hashlib.md5(password).digest(),
- 'password': '********',
- 'realm': realm,
- 'username': username},
- 'title': title}))
- TestCredentialManager._credentials_store[title] = password
+ state=record(
+ {'content': {'clear_password': password,
+ 'encr_password': hashlib.md5(password).digest(),
+ 'password': '********',
+ 'realm': realm,
+ 'username': username},
+ 'title': title}))
+ credentials_store[title] = password
return password
- def _mock_storage_passwords_delete(self, username, realm=None):
- title = '{}:{}:'.format(realm, username) if realm else ':{}:'.format(username)
- if title in TestCredentialManager._credentials_store:
- del TestCredentialManager._credentials_store[title]
+ def mock_storage_passwords_delete(self, username, realm=None):
+ title = '{}:{}:'.format(realm, username) \
+ if realm else ':{}:'.format(username)
+ if title in credentials_store:
+ del credentials_store[title]
else:
raise KeyError('No such entity %s' % username)
- def test_set_password(self, monkeypatch):
- monkeypatch.setattr(client.StoragePasswords, 'list', self._mock_storage_passwords_list)
- monkeypatch.setattr(client.StoragePasswords, 'create', self._mock_storage_passwords_create)
- monkeypatch.setattr(client.StoragePasswords, 'delete', self._mock_storage_passwords_delete)
+ monkeypatch.setattr(
+ client.StoragePasswords, 'list', mock_storage_passwords_list)
+ monkeypatch.setattr(
+ client.StoragePasswords, 'create', mock_storage_passwords_create)
+ monkeypatch.setattr(
+ client.StoragePasswords, 'delete', mock_storage_passwords_delete)
- cm = credentials.CredentialManager(common.SESSION_KEY, common.app, realm='realm_test')
- cm.set_password('testuser1', 'password1')
- assert cm.get_password('testuser1') == 'password1'
+ cm = credentials.CredentialManager(
+ common.SESSION_KEY, common.app, realm='realm_test')
+ cm.set_password('testuser1', 'password1')
+ assert cm.get_password('testuser1') == 'password1'
- long_password = ''.join(['1111111111' for i in xrange(30)])
- cm.set_password('testuser2', long_password)
- assert cm.get_password('testuser2') == long_password
+ long_password = ''.join(['1111111111' for i in xrange(30)])
+ cm.set_password('testuser2', long_password)
+ assert cm.get_password('testuser2') == long_password
- def test_delete_password(self, monkeypatch):
- monkeypatch.setattr(client.StoragePasswords, 'list', self._mock_storage_passwords_list)
- monkeypatch.setattr(client.StoragePasswords, 'delete', self._mock_storage_passwords_delete)
- monkeypatch.setattr(client.StoragePasswords, 'delete', self._mock_storage_passwords_delete)
+ cm.delete_password('testuser1')
+ with pytest.raises(Exception):
+ cm.get_password('testuser1')
- cm = credentials.CredentialManager(common.SESSION_KEY, common.app, realm='realm_test')
- cm.delete_password('testuser1')
- with pytest.raises(Exception):
- cm.get_password('testuser1')
-
- cm.delete_password('testuser2')
- with pytest.raises(Exception):
- cm.get_password('testuser2')
+ cm.delete_password('testuser2')
+ with pytest.raises(Exception):
+ cm.get_password('testuser2')
def test_get_session_key(monkeypatch):
def _mock_session_key_post(self, url, headers=None, **kwargs):
- return common.make_response_record('{"sessionKey":"' + common.SESSION_KEY + '"}')
+ return common.make_response_record(
+ '{"sessionKey":"' + common.SESSION_KEY + '"}')
monkeypatch.setattr(binding.HttpLib, 'post', _mock_session_key_post)
- assert credentials.get_session_key('user', 'password') == common.SESSION_KEY
+ assert credentials.get_session_key(
+ 'user', 'password') == common.SESSION_KEY
diff --git a/tests/test_modular_input_checkpointer.py b/tests/test_modular_input_checkpointer.py
index ae9a7136..36f05150 100644
--- a/tests/test_modular_input_checkpointer.py
+++ b/tests/test_modular_input_checkpointer.py
@@ -15,119 +15,99 @@
cur_dir = op.dirname(op.abspath(__file__))
-class TestKVStoreCheckpointer(object):
+def test_kvstore_checkpointer(monkeypatch):
KVSTORE_CHECKPOINTER_COLLECTION_NAME = 'TestKVStoreCheckpointer'
- _checkpoint_states = {}
+ checkpoint_states = {}
- def _mock_kvstore_checkpointer_collections_get(
- self, name='', owner=None, app=None, sharing=None, **query):
+ def mock_kvstore_collections_get(self, name='', owner=None, app=None, sharing=None, **query):
raise binding.HTTPError(common.make_response_record('', status=404))
- def _mock_kvstore_checkpointer_collections_create(
- self, name, indexes={}, fields={}, **kwargs):
+ def mock_kvstore_collections_create(self, name, indexes={}, fields={}, **kwargs):
pass
- def _mock_kvstore_checkpointer_collections_list(self, count=None, **kwargs):
+ def mock_kvstore_collections_list(self, count=None, **kwargs):
return [client.KVStoreCollection(None, None)]
- def _mock_kvstore_checkpointer_collection_init(self, service, path, **kwargs):
+ def mock_kvstore_collection_init(self, service, path, **kwargs):
pass
- def _mock_kvstore_checkpointer_collection_data_init(self, collection):
+ def mock_kvstore_collection_data_init(self, collection):
pass
- def _mock_kvstore_checkpointer_collection_data_batch_save(self, *documents):
+ def mock_kvstore_collection_data_batch_save(self, *documents):
for document in documents:
- self._checkpoint_states[document['_key']] = document
+ checkpoint_states[document['_key']] = document
- def _mock_kvstore_checkpointer_collection_data_query_by_id(self, id):
+ def mock_kvstore_collection_data_query_by_id(self, id):
try:
- return self._checkpoint_states[id]
+ return checkpoint_states[id]
except:
- raise binding.HTTPError(common.make_response_record('', status=404))
+ raise binding.HTTPError(
+ common.make_response_record('', status=404))
- def _mock_kvstore_checkpointer_collection_data_delete_by_id(self, id):
+ def mock_kvstore_collection_data_delete_by_id(self, id):
try:
- del self._checkpoint_states[id]
+ del checkpoint_states[id]
except:
raise binding.HTTPError(None, status=404)
- def test_kvstore_checkpoint_crud(self, monkeypatch):
- monkeypatch.setattr(
- client.KVStoreCollections, 'get',
- self._mock_kvstore_checkpointer_collections_get)
- monkeypatch.setattr(
- client.KVStoreCollections, 'create',
- self._mock_kvstore_checkpointer_collections_create)
- monkeypatch.setattr(
- client.KVStoreCollections, 'list',
- self._mock_kvstore_checkpointer_collections_list)
- monkeypatch.setattr(
- client.KVStoreCollection, '__init__',
- self._mock_kvstore_checkpointer_collection_init)
- monkeypatch.setattr(
- client.KVStoreCollection, 'name',
- self.KVSTORE_CHECKPOINTER_COLLECTION_NAME)
- monkeypatch.setattr(
- client.KVStoreCollectionData, '__init__',
- self._mock_kvstore_checkpointer_collection_data_init)
- monkeypatch.setattr(
- client.KVStoreCollectionData, 'batch_save',
- self._mock_kvstore_checkpointer_collection_data_batch_save)
- monkeypatch.setattr(
- client.KVStoreCollectionData, 'query_by_id',
- self._mock_kvstore_checkpointer_collection_data_query_by_id)
- monkeypatch.setattr(
- client.KVStoreCollectionData, 'delete_by_id',
- self._mock_kvstore_checkpointer_collection_data_delete_by_id)
-
- ck = KVStoreCheckpointer(
- TestKVStoreCheckpointer.KVSTORE_CHECKPOINTER_COLLECTION_NAME,
- common.SESSION_KEY,
- 'Splunk_TA_test')
-
- ck.update('test_state_key1', 'test_state_value1')
- ck.batch_update([{'_key': 'test_state_key2',
- 'state': 'test_state_value2'},
- {'_key': 'test_state_key3',
- 'state': 'test_state_value3'}])
- assert ck.get('test_state_key1') == 'test_state_value1'
- assert ck.get('test_state_key2') == 'test_state_value2'
- assert ck.get('test_state_key3') == 'test_state_value3'
- ck.delete('test_state_key1')
- ck.delete('test_state_key2')
- ck.delete('test_state_key3')
- assert ck.get('test_state_key1') is None
- assert ck.get('test_state_key2') is None
- assert ck.get('test_state_key3') is None
-
-
-class TestFileCheckpointer(object):
+ monkeypatch.setattr(client.KVStoreCollections, 'get',
+ mock_kvstore_collections_get)
+ monkeypatch.setattr(client.KVStoreCollections, 'create',
+ mock_kvstore_collections_create)
+ monkeypatch.setattr(client.KVStoreCollections, 'list',
+ mock_kvstore_collections_list)
+ monkeypatch.setattr(client.KVStoreCollection, '__init__',
+ mock_kvstore_collection_init)
+ monkeypatch.setattr(client.KVStoreCollection, 'name',
+ KVSTORE_CHECKPOINTER_COLLECTION_NAME)
+ monkeypatch.setattr(client.KVStoreCollectionData, '__init__',
+ mock_kvstore_collection_data_init)
+ monkeypatch.setattr(client.KVStoreCollectionData, 'batch_save',
+ mock_kvstore_collection_data_batch_save)
+ monkeypatch.setattr(client.KVStoreCollectionData, 'query_by_id',
+ mock_kvstore_collection_data_query_by_id)
+ monkeypatch.setattr(client.KVStoreCollectionData, 'delete_by_id',
+ mock_kvstore_collection_data_delete_by_id)
+
+ ck = KVStoreCheckpointer(KVSTORE_CHECKPOINTER_COLLECTION_NAME,
+ common.SESSION_KEY, 'Splunk_TA_test')
+
+ ck.update('test_state_key1', 'test_state_value1')
+ ck.batch_update([{'_key': 'test_state_key2',
+ 'state': 'test_state_value2'},
+ {'_key': 'test_state_key3',
+ 'state': 'test_state_value3'}])
+ assert ck.get('test_state_key1') == 'test_state_value1'
+ assert ck.get('test_state_key2') == 'test_state_value2'
+ assert ck.get('test_state_key3') == 'test_state_value3'
+ ck.delete('test_state_key1')
+ ck.delete('test_state_key2')
+ ck.delete('test_state_key3')
+ assert ck.get('test_state_key1') is None
+ assert ck.get('test_state_key2') is None
+ assert ck.get('test_state_key3') is None
+
+
+def test_file_checkpointer(monkeypatch):
checkpoint_dir = op.join(cur_dir, '.checkpoint_dir')
-
- @classmethod
- def setup_class(cls):
- os.mkdir(cls.checkpoint_dir)
-
- @classmethod
- def teardown_class(cls):
- shutil.rmtree(cls.checkpoint_dir)
-
- def test_file_checkpoint_crud(self, monkeypatch):
- ck = FileCheckpointer(self.checkpoint_dir)
-
- ck.update('test_state_key1', 'test_state_value1')
- ck.batch_update([{'_key': 'test_state_key2',
- 'state': 'test_state_value2'},
- {'_key': 'test_state_key3',
- 'state': 'test_state_value3'}])
- assert ck.get('test_state_key1') == 'test_state_value1'
- assert ck.get('test_state_key2') == 'test_state_value2'
- assert ck.get('test_state_key3') == 'test_state_value3'
- ck.delete('test_state_key1')
- ck.delete('test_state_key2')
- ck.delete('test_state_key3')
- assert ck.get('test_state_key1') is None
- assert ck.get('test_state_key2') is None
- assert ck.get('test_state_key3') is None
+ os.mkdir(checkpoint_dir)
+
+ ck = FileCheckpointer(checkpoint_dir)
+ ck.update('test_state_key1', 'test_state_value1')
+ ck.batch_update([{'_key': 'test_state_key2',
+ 'state': 'test_state_value2'},
+ {'_key': 'test_state_key3',
+ 'state': 'test_state_value3'}])
+ assert ck.get('test_state_key1') == 'test_state_value1'
+ assert ck.get('test_state_key2') == 'test_state_value2'
+ assert ck.get('test_state_key3') == 'test_state_value3'
+ ck.delete('test_state_key1')
+ ck.delete('test_state_key2')
+ ck.delete('test_state_key3')
+ assert ck.get('test_state_key1') is None
+ assert ck.get('test_state_key2') is None
+ assert ck.get('test_state_key3') is None
+
+ shutil.rmtree(checkpoint_dir)
diff --git a/tests/test_modular_input_event_writer.py b/tests/test_modular_input_event_writer.py
index 20c8144a..f169d5db 100644
--- a/tests/test_modular_input_event_writer.py
+++ b/tests/test_modular_input_event_writer.py
@@ -10,8 +10,8 @@
from solnlib.modular_input import HECEventWriter
-class TestClassicEventWriter(object):
- class _MockStdout(object):
+def test_classic_event_writer(monkeypatch):
+ class MockStdout(object):
def __init__(self):
self._buf = ''
@@ -26,71 +26,67 @@ def write(self, event):
def flush(self):
pass
- def test_write_events(self, monkeypatch):
-
- mock_stdout = self._MockStdout()
- monkeypatch.setattr(sys, 'stdout', mock_stdout)
-
- ew = ClassicEventWriter()
- events = []
- events.append(ew.create_event(data='This is a test data1.',
- time=1372274622.493,
- index='main',
- host='localhost',
- source='Splunk',
- sourcetype='misc',
- stanza='test_scheme://test',
- unbroken=True,
- done=False))
- events.append(ew.create_event(data='This is a test data2.',
- time=1372274622.493,
- index='main',
- host='localhost',
- source='Splunk',
- sourcetype='misc',
- stanza='test_scheme://test',
- unbroken=True,
- done=True))
- ew.write_events(events)
- ew.close()
-
- assert mock_stdout.read() == 'mainlocalhostmiscThis is a test data1.mainlocalhostmiscThis is a test data2.'
-
-
-class TestHECEventWriter(object):
-
- def _mock_get(self, path_segment, owner=None, app=None, sharing=None, **query):
+ mock_stdout = MockStdout()
+ monkeypatch.setattr(sys, 'stdout', mock_stdout)
+
+ ew = ClassicEventWriter()
+ events = []
+ events.append(ew.create_event(data='This is a test data1.',
+ time=1372274622.493,
+ index='main',
+ host='localhost',
+ source='Splunk',
+ sourcetype='misc',
+ stanza='test_scheme://test',
+ unbroken=True,
+ done=False))
+ events.append(ew.create_event(data='This is a test data2.',
+ time=1372274622.493,
+ index='main',
+ host='localhost',
+ source='Splunk',
+ sourcetype='misc',
+ stanza='test_scheme://test',
+ unbroken=True,
+ done=True))
+ ew.write_events(events)
+ ew.close()
+
+ assert mock_stdout.read() == 'mainlocalhostmiscThis is a test data1.mainlocalhostmiscThis is a test data2.'
+
+
+def test_hec_event_writer(monkeypatch):
+ def mock_get(self, path_segment, owner=None, app=None, sharing=None, **query):
if path_segment.endswith('/http'):
return common.make_response_record('{"entry": [{"content": {"port": 8088}}]}')
else:
return common.make_response_record('{"entry": [{"content": {"token": "87de04d1-0823-11e6-9c94-a45e60e34295"}}]}')
- def _mock_post(self, path_segment, owner=None, app=None, sharing=None, headers=None, **query):
+ def mock_post(self, path_segment, owner=None, app=None, sharing=None, headers=None, **query):
assert query['body'] == '{"index": "main", "sourcetype": "misc", "source": "Splunk", "host": "localhost", "time": 1372274622.493, "event": "This is a test data1."}\n{"index": "main", "sourcetype": "misc", "source": "Splunk", "host": "localhost", "time": 1372274622.493, "event": "This is a test data2."}'
- def test_write_events(self, monkeypatch):
- monkeypatch.setattr(binding.Context, 'get', self._mock_get)
- monkeypatch.setattr(binding.Context, 'post', self._mock_post)
-
- ew = HECEventWriter('HECTestInput', common.SESSION_KEY)
- events = []
- events.append(ew.create_event(data='This is a test data1.',
- time=1372274622.493,
- index='main',
- host='localhost',
- source='Splunk',
- sourcetype='misc',
- stanza='test_scheme://test',
- unbroken=True,
- done=False))
- events.append(ew.create_event(data='This is a test data2.',
- time=1372274622.493,
- index='main',
- host='localhost',
- source='Splunk',
- sourcetype='misc',
- stanza='test_scheme://test',
- unbroken=True,
- done=True))
- ew.write_events(events)
- ew.close()
+ monkeypatch.setattr(binding.Context, 'get', mock_get)
+ monkeypatch.setattr(binding.Context, 'post', mock_post)
+
+ ew = HECEventWriter('HECTestInput', common.SESSION_KEY)
+ events = []
+ events.append(ew.create_event(data='This is a test data1.',
+ time=1372274622.493,
+ index='main',
+ host='localhost',
+ source='Splunk',
+ sourcetype='misc',
+ stanza='test_scheme://test',
+ unbroken=True,
+ done=False))
+ events.append(ew.create_event(data='This is a test data2.',
+ time=1372274622.493,
+ index='main',
+ host='localhost',
+ source='Splunk',
+ sourcetype='misc',
+ stanza='test_scheme://test',
+ unbroken=True,
+ done=True))
+ ew.write_events(events)
+ ew.close()
diff --git a/tests/test_net_utils.py b/tests/test_net_utils.py
index e4123cd6..11f67345 100644
--- a/tests/test_net_utils.py
+++ b/tests/test_net_utils.py
@@ -14,7 +14,7 @@ def test_resolve_hostname(monkeypatch):
unresolvable_ip2 = '192.168.1.2'
unresolvable_ip3 = '192.168.1.3'
- def _mock_gethostbyaddr(addr):
+ def mock_gethostbyaddr(addr):
if addr == resolvable_ip:
return ('unittestServer', None, None)
elif addr == unresolvable_ip1:
@@ -24,7 +24,7 @@ def _mock_gethostbyaddr(addr):
else:
raise socket.timeout()
- monkeypatch.setattr(socket, 'gethostbyaddr', _mock_gethostbyaddr)
+ monkeypatch.setattr(socket, 'gethostbyaddr', mock_gethostbyaddr)
with pytest.raises(ValueError):
net_utils.resolve_hostname(invalid_ip)