7 import mongodb_store.util
as dc_util
9 from actionlib_msgs.msg
import GoalStatus
10 from geometry_msgs.msg
import Pose, Point, Quaternion
11 from mongodb_store.message_store
import MessageStoreProxy
12 from topological_navigation.msg
import GotoNodeAction, GotoNodeGoal
13 from strands_navigation_msgs.srv
import EstimateTravelTime
14 from std_srvs.srv
import Empty, EmptyResponse
15 from std_msgs.msg
import String
22 """ Called with new tasks for the executor """
26 """ Called when overall execution should (re)start """
30 """ Called when overall execution should pause """
34 """ Called when the given task has completed execution """
38 """ Called when the given task has completed execution successfully """
39 self.task_complete(task)
42 """ Called when the given task has completed execution but failed """
47 """ Called when a task is demanded. self.active_task is the demanded task (and is being executed) and previously_active_task was the task that was being executed (which could be None) """
51 """ Called when a request is received to cancel a task. The currently executing one is checked elsewhere. """
55 """ Called to clear all tasks from schedule, with the exception of the currently executing one. """
63 expected_time_srv_name =
'topological_navigation/travel_time_estimator'
64 rospy.loginfo(
'Waiting for %s' % expected_time_srv_name)
65 rospy.wait_for_service(expected_time_srv_name)
66 rospy.loginfo(
'... and got %s' % expected_time_srv_name)
67 self.
expected_time = rospy.ServiceProxy(expected_time_srv_name, EstimateTravelTime)
97 Adverstise ROS services. Only call at the end of constructor to avoid calls during construction.
100 for attr
in dir(self):
101 if attr.endswith(
"_ros_srv"):
102 service=getattr(self, attr)
103 rospy.Service(
"/task_executor/" + attr[:-8], service.type, service)
107 Returns the type string related to the action string provided.
109 rospy.logdebug(
"task action provided: %s", action_name)
110 topics = rospy.get_published_topics(action_name)
111 for [topic, type]
in topics:
112 if topic.endswith(
'feedback'):
113 return (type[:-8], type[:-14] +
'Goal')
114 raise RuntimeError(
'No action associated with topic: %s'% action_name)
119 if task.start_node_id ==
'':
120 return rospy.Duration(10)
131 self.expected_time_lock.acquire()
132 if start ==
'' or end ==
'':
134 return rospy.Duration(10)
139 return rospy.Duration(max(et.travel_time.to_sec() * 20, 10))
142 rospy.logwarn(
'Caught exception when getting expected time: %s' % e)
143 return rospy.Duration(10)
145 self.expected_time_lock.release()
151 te = TaskEvent(task=task, event=event, time=time, description=description)
154 self.task_event_publisher.publish(te)
155 self.logging_msg_store.insert(te)
157 rospy.logwarn(
'Caught exception when logging: %s' % e)
161 te = TaskEvent(task=task, event=event, time=time, description=description)
163 self.task_event_publisher.publish(te)
164 self.logging_msg_store.insert(te)
166 rospy.logwarn(
'Caught exception when logging: %s' % e)
171 Adds a task into the task execution framework.
173 self.service_lock.acquire()
177 self.
log_task_event(req.task, TaskEvent.ADDED, rospy.get_rostime())
178 self.service_lock.release()
179 return req.task.task_id
180 add_task_ros_srv.type=AddTask
184 Gets the currently executing task.
187 get_active_task_ros_srv.type=GetActiveTask
191 Adds a task into the task execution framework.
193 self.service_lock.acquire()
195 for task
in req.tasks:
197 task_ids.append(task.task_id)
202 self.service_lock.release()
204 add_tasks_ros_srv.type=AddTasks
208 Demand a the task from the execution framework.
211 self.service_lock.acquire()
218 req.task.execution_time = rospy.get_rostime()
226 self.cancel_active_task()
234 self.
log_task_event(req.task, TaskEvent.DEMANDED, rospy.get_rostime())
235 return [req.task.task_id,
True, rospy.Duration(0)]
237 self.service_lock.release()
240 demand_task_ros_srv.type=DemandTask
243 """ Cancel the speficially requested task """
245 self.service_lock.acquire()
248 if self.
active_task is not None and self.active_task.task_id == req.task_id:
250 self.cancel_active_task()
253 self.
log_task_event(Task(task_id=req.task_id), TaskEvent.CANCELLED_MANUALLY, rospy.get_rostime())
256 self.service_lock.release()
259 cancel_task_ros_srv.type = CancelTask
262 """ Remove all scheduled tasks and active task """
264 self.service_lock.acquire()
269 self.cancel_active_task()
271 self.service_lock.release()
273 return EmptyResponse()
275 clear_schedule_ros_srv.type = Empty
279 get_execution_status_ros_srv.type = GetExecutionStatus
283 self.service_lock.acquire()
287 remaining_time = rospy.Duration(0)
291 rospy.logdebug(
"Pausing execution")
304 rospy.logdebug(
"Starting execution")
315 self.service_lock.release()
317 return [previous, success, remaining_time]
318 set_execution_status_ros_srv.type = SetExecutionStatus
325 now = rospy.get_rostime()
327 expected_nav_duration = rospy.Duration(0)
328 if self.active_task.start_node_id !=
'':
330 rospy.loginfo(
'expected_nav_duration: %s' % expected_nav_duration.to_sec())
332 total_task_duration = expected_nav_duration + task.max_duration
336 return expected_nav_duration
340 if len(string_pair.first) == 0:
341 return string_pair.second
342 elif string_pair.first == Task.INT_TYPE:
343 return int(string_pair.second)
344 elif string_pair.first == Task.FLOAT_TYPE:
345 return float(string_pair.second)
346 elif string_pair.first == Task.TIME_TYPE:
347 return rospy.Time.from_sec(float(string_pair.second))
348 elif string_pair.first == Task.DURATION_TYPE:
349 return rospy.Duration.from_sec(float(string_pair.second))
350 elif string_pair.first == Task.BOOL_TYPE:
351 return string_pair.second ==
'True'
353 msg = self.msg_store.query_id(string_pair.second, string_pair.first)[0]
356 raise RuntimeError(
"No matching object for id %s of type %s" % (string_pair.second, string_pair.first))
365 rospy.logwarn(
'is_task_interruptible passed a None')
369 srv_name = task.action +
'_is_interruptible'
370 rospy.wait_for_service(srv_name, timeout=1)
371 is_interruptible = rospy.ServiceProxy(srv_name, IsTaskInterruptible)
372 return is_interruptible().status
373 except rospy.ROSException
as exc:
374 rospy.logdebug(
'%s service does not exist, treating as interruptible')
376 except rospy.ServiceException
as exc:
377 rospy.logwarn(exc.message)
389 super( AbstractTaskExecutor, self ).
__init__()
396 self.
log_task_event(task, TaskEvent.TASK_STARTED, rospy.get_rostime())
400 if self.active_task.start_node_id !=
'':
402 elif self.active_task.action !=
'':
405 warning =
'Provided task had no start_node_id or action %s' % self.
active_task
406 rospy.logwarn(warning)
418 """ Cancel any active nav or task action """
419 if event
is not None:
420 rospy.logwarn(
"Cancelling task that has overrun %s" % self.active_task.task_id)
422 rospy.logwarn(
"Cancelling task %s" % self.active_task.task_id)
424 now = rospy.get_rostime()
429 self.nav_timeout_timer.shutdown()
430 if self.nav_client.get_state() == GoalStatus.ACTIVE:
431 self.nav_client.cancel_goal()
433 self.
log_task_event(cancelled, TaskEvent.NAVIGATION_PREEMPTED, now)
437 self.action_timeout_timer.shutdown()
438 if self.action_client.get_state() == GoalStatus.ACTIVE:
439 self.action_client.cancel_goal()
441 self.
log_task_event(cancelled, TaskEvent.EXECUTION_PREEMPTED, now)
446 if event
is not None:
452 """ Called on nav timeout """
453 rospy.logwarn(
"Cancelling navigation that has overrun for task %s" % self.active_task.task_id)
457 if self.nav_client.get_state() == GoalStatus.ACTIVE:
458 self.nav_client.cancel_goal()
462 now = rospy.get_rostime()
466 self.
log_task_event(cancelled, TaskEvent.NAVIGATION_PREEMPTED, now)
470 rospy.logwarn(
'Tried to cancel a navigation action that was None')
475 rospy.loginfo(
'Starting to execute %s' % self.active_task.action)
478 (action_string, goal_string) = self.
get_task_types(self.active_task.action)
479 action_clz = dc_util.load_class(dc_util.type_to_class_string(action_string))
480 goal_clz = dc_util.load_class(dc_util.type_to_class_string(goal_string))
482 self.
action_client = actionlib.SimpleActionClient(self.active_task.action, action_clz)
483 self.action_client.wait_for_server()
485 argument_list = self.
get_arguments(self.active_task.arguments)
490 goal = goal_clz(*argument_list)
492 rospy.logdebug(
'Sending goal to %s' % self.active_task.action)
495 wiggle_room = rospy.Duration(5)
500 rospy.logwarn(
'Exception in start_task_action: %s', e)
508 self.
nav_client = actionlib.SimpleActionClient(
'topological_navigation', GotoNodeAction)
509 self.nav_client.wait_for_server()
510 rospy.logdebug(
"Created action client")
516 nav_goal = GotoNodeGoal(target = self.active_task.start_node_id)
521 rospy.loginfo(
"navigating to %s for action %s" % (self.active_task.start_node_id, self.active_task.action))
531 now = rospy.get_rostime()
533 self.nav_timeout_timer.shutdown()
535 if self.nav_client.get_state() == GoalStatus.SUCCEEDED:
538 rospy.loginfo(
'Navigation to %s succeeded' % self.active_task.start_node_id)
541 if self.active_task.action !=
'':
550 if self.nav_client.get_state() == GoalStatus.PREEMPTED:
551 rospy.loginfo(
'Navigation to %s timed out' % self.active_task.start_node_id)
554 rospy.loginfo(
'Navigation to %s failed' % self.active_task.start_node_id)
570 self.action_timeout_timer.shutdown()
571 now = rospy.get_rostime()
579 if self.action_client.get_state() == GoalStatus.SUCCEEDED:
580 rospy.loginfo(
'Execution of task %s succeeded' % completed.task_id)
581 self.
log_task_event(completed, TaskEvent.EXECUTION_SUCCEEDED, now)
584 if self.action_client.get_state() == GoalStatus.PREEMPTED:
585 self.
log_task_event(completed, TaskEvent.EXECUTION_PREEMPTED, now)
def get_active_task_completion_time(self)
def get_navigation_duration(self, start, end)
def update_topological_location(self, node_name)
def expected_navigation_duration(self, task)
def add_tasks_ros_srv(self, req)
def instantiate_from_string_pair(self, string_pair)
def start_task_navigation(self, expected_duration)
def task_complete(self, task)
def task_failed(self, task)
def update_topological_closest_node(self, node_name)
def task_succeeded(self, task)
def cancel_active_task_cb(self, event)
def add_task_ros_srv(self, req)
def is_task_interruptible(self, task)
def demand_task_ros_srv(self, req)
def advertise_services(self)
def prepare_task(self, task)
def get_arguments(self, argument_list)
def start_execution(self)
def task_execution_complete_cb(self, goal_status, result)
def clear_schedule_ros_srv(self, req)
def add_tasks(self, tasks)
def task_demanded(self, demanded_task, currently_active_task)
def get_task_types(self, action_name)
def cancel_navigation(self, event)
def get_active_task_ros_srv(self, req)
def get_execution_status_ros_srv(self, req)
def cancel_active_task(self)
def start_task_action(self)
def execute_task(self, task)
def cancel_task(self, task_id)
def cancel_task_ros_srv(self, req)
def pause_execution(self)
def navigation_complete_cb(self, goal_status, result)
def set_execution_status_ros_srv(self, req)