36 from threading
import Event
37 from threading
import Thread
43 from rclpy.node
import Node
44 from rcl_interfaces.msg
import Log
49 @pytest.mark.launch_test
52 dut_process = launch_ros.actions.Node(
53 package=
"moveit_core",
54 executable=launch.substitutions.LaunchConfiguration(
"dut"),
58 return launch.LaunchDescription(
60 launch.actions.DeclareLaunchArgument(
62 description=
"Executable to launch",
66 launch_testing.actions.ReadyToTest(),
68 ), {
"dut_process": dut_process}
72 def __init__(self, name="rosout_observer_node"):
84 target=
lambda node: rclpy.spin(node), args=(self,)
99 node.start_subscriber()
100 msgs_received_flag = node.msg_event_object.wait(timeout=5.0)
101 assert msgs_received_flag,
"Did not receive msgs !"
106 @launch_testing.post_shutdown_test()
111 launch_testing.asserts.assertExitCodes(proc_info)
def start_subscriber(self)
def subscriber_callback(self, data)
def __init__(self, name="rosout_observer_node")
def test_rosout_msgs_published(self, proc_output)
def test_exit_code(self, proc_info)
def generate_test_description()