消息队列的简单实现 public class RedisQueue { private static Jedis redis; public static final String MSG_ZSET="upload_msg";//消息队列zset(主题) key public static final String TOPIC_ZSET="topic_upload";//主题已消费记录
public class RedisQueue { private static Jedis redis; public static final String MSG_ZSET="upload_msg";//消息队列zset(主题) key public static final String TOPIC_ZSET="topic_upload";//主题已消费记录zset key public static final String IP_ZSET_PRE="Server_";//消费者已消费记录zset key前缀 public static final String MSG_SEQ_ID="upload_seq_id";//消息队列 msg_id自增生成器 对应key public static void main(String[] args) { redis = new Jedis("192.168.26.128", 6379); init();//初始化,每个服务启动时 sendMsg();//模拟发送消息 receiveMsg();//消费消息 } //每个ip 启动时执行 public static void init(){ //获取本机ip String ipAddr = geIp(); if(ipAddr !=null){ Long ret = redis.zrank(TOPIC_ZSET, ipAddr); if(ret == null){//如果未创建,就开始初始化 String max_id = redis.get(MSG_SEQ_ID);//获取自增序列号 Long score = 0l; if(max_id != null){ score = Long.valueOf(max_id); } redis.zadd(TOPIC_ZSET,score,ipAddr);//初始化 主题已消费记录zset redis.zadd(IP_ZSET_PRE+ipAddr,score,TOPIC_ZSET);//初始化 消息队列zset(主题) } } } public static void sendMsg(){ //省略入mysql库等业务方法 String upload_url = "xxxx"; Long now = System.currentTimeMillis(); //加时间搓,可以是实现重复上传同一个jar,也可以去掉 String msg = upload_url+"|"+now; //生成消息Id Long msg_id = redis.incr(MSG_SEQ_ID); System.out.println(msg_id); //发送消息 想消息队列中添加一条新消息 redis.zadd(MSG_ZSET,msg_id,msg); } public static void receiveMsg(){ //获取本机ip String ipAddr = geIp(); if(ipAddr!=null){ while (true){ //获取当前已消费的msg_id Double score = redis.zscore(IP_ZSET_PRE+ipAddr,TOPIC_ZSET); System.out.println(score); //获取未读消息进行处理 Settuples2 = redis.zrangeByScoreWithScores(MSG_ZSET, score.longValue() + 1 + "", "inf"); Double lastMsg_id = 0d; for (Tuple t : tuples2) {//模拟jar包下载,以及热更新 业务操作 lastMsg_id = t.getScore(); System.out.println(t.getElement() + ":" + t.getScore()); } if(tuples2.size()>0){ //处理完成后,更新该服务器的已处理列表 redis.zadd(TOPIC_ZSET,lastMsg_id,ipAddr); redis.zadd(IP_ZSET_PRE+ipAddr,lastMsg_id,TOPIC_ZSET); //找出所以ip都消费过的消息id,其实就是zset的第一个成员 Set first = redis.zrangeWithScores(TOPIC_ZSET,0,0); Double allReceive_id = 0d; if(first.iterator().hasNext()){ Tuple temp = first.iterator().next(); if(temp!=null){ allReceive_id = temp.getScore(); redis.zremrangeByScore(MSG_ZSET,0,allReceive_id); } } } try { Thread.sleep(1000);//每隔1秒钟消费一次 } catch (InterruptedException e) { e.printStackTrace(); } } } } private static String geIp(){ //获取本机ip String ipAddr = null; try { ipAddr = InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { e.printStackTrace(); } return ipAddr; } }