13 from Queue
import Queue
14 from random
import randrange
15 from mongodb_store.message_store
import MessageStoreProxy
16 from geometry_msgs.msg
import Pose, Point, Quaternion
17 from strands_executive_msgs
import task_utils
20 from topological_navigation.msg
import GotoNodeAction
24 def __init__(self, action_string, action_sleep, master, tester):
29 self.
server = actionlib.SimpleActionServer(action_string, TestExecutionAction, self.
execute,
False)
36 task_description = self.master.task_descriptions.pop(0)
37 self.tester.assertEquals(task_description[0], self.master.node_id)
38 self.tester.assertEquals(task_description[1], self.
action_string)
39 self.tester.assertEquals(task_description[2], goal.some_goal_string)
40 self.tester.assertEquals(task_description[3], goal.test_pose)
41 self.server.set_succeeded()
46 def __init__(self, action_types, action_prefix, task_descriptions, action_sleep, tester):
56 rospy.sleep(wait_duration)
63 super(TestEntry, self).
__init__(*args)
66 rospy.Subscriber(
'topological_map', TopologicalMap, self.map_callback)
71 self.node_names = [node.name
for node
in msg.nodes]
74 while len(self.node_names) == 0:
77 return self.node_names
80 waypoints = self.get_nodes()
83 action_sleep = rospy.Duration.from_sec(1)
84 action_prefix =
'test_task_'
86 msg_store = MessageStoreProxy()
88 task_descriptions = []
90 for n
in range(test_tasks):
91 string =
'oh what a lovely number %s is' % n
92 pose = Pose(Point(n, 1, 2), Quaternion(3, 4, 5, 6))
93 task_descriptions += [[random.choice(waypoints),
94 action_prefix + str(randrange(action_types)),
97 assert test_tasks == len(task_descriptions)
99 executor =
FIFOTester(action_types, action_prefix, task_descriptions, action_sleep, self)
102 add_task_srv_name =
'/task_executor/add_task'
103 set_exe_stat_srv_name =
'/task_executor/set_execution_status'
104 rospy.loginfo(
"Waiting for task_executor service...")
105 rospy.wait_for_service(add_task_srv_name)
106 rospy.wait_for_service(set_exe_stat_srv_name)
107 rospy.loginfo(
"Done")
108 add_task_srv = rospy.ServiceProxy(add_task_srv_name, AddTask)
109 set_execution_status = rospy.ServiceProxy(set_exe_stat_srv_name, SetExecutionStatus)
113 for task_description
in task_descriptions:
115 task = Task(start_node_id=task_description[0], action=task_description[1])
117 task_utils.add_string_argument(task, task_description[2])
118 task_utils.add_object_id_argument(task, msg_store.insert(task_description[3]), Pose)
119 task_utils.add_int_argument(task, task_description[4])
120 task_utils.add_float_argument(task, task_description[5])
121 task_id = add_task_srv(task)
125 set_execution_status(
True)
128 wait_duration = rospy.Duration()
129 for n
in range(test_tasks):
130 wait_duration += action_sleep +action_sleep + action_sleep
132 executor.wait_for_completion(wait_duration + wait_duration)
134 except rospy.ServiceException, e:
135 print "Service call failed: %s"%e
139 if __name__ ==
'__main__':
140 rostest.rosrun(PKG, NAME, TestEntry, sys.argv)
def test_fifo_task_executor(self)
def map_callback(self, msg)
def wait_for_completion(self, wait_duration)
def __init__(self, action_types, action_prefix, task_descriptions, action_sleep, tester)
def __init__(self, action_string, action_sleep, master, tester)