当前位置 : 主页 > 编程语言 > c语言 >

[FASTDDS]04-hello_fastdds程序分解

来源:互联网 收集:自由互联 发布时间:2023-09-03
[FASTDDS]04-hello_fastdds程序分解 在之前的章节我们介绍了如何搭建 fastdds 编译环境,以及使用 fastgen 生成 demo 代码。这里我们将在前面的基础上,参考 fastdds 官方的另外一份 Writing a simpl

[FASTDDS]04-hello_fastdds程序分解

在之前的章节我们介绍了如何搭建fastdds编译环境,以及使用fastgen生成demo代码。这里我们将在前面的基础上,参考fastdds官方的另外一份 Writing a simple C++ publisher and subscriber application,来对fastdds的发布者和订阅者代码进行分解。

生成工程代码

我们依旧使用fastgen生成要发送消息的相关接口,如下:

// HelloWorld.idl
struct HelloWorld
{
    unsigned long index;
    string message;
};
<path/to/Fast DDS-Gen>/scripts/fastddsgen HelloWorld.idl

将会至少生成以下文件:

  • HelloWorld.cxx: HelloWorld 类型的定义.
  • HelloWorld.h: HelloWorld.cxx 的头文件.
  • HelloWorldPubSubTypes.cxx: HelloWorld 类型序列化和反序列化的实现.
  • HelloWorldPubSubTypes.h: HelloWorldPubSubTypes.cxx 头文件.

发布者代码分解

可以通过以下命令获取发布者的示例代码:

wget -O HelloWorldPublisher.cpp \
    https://raw.githubusercontent.com/eProsima/Fast-RTPS-docs/master/code/Examples/C++/DDSHelloWorld/src/HelloWorldPublisher.cpp
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
 * @file HelloWorldPublisher.cpp
 *
 */

#include "HelloWorldPubSubTypes.h"

#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastdds/dds/publisher/Publisher.hpp>
#include <fastdds/dds/publisher/DataWriter.hpp>
#include <fastdds/dds/publisher/DataWriterListener.hpp>

using namespace eprosima::fastdds::dds;

class HelloWorldPublisher
{
private:
    HelloWorld hello_;                    // 要发送的数据结构
    DomainParticipant* participant_;      // domain 参与者,暂未知道该对象的作用
    Publisher* publisher_;                // 发布监听者,用于判断是否可以进行发布。根据示例代码,是注册一个回调函数,当可以进行发布的时候进行回调,在该回调用我们可以使用自定义的方式来通知发布者
    Topic* topic_;                        // topic,目前看,topic是以字符串定义名字进行区分的
    DataWriter* writer_;                  // 发布者发布数据时使用
    TypeSupport type_;                    // 用于注册helloworld的序列化类

    class PubListener : public DataWriterListener
    {
    public:

        PubListener()
            : matched_(0)
        {
        }

        ~PubListener() override
        {
        }

        // dds在协议栈准备好写条件的时候,会通过该回调进行通知
        void on_publication_matched(
                DataWriter*,
                const PublicationMatchedStatus& info) override
        {
            if (info.current_count_change == 1)
            {
                matched_ = info.total_count;
                std::cout << "Publisher matched." << std::endl;
            }
            else if (info.current_count_change == -1)
            {
                matched_ = info.total_count;
                std::cout << "Publisher unmatched." << std::endl;
            }
            else
            {
                std::cout << info.current_count_change
                        << " is not a valid value for PublicationMatchedStatus current count change." << std::endl;
            }
        }

        std::atomic_int matched_;// 原子变量,用于同步状态。发布者会根据这跟状态来判断是否可以发布信息。这里只是示例,也可以用其他方式进行同步。

    } listener_;

public:

    HelloWorldPublisher()
        : participant_(nullptr)
        , publisher_(nullptr)
        , topic_(nullptr)
        , writer_(nullptr)
        , type_(new HelloWorldPubSubType())
    {
    }

    virtual ~HelloWorldPublisher()
    {
        if (writer_ != nullptr)
        {
            publisher_->delete_datawriter(writer_);
        }
        if (publisher_ != nullptr)
        {
            participant_->delete_publisher(publisher_);
        }
        if (topic_ != nullptr)
        {
            participant_->delete_topic(topic_);
        }
        DomainParticipantFactory::get_instance()->delete_participant(participant_);
    }

    //!Initialize the publisher
    bool init()
    {
        hello_.index(0);
        hello_.message("HelloWorld");

        DomainParticipantQos participantQos; // Qos
        participantQos.name("Participant_publisher");
        participant_ = DomainParticipantFactory::get_instance()->create_participant(0, participantQos);// 创建 domain 的 participant

        if (participant_ == nullptr)
        {
            return false;
        }

        // Register the Type
        type_.register_type(participant_);

        // Create the publications Topic
        topic_ = participant_->create_topic("HelloWorldTopic", "HelloWorld", TOPIC_QOS_DEFAULT);// 创建 topic,参数:topicname,typename,qos

        if (topic_ == nullptr)
        {
            return false;
        }

        // Create the Publisher
        publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT, nullptr);

        if (publisher_ == nullptr)
        {
            return false;
        }

        // Create the DataWriter
        writer_ = publisher_->create_datawriter(topic_, DATAWRITER_QOS_DEFAULT, &listener_);

        if (writer_ == nullptr)
        {
            return false;
        }
        return true;
    }

    //!Send a publication
    bool publish()
    {
        if (listener_.matched_ > 0)
        {
            hello_.index(hello_.index() + 1);
            writer_->write(&hello_);
            return true;
        }
        return false;
    }

    //!Run the Publisher
    void run(
            uint32_t samples)
    {
        uint32_t samples_sent = 0;
        while (samples_sent < samples)
        {
            if (publish())
            {
                samples_sent++;
                std::cout << "Message: " << hello_.message() << " with index: " << hello_.index()
                            << " SENT" << std::endl;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }
    }
};

int main(
        int argc,
        char** argv)
{
    std::cout << "Starting publisher." << std::endl;
    int samples = 10;

    HelloWorldPublisher* mypub = new HelloWorldPublisher();
    if(mypub->init())
    {
        mypub->run(static_cast<uint32_t>(samples));
    }

    delete mypub;
    return 0;
}

从代码看,要发布一个消息,需要满足以下条件:

    1. 创建要发布数据的idl,并通过fast-gen生成对应的数据结构,并生成该数据结构序列化的类
    1. 通过DomainParticipantFactory创建participant,参数为domain idqos
    1. participant注册到type,该类型提供helloworld的序列化支持
    1. 通过participant创建publisher_,用于发布数据
    1. PubListener继承于DataWriterListener,用于给发布者注册消息通知函数。DDS通过服务发现,将消息最终给到应用
    1. publisher_创建writer_,用于最终的消息发布

相关类的基本功能如下:

  • DomainParticipantFactory:创建和销毁DomainParticipant对象
  • DomainParticipant:作为其他所有实例对象的容器,并扮演PublisherSubscriberTopic对象的工厂
  • TypeSupport:通过注册topic对应的数据结构,提供序列化以及获取指定数据类型的key支持
  • Publisher:负责创建DataWriters.
  • DataWriter:为应用提供通过指定topic发布数据的能力
  • DataWriterListener:提供重写DataWriterListener的接口,子类可以在on_publication_matched中通过传递的PublicationMatchedStatus来判断是否满足发布条件

以上功能依赖dds的以下库:

  • fastcdr:提供序列化功能
  • fastrtps:提供数据交互功能

订阅者代码分解

wget -O HelloWorldSubscriber.cpp \
    https://raw.githubusercontent.com/eProsima/Fast-RTPS-docs/master/code/Examples/C++/DDSHelloWorld/src/HelloWorldSubscriber.cpp
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
 * @file HelloWorldSubscriber.cpp
 *
 */

#include "HelloWorldPubSubTypes.h"

#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastdds/dds/subscriber/Subscriber.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/subscriber/DataReaderListener.hpp>
#include <fastdds/dds/subscriber/qos/DataReaderQos.hpp>
#include <fastdds/dds/subscriber/SampleInfo.hpp>

using namespace eprosima::fastdds::dds;

class HelloWorldSubscriber
{
private:
    DomainParticipant* participant_;
    Subscriber* subscriber_;
    DataReader* reader_;
    Topic* topic_;
    TypeSupport type_;

    class SubListener : public DataReaderListener
    {
    public:
        SubListener()
            : samples_(0)
        {
        }

        ~SubListener() override
        {
        }

        void on_subscription_matched(
                DataReader*,
                const SubscriptionMatchedStatus& info) override
        {
            if (info.current_count_change == 1)
            {
                std::cout << "Subscriber matched." << std::endl;
            }
            else if (info.current_count_change == -1)
            {
                std::cout << "Subscriber unmatched." << std::endl;
            }
            else
            {
                std::cout << info.current_count_change
                        << " is not a valid value for SubscriptionMatchedStatus current count change" << std::endl;
            }
        }

        void on_data_available(
                DataReader* reader) override
        {
            SampleInfo info;
            if (reader->take_next_sample(&hello_, &info) == ReturnCode_t::RETCODE_OK)
            {
                if (info.valid_data)
                {
                    samples_++;
                    std::cout << "Message: " << hello_.message() << " with index: " << hello_.index()
                                << " RECEIVED." << std::endl;
                }
            }
        }

        HelloWorld hello_;
        std::atomic_int samples_;
    } listener_;

public:

    HelloWorldSubscriber()
        : participant_(nullptr)
        , subscriber_(nullptr)
        , topic_(nullptr)
        , reader_(nullptr)
        , type_(new HelloWorldPubSubType())
    {
    }

    virtual ~HelloWorldSubscriber()
    {
        if (reader_ != nullptr)
        {
            subscriber_->delete_datareader(reader_);
        }
        if (topic_ != nullptr)
        {
            participant_->delete_topic(topic_);
        }
        if (subscriber_ != nullptr)
        {
            participant_->delete_subscriber(subscriber_);
        }
        DomainParticipantFactory::get_instance()->delete_participant(participant_);
    }

    //!Initialize the subscriber
    bool init()
    {
        DomainParticipantQos participantQos;
        participantQos.name("Participant_subscriber");
        participant_ = DomainParticipantFactory::get_instance()->create_participant(0, participantQos);

        if (participant_ == nullptr)
        {
            return false;
        }

        // Register the Type
        type_.register_type(participant_);

        // Create the subscriptions Topic
        topic_ = participant_->create_topic("HelloWorldTopic", "HelloWorld", TOPIC_QOS_DEFAULT);

        if (topic_ == nullptr)
        {
            return false;
        }

        // Create the Subscriber
        subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT, nullptr);

        if (subscriber_ == nullptr)
        {
            return false;
        }

        // Create the DataReader
        reader_ = subscriber_->create_datareader(topic_, DATAREADER_QOS_DEFAULT, &listener_);

        if (reader_ == nullptr)
        {
            return false;
        }

        return true;
    }

    //!Run the Subscriber
    void run(
        uint32_t samples)
    {
        while(listener_.samples_ < samples)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }
};

int main(
        int argc,
        char** argv)
{
    std::cout << "Starting subscriber." << std::endl;
    int samples = 10;

    HelloWorldSubscriber* mysub = new HelloWorldSubscriber();
    if(mysub->init())
    {
        mysub->run(static_cast<uint32_t>(samples));
    }

    delete mysub;
    return 0;
}

DomainParticipantFactoryDomainParticipantTypeSupport是发布者和订阅者都会使用的类。和发布者不一样的是,订阅者将Publisher改为了Subscriber,将DataWriter改为了DataReader,将DataWriterListener改为了DataReaderListener、将DataWriterQos该为了DataReaderQos

  • Subscriber:负责创建和配置DataReader
  • DataReader.:负责数据的实际接收。它注册与topic绑定的数据,该topic用于标记数据并可以读并访问订阅者接收的数据
  • DataReaderListener:这是分配给数据读取者的监听器
  • DataReaderQoS:定义DataReaderQos
  • SampleInfo:读取或者采集样本的附带信息
上一篇:遍历搜索方法
下一篇:没有了
网友评论