In the world of robotics, planning a path from one point to another isn’t just about finding a route—it’s about doing it intelligently.
If you’ve ever implemented Dijkstra’s algorithm, you know how powerful it is… but also how slow it can be when the map gets big. What if we could be smarter? What if we could guide the exploration toward the goal instead of blindly exploring everything?
That’s exactly what A* does.
In this tutorial, we’ll dive into the theory behind A*, then implement it as a Python ROS 2 node, using an occupancy grid and a simulated robot. By the end, you’ll be able to visualize in RViz how A* efficiently explores only the most promising cells—saving time, resources, and computation.
Let’s start with the same question we posed in the course:
Can we do any better than Dijkstra? Is there a faster way to reach the goal without exploring in all directions?
Yes—and it’s called A*.
While Dijkstra explores nodes uniformly, A* evaluates each possible move with a simple yet powerful formula:
$$f(n) = g(n) + h(n)$$
This heuristic is what gives A* its directionality. It guesses which paths are likely to lead to the goal, and gives them priority.
In the course, we visualized two different heuristics:
Since our robot operates on a 2D occupancy grid where it can only move in four directions (up, down, left, right), Manhattan distance is the perfect fit.
Here’s how it looks in code:
def manhattan_distance(self, node: GraphNode, goal_node: GraphNode):
return abs(node.x - goal_node.x) + abs(node.y - goal_node.y)
This distance doesn’t consider walls or obstacles—it’s just an estimate. But it’s enough to guide the algorithm intelligently.
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from nav_msgs.msg import OccupancyGrid, Path
from geometry_msgs.msg import PoseStamped, Pose
from rclpy.qos import QoSProfile, DurabilityPolicy
from tf2_ros import Buffer, TransformListener, LookupException
from queue import PriorityQueue
class GraphNode:
def __init__(self, x, y, cost=0, heuristic=0, prev=None):
self.x = x
self.y = y
self.cost = cost
self.heuristic = heuristic
self.prev = prev
def __lt__(self, other):
return (self.cost + self.heuristic) < (other.cost + other.heuristic)
def __eq__(self, other):
return self.x == other.x and self.y == other.y
def __hash__(self):
return hash((self.x, self.y))
def __add__(self, other):
return GraphNode(self.x + other[0], self.y + other[1])
class AStarPlanner(Node):
def __init__(self):
super().__init__("a_star_node")
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)
map_qos = QoSProfile(depth=10)
map_qos.durability = DurabilityPolicy.TRANSIENT_LOCAL
self.map_sub = self.create_subscription(
OccupancyGrid, "/map", self.map_callback, map_qos
)
self.pose_sub = self.create_subscription(
PoseStamped, "/goal_pose", self.goal_callback, 10
)
self.path_pub = self.create_publisher(Path, "/a_star/path", 10)
self.map_pub = self.create_publisher(OccupancyGrid, "/a_star/visited_map", 10)
self.map_ = None
self.visited_map_ = OccupancyGrid()
def map_callback(self, map_msg: OccupancyGrid):
self.map_ = map_msg
self.visited_map_.header.frame_id = map_msg.header.frame_id
self.visited_map_.info = map_msg.info
self.visited_map_.data = [-1] * (map_msg.info.height * map_msg.info.width)
def goal_callback(self, pose: PoseStamped):
if self.map_ is None:
self.get_logger().error("No map received!")
return
self.visited_map_.data = [-1] * (self.visited_map_.info.height * self.visited_map_.info.width)
try:
map_to_base_tf = self.tf_buffer.lookup_transform(
self.map_.header.frame_id, "base_footprint", rclpy.time.Time()
)
except LookupException:
self.get_logger().error("Could not transform from map to base_footprint")
return
map_to_base_pose = Pose()
map_to_base_pose.position.x = map_to_base_tf.transform.translation.x
map_to_base_pose.position.y = map_to_base_tf.transform.translation.y
map_to_base_pose.orientation = map_to_base_tf.transform.rotation
path = self.plan(map_to_base_pose, pose.pose)
if path.poses:
self.get_logger().info("Shortest path found!")
self.path_pub.publish(path)
else:
self.get_logger().warn("No path found to the goal.")
def plan(self, start: Pose, goal: Pose):
# Define possible movement directions
explore_directions = [(-1, 0), (1, 0), (0, -1), (0, 1)]
# Priority queue with custom comparison for A* based on cost + heuristic
pending_nodes = PriorityQueue()
visited_nodes = set()
start_node = self.world_to_grid(start)
goal_node = self.world_to_grid(goal)
start_node.heuristic = self.manhattan_distance(start_node, goal_node)
pending_nodes.put(start_node)
while not pending_nodes.empty() and rclpy.ok():
active_node = pending_nodes.get()
# Goal found!
if active_node == goal_node:
break
# Explore neighbors
for dir_x, dir_y in explore_directions:
new_node: GraphNode = active_node + (dir_x, dir_y)
if (new_node not in visited_nodes and self.pose_on_map(new_node) and
self.map_.data[self.pose_to_cell(new_node)] == 0):
new_node.cost = active_node.cost + 1
new_node.heuristic = self.manhattan_distance(new_node, goal_node)
new_node.prev = active_node
pending_nodes.put(new_node)
visited_nodes.add(new_node)
self.visited_map_.data[self.pose_to_cell(active_node)] = -106
self.map_pub.publish(self.visited_map_)
path = Path()
path.header.frame_id = self.map_.header.frame_id
while active_node and active_node.prev and rclpy.ok():
last_pose: Pose = self.grid_to_world(active_node)
last_pose_stamped = PoseStamped()
last_pose_stamped.header.frame_id = self.map_.header.frame_id
last_pose_stamped.pose = last_pose
path.poses.append(last_pose_stamped)
active_node = active_node.prev
path.poses.reverse()
return path
def manhattan_distance(self, node: GraphNode, goal_node: GraphNode):
return abs(node.x - goal_node.x) + abs(node.y - goal_node.y)
def pose_on_map(self, node: GraphNode):
return 0 <= node.x < self.map_.info.width and 0 <= node.y < self.map_.info.height
def world_to_grid(self, pose: Pose) -> GraphNode:
grid_x = int((pose.position.x - self.map_.info.origin.position.x) / self.map_.info.resolution)
grid_y = int((pose.position.y - self.map_.info.origin.position.y) / self.map_.info.resolution)
return GraphNode(grid_x, grid_y)
def grid_to_world(self, node: GraphNode) -> Pose:
pose = Pose()
pose.position.x = node.x * self.map_.info.resolution + self.map_.info.origin.position.x
pose.position.y = node.y * self.map_.info.resolution + self.map_.info.origin.position.y
return pose
def pose_to_cell(self, node: GraphNode):
return node.y * self.map_.info.width + node.x
def main(args=None):
rclpy.init(args=args)
node = AStarPlanner()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
Now let’s build the actual planner.
Since this planner shares a lot with the Dijkstra version we already implemented, we’ll reuse that script as a starting point.
In your ROS 2 workspace:
cp dijkstra_planner.py a_star_planner.py
Open the file and rename the class:
class AStarPlanner(Node):
def __init__(self):
super().__init__('a_star_node')
Update the topics it will publish to:
self.path_pub = self.create_publisher(Path, "/a_star/path", 10)
self.map_pub = self.create_publisher(OccupancyGrid, "/a_star/visited_map", 10)
In Dijkstra, each node only tracked its cost and parent. A* adds a heuristic value.
So modify the GraphNode
class:
def __init__(self, x, y, cost=0, heuristic=0, prev=None):
self.x = x
self.y = y
self.cost = cost
self.heuristic = heuristic
self.prev = prev
Update how nodes are compared inside the priority queue:
def __lt__(self, other):
return (self.cost + self.heuristic) < (other.cost + other.heuristic)
This ensures that nodes with the lowest combined cost and estimate are explored first.
In your planner, after converting the robot’s current pose and the goal into grid coordinates:
start_node = self.world_to_grid(start)
goal_node = self.world_to_grid(goal)
You can now assign the heuristic to the start node:
start_node.heuristic = self.manhattan_distance(start_node, goal_node)
Then, in your main exploration loop, assign it to each neighbor node:
new_node.heuristic = self.manhattan_distance(new_node, goal_node)
We want to distinguish A* from Dijkstra visually. Let’s mark the visited cells in orange using a different value:
self.visited_map_.data[self.pose_to_cell(active_node)] = -106
In RViz, this will appear as a bright orange overlay on the grid—clearly showing the path of exploration.
Open CMakeLists.txt
of your package and make sure to install the new file:
install(PROGRAMS
${PROJECT_NAME}/a_star_planner.py
DESTINATION lib/${PROJECT_NAME}
)
Then build the workspace:
cd bumperbot_ws
colcon build
Launch the robot simulation:
ros2 launch bumperbot_bringup simulated_robot.launch.py
Then launch your planner in a new terminal:
ros2 run bumperbot_planning a_star_planner
In RViz:
/a_star/path
to visualize the calculated path/a_star/visited_map
and set display type to CostmapYou’ll immediately see A* in action—moving quickly and efficiently toward the target!
Try sending goals both near and far from the robot’s current location.
You’ll notice a clear pattern: A* prioritizes cells in the direction of the goal, skipping irrelevant parts of the map.
A* is fast, but it’s not perfect. If the heuristic is not admissible (i.e. it overestimates the cost), A* might return a non-optimal path.
In our implementation, Manhattan distance is admissible—so we’re safe. But always remember:
The heuristic is just a guess, not the truth.
By integrating a simple heuristic into our existing path planner, we made our robot smarter—and much faster at navigating to its goal.
With just a few modifications, the A* planner in Python is now ready to explore any occupancy grid efficiently, intelligently, and with a visual feedback that makes debugging and testing a joy.