ROS2开发笔记——自定义数据结构(msg、srv)(四)

2023-05-17 Views1534字8 min read

目前,我们学习的topic和service两种通信方式用的都是ROS预定义的数据结构,但是有时候为了满足特殊需求就得自定义数据结构。

一般而言,好的开发习惯是将数据结构放到独立的包中方便管理,也有数据结构的功能代码放到同一个包中的,例如官方教程中的Implementing custom interfaces,这里,我们遵循官方建议的习惯,将数据结构放到独立的包中。

一、新建包

好像现在只支持用C++定义数据结构,在ros2_ws/src下新建数据结构包,

$ ros2 pkg create --build-type ament_cmake tutorial_interfaces

采用C++开发时,新建的包稍微有些不一样,

tutorial_interfaces/
     CMakeLists.txt
     include/tutorial_interfaces/
     package.xml
     src/

package.xml还是指定了包的依赖(依赖包的名字会不一样),src/存放源文件,include/tutorial_interfaces/存放头文件,CMakeLists.txt描述了怎么编译这个包。

在这个样例中我们学习怎么自定义信息和服务两种数据结构,在ros2_ws/src/tutorial_interfaces目录下再新建两个对应的目录,

$ mkdir msg srv

二、定义数据结构

如果我们要定义的数据结构是信息,那么就再tutorial_interfaces/msg目录下新建对应的文件,信息数据结构文件以.msg结尾,例如我们定义一个Num.msg,如下所示,它只有一个整型变量

int64 num

在我们定义的数据结构中也可以引用ROS中预定义的数据结构,假如我们想定义一个球的数据结构,球需要定义球心和半径,我们就可以如下定义Sphere.msg,geometry_msgs/Point是ROS预定义的数据结构

geometry_msgs/Point center
float64 radius

如果我们要定义的数据结构是服务,那么就再tutorial_interfaces/srv目录下新建对应的文件,服务的数据结构以.srv结尾,仿照前面服务通信用到加法服务数据结构定义3个整数的加法服务数据结构AddThreeInts.srv,

int64 a
int64 b
int64 c
---
int64 sum

同样,上半部分是请求,下半部分是结果。

三、更新配置文件

3.1、CMakeLists.txt

新建包时生成的CMakeLists.txt如下所示,

cmake_minimum_required(VERSION 3.8)
project(tutorial_interfaces)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# find dependencies
find_package(ament_cmake REQUIRED)
# uncomment the following section in order to fill in
# further dependencies manually.
# find_package(<dependency> REQUIRED)

if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  # the following line skips the linter which checks for copyrights
  # comment the line when a copyright and license is added to all source files
  set(ament_cmake_copyright_FOUND TRUE)
  # the following line skips cpplint (only works in a git repo)
  # comment the line when this package is in a git repo and when
  # a copyright and license is added to all source files
  set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()

ament_package()

我们的球数据结构Sphere.msg依赖geometry_msgs包,同时我们还需要调用rosidl_default_generators来把我们自定义是数据结构转换成语言相关的代码,默认的生成器应该包含了C++和Python两种语言。最后,调用rosidl_generate_interfaces()生成对应的接口,

find_package(geometry_msgs REQUIRED)
find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/Num.msg"
  "msg/Sphere.msg"
  "srv/AddThreeInts.srv"
  DEPENDENCIES geometry_msgs # Add packages that above messages depend on, in this case geometry_msgs for Sphere.msg
)

3.2、package.xml

package.xml增加如下代码,

<depend>geometry_msgs</depend> # Sphere.msg依赖该包
<buildtool_depend>rosidl_default_generators</buildtool_depend> # 用于生成语言相关的代码
<exec_depend>rosidl_default_runtime</exec_depend> # 这是使用自定义接口需要的运行时
<member_of_group>rosidl_interface_packages</member_of_group> # 我们的接口是关联到这个包的

四、编译与验证

编译

$ colcon build --packages-select tutorial_interfaces

验证

$ source install/setup.bash
$ ros2 interface show tutorial_interfaces/msg/Num
int64 num
$ ros2 interface show tutorial_interfaces/msg/Sphere
geometry_msgs/Point center
        float64 x
        float64 y
        float64 z
float64 radius
$ ros2 interface show tutorial_interfaces/srv/AddThreeInts
int64 a
int64 b
int64 c
---
int64 sum

4.1、在发布/订阅者通信中验证

回到我们的发布/订阅者通信开发样例中,按如下修改代码,我们发布的主题不再使用预定义的String类型,而是简单发布一个数字,用我们自定义的Num.msg类型,

发布者代码修改如下,

import rclpy
from rclpy.node import Node

from tutorial_interfaces.msg import Num                            # CHANGE


class MinimalPublisher(Node):

    def __init__(self):
        super().__init__('minimal_publisher')
        self.publisher_ = self.create_publisher(Num, 'topic', 10)  # CHANGE
        timer_period = 0.5
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.i = 0

    def timer_callback(self):
        msg = Num()                                                # CHANGE
        msg.num = self.i                                           # CHANGE
        self.publisher_.publish(msg)
        self.get_logger().info('Publishing: "%d"' % msg.num)       # CHANGE
        self.i += 1


def main(args=None):
    rclpy.init(args=args)

    minimal_publisher = MinimalPublisher()

    rclpy.spin(minimal_publisher)

    minimal_publisher.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

订阅者代码修改如下,

import rclpy
from rclpy.node import Node

from tutorial_interfaces.msg import Num                        # CHANGE


class MinimalSubscriber(Node):

    def __init__(self):
        super().__init__('minimal_subscriber')
        self.subscription = self.create_subscription(
            Num,                                               # CHANGE
            'topic',
            self.listener_callback,
            10)
        self.subscription

    def listener_callback(self, msg):
            self.get_logger().info('I heard: "%d"' % msg.num)  # CHANGE


def main(args=None):
    rclpy.init(args=args)

    minimal_subscriber = MinimalSubscriber()

    rclpy.spin(minimal_subscriber)

    minimal_subscriber.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

我们还需要在package.xml添加依赖

<exec_depend>tutorial_interfaces</exec_depend>

最后,重新编译运行

$ colcon build --packages-select py_pubsub
$ ros2 run py_pubsub talker
[INFO] [minimal_publisher]: Publishing: '0'
[INFO] [minimal_publisher]: Publishing: '1'
[INFO] [minimal_publisher]: Publishing: '2'
$ ros2 run py_pubsub listener
...

4.2、在服务通信中验证

代码修改如下

服务端

from tutorial_interfaces.srv import AddThreeInts        # CHANGE

import rclpy
from rclpy.node import Node


class MinimalService(Node):

    def __init__(self):
        super().__init__('minimal_service')
        self.srv = self.create_service(AddThreeInts, 'add_three_ints', self.add_three_ints_callback)     # CHANGE

    def add_three_ints_callback(self, request, response):
        response.sum = request.a + request.b + request.c        # CHANGE
        self.get_logger().info('Incoming request\na: %d b: %d c: %d' % (request.a, request.b, request.c))  # CHANGE

        return response

def main(args=None):
    rclpy.init(args=args)

    minimal_service = MinimalService()

    rclpy.spin(minimal_service)

    rclpy.shutdown()

if __name__ == '__main__':
    main()

客户端

from tutorial_interfaces.srv import AddThreeInts                            # CHANGE
import sys
import rclpy
from rclpy.node import Node


class MinimalClientAsync(Node):

    def __init__(self):
        super().__init__('minimal_client_async')
        self.cli = self.create_client(AddThreeInts, 'add_three_ints')       # CHANGE
        while not self.cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('service not available, waiting again...')
        self.req = AddThreeInts.Request()            # CHANGE

    def send_request(self):
        self.req.a = int(sys.argv[1])
        self.req.b = int(sys.argv[2])
        self.req.c = int(sys.argv[3])              # CHANGE
        self.future = self.cli.call_async(self.req)


def main(args=None):
    rclpy.init(args=args)

    minimal_client = MinimalClientAsync()
    minimal_client.send_request()

    while rclpy.ok():
        rclpy.spin_once(minimal_client)
        if minimal_client.future.done():
            try:
                response = minimal_client.future.result()
            except Exception as e:
                minimal_client.get_logger().info(
                    'Service call failed %r' % (e,))
            else:
                minimal_client.get_logger().info(
                    'Result of add_three_ints: for %d + %d + %d = %d' %                                # CHANGE
                    (minimal_client.req.a, minimal_client.req.b, minimal_client.req.c, response.sum))  # CHANGE
            break

    minimal_client.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

添加依赖

<exec_depend>tutorial_interfaces</exec_depend>

编译运行

$ colcon build --packages-select py_srvcli
$ ros2 run py_srvcli service
$ ros2 run py_srvcli client 2 3 1
EOF