当前位置 : 主页 > 网络安全 > 测试自动化 >

自动化RabbitMQ消费者测试

来源:互联网 收集:自由互联 发布时间:2021-06-19
我有一个.net微服务使用RabbitMQ客户端接收消息,我需要测试以下内容: 1- consumer已成功连接到rabbitMq主机. 2-消费者正在听队列. 3-消费者正在成功接收消息. 为了实现上述目标,我创建了一
我有一个.net微服务使用RabbitMQ客户端接收消息,我需要测试以下内容:

1- consumer已成功连接到rabbitMq主机.

2-消费者正在听队列.

3-消费者正在成功接收消息.

为了实现上述目标,我创建了一个发送消息的示例应用程序,我正在调试消费者以确保它正在接收消息.

如何自动完成此测试?因此将它包含在我的微服务CI中.

我正在考虑将我的示例应用程序包含在我的CI中,这样我就可以发出一条消息,然后运行一个消费者单元测试,等待特定时间然后通过,如果收到消息,但这似乎是一个错误的做法,因为测试将无法启动直到几秒钟消息被触发.

我想到的另一种方法是从单元测试本身触发示例应用程序,但如果示例应用程序无法工作,则会导致服务故障.

是否有通过RabbitMQ连接的微服务集成测试的最佳实践?

我已经建立了很多这样的测试.我已经抛出了一些基本代码
Github here with .NET Core 2.0.

您需要一个RabbitMQ集群来进行这些自动化测试.每个测试都从消除队列开始,以确保不存在任何消息.来自另一个测试的预先存在的消息将打破当前测试.

我有一个简单的帮助器来删除队列.在我的应用程序中,它们总是声明自己的队列,但如果不是你的情况那么你将不得不再次创建队列和任何交换的任何绑定.

public class QueueDestroyer
{
    public static void DeleteQueue(string queueName, string virtualHost)
    {
        var connectionFactory = new ConnectionFactory();
        connectionFactory.HostName = "localhost";
        connectionFactory.UserName = "guest";
        connectionFactory.Password = "guest";
        connectionFactory.VirtualHost = virtualHost;
        var connection = connectionFactory.CreateConnection();
        var channel = connection.CreateModel();
        channel.QueueDelete(queueName);
        connection.Close();
    }
}

我创建了一个非常简单的消费者示例,代表您的微服务.它在任务中运行直到取消.

public class Consumer
{
    private IMessageProcessor _messageProcessor;
    private Task _consumerTask;

    public Consumer(IMessageProcessor messageProcessor)
    {
        _messageProcessor = messageProcessor;
    }

    public void Consume(CancellationToken token, string queueName)
    {
        _consumerTask = Task.Run(() =>
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: queueName,
                                    durable: false,
                                    exclusive: false,
                                    autoDelete: false,
                                    arguments: null);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        _messageProcessor.ProcessMessage(message);
                    };
                    channel.BasicConsume(queue: queueName,
                                        autoAck: false,
                                            consumer: consumer);

                    while (!token.IsCancellationRequested)
                        Thread.Sleep(1000);
                }
            }
        });
    }

    public void WaitForCompletion()
    {
        _consumerTask.Wait();
    }

}

使用者具有IMessageProcessor接口,该接口将执行处理消息的工作.在我的集成测试中,我创建了一个假的.您可能会使用您首选的模拟框架.

测试发布者向队列发布消息.

public class TestPublisher
{
    public void Publish(string queueName, string message)
    {
        var factory = new ConnectionFactory() { HostName = "localhost", UserName="guest", Password="guest" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(exchange: "",
                                    routingKey: queueName,
                                    basicProperties: null,
                                    body: body);
        }
    }
}

我的示例测试如下所示:

[Fact]
public void If_SendMessageToQueue_ThenConsumerReceiv4es()
{
    // ARRANGE
    QueueDestroyer.DeleteQueue("queueX", "/");
    var cts = new CancellationTokenSource();
    var fake = new FakeProcessor();
    var myMicroService = new Consumer(fake);

    // ACT
    myMicroService.Consume(cts.Token, "queueX");

    var producer = new TestPublisher();
    producer.Publish("queueX", "hello");

    Thread.Sleep(1000); // make sure the consumer will have received the message
    cts.Cancel();

    // ASSERT
    Assert.Equal(1, fake.Messages.Count);
    Assert.Equal("hello", fake.Messages[0]);
}

我的假是这样的:

public class FakeProcessor : IMessageProcessor
{
    public List<string> Messages { get; set; }

    public FakeProcessor()
    {
        Messages = new List<string>();
    }

    public void ProcessMessage(string message)
    {
        Messages.Add(message);
    }
}

其他建议是:

>如果您可以将随机文本附加到队列中并在每次测试运行时交换名称,那么这样做是为了避免并发测试相互干扰>如果您的应用程序不这样做,我在代码中也有一些助手来声明队列,交换和绑定.>编写一个连接杀手类,它将强制关闭连接并检查您的应用程序是否仍然有效并可以恢复.我有代码,但不是在.NET Core中.只要问我它,我可以修改它以在.NET Core中运行.>总的来说,我认为你应该避免在集成测试中包含其他微服务.如果您将消息从一个服务发送到另一个服务并期望回复消息,那么创建一个可以模拟预期行为的假消费者.如果您收到来自其他服务的消息,则在集成测试项目中创建虚假发布者.

网友评论