Fleet Adapter Tutorial (Python)

fleet_adapter 充当机器人和核心 RMF 系统之间的桥梁。

其职责包括但不限于:

  • 使用车队机器人的位置更新交通时间表

  • 响应任务

  • 控制供应商机器人。

fleet_adapter 接收有关队列中每个机器人的信息(位置、当前正在进行的任务、电池电量等),并将其发送到核心 RMF 系统进行任务规划和调度。

  • 当核心 RMF 系统有任务要分派时,它会与各个队列适配器通信,以检查哪个队列适合执行此任务。

  • 它发送请求,队列适配器通过提交其队列机器人的可用性和状态来响应。

  • RMF 确定最适合该任务的队列并响应中标,即选定的队列。响应包含与委派任务相关的导航命令。

  • 然后,队列适配器将通过适当的 API 将导航命令发送给机器人。

下面提供的教程基于 rmf_demos,该实现在 rmf_demos 存储库中实现。此特定实现使用 Python 编写,并使用 REST API 作为舰队适配器和舰队管理器之间的接口。您可以选择使用其他 API 进行自己的集成。

1. Pre-requisites

获取依赖项

在运行您的车队适配器之前,请确保您已按照此处 的说明安装了 ROS 2 和 RMF。您可以选择安装二进制文件或从源代码构建两者。您可能还希望前往我们的 RMF Github repo 以获取 RMF 安装的最新更新和说明。

如果您从源代码构建了 ROS 2 和/或 RMF,请确保在继续下一步之前获取包含其构建代码的工作区。

在我们的示例中,rmf_demos_fleet_adapter 使用 REST API 作为车队适配器和机器人车队管理器之间的接口,因此要使演示正常运行,我们需要安装使用 FastAPI 所需的依赖项。

pip3 install fastapi uvicorn

此步骤仅适用于此实现;根据您自己的车队管理器使用的 API,您必须相应地安装任何必要的依赖项。

开始使用车队适配器模板

创建一个工作区并克隆 fleet_adapter_template 存储库。

mkdir -p ~/rmf_ws/src
cd ~/rmf_ws/src/
git clone https://github.com/open-rmf/fleet_adapter_template.git

此模板包含完全控制和轻松完全控制队列适配器的代码。两种实现均使用 RobotClientAPI.py 中的 API 调用与机器人进行通信。

2. Update the config.yaml file

config.yaml 文件包含设置车队适配器的重要参数。用户应首先更新这些描述其车队机器人的配置。

务必遵循下面示例 config.yaml 中提供的字段,否则在将此 YAML 文件解析到车队适配器时会出现导入错误。如果您想编辑任何字段名称或值范围,甚至附加其他字段,请确保您还修改了车队适配器代码中处理此配置导入的部分。

某些字段是可选的,如下所示。

# FLEET CONFIG =================================================================
# RMF Fleet parameters

rmf_fleet:
  name: "tinyRobot"
  limits:
    linear: [0.5, 0.75] # velocity, acceleration
    angular: [0.6, 2.0] # velocity, acceleration
  profile: # Robot profile is modelled as a circle
    footprint: 0.3 # radius in m
    vicinity: 0.5 # radius in m
  reversible: True # whether robots in this fleet can reverse
  battery_system:
    voltage: 12.0 # V
    capacity: 24.0 # Ahr
    charging_current: 5.0 # A
  mechanical_system:
    mass: 20.0 # kg
    moment_of_inertia: 10.0 #kgm^2
    friction_coefficient: 0.22
  ambient_system:
    power: 20.0 # W
  tool_system:
    power: 0.0 # W
  recharge_threshold: 0.10 # Battery level below which robots in this fleet will not operate
  recharge_soc: 1.0 # Battery level to which robots in this fleet should be charged up to during recharging tasks
  publish_fleet_state: 10.0 # Publish frequency for fleet state, ensure that it is same as robot_state_update_frequency
  account_for_battery_drain: True
  task_capabilities: # Specify the types of RMF Tasks that robots in this fleet are capable of performing
    loop: True
    delivery: True
  actions: ["teleop"]
  finishing_request: "park" # [park, charge, nothing]
  responsive_wait: True # Should responsive wait be on/off for the whole fleet by default? False if not specified.
  robots:
    tinyRobot1:
        charger: "tinyRobot1_charger"
        responsive_wait: False # Should responsive wait be on/off for this specific robot? Overrides the fleet-wide setting.
    tinyRobot2:
        charger: "tinyRobot2_charger"
        # No mention of responsive_wait means the fleet-wide setting will be used

  robot_state_update_frequency: 10.0 # Hz

fleet_manager:
  prefix: "http://127.0.0.1:8080"
  user: "some_user"
  password: "some_password"

# TRANSFORM CONFIG =============================================================
# For computing transforms between Robot and RMF coordinate systems

# Optional
reference_coordinates:
  L1:
    rmf: [[20.33, -3.156],
          [8.908, -2.57],
          [13.02, -3.601],
          [21.93, -4.124]]
    robot: [[59, 399],
          [57, 172],
          [68, 251],
          [75, 429]]
  • rmf_fleet: 重要的车队参数,包括车辆特性、任务能力和用于连接车队经理的用户信息。

    • limits: 线性和角加速度和速度的最大值。

    • profile: 该车队车辆的覆盖范围半径和个人附近区域。

    • reversible: 用于启用/禁用机器人反向遍历的标志。

    • battery_system: 有关电池电压、容量和充电电流的信息。

    • recharge_threshold: 设置最低充电值,低于该值时机器人必须返回其充电器。

    • recharge_soc: 机器人需要充电至的总电池容量的分数。

    • task_capabilities: 机器人在‘循环’、‘运送’和‘清洁’之间可以执行的任务。

    • account_for_battery_drain: RMF 在调度任务之前是否应该考虑机器人的电池消耗。

    • action [Optional]: 舰队可执行的自定义操作列表。

    • finishing_request: 机器人完成任务后应该做什么,可以设置为“停止”,“充电”或“不做任何事”。

    • responsive_wait [Optional]: True # 默认情况下,整个舰队的响应等待是否应打开/关闭?如果未指定,则为 false。

    • robots: 有关队列中每个机器人的信息。本节中的每个项目对应于队列中单个机器人的配置。您可以相应地添加更多机器人。

      • tinyRobot1: 机器人的名称。

        • charger: 机器人充电点的名称。

        • responsive_wait: 该特定机器人是否应打开/关闭其响应等待。覆盖全队设置。

    • robot_state_update_frequency: 机器人应该多久更新一次车队。

  • fleet_manager:可配置 prefixuserpassword 字段以适合您所选的 API。如果您确实要修改它们,请确保也编辑 RobotClientAPI.py 中的相应字段。这些参数将用于设置与您的车队管理器/机器人的连接。

  • reference_coordinates [可选]:如果车队机器人未在与 RMF 相同的坐标系中运行,您可以提供两组 (x, y) 坐标,它们对应于每个系统中的相同位置。这有助于估计从一个帧到另一个帧的坐标转换。建议至少有 4 个匹配的航点。

注意:这未在 rmf_demos_fleet_adapter 中实现,因为演示机器人和 RMF 使用相同的坐标系。

3. 创建导航图

需要将导航图解析到车队适配器,以便 RMF 能够理解机器人的环境。可以使用提供的 RMF 交通编辑器building_map_generator nav CLI 创建导航图。请参阅交通编辑器 repo 的 README 以获取安装和地图生成说明。

您可能还想查看本书的 交通编辑器 部分,了解有关创建自己的数字地图的详细信息和说明。

现在您应该有一个 YAML 文件,其中包含有关车道和航点的信息(以及其他信息),这些信息描述了您的机器人车队可以采取的路径。

4. 填写您的 RobotAPI

RobotClientAPI.py 提供了一组由舰队适配器使用的方法。当 RMF 需要通过舰队适配器向托管机器人发送或检索信息时,会触发这些回调。为了满足您选择的界面,您需要在 RobotAPI 中填写标有 # IMPLEMENT YOUR CODE HERE # 的缺失代码块,并添加发送或检索相应信息的逻辑。例如,如果您的机器人使用 REST API 与舰队适配器交互,您将需要在这些函数中对适当的端点进行 HTTP 请求调用。

您可以参考为 rmf_demos_fleet_adapter 实现的 RobotAPI 类,了解如何填充这些方法的示例。

  • navigate:向机器人 API 发送导航命令。它从 RMF 中获取目的地坐标、所需地图名称和可选速度限制。
  • start_activity:向机器人发送命令以开始执行任务。此方法对于由 execute_action() 触发的自定义可执行操作很有用。
  • stop:命令机器人停止移动。
  • positionmapbattery_soc:以 [x, y, theta] 格式检索机器人在其坐标系中的当前位置、其当前地图名称和电池充电状态。在 rmf_demos_fleet_adapter 中,这些方法合并在 get_data() 下。
  • is_command_completed:检查机器人是否已完成正在进行的过程或任务。在 rmf_demos_fleet_adapter 中,这在 RobotUpdateData 类下实现。根据您的机器人 API,您可以选择以任何一种方式集成它。此回调将帮助 RMF 识别已调度命令的完成时间,并继续发送后续命令。

如果需要,可以向 RobotAPI 添加更多参数以用于这些回调,例如身份验证详细信息和任务 ID。您可能还希望在 RobotAPIfleet_adapter.py 中为特定用例编写其他方法。 rmf_demos_fleet_adapter 实现演示了 Teleoperation 操作,这将在 PerformAction 教程 中进行更详细的阐述。

5. 创建您的车队适配器!

现在我们已经准备好组件,我们可以开始创建我们的车队适配器了。fleet_adapter.py 使用 Easy Full Control API 轻松创建 Adapter 实例,并通过解析我们之前准备的配置 YAML 文件来设置车队配置和机器人。由于我们已经定义了 RobotAPI,因此 fleet_adapter.py 中的回调将使用所实现的方法,以便 RMF 可以检索机器人信息并适当地发送导航或操作命令。

您可能希望使用车队适配器模板中提供的 fleet_adapter.py,并根据您希望车队实现的目标对其进行修改。

6. 运行您的舰队适配器

此时,您应该已准备好 4 个组件以运行您的舰队适配器:

  • fleet_adapter.py
  • RobotClientAPI.py
  • 舰队 config.yaml 文件
  • 导航图

构建你的舰队适配器包

如果您克隆了 fleet_adapter_template 存储库,那么您的 Python 脚本就已经包含在 ROS 2 包中。否则,您可以按照 此处 中的说明在您的工作区中创建一个包。对于以下说明,我们将使用 fleet_adapter_template 包中使用的包和模块名称。

将脚本放在相应的文件夹中后,返回工作区的根目录并构建包。

colcon build --packages-select fleet_adapter_template

运行!

我们现在将获取工作区并运行 fleet 适配器:

. ~/rmf_ws/install/setup.bash

ros2 run fleet_adapter_template fleet_adapter -c <path-to-config> -n <path-to-nav-graph>

7. 深入研究代码 [可选]

以下步骤详细说明了 Easy Full Control 车队适配器以及代码的每个部分的作用。

a. 导入重要参数并创建适配器

运行车队适配器时,我们需要解析我们在前面的步骤中创建的车队配置文件和导航图。这些文件将传递给 EasyFullControl API 以设置适配器的车队配置。

    config_path = args.config_file
    nav_graph_path = args.nav_graph

    fleet_config = rmf_easy.FleetConfiguration.from_config_files(
        config_path, nav_graph_path
    )
    assert fleet_config, f'Failed to parse config file [{config_path}]'

    # Parse the yaml in Python to get the fleet_manager info
    with open(config_path, "r") as f:
        config_yaml = yaml.safe_load(f)

使用这些参数,我们可以创建一个适配器实例并向其添加一个 EasyFullControl 队列。如果适配器应根据模拟时钟运行或向任何 websocket 服务器广播任务更新,我们还需要配置 use_sim_timeserver_uri 参数。

    # ROS 2 node for the command handle
    fleet_name = fleet_config.fleet_name
    node = rclpy.node.Node(f'{fleet_name}_command_handle')
    adapter = Adapter.make(f'{fleet_name}_fleet_adapter')
    assert adapter, (
        'Unable to initialize fleet adapter. '
        'Please ensure RMF Schedule Node is running'
    )

    # Enable sim time for testing offline
    if args.use_sim_time:
        param = Parameter("use_sim_time", Parameter.Type.BOOL, True)
        node.set_parameters([param])
        adapter.node.use_sim_time()

    adapter.start()
    time.sleep(1.0)

    if args.server_uri == '':
        server_uri = None
    else:
        server_uri = args.server_uri

    fleet_config.server_uri = server_uri
    fleet_handle = adapter.add_easy_fleet(fleet_config)

b. 配置 RMF 和机器人之间的转换

我们定义了一个辅助函数来计算 RMF 和机器人坐标之间的转换。如果您的机器人在与 RMF 相同的坐标下运行(例如在模拟中),则您不需要这部分代码。

def compute_transforms(level, coords, node=None):
    """Get transforms between RMF and robot coordinates."""
    rmf_coords = coords['rmf']
    robot_coords = coords['robot']
    tf = nudged.estimate(rmf_coords, robot_coords)
    if node:
        mse = nudged.estimate_error(tf, rmf_coords, robot_coords)
        node.get_logger().info(
            f"Transformation error estimate for {level}: {mse}"
        )

    return Transformation(
        tf.get_rotation(),
        tf.get_scale(),
        tf.get_translation()
    )
    # Configure the transforms between robot and RMF frames
    for level, coords in config_yaml['reference_coordinates'].items():
        tf = compute_transforms(level, coords, node)
        fleet_config.add_robot_coordinates_transformation(level, tf)

根据集成所需的地图数量(或级别),您将提取每个地图的相应坐标变换并将其添加到 FleetConfiguration 对象。如果您将 rclpy.Node 传递给此函数,则此函数将记录变换误差估计。

然后,在我们的 main 函数中,我们将计算出的变换添加到我们的 FleetConfiguration。EasyFullControl 车队适配器将处理这些变换并相应地在机器人的坐标中发出导航命令。

    # Configure the transforms between robot and RMF frames
    for level, coords in config_yaml['reference_coordinates'].items():
        tf = compute_transforms(level, coords, node)
        fleet_config.add_robot_coordinates_transformation(level, tf)

c. 初始化机器人 API 并设置 RobotAdapter

config.yaml 可能包含我们连接到机器人或机器人车队管理器所需的任何连接凭据。我们将其解析为 RobotAPI,以便轻松在 RMF 和机器人的 API 之间进行交互。这完全是可选的;为了更安全地存储凭据,请相应地将它们导入 RobotAPI。

    # Initialize robot API for this fleet
    fleet_mgr_yaml = config_yaml['fleet_manager']
    api = RobotAPI(fleet_mgr_yaml)

根据我们“config.yaml”中的已知机器人列表,我们可以为每个要添加到机器人队伍中的机器人初始化一个“RobotAdapter”类。

    robots = {}
    for robot_name in fleet_config.known_robots:
        robot_config = fleet_config.get_known_robot_configuration(robot_name)
        robots[robot_name] = RobotAdapter(
            robot_name, robot_config, node, api, fleet_handle
        )

d. 检索机器人状态并将机器人添加到车队

此更新循环将允许我们使用机器人的信息异步更新“RobotUpdateHandle”,这样从一个机器人检索状态时发生的任何错误都不会阻止其他机器人更新车队适配器。

    update_period = 1.0/config_yaml['rmf_fleet'].get(
        'robot_state_update_frequency', 10.0
    )

    def update_loop():
        asyncio.set_event_loop(asyncio.new_event_loop())
        while rclpy.ok():
            now = node.get_clock().now()

            # Update all the robots in parallel using a thread pool
            update_jobs = []
            for robot in robots.keys():
                update_jobs.append(update_robot(robot))

            asyncio.get_event_loop().run_until_complete(
                asyncio.wait(update_jobs)
            )

            next_wakeup = now + Duration(nanoseconds=update_period*1e9)
            while node.get_clock().now() < next_wakeup:
                time.sleep(0.001)

    update_thread = threading.Thread(target=update_loop, args=())
    update_thread.start()

调用函数“update_robot()”以确保我们的机器人的当前地图、位置和电池充电状态将得到正确更新。如果机器人是新加入到车队句柄中的,我们将通过“add_robot()”将其添加进去。

@parallel
def update_robot(robot: RobotAdapter):
    data = robot.api.get_data(robot.name)
    if data is None:
        return

    state = rmf_easy.RobotState(
        data.map,
        data.position,
        data.battery_soc
    )

    if robot.update_handle is None:
        robot.update_handle = robot.fleet_handle.add_robot(
            robot.name,
            state,
            robot.configuration,
            robot.make_callbacks()
        )
        return

    robot.update(state)

e. 在 RobotAdapter 类中

RobotAdapter 类帮助我们跟踪机器人可能正在执行的任何正在进行的过程,并在 RMFs 发送相应命令时执行正确的操作。

class RobotAdapter:
    def __init__(
        self,
        name: str,
        configuration,
        node,
        api: RobotAPI,
        fleet_handle
    ):
        self.name = name
        self.execution = None
        self.update_handle = None
        self.configuration = configuration
        self.node = node
        self.api = api
        self.fleet_handle = fleet_handle

我们需要将 3 个重要的回调传递给 EasyFullControl API:

  • navigate
  • stop
  • execute_action

如上所述,当 RMF 需要命令机器人执行某些操作时,每个回调都会被触发。因此,我们在“RobotAdapter”中定义了这些回调:

    def navigate(self, destination, execution):
        self.execution = execution
        self.node.get_logger().info(
            f'Commanding [{self.name}] to navigate to {destination.position} '
            f'on map [{destination.map}]'
        )

        self.api.navigate(
            self.name,
            destination.position,
            destination.map,
            destination.speed_limit
        )

    def stop(self, activity):
        if self.execution is not None:
            if self.execution.identifier.is_same(activity):
                self.execution = None
                self.stop(self.name)

    def execute_action(self, category: str, description: dict, execution):
        ''' Trigger a custom action you would like your robot to perform.
        You may wish to use RobotAPI.start_activity to trigger different
        types of actions to your robot.'''
        self.execution = execution
        # ------------------------ #
        # IMPLEMENT YOUR CODE HERE #
        # ------------------------ #
        return

请注意,execute_action(~) 在舰队适配器模板中没有任何实现的代码。此回调旨在灵活,并满足 RMF 提供的任务下可能无法使用的自定义可执行操作。您可以在 PerformAction 教程 部分中了解如何设计和编写自己的操作并从舰队适配器执行它们。

    def make_callbacks(self):
        return rmf_easy.RobotCallbacks(
            lambda destination, execution: self.navigate(
                destination, execution
            ),
            lambda activity: self.stop(activity),
            lambda category, description, execution: self.execute_action(
                category, description, execution
            )
        )

最后,我们使用RobotCallbacks() API 将所有回调添加到我们的队列适配器。