在上位机和下位机或者服务端和客户端通信的时候,很多时候可能为了赶项目进度或者写代码方便直接使用Socket通信,传输string类型的关键字驱动对应的事件,这就有可能导致程序中存在大量的Switch-Case、If-Else判断。当通信的逻辑越来越复杂,增加的关键字就越来越多,导致一个事件处理类中不断的累加成千上万的Switch-Case、If-Else代码,导致后期的代码极其难以维护。
当大家在看到大量的Switch-Case、If-Else代码堆积在一起肯定会感觉非常的头痛,其中的业务逻辑就如同线团一样错综复杂。我曾经在一家规模不小的大厂中就看到一个客户端的工程中,有一个上万行代码的消息处理类,其中的消息关键字嵌套关键字,Switch-Case代码占据大壁江山,而项目中的其他人员没有愿意接这个工程的,久而久之成了代代相传的祖传代码。
那么,如何去优化干掉Switch-Case、If-Else?其实这个话题可以写成一个系列,根据不同的情况使用不同的设计模式去进行重构和优化。今天推荐的订阅发布模式+事件驱动模式就是针对以上在网络通信过程中的代码优化,使用消息中间件RabbitMQ替代Socket。
如果您接触过、使用过RabbitMQ,本文中的代码也许更能理解,以下代码仅代表个人的开发经验与大家一起分享学习,如有异议欢迎沟通讨论。
订阅发布模式+事件驱动模式概念订阅发布模式,将消息分为不同的类型,发布者无需关心有哪些订阅者存在,而订阅者只关心自己感兴趣的消息,无需关心有哪些发布者存在。
事件驱动模式,程序的行为是通过事件驱动的,程序能够实时感知自己感兴趣的事件,并对不同的事件作出不同的反应。
实现思路
消息中间件RabbitMQ
本文的订阅发布模式+事件驱动模式是借助RabbitMQ来实现,需要确保本地电脑已经安装RabbitMQ的相关环境,然后在VS中创建一个解决方案MyRBQServer,并添加两个控制台程序MyRBQClient、MyRBQServer,一个类库MyRBQPolisher,并安装Nuget包 RabbitMQ.Client。
MyRBQClient:模拟客户端,其实在本文的设计中没有明显的客户端和服务端的概念,客户端和服务端都可以发布和订阅事件。
MyRBQPolisher:消息的接口实现库,其中包含用于发布和订阅事件的接口 IPublisher和各种消息的定义,这个库是本文代码中的重点。
MyRBQServer:模拟服务端。
实现MyRBQPolisher
在MyRBQPolisher中添加一个消息基类EventBase和一个消息处理类的泛型接口IMessageEvent,IMessageEvent的类型需要约束为EventBase的子类。
EventBase是所有消息事件的基类,而IMessageEvent则是对应消息事件的处理接口,后面会使用反射并动态创建对象调用Invoke方法。
public abstract class EventBase { }
public interface IMessageEvent<EventType> where EventType: EventBase { Task Invoke(EventType eventBase); }
然后再添加一个发布订阅的接口IPublisher,并添加订阅接口Subscribe和发布接口Publish, 此处我把订阅的接口和发布的接口写在同一个类中,实际应用的时候也可以分开。
public interface IPublisher { void Subscribe<TC, TH>() where TC : EventBase where TH : IMessageEvent<TC>; void Publish<TC>(TC @event) where TC:EventBase; }
再添加一个接口IEventManager和对应的接口实现类EventManager,这是一个用来管理注册事件的处理类,在这个实现类中使用Dictionary来保存消息事件和消息处理类的对应关系。
public interface IEventManager { void Subscribe<TC, TH>() where TC : EventBase where TH : IMessageEvent<TC>; Type GetEventHandleType(string eventKey); Type GetEventType(string eventKey); }
public class EventManager : IEventManager { private Dictionary<string, Type> _messageEvents = new Dictionary<string, Type>();//保存消息事件和对应的消息处理类型 private List<Type> _eventTypes = new List<Type>(); /// <summary> /// 获取消息对应的处理类型 /// </summary> /// <param name="eventKey"></param> /// <returns></returns> public Type GetEventHandleType(string eventKey) { if (_messageEvents.ContainsKey(eventKey)) { return _messageEvents[eventKey]; } return null; } /// <summary> /// 获取消息类型 /// </summary> /// <param name="eventKey"></param> /// <returns></returns> public Type GetEventType(string eventKey) { return _eventTypes.FirstOrDefault(s=>s.Name == eventKey); } /// <summary> /// 注册消息事件类型和消息处理类型 /// </summary> /// <typeparam name="TC"></typeparam> /// <typeparam name="TH"></typeparam> public void Subscribe<TC, TH>() where TC : EventBase where TH : IMessageEvent<TC> { string eventKey = typeof(TC).Name; if (_messageEvents.ContainsKey(eventKey)) { throw new Exception("The same event has been subscribe"); } _messageEvents.Add(eventKey, typeof(TH)); _eventTypes.Add(typeof(TC)); } }
添加一个ServiceProcesser类,这是RabbitMQ的业务逻辑实现类,用来发送消息,注册消息接收的回调事件。
public class ServiceProcesser { private const string EXCHANGE_NAME = "ServiceProcesser"; private const string QUEUE_NAME= "domain.event"; private static object _sync = new object(); private IConnectionFactory _connectionFactory; private IConnection _connection; private IEventManager _eventManager; private IModel _consumeChannel; public bool IsConnected => (_connection?.IsOpen).GetValueOrDefault(false); public ServiceProcesser(IEventManager eventManager, IConnectionFactory connectionFactory) { _connectionFactory = connectionFactory; _eventManager = eventManager; CreateConsumer(); } /// <summary> /// 将eventBaseJson 序列化并作为消息发送 /// </summary> /// <param name="eventBase"></param> public void Send(EventBase eventBase) { using (var channel = _connection.CreateModel()) { string evenKey = eventBase.GetType().Name; channel.ExchangeDeclare(EXCHANGE_NAME, "direct"); //Json序列化类来发送消息 string message = JsonConvert.SerializeObject(eventBase); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(EXCHANGE_NAME, evenKey, null, body); } } public bool TryConnection() { lock (_sync) { try { if (!IsConnected) { _connection = _connectionFactory.CreateConnection(); } return true; } catch (Exception) { //写日志 return false; } } } /// <summary> /// 使用消息事件的名字绑定路由和队列 /// </summary> /// <param name="eventName"></param> public void BindEvent(string eventName) { if (!IsConnected) { TryConnection(); } using (var channel = _connection.CreateModel()) { channel.QueueDeclare(QUEUE_NAME, true, false, false, null); channel.QueueBind(QUEUE_NAME,EXCHANGE_NAME,eventName); } } /// <summary> /// 注册RabbitMQ消费者的回调接口 /// </summary> private void CreateConsumer() { if (!IsConnected) { TryConnection(); } _consumeChannel = _connection.CreateModel(); _consumeChannel.ExchangeDeclare(EXCHANGE_NAME, "direct"); _consumeChannel.QueueDeclare(QUEUE_NAME, true, false, false, null); var consumer = new EventingBasicConsumer(_consumeChannel); consumer.Received += async (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); await ProcessEvent(ea.RoutingKey, message); }; _consumeChannel.BasicConsume(QUEUE_NAME, true, consumer); } /// <summary> /// 通过反射动态消息处理类来动态调用消息处理接口Invoke /// </summary> /// <param name="routeKey"></param> /// <param name="message"></param> /// <returns></returns> private async Task ProcessEvent(string routeKey, string message) { Type eventType = _eventManager.GetEventType(routeKey); if (eventType != null) { object @event = JsonConvert.DeserializeObject(message, eventType); if(@event != null && @event is EventBase) { var handleType = _eventManager.GetEventHandleType(eventType.Name); object handler = Activator.CreateInstance(handleType); await (Task)(typeof(IMessageEvent<>)).MakeGenericType(eventType).GetMethod("Invoke").Invoke(handler,new object[1] { @event }); } } } }
发送消息的Send方法使用参数EventBase类型的事件对象,并将该对象进行Json的序列化作为消息发送。
/// <summary> /// 将eventBaseJson 序列化并作为消息发送 /// </summary> /// <param name="eventBase"></param> public void Send(EventBase eventBase) { using (var channel = _connection.CreateModel()) { string evenKey = eventBase.GetType().Name; channel.ExchangeDeclare(EXCHANGE_NAME, "direct"); //Json序列化类来发送消息 string message = JsonConvert.SerializeObject(eventBase); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(EXCHANGE_NAME, evenKey, null, body); } }
ProcessEvent是消息的接收回调处理方法,在这个方法中根据获取到的事件名称调用IEventManager的接口查找对应的事件类型和消息处理类型,并通过反射动态创建消息处理类并调用处理的接口Invoke。
/// <summary> /// 通过反射动态消息处理类来动态调用消息处理接口Invoke /// </summary> /// <param name="routeKey"></param> /// <param name="message"></param> /// <returns></returns> private async Task ProcessEvent(string routeKey, string message) { Type eventType = _eventManager.GetEventType(routeKey); if (eventType != null) { object @event = JsonConvert.DeserializeObject(message, eventType); if(@event != null && @event is EventBase) { var handleType = _eventManager.GetEventHandleType(eventType.Name); object handler = Activator.CreateInstance(handleType); await (Task)(typeof(IMessageEvent<>)).MakeGenericType(eventType).GetMethod("Invoke").Invoke(handler,new object[1] { @event }); } } }
最后添加 IPublisher的实现类 Publisher
public class Publisher : IPublisher { private const string HOST_NAME = "localhost"; private const string USER_NAME = "admin"; private const string PASSWORD = "admin"; private ServiceProcesser _serviceProcesser; private IEventManager _eventManager; public Publisher() { var connectionFactory = new ConnectionFactory(); connectionFactory.HostName = HOST_NAME; connectionFactory.UserName = USER_NAME; connectionFactory.Password = PASSWORD; _eventManager = new EventManager(); _serviceProcesser = new ServiceProcesser(_eventManager, connectionFactory); } public void Publish<TC>(TC @event) where TC : EventBase { if (!_serviceProcesser.IsConnected) { _serviceProcesser.TryConnection(); } _serviceProcesser.Send(@event); } public void Subscribe<TC, TH>() where TC : EventBase where TH : IMessageEvent<TC> { _eventManager.Subscribe<TC,TH>(); _serviceProcesser.BindEvent(typeof(TC).Name); } }
客户端和服务端订阅发布事件
在MyRBQPolisher类库中添加一个消息的定义类HelloWorldEvent并继承消息基类EventBase
public class HelloWorldEvent:EventBase { public string MyName { get; set; } = "Joiun"; }
MyRBQClient和MyRBQServer分别引用类库MyRBQPolisher。
在MyRBQClient中添加一个HelloWorldEvent的消息处理类HelloWroldHandler,
public class HelloWroldHandler : IMessageEvent<HelloWorldEvent> { public Task Invoke(HelloWorldEvent eventBase) { Console.WriteLine(eventBase?.MyName); return Task.FromResult(0); } }
在Main方法中创建消息接口IPublisher和对应的实现类,并订阅HelloWorldEvent,注册对应的消息处理类,这样一来,就实现了对消息HelloWorldEvent的处理,处理的逻辑包含在HelloWroldHandler的Invoke方法中。
static void Main(string[] args) { IPublisher publisher = new Publisher(); publisher.Subscribe<HelloWorldEvent, HelloWroldHandler>(); Console.Read(); }
而MyRBQServer的Main方法就更简单了,只要创建IPunlisher的接口并发送一个HelloWorldEvent的对象既可。
static void Main(string[] args) { IPublisher publisher = new Publisher(); publisher.Publish(new HelloWorldEvent()); Console.WriteLine("Send Success"); Console.ReadLine(); }
运行程序,结果:
而MyRBQServer如果需要订阅自己发送的消息,也可以创建一个自己的HelloWroldHandler处理类并注册订阅即可。
在上述代码中,HelloWorldEvent就相当于是一个WCF中的消息契约,需要通信的双方规定好消息的格式,否则会有序列化方面的问题,而这样做的好处就是可以将原本的一个个消息关键字和对应的行为分散到了不同的消息类和处理类中,并订阅感兴趣的事件来驱动对应的行为,不同事件行为之间互不影响,松耦合,将一个上万行代码的消息处理类化整为零,没有大量的String类型的关键字,没有Switch Case,If Else判断。
需要注意的是,Inovke方法的每一次动态调用都是在不同的子线程中调用,如果需要在Invoke中处理UI相关的代码,则可以借助主线程的上下文来更新。