moveit2
The MoveIt Motion Planning Framework for ROS 2.
rosout_publish_test.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2023, PickNik Robotics Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # # Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # # Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # # Neither the name of Willow Garage nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 
33 # Author: Tyler Weaver
34 
35 import unittest
36 from threading import Event
37 from threading import Thread
38 
39 import launch
40 import launch_ros
41 import launch_testing
42 import rclpy
43 from rclpy.node import Node
44 from rcl_interfaces.msg import Log
45 
46 import pytest
47 
48 
49 @pytest.mark.launch_test
51  # dut: device under test
52  dut_process = launch_ros.actions.Node(
53  package="moveit_core",
54  executable=launch.substitutions.LaunchConfiguration("dut"),
55  output="screen",
56  )
57 
58  return launch.LaunchDescription(
59  [
60  launch.actions.DeclareLaunchArgument(
61  "dut",
62  description="Executable to launch",
63  ),
64  dut_process,
65  # Start tests right away - no need to wait for anything
66  launch_testing.actions.ReadyToTest(),
67  ]
68  ), {"dut_process": dut_process}
69 
70 
72  def __init__(self, name="rosout_observer_node"):
73  super().__init__(name)
74  self.msg_event_objectmsg_event_object = Event()
75 
76  def start_subscriber(self):
77  # Create a subscriber
78  self.subscriptionsubscription = self.create_subscription(
79  Log, "rosout", self.subscriber_callbacksubscriber_callback, 10
80  )
81 
82  # Add a spin thread
83  self.ros_spin_threadros_spin_thread = Thread(
84  target=lambda node: rclpy.spin(node), args=(self,)
85  )
86  self.ros_spin_threadros_spin_thread.start()
87 
88  def subscriber_callback(self, data):
89  self.msg_event_objectmsg_event_object.set()
90 
91 
92 # These tests will run concurrently with the dut process. After all these tests are done,
93 # the launch system will shut down the processes that it started up
94 class TestFixture(unittest.TestCase):
95  def test_rosout_msgs_published(self, proc_output):
96  rclpy.init()
97  try:
98  node = MakeRosoutObserverNode()
99  node.start_subscriber()
100  msgs_received_flag = node.msg_event_object.wait(timeout=5.0)
101  assert msgs_received_flag, "Did not receive msgs !"
102  finally:
103  rclpy.shutdown()
104 
105 
106 @launch_testing.post_shutdown_test()
107 class TestProcessOutput(unittest.TestCase):
108  def test_exit_code(self, proc_info):
109  # Check that all processes in the launch (in this case, there's just one) exit
110  # with code 0
111  launch_testing.asserts.assertExitCodes(proc_info)
def __init__(self, name="rosout_observer_node")
def test_rosout_msgs_published(self, proc_output)