0x00 往期博文

我的主页

0x01 实验结果

基于 OpenCV 在 HSV 色彩空间下识别红、绿、蓝三原色,框选出每种颜色轮廓最大的物体。

0x02 知识点

0\ 旭日X3派调用USB摄像头,web展示;

1\ HSV 和 LAB 色彩空间;

2\ 发布 PerceptionTargets.msg 消息,用于在 web 端显示;

0x03 编写 usb_cam launch 文件;

import os

from launch import LaunchDescription
from launch_ros.actions import Node

from launch.actions import IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from ament_index_python import get_package_share_directory


def generate_launch_description():
    web_service_launch_include = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(
                get_package_share_directory('websocket'),
                'launch/hobot_websocket_service.launch.py'))
    )

    return LaunchDescription([
        web_service_launch_include,

        # 启动图片发布pkg
        Node(
            package='hobot_usb_cam',
            executable='hobot_usb_cam',
            name='hobot_usb_cam',
            parameters=[
                {"camera_calibration_file_path": ""},
                {"frame_id": "default_usb_cam"},
                {"framerate": 30},
                {"image_height": 480},
                {"image_width": 640},
                {"io_method": "mmap"},
                {"pixel_format": "mjpeg"},
                {"video_device": "/dev/video8"},
                {"zero_copy": False}
            ]
        ),

        # 启动web展示pkg
        Node(
            package='websocket',
            executable='websocket',
            output='screen',
            parameters=[
                {"image_topic": "/image"},
                {"image_type": "mjpeg"},
                {"smart_topic": "/hobot_color_ball_detection"},
                {"only_show_image": False}
            ],
            arguments=['--ros-args', '--log-level', 'error']
        ),
    ])

1\ 启动 webservice 节点,ngix 服务器

2\ 启动 usb_cam 节点,发布 jpeg 图像,注: 官方文档声明只能发布 jpeg;

3\ 启动 websokect 节点,智能显示识别结果;

websocket接收 图像消息 智能结果消息,根据时间戳进行匹配,然后输出给web端渲染显示,也可单独显示图像。
图像消息支持 sensor_msgs::msg::Image 以及 shared_mem 的 hbm_img_msgs::msg::HbmMsg1080P 类型消息,必须为hobot codec输出的jpeg格式数据。
智能结果消息支持 ai_msgs::msg::PerceptionTargets 类型消息,其中 header.stamp 必须和该结果对应的 image 消息相同,websocket会使用该字段进行消息匹配,还有智能结果对应的宽高必须要和接收到的图像分辨率一致。


具体依赖的package有:

mipi_cam:启动mipi cam,发布nv12类型图像消息
hobot_codec:将mipi_cam发布的nv12图像编码为websocket需要的jpeg格式图像
mono2d_body_detection:接收nv12格式数据,进行算法推理,发布人体、人头、人脸、人手框感知消息

这里重点关注 ai_msgs::msg::PerceptionTargets 类型消息,参考 mono2d_body_detection 发布的结果;

本文中笔者发布的智能结果消息,名称为: 

{"smart_topic": "/hobot_color_ball_detection"},

往期博文中对 ai_msgs::msg::PerceptionTargets 类型的消息有所介绍:

《深入Originbot:PID人脸追踪》

0x04 编写 color_ball_tracking.py 文件;

1、编写ColorBallTracking节点,订阅 usb_cam 发布的 Image 话题。

2、编写处理图像函数 color_identify

3、发布 PerceptionTargets.msg;

代码如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
@作者: ning
@说明: 订阅 /image 图像消息, 发布 框
"""


import rclpy                                       # ROS2 Python接口库
from rclpy.node import Node                        # ROS2 节点类
from sensor_msgs.msg import Image                  # ROS2标准定义的String消息
from ai_msgs.msg import PerceptionTargets, Target, Roi
from sensor_msgs.msg import RegionOfInterest

from cv_bridge import CvBridge  # 用于 openCV 和 ROS msg 之间的转换
import cv2
import math
import numpy as np
import time


class ColorBallTracking(Node):

    def __init__(self, name):
        super().__init__(name)                              # ROS2节点父类初始化

        # 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度)
        self.sub = self.create_subscription(
            Image, 'image', self.listener_callback, 10)

        # 创建发布者对象(消息类型、话题名、队列长度), 发布框
        self.publisher_ = self.create_publisher(
            PerceptionTargets, 'hobot_color_ball_detection', 10)

        self.cv_bridge = CvBridge()

    # 色块识别函数
    def color_identify(self, image, target_color='red'):
        _img = image.copy()

        _color_space = 'hsv'  # 'hsv' 或 'lab'

        # 获取图像宽高, 默认 640, 480
        img_h, img_w = _img.shape[:2]

        # 图像已经是 640*480 了
        # img_resize = cv2.resize(_img, SIZE, interpolation=cv2.INTER_NEAREST)

        # 高斯模糊, 抑制摄像头噪声(一些白点会被过滤掉,但影响不大, 因为他们的面积不会超过被识别物体?)
        imt_g_blur = cv2.GaussianBlur(_img, (7, 7), 0)  # 可以改成均值模糊

        frame_mask = None  # 掩膜结果
        if _color_space == 'hsv':
            hsv_data = {
                'red': {'lower': (0, 43, 46), 'upper': (10, 255, 255)},
                'red1': {'lower': (156, 43, 46), 'upper': (180, 255, 255)},
                'blue': {'lower': (100, 43, 46), 'upper': (124, 255, 255)},
                'green': {'lower': (35, 43, 46), 'upper': (77, 255, 255)},
            }

            # 将图像转换到 HSV 色彩空间
            frame_hsv = cv2.cvtColor(imt_g_blur, cv2.COLOR_BGR2HSV)

            # 对图像进行掩膜运算
            frame_mask = cv2.inRange(frame_hsv,
                                     hsv_data[target_color]['lower'],
                                     hsv_data[target_color]['upper'])

            # 红色有两个区间, 所以做与运算
            if target_color == 'red':
                frame_mask1 = cv2.inRange(frame_hsv,
                                          hsv_data['red1']['lower'],
                                          hsv_data['red1']['upper'])
                frame_mask = cv2.bitwise_or(frame_mask, frame_mask1)

        elif _color_space == 'lab':
            lab_data = {
                'red': {'min': (0, 43, 46), 'max': (10, 255, 255)},
                'red1': {'min': (156, 43, 46), 'max': (180, 255, 255)},
                'blue': {'min': (100, 43, 46), 'max': (124, 255, 255)}
            }

            # 将图像转换到LAB空间
            frame_lab = cv2.cvtColor(imt_g_blur, cv2.COLOR_BGR2LAB)

            # 对原图像进行掩模运算
            frame_mask = cv2.inRange(frame_lab,
                                     np.array(lab_data[target_color]['min']),
                                     np.array(lab_data[target_color]['max']))

        # 颜色空间设置错误
        if frame_mask is None:
            return -1, -1, -1, 0

        # 开运算, 消除白点
        # opened = cv2.morphologyEx(frame_mask, cv2.MORPH_OPEN, np.ones((5, 5), np.uint8))  # 开运算, 消除小白点
        # closed = cv2.morphologyEx(opened, cv2.MORPH_CLOSE, np.ones((3, 3), np.uint8))  # 闭运算, 闭合被识别的障碍物
        closed = frame_mask

        # debug
        # cv2.imshow('color_identify', closed)
        # cv2.waitKey(10)  # 等待 10ms

        # 找出所有外轮廓
        contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)

        # 如果未发现任何颜色, 或者说 没有发现轮廓
        if len(contours) == 0:
            return -1, -1, -1, 0

        # 对轮廓对象按面积进行排序, 降序, 面积最大的为序号为0
        c_sorted = sorted(contours,
                          key=lambda x:
                          math.fabs(
                              cv2.contourArea(x)
                          ), reverse=True)

        c_0 = c_sorted[0]   # 最大轮廓

        # 如果面积小于 100, 认为是噪声, 进行下一次处理
        if math.fabs(cv2.contourArea(c_0)) <= 100:
            return -1, -1, -1, 0

        return tuple(
            map(int, cv2.boundingRect(c_0))
        )

    # 图像回调函数

    def listener_callback(self, data):
        self.get_logger().info('Receiving video frame')         # 输出日志信息,提示已进入回调函数

        # 将ROS的图像消息转化成OpenCV图像
        image = cv2.imdecode(np.frombuffer(data.data, np.uint8), cv2.IMREAD_COLOR)

        # debug
        # cv2.imshow("origin", image)
        # cv2.waitKey(10)  # 等待 10ms

        # 颜色识别
        _color_type = 'red'

        # --- PerceptionTargets
        pt = PerceptionTargets()
        pt.header = data.header
        pt.fps = 30
        # pt.perfs = []
        # --- target
        t = Target()
        t.type = 'red'
        t.track_id = 0

        t2 = Target()
        t2.type = 'blue'
        t2.track_id = 0
        # --- rois
        r = Roi()
        r2 = Roi()
        # r.type = 'red'
        # --- RegionOfInterest
        roi1 = RegionOfInterest()
        roi2 = RegionOfInterest()

        # 颜色识别
        roi1.x_offset, roi1.y_offset, roi1.width, roi1.height = self.color_identify(image, "red")
        roi2.x_offset, roi2.y_offset, roi2.width, roi2.height = self.color_identify(image, "blue")

        roi1.do_rectify = False
        roi2.do_rectify = False
        # --- RegionOfInterest end ---
        r.rect = roi1
        r.confidence = 0.

        r2.rect = roi2
        r2.confidence = 0.
        # --- rois end ---
        t.rois = [r]
        t2.rois = [r2]
        # t.attributes = []
        # t.points = []
        # t.captures = []
        # --- target end ---
        pt.targets = [t, t2]
        # pt.disappeared_targets = []
        # --- PerceptionTargets end ---

        self.publisher_.publish(pt)             # 发布预测结果消息


def main(args=None):                                    # ROS2节点主入口main函数
    rclpy.init(args=args)                               # ROS2 Python接口初始化
    node = ColorBallTracking("color_ball_tracking")     # 创建ROS2节点对象并进行初始化
    rclpy.spin(node)                                    # 循环等待ROS2退出
    node.destroy_node()                                 # 销毁节点对象
    rclpy.shutdown()                                    # 关闭ROS2 Python接口

0x05 难点;

1\ 将 usb_cam 发布的 jpeg 格式图像转为 bgr8;

image = cv2.imdecode(np.frombuffer(data.data, np.uint8), cv2.IMREAD_COLOR)
参考:
2\ color_identify 色块识别函数中的 Opencv 处理过程;
3\ 组织发布 PerceptionTargets.msg;