当前位置 : 主页 > 编程语言 > c++ >

rabbitmq整合spring实现rpc技术调用

来源:互联网 收集:自由互联 发布时间:2021-06-30
摘要:请求端配置文件为 rabbitmq-context-client.xml;监听容器需要继承并重写 Spring 默认的监听容器,完整代码见下文。
请求端配置文件rabbitmq-context-client.xml
   

  

	
  
	
  
	
  
		
	
  
	
   
  

	
  
	
  

	
  
	
	
   
    
     
    
  

	
  
	
   
  

	
   
    
  
	

 
监听容器需要自己实现:继承 Spring 默认的 SimpleMessageListenerContainer,并重写其 doInitialize() 方法
package net.nxmax.atp.remoting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

/**
 * Listener container that, after the standard initialization has run, points itself at
 * two fixed reply queues whenever its message listener is a {@link RabbitTemplate}.
 */
public class DynamicReplyMessageListenerContainer extends SimpleMessageListenerContainer
{
	protected static final Logger logger = LoggerFactory.getLogger(DynamicReplyMessageListenerContainer.class);

	/** Name of the first queue the server side replies on. */
	private static final String FIRST_REPLY_QUEUE = "JC_TO_RMPS_QUEUE_1";

	/** Name of the second queue the server side replies on. */
	private static final String SECOND_REPLY_QUEUE = "JC_TO_RMPS_QUEUE_2";

	/**
	 * Runs the inherited initialization, then registers the reply queues if the
	 * configured listener is a {@link RabbitTemplate}; otherwise leaves the queue
	 * configuration untouched.
	 *
	 * @throws Exception propagated from the superclass initialization
	 */
	@Override
	protected void doInitialize() throws Exception
	{
		logger.info("执行DynamicReplyMessageListenerContainer.doInitialize()");
		super.doInitialize();

		boolean listenerIsTemplate = getMessageListener() instanceof RabbitTemplate;
		if (!listenerIsTemplate) {
			return;
		}

		// These are the queues on which the server side sends its replies.
		Queue firstReplyQueue = new Queue(FIRST_REPLY_QUEUE);
		Queue secondReplyQueue = new Queue(SECOND_REPLY_QUEUE);
		setQueues(firstReplyQueue, secondReplyQueue);
	}
}
客户端请求实现类
package com.hjh.rabbitmq.cross;

import java.util.concurrent.Semaphore;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * Cross-detection business service that sends its request through RabbitMQ and blocks
 * until a reply is received or the reply timeout elapses.
 *
 * @author jianhua.huang
 */
//@Service("crossDetectionByMq")
public class CrossDetectionByMqService
{
	/** Reply timeout applied to the template before each request, in milliseconds. */
	private static final long REPLY_TIMEOUT_MILLIS = 11000L;

	/** Maximum number of requests allowed to be in flight at the same time. */
	private static final int MAX_CONCURRENT_REQUESTS = 6;

	private RabbitTemplate amqpTemplate;

	/** Semaphore that caps the number of threads sending concurrently. */
	private final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_REQUESTS);

	public RabbitTemplate getAmqpTemplate()
	{
		return amqpTemplate;
	}

	public void setAmqpTemplate(RabbitTemplate amqpTemplate)
	{
		this.amqpTemplate = amqpTemplate;
	}


	/**
	 * Entry point of the cross-detection call: builds a text message from the two
	 * arguments, sends it with routing key {@code "rmpsToJcKey"}, waits for the reply
	 * and prints the reply together with the elapsed time.
	 *
	 * <p>If the calling thread is interrupted while waiting for a permit, the interrupt
	 * flag is restored and the method returns without sending anything.
	 *
	 * @param msgExt     free text embedded in the request body (callers in this file pass the thread name)
	 * @param routingKey value embedded in the request body and set as the template's reply address
	 * @author jianhua.huang
	 */
	public void doCrossDetection(String msgExt, String routingKey)
	{
		// Build the request that is sent to the remote cross-detection service over MQ.
		MessageProperties msgProp = new MessageProperties();
		msgProp.setContentType("text");
		String str = "凹凸曼,凹凸曼 | " + msgExt + "," + routingKey;
		// NOTE(review): getBytes() uses the platform default charset, and so does the
		// decoding in CrossDetectionConsumer; consider an explicit charset on both sides.
		Message msg = new Message(str.getBytes(), msgProp);
		amqpTemplate.setReplyTimeout(REPLY_TIMEOUT_MILLIS);

		Object response = null;
		long start = System.currentTimeMillis();

		// Acquire OUTSIDE the try/finally that releases: if acquire() is interrupted no
		// permit is held, and releasing anyway would silently raise the concurrency cap.
		try
		{
			semaphore.acquire();
		}
		catch (InterruptedException e)
		{
			Thread.currentThread().interrupt();
			e.printStackTrace();
			return;
		}

		try
		{
			System.out.println("发送出去的消息为 : {" + str + "}");
			// NOTE(review): the reply address is set on the shared template, so concurrent
			// callers using different routing keys can overwrite each other's value
			// between this line and the send below — confirm this is acceptable.
			amqpTemplate.setReplyAddress(routingKey);
			response = amqpTemplate.convertSendAndReceive("rmpsToJcKey", msg);
			// The response is null when no reply arrived; guard so that case is reported
			// instead of failing with a NullPointerException.
			System.out.println("类型 : " + (response == null ? null : response.getClass().getSimpleName()));
		}
		finally
		{
			System.out.println(msgExt + "释放一个许可");
			semaphore.release();
		}

		long end = System.currentTimeMillis();
		System.out.println("response 为: {" + response + "},耗时 = " + (end - start));
	}

}
客户端main启动实现,主要是测试并发
package net.nxmax.atp.startup;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.hjh.rabbitmq.cross.CrossDetectionByMqService;

/**
 * Client-side startup class. {@link #main(String[])} performs a single RPC request;
 * instances can also be submitted to an executor, in which case each one waits on a
 * shared latch before issuing its request.
 *
 * @author jianhua.huang
 */
public class SimpleClientStartup implements Runnable
{

	private final String routeKey;
	private final CountDownLatch countDown;
	private final CrossDetectionByMqService cross;

	/**
	 * @param routeKey  routing key handed to the service when this task runs
	 * @param countDown latch this task waits on before sending its request
	 * @param cross     service used to perform the request
	 */
	public SimpleClientStartup(String routeKey, CountDownLatch countDown, CrossDetectionByMqService cross)
	{
		this.routeKey = routeKey;
		this.countDown = countDown;
		this.cross = cross;
	}

	/**
	 * Loads the client Spring context, issues one request on the calling thread and
	 * then closes the context so its resources are released.
	 *
	 * @param args unused
	 */
	public static void main(String[] args)
	{
		ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("rpc/fixq/rabbitmq-context-client.xml");
		try
		{
			CrossDetectionByMqService cross = ctx.getBean("crossDetection", CrossDetectionByMqService.class);

			cross.doCrossDetection(Thread.currentThread().getName(), "JC_TO_RMPS_KEY_1");

			// Disabled concurrency experiment, kept for reference. If it is re-enabled,
			// wait for the submitted tasks to finish before leaving this try block,
			// because the context is closed in the finally clause below.
//			ExecutorService service = Executors.newCachedThreadPool();
//			final CountDownLatch countDown = new CountDownLatch(10);
//
//			String[] keys = {"JC_TO_RMPS_KEY_1","JC_TO_RMPS_KEY_2"};
//
//			for(int i=0;i<10;i++)
//			{
//				int idx = i % 2;
//				String routeKey = keys[idx];
//				service.execute(new SimpleClientStartup(routeKey, countDown, cross));
//				countDown.countDown();
//			}
//			service.shutdown();
		}
		finally
		{
			// Always release the context, even when the request throws.
			ctx.close();
		}
	}

	/**
	 * Waits until the shared latch reaches zero, then sends one request using this
	 * task's routing key. Failures are printed rather than propagated.
	 */
	@Override
	public void run()
	{
		try
		{
			countDown.await();
			cross.doCrossDetection(Thread.currentThread().getName(), routeKey);
		}
		catch (InterruptedException e)
		{
			// Restore the flag so the executor that owns this thread can observe it.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		}
		catch (RuntimeException e)
		{
			e.printStackTrace();
		}
	}

	/**
	 * Small latch demonstration: three workers wait for a "command" latch, the calling
	 * thread releases it after a random delay of up to ten seconds, and then waits
	 * until every worker has counted down the "answer" latch.
	 */
	public void testCountDown()
	{
		ExecutorService service = Executors.newCachedThreadPool();
		final CountDownLatch cdOrder = new CountDownLatch(1);
		final CountDownLatch cdAnswer = new CountDownLatch(3);

		try
		{
			for (int i = 0; i < 3; i++)
			{
				Runnable runnable = new Runnable()
				{
					@Override
					public void run()
					{
						try
						{
							System.out.println("线程" + Thread.currentThread().getName() + "正准备接受命令");
							cdOrder.await();
							System.out.println("线程" + Thread.currentThread().getName() + "已接受命令");
							cdAnswer.countDown();
							System.out.println("线程" + Thread.currentThread().getName() +  "回应命令处理结果");
						}
						catch (InterruptedException e)
						{
							Thread.currentThread().interrupt();
							e.printStackTrace();
						}
					}
				};
				service.execute(runnable);
			}

			Thread.sleep((long)(Math.random()*10000));
			System.out.println("线程" + Thread.currentThread().getName() +  "即将发布命令");
			cdOrder.countDown();
			System.out.println("线程" + Thread.currentThread().getName() +  "已发送命令,正在等待结果");
			cdAnswer.await();
			System.out.println("线程" + Thread.currentThread().getName() +  "已收到所有响应结果");
		}
		catch (InterruptedException e)
		{
			Thread.currentThread().interrupt();
			e.printStackTrace();
			// Workers may still be parked on cdOrder; interrupt them so they do not
			// wait forever and keep the pool threads alive.
			service.shutdownNow();
		}
		finally
		{
			// No-op if shutdownNow() already ran; otherwise lets idle workers exit.
			service.shutdown();
		}
	}

}
服务端配置文件applicationContext.xml
 

 
	
  
	
	
   
  
	
  

	
   
    
  

	
   
    
  


 
服务端 CrossDetectionConsumer 类的实现
package com.hjh.rabbitmq.cross;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

@Service
public class CrossDetectionConsumer implements MessageListener
{
	private RabbitTemplate amqpTemplate;

	public void onMessage(Message message)
	{
		String msg = new String(message.getBody());
		String json = "模拟交叉检测平台的响应信息, 响应线程名称为【 " + msg.substring(msg.indexOf("|") + 1) + "】rpc 请求!";
		MessageProperties msgProp = message.getMessageProperties();
		
		String replyTo = msgProp.getReplyTo();
		System.out.println("replyTo = " + replyTo);
		System.out.println("当前处理线程名称 : " + Thread.currentThread().getName());
		Message responseMsg = new Message(json.getBytes(),message.getMessageProperties());
		//"jcToRmpsKey"
		try
		{
			Thread.sleep(5000);
		} catch (Exception e)
		{
			// TODO: handle exception
		}
		amqpTemplate.send(replyTo, responseMsg);
	}

	public RabbitTemplate getAmqpTemplate()
	{
		return amqpTemplate;
	}

	public void setAmqpTemplate(RabbitTemplate amqpTemplate)
	{
		this.amqpTemplate = amqpTemplate;
	}

}
服务端main启动类
package net.nxmax.atp.startup;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Server-side launcher: loads the Spring context and keeps the process alive until the
 * operator types {@code exit} on standard input or standard input is closed.
 */
public class ServerStartup {

	/** Logger */
	private static final Logger log = LoggerFactory
			.getLogger(ServerStartup.class);

	/**
	 * Program entry point.
	 *
	 * @param args unused
	 * @throws Exception if the context cannot be created or standard input cannot be read
	 */
	public static void main(String[] args) throws Exception {
		new ServerStartup().go();
	}

	/**
	 * Creates the application context, waits for the exit command and then closes the
	 * context. The context is closed even when startup checks or console reading fail.
	 *
	 * @throws Exception if the context cannot be created or standard input cannot be read
	 */
	private void go() throws Exception {
		log.info("Create ApplicationContext");
		ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("rpc/fixq/applicationContext.xml");
		try {
			// Looked up only so that a missing or mistyped bean fails at startup; the
			// returned reference itself is not used here.
			ctx.getBean("amqpTemplateInternetProxy", AmqpTemplate.class);

			log.info("Create ApplicationContext Finish.");
			BufferedReader r = new BufferedReader(new InputStreamReader(System.in));
			String line;
			// readLine() returns null at end of input (closed or redirected stdin);
			// treat that like "exit" instead of failing with a NullPointerException.
			while ((line = r.readLine()) != null && !line.equalsIgnoreCase("exit")) {
				// Ignore any other input and keep waiting.
			}
			log.info("Exit now");
		} finally {
			ctx.close();
		}
	}

}



注意:
	用 Spring 整合 RabbitMQ 实现 RPC 调用时,客户端(即 request 端)访问远程服务器的本质,是用 JDK 的阻塞队列做线程同步:
	发送请求的线程会一直阻塞,直到收到应答或等待超时。因此,对客户端而言,无论服务端采用的是异步的 RabbitMQ 还是同步的 RPC 机制,
	调用方式都不受影响。以上结论是作者阅读源码后总结得出的。
网友评论