sm_base_executor.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import rospy
4 from strands_executive_msgs.msg import Task, TaskEvent
5 from strands_executive_msgs.srv import *
6 import mongodb_store.util as dc_util
7 import actionlib
8 from actionlib_msgs.msg import GoalStatus
9 from geometry_msgs.msg import Pose, Point, Quaternion
10 from mongodb_store.message_store import MessageStoreProxy
11 from topological_navigation.msg import GotoNodeAction, GotoNodeGoal
12 # from strands_executive_msgs.msg import ExecutePolicyAction, ExecutePolicyGoal
13 from std_srvs.srv import Empty, EmptyResponse
14 from std_msgs.msg import String
15 from task_executor.base_executor import BaseTaskExecutor
16 from smach_ros import SimpleActionState, IntrospectionServer
17 import smach
18 import threading
19 
20 class ExecutorState(smach.State):
21  def __init__(self, task_executor, outcomes):
22  super(ExecutorState , self ).__init__(outcomes=outcomes, input_keys=['task'])
23  self.executor = task_executor
24 
25 class TimerState(smach.State):
26  def __init__(self, duration, pause_secs=1, can_finish=None):
27  super(TimerState , self ).__init__(outcomes=['succeeded', 'preempted', 'aborted'])
28  self.duration = duration
29  self.pause = rospy.Duration(pause_secs)
30  if can_finish is None:
31  self.can_finish = lambda: True
32  else:
33  self.can_finish = can_finish
34 
35  def execute(self, userdata):
36  now = rospy.get_rostime()
37  target = now + self.duration
38  while now < target and not self.preempt_requested() and not rospy.is_shutdown():
39  rospy.sleep(self.pause)
40  now = rospy.get_rostime()
41 
42  # now block if can_finish is not true
43  while not self.can_finish() and not self.preempt_requested() and not rospy.is_shutdown():
44  rospy.sleep(self.pause)
45  rospy.loginfo('Waiting for signal that I can finish after timeout')
46 
47  if rospy.is_shutdown():
48  return 'aborted'
49  elif self.preempt_requested():
50  return 'preempted'
51  else:
52  return 'succeeded'
53 
54 
55 
56 
58  def __init__(self, task_executor):
59  ExecutorState.__init__(self, task_executor, outcomes=['aborted'])
60 
61  def execute(self, userdata):
62  rospy.logwarn('Task failed')
63  completed = userdata.task
64  self.executor.active_task = None
65  self.executor.log_task_event(userdata.task, TaskEvent.TASK_FAILED, rospy.get_rostime())
66  self.executor.task_failed(completed)
67  rospy.loginfo('Execution of task %s failed' % userdata.task.task_id)
68  return 'aborted'
69 
71  def __init__(self, task_executor):
72  ExecutorState.__init__(self, task_executor, outcomes=['succeeded'])
73 
74  def execute(self, userdata):
75  now = rospy.get_rostime()
76  # do bookkeeping before causing update
77  completed = userdata.task
78  self.executor.active_task = None
79  self.executor.log_task_event(userdata.task, TaskEvent.TASK_SUCCEEDED, rospy.get_rostime())
80 
81  self.executor.task_succeeded(completed)
82  rospy.loginfo('Execution of task %s succeeded' % userdata.task.task_id)
83  return 'succeeded'
84 
85 
87  def __init__(self, task_executor):
88  ExecutorState.__init__(self, task_executor, outcomes=['preempted'])
89 
90  def execute(self, userdata):
91  rospy.loginfo('Execution of task %s was cancelled' % userdata.task.task_id)
92  self.executor.log_task_event(userdata.task, TaskEvent.TASK_PREEMPTED, rospy.get_rostime())
93  # it could be that a delayed cancellation signal causes this to get called out of turn, so check that this is the correct cancellation
94  if self.executor.active_task is not None and self.executor.active_task.task_id == userdata.task.task_id:
95  completed = userdata.task
96  self.executor.active_task = None
97  self.executor.task_failed(completed)
98  return 'preempted'
99 
100 
102  def __init__(self, task_executor):
103  ExecutorState.__init__(self, task_executor, outcomes=['succeeded'])
104 
105 
106  def execute(self, userdata):
107  self.executor.log_task_event(userdata.task, TaskEvent.TASK_STARTED, rospy.get_rostime())
108  self.executor.prepare_task(userdata.task)
109  return 'succeeded'
110 
112  """ Provides a base executor using Smach """
113  # These can be implemented by sub classes to provide hooks into the execution system
114 
115 
116  def __init__(self):
117  super( AbstractTaskExecutor, self ).__init__()
118  self.task_sm = None
119  self.smach_thread = None
120  self.sis = None
121  self.join_lock = threading.Lock()
122 
123 
124  def reset_sm(self):
125 
126  # If introspection is running, stop it
127  if self.sis is not None:
128  self.sis.stop()
129 
130  # If the smach_tread is running then we may have started executing prior to the sm winding up fully, or there is something blocking there
131  timeout_secs = 15
132  if not self.join_smach_thread(timeout_secs):
133  rospy.logerr('SMACH thread has not shut down properly. Continuing with execution regardless')
134 
135 
136  self.task_sm = None
137  self.smach_thread = None
138  self.sis = None
139 
140  def nav_start_cb(self, userdata, initial_states):
141  task = userdata.task
142  rospy.loginfo('Nav started to %s for task %s, action %s' % (task.start_node_id, task.task_id, task.action))
143  self.log_task_event(task, TaskEvent.NAVIGATION_STARTED, rospy.get_rostime())
144 
145 
146 
147 
148 
149 
150 
151 
152  def nav_termination_cb(self, userdata, terminal_states, container_outcome):
153  rospy.loginfo('Nav terminated with outcome %s' % container_outcome)
154  if container_outcome == 'succeeded':
155  self.log_task_event(userdata.task, TaskEvent.NAVIGATION_SUCCEEDED, rospy.get_rostime())
156  elif container_outcome == 'preempted':
157  self.log_task_event(userdata.task, TaskEvent.NAVIGATION_PREEMPTED, rospy.get_rostime())
158  else:
159  self.log_task_event(userdata.task, TaskEvent.NAVIGATION_FAILED, rospy.get_rostime())
160 
161  def action_start_cb(self, userdata, initial_states):
162  task = userdata.task
163  rospy.loginfo('Action %s started for task %s' % (task.action, task.task_id))
164  self.log_task_event(task, TaskEvent.EXECUTION_STARTED, rospy.get_rostime())
165 
166  def outcome_cb(self, outcome_map):
167  if outcome_map['MONITORED'] == 'succeeded':
168  return 'succeeded'
169  if outcome_map["MONITORED"] == "aborted" or outcome_map["MONITORING"] == "aborted":
170  return "aborted"
171  if outcome_map["MONITORING"] == "succeeded":
172  return "aborted"
173  if outcome_map["MONITORED"] == "preempted" or outcome_map["MONITORING"] == "preempted":
174  return "preempted"
175 
176 
177  ## outcome map for monitored states
178  #concurrence_outcome_map = {
179  ## if the monitored state succeeds the concurrence succeeds
180  #'succeeded' : {'MONITORED':'succeeded'},
181  ## if either abort, then the concurrence returns this
182  #'aborted' : {'MONITORED':'aborted'},
183  #'aborted' : {'MONITORING':'aborted'},
184  ## if the monitoring state succeeds the we abort as nav timed out
185  #'aborted' : {'MONITORING':'succeeded'},
186  ## if both were preempted then we were prempted overall
187  #'preempted' : {'MONITORED':'preempted', 'MONITORING':'preempted'}
188  #}
189 
190 
191  def action_termination_cb(self, userdata, terminal_states, container_outcome):
192  rospy.loginfo('Action terminated with outcome %s' % container_outcome)
193  if container_outcome == 'succeeded':
194  self.log_task_event(userdata.task, TaskEvent.EXECUTION_SUCCEEDED, rospy.get_rostime())
195  elif container_outcome == 'preempted':
196  self.log_task_event(userdata.task, TaskEvent.EXECUTION_PREEMPTED, rospy.get_rostime())
197  else:
198  self.log_task_event(userdata.task, TaskEvent.EXECUTION_FAILED, rospy.get_rostime())
199 
200  def execute_task(self, task):
201  """ Called to trigger execution of given task. """
202 
203  # clean up from last execution
204  self.reset_sm()
205 
206  rospy.loginfo('Execution of task %s was requested' % task.task_id)
207 
208  # Create the state machine necessary to execution this task
209  self.task_sm = smach.StateMachine(['succeeded','aborted','preempted'])
210  self.task_sm.userdata.task = task
211 
212  with self.task_sm:
213 
214  # Initialise task data
215  init_transition = {}
216  if task.start_node_id != '':
217  init_transition = {'succeeded': 'TASK_NAVIGATION'}
218  else:
219  init_transition = {'succeeded': 'TASK_EXECUTION'}
220 
221  smach.StateMachine.add('TASK_INITIALISATION', TaskInitialisation(self), transitions=init_transition)
222 
223  # Final task outcomes
224  smach.StateMachine.add('TASK_SUCCEEDED', TaskSucceeded(self), transitions={'succeeded':'succeeded'})
225  smach.StateMachine.add('TASK_CANCELLED', TaskCancelled(self), transitions={'preempted':'preempted'})
226  smach.StateMachine.add('TASK_FAILED', TaskFailed(self), transitions={'aborted':'aborted'})
227 
228 
229 
230  # when one child terminates, preempt the others
231  concurrence_child_term_cb = lambda so: True
232 
233 
234  # navigation action
235  if task.start_node_id != '':
236 
237  nav_transitions = {'preempted':'TASK_CANCELLED', 'aborted':'TASK_FAILED'}
238 
239  # if no action then nav success leads to task success
240  if task.action == '':
241  nav_transitions['succeeded'] = 'TASK_SUCCEEDED'
242  # else move on to execution
243  else:
244  nav_transitions['succeeded'] = 'TASK_EXECUTION'
245 
246 
247  # create a concurrence which monitors execution time
248  nav_concurrence = smach.Concurrence(outcomes=['succeeded', 'preempted', 'aborted'],
249  default_outcome='aborted',
250  outcome_cb=self.outcome_cb,
251  child_termination_cb=concurrence_child_term_cb)
252  # register callback for logging
253  nav_concurrence.register_start_cb(self.nav_start_cb)
254  nav_concurrence.register_termination_cb(self.nav_termination_cb)
255  nav_concurrence.userdata.task = task
256 
257  with nav_concurrence:
258  # where we want to go
259  # nav_goal = ExecutePolicyGoal(task_type=ExecutePolicyGoal.GOTO_WAYPOINT, target_id=task.start_node_id, time_of_day='all_day')
260  nav_goal = GotoNodeGoal(target = task.start_node_id)
261  # let nav run for three times the length it usually takes before terminating
262  monitor_duration = self.expected_navigation_duration(task) * 3
263 
264  smach.Concurrence.add('MONITORED',
265  # SimpleActionState('mdp_plan_exec/execute_policy',
266  # ExecutePolicyAction,
267  SimpleActionState('topological_navigation',
268  GotoNodeAction,
269  goal=nav_goal))
270  smach.Concurrence.add('MONITORING', TimerState(duration=monitor_duration))
271 
272  smach.StateMachine.add('TASK_NAVIGATION', nav_concurrence, transitions=nav_transitions)
273 
274  # actual task action
275  if task.action != '':
276  try:
277  (action_string, goal_string) = self.get_task_types(task.action)
278  action_clz = dc_util.load_class(dc_util.type_to_class_string(action_string))
279  goal_clz = dc_util.load_class(dc_util.type_to_class_string(goal_string))
280 
281  argument_list = self.get_arguments(task.arguments)
282  goal = goal_clz(*argument_list)
283 
284  # create a concurrence which monitors execution time along with doing the execution
285  action_concurrence = smach.Concurrence(outcomes=['succeeded', 'preempted', 'aborted'],
286  default_outcome='aborted',
287  outcome_cb=self.outcome_cb,
288  child_termination_cb=concurrence_child_term_cb)
289 
290  # register callback for logging
291  action_concurrence.register_start_cb(self.action_start_cb)
292  action_concurrence.register_termination_cb(self.action_termination_cb)
293  action_concurrence.userdata.task = task
294 
295  with action_concurrence:
296  smach.Concurrence.add('MONITORED', SimpleActionState(task.action, action_clz, goal=goal))
297  wiggle_room = rospy.Duration(5)
298 
299  # this prevents the task being interupted if it runs out of time but should still run
300  def task_can_be_interrupted():
301  return self.is_task_interruptible(task)
302 
303  smach.Concurrence.add('MONITORING', TimerState(duration=(task.max_duration+wiggle_room), can_finish=task_can_be_interrupted))
304 
305  smach.StateMachine.add('TASK_EXECUTION',
306  action_concurrence,
307  transitions={'preempted':'TASK_CANCELLED', 'aborted':'TASK_FAILED', 'succeeded':'TASK_SUCCEEDED'})
308 
309  except Exception, e:
310  rospy.logwarn('Caught exception when creating the task to execute: %s' % e)
311  self.task_failed(task)
312  return
313 
314  self.task_sm.set_initial_state(['TASK_INITIALISATION'])
315 
316  self.sis = IntrospectionServer('task_executor', self.task_sm, '/TASK_ROOT')
317  self.sis.start()
318 
319  self.smach_thread = threading.Thread(target=self.task_sm.execute)
320  self.smach_thread.start()
321 
322  def join_smach_thread(self, timeout):
323  """ Returns true of the smach_thread has been terminated successfully """
324 
325  self.join_lock.acquire()
326 
327  successfully_joined = True
328 
329  # if we need to join
330  if self.smach_thread is not None and self.smach_thread.is_alive():
331  self.smach_thread.join(timeout)
332  if self.smach_thread is not None:
333  successfully_joined = not self.smach_thread.is_alive()
334 
335  self.join_lock.release()
336 
337  return successfully_joined
338 
340  preempt_timeout_secs = 30
341  if self.task_sm is not None:
342  rospy.loginfo('Requesting preempt on state machine in state %s' % self.task_sm.get_active_states())
343  self.task_sm.request_preempt()
344  rospy.loginfo('Waiting for exit')
345 
346  if not self.join_smach_thread(preempt_timeout_secs):
347  rospy.logerr('Task action or navigation did not preempt after %s seconds. State at end was %s' % (preempt_timeout_secs, self.task_sm.get_active_states()))
348  # manually notify completeness in this case
349  completed = self.active_task
350  self.active_task = None
351  self.task_failed(completed)
352  else:
353  rospy.loginfo('And relax')
354 
def nav_termination_cb(self, userdata, terminal_states, container_outcome)
def nav_start_cb(self, userdata, initial_states)
def __init__(self, task_executor, outcomes)
def action_termination_cb(self, userdata, terminal_states, container_outcome)
def action_start_cb(self, userdata, initial_states)


task_executor
Author(s): Nick Hawes
autogenerated on Tue Mar 17 2015 20:08:13