订阅 package com.whty.jedis.pubsub;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;public class SubThread extends Thread { private final JedisPool jedisPool; private final Subscriber subscriber = new Subscriber(); pr
package com.whty.jedis.pubsub;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class SubThread extends Thread {
private final JedisPool jedisPool;
private final Subscriber subscriber = new Subscriber();
private final String channel = "mychannel";
public SubThread(JedisPool jedisPool) {
super("SubThread");
this.jedisPool = jedisPool;
}
@Override
public void run() {
System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
/** subscriber 代表是处理消息的类
* channel 是密钥
*/
jedis.subscribe(subscriber, channel); //订阅消息
} catch (Exception e) {
System.out.println(String.format("subsrcibe channel error, %s", e));
} finally {
if (jedis != null) {
/*jedis.close();*/
}
}
}
}
发布
package com.whty.jedis.pubsub;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class PublisherThread {
private final JedisPool jedisPool;
public PublisherThread(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
public void start() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); //获取内容
Jedis jedis = jedisPool.getResource();
while (true) {
String line = null;
try {
line = reader.readLine(); //读取一行内容
if (!"quit".equals(line)) {
/**
* publish 第一个参数是 密钥和 订阅消息的密钥是一样的
* publish 第二个参数是 发布的消息
*/
jedis.publish("mychannel", line); //发布消息
} else {
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
main方法
package com.whty.jedis.pubsub;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class PubSubDemo
{
public static void main( String[] args )
{
// 替换成你的reids地址和端口
String redisIp = "127.0.0.1";
int reidsPort = 6379;
JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), redisIp, reidsPort);
System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", redisIp, reidsPort));
SubThread subThread = new SubThread(jedisPool); //先订阅消息
subThread.start();
PublisherThread publisher = new PublisherThread(jedisPool); //发布消息
publisher.start();
}
}
消息类
package com.whty.jedis.pubsub;
import redis.clients.jedis.JedisPubSub;
/**
* 继承这个类就完成了对客户端对订阅的监听
* @author yang
* 监听到订阅模式接受到消息时的回调 (onPMessage)
监听到订阅频道接受到消息时的回调 (onMessage )
订阅频道时的回调( onSubscribe )
取消订阅频道时的回调( onUnsubscribe )
订阅频道模式时的回调 ( onPSubscribe )
取消订阅模式时的回调( onPUnsubscribe )
*
*/
public class Subscriber extends JedisPubSub {
// 取得订阅的消息后的处理
public void onMessage(String channel, String message) {
System.out.println(channel + "====取得订阅的消息后的处理=======" + message);
}
// 初始化订阅时候的处理
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(channel + "====初始化订阅时候的处理化=====" + subscribedChannels);
}
// 取消订阅时候的处理
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println(channel + "=====取消订阅时候的处理=====" + subscribedChannels);
}
// 初始化按表达式的方式订阅时候的处理
public void onPSubscribe(String pattern, int subscribedChannels) {
System.out.println(pattern + "===初始化按表达式的方式订阅时候的处理===" + subscribedChannels);
}
// 取消按表达式的方式订阅时候的处理
public void onPUnsubscribe(String pattern, int subscribedChannels) {
System.out.println(pattern + "====取消按表达式的方式订阅时候的处理====" + subscribedChannels);
}
// 取得按表达式的方式订阅的消息后的处理
public void onPMessage(String pattern, String channel, String message) {
System.out.println(pattern + "==" + channel + "=" + message);
}
}
redis-test.rar
redis-test.rar
EhCacheDemo-master.rar
EhCacheDemo-master.rar
redis分布式锁.zip
redis分布式锁.zip
netty-demo.rar
netty-demo.rar
