ROS 2 Integration Tests

Quick example on how to setup integration tests in ROS 2. Specifically running tests on a life cycle node.

The information is generally available online in pieces, but I didn’t see a complete example for what I wanted to do.

This example is primarily sourced from here and here (jazzy).

Package Dependencies

First off add the following dependencies for launch_testing to your package.xml file.

<test_depend>ament_cmake_ros</test_depend>
<test_depend>launch</test_depend>
<test_depend>launch_ros</test_depend>
<test_depend>launch_testing</test_depend>
<test_depend>launch_testing_ament_cmake</test_depend>
<test_depend>rclpy</test_depend>

CMake

cmake_minimum_required(VERSION 3.8)
project(app)

########
# test #
########

if(BUILD_TESTING)
  # Integration tests
  find_package(ament_cmake_ros REQUIRED)
  find_package(launch_testing_ament_cmake REQUIRED)
  function(add_ros_isolated_launch_test path)
    set(RUNNER "${ament_cmake_ros_DIR}/run_test_isolated.py")
    add_launch_test("${path}" RUNNER "${RUNNER}" ${ARGN})
  endfunction()

  add_ros_isolated_launch_test(tests/integration_tests/simple_test.launch.py)
endif()

I added my first test at: tests/integration_tests/simple_test.launch.py

Writing the Test

# simple_test.launch.py
import unittest

import os
import rclpy
import launch
import time
import launch_ros
import launch.actions
import launch_testing.actions
import launch_testing.markers

import pytest

import lifecycle_msgs
from lifecycle_msgs.srv import ChangeState

# This function specifies the processes to be run for our test
@pytest.mark.launch_test
def generate_test_description():
    pass

Launching the Lifecycle Node

Originally I thought I’d call the lifecycle state change service directly, but that does work (most likely the test runner is not multi-threaded and therefore calling the service causes a deadlock)

The way you’re supposed to do this is adding the state change events to the launch file actions.

@pytest.mark.launch_test
def generate_test_description():
    node = launch_ros.actions.LifecycleNode(
            package='nifty',
            executable='nifty_node',
            name='nifty',
            namespace='',
          )
    inactive_event = launch.actions.RegisterEventHandler(
        launch_ros.event_handlers.OnStateTransition(
            target_lifecycle_node=node, goal_state='inactive',
            entities=[
                launch.actions.LogInfo(
                    msg="node 'nifty' reached the 'inactive' state, 'activating'."),
                launch.actions.EmitEvent(event=launch_ros.events.lifecycle.ChangeState(
                    lifecycle_node_matcher=launch.events.matches_action(node),
                    transition_id=lifecycle_msgs.msg.Transition.TRANSITION_ACTIVATE,
                )),
            ],
        )
    )
    active_event = launch.actions.RegisterEventHandler(
        launch_ros.event_handlers.OnStateTransition(
            target_lifecycle_node=node, goal_state='active',
            entities=[
                launch.actions.LogInfo(
                    msg="node 'nifty' reached the 'active' state")
            ],
        )
    )
    configure_transition = launch.actions.EmitEvent(
                            event=launch_ros.events.lifecycle.ChangeState(
                                lifecycle_node_matcher=launch.events.matches_action(node),
                                transition_id=lifecycle_msgs.msg.Transition.TRANSITION_CONFIGURE,
                            )
                          )

    return launch.LaunchDescription([
        node,
        inactive_event,
        active_event,
        configure_transition,
        # Tell launch to start the test
        launch.actions.TimerAction(period=1.0, actions=[launch_testing.actions.ReadyToTest()]),
    ])

I wish there was a less verbose way to write this. But effectively all it’s doing is telling the launch system to transition the node into configure and then active state.

Writing a Test

# Active tests
class TestNifyNode(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        rclpy.init()

    @classmethod
    def tearDownClass(cls):
        rclpy.shutdown()

    def setUp(self):
        self.node = rclpy.create_node('simple_test')

    def tearDown(self):
        self.node.destroy_node()

    def test_publishes_pose(self, proc_output):
        """Check whether pose messages published"""
        msgs_rx = []
        sub = self.node.create_subscription(
            Pose, 'nifty/topic',
            lambda msg: msgs_rx.append(msg), 100)
        try:
            # Listen to the pose topic for 10 s
            end_time = time.time() + 10
            while time.time() < end_time:
                # spin to get subscriber callback executed
                rclpy.spin_once(self.node, timeout_sec=1)
            # There should have been 100 messages received
            assert len(msgs_rx) > 100
        finally:
            self.node.destroy_subscription(sub)