当前位置 : 主页 > 网络编程 > net编程 >

.net C# 开发通用 MQTT Client

来源:互联网 收集:自由互联 发布时间:2023-09-06
0、业务需求 公司部署了RabbitMq消息队列并且开启Mqtt协议,我们的SCADA平台也可以通过Mqtt协议发送业务报文。那么需要编写客户端根据具体报文执行相应的操作,如工单上线时切换ESOP,

0、业务需求

公司部署了RabbitMq消息队列并且开启Mqtt协议,我们的SCADA平台也可以通过Mqtt协议发送业务报文。那么需要编写客户端根据具体报文执行相应的操作,如工单上线时切换ESOP,成品下线时计算产量、OEE等。

每个业务都写一个监听不合适,全部写在一起也不合适,那么就使用配置文件配置各个业务处理类,反射方式加载业务程序集的方式来实现高内聚低耦合。


1、消息订阅类

Subscriber类创建mqtt客户,加载策略注册topic,加载业务dll 匹配策略。

public class Subscriber
    {
        private static string connStr = System.Configuration.ConfigurationManager.AppSettings["CMMQ"];
        private static IMqttClient _mqttClient;
        private static log4net.ILog logger = log4net.LogManager.GetLogger("InfoLog");
        private static Dictionary<string, Strategy> strategyDic = new Dictionary<string, Strategy>();
        public static void Run()
        {
            //1)读取策略文件解析topic 和处理类
            //2)初始化mqttclient 并绑定topic
            //3)接收数据时处理根据topic 调用处理类
            InitStrategy();
            MQConfig mqConfig = new ConfigInfoHelper().GetMQConfig(connStr);
            MqttClient(mqConfig);
            //Thread t = new Thread(() => MqttClient(mqConfig));

            //t.Start();


            Console.WriteLine("Subscribe Listening for message.Hit <Enter> to quit.");
            Console.ReadLine();
            try
            {
                //t.Abort();
            }
            catch (Exception ex)
            {
                Console.WriteLine("LoginDeal:" + ex.Message);
            }
            try
            {
                //t2.Abort();
            }
            catch (Exception ex)
            {
                Console.WriteLine("UploadDeal:" + ex.Message);
            }
        }

        private static void InitStrategy()
        {
            Console.WriteLine($"Begin Read CMSMsgStrategy.dll");
            strategyDic.Clear();
            var strategyList = new ConfigInfoHelper().GetStrategyObjects();
            if (System.IO.File.Exists("CMSMsgStrategy.dll") == false)
            {
                Console.WriteLine($"CMSMsgStrategy.dll not found!");
                return;
            }
            Assembly asm = Assembly.Load("CMSMsgStrategy");//.GetExecutingAssembly();
            foreach(var strategy in strategyList)
            {
                if (strategy.method == "Execute")
                {
                    object[] parameters = new object[2];
                    parameters[0] = strategy.topic;
                    parameters[1] = logger;
                    Object obj2 = asm.CreateInstance(strategy.class_name, true, BindingFlags.Default, null, parameters, null, null);
                    Strategy sy = obj2 as Strategy;
                    if(false == strategyDic.ContainsKey(strategy.topic))
                    {
                        strategyDic.Add(strategy.topic, sy);
                        Console.WriteLine($"Read CMSMsgStrategy.dll {strategy.topic}");
                    }
                }
            }
            Console.WriteLine($"End Read CMSMsgStrategy.dll");
        }

        public static async void Stop()
        {
            if(_mqttClient != null)
            {
                if (_mqttClient.IsConnected)
                {
                    if (null != _mqttClient)
                    {
                        await _mqttClient.DisconnectAsync();
                        _mqttClient = null;
                    }
                }
            }
        }

        private static async void MqttClient(MQConfig config)
        {
            try
            {
                var options = new MqttClientOptions() { ClientId = Guid.NewGuid().ToString("D") };
                options.ChannelOptions = new MqttClientTcpOptions()
                {
                    Server = config.HostName,
                    Port = Convert.ToInt32(config.Port)
                };
                options.Credentials = new MqttClientCredentials(config.UserName, Encoding.UTF8.GetBytes(config.Password));

                options.CleanSession = true;
                //options.KeepAlivePeriod = TimeSpan.FromSeconds(100.5);

                if (null != _mqttClient)
                {
                    await _mqttClient.DisconnectAsync();
                    _mqttClient = null;
                }
                _mqttClient = new MqttFactory().CreateMqttClient() as MqttClient;

                _mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync;

                //_mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync;

                //_mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync;

                //_mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("amq.topic.PartOffline")
                //             .WithQualityOfServiceLevel(
                //                 (MqttQualityOfServiceLevel)
                //                     Enum.Parse(typeof(MqttQualityOfServiceLevel), "0")).Build());
                

                var res = await _mqttClient.ConnectAsync(options);
                
                if (_mqttClient.IsConnected)
                {
                    //await _mqttClient.SubscribeAsync("amq/topic/PartOffLine", MqttQualityOfServiceLevel.AtMostOnce);
                    MqttClientSubscribeOptions options2 = new MqttClientSubscribeOptions();
                    
                    foreach(var keyValue in strategyDic)
                    {
                        MqttTopicFilterBuilder mqttTopicFilterBuilder = new MqttTopicFilterBuilder();
                        var topicFilter = mqttTopicFilterBuilder.WithTopic(keyValue.Key).WithAtMostOnceQoS().Build();
                        options2.TopicFilters.Add(topicFilter);
                    }
                    
                    await _mqttClient.SubscribeAsync(options2);
                }
            }
            catch (Exception ex)
            {
                logger.Error(ex.Message);
                throw;
            }
        }

        private static Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
        {
            logger.Info($"ClientID:{arg.ClientId} | TOPIC:{arg.ApplicationMessage.Topic} | Payload:{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)} | QoS:{arg.ApplicationMessage.QualityOfServiceLevel} | Retain:{arg.ApplicationMessage.Retain}");
            Console.WriteLine($"ClientID:{arg.ClientId} | TOPIC:{arg.ApplicationMessage.Topic} | Payload:{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)} | QoS:{arg.ApplicationMessage.QualityOfServiceLevel} | Retain:{arg.ApplicationMessage.Retain}");
            if (strategyDic.ContainsKey(arg.ApplicationMessage.Topic))
            {
                var strategy = strategyDic[arg.ApplicationMessage.Topic];
                try
                {
                    strategy.Execute(Encoding.UTF8.GetString(arg.ApplicationMessage.Payload));
                }catch(Exception ex)
                {
                    logger.Error($"Execute Strategy {strategy.GetType().FullName} Error:{arg.ApplicationMessage.Topic},{arg.ApplicationMessage.Payload}", ex);
                }
            }
            return null;
        }

        private static Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            logger.Info($"Client is Connected:  IsSessionPresent:{arg.ConnectResult.IsSessionPresent}");
            return null;
        }

        private static Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
        {
            logger.Info($"Client is DisConnected ClientWasConnected:{arg.ClientWasConnected}");
            return null;
        }

        
    }

2、主函数

internal static class Program
    {
        /// <summary>
        /// 应用程序的主入口点。
        /// </summary>
        static void Main()
        {
            log4net.Config.XmlConfigurator.ConfigureAndWatch(new FileInfo(Environment.CurrentDirectory + "/Log4Net.config"));
#if DEBUG
            Subscriber.Run();

#else
            ServiceBase[] ServicesToRun;
            ServicesToRun = new ServiceBase[]
            {
                new Service1()
            };
            ServiceBase.Run(ServicesToRun);
#endif
        }
    }

2.1、配置文件

App.config

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
    
	<configSections>
		<section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net" />
	</configSections>
	<appSettings>
		<add key="CMMQ" value="host=192.168.100.101;virtualHost=/;username=admin;password=admin@pwd~!;port=1883;"/>
		<add key="Strategy" value="Strategy.config"/>
	</appSettings>
	<startup>
		<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2" />
	</startup>
</configuration>

Configuration.xml

<?xml version="1.0" encoding="utf-8"?>
<strategy_definition>
	<strategys id="1" topic="amq/topic/PartOffLine" class_name="CMSMsgStrategy.AssyPartOffLineStrategy" method="Execute"/>
	<strategys id="2" topic="amq/topic/OrderOnLine" class_name="CMSMsgStrategy.QMSInspectionOrderStrategy" method="Execute"/>
	<strategys id="3" topic="amq/topic/OrderOffLine" class_name="" method=""/>
	<strategys id="4" topic="amq/topic/Downtime/On" class_name="CMSMsgStrategy.EquipmentDowntimeStrategy" method="Execute"/>
</strategy_definition>

3、辅助类(Model)

public class MQConfig
    {
        public string HostName;
        public string UserName;
        public string Password;
        public string Port = "1883";
        public string VirtualHost;
    }
    
 public class OrderOnLine
    {
        public string line_ent_name { get;set; }
        public DateTime Timestamp { get; set; }
        public string mes_order_no { get; set; }
        public string cus_info { get; set; }
        public string part_no { get; set; }
    }
    
public abstract class Strategy
    {
        public string Topic { get; set; }
        public DateTime StartTime { get; set; }

        public DateTime EndTime { get; set; }

        public virtual void Execute(string payload)
        {
            StartTime = DateTime.Now.ToLocalTime();

            EndTime = DateTime.Now.ToLocalTime();
        }
    }

4、业务类(Strategy接口实现)

编译为程序集 CMSMsgStrategy.dll并拷贝到主程序bin目录

public class AssyPartOffLineStrategy : Strategy
    {
        private log4net.ILog _logger;
        public AssyPartOffLineStrategy(string topic, log4net.ILog logger)
        {
            Topic = topic;
            _logger = logger;
        }

        public override void Execute(string payload)
        {
            _logger.Info($"Begin Execute Strategy: AssyPartOffLineStrategy->payload: {payload}");
            PartOffLine pol = Newtonsoft.Json.JsonConvert.DeserializeObject<PartOffLine>(payload);
            //解包 判断产线
            //调用刷新在线工单的url
            string uri = "http://localhost:7110/MESReportService.svc/SaveMesWOToCacheTable";
            string jsonDate = string.Format("/Date({0})/", pol.Timestamp.ToChinaTimeStamp(true));
            string content = "{\"strLineName\":\"" + pol.line_ent_name + "\",\"specifiedDate\":\"" + jsonDate + "\",\"pageNum\":0,\"pageSize\":0}";
            HttpHelper.PostMethodRawJson(uri, content, null);
            _logger.Info($"End Execute Strategy: AssyPartOffLineStrategy");
        }
    }
上一篇:通过WCF 上传图片
下一篇:没有了
网友评论