ZbusConfig.java package com.accenture.icc.zbus.config;import java.io.IOException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.HashMap;import java.util.List;import java.util.Map;import org.apache.commons.lang3.tim
package com.accenture.icc.zbus.config;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.zbus.broker.Broker;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Consumer;
import org.zbus.mq.Consumer.ConsumerHandler;
import org.zbus.mq.MqAdmin;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;
import com.accenture.icc.pojo.AnalogInputData;
import com.accenture.icc.pojo.DataUnit;
import com.accenture.icc.pojo.TripleDataWrapper;
@Configuration
@PropertySource("classpath:data.properties")
public class ZbusConfig {
private static final Logger logger = LoggerFactory.getLogger(ZbusConfig.class);
@Value("${zbus.mq.sub}")
private String subMq;
@Value("${zbus.mq.recv}")
private String recvMq;
@Value("${zbus.mq.alarm}")
private String alarmMq;
@Value("${zbus.mq.subCommonData}")
private String subIndexDataMq;
@Value("${zbus.mq.recvCommonData}")
private String indexDataMq;
@Value("${csv.power.maxsize}")
private int powerMaxListSize;
@Value("${csv.power.period}")
private int powerPeriod;
@Value("${csv.consumption.maxsize}")
private int consumptionMaxListSize;
@Value("${csv.consumption.period}")
private int consumptionPeriod;
@Autowired
private TripleDataWrapper powerWrapper;
@Autowired
private TripleDataWrapper consumptionWrapper;
@Autowired
private TripleDataWrapper transformerWrapper;
@Autowired
private TripleDataWrapper powerFactorWrapper;
@Autowired
private TripleDataWrapper unbalanceWrapper;
@Autowired
private TripleDataWrapper stationParamWrapper;
/**
* 回调函数
* @param messaging
* @return
*/
@Bean
public ConsumerHandler consumerHandler(SimpMessageSendingOperations messaging) {
ConsumerHandler consumerHandler = new ConsumerHandler() {
@Override
public void handle(Message msg, Consumer consumer) throws IOException {
logger.info("RECEIVING MESSAGE: {}", msg.getBodyString());
String[] fields = msg.getBodyString().split(",");
if(fields.length < 5){
return;
}
String tag = fields[0];
int tableId = 0;
int recordId = 0;
int fieldId = 0;
double value = 0;
String timeStamp = "";
try{
tableId = Integer.parseInt(fields[1]);
recordId = Integer.parseInt(fields[2]);
fieldId = Integer.parseInt(fields[3]);
value = Double.parseDouble(fields[4]);
if(value < 0.01 || value == 128) {
value = 0;
}
timeStamp = fields[6];
} catch (NumberFormatException e) {
logger.info("message format error.");
return;
}
AnalogInputData analogInputData = new AnalogInputData(tag, tableId, recordId, fieldId, value, timeStamp);
if(powerWrapper.containsRecord(recordId)){
List
datalist = powerWrapper.getDataListByRecordid(recordId);
if(datalist.size() == powerMaxListSize) {
datalist.remove(0);
}
datalist.add(analogInputData);
}
if(consumptionWrapper.containsRecord(recordId)){
List
datalist = consumptionWrapper.getDataListByRecordid(recordId); if(datalist.size() == consumptionMaxListSize) { datalist.remove(0); } datalist.add(analogInputData); } if(transformerWrapper.containsRecord(recordId)) { List
datalist = transformerWrapper.getDataListByRecordid(recordId); if(datalist.size() == powerMaxListSize) { datalist.remove(0); } datalist.add(analogInputData); } if(powerFactorWrapper.containsRecord(recordId)) { List
datalist = powerFactorWrapper.getDataListByRecordid(recordId); if(datalist.size() == powerMaxListSize) { datalist.remove(0); } datalist.add(analogInputData); } if(unbalanceWrapper.containsRecord(recordId)) { List
datalist = unbalanceWrapper.getDataListByRecordid(recordId); if(datalist.size() == powerMaxListSize) { datalist.remove(0); } datalist.add(analogInputData); } broadcastAnalog(recordId, messaging); } }; return consumerHandler; } @Bean public ConsumerHandler consumerHandlerAlarm(SimpMessageSendingOperations messaging) { ConsumerHandler consumerHandlerAlarm = new ConsumerHandler() { @Override public void handle(Message msg, Consumer consumer2) throws IOException { logger.info("RECEIVING MESSAGE: {}", msg.getBodyString()); String strMsg = msg.getBodyString(); List
alarmList = powerWrapper.getAlarmList(); if(alarmList.size() == consumptionMaxListSize) { alarmList.remove(0); } alarmList.add(strMsg); broadcastAlarm(messaging); } }; return consumerHandlerAlarm; } /** * 从zbus获取站点指标数据 * @param messaging * @return */ @Bean public ConsumerHandler consumerHandlerIndexData(SimpMessageSendingOperations messaging){ ConsumerHandler consumerHandlerIndexData = new ConsumerHandler() { @Override public void handle(Message msg, Consumer consumerIndexData) throws IOException { String dataMsg = msg.getBodyString(); //获取站内首页数据 List
indexDataList = powerWrapper.getIndexDataList(); if (indexDataList.size() == consumptionMaxListSize) { indexDataList.remove(0); } indexDataList.add(dataMsg); broadcastIndexData( messaging); } }; return consumerHandlerIndexData; } private void broadcastIndexData(SimpMessageSendingOperations messaging){ Map
resMap = new HashMap<>(); resMap.put("status", 1000); List
indexDataList = powerWrapper.getIndexDataList(); resMap.put("indeDataList", indexDataList); String destination = "/topic/indexData/"; messaging.convertAndSend(destination, resMap); } private void broadcastAnalog(int recordId, SimpMessageSendingOperations messaging) { Map
resMap = new HashMap<>(); resMap.put("status", 1000); int groupid = -1; int factoryid = -1; List
dataUnits = null; String destination = ""; if(powerWrapper.containsRecord(recordId)){ groupid = powerWrapper.getGroupIdByRecord(recordId); dataUnits = powerWrapper.getDataUnitsByGroupid(groupid); factoryid = powerWrapper.getFactoryIdByRecord(recordId); destination = "/topic/analogs/"+factoryid+"/1"; resMap.put("dataUnits", dataUnits); resMap.put("groupid", groupid); messaging.convertAndSend(destination, resMap); } if(consumptionWrapper.containsRecord(recordId)){ groupid = consumptionWrapper.getGroupIdByRecord(recordId); dataUnits = consumptionWrapper.getDataUnitsByGroupid(groupid); factoryid = consumptionWrapper.getFactoryIdByRecord(recordId); destination = "/topic/consumption/"+factoryid+"/1"; resMap.put("dataUnits", dataUnits); resMap.put("groupid", groupid); messaging.convertAndSend(destination, resMap); } if(transformerWrapper.containsRecord(recordId)) { groupid = transformerWrapper.getGroupIdByRecord(recordId); dataUnits = transformerWrapper.getDataUnitsByGroupid(groupid); factoryid = transformerWrapper.getFactoryIdByRecord(recordId); destination = "/topic/analogs/"+factoryid+"/2"; resMap.put("dataUnits", dataUnits); resMap.put("groupid", groupid); messaging.convertAndSend(destination, resMap); } if(powerFactorWrapper.containsRecord(recordId)) { groupid = powerFactorWrapper.getGroupIdByRecord(recordId); dataUnits = powerFactorWrapper.getDataUnitsByGroupid(groupid); factoryid = powerFactorWrapper.getFactoryIdByRecord(recordId); destination = "/topic/analogs/"+factoryid+"/3"; resMap.put("dataUnits", dataUnits); resMap.put("groupid", groupid); messaging.convertAndSend(destination, resMap); } if(unbalanceWrapper.containsRecord(recordId)) { groupid = unbalanceWrapper.getGroupIdByRecord(recordId); dataUnits = unbalanceWrapper.getDataUnitsByGroupid(groupid); factoryid = unbalanceWrapper.getFactoryIdByRecord(recordId); destination = "/topic/analogs/"+factoryid+"/4"; resMap.put("dataUnits", dataUnits); resMap.put("groupid", groupid); messaging.convertAndSend(destination, resMap); } } private void broadcastAlarm(SimpMessageSendingOperations messaging) { Map
resMap = new HashMap<>(); resMap.put("status", 1000); List
alarmList = powerWrapper.getAlarmList(); resMap.put("alarmList", alarmList); String destination = "/topic/alarm/"; messaging.convertAndSend(destination, resMap); } /** * @return * @throws IOException */ @Bean(destroyMethod="close") public Broker broker(@Value("${zbus.address}") String zbusAddress) throws IOException { Broker broker = new ZbusBroker(zbusAddress); return broker; } /** * @param consumerHandler * @return * @throws IOException * @throws InterruptedException */ @Bean(initMethod="start") public Consumer zbusConsumer(Broker broker, ConsumerHandler consumerHandler) throws IOException, InterruptedException { MqAdmin mqAdmin = new MqAdmin(broker, recvMq); mqAdmin.removeMQ(); Consumer consumer = new Consumer(broker, recvMq); consumer.onMessage(consumerHandler); return consumer; } @Bean(initMethod="start") public Consumer zbusConsumerAlarm(Broker broker, ConsumerHandler consumerHandlerAlarm) throws IOException, InterruptedException { MqAdmin mqAdminAlarm = new MqAdmin(broker, alarmMq); mqAdminAlarm.removeMQ(); Consumer consumerAlarm = new Consumer(broker, alarmMq); consumerAlarm.onMessage(consumerHandlerAlarm); return consumerAlarm; } @Bean(initMethod="start") public Consumer zbusConsumerIndexData(Broker broker, ConsumerHandler consumerHandlerIndexData) throws IOException, InterruptedException{ MqAdmin mqAdmin2 = new MqAdmin(broker, indexDataMq); mqAdmin2.removeMQ(); Consumer consumerIndexData = new Consumer(broker, indexDataMq); consumerIndexData.onMessage(consumerHandlerIndexData); return consumerIndexData; } /** * @return * @throws IOException * @throws InterruptedException */ @Bean public Producer zbusProducer(Broker broker) throws IOException, InterruptedException { List
powerRecords = powerWrapper.getRecordIds(); List
consumptionRecords = consumptionWrapper.getRecordIds(); List
transformerRecords = transformerWrapper.getRecordIds(); List
stationParamRecords = stationParamWrapper.getRecordIds(); Producer producer = new Producer(broker, subMq); producer.createMQ(); subscribeData(producer, powerRecords, powerPeriod); subscribeData(producer, consumptionRecords, consumptionPeriod); subscribeData(producer, transformerRecords, powerPeriod); producer.setMq(subIndexDataMq); producer.createMQ(); subscribeData(producer, stationParamRecords, powerPeriod); return producer; } /** * @param producer * @param records * @param period * @throws IOException * @throws InterruptedException */ private void subscribeData(Producer producer, List
records, int period) throws IOException, InterruptedException { for(Integer recordid : records) { long periodMillis = period * 60000; String msgbody = String.format("sub:%d,101,%d,7,%d", recordid, recordid, periodMillis); Message message = new Message(); message.setBody(msgbody); message = producer.sendSync(message); } } @Bean public static PropertySourcesPlaceholderConfigurer propertyConfigInDev() { return new PropertySourcesPlaceholderConfigurer(); } }
