解决方案-延时队列
参考文献
延时队列的定义
- 首先队列是一种先进先出(FIFO)的数据结构.普通队列中的元素是有序的,先进入队列的元素会被优先取出进行消费.延时队列相对普通队列最大的区别是在于其"延时"的特性上.
- 普通队列的元素是先进先出,是按照队列入队的顺序进行处理;延时队列中的元素会指定一个延时时间,表示其希望在经过指定时间后再进行处理.
延时队列的使用场景
- 新建的订单,如果用户在 15 分钟内未支付,则自动取消.
- 公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户.
- 安全工单超过 24 小时未处理,则自动拉企业微信群提醒相关责任人.
- 用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时.
延时队列的具体实现方案
Java DelayQueue
-
DelayQueue
是无界的延时阻塞队列,内部是使用优先级队列PriorityQueue
实现的,其是按时间来定优先级的延时阻塞队列,只有在延时期满时才能从队列中提取元素,先过期的元素会在队首,每次从队列里面取出来的都是最先要过期的元素,当执行队列的take
方法操作元素未过期是会阻塞当前线程直到元素过期为止.1
2public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {}- 队列中元素必须实现
Delayed
接口
- 队列中元素必须实现
1 | package com.holelin.sundry.domain; |
1 | package com.holelin.sundry.test; |
定时任务
-
在启动类中使用
@EnableScheduling
注解开启定时任务功能.1
2
3
4
5
6
7
public class DelayqueueApplication {
public static void main(String[] args) {
SpringApplication.run(DelayqueueApplication.class, args);
}
} -
编写一个定时任务,每个5秒执行一次.
1
2
3
4
5
6
7
8
9
public class QuartzDemo {
//每隔五秒
public void process(){
System.out.println("我是定时任务!");
}
}
Redis Sorted Set
-
Redis
的数据结构Zset
,同样可以实现延迟队列的效果,主要利用它的score
属性,Redis
通过score
来为集合中的成员进行从小到打的排序. -
通过
zadd
命令向队列delayqueue
中添加元素,并设置score
值表示元素过期的时间;向delayqueue
添加三个order1
、order2
、order3
,分别是10秒
、20秒
、30秒
后过期.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106package com.holelin.redis.test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Tuple;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Set;
/**
* @Description:
* @Author: HoleLin
* @CreateDate: 2022/7/25 14:00
* @UpdateUser: HoleLin
* @UpdateDate: 2022/7/25 14:00
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
public class DelayQueueTest {
private JedisPool jedisPool = null;
// Redis服务器IP
private String ADDR = "127.0.0.1";
// Redis的端口号
private int PORT = 6379;
private static String DELAY_QUEUE = "delayqueue";
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public DelayQueueTest() {
jedisPool = new JedisPool(ADDR, PORT);
}
public static void main(String[] args) {
DelayQueueTest redisDelay = new DelayQueueTest();
redisDelay.pushOrderQueue();
redisDelay.pollOrderQueue();
redisDelay.deleteZSet();
}
public void deleteZSet() {
Jedis jedis = jedisPool.getResource();
jedis.del(DELAY_QUEUE);
}
/**
* 消息入队
*/
public void pushOrderQueue() {
Jedis jedis = jedisPool.getResource();
Calendar cal1 = Calendar.getInstance();
cal1.add(Calendar.SECOND, 10);
int order1 = (int) (cal1.getTimeInMillis() / 1000);
Calendar cal2 = Calendar.getInstance();
cal2.add(Calendar.SECOND, 20);
int order2 = (int) (cal2.getTimeInMillis() / 1000);
Calendar cal3 = Calendar.getInstance();
cal3.add(Calendar.SECOND, 30);
int order3 = (int) (cal3.getTimeInMillis() / 1000);
jedis.zadd(DELAY_QUEUE, order1, "order1");
jedis.zadd(DELAY_QUEUE, order2, "order2");
jedis.zadd(DELAY_QUEUE, order3, "order3");
System.out.println(sdf.format(new Date()) + " add finished.");
}
/**
* 消费消息
*/
public void pollOrderQueue() {
Jedis jedis = jedisPool.getResource();
while (true) {
Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);
String value = ((Tuple) set.toArray()[0]).getElement();
int score = (int) ((Tuple) set.toArray()[0]).getScore();
Calendar cal = Calendar.getInstance();
int nowSecond = (int) (cal.getTimeInMillis() / 1000);
if (nowSecond >= score) {
jedis.zrem(DELAY_QUEUE, value);
System.out.println(sdf.format(new Date()) + " removed key:" + value);
}
if (jedis.zcard(DELAY_QUEUE) <= 0) {
System.out.println(sdf.format(new Date()) + " zset empty ");
return;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Redis
过期回调
-
Redis
的key
过期回调事件,也能达到延迟队列的效果,简单来说我们开启监听key
是否过期的事件,一旦key
过期会触发一个callback
事件.-
修改
redis.conf
文件开启notify-keyspace-events Ex
-
实现过期
Key
监听器1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30package com.holelin.redis.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/**
* @Description:
* @Author: HoleLin
* @CreateDate: 2022/7/26 15:45
* @UpdateUser: HoleLin
* @UpdateDate: 2022/7/26 15:45
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
log.info("监听到key:" + expiredKey + "已过期");
}
} -
配置监听器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37package com.holelin.redis.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import java.io.Serializable;
import java.net.UnknownHostException;
/**
* @Description: Redis 配置类
* @Author: HoleLin
* @CreateDate: 2020/9/4 10:36
* @UpdateUser: HoleLin
* @UpdateDate: 2020/9/4 10:36
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
public class RedisAutoConfiguration {
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
-
RabbitMQ
延时队列
-
利用
RabbitMQ
做延时队列是比较常见的一种方式,而实际上RabbitMQ
自身并没有直接支持提供延迟队列功能,而是通过RabbitMQ
消息队列的TTL
和DXL
这两个属性间接实现的. -
Time To Live
(TTL
) :-
TTL
顾名思义:指的是消息的存活时间,RabbitMQ
可以通过x-message-tt
参数来设置指定Queue
(队列)和Message
(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒. -
RabbitMQ 可以从两种维度设置消息过期时间,分别是队列和消息本身.
-
设置队列过期时间,那么队列中所有消息都具有相同的过期时间.
-
设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息
TTL
都可以不同.
-
-
如果同时设置队列和队列中消息的
TTL
,则TTL
值以两者中较小的值为准.而队列中的消息存在队列中的时间,一旦超过TTL
过期时间则成为Dead Letter
(死信)
-
-
Dead Letter Exchanges
(DLX
)DLX
即死信交换机,绑定在死信交换机上的即死信队列.RabbitMQ
的Queue
(队列)可以配置两个参数x-dead-letter-exchange
和x-dead-letter-routing-key
(可选),一旦队列内出现了Dead Letter
(死信),则按照这两个参数可以将消息重新路由到另一个Exchange
(交换机),让消息重新被消费.x-dead-letter-exchange
:队列中出现Dead Letter
后将Dead Letter
重新路由转发到指定exchange
(交换机).x-dead-letter-routing-key
:指定routing-key
发送,一般为要指定转发的队列.- 队列出现
Dead Letter
的情况有:- 消息超时未消费,也就是
TTL
过期了 - 队列达到最大长度
- 消息被消费端拒绝(
basic.reject/basic.nack
)并且不再重新投递requeue=false
- 消息超时未消费,也就是
Redisson DelayQueue
- 延时队列的具体实现最好查看源码,涉及三个队列和一个发布订阅通道
- 阻塞队列 List:
KEY = queueName
,执行 BLPOP 命令从左端弹出元素,右端插入元素.当一条数据到达过期时间的时候,会从redisson_delay_queue:{DelayMessage}中移除,加入到这个队列,客户端监听的就是这个队列,这个队列里面的全都是已经过期的数据. - 有序集合 Sorted Set:
KEY = redisson_delay_queue_timeout:{queueName}
,score
是元素的过期时间,按从小到大排序,过期时间小于当前时间表示已过期,删除集合中的元素,同时将普通集合 List中对应的元素删除,并将元素添加到阻塞队列 List等待客户端消费. - 普通集合 List:
KEY = redisson_delay_queue:{DelayMessage}
,按顺序从右端添加元素,元素过期会被删除. - 发布/订阅通道:
redisson_delay_queue_channel
,往延时队列中放入一个数据时,会将延时时间publish
出去,客户端收到之后会按这个时间延时之后再执行定时任务.
- 阻塞队列 List:
示例
-
依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.RELEASE</version>
<relativePath/>
<!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>${redisson.version}</version>
<exclusions>
<exclusion>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data-31</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data-22</artifactId>
<version>${redisson.version}</version>
</dependency>
</dependencies> -
配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
public class RedissonConfig {
private RedisProperties redisProperties;
public RedissonClient redissonClient() {
Config config = new Config();
String redisUrl = String.format("redis://%s:%s", redisProperties.getHost(), redisProperties.getPort());
config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword());
config.useSingleServer().setDatabase(1);
return Redisson.create(config);
}
public RedisTemplate<String, Serializable> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
RedisTemplate<String, Serializable> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.WRAPPER_ARRAY);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setConnectionFactory(redisConnectionFactory);
template.setKeySerializer(jackson2JsonRedisSerializer);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
} -
yaml
1
2
3
4
5
6
7
8
9
10
11
12
13spring:
redis:
database: 1
host: localhost
password: holelin
port: 6379
timeout: 500
jedis:
pool:
max-active: 50
max-idle: 20
max-wait: 3000
min-idle: 2 -
Redisson
延迟队列工具类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* @author HoleLin
*/
public class RedissonDelayQueueUtils {
private final RedissonClient redissonClient;
public RedissonDelayQueueUtils(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
public <T> void sendMessage(T value, long delay, TimeUnit timeUnit, String queueCode) {
try {
RBlockingQueue<Object> blockingDeque = redissonClient.getBlockingQueue(queueCode);
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
//如果延时队列中原先存在这条消息,remove可以删除延时队列中的这条消息
//如果多次发送相同的消息,先remove再offer,只有最后一条会被延时消费,延时时间以最后一条的发送时间开始延时
delayedQueue.remove(value);
delayedQueue.offer(value, delay, timeUnit);
log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queueCode, value, timeUnit.toSeconds(delay) + "秒");
} catch (Exception e) {
log.error("(添加延时队列失败) {}", e.getMessage());
throw new RuntimeException("(添加延时队列失败)");
}
}
/**
* 获取延迟队列
*
* @param queueCode 队列名称
* @param <T> 队列具体内容泛型
* @throws InterruptedException
*/
public <T> T getDelayQueue(String queueCode) throws InterruptedException {
RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(queueCode);
T value = (T) blockingDeque.take();
return value;
}
} -
延迟队列业务枚举类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28/**
* @author HoleLin
*/
public enum RedissonDelayQueueEnum {
AI_QUEUE_RECORD_EXPIRED("AI_QUEUE_RECORD_EXPIRED", "aiQueueRecordExpired");
/**
* 延迟队列的QueueCode
*/
private String code;
/**
* 延迟队列具体业务实现的Bean
* 可通过Spring上下文获取
*/
private String beanId;
RedissonDelayQueueEnum(String code, String beanId) {
this.code = code;
this.beanId = beanId;
}
public String getCode() {
return code;
}
public String getBeanId() {
return beanId;
}
} -
延迟队列执行器
1
2
3
4
5
6
7
8
9
10/**
* 延迟队列执行器
*
* @author HoleLin
*/
public interface RedissonDelayHandle<T> {
void execute(T t);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14/**
* Ai队列记录失效处理类
*
* @author HoleLin
*/
public class AiQueueRecordExpiredHandler implements RedissonDelayHandle<AITaskRecordInfo> {
public void execute(AITaskRecordInfo aiTaskRecordInfo) {
// 具体实现
}
} -
创建延迟队列消费线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46import com.holelin.enums.RedissonDelayQueueEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
* @author HoleLin
*/
public class RedissonDelayQueueRunner implements ApplicationRunner {
private RedissonDelayQueueUtils redissonDelayQueueUtils;
private ApplicationContext applicationContext;
public RedissonDelayQueueRunner(RedissonDelayQueueUtils redissonDelayQueueUtils, ApplicationContext applicationContext) {
this.redissonDelayQueueUtils = redissonDelayQueueUtils;
this.applicationContext = applicationContext;
}
public void run(ApplicationArguments args) throws Exception {
new Thread(() -> {
while (true) {
RedissonDelayQueueEnum[] queueEnums = RedissonDelayQueueEnum.values();
for (RedissonDelayQueueEnum queueEnum : queueEnums) {
Object value = null;
try {
value = redissonDelayQueueUtils.getDelayQueue(queueEnum.getCode());
if (value != null) {
RedissonDelayHandle redisDelayQueueHandle = (RedissonDelayHandle) applicationContext.getBean(queueEnum.getBeanId());
redisDelayQueueHandle.execute(value);
}
} catch (InterruptedException e) {
log.error("(Redis延迟队列异常中断) {}", e.getMessage());
}
}
}
}).start();
}
}
Redisson
延迟队列源码分析
1 | redisson 3.23.4 |
数据结构
redisson_delay_queue_timeout:{queue_name}
消息延时队列,ZSET
结构(value
为消息,score
为过期时间),这样就可以知道当前过期的消息.redisson_delay_queue:{queue_name}
消息顺序队列,LIST
结构,按照消息添加顺序存储,移除消息时可以按照添加顺序删除.{queue_name}
消息目标队列,LIST
结构,存储实际到期可以被消费的消息供消费者拉取消费.redisson_delay_queue_channel:{queue_name}
发布订阅channel
主题,用于通知客户端定时器从定期队列转移到期的消息到目标队列.
消息生产
1 |
|
- 通过
redissonClient.getDelayedQueue
获取RDelayedQueue
对象 - 然后
delayedQueue
调用offer
方法去保存消息 - 最后真正的保存逻辑是由
RedissonDelayedQueue
执行offerAsync
方法调用的lua
脚本
定时器转移消息
1 | protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) { |
消息消费
1 |
|