2 from threading
import Event
3 from copy
import deepcopy
4 from Queue
import Queue, Empty
5 from collections
import deque
28 """ Add new tasks to be scheduled. """
29 self.tasks.extend(tasks)
32 """ Get the tasks which are available to be scheduled. Does not include the task being executed. """
33 return deepcopy(self.
tasks)
36 """ Get the tasks which are queued for execution. """
40 """ Get the number of tasks which are queued for execution. """
45 """ Removes all tasks from the execution queue and task list """
46 self.execution_queue.clear()
51 prior_tasks_len = len(self.
tasks)
53 self.
tasks = [t
for t
in self.
tasks if t.task_id != task_id]
66 Pops the head of the execution queue to the current task, removes this from schedulable tasks, and notifies the wait_for_execution_change method.
71 self.
tasks = [t
for t
in self.
tasks if t.task_id != self.current_task.task_id]
72 self.execution_change.set()
77 rospy.logdebug(
'timer for execution delay fired')
83 Checks whether the next action can be executed now (i.e. the current time is within its constraints). If not, delays execution suitably.
86 now = rospy.get_rostime()
89 rospy.loginfo(
'start window: %s.%s' % (next_task.start_after.secs, next_task.start_after.nsecs))
90 rospy.loginfo(
' now: %s.%s' % (now.secs, now.nsecs))
93 if next_task.start_after <= now:
94 rospy.loginfo(
'start window is open')
97 exe_delay = next_task.start_after - now
98 rospy.loginfo(
'need to delay %s.%s for execution' % (exe_delay.secs, exe_delay.nsecs))
106 assert task
is not None
108 assert task.task_id == self.current_task.task_id
115 Receive a list of tasks in order of execution, with their execution times set.
116 Returns true if the schedule was
121 if len(scheduled_tasks) != len(self.
tasks):
122 rospy.logwarn(
'Number of scheduled tasks mismatch: given %s, had %s' % (len(scheduled_tasks),len(self.
tasks)))
125 for scheduled
in scheduled_tasks:
126 if not any(t.task_id == scheduled.task_id
for t
in self.
tasks):
127 rospy.logwarn(
'Trying to scheduled a missed task')
131 self.execution_queue.clear()
132 self.execution_queue.extend(scheduled_tasks)
138 self.execution_delay_timer.shutdown()
146 """ Get the next task for execution. """
152 Blocks until current_task changes value
154 self.execution_change.wait(timeout)
157 if self.execution_change.is_set():
159 self.execution_change.clear()
def wait_for_execution_change(self, timeout)
def get_current_task(self)
def get_execution_queue(self)
def remove_task_with_id(self, task_id)
def get_execution_queue_length(self)
def execution_delay_cb(self, event)
def next_in_schedule(self)
def execute_next_task(self)
def add_new_tasks(self, tasks)
def set_schedule(self, scheduled_tasks)
def get_schedulable_tasks(self)
def task_complete(self, task)
def remove_tasks(self, tasks)