参考文献

延时队列的定义

  • 首先队列是一种先进先出(FIFO)的数据结构.普通队列中的元素是有序的,先进入队列的元素会被优先取出进行消费.延时队列相对普通队列最大的区别是在于其"延时"的特性上.
  • 普通队列的元素是先进先出,是按照队列入队的顺序进行处理;延时队列中的元素会指定一个延时时间,表示其希望在经过指定时间后再进行处理.

延时队列的使用场景

  • 新建的订单,如果用户在 15 分钟内未支付,则自动取消.
  • 公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户.
  • 安全工单超过 24 小时未处理,则自动拉企业微信群提醒相关责任人.
  • 用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时.

延时队列的具体实现方案

Java DelayQueue

  • DelayQueue是无界的延时阻塞队列,内部是使用优先级队列PriorityQueue实现的,其是按时间来定优先级的延时阻塞队列,只有在延时期满时才能从队列中提取元素,先过期的元素会在队首,每次从队列里面取出来的都是最先要过期的元素,当执行队列的take方法操作元素未过期是会阻塞当前线程直到元素过期为止.

    1
    2
    public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {}
    • 队列中元素必须实现Delayed接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.holelin.sundry.domain;

import lombok.Data;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
* @Description:
* @Author: HoleLin
* @CreateDate: 2022/7/25 11:35
* @UpdateUser: HoleLin
* @UpdateDate: 2022/7/25 11:35
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
@Data
public class MeetingNotice implements Delayed {

private long noticeTime;

private long meetingId;
private String title;

public MeetingNotice(String title, long meetingId, long noticeTime, TimeUnit unit) {
this.title = title;
this.meetingId = meetingId;
this.noticeTime = System.currentTimeMillis() + (noticeTime > 0 ? unit.toMillis(noticeTime) : 0);
}

@Override
public long getDelay(TimeUnit unit) {
return noticeTime - System.currentTimeMillis();
}

@Override
public int compareTo(Delayed o) {
MeetingNotice meetingNotice = (MeetingNotice) o;
long diff = this.noticeTime - meetingNotice.noticeTime;
if (diff <= 0) {
return -1;
} else {
return 1;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.holelin.sundry.test;

import cn.hutool.core.collection.CollUtil;
import com.holelin.sundry.domain.MeetingNotice;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

import java.util.Objects;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

@Slf4j
class MeetingNoticeTest {

@Test
public void testDelayQueue() {
final MeetingNotice notice1 = new MeetingNotice("晨会", 1L, 5, TimeUnit.SECONDS);
final MeetingNotice notice2 = new MeetingNotice("晨会1", 2L, 10, TimeUnit.SECONDS);
final MeetingNotice notice3 = new MeetingNotice("晨会2", 3L, 15, TimeUnit.SECONDS);

final DelayQueue<MeetingNotice> meetingNotices = new DelayQueue<>();
meetingNotices.add(notice1);
meetingNotices.add(notice2);
meetingNotices.add(notice3);
while (CollUtil.isNotEmpty(meetingNotices)) {
final MeetingNotice notice = meetingNotices.poll();
if (Objects.nonNull(notice)) {
log.info("会议:{},还有15分钟中开始", notice.getTitle());
}
}
}
}

定时任务

  • 在启动类中使用@EnableScheduling注解开启定时任务功能.

    1
    2
    3
    4
    5
    6
    7
    @EnableScheduling
    @SpringBootApplication
    public class DelayqueueApplication {
    public static void main(String[] args) {
    SpringApplication.run(DelayqueueApplication.class, args);
    }
    }
  • 编写一个定时任务,每个5秒执行一次.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Component
    public class QuartzDemo {

    //每隔五秒
    @Scheduled(cron = "0/5 * * * * ? ")
    public void process(){
    System.out.println("我是定时任务!");
    }
    }

Redis Sorted Set

  • Redis的数据结构Zset,同样可以实现延迟队列的效果,主要利用它的score属性,Redis通过score来为集合中的成员进行从小到打的排序.

  • 通过zadd命令向队列delayqueue 中添加元素,并设置score值表示元素过期的时间;向delayqueue 添加三个order1order2order3,分别是10秒20秒30秒后过期.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    package com.holelin.redis.test;

    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.Tuple;

    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.Set;

    /**
    * @Description:
    * @Author: HoleLin
    * @CreateDate: 2022/7/25 14:00
    * @UpdateUser: HoleLin
    * @UpdateDate: 2022/7/25 14:00
    * @UpdateRemark: 修改内容
    * @Version: 1.0
    */
    public class DelayQueueTest {

    private JedisPool jedisPool = null;
    // Redis服务器IP
    private String ADDR = "127.0.0.1";
    // Redis的端口号
    private int PORT = 6379;

    private static String DELAY_QUEUE = "delayqueue";

    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public DelayQueueTest() {
    jedisPool = new JedisPool(ADDR, PORT);
    }

    public static void main(String[] args) {

    DelayQueueTest redisDelay = new DelayQueueTest();
    redisDelay.pushOrderQueue();
    redisDelay.pollOrderQueue();
    redisDelay.deleteZSet();
    }

    public void deleteZSet() {
    Jedis jedis = jedisPool.getResource();
    jedis.del(DELAY_QUEUE);
    }

    /**
    * 消息入队
    */
    public void pushOrderQueue() {
    Jedis jedis = jedisPool.getResource();
    Calendar cal1 = Calendar.getInstance();
    cal1.add(Calendar.SECOND, 10);
    int order1 = (int) (cal1.getTimeInMillis() / 1000);

    Calendar cal2 = Calendar.getInstance();
    cal2.add(Calendar.SECOND, 20);
    int order2 = (int) (cal2.getTimeInMillis() / 1000);

    Calendar cal3 = Calendar.getInstance();
    cal3.add(Calendar.SECOND, 30);
    int order3 = (int) (cal3.getTimeInMillis() / 1000);

    jedis.zadd(DELAY_QUEUE, order1, "order1");
    jedis.zadd(DELAY_QUEUE, order2, "order2");
    jedis.zadd(DELAY_QUEUE, order3, "order3");
    System.out.println(sdf.format(new Date()) + " add finished.");
    }

    /**
    * 消费消息
    */
    public void pollOrderQueue() {

    Jedis jedis = jedisPool.getResource();
    while (true) {

    Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);

    String value = ((Tuple) set.toArray()[0]).getElement();
    int score = (int) ((Tuple) set.toArray()[0]).getScore();

    Calendar cal = Calendar.getInstance();
    int nowSecond = (int) (cal.getTimeInMillis() / 1000);

    if (nowSecond >= score) {
    jedis.zrem(DELAY_QUEUE, value);
    System.out.println(sdf.format(new Date()) + " removed key:" + value);
    }

    if (jedis.zcard(DELAY_QUEUE) <= 0) {
    System.out.println(sdf.format(new Date()) + " zset empty ");
    return;
    }
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

    }

Redis过期回调

  • Rediskey过期回调事件,也能达到延迟队列的效果,简单来说我们开启监听key是否过期的事件,一旦key过期会触发一个callback事件.

    • 修改redis.conf文件开启notify-keyspace-events Ex

    • 实现过期Key监听器

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      package com.holelin.redis.utils;

      import lombok.extern.slf4j.Slf4j;
      import org.springframework.data.redis.connection.Message;
      import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
      import org.springframework.data.redis.listener.RedisMessageListenerContainer;
      import org.springframework.stereotype.Component;
      /**
      * @Description:
      * @Author: HoleLin
      * @CreateDate: 2022/7/26 15:45
      * @UpdateUser: HoleLin
      * @UpdateDate: 2022/7/26 15:45
      * @UpdateRemark: 修改内容
      * @Version: 1.0
      */
      @Slf4j
      @Component
      public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

      public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
      super(listenerContainer);
      }

      @Override
      public void onMessage(Message message, byte[] pattern) {
      String expiredKey = message.toString();
      log.info("监听到key:" + expiredKey + "已过期");
      }
      }
    • 配置监听器

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      package com.holelin.redis.config;

      import com.fasterxml.jackson.annotation.JsonAutoDetect;
      import com.fasterxml.jackson.annotation.JsonTypeInfo;
      import com.fasterxml.jackson.annotation.PropertyAccessor;
      import com.fasterxml.jackson.databind.ObjectMapper;
      import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.data.redis.connection.RedisConnectionFactory;
      import org.springframework.data.redis.core.RedisTemplate;
      import org.springframework.data.redis.listener.RedisMessageListenerContainer;
      import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;

      import java.io.Serializable;
      import java.net.UnknownHostException;

      /**
      * @Description: Redis 配置类
      * @Author: HoleLin
      * @CreateDate: 2020/9/4 10:36
      * @UpdateUser: HoleLin
      * @UpdateDate: 2020/9/4 10:36
      * @UpdateRemark: 修改内容
      * @Version: 1.0
      */
      @Configuration
      public class RedisAutoConfiguration {

      @Bean
      RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

      RedisMessageListenerContainer container = new RedisMessageListenerContainer();
      container.setConnectionFactory(connectionFactory);
      return container;
      }
      }

RabbitMQ 延时队列

  • 利用 RabbitMQ 做延时队列是比较常见的一种方式,而实际上RabbitMQ 自身并没有直接支持提供延迟队列功能,而是通过 RabbitMQ 消息队列的 TTLDXL这两个属性间接实现的.

  • Time To Live(TTL) :

    • TTL 顾名思义:指的是消息的存活时间,RabbitMQ可以通过x-message-tt参数来设置指定Queue(队列)和 Message(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒.

    • RabbitMQ 可以从两种维度设置消息过期时间,分别是队列消息本身.

      • 设置队列过期时间,那么队列中所有消息都具有相同的过期时间.

      • 设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息TTL都可以不同.

    • 如果同时设置队列和队列中消息的TTL,则TTL值以两者中较小的值为准.而队列中的消息存在队列中的时间,一旦超过TTL过期时间则成为Dead Letter(死信)

  • Dead Letter Exchanges(DLX)

    • DLX即死信交换机,绑定在死信交换机上的即死信队列.RabbitMQQueue(队列)可以配置两个参数x-dead-letter-exchangex-dead-letter-routing-key(可选),一旦队列内出现了Dead Letter(死信),则按照这两个参数可以将消息重新路由到另一个Exchange(交换机),让消息重新被消费.
    • x-dead-letter-exchange:队列中出现Dead Letter后将Dead Letter重新路由转发到指定 exchange(交换机).
    • x-dead-letter-routing-key:指定routing-key发送,一般为要指定转发的队列.
    • 队列出现Dead Letter的情况有:
      • 消息超时未消费,也就是 TTL 过期了
      • 队列达到最大长度
      • 消息被消费端拒绝(basic.reject/basic.nack)并且不再重新投递 requeue=false

Redisson DelayQueue

  • 延时队列的具体实现最好查看源码,涉及三个队列和一个发布订阅通道
    • 阻塞队列 ListKEY = queueName,执行 BLPOP 命令从左端弹出元素,右端插入元素.当一条数据到达过期时间的时候,会从redisson_delay_queue:{DelayMessage}中移除,加入到这个队列,客户端监听的就是这个队列,这个队列里面的全都是已经过期的数据.
    • 有序集合 Sorted SetKEY = redisson_delay_queue_timeout:{queueName},score 是元素的过期时间,按从小到大排序,过期时间小于当前时间表示已过期,删除集合中的元素,同时将普通集合 List中对应的元素删除,并将元素添加到阻塞队列 List等待客户端消费.
    • 普通集合 ListKEY = redisson_delay_queue:{DelayMessage},按顺序从右端添加元素,元素过期会被删除.
    • 发布/订阅通道redisson_delay_queue_channel,往延时队列中放入一个数据时,会将延时时间publish出去,客户端收到之后会按这个时间延时之后再执行定时任务.
示例
  • 依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
      <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.0.RELEASE</version>
    <relativePath/>
    <!-- lookup parent from repository -->
    </parent>


    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>${redisson.version}</version>
    <exclusions>
    <exclusion>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-data-31</artifactId>
    </exclusion>
    </exclusions>
    </dependency>
    <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-data-22</artifactId>
    <version>${redisson.version}</version>
    </dependency>
    </dependencies>
  • 配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    import org.redisson.Redisson;
    import org.redisson.api.RedissonClient;
    import org.redisson.config.Config;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RedissonConfig {

    @Autowired
    private RedisProperties redisProperties;

    @Bean
    public RedissonClient redissonClient() {
    Config config = new Config();
    String redisUrl = String.format("redis://%s:%s", redisProperties.getHost(), redisProperties.getPort());
    config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword());
    config.useSingleServer().setDatabase(1);
    return Redisson.create(config);
    }

    @Bean
    public RedisTemplate<String, Serializable> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
    RedisTemplate<String, Serializable> template = new RedisTemplate<>();
    template.setConnectionFactory(redisConnectionFactory);

    Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
    ObjectMapper om = new ObjectMapper();
    om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
    ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.WRAPPER_ARRAY);
    jackson2JsonRedisSerializer.setObjectMapper(om);
    template.setConnectionFactory(redisConnectionFactory);
    template.setKeySerializer(jackson2JsonRedisSerializer);
    template.setValueSerializer(jackson2JsonRedisSerializer);
    template.setHashKeySerializer(jackson2JsonRedisSerializer);
    template.setHashValueSerializer(jackson2JsonRedisSerializer);
    template.afterPropertiesSet();
    return template;
    }

    }
  • yaml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    spring:
    redis:
    database: 1
    host: localhost
    password: holelin
    port: 6379
    timeout: 500
    jedis:
    pool:
    max-active: 50
    max-idle: 20
    max-wait: 3000
    min-idle: 2
  • Redisson延迟队列工具类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    import lombok.extern.slf4j.Slf4j;
    import org.redisson.api.RBlockingDeque;
    import org.redisson.api.RBlockingQueue;
    import org.redisson.api.RDelayedQueue;
    import org.redisson.api.RedissonClient;
    import org.springframework.stereotype.Component;

    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    /**
    * @author HoleLin
    */
    @Slf4j
    @Component
    public class RedissonDelayQueueUtils {

    private final RedissonClient redissonClient;

    public RedissonDelayQueueUtils(RedissonClient redissonClient) {
    this.redissonClient = redissonClient;
    }

    public <T> void sendMessage(T value, long delay, TimeUnit timeUnit, String queueCode) {
    try {
    RBlockingQueue<Object> blockingDeque = redissonClient.getBlockingQueue(queueCode);
    RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
    //如果延时队列中原先存在这条消息,remove可以删除延时队列中的这条消息
    //如果多次发送相同的消息,先remove再offer,只有最后一条会被延时消费,延时时间以最后一条的发送时间开始延时
    delayedQueue.remove(value);
    delayedQueue.offer(value, delay, timeUnit);
    log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queueCode, value, timeUnit.toSeconds(delay) + "秒");
    } catch (Exception e) {
    log.error("(添加延时队列失败) {}", e.getMessage());
    throw new RuntimeException("(添加延时队列失败)");
    }
    }

    /**
    * 获取延迟队列
    *
    * @param queueCode 队列名称
    * @param <T> 队列具体内容泛型
    * @throws InterruptedException
    */
    public <T> T getDelayQueue(String queueCode) throws InterruptedException {
    RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(queueCode);
    T value = (T) blockingDeque.take();
    return value;
    }
    }

  • 延迟队列业务枚举类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    /**
    * @author HoleLin
    */
    public enum RedissonDelayQueueEnum {
    AI_QUEUE_RECORD_EXPIRED("AI_QUEUE_RECORD_EXPIRED", "aiQueueRecordExpired");
    /**
    * 延迟队列的QueueCode
    */
    private String code;
    /**
    * 延迟队列具体业务实现的Bean
    * 可通过Spring上下文获取
    */
    private String beanId;

    RedissonDelayQueueEnum(String code, String beanId) {
    this.code = code;
    this.beanId = beanId;
    }

    public String getCode() {
    return code;
    }

    public String getBeanId() {
    return beanId;
    }
    }
  • 延迟队列执行器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    /**
    * 延迟队列执行器
    *
    * @author HoleLin
    */
    public interface RedissonDelayHandle<T> {

    void execute(T t);
    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    /**
    * Ai队列记录失效处理类
    *
    * @author HoleLin
    */
    @Slf4j
    @Component(value = "aiQueueRecordExpired")
    public class AiQueueRecordExpiredHandler implements RedissonDelayHandle<AITaskRecordInfo> {

    @Override
    public void execute(AITaskRecordInfo aiTaskRecordInfo) {
    // 具体实现
    }
    }
  • 创建延迟队列消费线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    import com.holelin.enums.RedissonDelayQueueEnum;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.context.ApplicationContext;
    import org.springframework.core.annotation.Order;
    import org.springframework.stereotype.Component;

    /**
    * @author HoleLin
    */
    @Slf4j
    @Component
    @Order(value = 3)
    public class RedissonDelayQueueRunner implements ApplicationRunner {

    private RedissonDelayQueueUtils redissonDelayQueueUtils;
    private ApplicationContext applicationContext;

    public RedissonDelayQueueRunner(RedissonDelayQueueUtils redissonDelayQueueUtils, ApplicationContext applicationContext) {
    this.redissonDelayQueueUtils = redissonDelayQueueUtils;
    this.applicationContext = applicationContext;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
    new Thread(() -> {
    while (true) {
    RedissonDelayQueueEnum[] queueEnums = RedissonDelayQueueEnum.values();
    for (RedissonDelayQueueEnum queueEnum : queueEnums) {
    Object value = null;
    try {
    value = redissonDelayQueueUtils.getDelayQueue(queueEnum.getCode());
    if (value != null) {
    RedissonDelayHandle redisDelayQueueHandle = (RedissonDelayHandle) applicationContext.getBean(queueEnum.getBeanId());
    redisDelayQueueHandle.execute(value);
    }
    } catch (InterruptedException e) {
    log.error("(Redis延迟队列异常中断) {}", e.getMessage());
    }
    }
    }
    }).start();
    }
    }

Redisson延迟队列源码分析

1
redisson 3.23.4

数据结构

  • redisson_delay_queue_timeout:{queue_name} 消息延时队列,ZSET结构(value为消息,score为过期时间),这样就可以知道当前过期的消息.
  • redisson_delay_queue:{queue_name} 消息顺序队列,LIST结构,按照消息添加顺序存储,移除消息时可以按照添加顺序删除.
  • {queue_name} 消息目标队列,LIST结构,存储实际到期可以被消费的消息供消费者拉取消费.
  • redisson_delay_queue_channel:{queue_name} 发布订阅channel主题,用于通知客户端定时器从定期队列转移到期的消息到目标队列.

消息生产

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Override
public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
if (delay < 0) {
throw new IllegalArgumentException("Delay can't be negative");
}
// 延迟时间 毫秒级
long delayInMs = timeUnit.toMillis(delay);
// 消息过期时间 = 当前时间 + 延迟时间
long timeout = System.currentTimeMillis() + delayInMs;
// 生成随机ID,目的是为了确保任务的唯一性和避免数据冲突
byte[] random = getServiceManager().generateIdArray(8);
// KEYS:
// KEYS[1]: getRawName()
// KEYS[2]: timeoutSetName
// KEYS[3]: queueName
// KEYS[4]: channelName
// ARGV:
// ARGV[1]: timeout
// ARGV[2]: random
// ARGV[3]: encode(e)
// 将消息和到期时间插入【消息延时队列】和【消息顺序队列】
// 如果最近到期的消息是刚刚插入的消息,则对指定主题发布到期时间,目的是为了让客户端定时去把【消息延时队列】里的到期数据移动到【消息目标队列】
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
// 构造一个二进制字符串,用于表示任务的元数据
// 'Bc0' 和 'Lc0' 是 struct.pack 的格式字符串,分别表示无符号字节和无符号长整型(64位),后面跟的是变长字符串
"local value = struct.pack('Bc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]);"
// 将任务添加到timeoutSetName有序集合中,以timeout延迟时间为分数进行排序
+ "redis.call('zadd', KEYS[2], ARGV[1], value);"
// 将任务添加到queueName列表中
+ "redis.call('rpush', KEYS[3], value);"
// if new object added to queue head when publish its startTime
// to all scheduler workers
// 检查新添加的任务是否是当前timeoutSetName有序集合中的第一个任务(即最早到期的任务),如果是,则发布一条消息通知调度器
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
+ "if v[1] == value then "
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end;",
Arrays.asList(getRawName(), timeoutSetName, queueName, channelName),
timeout, random, encode(e));
}
  • 通过redissonClient.getDelayedQueue获取RDelayedQueue对象
  • 然后delayedQueue调用offer方法去保存消息
  • 最后真正的保存逻辑是由RedissonDelayedQueue执行offerAsync方法调用的lua脚本

定时器转移消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
channelName = prefixName("redisson_delay_queue_channel", getRawName());
queueName = prefixName("redisson_delay_queue", getRawName());
timeoutSetName = prefixName("redisson_delay_queue_timeout", getRawName());

QueueTransferTask task = new QueueTransferTask(commandExecutor.getServiceManager()) {

@Override
protected RFuture<Long> pushTaskAsync() {
// KEYS:
// KEYS[1]: getRawName()
// KEYS[2]: timeoutSetName
// KEYS[3]: queueName
// ARGV:
// ARGV[1]: System.currentTimeMillis() 当前时间毫秒数
// ARGV[2]: 100 获取数量
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
// 获取过期任务:从timeoutSetName中获取所有分数在0到ARGV[1]之间的成员(即过期任务),限制最多ARGV[2]个
"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
// 如果存在过期任务
+ "if #expiredValues > 0 then "
// 遍历每个过期任务
+ "for i, v in ipairs(expiredValues) do "
// 解析任务数据
+ "local randomId, value = struct.unpack('Bc0Lc0', v);"
// 将任务推入目标队列
+ "redis.call('rpush', KEYS[1], value);"
// 从源队列中移除该任务
+ "redis.call('lrem', KEYS[3], 1, v);"
+ "end; "
// 从有序集中移除这些过期任务
+ "redis.call('zrem', KEYS[2], unpack(expiredValues));"
+ "end; "
// get startTime from scheduler queue head task
// 获取下一次任务调度的起始时间
+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
+ "if v[1] ~= nil then " // 如果存在头部任务
+ "return v[2]; " // 返回该任务的分数作为下一次调度的起始时间
+ "end "
+ "return nil;", // 如果没有任务,则返回null
Arrays.asList(getRawName(), timeoutSetName, queueName),
System.currentTimeMillis(), 100);
}

@Override
protected RTopic getTopic() {
return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName);
}
};

queueTransferService.schedule(queueName, task);

this.queueTransferService = queueTransferService;
}

消息消费

1
2
3
4
@Override
public RFuture<V> takeAsync() {
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, getRawName(), 0);
}