diff --git a/.github/workflows/black.yml b/.github/workflows/black.yml new file mode 100644 index 0000000..484e337 --- /dev/null +++ b/.github/workflows/black.yml @@ -0,0 +1,67 @@ +name: Python QA + +on: + push: + branches: ["main"] + pull_request: + branches: ["main"] + +permissions: + contents: read + +jobs: + black: + runs-on: ubuntu-latest + name: Python QA + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python 3.12 + uses: actions/setup-python@v4 + with: + python-version: "3.12" + + # Cache the installation of Poetry itself, i.e. the next step. This prevents the workflow + # from installing Poetry every time, which can be slow. + - name: Cache Poetry install + uses: actions/cache@v3 + with: + path: ~/.local + key: poetry-1.8.2 + + # Install Poetry. + # The key configuration value here is `virtualenvs-in-project: true`: this creates the + # venv as a `.venv` in your testing directory, which allows the next step to easily + # cache it. + - uses: snok/install-poetry@v1.3.4 + with: + version: 1.8.2 + virtualenvs-create: true + virtualenvs-in-project: true + installer-parallel: true + + # Cache your dependencies (i.e. all the stuff in your `pyproject.toml`). Note the cache + # key: if you're using multiple Python versions, or multiple OSes, you'd need to include + # them in the cache key. I'm not, so it can be simple and just depend on the poetry.lock. + - name: Cache dependencies + id: cache-deps + uses: actions/cache@v3 + with: + path: .venv + key: pydeps-${{ hashFiles('**/poetry.lock') }} + + # Install dependencies. `--no-root` means "install all dependencies but not the project itself" + - run: poetry install --no-interaction --no-root + if: steps.cache-deps.outputs.cache-hit != 'true' + + # Now install _your_ project. + - run: poetry install --no-interaction + + ################################################################ + # Now finally run your code quality tools + ################################################################ + + - name: Check formatting with black + run: | + poetry run black 'p6' --check diff --git a/p6/calc_type_enum.py b/p6/calc_type_enum.py index 0d30df2..9df85b2 100644 --- a/p6/calc_type_enum.py +++ b/p6/calc_type_enum.py @@ -1,9 +1,9 @@ from enum import Enum + class CalcType(Enum): - BASELINE = 'baseline' - AVERAGE = 'average' - MAX = 'max' - SQUARED = 'squared' - RATIOS = 'ratios' - \ No newline at end of file + BASELINE = "baseline" + AVERAGE = "average" + MAX = "max" + SQUARED = "squared" + RATIOS = "ratios" diff --git a/p6/linear_optimization/optimizer.py b/p6/linear_optimization/optimizer.py index 6f4d5d6..bb60f3b 100644 --- a/p6/linear_optimization/optimizer.py +++ b/p6/linear_optimization/optimizer.py @@ -1,28 +1,30 @@ +from datetime import datetime import os import pandas as pd import gurobipy as gp from gurobipy import GRB -from datetime import datetime from dotenv import load_dotenv from p6.calc_type_enum import CalcType from p6.utils import log from p6.utils import data as dataUtils + logger = log.setupCustomLogger(__name__) -load_dotenv('variables.env') +load_dotenv("variables.env") -# Environment variables +OPT_MODELS_OUTPUT_DIR = os.getenv("OPT_MODELS_OUTPUT_DIR") + +# Gurobi license environment variables options = { "WLSACCESSID": os.getenv("WLSACCESSID"), "WLSSECRET": os.getenv("WLSSECRET"), "LICENSEID": int(os.getenv("LICENSEID")), } -LOGGING_DIR = os.getenv('LOGGING_DIR') -def runLinearOptimizationModel(model, links, flows, traffic, timestamp): +def runLinearOptimizationModel(model, links, flows, traffic, timestamp,
savelp=False): """ Runs the linear optimization model to calculate the link utilization and the average link utilization. @@ -30,84 +32,124 @@ def runLinearOptimizationModel(model, links, flows, traffic, timestamp): ---------- #### model: string The optimization model to run, can be 'averageUtilization', 'maxUtilization', or 'squaredUtilization'. - + #### links: dict The links in the network, indexed by linkName. - + #### paths: dict The paths for each source-destination pair, with the paths split into a list of paths. - + #### traffic: dict The traffic for each source-destination pair. - + ### Returns: ---------- The total link utilization, the average link utilization, and the link utilization for each link. """ - logger.info('Started running linear optimization model...') + logger.info("Started running linear optimization model...") with gp.Env(params=options) as env, gp.Model(env=env) as m: # Create optimization model based on the input model m = gp.Model("network_optimization", env=env) # Decision variables for path ratios for each source-destination pair - path_ratios = m.addVars([(sd, pathNum) for sd in flows for pathNum in range(len(flows[sd]))], vtype=GRB.CONTINUOUS, name="PathRatios") + path_ratios = m.addVars( + [(sd, pathNum) for sd in flows for pathNum in range(len(flows[sd]))], + vtype=GRB.CONTINUOUS, + name="PathRatios", + ) match model: case CalcType.AVERAGE.value: utilization = m.addVars(links, vtype=GRB.CONTINUOUS, name="Utilization") - m.setObjective(gp.quicksum((utilization[link]/links[link]['capacity'] for link in links)), GRB.MINIMIZE) + m.setObjective( + gp.quicksum( + (utilization[link] / links[link]["capacity"] for link in links) + ), + GRB.MINIMIZE, + ) case CalcType.MAX.value: max_utilization = m.addVar(vtype=GRB.CONTINUOUS, name="MaxUtilization") m.setObjective(max_utilization, GRB.MINIMIZE) case CalcType.SQUARED.value: utilization = m.addVars(links, vtype=GRB.CONTINUOUS, name="Utilization") - m.setObjective(gp.quicksum((utilization[link]**2 for link in links)), GRB.MINIMIZE) + m.setObjective( + gp.quicksum((utilization[link] ** 2 for link in links)), + GRB.MINIMIZE, + ) case _: - raise ValueError(f'Invalid model: {model}') + raise ValueError(f"Invalid model: {model}") # Constraints for each link's utilization # Consists of the sum of ratios and traffic for each path related to the link for link in links: linkTuple = tuple((link[:5], link[5:])) link_flow = gp.quicksum( - path_ratios[sd, pathNum] * traffic[sd] - if linkTuple in zip(flows[sd][pathNum][:-1], flows[sd][pathNum][1:]) - else 0 - for sd in links[link]['listFlows'] for pathNum in range(len(flows[sd])) + ( + path_ratios[sd, pathNum] * traffic[sd] + if linkTuple in zip(flows[sd][pathNum][:-1], flows[sd][pathNum][1:]) + else 0 + ) + for sd in links[link]["listFlows"] + for pathNum in range(len(flows[sd])) ) - m.addConstr(link_flow <= links[link]['capacity'], name=f"cap_{link}") + m.addConstr(link_flow <= links[link]["capacity"], name=f"cap_{link}") match model: - case CalcType.AVERAGE.value: - m.addConstr(link_flow == links[link]['capacity'] * utilization[link], name=f"util_{link}") + case CalcType.AVERAGE.value: + m.addConstr( + link_flow == links[link]["capacity"] * utilization[link], + name=f"util_{link}", + ) case CalcType.MAX.value: - m.addConstr(link_flow / links[link]['capacity'] <= max_utilization, name=f"util_{link}") + m.addConstr( + link_flow / links[link]["capacity"] <= max_utilization, + name=f"util_{link}", + ) case CalcType.SQUARED.value: - m.addConstr(link_flow == utilization[link] * 
links[link]['capacity'], name=f"util_{link}") + m.addConstr( + link_flow == utilization[link] * links[link]["capacity"], + name=f"util_{link}", + ) case _: - raise ValueError(f'Invalid model: {model}') - + raise ValueError(f"Invalid model: {model}") + for sd in traffic: - m.addConstr(path_ratios.sum(sd, '*') == 1, name=f"traffic_split_{sd}") + m.addConstr(path_ratios.sum(sd, "*") == 1, name=f"traffic_split_{sd}") - m.write(f"{model}.lp") + if savelp: + if not os.path.exists(OPT_MODELS_OUTPUT_DIR): + os.makedirs(OPT_MODELS_OUTPUT_DIR) - logger.info('Started optimization...') + ts = datetime.now().strftime("%Y%m%d") + time = (timestamp[:3] + timestamp[4:-6]).lower() + m.write(f"{OPT_MODELS_OUTPUT_DIR}/{ts}_{model}_{time}.lp") + + logger.info("Started optimization...") m.optimize() - logger.info('Finished optimization') + logger.info("Finished optimization") # Output the results ratioData = [] if m.status == GRB.OPTIMAL: - #debug and save optimal path ratios + # debug and save optimal path ratios for sd in flows: logger.debug(f"Optimal path ratios for {sd}:") for pathNum in range(len(flows[sd])): - ratioData.append([timestamp, sd, pathNum, path_ratios[sd, pathNum].x]) - logger.debug(f" Path {pathNum}: {path_ratios[sd, pathNum].x * 100} %") - - dataUtils.writeDataToFile(pd.DataFrame(ratioData, columns=['timestamp', 'flowName', 'pathNum', 'ratio']), model, True) + ratioData.append( + [timestamp, sd, pathNum, path_ratios[sd, pathNum].x] + ) + logger.debug( + f" Path {pathNum}: {path_ratios[sd, pathNum].x * 100} %" + ) + + dataUtils.writeDataToFile( + pd.DataFrame( + ratioData, columns=["timestamp", "flowName", "pathNum", "ratio"] + ), + model, + True, + ) # Calculate average, min and max link utilization totalLinkUtil = 0 @@ -116,31 +158,34 @@ def runLinearOptimizationModel(model, links, flows, traffic, timestamp): for link in links: linkTuple = tuple((link[:5], link[5:])) link_flow = sum( - path_ratios[sd, pathNum].x * traffic[sd] - if linkTuple in zip(flows[sd][pathNum][:-1], flows[sd][pathNum][1:]) - else 0 - for sd in links[link]['listFlows'] for pathNum in range(len(flows[sd])) + ( + path_ratios[sd, pathNum].x * traffic[sd] + if linkTuple + in zip(flows[sd][pathNum][:-1], flows[sd][pathNum][1:]) + else 0 + ) + for sd in links[link]["listFlows"] + for pathNum in range(len(flows[sd])) ) - totalLinkUtil += link_flow / links[link]['capacity'] * 100 - + totalLinkUtil += link_flow / links[link]["capacity"] * 100 + # Update min and max link utilization - if (link_flow / links[link]['capacity'] * 100) < minLinkUtil: - minLinkUtil = link_flow / links[link]['capacity'] * 100 - if (link_flow / links[link]['capacity'] * 100) > maxLinkUtil: - maxLinkUtil = link_flow / links[link]['capacity'] * 100 - + if (link_flow / links[link]["capacity"] * 100) < minLinkUtil: + minLinkUtil = link_flow / links[link]["capacity"] * 100 + if (link_flow / links[link]["capacity"] * 100) > maxLinkUtil: + maxLinkUtil = link_flow / links[link]["capacity"] * 100 + avgLinkUtil = totalLinkUtil / len(links) logger.info(f"Average link utilization: {avgLinkUtil}% for model {model}") - + return avgLinkUtil, minLinkUtil, maxLinkUtil elif m.status == GRB.INFEASIBLE: - logger.error('Model is infeasible') + logger.error("Model is infeasible") m.computeIIS() - logger.error('The following constraints cannot be satisfied:') + logger.error("The following constraints cannot be satisfied:") for c in m.getConstrs(): if c.IISConstr: logger.error(c.constrName) else: - logger.error('Optimization ended with status %d' % m.status) - \ No newline at 
end of file + logger.error("Optimization ended with status %d" % m.status) diff --git a/p6/main.py b/p6/main.py index f35e492..a364dcd 100644 --- a/p6/main.py +++ b/p6/main.py @@ -1,5 +1,9 @@ import argparse +import pandas as pd import statistics as stats +import multiprocessing as mp + +from multiprocessing import set_start_method from p6.calc_type_enum import CalcType from p6.utils import data as dataUtils @@ -9,66 +13,121 @@ logger = log.setupCustomLogger(__name__) -import pandas as pd - DATA_DAY = 2 + def calcLinkUtil(links): util = {} for linkKey in links: - util[linkKey] = links[linkKey]['totalTraffic'] / links[linkKey]['capacity'] * 100 + util[linkKey] = ( + links[linkKey]["totalTraffic"] / links[linkKey]["capacity"] * 100 + ) return util + +def process_flows_hour(timestamp, flows, traffic, args, linksCopy): + links = linksCopy + + # Initialize totalTraffic and listFlows for all links + for linkKey in links: + links[linkKey]["totalTraffic"] = 0 + links[linkKey]["listFlows"] = [] + + logger.info(f"Processing {timestamp} with {len(flows)} flows...") + for flow in flows: + routers = nwUtils.getRoutersHashFromFlow(flows[flow]) + flowLinks = nwUtils.getFlowLinks(routers, links) + + # Update links with traffic, and if link is new, add it to links + for linkKey in flowLinks: + if linkKey in links: + links[linkKey]["totalTraffic"] += ( + traffic[flow] * flowLinks[linkKey].trafficRatio + ) + else: + links[linkKey] = { + "linkStart": flowLinks[linkKey].linkStart, + "linkEnd": flowLinks[linkKey].linkEnd, + "capacity": flowLinks[linkKey].capacity, + "totalTraffic": traffic[flow] * flowLinks[linkKey].trafficRatio, + "listFlows": [], + } + + # Add this flow to the list of flows for this link + links[linkKey]["listFlows"].append(flow) + + # Run linear optimization or baseline calculations + if args.model_type == CalcType.BASELINE.value: + linkUtil = calcLinkUtil(links) + return [ + timestamp, + min(linkUtil.values()), + max(linkUtil.values()), + stats.mean(linkUtil.values()), + ] + else: + avgLinkUtil, minLinkUtil, maxLinkUtil = linOpt.runLinearOptimizationModel( + args.model_type, links, flows, traffic, timestamp, args.save_lp_models + ) + logger.info("LINEAR OPTIMIZATION RETURNED!") + return [timestamp, minLinkUtil, maxLinkUtil, avgLinkUtil] + + def main(): parser = argparse.ArgumentParser() - parser.add_argument('model_type', choices=[CalcType.BASELINE.value, CalcType.AVERAGE.value, CalcType.MAX.value, CalcType.SQUARED.value], help='type of calculation to run') + parser.add_argument( + "model_type", + choices=[ + CalcType.BASELINE.value, + CalcType.AVERAGE.value, + CalcType.MAX.value, + CalcType.SQUARED.value, + ], + help="type of calculation to run", + ) + parser.add_argument( + "-slpm", + "--save-lp-models", + action="store_true", + help="save linear optimization models", + ) args = parser.parse_args() - logger.info('Started, model_type: ' + str(args.model_type)) + # Set start method to spawn to avoid issues with multiprocessing on Windows + set_start_method("spawn") + + startTime = pd.Timestamp.now() + logger.info("Started, model_type: " + str(args.model_type)) flows = dataUtils.readFlows(DATA_DAY) links = dataUtils.readLinks() traffic = dataUtils.readTraffic(DATA_DAY) - utilStats = [] - - for timestamp in flows: - # Reset totalTraffic and listFlows for all links in this timestamp - for linkKey in links: - links[linkKey]['totalTraffic'] = 0 - links[linkKey]['listFlows'] = [] - - logger.info(f'Processing {timestamp} with {len(flows[timestamp])} flows...') - for flow in flows[timestamp]: - 
routers = nwUtils.getRoutersHashFromFlow(flows[timestamp][flow]) - flowLinks = nwUtils.getFlowLinks(routers, links) - - # Update links with traffic, and if link is new, add it to links - for linkKey in flowLinks: - if(linkKey in links): - links[linkKey]['totalTraffic'] += traffic[timestamp][flow] * flowLinks[linkKey].trafficRatio - else: - links[linkKey] = { - 'linkStart': flowLinks[linkKey].linkStart, - 'linkEnd': flowLinks[linkKey].linkEnd, - 'capacity': flowLinks[linkKey].capacity, - 'totalTraffic': traffic[timestamp][flow] * flowLinks[linkKey].trafficRatio - } - links[linkKey]['listFlows'] = [] - - # Add this flow to the list of flows for this link - links[linkKey]['listFlows'].append(flow) - - # Run linear optimization or baseline calculations - if (args.model_type == CalcType.BASELINE.value): - linkUtil = calcLinkUtil(links) - utilStats.append([timestamp, min(linkUtil.values()), max(linkUtil.values()), stats.mean(linkUtil.values())]) - else: - avgLinkUtil, minLinkUtil, maxLinkUtil = linOpt.runLinearOptimizationModel(args.model_type, links, flows[timestamp], traffic[timestamp], timestamp) - utilStats.append([timestamp, minLinkUtil, maxLinkUtil, avgLinkUtil]) - - dataUtils.writeDataToFile(pd.DataFrame(utilStats, columns=['timestamp', 'min_util', 'max_util', 'avg_util']), args.model_type) - - logger.info('Finished') + with mp.Pool() as pool: + results = pool.starmap( + process_flows_hour, + [ + (timestamp, flows[timestamp], traffic[timestamp], args, links.copy()) + for timestamp in flows + ], + ) + + logger.info("Finished processing all timestamps!") + + results.sort() + dataUtils.writeDataToFile( + pd.DataFrame( + results, columns=["timestamp", "min_util", "max_util", "avg_util"] + ), + args.model_type, + ) + + endTime = pd.Timestamp.now() + + # Log elapsed time in hours, minutes and seconds + elapsedTime = (endTime - startTime).components + logger.info( + f"Finished, elapsed time: {elapsedTime.hours} hours, {elapsedTime.minutes} minutes, {elapsedTime.seconds} seconds" + ) diff --git a/p6/models/network.py b/p6/models/network.py index 555f24f..65ba63d 100644 --- a/p6/models/network.py +++ b/p6/models/network.py @@ -10,6 +10,7 @@ def addConnection(self, link, isEgress): else: self.ingress[link.name] = link + class Link: def __init__(self, linkStart, linkEnd, capacity): self.linkStart = linkStart @@ -17,4 +18,3 @@ def __init__(self, linkStart, linkEnd, capacity): self.capacity = capacity self.name = f"{linkStart}{linkEnd}" self.trafficRatio = 0 - \ No newline at end of file diff --git a/p6/utils/data.py b/p6/utils/data.py index 09bb3d7..babf1cd 100644 --- a/p6/utils/data.py +++ b/p6/utils/data.py @@ -1,12 +1,14 @@ import os import sys import pandas as pd +import multiprocessing as mp from p6.utils import log +from functools import partial from datetime import datetime from dotenv import load_dotenv -load_dotenv('variables.env') +load_dotenv("variables.env") logger = log.setupCustomLogger(__name__) DATASET_PATH = os.getenv("DATASET_PATH") @@ -15,7 +17,20 @@ DATASET_LINKS_NAME = os.getenv("DATASET_LINKS_NAME") DATA_OUTPUT_DIR = os.getenv("DATA_OUTPUT_DIR") -RATIO_OUTPUT_DIR = os.getenv("RATIO_OUTPUT_DIR") +RATIOS_OUTPUT_DIR = os.getenv("RATIOS_OUTPUT_DIR") + + +def _process_group(chunk, group_func): + return chunk.groupby(["timestamp", "pathName"])["path"].apply(group_func) + + +def _group_func(x): + return [path[1:-1].split(";") for path in x] + + +def _merge_results(results): + return {k: v for result in results for k, v in result.items()} + def readFlows(day): """ @@ -33,20 +48,51 @@ 
def readFlows(day): """ try: - logger.info('START: reading flows...') + logger.info("START: reading flows...") + + logger.info("Reading paths...") + dataFlows = pd.read_csv( + f"{DATASET_PATH}/{DATASET_PATHS_PREFIX}{day}.csv", + names=["timestamp", "pathStart", "pathEnd", "path"], + engine="pyarrow", + ) + dataFlows["pathName"] = dataFlows["pathStart"] + dataFlows["pathEnd"] + logger.info( + "Finished reading paths, number of paths: " + str(len(dataFlows.index)) + ) - logger.info('Reading paths...') - dataFlows = pd.read_csv(f'{DATASET_PATH}/{DATASET_PATHS_PREFIX}{day}.csv', names=['timestamp', 'pathStart', 'pathEnd', 'path'], engine='pyarrow') - dataFlows['pathName'] = dataFlows['pathStart'] + dataFlows['pathEnd'] - logger.info('Finished reading paths, number of paths: ' + str(len(dataFlows.index))) - # Grouping paths by timestamp and pathName, and splitting the path string into a list of paths - logger.debug('Grouping paths...') - grouped_flows = dataFlows.groupby(['timestamp', 'pathName'])['path'].apply(lambda x: [path[1:-1].split(';') for path in x]).to_dict() - logger.debug('Finished grouping paths') + logger.debug("Grouping paths...") + + # Splitting data into chunks for multiprocessing + cpu_count = mp.cpu_count() + chunk_size = len(dataFlows) // cpu_count + logger.info( + f"Grouping in parallel | CPUs: {cpu_count} | chunk_size: {chunk_size} | len(dataFlows): {len(dataFlows)}" + ) + chunks = [ + ( + dataFlows[i:] + if rangeIndex == cpu_count - 1 + else dataFlows[i : i + chunk_size] + ) + for rangeIndex, i in enumerate([i * chunk_size for i in range(cpu_count)]) + ] + + partial_process_group = partial(_process_group, group_func=_group_func) + + # Create a pool of processes and apply the process_group function to each chunk + with mp.Pool() as pool: + results = pool.map(partial_process_group, chunks) + + # Merge the results from all processes + grouped_flows = _merge_results(results) + + # grouped_flows = dataFlows.groupby(['timestamp', 'pathName'])['path'].apply(lambda x: [path[1:-1].split(';') for path in x]).to_dict() + logger.debug("Finished grouping paths") # Constructing the final flows dictionary, only keeping paths with more than one router in path - logger.debug('Constructing flows dictionary...') + logger.debug("Constructing flows dictionary...") flows = {} for (timestamp, pathName), paths in grouped_flows.items(): for path in paths: @@ -56,11 +102,11 @@ def readFlows(day): if timestamp not in flows: flows[timestamp] = {} flows[timestamp][pathName] = paths - logger.debug('Finished constructing flows dictionary') + logger.debug("Finished constructing flows dictionary") - logger.info('END: reading flows, number of groups: ' + str(len(flows))) + logger.info("END: reading flows, number of groups: " + str(len(flows))) except Exception as e: - logger.error(f'Error reading flows: {e}') + logger.error(f"Error reading flows: {e}") sys.exit(1) return flows @@ -76,21 +122,27 @@ def readLinks(): """ try: - logger.info('START: reading links...') - - logger.info('Reading links...') - dataCapacity = pd.read_csv(f'{DATASET_PATH}/{DATASET_LINKS_NAME}.csv.gz', compression="gzip", names=['linkStart', 'linkEnd', 'capacity'], skiprows=1, engine="pyarrow") - dataCapacity['linkName'] = dataCapacity['linkStart'] + dataCapacity['linkEnd'] - dataCapacity.set_index('linkName', inplace=True) - links = dataCapacity.to_dict('index') - #remove links that start and end at the same router - update: this is not necessary cant find any duplicates - #copilot cooked here 🤨 - #links = {k: v for k, v in 
links.items() if k[:5] != k[5:]} - logger.info('Finished reading links, number of links: ' + str(len(links))) - - logger.info('END: reading links') + logger.info("START: reading links...") + + logger.info("Reading links...") + dataCapacity = pd.read_csv( + f"{DATASET_PATH}/{DATASET_LINKS_NAME}.csv.gz", + compression="gzip", + names=["linkStart", "linkEnd", "capacity"], + skiprows=1, + engine="pyarrow", + ) + dataCapacity["linkName"] = dataCapacity["linkStart"] + dataCapacity["linkEnd"] + dataCapacity.set_index("linkName", inplace=True) + links = dataCapacity.to_dict("index") + # remove links that start and end at the same router - update: this is not necessary cant find any duplicates + # copilot cooked here 🤨 + # links = {k: v for k, v in links.items() if k[:5] != k[5:]} + logger.info("Finished reading links, number of links: " + str(len(links))) + + logger.info("END: reading links") except Exception as e: - logger.error(f'Error reading links: {e}') + logger.error(f"Error reading links: {e}") sys.exit(1) return links @@ -111,21 +163,29 @@ def readTraffic(day): """ try: - logger.info('START: reading traffic...') - - logger.info('Started reading traffic...') - dataTraffic = pd.read_csv(f'{DATASET_PATH}/{DATASET_TRAFFIC_PREFIX}{day}.csv', names=['timestamp', 'flowStart', 'flowEnd', 'traffic'], engine='pyarrow') - dataTraffic['flow'] = dataTraffic['flowStart'] + dataTraffic['flowEnd'] - dataTraffic = dataTraffic.drop(['flowStart','flowEnd'], axis=1) - logger.info('Finished reading traffic, number of flows: ' + str(len(dataTraffic.index))) - + logger.info("START: reading traffic...") + + logger.info("Started reading traffic...") + dataTraffic = pd.read_csv( + f"{DATASET_PATH}/{DATASET_TRAFFIC_PREFIX}{day}.csv", + names=["timestamp", "flowStart", "flowEnd", "traffic"], + engine="pyarrow", + ) + dataTraffic["flow"] = dataTraffic["flowStart"] + dataTraffic["flowEnd"] + dataTraffic = dataTraffic.drop(["flowStart", "flowEnd"], axis=1) + logger.info( + "Finished reading traffic, number of flows: " + str(len(dataTraffic.index)) + ) + # Grouping traffic by timestamp and flow - logger.debug('Grouping traffic...') - grouped_traffic = dataTraffic.groupby(['timestamp', 'flow'])['traffic'].first().to_dict() - logger.debug('Finished grouping traffic') + logger.debug("Grouping traffic...") + grouped_traffic = ( + dataTraffic.groupby(["timestamp", "flow"])["traffic"].first().to_dict() + ) + logger.debug("Finished grouping traffic") # Constructing the final traffic dictionary - logger.debug('Constructing traffic dictionary...') + logger.debug("Constructing traffic dictionary...") traffic = {} for (timestamp, flow), traffic_value in grouped_traffic.items(): if timestamp not in traffic: @@ -134,11 +194,11 @@ def readTraffic(day): if flow[:5] == flow[5:]: continue traffic[timestamp][flow] = traffic_value - logger.debug('Finished constructing traffic dictionary') + logger.debug("Finished constructing traffic dictionary") - logger.info('END: reading traffic, number of groups: ' + str(len(traffic))) + logger.info("END: reading traffic, number of groups: " + str(len(traffic))) except Exception as e: - logger.error(f'Error reading traffic: {e}') + logger.error(f"Error reading traffic: {e}") sys.exit(1) return traffic @@ -153,25 +213,26 @@ def writeDataToFile(data, type, ratioData=None): #### data: pandas.DataFrame The daily utilization data to write to a file. 
""" - + try: if not os.path.exists(DATA_OUTPUT_DIR): os.makedirs(DATA_OUTPUT_DIR) - if not os.path.exists(RATIO_OUTPUT_DIR): - os.makedirs(RATIO_OUTPUT_DIR) - filePath = '' + filePath = "" timestamp = datetime.now().strftime("%Y%m%d") if ratioData is not None: - time = (data['timestamp'][0][:3] + data['timestamp'][0][4:-6]).lower() - filePath = f'{RATIO_OUTPUT_DIR}/{timestamp}_{type}_{time}_ratios.csv' + if not os.path.exists(RATIOS_OUTPUT_DIR): + os.makedirs(RATIOS_OUTPUT_DIR) + + time = (data["timestamp"][0][:3] + data["timestamp"][0][4:-6]).lower() + filePath = f"{RATIOS_OUTPUT_DIR}/{timestamp}_{type}_{time}_ratios.csv" else: - filePath = f'{DATA_OUTPUT_DIR}/{timestamp}_{type}.csv' + filePath = f"{DATA_OUTPUT_DIR}/{timestamp}_{type}.csv" - logger.info(f'Writing data to file...') - data.to_csv(filePath, mode='w', header=True, index=False) - logger.info(f'Finished writing data to file') + logger.info(f"Writing data to file...") + data.to_csv(filePath, mode="w", header=True, index=False) + logger.info(f"Finished writing data to file") except Exception as e: - logger.error(f'Error writing data to file: {e}') + logger.error(f"Error writing data to file: {e}") sys.exit(1) diff --git a/p6/utils/log.py b/p6/utils/log.py index 4979a77..c524c2c 100644 --- a/p6/utils/log.py +++ b/p6/utils/log.py @@ -4,37 +4,45 @@ from datetime import datetime from dotenv import load_dotenv -load_dotenv('variables.env') + +load_dotenv("variables.env") + def setupCustomLogger(name): - outdir = os.getenv('LOGGING_DIR') + outdir = os.getenv("LOGGING_DIR") if not os.path.exists(outdir): os.makedirs(outdir) - format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + format = "%(asctime)s - %(name)s - %(levelname)s - %(process)d - %(message)s" formatter = logging.Formatter(format) handler = logging.StreamHandler(sys.stdout) handler.setFormatter(formatter) - + timestamp = datetime.now().strftime("%Y%m%d") log_filename = f"{outdir}/{timestamp}_p6.log" - logging.basicConfig(filename=log_filename, level=_logLevel(os.getenv('LOGGING_LEVEL')), format=format) + logging.basicConfig( + filename=log_filename, + level=_logLevel(os.getenv("LOGGING_LEVEL")), + format=format, + ) logger = logging.getLogger(name) logger.addHandler(handler) - + return logger + def _logLevel(level): - if level == 'DEBUG': - return logging.DEBUG - elif level == 'INFO': - return logging.INFO - elif level == 'WARNING': - return logging.WARNING - elif level == 'ERROR': - return logging.ERROR - elif level == 'CRITICAL': - return logging.CRITICAL - else: - return logging.INFO \ No newline at end of file + match level: + case "DEBUG": + return logging.DEBUG + case "INFO": + return logging.INFO + case "WARNING": + return logging.WARNING + case "ERROR": + return logging.ERROR + case "CRITICAL": + return logging.CRITICAL + case _: + return logging.INFO diff --git a/p6/utils/network.py b/p6/utils/network.py index 97a8502..fb7f63c 100644 --- a/p6/utils/network.py +++ b/p6/utils/network.py @@ -2,16 +2,20 @@ from p6.models.network import Router, Link from p6.utils import log + logger = log.setupCustomLogger(__name__) import configparser + config = configparser.ConfigParser() -config.read('config.ini') +config.read("config.ini") from dotenv import load_dotenv -load_dotenv('variables.env') -AVG_CAPACITY = int(os.getenv('AVERAGE_CAPACITY')) +load_dotenv("variables.env") + +AVG_CAPACITY = int(os.getenv("AVERAGE_CAPACITY")) + def getRoutersHashFromFlow(flow): """ @@ -28,20 +32,23 @@ def getRoutersHashFromFlow(flow): """ routersHash = {} - + for path in flow: - 
prevRouterName = '' + prevRouterName = "" for routerName in reversed(path): if routerName not in routersHash: routersHash[routerName] = Router(routerName) - if prevRouterName != '': - routersHash[prevRouterName].addConnection(routersHash[routerName], False) + if prevRouterName != "": + routersHash[prevRouterName].addConnection( + routersHash[routerName], False + ) routersHash[routerName].addConnection(routersHash[prevRouterName], True) prevRouterName = routerName return routersHash + def getFlowLinks(routers, capacities): """ This function creates a hash of links from routers by traversing breadth-first, and calculates the traffic ratio for each link. @@ -66,34 +73,37 @@ def getFlowLinks(routers, capacities): visited.append(endRouter) queue.append(endRouter) - logger.debug(f'Started traversing (endrouter: {endRouter.name})') + logger.debug(f"Started traversing (endrouter: {endRouter.name})") while queue: currentRouter = queue.pop(0) for ingressKey in currentRouter.ingress: - newLink = Link(currentRouter.ingress[ingressKey].name, currentRouter.name, 0) + newLink = Link( + currentRouter.ingress[ingressKey].name, currentRouter.name, 0 + ) - if(newLink.name not in capacities): + if newLink.name not in capacities: newLink.capacity = AVG_CAPACITY else: - newLink.capacity = capacities[newLink.name]['capacity'] + newLink.capacity = capacities[newLink.name]["capacity"] - if(currentRouter == endRouter): + if currentRouter == endRouter: newLink.trafficRatio = 1 / len(currentRouter.ingress) else: newLink.trafficRatio = _calcLinkRatio(flowLinks, currentRouter) - + flowLinks[newLink.name] = newLink if currentRouter.ingress[ingressKey] not in visited: visited.append(currentRouter.ingress[ingressKey]) queue.append(currentRouter.ingress[ingressKey]) - - logger.debug(f'Finished traversing (endrouter: {endRouter.name})') + + logger.debug(f"Finished traversing (endrouter: {endRouter.name})") return flowLinks + def _getEndRouter(routers): """ Internal function to get the end router of a network. @@ -108,6 +118,7 @@ def _getEndRouter(routers): if len(routers[routerKey].egress) == 0: return routers[routerKey] + def _calcLinkRatio(links, currentRouter): """ Internal function to calculate the traffic ratio for a link. @@ -127,6 +138,7 @@ def _calcLinkRatio(links, currentRouter): return sumEgress / len(currentRouter.ingress) + def printRouterHash(routersHash): for routerKey in routersHash: print(f"Router: {routerKey}") @@ -134,4 +146,4 @@ def printRouterHash(routersHash): print(f"-Ingress:{routersHash[routerKey].ingress[ingressKey].name}") for egressKey in routersHash[routerKey].egress: print(f"-Egress:{routersHash[routerKey].egress[egressKey].name}") - print("") \ No newline at end of file + print("") diff --git a/poetry.lock b/poetry.lock index a5abcfb..07adbef 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,5 +1,76 @@ # This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +[[package]] +name = "black" +version = "24.4.2" +description = "The uncompromising code formatter." 
+optional = false +python-versions = ">=3.8" +files = [ + {file = "black-24.4.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:dd1b5a14e417189db4c7b64a6540f31730713d173f0b63e55fabd52d61d8fdce"}, + {file = "black-24.4.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8e537d281831ad0e71007dcdcbe50a71470b978c453fa41ce77186bbe0ed6021"}, + {file = "black-24.4.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eaea3008c281f1038edb473c1aa8ed8143a5535ff18f978a318f10302b254063"}, + {file = "black-24.4.2-cp310-cp310-win_amd64.whl", hash = "sha256:7768a0dbf16a39aa5e9a3ded568bb545c8c2727396d063bbaf847df05b08cd96"}, + {file = "black-24.4.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:257d724c2c9b1660f353b36c802ccece186a30accc7742c176d29c146df6e474"}, + {file = "black-24.4.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:bdde6f877a18f24844e381d45e9947a49e97933573ac9d4345399be37621e26c"}, + {file = "black-24.4.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e151054aa00bad1f4e1f04919542885f89f5f7d086b8a59e5000e6c616896ffb"}, + {file = "black-24.4.2-cp311-cp311-win_amd64.whl", hash = "sha256:7e122b1c4fb252fd85df3ca93578732b4749d9be076593076ef4d07a0233c3e1"}, + {file = "black-24.4.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:accf49e151c8ed2c0cdc528691838afd217c50412534e876a19270fea1e28e2d"}, + {file = "black-24.4.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:88c57dc656038f1ab9f92b3eb5335ee9b021412feaa46330d5eba4e51fe49b04"}, + {file = "black-24.4.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:be8bef99eb46d5021bf053114442914baeb3649a89dc5f3a555c88737e5e98fc"}, + {file = "black-24.4.2-cp312-cp312-win_amd64.whl", hash = "sha256:415e686e87dbbe6f4cd5ef0fbf764af7b89f9057b97c908742b6008cc554b9c0"}, + {file = "black-24.4.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:bf10f7310db693bb62692609b397e8d67257c55f949abde4c67f9cc574492cc7"}, + {file = "black-24.4.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:98e123f1d5cfd42f886624d84464f7756f60ff6eab89ae845210631714f6db94"}, + {file = "black-24.4.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:48a85f2cb5e6799a9ef05347b476cce6c182d6c71ee36925a6c194d074336ef8"}, + {file = "black-24.4.2-cp38-cp38-win_amd64.whl", hash = "sha256:b1530ae42e9d6d5b670a34db49a94115a64596bc77710b1d05e9801e62ca0a7c"}, + {file = "black-24.4.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:37aae07b029fa0174d39daf02748b379399b909652a806e5708199bd93899da1"}, + {file = "black-24.4.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:da33a1a5e49c4122ccdfd56cd021ff1ebc4a1ec4e2d01594fef9b6f267a9e741"}, + {file = "black-24.4.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ef703f83fc32e131e9bcc0a5094cfe85599e7109f896fe8bc96cc402f3eb4b6e"}, + {file = "black-24.4.2-cp39-cp39-win_amd64.whl", hash = "sha256:b9176b9832e84308818a99a561e90aa479e73c523b3f77afd07913380ae2eab7"}, + {file = "black-24.4.2-py3-none-any.whl", hash = "sha256:d36ed1124bb81b32f8614555b34cc4259c3fbc7eec17870e8ff8ded335b58d8c"}, + {file = "black-24.4.2.tar.gz", hash = "sha256:c872b53057f000085da66a19c55d68f6f8ddcac2642392ad3a355878406fbd4d"}, +] + +[package.dependencies] +click = ">=8.0.0" +mypy-extensions = ">=0.4.3" +packaging = ">=22.0" +pathspec = ">=0.9.0" +platformdirs = ">=2" +tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""} +typing-extensions = {version = ">=4.0.1", markers = "python_version < \"3.11\""} + +[package.extras] 
+colorama = ["colorama (>=0.4.3)"] +d = ["aiohttp (>=3.7.4)", "aiohttp (>=3.7.4,!=3.9.0)"] +jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"] +uvloop = ["uvloop (>=0.15.2)"] + +[[package]] +name = "click" +version = "8.1.7" +description = "Composable command line interface toolkit" +optional = false +python-versions = ">=3.7" +files = [ + {file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"}, + {file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "platform_system == \"Windows\""} + +[[package]] +name = "colorama" +version = "0.4.6" +description = "Cross-platform colored terminal text." +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +files = [ + {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, + {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, +] + [[package]] name = "gurobipy" version = "11.0.1" @@ -32,6 +103,17 @@ files = [ [package.extras] matrixapi = ["numpy", "scipy"] +[[package]] +name = "mypy-extensions" +version = "1.0.0" +description = "Type system extensions for programs checked with the mypy type checker." +optional = false +python-versions = ">=3.5" +files = [ + {file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"}, + {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"}, +] + [[package]] name = "numpy" version = "1.26.4" @@ -77,6 +159,17 @@ files = [ {file = "numpy-1.26.4.tar.gz", hash = "sha256:2a02aba9ed12e4ac4eb3ea9421c420301a0c6460d9830d74a9df87efa4912010"}, ] +[[package]] +name = "packaging" +version = "24.0" +description = "Core utilities for Python packages" +optional = false +python-versions = ">=3.7" +files = [ + {file = "packaging-24.0-py3-none-any.whl", hash = "sha256:2ddfb553fdf02fb784c234c7ba6ccc288296ceabec964ad2eae3777778130bc5"}, + {file = "packaging-24.0.tar.gz", hash = "sha256:eb82c5e3e56209074766e6885bb04b8c38a0c015d0a30036ebe7ece34c9989e9"}, +] + [[package]] name = "pandas" version = "2.2.1" @@ -150,6 +243,33 @@ sql-other = ["SQLAlchemy (>=2.0.0)", "adbc-driver-postgresql (>=0.8.0)", "adbc-d test = ["hypothesis (>=6.46.1)", "pytest (>=7.3.2)", "pytest-xdist (>=2.2.0)"] xml = ["lxml (>=4.9.2)"] +[[package]] +name = "pathspec" +version = "0.12.1" +description = "Utility library for gitignore style pattern matching of file paths." +optional = false +python-versions = ">=3.8" +files = [ + {file = "pathspec-0.12.1-py3-none-any.whl", hash = "sha256:a0d503e138a4c123b27490a4f7beda6a01c6f288df0e4a8b79c7eb0dc7b4cc08"}, + {file = "pathspec-0.12.1.tar.gz", hash = "sha256:a482d51503a1ab33b1c67a6c3813a26953dbdc71c31dacaef9a838c4e29f5712"}, +] + +[[package]] +name = "platformdirs" +version = "4.2.1" +description = "A small Python package for determining appropriate platform-specific dirs, e.g. a `user data dir`." 
+optional = false +python-versions = ">=3.8" +files = [ + {file = "platformdirs-4.2.1-py3-none-any.whl", hash = "sha256:17d5a1161b3fd67b390023cb2d3b026bbd40abde6fdb052dfbd3a29c3ba22ee1"}, + {file = "platformdirs-4.2.1.tar.gz", hash = "sha256:031cd18d4ec63ec53e82dceaac0417d218a6863f7745dfcc9efe7793b7039bdf"}, +] + +[package.extras] +docs = ["furo (>=2023.9.10)", "proselint (>=0.13)", "sphinx (>=7.2.6)", "sphinx-autodoc-typehints (>=1.25.2)"] +test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", "pytest (>=7.4.3)", "pytest-cov (>=4.1)", "pytest-mock (>=3.12)"] +type = ["mypy (>=1.8)"] + [[package]] name = "pyarrow" version = "15.0.2" @@ -248,6 +368,28 @@ files = [ {file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"}, ] +[[package]] +name = "tomli" +version = "2.0.1" +description = "A lil' TOML parser" +optional = false +python-versions = ">=3.7" +files = [ + {file = "tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"}, + {file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"}, +] + +[[package]] +name = "typing-extensions" +version = "4.11.0" +description = "Backported and Experimental Type Hints for Python 3.8+" +optional = false +python-versions = ">=3.8" +files = [ + {file = "typing_extensions-4.11.0-py3-none-any.whl", hash = "sha256:c1f94d72897edaf4ce775bb7558d5b79d8126906a14ea5ed1635921406c0387a"}, + {file = "typing_extensions-4.11.0.tar.gz", hash = "sha256:83f085bd5ca59c80295fc2a82ab5dac679cbe02b9f33f7d83af68e241bea51b0"}, +] + [[package]] name = "tzdata" version = "2024.1" @@ -262,4 +404,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "62b22d035c16a21960964be111673919fdd5593cbfd01d7fa19a0ee3a53b3aed" +content-hash = "b71b8aa9f048837574313ae19ea1f44802f75a4a3092ce40aa424b6a9015e921" diff --git a/pyproject.toml b/pyproject.toml index b1bb11b..f66321c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ gurobipy = "^11.0.1" pandas = "^2.2.1" pyarrow = "^15.0.2" python-dotenv = "^1.0.1" +black = "^24.4.2" [tool.poetry.scripts] p6 = "p6.main:main" diff --git a/variables.env-example b/variables.env-example index 4f76fb0..bf832d0 100644 --- a/variables.env-example +++ b/variables.env-example @@ -8,7 +8,8 @@ AVERAGE_CAPACITY=47874 # Data output directory DATA_OUTPUT_DIR=output -RATIO_OUTPUT_DIR=output/ratios +RATIOS_OUTPUT_DIR=output/ratios +OPT_MODELS_OUTPUT_DIR=output/optimization_models # Logging output directory LOGGING_DIR=log
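Usage sketch (illustrative, not part of the patch): assuming variables.env and the dataset paths are configured as in variables.env-example, the new --save-lp-models flag added in p6/main.py can be exercised through the `p6` console script declared in [tool.poetry.scripts], e.g.

    poetry run p6 average --save-lp-models

and the Black check that the new workflow runs in CI can be reproduced locally with

    poetry run black p6 --check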