加载中...
加载中...
在机器人技术快速发展的今天,分布式系统的实时通信能力直接影响着机器人应用的性能和可靠性。无论是多机器人协作、传感器数据融合,还是复杂的控制系统,都需要高效、可靠、实时的数据交换机制。传统的通信方案,如 TCP/IP、UDP 或简单的消息队列,往往难以同时满足实时性、可靠性和可扩展性的要求,限制了复杂机器人系统的发展。
DDS(Data Distribution Service,数据分发服务)的出现,为这一挑战提供了一个工业级的解决方案。作为由 OMG(Object Management Group)制定的实时数据分发中间件标准,DDS 不仅提供了高效的发布-订阅通信模式,还通过丰富的 QoS(Quality of Service)策略,使得开发者可以根据应用需求精确控制通信行为。这种设计使得 DDS 在航空航天、自动驾驶、机器人等对实时性要求极高的领域得到了广泛应用。
DDS 的核心价值在于其以数据为中心的架构设计。与传统的以节点为中心的通信模型不同,DDS 将数据作为系统的核心,通过主题(Topic)和 QoS 策略,实现了发布者和订阅者的完全解耦。这种设计不仅提高了系统的灵活性,还支持动态发现、自动连接和智能路由,使得分布式系统的构建和维护变得更加简单。
从技术角度来看,DDS 基于 RTPS(Real-Time Publish-Subscribe)协议,提供了去中心化的发现机制和高效的数据传输能力。它支持多种 QoS 策略,包括可靠性、持久性、历史记录、期限、延迟预算和存活性等,使得开发者可以根据不同的应用场景选择最合适的配置。更重要的是,DDS 的标准化设计确保了不同实现之间的互操作性,使得系统可以灵活地选择最适合的 DDS 实现。
在机器人领域,DDS 的应用尤为广泛。ROS2 将 DDS 作为其默认的通信中间件,通过 DDS 实现了去中心化的节点通信,摆脱了 ROS1 中依赖中央节点的限制。Unitree SDK2 采用 DDS 作为其通信基础,为机器人控制提供了低延迟、高可靠性的通信保障。在自动驾驶领域,超过 200 个车型项目采用了 DDS,用于传输传感器数据和控制命令。这些应用充分证明了 DDS 在机器人通信中的价值和重要性。
本文将带您全面深入地了解 DDS 的方方面面,从基础概念到技术架构,从 QoS 策略到实际应用,从安装配置到最佳实践。无论您是刚开始接触分布式通信的新手,还是希望深入了解 DDS 技术细节的资深开发者,都能从本文中获得有价值的知识和实践指导。我们将重点关注实用操作,提供详细的技术分析、代码示例和使用案例,帮助您在实际项目中快速应用 DDS 的强大功能。
DDS(Data Distribution Service,数据分发服务)是由 OMG(Object Management Group)制定的分布式实时通信中间件标准,旨在通过发布-订阅(Publish-Subscribe)模式实现高性能、可扩展的数据交换。DDS 的核心目标是提供一个标准化的、高性能的、可靠的实时数据分发平台,满足各种分布式应用对实时通信的需求。
DDS 的历史发展
DDS 标准的开发始于 2001 年,OMG 于 2004 年发布了 DDS 1.0 版本。随后,DDS 标准经历了多次更新和完善:
DDS 标准主要由四个部分组成:
DDS 的定位
在分布式通信生态系统中,DDS 占据了一个独特的位置。它既不是简单的消息队列(如 RabbitMQ、Kafka),也不是传统的 RPC 框架(如 gRPC、Thrift),而是一个专门为实时、可靠、可扩展的数据分发而设计的中间件标准。DDS 特别适合以下应用场景:
DDS 的应用领域
DDS 在多个领域得到了广泛应用:
DDS 遵循几个核心设计理念,这些理念指导了其架构设计和功能实现:
1. 以数据为中心(Data-Centric)
DDS 采用以数据为中心的架构,将数据作为系统的核心,而不是以节点或服务为中心。在这种架构中,发布者发布数据到主题(Topic),订阅者订阅感兴趣的主题并接收数据。这种设计使得发布者和订阅者完全解耦,提高了系统的灵活性和可扩展性。
2. 发布-订阅模式(Publish-Subscribe)
DDS 采用发布-订阅通信模式,这是一种异步的、多对多的通信模式。发布者不需要知道订阅者的存在,订阅者也不需要知道发布者的位置,它们通过主题进行连接。这种模式非常适合分布式系统,因为它支持:
3. 服务质量保证(Quality of Service)
DDS 提供了丰富的 QoS 策略,允许开发者根据应用需求精确控制通信行为。这些 QoS 策略包括:
4. 实时性优先(Real-Time First)
DDS 的设计优先考虑实时性,通过优化的协议栈和 QoS 机制,确保数据传输的低延迟和可预测性。这对于对实时性要求极高的应用(如机器人控制、自动驾驶)至关重要。
5. 标准化与互操作性
DDS 是一个开放的标准,确保了不同实现之间的互操作性。这意味着使用不同 DDS 实现的系统可以相互通信,提高了系统的灵活性和可移植性。
理解 DDS,需要理解它与相关技术的关系和区别:
DDS vs ROS1
DDS vs MQTT
DDS vs gRPC
DDS 在 ROS2 中的作用
ROS2 将 DDS 作为其默认的通信中间件,这是 ROS2 相对于 ROS1 的重要改进之一。在 ROS2 中:
DCPS(Data-Centric Publish-Subscribe)是 DDS 的核心模型,定义了数据分发的抽象接口。理解 DCPS 模型是理解 DDS 的关键。
DCPS 的核心实体
DCPS 模型定义了以下核心实体:
域参与者(DomainParticipant)
发布者(Publisher)
订阅者(Subscriber)
主题(Topic)
数据写入者(DataWriter)
数据读取者(DataReader)
DCPS 的数据流
DCPS 模型的数据流如下:
发布流程:
write() 方法发布数据订阅流程:
read() 或 take() 方法接收数据发现流程:
RTPS(Real-Time Publish-Subscribe Protocol)是 DDS 的有线互操作协议,定义了数据在网络上传输的格式和行为。RTPS 协议确保了不同 DDS 实现之间的互操作性。
RTPS 协议栈
RTPS 协议栈包括以下层次:
RTPS 发现机制
RTPS 提供了去中心化的发现机制,使得节点可以自动发现和连接。发现机制包括两个阶段:
参与者发现(Participant Discovery)
端点发现(Endpoint Discovery)
RTPS 数据传输
RTPS 定义了数据传输的格式和流程:
域(Domain)的概念
DDS 域是一个逻辑上的通信边界,属于不同域的参与者不能直接通信。域的概念使得多个独立的 DDS 应用可以在同一个物理网络上运行,而不会相互干扰。
域参与者(DomainParticipant)
域参与者是应用程序进入 DDS 域的入口点,是 DDS 应用程序的基础实体。
域参与者的作用:
创建域参与者:
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
using namespace eprosima::fastdds::dds;
// 创建域参与者,域 ID 为 0
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(
0, // 域 ID
PARTICIPANT_QOS_DEFAULT, // QoS 策略
nullptr // 监听器
);
多域通信
虽然不同域的参与者不能直接通信,但可以通过以下方式实现跨域通信:
QoS(Quality of Service)是 DDS 的核心特性之一,它允许开发者根据应用需求精确控制通信行为。DDS 提供了丰富的 QoS 策略,每个策略都可以独立配置,使得系统可以针对不同的应用场景进行优化。
QoS 的重要性
QoS 策略的重要性体现在:
QoS 策略的分类
DDS 的 QoS 策略可以分为以下几类:
QoS 策略的作用范围
QoS 策略可以在多个层次配置:
QoS 策略的匹配
DDS 使用 QoS 策略匹配来决定发布者和订阅者是否可以建立连接。只有当发布者和订阅者的 QoS 策略兼容时,它们才能建立连接。QoS 策略的匹配规则:
可靠性 QoS 策略控制数据传输的可靠性,决定数据是否保证被成功传输。
可靠性模式
DDS 提供了两种可靠性模式:
BEST_EFFORT(尽力而为)
RELIABLE(可靠)
可靠性配置
#include <fastdds/dds/publisher/DataWriter.hpp>
#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>
using namespace eprosima::fastdds::dds;
// 创建数据写入者 QoS
DataWriterQos writer_qos;
writer_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
// 创建数据写入者
DataWriter* writer = publisher->create_datawriter(
topic,
writer_qos,
nullptr
);
可靠性匹配规则
持久性 QoS 策略控制数据的持久化级别,决定新加入的订阅者是否能够接收到历史数据。
持久性模式
DDS 提供了四种持久性模式:
VOLATILE(易失)
TRANSIENT_LOCAL(瞬态本地)
TRANSIENT(瞬态)
PERSISTENT(持久)
持久性配置
#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>
using namespace eprosima::fastdds::dds;
// 配置为瞬态本地持久性
DataWriterQos writer_qos;
writer_qos.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS;
// 配置历史记录深度(保留最新的 10 个样本)
writer_qos.history().kind = KEEP_LAST_HISTORY_QOS;
writer_qos.history().depth = 10;
持久性匹配规则
历史记录 QoS 策略控制 DDS 缓存保留多少历史样本。
历史记录模式
DDS 提供了两种历史记录模式:
KEEP_LAST(保留最新的 N 个)
KEEP_ALL(保留所有)
历史记录配置
#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>
using namespace eprosima::fastdds::dds;
// 配置历史记录策略
DataWriterQos writer_qos;
// 保留最新的 10 个样本
writer_qos.history().kind = KEEP_LAST_HISTORY_QOS;
writer_qos.history().depth = 10;
// 或者保留所有样本
writer_qos.history().kind = KEEP_ALL_HISTORY_QOS;
历史记录与持久性的关系
期限 QoS 策略规定发布者发布数据的周期,订阅者可以监控发布者是否按期发布数据。
期限的作用
期限配置
#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>
#include <fastrtps/utils/Time.hpp>
using namespace eprosima::fastdds::dds;
using namespace eprosima::fastrtps;
// 配置期限为 100 毫秒
DataWriterQos writer_qos;
writer_qos.deadline().period.seconds = 0;
writer_qos.deadline().period.nanosec = 100000000; // 100ms
期限监控
订阅者可以监听期限违约事件:
#include <fastdds/dds/subscriber/DataReaderListener.hpp>
class MyListener : public DataReaderListener {
public:
void on_requested_deadline_missed(
DataReader* reader,
const RequestedDeadlineMissedStatus& status) override {
// 处理期限违约
std::cout << "Deadline missed!" << std::endl;
}
};
延迟预算 QoS 策略提示数据传输的期望延迟,有助于中间件进行优化。
延迟预算的作用
延迟预算配置
#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>
using namespace eprosima::fastdds::dds;
// 配置延迟预算为 50 毫秒
DataWriterQos writer_qos;
writer_qos.latency_budget().duration.seconds = 0;
writer_qos.latency_budget().duration.nanosec = 50000000; // 50ms
存活性 QoS 策略用于监测发布者或订阅者的活跃状态,确保通信双方的存活性。
存活性模式
DDS 提供了三种存活性模式:
AUTOMATIC(自动)
MANUAL_BY_PARTICIPANT(按参与者手动)
MANUAL_BY_TOPIC(按主题手动)
存活性配置
#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>
using namespace eprosima::fastdds::dds;
// 配置存活性策略
DataWriterQos writer_qos;
writer_qos.liveliness().kind = AUTOMATIC_LIVELINESS_QOS;
writer_qos.liveliness().lease_duration.seconds = 5; // 5 秒租约
存活性监控
订阅者可以监听存活性变化事件:
#include <fastdds/dds/subscriber/DataReaderListener.hpp>
class MyListener : public DataReaderListener {
public:
void on_liveliness_changed(
DataReader* reader,
const LivelinessChangedStatus& status) override {
// 处理存活性变化
std::cout << "Liveliness changed!" << std::endl;
}
};
除了上述 QoS 策略,DDS 还提供了其他重要的 QoS 策略:
资源限制(ResourceLimits)
控制资源的分配和使用:
DataWriterQos writer_qos;
writer_qos.resource_limits().max_samples = 1000;
writer_qos.resource_limits().max_instances = 10;
writer_qos.resource_limits().max_samples_per_instance = 100;
所有权(Ownership)
控制数据的所有权:
DataWriterQos writer_qos;
writer_qos.ownership().kind = EXCLUSIVE_OWNERSHIP_QOS;
writer_qos.ownership_strength().value = 10;
时间过滤(TimeBasedFilter)
根据时间过滤数据:
DataReaderQos reader_qos;
reader_qos.time_based_filter().minimum_separation.seconds = 1;
reader_qos.time_based_filter().minimum_separation.nanosec = 0;
ROS2(Robot Operating System 2)将 DDS 作为其默认的通信中间件,这是 ROS2 相对于 ROS1 的重要改进之一。
ROS2 的 DDS 中间件
ROS2 支持多种 DDS 实现,开发者可以根据需求选择合适的实现:
Fast DDS(eProsima)
Cyclone DDS(Eclipse)
RTI Connext DDS
OpenSplice DDS
ROS2 与 DDS 的集成
在 ROS2 中,DDS 提供了底层的通信能力:
ROS2 QoS 配置示例
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
using namespace std::chrono_literals;
class PublisherNode : public rclcpp::Node {
public:
PublisherNode() : Node("publisher_node") {
// 配置 QoS
rclcpp::QoS qos(10);
qos.reliability(RMW_QOS_POLICY_RELIABILITY_RELIABLE);
qos.durability(RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL);
// 创建发布者
publisher_ = this->create_publisher<std_msgs::msg::String>(
"topic", qos);
// 定时发布
timer_ = this->create_wall_timer(
500ms, std::bind(&PublisherNode::timer_callback, this));
}
private:
void timer_callback() {
auto message = std_msgs::msg::String();
message.data = "Hello, ROS2!";
publisher_->publish(message);
}
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
rclcpp::TimerBase::SharedPtr timer_;
};
ROS2 DDS 的优势
Unitree SDK2 采用 DDS 作为其通信基础,为机器人控制提供了低延迟、高可靠性的通信保障。
SDK2 的 DDS 通信架构
Unitree SDK2 使用 DDS 实现以下通信模式:
SDK2 的 DDS 实现
SDK2 基于 DDS 实现了分层的 API:
SDK2 DDS 通信示例
#include <unitree/robot/channel/channel_publisher.hpp>
#include <unitree/common/time/time_tool.hpp>
int main() {
// 创建发布者
unitree::robot::ChannelPublisher<MyDataType> publisher;
publisher.Init("my_topic_name");
// 准备数据
MyDataType data;
data.value = 123;
data.timestamp = unitree::common::TimeTool::GetCurrentTime();
// 发布数据
publisher.Write(data);
return 0;
}
SDK2 DDS 的优势
在自动驾驶领域,DDS 被广泛应用于传感器数据分发、控制命令传输和多 ECU 通信。
传感器数据分发
自动驾驶系统需要处理来自多种传感器的数据:
DDS 通过发布-订阅模式,使得这些传感器数据可以在不同的 ECU 之间高效分发。
控制命令传输
自动驾驶系统需要将控制命令传输到执行器:
DDS 的可靠性 QoS 确保了这些关键控制命令的可靠传输。
多 ECU 通信
自动驾驶系统包含多个 ECU(Electronic Control Unit),DDS 提供了高效的跨 ECU 通信能力:
自动驾驶 DDS 应用案例
超过 200 个车型项目采用了 DDS,包括:
工业机器人
在工业机器人系统中,DDS 用于:
无人机系统
在无人机系统中,DDS 用于:
多机器人协作
在多机器人协作系统中,DDS 用于:
DDS 是一个标准,有多种实现可供选择。选择合适的实现对于项目的成功至关重要。
主要 DDS 实现对比
Fast DDS(eProsima)
Cyclone DDS(Eclipse)
RTI Connext DDS
OpenSplice DDS
选择建议
系统要求
Ubuntu 安装
# 安装依赖
sudo apt update
sudo apt install -y \
cmake \
g++ \
libasio-dev \
libtinyxml2-dev \
libssl-dev \
libp11-kit-dev \
libengine-pkcs11-openssl
# 从源码编译安装
git clone https://github.com/eProsima/Fast-DDS.git
cd Fast-DDS
mkdir build && cd build
cmake ..
make -j$(nproc)
sudo make install
ROS2 中的 Fast DDS
如果使用 ROS2,Fast DDS 通常已经包含在 ROS2 安装中:
# 安装 ROS2(包含 Fast DDS)
sudo apt install ros-humble-desktop
# 设置环境变量使用 Fast DDS
export RMW_IMPLEMENTATION=rmw_fastrtps_cpp
配置文件
Fast DDS 使用 XML 配置文件进行配置:
<?xml version="1.0" encoding="UTF-8" ?>
<profiles xmlns="http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles">
<participant profile_name="default">
<rtps>
<builtin>
<discovery_config>
<discoveryProtocol>SIMPLE</discoveryProtocol>
</discovery_config>
</builtin>
</rtps>
</participant>
</profiles>
C++ 发布者示例
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/publisher/Publisher.hpp>
#include <fastdds/dds/publisher/DataWriter.hpp>
#include <fastdds/dds/topic/Topic.hpp>
#include <iostream>
#include <thread>
#include <chrono>
using namespace eprosima::fastdds::dds;
// 定义数据类型
struct HelloWorld {
uint32_t index;
std::string message;
HelloWorld() : index(0) {}
HelloWorld(uint32_t i, const std::string& msg)
: index(i), message(msg) {}
};
int main() {
// 创建域参与者
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(
0, PARTICIPANT_QOS_DEFAULT);
if (participant == nullptr) {
std::cerr << "Failed to create participant" << std::endl;
return 1;
}
// 注册数据类型(简化示例,实际需要使用 IDL 生成类型支持)
// TypeSupport type(new HelloWorldPubSubType());
// type.register_type(participant);
// 创建发布者
Publisher* publisher = participant->create_publisher(
PUBLISHER_QOS_DEFAULT, nullptr);
if (publisher == nullptr) {
std::cerr << "Failed to create publisher" << std::endl;
return 1;
}
// 创建主题
Topic* topic = participant->create_topic(
"HelloWorldTopic",
"HelloWorld",
TOPIC_QOS_DEFAULT);
if (topic == nullptr) {
std::cerr << "Failed to create topic" << std::endl;
return 1;
}
// 创建数据写入者
DataWriter* writer = publisher->create_datawriter(
topic,
DATAWRITER_QOS_DEFAULT,
nullptr);
if (writer == nullptr) {
std::cerr << "Failed to create datawriter" << std::endl;
return 1;
}
// 发布数据
for (uint32_t i = 0; i < 10; ++i) {
HelloWorld msg(i, "Hello, DDS!");
// 写入数据(实际需要使用类型支持)
// writer->write(&msg);
std::cout << "Published: index=" << msg.index
<< ", message=" << msg.message << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
// 清理资源
publisher->delete_datawriter(writer);
participant->delete_publisher(publisher);
participant->delete_topic(topic);
DomainParticipantFactory::get_instance()->delete_participant(participant);
return 0;
}
C++ 订阅者示例
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/subscriber/Subscriber.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/topic/Topic.hpp>
#include <iostream>
#include <thread>
#include <chrono>
using namespace eprosima::fastdds::dds;
// 定义数据类型(与发布者相同)
struct HelloWorld {
uint32_t index;
std::string message;
};
int main() {
// 创建域参与者
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(
0, PARTICIPANT_QOS_DEFAULT);
// 创建订阅者
Subscriber* subscriber = participant->create_subscriber(
SUBSCRIBER_QOS_DEFAULT, nullptr);
// 创建主题
Topic* topic = participant->create_topic(
"HelloWorldTopic",
"HelloWorld",
TOPIC_QOS_DEFAULT);
// 创建数据读取者
DataReader* reader = subscriber->create_datareader(
topic,
DATAREADER_QOS_DEFAULT,
nullptr);
// 读取数据
for (int i = 0; i < 10; ++i) {
// 实际需要使用类型支持读取数据
// HelloWorld msg;
// if (reader->take_next_sample(&msg, &info) == ReturnCode_t::RETCODE_OK) {
// std::cout << "Received: index=" << msg.index
// << ", message=" << msg.message << std::endl;
// }
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// 清理资源
subscriber->delete_datareader(reader);
participant->delete_subscriber(subscriber);
participant->delete_topic(topic);
DomainParticipantFactory::get_instance()->delete_participant(participant);
return 0;
}
Python 示例
import fastdds
import time
# 创建域参与者
factory = fastdds.DomainParticipantFactory.get_instance()
participant = factory.create_participant(0, fastdds.PARTICIPANT_QOS_DEFAULT)
# 创建发布者
publisher = participant.create_publisher(fastdds.PUBLISHER_QOS_DEFAULT)
# 创建主题
topic = participant.create_topic(
"HelloWorldTopic",
"HelloWorld",
fastdds.TOPIC_QOS_DEFAULT)
# 创建数据写入者
writer = publisher.create_datawriter(topic, fastdds.DATAWRITER_QOS_DEFAULT)
# 发布数据
for i in range(10):
# 实际需要使用 IDL 生成的数据类型
# msg = HelloWorld()
# msg.index = i
# msg.message = "Hello, DDS!"
# writer.write(msg)
print(f"Published: index={i}")
time.sleep(0.5)
# 清理资源
publisher.delete_datawriter(writer)
participant.delete_publisher(publisher)
participant.delete_topic(topic)
factory.delete_participant(participant)
自定义 QoS 配置
#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>
using namespace eprosima::fastdds::dds;
// 创建自定义 QoS
DataWriterQos writer_qos;
// 配置可靠性
writer_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
// 配置持久性
writer_qos.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS;
// 配置历史记录
writer_qos.history().kind = KEEP_LAST_HISTORY_QOS;
writer_qos.history().depth = 10;
// 配置期限
writer_qos.deadline().period.seconds = 0;
writer_qos.deadline().period.nanosec = 100000000; // 100ms
// 使用自定义 QoS 创建数据写入者
DataWriter* writer = publisher->create_datawriter(
topic,
writer_qos,
nullptr);
多主题通信
// 创建多个主题
Topic* topic1 = participant->create_topic("Topic1", "Type1", TOPIC_QOS_DEFAULT);
Topic* topic2 = participant->create_topic("Topic2", "Type2", TOPIC_QOS_DEFAULT);
// 为每个主题创建数据写入者
DataWriter* writer1 = publisher->create_datawriter(topic1, DATAWRITER_QOS_DEFAULT, nullptr);
DataWriter* writer2 = publisher->create_datawriter(topic2, DATAWRITER_QOS_DEFAULT, nullptr);
// 发布到不同主题
// writer1->write(&data1);
// writer2->write(&data2);
错误处理
#include <fastdds/dds/core/status/StatusMask.hpp>
// 创建监听器处理错误
class MyListener : public DataWriterListener {
public:
void on_offered_deadline_missed(
DataWriter* writer,
const OfferedDeadlineMissedStatus& status) override {
std::cerr << "Deadline missed!" << std::endl;
}
void on_offered_incompatible_qos(
DataWriter* writer,
const OfferedIncompatibleQosStatus& status) override {
std::cerr << "Incompatible QoS!" << std::endl;
}
};
// 使用监听器
MyListener listener;
DataWriter* writer = publisher->create_datawriter(
topic,
DATAWRITER_QOS_DEFAULT,
&listener);
架构对比
| 特性 | ROS1 | DDS |
|---|---|---|
| 通信模式 | 节点-话题 | 发布-订阅 |
| 中央节点 | 需要 roscore | 不需要 |
| 发现机制 | 中央注册 | 去中心化自动发现 |
| 实时性 | 中等 | 高 |
| 可靠性 | TCP/UDP | 可配置 QoS |
| 跨语言 | 有限 | 广泛支持 |
性能对比
适用场景
互操作性
ROS2 基于 DDS,可以通过桥接节点实现 ROS1 和 ROS2 之间的通信。
协议对比
| 特性 | MQTT | DDS |
|---|---|---|
| 通信模式 | 客户端-服务器 | 发布-订阅(去中心化) |
| 消息代理 | 需要 Broker | 不需要 |
| QoS 级别 | 3 个简单级别 | 丰富的 QoS 策略 |
| 实时性 | 中等 | 高 |
| 资源消耗 | 低 | 中等 |
| 适用场景 | IoT、轻量级应用 | 高性能实时应用 |
QoS 对比
适用场景
通信模式对比
| 特性 | gRPC | DDS |
|---|---|---|
| 通信模式 | 客户端-服务器 | 发布-订阅 |
| 数据交换 | 请求-响应 | 持续数据流 |
| 协议 | HTTP/2 | RTPS |
| 实时性 | 中等 | 高 |
| 适用场景 | 微服务、API 调用 | 实时数据分发 |
性能对比
适用场景
消息模式对比
| 特性 | ZeroMQ | DDS |
|---|---|---|
| 消息模式 | 多种模式 | 发布-订阅 |
| 发现机制 | 手动配置 | 自动发现 |
| QoS 支持 | 有限 | 丰富 |
| 标准化 | 非标准 | OMG 标准 |
| 适用场景 | 通用消息传递 | 实时数据分发 |
性能对比
适用场景
不同应用场景的 QoS 配置
实时传感器数据流
DataWriterQos qos;
qos.reliability().kind = BEST_EFFORT_RELIABILITY_QOS;
qos.durability().kind = VOLATILE_DURABILITY_QOS;
qos.history().kind = KEEP_LAST_HISTORY_QOS;
qos.history().depth = 1;
控制命令
DataWriterQos qos;
qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
qos.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS;
qos.history().kind = KEEP_LAST_HISTORY_QOS;
qos.history().depth = 10;
状态信息
DataWriterQos qos;
qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
qos.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS;
qos.history().kind = KEEP_LAST_HISTORY_QOS;
qos.history().depth = 1;
配置数据
DataWriterQos qos;
qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
qos.durability().kind = PERSISTENT_DURABILITY_QOS;
qos.history().kind = KEEP_ALL_HISTORY_QOS;
QoS 匹配原则
减少延迟
提高吞吐量
资源管理
常见问题
连接失败
数据丢失
延迟过高
调试工具
fastdds discovery、fastdds shm 等性能分析
// 启用统计信息
DataWriterQos qos;
qos.publish_mode().kind = ASYNCHRONOUS_PUBLISH_MODE;
qos.writer_data_lifecycle().autodispose_unregistered_instances = false;
// 获取统计信息
PublicationMatchedStatus status;
writer->get_publication_matched_status(status);
std::cout << "Matched subscribers: " << status.current_count << std::endl;
DDS 安全扩展
DDS 提供了安全扩展(DDS Security),包括:
网络安全
数据加密
// 配置加密(需要 DDS Security)
DataWriterQos qos;
// 配置安全属性
// qos.properties().properties().push_back(...);
DDS 作为工业级的实时数据分发标准,具有以下主要优势:
1. 标准化
DDS 是 OMG 制定的开放标准,确保了不同实现之间的互操作性。这使得系统可以灵活地选择最适合的 DDS 实现,也可以在不同实现之间切换。
2. 高性能
DDS 通过优化的协议栈和 QoS 机制,提供了低延迟、高吞吐量的通信性能。这对于对实时性要求极高的应用(如机器人控制、自动驾驶)至关重要。
3. 丰富的 QoS 策略
DDS 提供了丰富的 QoS 策略,允许开发者根据应用需求精确控制通信行为。这些 QoS 策略包括可靠性、持久性、历史记录、期限、延迟预算、存活性等。
4. 去中心化架构
DDS 采用去中心化的发布-订阅架构,不需要中央节点。这种设计提高了系统的可靠性和可扩展性,也简化了系统的部署和维护。
5. 自动发现
DDS 提供了自动发现机制,节点可以自动发现和连接,不需要手动配置。这简化了分布式系统的构建和维护。
6. 跨平台支持
DDS 支持多种操作系统和编程语言,包括 Linux、Windows、macOS 和 C++、Java、Python、C# 等。
尽管 DDS 具有许多优势,但也存在一些局限性:
1. 复杂性
DDS 的概念和 API 相对复杂,学习曲线较陡。特别是 QoS 策略的配置和理解需要一定的经验。
2. 资源消耗
DDS 的资源消耗相对较高,特别是在使用可靠性和持久性 QoS 时。这对于资源受限的环境可能不太适合。
3. 生态系统
虽然 DDS 在特定领域(如机器人、自动驾驶)有广泛应用,但相比一些更通用的消息中间件(如 MQTT),DDS 的生态系统相对较小。
4. 文档和社区
虽然主要的 DDS 实现都有文档和社区支持,但相比一些更成熟的技术,DDS 的文档和社区资源相对较少。
DDS 技术正在不断发展,未来的发展方向包括:
1. 性能优化
DDS 实现正在不断优化性能,包括减少延迟、提高吞吐量、降低资源消耗等。
2. 功能扩展
DDS 标准正在扩展新的功能,包括更好的安全支持、更多的 QoS 策略、更好的工具支持等。
3. 易用性改进
DDS 实现正在改进易用性,包括更简单的 API、更好的文档、更多的示例代码等。
4. 云原生支持
DDS 正在向云原生方向发展,支持容器化部署、微服务架构等。
5. 边缘计算
DDS 在边缘计算领域的应用正在增加,包括 IoT、工业 4.0 等。
官方文档
教程和示例
社区和论坛
书籍
DDS 作为工业级的实时数据分发标准,在机器人、自动驾驶、航空航天等领域发挥着重要作用。通过本文的全面介绍,我们希望您能够深入理解 DDS 的核心概念、技术架构、QoS 策略和实际应用。
无论您是刚开始接触 DDS 的新手,还是希望深入了解 DDS 技术细节的资深开发者,DDS 都为您提供了一个强大、灵活、可靠的实时通信平台。通过合理使用 DDS 的 QoS 策略和最佳实践,您可以构建出高性能、高可靠性的分布式实时系统。
随着 DDS 技术的不断发展和完善,我们相信 DDS 将在更多领域得到应用,为分布式实时系统的发展做出更大的贡献。希望本文能够帮助您在 DDS 的学习和应用中取得成功。
发表评论
请登录后发表评论
评论 (0)