36 from threading
import Event
37 from threading
import Thread
43 from rclpy.node
import Node
44 from rclpy.qos
import (
49 from std_msgs.msg
import String
54 @pytest.mark.launch_test
56 srdf_publisher = launch_ros.actions.Node(
57 package=
"moveit_ros_planning",
58 parameters=[{
"robot_description_semantic":
"test value"}],
59 executable=
"srdf_publisher",
63 return launch.LaunchDescription(
67 launch_testing.actions.ReadyToTest(),
69 ), {
"srdf_publisher": srdf_publisher}
82 "robot_description_semantic",
84 qos_profile=QoSProfile(
85 reliability=QoSReliabilityPolicy.RELIABLE,
86 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
93 target=
lambda node: rclpy.spin(node), args=(self,)
98 self.
srdfsrdf = msg.data
109 node.start_subscriber()
110 msgs_received_flag = node.msg_event_object.wait(timeout=5.0)
113 ),
"Did not receive message on `/robot_description_semantic`!"
114 assert node.srdf ==
"test value"
119 @launch_testing.post_shutdown_test()
124 launch_testing.asserts.assertExitCodes(proc_info)
def subscriber_callback(self, msg)
def start_subscriber(self)
def __init__(self, name="srdf_observer_node")
def test_rosout_msgs_published(self, proc_output)
def test_exit_code(self, proc_info)
def generate_test_description()