datetime:2024/04/28 11:06
author:nzb
导航
https://github.com/ros-planning/navigation2.git
分支:humble
节点:3ed4c2d

应用及工具
nav2_waypoint_follower| 航路点追随者,看README.mdnav2_simple_commander| 简单的应用demo,python版本API,看README.md
nav2_waypoint_follower
包含一个具有特定任务执行器插件的路径点跟随程序。如果您需要前往给定位置并完成特定任务(例如拍照、拾取箱子或等待用户输入),这非常有用。 这是一个很好的演示应用程序,演示了如何在一个示例应用程序中使用Nav2。
主要类
- WaypointFollower:一个动作服务器,使用行为树导航机器人到目标位置,继承于
nav2_util::LifecycleNodeWaypointFollower:构造函数- 节点名称:
waypoint_follower - 初始化节点参数
stop_on_failure->trueloop_rate->20waypoint_task_executor_plugin->wait_at_waypointwait_at_waypoint.plugin->nav2_waypoint_follower::WaitAtWaypoint
- 节点名称:
on_configure:配置成员变量,初始化follow_waypoints动作服务- 获取节点参数配置,包括
waypoint_task_executor_plugin - 创建
/navigate_to_pose动作客户端nav_to_pose_client_ - 创建
/follow_waypoints动作服务action_server_,绑定followWaypoints - 创建参数
waypoint_task_executor_plugin配置的插件实例waypoint_task_executor_,然后调用实例initialize
- 获取节点参数配置,包括
on_activate:激活服务,绑定参数回调,调用createBond()on_deactivate:属性重置,调用destroyBond()on_cleanup:重置成员变量on_shutdown:followWaypoints:动作服务回调- 获取目标,实例化反馈和结果实例
- 校验服务是否激活,点位是否为空
- 开启
while循环- 检测是否有取消请求,存在->
nav_to_pose_client_->async_cancel_all_goals()->action_server_->terminate_all(),返回 - 检测是否有抢占请求,存在->
action_server_->accept_pending_goal();goal_index=0;new_goal=true - 是否
new_goal==truenew_goal=false- 初始化
send_goal_options绑定resultCallback和goalResponseCallback - 发送目标,
nav_to_pose_client_->async_send_goal(client_goal, send_goal_options) current_goal_status_ = ActionStatus::PROCESSING
- 发布反馈
action_server_->publish_feedback(feedback) - 查看
current_goal_status_状态ActionStatus::FAILED- 更新失败点索引
stop_on_failure_==true:调用action_server_->terminate_current返回退出函数- 否则继续下一个点
ActionStatus::SUCCEEDED- 到达目标位置后调用目标位置的任务
waypoint_task_executor_->processAtWaypoint stop_on_failure_==true:调用action_server_->terminate_current返回退出函数- 否则执行成功,继续往下
- 到达目标位置后调用目标位置的任务
- 任务完成或任务中,重置属性或打印状态
- 轮询并睡眠
- 检测是否有取消请求,存在->
resultCallback:动作客户端结果回调,检查结果目标ID和当前任务ID,根据对应结果code返回action状态goalResponseCallback:动作客户端目标响应回调,校验是否发送目标给服务器失败dynamicParametersCallback:参数变更回调
插件
InputAtWaypoint:等待插件(用户交互插件),继承于
nav2_core::WaypointTaskExecutorInputAtWaypoint:构造函数initialize:声明和加载使用的参数- 声明获取节点参数
plugin_name + ".waypoint_pause_duration"->0plugin_name + ".enabled"->true
- 声明获取节点参数
processAtWaypoint:到点执行的操作Cb:执行处理的回调
InputAtWaypoint:继承于
nav2_core::WaypointTaskExecutorInputAtWaypoint:构造函数initialize:声明和加载使用的参数- 声明获取节点参数
plugin_name + ".timeout"->10.0plugin_name + ".enabled"->trueplugin_name + ".input_topic"->input_at_waypoint/input
- 创建话题调用,话题名称为
plugin_name + ".input_topic",绑定Cb
- 声明获取节点参数
processAtWaypoint:到点执行的操作,等待完成返回Cb:执行处理的回调,input_received_ = true
PhotoAtWaypoint:拍照插件,继承于
nav2_core::WaypointTaskExecutorInputAtWaypoint:构造函数initialize:声明和加载使用的参数- 声明获取节点参数
plugin_name + ".enabled"->trueplugin_name + ".image_topic"->/camera/color/image_rawplugin_name + ".save_dir"->/tmp/waypoint_imagesplugin_name + ".image_format"->png
- 是否存在保存路径,不存在创建
- 创建话题调用,话题名称为
plugin_name + ".input_topic",绑定imageCallback
- 声明获取节点参数
processAtWaypoint:到点执行的操作,拼接图片名称和路径,调用deepCopyMsg2Mat拷贝,然后保存imageCallback:话题订阅回调,curr_frame_msg_ = msgdeepCopyMsg2Mat:给定一个指向sensor::msg::Image类型的共享指针,对输入的cv Mat进行深度复制
nav2_simple_commander
| Robot Navigator Method | Description |
|---|---|
| setInitialPose(initial_pose) | 初始化机器人初始位置PoseStamped |
| goThroughPoses(poses, behavior_tree='') | 请求机器人驾驶通过一组位姿(PoseStamped数组). |
| goToPose(pose, behavior_tree='') | 请求机器人驾驶达到一个位置(PoseStamped数组). |
| followWaypoints(poses) | 请求机器人遵循一组路径点(PoseStamped 列表)。 这将在每个姿势执行特定的 TaskExecutor. |
| followPath(path, controller_id='', goal_checker_id='') | 请求机器人遵循从起点到目标的路径 PoseStamped,nav_msgs/Path. |
| spin(spin_dist=1.57, time_allowance=10) | 要求机器人原地旋转指定角度 |
| backup(backup_dist=0.15, backup_speed=0.025, time_allowance=10) | 要求机器人后退指定距离。 |
| cancelTask() | 需要一个正在做的任务请求 |
| isTaskComplete() | 检查任务是否完成,超时时间为 100 毫秒。 如果已完成则返回 True,如果仍在继续则返回 False。 |
| getFeedback() | 获取任务反馈 |
| getResult() | 获取任务的最终结果,在 isTaskComplete 返回 True 后调用。 返回操作服务器结果对象。 |
| getPath(start, goal, planner_id='', use_start=False) | 获取从起点到目标的路径 PoseStamped,nav_msgs/Path。 |
| getPathThroughPoses(start, goals, planner_id='', use_start=False) | 获取从起点到一组目标的路径、PoseStamped 列表、nav_msgs/Path。 |
| smoothPath(path, smoother_id='', max_duration=2.0, check_for_collision=False) | 平滑给定的路径nav_msgs/msg/Path |
| changeMap(map_filepath) | 请求从当前地图更改为 map_filepath 的yaml。 |
| clearAllCostmaps() | 清除全局和局部代价地图 |
| clearLocalCostmap() | 清除局部代价地图 |
| clearGlobalCostmap() | 清除全局代价地图 |
| getGlobalCostmap() | 获取全局代价地图, nav2_msgs/Costmap |
| getLocalCostmap() | 获取局部代价地图, nav2_msgs/Costmap |
| waitUntilNav2Active(navigator='bt_navigator, localizer='amcl') | 阻塞直到 Nav2 完全在线并且生命周期节点处于活动状态。 与自动启动或外部生命周期启动结合使用。 可以指定自定义导航器和定位器节点 |
| lifecycleStartup() | 向所有生命周期管理服务器发送请求,使它们进入活动状态,如果自动启动为 false 并且您希望该程序控制 Nav2 的生命周期,则使用该请求。 |
| lifecycleShutdown() | 向所有生命周期管理服务器发送请求以将其关闭。 |
| destroyNode() | 销毁节点释放资源 |
- 构建应用程序的通用模板如下:
from nav2_simple_commander.robot_navigator import BasicNavigator
import rclpy
rclpy.init()
nav = BasicNavigator()
...
nav.setInitialPose(init_pose)
nav.waitUntilNav2Active() # if autostarted, else use `lifecycleStartup()`
...
path = nav.getPath(init_pose, goal_pose)
smoothed_path = nav.smoothPath(path)
...
nav.goToPose(goal_pose)
while not nav.isTaskComplete():
feedback = nav.getFeedback()
if feedback.navigation_duration > 600:
nav.cancelTask()
...
result = nav.getResult()
if result == TaskResult.SUCCEEDED:
print('Goal succeeded!')
elif result == TaskResult.CANCELED:
print('Goal was canceled!')
elif result == TaskResult.FAILED:
print('Goal failed!')
nav2_simple_commander有一些示例,用来突出作为用户可用的API函数:example_nav_to_pose.py- 演示了导航器的导航到位姿功能,以及一些辅助方法。example_nav_through_poses.py- 演示了导航器的穿越位姿功能,以及一些辅助方法。example_waypoint_follower.py- 演示了导航器的路径点跟随功能,以及一些辅助方法。example_follow_path.py- 演示了导航器的路径跟随功能,以及一些辅助方法,如路径平滑处理。
nav2_simple_commander有几个演示,旨在突出使用nav2_simple_commanderAPI构建的几个简单的自主应用程序:demo_security.py- 一个简单的安防机器人应用程序,展示了如何让机器人使用Navigate Through Poses沿着安全路径执行巡逻任务,无限期地。demo_picking.py- 一个简单的物品拣选应用程序,展示了如何让机器人驶向仓库中的特定货架,以拣选物品或让人员将物品放入篮子并将其送往目的地进行运输,使用Navigate To Pose。demo_inspection.py- 一个简单的货架检查应用程序,展示了如何使用Waypoint Follower和任务执行器来拍照、RFID扫描等,以分析当前货架状态并定位仓库中的物品。
BasicNavigator
__init__:节点名称basic_navigator- 动作
navigate_through_poses动作客户端nav_through_poses_clientnavigate_to_pose动作客户端nav_to_pose_clientfollow_waypoints动作客户端follow_waypoints_clientfollow_path动作客户端follow_path_clientcompute_path_to_pose动作客户端compute_path_to_pose_clientcompute_path_through_poses动作客户端compute_path_through_poses_clientsmooth_path动作客户端smoother_clientspin动作客户端spin_clientbackup动作客户端backup_clientassisted_teleop动作客户端assisted_teleop_client
- 话题
amcl_pose话题订阅localization_pose_sub,绑定_amclPoseCallbackinitialpose话题发布initial_pose_pub
- 服务
map_server/load_map服务客户端change_maps_srvglobal_costmap/clear_entirely_global_costmap服务客户端clear_costmap_global_srvlocal_costmap/clear_entirely_local_costmap服务客户端clear_costmap_local_srvglobal_costmap/get_costmap服务客户端get_costmap_global_srvlocal_costmap/get_costmap服务客户端get_costmap_local_srv
- 动作
destroyNode():销毁节点,调用destroyNode()destroyNode():调用动作,服务和话题节点的destroy()setInitialPose(self, initial_pose):初始化位姿initial_pose_received置为False,直到_amclPoseCallback回调置为True- 调用
self._setInitialPose()
goThroughPoses(self, poses, behavior_tree=''):发送一个NavThroughPoses动作请求- 调用
while not self.nav_through_poses_client.wait_for_server(timeout_sec=1.0)循环等待服务可达 - 调用发送请求
self.nav_through_poses_client.send_goal_async,绑定self._feedbackCallback rclpy.spin_until_future_complete:轮询直到完成- 获取发送请求结果,未接收返回
False - 获取结果返回
True
- 调用
goToPose(self, pose, behavior_tree=''):发送一个NavToPose的动作请求- 调用
while not self.nav_to_pose_client.wait_for_server(timeout_sec=1.0)循环等待服务可达 - 调用发送请求
self.nav_to_pose_client.send_goal_async,绑定self._feedbackCallback rclpy.spin_until_future_complete:轮询直到完成- 获取发送请求结果,未接收返回
False - 获取结果返回
True
- 调用
followWaypoints(self, poses):发送一个FollowWaypoints动作请求- 调用
while not self.follow_waypoints_client.wait_for_server(timeout_sec=1.0)循环等待服务可达 - 调用发送请求
self.follow_waypoints_client.send_goal_async,绑定self._feedbackCallback rclpy.spin_until_future_complete:轮询直到完成- 获取发送请求结果,未接收返回
False - 获取结果返回
True
- 调用
spin(self, spin_dist=1.57, time_allowance=10):旋转- 调用
while not self.spin_client.wait_for_server(timeout_sec=1.0)循环等待服务可达 - 调用发送请求
self.spin_client.send_goal_async,绑定self._feedbackCallback rclpy.spin_until_future_complete:轮询直到完成- 获取发送请求结果,未接收返回
False - 获取结果返回
True
- 调用
backup(self, backup_dist=0.15, backup_speed=0.025, time_allowance=10):后退- 调用
while not self.backup_client.wait_for_server(timeout_sec=1.0)循环等待服务可达 - 调用发送请求
self.backup_client.send_goal_async,绑定self._feedbackCallback rclpy.spin_until_future_complete:轮询直到完成- 获取发送请求结果,未接收返回
False - 获取结果返回
True
- 调用
assistedTeleop(self, time_allowance=30):辅助远程控制- 调用
while not self.assisted_teleop_client.wait_for_server(timeout_sec=1.0)循环等待服务可达 - 调用发送请求
self.assisted_teleop_client.send_goal_async,绑定self._feedbackCallback rclpy.spin_until_future_complete:轮询直到完成- 获取发送请求结果,未接收返回
False - 获取结果返回
True
- 调用
followPath(self, path, controller_id='', goal_checker_id=''):发送一个FollowPath动作请求- 调用
while not self.follow_path_client.wait_for_server(timeout_sec=1.0)循环等待服务可达 - 调用发送请求
self.follow_path_client.send_goal_async,绑定self._feedbackCallback rclpy.spin_until_future_complete:轮询直到完成- 获取发送请求结果,未接收返回
False - 获取结果返回
True
- 调用
cancelTask(self):取消任何类型的任务- 存在
self.result_future- 调用
future = self.goal_handle.cancel_goal_async() - 调用
rclpy.spin_until_future_complete(self, future)
- 调用
- 存在
isTaskComplete(self):检测任何类型的任务是否完成- 存在
self.result_future- 调用
rclpy.spin_until_future_complete(self, self.result_future, timeout_sec=0.10)- 未超时,获取状态,不等于
GoalStatus.STATUS_SUCCEEDED,任务失败,返回True - 超时,返回
False,表示还在执行中
- 未超时,获取状态,不等于
- 调用
- 不存在或状态等于
STATUS_SUCCEEDED返回True,执行成功
- 存在
getFeedback(self):返回反馈数据,return self.feedbackgetResult(self):根据状态返回映射的新的任务状态waitUntilNav2Active(self, navigator='bt_navigator', localizer='amcl'):阻塞等待导航激活并运行- 调用
self._waitForNodeToActivate(localizer) - 如果
localizer == 'amcl'调用self._waitForInitialPose() - 调用
self._waitForNodeToActivate(navigator)
- 调用
_getPathImpl(self, start, goal, planner_id='', use_start=False):发送一个ComputePathToPose动作请求,返回路径- 调用
while not self.compute_path_to_pose_client.wait_for_server(timeout_sec=1.0)循环等待服务可达 - 调用发送请求
self.compute_path_to_pose_client.send_goal_async rclpy.spin_until_future_complete:轮询直到完成- 获取发送请求结果,未接收返回
None - 调用
self.goal_handle.get_result_async()获取结果,如果状态不等于GoalStatus.STATUS_SUCCEEDED,获取失败,返回None - 返回路径结果
- 调用
getPath(self, start, goal, planner_id='', use_start=False):发送一个ComputePathToPose动作请求,返回路径- 调用
self._getPathImpl,根据结果返回
- 调用
getPathThroughPoses(self, start, goals, planner_id='', use_start=False):发送一个ComputePathThroughPoses动作请求,返回路径- 调用
while not self.compute_path_through_poses_client.wait_for_server(timeout_sec=1.0)循环等待服务可达 - 调用发送请求
self.compute_path_through_poses_client.send_goal_async rclpy.spin_until_future_complete:轮询直到完成- 获取发送请求结果,未接收返回
None - 调用
self.goal_handle.get_result_async()获取结果,如果状态不等于GoalStatus.STATUS_SUCCEEDED,获取失败,返回None - 返回路径结果
- 调用
_smoothPathImpl(self, path, smoother_id='', max_duration=2.0, check_for_collision=False):发送一个SmoothPath动作请求,返回路径- 调用
while not self.smoother_client.wait_for_server(timeout_sec=1.0)循环等待服务可达 - 调用发送请求
self.smoother_client.send_goal_async rclpy.spin_until_future_complete:轮询直到完成- 获取发送请求结果,未接收返回
None - 调用
self.goal_handle.get_result_async()获取结果,如果状态不等于GoalStatus.STATUS_SUCCEEDED,获取失败,返回None - 返回路径结果
- 调用
smoothPath(self, path, smoother_id='', max_duration=2.0, check_for_collision=False):发送一个SmoothPath动作请求,返回路径- 调用
self._smoothPathImpl,根据结果返回
- 调用
changeMap(self, map_filepath):在地图服务更改当前静态地图- 调用
while not self.change_maps_srv.wait_for_server(timeout_sec=1.0)循环等待服务可达 - 调用发送请求
future = self.change_maps_srv.call_async(req) rclpy.spin_until_future_complete:轮询直到完成- 获取结果状态,打印相关日志
- 调用
clearAllCostmaps(self):清除所有代价地图- 调用
self.clearLocalCostmap() - 调用
self.clearGlobalCostmap()
- 调用
clearLocalCostmap():清除局部代价地图- 调用
while not self.clear_costmap_local_srv.wait_for_server(timeout_sec=1.0)循环等待服务可达 - 调用发送请求
future = self.clear_costmap_local_srv.call_async(req) rclpy.spin_until_future_complete:轮询直到完成
- 调用
clearGlobalCostmap():清除全局代价地图- 调用
while not self.clear_costmap_global_srv.wait_for_server(timeout_sec=1.0)循环等待服务可达 - 调用发送请求
future = self.clear_costmap_global_srv.call_async(req) rclpy.spin_until_future_complete:轮询直到完成
- 调用
getGlobalCostmap(self):获取全局代价地图- 调用
while not self.get_costmap_global_srv.wait_for_server(timeout_sec=1.0)循环等待服务可达 - 调用发送请求
future = self.get_costmap_global_srv.call_async(req) rclpy.spin_until_future_complete:轮询直到完成,返回地图结果
- 调用
getLocalCostmap(self):获取局部代价地图- 调用
while not self.get_costmap_local_srv.wait_for_server(timeout_sec=1.0)循环等待服务可达 - 调用发送请求
future = self.get_costmap_local_srv.call_async(req) rclpy.spin_until_future_complete:轮询直到完成,返回地图结果
- 调用
lifecycleStartup(self):生命周期启动,启动导航生命周期系统for srv_name, srv_type in self.get_service_names_and_types()srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes'- 创建客户端
mgr_client = self.create_client(ManageLifecycleNodes, srv_name) while not mgr_client.wait_for_service(timeout_sec=1.0)等待服务可用- 发送启动
STARTUP请求future = mgr_client.call_async(req) while循环,启动需要完整的map->odom->base_linkTF树,所以如果我们不成功,请尝试初始化初始姿势- 调用
rclpy.spin_until_future_complete(self, future, timeout_sec=0.10) - 如果
not future调用self._waitForInitialPose(),初始化位姿 - 否则
break
- 调用
- 创建客户端
- 启动成功
lifecycleShutdown(self):生命周期关闭,关闭导航生命周期系统for srv_name, srv_type in self.get_service_names_and_types()srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes'- 创建客户端
mgr_client = self.create_client(ManageLifecycleNodes, srv_name) while not mgr_client.wait_for_service(timeout_sec=1.0)等待服务可用- 发送关闭
SHUTDOWN请求future = mgr_client.call_async(req) - 调用
rclpy.spin_until_future_complete(self, future, timeout_sec=0.10)
- 创建客户端
- 关闭成功
_waitForNodeToActivate(self, node_name):等待节点激活- 创建
{node_name}/get_state服务客户端 while循环等待服务可用while循环获取state,直到state == active跳出返回
- 创建
_waitForInitialPose(self):等待初始位置设置成功while not self.initial_pose_received- 调用
self._setInitialPose() - 调用
rclpy.spin_once(self, timeout_sec=1.0)
- 调用
_amclPoseCallback(self, msg):amcl_pose话题订阅回调,设置属性self.initial_pose_received = True_feedbackCallback(self, msg):动作反馈回调,简单赋值self.feedback = msg.feedback_setInitialPose(self):发布初始位姿:self.initial_pose_pub.publish(msg)info(self, msg):打印info日志warn(self, msg):打印warn日志error(self, msg):打印error日志debug(self, msg):打印debug日志
example_nav_to_pose.py
- 实例化
navigator = BasicNavigator() - 设置初始化位姿,
navigator.setInitialPose(initial_pose) - 阻塞等待导航模块起来并运行中,
navigator.waitUntilNav2Active(),如果导航模块未配置autostart则需要调用navigator.lifecycleStartup()