moveit2
The MoveIt Motion Planning Framework for ROS 2.
servo_cpp_integration.test.py
Go to the documentation of this file.
1 import os
2 import launch
3 import unittest
4 import launch_ros
5 import launch_testing
6 from ament_index_python.packages import get_package_share_directory
7 from moveit_configs_utils import MoveItConfigsBuilder
8 from launch_param_builder import ParameterBuilder
9 
10 
12  moveit_config = (
13  MoveItConfigsBuilder("moveit_resources_panda")
14  .robot_description(file_path="config/panda.urdf.xacro")
15  .to_moveit_configs()
16  )
17 
18  # Get parameters for the Servo node
19  servo_params = {
20  "moveit_servo_test": ParameterBuilder("moveit_servo")
21  .yaml("config/test_config_panda.yaml")
22  .to_dict()
23  }
24 
25  # This sets the update rate and planning group name for the acceleration limiting filter.
26  acceleration_filter_update_period = {"update_period": 0.01}
27  planning_group_name = {"planning_group_name": "panda_arm"}
28 
29  # ros2_control using FakeSystem as hardware
30  ros2_controllers_path = os.path.join(
31  get_package_share_directory("moveit_resources_panda_moveit_config"),
32  "config",
33  "ros2_controllers.yaml",
34  )
35  ros2_control_node = launch_ros.actions.Node(
36  package="controller_manager",
37  executable="ros2_control_node",
38  parameters=[ros2_controllers_path],
39  remappings=[
40  ("/controller_manager/robot_description", "/robot_description"),
41  ],
42  output="screen",
43  )
44 
45  joint_state_broadcaster_spawner = launch_ros.actions.Node(
46  package="controller_manager",
47  executable="spawner",
48  arguments=[
49  "joint_state_broadcaster",
50  "--controller-manager-timeout",
51  "300",
52  "--controller-manager",
53  "/controller_manager",
54  ],
55  )
56 
57  panda_arm_controller_spawner = launch_ros.actions.Node(
58  package="controller_manager",
59  executable="spawner",
60  arguments=["panda_arm_controller", "-c", "/controller_manager"],
61  )
62 
63  # Component nodes for tf and Servo
64  test_container = launch_ros.actions.ComposableNodeContainer(
65  name="servo_integration_tests_container",
66  namespace="/",
67  package="rclcpp_components",
68  executable="component_container_mt",
69  composable_node_descriptions=[
70  launch_ros.descriptions.ComposableNode(
71  package="robot_state_publisher",
72  plugin="robot_state_publisher::RobotStatePublisher",
73  name="robot_state_publisher",
74  parameters=[moveit_config.robot_description],
75  ),
76  launch_ros.descriptions.ComposableNode(
77  package="tf2_ros",
78  plugin="tf2_ros::StaticTransformBroadcasterNode",
79  name="static_tf2_broadcaster",
80  parameters=[{"child_frame_id": "/panda_link0", "frame_id": "/world"}],
81  ),
82  ],
83  output="screen",
84  )
85 
86  servo_gtest = launch_ros.actions.Node(
87  executable=launch.substitutions.PathJoinSubstitution(
88  [
89  launch.substitutions.LaunchConfiguration("test_binary_dir"),
90  "moveit_servo_cpp_integration_test",
91  ]
92  ),
93  parameters=[
94  servo_params,
95  acceleration_filter_update_period,
96  planning_group_name,
97  moveit_config.robot_description,
98  moveit_config.robot_description_semantic,
99  moveit_config.robot_description_kinematics,
100  ],
101  output="screen",
102  )
103 
104  return launch.LaunchDescription(
105  [
106  launch.actions.DeclareLaunchArgument(
107  name="test_binary_dir",
108  description="Binary directory of package "
109  "containing test executables",
110  ),
111  ros2_control_node,
112  joint_state_broadcaster_spawner,
113  panda_arm_controller_spawner,
114  test_container,
115  launch.actions.TimerAction(period=5.0, actions=[servo_gtest]),
116  launch_testing.actions.ReadyToTest(),
117  ]
118  ), {
119  "servo_gtest": servo_gtest,
120  }
121 
122 
123 class TestGTestWaitForCompletion(unittest.TestCase):
124  # Waits for test to complete, then waits a bit to make sure result files are generated
125  def test_gtest_run_complete(self, servo_gtest):
126  self.proc_info.assertWaitForShutdown(servo_gtest, timeout=4000.0)
127 
128 
129 @launch_testing.post_shutdown_test()
130 class TestGTestProcessPostShutdown(unittest.TestCase):
131  # Checks if the test has been completed with acceptable exit codes (successful codes)
132  def test_gtest_pass(self, proc_info, servo_gtest):
133  launch_testing.asserts.assertExitCodes(proc_info, process=servo_gtest)