数据结构-延迟队列
参考文献
延迟队列概念
顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费
使用场景
- 有效期:限时活动、拼团等
- 超时处理:取消超时未支付订单、超时自动确认收货等
- 延迟处理:机器人点赞/观看数/评论/关注、等待依赖条件等
- 重试:网络异常重试、打车派单、依赖条件未满足重试等
- 定时任务:智能设备定时启动等
具体实现
DelayQueue
延迟队列
-
JDK
中提供了一组实现延迟队列的API
,位于Java.util.concurrent
包下DelayQueue
。 -
DelayQueue
是一个BlockingQueue
(无界阻塞)队列,它本质就是封装了一个PriorityQueue
(优先队列)。 -
PriorityQueue
内部使用完全二叉堆
来实现队列元素排序,我们在向DelayQueue
队列中添加元素时,会给元素一个Delay
(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了Delay
时间才允许从队列中取出。队列中可以放基本数据类型或自定义实体类,在存放基本数据类型时,优先队列中元素默认升序排列,自定义实体类就需要我们根据类属性值比较计算了。 -
要实现
DelayQueue
延时队列,队中元素要implements
Delayed
接口,这个接口里只有一个getDelay
方法,用于设置延期时间。Order
类中compareTo
方法负责对队列中的元素进行排序。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25package java.util.concurrent;
/**
* A mix-in style interface for marking objects that should be
* acted upon after a given delay.
*
* <p>An implementation of this interface must define a
* {@code compareTo} method that provides an ordering consistent with
* its {@code getDelay} method.
*
* @since 1.5
* @author Doug Lea
*/
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class Order implements Delayed {
/**
* 延迟时间
*/
private long time;
private String name;
public Order(String name, long time, TimeUnit unit) {
this.name = name;
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
public int compareTo(Delayed o) {
Order Order = (Order) o;
long diff = this.time - Order.time;
if (diff <= 0) {
return -1;
} else {
return 1;
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
DelayQueue<Order> delayQueue = new DelayQueue<>();
delayQueue.put(Order1);
delayQueue.put(Order2);
delayQueue.put(Order3);
System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
while (delayQueue.size() != 0) {
/**
* 取队列头部元素是否过期
*/
Order task = delayQueue.poll();
if (task != null) {
System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.getName(), LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
Thread.sleep(1000);
}
}
}
// 输出
订单延迟队列开始时间:2021-07-12 14:24:12
订单:{Order1}被取消, 取消时间:{2021-07-12 14:24:17}
订单:{Order2}被取消, 取消时间:{2021-07-12 14:24:22}
订单:{Order3}被取消, 取消时间:{2021-07-12 14:24:27}
Quartz
定时任务
-
Quartz
一款非常经典任务调度框架,在Redis
、RabbitMQ
还未广泛应用时,超时未支付取消订单功能都是由定时任务实现的。定时任务它有一定的周期性,可能很多单子已经超时,但还没到达触发执行的时间点,那么就会造成订单处理的不够及时。 -
引入
quartz
框架依赖包1
2
3
4<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency> -
在启动类中使用
@EnableScheduling
注解开启定时任务功能。1
2
3
4
5
6
7
public class DelayqueueApplication {
public static void main(String[] args) {
SpringApplication.run(DelayqueueApplication.class, args);
}
} -
编写一个定时任务,每个5秒执行一次。
1
2
3
4
5
6
7
8
9@Component
public class QuartzDemo {
//每隔五秒
@Scheduled(cron = "0/5 * * * * ? ")
public void process(){
System.out.println("我是定时任务!");
}
}
Redis Sorted Set
-
Redis
的数据结构Zset
,同样可以实现延迟队列的效果,主要利用它的score
属性,redis
通过score
来为集合中的成员进行从小到大的排序。 -
通过
zadd
命令向队列delayqueue
中添加元素,并设置score
值表示元素过期的时间;向delayqueue
添加三个order1
、order2
、order3
,分别是10秒
、20秒
、30秒
后过期。1
2
3zadd delayqueue 10 order1
zadd delayqueue 20 order2
zadd delayqueue 30 order3 -
消费端轮询队列
delayqueue
, 将元素排序后取最小时间与当前时间比对,若小于当前时间代表已经过期移除key
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25/**
* 消费消息
*/
public void pollOrderQueue() {
while (true) {
Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);
String value = ((Tuple) set.toArray()[0]).getElement();
int score = (int) ((Tuple) set.toArray()[0]).getScore();
Calendar cal = Calendar.getInstance();
int nowSecond = (int) (cal.getTimeInMillis() / 1000);
if (nowSecond >= score) {
jedis.zrem(DELAY_QUEUE, value);
System.out.println(sdf.format(new Date()) + " removed key:" + value);
}
if (jedis.zcard(DELAY_QUEUE) <= 0) {
System.out.println(sdf.format(new Date()) + " zset empty ");
return;
}
Thread.sleep(1000);
}
}
Redis
过期回调
-
Redis
的key
过期回调事件,也能达到延迟队列的效果,简单来说我们开启监听key是否过期的事件,一旦key过期会触发一个callback事件。 -
修改
redis.conf
文件开启notify-keyspace-events Ex
1
notify-keyspace-events Ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50############################# EVENT NOTIFICATION ##############################
# Redis can notify Pub/Sub clients about events happening in the key space.
# This feature is documented at https://redis.io/topics/notifications
#
# For instance if keyspace events notification is enabled, and a client
# performs a DEL operation on key "foo" stored in the Database 0, two
# messages will be published via Pub/Sub:
#
# PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo
#
# It is possible to select the events that Redis will notify among a set
# of classes. Every class is identified by a single character:
#
# K Keyspace events, published with __keyspace@<db>__ prefix.
# E Keyevent events, published with __keyevent@<db>__ prefix.
# g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
# $ String commands
# l List commands
# s Set commands
# h Hash commands
# z Sorted set commands
# x Expired events (events generated every time a key expires)
# e Evicted events (events generated when a key is evicted for maxmemory)
# t Stream commands
# d Module key type events
# m Key-miss events (Note: It is not included in the 'A' class)
# A Alias for g$lshzxetd, so that the "AKE" string means all the events
# (Except key-miss events which are excluded from 'A' due to their
# unique nature).
#
# The "notify-keyspace-events" takes as argument a string that is composed
# of zero or multiple characters. The empty string means that notifications
# are disabled.
#
# Example: to enable list and generic events, from the point of view of the
# event name, use:
#
# notify-keyspace-events Elg
#
# Example 2: to get the stream of the expired keys subscribing to channel
# name __keyevent@0__:expired use:
#
# notify-keyspace-events Ex
#
# By default all notifications are disabled because most users don't need
# this feature and the feature has some overhead. Note that if you don't
# specify at least one of K or E, no events will be delivered.
notify-keyspace-events "" -
Redis
监听配置,注入BeanRedisMessageListenerContainer
1
2
3
4
5
6
7
8
9
public class RedisListenerConfig {
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
} -
编写
Redis
过期回调监听方法,必须继承KeyExpirationEventMessageListener
,有点类似于MQ
的消息监听。1
2
3
4
5
6
7
8
9
10
11
12
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
System.out.println("监听到key:" + expiredKey + "已过期");
}
}