Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added discrete movement and utility to build mission from an XML file #180

Merged
merged 30 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
30fa214
added ModSettings class
EHAT32 Nov 18, 2024
7b17011
corrected xml method for ModSettings
EHAT32 Nov 18, 2024
0dc0073
added ModSettings into MissionXML
EHAT32 Nov 18, 2024
86fec19
excluded modSettings from MissionXML.xml()
EHAT32 Nov 19, 2024
f8b6952
added RewardForTouchingBlockType into AgentHandlers
EHAT32 Nov 20, 2024
4d5192d
added RewardForSendingCommand and included in AgentHandlers
EHAT32 Nov 20, 2024
5b8fc8d
renamed RewardBlock into Block, changed .xml() method
EHAT32 Nov 20, 2024
e5078e5
added AgentQuitFromTouchingBlockType and included into AgentHandlers
EHAT32 Nov 20, 2024
bcdb3ac
fixed typo
EHAT32 Nov 20, 2024
44c55e0
fixed typo
EHAT32 Nov 20, 2024
7adfa09
fixed AgentQuitFromTouchingBlockType class
EHAT32 Nov 20, 2024
efd8b4d
fixed .xml() in RewardForTouchingBlockType
EHAT32 Nov 20, 2024
ef3646d
added a convertion from an XML into a dictionary
EHAT32 Nov 20, 2024
fb11703
remove_namespace tweak
EHAT32 Nov 20, 2024
0616388
fixed default field creation for Pytthon 3.11+
EHAT32 Nov 20, 2024
6552322
creation of mission from an XML file
EHAT32 Nov 20, 2024
97adefe
fixed ModSettings
EHAT32 Dec 20, 2024
f370ec0
example for discrete movement on a platform
EHAT32 Dec 20, 2024
81c798e
added command for discrete movement
EHAT32 Dec 22, 2024
ea28dbf
test for discrete movement
EHAT32 Dec 22, 2024
96d768f
removed
EHAT32 Dec 23, 2024
b26d27d
removed using of dicts
EHAT32 Dec 23, 2024
106a360
Merge remote-tracking branch 'upstream/main'
EHAT32 Dec 23, 2024
07d9267
moved test_discrete_motion into test_motion_vereya
EHAT32 Dec 23, 2024
2459d66
reverted merging two tests
EHAT32 Dec 23, 2024
d9ae3a6
added test_discrete_motion into run_tests
EHAT32 Dec 23, 2024
a21ba74
removed excessive code
EHAT32 Dec 23, 2024
3f490e5
removed deprecated import
EHAT32 Dec 24, 2024
d208185
Merge branch 'trueagi-io:main' into main
EHAT32 Dec 25, 2024
658d85e
moved helper functions into corresponding classes
EHAT32 Jan 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 223 additions & 0 deletions tagilmo/VereyaPython/load_from_xml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
import xml.etree.ElementTree as ET
import tagilmo.utils.mission_builder as mb
from .xml_util import remove_namespaces

def _createAbout(aboutRoot = None):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can move these helper functions into corresponding classes, e.g. createAbout into About.from_xml, _createAgentSection into AgentSection.from_xml etc..

@CICS-Oleg what do you think?

Copy link
Contributor Author

@EHAT32 EHAT32 Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Also I added new argument into MissionXML constructor: path to the XML file, so that mission can be built at initialization

about = mb.About()
if aboutRoot is None or len(list(aboutRoot)) == 0:
return about
summary = aboutRoot.find("Summary")
if summary is not None:
about.summary = summary.text
return about


def _createModSettings(msRoot = None):
ms = mb.ModSettings()
if msRoot is None or len(list(msRoot)) == 0:
return ms

msTick = msRoot.find("MsPerTick")
if msTick is not None:
ms.ms_per_tick = msTick.text
return ms


def _createTime(timeRoot = None):
if timeRoot is None or len(list(timeRoot)) == 0:
return None, None

time_start = None
if timeRoot.find("StartTime") is not None:
time_start = timeRoot.find("StartTime").text

time_pass = None
if timeRoot.find("AllowPassageOfTime") is not None:
time_pass = timeRoot.find("AllowPassageOfTime").text

return time_start, time_pass


def _createServerInitialConditions(initConditionsRoot = None):
if initConditionsRoot is None or len(list(initConditionsRoot)) == 0:
return mb.ServerInitialConditions()

time_start, time_pass = _createTime(initConditionsRoot.find("Time"))

weather = None
if initConditionsRoot.find("Weather") is not None:
weather = initConditionsRoot.find("Weather").text

allow_spawning = "true"
if initConditionsRoot.find("AllowSpawning") is not None:
allow_spawning = initConditionsRoot.find("AllowSpawning").text

initial_conditions = mb.ServerInitialConditions(time_start_string=time_start,
time_pass_string=time_pass,
weather_string=weather,
spawning_string=allow_spawning)
return initial_conditions

def _createDrawingDecorator(drawingDecoratorRoot = None):
if drawingDecoratorRoot is None or len(list(drawingDecoratorRoot)) == 0:
return mb.DrawingDecorator()
decorators = []
drawing_elements = list(drawingDecoratorRoot)
for el in drawing_elements:
match el.tag:
case "DrawCuboid":
decorators.append(mb.DrawCuboid(el.attrib["x1"], el.attrib["y1"], el.attrib["z1"],
el.attrib["x2"], el.attrib["y2"], el.attrib["z2"],
el.attrib["type"]))
case "DrawBlock":
decorators.append(mb.DrawBlock(el.attrib["x"], el.attrib["y"], el.attrib["z"],
el.attrib["type"]))
case "DrawLine":
decorators.append(mb.DrawLine(el.attrib["x1"], el.attrib["y1"], el.attrib["z1"],
el.attrib["x2"], el.attrib["y2"], el.attrib["z2"],
el.attrib["type"]))
case "DrawItem":
decorators.append(mb.DrawItem(el.attrib["x"], el.attrib["y"], el.attrib["z"],
el.attrib["type"]))
case _:
continue
return mb.DrawingDecorator(decorators)


def _createServerHandlers(handlersRoot = None):
serverHandlers = mb.ServerHandlers()
if handlersRoot is None or len(list(handlersRoot)) == 0:
return serverHandlers

world_generator = mb.defaultworld()
flat_gen = handlersRoot.find("FlatWorldGenerator")
if flat_gen is not None:
world_generator = mb.flatworld(flat_gen.attrib["generatorString"])
serverHandlers.worldgenerator = world_generator

serverHandlers.drawingdecorator = _createDrawingDecorator(handlersRoot.find("DrawingDecorator"))

time_limit = None
if handlersRoot.find("ServerQuitFromTimep") is not None:
time_limit = handlersRoot.find("ServerQuitFromTimep").attrib["timeLimitMs"]
serverHandlers.timeLimitsMs = time_limit

return serverHandlers


def _createServerSection(serverSectionRoot = None):
init_conditions = _createServerInitialConditions(serverSectionRoot.find("ServerInitialConditions"))
handlers = _createServerHandlers(serverSectionRoot.find('ServerHandlers'))
return mb.ServerSection(handlers=handlers, initial_conditions=init_conditions)


def _createCommands(commandsRoot= None):
if commandsRoot is None:
return mb.Commands()
# TODO: parsing non-empty root
return mb.Commands()


def _createRewardForTouchingBlockType(rewardForTouchingBlockTypeRoot = None):
if rewardForTouchingBlockTypeRoot is None or len(list(rewardForTouchingBlockTypeRoot)) == 0:
return mb.RewardForTouchingBlockType()
rewards = []
reward_blocks = rewardForTouchingBlockTypeRoot.findall("Block")
for block in reward_blocks:
rewards.append(mb.Block(reward=block.attrib['reward'],
blockType=block.attrib['type'],
behaviour=block.attrib['behaviour']))
return mb.RewardForTouchingBlockType(rewards)


def _createAgentQuitFromTouchingBlockType(agentQuitFromTouchingBlockType = None):
if agentQuitFromTouchingBlockType is None or len(list(agentQuitFromTouchingBlockType)) == 0:
return mb.AgentQuitFromTouchingBlockType()
quit_blocks = []
blocks = agentQuitFromTouchingBlockType.findall("Block")
for block in blocks:
quit_blocks.append(mb.Block(blockType=block.attrib['type']))
return mb.AgentQuitFromTouchingBlockType(quit_blocks)


def _createAgentHandlers(agentHandlersRoot = None):
if agentHandlersRoot is None:
return mb.AgentHandlers()
obsFullStats = None
if agentHandlersRoot.find("ObservationFromFullStats") is not None:
obsFullStats = True
obs = mb.Observations(bFullStats=obsFullStats)
video_producer = mb.VideoProducer()
if agentHandlersRoot.find("VideoProducer") is not None:
producer = agentHandlersRoot.find("VideoProducer")
video_producer = mb.VideoProducer(height=int(producer.find("Height").text),
width=int(producer.find("Width").text),
want_depth=producer.attrib["want_depth"] == "true")
commands = _createCommands()

rewards_for_touching = None
if agentHandlersRoot.find("RewardForTouchingBlockType") is not None:
rewards_for_touching = _createRewardForTouchingBlockType(agentHandlersRoot.find("RewardForTouchingBlockType"))

reward_for_sending_command = None
if agentHandlersRoot.find("RewardForSendingCommand") is not None:
reward = agentHandlersRoot.find("RewardForSendingCommand").attrib['reward']
reward_for_sending_command = mb.RewardForSendingCommand(reward=reward)

agent_quit = None
if agentHandlersRoot.find('AgentQuitFromTouchingBlockType') is not None:
agent_quit = _createAgentQuitFromTouchingBlockType(agentHandlersRoot.find('AgentQuitFromTouchingBlockType'))

return mb.AgentHandlers(commands=commands, observations=obs, video_producer=video_producer,
rewardForTouchingBlockType=rewards_for_touching,
rewardForSendingCommand=reward_for_sending_command, agentQuitFromTouchingBlockType=agent_quit)


def _createAgentSection(agentSectionRoot = None):

if agentSectionRoot is None or len(agentSectionRoot) == 0:
return [mb.AgentSection()]

sections = []
for agent in agentSectionRoot:
agent_section = mb.AgentSection()
if agent.find("mode") is not None:
agent_section.mode = agent.attrib["mode"]

if agent.find("Name") is not None:
agent_section.name = agent.find("Name").text

if agent.find("AgentStart") is not None:
xyzp = None
if agent.find("AgentStart").find("Placement") is not None:
placement = agent.find("AgentStart").find("Placement")
xyzp = [float(v) for _,v in placement.items()][:4] #yaw is not in constructor
agent_start = mb.AgentStart(place_xyzp=xyzp)
agent_section.agentstart = agent_start

agent_section.agenthandlers = _createAgentHandlers(agent.find("AgentHandlers"))

sections.append(agent_section)

return sections


def _createMissionXML(missionRoot):
mission = mb.MissionXML()
if missionRoot is None or len(list(missionRoot)) == 0:
return mission

mission.about = _createAbout(missionRoot.find("About"))
mission.modSettings = _createModSettings(missionRoot.find("ModSettings"))
mission.serverSection = _createServerSection(missionRoot.find("ServerSection"))
mission.agentSections = _createAgentSection(missionRoot.findall("AgentSection"))

return mission


def load_mission(path):
tree = ET.parse(path)
root = tree.getroot()
remove_namespaces(root)
miss = _createMissionXML(root)
return miss
7 changes: 7 additions & 0 deletions tagilmo/VereyaPython/xml_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,10 @@ def str2xml(xml: str) -> Element:
_, _, el.tag = el.tag.rpartition('}') # strip ns
root = it.root
return root


def remove_namespaces(el):
if el.tag.startswith('{'):
_, _, el.tag = el.tag.rpartition('}')
for child in el:
remove_namespaces(child)
71 changes: 69 additions & 2 deletions tagilmo/utils/mission_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,20 @@ def xml(self):
return _xml


class ModSettings:

def __init__(self, ms_per_tick=None):
self.ms_per_tick = ms_per_tick

def xml(self):
_xml = '<ModSettings>\n'
if self.ms_per_tick:
_xml += "<MsPerTick>"+self.ms_per_tick+"</MsPerTick>"
else:
_xml += "<MsPerTick/>"
_xml += '\n</ModSettings>\n'
return _xml


class ServerInitialConditions:

Expand Down Expand Up @@ -405,20 +419,71 @@ def xml(self):
</ColourMapProducer>'.format(width=self.width, height=self.height)


class Block:
def __init__(self, reward=None, blockType=None, behaviour=None):
self.reward = reward
self.blockType = blockType
self.behaviour = behaviour

def xml(self):
_xml = "<Block"
_xml += '' if self.reward is None else f' reward="{self.reward}"'
_xml += '' if self.blockType is None else f' type="{self.blockType}"'
_xml += '' if self.behaviour is None else f' behaviour="{self.behaviour}"'
_xml += "/>\n"
return _xml

class RewardForTouchingBlockType:
def __init__(self, rewardBlocks = []):
self.rewardBlocks = rewardBlocks

def xml(self):
_xml = "<RewardForTouchingBlockType>\n"
for rewardBlock in self.rewardBlocks:
_xml += rewardBlock.xml()
_xml += "</RewardForTouchingBlockType>\n"
return _xml


class RewardForSendingCommand:
def __init__(self, reward):
self.reward = reward

def xml(self):
return f'<RewardForSendingCommand reward="{self.reward}"/>\n'


class AgentQuitFromTouchingBlockType:
def __init__(self, quitBlocks = []):
self.quitBlocks = quitBlocks

def xml(self):
_xml = "<AgentQuitFromTouchingBlockType>\n"
for quitBlock in self.quitBlocks:
_xml += quitBlock.xml()
_xml += "</AgentQuitFromTouchingBlockType>\n"
return _xml


class AgentHandlers:

def __init__(self, commands=Commands(), observations=Observations(),
all_str='', video_producer=None, colourmap_producer=None):
all_str='', video_producer=None, colourmap_producer=None, rewardForTouchingBlockType = None, rewardForSendingCommand=None, agentQuitFromTouchingBlockType = None):
self.commands = commands
self.observations = observations
self.all_str = all_str
self.video_producer = video_producer
self.colourmap_producer = colourmap_producer
self.rewardForTouchingBlockType = rewardForTouchingBlockType
self.rewardForSendingCommand = rewardForSendingCommand
self.agentQuitFromTouchingBlockType = agentQuitFromTouchingBlockType

def xml(self):
_xml = '<AgentHandlers>\n'
_xml += self.commands.xml()
_xml += '' if self.rewardForTouchingBlockType is None else self.rewardForTouchingBlockType.xml()
_xml += '' if self.rewardForSendingCommand is None else self.rewardForSendingCommand.xml()
_xml += '' if self.agentQuitFromTouchingBlockType is None else self.agentQuitFromTouchingBlockType.xml()
_xml += self.observations.xml()
_xml += self.all_str
_xml += '' if self.video_producer is None else self.video_producer.xml()
Expand Down Expand Up @@ -508,9 +573,10 @@ def xml(self):

class MissionXML:

def __init__(self, about=About(), serverSection=ServerSection(), agentSections=[AgentSection()], namespace=None):
def __init__(self, about=About(), modSettings = ModSettings(), serverSection=ServerSection(), agentSections=[AgentSection()], namespace=None):
self.namespace = namespace
self.about = about
self.modSettings = modSettings
self.serverSection = serverSection
self.agentSections = agentSections

Expand Down Expand Up @@ -561,6 +627,7 @@ def xml(self):
<Mission xmlns="http://{0}" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
'''.format(namespace)
_xml += self.about.xml()
_xml += self.modSettings.xml()
_xml += self.serverSection.xml()
for agentSection in self.agentSections:
_xml += agentSection.xml()
Expand Down
10 changes: 10 additions & 0 deletions tagilmo/utils/vereya_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,16 @@ def _sendMotionCommand(self, command, value, agentId=None):
def strafe(self, value, agentId=None):
return self._sendMotionCommand('strafe', value, agentId)

def discreteMove(self, value, agentId=None):
"""Moves the agent one block along one of the cardinal directions.

Args:
value (string): "west" | "east" | "north" | "south"

agentId (int, optional): id of an agent
"""
return self._sendMotionCommand('move'+value, 1, agentId)

def move(self, value, agentId=None):
return self._sendMotionCommand('move', value, agentId)

Expand Down
4 changes: 3 additions & 1 deletion tests/vereya/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ def main():
test_files = ['test_motion_vereya', 'test_craft',
'test_inventory', 'test_quit',
'test_observation', 'test_placement', 'test_image',
'test_consistency', 'test_motion_mob', 'test_mob', 'test_agent', 'test_flat_world', 'test_draw']
'test_consistency', 'test_motion_mob', 'test_mob',
'test_agent', 'test_flat_world', 'test_draw',
'test_discrete_motion']
res = run_tests(test_files)
if not res.wasSuccessful():
sys.exit(1)
Expand Down
Loading