(文章目录)
前言
1.事件总线概念
事件总线是一种事件发布/订阅结构,通过发布订阅模式可以解耦不同架构层级,同样它也可以来解决业务之间的耦合,它有以下优点
- 松耦合
- 横切关注点
- 可测试性
- 事件驱动
事件总线类型
- 进程内事件 (Event):本地事件,它的发布与订阅需要在同一个进程中,订阅方与发布方需要在同一个项目中
- 跨进程事件 (IntegrationEvent):集成事件,它的发布与订阅一定不在同一个进程中,订阅方与发布方可以在同一个项目中,也可以在不同的项目中
2.发布-订阅概念
发布-订阅模式其实是一种对象间一对多的依赖关系,当一个对象的状态发送改变时,所有依赖于它的对象都将得到状态改变的通知。
订阅者(Subscriber)把自己想订阅的事件注册(Subscribe)到调度中心(Event Channel),当发布者(Publisher)发布该事件(Publish Event)到调度中心,也就是该事件触发时,由调度中心统一调度(Fire Event)订阅者注册到调度中心的处理代码。
本文主要介绍进程内事件总线
一、.NET Core使用事件总线(进程内事件总线)
1.安装包
新建ASP.NET Core 空项目Assignment.InProcessEventBus,并安装Masa.Contrib.Dispatcher.Events
Install-Package Masa.Contrib.Dispatcher.Events -version 0.7.0-preview.7
2.基本使用
1、注册EventBus (用于发布本地事件)
builder.Services.AddEventBus();
2、新增RegisterUserEvent类并继承Event,用于发布注册用户事件
public record RegisterUserEvent : Event
{
public string Account { get; set; }
public string Email { get; set; }
public string Password { get; set; }
}
3、新增注册用户处理程序
注册用户的处理程序可以放到任意一个类中,但其构造函数参数必须支持从DI获取,且处理程序的方法仅支持 Task或 Void 两种, 不支持其它类型
//用户处理任务类
public class UserHandler
{
private readonly ILogger<UserHandler>? _logger;
public UserHandler(ILogger<UserHandler>? logger = null)
{
//todo: 根据需要可在构造函数中注入其它服务 (需支持从DI获取)
_logger = logger;
}
[EventHandler]
public void RegisterUser(RegisterUserEvent @event)
{
//todo: 1. 编写注册用户业务
_logger?.LogDebug("-----------{Message}-----------", "检测用户是否存在并注册用户");
//todo: 2. 编写发送注册通知等
_logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送邮件提示注册成功");
}
}
4、发送注册用户事件
app.MapPost("/register", async (RegisterUserEvent @event, IEventBus eventBus) =>
{
await eventBus.PublishAsync(@event);
});
5、效果
二、进阶用法(参数校验)
根据需要我们可以自定义中间件,并注册到EventBus的请求管道中,比如通过增加FluentValidation, 将参数验证从业务代码中剥离开来,从而使得处理程序更专注于业务。
1.安装nueget包
FluentValidation
2.注册管道
//添加参数验证
builder.Services.AddValidatorsFromAssembly(Assembly.GetEntryAssembly());
3.自定义验证中间件ValidatorMiddleware.cs,用于验证参数
/// <summary>
/// 参数验证中间件(管道)
/// </summary>
/// <typeparam name="TEvent"></typeparam>
public class ValidatorMiddleware<TEvent> : Middleware<TEvent>
where TEvent : IEvent
{
private readonly ILogger<ValidatorMiddleware<TEvent>>? _logger;
private readonly IEnumerable<IValidator<TEvent>> _validators;
public ValidatorMiddleware(IEnumerable<IValidator<TEvent>> validators, ILogger<ValidatorMiddleware<TEvent>>? logger = null)
{
_validators = validators;
_logger = logger;
}
public override async Task HandleAsync(TEvent @event, EventHandlerDelegate next)
{
var typeName = @event.GetType().FullName;
_logger?.LogDebug("----- Validating command {CommandType}", typeName);
var failures = _validators
.Select(v => v.Validate(@event))
.SelectMany(result => result.Errors)
.Where(error => error != null)
.ToList();
if (failures.Any())
{
_logger?.LogError("Validation errors - {CommandType} - Event: {@Command} - Errors: {@ValidationErrors}",
typeName,
@event,
failures);
throw new ValidationException("Validation exception", failures);
}
await next();
}
}
4.注册EventBus并使用验证中间件ValidatorMiddleware
builder.Services.AddEventBus(eventBusBuilder=>eventBusBuilder.UseMiddleware(typeof(ValidatorMiddleware<>)));
5.添加注册用户验证类RegisterUserEventValidator.cs
public class RegisterUserEventValidator : AbstractValidator<RegisterUserEvent>
{
public RegisterUserEventValidator()
{
RuleFor(e => e.Account).NotNull().WithMessage("用户名不能为空");
RuleFor(e => e.Email).NotNull().WithMessage("邮箱不能为空");
RuleFor(e => e.Password)
.NotNull().WithMessage("密码不能为空")
.MinimumLength(6)
.WithMessage("密码必须大于6位")
.MaximumLength(20)
.WithMessage("密码必须小于20位");
}
}
6.效果
二、进阶用法(事件编排)
1.将注册用户业务拆分为三个Handler,并通过指定Order的值来对执行事件排序
using Masa.Contrib.Dispatcher.Events;
namespace Assignment.InProcessEventBus
{
//用户处理任务类
public class UserHandler
{
private readonly ILogger<UserHandler>? _logger;
public UserHandler(ILogger<UserHandler>? logger = null)
{
//todo: 根据需要可在构造函数中注入其它服务 (需支持从DI获取)
_logger = logger;
}
//[EventHandler]
//public void RegisterUser(RegisterUserEvent @event)
//{
// //todo: 1. 编写注册用户业务
// _logger?.LogDebug("-----------{Message}-----------", "检测用户是否存在并注册用户");
// //todo: 2. 编写发送注册通知等
// _logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送邮件提示注册成功");
//}
//---------------------------------------事件编排--------------------------------------
[EventHandler(1)]
public void RegisterUser(RegisterUserEvent @event)
{
_logger?.LogDebug("-----------{Message}-----------", "检测用户是否存在并注册用户");
//todo: 编写注册用户业务
}
[EventHandler(2)]
public void SendAwardByRegister(RegisterUserEvent @event)
{
_logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送注册奖励");
//todo: 编写发送奖励等
}
[EventHandler(3)]
public void SendNoticeByRegister(RegisterUserEvent @event)
{
_logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送注册成功邮件");
//todo: 编写发送注册通知等
}
}
}
三、进阶用法(分布式事务)
1.Saga概念
Saga 设计模式是一种在分布式事务场景中跨微服务管理数据一致性的方法。Saga 是一系列事务,用于更新每项服务并发布消息或事件来触发下一个事务步骤。如果某个步骤失败,则 Saga 将执行补偿事务,以抵消上一个事务的影响。
2.Saga分布式事务源码
当发送奖励出现异常时,则执行补偿机制,执行顺序如下
1-2-3-4(执行失败)-3-2-1
//用户处理任务类
public class UserHandler
{
private readonly ILogger<UserHandler>? _logger;
public UserHandler(ILogger<UserHandler>? logger = null)
{
//todo: 根据需要可在构造函数中注入其它服务 (需支持从DI获取)
_logger = logger;
}
//[EventHandler]
//public void RegisterUser(RegisterUserEvent @event)
//{
// //todo: 1. 编写注册用户业务
// _logger?.LogDebug("-----------{Message}-----------", "检测用户是否存在并注册用户");
// //todo: 2. 编写发送注册通知等
// _logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送邮件提示注册成功");
//}
//---------------------------------------事件编排--------------------------------------
[EventHandler(1)]
public void RegisterUser(RegisterUserEvent @event)
{
_logger?.LogDebug("-----------{Message}-----------", "检测用户是否存在并注册用户");
//todo: 编写注册用户业务
}
[EventHandler(2)]
public void SendAwardByRegister(RegisterUserEvent @event)
{
_logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送注册奖励");
//todo: 编写发送奖励等
}
//---------------------------------------Saga执行奖励补偿--------------------------------------
[EventHandler(1, IsCancel = true)]
public void CancelSendAwardByRegister(RegisterUserEvent @event)
{
_logger?.LogDebug("-----------{Account} 注册成功,发放奖励失败 {Message}-----------", @event.Account, "发放奖励补偿");
}
//---------------------------------------Saga不执行奖励补偿--------------------------------------
[EventHandler(3, FailureLevels = FailureLevels.Ignore)]
public void SendNoticeByRegister(RegisterUserEvent @event)
{
_logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送邮件提示注册成功");
//todo: 编写发送注册通知等
throw new Exception("取消");
}
}
3.参数解读
3.1 EventHandler
- FailureLevels: 失败级别, 默认: Throw
- Throw:发生异常后,依次执行Order小于当前Handler的Order的取消动作,比如:Handler顺序为 1、2、3,CancelHandler为 1、2、3,如果执行 Handler3 异常,则依次执行 2、1
- ThrowAndCancel:发生异常后,依次执行Order小于等于当前Handler的Order的取消动作,比如:Handler顺序为 1、2、3,CancelHandler为 1、2、3,如果执行 Handler3 异常,则依次执行 3、2、1
- Ignore:发生异常后,忽略当前异常(不执行取消动作),继续执行其他Handler
- Order: 执行顺序,默认: int.MaxValue,用于控制当前方法的执行顺序
- EnableRetry: 当Handler异常后是否启用重试, 默认: false
- RetryTimes: 重试次数,当出现异常后执行多少次重试, 需开启重试配置
- IsCancel: 是否是补偿机制,默认: false