-
Notifications
You must be signed in to change notification settings - Fork 0
/
planner.py
150 lines (116 loc) · 4.74 KB
/
planner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import util
from state import State
from node import Node
class Frontier(object):
'''
Frontier class implement a search frontier using a
PriorityQueue for ordering the nodes and a set for
constant-time checks of states in frontier.
OBSERVATION: it receives as input a function `f` that
itself receives a node and returns the priority for
the given node. Check util.PriorityQueueWithFunction for
more details.
'''
def __init__(self, f):
self._queue = util.PriorityQueueWithFunction(f)
self._set = set()
def __contains__(self, node):
''' Return true if `node.state` is in the frontier. '''
return node.state in self._set
def __len__(self):
''' Return the number of nodes in frontier. '''
assert(len(self._queue) == len(self._set))
return len(self._queue)
def is_empty(self):
''' Return true if frontier is empty. '''
return self._queue.isEmpty()
def push(self, node):
''' Push `node` to frontier. '''
self._queue.push(node)
self._set.add(node.state)
def update(self, node, priority):
self._queue.update(node, priority)
if (node.state not in self._set):
self._set.add(node.state)
def pop(self):
''' Pop `node` from frontier. '''
node = self._queue.pop()
self._set.remove(node.state)
return node
def __str__(self):
''' Return string representation of frontier. '''
return str(self._queue)
class ProgressionPlanning(object):
'''
ProgressionPlanning class implements all necessary components
for implicitly generating the state space in a forward way (i.e., by progression).self
'''
def __init__(self, domain, problem):
self._problem = problem
self._all_actions = problem.ground_all_actions(domain)
@property
def problem(self):
return self._problem
@property
def actions(self):
return self._all_actions
def applicable(self, state):
''' Return a list of applicable actions in a given `state`. '''
applicable = list()
for action in self.actions:
if State(state).intersect(action.precond) == action.precond:
applicable.append(action)
return applicable
def successor(self, state, action):
''' Return the sucessor state generated by executing `action` in `state`. '''
return State(action.pos_effect).union(State(state).difference(action.neg_effect))
def goal_test(self, state):
''' Return true if `state` is a goal state. '''
return State(state).intersect(self.problem.goal) == State(self.problem.goal)
def solve(self, W, heuristics):
'''
Implement best-first graph-search WA*. It receives `W` the weight of WA*
and `heuristics` a function that receives a state s and the planning object (ie., self)
and returns h(s). Check heuristics.py for more information.
If problem has solution, return a triple (plan, num_explored, num_generated) where:
- `plan` is a sequence of actions;
- `num_explored` is the number of states explored; and
- `num_generated` is the nubmer of states generated.
Otherwise, it should return None.
OBSERVATION: a state is 'explored' when it is removed from the frontier and
a state is 'generated' when it is the successor state generated by an action
regardless whether or not it is in the explored set or already in the frontier.
'''
plan = []
num_explored = 0
num_generated = 0
explored = list()
init_state = self.problem.init
goal = self.problem.goal
frontier = Frontier(node_cost)
frontier.push(Node(State().union(init_state)))
while ( not frontier.is_empty() ):
node = frontier.pop()
state = node.state
explored.append(state)
num_explored += 1
if (self.goal_test(state)):
plan = node.path()
break
else:
actions_App = self.applicable(state)
for action in actions_App:
successsor = self.successor(state, action)
new_Node = Node(successsor,
action,
node,
node.g + 1,
W * heuristics(successsor, self, goal))
if (new_Node not in frontier) and (successsor not in explored):
frontier.push(new_Node)
num_generated += 1
if (plan == []):
return (None, 0, 0)
return (plan, num_explored, num_generated)
def node_cost(node):
return node.g + node.h