base_executor.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import rospy
4 
5 from strands_executive_msgs.msg import Task, TaskEvent
6 from strands_executive_msgs.srv import *
7 import mongodb_store.util as dc_util
8 import actionlib
9 from actionlib_msgs.msg import GoalStatus
10 from geometry_msgs.msg import Pose, Point, Quaternion
11 from mongodb_store.message_store import MessageStoreProxy
12 from topological_navigation.msg import GotoNodeAction, GotoNodeGoal
13 from strands_navigation_msgs.srv import EstimateTravelTime
14 from std_srvs.srv import Empty, EmptyResponse
15 from std_msgs.msg import String
16 import threading
17 
18 class BaseTaskExecutor(object):
19  # These can be implemented by sub classes to provide hooks into the execution system
20 
21  def add_tasks(self, tasks):
22  """ Called with new tasks for the executor """
23  pass
24 
25  def start_execution(self):
26  """ Called when overall execution should (re)start """
27  pass
28 
29  def pause_execution(self):
30  """ Called when overall execution should pause """
31  pass
32 
33  def task_complete(self, task):
34  """ Called when the given task has completed execution """
35  pass
36 
37  def task_succeeded(self, task):
38  """ Called when the given task has completed execution successfully """
39  self.task_complete(task)
40 
41  def task_failed(self, task):
42  """ Called when the given task has completed execution but failed """
43  self.task_complete(task)
44 
45 
46  def task_demanded(self, demanded_task, currently_active_task):
47  """ Called when a task is demanded. self.active_task is the demanded task (and is being executed) and previously_active_task was the task that was being executed (which could be None) """
48  pass
49 
50  def cancel_task(self, task_id):
51  """ Called when a request is received to cancel a task. The currently executing one is checked elsewhere. """
52  return False
53 
54  def clear_schedule(self):
55  """ Called to clear all tasks from schedule, with the exception of the currently executing one. """
56  pass
57 
58  def __init__(self):
59  self.task_counter = 1
60  self.msg_store = MessageStoreProxy()
61  self.logging_msg_store = MessageStoreProxy(collection='task_events')
62 
63  expected_time_srv_name = 'topological_navigation/travel_time_estimator'
64  rospy.loginfo('Waiting for %s' % expected_time_srv_name)
65  rospy.wait_for_service(expected_time_srv_name)
66  rospy.loginfo('... and got %s' % expected_time_srv_name)
67  self.expected_time = rospy.ServiceProxy(expected_time_srv_name, EstimateTravelTime)
68 
69  self.executing = False
70  # start with some faked but likely one in case of problems
71  self.current_node = 'WayPoint1'
72  self.closest_node = 'WayPoint1'
73  rospy.Subscriber('/current_node', String, self.update_topological_location)
74  rospy.Subscriber('/closest_node', String, self.update_topological_closest_node)
75  self.active_task = None
76  self.active_task_completes_by = rospy.get_rostime()
77  self.service_lock = threading.Lock()
78  self.expected_time_lock = threading.Lock()
79 
80  self.task_event_publisher = rospy.Publisher('task_executor/events', TaskEvent, queue_size=20)
81 
82 
83 
85  return self.active_task_completes_by
86 
87  def update_topological_location(self, node_name):
88  self.current_node = node_name.data
89 
90 
91  def update_topological_closest_node(self,node_name):
92  self.closest_node=node_name.data
93 
94 
95  def advertise_services(self):
96  """
97  Adverstise ROS services. Only call at the end of constructor to avoid calls during construction.
98  """
99  # advertise ros services
100  for attr in dir(self):
101  if attr.endswith("_ros_srv"):
102  service=getattr(self, attr)
103  rospy.Service("/task_executor/" + attr[:-8], service.type, service)
104 
105  def get_task_types(self, action_name):
106  """
107  Returns the type string related to the action string provided.
108  """
109  rospy.logdebug("task action provided: %s", action_name)
110  topics = rospy.get_published_topics(action_name)
111  for [topic, type] in topics:
112  if topic.endswith('feedback'):
113  return (type[:-8], type[:-14] + 'Goal')
114  raise RuntimeError('No action associated with topic: %s'% action_name)
115 
116 
118  # if we're going nowhere, return some default
119  if task.start_node_id == '':
120  return rospy.Duration(10)
121  elif self.current_node == 'none':
122  return self.get_navigation_duration(start=self.closest_node, end=task.start_node_id)
123  else:
124  return self.get_navigation_duration(start=self.current_node, end=task.start_node_id)
125 
126 
127  def get_navigation_duration(self, start, end):
128 
129  try:
130  # prevent concurrent calls to expected_time service.
131  self.expected_time_lock.acquire()
132  if start == '' or end == '':
133  # if we're going nowhere, return some default
134  return rospy.Duration(10)
135  else:
136  et = self.expected_time(start=start, target=end)
137  # rospy.loginfo('expected travel time %s' % et.travel_time)
138  # allow a bit of time for any transition -- mainly for testing cases
139  return rospy.Duration(max(et.travel_time.to_sec() * 20, 10))
140  # return rospy.Duration(120)
141  except Exception, e:
142  rospy.logwarn('Caught exception when getting expected time: %s' % e)
143  return rospy.Duration(10)
144  finally:
145  self.expected_time_lock.release()
146 
147 
148 
149  def log_task_events(self, tasks, event, time, description=""):
150  for task in tasks:
151  te = TaskEvent(task=task, event=event, time=time, description=description)
152 
153  try:
154  self.task_event_publisher.publish(te)
155  self.logging_msg_store.insert(te)
156  except Exception, e:
157  rospy.logwarn('Caught exception when logging: %s' % e)
158 
159 
160  def log_task_event(self, task, event, time, description=""):
161  te = TaskEvent(task=task, event=event, time=time, description=description)
162  try:
163  self.task_event_publisher.publish(te)
164  self.logging_msg_store.insert(te)
165  except Exception, e:
166  rospy.logwarn('Caught exception when logging: %s' % e)
167 
168 
169  def add_task_ros_srv(self, req):
170  """
171  Adds a task into the task execution framework.
172  """
173  self.service_lock.acquire()
174  req.task.task_id = self.task_counter
175  self.task_counter += 1
176  self.add_tasks([req.task])
177  self.log_task_event(req.task, TaskEvent.ADDED, rospy.get_rostime())
178  self.service_lock.release()
179  return req.task.task_id
180  add_task_ros_srv.type=AddTask
181 
182  def get_active_task_ros_srv(self, req):
183  """
184  Gets the currently executing task.
185  """
186  return [self.active_task]
187  get_active_task_ros_srv.type=GetActiveTask
188 
189  def add_tasks_ros_srv(self, req):
190  """
191  Adds a task into the task execution framework.
192  """
193  self.service_lock.acquire()
194  task_ids = []
195  for task in req.tasks:
196  task.task_id = self.task_counter
197  task_ids.append(task.task_id)
198  self.task_counter += 1
199 
200  self.add_tasks(req.tasks)
201  self.log_task_events(req.tasks, TaskEvent.ADDED, rospy.get_rostime())
202  self.service_lock.release()
203  return [task_ids]
204  add_tasks_ros_srv.type=AddTasks
205 
206  def demand_task_ros_srv(self, req):
207  """
208  Demand a the task from the execution framework.
209  """
210  try:
211  self.service_lock.acquire()
212 
213  if self.active_task is not None and not self.is_task_interruptible(self.active_task):
214  return [0, False, self.active_task_completes_by - rospy.get_rostime()]
215 
216  req.task.task_id = self.task_counter
217  self.task_counter += 1
218  req.task.execution_time = rospy.get_rostime()
219 
220 
221 
222  # stop anything else
223  if self.active_task is not None:
224  self.pause_execution()
225  self.executing = False
226  self.cancel_active_task()
227 
228  # and inform implementation to let it take action
229  self.task_demanded(req.task, self.active_task)
230 
231  if not self.executing:
232  self.start_execution()
233 
234  self.log_task_event(req.task, TaskEvent.DEMANDED, rospy.get_rostime())
235  return [req.task.task_id, True, rospy.Duration(0)]
236  finally:
237  self.service_lock.release()
238 
239 
240  demand_task_ros_srv.type=DemandTask
241 
242  def cancel_task_ros_srv(self, req):
243  """ Cancel the speficially requested task """
244 
245  self.service_lock.acquire()
246 
247  cancelled = False
248  if self.active_task is not None and self.active_task.task_id == req.task_id:
249  self.log_task_event(self.active_task, TaskEvent.CANCELLED_MANUALLY, rospy.get_rostime())
250  self.cancel_active_task()
251  cancelled = True
252  else:
253  self.log_task_event(Task(task_id=req.task_id), TaskEvent.CANCELLED_MANUALLY, rospy.get_rostime())
254  cancelled = self.cancel_task(req.task_id)
255 
256  self.service_lock.release()
257 
258  return cancelled
259  cancel_task_ros_srv.type = CancelTask
260 
261  def clear_schedule_ros_srv(self, req):
262  """ Remove all scheduled tasks and active task """
263 
264  self.service_lock.acquire()
265 
266  self.clear_schedule()
267 
268  if self.active_task is not None:
269  self.cancel_active_task()
270 
271  self.service_lock.release()
272 
273  return EmptyResponse()
274 
275  clear_schedule_ros_srv.type = Empty
276 
278  return self.executing
279  get_execution_status_ros_srv.type = GetExecutionStatus
280 
282 
283  self.service_lock.acquire()
284 
285  success = False
286 
287  remaining_time = rospy.Duration(0)
288 
289 
290  if self.executing and not req.status:
291  rospy.logdebug("Pausing execution")
292 
293  previous = self.executing
294 
295 
296  if self.is_task_interruptible(self.active_task):
297  self.pause_execution()
298  self.executing = False
299  success = True
300  else:
301  remaining_time = self.active_task_completes_by - rospy.get_rostime()
302 
303  elif not self.executing and req.status:
304  rospy.logdebug("Starting execution")
305 
306  previous = self.executing
307  self.executing = True
308 
309  self.start_execution()
310  success = True
311  else:
312  previous = self.executing
313  success = True
314 
315  self.service_lock.release()
316 
317  return [previous, success, remaining_time]
318  set_execution_status_ros_srv.type = SetExecutionStatus
319 
320 
321  def prepare_task(self, task):
322 
323  self.active_task = task
324 
325  now = rospy.get_rostime()
326 
327  expected_nav_duration = rospy.Duration(0)
328  if self.active_task.start_node_id != '':
329  expected_nav_duration = self.expected_navigation_duration(task)
330  rospy.loginfo('expected_nav_duration: %s' % expected_nav_duration.to_sec())
331 
332  total_task_duration = expected_nav_duration + task.max_duration
333 
334  self.active_task_completes_by = now + total_task_duration
335 
336  return expected_nav_duration
337 
338 
339  def instantiate_from_string_pair(self, string_pair):
340  if len(string_pair.first) == 0:
341  return string_pair.second
342  elif string_pair.first == Task.INT_TYPE:
343  return int(string_pair.second)
344  elif string_pair.first == Task.FLOAT_TYPE:
345  return float(string_pair.second)
346  elif string_pair.first == Task.TIME_TYPE:
347  return rospy.Time.from_sec(float(string_pair.second))
348  elif string_pair.first == Task.DURATION_TYPE:
349  return rospy.Duration.from_sec(float(string_pair.second))
350  elif string_pair.first == Task.BOOL_TYPE:
351  return string_pair.second == 'True'
352  else:
353  msg = self.msg_store.query_id(string_pair.second, string_pair.first)[0]
354  # print msg
355  if msg == None:
356  raise RuntimeError("No matching object for id %s of type %s" % (string_pair.second, string_pair.first))
357  return msg
358 
359  def get_arguments(self, argument_list):
360  return map(self.instantiate_from_string_pair, argument_list)
361 
362 
363  def is_task_interruptible(self, task):
364  if task is None:
365  rospy.logwarn('is_task_interruptible passed a None')
366  return False
367 
368  try:
369  srv_name = task.action + '_is_interruptible'
370  rospy.wait_for_service(srv_name, timeout=1)
371  is_interruptible = rospy.ServiceProxy(srv_name, IsTaskInterruptible)
372  return is_interruptible().status
373  except rospy.ROSException as exc:
374  rospy.logdebug('%s service does not exist, treating as interruptible')
375  return True
376  except rospy.ServiceException as exc:
377  rospy.logwarn(exc.message)
378  return True
379 
380 
381 
382 
383 
384 # NOTE THIS AbstractTaskExecutor IS NOT USED BY THE CURRENT IMPLEMENTATION
385 
387 
388  def __init__(self):
389  super( AbstractTaskExecutor, self ).__init__()
390 
391  self.nav_client = None
392  self.action_client = None
393 
394  def execute_task(self, task):
395 
396  self.log_task_event(task, TaskEvent.TASK_STARTED, rospy.get_rostime())
397 
398  expected_nav_duration = self.prepare_task(task)
399 
400  if self.active_task.start_node_id != '':
401  self.start_task_navigation(expected_nav_duration)
402  elif self.active_task.action != '':
403  self.start_task_action()
404  else:
405  warning = 'Provided task had no start_node_id or action %s' % self.active_task
406  rospy.logwarn(warning)
407  self.log_task_event(self.active_task, TaskEvent.TASK_COMPLETE, now, warning)
408 
409  self.active_task = None
410 
411 
412 
413 
415  self.cancel_active_task_cb(None)
416 
417  def cancel_active_task_cb(self, event):
418  """ Cancel any active nav or task action """
419  if event is not None:
420  rospy.logwarn("Cancelling task that has overrun %s" % self.active_task.task_id)
421  else:
422  rospy.logwarn("Cancelling task %s" % self.active_task.task_id)
423 
424  now = rospy.get_rostime()
425  cancelled = self.active_task
426  self.active_task = None
427 
428  if self.nav_client is not None:
429  self.nav_timeout_timer.shutdown()
430  if self.nav_client.get_state() == GoalStatus.ACTIVE:
431  self.nav_client.cancel_goal()
432  self.nav_client = None
433  self.log_task_event(cancelled, TaskEvent.NAVIGATION_PREEMPTED, now)
434 
435 
436  if self.action_client is not None:
437  self.action_timeout_timer.shutdown()
438  if self.action_client.get_state() == GoalStatus.ACTIVE:
439  self.action_client.cancel_goal()
440  self.action_client = None
441  self.log_task_event(cancelled, TaskEvent.EXECUTION_PREEMPTED, now)
442 
443 
444  self.task_failed(cancelled)
445 
446  if event is not None:
447  self.log_task_event(cancelled, TaskEvent.CANCELLED_MANUALLY, now)
448 
449  self.log_task_event(cancelled, TaskEvent.TASK_FINISHED, now)
450 
451  def cancel_navigation(self, event):
452  """ Called on nav timeout """
453  rospy.logwarn("Cancelling navigation that has overrun for task %s" % self.active_task.task_id)
454 
455  # cancel navigation
456  if self.nav_client is not None:
457  if self.nav_client.get_state() == GoalStatus.ACTIVE:
458  self.nav_client.cancel_goal()
459  self.nav_client = None
460 
461  # fail task
462  now = rospy.get_rostime()
463  cancelled = self.active_task
464  self.active_task = None
465 
466  self.log_task_event(cancelled, TaskEvent.NAVIGATION_PREEMPTED, now)
467  self.task_failed(cancelled)
468  self.log_task_event(cancelled, TaskEvent.TASK_FINISHED, now)
469  else:
470  rospy.logwarn('Tried to cancel a navigation action that was None')
471 
472  def start_task_action(self):
473 
474  try:
475  rospy.loginfo('Starting to execute %s' % self.active_task.action)
476  self.log_task_event(self.active_task, TaskEvent.EXECUTION_STARTED, rospy.get_rostime())
477 
478  (action_string, goal_string) = self.get_task_types(self.active_task.action)
479  action_clz = dc_util.load_class(dc_util.type_to_class_string(action_string))
480  goal_clz = dc_util.load_class(dc_util.type_to_class_string(goal_string))
481 
482  self.action_client = actionlib.SimpleActionClient(self.active_task.action, action_clz)
483  self.action_client.wait_for_server()
484 
485  argument_list = self.get_arguments(self.active_task.arguments)
486 
487  # print "ARGS:"
488  # print argument_list
489 
490  goal = goal_clz(*argument_list)
491 
492  rospy.logdebug('Sending goal to %s' % self.active_task.action)
493  self.action_client.send_goal(goal, self.task_execution_complete_cb)
494 
495  wiggle_room = rospy.Duration(5)
496  # start a timer to kill off tasks that overrun
497  self.action_timeout_timer = rospy.Timer(self.active_task.max_duration + wiggle_room, self.cancel_active_task_cb, oneshot=True)
498 
499  except Exception, e:
500  rospy.logwarn('Exception in start_task_action: %s', e)
501  # do bookkeeping before causing update
502  completed = self.active_task
503  self.active_task = None
504  self.task_failed(completed)
505 
506  def start_task_navigation(self, expected_duration):
507  # always reconnect in case of issues before
508  self.nav_client = actionlib.SimpleActionClient('topological_navigation', GotoNodeAction)
509  self.nav_client.wait_for_server()
510  rospy.logdebug("Created action client")
511 
512  # start a timer to kill off tasks that overrun
513  self.nav_timeout_timer = rospy.Timer(expected_duration, self.cancel_navigation, oneshot=True)
514  # self.nav_timeout_timer = rospy.Timer(rospy.Duration(360), self.cancel_navigation, oneshot=True)
515 
516  nav_goal = GotoNodeGoal(target = self.active_task.start_node_id)
517 
518  self.log_task_event(self.active_task, TaskEvent.NAVIGATION_STARTED, rospy.get_rostime())
519 
520  self.nav_client.send_goal(nav_goal, self.navigation_complete_cb)
521  rospy.loginfo("navigating to %s for action %s" % (self.active_task.start_node_id, self.active_task.action))
522 
523 
524 
525  def navigation_complete_cb(self, goal_status, result):
526 
527  # if self.nav_client was set to None then navigation was cancelled and we don't care about the response to the call back
528 
529  if self.nav_client is not None:
530 
531  now = rospy.get_rostime()
532  # stop the countdown
533  self.nav_timeout_timer.shutdown()
534 
535  if self.nav_client.get_state() == GoalStatus.SUCCEEDED:
536 
537 
538  rospy.loginfo('Navigation to %s succeeded' % self.active_task.start_node_id)
539  self.log_task_event(self.active_task, TaskEvent.NAVIGATION_SUCCEEDED, now)
540 
541  if self.active_task.action != '':
542  self.start_task_action()
543  else:
544  # do bookkeeping before causing update
545  completed = self.active_task
546  self.active_task = None
547 
548  self.task_succeeded(completed)
549  else:
550  if self.nav_client.get_state() == GoalStatus.PREEMPTED:
551  rospy.loginfo('Navigation to %s timed out' % self.active_task.start_node_id)
552  self.log_task_event(self.active_task, TaskEvent.NAVIGATION_PREEMPTED, now)
553  else:
554  rospy.loginfo('Navigation to %s failed' % self.active_task.start_node_id)
555  self.log_task_event(self.active_task, TaskEvent.NAVIGATION_FAILED, now)
556 
557  # do bookkeeping before causing update
558  completed = self.active_task
559  self.active_task = None
560 
561  self.task_failed(completed)
562  self.log_task_event(completed, TaskEvent.TASK_FINISHED, now)
563 
564 
565 
566 
567  def task_execution_complete_cb(self, goal_status, result):
568 
569  if self.action_client is not None:
570  self.action_timeout_timer.shutdown()
571  now = rospy.get_rostime()
572 
573 
574  # do bookkeeping before causing update
575  completed = self.active_task
576 
577  self.active_task = None
578 
579  if self.action_client.get_state() == GoalStatus.SUCCEEDED:
580  rospy.loginfo('Execution of task %s succeeded' % completed.task_id)
581  self.log_task_event(completed, TaskEvent.EXECUTION_SUCCEEDED, now)
582  self.task_succeeded(completed)
583  else:
584  if self.action_client.get_state() == GoalStatus.PREEMPTED:
585  self.log_task_event(completed, TaskEvent.EXECUTION_PREEMPTED, now)
586  else:
587  self.log_task_event(completed, TaskEvent.EXECUTION_FAILED, now)
588  self.task_failed(completed)
589 
590  self.log_task_event(completed, TaskEvent.TASK_FINISHED, now)
591 
592 
593 
594 
595 
596 
597 
def instantiate_from_string_pair(self, string_pair)
def start_task_navigation(self, expected_duration)
def update_topological_closest_node(self, node_name)
def task_execution_complete_cb(self, goal_status, result)
def task_demanded(self, demanded_task, currently_active_task)
def navigation_complete_cb(self, goal_status, result)


task_executor
Author(s): Nick Hawes
autogenerated on Tue Mar 17 2015 20:08:13