PerformAction 教程(Python)
本教程是 Fleet Adapter 教程的扩展,将指导您在 Fleet Adapter 中编写自定义操作。虽然 RMF 提供了一些标准任务,但我们知道不同的机器人可能配备并编程为执行不同类型的操作,例如清洁、拾取物体、远程操作等。通过支持自定义任务,用户可以预先触发 Fleet Adapter 的 config.yaml
中指定的自定义操作,并且 RMF 将放弃对机器人的控制,直到收到机器人已完成自定义操作的信号。您可以探索 在 RMF 中支持新任务 部分,以了解有关支持自定义任务的更多信息以及如何创建自己的任务 JSON 以发送到 RMF。
在本教程中,我们将参考 rmf_demos_fleet_adapter
的简化版本,以在我们的 Fleet Adapter 中实现 Clean
PerformAction 功能。
1. 在舰队config.yaml
中定义 PerformAction
我们需要在舰队配置中定义操作的名称,以便 RMF 在提交任务时将此操作识别为可执行,并能够将其分派给可以完成该任务的舰队。在 rmf_fleet
部分下的 config.yaml
中,我们可以为舰队提供可执行操作的列表。例如,让我们将 clean
定义为此舰队支持的操作:
rmf_fleet:
actions: ["clean"]
2. 在我们的舰队适配器内应用操作执行逻辑
RMF 收到包含此操作的任务并将其分派到正确的舰队后,舰队适配器的 execute_action(~)
回调将被触发。解析到此回调的 category
对应于我们之前定义的操作名称,而 description
包含我们可能感兴趣的有关该操作的任何详细信息。
假设这是提交给 RMF 的任务 JSON:
{
"type": "dispatch_task_request",
"request": {
"unix_millis_earliest_start_time": start_time,
"category": "clean",
"description": {
"zone": "clean_lobby"
}
}
}
在我们的示例中,提供的“category”将是“clean”,而“description”将包含此任务将我们的机器人引导到哪个清洁区域,即“clean_lobby”。因此,我们需要在“execute_action(~)”中实现逻辑:
def execute_action(self, category: str, description: dict, execution):
self.execution = execution
if category == 'clean':
self.perform_clean(description['zone'])
def perform_clean(self, zone):
if self.api.start_activity(self.name, 'clean', zone):
self.node.get_logger().info(
f'Commanding [{self.name}] to clean zone [{zone}]'
)
else:
self.node.get_logger().error(
f'Fleet manager for [{self.name}] does not know how to '
f'clean zone [{zone}]. We will terminate the activity.'
)
self.execution.finished()
self.execution = None
由于我们的机器人队列可能能够执行多个自定义操作,因此我们需要进行检查以确保收到的“类别”与我们的目标机器人 API 相匹配。收到“清洁”操作后,我们可以相应地触发机器人的 API。
3. 为自定义操作实现机器人 API
这就是“RobotClientAPI.py”中的“start_activity(~)”方法发挥作用的地方。我们需要它实现对机器人的 API 调用以启动清洁活动。例如,如果机器人 API 使用 REST 调用机器人,则实现的方法可能如下所示:
def start_activity(
self,
robot_name: str,
activity: str,
label: str
):
''' Request the robot to begin a process. This is specific to the robot
and the use case. For example, load/unload a cart for Deliverybot
or begin cleaning a zone for a cleaning robot.'''
url = (
self.prefix +
f"/open-rmf/rmf_demos_fm/start_activity?robot_name={robot_name}"
)
# data fields: task, map_name, destination{}, data{}
data = {'activity': activity, 'label': label}
try:
response = requests.post(url, timeout=self.timeout, json=data)
response.raise_for_status()
if self.debug:
print(f'Response: {response.json()}')
if response.json()['success']:
return True
# If we get a response with success=False, then
return False
except HTTPError as http_err:
print(f'HTTP error for {robot_name} in start_activity: {http_err}')
except Exception as err:
print(f'Other error {robot_name} in start_activity: {err}')
return False
4. 完成操作
由于我们在“RobotAdapter”中存储了一个“self.execution”对象,因此当任何执行(导航、停止或操作)完成时,我们都会收到通知,因为更新循环会不断调用“is_command_completed”来检查其状态。
def update(self, state):
activity_identifier = None
if self.execution:
if self.api.is_command_completed():
self.execution.finished()
self.execution = None
else:
activity_identifier = self.execution.identifier
如果您的实现需要单独的回调来标记执行已完成,您可以创建一个新函数来进行此检查,并在操作完成时调用“self.execution.finished()”。