Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Configurable Redis Server + Add Multiple Redis Servers #147

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
fbcedff
Added configs
HayleyO Jun 5, 2024
99657aa
Added drone complex controller
HayleyO Jun 5, 2024
49899ef
Merge pull request #1 from HayleyO/iema_dev_test
HayleyO Jun 5, 2024
e5e4816
Removed KAST from pybullet_config.ini and removed the KAST usage from…
rachaelchertok1 Jun 14, 2024
3acd4be
Made changes to remove KAST and still have drone_state_machine_plugin…
rachaelchertok1 Jun 17, 2024
79f50d9
Fixed merge conflict
rachaelchertok1 Jun 17, 2024
06f6631
Fixed default so it works
HayleyO Jun 18, 2024
d8d1528
Confirmed you can add redis configurations to meta data file parsing …
HayleyO Jun 20, 2024
bbbb0eb
Add unit test
HayleyO Jun 20, 2024
d7e7d1a
Add password to unit tests
HayleyO Jun 20, 2024
0973694
Removed argument name
HayleyO Jun 20, 2024
35237b3
Updated config to have a neater dictionary of redis server configs + …
HayleyO Jun 21, 2024
d4b5fed
Updated unit test
HayleyO Jun 21, 2024
a5f4eaa
Modified Redis adapter to work with multiple servers
rachaelchertok1 Jun 26, 2024
bc28576
Adding multi-server experiment
rachaelchertok1 Jun 27, 2024
f03b346
Merge pull request #2 from HayleyO/Issue_134_Create_Multiple_Redis_Se…
rachaelchertok1 Jul 1, 2024
93b3533
Allow access to OnAir from different directories
HayleyO Jul 16, 2024
e6d8abb
Updates to redis_adapter unit test and to redis_adapter.py to have su…
rachaelchertok1 Jul 30, 2024
259dc53
All unit tests passing and 100% code coverage!
rachaelchertok1 Aug 2, 2024
95c3a47
Removed commented functions from test redis adapter code and removed …
rachaelchertok1 Aug 6, 2024
36bc76b
Merge branch 'main' of https://github.com/HayleyO/OnAir_SummerInterns…
HayleyO Aug 7, 2024
eb8f341
Merge branch 'main' of https://github.com/HayleyO/OnAIR
HayleyO Aug 7, 2024
9013c46
Removing things that seem problematic
HayleyO Aug 7, 2024
30dbdde
Remove one config that we don't need
HayleyO Aug 7, 2024
291e1b3
Merge pull request #3 from HayleyO/unit_testing_134_create_multiple_r…
rachaelchertok1 Aug 7, 2024
c268fb6
Added comments
HayleyO Aug 7, 2024
1eb3f45
Merge branch 'main' of https://github.com/HayleyO/OnAir_SummerInterns…
HayleyO Aug 7, 2024
e32538c
Removing pybullet things
HayleyO Aug 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions __init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import sys
import os

sys.path.append(os.path.abspath(os.path.dirname(__file__)))
24 changes: 19 additions & 5 deletions onair/data/telemetry_configs/redis_example_CONFIG.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,25 @@
}
}
},
"redis_subscriptions": [
"state_0",
"state_1",
"state_2"
],

"redis" : [
{
"address": "localhost",
"port": 6379,
"subscriptions": [
"state_0"
]
},
{
"address": "localhost",
"port": 6380,
"subscriptions": [
"state_1",
"state_2"
]
}
],

"order": [
"time",
"state_0.x",
Expand Down
92 changes: 64 additions & 28 deletions onair/data_handling/redis_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,60 +29,96 @@ class DataSource(OnAirDataSource):

def __init__(self, data_file, meta_file, ss_breakdown = False):
super().__init__(data_file, meta_file, ss_breakdown)
self.address = 'localhost'
self.port = 6379
self.db = 0
self.server = None
self.new_data_lock = threading.Lock()
self.new_data = False
self.servers = []
self.currentData = []
self.currentData.append({'headers':self.order,
'data':list('-' * len(self.order))})
self.currentData.append({'headers':self.order,
'data':list('-' * len(self.order))})
self.double_buffer_read_index = 0
self.connect()
self.subscribe(self.subscriptions)

def connect(self):
"""Establish connection to REDIS server."""
print_msg('Redis adapter connecting to server...')
self.server = redis.Redis(self.address, self.port, self.db)
for idx, server_config in enumerate(self.server_configs):
server_config_keys = server_config.keys()
if 'address' in server_config_keys:
address = server_config['address']
else:
address = 'localhost'

if self.server.ping():
print_msg('... connected!')
if 'port' in server_config_keys:
port = server_config['port']
else:
port = 6379

if 'db' in server_config_keys:
db = server_config['db']
else:
db = 0

def subscribe(self, subscriptions):
"""Subscribe to REDIS message channel(s) and launch listener thread."""
if len(subscriptions) != 0 and self.server.ping():
self.pubsub = self.server.pubsub()
if 'password' in server_config_keys:
password = server_config['password']
else:
password = ''

#if there are subscriptions in this Redis server configuration's subscription key
if len(server_config['subscriptions']) != 0:
#Create the servers and append them to self.servers list
self.servers.append(redis.Redis(address, port, db, password))

for s in subscriptions:
self.pubsub.subscribe(s)
print_msg(f"Subscribing to channel: {s}")
try:
#Ping server to make sure we can connect
self.servers[-1].ping()
print_msg(f'... connected to server # {idx}!')

listen_thread = threading.Thread(target=self.message_listener)
listen_thread.start()
else:
print_msg(f"No subscriptions given!")
#Set up Redis pubsub function for the current server
pubsub = self.servers[-1].pubsub()

for s in server_config['subscriptions']:
pubsub.subscribe(s)
print_msg(f"Subscribing to channel: {s} on server # {idx}")
listen_thread = threading.Thread(target=self.message_listener, args=(pubsub,))
listen_thread.start()

#This except will be hit if self.servers[-1].ping() threw an exception (could not properly ping server)
except:
print_msg(f'Did not connect to server # {idx}. Not setting up subscriptions.', 'RED')

else:
print_msg("No subscriptions given! Redis server not created")

def parse_meta_data_file(self, meta_data_file, ss_breakdown):
self.server_configs = []
configs = extract_meta_data_handle_ss_breakdown(
meta_data_file, ss_breakdown)
meta = parseJson(meta_data_file)
keys = meta.keys()

# Setup redis server configuration
#Checking if 'redis' exists
if 'redis' in keys:
count_server_config = 0
#Checking if dictionaries within 'redis' key each have a 'subscription' key. Error will be thrown if not.
for server_config in meta['redis']:
redis_config_keys = server_config.keys()
if ('subscriptions' in redis_config_keys) == False:
raise ConfigKeyError(f'Config file: \'{meta_data_file}\' ' \
f'missing required key \'subscriptions\' from {count_server_config} in key \'redis\'')
count_server_config +=1

#Saving all of Redis dictionaries from JSON file to self.server_configs
self.server_configs = meta['redis']

if 'order' in keys:
self.order = meta['order']
else:
raise ConfigKeyError(f'Config file: \'{meta_data_file}\' ' \
'missing required key \'order\'')

if 'redis_subscriptions' in meta.keys():
self.subscriptions = meta['redis_subscriptions']
else:
self.subscriptions = []

'missing required key \'order\'')

return configs

def process_data_file(self, data_file):
Expand Down Expand Up @@ -115,9 +151,9 @@ def has_more(self):
"""Live connection should always return True"""
return True

def message_listener(self):
def message_listener(self, pubsub):
"""Loop for listening for messages on channels"""
for message in self.pubsub.listen():
for message in pubsub.listen():
if message['type'] == 'message':
channel_name = f"{message['channel'].decode()}"
# Attempt to load message as json
Expand Down
70 changes: 70 additions & 0 deletions redis-experiment-publisher-multi-server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import redis
import time
import random

# Initialize the Redis connection for server #1
redis_host = "localhost"
redis_port = 6379
# When your Redis server requires a password, fill it in here
redis_password = ""
# Connect to Redis
r1 = redis.Redis(host=redis_host,
port=redis_port,
password=redis_password,
decode_responses=True)

# Initialize the Redis connection for server #2
redis_host = "localhost"
redis_port = 6380 # Make sure you start your redis server with a different port
# When your Redis server requires a password, fill it in here
redis_password = ""
# Connect to Redis
r2 = redis.Redis(host=redis_host,
port=redis_port,
password=redis_password,
decode_responses=True)

# List of channel names
server1_channels = ['state_0']
server2_channels = ['state_1', 'state_2']
# Publish messages on each channel in random order
def publish_messages():
loop_count = 0
inner_loop_count = 0
max_loops = 9
while loop_count < max_loops:
random.shuffle(server1_channels)
for channel in server1_channels:
r1.publish(channel, f'{{"time":{inner_loop_count}, ' \
f'"x":{inner_loop_count+0.1}, ' \
f'"y":{inner_loop_count+0.2}}}')

print(f"Published data to {channel}, " \
f"[{inner_loop_count}, " \
f"{inner_loop_count+0.1}, " \
f"{inner_loop_count+0.2}]")

inner_loop_count += 1
time.sleep(2)

random.shuffle(server2_channels)
for channel in server2_channels:
r2.publish(channel, f'{{"time":{inner_loop_count}, ' \
f'"x":{inner_loop_count+0.1}, ' \
f'"y":{inner_loop_count+0.2}}}')

print(f"Published data to {channel}, " \
f"[{inner_loop_count}, " \
f"{inner_loop_count+0.1}, " \
f"{inner_loop_count+0.2}]")

inner_loop_count += 1
time.sleep(2)

loop_count += 1
print(f"Completed {loop_count} loops")



if __name__ == "__main__":
publish_messages()
Loading
Loading