当前位置 : 主页 > 编程语言 > 其它开发 >

MassTransit 入门(一)

来源:互联网 收集:自由互联 发布时间:2022-05-15
本文地址源码 MassTransit是一个面向.net的免费开源分布式应用程序框架。 MassTransit使得创建应用程序和服务变得很容易,这些应用程序和服务利用基于消息的、松散耦合的异步通信来获得

本文地址源码

  • MassTransit是一个面向.net的免费开源分布式应用程序框架。
  • MassTransit使得创建应用程序和服务变得很容易,这些应用程序和服务利用基于消息的、松散耦合的异步通信来获得更高的可用性、可靠性和可伸缩性。
  • MassTransit 8.x版本。
实现简单发布订阅
  • 添加Nuget包引用:
    • MassTransit
    • MassTransit.RabbitMQ(演示也可基于内存)
生产端
  • 配置MassTransit
builder.Services.AddMassTransit(x =>
{
   
    // 使用内存
    //x.UsingInMemory();
    // 使用RabbitMq
    x.UsingRabbitMq((context, config) =>
    {
        config.Host("rabbitmq://localhost:5672", host =>
        {
            host.Username("admin");
            host.Password("admin");
        });
    });
});
  • 定义消息体
public class OrderEto
{
    public Guid Id { get; init; }

    public string Name { get; set; }
    
    public DateTime CreationTime { get; set; }
}
  • 发布消息
[ApiController]
[Route("[controller]")]
public class PublishController : ControllerBase
{
    private readonly ILogger<PublishController> _logger;
    private readonly IPublishEndpoint _publishEndpoint;

    public PublishController(ILogger<PublishController> logger, IPublishEndpoint publishEndpoint)
    {
        _logger = logger;
        _publishEndpoint = publishEndpoint;
    }

    [HttpGet]
    public async Task Get()
    {
        await _publishEndpoint.Publish<OrderEto>(new OrderEto()
        {
            Id = Guid.NewGuid(),
            Name = "Phone",
            CreationTime = DateTime.Now
        });
    }
}
消费者端
builder.Services.AddMassTransit(x =>
{
    
    // 通过扫描程序集注册消费者
    x.AddConsumers(typeof(Program).Assembly);
   
    // 通过类型单个注册消费者
    // x.AddConsumer<OrderEtoConsumer>(typeof(OrderEtoConsumerDefinition));
    
    // x.SetKebabCaseEndpointNameFormatter();
    
    // 通过泛型单个注册消费者
    //x.AddConsumer<OrderEtoConsumer, OrderEtoConsumerDefinition>();
    
    // 通过指定命名空间注册消费者
    // x.AddConsumersFromNamespaceContaining<OrderEtoConsumer>();
    
    // 使用内存队列
    // x.UsingInMemory();
    x.UsingRabbitMq((context, config) =>
    {
      
        config.Host("rabbitmq://localhost:5672", hostconfig =>
        {
            hostconfig.Username("admin");
            hostconfig.Password("admin");
        });
        
        config.ConfigureEndpoints(context);
       
    });
});
  • 消费者定义
public class OrderEtoConsumer : IConsumer<OrderEto>
{
    private readonly ILogger<OrderEtoConsumer> _logger;

    public OrderEtoConsumer(ILogger<OrderEtoConsumer> logger)
    {
        _logger = logger;
    }

    public Task Consume(ConsumeContext<OrderEto> context)
    {
        _logger.LogInformation($"MassTransit.Consumer.One 收到消息:{JsonSerializer.Serialize(context.Message)}");
        return Task.CompletedTask;
    }
}

public class OrderEtoConsumerDefinition : ConsumerDefinition<OrderEtoConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<OrderEtoConsumer> consumerConfigurator)
    {
        endpointConfigurator.UseMessageRetry(r => r.Intervals(500, 1000));
    }
}

Abp Vnext Vue实现
  • Github地址
  • 文档地址
  • 演示地址
上一篇:DRF JWT认证(二)
下一篇:没有了
网友评论