当前位置 : 主页 > 编程语言 > java >

redis 发布订阅分布式锁和EhCache缓存同步demo以及netty的demo

来源:互联网 收集:自由互联 发布时间:2021-06-28
订阅 package com.whty.jedis.pubsub;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;public class SubThread extends Thread { private final JedisPool jedisPool; private final Subscriber subscriber = new Subscriber(); pr
订阅
package com.whty.jedis.pubsub;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class SubThread extends Thread {
    private final JedisPool jedisPool;
    private final Subscriber subscriber = new Subscriber();

    private final String channel = "mychannel";

    public SubThread(JedisPool jedisPool) {
        super("SubThread");
        this.jedisPool = jedisPool;
    }

    @Override
    public void run() {
        System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            
            /** subscriber 代表是处理消息的类
             *  channel    是密钥
             */            
            jedis.subscribe(subscriber, channel);   //订阅消息
        } catch (Exception e) {
            System.out.println(String.format("subsrcibe channel error, %s", e));
        } finally {
            if (jedis != null) {
                /*jedis.close();*/
            }
        }
    }
}
发布
package com.whty.jedis.pubsub;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class PublisherThread {
    private final JedisPool jedisPool;

    public PublisherThread(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    public void start() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));   //获取内容
        Jedis jedis = jedisPool.getResource();
        while (true) {
            String line = null;
            try {
                line = reader.readLine();  //读取一行内容
                if (!"quit".equals(line)) {
                	 /**
                	  * publish 第一个参数是 密钥和 订阅消息的密钥是一样的
                	  * publish 第二个参数是 发布的消息
                	  */
                    jedis.publish("mychannel", line);   //发布消息
                } else {
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
         }
    }
}
main方法
package com.whty.jedis.pubsub;

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class PubSubDemo 
{
    public static void main( String[] args )
    {
        // 替换成你的reids地址和端口
        String redisIp = "127.0.0.1";
        int reidsPort = 6379;
        JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), redisIp, reidsPort);
        System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", redisIp, reidsPort));

        SubThread subThread = new SubThread(jedisPool);   //先订阅消息
        subThread.start();

        PublisherThread publisher = new PublisherThread(jedisPool);  //发布消息
        publisher.start();
    }
}
消息类
package com.whty.jedis.pubsub;

import redis.clients.jedis.JedisPubSub;

/**
 * 继承这个类就完成了对客户端对订阅的监听
 * @author yang
 * 监听到订阅模式接受到消息时的回调 (onPMessage)
	监听到订阅频道接受到消息时的回调 (onMessage )
	订阅频道时的回调( onSubscribe )
	取消订阅频道时的回调( onUnsubscribe )
	订阅频道模式时的回调 ( onPSubscribe )
	取消订阅模式时的回调( onPUnsubscribe )
 *
 */
public class Subscriber extends JedisPubSub {
	// 取得订阅的消息后的处理
	public void onMessage(String channel, String message) {
		System.out.println(channel + "====取得订阅的消息后的处理=======" + message);
	}

	// 初始化订阅时候的处理
	public void onSubscribe(String channel, int subscribedChannels) {
		 System.out.println(channel + "====初始化订阅时候的处理化=====" + subscribedChannels);
	}

	// 取消订阅时候的处理
	public void onUnsubscribe(String channel, int subscribedChannels) {
		 System.out.println(channel + "=====取消订阅时候的处理=====" + subscribedChannels);
	}

	// 初始化按表达式的方式订阅时候的处理
	public void onPSubscribe(String pattern, int subscribedChannels) {
		 System.out.println(pattern + "===初始化按表达式的方式订阅时候的处理===" + subscribedChannels);
	}

	// 取消按表达式的方式订阅时候的处理
	public void onPUnsubscribe(String pattern, int subscribedChannels) {
		 System.out.println(pattern + "====取消按表达式的方式订阅时候的处理====" + subscribedChannels);
	}

	// 取得按表达式的方式订阅的消息后的处理
	public void onPMessage(String pattern, String channel, String message) {
		System.out.println(pattern + "==" + channel + "=" + message);
	}
}
redis-test.rar redis-test.rar EhCacheDemo-master.rar EhCacheDemo-master.rar redis分布式锁.zip redis分布式锁.zip netty-demo.rar netty-demo.rar
网友评论