参考文献

延迟队列概念

顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费

使用场景

  • 有效期:限时活动、拼团等
  • 超时处理:取消超时未支付订单、超时自动确认收货等
  • 延迟处理:机器人点赞/观看数/评论/关注、等待依赖条件等
  • 重试:网络异常重试、打车派单、依赖条件未满足重试等
  • 定时任务:智能设备定时启动等

具体实现

DelayQueue延迟队列
  • JDK 中提供了一组实现延迟队列的API,位于Java.util.concurrent包下DelayQueue

  • DelayQueue是一个BlockingQueue(无界阻塞)队列,它本质就是封装了一个PriorityQueue(优先队列)。

  • PriorityQueue内部使用完全二叉堆来实现队列元素排序,我们在向DelayQueue队列中添加元素时,会给元素一个Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了Delay时间才允许从队列中取出。队列中可以放基本数据类型或自定义实体类,在存放基本数据类型时,优先队列中元素默认升序排列,自定义实体类就需要我们根据类属性值比较计算了。

  • 要实现DelayQueue延时队列,队中元素要implements Delayed 接口,这个接口里只有一个getDelay方法,用于设置延期时间。Order类中compareTo方法负责对队列中的元素进行排序。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    package java.util.concurrent;

    /**
    * A mix-in style interface for marking objects that should be
    * acted upon after a given delay.
    *
    * <p>An implementation of this interface must define a
    * {@code compareTo} method that provides an ordering consistent with
    * its {@code getDelay} method.
    *
    * @since 1.5
    * @author Doug Lea
    */
    public interface Delayed extends Comparable<Delayed> {

    /**
    * Returns the remaining delay associated with this object, in the
    * given time unit.
    *
    * @param unit the time unit
    * @return the remaining delay; zero or negative values indicate
    * that the delay has already elapsed
    */
    long getDelay(TimeUnit unit);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    import com.fasterxml.jackson.annotation.JsonFormat;
    import lombok.Data;

    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    @Data
    public class Order implements Delayed {
    /**
    * 延迟时间
    */
    @JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
    private long time;
    private String name;

    public Order(String name, long time, TimeUnit unit) {
    this.name = name;
    this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
    }

    @Override
    public long getDelay(TimeUnit unit) {
    return time - System.currentTimeMillis();
    }
    @Override
    public int compareTo(Delayed o) {
    Order Order = (Order) o;
    long diff = this.time - Order.time;
    if (diff <= 0) {
    return -1;
    } else {
    return 1;
    }
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.TimeUnit;

    public class DelayQueueDemo {

    public static void main(String[] args) throws InterruptedException {
    Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
    Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
    Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
    DelayQueue<Order> delayQueue = new DelayQueue<>();
    delayQueue.put(Order1);
    delayQueue.put(Order2);
    delayQueue.put(Order3);

    System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
    while (delayQueue.size() != 0) {
    /**
    * 取队列头部元素是否过期
    */
    Order task = delayQueue.poll();
    if (task != null) {
    System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.getName(), LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
    }
    Thread.sleep(1000);
    }
    }
    }
    // 输出
    订单延迟队列开始时间:2021-07-12 14:24:12
    订单:{Order1}被取消, 取消时间:{2021-07-12 14:24:17}
    订单:{Order2}被取消, 取消时间:{2021-07-12 14:24:22}
    订单:{Order3}被取消, 取消时间:{2021-07-12 14:24:27}
Quartz 定时任务
  • Quartz一款非常经典任务调度框架,在RedisRabbitMQ还未广泛应用时,超时未支付取消订单功能都是由定时任务实现的。定时任务它有一定的周期性,可能很多单子已经超时,但还没到达触发执行的时间点,那么就会造成订单处理的不够及时。

  • 引入quartz框架依赖包

    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
    </dependency>
  • 在启动类中使用@EnableScheduling注解开启定时任务功能。

    1
    2
    3
    4
    5
    6
    7
    @EnableScheduling
    @SpringBootApplication
    public class DelayqueueApplication {
    public static void main(String[] args) {
    SpringApplication.run(DelayqueueApplication.class, args);
    }
    }
  • 编写一个定时任务,每个5秒执行一次。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Component
    public class QuartzDemo {

    //每隔五秒
    @Scheduled(cron = "0/5 * * * * ? ")
    public void process(){
    System.out.println("我是定时任务!");
    }
    }
Redis Sorted Set
  • Redis的数据结构Zset,同样可以实现延迟队列的效果,主要利用它的score属性,redis通过score来为集合中的成员进行从小到大的排序。

  • 通过zadd命令向队列delayqueue 中添加元素,并设置score值表示元素过期的时间;向delayqueue 添加三个order1order2order3,分别是10秒20秒30秒后过期。

    1
    2
    3
    zadd delayqueue 10 order1
    zadd delayqueue 20 order2
    zadd delayqueue 30 order3
  • 消费端轮询队列delayqueue, 将元素排序后取最小时间与当前时间比对,若小于当前时间代表已经过期移除key

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    /**
    * 消费消息
    */
    public void pollOrderQueue() {

    while (true) {
    Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);

    String value = ((Tuple) set.toArray()[0]).getElement();
    int score = (int) ((Tuple) set.toArray()[0]).getScore();

    Calendar cal = Calendar.getInstance();
    int nowSecond = (int) (cal.getTimeInMillis() / 1000);
    if (nowSecond >= score) {
    jedis.zrem(DELAY_QUEUE, value);
    System.out.println(sdf.format(new Date()) + " removed key:" + value);
    }

    if (jedis.zcard(DELAY_QUEUE) <= 0) {
    System.out.println(sdf.format(new Date()) + " zset empty ");
    return;
    }
    Thread.sleep(1000);
    }
    }
Redis 过期回调
  • Rediskey过期回调事件,也能达到延迟队列的效果,简单来说我们开启监听key是否过期的事件,一旦key过期会触发一个callback事件。

  • 修改redis.conf文件开启notify-keyspace-events Ex

    1
    notify-keyspace-events Ex
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    ############################# EVENT NOTIFICATION ##############################

    # Redis can notify Pub/Sub clients about events happening in the key space.
    # This feature is documented at https://redis.io/topics/notifications
    #
    # For instance if keyspace events notification is enabled, and a client
    # performs a DEL operation on key "foo" stored in the Database 0, two
    # messages will be published via Pub/Sub:
    #
    # PUBLISH __keyspace@0__:foo del
    # PUBLISH __keyevent@0__:del foo
    #
    # It is possible to select the events that Redis will notify among a set
    # of classes. Every class is identified by a single character:
    #
    # K Keyspace events, published with __keyspace@<db>__ prefix.
    # E Keyevent events, published with __keyevent@<db>__ prefix.
    # g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
    # $ String commands
    # l List commands
    # s Set commands
    # h Hash commands
    # z Sorted set commands
    # x Expired events (events generated every time a key expires)
    # e Evicted events (events generated when a key is evicted for maxmemory)
    # t Stream commands
    # d Module key type events
    # m Key-miss events (Note: It is not included in the 'A' class)
    # A Alias for g$lshzxetd, so that the "AKE" string means all the events
    # (Except key-miss events which are excluded from 'A' due to their
    # unique nature).
    #
    # The "notify-keyspace-events" takes as argument a string that is composed
    # of zero or multiple characters. The empty string means that notifications
    # are disabled.
    #
    # Example: to enable list and generic events, from the point of view of the
    # event name, use:
    #
    # notify-keyspace-events Elg
    #
    # Example 2: to get the stream of the expired keys subscribing to channel
    # name __keyevent@0__:expired use:
    #
    # notify-keyspace-events Ex
    #
    # By default all notifications are disabled because most users don't need
    # this feature and the feature has some overhead. Note that if you don't
    # specify at least one of K or E, no events will be delivered.
    notify-keyspace-events ""
  • Redis监听配置,注入Bean RedisMessageListenerContainer

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Configuration
    public class RedisListenerConfig {
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    return container;
    }
    }
  • 编写Redis过期回调监听方法,必须继承KeyExpirationEventMessageListener ,有点类似于MQ的消息监听。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Component
    public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
    super(listenerContainer);
    }
    @Override
    public void onMessage(Message message, byte[] pattern) {
    String expiredKey = message.toString();
    System.out.println("监听到key:" + expiredKey + "已过期");
    }
    }