参考文献

指数退避算法

实例

  • spring-retry
    • org.springframework.retry.interceptor.RetryInterceptorBuilder

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package cn.holelin.ct.pacs.manager;

import cn.hutool.json.JSONUtil;
import cn.holelin.ct.pacs.config.DefaultPacsServerProperties;
import cn.holelin.ct.pacs.config.PacsProperties;
import cn.holelin.ct.pacs.config.PacsServerProperties;
import cn.holelin.ct.pacs.domain.PacsCMoveConfig;
import cn.holelin.ct.pacs.entity.PullTaskRecord;
import cn.holelin.ct.pacs.enums.InformationModelEnum;
import cn.holelin.ct.pacs.request.PacsStoreCondition;
import cn.holelin.ct.pacs.service.PullTaskRecordService;
import cn.holelin.ct.pacs.support.PacsCMoveSupport;
import cn.holelin.ct.pacs.utils.PacsContext;
import cn.holelin.ct.pacs.utils.PacsHelper;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author HoleLin
*/
@Slf4j
public class StoreDicomWithLockRunnable implements Runnable {

private final PacsProperties pacsProperties;
private final ReentrantLock lock = new ReentrantLock();
private final PacsCMoveSupport support;
private final PullTaskRecordService pullTaskRecordService;
/**
* 初始退避时间 1秒
*/
private static final long INITIAL_SLEEP_MILLIS = 1000L;
/**
* 最大退避时间 60秒
*/
private static final long MAX_SLEEP_MILLIS = 60000L;

public StoreDicomWithLockRunnable(PacsProperties pacsProperties,
PullTaskRecordService pullTaskRecordService) {
this.pacsProperties = pacsProperties;
this.support = new PacsCMoveSupport(config(),
pacsProperties.getDefaultConfig().getNeedExtendedNegotiation());
this.pullTaskRecordService = pullTaskRecordService;
}

@Override
public void run() {
long currentSleepMillis = INITIAL_SLEEP_MILLIS;

while (!Thread.currentThread().isInterrupted()) {
try {
if (lock.tryLock()) {
try {
final Optional<PullTaskRecord> optional = pullTaskRecordService.getTask();
if (optional.isEmpty()){
// 没有任务,指数退避
log.debug("拉取DICOM线程,未找到任务,等待 {} 毫秒后重试", currentSleepMillis);
TimeUnit.MILLISECONDS.sleep(currentSleepMillis);
// 指数增长
currentSleepMillis = Math.min(currentSleepMillis * 2, MAX_SLEEP_MILLIS);
continue;
}
processTask(optional.get());
// 任务成功获取,重置退避时间
currentSleepMillis = INITIAL_SLEEP_MILLIS;
}finally {
lock.unlock();
}
}
} catch (Exception e) {
log.error("任务执行时发生异常:", e);
}
}
}

private void processTask(PullTaskRecord task) {
PacsStoreCondition condition = task.getCondition();
String taskId = task.getTaskId();
log.info("开始处理任务:{},条件为:{}", taskId, JSONUtil.parseObj(condition).toString());
if (PacsContext.offer(taskId)) {
try {
support.execute(InformationModelEnum.MOVE, PacsHelper.buildBaseConditionAttributes(condition), taskId);
} catch (Exception e) {
log.error("处理C_MOVE异常,taskId:{}", taskId, e);
PacsContext.remove();
}
} else {
log.error("offer失败");
}
}

private PacsCMoveConfig config() {
PacsServerProperties remoteConfig = pacsProperties.getRemoteConfig();
DefaultPacsServerProperties defaultConfig = pacsProperties.getDefaultConfig();
PacsCMoveConfig config = new PacsCMoveConfig();
config.setRemotePort(remoteConfig.getPort());
config.setRemoteHostName(remoteConfig.getHostname());
config.setRemoteAeTitle(remoteConfig.getAet());
config.setAeTitle(defaultConfig.getAet());
config.setDeviceName(defaultConfig.getAet());
config.setDestinationAeTitle(defaultConfig.getAet());
return config;
}
}