PerformAction 教程(Python)

本教程是 Fleet Adapter 教程的扩展,将指导您在 Fleet Adapter 中编写自定义操作。虽然 RMF 提供了一些标准任务,但我们知道不同的机器人可能配备并编程为执行不同类型的操作,例如清洁、拾取物体、远程操作等。通过支持自定义任务,用户可以预先触发 Fleet Adapter 的 config.yaml 中指定的自定义操作,并且 RMF 将放弃对机器人的控制,直到收到机器人已完成自定义操作的信号。您可以探索 在 RMF 中支持新任务 部分,以了解有关支持自定义任务的更多信息以及如何创建自己的任务 JSON 以发送到 RMF。

在本教程中,我们将参考 rmf_demos_fleet_adapter 的简化版本,以在我们的 Fleet Adapter 中实现 Clean PerformAction 功能。

1. 在舰队config.yaml中定义 PerformAction

我们需要在舰队配置中定义操作的名称,以便 RMF 在提交任务时将此操作识别为可执行,并能够将其分派给可以完成该任务的舰队。在 rmf_fleet 部分下的 config.yaml 中,我们可以为舰队提供可执行操作的列表。例如,让我们将 clean 定义为此舰队支持的操作:

rmf_fleet:
  actions: ["clean"]

2. 在我们的舰队适配器内应用操作执行逻辑

RMF 收到包含此操作的任务并将其分派到正确的舰队后,舰队适配器的 execute_action(~) 回调将被触发。解析到此回调的 category 对应于我们之前定义的操作名称,而 description 包含我们可能感兴趣的有关该操作的任何详细信息。

假设这是提交给 RMF 的任务 JSON:

{
  "type": "dispatch_task_request",
  "request": {
    "unix_millis_earliest_start_time": start_time,
    "category": "clean",
    "description": {
      "zone": "clean_lobby"
    }
  }
}

在我们的示例中,提供的“category”将是“clean”,而“description”将包含此任务将我们的机器人引导到哪个清洁区域,即“clean_lobby”。因此,我们需要在“execute_action(~)”中实现逻辑:

    def execute_action(self, category: str, description: dict, execution):
        self.execution = execution

        if category == 'clean':
            self.perform_clean(description['zone'])

    def perform_clean(self, zone):
        if self.api.start_activity(self.name, 'clean', zone):
            self.node.get_logger().info(
                f'Commanding [{self.name}] to clean zone [{zone}]'
            )
        else:
            self.node.get_logger().error(
                f'Fleet manager for [{self.name}] does not know how to '
                f'clean zone [{zone}]. We will terminate the activity.'
            )
            self.execution.finished()
            self.execution = None

由于我们的机器人队列可能能够执行多个自定义操作,因此我们需要进行检查以确保收到的“类别”与我们的目标机器人 API 相匹配。收到“清洁”操作后,我们可以相应地触发机器人的 API。

3. 为自定义操作实现机器人 API

这就是“RobotClientAPI.py”中的“start_activity(~)”方法发挥作用的地方。我们需要它实现对机器人的 API 调用以启动清洁活动。例如,如果机器人 API 使用 REST 调用机器人,则实现的方法可能如下所示:

    def start_activity(
        self,
        robot_name: str,
        activity: str,
        label: str
    ):
        ''' Request the robot to begin a process. This is specific to the robot
            and the use case. For example, load/unload a cart for Deliverybot
            or begin cleaning a zone for a cleaning robot.'''
        url = (
            self.prefix +
            f"/open-rmf/rmf_demos_fm/start_activity?robot_name={robot_name}"
        )
        # data fields: task, map_name, destination{}, data{}
        data = {'activity': activity, 'label': label}
        try:
            response = requests.post(url, timeout=self.timeout, json=data)
            response.raise_for_status()
            if self.debug:
                print(f'Response: {response.json()}')

            if response.json()['success']:
                return True

            # If we get a response with success=False, then
            return False
        except HTTPError as http_err:
            print(f'HTTP error for {robot_name} in start_activity: {http_err}')
        except Exception as err:
            print(f'Other error {robot_name} in start_activity: {err}')
        return False

4. 完成操作

由于我们在“RobotAdapter”中存储了一个“self.execution”对象,因此当任何执行(导航、停止或操作)完成时,我们都会收到通知,因为更新循环会不断调用“is_command_completed”来检查其状态。

    def update(self, state):
        activity_identifier = None
        if self.execution:
            if self.api.is_command_completed():
                self.execution.finished()
                self.execution = None
            else:
                activity_identifier = self.execution.identifier

如果您的实现需要单独的回调来标记执行已完成,您可以创建一个新函数来进行此检查,并在操作完成时调用“self.execution.finished()”。