diff --git a/README.md b/README.md index f3e30da..f9f882d 100755 --- a/README.md +++ b/README.md @@ -1,61 +1,49 @@ -# Cold Clarity +# ColdClarity 👁️ 🧊 👁️ +[![python](https://img.shields.io/badge/Python-3.9-3776AB.svg?style=flat&logo=python&logoColor=white)](https://www.python.org) ![ISE Version](https://img.shields.io/badge/ISE-3.3-blue) -Endpoint Reporting App for Identity Service Engine (ISE) +ColdClarity is a tool designed to see data gathered by Cisco ISE from your network. It generates reports based on customized configurations for compliance, device status, and more. +## Table of Contents +- [Features](#Features) +- [Configuration](#Configuration) +- [General Report Settings](#General Report Settings) +- [Authentication Settings](#Authentication Settings) +- [SMTP Configuration](#SMTP Configuration) +## Features -## Table of Contents -- [Reporting](#Reporting) -- [Templates](#Templates) -- [FAQs](#FAQs) -- [Requirements](#Requirements) - -## Reporting -### Using Source -```shell -# make sure you in the ColdClarity Dir. -# Also if the config YAML is in the current dir or the subdir Config_information you only need to specify the file name -# otherwise specify the complete PATH -python3.8 term_access.py --config_file config.yaml -``` -### Using Containers -```shell -# you can use either docker or podman, but the following is created for podman. -# you can also run it natively with out this script as its only if you want to ensure the app runs and exits properly -# one use-case for this is running this on a cron job in a environment where the app will not work natively -# please edit the BASH file appropriately and give it the correct rights to run -./cold_watcher.bash -``` -## Templates -### Generating ISE Certificates for Client Based Auth -If you are using client based authentication for your ISE deployment AND YOU DONT have a client based Cert that ISE has a CA for, -please look at the `self.signed_cert.bash` in the templates DIR on general instructions on how it works with this APP and ISE -```bash -# running the script is simple please make you give it correct permission -./self.signed_cert.bash -``` -### Configuration YAML -1. In the `report` section please fill it out with the information you have and make sure `send_email` is set to `true` -if you want to send this report automatically with the `prepared_for` specifying the receiver of the report. -2. In `authentication` specify whether you are using user/password or certificate based login -3. If you are sending this report make sure your specify your mail relay settings. - - -## FAQs -**Q**: We have all of our devices in audit mode but our reports are generating that those endpoints are compliant when in ISE under the Failed Conditions -I see hits for those endpoints. how come? - -**A**: Since ISE treats all audit Policies as Passes, this app will parse the posture Policy _AND NOT_ posture condition to give a more accurate totaling of endpoints status. - -**Q**: In the reports my total endpoints and profiled endpoints are not matching my logical profiles buckets - -**A**: As of ISE v3.1, it doest support the de-confliction of logical profile assigned to an endpoint. So if you have a -situation where you have the parent profile and child profile in the same ISE logical profile. ISE will just append the same logical profile to the endpoint. The same case holds true if you also assign the multiple logical profiles to the same endpoint - - -## Requirements -This app requires the following environment -``` -python >= 3.8 -Cisco ISE >= 3.3 -``` \ No newline at end of file +- **Configurable Reporting**: Supports HW/SW cataloging, endpoint profiles, and custom posture policies. +- **Flexible Authentication**: Choose from certificate-based, text-based, and/or ERS-based authentication. +- **Automated Email Notifications**: Sends reports via email to specified recipients. +- **Customizable Profiles and Buckets**: Allows for logical organization of endpoints into profiles and buckets. +- **Specialized Reporting Options**: Option to focus reports on hardware details or other endpoint specifics. + +## Configuration + +The tool uses `config_templete.yaml` for its settings. Here are some key sections to configure: + +### General Report Settings + +- **Policy Name**: Define the NAC policy name with `policy_name`. +- **Output Destination**: Set the `destination_filepath` for where the report should be saved. +- **Notification Settings**: Toggle `send_email` to enable email notifications. + +### Authentication Settings + +- **Certificate-Based**: Set `authentication.cert_based.use` to `True` and provide `cert_pfx_location` and `cert_password`. +- **Text-Based**: Toggle `authentication.text_based.use` and provide `username` and `password` if preferred. +- **ERS-Based**: Uses `ers_based.username` and `ers_based.password`. Please make sure this account has the correct permission in ISE + +### SMTP Configuration + +Set up email notifications with: + +```yaml +smtp: + email: your_email@example.com + server: smtp.example.com + port: 25 + destination_email: recipient@example.com + destination_email_cc: + - cc1@example.com + - cc2@example.com diff --git a/ise_control.py b/ise_control.py index a06800d..2fc9b26 100755 --- a/ise_control.py +++ b/ise_control.py @@ -1,14 +1,11 @@ -import base64 import json import pandas as pd import requests from requests.auth import HTTPBasicAuth -from tqdm import tqdm from utilities import Rutils, log_collector from requests_pkcs12 import Pkcs12Adapter -from os import getpid from ssl import create_default_context, CERT_NONE from xmltodict import parse as xmlparse import oracledb @@ -135,7 +132,7 @@ def logout_ise_session(self): self.session.get(f'https://{self.ip}/admin/logout.jsp') return - def dataconnect_engine(self,sql_string) -> pd.DataFrame: + def dataconnect_engine(self, sql_string) -> pd.DataFrame: # skip Oracle Server Cert Validation db_ssl_context = create_default_context() db_ssl_context.check_hostname = False @@ -152,11 +149,15 @@ def dataconnect_engine(self,sql_string) -> pd.DataFrame: port=2484, ssl_context=db_ssl_context ) - # get info from DB + # get as many rows as possible on a trip but dont overload mem if we dont have any 10K should be good size for the max amount from a query + # https://oracle.github.io/python-oracledb/samples/tutorial/Python-and-Oracle-Database-The-New-Wave-of-Scripting.html#fetching cursor = connection.cursor() + cursor.prefetchrows = 10001 + cursor.arraysize = 10000 + # get info from DB cursor.execute(sql_string) columns = [desc[0] for desc in cursor.description] - output = cursor.fetchall() + data = cursor.fetchall() cursor.close() connection.close() except Exception as execpt_error: @@ -165,8 +166,8 @@ def dataconnect_engine(self,sql_string) -> pd.DataFrame: try: # put in df - dc_pd = pd.DataFrame(output, columns=columns) - # clean DB objects from df + dc_pd = pd.DataFrame(data, columns=columns) + # clean DB objects from df that cant be converted to STR type badcols = [] for x in dc_pd.columns.tolist(): try: @@ -191,14 +192,19 @@ def get_all_active_sessions(self) -> pd.DataFrame: galls = self.mnt_data_retrival("Session/ActiveList") if galls.status_code == 200: data_dict = xmlparse(galls.content) - df = pd.json_normalize(data_dict['activeList']['activeSession']) - self.logger.debug(f'{df.shape[0]} Active Sessions Obtained') - return df + # if we have active sessions + if bool(data_dict['activeList'].get('activeSession')): + df = pd.json_normalize(data_dict['activeList']['activeSession']) + self.logger.debug(f'{df.shape[0]} Active Sessions Obtained') + return df + else: + self.logger.critical(f'NO active sessions found...') + return pd.DataFrame([]) else: - self.logger.critical('No active sessions found in results!') + self.logger.critical(f'received back response code {galls.status_code} CANNOT PROCESS ACTIVE SESSIONS ') return pd.DataFrame([]) - def get_all_profiler_count(self): + def get_all_profiler_count(self) -> int: self.logger.debug('Obtaining active profile count') gapc = self.mnt_data_retrival("Session/ProfilerCount") if gapc.status_code == 200: @@ -242,23 +248,67 @@ def get_license_info(self): self.logger.debug('Obtained Serial Number') return sn_data - def get_endpoint_software_info(self): - # applications data - host_sw = 'pageType=app&columns=productName%2Cversion%2CvendorName%2Ccategories%2CoperatingSystem%2CnoOfDevicesPerApp&sortBy=productName&startAt=1&pageSize=10000' - # transform to base64 then into the str representation of it - host_sw = base64.b64encode(str.encode(host_sw)).decode('utf-8') - # session cookie are persistence so only need to add this header that was implemented from the JS caller - headers = {'_QPH_': host_sw} - url = f"https://{self.ip}/admin/rs/uiapi/visibility" - self.sw_catalog = self.session.get(url, headers=headers) - return + def get_endpoint_software_info(self) -> pd.DataFrame: + endpoints = [] + step_page = 1 + control_size = 100 + + self.logger.info(f'Getting Collected software information') + sw_url = f"https://{self.ip}/admin/rs/uiapi/visibility" + while True: + header_data = f'pageType=app&' \ + f'columns=productName%2C' \ + f'version%2C' \ + f'vendorName%2C' \ + f'categories%2C' \ + f'operatingSystem%2C' \ + f'noOfDevicesPerApp&' \ + f'sortBy=productName&' \ + f'startAt={step_page}&' \ + f'pageSize={control_size}' \ + + # transform to base64 then into the str representation of it + header_data = self.UTILS.encode_data(header_data) + # session cookie are persistence so only need to add this header that was implemented from the JS caller + header = self.HEADER_DATA.copy() + header['_QPH_'] = header_data + response = self.session.get(sw_url, headers=header) + + if response.status_code == 200: + ep_data = response.json() + if len(ep_data) > 0: + endpoints += ep_data + step_page += 1 + else: + self.logger.critical(f'GESI: no HW data for endpoints on page {step_page}') + break + else: + self.logger.debug(f'GESI: received back response code {response.status_code} on data retrieval') + break + + # clean list and transform json str to dicts to load into DF + # check if anything in the list + if len(endpoints) > 0: + # ETL + sw_data = pd.DataFrame(endpoints) + sw_data.drop(columns=['id','productId'], inplace=True) + sw_data.fillna('None', inplace=True) + sw_data.drop_duplicates(inplace=True) + sw_data.reset_index(drop=True, inplace=True) + + self.logger.info(f'Gathered {sw_data.shape[0]} Types of SW') + self.logger.info('SW data collection complete') + return sw_data + else: + self.logger.critical(f'GESI: no software data gathered from ISE') + return pd.DataFrame([]) def get_endpoint_hardware_info(self) -> pd.DataFrame: endpoints = [] step_page = 1 control_size = 100 - self.logger.info(f'Getting endpoint hardware info') + self.logger.info(f'Getting Collected hardware information') url = f"https://{self.ip}/admin/rs/uiapi/hwvisibility" while True: # step thru endpoint pages @@ -271,7 +321,7 @@ def get_endpoint_hardware_info(self) -> pd.DataFrame: f'total_entries={control_size}' # transform to base64 then into the str representation of it - header_data = base64.b64encode(str.encode(header_data)).decode('utf-8') + header_data = self.UTILS.encode_data(header_data) # session cookie are persistence so only need to add this header that was implemented from the JS caller header = self.HEADER_DATA.copy() header['_QPH_'] = header_data @@ -283,8 +333,11 @@ def get_endpoint_hardware_info(self) -> pd.DataFrame: endpoints += ep_data step_page += 1 else: - self.logger.critical(f'no HW data for endpoints on page {step_page}') + self.logger.critical(f'GEHI: no HW data for endpoints on page {step_page}') break + else: + self.logger.debug(f'GEHI: received back response code {response.status_code} on data retrieval') + break # clean list and transform json str to dicts to load into DF endpoints = list(set(endpoints)) @@ -296,48 +349,10 @@ def get_endpoint_hardware_info(self) -> pd.DataFrame: self.logger.info('Endpoint HW data collection complete') return hw_data else: - self.logger.critical(f'no Hardware data gathered from ISE') + self.logger.critical(f'GEHI: no Hardware data gathered from ISE') return pd.DataFrame([]) - def special_reporting_data(self): - special_rep = self.config['special_reporting'] - reporting_location = special_rep.get('reporting_location') - find_files = special_rep.get('files_to_look_for') - filter_list = special_rep.get('filter_list') - special_items = special_rep.get('filter_specifics') - attr_to_look_for = special_rep.get('get_attribute_from_endpoint') - fnames = self.UTILS.get_files_from_loc(reporting_location, find_files) - # df holder - self.endpoints = pd.DataFrame([]) - for f in fnames: - ep_df = pd.read_csv(f'{reporting_location}/{f}') - ep_df = self.filter_data(ep_df, filter_list, special_items) - self.endpoints = pd.concat([self.endpoints, ep_df], ignore_index=True) - self.UTILS.create_file_path('archive', f, parent_dir=reporting_location) - self.get_metadata_from_endpoints(attr_to_look_for) - self.logger.info('Endpoint special data collection complete') - - def filter_data(self, raw_df: pd.DataFrame, filter_list: list, data_matching: dict = None): - raw_df.drop(columns=filter_list, inplace=True) - # if we have specifics we want to match on - if data_matching: - for k, v in data_matching.items(): - # try to see if we fit a usecase if not keep going - try: - raw_df = raw_df[raw_df[k].astype(int) > v] - continue - except Exception as error: - self.logger.debug(error) - - try: - raw_df = raw_df[raw_df[k].str.contains(v)] - continue - except Exception as error: - self.logger.debug(error) - return raw_df - def retrieve_endpoint_data(self): - # todo: need to fix with updated code # deployment ID self.sn = self.get_license_info() self.endpoint_policies = None @@ -352,5 +367,7 @@ def retrieve_endpoint_data(self): if __name__ == '__main__': ise = ISE() - ise.retrieve_endpoint_data() - ise.logout_ise_session() + # ise.retrieve_endpoint_data() + # ise.get_endpoint_software_info() + # ise.get_endpoint_hardware_info() + # ise.logout_ise_session() diff --git a/report_data.py b/report_data.py index 1cd2da2..42e5cf9 100755 --- a/report_data.py +++ b/report_data.py @@ -1,9 +1,8 @@ import csv import time -from json import loads from os import path + import pandas as pd -from tqdm import tqdm from ise_control import ISE from messaging import Messaging @@ -30,7 +29,7 @@ def create_ise_endpoint_report(self): # special reports if self.ise.config.get('special_reporting').get('use'): self.create_special_reporting() - quit() + return # reg report fname = self.utils.create_file_path('endpoint_reports', f'{self.ise.config["report"]["organization"]}_step{self.ise.step}_{self.timestr}.csv') @@ -67,6 +66,7 @@ def create_ise_endpoint_report(self): if self.ise.config["report"]['send_email']: messager = Messaging(self.ise.config) messager.send_message(msg_attac_loc_or_buf=fname) + return def ise_step_1(self, writer, ise_eps): writer.writerow([f'{self.reporting_name}-Step{self.ise.step}-2.1 {self.ise.config["report"]["prepared_for"]} Device Category', self.ise.endpoints.shape[0]]) @@ -94,7 +94,7 @@ def ise_step_2(self, writer): common_computing_profiles = 'server|red hat| hel|workstation|OSX' # db queries - get_all_posture_endpoints = "select * from posture_assessment_by_condition" + get_all_posture_endpoints = "select POLICY,ENDPOINT_ID from posture_assessment_by_condition" get_all_auths = "select ORIG_CALLING_STATION_ID,AUTHENTICATION_METHOD,AUTHENTICATION_PROTOCOL,POSTURE_STATUS,ENDPOINT_PROFILE from RADIUS_AUTHENTICATIONS" get_all_endpoints ="select B.LOGICAL_PROFILE, B.ASSIGNED_POLICIES, A.MAC_ADDRESS from ENDPOINTS_DATA A, LOGICAL_PROFILES B where A.ENDPOINT_POLICY = B.ASSIGNED_POLICIES" get_portal_endpoints ="select MAC_ADDRESS, PORTAL_USER from ENDPOINTS_DATA" @@ -203,56 +203,32 @@ def ise_step_2(self, writer): self.ise.logger.info("Finished all reports!") return - def create_ise_sw_hw_report(self, type_='software', hw_mac_list: list = None): - # function import until we plop this on the devops server - fname = self.utils.create_file_path('endpoint_reports', f'{self.ise.config["report"]["organization"]}_step{self.ise.step}_{self.timestr}.csv') - vis = None - if type_ == 'software': + def ise_sw_hw_data(self, ware_type): + ware_data = None + + # path creation + fname = self.utils.create_file_path('endpoint_reports', f'{self.ise.config["report"]["organization"]}_{ware_type.upper()}_{self.timestr}.csv') + + if ware_type == 'software': self.ise.logger.info('Collecting Endpoint software information from ISE') - self.ise.get_endpoint_software_info() - vis = pd.DataFrame(loads(self.ise.sw_catalog.text)) - vis.drop(columns=['id', 'productId'], inplace=True) - else: + ware_data = self.ise.get_endpoint_software_info() + elif ware_type == 'hardware': self.ise.logger.info('Collecting Endpoint hardware information from ISE') - hw_count = 0 - hw_attr_list = [] - if hw_mac_list is not None: - for hw_mac in tqdm(hw_mac_list, total=(len(hw_mac_list)), desc="Getting Hardware info from endpoints", colour='red'): - hw_catalog = self.ise.get_endpoint_hardware_info(hw_mac) - try: - hw_catalog = loads(hw_catalog.text) - if len(hw_catalog) < 1: - # No Hardware Endpoint Data to report - raise ValueError - hw_count += 1 - for hwa in hw_catalog: - hw_attr_list.append(hwa) - except Exception as error: - self.ise.logger.debug(f'CHWR: {error}') - pass - # DONT KNOW WHAT THE FUCK IS THAT - # hw_attr_list = [dict(t) for t in {tuple(d.items()) for d in l}] - if len(hw_attr_list) < 1: - self.ise.logger.error(f'No {type_} Data to Report') - return - vis = pd.DataFrame(hw_attr_list) - vis['endpoint_count'] = hw_count - vis.drop(columns=['vendorId', 'productId'], inplace=True) - vis.to_csv(fname, index=False) - self.ise.logger.info(f'Endpoint {type_} Report Done! Saved to: {fname}') - # send email - if self.ise.config["report"]['send_email']: - messager = Messaging(self.ise.config) - messager.send_message(msg_attac_loc_or_buf=fname) + ware_data = self.ise.get_endpoint_hardware_info() + + ware_data.to_csv(fname, index=False) + self.ise.logger.info(f'{ware_type} Report Done! Saved to: {fname}') + return fname def create_special_reporting(self): - self.ise.special_reporting_data() + self.ise.logger.info('Generating Special Reporting information from ISE') + fname = self.ise_sw_hw_data(ware_type=self.ise.config.get('special_reporting')['sr_type']) # send email if self.ise.config["report"]['send_email']: messager = Messaging(self.ise.config) - messager.send_message(msg_attac_loc_or_buf=self.ise.endpoints, attachment_name=self.ise.config['special_reporting']['name_of_file_to_send']) + messager.send_message(msg_attac_loc_or_buf=fname) if __name__ == '__main__': c2r = ISEReport() - c2r.create_ise_endpoint_report(incl_report_type='None') + c2r.create_ise_endpoint_report() diff --git a/templates/config_templete.yaml b/templates/config_templete.yaml index 6562c95..5266612 100755 --- a/templates/config_templete.yaml +++ b/templates/config_templete.yaml @@ -77,19 +77,10 @@ step2_conditions_custom: - usb: example_custom_policy_name -# if you need a custom report thats already sitting in the repository server +# if you need a custom report for just all cataloged software/hardware information special_reporting: - use: False - reporting_location: '/path/to/reports' - files_to_look_for: - - 'test' - name_of_file_to_send: 'suff_to_send.csv' - filter_list: - - 'useless Col data' - filter_specifics: - 'ACCT_SESSION_TIME': 0 - 'AUTHENTICATION_RULE': 'Allow Guest Users' - get_attribute_from_endpoint: 'MAC ADDRESS' + use: True + sr_type: 'hardware' # get all endpoints or just connected ones only_connected: True