Skip to content

Commit

Permalink
Remove bitbots deps, cleanup and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Flova committed Jan 16, 2024
1 parent 08f9143 commit b6547f7
Show file tree
Hide file tree
Showing 10 changed files with 536 additions and 167 deletions.
16 changes: 8 additions & 8 deletions game_controller_humanoid/game_controller_humanoid/gamestate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

Short = Int16ul

RobotInfo = "robot_info" / Struct(
RobotInfoStruct = "robot_info" / Struct(
# define NONE 0
# define PENALTY_HL_KID_BALL_MANIPULATION 1
# define PENALTY_HL_KID_PHYSICAL_CONTACT 2
Expand All @@ -23,7 +23,7 @@
"goalkeeper" / Flag
)

TeamInfo = "team" / Struct(
TeamInfoStruct = "team" / Struct(
"team_number" / Byte,
"team_color" / Enum(Byte,
BLUE=0,
Expand All @@ -42,11 +42,11 @@
"single_shots" / Short, # bits represent penalty shot success
"coach_sequence" / Byte,
"coach_message" / PaddedString(253, 'utf8'),
"coach" / RobotInfo,
"players" / Array(11, RobotInfo)
"coach" / RobotInfoStruct,
"players" / Array(11, RobotInfoStruct)
)

GameState = "gamedata" / Struct(
GameStateStruct = "gamedata" / Struct(
"header" / Const(b'RGme'),
"version" / Const(12, Short),
"packet_number" / Byte,
Expand Down Expand Up @@ -84,14 +84,14 @@
"drop_in_time" / Short,
"seconds_remaining" / Int16sl,
"secondary_seconds_remaining" / Int16sl,
"teams" / Array(2, "team" / TeamInfo)
"teams" / Array(2, "team" / TeamInfoStruct)
)

GAME_CONTROLLER_RESPONSE_VERSION = 2

ReturnData = Struct(
ResponseStruct = Struct(
"header" / Const(b"RGrt"),
"version" / Const(2, Byte),
"version" / Const(GAME_CONTROLLER_RESPONSE_VERSION, Byte),
"team" / Byte,
"player" / Byte,
"message" / Byte
Expand Down
297 changes: 147 additions & 150 deletions game_controller_humanoid/game_controller_humanoid/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import socket
import time
from typing import Any
import rclpy

from construct import Container, ConstError
from rclpy import logging
from construct import ConstError
from rclpy.node import Node
from std_msgs.msg import Bool
from game_controller_humanoid.gamestate import GameState, ReturnData, GAME_CONTROLLER_RESPONSE_VERSION
from bitbots_msgs.msg import GameState as GameStateMsg
from bitbots_utils.utils import get_parameters_from_other_node
from rclpy.time import Time
from rclpy.duration import Duration
from std_msgs.msg import Header
from construct import Container
from game_controller_humanoid.gamestate import GameStateStruct, ResponseStruct
from game_controller_humanoid.utils import get_parameters_from_other_node

logger = logging.get_logger('game_controller_humanoid')
from game_controller_humanoid_interfaces.msg import GameState


class GameStateReceiver(Node):
Expand All @@ -37,178 +37,175 @@ class GameStateReceiver(Node):
After this we send a package back to the GC """

def __init__(self):
super().__init__('game_controller', automatically_declare_parameters_from_overrides=True)

params = get_parameters_from_other_node(self, "parameter_blackboard", ['team_id', 'bot_id'])
self.team_number = params['team_id']
self.player_number = params['bot_id']
logger.info('We are playing as player {} in team {}'.format(self.player_number, self.team_number))

self.state_publisher = self.create_publisher(GameStateMsg, 'gamestate', 1)

self.man_penalize = False
def __init__(self, *args, **kwargs):
super().__init__('game_controller', *args, **kwargs, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True)

# Check if we have the team and bot id parameters or if we should get them from the blackboard
if self.has_parameter('team_id') and self.has_parameter('bot_id'):
self.get_logger().info('Found team_id and bot_id parameter, using them')
# Get the parameters from our node
self.team_number = self.get_parameter('team_id').value
self.player_number = self.get_parameter('bot_id').value
else:
self.get_logger().info('No team_id and bot_id parameter set in game_controller, getting them from blackboard')
# Get the parameter names from the parameter server
param_blackboard_name: str = self.get_parameter('parameter_blackboard_name').value
team_id_param_name: str = self.get_parameter('team_id_param_name').value
bot_id_param_name: str = self.get_parameter('bot_id_param_name').value
# Get the parameters from the blackboard
params = get_parameters_from_other_node(self, param_blackboard_name, [
team_id_param_name,
bot_id_param_name])
# Set the parameters
self.team_number = params[team_id_param_name]
self.player_number = params[bot_id_param_name]

self.get_logger().info(f'We are playing as player {self.player_number} in team {self.team_number}')

# The publisher for the game state
self.state_publisher = self.create_publisher(GameState, 'gamestate', 1)

# The time in seconds after which we assume the game controller is lost
# and we tell the robot to move
self.game_controller_lost_time = 20
self.game_controller_connected_publisher = self.create_publisher(Bool, 'game_controller_connected', 1)

# The address listening on and the port for sending back the robots meta data
listen_host = self.get_parameter('listen_host').value
listen_port = self.get_parameter('listen_port').value
self.addr = (listen_host, listen_port)
self.addr = (
self.get_parameter('listen_host').value,
self.get_parameter('listen_port').value
)
self.answer_port = self.get_parameter('answer_port').value

# The state and time we received last form the GC
self.state = None
self.time = time.time()
# The time of the last package
self.last_package_time: Time = self.get_clock().now()

# The socket and whether it is still running
self.socket = None
self.running = True
# Create the socket we want to use for the communications
self.socket = self._open_socket()

self._open_socket()

def _open_socket(self):
def _open_socket(self) -> socket.socket:
""" Creates the socket """
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(self.addr)
self.socket.settimeout(2)
self.socket2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
self.socket2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

def receive_forever(self):
""" Waits in a loop that is terminated by setting self.running = False """
while True:
try:
self.receive_once()
except IOError as e:
logger.warn("Error while sending keepalive: " + str(e))

def receive_once(self):
""" Receives a package and interprets it.
Calls :func:`on_new_gamestate`
Sends an answer to the GC """
""" Waits in a loop for new packages """
while rclpy.ok():
# Try to receive a package
self.receive_and_answer_once()
# Check if we didn't receive a package for a long time and if so
# call the fallback behavior
if self.get_time_since_last_package() > Duration(seconds=self.game_controller_lost_time):
self.fallback_behavior()

def receive_and_answer_once(self):
""" Receives a package, interprets it and sends an answer. """
try:
data, peer = self.socket.recvfrom(GameState.sizeof())
# Receive the package
data, peer = self.socket.recvfrom(GameStateStruct.sizeof())

# Throws a ConstError if it doesn't work
parsed_state = GameState.parse(data)
# Parse the package based on the GameStateStruct
# This throws a ConstError if it doesn't work
parsed_state = GameStateStruct.parse(data)

# Assign the new package after it parsed successful to the state
self.state = parsed_state
self.time = time.time()

# Publish that game controller received message
msg = Bool()
msg.data = True
self.game_controller_connected_publisher.publish(msg)
self.last_package_time = self.get_clock().now()

# Call the handler for the package
self.on_new_gamestate(self.state)
# Build the game state message and publish it
self.state_publisher.publish(self.build_game_state_msg(parsed_state))

# Answer the GameController
self.answer_to_gamecontroller(peer)

except AssertionError as ae:
logger.error(ae)
self.get_logger().error(ae)
except socket.timeout:
logger.info("No GameController message received (socket timeout)", throttle_duration_sec=5)
self.get_logger().info("No GameController message received (socket timeout)", throttle_duration_sec=5)
except ConstError:
logger.warn("Parse Error: Probably using an old protocol!")
finally:
if self.get_time_since_last_package() > self.game_controller_lost_time:
self.time += 5 # Resend message every five seconds
logger.info("No GameController message received, allowing robot to move", throttle_duration_sec=5)
msg = GameStateMsg()
msg.game_state = 3 # PLAYING
self.state_publisher.publish(msg)
msg2 = Bool()
msg2.data = False
self.game_controller_connected_publisher.publish(msg2)
self.get_logger().warn("Parse Error: Probably using an old protocol!")
except IOError as e:
self.get_logger().warn(f"Error while sending keep-alive: {str(e)}")

def fallback_behavior(self):
"""
This is called if we didn't receive a package for a long time.
It tells the robot to play anyway.
"""
self.get_logger().info("No GameController message received, allowing robot to move anyway", throttle_duration_sec=5)
self.state_publisher.publish(
GameState(
header=Header(stamp=self.get_clock().now().to_msg()),
game_state = GameState.GAMESTATE_PLAYING
)
)

def answer_to_gamecontroller(self, peer):
""" Sends a life sign to the game controller """
return_message = 0 if self.man_penalize else 2

data = Container(header=b"RGrt",
version=GAME_CONTROLLER_RESPONSE_VERSION,
team=self.team_number,
player=self.player_number,
message=return_message)
# Build the answer package
data = ResponseStruct.build(dict(
team=self.team_number,
player=self.player_number,
message=2))
# Send the package
self.get_logger().debug(f'Sending answer to {peer[0]}:{self.answer_port}')
try:
destination = peer[0], self.answer_port
logger.debug('Sending answer to {} port {}'.format(destination[0], destination[1]))
self.socket.sendto(ReturnData.build(data), destination)
self.socket.sendto(data, (peer[0], self.answer_port))
except Exception as e:
logger.error("Network Error: %s" % str(e))

def on_new_gamestate(self, state):
""" Is called with the new game state after receiving a package.
The information is processed and published as a standard message to a ROS topic.
:param state: Game State
"""

is_own_team = lambda number: number == self.team_number
own_team = self.select_team_by(is_own_team, state.teams)

is_not_own_team = lambda number: number != self.team_number
rival_team = self.select_team_by(is_not_own_team, state.teams)

if not own_team or not rival_team:
logger.error('Team {} not playing, only {} and {}'.format(self.team_number, state.teams[0].team_number,
state.teams[1].team_number))
return

try:
me = own_team.players[self.player_number - 1]
except IndexError:
logger.error('Robot {} not playing'.format(self.player_number))
return

msg = GameStateMsg()
msg.header.stamp = self.get_clock().now().to_msg()
msg.game_state = state.game_state.intvalue
msg.secondary_state = state.secondary_state.intvalue
msg.secondary_state_mode = state.secondary_state_info[1]
msg.first_half = state.first_half
msg.own_score = own_team.score
msg.rival_score = rival_team.score
msg.seconds_remaining = state.seconds_remaining
msg.secondary_seconds_remaining = state.secondary_seconds_remaining
msg.has_kick_off = state.kick_of_team == self.team_number
msg.penalized = me.penalty != 0
msg.seconds_till_unpenalized = me.secs_till_unpenalized
msg.secondary_state_team = state.secondary_state_info[0]
msg.secondary_state_mode = state.secondary_state_info[1]
msg.team_color = own_team.team_color.intvalue
msg.drop_in_team = state.drop_in_team
msg.drop_in_time = state.drop_in_time
msg.penalty_shot = own_team.penalty_shot
msg.single_shots = own_team.single_shots
msg.coach_message = own_team.coach_message
penalties = []
red_cards = []
for i in range(6):
penalties.append(own_team.players[i].penalty != 0)
red_cards.append(own_team.players[i].number_of_red_cards != 0)
msg.team_mates_with_penalty = penalties
msg.team_mates_with_red_card = red_cards
self.state_publisher.publish(msg)

def get_last_state(self):
return self.state, self.time

def get_time_since_last_package(self):
return time.time() - self.time

def stop(self):
self.running = False

def set_manual_penalty(self, flag):
self.man_penalize = flag

def select_team_by(self, predicate, teams):
selected = [team for team in teams if predicate(team.team_number)]
self.get_logger().error(f"Network Error: {str(e)}")

def build_game_state_msg(self, state) -> GameState:
""" Builds a GameState message from the game state """

# Get the team objects sorted into own and rival team
own_team = GameStateReceiver.select_team_by(
lambda team: team.team_number == self.team_number,
state.teams)
rival_team = GameStateReceiver.select_team_by(
lambda team: team.team_number != self.team_number,
state.teams)

# Add some assertions to make sure everything is fine
assert not (own_team is None or rival_team is None), \
f'Team {self.team_number} not playing, only {state.teams[0].team_number} and {state.teams[1].team_number}'

assert self.player_number <= len(own_team.players), \
f'Robot {self.player_number} not playing'

this_robot = own_team.players[self.player_number - 1]

return GameState(
header = Header(stamp=self.get_clock().now().to_msg()),
game_state = state.game_state.intvalue,
secondary_state = state.secondary_state.intvalue,
secondary_state_mode = state.secondary_state_info[1],
first_half = state.first_half,
own_score = own_team.score,
rival_score = rival_team.score,
seconds_remaining = state.seconds_remaining,
secondary_seconds_remaining = state.secondary_seconds_remaining,
has_kick_off = state.kick_of_team == self.team_number,
penalized = this_robot.penalty != 0,
seconds_till_unpenalized = this_robot.secs_till_unpenalized,
secondary_state_team = state.secondary_state_info[0],
team_color = own_team.team_color.intvalue,
drop_in_team = state.drop_in_team,
drop_in_time = state.drop_in_time,
penalty_shot = own_team.penalty_shot,
single_shots = own_team.single_shots,
coach_message = own_team.coach_message,
team_mates_with_penalty = [player.penalty != 0 for player in own_team.players],
team_mates_with_red_card = [player.number_of_red_cards != 0 for player in own_team.players],
)

def get_time_since_last_package(self) -> Duration:
""" Returns the time in seconds since the last package was received """
return self.get_clock().now() - self.last_package_time

@staticmethod
def select_team_by(predicate: callable, teams: list[Container]) -> Container:
""" Selects the team based on the predicate """
selected = [team for team in teams if predicate(team)]
return next(iter(selected), None)


Expand Down
Loading

0 comments on commit b6547f7

Please sign in to comment.