fifo_tester.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 PKG = 'task_executor'
3 NAME = 'fifo_tester'
4 
5 import rospy
6 import unittest
7 import rostest
8 import random
9 
10 import actionlib
11 from strands_executive_msgs.msg import Task
12 from task_executor.msg import *
13 from Queue import Queue
14 from random import randrange
15 from mongodb_store.message_store import MessageStoreProxy
16 from geometry_msgs.msg import Pose, Point, Quaternion
17 from strands_executive_msgs import task_utils
18 from strands_executive_msgs.msg import Task
19 from strands_executive_msgs.srv import AddTask, SetExecutionStatus
20 from topological_navigation.msg import GotoNodeAction
21 from strands_navigation_msgs.msg import TopologicalMap
22 
23 class FakeActionServer(object):
24  def __init__(self, action_string, action_sleep, master, tester):
25  self.master = master
26  self.tester = tester
27  self.action_string = action_string
28  self.action_sleep = action_sleep
29  self.server = actionlib.SimpleActionServer(action_string, TestExecutionAction, self.execute, False)
30  self.server.start()
31 
32  def execute(self, goal):
33  # print 'called with goal %s'%goal
34  rospy.sleep(self.action_sleep)
35 
36  task_description = self.master.task_descriptions.pop(0)
37  self.tester.assertEquals(task_description[0], self.master.node_id)
38  self.tester.assertEquals(task_description[1], self.action_string)
39  self.tester.assertEquals(task_description[2], goal.some_goal_string)
40  self.tester.assertEquals(task_description[3], goal.test_pose)
41  self.server.set_succeeded()
42 
43 
44 
45 class FIFOTester(object):
46  def __init__(self, action_types, action_prefix, task_descriptions, action_sleep, tester):
47  self.tester = tester
48  self.action_sleep = action_sleep
49  self.node_id = ''
50  self.task_descriptions = task_descriptions
51  self.action_servers = [FakeActionServer(action_prefix + str(n), action_sleep, self, tester) for n in range(action_types)]
52 
53 
54  def wait_for_completion(self, wait_duration):
55  # give everything time to complete
56  rospy.sleep(wait_duration)
57  self.tester.assertEquals(self.task_descriptions, [])
58 
59 
60 class TestEntry(unittest.TestCase):
61 # class TestEntry(object):
62  def __init__(self, *args):
63  super(TestEntry, self).__init__(*args)
64  rospy.init_node(NAME)
65  self.node_names = []
66  rospy.Subscriber('topological_map', TopologicalMap, self.map_callback)
67 
68 
69  def map_callback(self, msg):
70  print 'got map'
71  self.node_names = [node.name for node in msg.nodes]
72 
73  def get_nodes(self):
74  while len(self.node_names) == 0:
75  print 'no nodes'
76  rospy.sleep(1)
77  return self.node_names
78 
80  waypoints = self.get_nodes()
81  action_types = 5
82  test_tasks = 5
83  action_sleep = rospy.Duration.from_sec(1)
84  action_prefix = 'test_task_'
85 
86  msg_store = MessageStoreProxy()
87 
88  task_descriptions = []
89  # list comprehension seemed to use the same result from randrange
90  for n in range(test_tasks):
91  string = 'oh what a lovely number %s is' % n
92  pose = Pose(Point(n, 1, 2), Quaternion(3, 4, 5, 6))
93  task_descriptions += [[random.choice(waypoints),
94  action_prefix + str(randrange(action_types)),
95  string,
96  pose, n, n + 0.1]]
97  assert test_tasks == len(task_descriptions)
98 
99  executor = FIFOTester(action_types, action_prefix, task_descriptions, action_sleep, self)
100 
101  # get task services
102  add_task_srv_name = '/task_executor/add_task'
103  set_exe_stat_srv_name = '/task_executor/set_execution_status'
104  rospy.loginfo("Waiting for task_executor service...")
105  rospy.wait_for_service(add_task_srv_name)
106  rospy.wait_for_service(set_exe_stat_srv_name)
107  rospy.loginfo("Done")
108  add_task_srv = rospy.ServiceProxy(add_task_srv_name, AddTask)
109  set_execution_status = rospy.ServiceProxy(set_exe_stat_srv_name, SetExecutionStatus)
110 
111  try:
112 
113  for task_description in task_descriptions:
114  # create the task from the description
115  task = Task(start_node_id=task_description[0], action=task_description[1])
116  # add some dummy arguments
117  task_utils.add_string_argument(task, task_description[2])
118  task_utils.add_object_id_argument(task, msg_store.insert(task_description[3]), Pose)
119  task_utils.add_int_argument(task, task_description[4])
120  task_utils.add_float_argument(task, task_description[5])
121  task_id = add_task_srv(task)
122  print task_id
123 
124  # Start the task executor running
125  set_execution_status(True)
126 
127 
128  wait_duration = rospy.Duration()
129  for n in range(test_tasks):
130  wait_duration += action_sleep +action_sleep + action_sleep
131 
132  executor.wait_for_completion(wait_duration + wait_duration)
133 
134  except rospy.ServiceException, e:
135  print "Service call failed: %s"%e
136 
137 
138 
139 if __name__ == '__main__':
140  rostest.rosrun(PKG, NAME, TestEntry, sys.argv)
141  # rospy.init_node('test_travel_time_estimator')
142  # executor = TestEntry()
143  # executor.test_fifo_task_executor()
144 
def test_fifo_task_executor(self)
Definition: fifo_tester.py:79
def get_nodes(self)
Definition: fifo_tester.py:73
def __init__(self, args)
Definition: fifo_tester.py:62
def map_callback(self, msg)
Definition: fifo_tester.py:69
def wait_for_completion(self, wait_duration)
Definition: fifo_tester.py:54
def __init__(self, action_types, action_prefix, task_descriptions, action_sleep, tester)
Definition: fifo_tester.py:46
def __init__(self, action_string, action_sleep, master, tester)
Definition: fifo_tester.py:24


task_executor
Author(s): Nick Hawes
autogenerated on Tue Mar 17 2015 20:08:13