Issue 133 and 134 merge branch #149

Closed
wants to merge 63 commits into from

Changes from all commits (63 commits)
d8d1528
Confirmed you can add redis configurations to meta data file parsing …
HayleyO Jun 20, 2024
bbbb0eb
Add unit test
HayleyO Jun 20, 2024
d7e7d1a
Add password to unit tests
HayleyO Jun 20, 2024
0973694
Removed argument name
HayleyO Jun 20, 2024
35237b3
Updated config to have a neater dictionary of redis server configs + …
HayleyO Jun 21, 2024
d4b5fed
Updated unit test
HayleyO Jun 21, 2024
a5f4eaa
Modified Redis adapter to work with multiple servers
rachaelchertok1 Jun 26, 2024
bc28576
Adding multi-server experiment
rachaelchertok1 Jun 27, 2024
f03b346
Merge pull request #2 from HayleyO/Issue_134_Create_Multiple_Redis_Se…
rachaelchertok1 Jul 1, 2024
93b3533
Allow access to OnAir from different directories
HayleyO Jul 16, 2024
e6d8abb
Updates to redis_adapter unit test and to redis_adapter.py to have su…
rachaelchertok1 Jul 30, 2024
259dc53
All unit tests passing and 100% code coverage!
rachaelchertok1 Aug 2, 2024
95c3a47
Removed commented functions from test redis adapter code and removed …
rachaelchertok1 Aug 6, 2024
dc58c1b
Allow access to OnAir from different directories
HayleyO Jul 16, 2024
1e82748
Updates to redis_adapter unit test and to redis_adapter.py to have su…
rachaelchertok1 Jul 30, 2024
1f18f50
All unit tests passing and 100% code coverage!
rachaelchertok1 Aug 2, 2024
d377e0a
Removed commented functions from test redis adapter code and removed …
rachaelchertok1 Aug 6, 2024
f146a9c
Merge branch 'main' of https://github.com/nasa/OnAIR into new_new_branch
HayleyO Aug 8, 2024
119ff9f
Merge branch 'unit_testing_134_create_multiple_redis_server_adapter' …
HayleyO Aug 8, 2024
2cad053
Updates to redis_adapter unit test and to redis_adapter.py to have su…
rachaelchertok1 Jul 30, 2024
25a1637
All unit tests passing and 100% code coverage!
rachaelchertok1 Aug 2, 2024
9b17eff
Removed commented functions from test redis adapter code and removed …
rachaelchertok1 Aug 6, 2024
73cc778
Update config file specification format
dragonejt Mar 23, 2024
0eeb92e
enable OPTIONS section to be optional
dragonejt Mar 23, 2024
c2350d0
fix: Remove dev_enabled and viz_enabled, use GitHub default python gi…
dragonejt Jun 21, 2024
22a51a9
example.ini was incorrect, changed to redis_example.ini
asgibson Jun 27, 2024
abf4149
fix: Update redis_example.ini and kalman_csv_output_example.ini to re…
dragonejt Jul 10, 2024
186de9c
fix: revert ExecutionEngine instantiation in tests to use __new__
dragonejt Jul 11, 2024
eba10a8
fix: re-add .gitignore to ignore macOS .DS_Store files
dragonejt Jul 11, 2024
d0f7d14
Add note about github username to contributors section.
the-other-james Jul 15, 2024
eeb9011
starting rework of SBN adapter
the-other-james Sep 22, 2023
4e0f10e
Moved to the correct locations
the-other-james Nov 27, 2023
a53510d
OnAIR is launched by cFS
the-other-james Jan 2, 2024
6829540
SBN adapter actually works
the-other-james Jan 8, 2024
0fa6703
Store the message ID lookup table in the telemetry metadata
the-other-james Feb 6, 2024
98f6c00
Deal with nested structs for cFS messages
the-other-james Feb 7, 2024
b6f0342
Remove prints
the-other-james Feb 7, 2024
46369fb
bug fix for nested message header fields
williamzhang0306 Jul 2, 2024
8c053b2
re working unit tests for sbn_adapter
williamzhang0306 Jul 2, 2024
a11909e
fixed str() overriding of mock object
williamzhang0306 Jul 2, 2024
e143678
slight change to sbn_adapter get_next to be similar to redis_adapter
williamzhang0306 Jul 2, 2024
208ee1c
parse_meta_data_file raises ConfigKeyError if missing 'channels' in json
williamzhang0306 Jul 3, 2024
1eb5fde
potential bugfix for getting fields of nested structs
williamzhang0306 Jul 3, 2024
f5ffa86
80% testing coverage
williamzhang0306 Jul 3, 2024
41c477a
connect() and message_listener_thread() tests
williamzhang0306 Jul 3, 2024
7855e72
additional test for get_current_data
williamzhang0306 Jul 5, 2024
c426c7f
Formatting, removed todo comments
williamzhang0306 Jul 10, 2024
c91cd7f
Removed time field and associated unittest from sbn adapter
williamzhang0306 Jul 10, 2024
9fecbb7
documetation for running with onair draft 1
williamzhang0306 Jul 12, 2024
a3557e7
Minor doc and formatting updates.
williamzhang0306 Jul 15, 2024
8d4109a
Add newline
the-other-james Jul 15, 2024
6e9acd2
Update README.md
asgibson Jul 17, 2024
cffdacb
Remove unused modules (#137)
AlexKurek Jul 29, 2024
a2b35df
Remove unused file (#144)
the-other-james Jul 30, 2024
ad09810
Switch back to csv parser (#143)
the-other-james Jul 30, 2024
a22b8e6
Updated for new config format (#140)
the-other-james Jul 30, 2024
313fe70
Allow access to OnAir from different directories
HayleyO Jul 16, 2024
914064f
Updates to redis_adapter unit test and to redis_adapter.py to have su…
rachaelchertok1 Jul 30, 2024
e16ef06
All unit tests passing and 100% code coverage!
rachaelchertok1 Aug 2, 2024
1cb273f
Removed commented functions from test redis adapter code and removed …
rachaelchertok1 Aug 6, 2024
96a8f83
Merge branch 'unit_testing_134_create_multiple_redis_server_adapter' …
HayleyO Aug 9, 2024
4e3b53b
Merge branch 'Issue_133_and_134_merge_branch' of https://github.com/H…
HayleyO Aug 9, 2024
46c8333
Fixed a weird git merge-caused error where code got repeated (my bad)
HayleyO Aug 9, 2024
4 changes: 4 additions & 0 deletions __init__.py
@@ -0,0 +1,4 @@
import sys
import os

sys.path.append(os.path.abspath(os.path.dirname(__file__)))
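A note on the new top-level __init__.py above: it puts the repository root on sys.path so the onair package resolves when OnAIR is used from another working directory. A minimal sketch of the effect, assuming a hypothetical clone location and that onair is importable as a regular package:

```python
import os
import sys

# Hypothetical checkout location; substitute wherever OnAIR actually lives.
onair_root = os.path.expanduser("~/repos/OnAIR")

# Equivalent effect to the new __init__.py: with the repository root on
# sys.path, the onair package can be imported from any working directory.
sys.path.append(onair_root)

from onair.data_handling.redis_adapter import DataSource
```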
24 changes: 19 additions & 5 deletions onair/data/telemetry_configs/redis_example_CONFIG.json
@@ -24,11 +24,25 @@
}
}
},
"redis_subscriptions": [
"state_0",
"state_1",
"state_2"
],

"redis" : [
{
"address": "localhost",
"port": 6379,
"subscriptions": [
"state_0"
]
},
{
"address": "localhost",
"port": 6380,
"subscriptions": [
"state_1",
"state_2"
]
}
],

"order": [
"time",
"state_0.x",
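For clarity on the hunk above: the flat redis_subscriptions list is replaced by a "redis" list in which each entry describes one server and its own channel subscriptions. A minimal sketch of reading that section with the standard json module; the defaults mirror the fallbacks connect() applies in the adapter diff below:

```python
import json

# Path of the example config changed in this PR.
config_path = "onair/data/telemetry_configs/redis_example_CONFIG.json"

with open(config_path) as f:
    meta = json.load(f)

# Each entry configures one Redis server; address, port, db, and password
# are optional, but "subscriptions" is required by parse_meta_data_file.
for idx, server in enumerate(meta["redis"]):
    print(idx,
          server.get("address", "localhost"),
          server.get("port", 6379),
          server["subscriptions"])
```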
94 changes: 65 additions & 29 deletions onair/data_handling/redis_adapter.py
@@ -29,60 +29,96 @@ class DataSource(OnAirDataSource):

def __init__(self, data_file, meta_file, ss_breakdown = False):
super().__init__(data_file, meta_file, ss_breakdown)
self.address = 'localhost'
self.port = 6379
self.db = 0
self.server = None
self.new_data_lock = threading.Lock()
self.new_data = False
self.servers = []
self.currentData = []
self.currentData.append({'headers':self.order,
'data':list('-' * len(self.order))})
self.currentData.append({'headers':self.order,
'data':list('-' * len(self.order))})
self.double_buffer_read_index = 0
self.connect()
self.subscribe(self.subscriptions)

def connect(self):
"""Establish connection to REDIS server."""
print_msg('Redis adapter connecting to server...')
self.server = redis.Redis(self.address, self.port, self.db)
for idx, server_config in enumerate(self.server_configs):
server_config_keys = server_config.keys()
if 'address' in server_config_keys:
address = server_config['address']
else:
address = 'localhost'

if self.server.ping():
print_msg('... connected!')
if 'port' in server_config_keys:
port = server_config['port']
else:
port = 6379

if 'db' in server_config_keys:
db = server_config['db']
else:
db = 0

def subscribe(self, subscriptions):
"""Subscribe to REDIS message channel(s) and launch listener thread."""
if len(subscriptions) != 0 and self.server.ping():
self.pubsub = self.server.pubsub()
if 'password' in server_config_keys:
password = server_config['password']
else:
password = ''

#if there are subscriptions in this Redis server configuration's subscription key
if len(server_config['subscriptions']) != 0:
#Create the servers and append them to self.servers list
self.servers.append(redis.Redis(address, port, db, password))

for s in subscriptions:
self.pubsub.subscribe(s)
print_msg(f"Subscribing to channel: {s}")
try:
#Ping server to make sure we can connect
self.servers[-1].ping()
print_msg(f'... connected to server # {idx}!')

listen_thread = threading.Thread(target=self.message_listener)
listen_thread.start()
else:
print_msg(f"No subscriptions given!")
#Set up Redis pubsub function for the current server
pubsub = self.servers[-1].pubsub()

for s in server_config['subscriptions']:
pubsub.subscribe(s)
print_msg(f"Subscribing to channel: {s} on server # {idx}")
listen_thread = threading.Thread(target=self.message_listener, args=(pubsub,))
listen_thread.start()

#This except will be hit if self.servers[-1].ping() threw an exception (could not properly ping server)
except:
print_msg(f'Did not connect to server # {idx}. Not setting up subscriptions.', 'RED')

else:
print_msg("No subscriptions given! Redis server not created")

def parse_meta_data_file(self, meta_data_file, ss_breakdown):
self.server_configs = []
configs = extract_meta_data_handle_ss_breakdown(
meta_data_file, ss_breakdown)
meta = parseJson(meta_data_file)
keys = meta.keys()

# Setup redis server configuration
#Checking if 'redis' exists
if 'redis' in keys:
count_server_config = 0
#Checking if dictionaries within 'redis' key each have a 'subscription' key. Error will be thrown if not.
for server_config in meta['redis']:
redis_config_keys = server_config.keys()
if ('subscriptions' in redis_config_keys) == False:
raise ConfigKeyError(f'Config file: \'{meta_data_file}\' ' \
f'missing required key \'subscriptions\' from {count_server_config} in key \'redis\'')
count_server_config +=1

#Saving all of Redis dictionaries from JSON file to self.server_configs
self.server_configs = meta['redis']

if 'order' in keys:
self.order = meta['order']
else:
raise ConfigKeyError(f'Config file: \'{meta_data_file}\' ' \
'missing required key \'order\'')

if 'redis_subscriptions' in meta.keys():
self.subscriptions = meta['redis_subscriptions']
else:
self.subscriptions = []

'missing required key \'order\'')

return configs

def process_data_file(self, data_file):
Expand Down Expand Up @@ -115,9 +151,9 @@ def has_more(self):
"""Live connection should always return True"""
return True

def message_listener(self):
def message_listener(self, pubsub):
"""Loop for listening for messages on channels"""
for message in self.pubsub.listen():
for message in pubsub.listen():
if message['type'] == 'message':
channel_name = f"{message['channel'].decode()}"
# Attempt to load message as json
Expand Down Expand Up @@ -173,4 +209,4 @@ def message_listener(self):
print_msg("Redis subscription listener exited.", ['WARNING'])

def has_data(self):
return self.new_data
return self.new_data
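The reworked connect() resolves address, port, db, and password per server with explicit key checks before creating each client. A compact, behavior-equivalent sketch of that defaulting, shown only as an illustration (the helper name is hypothetical, not part of this PR):

```python
import redis

def build_redis_client(server_config):
    """Create a redis.Redis client from one entry of the 'redis' config list,
    using the same fallbacks connect() applies when keys are missing."""
    address = server_config.get("address", "localhost")
    port = server_config.get("port", 6379)
    db = server_config.get("db", 0)
    password = server_config.get("password", "")
    # Same positional call the adapter makes: host, port, db, password.
    return redis.Redis(address, port, db, password)
```

Using dict.get() keeps the defaults in one place; the adapter's explicit `if key in server_config_keys` checks are equivalent in behavior.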
70 changes: 70 additions & 0 deletions redis-experiment-publisher-multi-server.py
@@ -0,0 +1,70 @@
import redis
import time
import random

# Initialize the Redis connection for server #1
redis_host = "localhost"
redis_port = 6379
# When your Redis server requires a password, fill it in here
redis_password = ""
# Connect to Redis
r1 = redis.Redis(host=redis_host,
port=redis_port,
password=redis_password,
decode_responses=True)

# Initialize the Redis connection for server #2
redis_host = "localhost"
redis_port = 6380
# When your Redis server requires a password, fill it in here
redis_password = ""
# Connect to Redis
r2 = redis.Redis(host=redis_host,
port=redis_port,
password=redis_password,
decode_responses=True)

# List of channel names
server1_channels = ['state_0']
server2_channels = ['state_1', 'state_2']
# Publish messages on each channel in random order
def publish_messages():
loop_count = 0
inner_loop_count = 0
max_loops = 9
while loop_count < max_loops:
random.shuffle(server1_channels)
for channel in server1_channels:
r1.publish(channel, f'{{"time":{inner_loop_count}, ' \
f'"x":{inner_loop_count+0.1}, ' \
f'"y":{inner_loop_count+0.2}}}')

print(f"Published data to {channel}, " \
f"[{inner_loop_count}, " \
f"{inner_loop_count+0.1}, " \
f"{inner_loop_count+0.2}]")

inner_loop_count += 1
time.sleep(2)

random.shuffle(server2_channels)
for channel in server2_channels:
r2.publish(channel, f'{{"time":{inner_loop_count}, ' \
f'"x":{inner_loop_count+0.1}, ' \
f'"y":{inner_loop_count+0.2}}}')

print(f"Published data to {channel}, " \
f"[{inner_loop_count}, " \
f"{inner_loop_count+0.1}, " \
f"{inner_loop_count+0.2}]")

inner_loop_count += 1
time.sleep(2)

loop_count += 1
print(f"Completed {loop_count} loops")



if __name__ == "__main__":
publish_messages()
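To sanity-check the publisher above, a hedged subscriber sketch using the same redis-py pub/sub API; it assumes the same two local Redis instances (ports 6379 and 6380) and the channel names used by the script:

```python
import redis

# Same two local servers the publisher targets.
r1 = redis.Redis(host="localhost", port=6379, decode_responses=True)
r2 = redis.Redis(host="localhost", port=6380, decode_responses=True)

p1 = r1.pubsub()
p1.subscribe("state_0")
p2 = r2.pubsub()
p2.subscribe("state_1", "state_2")

# Pull a few messages from each server, skipping subscribe confirmations.
for pubsub in (p1, p2):
    for _ in range(4):
        message = pubsub.get_message(timeout=5)
        if message and message["type"] == "message":
            print(message["channel"], message["data"])
```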