参考文献

延时队列的定义

  • 首先队列是一种先进先出(FIFO)的数据结构.普通队列中的元素是有序的,先进入队列的元素会被优先取出进行消费.延时队列相对普通队列最大的区别是在于其"延时"的特性上.
  • 普通队列的元素是先进先出,是按照队列入队的顺序进行处理;延时队列中的元素会指定一个延时时间,表示其希望在经过指定时间后再进行处理.

延时队列的使用场景

  • 新建的订单,如果用户在 15 分钟内未支付,则自动取消。
  • 公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户。
  • 安全工单超过 24 小时未处理,则自动拉企业微信群提醒相关责任人。
  • 用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时。

延时队列的具体实现方案

Java DelayQueue

  • DelayQueue是无界的延时阻塞队列,内部是使用优先级队列PriorityQueue实现的,其是按时间来定优先级的延时阻塞队列,只有在延时期满时才能从队列中提取元素,先过期的元素会在队首,每次从队列里面取出来的都是最先要过期的元素,当执行队列的take方法操作元素未过期是会阻塞当前线程直到元素过期为止.

    1
    2
    public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {}
    • 队列中元素必须实现Delayed接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.holelin.sundry.domain;

import lombok.Data;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
* @Description:
* @Author: HoleLin
* @CreateDate: 2022/7/25 11:35
* @UpdateUser: HoleLin
* @UpdateDate: 2022/7/25 11:35
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
@Data
public class MeetingNotice implements Delayed {

private long noticeTime;

private long meetingId;
private String title;

public MeetingNotice(String title, long meetingId, long noticeTime, TimeUnit unit) {
this.title = title;
this.meetingId = meetingId;
this.noticeTime = System.currentTimeMillis() + (noticeTime > 0 ? unit.toMillis(noticeTime) : 0);
}

@Override
public long getDelay(TimeUnit unit) {
return noticeTime - System.currentTimeMillis();
}

@Override
public int compareTo(Delayed o) {
MeetingNotice meetingNotice = (MeetingNotice) o;
long diff = this.noticeTime - meetingNotice.noticeTime;
if (diff <= 0) {
return -1;
} else {
return 1;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.holelin.sundry.test;

import cn.hutool.core.collection.CollUtil;
import com.holelin.sundry.domain.MeetingNotice;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

import java.util.Objects;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

@Slf4j
class MeetingNoticeTest {

@Test
public void testDelayQueue() {
final MeetingNotice notice1 = new MeetingNotice("晨会", 1L, 5, TimeUnit.SECONDS);
final MeetingNotice notice2 = new MeetingNotice("晨会1", 2L, 10, TimeUnit.SECONDS);
final MeetingNotice notice3 = new MeetingNotice("晨会2", 3L, 15, TimeUnit.SECONDS);

final DelayQueue<MeetingNotice> meetingNotices = new DelayQueue<>();
meetingNotices.add(notice1);
meetingNotices.add(notice2);
meetingNotices.add(notice3);
while (CollUtil.isNotEmpty(meetingNotices)) {
final MeetingNotice notice = meetingNotices.poll();
if (Objects.nonNull(notice)) {
log.info("会议:{},还有15分钟中开始", notice.getTitle());
}
}
}
}

定时任务

  • 在启动类中使用@EnableScheduling注解开启定时任务功能。

    1
    2
    3
    4
    5
    6
    7
    @EnableScheduling
    @SpringBootApplication
    public class DelayqueueApplication {
    public static void main(String[] args) {
    SpringApplication.run(DelayqueueApplication.class, args);
    }
    }
  • 编写一个定时任务,每个5秒执行一次。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Component
    public class QuartzDemo {

    //每隔五秒
    @Scheduled(cron = "0/5 * * * * ? ")
    public void process(){
    System.out.println("我是定时任务!");
    }
    }

Redis Sorted Set

  • Redis的数据结构Zset,同样可以实现延迟队列的效果,主要利用它的score属性,Redis通过score来为集合中的成员进行从小到打的排序.

  • 通过zadd命令向队列delayqueue 中添加元素,并设置score值表示元素过期的时间;向delayqueue 添加三个order1order2order3,分别是10秒20秒30秒后过期。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    package com.holelin.redis.test;

    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.Tuple;

    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.Set;

    /**
    * @Description:
    * @Author: HoleLin
    * @CreateDate: 2022/7/25 14:00
    * @UpdateUser: HoleLin
    * @UpdateDate: 2022/7/25 14:00
    * @UpdateRemark: 修改内容
    * @Version: 1.0
    */
    public class DelayQueueTest {

    private JedisPool jedisPool = null;
    // Redis服务器IP
    private String ADDR = "127.0.0.1";
    // Redis的端口号
    private int PORT = 6379;

    private static String DELAY_QUEUE = "delayqueue";

    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public DelayQueueTest() {
    jedisPool = new JedisPool(ADDR, PORT);
    }

    public static void main(String[] args) {

    DelayQueueTest redisDelay = new DelayQueueTest();
    redisDelay.pushOrderQueue();
    redisDelay.pollOrderQueue();
    redisDelay.deleteZSet();
    }

    public void deleteZSet() {
    Jedis jedis = jedisPool.getResource();
    jedis.del(DELAY_QUEUE);
    }

    /**
    * 消息入队
    */
    public void pushOrderQueue() {
    Jedis jedis = jedisPool.getResource();
    Calendar cal1 = Calendar.getInstance();
    cal1.add(Calendar.SECOND, 10);
    int order1 = (int) (cal1.getTimeInMillis() / 1000);

    Calendar cal2 = Calendar.getInstance();
    cal2.add(Calendar.SECOND, 20);
    int order2 = (int) (cal2.getTimeInMillis() / 1000);

    Calendar cal3 = Calendar.getInstance();
    cal3.add(Calendar.SECOND, 30);
    int order3 = (int) (cal3.getTimeInMillis() / 1000);

    jedis.zadd(DELAY_QUEUE, order1, "order1");
    jedis.zadd(DELAY_QUEUE, order2, "order2");
    jedis.zadd(DELAY_QUEUE, order3, "order3");
    System.out.println(sdf.format(new Date()) + " add finished.");
    }

    /**
    * 消费消息
    */
    public void pollOrderQueue() {

    Jedis jedis = jedisPool.getResource();
    while (true) {

    Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);

    String value = ((Tuple) set.toArray()[0]).getElement();
    int score = (int) ((Tuple) set.toArray()[0]).getScore();

    Calendar cal = Calendar.getInstance();
    int nowSecond = (int) (cal.getTimeInMillis() / 1000);

    if (nowSecond >= score) {
    jedis.zrem(DELAY_QUEUE, value);
    System.out.println(sdf.format(new Date()) + " removed key:" + value);
    }

    if (jedis.zcard(DELAY_QUEUE) <= 0) {
    System.out.println(sdf.format(new Date()) + " zset empty ");
    return;
    }
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

    }

Redis过期回调

  • Rediskey过期回调事件,也能达到延迟队列的效果,简单来说我们开启监听key是否过期的事件,一旦key过期会触发一个callback事件。

    • 修改redis.conf文件开启notify-keyspace-events Ex

    • 实现过期Key监听器

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      package com.holelin.redis.utils;

      import lombok.extern.slf4j.Slf4j;
      import org.springframework.data.redis.connection.Message;
      import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
      import org.springframework.data.redis.listener.RedisMessageListenerContainer;
      import org.springframework.stereotype.Component;
      /**
      * @Description:
      * @Author: HoleLin
      * @CreateDate: 2022/7/26 15:45
      * @UpdateUser: HoleLin
      * @UpdateDate: 2022/7/26 15:45
      * @UpdateRemark: 修改内容
      * @Version: 1.0
      */
      @Slf4j
      @Component
      public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

      public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
      super(listenerContainer);
      }

      @Override
      public void onMessage(Message message, byte[] pattern) {
      String expiredKey = message.toString();
      log.info("监听到key:" + expiredKey + "已过期");
      }
      }
    • 配置监听器

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      package com.holelin.redis.config;

      import com.fasterxml.jackson.annotation.JsonAutoDetect;
      import com.fasterxml.jackson.annotation.JsonTypeInfo;
      import com.fasterxml.jackson.annotation.PropertyAccessor;
      import com.fasterxml.jackson.databind.ObjectMapper;
      import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.data.redis.connection.RedisConnectionFactory;
      import org.springframework.data.redis.core.RedisTemplate;
      import org.springframework.data.redis.listener.RedisMessageListenerContainer;
      import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;

      import java.io.Serializable;
      import java.net.UnknownHostException;

      /**
      * @Description: Redis 配置类
      * @Author: HoleLin
      * @CreateDate: 2020/9/4 10:36
      * @UpdateUser: HoleLin
      * @UpdateDate: 2020/9/4 10:36
      * @UpdateRemark: 修改内容
      * @Version: 1.0
      */
      @Configuration
      public class RedisAutoConfiguration {

      @Bean
      RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

      RedisMessageListenerContainer container = new RedisMessageListenerContainer();
      container.setConnectionFactory(connectionFactory);
      return container;
      }
      }

RabbitMQ 延时队列

  • 利用 RabbitMQ 做延时队列是比较常见的一种方式,而实际上RabbitMQ 自身并没有直接支持提供延迟队列功能,而是通过 RabbitMQ 消息队列的 TTLDXL这两个属性间接实现的。

  • Time To Live(TTL) :

    • TTL 顾名思义:指的是消息的存活时间,RabbitMQ可以通过x-message-tt参数来设置指定Queue(队列)和 Message(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒。

    • RabbitMQ 可以从两种维度设置消息过期时间,分别是队列消息本身.

      • 设置队列过期时间,那么队列中所有消息都具有相同的过期时间。

      • 设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息TTL都可以不同。

    • 如果同时设置队列和队列中消息的TTL,则TTL值以两者中较小的值为准。而队列中的消息存在队列中的时间,一旦超过TTL过期时间则成为Dead Letter(死信)

  • Dead Letter ExchangesDLX

    • DLX即死信交换机,绑定在死信交换机上的即死信队列。RabbitMQQueue(队列)可以配置两个参数x-dead-letter-exchangex-dead-letter-routing-key(可选),一旦队列内出现了Dead Letter(死信),则按照这两个参数可以将消息重新路由到另一个Exchange(交换机),让消息重新被消费。
    • x-dead-letter-exchange:队列中出现Dead Letter后将Dead Letter重新路由转发到指定 exchange(交换机)。
    • x-dead-letter-routing-key:指定routing-key发送,一般为要指定转发的队列。
    • 队列出现Dead Letter的情况有:
      • 消息超时未消费,也就是 TTL 过期了
      • 队列达到最大长度
      • 消息被消费端拒绝(basic.reject/basic.nack)并且不再重新投递 requeue=false

Redisson DelayQueue

  • 延时队列的具体实现最好查看源码,涉及三个队列和一个发布订阅通道
    • 阻塞队列 ListKEY = queueName,执行 BLPOP 命令从左端弹出元素,右端插入元素。当一条数据到达过期时间的时候,会从redisson_delay_queue:{DelayMessage}中移除,加入到这个队列,客户端监听的就是这个队列,这个队列里面的全都是已经过期的数据。
    • 有序集合 Sorted SetKEY = redisson_delay_queue_timeout:{queueName}score 是元素的过期时间,按从小到大排序,过期时间小于当前时间表示已过期,删除集合中的元素,同时将普通集合 List中对应的元素删除,并将元素添加到阻塞队列 List等待客户端消费。
    • 普通集合 ListKEY = redisson_delay_queue:{DelayMessage},按顺序从右端添加元素,元素过期会被删除。
    • 发布/订阅通道redisson_delay_queue_channel,往延时队列中放入一个数据时,会将延时时间publish出去,客户端收到之后会按这个时间延时之后再执行定时任务。
示例
  • 依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
      <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.0.RELEASE</version>
    <relativePath/>
    <!-- lookup parent from repository -->
    </parent>


    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>${redisson.version}</version>
    <exclusions>
    <exclusion>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-data-31</artifactId>
    </exclusion>
    </exclusions>
    </dependency>
    <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-data-22</artifactId>
    <version>${redisson.version}</version>
    </dependency>
    </dependencies>
  • 配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    import org.redisson.Redisson;
    import org.redisson.api.RedissonClient;
    import org.redisson.config.Config;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RedissonConfig {

    @Autowired
    private RedisProperties redisProperties;

    @Bean
    public RedissonClient redissonClient() {
    Config config = new Config();
    String redisUrl = String.format("redis://%s:%s", redisProperties.getHost(), redisProperties.getPort());
    config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword());
    config.useSingleServer().setDatabase(1);
    return Redisson.create(config);
    }

    @Bean
    public RedisTemplate<String, Serializable> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
    RedisTemplate<String, Serializable> template = new RedisTemplate<>();
    template.setConnectionFactory(redisConnectionFactory);

    Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
    ObjectMapper om = new ObjectMapper();
    om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
    ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.WRAPPER_ARRAY);
    jackson2JsonRedisSerializer.setObjectMapper(om);
    template.setConnectionFactory(redisConnectionFactory);
    template.setKeySerializer(jackson2JsonRedisSerializer);
    template.setValueSerializer(jackson2JsonRedisSerializer);
    template.setHashKeySerializer(jackson2JsonRedisSerializer);
    template.setHashValueSerializer(jackson2JsonRedisSerializer);
    template.afterPropertiesSet();
    return template;
    }

    }
  • yaml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    spring:
    redis:
    database: 1
    host: localhost
    password: holelin
    port: 6379
    timeout: 500
    jedis:
    pool:
    max-active: 50
    max-idle: 20
    max-wait: 3000
    min-idle: 2
  • Redisson延迟队列工具类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    import lombok.extern.slf4j.Slf4j;
    import org.redisson.api.RBlockingDeque;
    import org.redisson.api.RBlockingQueue;
    import org.redisson.api.RDelayedQueue;
    import org.redisson.api.RedissonClient;
    import org.springframework.stereotype.Component;

    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    /**
    * @author HoleLin
    */
    @Slf4j
    @Component
    public class RedissonDelayQueueUtils {

    private final RedissonClient redissonClient;

    public RedissonDelayQueueUtils(RedissonClient redissonClient) {
    this.redissonClient = redissonClient;
    }

    public <T> void sendMessage(T value, long delay, TimeUnit timeUnit, String queueCode) {
    try {
    RBlockingQueue<Object> blockingDeque = redissonClient.getBlockingQueue(queueCode);
    RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
    //如果延时队列中原先存在这条消息,remove可以删除延时队列中的这条消息
    //如果多次发送相同的消息,先remove再offer,只有最后一条会被延时消费,延时时间以最后一条的发送时间开始延时
    delayedQueue.remove(value);
    delayedQueue.offer(value, delay, timeUnit);
    log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queueCode, value, timeUnit.toSeconds(delay) + "秒");
    } catch (Exception e) {
    log.error("(添加延时队列失败) {}", e.getMessage());
    throw new RuntimeException("(添加延时队列失败)");
    }
    }

    /**
    * 获取延迟队列
    *
    * @param queueCode 队列名称
    * @param <T> 队列具体内容泛型
    * @throws InterruptedException
    */
    public <T> T getDelayQueue(String queueCode) throws InterruptedException {
    RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(queueCode);
    T value = (T) blockingDeque.take();
    return value;
    }
    }

  • 延迟队列业务枚举类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    /**
    * @author HoleLin
    */
    public enum RedissonDelayQueueEnum {
    AI_QUEUE_RECORD_EXPIRED("AI_QUEUE_RECORD_EXPIRED", "aiQueueRecordExpired");
    /**
    * 延迟队列的QueueCode
    */
    private String code;
    /**
    * 延迟队列具体业务实现的Bean
    * 可通过Spring上下文获取
    */
    private String beanId;

    RedissonDelayQueueEnum(String code, String beanId) {
    this.code = code;
    this.beanId = beanId;
    }

    public String getCode() {
    return code;
    }

    public String getBeanId() {
    return beanId;
    }
    }
  • 延迟队列执行器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    /**
    * 延迟队列执行器
    *
    * @author HoleLin
    */
    public interface RedissonDelayHandle<T> {

    void execute(T t);
    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    /**
    * Ai队列记录失效处理类
    *
    * @author HoleLin
    */
    @Slf4j
    @Component(value = "aiQueueRecordExpired")
    public class AiQueueRecordExpiredHandler implements RedissonDelayHandle<AITaskRecordInfo> {

    @Override
    public void execute(AITaskRecordInfo aiTaskRecordInfo) {
    // 具体实现
    }
    }
  • 创建延迟队列消费线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    import com.holelin.enums.RedissonDelayQueueEnum;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.context.ApplicationContext;
    import org.springframework.core.annotation.Order;
    import org.springframework.stereotype.Component;

    /**
    * @author HoleLin
    */
    @Slf4j
    @Component
    @Order(value = 3)
    public class RedissonDelayQueueRunner implements ApplicationRunner {

    private RedissonDelayQueueUtils redissonDelayQueueUtils;
    private ApplicationContext applicationContext;

    public RedissonDelayQueueRunner(RedissonDelayQueueUtils redissonDelayQueueUtils, ApplicationContext applicationContext) {
    this.redissonDelayQueueUtils = redissonDelayQueueUtils;
    this.applicationContext = applicationContext;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
    new Thread(() -> {
    while (true) {
    RedissonDelayQueueEnum[] queueEnums = RedissonDelayQueueEnum.values();
    for (RedissonDelayQueueEnum queueEnum : queueEnums) {
    Object value = null;
    try {
    value = redissonDelayQueueUtils.getDelayQueue(queueEnum.getCode());
    if (value != null) {
    RedissonDelayHandle redisDelayQueueHandle = (RedissonDelayHandle) applicationContext.getBean(queueEnum.getBeanId());
    redisDelayQueueHandle.execute(value);
    }
    } catch (InterruptedException e) {
    log.error("(Redis延迟队列异常中断) {}", e.getMessage());
    }
    }
    }
    }).start();
    }
    }