diff --git a/Dockerfile b/Dockerfile
index c16308e..2977a89 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,23 +1,20 @@
-FROM python:2.7
-
-
-RUN apt-get update && \
-    apt-get install -y \
-    build-essential \
-    git \
-    libxerces-c-dev
-
-RUN mkdir -p /opt
-RUN (cd /opt; git clone https://github.com/radiganm/sumo.git)
-RUN (cd /opt/sumo; ./configure)
-RUN (cd /opt/sumo; make)
-RUN (cd /opt/sumo; make install)
-
-ENV SUMO_HOME /opt/sumo
-# First cache dependencies
-ADD ./setup.py /app/setup.py
-RUN python /app/setup.py install
-# Add sources
+FROM starofall/crowdnav
+
 ADD ./ /app/
+
+# WORKDIR /app
+
+# COPY . /app
+RUN pip install flask
+
+COPY ./start.sh /app
+
+# WORKDIR /app
+
+EXPOSE 5000
+
 WORKDIR /app
-CMD ["python","/app/forever.py"]
+
+RUN chmod 777 ./start.sh
+
+CMD ["bash", "start.sh"]
diff --git a/app/Boot.py b/app/Boot.py
index 8c79d30..fe07f62 100644
--- a/app/Boot.py
+++ b/app/Boot.py
@@ -8,12 +8,12 @@
 from app.routing.CustomRouter import CustomRouter
 from app.network.Network import Network
 from app.simulation.Simulation import Simulation
-from streaming import RTXForword
+from app.streaming import RTXForword
 from colorama import Fore
-from sumo import SUMOConnector, SUMODependency
-import Config
+from app.sumo import SUMOConnector, SUMODependency
+from app import Config
 import traci, sys, os
-import thread
+# import thread
 import time
diff --git a/app/HTTPServer/__init__.py b/app/HTTPServer/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/app/HTTPServer/endpoints.py b/app/HTTPServer/endpoints.py
new file mode 100644
index 0000000..3bc20a6
--- /dev/null
+++ b/app/HTTPServer/endpoints.py
@@ -0,0 +1,60 @@
+from flask import jsonify, request
+import json
+
+
+def get_monitor():
+    file_path = "./monitor_data.json"
+
+    try:
+        with open(file_path, 'r') as json_file:
+            data = json.load(json_file)
+            return data
+
+    except FileNotFoundError:
+        raise FileNotFoundError("Monitor data not found")
+
+    except Exception as e:
+        raise e
+
+
+# Path to the knobs.json file (resolved relative to the working directory, /app in the container)
+json_file_path = './knobs.json'
+
+# Open the JSON file in read mode ('r')
+with open(json_file_path, 'r') as json_file:
+    # Use json.load() to read data from the file
+    adaptation_options_data = json.load(json_file)
+
+    # 'adaptation_options_data' now holds the contents of the JSON file as a Python dictionary
+    print(adaptation_options_data)
+
+
+
+
+def get_execute():
+    try:
+        # Get the JSON data from the request body
+        request_data = request.get_json()
+
+        # Check if the required adaptation fields are present in the request data
+        required_fields = ["routeRandomSigma", "explorationPercentage", "maxSpeedAndLengthFactor",
+                           "averageEdgeDurationFactor", "freshnessUpdateFactor", "freshnessCutOffValue",
+                           "reRouteEveryTicks"]
+        if not all(field in request_data for field in required_fields):
+            return jsonify({"error": "Missing required fields in the adaptation request"}), 400
+
+        # Implement the actual adaptation logic here based on the received data;
+        # for demonstration we only print the received data.
+        print('Received adaptation data: {}'.format(request_data))
+
+        # Return a response indicating the success of the adaptation
+        return jsonify({"message": "Adaptation executed successfully"})
+
+    except Exception as e:
+        # Handle exceptions and return an error response
+        return jsonify({"error": str(e)}), 500
+
+def getAdaptationOptions():
+    return adaptation_options_data
+
+
diff --git a/app/HTTPServer/main.py b/app/HTTPServer/main.py
new file mode 100644
index 0000000..7a16280
--- /dev/null
+++ b/app/HTTPServer/main.py
@@ -0,0 +1,113 @@
+from flask import Flask, jsonify
+from flask.views import MethodView
+import json
+from endpoints import get_monitor, get_execute
+
+app = Flask(__name__)
+
+knobs_path = '../../knobs.json'
+
+
+def read_knobs():
+    """Read the knobs.json file."""
+    try:
+        with open(knobs_path, 'r') as file:
+            knobs_data = json.load(file)
+            return knobs_data
+    except FileNotFoundError:
+        return None
+
+
+def write_knobs(data):
+    """Write data to the knobs.json file."""
+    with open(knobs_path, 'w') as file:
+        json.dump(data, file, indent=2)
+
+
+class MonitorAPI(MethodView):
+    def get(self):
+        try:
+            file_path = "app/HTTPServer/monitor_data.json"
+            with open(file_path, "r") as json_file:
+                data = json.load(json_file)
+            return jsonify(data)
+        except FileNotFoundError:
+            return jsonify({'error': 'monitor_data.json not found'}), 404
+
+
+app.add_url_rule('/monitor', view_func=MonitorAPI.as_view('monitor'))
+
+
+@app.route('/execute', methods=['PUT'])
+def execute_adaptation():
+    # get_execute() already returns a Flask response (or a response/status tuple)
+    return get_execute()
+
+
+class MonitorSchemaAPI(MethodView):
+    def get(self):
+        schema = {
+            'vehicle_count': 'number',
+            'avg_trip_duration': 'number',
+            'total_trips': 'number',
+            'avg_trip_overhead': 'number'
+        }
+        return jsonify(schema)
+
+
+app.add_url_rule('/monitor_schema', view_func=MonitorSchemaAPI.as_view('monitor_schema'))
+
+
+class AdaptationOptionsSchemaAPI(MethodView):
+    def get(self):
+        schema = {
+            'type': 'object',
+            'properties': {
+                'routeRandomSigma': {
+                    'type': 'number',
+                    'description': 'The randomization sigma of edge weights'
+                },
+                'explorationPercentage': {
+                    'type': 'number',
+                    'description': 'The percentage of routes used for exploration'
+                },
+                'maxSpeedAndLengthFactor': {
+                    'type': 'integer',
+                    'description': 'How much the length/speed influences the routing'
+                },
+                'averageEdgeDurationFactor': {
+                    'type': 'integer',
+                    'description': 'How much the average edge factor influences the routing'
+                },
+                'freshnessUpdateFactor': {
+                    'type': 'integer',
+                    'description': 'How much the freshness update factor influences the routing'
+                },
+                'freshnessCutOffValue': {
+                    'type': 'integer',
+                    'description': 'If data is older than this, it is not considered in the algorithm'
+                },
+                'reRouteEveryTicks': {
+                    'type': 'integer',
+                    'description': 'Check for a new route every x ticks after the car starts'
+                },
+                # Add more properties as needed
+            },
+            'required': [
+                'routeRandomSigma',
+                'explorationPercentage',
+                'maxSpeedAndLengthFactor',
+                'averageEdgeDurationFactor',
+                'freshnessUpdateFactor',
+                'freshnessCutOffValue',
+                'reRouteEveryTicks'
+            ],
+        }
+        return jsonify(schema)
+
+
+app.add_url_rule('/adaptation_options_schema', view_func=AdaptationOptionsSchemaAPI.as_view('adaptation_options_schema'))
+
+
+if __name__ == '__main__':
+    app.run(host='0.0.0.0')
diff --git a/app/HTTPServer/monitor_data.json b/app/HTTPServer/monitor_data.json
new file mode 100644
index 0000000..8972566
--- /dev/null
+++ b/app/HTTPServer/monitor_data.json
@@ -0,0 +1,14 @@
+{
+  "step": 1,
+  "EdgeID": 2,
+  "EdgeDensity": {
+    "edge1": 10,
+    "edge2": 8,
+    "edge3": 15
+  },
+  "totalCarCounter": 2,
+  "carIndexCounter": 1,
+  "totalTrips": 1,
+  "totalTripAverage": 4,
+  "totalTripOverheadAverage": 44
+}
\ No newline at end of file
diff --git a/app/HTTPServer/readme.md b/app/HTTPServer/readme.md
new file mode 100644
index 0000000..8e7686e
--- /dev/null
+++ b/app/HTTPServer/readme.md
@@ -0,0 +1,11 @@
+In order to run the HTTP server locally (it is a Flask app, see main.py):
+
+pip install flask
+
+python app/HTTPServer/main.py (run from the repository root)
+
+
+After building the Docker image:
+
+docker run -p 3000:5000 -it --link kafka:kafka <image-name>
+then access the server on host port 3000
\ No newline at end of file
diff --git a/app/Strategy/strategy.py b/app/Strategy/strategy.py
new file mode 100644
index 0000000..f1c2ab7
--- /dev/null
+++ b/app/Strategy/strategy.py
@@ -0,0 +1,49 @@
+import traci
+import csv
+
+# TODO: integrate this strategy with UPISAS
+
+class CrowdNavAdaptationStrategy:
+    def __init__(self, rerouting_threshold=0.4):
+        self.rerouting_threshold = rerouting_threshold
+        self.edge_density = {}
+
+    def monitor(self):
+        # This data should eventually be retrieved from the HTTP monitor endpoint
+        for edge_id in traci.edge.getIDList():
+            vehicle_ids = traci.edge.getLastStepVehicleIDs(edge_id)
+            density = len(vehicle_ids)
+            self.edge_density[edge_id] = density
+
+    def analyze(self):
+        overloaded_streets = []
+        for edge_id, density in self.edge_density.items():
+            # density is a plain per-edge vehicle count, so compare it directly
+            if density > self.rerouting_threshold:
+                overloaded_streets.append(edge_id)
+        return overloaded_streets
+
+    def plan(self, overloaded_streets):
+        avoid_streets_signal = []
+        for edge_id in self.edge_density:
+            signal_value = 0 if edge_id in overloaded_streets else 1
+            avoid_streets_signal.append(signal_value)
+        return avoid_streets_signal
+
+    def execute(self, avoid_streets_signal):
+        if 0 in avoid_streets_signal:
+            print("Sending signal to avoid overloaded streets!")
+            with open('signal.target', 'w') as signal_file:
+                signal_writer = csv.writer(signal_file, dialect='excel')
+                signal_writer.writerow(avoid_streets_signal)
+
+    def update_knowledge_base(self):
+        # Update the knowledge base based on feedback from the simulation, e.g. by
+        # adjusting thresholds, exploring different rerouting strategies, or updating other parameters.
+        pass
+
+
+# Example usage (requires an active TraCI connection)
+strategy = CrowdNavAdaptationStrategy()
+strategy.monitor()
+strategy.execute(strategy.plan(strategy.analyze()))
diff --git a/app/simulation/Simulation.py b/app/simulation/Simulation.py
index afc44f2..0416e76 100644
--- a/app/simulation/Simulation.py
+++ b/app/simulation/Simulation.py
@@ -11,7 +11,7 @@ from app.logging import info
 from app.routing.CustomRouter import CustomRouter
 from app.streaming import RTXConnector
 
-import time
+import time, random
 
 # get the current system time
 from app.routing.RoutingEdge import RoutingEdge
@@ -25,6 +25,8 @@ class Simulation(object):
 
     # the current tick of the simulation
     tick = 0
 
+    edge_density = {}
+
     # last tick time
     lastTick = current_milli_time()
@@ -32,7 +34,7 @@ def applyFileConfig(cls):
        """ reads configs from a json and applies it at realtime to the simulation """
        try:
-            config = json.load(open('./knobs.json'))
+            config = json.load(open('../../knobs.json'))
             CustomRouter.explorationPercentage = config['explorationPercentage']
             CustomRouter.averageEdgeDurationFactor = config['averageEdgeDurationFactor']
             CustomRouter.maxSpeedAndLengthFactor = config['maxSpeedAndLengthFactor']
@@ -41,7 +43,7 @@ def applyFileConfig(cls):
             CustomRouter.reRouteEveryTicks = config['reRouteEveryTicks']
         except:
             pass
-    
+
     @classmethod
     def start(cls):
         """ start the simulation """
@@ -50,7 +52,18 @@ def start(cls):
         cls.applyFileConfig()
         CarRegistry.applyCarCounter()
         cls.loop()
-    
+
+
+    # Finds the density of the first 30 edges (for demonstration purposes only)
+    @classmethod
+    def monitor_edge_density(cls):
+        """ Monitor and update the density of vehicles on specific edges """
+        cls.edge_density.clear()  # Clear the existing data
+        edge_ids = traci.edge.getIDList()[:30]  # Get the first 30 edge IDs
+        for edge_id in edge_ids:
+            density = traci.edge.getLastStepVehicleNumber(edge_id)
+            cls.edge_density[edge_id] = density
+
     @classmethod
     # @profile
     def loop(cls):
@@ -69,7 +82,7 @@ def loop(cls):
             msg = dict()
             msg["duration"] = duration
             RTXForword.publish(msg, Config.kafkaTopicPerformance)
-            
+
             # Check for removed cars and re-add them into the system
             for removedCarId in traci.simulation.getSubscriptionResults()[122]:
                 CarRegistry.findById(removedCarId).setArrived(cls.tick)
@@ -81,6 +94,9 @@ def loop(cls):
             msg = dict()
             msg["duration"] = current_milli_time() - timeBeforeCarProcess
             RTXForword.publish(msg, Config.kafkaTopicRouting)
+
+            # Monitor vehicle density for each edge
+            cls.monitor_edge_density()
 
             # if we enable this we get debug information in the sumo-gui using global traveltime
             # should not be used for normal running, just for debugging
@@ -127,15 +143,35 @@ def loop(cls):
             if "edge_average_influence" in newConf:
                 RoutingEdge.edgeAverageInfluence = newConf["edge_average_influence"]
                 print("setting edgeAverageInfluence: " + str(newConf["edge_average_influence"]))
+
             # print status update if we are not running in parallel mode
             if (cls.tick % 100) == 0 and Config.parallelMode is False:
                 print(str(Config.processID) + " -> Step:" + str(cls.tick) + " # Driving cars: " + str(
-                    traci.vehicle.getIDCount()) + "/" + str(
+                    traci.vehicle.getIDCount()) + " # EdgeID: " + str(random.choice(traci.edge.getIDList()[:30])) +
+                    " # EdgeDensity: " + str(cls.edge_density) + "/" + str(
                     CarRegistry.totalCarCounter) + " # avgTripDuration: " + str(
                     CarRegistry.totalTripAverage) + "(" + str(
                     CarRegistry.totalTrips) + ")" + " # avgTripOverhead: " + str(
                     CarRegistry.totalTripOverheadAverage))
+
+
+                # Write the latest monitoring data to the json file served by the HTTP server
+                file_path = "app/HTTPServer/monitor_data.json"
+                data = {
+                    'step': cls.tick,
+                    'currentCars': traci.vehicle.getIDCount(),
+                    'EdgeID': random.choice(traci.edge.getIDList()[:30]),
+                    'EdgeDensity': cls.edge_density,
+                    'totalCarCounter': CarRegistry.totalCarCounter,
+                    'carIndexCounter': CarRegistry.carIndexCounter,
+                    'totalTrips': CarRegistry.totalTrips,
+                    'totalTripAverage': CarRegistry.totalTripAverage,
+                    'totalTripOverheadAverage': CarRegistry.totalTripOverheadAverage
+                }
+                with open(file_path, 'w') as json_file:
+                    json.dump(data, json_file)
+
 
             # @depricated -> will be removed
             #
             # # if we are in paralllel mode we end the simulation after 10000 ticks with a result output
diff --git a/app/strategy.py b/app/strategy.py
new file mode 100644
index 0000000..f1c2ab7
--- /dev/null
+++ b/app/strategy.py
@@ -0,0 +1,49 @@
+import traci
+import csv
+
+# TODO: integrate this strategy with UPISAS
+
+class CrowdNavAdaptationStrategy:
+    def __init__(self, rerouting_threshold=0.4):
+        self.rerouting_threshold = rerouting_threshold
+        self.edge_density = {}
+
+    def monitor(self):
+        # This data should eventually be retrieved from the HTTP monitor endpoint
+        for edge_id in traci.edge.getIDList():
+            vehicle_ids = traci.edge.getLastStepVehicleIDs(edge_id)
+            density = len(vehicle_ids)
+            self.edge_density[edge_id] = density
+
+    def analyze(self):
+        overloaded_streets = []
+        for edge_id, density in self.edge_density.items():
+            # density is a plain per-edge vehicle count, so compare it directly
+            if density > self.rerouting_threshold:
+                overloaded_streets.append(edge_id)
+        return overloaded_streets
+
+    def plan(self, overloaded_streets):
+        avoid_streets_signal = []
+        for edge_id in self.edge_density:
+            signal_value = 0 if edge_id in overloaded_streets else 1
+            avoid_streets_signal.append(signal_value)
+        return avoid_streets_signal
+
+    def execute(self, avoid_streets_signal):
+        if 0 in avoid_streets_signal:
+            print("Sending signal to avoid overloaded streets!")
+            with open('signal.target', 'w') as signal_file:
+                signal_writer = csv.writer(signal_file, dialect='excel')
+                signal_writer.writerow(avoid_streets_signal)
+
+    def update_knowledge_base(self):
+        # Update the knowledge base based on feedback from the simulation, e.g. by
+        # adjusting thresholds, exploring different rerouting strategies, or updating other parameters.
+        pass
+
+
+# Example usage (requires an active TraCI connection)
+strategy = CrowdNavAdaptationStrategy()
+strategy.monitor()
+strategy.execute(strategy.plan(strategy.analyze()))
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..5355cb4
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,11 @@
+version: '3'
+services:
+  app:
+    image: crowdnav:latest # image name, change it accordingly
+    depends_on:
+      - Upisas
+      - kafka
+  Upisas:
+    image: upisasstudents:latest # image name
+  kafka:
+    image: spotify/kafka:latest # image name
diff --git a/read_data.py b/read_data.py
new file mode 100644
index 0000000..ce1c244
--- /dev/null
+++ b/read_data.py
@@ -0,0 +1,33 @@
+import threading
+import time
+# import os, sys
+# package_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
+
+# # Add the parent directory to the Python path
+# sys.path.append(package_path)
+from app.simulation.Simulation import Simulation
+from app.streaming import RTXConnector
+
+
+
+def read_data_thread(thread_id):
+    for i in range(5):
+        print('Thread {} reading data'.format(thread_id))
+        time.sleep(1)
+
+if __name__ == "__main__":
+    threads = []
+
+    # Check for a new configuration, then create and start the reader threads
+    newConf = RTXConnector.checkForNewConfiguration()
+    print(newConf)
+    for i in range(3):
+        thread = threading.Thread(target=read_data_thread, args=(i,))
+        thread.start()
+        threads.append(thread)
+
+    # Wait for all threads to finish
+    for thread in threads:
+        thread.join()
+
+    print("All threads have finished reading data.")
diff --git a/start.sh b/start.sh
new file mode 100644
index 0000000..5fbc556
--- /dev/null
+++ b/start.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+echo "Docker started."
+python forever.py &
+python /app/app/HTTPServer/main.py
\ No newline at end of file
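
Note (not part of the patch): a minimal sketch of how the HTTP endpoints added in app/HTTPServer/main.py could be exercised once the container is running, assuming the port mapping from app/HTTPServer/readme.md (host port 3000 -> container port 5000) and that the requests library is installed separately; the knob values below are illustrative placeholders, not recommended settings.

    import requests

    BASE = "http://localhost:3000"  # host port mapped to the Flask server's port 5000

    # Read the latest monitoring snapshot written by Simulation.loop()
    print(requests.get(BASE + "/monitor").json())

    # Inspect the monitor and adaptation-options schemas
    print(requests.get(BASE + "/monitor_schema").json())
    print(requests.get(BASE + "/adaptation_options_schema").json())

    # Trigger an adaptation; /execute expects all of these fields (placeholder values)
    knobs = {
        "routeRandomSigma": 0.2,
        "explorationPercentage": 0.0,
        "maxSpeedAndLengthFactor": 1,
        "averageEdgeDurationFactor": 1,
        "freshnessUpdateFactor": 10,
        "freshnessCutOffValue": 700,
        "reRouteEveryTicks": 70,
    }
    print(requests.put(BASE + "/execute", json=knobs).json())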