gistfile1.txt package org.study.javabasic.thread.blockqueue;import java.util.Map;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.DelayQueue;import java.util.concurrent.Dela
package org.study.javabasic.thread.blockqueue; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /*** * 使用延迟队列实现缓存操作 * * @author 陈波 * * @param* @param */ public class DealyBlockQueenCache implements Cache { /*** * 默认过期时间2分钟 */ private static final long DEFAULT_CACHE_TIMEOUT = TimeUnit.NANOSECONDS.convert(2, TimeUnit.MINUTES); /** * 存放元素的HashMap */ private Map cacheMap = new ConcurrentHashMap<>(); /*** * 保存元素的Key的延迟队列,主要用于清理过期的数据信息 */ private BlockingQueue > dealyQueue = new DelayQueue<>(); /*** * 清除过期数据的任务 */ private final CacheExpireCleanTask cleanTask = new CacheExpireCleanTask(); /*** * 启动守护线程进行数据的过期清理操作 */ public DealyBlockQueenCache() { /*** * 启动任务操作 */ Thread cleanThread = new Thread(cleanTask, "dealyblockqueuecache-clean-task"); cleanThread.setDaemon(true); cleanThread.start(); } /*** * 摧毁后,若需要在次使用缓存则需要重新生成 */ @Override public void destory() { this.cleanTask.running = false; this.dealyQueue.clear(); this.cacheMap.clear(); this.dealyQueue = null; this.cacheMap = null; } /*** * 存入数据操作 */ @Override public void put(K key, V value) { try { synchronized (key) { dealyQueue.put(new DealyCacheItem (key, DEFAULT_CACHE_TIMEOUT)); cacheMap.put(key, value); } } catch (InterruptedException e) { throw new RuntimeException("store cache data failed!"); } } @Override public V get(K key) { return cacheMap.get(key); } /*** * 存入数据操作 */ @Override public void put(K key, V value, long timeOut, TimeUnit timeUnit) { try { synchronized (key) { dealyQueue.put(new DealyCacheItem (key, TimeUnit.NANOSECONDS.convert(timeOut, timeUnit))); cacheMap.put(key, value); } } catch (InterruptedException e) { throw new RuntimeException("store cache data failed!"); } } /*** * 检查方法操作,通过守护线程来调用这个检查方法 * * @throws InterruptedException */ private final void cacheExpireCheck() throws InterruptedException { /*** * 利用延迟阻塞队列的特性,队列头部的元素一定是达到过期时间的元素 */ DealyCacheItem removeKey = dealyQueue.take(); if (removeKey != null) { V v = cacheMap.remove(removeKey.getCacheData()); afterClean(v); } } /*** * 清除数据之后的操作,主要用于一些监控操作,默认不处理,若需处理执行覆盖这个方法 * * @param v * @return */ public V afterClean(V v) { return null; }; /*** * 移除元素操作 */ @Override public V remove(K key) { synchronized (key) { dealyQueue.remove(key); return cacheMap.remove(key); } } @Override public boolean isEmpty() { return cacheMap == null || cacheMap.isEmpty(); } /*** * 延迟项数据 * * @author lyf * * @param */ private static class DealyCacheItem implements Delayed { /*** * 缓存数据项 */ private T cacheData; /*** * 过期时间 */ private long expireTime; /*** * 序列 */ private long sequence; /*** * 记录数据操作 */ private static AtomicLong totalCount = new AtomicLong(0); /*** * 生成数据项,并指定缓存的存活时间长短为timeOutNanos纳秒 * * @param cacheData * @param timeOutNanos */ public DealyCacheItem(T cacheData, long timeOutNanos) { this.cacheData = cacheData; // 即cacheData这个数据将在当前时间timeOutNanos之后过期 this.expireTime = System.nanoTime() + timeOutNanos; // 记录树进行增长,并赋值给对应的序列 this.sequence = totalCount.incrementAndGet(); } /*** * 用于优先级队列中优先顺序排序操作 * */ @SuppressWarnings("unchecked") @Override public int compareTo(Delayed o) { if (this == o) { return 0; } if (o instanceof DealyCacheItem) { DealyCacheItem other = (DealyCacheItem ) o; // 比较过期时间,看谁的时间比较短,比较短的放在前面 // 若两者过期的时间相同,则看对应的序列 long m = (this.expireTime - other.expireTime); int p = m > 0 ? 1 : m < 0 ? -1 : (this.sequence > other.sequence ? 1 : -1); return p; } long m = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); return m > 0 ? 1 : m < 0 ? -1 : 0; } @Override public long getDelay(TimeUnit unit) { /*** * 计算数据是否已经到过期的时间了 */ return unit.convert(this.expireTime - System.nanoTime(), TimeUnit.NANOSECONDS); } public T getCacheData() { return cacheData; } public long getExpireTime() { return expireTime; } public long getSequence() { return sequence; } } /*** * 定时清理任务 * * @author 陈波 * */ private final class CacheExpireCleanTask implements Runnable { volatile boolean running = true; @Override public void run() { while (!Thread.interrupted() || !running) { try { cacheExpireCheck(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }