task_routine.py
Go to the documentation of this file.
1 import rospy
2 from datetime import datetime, timedelta, date
3 from dateutil.tz import tzlocal, tzutc
4 from copy import copy
5 
6 from threading import Thread
7 
8 _epoch = datetime.utcfromtimestamp(0).replace(tzinfo=tzutc())
9 
10 class RoutineException(Exception):
11  pass
12 
13 def unix_time(dt):
14  """
15  Converts a datetime object to seconds since epoch
16  """
17  delta = dt - _epoch
18  return delta.total_seconds()
19 
20 def time_to_secs(time):
21  """
22  Coverts a datetime.time object to seconds.
23  """
24  return (time.hour * 60 * 60) + (time.minute * 60) + (time.second) + (time.microsecond/1000.0)
25 
26 
27 
28 def time_greater_than(t1, t2):
29  """ Seems to be a bug in datetime when comparing localtz dates, so do this instead. """
30  if t1.hour > t2.hour:
31  return True
32  elif t1.hour == t2.hour:
33  if t1.minute > t2.minute:
34  return True
35  elif t1.minute == t2.minute:
36  if t1.second > t2.second:
37  return True
38  elif t1.second == t2.second:
39  return t1.microsecond > t2.microsecond
40 
41  return False
42 
43 def time_less_than(t1, t2):
44  """ Seems to be a bug in datetime when comparing localtz dates, so do this instead. """
45  if t1.hour < t2.hour:
46  return True
47  elif t1.hour == t2.hour:
48  if t1.minute < t2.minute:
49  return True
50  elif t1.minute == t2.minute:
51  if t1.second < t2.second:
52  return True
53  elif t1.second == t2.second:
54  return t1.microsecond < t2.microsecond
55 
56  return False
57 
58 def delta_between(start, end):
59  """
60  Args:
61  start (datetime.time): start time of window
62  end (datetime.time): end time of window
63  Returns
64  datetime.timedelta: The delta of the window between start and end
65  """
66  sdt = datetime.combine(date.today(), start)
67  if time_less_than(start, end):
68  enddt = datetime.combine(date.today(), end)
69  else:
70  enddt = datetime.combine(date.today(), end) + timedelta(days=1)
71  return enddt - sdt
72 
73 
74 
75 
76 class DailyRoutine(object):
77  """ An object for setting up the daily schedule of a robot.
78  Args:
79  daily_start (datetime.time): The time of day when all tasks can start, local time.
80  daily_end (datetime.time): The time of day when all tasks should end start, local time.
81  """
82  def __init__(self, daily_start, daily_end):
83  super(DailyRoutine, self).__init__()
84 
85  if daily_start.tzinfo is None:
86  raise RoutineException('Start time must have timezone set')
87 
88  if daily_end.tzinfo is None:
89  raise RoutineException('End times must have timezone set')
90 
91  self.daily_start = daily_start
92  self.routine_duration = delta_between(daily_start, daily_end)
93  self.routine_tasks = []
94 
95  def repeat_every_day(self, tasks, times=1):
96  """
97  Repeat the given tasks a number of times during the day.
98  """
99  self.repeat_every(tasks, self.daily_start, self.routine_duration, times)
100 
101 
102 
103  def repeat_every_delta(self, tasks, delta=timedelta(hours=1), times=1, start_time=None, duration=None):
104  """
105  Repeat the given tasks every delta, a given number of times from the start time for the total given duration.
106  Args:
107  tasks: The list of tasks to repeat
108  delta (datetime.timedelta): The delta for the repetition (e.g. if this is one hour then the tasks repeat every hour, with each task having an hour time window for execution)
109  times (int): The number of times to repeat the tasks in each window
110  start_time (datetime.time): The time of day when the repetition should start.
111  duration (datetime.timedelta): The total length of time that should not be exceeded by the complete list of repetitions
112  """
113  if start_time is None:
114  start_time = self.daily_start
115  if duration is None:
116  duration = self.routine_duration
117 
118  # this window is moved forward throughout the repeat
119  window_start = datetime.combine(date.today(), start_time)
120  window_end = window_start + delta
121  # the past which we don't move the window
122  routine_end = window_start + duration
123 
124  while window_end <= routine_end:
125 
126  if window_start == window_end:
127  return
128 
129  print '%s.%s - %s.%s' % (window_start.hour, window_start.minute, window_end.hour, window_end.minute)
130  self.repeat_every(tasks, window_start.timetz(), delta, times)
131 
132  window_start = window_end
133  window_end = window_start + delta
134 
135 
136  def repeat_every_mins(self, tasks, mins=30, times=1):
137  """
138  Repeat the given tasks x times every n minutes
139  """
140 
141  self.repeat_every_delta(tasks, timedelta(minutes=mins), times)
142 
143 
144  def repeat_every_hour(self, tasks, hours=1, times=1):
145  """
146  Repeat the given tasks x times every n hours
147  """
148  self.repeat_every_delta(tasks, timedelta(hours=hours), times)
149 
150 
151  def repeat_every(self, tasks, daily_start, daily_duration, times=1):
152  """
153  Repeat the given task a number of times during the given time window.
154  Args:
155  daily_start (datetime.time): The time of day when the given tasks can start, local time.
156  daily_duration (datetime.timedelta): The duration of time during which the tasks can be executed
157  times: the number of times to execute the tasks in the window
158  """
159 
160  # make tasks a list if its not already
161  if not isinstance(tasks, list):
162  tasks = [tasks]
163 
164  if daily_start < self.daily_start:
165  raise Exception('Provided daily start %s is less than overall daily start %s' % (daily_start, self.daily_start))
166 
167 
168  daily_end = datetime.combine(date.today(), daily_start) + daily_duration
169  overall_end = datetime.combine(date.today(), self.daily_start) + self.routine_duration
170 
171 
172  if daily_end > overall_end:
173  raise Exception('Provided daily end %s is greater than overall daily end %s for tasks %s' % (daily_end, overall_end, tasks))
174 
175 
176 
177  self.routine_tasks += [(tasks, (daily_start, daily_duration))] * times
178 
179  def get_routine_tasks(self):
180  return self.routine_tasks
181 
182 
183 
184 
185 class DailyRoutineRunner(object):
186  """ An object for running the daily schedule of a robot.
187  Args:
188  daily_start (datetime.time): The time of day when all tasks can start, local time.
189  daily_end (datetime.time): The time of day when all tasks should end start, local time.
190  pre_start_window (datetime.timedelta): The duration before a task's start that it should be passed to the scheduler. Defaults to 1 hour.
191  """
192  def __init__(self, daily_start, daily_end, add_tasks_srv, pre_start_window=timedelta(hours=0.25), day_start_cb=None, day_end_cb=None, tasks_allowed_fn=None):
193  super(DailyRoutineRunner, self).__init__()
194 
195 
196  if daily_start.tzinfo is None:
197  raise RoutineException('Start time must have timezone set')
198 
199  if daily_end.tzinfo is None:
200  raise RoutineException('End times must have timezone set')
201 
202  self.daily_start = daily_start
203  self.routine_duration = delta_between(daily_start, daily_end)
204 
205  # work out when this current run of the routine should've started/ended
206  rostime_now = rospy.get_rostime()
207  now = datetime.fromtimestamp(rostime_now.to_sec(), tzlocal()).time()
208 
209  self.current_routine_start = datetime.combine(date.today(), self.daily_start)
210 
211  if time_less_than(daily_start, daily_end):
212  # if we're past the day end then the current routine starts tomorrow
213  if time_greater_than(now, daily_end):
214  self.current_routine_start += timedelta(days=1)
215  else:
216  if time_less_than(now, daily_end):
217  self.current_routine_start -= timedelta(days=1)
218 
220 
221  rospy.loginfo('Current day starts at %s' % self.current_routine_start)
222  rospy.loginfo('Current day ends at %s' % self.current_routine_end)
223 
224  if self.current_routine_start.tzinfo is None:
225  raise RoutineException()
226 
227  if self.current_routine_end.tzinfo is None:
228  raise RoutineException()
229 
230  # the tasks which need to be performed every day, tuples of form (daily_start, daily_end, task)
231  self.routine_tasks = []
232  self.add_tasks_srv = add_tasks_srv
233 
234  if tasks_allowed_fn is None:
236  else:
237  self.tasks_allowed = tasks_allowed_fn
238 
239  self.days_off = []
240  self.dates_off = []
241 
242  # check for local spellings!
243  self.all_days = []
244  day_today = datetime.fromtimestamp(rospy.get_rostime().to_sec(), tz=tzlocal())
245  for n in range(0,7):
246  self.all_days.append(day_today.strftime("%A"))
247  day_today += timedelta(days=1)
248 
249  self.pre_schedule_delay = rospy.Duration(pre_start_window.total_seconds())
250 
251  self.day_start_cb = day_start_cb
252  self.day_end_cb = day_end_cb
253  Thread(target=self._start_and_end_day).start()
254 
255  def add_day_off(self, day_name):
256  """ Add a day of the week, e.g. Saturday, on which the routing should not be run """
257  if day_name not in self.all_days:
258  raise Exception('Day name %s not allowed. Must be one of %s' % (day_name, self.all_days))
259  self.days_off.append(day_name)
260 
261  def add_date_off(self, date):
262  """ Add a datetime.date on which the robot should not work. """
263  self.dates_off.append(date)
264 
265  def day_off(self):
266  datetime_today = datetime.fromtimestamp(rospy.get_rostime().to_sec(), tz=tzlocal())
267  day_today = datetime_today.strftime("%A")
268  date_today = datetime_today.date()
269 
270  return (day_today in self.days_off) or (date_today in self.dates_off)
271 
272  def _tasks_allowed_fn(self, task):
273  return True
274 
276  """ Runs a loop which triggers the start and end of day callbacks """
277 
278  loop_delay = rospy.Duration(1)
279 
280  while not rospy.is_shutdown():
281 
282  # again, using sleeps rather than timers due to slightly odd sim time behaviour of the latter
283  now = datetime.fromtimestamp(rospy.get_rostime().to_sec(), tz=tzlocal())
284 
285  # wait for a the pre-delay before the start of the day to instantiate the day's tasks
286  pre_delayed_start = self.current_routine_start - timedelta(seconds=self.pre_schedule_delay.to_sec())
287  while now < pre_delayed_start and not rospy.is_shutdown():
288  rospy.sleep(loop_delay)
289  now = datetime.fromtimestamp(rospy.get_rostime().to_sec(), tz=tzlocal())
290 
291  self._new_day()
292 
293  # wait for the start of the day
294  while now < self.current_routine_start and not rospy.is_shutdown():
295  rospy.sleep(loop_delay)
296  now = datetime.fromtimestamp(rospy.get_rostime().to_sec(), tz=tzlocal())
297 
298  if self.day_start_cb is not None and not rospy.is_shutdown():
299  rospy.loginfo('triggering day start cb at %s' % now)
300  self.day_start_cb()
301 
302  while now < self.current_routine_end and not rospy.is_shutdown():
303  rospy.sleep(loop_delay)
304  now = datetime.fromtimestamp(rospy.get_rostime().to_sec(), tz=tzlocal())
305 
306  if self.day_end_cb is not None and not rospy.is_shutdown():
307  rospy.loginfo('triggering day end cb at %s' % now)
308  self.day_end_cb()
309 
310  # update routine window
311  self.current_routine_start += timedelta(days=1)
312  self.current_routine_end += timedelta(days=1)
313 
314 
315  """
316  Adds a task to the daily routine.
317  Args:
318  routines: A list of tuples where each tuple is (task_list, window) where task_list is a list of task and window is a tuple (start, duration) representing a time window in which to execute those tasks each day.
319  """
320  def add_tasks(self, routines):
321 
322  # add tasks to daily route
323  new_routine = []
324 
325  for task_tuple in routines:
326 
327 
328  daily_start = task_tuple[1][0]
329  daily_duration = task_tuple[1][1]
330 
331  for task in task_tuple[0]:
332 
333  # else, add to daily routine
334  routine = (daily_start, daily_duration, task)
335  self.routine_tasks.append(routine)
336  new_routine.append(routine)
337 
338  # see if we can still schedule any of these today
339  todays_tasks = self._instantiate_tasks_for_today(new_routine)
340  # the false means an exception is not thrown for any out of window tasks
341  self._create_routine(todays_tasks, False)
342 
343 
344  def _new_day(self):
345  """
346  Should be called each new day, in advance of the overall daily start time.
347  """
348 
349  todays_tasks = self._instantiate_tasks_for_today(self.routine_tasks)
350  self._create_routine(todays_tasks)
351 
352 
353  def _instantiate_tasks_for_today(self, routine_tasks):
354  # instantiate the daily tasks with release windows for today
355  todays_tasks = [self._instantiate_for_day(self.current_routine_start, *task_tuple) for task_tuple in routine_tasks]
356  return todays_tasks
357 
358  def _create_routine(self, tasks, throw=True):
359 
360  schedule_now, schedule_later = self._queue_for_scheduling(tasks, throw)
361 
362  rospy.loginfo('Scheduling %s tasks now and %s later' % (len(schedule_now), len(schedule_later)))
363 
364  self._delay_tasks(schedule_later)
365  self._schedule_tasks(schedule_now)
366 
367 
368  def _queue_for_scheduling(self, tasks, throw=True):
369  now = rospy.get_rostime()
370 
371  schedule_now = []
372  schedule_later = []
373 
374  for task in tasks:
375 
376  # print task.max_duration.secs
377  # print task.start_after.secs
378  # print task.end_before.secs
379  # print (now).secs
380  # print (now + task.max_duration).secs
381 
382 
383  # check we're not too late
384  if now + task.max_duration > task.end_before:
385  #
386  if throw:
387  raise RoutineException('%s is too late to schedule task %s' % (now.secs, task))
388  else:
389  rospy.logdebug('Ignoring task for today')
390  else:
391  # if we're in the the window when this should be scheduled
392  if now > (task.start_after - self.pre_schedule_delay):
393  schedule_now.append(task)
394  else:
395  schedule_later.append(task)
396 
397  return schedule_now, schedule_later
398 
399 
400  def _delay_tasks(self, tasks):
401  """
402  Delays call to the scheduer until the first of these needs to start, then reruns check for scheduling
403  """
404 
405  if len(tasks) == 0:
406  return
407 
408  # find the soonest start date of all tasks
409 
410  # an arbitrary large date
411  min_start_date = rospy.Time(unix_time(self.current_routine_end))
412  for task in tasks:
413  if task.start_after < min_start_date:
414  # print task.max_duration.secs
415  # print task.start_after.secs
416  # print task.end_before.secs
417  # print (now).secs
418  # print (now + task.max_duration).secs
419  min_start_date = task.start_after
420 
421  # print 'min start at %s' % min_start_date.secs
422 
423  # only delay up to the pre-scheduler window
424  min_start_date = min_start_date - self.pre_schedule_delay
425 
426  now = rospy.get_rostime()
427 
428 
429  # print 'min start at %s' % min_start_date.secs
430  # print ' now at %s' % now.secs
431 
432  delay = min_start_date - now
433 
434  assert delay.secs > 0
435 
436 
437  # print 'delaying %s' % delay.secs
438  self._delay_scheduling(tasks, delay)
439 
440 
441  # separated out to try differeing approaches
442  def _delay_scheduling(self, tasks, delay):
443  rospy.loginfo('Delaying %s tasks for %s secs' % (len(tasks), delay.secs))
444 
445  def _check_tasks():
446  # using a sleep instead of a timer as the timer seems flakey with sim time
447  target = rospy.get_rostime() + delay
448  # make sure we can be killed here
449  while rospy.get_rostime() < target and not rospy.is_shutdown():
450  rospy.sleep(1)
451  # handle being shutdown
452  if not rospy.is_shutdown():
453  self._create_routine(tasks)
454 
455  Thread(target=_check_tasks).start()
456 
457 
458  def _schedule_tasks(self, tasks):
459 
460  if len(tasks) > 0:
461 
462  allowed_tasks = [t for t in tasks if self.tasks_allowed(t)]
463 
464  if len(allowed_tasks) != len(tasks):
465  rospy.loginfo('Provided function prevented %s of %s tasks being send to the scheduler' % (len(tasks) - len(allowed_tasks), len(tasks)))
466 
467  if not self.day_off():
468  rospy.loginfo('Sending %s tasks to the scheduler' % (len(allowed_tasks)))
469  self.add_tasks_srv(allowed_tasks)
470  else:
471  rospy.loginfo('Taking the day off')
472 
473 
474  def _instantiate_for_day(self, start_of_day, daily_start, daily_duration, task):
475  """
476  Create a copy of the given task with start and end times instantiated from input variables relative to the start date provided.
477  """
478  instantiated_task = copy(task)
479  release_date = datetime.combine(start_of_day.date(), daily_start)
480  end_date = release_date + daily_duration
481  instantiated_task.start_after = rospy.Time(unix_time(release_date))
482  instantiated_task.end_before = rospy.Time(unix_time(end_date))
483  # print '%s (%s)' % (release_date, instantiated_task.start_after.secs)
484  # print '%s (%s)' % (end_date, instantiated_task.end_before.secs)
485 
486  return instantiated_task
487 
488 
489 
490 
491 
492 
def _instantiate_for_day(self, start_of_day, daily_start, daily_duration, task)
def delta_between(start, end)
Definition: task_routine.py:58
def __init__(self, daily_start, daily_end)
Definition: task_routine.py:82
def _instantiate_tasks_for_today(self, routine_tasks)


task_executor
Author(s): Nick Hawes
autogenerated on Tue Mar 17 2015 20:08:13