User-defined Custom Tasks in RMF Task

注意:用户定义的自定义任务目前处于实验阶段

新的通用任务组合系统正在开发中,讨论可在此处 找到。

处理 RMF 任务时,有两个包:

rmf_task 提供 API 和基类,用于在 RMF 中定义和管理任务。任务被定义为生成阶段的对象,这些阶段是一系列有意义的步骤,可产生理想的结果。每个任务都有一个描述和一个组件,允许我们在给定初始状态的情况下模拟机器人完成任务后的状态,还有一个组件将命令实际机器人执行任务。

rmf_task_sequence 提供了 rmf_task 的开箱即用实现,其中 Task 对象由一系列阶段定义。因此,此类任务生成的阶段将与用于定义它们的阶段序列相匹配。 rmf_task_sequence 中定义的阶段反过来是事件的集合,这些事件还具有用于在事件期间模拟最终状态和命令机器人的组件。目前支持 此处 定义的事件。 然后,用户可以通过将此类阶段/事件的序列串联在一起来构建任务的任意定义。RMF 能够规划和执行此类任务。

perform_action 是一个基于序列的事件,支持执行自定义操作。 perform_action 行为的可定制性是有限的,因此实施自定义逻辑的用户无需担心如何与交通系统交互、开门或使用电梯。这也最大限度地降低了系统集成商引入错误的风险,从而扰乱交通系统或任何其他资源共享系统。当 perform_action 运行时,机器人将成为“只读”交通代理,因此其他机器人将简单地避开它

用户将在 rmf_fleet_adapter 层上工作。在此层中,API 仅限于使用 rmf_task_sequence 来执行任务。仅支持某些事件,它们的描述可以在 此处 找到。

rmf_fleet_adapter 层充当用户可以使用的 API。它仅支持对 此处 中提到的现有事件中的 perform_action 进行自定义行为。 用户只能在 perform_action 中添加自定义任务。RMF 将命令传递给舰队适配器集成的平台特定端,并正式释放对机器人的控制,直到操作完成。 要在 perform_action 中使用自定义任务,用户需要使用 API 的两个部分。

  1. FleetUpdateHandle::add_performable_action 它由两部分组成:第一部分是动作的“类别”,第二部分是“考虑”部分,将根据该部分决定是否接受该动作。

以下是一个例子:

rmf_fleet:
  name: "ecobot40"
  limits:
    linear: [1.2, 1.5] # velocity, acceleration
    angular: [0.6, 0.6]
  profile: # Robot profile is modelled as a circle
    footprint: 0.5
    vicinity: 0.6
  reversible: False
  battery_system:
    voltage: 24.0
    capacity: 40.0
    charging_current: 26.4
  mechanical_system:
    mass: 80.0
    moment_of_inertia: 20.0
    friction_coefficient: 0.20
  ambient_system:
    power: 20.0
  cleaning_system:
    power: 760.0
  recharge_threshold: 0.05
  recharge_soc: 1.0
  publish_fleet_state: True
  account_for_battery_drain: True
  task_capabilities: # Specify the types of RMF Tasks that robots in this fleet are capable of performing
    loop: True
    delivery: False
    clean: True
    finishing_request: "park"
    action_categories: ["clean", "manual_control"]
def _consider(description: dict):
        confirm = adpt.fleet_update_handle.Confirmation()

        # Currently there's no way for user to submit a robot_task_request
        # .json file via the rmf-web dashboard. Thus the short term solution
        # is to add the fleet_name info into action description. NOTE
        # only matching fleet_name action will get accepted
        if (description["category"] == "manual_control" and
            description["description"]["fleet_name"] != fleet_name):
                return confirm

        node.get_logger().warn(
            f"Accepting action: {description} ")
        confirm.accept()
        return confirm
fleet_config = config_yaml['rmf_fleet']
task_capabilities_config = fleet_config['task_capabilities']
if 'action_categories' in task_capabilities_config:

	for cat in task_capabilities_config['action_categories']:

		node.get_logger().info(

			f"Fleet [{fleet_name}] is configured"

			f" to perform action of category [{cat}]")

		fleet_handle.add_performable_action(cat, _consider)
  1. RobotUpdateHandle::set_action_executor 在这里,您可以告诉舰队适配器如何指示您的机器人开始执行操作。此函数的回调包括:
  • category(string) 类型的操作。
  • description(JSON) 消息,其中包含有关如何执行操作的详细信息。
  • execution(object) 对象,在执行操作时,队列适配器的平台特定端必须持有该对象,理想情况下会定期更新剩余时间estimates.

在“perform_action”中使用自定义任务时,机器人不会参与交通协商。这意味着它将被允许将其轨迹报告给交通调度,从而使其他机器人能够避开它。但是,在任务完成之前,机器人将无法容纳其他机器人。

这是一个例子:

# call this when starting cleaning execution
    def _action_executor(self,
                        category: str,
                        description: dict,
                        execution:
                        adpt.robot_update_handle.ActionExecution):
        with self._lock:
            # only accept clean and manual_control
            assert(category in ["clean", "manual_control"])

            self.action_category = category
            if (category == "clean"):
                attempts = 0
                self.api.set_cleaning_mode(self.config['active_cleaning_config'])
                while True:
                    self.node.get_logger().info(f"Requesting robot {self.name} to clean {description}")
                    if self.api.start_clean(description["clean_task_name"], self.robot_map_name):
                        self.check_task_completion = self.api.task_completed # will check api
                        break
                    if (attempts > 3):
                        self.node.get_logger().warn(
                            f"Failed to initiate cleaning action for robot [{self.name}]")
                        # TODO: issue error ticket
                        self.in_error = True # TODO: toggle error back
                        execution.error("Failed to initiate cleaning action for robot {self.name}")
                        execution.finished()
                        return
                    attempts+=1
                    time.sleep(1.0)
            elif (category == "manual_control"):
                self.check_task_completion = lambda:False  # user can only cancel the manual_control

            # start Perform Action
            self.node.get_logger().warn(f"Robot [{self.name}] starts [{category}] action")
            self.start_action_time = self.adapter.now()
            self.on_waypoint = None
            self.on_lane = None
            self.action_execution = execution
            self.stubbornness = self.update_handle.unstable_be_stubborn()
            # robot moves slower during perform action
            self.vehicle_traits.linear.nominal_velocity *= 0.2
self.update_handle.set_action_executor(self._action_executor)

这些示例是以下内容的一部分 repository.