ZbusConfig.java package com.accenture.icc.zbus.config;import java.io.IOException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.HashMap;import java.util.List;import java.util.Map;import org.apache.commons.lang3.tim
package com.accenture.icc.zbus.config; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.lang3.time.DateFormatUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; import org.springframework.messaging.simp.SimpMessageSendingOperations; import org.zbus.broker.Broker; import org.zbus.broker.ZbusBroker; import org.zbus.mq.Consumer; import org.zbus.mq.Consumer.ConsumerHandler; import org.zbus.mq.MqAdmin; import org.zbus.mq.Producer; import org.zbus.net.http.Message; import com.accenture.icc.pojo.AnalogInputData; import com.accenture.icc.pojo.DataUnit; import com.accenture.icc.pojo.TripleDataWrapper; @Configuration @PropertySource("classpath:data.properties") public class ZbusConfig { private static final Logger logger = LoggerFactory.getLogger(ZbusConfig.class); @Value("${zbus.mq.sub}") private String subMq; @Value("${zbus.mq.recv}") private String recvMq; @Value("${zbus.mq.alarm}") private String alarmMq; @Value("${zbus.mq.subCommonData}") private String subIndexDataMq; @Value("${zbus.mq.recvCommonData}") private String indexDataMq; @Value("${csv.power.maxsize}") private int powerMaxListSize; @Value("${csv.power.period}") private int powerPeriod; @Value("${csv.consumption.maxsize}") private int consumptionMaxListSize; @Value("${csv.consumption.period}") private int consumptionPeriod; @Autowired private TripleDataWrapper powerWrapper; @Autowired private TripleDataWrapper consumptionWrapper; @Autowired private TripleDataWrapper transformerWrapper; @Autowired private TripleDataWrapper powerFactorWrapper; @Autowired private TripleDataWrapper unbalanceWrapper; @Autowired private TripleDataWrapper stationParamWrapper; /** * 回调函数 * @param messaging * @return */ @Bean public ConsumerHandler consumerHandler(SimpMessageSendingOperations messaging) { ConsumerHandler consumerHandler = new ConsumerHandler() { @Override public void handle(Message msg, Consumer consumer) throws IOException { logger.info("RECEIVING MESSAGE: {}", msg.getBodyString()); String[] fields = msg.getBodyString().split(","); if(fields.length < 5){ return; } String tag = fields[0]; int tableId = 0; int recordId = 0; int fieldId = 0; double value = 0; String timeStamp = ""; try{ tableId = Integer.parseInt(fields[1]); recordId = Integer.parseInt(fields[2]); fieldId = Integer.parseInt(fields[3]); value = Double.parseDouble(fields[4]); if(value < 0.01 || value == 128) { value = 0; } timeStamp = fields[6]; } catch (NumberFormatException e) { logger.info("message format error."); return; } AnalogInputData analogInputData = new AnalogInputData(tag, tableId, recordId, fieldId, value, timeStamp); if(powerWrapper.containsRecord(recordId)){ List datalist = powerWrapper.getDataListByRecordid(recordId); if(datalist.size() == powerMaxListSize) { datalist.remove(0); } datalist.add(analogInputData); } if(consumptionWrapper.containsRecord(recordId)){ List datalist = consumptionWrapper.getDataListByRecordid(recordId); if(datalist.size() == consumptionMaxListSize) { datalist.remove(0); } datalist.add(analogInputData); } if(transformerWrapper.containsRecord(recordId)) { List datalist = transformerWrapper.getDataListByRecordid(recordId); if(datalist.size() == powerMaxListSize) { datalist.remove(0); } datalist.add(analogInputData); } if(powerFactorWrapper.containsRecord(recordId)) { List datalist = powerFactorWrapper.getDataListByRecordid(recordId); if(datalist.size() == powerMaxListSize) { datalist.remove(0); } datalist.add(analogInputData); } if(unbalanceWrapper.containsRecord(recordId)) { List datalist = unbalanceWrapper.getDataListByRecordid(recordId); if(datalist.size() == powerMaxListSize) { datalist.remove(0); } datalist.add(analogInputData); } broadcastAnalog(recordId, messaging); } }; return consumerHandler; } @Bean public ConsumerHandler consumerHandlerAlarm(SimpMessageSendingOperations messaging) { ConsumerHandler consumerHandlerAlarm = new ConsumerHandler() { @Override public void handle(Message msg, Consumer consumer2) throws IOException { logger.info("RECEIVING MESSAGE: {}", msg.getBodyString()); String strMsg = msg.getBodyString(); ListalarmList = powerWrapper.getAlarmList(); if(alarmList.size() == consumptionMaxListSize) { alarmList.remove(0); } alarmList.add(strMsg); broadcastAlarm(messaging); } }; return consumerHandlerAlarm; } /** * 从zbus获取站点指标数据 * @param messaging * @return */ @Bean public ConsumerHandler consumerHandlerIndexData(SimpMessageSendingOperations messaging){ ConsumerHandler consumerHandlerIndexData = new ConsumerHandler() { @Override public void handle(Message msg, Consumer consumerIndexData) throws IOException { String dataMsg = msg.getBodyString(); //获取站内首页数据 List indexDataList = powerWrapper.getIndexDataList(); if (indexDataList.size() == consumptionMaxListSize) { indexDataList.remove(0); } indexDataList.add(dataMsg); broadcastIndexData( messaging); } }; return consumerHandlerIndexData; } private void broadcastIndexData(SimpMessageSendingOperations messaging){ Map resMap = new HashMap<>(); resMap.put("status", 1000); List indexDataList = powerWrapper.getIndexDataList(); resMap.put("indeDataList", indexDataList); String destination = "/topic/indexData/"; messaging.convertAndSend(destination, resMap); } private void broadcastAnalog(int recordId, SimpMessageSendingOperations messaging) { Map resMap = new HashMap<>(); resMap.put("status", 1000); int groupid = -1; int factoryid = -1; List dataUnits = null; String destination = ""; if(powerWrapper.containsRecord(recordId)){ groupid = powerWrapper.getGroupIdByRecord(recordId); dataUnits = powerWrapper.getDataUnitsByGroupid(groupid); factoryid = powerWrapper.getFactoryIdByRecord(recordId); destination = "/topic/analogs/"+factoryid+"/1"; resMap.put("dataUnits", dataUnits); resMap.put("groupid", groupid); messaging.convertAndSend(destination, resMap); } if(consumptionWrapper.containsRecord(recordId)){ groupid = consumptionWrapper.getGroupIdByRecord(recordId); dataUnits = consumptionWrapper.getDataUnitsByGroupid(groupid); factoryid = consumptionWrapper.getFactoryIdByRecord(recordId); destination = "/topic/consumption/"+factoryid+"/1"; resMap.put("dataUnits", dataUnits); resMap.put("groupid", groupid); messaging.convertAndSend(destination, resMap); } if(transformerWrapper.containsRecord(recordId)) { groupid = transformerWrapper.getGroupIdByRecord(recordId); dataUnits = transformerWrapper.getDataUnitsByGroupid(groupid); factoryid = transformerWrapper.getFactoryIdByRecord(recordId); destination = "/topic/analogs/"+factoryid+"/2"; resMap.put("dataUnits", dataUnits); resMap.put("groupid", groupid); messaging.convertAndSend(destination, resMap); } if(powerFactorWrapper.containsRecord(recordId)) { groupid = powerFactorWrapper.getGroupIdByRecord(recordId); dataUnits = powerFactorWrapper.getDataUnitsByGroupid(groupid); factoryid = powerFactorWrapper.getFactoryIdByRecord(recordId); destination = "/topic/analogs/"+factoryid+"/3"; resMap.put("dataUnits", dataUnits); resMap.put("groupid", groupid); messaging.convertAndSend(destination, resMap); } if(unbalanceWrapper.containsRecord(recordId)) { groupid = unbalanceWrapper.getGroupIdByRecord(recordId); dataUnits = unbalanceWrapper.getDataUnitsByGroupid(groupid); factoryid = unbalanceWrapper.getFactoryIdByRecord(recordId); destination = "/topic/analogs/"+factoryid+"/4"; resMap.put("dataUnits", dataUnits); resMap.put("groupid", groupid); messaging.convertAndSend(destination, resMap); } } private void broadcastAlarm(SimpMessageSendingOperations messaging) { Map resMap = new HashMap<>(); resMap.put("status", 1000); List alarmList = powerWrapper.getAlarmList(); resMap.put("alarmList", alarmList); String destination = "/topic/alarm/"; messaging.convertAndSend(destination, resMap); } /** * @return * @throws IOException */ @Bean(destroyMethod="close") public Broker broker(@Value("${zbus.address}") String zbusAddress) throws IOException { Broker broker = new ZbusBroker(zbusAddress); return broker; } /** * @param consumerHandler * @return * @throws IOException * @throws InterruptedException */ @Bean(initMethod="start") public Consumer zbusConsumer(Broker broker, ConsumerHandler consumerHandler) throws IOException, InterruptedException { MqAdmin mqAdmin = new MqAdmin(broker, recvMq); mqAdmin.removeMQ(); Consumer consumer = new Consumer(broker, recvMq); consumer.onMessage(consumerHandler); return consumer; } @Bean(initMethod="start") public Consumer zbusConsumerAlarm(Broker broker, ConsumerHandler consumerHandlerAlarm) throws IOException, InterruptedException { MqAdmin mqAdminAlarm = new MqAdmin(broker, alarmMq); mqAdminAlarm.removeMQ(); Consumer consumerAlarm = new Consumer(broker, alarmMq); consumerAlarm.onMessage(consumerHandlerAlarm); return consumerAlarm; } @Bean(initMethod="start") public Consumer zbusConsumerIndexData(Broker broker, ConsumerHandler consumerHandlerIndexData) throws IOException, InterruptedException{ MqAdmin mqAdmin2 = new MqAdmin(broker, indexDataMq); mqAdmin2.removeMQ(); Consumer consumerIndexData = new Consumer(broker, indexDataMq); consumerIndexData.onMessage(consumerHandlerIndexData); return consumerIndexData; } /** * @return * @throws IOException * @throws InterruptedException */ @Bean public Producer zbusProducer(Broker broker) throws IOException, InterruptedException { List powerRecords = powerWrapper.getRecordIds(); List consumptionRecords = consumptionWrapper.getRecordIds(); List transformerRecords = transformerWrapper.getRecordIds(); List stationParamRecords = stationParamWrapper.getRecordIds(); Producer producer = new Producer(broker, subMq); producer.createMQ(); subscribeData(producer, powerRecords, powerPeriod); subscribeData(producer, consumptionRecords, consumptionPeriod); subscribeData(producer, transformerRecords, powerPeriod); producer.setMq(subIndexDataMq); producer.createMQ(); subscribeData(producer, stationParamRecords, powerPeriod); return producer; } /** * @param producer * @param records * @param period * @throws IOException * @throws InterruptedException */ private void subscribeData(Producer producer, List records, int period) throws IOException, InterruptedException { for(Integer recordid : records) { long periodMillis = period * 60000; String msgbody = String.format("sub:%d,101,%d,7,%d", recordid, recordid, periodMillis); Message message = new Message(); message.setBody(msgbody); message = producer.sendSync(message); } } @Bean public static PropertySourcesPlaceholderConfigurer propertyConfigInDev() { return new PropertySourcesPlaceholderConfigurer(); } }