datetime:2024/04/28 11:06
author:nzb
导航
https://github.com/ros-planning/navigation2.git
分支:humble
节点:3ed4c2d
应用及工具
nav2_waypoint_follower
| 航路点追随者,看README.md
nav2_simple_commander
| 简单的应用demo,python版本API,看README.md
nav2_waypoint_follower
包含一个具有特定任务执行器插件的路径点跟随程序。如果您需要前往给定位置并完成特定任务(例如拍照、拾取箱子或等待用户输入),这非常有用。 这是一个很好的演示应用程序,演示了如何在一个示例应用程序中使用Nav2
。
主要类
- WaypointFollower:一个动作服务器,使用行为树导航机器人到目标位置,继承于
nav2_util::LifecycleNode
WaypointFollower
:构造函数- 节点名称:
waypoint_follower
- 初始化节点参数
stop_on_failure
->true
loop_rate
->20
waypoint_task_executor_plugin
->wait_at_waypoint
wait_at_waypoint.plugin
->nav2_waypoint_follower::WaitAtWaypoint
- 节点名称:
on_configure
:配置成员变量,初始化follow_waypoints
动作服务- 获取节点参数配置,包括
waypoint_task_executor_plugin
- 创建
/navigate_to_pose
动作客户端nav_to_pose_client_
- 创建
/follow_waypoints
动作服务action_server_
,绑定followWaypoints
- 创建参数
waypoint_task_executor_plugin
配置的插件实例waypoint_task_executor_
,然后调用实例initialize
- 获取节点参数配置,包括
on_activate
:激活服务,绑定参数回调,调用createBond()
on_deactivate
:属性重置,调用destroyBond()
on_cleanup
:重置成员变量on_shutdown
:followWaypoints
:动作服务回调- 获取目标,实例化反馈和结果实例
- 校验服务是否激活,点位是否为空
- 开启
while
循环- 检测是否有取消请求,存在->
nav_to_pose_client_->async_cancel_all_goals()
->action_server_->terminate_all()
,返回 - 检测是否有抢占请求,存在->
action_server_->accept_pending_goal();goal_index=0;new_goal=true
- 是否
new_goal==true
new_goal=false
- 初始化
send_goal_options
绑定resultCallback
和goalResponseCallback
- 发送目标,
nav_to_pose_client_->async_send_goal(client_goal, send_goal_options)
current_goal_status_ = ActionStatus::PROCESSING
- 发布反馈
action_server_->publish_feedback(feedback)
- 查看
current_goal_status_
状态ActionStatus::FAILED
- 更新失败点索引
stop_on_failure_==true
:调用action_server_->terminate_current
返回退出函数- 否则继续下一个点
ActionStatus::SUCCEEDED
- 到达目标位置后调用目标位置的任务
waypoint_task_executor_->processAtWaypoint
stop_on_failure_==true
:调用action_server_->terminate_current
返回退出函数- 否则执行成功,继续往下
- 到达目标位置后调用目标位置的任务
- 任务完成或任务中,重置属性或打印状态
- 轮询并睡眠
- 检测是否有取消请求,存在->
resultCallback
:动作客户端结果回调,检查结果目标ID和当前任务ID,根据对应结果code
返回action
状态goalResponseCallback
:动作客户端目标响应回调,校验是否发送目标给服务器失败dynamicParametersCallback
:参数变更回调
插件
InputAtWaypoint:等待插件(用户交互插件),继承于
nav2_core::WaypointTaskExecutor
InputAtWaypoint
:构造函数initialize
:声明和加载使用的参数- 声明获取节点参数
plugin_name + ".waypoint_pause_duration"
->0
plugin_name + ".enabled"
->true
- 声明获取节点参数
processAtWaypoint
:到点执行的操作Cb
:执行处理的回调
InputAtWaypoint:继承于
nav2_core::WaypointTaskExecutor
InputAtWaypoint
:构造函数initialize
:声明和加载使用的参数- 声明获取节点参数
plugin_name + ".timeout"
->10.0
plugin_name + ".enabled"
->true
plugin_name + ".input_topic"
->input_at_waypoint/input
- 创建话题调用,话题名称为
plugin_name + ".input_topic"
,绑定Cb
- 声明获取节点参数
processAtWaypoint
:到点执行的操作,等待完成返回Cb
:执行处理的回调,input_received_ = true
PhotoAtWaypoint:拍照插件,继承于
nav2_core::WaypointTaskExecutor
InputAtWaypoint
:构造函数initialize
:声明和加载使用的参数- 声明获取节点参数
plugin_name + ".enabled"
->true
plugin_name + ".image_topic"
->/camera/color/image_raw
plugin_name + ".save_dir"
->/tmp/waypoint_images
plugin_name + ".image_format"
->png
- 是否存在保存路径,不存在创建
- 创建话题调用,话题名称为
plugin_name + ".input_topic"
,绑定imageCallback
- 声明获取节点参数
processAtWaypoint
:到点执行的操作,拼接图片名称和路径,调用deepCopyMsg2Mat
拷贝,然后保存imageCallback
:话题订阅回调,curr_frame_msg_ = msg
deepCopyMsg2Mat
:给定一个指向sensor::msg::Image
类型的共享指针,对输入的cv Mat
进行深度复制
nav2_simple_commander
Robot Navigator Method | Description |
---|---|
setInitialPose(initial_pose) | 初始化机器人初始位置PoseStamped |
goThroughPoses(poses, behavior_tree='') | 请求机器人驾驶通过一组位姿(PoseStamped 数组). |
goToPose(pose, behavior_tree='') | 请求机器人驾驶达到一个位置(PoseStamped 数组). |
followWaypoints(poses) | 请求机器人遵循一组路径点(PoseStamped 列表)。 这将在每个姿势执行特定的 TaskExecutor . |
followPath(path, controller_id='', goal_checker_id='') | 请求机器人遵循从起点到目标的路径 PoseStamped ,nav_msgs/Path . |
spin(spin_dist=1.57, time_allowance=10) | 要求机器人原地旋转指定角度 |
backup(backup_dist=0.15, backup_speed=0.025, time_allowance=10) | 要求机器人后退指定距离。 |
cancelTask() | 需要一个正在做的任务请求 |
isTaskComplete() | 检查任务是否完成,超时时间为 100 毫秒。 如果已完成则返回 True ,如果仍在继续则返回 False 。 |
getFeedback() | 获取任务反馈 |
getResult() | 获取任务的最终结果,在 isTaskComplete 返回 True 后调用。 返回操作服务器结果对象。 |
getPath(start, goal, planner_id='', use_start=False) | 获取从起点到目标的路径 PoseStamped ,nav_msgs/Path 。 |
getPathThroughPoses(start, goals, planner_id='', use_start=False) | 获取从起点到一组目标的路径、PoseStamped 列表、nav_msgs/Path 。 |
smoothPath(path, smoother_id='', max_duration=2.0, check_for_collision=False) | 平滑给定的路径nav_msgs/msg/Path |
changeMap(map_filepath) | 请求从当前地图更改为 map_filepath 的yaml 。 |
clearAllCostmaps() | 清除全局和局部代价地图 |
clearLocalCostmap() | 清除局部代价地图 |
clearGlobalCostmap() | 清除全局代价地图 |
getGlobalCostmap() | 获取全局代价地图, nav2_msgs/Costmap |
getLocalCostmap() | 获取局部代价地图, nav2_msgs/Costmap |
waitUntilNav2Active(navigator='bt_navigator, localizer='amcl') | 阻塞直到 Nav2 完全在线并且生命周期节点处于活动状态。 与自动启动或外部生命周期启动结合使用。 可以指定自定义导航器和定位器节点 |
lifecycleStartup() | 向所有生命周期管理服务器发送请求,使它们进入活动状态,如果自动启动为 false 并且您希望该程序控制 Nav2 的生命周期,则使用该请求。 |
lifecycleShutdown() | 向所有生命周期管理服务器发送请求以将其关闭。 |
destroyNode() | 销毁节点释放资源 |
- 构建应用程序的通用模板如下:
from nav2_simple_commander.robot_navigator import BasicNavigator
import rclpy
rclpy.init()
nav = BasicNavigator()
...
nav.setInitialPose(init_pose)
nav.waitUntilNav2Active() # if autostarted, else use `lifecycleStartup()`
...
path = nav.getPath(init_pose, goal_pose)
smoothed_path = nav.smoothPath(path)
...
nav.goToPose(goal_pose)
while not nav.isTaskComplete():
feedback = nav.getFeedback()
if feedback.navigation_duration > 600:
nav.cancelTask()
...
result = nav.getResult()
if result == TaskResult.SUCCEEDED:
print('Goal succeeded!')
elif result == TaskResult.CANCELED:
print('Goal was canceled!')
elif result == TaskResult.FAILED:
print('Goal failed!')
nav2_simple_commander
有一些示例,用来突出作为用户可用的API函数:example_nav_to_pose.py
- 演示了导航器的导航到位姿功能,以及一些辅助方法。example_nav_through_poses.py
- 演示了导航器的穿越位姿功能,以及一些辅助方法。example_waypoint_follower.py
- 演示了导航器的路径点跟随功能,以及一些辅助方法。example_follow_path.py
- 演示了导航器的路径跟随功能,以及一些辅助方法,如路径平滑处理。
nav2_simple_commander
有几个演示,旨在突出使用nav2_simple_commander
API构建的几个简单的自主应用程序:demo_security.py
- 一个简单的安防机器人应用程序,展示了如何让机器人使用Navigate Through Poses
沿着安全路径执行巡逻任务,无限期地。demo_picking.py
- 一个简单的物品拣选应用程序,展示了如何让机器人驶向仓库中的特定货架,以拣选物品或让人员将物品放入篮子并将其送往目的地进行运输,使用Navigate To Pose
。demo_inspection.py
- 一个简单的货架检查应用程序,展示了如何使用Waypoint Follower
和任务执行器来拍照、RFID
扫描等,以分析当前货架状态并定位仓库中的物品。
BasicNavigator
__init__
:节点名称basic_navigator
- 动作
navigate_through_poses
动作客户端nav_through_poses_client
navigate_to_pose
动作客户端nav_to_pose_client
follow_waypoints
动作客户端follow_waypoints_client
follow_path
动作客户端follow_path_client
compute_path_to_pose
动作客户端compute_path_to_pose_client
compute_path_through_poses
动作客户端compute_path_through_poses_client
smooth_path
动作客户端smoother_client
spin
动作客户端spin_client
backup
动作客户端backup_client
assisted_teleop
动作客户端assisted_teleop_client
- 话题
amcl_pose
话题订阅localization_pose_sub
,绑定_amclPoseCallback
initialpose
话题发布initial_pose_pub
- 服务
map_server/load_map
服务客户端change_maps_srv
global_costmap/clear_entirely_global_costmap
服务客户端clear_costmap_global_srv
local_costmap/clear_entirely_local_costmap
服务客户端clear_costmap_local_srv
global_costmap/get_costmap
服务客户端get_costmap_global_srv
local_costmap/get_costmap
服务客户端get_costmap_local_srv
- 动作
destroyNode()
:销毁节点,调用destroyNode()
destroyNode()
:调用动作,服务和话题节点的destroy()
setInitialPose(self, initial_pose)
:初始化位姿initial_pose_received
置为False
,直到_amclPoseCallback
回调置为True
- 调用
self._setInitialPose()
goThroughPoses(self, poses, behavior_tree='')
:发送一个NavThroughPoses
动作请求- 调用
while not self.nav_through_poses_client.wait_for_server(timeout_sec=1.0)
循环等待服务可达 - 调用发送请求
self.nav_through_poses_client.send_goal_async
,绑定self._feedbackCallback
rclpy.spin_until_future_complete
:轮询直到完成- 获取发送请求结果,未接收返回
False
- 获取结果返回
True
- 调用
goToPose(self, pose, behavior_tree='')
:发送一个NavToPose
的动作请求- 调用
while not self.nav_to_pose_client.wait_for_server(timeout_sec=1.0)
循环等待服务可达 - 调用发送请求
self.nav_to_pose_client.send_goal_async
,绑定self._feedbackCallback
rclpy.spin_until_future_complete
:轮询直到完成- 获取发送请求结果,未接收返回
False
- 获取结果返回
True
- 调用
followWaypoints(self, poses)
:发送一个FollowWaypoints
动作请求- 调用
while not self.follow_waypoints_client.wait_for_server(timeout_sec=1.0)
循环等待服务可达 - 调用发送请求
self.follow_waypoints_client.send_goal_async
,绑定self._feedbackCallback
rclpy.spin_until_future_complete
:轮询直到完成- 获取发送请求结果,未接收返回
False
- 获取结果返回
True
- 调用
spin(self, spin_dist=1.57, time_allowance=10)
:旋转- 调用
while not self.spin_client.wait_for_server(timeout_sec=1.0)
循环等待服务可达 - 调用发送请求
self.spin_client.send_goal_async
,绑定self._feedbackCallback
rclpy.spin_until_future_complete
:轮询直到完成- 获取发送请求结果,未接收返回
False
- 获取结果返回
True
- 调用
backup(self, backup_dist=0.15, backup_speed=0.025, time_allowance=10)
:后退- 调用
while not self.backup_client.wait_for_server(timeout_sec=1.0)
循环等待服务可达 - 调用发送请求
self.backup_client.send_goal_async
,绑定self._feedbackCallback
rclpy.spin_until_future_complete
:轮询直到完成- 获取发送请求结果,未接收返回
False
- 获取结果返回
True
- 调用
assistedTeleop(self, time_allowance=30)
:辅助远程控制- 调用
while not self.assisted_teleop_client.wait_for_server(timeout_sec=1.0)
循环等待服务可达 - 调用发送请求
self.assisted_teleop_client.send_goal_async
,绑定self._feedbackCallback
rclpy.spin_until_future_complete
:轮询直到完成- 获取发送请求结果,未接收返回
False
- 获取结果返回
True
- 调用
followPath(self, path, controller_id='', goal_checker_id='')
:发送一个FollowPath
动作请求- 调用
while not self.follow_path_client.wait_for_server(timeout_sec=1.0)
循环等待服务可达 - 调用发送请求
self.follow_path_client.send_goal_async
,绑定self._feedbackCallback
rclpy.spin_until_future_complete
:轮询直到完成- 获取发送请求结果,未接收返回
False
- 获取结果返回
True
- 调用
cancelTask(self)
:取消任何类型的任务- 存在
self.result_future
- 调用
future = self.goal_handle.cancel_goal_async()
- 调用
rclpy.spin_until_future_complete(self, future)
- 调用
- 存在
isTaskComplete(self)
:检测任何类型的任务是否完成- 存在
self.result_future
- 调用
rclpy.spin_until_future_complete(self, self.result_future, timeout_sec=0.10)
- 未超时,获取状态,不等于
GoalStatus.STATUS_SUCCEEDED
,任务失败,返回True
- 超时,返回
False
,表示还在执行中
- 未超时,获取状态,不等于
- 调用
- 不存在或状态等于
STATUS_SUCCEEDED
返回True
,执行成功
- 存在
getFeedback(self)
:返回反馈数据,return self.feedback
getResult(self)
:根据状态返回映射的新的任务状态waitUntilNav2Active(self, navigator='bt_navigator', localizer='amcl')
:阻塞等待导航激活并运行- 调用
self._waitForNodeToActivate(localizer)
- 如果
localizer == 'amcl'
调用self._waitForInitialPose()
- 调用
self._waitForNodeToActivate(navigator)
- 调用
_getPathImpl(self, start, goal, planner_id='', use_start=False)
:发送一个ComputePathToPose
动作请求,返回路径- 调用
while not self.compute_path_to_pose_client.wait_for_server(timeout_sec=1.0)
循环等待服务可达 - 调用发送请求
self.compute_path_to_pose_client.send_goal_async
rclpy.spin_until_future_complete
:轮询直到完成- 获取发送请求结果,未接收返回
None
- 调用
self.goal_handle.get_result_async()
获取结果,如果状态不等于GoalStatus.STATUS_SUCCEEDED
,获取失败,返回None
- 返回路径结果
- 调用
getPath(self, start, goal, planner_id='', use_start=False)
:发送一个ComputePathToPose
动作请求,返回路径- 调用
self._getPathImpl
,根据结果返回
- 调用
getPathThroughPoses(self, start, goals, planner_id='', use_start=False)
:发送一个ComputePathThroughPoses
动作请求,返回路径- 调用
while not self.compute_path_through_poses_client.wait_for_server(timeout_sec=1.0)
循环等待服务可达 - 调用发送请求
self.compute_path_through_poses_client.send_goal_async
rclpy.spin_until_future_complete
:轮询直到完成- 获取发送请求结果,未接收返回
None
- 调用
self.goal_handle.get_result_async()
获取结果,如果状态不等于GoalStatus.STATUS_SUCCEEDED
,获取失败,返回None
- 返回路径结果
- 调用
_smoothPathImpl(self, path, smoother_id='', max_duration=2.0, check_for_collision=False)
:发送一个SmoothPath
动作请求,返回路径- 调用
while not self.smoother_client.wait_for_server(timeout_sec=1.0)
循环等待服务可达 - 调用发送请求
self.smoother_client.send_goal_async
rclpy.spin_until_future_complete
:轮询直到完成- 获取发送请求结果,未接收返回
None
- 调用
self.goal_handle.get_result_async()
获取结果,如果状态不等于GoalStatus.STATUS_SUCCEEDED
,获取失败,返回None
- 返回路径结果
- 调用
smoothPath(self, path, smoother_id='', max_duration=2.0, check_for_collision=False)
:发送一个SmoothPath
动作请求,返回路径- 调用
self._smoothPathImpl
,根据结果返回
- 调用
changeMap(self, map_filepath)
:在地图服务更改当前静态地图- 调用
while not self.change_maps_srv.wait_for_server(timeout_sec=1.0)
循环等待服务可达 - 调用发送请求
future = self.change_maps_srv.call_async(req)
rclpy.spin_until_future_complete
:轮询直到完成- 获取结果状态,打印相关日志
- 调用
clearAllCostmaps(self)
:清除所有代价地图- 调用
self.clearLocalCostmap()
- 调用
self.clearGlobalCostmap()
- 调用
clearLocalCostmap()
:清除局部代价地图- 调用
while not self.clear_costmap_local_srv.wait_for_server(timeout_sec=1.0)
循环等待服务可达 - 调用发送请求
future = self.clear_costmap_local_srv.call_async(req)
rclpy.spin_until_future_complete
:轮询直到完成
- 调用
clearGlobalCostmap()
:清除全局代价地图- 调用
while not self.clear_costmap_global_srv.wait_for_server(timeout_sec=1.0)
循环等待服务可达 - 调用发送请求
future = self.clear_costmap_global_srv.call_async(req)
rclpy.spin_until_future_complete
:轮询直到完成
- 调用
getGlobalCostmap(self)
:获取全局代价地图- 调用
while not self.get_costmap_global_srv.wait_for_server(timeout_sec=1.0)
循环等待服务可达 - 调用发送请求
future = self.get_costmap_global_srv.call_async(req)
rclpy.spin_until_future_complete
:轮询直到完成,返回地图结果
- 调用
getLocalCostmap(self)
:获取局部代价地图- 调用
while not self.get_costmap_local_srv.wait_for_server(timeout_sec=1.0)
循环等待服务可达 - 调用发送请求
future = self.get_costmap_local_srv.call_async(req)
rclpy.spin_until_future_complete
:轮询直到完成,返回地图结果
- 调用
lifecycleStartup(self)
:生命周期启动,启动导航生命周期系统for srv_name, srv_type in self.get_service_names_and_types()
srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes'
- 创建客户端
mgr_client = self.create_client(ManageLifecycleNodes, srv_name)
while not mgr_client.wait_for_service(timeout_sec=1.0)
等待服务可用- 发送启动
STARTUP
请求future = mgr_client.call_async(req)
while
循环,启动需要完整的map->odom->base_link
TF树,所以如果我们不成功,请尝试初始化初始姿势- 调用
rclpy.spin_until_future_complete(self, future, timeout_sec=0.10)
- 如果
not future
调用self._waitForInitialPose()
,初始化位姿 - 否则
break
- 调用
- 创建客户端
- 启动成功
lifecycleShutdown(self)
:生命周期关闭,关闭导航生命周期系统for srv_name, srv_type in self.get_service_names_and_types()
srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes'
- 创建客户端
mgr_client = self.create_client(ManageLifecycleNodes, srv_name)
while not mgr_client.wait_for_service(timeout_sec=1.0)
等待服务可用- 发送关闭
SHUTDOWN
请求future = mgr_client.call_async(req)
- 调用
rclpy.spin_until_future_complete(self, future, timeout_sec=0.10)
- 创建客户端
- 关闭成功
_waitForNodeToActivate(self, node_name)
:等待节点激活- 创建
{node_name}/get_state
服务客户端 while
循环等待服务可用while
循环获取state
,直到state == active
跳出返回
- 创建
_waitForInitialPose(self)
:等待初始位置设置成功while not self.initial_pose_received
- 调用
self._setInitialPose()
- 调用
rclpy.spin_once(self, timeout_sec=1.0)
- 调用
_amclPoseCallback(self, msg)
:amcl_pose
话题订阅回调,设置属性self.initial_pose_received = True
_feedbackCallback(self, msg)
:动作反馈回调,简单赋值self.feedback = msg.feedback
_setInitialPose(self)
:发布初始位姿:self.initial_pose_pub.publish(msg)
info(self, msg)
:打印info
日志warn(self, msg)
:打印warn
日志error(self, msg)
:打印error
日志debug(self, msg)
:打印debug
日志
example_nav_to_pose.py
- 实例化
navigator = BasicNavigator()
- 设置初始化位姿,
navigator.setInitialPose(initial_pose)
- 阻塞等待导航模块起来并运行中,
navigator.waitUntilNav2Active()
,如果导航模块未配置autostart
则需要调用navigator.lifecycleStartup()