订阅 package com.whty.jedis.pubsub;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;public class SubThread extends Thread { private final JedisPool jedisPool; private final Subscriber subscriber = new Subscriber(); pr
package com.whty.jedis.pubsub; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; public class SubThread extends Thread { private final JedisPool jedisPool; private final Subscriber subscriber = new Subscriber(); private final String channel = "mychannel"; public SubThread(JedisPool jedisPool) { super("SubThread"); this.jedisPool = jedisPool; } @Override public void run() { System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel)); Jedis jedis = null; try { jedis = jedisPool.getResource(); /** subscriber 代表是处理消息的类 * channel 是密钥 */ jedis.subscribe(subscriber, channel); //订阅消息 } catch (Exception e) { System.out.println(String.format("subsrcibe channel error, %s", e)); } finally { if (jedis != null) { /*jedis.close();*/ } } } }发布
package com.whty.jedis.pubsub; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; public class PublisherThread { private final JedisPool jedisPool; public PublisherThread(JedisPool jedisPool) { this.jedisPool = jedisPool; } public void start() { BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); //获取内容 Jedis jedis = jedisPool.getResource(); while (true) { String line = null; try { line = reader.readLine(); //读取一行内容 if (!"quit".equals(line)) { /** * publish 第一个参数是 密钥和 订阅消息的密钥是一样的 * publish 第二个参数是 发布的消息 */ jedis.publish("mychannel", line); //发布消息 } else { break; } } catch (IOException e) { e.printStackTrace(); } } } }main方法
package com.whty.jedis.pubsub; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class PubSubDemo { public static void main( String[] args ) { // 替换成你的reids地址和端口 String redisIp = "127.0.0.1"; int reidsPort = 6379; JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), redisIp, reidsPort); System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", redisIp, reidsPort)); SubThread subThread = new SubThread(jedisPool); //先订阅消息 subThread.start(); PublisherThread publisher = new PublisherThread(jedisPool); //发布消息 publisher.start(); } }消息类
package com.whty.jedis.pubsub; import redis.clients.jedis.JedisPubSub; /** * 继承这个类就完成了对客户端对订阅的监听 * @author yang * 监听到订阅模式接受到消息时的回调 (onPMessage) 监听到订阅频道接受到消息时的回调 (onMessage ) 订阅频道时的回调( onSubscribe ) 取消订阅频道时的回调( onUnsubscribe ) 订阅频道模式时的回调 ( onPSubscribe ) 取消订阅模式时的回调( onPUnsubscribe ) * */ public class Subscriber extends JedisPubSub { // 取得订阅的消息后的处理 public void onMessage(String channel, String message) { System.out.println(channel + "====取得订阅的消息后的处理=======" + message); } // 初始化订阅时候的处理 public void onSubscribe(String channel, int subscribedChannels) { System.out.println(channel + "====初始化订阅时候的处理化=====" + subscribedChannels); } // 取消订阅时候的处理 public void onUnsubscribe(String channel, int subscribedChannels) { System.out.println(channel + "=====取消订阅时候的处理=====" + subscribedChannels); } // 初始化按表达式的方式订阅时候的处理 public void onPSubscribe(String pattern, int subscribedChannels) { System.out.println(pattern + "===初始化按表达式的方式订阅时候的处理===" + subscribedChannels); } // 取消按表达式的方式订阅时候的处理 public void onPUnsubscribe(String pattern, int subscribedChannels) { System.out.println(pattern + "====取消按表达式的方式订阅时候的处理====" + subscribedChannels); } // 取得按表达式的方式订阅的消息后的处理 public void onPMessage(String pattern, String channel, String message) { System.out.println(pattern + "==" + channel + "=" + message); } }redis-test.rar redis-test.rar EhCacheDemo-master.rar EhCacheDemo-master.rar redis分布式锁.zip redis分布式锁.zip netty-demo.rar netty-demo.rar