消息队列的简单实现 public class RedisQueue { private static Jedis redis; public static final String MSG_ZSET="upload_msg";//消息队列zset(主题) key public static final String TOPIC_ZSET="topic_upload";//主题已消费记录
public class RedisQueue {
private static Jedis redis;
public static final String MSG_ZSET="upload_msg";//消息队列zset(主题) key
public static final String TOPIC_ZSET="topic_upload";//主题已消费记录zset key
public static final String IP_ZSET_PRE="Server_";//消费者已消费记录zset key前缀
public static final String MSG_SEQ_ID="upload_seq_id";//消息队列 msg_id自增生成器 对应key
public static void main(String[] args) {
redis = new Jedis("192.168.26.128", 6379);
init();//初始化,每个服务启动时
sendMsg();//模拟发送消息
receiveMsg();//消费消息
}
//每个ip 启动时执行
public static void init(){
//获取本机ip
String ipAddr = geIp();
if(ipAddr !=null){
Long ret = redis.zrank(TOPIC_ZSET, ipAddr);
if(ret == null){//如果未创建,就开始初始化
String max_id = redis.get(MSG_SEQ_ID);//获取自增序列号
Long score = 0l;
if(max_id != null){
score = Long.valueOf(max_id);
}
redis.zadd(TOPIC_ZSET,score,ipAddr);//初始化 主题已消费记录zset
redis.zadd(IP_ZSET_PRE+ipAddr,score,TOPIC_ZSET);//初始化 消息队列zset(主题)
}
}
}
public static void sendMsg(){
//省略入mysql库等业务方法
String upload_url = "xxxx";
Long now = System.currentTimeMillis();
//加时间搓,可以是实现重复上传同一个jar,也可以去掉
String msg = upload_url+"|"+now;
//生成消息Id
Long msg_id = redis.incr(MSG_SEQ_ID);
System.out.println(msg_id);
//发送消息 想消息队列中添加一条新消息
redis.zadd(MSG_ZSET,msg_id,msg);
}
public static void receiveMsg(){
//获取本机ip
String ipAddr = geIp();
if(ipAddr!=null){
while (true){
//获取当前已消费的msg_id
Double score = redis.zscore(IP_ZSET_PRE+ipAddr,TOPIC_ZSET);
System.out.println(score);
//获取未读消息进行处理
Set
tuples2 = redis.zrangeByScoreWithScores(MSG_ZSET, score.longValue() + 1 + "", "inf");
Double lastMsg_id = 0d;
for (Tuple t : tuples2) {//模拟jar包下载,以及热更新 业务操作
lastMsg_id = t.getScore();
System.out.println(t.getElement() + ":" + t.getScore());
}
if(tuples2.size()>0){
//处理完成后,更新该服务器的已处理列表
redis.zadd(TOPIC_ZSET,lastMsg_id,ipAddr);
redis.zadd(IP_ZSET_PRE+ipAddr,lastMsg_id,TOPIC_ZSET);
//找出所以ip都消费过的消息id,其实就是zset的第一个成员
Set
first = redis.zrangeWithScores(TOPIC_ZSET,0,0); Double allReceive_id = 0d; if(first.iterator().hasNext()){ Tuple temp = first.iterator().next(); if(temp!=null){ allReceive_id = temp.getScore(); redis.zremrangeByScore(MSG_ZSET,0,allReceive_id); } } } try { Thread.sleep(1000);//每隔1秒钟消费一次 } catch (InterruptedException e) { e.printStackTrace(); } } } } private static String geIp(){ //获取本机ip String ipAddr = null; try { ipAddr = InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { e.printStackTrace(); } return ipAddr; } }
