moveit2
The MoveIt Motion Planning Framework for ROS 2.
srdf_publisher_test.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2024, PickNik Robotics Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # # Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # # Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # # Neither the name of PickNik Robotics Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 
33 # Author: Tyler Weaver
34 
35 import unittest
36 from threading import Event
37 from threading import Thread
38 
39 import launch
40 import launch_ros
41 import launch_testing
42 import rclpy
43 from rclpy.node import Node
44 from rclpy.qos import (
45  QoSProfile,
46  QoSReliabilityPolicy,
47  QoSDurabilityPolicy,
48 )
49 from std_msgs.msg import String
50 
51 import pytest
52 
53 
54 @pytest.mark.launch_test
56  srdf_publisher = launch_ros.actions.Node(
57  package="moveit_ros_planning",
58  parameters=[{"robot_description_semantic": "test value"}],
59  executable="srdf_publisher",
60  output="screen",
61  )
62 
63  return launch.LaunchDescription(
64  [
65  srdf_publisher,
66  # Start tests right away - no need to wait for anything
67  launch_testing.actions.ReadyToTest(),
68  ]
69  ), {"srdf_publisher": srdf_publisher}
70 
71 
72 class SrdfObserverNode(Node):
73  def __init__(self, name="srdf_observer_node"):
74  super().__init__(name)
75  self.msg_event_objectmsg_event_object = Event()
76  self.srdfsrdf = ""
77 
78  def start_subscriber(self):
79  # Create a subscriber
80  self.subscriptionsubscription = self.create_subscription(
81  String,
82  "robot_description_semantic",
83  self.subscriber_callbacksubscriber_callback,
84  qos_profile=QoSProfile(
85  reliability=QoSReliabilityPolicy.RELIABLE,
86  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
87  depth=1,
88  ),
89  )
90 
91  # Add a spin thread
92  self.ros_spin_threadros_spin_thread = Thread(
93  target=lambda node: rclpy.spin(node), args=(self,)
94  )
95  self.ros_spin_threadros_spin_thread.start()
96 
97  def subscriber_callback(self, msg):
98  self.srdfsrdf = msg.data
99  self.msg_event_objectmsg_event_object.set()
100 
101 
102 # These tests will run concurrently with the dut process. After all these tests are done,
103 # the launch system will shut down the processes that it started up
104 class TestFixture(unittest.TestCase):
105  def test_rosout_msgs_published(self, proc_output):
106  rclpy.init()
107  try:
108  node = SrdfObserverNode()
109  node.start_subscriber()
110  msgs_received_flag = node.msg_event_object.wait(timeout=5.0)
111  assert (
112  msgs_received_flag
113  ), "Did not receive message on `/robot_description_semantic`!"
114  assert node.srdf == "test value"
115  finally:
116  rclpy.shutdown()
117 
118 
119 @launch_testing.post_shutdown_test()
120 class TestProcessOutput(unittest.TestCase):
121  def test_exit_code(self, proc_info):
122  # Check that all processes in the launch (in this case, there's just one) exit
123  # with code 0
124  launch_testing.asserts.assertExitCodes(proc_info)
def __init__(self, name="srdf_observer_node")
def test_rosout_msgs_published(self, proc_output)