Java延时队列(DelayQueue)

介绍

J.U.C里面,有延时队列,顾名思义就是根据时间排序的队列。

DelayQueue 是JDK中提供的延时队列,内部封装优先级队列,并且提供空阻塞功能。

DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。

同时延时队列实现了阻塞队列的接口(BlockingQueue是个接口,DelayQueue实现了阻塞队列,override了几个方法)

核心字段

// 锁对象
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 头线程
private Thread leader = null;
// 条件
private final Condition available = lock.newCondition();
底层数据结构是new PriorityQueue<E>(),这是一个无界的优先级队列。

继续往下深究,实际上数据存放在了对象数组 Object[]里面,这个对象数组在插入数据的时候,是需要根据比较器来完成比较顺序插入的。

transient Object[] queue;
int size;
private final Comparator<? super E> comparator;
offer方法

把元素 element 插入优先级队列中,插入的顺序是根据定义的比较器(Comparator),最后反映出来的顺序,是经过排序的。

public boolean offer(E e) {
    // 获取锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 元素加入优先级队列
        q.offer(e);
        // 获取优先级头元素,头元素等于当前元素
        // 清空leader,并放开读限制
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        // 释放锁
        lock.unlock();
    }
}

take方法

1、可重入锁ReentrantLock lock

2、自旋等待 for (;;),这个时候已经是优先级队列了,如果是空就阻塞

public E take() throws InterruptedException {
    // 获取锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 自旋
        for (;;) {
            // 获取优先级队列头节点
            E first = q.peek();
            // 优先级队列为空
            if (first == null)
                // 阻塞
                available.await();
            else {
                // 判断头元素剩余时间是否小于等于0
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // 优先级队列出队
                    return q.poll();
                // 到这,说明剩余时间大于0
                // 头引用置空
                first = null;
                // leader线程是否为空,不为空就等待
                if (leader != null)
                    available.await();
                else {
                    // 设置leader线程为当前线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 休眠剩余秒
                        available.awaitNanos(delay);
                    } finally {
                        // 休眠结束,leader线程还是当前线程
                        // 置空leader
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // leader线程为空,并且first不为空
        // 唤醒阻塞的leader,让它再去试一次
        if (leader == null && q.peek() != null)
            available.signal();
        // 解锁
        lock.unlock();
    }
}

当生产者线程调用put之类的方法加入元素时,会触发Delayed接口中的compareTo方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间越晚。

DelayQueue是Leader-Followr模式的变种,消费者线程处于等待状态时,总是等待最先到期的元素,而不是长时间的等待。消费者线程尽量把时间花在处理任务上,最小化空等的时间,以提高线程的利用效率。

使用场景

简单的延时任务,比如提交订单60秒后发个站内消息,私以为站内消息没发成功也无所谓,所以要求不高,可以考虑使用 DelayQueue,因为这是内存型的优先级队列,没有把数据存到数据库里面或者持久化,万一服务异常重启,那队列的数据都丢了。

唠嗑广场

随着世界的各种变化,对于老赖的理解,也记录一下。

如果对方是个人,那成为老赖,还钱的可能性就是依靠人的品性了,有的人不愿放弃,就会逐步还债,有的人就放弃,就变成老赖。

但如果对方是公司呢?从曾经的p2p公司破产,到这两年的房地产的员工理财,还有房地产烂尾楼,都是公司层面的烂尾。

公司的主体是一群人,其实是没人为此负责的,什么公司法人,都是换来换去的摆设。

看看恒大吧,就是这样啊。

每个员工都是其中的一员,但成整体后就没人负责了,需要政府机构出面,通过暴力强制执行。

就是一个和尚抬水喝,两个和尚挑水喝,三个和尚没水喝。

如果放大到国家呢?

整个世界200多个国家和地区,老赖的国家有没有?有的,那谁来制裁呢?

国际组织不是暴力机器,那只有其他国家的经济封锁来制裁了,因为没有到必须战争的时候,但如果到了战争的时候,那可以直接撕毁合约的。

人没那么可靠的,好的坏的,多看看。