Merge pull request #16 from OObasuyi/working
big update
OObasuyi authored Nov 3, 2024
2 parents 5b7ebad + fe487f6 commit ec9149f
Showing 4 changed files with 154 additions and 182 deletions.
102 changes: 45 additions & 57 deletions README.md
@@ -1,61 +1,49 @@
# Cold Clarity
# ColdClarity 👁️ 🧊 👁️
[![python](https://img.shields.io/badge/Python-3.9-3776AB.svg?style=flat&logo=python&logoColor=white)](https://www.python.org) ![ISE Version](https://img.shields.io/badge/ISE-3.3-blue)

Endpoint Reporting App for Identity Service Engine (ISE)
ColdClarity is a tool for viewing the data Cisco ISE gathers from your network. It generates reports based on customized configurations for compliance, device status, and more.

## Table of Contents
- [Features](#features)
- [Configuration](#configuration)
- [General Report Settings](#general-report-settings)
- [Authentication Settings](#authentication-settings)
- [SMTP Configuration](#smtp-configuration)

## Features

## Table of Contents
- [Reporting](#reporting)
- [Templates](#templates)
- [FAQs](#faqs)
- [Requirements](#requirements)

## Reporting
### Using Source
```shell
# make sure you are in the ColdClarity dir.
# if the config YAML is in the current dir or the Config_information subdir, you only need to specify the file name;
# otherwise specify the complete path
python3.8 term_access.py --config_file config.yaml
```
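The config lookup described above (file name if it lives in the current dir or `Config_information`, full path otherwise) could be sketched like this; this is a hypothetical helper for illustration, not the app's actual code:

```python
from pathlib import Path

def resolve_config(name: str) -> Path:
    """Try the current dir, then the Config_information subdir,
    else assume the caller passed a complete path."""
    for candidate in (Path(name), Path('Config_information') / name):
        if candidate.is_file():
            return candidate
    # fall back to treating the argument as a full path
    return Path(name)
```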
### Using Containers
```shell
# you can use either docker or podman, but the following script is written for podman.
# you can also run the app natively without this script; it only ensures the app runs and exits properly.
# one use-case is running it from a cron job in an environment where the app will not work natively.
# please edit the bash file appropriately and make sure it has the correct rights to run
./cold_watcher.bash
```
## Templates
### Generating ISE Certificates for Client-Based Auth
If you are using client-based authentication for your ISE deployment AND you do NOT have a client certificate from a CA that ISE trusts,
please look at `self.signed_cert.bash` in the templates dir for general instructions on how it works with this app and ISE
```bash
# running the script is simple; please make sure you give it the correct permissions
./self.signed_cert.bash
```
### Configuration YAML
1. In the `report` section, fill in your information and make sure `send_email` is set to `true`
if you want to send this report automatically, with `prepared_for` specifying the receiver of the report.
2. In `authentication`, specify whether you are using user/password or certificate-based login.
3. If you are sending this report, make sure you specify your mail relay settings.
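The three steps above can be sketched as a small loader that sanity-checks the config before a run. This is a hypothetical helper, not the app's actual code; it assumes PyYAML is available and follows the field names shown in this README:

```python
import yaml  # PyYAML; assumed available in the app's environment

def load_report_config(path: str) -> dict:
    """Load the YAML config and check the settings described above."""
    with open(path) as fh:
        cfg = yaml.safe_load(fh)
    report = cfg.get('report', {})
    if report.get('send_email'):
        # emailed reports need a receiver label and mail relay settings
        assert report.get('prepared_for'), 'prepared_for must name the receiver'
        assert cfg.get('smtp'), 'smtp settings are required when send_email is true'
    # one of the login styles must be configured
    assert cfg.get('authentication'), 'specify user/password or certificate-based login'
    return cfg
```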


## FAQs
**Q**: We have all of our devices in audit mode, but our reports show those endpoints as compliant even though in ISE, under Failed Conditions,
I see hits for those endpoints. How come?

**A**: Since ISE treats all audit policies as passes, this app parses the posture policy _AND NOT_ the posture condition to give a more accurate totaling of endpoint status.

**Q**: In the reports, my total endpoints and profiled endpoints do not match my logical profile buckets.

**A**: As of ISE v3.1, ISE does not support de-confliction of the logical profiles assigned to an endpoint. So if you have a
situation where the parent profile and child profile are in the same ISE logical profile, ISE will just append the same logical profile to the endpoint again. The same holds true if you assign multiple logical profiles to the same endpoint.


## Requirements
This app requires the following environment
```
python >= 3.8
Cisco ISE >= 3.3
```
- **Configurable Reporting**: Supports HW/SW cataloging, endpoint profiles, and custom posture policies.
- **Flexible Authentication**: Choose from certificate-based, text-based, and/or ERS-based authentication.
- **Automated Email Notifications**: Sends reports via email to specified recipients.
- **Customizable Profiles and Buckets**: Allows for logical organization of endpoints into profiles and buckets.
- **Specialized Reporting Options**: Option to focus reports on hardware details or other endpoint specifics.

## Configuration

The tool uses `config_templete.yaml` for its settings. Here are some key sections to configure:

### General Report Settings

- **Policy Name**: Define the NAC policy name with `policy_name`.
- **Output Destination**: Set the `destination_filepath` for where the report should be saved.
- **Notification Settings**: Toggle `send_email` to enable email notifications.

### Authentication Settings

- **Certificate-Based**: Set `authentication.cert_based.use` to `True` and provide `cert_pfx_location` and `cert_password`.
- **Text-Based**: Toggle `authentication.text_based.use` and provide `username` and `password` if preferred.
- **ERS-Based**: Uses `ers_based.username` and `ers_based.password`. Make sure this account has the correct permissions in ISE.
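The three options above might be wired up as follows. This is a sketch under the config layout shown in this README, not the app's actual code; `requests_pkcs12.Pkcs12Adapter` is the adapter the project imports for PFX client certificates:

```python
import requests
from requests.auth import HTTPBasicAuth

def build_session(cfg: dict, ise_ip: str) -> requests.Session:
    """Pick an auth method from the config (hypothetical helper)."""
    session = requests.Session()
    auth = cfg['authentication']
    if auth.get('cert_based', {}).get('use'):
        # client-certificate login via a PKCS#12 bundle
        from requests_pkcs12 import Pkcs12Adapter
        session.mount(f'https://{ise_ip}', Pkcs12Adapter(
            pkcs12_filename=auth['cert_based']['cert_pfx_location'],
            pkcs12_password=auth['cert_based']['cert_password']))
    elif auth.get('text_based', {}).get('use'):
        session.auth = HTTPBasicAuth(auth['text_based']['username'],
                                     auth['text_based']['password'])
    else:
        # ERS API account; needs the right permissions in ISE
        session.auth = HTTPBasicAuth(auth['ers_based']['username'],
                                     auth['ers_based']['password'])
    return session
```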

### SMTP Configuration

Set up email notifications with:

```yaml
smtp:
  email: your_email@example.com
  server: smtp.example.com
  port: 25
  destination_email: recipient@example.com
  destination_email_cc:
    - cc1@example.com
    - cc2@example.com
```
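With those settings, the notification step could look like this standard-library sketch; the app's real sender may differ, and the unauthenticated relay on port 25 is an assumption taken from the sample above:

```python
import smtplib
from email.message import EmailMessage

def build_report_message(smtp_cfg: dict, subject: str, body: str) -> EmailMessage:
    """Assemble the notification from the smtp section of the config."""
    msg = EmailMessage()
    msg['From'] = smtp_cfg['email']
    msg['To'] = smtp_cfg['destination_email']
    msg['Cc'] = ', '.join(smtp_cfg.get('destination_email_cc', []))
    msg['Subject'] = subject
    msg.set_content(body)
    return msg

def send_report(smtp_cfg: dict, msg: EmailMessage) -> None:
    # plain (unauthenticated) relay on the configured port
    with smtplib.SMTP(smtp_cfg['server'], smtp_cfg.get('port', 25)) as relay:
        relay.send_message(msg)
```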
151 changes: 84 additions & 67 deletions ise_control.py
@@ -1,14 +1,11 @@
import base64
import json

import pandas as pd
import requests
from requests.auth import HTTPBasicAuth
from tqdm import tqdm

from utilities import Rutils, log_collector
from requests_pkcs12 import Pkcs12Adapter
from os import getpid
from ssl import create_default_context, CERT_NONE
from xmltodict import parse as xmlparse
import oracledb
@@ -135,7 +132,7 @@ def logout_ise_session(self):
self.session.get(f'https://{self.ip}/admin/logout.jsp')
return

def dataconnect_engine(self,sql_string) -> pd.DataFrame:
def dataconnect_engine(self, sql_string) -> pd.DataFrame:
# skip Oracle Server Cert Validation
db_ssl_context = create_default_context()
db_ssl_context.check_hostname = False
@@ -152,11 +149,15 @@ def dataconnect_engine(self,sql_string) -> pd.DataFrame:
port=2484,
ssl_context=db_ssl_context
)
# get info from DB
# fetch as many rows per round trip as possible without overloading memory; 10K is a good max batch size for a query
# https://oracle.github.io/python-oracledb/samples/tutorial/Python-and-Oracle-Database-The-New-Wave-of-Scripting.html#fetching
cursor = connection.cursor()
cursor.prefetchrows = 10001
cursor.arraysize = 10000
# get info from DB
cursor.execute(sql_string)
columns = [desc[0] for desc in cursor.description]
output = cursor.fetchall()
data = cursor.fetchall()
cursor.close()
connection.close()
except Exception as execpt_error:
@@ -165,8 +166,8 @@ def dataconnect_engine(self,sql_string) -> pd.DataFrame:

try:
# put in df
dc_pd = pd.DataFrame(output, columns=columns)
# clean DB objects from df
dc_pd = pd.DataFrame(data, columns=columns)
# clean DB objects from df that cant be converted to STR type
badcols = []
for x in dc_pd.columns.tolist():
try:
@@ -191,14 +192,19 @@ def get_all_active_sessions(self) -> pd.DataFrame:
galls = self.mnt_data_retrival("Session/ActiveList")
if galls.status_code == 200:
data_dict = xmlparse(galls.content)
df = pd.json_normalize(data_dict['activeList']['activeSession'])
self.logger.debug(f'{df.shape[0]} Active Sessions Obtained')
return df
# if we have active sessions
if bool(data_dict['activeList'].get('activeSession')):
df = pd.json_normalize(data_dict['activeList']['activeSession'])
self.logger.debug(f'{df.shape[0]} Active Sessions Obtained')
return df
else:
self.logger.critical(f'NO active sessions found...')
return pd.DataFrame([])
else:
self.logger.critical('No active sessions found in results!')
self.logger.critical(f'received back response code {galls.status_code} CANNOT PROCESS ACTIVE SESSIONS ')
return pd.DataFrame([])

def get_all_profiler_count(self):
def get_all_profiler_count(self) -> int:
self.logger.debug('Obtaining active profile count')
gapc = self.mnt_data_retrival("Session/ProfilerCount")
if gapc.status_code == 200:
@@ -242,23 +248,67 @@ def get_license_info(self):
self.logger.debug('Obtained Serial Number')
return sn_data

def get_endpoint_software_info(self):
# applications data
host_sw = 'pageType=app&columns=productName%2Cversion%2CvendorName%2Ccategories%2CoperatingSystem%2CnoOfDevicesPerApp&sortBy=productName&startAt=1&pageSize=10000'
# transform to base64 then into the str representation of it
host_sw = base64.b64encode(str.encode(host_sw)).decode('utf-8')
# session cookies are persistent, so we only need to add this header, mirroring the JS caller
headers = {'_QPH_': host_sw}
url = f"https://{self.ip}/admin/rs/uiapi/visibility"
self.sw_catalog = self.session.get(url, headers=headers)
return
def get_endpoint_software_info(self) -> pd.DataFrame:
endpoints = []
step_page = 1
control_size = 100

self.logger.info(f'Getting Collected software information')
sw_url = f"https://{self.ip}/admin/rs/uiapi/visibility"
while True:
header_data = f'pageType=app&' \
f'columns=productName%2C' \
f'version%2C' \
f'vendorName%2C' \
f'categories%2C' \
f'operatingSystem%2C' \
f'noOfDevicesPerApp&' \
f'sortBy=productName&' \
f'startAt={step_page}&' \
f'pageSize={control_size}'

# transform to base64 then into the str representation of it
header_data = self.UTILS.encode_data(header_data)
# session cookies are persistent, so we only need to add this header, mirroring the JS caller
header = self.HEADER_DATA.copy()
header['_QPH_'] = header_data
response = self.session.get(sw_url, headers=header)

if response.status_code == 200:
ep_data = response.json()
if len(ep_data) > 0:
endpoints += ep_data
step_page += 1
else:
self.logger.critical(f'GESI: no SW data for endpoints on page {step_page}')
break
else:
self.logger.debug(f'GESI: received back response code {response.status_code} on data retrieval')
break

# clean list and transform json str to dicts to load into DF
# check if anything in the list
if len(endpoints) > 0:
# ETL
sw_data = pd.DataFrame(endpoints)
sw_data.drop(columns=['id','productId'], inplace=True)
sw_data.fillna('None', inplace=True)
sw_data.drop_duplicates(inplace=True)
sw_data.reset_index(drop=True, inplace=True)

self.logger.info(f'Gathered {sw_data.shape[0]} Types of SW')
self.logger.info('SW data collection complete')
return sw_data
else:
self.logger.critical(f'GESI: no software data gathered from ISE')
return pd.DataFrame([])

def get_endpoint_hardware_info(self) -> pd.DataFrame:
endpoints = []
step_page = 1
control_size = 100

self.logger.info(f'Getting endpoint hardware info')
self.logger.info(f'Getting Collected hardware information')
url = f"https://{self.ip}/admin/rs/uiapi/hwvisibility"
while True:
# step thru endpoint pages
@@ -271,7 +321,7 @@ def get_endpoint_hardware_info(self) -> pd.DataFrame:
f'total_entries={control_size}'

# transform to base64 then into the str representation of it
header_data = base64.b64encode(str.encode(header_data)).decode('utf-8')
header_data = self.UTILS.encode_data(header_data)
# session cookies are persistent, so we only need to add this header, mirroring the JS caller
header = self.HEADER_DATA.copy()
header['_QPH_'] = header_data
@@ -283,8 +333,11 @@ def get_endpoint_hardware_info(self) -> pd.DataFrame:
endpoints += ep_data
step_page += 1
else:
self.logger.critical(f'no HW data for endpoints on page {step_page}')
self.logger.critical(f'GEHI: no HW data for endpoints on page {step_page}')
break
else:
self.logger.debug(f'GEHI: received back response code {response.status_code} on data retrieval')
break

# clean list and transform json str to dicts to load into DF
endpoints = list(set(endpoints))
@@ -296,48 +349,10 @@ def get_endpoint_hardware_info(self) -> pd.DataFrame:
self.logger.info('Endpoint HW data collection complete')
return hw_data
else:
self.logger.critical(f'no Hardware data gathered from ISE')
self.logger.critical(f'GEHI: no Hardware data gathered from ISE')
return pd.DataFrame([])

def special_reporting_data(self):
special_rep = self.config['special_reporting']
reporting_location = special_rep.get('reporting_location')
find_files = special_rep.get('files_to_look_for')
filter_list = special_rep.get('filter_list')
special_items = special_rep.get('filter_specifics')
attr_to_look_for = special_rep.get('get_attribute_from_endpoint')
fnames = self.UTILS.get_files_from_loc(reporting_location, find_files)
# df holder
self.endpoints = pd.DataFrame([])
for f in fnames:
ep_df = pd.read_csv(f'{reporting_location}/{f}')
ep_df = self.filter_data(ep_df, filter_list, special_items)
self.endpoints = pd.concat([self.endpoints, ep_df], ignore_index=True)
self.UTILS.create_file_path('archive', f, parent_dir=reporting_location)
self.get_metadata_from_endpoints(attr_to_look_for)
self.logger.info('Endpoint special data collection complete')

def filter_data(self, raw_df: pd.DataFrame, filter_list: list, data_matching: dict = None):
raw_df.drop(columns=filter_list, inplace=True)
# if we have specifics we want to match on
if data_matching:
for k, v in data_matching.items():
# try to see if we fit a usecase if not keep going
try:
raw_df = raw_df[raw_df[k].astype(int) > v]
continue
except Exception as error:
self.logger.debug(error)

try:
raw_df = raw_df[raw_df[k].str.contains(v)]
continue
except Exception as error:
self.logger.debug(error)
return raw_df

def retrieve_endpoint_data(self):
# todo: need to fix with updated code
# deployment ID
self.sn = self.get_license_info()
self.endpoint_policies = None
@@ -352,5 +367,7 @@ def retrieve_endpoint_data(self):

if __name__ == '__main__':
ise = ISE()
ise.retrieve_endpoint_data()
ise.logout_ise_session()
# ise.retrieve_endpoint_data()
# ise.get_endpoint_software_info()
# ise.get_endpoint_hardware_info()
# ise.logout_ise_session()
