execution_schedule.py
Go to the documentation of this file.
1 from strands_executive_msgs.msg import Task
2 from threading import Event
3 from copy import deepcopy
4 from Queue import Queue, Empty
5 from collections import deque
6 
7 import rospy
8 
9 # def enum(*sequential, **named):
10 # """
11 # Pre-v3 emum support from http://stackoverflow.com/questions/36932/how-can-i-represent-an-enum-in-python
12 # """
13 # enums = dict(zip(sequential, range(len(sequential))), **named)
14 # return type('Enum', (), enums)
15 
16 # ExecutionStatus = enum('ZERO', 'ONE', 'TWO')
17 
18 class ExecutionSchedule(object):
19 
20  def __init__(self):
21  self.current_task = None
22  self.execution_change = Event()
23  self.execution_queue = deque()
24  self.tasks = []
26 
27  def add_new_tasks(self, tasks):
28  """ Add new tasks to be scheduled. """
29  self.tasks.extend(tasks)
30 
32  """ Get the tasks which are available to be scheduled. Does not include the task being executed. """
33  return deepcopy(self.tasks)
34 
36  """ Get the tasks which are queued for execution. """
37  return deepcopy(self.execution_queue)
38 
40  """ Get the number of tasks which are queued for execution. """
41  return len(self.execution_queue)
42 
43 
44  def clear_schedule(self):
45  """ Removes all tasks from the execution queue and task list """
46  self.execution_queue.clear()
47  del self.tasks[:]
48 
49  def remove_task_with_id(self, task_id):
50  # remove from list of available tasks
51  prior_tasks_len = len(self.tasks)
52  prior_queue_len = len(self.execution_queue)
53  self.tasks = [t for t in self.tasks if t.task_id != task_id]
54  # also remove from execution queue, order and contents can be different from self.tasks
55  self.execution_queue = deque([t for t in self.execution_queue if t.task_id != task_id])
56  # if one of these got shorter then we did good
57  return len(self.tasks) < prior_tasks_len or len(self.execution_queue) < prior_queue_len
58 
59  def remove_tasks(self, tasks):
60  for task in tasks:
61  # massively inefficient!
62  self.remove_task_with_id(task.task_id)
63 
64  def execute_next_task(self):
65  """
66  Pops the head of the execution queue to the current task, removes this from schedulable tasks, and notifies the wait_for_execution_change method.
67  """
68  if len(self.execution_queue) > 0:
69  self.current_task = self.execution_queue.popleft()
70  # remove current task from schedulable tasks
71  self.tasks = [t for t in self.tasks if t.task_id != self.current_task.task_id]
72  self.execution_change.set()
73  else:
74  self.current_task = None
75 
76  def execution_delay_cb(self, event):
77  rospy.logdebug('timer for execution delay fired')
78  self.execution_delay_timer = None
79  self.next_in_schedule()
80 
81  def next_in_schedule(self):
82  """
83  Checks whether the next action can be executed now (i.e. the current time is within its constraints). If not, delays execution suitably.
84  """
85  if len(self.execution_queue) > 0:
86  now = rospy.get_rostime()
87  next_task = self.execution_queue[0]
88 
89  rospy.loginfo('start window: %s.%s' % (next_task.start_after.secs, next_task.start_after.nsecs))
90  rospy.loginfo(' now: %s.%s' % (now.secs, now.nsecs))
91 
92  # if the start window is open
93  if next_task.start_after <= now:
94  rospy.loginfo('start window is open')
95  self.execute_next_task()
96  else:
97  exe_delay = next_task.start_after - now
98  rospy.loginfo('need to delay %s.%s for execution' % (exe_delay.secs, exe_delay.nsecs))
99  self.current_task = None
100  self.execution_delay_timer = rospy.Timer(exe_delay, self.execution_delay_cb, oneshot=True)
101  else:
102  self.current_task = None
103 
104 
105  def task_complete(self, task):
106  assert task is not None
107  assert self.current_task is not None
108  assert task.task_id == self.current_task.task_id
109  # no time info yet
110  self.next_in_schedule()
111 
112 
113  def set_schedule(self, scheduled_tasks):
114  """
115  Receive a list of tasks in order of execution, with their execution times set.
116  Returns true if the schedule was
117  """
118 
119  # check that the tasks that were scheduled are the ones we're waiting on
120 
121  if len(scheduled_tasks) != len(self.tasks):
122  rospy.logwarn('Number of scheduled tasks mismatch: given %s, had %s' % (len(scheduled_tasks),len(self.tasks)))
123  return False
124 
125  for scheduled in scheduled_tasks:
126  if not any(t.task_id == scheduled.task_id for t in self.tasks):
127  rospy.logwarn('Trying to scheduled a missed task')
128  return False
129 
130  # now clear out the execution queue so that the new schedule comes into effect after current execution completes
131  self.execution_queue.clear()
132  self.execution_queue.extend(scheduled_tasks)
133 
134  # if nothing is executing, make sure something starts
135  if self.current_task is None:
136  # nothing may be executing becuase we're waiting for delayed execution
137  if self.execution_delay_timer is not None:
138  self.execution_delay_timer.shutdown()
139  self.next_in_schedule()
140  # else:
141  # rospy.logwarn('current task is still: %s', self.current_task)
142 
143 
144 
145  def get_current_task(self):
146  """ Get the next task for execution. """
147  return self.current_task
148 
149 
150  def wait_for_execution_change(self, timeout):
151  """
152  Blocks until current_task changes value
153  """
154  self.execution_change.wait(timeout)
155 
156  # if change was notified
157  if self.execution_change.is_set():
158  # reset to cause future calls to block until next notification
159  self.execution_change.clear()
160  return True
161  else:
162  # timeout
163  return False
164 
165 


task_executor
Author(s): Nick Hawes
autogenerated on Tue Mar 17 2015 20:08:13