在项目中有使用到延时队列的场景,做个简单的记录说明;首先DelayQueue实现了BlockingQueue,加入其中的元素必须实现Delayed接口; 当生产者元素调用put往其中加入元素时,出发Delayed接口
在项目中有使用到延时队列的场景,做个简单的记录说明;首先DelayQueue实现了BlockingQueue,加入其中的元素必须实现Delayed接口;
当生产者元素调用put往其中加入元素时,出发Delayed接口的compareTo方法进行排序,这个排序是按照时间的,按照计划执行的时间排序,先执行的在前面,后执行的排后面;消费者获取元素时,调用getDelay方法返回的值大于0,则消费者线程wait返回的这个时间后,再从队列头部取出元素;下面是个简单的例子
import org.jetbrains.annotations.NotNull; import java.util.Date; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class DelayEntity implements Delayed { private static final Long currentTime = System.currentTimeMillis(); private String str; private Long scheduleTime; public DelayEntity(String str, Long delayed) { this.str = str; scheduleTime = System.currentTimeMillis() + (1000) * delayed; } @Override public long getDelay(@NotNull TimeUnit unit) { return unit.convert(scheduleTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(@NotNull Delayed o) { return (int) (this.scheduleTime - ((DelayEntity) o).scheduleTime); } public String getStr() { return str; } public Long getScheduleTime() { return scheduleTime; } public String showScheduleTime() { return "计划执行时间:" + new Date(this.scheduleTime).toString(); } }
@Test public void test() throws InterruptedException { DelayQueue<DelayEntity> delayQueue = new DelayQueue<>(); delayQueue.put(new DelayEntity("1", 1l)); delayQueue.put(new DelayEntity("2", 2l)); delayQueue.put(new DelayEntity("4", 3l)); while (true) { DelayEntity take = delayQueue.take(); System.out.println("参数:" + take.getStr() + ";计划执行时间:" + take.showScheduleTime() + ";实际执行时间:" + new Date().toString()); } }
下面看下take方法
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
这里可以看到,他不是一直循环的,是获取到第一个元素的delay等待的时间,之后等待这个时间才去唤醒其他线程;
另外,添加元素时,add方法和put方法都是调用的offer方法,区别是一个返回值,一个没有;
延伸下BlockingQueue的几个常用的操作;
1.offer方法是BlockingQueue的,offer不会阻塞执行的方法,可以添加返回true,否则返回false;
2.BlockingQueue的put方法,如果没有空间,会阻塞一直等到有空间
3.poll获取元素,不会阻塞,获取不到就返回null;
4.take,获取不到就阻塞
到此这篇关于Java DelayQueue实现任务延时示例讲解的文章就介绍到这了,更多相关Java DelayQueue内容请搜索自由互联以前的文章或继续浏览下面的相关文章希望大家以后多多支持自由互联!