当前位置 : 主页 > 编程语言 > java >

Redis 中的消息队列--使用zset实现“消息队列”示例展示

来源:互联网 收集:自由互联 发布时间:2021-06-28
消息队列的简单实现 public class RedisQueue { private static Jedis redis; public static final String MSG_ZSET="upload_msg";//消息队列zset(主题) key public static final String TOPIC_ZSET="topic_upload";//主题已消费记录
消息队列的简单实现
public class RedisQueue {  
    private static Jedis redis;  
   
    public static final String MSG_ZSET="upload_msg";//消息队列zset(主题) key  
    public static final String TOPIC_ZSET="topic_upload";//主题已消费记录zset key  
    public static final String IP_ZSET_PRE="Server_";//消费者已消费记录zset key前缀  
    public static final String MSG_SEQ_ID="upload_seq_id";//消息队列 msg_id自增生成器 对应key  
   
    public static void main(String[] args) {  
        redis = new Jedis("192.168.26.128", 6379);  
        init();//初始化,每个服务启动时  
        sendMsg();//模拟发送消息  
        receiveMsg();//消费消息  
   
    }  
   
    //每个ip 启动时执行  
    public static void init(){  
        //获取本机ip  
        String ipAddr = geIp();  
        if(ipAddr !=null){  
            Long ret = redis.zrank(TOPIC_ZSET, ipAddr);  
            if(ret == null){//如果未创建,就开始初始化  
                String max_id = redis.get(MSG_SEQ_ID);//获取自增序列号  
                Long score = 0l;  
                if(max_id != null){  
                    score = Long.valueOf(max_id);  
                }  
                redis.zadd(TOPIC_ZSET,score,ipAddr);//初始化 主题已消费记录zset  
                redis.zadd(IP_ZSET_PRE+ipAddr,score,TOPIC_ZSET);//初始化 消息队列zset(主题)  
            }  
        }  
    }  
   
    public static void sendMsg(){  
        //省略入mysql库等业务方法  
        String upload_url = "xxxx";  
        Long now = System.currentTimeMillis();  
        //加时间搓,可以是实现重复上传同一个jar,也可以去掉  
        String msg = upload_url+"|"+now;  
   
        //生成消息Id  
        Long msg_id = redis.incr(MSG_SEQ_ID);  
        System.out.println(msg_id);  
   
        //发送消息 想消息队列中添加一条新消息  
        redis.zadd(MSG_ZSET,msg_id,msg);  
    }  
   
    public static void receiveMsg(){  
        //获取本机ip  
        String ipAddr = geIp();  
        if(ipAddr!=null){  
            while (true){  
                //获取当前已消费的msg_id  
                Double score = redis.zscore(IP_ZSET_PRE+ipAddr,TOPIC_ZSET);  
                System.out.println(score);  
   
                //获取未读消息进行处理  
                Set
 
   tuples2 = redis.zrangeByScoreWithScores(MSG_ZSET, score.longValue() + 1 + "", "inf");  
                Double lastMsg_id = 0d;  
                for (Tuple t : tuples2) {//模拟jar包下载,以及热更新 业务操作  
                    lastMsg_id = t.getScore();  
                    System.out.println(t.getElement() + ":" + t.getScore());  
                }  
   
                if(tuples2.size()>0){  
                    //处理完成后,更新该服务器的已处理列表  
                    redis.zadd(TOPIC_ZSET,lastMsg_id,ipAddr);  
                    redis.zadd(IP_ZSET_PRE+ipAddr,lastMsg_id,TOPIC_ZSET);  
   
                    //找出所以ip都消费过的消息id,其实就是zset的第一个成员  
                    Set
  
    first = redis.zrangeWithScores(TOPIC_ZSET,0,0); Double allReceive_id = 0d; if(first.iterator().hasNext()){ Tuple temp = first.iterator().next(); if(temp!=null){ allReceive_id = temp.getScore(); redis.zremrangeByScore(MSG_ZSET,0,allReceive_id); } } } try { Thread.sleep(1000);//每隔1秒钟消费一次 } catch (InterruptedException e) { e.printStackTrace(); } } } } private static String geIp(){ //获取本机ip String ipAddr = null; try { ipAddr = InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { e.printStackTrace(); } return ipAddr; } }
  
 
网友评论