4 from Queue
import Queue, Empty
8 from threading
import Thread
10 from operator
import attrgetter
17 rospy.init_node(
"task_executor", log_level=rospy.INFO)
20 super( ScheduledTaskExecutor, self ).
__init__()
23 schedule_srv_name =
'get_schedule'
24 rospy.logdebug(
'Waiting for %s service' % schedule_srv_name)
25 rospy.wait_for_service(schedule_srv_name)
26 self.
schedule_srv = rospy.ServiceProxy(schedule_srv_name, GetSchedule)
46 """ Called when overall execution should (re)start """
54 self.scheduling_thread.start()
55 self.execution_thread.start()
61 rospy.loginfo(
'Pausing execution')
65 previously_scheduled = self.execution_schedule.get_schedulable_tasks()
66 current_task = self.execution_schedule.get_current_task()
69 self.execution_schedule.clear_schedule()
71 if current_task
is not None:
72 rospy.loginfo(
'Interrupting active task')
81 self.unscheduled_tasks.put(current_task)
83 for task
in previously_scheduled:
84 self.unscheduled_tasks.put(task)
94 if task.start_after.is_zero():
95 task.start_after = rospy.get_rostime()
97 if task.end_before.is_zero():
102 """ Called with new tasks for the executor """
107 if task.end_node_id ==
'':
108 task.end_node_id = task.start_node_id
111 self.unscheduled_tasks.put(task)
114 """ Called when the given task has completed execution """
116 self.execution_schedule.task_complete(task)
120 """ Useful for cases where you don't know what the effect of cancellation will be """
125 while self.execution_schedule.get_current_task()
is not None and wait_count < wait_threshold
and not rospy.is_shutdown():
127 rospy.loginfo(
'Waiting for previous task to finish')
131 if self.execution_schedule.get_current_task()
is not None:
132 rospy.logwarn(
'Previous task did not terminate nicely. Everything from here on in could be dicey.')
133 self.execution_schedule.current_task =
None
137 """ Called when a task is demanded. self.active_task is the demanded task (and is being executed) and previously_active_task was the task that was being executed (which could be None) """
139 if demanded_task.end_node_id ==
'':
140 demanded_task.end_node_id = demanded_task.start_node_id
142 rospy.loginfo(
'Task %s, %s demanded from scheduler' % (demanded_task.task_id, demanded_task.action))
145 previously_scheduled = self.execution_schedule.get_schedulable_tasks()
148 self.execution_schedule.clear_schedule()
155 self.execution_schedule.add_new_tasks([demanded_task])
157 self.execution_schedule.set_schedule([demanded_task])
161 if len(previously_scheduled) > 0:
162 success, added = self.
try_schedule(previously_scheduled)
164 rospy.loginfo(
'Was able to reinstate %s/%s tasks after demand' % (len(added), len(previously_scheduled)))
165 if currently_active_task !=
None:
166 success, added = self.
try_schedule([currently_active_task])
167 if success
and len(added) > 0:
168 rospy.loginfo(
'Was also able to reinstate previously active task after demand')
170 rospy.loginfo(
'Was not able to reinstate previously active task after demand (but other tasks ok)')
172 rospy.loginfo(
'Was NOT able to reinstate tasks after demand')
177 Creates the matrix of durations between waypoints needed as input to the scheuler.
178 Output is a DurationMatrix encoding duration[i][j] where this is the duration expected for travelling between the end of the ith task in tasks and the start of the jth element.
181 start_nodes = set([task.end_node_id
for task
in tasks])
182 end_nodes = set([task.start_node_id
for task
in tasks])
186 for start
in start_nodes:
187 durations[start] = dict()
188 for end
in end_nodes:
193 dm = DurationMatrix()
195 dm.durations.append(DurationList())
197 dm.durations[-1].durations.append(durations[task_i.end_node_id][task_j.start_node_id])
207 Calls scheduler. Reorders the list of tasks in execution order with their execution times set.
214 if len(resp.task_order) > 0:
218 for (task_id, start_time)
in zip(resp.task_order, resp.execution_times):
220 task_times[task_id] = start_time
226 task.execution_time = task_times[task.task_id]
234 tasks.sort(key=attrgetter(
'execution_time'))
238 rospy.loginfo(
'No schedule found')
245 exe_status = ExecutionStatus()
247 current = self.execution_schedule.get_current_task()
250 exe_status.currently_executing =
True
251 exe_status.execution_queue.append(current)
253 exe_status.execution_queue.extend(self.execution_schedule.get_execution_queue())
255 self.schedule_publisher.publish(exe_status)
262 if start_after + task.max_duration <= task.end_before:
270 bounded_tasks.append(task)
272 dropped_tasks.append(task)
274 return bounded_tasks, dropped_tasks
279 lower_bound = rospy.get_rostime()
280 if self.execution_schedule.get_current_task()
is not None:
282 rospy.logwarn(
'lower bound %s' % lower_bound)
286 if len(bounded_tasks) < len(additional_tasks):
287 rospy.logwarn(
'Dropped %s additional tasks which are no longer executable' % (len(additional_tasks) - len(bounded_tasks)))
288 additional_tasks = bounded_tasks
292 schedulable_tasks = self.execution_schedule.get_schedulable_tasks()
295 if len(bounded_tasks) < len(schedulable_tasks):
296 rospy.logwarn(
'Dropped %s existing tasks which are no longer executable' % (len(schedulable_tasks) - len(bounded_tasks)))
299 self.execution_schedule.remove_tasks(dropped_tasks)
300 schedulable_tasks = bounded_tasks
306 to_schedule.extend(schedulable_tasks)
308 to_schedule.extend(additional_tasks)
312 current_task = copy.deepcopy(self.execution_schedule.get_current_task())
313 if current_task
is not None:
314 current_id = current_task.task_id
315 rospy.loginfo(
'Including current task %s in scheduling problem' % current_id)
317 current_task.max_duration = rospy.Time(1)
318 current_task.end_before = lower_bound
319 current_task.start_after = lower_bound - rospy.Time(2)
320 to_schedule.append(current_task)
325 self.execution_schedule.add_new_tasks(additional_tasks)
328 to_schedule = [t
for t
in to_schedule
if t.task_id != current_id]
331 self.execution_schedule.set_schedule(to_schedule)
333 rospy.loginfo(
'Added %s tasks into the schedule to get total of %s' % (len(additional_tasks), self.execution_schedule.get_execution_queue_length()))
334 return True, additional_tasks
337 rospy.logwarn(
'Discarding %s unschedulable tasks, retaining %s' % (len(additional_tasks), self.execution_schedule.get_execution_queue_length()))
343 while not rospy.is_shutdown()
and self.
running:
348 unscheduled.append(self.unscheduled_tasks.get(
True, loopSecs))
352 unscheduled.append(self.unscheduled_tasks.get(
False))
357 rospy.loginfo(
'Got a further %s tasks to schedule' % len(unscheduled))
360 rospy.loginfo(
'Putting %s tasks to schedule later' % len(unscheduled))
361 for task
in unscheduled:
362 self.unscheduled_tasks.put(task)
374 while not rospy.is_shutdown()
and self.
running:
377 if(self.execution_schedule.wait_for_execution_change(wait_time)):
379 next_task = self.execution_schedule.get_current_task()
380 rospy.loginfo(
'Next task to execute: %s' % next_task.task_id)
381 if next_task
is not None:
384 rospy.logwarn(
'Next task was None')
390 """ Called when a request is received to cancel a task. The currently executing one is checked elsewhere. """
391 if self.execution_schedule.remove_task_with_id(task_id):
399 """ Called to clear all tasks from schedule, with the exception of the currently executing one. """
400 self.execution_schedule.clear_schedule()
404 if __name__ ==
'__main__':
def bound_tasks_by_start_window(self, tasks, start_after)
def get_active_task_completion_time(self)
def cancel_task(self, task_id)
def get_navigation_duration(self, start, end)
def add_tasks(self, tasks)
def get_duration_matrix(self, tasks)
def fill_times(self, task)
def task_demanded(self, demanded_task, currently_active_task)
def get_default_end_time(self, start_time)
def pause_execution(self)
def start_execution(self)
def advertise_services(self)
def publish_schedule(self)
def task_complete(self, task)
def wait_for_task_to_complete(self)
def execute_task(self, task)
def cancel_active_task(self)
def try_schedule(self, additional_tasks)