[FASTDDS]04-hello_fastdds程序分解 在之前的章节我们介绍了如何搭建 fastdds 编译环境,以及使用 fastgen 生成 demo 代码。这里我们将在前面的基础上,参考 fastdds 官方的另外一份 Writing a simpl
[FASTDDS]04-hello_fastdds程序分解
在之前的章节我们介绍了如何搭建
fastdds
编译环境,以及使用fastgen
生成demo
代码。这里我们将在前面的基础上,参考fastdds
官方的另外一份 Writing a simple C++ publisher and subscriber application,来对fastdds
的发布者和订阅者代码进行分解。
生成工程代码
我们依旧使用fastgen
生成要发送消息的相关接口,如下:
// HelloWorld.idl
struct HelloWorld
{
unsigned long index;
string message;
};
<path/to/Fast DDS-Gen>/scripts/fastddsgen HelloWorld.idl
将会至少生成以下文件:
- HelloWorld.cxx: HelloWorld 类型的定义.
- HelloWorld.h: HelloWorld.cxx 的头文件.
- HelloWorldPubSubTypes.cxx: HelloWorld 类型序列化和反序列化的实现.
- HelloWorldPubSubTypes.h: HelloWorldPubSubTypes.cxx 头文件.
发布者代码分解
可以通过以下命令获取发布者的示例代码:
wget -O HelloWorldPublisher.cpp \
https://raw.githubusercontent.com/eProsima/Fast-RTPS-docs/master/code/Examples/C++/DDSHelloWorld/src/HelloWorldPublisher.cpp
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/**
* @file HelloWorldPublisher.cpp
*
*/
#include "HelloWorldPubSubTypes.h"
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastdds/dds/publisher/Publisher.hpp>
#include <fastdds/dds/publisher/DataWriter.hpp>
#include <fastdds/dds/publisher/DataWriterListener.hpp>
using namespace eprosima::fastdds::dds;
class HelloWorldPublisher
{
private:
HelloWorld hello_; // 要发送的数据结构
DomainParticipant* participant_; // domain 参与者,暂未知道该对象的作用
Publisher* publisher_; // 发布监听者,用于判断是否可以进行发布。根据示例代码,是注册一个回调函数,当可以进行发布的时候进行回调,在该回调用我们可以使用自定义的方式来通知发布者
Topic* topic_; // topic,目前看,topic是以字符串定义名字进行区分的
DataWriter* writer_; // 发布者发布数据时使用
TypeSupport type_; // 用于注册helloworld的序列化类
class PubListener : public DataWriterListener
{
public:
PubListener()
: matched_(0)
{
}
~PubListener() override
{
}
// dds在协议栈准备好写条件的时候,会通过该回调进行通知
void on_publication_matched(
DataWriter*,
const PublicationMatchedStatus& info) override
{
if (info.current_count_change == 1)
{
matched_ = info.total_count;
std::cout << "Publisher matched." << std::endl;
}
else if (info.current_count_change == -1)
{
matched_ = info.total_count;
std::cout << "Publisher unmatched." << std::endl;
}
else
{
std::cout << info.current_count_change
<< " is not a valid value for PublicationMatchedStatus current count change." << std::endl;
}
}
std::atomic_int matched_;// 原子变量,用于同步状态。发布者会根据这跟状态来判断是否可以发布信息。这里只是示例,也可以用其他方式进行同步。
} listener_;
public:
HelloWorldPublisher()
: participant_(nullptr)
, publisher_(nullptr)
, topic_(nullptr)
, writer_(nullptr)
, type_(new HelloWorldPubSubType())
{
}
virtual ~HelloWorldPublisher()
{
if (writer_ != nullptr)
{
publisher_->delete_datawriter(writer_);
}
if (publisher_ != nullptr)
{
participant_->delete_publisher(publisher_);
}
if (topic_ != nullptr)
{
participant_->delete_topic(topic_);
}
DomainParticipantFactory::get_instance()->delete_participant(participant_);
}
//!Initialize the publisher
bool init()
{
hello_.index(0);
hello_.message("HelloWorld");
DomainParticipantQos participantQos; // Qos
participantQos.name("Participant_publisher");
participant_ = DomainParticipantFactory::get_instance()->create_participant(0, participantQos);// 创建 domain 的 participant
if (participant_ == nullptr)
{
return false;
}
// Register the Type
type_.register_type(participant_);
// Create the publications Topic
topic_ = participant_->create_topic("HelloWorldTopic", "HelloWorld", TOPIC_QOS_DEFAULT);// 创建 topic,参数:topicname,typename,qos
if (topic_ == nullptr)
{
return false;
}
// Create the Publisher
publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT, nullptr);
if (publisher_ == nullptr)
{
return false;
}
// Create the DataWriter
writer_ = publisher_->create_datawriter(topic_, DATAWRITER_QOS_DEFAULT, &listener_);
if (writer_ == nullptr)
{
return false;
}
return true;
}
//!Send a publication
bool publish()
{
if (listener_.matched_ > 0)
{
hello_.index(hello_.index() + 1);
writer_->write(&hello_);
return true;
}
return false;
}
//!Run the Publisher
void run(
uint32_t samples)
{
uint32_t samples_sent = 0;
while (samples_sent < samples)
{
if (publish())
{
samples_sent++;
std::cout << "Message: " << hello_.message() << " with index: " << hello_.index()
<< " SENT" << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
};
int main(
int argc,
char** argv)
{
std::cout << "Starting publisher." << std::endl;
int samples = 10;
HelloWorldPublisher* mypub = new HelloWorldPublisher();
if(mypub->init())
{
mypub->run(static_cast<uint32_t>(samples));
}
delete mypub;
return 0;
}
从代码看,要发布一个消息,需要满足以下条件:
-
- 创建要发布数据的
idl
,并通过fast-gen
生成对应的数据结构,并生成该数据结构序列化的类
- 创建要发布数据的
-
- 通过
DomainParticipantFactory
创建participant
,参数为domain id
和qos
- 通过
-
- 将
participant
注册到type
,该类型提供helloworld
的序列化支持
- 将
-
- 通过
participant
创建publisher_
,用于发布数据
- 通过
-
PubListener
继承于DataWriterListener
,用于给发布者注册消息通知函数。DDS
通过服务发现,将消息最终给到应用
-
publisher_
创建writer_
,用于最终的消息发布
相关类的基本功能如下:
- DomainParticipantFactory:创建和销毁
DomainParticipant
对象 - DomainParticipant:作为其他所有实例对象的容器,并扮演
Publisher
、Subscriber
、Topic
对象的工厂 - TypeSupport:通过注册
topic
对应的数据结构,提供序列化以及获取指定数据类型的key
支持 - Publisher:负责创建
DataWriters
. - DataWriter:为应用提供通过指定
topic
发布数据的能力 - DataWriterListener:提供重写
DataWriterListener
的接口,子类可以在on_publication_matched
中通过传递的PublicationMatchedStatus
来判断是否满足发布条件
以上功能依赖dds
的以下库:
fastcdr
:提供序列化功能fastrtps
:提供数据交互功能
订阅者代码分解
wget -O HelloWorldSubscriber.cpp \
https://raw.githubusercontent.com/eProsima/Fast-RTPS-docs/master/code/Examples/C++/DDSHelloWorld/src/HelloWorldSubscriber.cpp
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/**
* @file HelloWorldSubscriber.cpp
*
*/
#include "HelloWorldPubSubTypes.h"
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastdds/dds/subscriber/Subscriber.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/subscriber/DataReaderListener.hpp>
#include <fastdds/dds/subscriber/qos/DataReaderQos.hpp>
#include <fastdds/dds/subscriber/SampleInfo.hpp>
using namespace eprosima::fastdds::dds;
class HelloWorldSubscriber
{
private:
DomainParticipant* participant_;
Subscriber* subscriber_;
DataReader* reader_;
Topic* topic_;
TypeSupport type_;
class SubListener : public DataReaderListener
{
public:
SubListener()
: samples_(0)
{
}
~SubListener() override
{
}
void on_subscription_matched(
DataReader*,
const SubscriptionMatchedStatus& info) override
{
if (info.current_count_change == 1)
{
std::cout << "Subscriber matched." << std::endl;
}
else if (info.current_count_change == -1)
{
std::cout << "Subscriber unmatched." << std::endl;
}
else
{
std::cout << info.current_count_change
<< " is not a valid value for SubscriptionMatchedStatus current count change" << std::endl;
}
}
void on_data_available(
DataReader* reader) override
{
SampleInfo info;
if (reader->take_next_sample(&hello_, &info) == ReturnCode_t::RETCODE_OK)
{
if (info.valid_data)
{
samples_++;
std::cout << "Message: " << hello_.message() << " with index: " << hello_.index()
<< " RECEIVED." << std::endl;
}
}
}
HelloWorld hello_;
std::atomic_int samples_;
} listener_;
public:
HelloWorldSubscriber()
: participant_(nullptr)
, subscriber_(nullptr)
, topic_(nullptr)
, reader_(nullptr)
, type_(new HelloWorldPubSubType())
{
}
virtual ~HelloWorldSubscriber()
{
if (reader_ != nullptr)
{
subscriber_->delete_datareader(reader_);
}
if (topic_ != nullptr)
{
participant_->delete_topic(topic_);
}
if (subscriber_ != nullptr)
{
participant_->delete_subscriber(subscriber_);
}
DomainParticipantFactory::get_instance()->delete_participant(participant_);
}
//!Initialize the subscriber
bool init()
{
DomainParticipantQos participantQos;
participantQos.name("Participant_subscriber");
participant_ = DomainParticipantFactory::get_instance()->create_participant(0, participantQos);
if (participant_ == nullptr)
{
return false;
}
// Register the Type
type_.register_type(participant_);
// Create the subscriptions Topic
topic_ = participant_->create_topic("HelloWorldTopic", "HelloWorld", TOPIC_QOS_DEFAULT);
if (topic_ == nullptr)
{
return false;
}
// Create the Subscriber
subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT, nullptr);
if (subscriber_ == nullptr)
{
return false;
}
// Create the DataReader
reader_ = subscriber_->create_datareader(topic_, DATAREADER_QOS_DEFAULT, &listener_);
if (reader_ == nullptr)
{
return false;
}
return true;
}
//!Run the Subscriber
void run(
uint32_t samples)
{
while(listener_.samples_ < samples)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
};
int main(
int argc,
char** argv)
{
std::cout << "Starting subscriber." << std::endl;
int samples = 10;
HelloWorldSubscriber* mysub = new HelloWorldSubscriber();
if(mysub->init())
{
mysub->run(static_cast<uint32_t>(samples));
}
delete mysub;
return 0;
}
DomainParticipantFactory
、DomainParticipant
、TypeSupport
是发布者和订阅者都会使用的类。和发布者不一样的是,订阅者将Publisher
改为了Subscriber
,将DataWriter
改为了DataReader
,将DataWriterListener
改为了DataReaderListener
、将DataWriterQos
该为了DataReaderQos
。
- Subscriber:负责创建和配置
DataReader
- DataReader.:负责数据的实际接收。它注册与
topic
绑定的数据,该topic
用于标记数据并可以读并访问订阅者接收的数据 - DataReaderListener:这是分配给数据读取者的监听器
- DataReaderQoS:定义
DataReader
的Qos
- SampleInfo:读取或者采集样本的附带信息