ROS2开发笔记——动作(action)样例(五)

2023-05-21 Views970字5 min read

Action是基于topic和service之上的通信方式,使用于那些需要较长时间执行的任务。Action通信示例如下,

下面,我们将通过一个计算斐波那契 (Fibonacci)数列的例子演示action通信。在斐波那契数列中第一和第二个元素分别为0和1,然后其余每位元素是其前两位元素之和,因此需要一步步递归计算,属于需要较长时间执行的任务。

在数学上,斐波那契数定义如下,

F0=0F1=1Fn=Fn1+Fn2(n2)* F_{0} = 0 \\ * F_{1} = 1 \\ * F_{n} = F_{n-1} + F_{n-2} (n \leq 2) \\

一、Action通信数据结构

新建包

$ mkdir -p ros2_ws/src #you can reuse existing workspace with this naming convention
$ cd ros2_ws/src
$ ros2 pkg create action_tutorials_interfaces
$ cd action_tutorials_interfaces
$ mkdir action

在action目录下新建接口文件Fibonacci.action。

Action通信构建于topic和service之上,客户端发起请求,服务端响应并返回结果,期间需要不断反馈任务进行进度,因此action通信的数据结构应当由以下三部分构成,

# Request
---
# Result
---
# Feedback

首先,客户端发起的请求是求几阶的斐波那契数列,因此请求应该是整型数;服务端应该将对应的数列返回,因此结果应该是整型数组;而反馈是计算的中间结果,因此同样是整型数组。根据上述分析,定义数据结构如下,

int32 order
---
int32[] sequence
---
int32[] partial_sequence

更新CMakeList.txt

find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "action/Fibonacci.action"
)

更新package.xml

<buildtool_depend>rosidl_default_generators</buildtool_depend>

<depend>action_msgs</depend>

<member_of_group>rosidl_interface_packages</member_of_group>

编译

# Change to the root of the workspace
$ cd ~/ros2_ws
# Build
$ colcon build

测试

# Source our workspace
# On Windows: call install/setup.bat
$ . install/setup.bash
# Check that our action definition exists
$ ros2 interface show action_tutorials_interfaces/action/Fibonacci

二、Action通信样例

1、环境准备

$ ros2 pkg create --build-type ament_python action_tutorials_py

2、编写ActionServer

在src/action_tutorials_py/action_tutorials_py下新建fibonacci_action_server.py文件,

import time

import rclpy
from rclpy.action import ActionServer # 导入ActionServer
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        # 初始化ActionServe
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...') # 日志

        feedback_msg = Fibonacci.Feedback() # 实例化反馈信息,注意调用方式
        feedback_msg.partial_sequence = [0, 1] # 初始化反馈信息

        for i in range(1, goal_handle.request.order): # 从goal_handle获取请求
            # 迭代计算斐波那契数列,并将计算结果作为反馈信息发布
            feedback_msg.partial_sequence.append(
                feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])
            self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))
            goal_handle.publish_feedback(feedback_msg)
            time.sleep(1)

        goal_handle.succeed() # 任务执行完成后,将标志置为SUCCEED

        result = Fibonacci.Result() # 实例化结果信息
        result.sequence = feedback_msg.partial_sequence # 将最后一个反馈信息作为结果
        return result


def main(args=None):
    rclpy.init(args=args)

    fibonacci_action_server = FibonacciActionServer()

    rclpy.spin(fibonacci_action_server)


if __name__ == '__main__':
    main()

3、编写ActionClient

在src/action_tutorials_py/action_tutorials_py下新建fibonacci_action_client.py文件,

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci') #新建ActionClient,用过action的名字'fibonacci'与ActionServer建立通信

    def send_goal(self, order): # 发起请求的函数
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server() # 等待服务

        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback) # 异步执行,将请求发给server,返回一个future

        self._send_goal_future.add_done_callback(self.goal_response_callback) # 注册响应回调函数

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    action_client.send_goal(10)

    rclpy.spin(action_client)


if __name__ == '__main__':
    main()

4、更新依赖与程序执行入口

package.xml

  <depend>action_tutorials_interfaces</depend>

  <exec_depend>rclpy</exec_depend>

setup.py

entry_points={
        'console_scripts': [
            'fibonacci_action_server = action_tutorials_py.fibonacci_action_server:main',
            'fibonacci_action_client = action_tutorials_py.fibonacci_action_client:main',
        ],
    },

5、编译与运行

$ colcon build --packages-select action_tutorials_py
$ source install/setup.bash
$ ros2 run action_tutorials_py fibonacci_action_server
$ source install/setup.bash
$ ros2 run action_tutorials_py fibonacci_action_client
EOF