scheduled_task_executor.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import rospy
4 from Queue import Queue, Empty
5 from strands_executive_msgs.msg import Task, ExecutionStatus, DurationMatrix, DurationList
6 from strands_executive_msgs.srv import GetSchedule
7 from task_executor.sm_base_executor import AbstractTaskExecutor
8 from threading import Thread
9 from task_executor.execution_schedule import ExecutionSchedule
10 from operator import attrgetter
11 import copy
12 
14 
15  def __init__(self):
16  # init node first, must be done before call to super init for service advertising to work
17  rospy.init_node("task_executor", log_level=rospy.INFO)
18 
19  # init superclasses
20  super( ScheduledTaskExecutor, self ).__init__()
21 
22  # service for scheduler
23  schedule_srv_name = 'get_schedule'
24  rospy.logdebug('Waiting for %s service' % schedule_srv_name)
25  rospy.wait_for_service(schedule_srv_name)
26  self.schedule_srv = rospy.ServiceProxy(schedule_srv_name, GetSchedule)
27 
28  # topic on which current schedule is broadcast
29  self.schedule_publisher = rospy.Publisher('/current_schedule', ExecutionStatus, queue_size=1)
30 
31  # defaults for setting the ends of tasks
32  self.default_duration = rospy.Duration.from_sec(60 * 60 * 4)
33 
34  # storage for tasks which have been added but not considered
35  self.unscheduled_tasks = Queue()
36 
37  # data structure that manages tasks
39 
40 
41  self.running = False
42 
43  self.advertise_services()
44 
45  def start_execution(self):
46  """ Called when overall execution should (re)start """
47 
48  if not self.running:
49  self.running = True
50 
51  self.scheduling_thread = Thread(target=self.schedule_tasks)
52  self.execution_thread = Thread(target=self.execute_tasks)
53 
54  self.scheduling_thread.start()
55  self.execution_thread.start()
56 
57 
58  def pause_execution(self):
59 
60  if self.running:
61  rospy.loginfo('Pausing execution')
62  self.running = False
63 
64  # save the tasks we had previously
65  previously_scheduled = self.execution_schedule.get_schedulable_tasks()
66  current_task = self.execution_schedule.get_current_task()
67 
68  # clear all tasks that were previously scheduled
69  self.execution_schedule.clear_schedule()
70 
71  if current_task is not None:
72  rospy.loginfo('Interrupting active task')
73 
74  # stop execution
75  self.cancel_active_task()
76 
77  # wait for current task to end
79 
80  # add the interrupted task back to the queue for scheduling later
81  self.unscheduled_tasks.put(current_task)
82 
83  for task in previously_scheduled:
84  self.unscheduled_tasks.put(task)
85 
86 
87 
88 
89  def get_default_end_time(self, start_time):
90  return start_time + self.default_duration
91 
92 
93  def fill_times(self, task):
94  if task.start_after.is_zero():
95  task.start_after = rospy.get_rostime()
96 
97  if task.end_before.is_zero():
98  task.end_before = self.get_default_end_time(task.start_after)
99 
100 
101  def add_tasks(self, tasks):
102  """ Called with new tasks for the executor """
103 
104  for task in tasks:
105  self.fill_times(task)
106  # if no end node, set it to the same as the start
107  if task.end_node_id == '':
108  task.end_node_id = task.start_node_id
109 
110  for task in tasks:
111  self.unscheduled_tasks.put(task)
112 
113  def task_complete(self, task):
114  """ Called when the given task has completed execution """
115  # pass signal to schedule
116  self.execution_schedule.task_complete(task)
117 
118 
120  """ Useful for cases where you don't know what the effect of cancellation will be """
121 
122  wait_count = 0
123  # a similar theshold is used in sm_base_executor to wait for termination of active task
124  wait_threshold = 31
125  while self.execution_schedule.get_current_task() is not None and wait_count < wait_threshold and not rospy.is_shutdown():
126  rospy.sleep(1)
127  rospy.loginfo('Waiting for previous task to finish')
128  wait_count += 1
129 
130  # if it's still not None then we got bored of waiting
131  if self.execution_schedule.get_current_task() is not None:
132  rospy.logwarn('Previous task did not terminate nicely. Everything from here on in could be dicey.')
133  self.execution_schedule.current_task = None
134 
135 
136  def task_demanded(self, demanded_task, currently_active_task):
137  """ Called when a task is demanded. self.active_task is the demanded task (and is being executed) and previously_active_task was the task that was being executed (which could be None) """
138 
139  if demanded_task.end_node_id == '':
140  demanded_task.end_node_id = demanded_task.start_node_id
141 
142  rospy.loginfo('Task %s, %s demanded from scheduler' % (demanded_task.task_id, demanded_task.action))
143 
144  # save the tasks we had previously
145  previously_scheduled = self.execution_schedule.get_schedulable_tasks()
146 
147  # clear all tasks that were previously scheduled
148  self.execution_schedule.clear_schedule()
149 
150  # wait until the demanded task has been taken off for execution, otherwise we run into problems with get_schedulable_tasks
151  # cancellation should block until cancelled
153 
154  # try to schedule them back in
155  self.execution_schedule.add_new_tasks([demanded_task])
156  # put scheduled tasks back into execution. this will trigger a change in execution if necessary
157  self.execution_schedule.set_schedule([demanded_task])
158 
159 
160  # now try to put the other tasks back in
161  if len(previously_scheduled) > 0:
162  success, added = self.try_schedule(previously_scheduled)
163  if success:
164  rospy.loginfo('Was able to reinstate %s/%s tasks after demand' % (len(added), len(previously_scheduled)))
165  if currently_active_task != None:
166  success, added = self.try_schedule([currently_active_task])
167  if success and len(added) > 0:
168  rospy.loginfo('Was also able to reinstate previously active task after demand')
169  else:
170  rospy.loginfo('Was not able to reinstate previously active task after demand (but other tasks ok)')
171  else:
172  rospy.loginfo('Was NOT able to reinstate tasks after demand')
173 
174 
175  def get_duration_matrix(self, tasks):
176  """
177  Creates the matrix of durations between waypoints needed as input to the scheuler.
178  Output is a DurationMatrix encoding duration[i][j] where this is the duration expected for travelling between the end of the ith task in tasks and the start of the jth element.
179  """
180  # first populate sets of start and end locations for moves between tasks, these are the end and start of tasks repsectively
181  start_nodes = set([task.end_node_id for task in tasks])
182  end_nodes = set([task.start_node_id for task in tasks])
183 
184  # next get the costs for each element of the cross product
185  durations = dict()
186  for start in start_nodes:
187  durations[start] = dict()
188  for end in end_nodes:
189  durations[start][end] = self.get_navigation_duration(start, end)
190 
191 
192  # now populate the DurationMatrix object
193  dm = DurationMatrix()
194  for task_i in tasks:
195  dm.durations.append(DurationList())
196  for task_j in tasks:
197  dm.durations[-1].durations.append(durations[task_i.end_node_id][task_j.start_node_id])
198 
199  return dm
200 
201 
202 
203 
204  def call_scheduler(self, tasks, earliest_start, current_id=0):
205  """
206 
207  Calls scheduler. Reorders the list of tasks in execution order with their execution times set.
208 
209  """
210  resp = self.schedule_srv(tasks, earliest_start, current_id, self.get_duration_matrix(tasks))
211 
212  # rospy.loginfo(resp)
213 
214  if len(resp.task_order) > 0:
215 
216  # add start times to a dictionary for fast lookup
217  task_times = {}
218  for (task_id, start_time) in zip(resp.task_order, resp.execution_times):
219  # print task_id, start_time
220  task_times[task_id] = start_time
221 
222  # set start times inside of tasks
223  for task in tasks:
224  # add min_window back on to starting times
225  # print task.task_id, task_times[task.task_id]
226  task.execution_time = task_times[task.task_id]
227 
228  # taking out as rescheduling demanded tasks have an issue here I think
229  # assert task.execution_time >= task.start_after
230  # assert task.execution_time + task.max_duration <= task.end_before
231 
232  # print 'task %s will start at %s.%s' % (task.task_id, task.execution_time.secs, task.execution_time.nsecs)
233 
234  tasks.sort(key=attrgetter('execution_time'))
235  return True
236 
237  else:
238  rospy.loginfo('No schedule found')
239  return False
240 
241 
242  def publish_schedule(self):
243  # pub.publish(std_msgs.msg.String("foo"))
244 
245  exe_status = ExecutionStatus()
246 
247  current = self.execution_schedule.get_current_task()
248 
249  if current:
250  exe_status.currently_executing = True
251  exe_status.execution_queue.append(current)
252 
253  exe_status.execution_queue.extend(self.execution_schedule.get_execution_queue())
254  # print schedule
255  self.schedule_publisher.publish(exe_status)
256 
257  def bound_tasks_by_start_window(self, tasks, start_after):
258  bounded_tasks = []
259  dropped_tasks = []
260  for task in tasks:
261  # if it's still executable after the bound
262  if start_after + task.max_duration <= task.end_before:
263  #
264  # This is no longer done because it means that execution also has to work with a bounded start_after which may actually lead to unnecessary delays
265  #
266  # # if we need to push it back to the bound
267  # if start_after > task.start_after:
268  # # rospy.loginfo('pushing boundary')
269  # task.start_after = start_after
270  bounded_tasks.append(task)
271  else:
272  dropped_tasks.append(task)
273 
274  return bounded_tasks, dropped_tasks
275 
276  def try_schedule(self, additional_tasks):
277 
278  # this is the lower bound on future execution time
279  lower_bound = rospy.get_rostime()
280  if self.execution_schedule.get_current_task() is not None:
281  lower_bound = self.get_active_task_completion_time()
282  rospy.logwarn('lower bound %s' % lower_bound)
283 
284  # bound additional tasks bu this bound
285  bounded_tasks, dropped_tasks = self.bound_tasks_by_start_window(additional_tasks, lower_bound)
286  if len(bounded_tasks) < len(additional_tasks):
287  rospy.logwarn('Dropped %s additional tasks which are no longer executable' % (len(additional_tasks) - len(bounded_tasks)))
288  additional_tasks = bounded_tasks
289 
290 
291  # these are previoulsy scheduled tasks which we must now reschedule
292  schedulable_tasks = self.execution_schedule.get_schedulable_tasks()
293 
294  bounded_tasks, dropped_tasks = self.bound_tasks_by_start_window(schedulable_tasks, lower_bound)
295  if len(bounded_tasks) < len(schedulable_tasks):
296  rospy.logwarn('Dropped %s existing tasks which are no longer executable' % (len(schedulable_tasks) - len(bounded_tasks)))
297  # have to remove these from schedule too, although this assumes successful scheduling
298  # TODO: what if scheduling is not successful?
299  self.execution_schedule.remove_tasks(dropped_tasks)
300  schedulable_tasks = bounded_tasks
301 
302 
303  # the tasks to try and schedule
304  to_schedule = []
305  # add in the schedulable tasks we already have
306  to_schedule.extend(schedulable_tasks)
307  # and the ones we have just been given
308  to_schedule.extend(additional_tasks)
309 
310  # if we are currently executing something, including this for the start of the schedule too
311  current_id = 0
312  current_task = copy.deepcopy(self.execution_schedule.get_current_task())
313  if current_task is not None:
314  current_id = current_task.task_id
315  rospy.loginfo('Including current task %s in scheduling problem' % current_id)
316  # assuming this executed when requested (may not be!) update how much time is left
317  current_task.max_duration = rospy.Time(1)
318  current_task.end_before = lower_bound
319  current_task.start_after = lower_bound - rospy.Time(2)
320  to_schedule.append(current_task)
321 
322  # reorder tasks and add execution information
323  if self.call_scheduler(to_schedule, lower_bound, current_id):
324  # if this was successful, add new tasks into execution schedule
325  self.execution_schedule.add_new_tasks(additional_tasks)
326 
327  # and remove the current task from the new schedule
328  to_schedule = [t for t in to_schedule if t.task_id != current_id]
329 
330  # put scheduled tasks back into execution. this will trigger a change in execution if necessary
331  self.execution_schedule.set_schedule(to_schedule)
332 
333  rospy.loginfo('Added %s tasks into the schedule to get total of %s' % (len(additional_tasks), self.execution_schedule.get_execution_queue_length()))
334  return True, additional_tasks
335  else:
336  # previously scheduled tasks will still remain scheduled
337  rospy.logwarn('Discarding %s unschedulable tasks, retaining %s' % (len(additional_tasks), self.execution_schedule.get_execution_queue_length()))
338  return False, []
339 
340  def schedule_tasks(self):
341  loopSecs = 5
342 
343  while not rospy.is_shutdown() and self.running:
344  # print "scheduling thread %s" % rospy.is_shutdown()
345  try:
346  unscheduled = []
347  # block until at least one task is available
348  unscheduled.append(self.unscheduled_tasks.get(True, loopSecs))
349  # now check for any remaining tasks in the queue
350  try:
351  while True:
352  unscheduled.append(self.unscheduled_tasks.get(False))
353  except Empty, e:
354  pass
355 
356  if self.running:
357  rospy.loginfo('Got a further %s tasks to schedule' % len(unscheduled))
358  self.try_schedule(unscheduled)
359  else:
360  rospy.loginfo('Putting %s tasks to schedule later' % len(unscheduled))
361  for task in unscheduled:
362  self.unscheduled_tasks.put(task)
363 
364  except Empty, e:
365  # rospy.logdebug('No new tasks to schedule')
366  pass
367 
368  self.publish_schedule()
369 
370 
371  def execute_tasks(self):
372  wait_time = 1 # 1second
373 
374  while not rospy.is_shutdown() and self.running:
375 
376  # print "executing thread %s" % rospy.is_shutdown()
377  if(self.execution_schedule.wait_for_execution_change(wait_time)):
378  if self.running:
379  next_task = self.execution_schedule.get_current_task()
380  rospy.loginfo('Next task to execute: %s' % next_task.task_id)
381  if next_task is not None:
382  self.execute_task(next_task)
383  else:
384  rospy.logwarn('Next task was None')
385 
386 
387 
388 
389  def cancel_task(self, task_id):
390  """ Called when a request is received to cancel a task. The currently executing one is checked elsewhere. """
391  if self.execution_schedule.remove_task_with_id(task_id):
392  # reschedule after a successful removal
393  self.try_schedule([])
394  return True
395  else:
396  return False
397 
398  def clear_schedule(self):
399  """ Called to clear all tasks from schedule, with the exception of the currently executing one. """
400  self.execution_schedule.clear_schedule()
401 
402 
403 
404 if __name__ == '__main__':
405  executor = ScheduledTaskExecutor()
406  rospy.spin()
407 
408 
409 # create a schedule class which handles blocking until execution and manages the various changes
410 
def bound_tasks_by_start_window(self, tasks, start_after)
def task_demanded(self, demanded_task, currently_active_task)


task_executor
Author(s): Nick Hawes
autogenerated on Tue Mar 17 2015 20:08:13