当前位置 : 主页 > 编程语言 > c++ >

ZBus的创建、启动、生产、回调

来源:互联网 收集:自由互联 发布时间:2021-06-30
ZbusConfig.java package com.accenture.icc.zbus.config; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.lang3.time.DateFormatUtils; ……
ZbusConfig.java
package com.accenture.icc.zbus.config;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.zbus.broker.Broker;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Consumer;
import org.zbus.mq.Consumer.ConsumerHandler;
import org.zbus.mq.MqAdmin;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;

import com.accenture.icc.pojo.AnalogInputData;
import com.accenture.icc.pojo.DataUnit;
import com.accenture.icc.pojo.TripleDataWrapper;

/**
 * Spring configuration that wires the application to a zbus broker.
 *
 * <p>It declares the broker, three message consumers (analog data, alarms and
 * station index data), the handlers those consumers call back into, and a
 * producer that sends the initial "sub:" subscription requests. Each handler
 * stores the incoming message in a list obtained from a {@link TripleDataWrapper}
 * and then pushes the current state to a messaging destination.
 *
 * <p>NOTE(review): the element types used below for the lists returned by
 * {@link TripleDataWrapper} ({@code AnalogInputData}, {@code String},
 * {@code DataUnit}, {@code Integer}) are inferred from how the values are used
 * in this class — confirm against {@code TripleDataWrapper}.
 */
@Configuration
@PropertySource("classpath:data.properties")
public class ZbusConfig {

	private static final Logger logger = LoggerFactory.getLogger(ZbusConfig.class);

	/** Minimum number of comma-separated fields in a data message; the handler reads up to index 6. */
	private static final int MIN_FIELD_COUNT = 7;

	/** Milliseconds per minute, declared as a long so the period conversion is done in 64-bit arithmetic. */
	private static final long MILLIS_PER_MINUTE = 60000L;

	@Value("${zbus.mq.sub}")
	private String subMq;

	@Value("${zbus.mq.recv}")
	private String recvMq;

	@Value("${zbus.mq.alarm}")
	private String alarmMq;

	@Value("${zbus.mq.subCommonData}")
	private String subIndexDataMq;

	@Value("${zbus.mq.recvCommonData}")
	private String indexDataMq;

	@Value("${csv.power.maxsize}")
	private int powerMaxListSize;
	@Value("${csv.power.period}")
	private int powerPeriod;

	@Value("${csv.consumption.maxsize}")
	private int consumptionMaxListSize;
	@Value("${csv.consumption.period}")
	private int consumptionPeriod;

	@Autowired
	private TripleDataWrapper powerWrapper;
	@Autowired
	private TripleDataWrapper consumptionWrapper;
	@Autowired
	private TripleDataWrapper transformerWrapper;
	@Autowired
	private TripleDataWrapper powerFactorWrapper;
	@Autowired
	private TripleDataWrapper unbalanceWrapper;
	@Autowired
	private TripleDataWrapper stationParamWrapper;

	/**
	 * Callback for analog data messages.
	 *
	 * <p>The message body is split on commas into
	 * {@code tag,tableId,recordId,fieldId,value,<unused>,timeStamp}. Bodies that are
	 * null, have fewer than {@value #MIN_FIELD_COUNT} fields, or contain a
	 * non-numeric id/value are dropped. Valid samples are appended to the data list
	 * of every wrapper that contains the record id and then broadcast.
	 *
	 * @param messaging template used to push the updated data to subscribers
	 * @return the handler to register on the analog data consumer
	 */
	@Bean
	public ConsumerHandler consumerHandler(SimpMessageSendingOperations messaging) {

		ConsumerHandler consumerHandler = new ConsumerHandler() {
			@Override
			public void handle(Message msg, Consumer consumer) throws IOException {
				String body = msg.getBodyString();
				logger.info("RECEIVING MESSAGE: {}", body);
				if (body == null) {
					return;
				}
				String[] fields = body.split(",");
				// The time stamp is read from index 6, so at least seven fields are needed;
				// a shorter message would otherwise fail with ArrayIndexOutOfBoundsException.
				if (fields.length < MIN_FIELD_COUNT) {
					return;
				}

				String tag = fields[0];
				int tableId;
				int recordId;
				int fieldId;
				double value;
				try {
					tableId = Integer.parseInt(fields[1]);
					recordId = Integer.parseInt(fields[2]);
					fieldId = Integer.parseInt(fields[3]);
					value = Double.parseDouble(fields[4]);
				} catch (NumberFormatException e) {
					logger.info("message format error.");
					return;
				}
				// Values below 0.01 and the exact value 128 are stored as 0.
				// NOTE(review): the meaning of 128 is not visible in this file — confirm.
				if (value < 0.01 || value == 128) {
					value = 0;
				}
				String timeStamp = fields[6];

				AnalogInputData analogInputData =
						new AnalogInputData(tag, tableId, recordId, fieldId, value, timeStamp);

				storeSample(powerWrapper, recordId, analogInputData, powerMaxListSize);
				storeSample(consumptionWrapper, recordId, analogInputData, consumptionMaxListSize);
				storeSample(transformerWrapper, recordId, analogInputData, powerMaxListSize);
				storeSample(powerFactorWrapper, recordId, analogInputData, powerMaxListSize);
				storeSample(unbalanceWrapper, recordId, analogInputData, powerMaxListSize);

				broadcastAnalog(recordId, messaging);
			}
		};
		return consumerHandler;
	}

	/**
	 * Callback for alarm messages: appends the raw message body to the alarm list
	 * held by {@code powerWrapper} (bounded by {@code consumptionMaxListSize}) and
	 * broadcasts the list.
	 *
	 * @param messaging template used to push the alarm list to subscribers
	 * @return the handler to register on the alarm consumer
	 */
	@Bean
	public ConsumerHandler consumerHandlerAlarm(SimpMessageSendingOperations messaging) {
		ConsumerHandler consumerHandlerAlarm = new ConsumerHandler() {
			@Override
			public void handle(Message msg, Consumer consumer2) throws IOException {
				String strMsg = msg.getBodyString();
				logger.info("RECEIVING MESSAGE: {}", strMsg);
				List<String> alarmList = powerWrapper.getAlarmList();
				appendBounded(alarmList, strMsg, consumptionMaxListSize);
				broadcastAlarm(messaging);
			}
		};
		return consumerHandlerAlarm;
	}

	/**
	 * Callback that receives station index data from zbus: appends the raw message
	 * body to the index data list held by {@code powerWrapper} (bounded by
	 * {@code consumptionMaxListSize}) and broadcasts the list.
	 *
	 * @param messaging template used to push the index data to subscribers
	 * @return the handler to register on the index data consumer
	 */
	@Bean
	public ConsumerHandler consumerHandlerIndexData(SimpMessageSendingOperations messaging) {
		ConsumerHandler consumerHandlerIndexData = new ConsumerHandler() {
			@Override
			public void handle(Message msg, Consumer consumerIndexData) throws IOException {
				String dataMsg = msg.getBodyString();
				// Fetch the list backing the station home page data.
				List<String> indexDataList = powerWrapper.getIndexDataList();
				appendBounded(indexDataList, dataMsg, consumptionMaxListSize);
				broadcastIndexData(messaging);
			}
		};
		return consumerHandlerIndexData;
	}

	/**
	 * Appends {@code sample} to the wrapper's data list for {@code recordId}, if the
	 * wrapper contains that record.
	 */
	private static void storeSample(TripleDataWrapper wrapper, int recordId,
			AnalogInputData sample, int maxSize) {
		if (!wrapper.containsRecord(recordId)) {
			return;
		}
		List<AnalogInputData> datalist = wrapper.getDataListByRecordid(recordId);
		appendBounded(datalist, sample, maxSize);
	}

	/**
	 * Adds {@code item} at the end of {@code buffer}, first dropping the oldest
	 * entries while the buffer already holds {@code maxSize} or more elements.
	 */
	private static <T> void appendBounded(List<T> buffer, T item, int maxSize) {
		while (!buffer.isEmpty() && buffer.size() >= maxSize) {
			buffer.remove(0);
		}
		buffer.add(item);
	}

	/** Sends the current index data list to {@code /topic/indexData/}. */
	private void broadcastIndexData(SimpMessageSendingOperations messaging) {
		Map<String, Object> resMap = new HashMap<>();
		resMap.put("status", 1000);
		List<String> indexDataList = powerWrapper.getIndexDataList();
		// NOTE(review): "indeDataList" looks like a typo for "indexDataList"; it is kept
		// unchanged because subscribers may read this exact key.
		resMap.put("indeDataList", indexDataList);
		String destination = "/topic/indexData/";
		messaging.convertAndSend(destination, resMap);
	}

	/**
	 * Sends the data units of the group {@code recordId} belongs to, once for every
	 * wrapper that contains the record. The destination is built from the wrapper's
	 * factory id and a fixed per-wrapper prefix and suffix.
	 */
	private void broadcastAnalog(int recordId, SimpMessageSendingOperations messaging) {
		publishDataUnits(powerWrapper, recordId, "/topic/analogs/", "/1", messaging);
		publishDataUnits(consumptionWrapper, recordId, "/topic/consumption/", "/1", messaging);
		publishDataUnits(transformerWrapper, recordId, "/topic/analogs/", "/2", messaging);
		publishDataUnits(powerFactorWrapper, recordId, "/topic/analogs/", "/3", messaging);
		publishDataUnits(unbalanceWrapper, recordId, "/topic/analogs/", "/4", messaging);
	}

	/**
	 * Sends {@code status}, {@code dataUnits} and {@code groupid} for the record to
	 * {@code topicPrefix + factoryId + topicSuffix}; does nothing when the wrapper
	 * does not contain the record.
	 */
	private void publishDataUnits(TripleDataWrapper wrapper, int recordId, String topicPrefix,
			String topicSuffix, SimpMessageSendingOperations messaging) {
		if (!wrapper.containsRecord(recordId)) {
			return;
		}
		int groupid = wrapper.getGroupIdByRecord(recordId);
		List<DataUnit> dataUnits = wrapper.getDataUnitsByGroupid(groupid);
		int factoryid = wrapper.getFactoryIdByRecord(recordId);

		Map<String, Object> resMap = new HashMap<>();
		resMap.put("status", 1000);
		resMap.put("dataUnits", dataUnits);
		resMap.put("groupid", groupid);

		String destination = topicPrefix + factoryid + topicSuffix;
		messaging.convertAndSend(destination, resMap);
	}

	/** Sends the current alarm list to {@code /topic/alarm/}. */
	private void broadcastAlarm(SimpMessageSendingOperations messaging) {
		Map<String, Object> resMap = new HashMap<>();
		resMap.put("status", 1000);
		List<String> alarmList = powerWrapper.getAlarmList();
		resMap.put("alarmList", alarmList);
		String destination = "/topic/alarm/";
		messaging.convertAndSend(destination, resMap);
	}

	/**
	 * Creates the zbus broker; Spring calls {@code close()} on it at shutdown.
	 *
	 * @param zbusAddress broker address taken from the {@code zbus.address} property
	 * @return the broker shared by the consumers and the producer
	 * @throws IOException if the broker cannot be created
	 */
	@Bean(destroyMethod="close")
	public Broker broker(@Value("${zbus.address}") String zbusAddress) throws IOException {
		Broker broker = new ZbusBroker(zbusAddress);
		return broker;
	}

	/**
	 * Consumer for the analog data queue ({@code zbus.mq.recv}); Spring calls
	 * {@code start()} on it after creation.
	 *
	 * @param broker the zbus broker
	 * @param consumerHandler callback invoked for each received message
	 * @return the configured consumer
	 * @throws IOException if the queue cannot be removed
	 * @throws InterruptedException if the calling thread is interrupted
	 */
	@Bean(initMethod="start")
	public Consumer zbusConsumer(Broker broker, ConsumerHandler consumerHandler) throws IOException, InterruptedException {
		return newConsumer(broker, recvMq, consumerHandler);
	}

	/**
	 * Consumer for the alarm queue ({@code zbus.mq.alarm}); Spring calls
	 * {@code start()} on it after creation.
	 *
	 * @param broker the zbus broker
	 * @param consumerHandlerAlarm callback invoked for each received alarm
	 * @return the configured consumer
	 * @throws IOException if the queue cannot be removed
	 * @throws InterruptedException if the calling thread is interrupted
	 */
	@Bean(initMethod="start")
	public Consumer zbusConsumerAlarm(Broker broker, ConsumerHandler consumerHandlerAlarm) throws IOException, InterruptedException {
		return newConsumer(broker, alarmMq, consumerHandlerAlarm);
	}

	/**
	 * Consumer for the station index data queue ({@code zbus.mq.recvCommonData});
	 * Spring calls {@code start()} on it after creation.
	 *
	 * @param broker the zbus broker
	 * @param consumerHandlerIndexData callback invoked for each received message
	 * @return the configured consumer
	 * @throws IOException if the queue cannot be removed
	 * @throws InterruptedException if the calling thread is interrupted
	 */
	@Bean(initMethod="start")
	public Consumer zbusConsumerIndexData(Broker broker, ConsumerHandler consumerHandlerIndexData) throws IOException, InterruptedException {
		return newConsumer(broker, indexDataMq, consumerHandlerIndexData);
	}

	/**
	 * Calls {@code removeMQ()} for {@code mq} and then creates a consumer on it with
	 * {@code handler} registered as its message callback.
	 * NOTE(review): presumably the removal discards messages left over from an
	 * earlier run — confirm against the zbus documentation.
	 */
	private static Consumer newConsumer(Broker broker, String mq, ConsumerHandler handler)
			throws IOException, InterruptedException {
		MqAdmin mqAdmin = new MqAdmin(broker, mq);
		mqAdmin.removeMQ();
		Consumer consumer = new Consumer(broker, mq);
		consumer.onMessage(handler);
		return consumer;
	}

	/**
	 * Creates the producer and sends the subscription requests.
	 *
	 * <p>Power, consumption and transformer records are subscribed through the
	 * {@code zbus.mq.sub} queue; the producer is then switched to the
	 * {@code zbus.mq.subCommonData} queue for the station parameter records.
	 *
	 * @param broker the zbus broker
	 * @return the producer, left pointing at the {@code zbus.mq.subCommonData} queue
	 * @throws IOException if a queue cannot be created or a request cannot be sent
	 * @throws InterruptedException if the calling thread is interrupted
	 */
	@Bean
	public Producer zbusProducer(Broker broker) throws IOException, InterruptedException {
		List<Integer> powerRecords = powerWrapper.getRecordIds();
		List<Integer> consumptionRecords = consumptionWrapper.getRecordIds();
		List<Integer> transformerRecords = transformerWrapper.getRecordIds();
		List<Integer> stationParamRecords = stationParamWrapper.getRecordIds();

		Producer producer = new Producer(broker, subMq);
		producer.createMQ();
		subscribeData(producer, powerRecords, powerPeriod);
		subscribeData(producer, consumptionRecords, consumptionPeriod);
		subscribeData(producer, transformerRecords, powerPeriod);

		producer.setMq(subIndexDataMq);
		producer.createMQ();
		subscribeData(producer, stationParamRecords, powerPeriod);

		return producer;
	}

	/**
	 * Sends one synchronous {@code sub:} request per record id.
	 *
	 * @param producer producer used to send the requests
	 * @param records record ids to subscribe to
	 * @param period reporting period in minutes; sent to the server in milliseconds
	 * @throws IOException if a request cannot be sent
	 * @throws InterruptedException if the calling thread is interrupted
	 */
	private void subscribeData(Producer producer, List<Integer> records, int period) throws IOException, InterruptedException {
		// Multiply as long: an int multiplication would overflow for large periods.
		long periodMillis = period * MILLIS_PER_MINUTE;
		for (Integer recordid : records) {
			String msgbody = String.format("sub:%d,101,%d,7,%d", recordid, recordid, periodMillis);
			Message message = new Message();
			message.setBody(msgbody);
			// The reply is not inspected.
			producer.sendSync(message);
		}
	}

	/** Enables resolution of the {@code ${...}} placeholders used in {@code @Value}. */
	@Bean
	public static PropertySourcesPlaceholderConfigurer propertyConfigInDev() {
		return new PropertySourcesPlaceholderConfigurer();
	}
}
                  
                 
                
               
              
             
            
           
          
         
        
       
      
     
    
   
  
 
网友评论