User-defined Custom Tasks in RMF Task
注意:用户定义的自定义任务目前处于实验阶段
新的通用任务组合系统正在开发中,讨论可在此处 找到。
处理 RMF 任务时,有两个包:
rmf_task
提供 API 和基类,用于在 RMF 中定义和管理任务。任务被定义为生成阶段的对象,这些阶段是一系列有意义的步骤,可产生理想的结果。每个任务都有一个描述和一个组件,允许我们在给定初始状态的情况下模拟机器人完成任务后的状态,还有一个组件将命令实际机器人执行任务。
rmf_task_sequence
提供了 rmf_task
的开箱即用实现,其中 Task
对象由一系列阶段定义。因此,此类任务生成的阶段将与用于定义它们的阶段序列相匹配。
rmf_task_sequence
中定义的阶段反过来是事件的集合,这些事件还具有用于在事件期间模拟最终状态和命令机器人的组件。目前支持 此处 定义的事件。
然后,用户可以通过将此类阶段/事件的序列串联在一起来构建任务的任意定义。RMF 能够规划和执行此类任务。
perform_action
是一个基于序列的事件,支持执行自定义操作。
perform_action
行为的可定制性是有限的,因此实施自定义逻辑的用户无需担心如何与交通系统交互、开门或使用电梯。这也最大限度地降低了系统集成商引入错误的风险,从而扰乱交通系统或任何其他资源共享系统。当 perform_action
运行时,机器人将成为“只读”交通代理,因此其他机器人将简单地避开它
用户将在 rmf_fleet_adapter
层上工作。在此层中,API 仅限于使用 rmf_task_sequence
来执行任务。仅支持某些事件,它们的描述可以在 此处 找到。
rmf_fleet_adapter
层充当用户可以使用的 API。它仅支持对 此处 中提到的现有事件中的 perform_action
进行自定义行为。
用户只能在 perform_action
中添加自定义任务。RMF 将命令传递给舰队适配器集成的平台特定端,并正式释放对机器人的控制,直到操作完成。
要在 perform_action
中使用自定义任务,用户需要使用 API 的两个部分。
- FleetUpdateHandle::add_performable_action 它由两部分组成:第一部分是动作的“类别”,第二部分是“考虑”部分,将根据该部分决定是否接受该动作。
以下是一个例子:
rmf_fleet:
name: "ecobot40"
limits:
linear: [1.2, 1.5] # velocity, acceleration
angular: [0.6, 0.6]
profile: # Robot profile is modelled as a circle
footprint: 0.5
vicinity: 0.6
reversible: False
battery_system:
voltage: 24.0
capacity: 40.0
charging_current: 26.4
mechanical_system:
mass: 80.0
moment_of_inertia: 20.0
friction_coefficient: 0.20
ambient_system:
power: 20.0
cleaning_system:
power: 760.0
recharge_threshold: 0.05
recharge_soc: 1.0
publish_fleet_state: True
account_for_battery_drain: True
task_capabilities: # Specify the types of RMF Tasks that robots in this fleet are capable of performing
loop: True
delivery: False
clean: True
finishing_request: "park"
action_categories: ["clean", "manual_control"]
def _consider(description: dict):
confirm = adpt.fleet_update_handle.Confirmation()
# Currently there's no way for user to submit a robot_task_request
# .json file via the rmf-web dashboard. Thus the short term solution
# is to add the fleet_name info into action description. NOTE
# only matching fleet_name action will get accepted
if (description["category"] == "manual_control" and
description["description"]["fleet_name"] != fleet_name):
return confirm
node.get_logger().warn(
f"Accepting action: {description} ")
confirm.accept()
return confirm
fleet_config = config_yaml['rmf_fleet']
task_capabilities_config = fleet_config['task_capabilities']
if 'action_categories' in task_capabilities_config:
for cat in task_capabilities_config['action_categories']:
node.get_logger().info(
f"Fleet [{fleet_name}] is configured"
f" to perform action of category [{cat}]")
fleet_handle.add_performable_action(cat, _consider)
- RobotUpdateHandle::set_action_executor 在这里,您可以告诉舰队适配器如何指示您的机器人开始执行操作。此函数的回调包括:
category(string)
类型的操作。description(JSON)
消息,其中包含有关如何执行操作的详细信息。execution(object)
对象,在执行操作时,队列适配器的平台特定端必须持有该对象,理想情况下会定期更新剩余时间estimates.
在“perform_action”中使用自定义任务时,机器人不会参与交通协商。这意味着它将被允许将其轨迹报告给交通调度,从而使其他机器人能够避开它。但是,在任务完成之前,机器人将无法容纳其他机器人。
这是一个例子:
# call this when starting cleaning execution
def _action_executor(self,
category: str,
description: dict,
execution:
adpt.robot_update_handle.ActionExecution):
with self._lock:
# only accept clean and manual_control
assert(category in ["clean", "manual_control"])
self.action_category = category
if (category == "clean"):
attempts = 0
self.api.set_cleaning_mode(self.config['active_cleaning_config'])
while True:
self.node.get_logger().info(f"Requesting robot {self.name} to clean {description}")
if self.api.start_clean(description["clean_task_name"], self.robot_map_name):
self.check_task_completion = self.api.task_completed # will check api
break
if (attempts > 3):
self.node.get_logger().warn(
f"Failed to initiate cleaning action for robot [{self.name}]")
# TODO: issue error ticket
self.in_error = True # TODO: toggle error back
execution.error("Failed to initiate cleaning action for robot {self.name}")
execution.finished()
return
attempts+=1
time.sleep(1.0)
elif (category == "manual_control"):
self.check_task_completion = lambda:False # user can only cancel the manual_control
# start Perform Action
self.node.get_logger().warn(f"Robot [{self.name}] starts [{category}] action")
self.start_action_time = self.adapter.now()
self.on_waypoint = None
self.on_lane = None
self.action_execution = execution
self.stubbornness = self.update_handle.unstable_be_stubborn()
# robot moves slower during perform action
self.vehicle_traits.linear.nominal_velocity *= 0.2
self.update_handle.set_action_executor(self._action_executor)
这些示例是以下内容的一部分 repository.