0、业务需求 公司部署了RabbitMq消息队列并且开启Mqtt协议,我们的SCADA平台也可以通过Mqtt协议发送业务报文。那么需要编写客户端根据具体报文执行相应的操作,如工单上线时切换ESOP,
0、业务需求
公司部署了RabbitMq消息队列并且开启Mqtt协议,我们的SCADA平台也可以通过Mqtt协议发送业务报文。那么需要编写客户端根据具体报文执行相应的操作,如工单上线时切换ESOP,成品下线时计算产量、OEE等。
每个业务都写一个监听不合适,全部写在一起也不合适,那么就使用配置文件配置各个业务处理类,反射方式加载业务程序集的方式来实现高内聚低耦合。
1、消息订阅类
Subscriber类创建mqtt客户,加载策略注册topic,加载业务dll 匹配策略。
public class Subscriber
{
private static string connStr = System.Configuration.ConfigurationManager.AppSettings["CMMQ"];
private static IMqttClient _mqttClient;
private static log4net.ILog logger = log4net.LogManager.GetLogger("InfoLog");
private static Dictionary<string, Strategy> strategyDic = new Dictionary<string, Strategy>();
public static void Run()
{
//1)读取策略文件解析topic 和处理类
//2)初始化mqttclient 并绑定topic
//3)接收数据时处理根据topic 调用处理类
InitStrategy();
MQConfig mqConfig = new ConfigInfoHelper().GetMQConfig(connStr);
MqttClient(mqConfig);
//Thread t = new Thread(() => MqttClient(mqConfig));
//t.Start();
Console.WriteLine("Subscribe Listening for message.Hit <Enter> to quit.");
Console.ReadLine();
try
{
//t.Abort();
}
catch (Exception ex)
{
Console.WriteLine("LoginDeal:" + ex.Message);
}
try
{
//t2.Abort();
}
catch (Exception ex)
{
Console.WriteLine("UploadDeal:" + ex.Message);
}
}
private static void InitStrategy()
{
Console.WriteLine($"Begin Read CMSMsgStrategy.dll");
strategyDic.Clear();
var strategyList = new ConfigInfoHelper().GetStrategyObjects();
if (System.IO.File.Exists("CMSMsgStrategy.dll") == false)
{
Console.WriteLine($"CMSMsgStrategy.dll not found!");
return;
}
Assembly asm = Assembly.Load("CMSMsgStrategy");//.GetExecutingAssembly();
foreach(var strategy in strategyList)
{
if (strategy.method == "Execute")
{
object[] parameters = new object[2];
parameters[0] = strategy.topic;
parameters[1] = logger;
Object obj2 = asm.CreateInstance(strategy.class_name, true, BindingFlags.Default, null, parameters, null, null);
Strategy sy = obj2 as Strategy;
if(false == strategyDic.ContainsKey(strategy.topic))
{
strategyDic.Add(strategy.topic, sy);
Console.WriteLine($"Read CMSMsgStrategy.dll {strategy.topic}");
}
}
}
Console.WriteLine($"End Read CMSMsgStrategy.dll");
}
public static async void Stop()
{
if(_mqttClient != null)
{
if (_mqttClient.IsConnected)
{
if (null != _mqttClient)
{
await _mqttClient.DisconnectAsync();
_mqttClient = null;
}
}
}
}
private static async void MqttClient(MQConfig config)
{
try
{
var options = new MqttClientOptions() { ClientId = Guid.NewGuid().ToString("D") };
options.ChannelOptions = new MqttClientTcpOptions()
{
Server = config.HostName,
Port = Convert.ToInt32(config.Port)
};
options.Credentials = new MqttClientCredentials(config.UserName, Encoding.UTF8.GetBytes(config.Password));
options.CleanSession = true;
//options.KeepAlivePeriod = TimeSpan.FromSeconds(100.5);
if (null != _mqttClient)
{
await _mqttClient.DisconnectAsync();
_mqttClient = null;
}
_mqttClient = new MqttFactory().CreateMqttClient() as MqttClient;
_mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync;
//_mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync;
//_mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync;
//_mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("amq.topic.PartOffline")
// .WithQualityOfServiceLevel(
// (MqttQualityOfServiceLevel)
// Enum.Parse(typeof(MqttQualityOfServiceLevel), "0")).Build());
var res = await _mqttClient.ConnectAsync(options);
if (_mqttClient.IsConnected)
{
//await _mqttClient.SubscribeAsync("amq/topic/PartOffLine", MqttQualityOfServiceLevel.AtMostOnce);
MqttClientSubscribeOptions options2 = new MqttClientSubscribeOptions();
foreach(var keyValue in strategyDic)
{
MqttTopicFilterBuilder mqttTopicFilterBuilder = new MqttTopicFilterBuilder();
var topicFilter = mqttTopicFilterBuilder.WithTopic(keyValue.Key).WithAtMostOnceQoS().Build();
options2.TopicFilters.Add(topicFilter);
}
await _mqttClient.SubscribeAsync(options2);
}
}
catch (Exception ex)
{
logger.Error(ex.Message);
throw;
}
}
private static Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
logger.Info($"ClientID:{arg.ClientId} | TOPIC:{arg.ApplicationMessage.Topic} | Payload:{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)} | QoS:{arg.ApplicationMessage.QualityOfServiceLevel} | Retain:{arg.ApplicationMessage.Retain}");
Console.WriteLine($"ClientID:{arg.ClientId} | TOPIC:{arg.ApplicationMessage.Topic} | Payload:{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)} | QoS:{arg.ApplicationMessage.QualityOfServiceLevel} | Retain:{arg.ApplicationMessage.Retain}");
if (strategyDic.ContainsKey(arg.ApplicationMessage.Topic))
{
var strategy = strategyDic[arg.ApplicationMessage.Topic];
try
{
strategy.Execute(Encoding.UTF8.GetString(arg.ApplicationMessage.Payload));
}catch(Exception ex)
{
logger.Error($"Execute Strategy {strategy.GetType().FullName} Error:{arg.ApplicationMessage.Topic},{arg.ApplicationMessage.Payload}", ex);
}
}
return null;
}
private static Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
logger.Info($"Client is Connected: IsSessionPresent:{arg.ConnectResult.IsSessionPresent}");
return null;
}
private static Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
logger.Info($"Client is DisConnected ClientWasConnected:{arg.ClientWasConnected}");
return null;
}
}
2、主函数
internal static class Program
{
/// <summary>
/// 应用程序的主入口点。
/// </summary>
static void Main()
{
log4net.Config.XmlConfigurator.ConfigureAndWatch(new FileInfo(Environment.CurrentDirectory + "/Log4Net.config"));
#if DEBUG
Subscriber.Run();
#else
ServiceBase[] ServicesToRun;
ServicesToRun = new ServiceBase[]
{
new Service1()
};
ServiceBase.Run(ServicesToRun);
#endif
}
}
2.1、配置文件
App.config
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<configSections>
<section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net" />
</configSections>
<appSettings>
<add key="CMMQ" value="host=192.168.100.101;virtualHost=/;username=admin;password=admin@pwd~!;port=1883;"/>
<add key="Strategy" value="Strategy.config"/>
</appSettings>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2" />
</startup>
</configuration>
Configuration.xml
<?xml version="1.0" encoding="utf-8"?>
<strategy_definition>
<strategys id="1" topic="amq/topic/PartOffLine" class_name="CMSMsgStrategy.AssyPartOffLineStrategy" method="Execute"/>
<strategys id="2" topic="amq/topic/OrderOnLine" class_name="CMSMsgStrategy.QMSInspectionOrderStrategy" method="Execute"/>
<strategys id="3" topic="amq/topic/OrderOffLine" class_name="" method=""/>
<strategys id="4" topic="amq/topic/Downtime/On" class_name="CMSMsgStrategy.EquipmentDowntimeStrategy" method="Execute"/>
</strategy_definition>
3、辅助类(Model)
public class MQConfig
{
public string HostName;
public string UserName;
public string Password;
public string Port = "1883";
public string VirtualHost;
}
public class OrderOnLine
{
public string line_ent_name { get;set; }
public DateTime Timestamp { get; set; }
public string mes_order_no { get; set; }
public string cus_info { get; set; }
public string part_no { get; set; }
}
public abstract class Strategy
{
public string Topic { get; set; }
public DateTime StartTime { get; set; }
public DateTime EndTime { get; set; }
public virtual void Execute(string payload)
{
StartTime = DateTime.Now.ToLocalTime();
EndTime = DateTime.Now.ToLocalTime();
}
}
4、业务类(Strategy接口实现)
编译为程序集 CMSMsgStrategy.dll并拷贝到主程序bin目录
public class AssyPartOffLineStrategy : Strategy
{
private log4net.ILog _logger;
public AssyPartOffLineStrategy(string topic, log4net.ILog logger)
{
Topic = topic;
_logger = logger;
}
public override void Execute(string payload)
{
_logger.Info($"Begin Execute Strategy: AssyPartOffLineStrategy->payload: {payload}");
PartOffLine pol = Newtonsoft.Json.JsonConvert.DeserializeObject<PartOffLine>(payload);
//解包 判断产线
//调用刷新在线工单的url
string uri = "http://localhost:7110/MESReportService.svc/SaveMesWOToCacheTable";
string jsonDate = string.Format("/Date({0})/", pol.Timestamp.ToChinaTimeStamp(true));
string content = "{\"strLineName\":\"" + pol.line_ent_name + "\",\"specifiedDate\":\"" + jsonDate + "\",\"pageNum\":0,\"pageSize\":0}";
HttpHelper.PostMethodRawJson(uri, content, null);
_logger.Info($"End Execute Strategy: AssyPartOffLineStrategy");
}
}