task_routine_tester.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import rospy
4 
5 from datetime import datetime, timedelta, time
6 from dateutil.tz import tzlocal
7 
8 
9 import mongodb_store_msgs.srv as dc_srv
10 from mongodb_store_msgs.msg import StringPair
11 import mongodb_store.util as dc_util
12 from mongodb_store.message_store import MessageStoreProxy
13 from geometry_msgs.msg import Pose, Point, Quaternion
14 import StringIO
15 
16 from strands_executive_msgs import task_utils
17 from strands_executive_msgs.msg import Task
18 from strands_executive_msgs.srv import AddTasks, SetExecutionStatus
19 # import strands_executive_msgs
20 
21 import rospy
22 import actionlib
23 from strands_executive_msgs.msg import Task
24 from task_executor.msg import *
25 from topological_navigation.msg import GotoNodeAction
26 from copy import deepcopy
27 from random import random
28 
29 
30 from task_executor import task_routine
31 from task_executor.utils import TestTaskAction
32 
33 
35  # get services necessary to do the jon
36  add_tasks_srv_name = '/task_executor/add_tasks'
37  set_exe_stat_srv_name = '/task_executor/set_execution_status'
38  rospy.loginfo("Waiting for task_executor service...")
39  rospy.wait_for_service(add_tasks_srv_name)
40  rospy.wait_for_service(set_exe_stat_srv_name)
41  rospy.loginfo("Done")
42  add_tasks_srv = rospy.ServiceProxy(add_tasks_srv_name, AddTasks)
43  set_execution_status = rospy.ServiceProxy(set_exe_stat_srv_name, SetExecutionStatus)
44  return add_tasks_srv, set_execution_status
45 
46 def create_wait_task(max_duration):
47  master_task = Task(action='wait_action',start_node_id='ChargingPoint', max_duration=max_duration)
48  task_utils.add_time_argument(master_task, rospy.Time())
49  task_utils.add_duration_argument(master_task, max_duration)
50  return master_task
51 
53  rospy.loginfo('on_day_start')
54 
55 
56 def on_day_end():
57  rospy.loginfo('on_day_end')
58 
59 
60 if __name__ == '__main__':
61  rospy.init_node("task_routine_tester")
62  # wait for simulated time to kick in as rospy.get_rostime() is 0 until first clock message received
63  while not rospy.is_shutdown() and rospy.get_rostime().secs == 0:
64  pass
65 
66  # create a task we will copy later
67  actual_action_duration = rospy.Duration(20)
68  task = create_wait_task(actual_action_duration)
69 
70  # get services to call into execution framework
71  add_tasks, set_execution_status = get_services()
72  set_execution_status(True)
73 
74  # some useful times
75  localtz = tzlocal()
76  # the time the robot will be active
77  start = time(8,45, tzinfo=localtz)
78  end = time(23,15, tzinfo=localtz)
79  midday = time(12,00, tzinfo=localtz)
80 
81  morning = (start, midday)
82  afternoon = (midday, end)
83 
84  routine = task_routine.DailyRoutine(start, end)
85 
86  routine.repeat_every_mins(task, times=1)
87 
88  # create the object which will talk to the scheduler
89  runner = task_routine.DailyRoutineRunner(start, end, add_tasks, day_start_cb=on_day_start, day_end_cb=on_day_end)
90  # pass the routine tasks on to the runner which handles the daily instantiation of actual tasks
91  runner.add_tasks(routine.get_routine_tasks())
92 
93  rospy.spin()
def create_wait_task(max_duration)


task_executor
Author(s): Nick Hawes
autogenerated on Tue Mar 17 2015 20:08:13