datetime:2024/04/28 11:06
author:nzb

导航

https://github.com/ros-planning/navigation2.git

分支:humble

节点:3ed4c2d

应用及工具

  • nav2_waypoint_follower | 航路点追随者,看README.md
  • nav2_simple_commander | 简单的应用demo,python版本API,看README.md

包含一个具有特定任务执行器插件的路径点跟随程序。如果您需要前往给定位置并完成特定任务(例如拍照、拾取箱子或等待用户输入),这非常有用。 这是一个很好的演示应用程序,演示了如何在一个示例应用程序中使用Nav2

主要类

  • WaypointFollower:一个动作服务器,使用行为树导航机器人到目标位置,继承于nav2_util::LifecycleNode
    • WaypointFollower:构造函数
      • 节点名称:waypoint_follower
      • 初始化节点参数
        • stop_on_failure->true
        • loop_rate->20
        • waypoint_task_executor_plugin->wait_at_waypoint
        • wait_at_waypoint.plugin->nav2_waypoint_follower::WaitAtWaypoint
    • on_configure:配置成员变量,初始化follow_waypoints动作服务
      • 获取节点参数配置,包括waypoint_task_executor_plugin
      • 创建/navigate_to_pose动作客户端nav_to_pose_client_
      • 创建/follow_waypoints动作服务action_server_,绑定followWaypoints
      • 创建参数waypoint_task_executor_plugin配置的插件实例waypoint_task_executor_,然后调用实例initialize
    • on_activate:激活服务,绑定参数回调,调用createBond()
    • on_deactivate:属性重置,调用destroyBond()
    • on_cleanup:重置成员变量
    • on_shutdown
    • followWaypoints:动作服务回调
      • 获取目标,实例化反馈和结果实例
      • 校验服务是否激活,点位是否为空
      • 开启while循环
        • 检测是否有取消请求,存在->nav_to_pose_client_->async_cancel_all_goals()->action_server_->terminate_all(),返回
        • 检测是否有抢占请求,存在->action_server_->accept_pending_goal();goal_index=0;new_goal=true
        • 是否new_goal==true
          • new_goal=false
          • 初始化send_goal_options绑定resultCallbackgoalResponseCallback
          • 发送目标,nav_to_pose_client_->async_send_goal(client_goal, send_goal_options)
          • current_goal_status_ = ActionStatus::PROCESSING
        • 发布反馈action_server_->publish_feedback(feedback)
        • 查看current_goal_status_状态
          • ActionStatus::FAILED
            • 更新失败点索引
            • stop_on_failure_==true:调用action_server_->terminate_current返回退出函数
            • 否则继续下一个点
          • ActionStatus::SUCCEEDED
            • 到达目标位置后调用目标位置的任务waypoint_task_executor_->processAtWaypoint
            • stop_on_failure_==true:调用action_server_->terminate_current返回退出函数
            • 否则执行成功,继续往下
        • 任务完成或任务中,重置属性或打印状态
        • 轮询并睡眠
    • resultCallback:动作客户端结果回调,检查结果目标ID和当前任务ID,根据对应结果code返回action状态
    • goalResponseCallback:动作客户端目标响应回调,校验是否发送目标给服务器失败
    • dynamicParametersCallback:参数变更回调

插件

  • InputAtWaypoint:等待插件(用户交互插件),继承于nav2_core::WaypointTaskExecutor

    • InputAtWaypoint:构造函数
    • initialize:声明和加载使用的参数
      • 声明获取节点参数
        • plugin_name + ".waypoint_pause_duration" -> 0
        • plugin_name + ".enabled" -> true
    • processAtWaypoint:到点执行的操作
    • Cb:执行处理的回调
  • InputAtWaypoint:继承于nav2_core::WaypointTaskExecutor

    • InputAtWaypoint:构造函数
    • initialize:声明和加载使用的参数
      • 声明获取节点参数
        • plugin_name + ".timeout" -> 10.0
        • plugin_name + ".enabled" -> true
        • plugin_name + ".input_topic" -> input_at_waypoint/input
      • 创建话题调用,话题名称为plugin_name + ".input_topic",绑定Cb
    • processAtWaypoint:到点执行的操作,等待完成返回
    • Cb:执行处理的回调,input_received_ = true
  • PhotoAtWaypoint:拍照插件,继承于nav2_core::WaypointTaskExecutor

    • InputAtWaypoint:构造函数
    • initialize:声明和加载使用的参数
      • 声明获取节点参数
        • plugin_name + ".enabled" -> true
        • plugin_name + ".image_topic" -> /camera/color/image_raw
        • plugin_name + ".save_dir" -> /tmp/waypoint_images
        • plugin_name + ".image_format" -> png
      • 是否存在保存路径,不存在创建
      • 创建话题调用,话题名称为plugin_name + ".input_topic",绑定imageCallback
    • processAtWaypoint:到点执行的操作,拼接图片名称和路径,调用deepCopyMsg2Mat拷贝,然后保存
    • imageCallback:话题订阅回调,curr_frame_msg_ = msg
    • deepCopyMsg2Mat:给定一个指向sensor::msg::Image类型的共享指针,对输入的cv Mat进行深度复制
Robot Navigator Method Description
setInitialPose(initial_pose) 初始化机器人初始位置PoseStamped
goThroughPoses(poses, behavior_tree='') 请求机器人驾驶通过一组位姿(PoseStamped数组).
goToPose(pose, behavior_tree='') 请求机器人驾驶达到一个位置(PoseStamped数组).
followWaypoints(poses) 请求机器人遵循一组路径点(PoseStamped 列表)。 这将在每个姿势执行特定的 TaskExecutor.
followPath(path, controller_id='', goal_checker_id='') 请求机器人遵循从起点到目标的路径 PoseStampednav_msgs/Path.
spin(spin_dist=1.57, time_allowance=10) 要求机器人原地旋转指定角度
backup(backup_dist=0.15, backup_speed=0.025, time_allowance=10) 要求机器人后退指定距离。
cancelTask() 需要一个正在做的任务请求
isTaskComplete() 检查任务是否完成,超时时间为 100 毫秒。 如果已完成则返回 True,如果仍在继续则返回 False
getFeedback() 获取任务反馈
getResult() 获取任务的最终结果,在 isTaskComplete 返回 True 后调用。 返回操作服务器结果对象。
getPath(start, goal, planner_id='', use_start=False) 获取从起点到目标的路径 PoseStampednav_msgs/Path
getPathThroughPoses(start, goals, planner_id='', use_start=False) 获取从起点到一组目标的路径、PoseStamped 列表、nav_msgs/Path
smoothPath(path, smoother_id='', max_duration=2.0, check_for_collision=False) 平滑给定的路径nav_msgs/msg/Path
changeMap(map_filepath) 请求从当前地图更改为 map_filepathyaml
clearAllCostmaps() 清除全局和局部代价地图
clearLocalCostmap() 清除局部代价地图
clearGlobalCostmap() 清除全局代价地图
getGlobalCostmap() 获取全局代价地图, nav2_msgs/Costmap
getLocalCostmap() 获取局部代价地图, nav2_msgs/Costmap
waitUntilNav2Active(navigator='bt_navigator, localizer='amcl') 阻塞直到 Nav2 完全在线并且生命周期节点处于活动状态。 与自动启动或外部生命周期启动结合使用。 可以指定自定义导航器和定位器节点
lifecycleStartup() 向所有生命周期管理服务器发送请求,使它们进入活动状态,如果自动启动为 false 并且您希望该程序控制 Nav2 的生命周期,则使用该请求。
lifecycleShutdown() 向所有生命周期管理服务器发送请求以将其关闭。
destroyNode() 销毁节点释放资源
  • 构建应用程序的通用模板如下:

from nav2_simple_commander.robot_navigator import BasicNavigator
import rclpy

rclpy.init()

nav = BasicNavigator()
...
nav.setInitialPose(init_pose)
nav.waitUntilNav2Active()  # if autostarted, else use `lifecycleStartup()`
...
path = nav.getPath(init_pose, goal_pose)
smoothed_path = nav.smoothPath(path)
...
nav.goToPose(goal_pose)
while not nav.isTaskComplete():
    feedback = nav.getFeedback()
    if feedback.navigation_duration > 600:
        nav.cancelTask()
...
result = nav.getResult()
if result == TaskResult.SUCCEEDED:
    print('Goal succeeded!')
elif result == TaskResult.CANCELED:
    print('Goal was canceled!')
elif result == TaskResult.FAILED:
    print('Goal failed!')
  • nav2_simple_commander 有一些示例,用来突出作为用户可用的API函数:

    • example_nav_to_pose.py - 演示了导航器的导航到位姿功能,以及一些辅助方法。
    • example_nav_through_poses.py - 演示了导航器的穿越位姿功能,以及一些辅助方法。
    • example_waypoint_follower.py - 演示了导航器的路径点跟随功能,以及一些辅助方法。
    • example_follow_path.py - 演示了导航器的路径跟随功能,以及一些辅助方法,如路径平滑处理。
  • nav2_simple_commander 有几个演示,旨在突出使用 nav2_simple_commander API构建的几个简单的自主应用程序:

    • demo_security.py - 一个简单的安防机器人应用程序,展示了如何让机器人使用Navigate Through Poses沿着安全路径执行巡逻任务,无限期地。
    • demo_picking.py - 一个简单的物品拣选应用程序,展示了如何让机器人驶向仓库中的特定货架,以拣选物品或让人员将物品放入篮子并将其送往目的地进行运输,使用Navigate To Pose
    • demo_inspection.py - 一个简单的货架检查应用程序,展示了如何使用Waypoint Follower和任务执行器来拍照、RFID扫描等,以分析当前货架状态并定位仓库中的物品。

BasicNavigator

  • __init__:节点名称basic_navigator
    • 动作
      • navigate_through_poses动作客户端nav_through_poses_client
      • navigate_to_pose动作客户端nav_to_pose_client
      • follow_waypoints动作客户端follow_waypoints_client
      • follow_path动作客户端follow_path_client
      • compute_path_to_pose动作客户端compute_path_to_pose_client
      • compute_path_through_poses动作客户端compute_path_through_poses_client
      • smooth_path动作客户端smoother_client
      • spin动作客户端spin_client
      • backup动作客户端backup_client
      • assisted_teleop动作客户端assisted_teleop_client
    • 话题
      • amcl_pose话题订阅localization_pose_sub,绑定_amclPoseCallback
      • initialpose话题发布initial_pose_pub
    • 服务
      • map_server/load_map服务客户端change_maps_srv
      • global_costmap/clear_entirely_global_costmap服务客户端clear_costmap_global_srv
      • local_costmap/clear_entirely_local_costmap服务客户端clear_costmap_local_srv
      • global_costmap/get_costmap服务客户端get_costmap_global_srv
      • local_costmap/get_costmap服务客户端get_costmap_local_srv
  • destroyNode():销毁节点,调用destroyNode()
  • destroyNode():调用动作,服务和话题节点的destroy()
  • setInitialPose(self, initial_pose):初始化位姿
    • initial_pose_received置为False,直到_amclPoseCallback回调置为True
    • 调用self._setInitialPose()
  • goThroughPoses(self, poses, behavior_tree=''):发送一个NavThroughPoses动作请求
    • 调用while not self.nav_through_poses_client.wait_for_server(timeout_sec=1.0)循环等待服务可达
    • 调用发送请求self.nav_through_poses_client.send_goal_async,绑定self._feedbackCallback
    • rclpy.spin_until_future_complete:轮询直到完成
    • 获取发送请求结果,未接收返回False
    • 获取结果返回True
  • goToPose(self, pose, behavior_tree=''):发送一个NavToPose的动作请求
    • 调用while not self.nav_to_pose_client.wait_for_server(timeout_sec=1.0)循环等待服务可达
    • 调用发送请求self.nav_to_pose_client.send_goal_async,绑定self._feedbackCallback
    • rclpy.spin_until_future_complete:轮询直到完成
    • 获取发送请求结果,未接收返回False
    • 获取结果返回True
  • followWaypoints(self, poses):发送一个FollowWaypoints动作请求
    • 调用while not self.follow_waypoints_client.wait_for_server(timeout_sec=1.0)循环等待服务可达
    • 调用发送请求self.follow_waypoints_client.send_goal_async,绑定self._feedbackCallback
    • rclpy.spin_until_future_complete:轮询直到完成
    • 获取发送请求结果,未接收返回False
    • 获取结果返回True
  • spin(self, spin_dist=1.57, time_allowance=10):旋转
    • 调用while not self.spin_client.wait_for_server(timeout_sec=1.0)循环等待服务可达
    • 调用发送请求self.spin_client.send_goal_async,绑定self._feedbackCallback
    • rclpy.spin_until_future_complete:轮询直到完成
    • 获取发送请求结果,未接收返回False
    • 获取结果返回True
  • backup(self, backup_dist=0.15, backup_speed=0.025, time_allowance=10):后退
    • 调用while not self.backup_client.wait_for_server(timeout_sec=1.0)循环等待服务可达
    • 调用发送请求self.backup_client.send_goal_async,绑定self._feedbackCallback
    • rclpy.spin_until_future_complete:轮询直到完成
    • 获取发送请求结果,未接收返回False
    • 获取结果返回True
  • assistedTeleop(self, time_allowance=30):辅助远程控制
    • 调用while not self.assisted_teleop_client.wait_for_server(timeout_sec=1.0)循环等待服务可达
    • 调用发送请求self.assisted_teleop_client.send_goal_async,绑定self._feedbackCallback
    • rclpy.spin_until_future_complete:轮询直到完成
    • 获取发送请求结果,未接收返回False
    • 获取结果返回True
  • followPath(self, path, controller_id='', goal_checker_id=''):发送一个FollowPath动作请求
    • 调用while not self.follow_path_client.wait_for_server(timeout_sec=1.0)循环等待服务可达
    • 调用发送请求self.follow_path_client.send_goal_async,绑定self._feedbackCallback
    • rclpy.spin_until_future_complete:轮询直到完成
    • 获取发送请求结果,未接收返回False
    • 获取结果返回True
  • cancelTask(self):取消任何类型的任务
    • 存在self.result_future
      • 调用future = self.goal_handle.cancel_goal_async()
      • 调用rclpy.spin_until_future_complete(self, future)
  • isTaskComplete(self):检测任何类型的任务是否完成
    • 存在self.result_future
      • 调用rclpy.spin_until_future_complete(self, self.result_future, timeout_sec=0.10)
        • 未超时,获取状态,不等于GoalStatus.STATUS_SUCCEEDED,任务失败,返回True
        • 超时,返回False,表示还在执行中
    • 不存在或状态等于STATUS_SUCCEEDED返回True,执行成功
  • getFeedback(self):返回反馈数据,return self.feedback
  • getResult(self):根据状态返回映射的新的任务状态
  • waitUntilNav2Active(self, navigator='bt_navigator', localizer='amcl'):阻塞等待导航激活并运行
    • 调用self._waitForNodeToActivate(localizer)
    • 如果localizer == 'amcl'调用self._waitForInitialPose()
    • 调用self._waitForNodeToActivate(navigator)
  • _getPathImpl(self, start, goal, planner_id='', use_start=False):发送一个ComputePathToPose动作请求,返回路径
    • 调用while not self.compute_path_to_pose_client.wait_for_server(timeout_sec=1.0)循环等待服务可达
    • 调用发送请求self.compute_path_to_pose_client.send_goal_async
    • rclpy.spin_until_future_complete:轮询直到完成
    • 获取发送请求结果,未接收返回None
    • 调用self.goal_handle.get_result_async()获取结果,如果状态不等于GoalStatus.STATUS_SUCCEEDED,获取失败,返回None
    • 返回路径结果
  • getPath(self, start, goal, planner_id='', use_start=False):发送一个ComputePathToPose动作请求,返回路径
    • 调用self._getPathImpl,根据结果返回
  • getPathThroughPoses(self, start, goals, planner_id='', use_start=False):发送一个ComputePathThroughPoses动作请求,返回路径
    • 调用while not self.compute_path_through_poses_client.wait_for_server(timeout_sec=1.0)循环等待服务可达
    • 调用发送请求self.compute_path_through_poses_client.send_goal_async
    • rclpy.spin_until_future_complete:轮询直到完成
    • 获取发送请求结果,未接收返回None
    • 调用self.goal_handle.get_result_async()获取结果,如果状态不等于GoalStatus.STATUS_SUCCEEDED,获取失败,返回None
    • 返回路径结果
  • _smoothPathImpl(self, path, smoother_id='', max_duration=2.0, check_for_collision=False):发送一个SmoothPath动作请求,返回路径
    • 调用while not self.smoother_client.wait_for_server(timeout_sec=1.0)循环等待服务可达
    • 调用发送请求self.smoother_client.send_goal_async
    • rclpy.spin_until_future_complete:轮询直到完成
    • 获取发送请求结果,未接收返回None
    • 调用self.goal_handle.get_result_async()获取结果,如果状态不等于GoalStatus.STATUS_SUCCEEDED,获取失败,返回None
    • 返回路径结果
  • smoothPath(self, path, smoother_id='', max_duration=2.0, check_for_collision=False):发送一个SmoothPath动作请求,返回路径
    • 调用self._smoothPathImpl,根据结果返回
  • changeMap(self, map_filepath):在地图服务更改当前静态地图
    • 调用while not self.change_maps_srv.wait_for_server(timeout_sec=1.0)循环等待服务可达
    • 调用发送请求future = self.change_maps_srv.call_async(req)
    • rclpy.spin_until_future_complete:轮询直到完成
    • 获取结果状态,打印相关日志
  • clearAllCostmaps(self):清除所有代价地图
    • 调用self.clearLocalCostmap()
    • 调用self.clearGlobalCostmap()
  • clearLocalCostmap():清除局部代价地图
    • 调用while not self.clear_costmap_local_srv.wait_for_server(timeout_sec=1.0)循环等待服务可达
    • 调用发送请求future = self.clear_costmap_local_srv.call_async(req)
    • rclpy.spin_until_future_complete:轮询直到完成
  • clearGlobalCostmap():清除全局代价地图
    • 调用while not self.clear_costmap_global_srv.wait_for_server(timeout_sec=1.0)循环等待服务可达
    • 调用发送请求future = self.clear_costmap_global_srv.call_async(req)
    • rclpy.spin_until_future_complete:轮询直到完成
  • getGlobalCostmap(self):获取全局代价地图
    • 调用while not self.get_costmap_global_srv.wait_for_server(timeout_sec=1.0)循环等待服务可达
    • 调用发送请求future = self.get_costmap_global_srv.call_async(req)
    • rclpy.spin_until_future_complete:轮询直到完成,返回地图结果
  • getLocalCostmap(self):获取局部代价地图
    • 调用while not self.get_costmap_local_srv.wait_for_server(timeout_sec=1.0)循环等待服务可达
    • 调用发送请求future = self.get_costmap_local_srv.call_async(req)
    • rclpy.spin_until_future_complete:轮询直到完成,返回地图结果
  • lifecycleStartup(self):生命周期启动,启动导航生命周期系统
    • for srv_name, srv_type in self.get_service_names_and_types()
      • srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes'
        • 创建客户端mgr_client = self.create_client(ManageLifecycleNodes, srv_name)
        • while not mgr_client.wait_for_service(timeout_sec=1.0)等待服务可用
        • 发送启动STARTUP请求future = mgr_client.call_async(req)
        • while循环,启动需要完整的map->odom->base_link TF树,所以如果我们不成功,请尝试初始化初始姿势
          • 调用rclpy.spin_until_future_complete(self, future, timeout_sec=0.10)
          • 如果not future调用self._waitForInitialPose(),初始化位姿
          • 否则break
    • 启动成功
  • lifecycleShutdown(self):生命周期关闭,关闭导航生命周期系统
    • for srv_name, srv_type in self.get_service_names_and_types()
      • srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes'
        • 创建客户端mgr_client = self.create_client(ManageLifecycleNodes, srv_name)
        • while not mgr_client.wait_for_service(timeout_sec=1.0)等待服务可用
        • 发送关闭SHUTDOWN请求future = mgr_client.call_async(req)
        • 调用rclpy.spin_until_future_complete(self, future, timeout_sec=0.10)
    • 关闭成功
  • _waitForNodeToActivate(self, node_name):等待节点激活
    • 创建{node_name}/get_state服务客户端
    • while循环等待服务可用
    • while循环获取state,直到state == active跳出返回
  • _waitForInitialPose(self):等待初始位置设置成功
    • while not self.initial_pose_received
      • 调用self._setInitialPose()
      • 调用rclpy.spin_once(self, timeout_sec=1.0)
  • _amclPoseCallback(self, msg)amcl_pose话题订阅回调,设置属性self.initial_pose_received = True
  • _feedbackCallback(self, msg):动作反馈回调,简单赋值self.feedback = msg.feedback
  • _setInitialPose(self):发布初始位姿:self.initial_pose_pub.publish(msg)
  • info(self, msg):打印info日志
  • warn(self, msg):打印warn日志
  • error(self, msg):打印error日志
  • debug(self, msg):打印debug日志

example_nav_to_pose.py

  • 实例化navigator = BasicNavigator()
  • 设置初始化位姿,navigator.setInitialPose(initial_pose)
  • 阻塞等待导航模块起来并运行中,navigator.waitUntilNav2Active(),如果导航模块未配置autostart则需要调用navigator.lifecycleStartup()

results matching ""

    No results matching ""