0x00 往期博文

OriginBot我的第一辆智能小车

使用(PS2)手柄控制OriginBot小车

《HelloOriginBot::使用Python编写Joy手柄功能包》

0x01 前言

OriginBot小车 基于 旭日X3pitros  开发,为了能够编写出更优秀的 'App' 释放小车的潜能,我们需要熟读 旭日X3pitros 用户手册以便更好地理解本文内容;

在用户手册中,需重点了解以下内容:

  • x3派面向生态开发者的产品定位,以及如何通过 Ai工具链 机器人开发平台(tros) 发挥5Tops算力;
  • x3派系统安装、配置,了解 x3派 的硬件接口,体验基础图像采集和Ai推理功能;
  • 了解 tros 基于 ros 新增的功能模块,例如 Hobot Sensor(机器人常用传感器)、Hobot Codec(视频编码器)、Hobot Render(Web端可视化)等;
  • 体验 tros 图像采集->图像编解码->图像展示 功能;
  • 了解 Boxs 算法仓库 和 Apps 应用实例;
  • 学习 小车人体跟随(body_tracking) App 源码;

您可以通过 '地平线程序员奶爸' 的教程来深入学习 tros 手势控制app和人体追踪app:

开发者说 | 地平线程序员奶爸带你玩转机器人开发平台 —— 第一期 手势控制(附直播课视频)

开发者说 | 地平线程序员奶爸带你玩转机器人开发平台 —— 第二期 人体跟随(附直播课视频)

0x02 白话

说白了,旭日x3pi为我们机器人开发提供了硬件平台,tros为我们机器人开发提供了软件平台,二者需结合起来一起使用;

硬件平台为我们提供了很高的算力,而软件平台则为发挥硬件的潜能提供了丰富的 demo;

我们可以借助这些demo(Boxs和Apps),高效、便捷(复制、粘贴)的进行机器人开发,打造具有竞争力的智能机器人产品。

0x03 创建人脸追踪功能包

通过ssh连接上OriginBot小车,cd进 /userdata/dev_ws/src/originbot 目录,创建 originbot_face_tracking 功能包:

ros2 pkg create --build-type ament_python originbot_face_tracking

这里的命名规则与Originbot其他功能包一致

0x04 编写Launch文件

在功能包路径下新建 launch 目录,编写 ob_face_tracking.launch.py 文件欲实现的系统框图如下:

其中椭圆形框内为Node名,矩形框内为topic名。

节点名与功能对照下图:

  • 传感节点使用了 tros Hobot Ssnros组件中的 /mipi_cam Node,参数指定使用 GC4663 摄像头,发布的图片格式和分辨率可以直接用于算法推理,其中通过共享内存的方式发布图片,可以极大地降低系统负载和传输延迟。
  • 感知节点使用了 tros 中的 Boxs 算法仓库,订阅 Hobot Sensor(mipi_cam Node)发布的图像数据后,通过视觉算法进行推理,检测出人体关键部位;
    • 人体检测和跟踪算法Node 订阅 Hobot Senso r发布的图像消息,利用 BPU 处理器进行AI推理,发布包含人体、人头、人脸、人手框和人体关键点检测结果的AI msg,并通过多目标跟踪(multi-target tracking,即MOT)功能,实现检测框的跟踪和ID分配。
  • 交互节点,人体检测结果订阅节点 /ai_msg_sub_node,订阅人体检测和跟踪节点发布的 Ai msg 消息,执行跟踪策略。根据人脸在画面中的位置通过比例控制器计算出小车旋转角度;
  • 控制节点/ob_tracking_Robot_CmdVel,发布消息名为'/cmd_vel'的命令控制小车做旋转运动;
  • WEB展示,由 /hobot_codec 将 /mipi_cam 发布的 ’nv12‘ 格式的图像转换为 ’jpeg‘ 格式,以便 /websocket 展示。同时, /websocket 节点订阅 /hobot_mono2d_body_detection 消息用于图像渲染;

ob_face_tracking.launch.py 文件的代码如下:

import os

from launch import LaunchDescription
from launch_ros.actions import Node

from launch.actions import IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from ament_index_python import get_package_share_directory

def generate_launch_description():
    web_service_launch_include = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(
                get_package_share_directory('websocket'),
                'launch/hobot_websocket_service.launch.py'))
    )

    return LaunchDescription([
        web_service_launch_include,
        # 启动图片发布pkg
        Node(
            package='mipi_cam',
            executable='mipi_cam',
            output='screen',
            parameters=[
                {"camera_calibration_file_path": "/opt/tros/lib/mipi_cam/config/GC4663_calibration.yaml"},
                {"out_format": "nv12"},
                {"image_width": 960},
                {"image_height": 544},
                {"io_method": "shared_mem"},
                {"video_device": "GC4663"}
            ],
            arguments=['--ros-args', '--log-level', 'error']
        ),
        
        # 启动jpeg图片编码&发布pkg
        Node(
            package='hobot_codec',
            executable='hobot_codec_republish',
            output='screen',
            parameters=[
                {"channel": 1},
                {"in_mode": "shared_mem"},
                {"in_format": "nv12"},
                {"out_mode": "ros"},
                {"out_format": "jpeg"},
                {"sub_topic": "/hbmem_img"},
                {"pub_topic": "/image_jpeg"}
            ],
            arguments=['--ros-args', '--log-level', 'error']
        ),

        # 启动单目rgb人体、人头、人脸、人手框和人体关键点检测pkg
        Node(
            package='mono2d_body_detection',
            executable='mono2d_body_detection',
            output='screen',
            parameters=[
                {"ai_msg_pub_topic_name": "/hobot_mono2d_body_detection"},
                {"model_file_name": "/opt/tros/lib/mono2d_body_detection/config/multitask_body_head_face_hand_kps_960x544.hbm"}
            ],
            arguments=['--ros-args', '--log-level', 'error']
        ),

        # 启动web展示pkg
        Node(
            package='websocket',
            executable='websocket',
            output='screen',
            parameters=[
                {"image_topic": "/image_jpeg"},
                {"image_type": "mjpeg"},
                {"smart_topic": "/hobot_mono2d_body_detection"}
            ],
            arguments=['--ros-args', '--log-level', 'error']
        ),

        # 启动人脸追踪节点 
        Node(
            package='originbot_face_tracking',
            executable='face_tracking',
            output='screen',
            parameters=[
            ],
        )
    ])

0x05 功能包的实现

功能包中的 py 文件如下:

./oringinbot_face_tracking
├── common.py                  # 通用定义,如 图像宽、高等
├── face_tracking.py          # 人脸追踪主程序
├── __init__.py
├── main.py                       # 程序入口,维护一个SingleThreadedExecutor,负责调起其他节点
├── robot_ctrl_node.py      # 提供 robot_ctl(msg:Twist) 接口,负责发布 /cmd_vel 消息
├── smart_subscriber.py    # 订阅 /hobot_mono2d_body_detection 消息,之后调用 face_tracking.FeedSmart(msg:PerceptionTargets) 灌入 Ai msg
└── time_helper.py             # 时间计算工具

功能包程序设计如下:

略缩图如下:

main.py 代码如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

__author__ = 'ning'

"""
    @作者: ning
    @说明: 人脸跟踪app程序入口, 调起其它程序接口.
    创建并维护一个单线程执行器(SingleThreadedExecutor)将追踪器(TrackingManager)的所有节点添加到执行器中, 同时也将'智能消息订阅(smart_subscriber)'节点添加到执行器中;
    接着, 将追踪管理器(TrackingManager)的FeedSmart函数绑定到智能消息订阅节点(smart_subscriber)的回调函数中, 
    并启动单线程执行器的主循环函数,使得执行器可以在单独的线程中执行所有节点的回调函数。

    最后, 在程序退出之前, 释放追踪管理器的节点, 并关闭ROS2节点。
"""

import rclpy
from .smart_subscriber import SmartMsgSubscriber
from .face_tracking import TrackingManager

from functools import partial


def main():
    print("\n\tThis is face tracking package.\n\n"
          "Only rotate the robot without moving forward and backward.\n"
          "============================================\n")
    # 初始化 ROS2 节点
    rclpy.init()

    # 创建一个节点执行器
    node_executor = rclpy.executors.SingleThreadedExecutor()

    # 获取 TrackingManger 单例的所有Nodes;
    nodes = TrackingManager.Instance().GetNodes()

    for node in nodes:
        node_executor.add_node(node)    # 将每个节点添加到执行器中,以便于执行器可以在单独的线程中执行回调函数;

    # 将追踪管理器的 FeedSmart 函数绑定到一个回调函数中,将追踪管理器的单例对象作为第一个参数传入,订阅的消息作为第二个参数传入。
    smart_msg_subscriber = SmartMsgSubscriber(
        "ai_msg_sub_node",
        partial(TrackingManager.FeedSmart, TrackingManager.Instance()))   # 表示回调函数中的第一个参数,即订阅的消息。

    node_executor.add_node(smart_msg_subscriber)  # 添加智能消息订阅节点..
    node_executor.spin()  # 启动执行器的主循环。该函数会一直阻塞,直到执行器被停止。

    # release node before shutdown!
    TrackingManager.Instance().Release()

    rclpy.shutdown()    # 关闭ROS2 Python接口
    print("tracking node exit!")


# 测试用
if __name__ == '__main__':
    main()

smart_subscriber.py 代码如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

__author__ = 'ning'

"""
    @作者: ning
    @说明: 
"""

import rclpy
from rclpy.node import Node
from ai_msgs.msg import PerceptionTargets
from typing import Callable     # 函数类型 func


class SmartMsgSubscriber(Node):
    def __init__(self, node_name: str, smart_cb: Callable[[PerceptionTargets], None] = None):
        super().__init__(node_name=node_name)

        # main.py 传递来的 回调函数
        self.smart_cb = smart_cb

        # 声明一个ROS参数, 定义'智能订阅的话题名称'
        self.declare_parameter('ai_msg_sub_topic_name',
                               '/hobot_mono2d_body_detection')      # 默认 hobot_hand_gesture_detection 可选 hobot_mono2d_body_detection
        self.ai_msg_sub_topic_name = self.get_parameter(
            'ai_msg_sub_topic_name').value
        self.get_logger().info(
            f"Parameter:\nai_msg_sub_topic_name: {self.ai_msg_sub_topic_name}")

        # 智能订阅,其实就是可配置订阅话题名称;
        self.subscription = self.create_subscription(
            PerceptionTargets,
            self.ai_msg_sub_topic_name,
            self.topic_callback,
            10
        )

    # 智能订阅的回调函数, 输出信息后调用  smart_cb
    def topic_callback(self, msg:PerceptionTargets):
        ss = "Recved ai msg, frame_id: {}, stamp: {}_{}, targets size: {}\n".format(
            msg.header.frame_id, msg.header.stamp.sec, msg.header.stamp.nanosec, len(msg.targets))

        for tar in msg.targets:
            ss += "\t track_id: {}, type: {}, has roi num: {}".format(
                tar.track_id, tar.type, len(tar.rois))
            for roi in tar.rois:
                ss += ", roi type: {}".format(roi.type)
            ss += ", has attr num: {}".format(len(tar.attributes))
            for attr in tar.attributes:
                ss += ", attr type: {}, val: {}".format(attr.type, attr.value)
            ss += "\n"
        self.get_logger().info(ss)
        
        # 处理消息
        if self.smart_cb:
            self.smart_cb(msg)
        else:
            self.get_logger().warn("smart_cb was not set")


if __name__ == '__main__':
    rclpy.init()
    node = SmartMsgSubscriber(node_name='smart_msg_subscriber')
    rclpy.spin(node)
    rclpy.shutdown()

face_tracking.py 代码如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

__author__ = 'ning'

"""
    @作者: ning
    @说明: 人脸跟踪程序;
"""

import rclpy
import threading
import queue
from simple_pid import PID
from geometry_msgs.msg import Twist
from ai_msgs.msg import PerceptionTargets

from .common import *
from .time_helper import TimeHelper
from .robot_ctrl_node import RobotCmdVelNode


class TrackingManager:
    __instance = None  # 类变量,用于存储单例实例

    @classmethod
    def Instance(cls):
        if not cls.__instance:
            cls.__instance = cls()
        return cls.__instance

        # 释放资源
    def Release(self):
        print("TrackingManager release")
        self.start_ = False
        if self.smart_process_task_ and self.smart_process_task_.is_alive():
            self.smart_process_task_.join()
            self.smart_process_task_ = None

        self.param_node_ = None
        self.robot_cmdvel_node_ = None


    def __init__(self):
        # 跟踪器是否已启动?
        self.start_ = True

        # 实例化PID控制器
        self.pid = PID(5.0, 0.0, 0.0, setpoint=0)
        # 设置采样时间
        self.pid.sample_time = 0.1
        # 设置输出限制
        self.pid.output_limits = (-10, 10)
        # 设置pid死区
        self.offset_dead_block = 0.1

        # 实例化 common.py::TrackCfg, 存储跟踪配置信息
        self.track_cfg_ = TrackCfg()

        # TODO(ning 2023.03.12): 先解析数据, 最后再来控制小车;
        self.robot_cmdvel_node_ = RobotCmdVelNode("ob_tracking_RobotCmdVel")

        # 线程, 用于从 queue 中获取帧消息;
        self.smart_process_task_ = None
        self.queue_len_limit_ = 2
        self.smart_queue_ = queue.Queue(maxsize=self.queue_len_limit_)

        if not self.smart_process_task_:
            self.smart_process_task_ = threading.Thread(
                target=self.smart_process_task)
            self.smart_process_task_.start()


    def smart_process_task(self):
        while self.start_ and rclpy.ok():
            # 直接get,
            smart_frame = self.smart_queue_.get()
            # 执行跟踪策略
            self.RunTrackingStrategy(smart_frame)


    def FeedSmart(self, msg: PerceptionTargets):
        '''
        接收 smart_sub 的 msg
        '''
        if not rclpy.ok():
            return

        # 如果队列已满, 队列中第一帧未处理完成, 则将第一帧出栈, 保证队列中的帧是最新的;
        if self.smart_queue_.full():
            self.smart_queue_.get()

        # python 的队列是线程安全的
        self.smart_queue_.put(msg)      # 将ai msg消息放入队列中


    def RunTrackingStrategy(self, msg: PerceptionTargets):
        ''' 运行跟踪策略, 接收外部的目标检测结果作为输入。 '''
        # 开始追踪时间戳
        start_tracking_ts = TimeHelper.GetCurrentTimestampMillSec()

        self.ProcessSmart(msg)

        # 当前时间戳
        present_tracking_ts = TimeHelper.GetCurrentTimestampMillSec()

        # 输出跟踪策略花费的时间;
        rclpy.logging.get_logger('TrackingManager').info(
            "Run TrackingStrategy time ms cost: %d" % (
                present_tracking_ts - start_tracking_ts)
        )


    def ProcessSmart(self, msg: PerceptionTargets):
        # 解析 Ai msg 数据, 找出人脸, 计算PID误差值;
        err = 0.0
        for target in msg.targets:
            if target.rois[0].type == 'face':
                rect = target.rois[0].rect
                # (rect.x_offset + rect.width / 2) # 计算人脸x坐标中心点;
                # ((rect.x_offset + rect.width / 2) / track_cfg_.img_width - 0.5) * 2   # 归一化处理,将人脸坐标归一到 NDC空间 [-1, 1];
                err = float(((rect.x_offset + rect.width / 2) / self.track_cfg_.img_width - 0.5) * 2)
                break

        # 死区
        if abs(err) < self.offset_dead_block:
            err = 0

        out_ = self.pid(err)
        rclpy.logging.get_logger('TrackingManager').info("in=%.2f out=%.2f" % (err, out_)) 

        # 发布速度话题
        twist = Twist()
        twist.angular.z = out_
        self.robot_cmdvel_node_.robot_ctl(twist)


    def GetNodes(self):
        ''' 取所有节点 '''
        node_ptrs = []
        # node_ptrs.append(self.param_node_)
        node_ptrs.append(self.robot_cmdvel_node_)
        return node_ptrs


    def GetTrackCfg(self):
        ''' 取跟踪配置 '''
        return self.track_cfg_

0x06 程序设计难点

难点1:Python 多线程 Queue:

Python Queue是线程安全的不同于C++,因此可以直接使用get()、put() 而不需要使用锁Lock();

使用 多线程+消息队列 订阅 Ai msg,可以避免单线程时等待 Ai msg 产生的延迟?

难点2:PID人脸追踪器:

  • PID(proportion integration differentiation) 控制器只了P(proportion),因此为比例控制器,类似于一次函数;
  • PID(5.0, 0.0, 0.0, setpoint=0)   # P 经过调试后被设置为 5,setpoint 目标点为 0,即 NDC 空间下图像的中点;
  • pid.sample_time = 0.1              # 设置采样率可以简化 PID 的计算,推荐设置;采样率与函数的调用频率无关;
Q: 什么是 NDC 空间?
    在计算机图像中,NDC空间是指归一化设备坐标(Normalized Device Coordinates)空间。
    摄像头的分辨率各不相同,因此产生了 NDC,它可以将输入的图像归一化至 [-1 ,1]区间方便计算。
    err = float(((rect.x_offset + rect.width / 2) / self.track_cfg_.img_width - 0.5) * 2)
    本例算法中,在NDC空间下,0.5即为图像中心点。
您可以代入真实的人脸坐标,进行计算,以便理解算法;
参考:
难点3:Ai msg 消息的解析
通信接口在 \opt\tros\share\ai_msgs\msg\ 路径下:

PerceptionTargets.msg 接口如下:

# 感知结果

# 消息头
std_msgs/Header header

# 感知结果的处理帧率
# fps val is invalid if fps is less than 0
int16 fps

# 性能统计信息,比如记录每个模型推理的耗时
Perf[] perfs

# 感知目标集合
Target[] targets

# 消失目标集合
Target[] disappeared_targets

Target.msg 接口如下:

# 目标消息 Targets
# 目标类型名称,如:人、车、动物、物体
# person/car/object/animal
string type

# 目标跟踪ID号
uint64 track_id

# 检测框
Roi[] rois

# 属性
Attribute[] attributes

# 关键点
Point[] points

# 跟踪目标抓拍图、特征、特征的底库检索结果信息
Capture[] captures

Roi.msg 接口如下:

# roi感知消息
# 如:人体检测框、人头检测框、人脸检测框、人手检测框
# roi类型
# body/head/face/hand
string type

#  检测框
sensor_msgs/RegionOfInterest rect

# 检测结果的置信度
float32 confidence

BUG:经解析后,笔者认为这里有一个数据结构错误,但不影响程序使用;

PerceptionTargets.msg 中有一个 Target[] targets 数组,其中这个 target 的类型可以是人、车、动物、物体,这个 target 下有一个 Roi[] rois,这个 Roi 包含人体框、人头框、人脸框、人手框多个元素;
然而在实际使用中发现:

rois[]数组中只包含一个元素,数据伪代码如下:

target1 = {
    'type': 'person',
    'track_id': '',
    'rois': [
        {
            'type' : 'body',
            'rect':[],
            'confidence': []
        }
    ],
    'attributes': [],
    'points': [],
    'captures': []
}

target2 = {
    'type': 'person',
    'track_id': '',
    'rois': [
        {
            'type' : 'face',
            'rect':[],
            'confidence': []
        }
    ],
    'attributes': [],
    'points': [],
    'captures': []
}

target3 = {
    'type': 'person',
    'track_id': '',
    'rois': [
        {
            'type' : 'hand',
            'rect':[],
            'confidence': []
        }
    ],
    'attributes': [],
    'points': [],
    'captures': []
}

PerceptionTargets = {
    'header': '',
    'fps': '',
    'targets': [
        target1,
        target2,
        target3,
    ],
    'disappeared_targets': [

    ]
}

我认为正确的数据结构如下:

roi1 = {
    {
        'type': 'body',
        'rect': [],
        'confidence': []
    }
}
roi2 = {
    {
        'type': 'face',
        'rect': [],
        'confidence': []
    }
}
roi3 = {
    {
        'type': 'hand',
        'rect': [],
        'confidence': []
    }
}

target1 = {
    'type': 'person',
    'track_id': '',
    'rois': [
        roi1,
        roi2,
        roi3
    ],
    'attributes': [],
    'points': [],
    'captures': []
}

target2 = {
    'type': 'car',
    'track_id': '',
    'rois': [
        {
            'type': '车牌',
            'rect': [],
            'confidence': []
        }
    ],
    'attributes': [],
    'points': [],
    'captures': []
}

PerceptionTargets = {
    'header': '',
    'fps': '',
    'targets': [
        target1,
        target2,
    ],
    'disappeared_targets': [

    ]
}

 

正确 √

target.type == person

rois[人体,人脸,人手,人头, ...]

错误 ❌

target.type == person

rois[人体]

target.type == person

rois[人头]

target.type == person

rois[人脸]

0x06 编译运行

ssh连接小车,安装 py 功能包依赖库simple_pid:

1、使用 pip 安装 simple_pid 库:

pip3 install simple_pid

2、执行功能包根目录下的 .\install.sh 脚本自动安装依赖:

.\install.sh

cd 进 /userdata/dev_ws/ 目录,执行如下命令编译功能包;

colcon build --packages-select originbot_face_tracking

使用 Launch 文件启动人脸追踪功能:

ros2 launch originbot_face_tracking ob_face_tracking.launch.py

启动小车底盘:

ros2 launch originbot_bringup originbot.launch.py

enjoy it!

0x07 执行效果

O