DengYong's Blog

CompletionService的使用

2023-12-09

最近做的项目中有这样一个场景,需要根据手机号查询用户近两天的订单信息,要求最好在15秒以内返回,如果超过15秒则可以直接忽略,然而这些订单信息需要分不同的场景去调不同的接口获取,直接调用接口的话,耗时太长,所以考虑使用CompletionService来异步调用接口,以达到快速返回的目的。

方法的封装

注:这里的代码与真实项目代码不完全相同哈,仅做参考

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package cn.heartisai;

import cn.hutool.core.thread.NamedThreadFactory;

import java.util.*;
import java.util.concurrent.*;
import java.util.function.Predicate;

public class ParallelExecutor {

//定义线程池
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() / 2,
Runtime.getRuntime().availableProcessors() * 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(200),
new NamedThreadFactory("parallel", false),
(r, executor) -> r.run());

public static <T> TimeLimitedExecutor<T> newTimeLimitedExecutor(long limitedTime) {
return new TimeLimitedExecutor<>(limitedTime);
}

public static <T> TimeLimitedExecutor<T> newTimeLimitedExecutor(long limitedTime, long subtleTime) {
return new TimeLimitedExecutor<>(limitedTime, subtleTime);
}

public static class TimeLimitedExecutor<T> {

private final long limitedTime;
private long subtleTime;

private final ExecutorCompletionService<T> completionService;

List<Callable<T>> callableList = new ArrayList<>();

/**
* 时间限制执行器
* @param limitedTime 限制执行的总时间(单位:ms)
*/
public TimeLimitedExecutor(long limitedTime) {
this(limitedTime, 20L);
}

/**
* 时间限制执行器
* @param limitedTime 限制执行的总时间(单位:ms)
* @param subtleTime 到达总时间后,后续的执行尝试时间
*/
public TimeLimitedExecutor(long limitedTime, long subtleTime) {
this.limitedTime = limitedTime;
this.subtleTime = subtleTime;
completionService = new ExecutorCompletionService<>(executor);
}

public TimeLimitedExecutor<T> setSubtleTime(long subtleTime) {
this.subtleTime = subtleTime;
return this;
}

/**
* 提交任务
* @param callable 任务
* @return TimeLimitedExecutor
*/
public TimeLimitedExecutor<T> submit(Callable<T> callable) {
callableList.add(callable);
return this;
}

/**
* 提交任务
* @param runnable Runnable
* @param result 默认返回值
* @return TimeLimitedExecutor
*/
public TimeLimitedExecutor<T> submit(Runnable runnable, T result) {
callableList.add(() -> {
runnable.run();
return result;
});
return this;
}

/**
* 执行任务并返回数据列表
* @return 数据列表
*/
public List<T> execute() {
return execute(null);
}

/**
* 执行任务并返回数据列表
* @param stopWhen 停止的条件,有些场景可能,当查询到某些数据时,就可以直接返回无需继续查询了,那么就可以使用这个参数
* @return 数据列表
*/
public List<T> execute(Predicate<T> stopWhen) {
if (callableList.isEmpty()) {
return Collections.emptyList();
}

if (stopWhen == null) {
stopWhen = (v) -> false;
}

List<Future<T>> futureList = new ArrayList<>();
for (Callable<T> c : callableList) {
futureList.add(completionService.submit(c));
}

List<T> list = new ArrayList<>();
Set<Future<T>> set = new HashSet<>();
final long startTime = System.currentTimeMillis();
for (int i = 0, len = callableList.size(); i < len; i++) {
long pollTime = i == 0 ? limitedTime : limitedTime - (System.currentTimeMillis() - startTime);
if (pollTime <= 5) {
pollTime = subtleTime;
}
Future<T> future = null;
try {
future = completionService.poll(pollTime, TimeUnit.MILLISECONDS);
} catch (Exception e) {
System.out.println("获取future异常了");
e.printStackTrace();
}
if (future == null) {
System.out.println("超时出间范围了");
} else {
try {
T value = future.get();
set.add(future);
list.add(value);
if (stopWhen.test(value)) {
break;
}
} catch (Exception e) {
System.out.println("获取数据异常了");
e.printStackTrace();
}
}
}

//取消那些没有执行完的任务
if (set.size() < futureList.size()) {
for (Future<T> f : futureList) {
if (!set.contains(f) && !f.isCancelled()) {
//true会引发中断,这样任务在执行中可以中断,否则任务会执行完
f.cancel(true);
}
}
}

return list;

}

}

}

调用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private static void print(String str) {
System.out.println(System.currentTimeMillis() + ":" + str);
}

public static void main(String[] args) {
print("start...");
List<String> list = ParallelExecutor.<String>newTimeLimitedExecutor(5000)
.submit(() -> {
print("进入A");
Thread.sleep(1000L);
print("A2");
Thread.sleep(3000L);
print("A3");
Thread.sleep(3000L);
print("完成A");
return "A";
}).submit(() -> {
print("进入B");
Thread.sleep(1000L);
print("B2");
Thread.sleep(2000L);
print("完成B");
return "B";
}).submit(() -> {
print("进入C");
Thread.sleep(6000L);
print("完成C");
return "C";
}).submit(() -> {
print("进入D");
Thread.sleep(1000L);
print("D2");
Thread.sleep(3000L);
print("完成D");
return "D";
}).submit(() -> {
print("进入E");
Thread.sleep(2000L);
print("E2");
Thread.sleep(4000L);
print("完成E");
return "E";
}).execute();
print("finish~");
System.out.println("结果:" + String.join(",", list));
}

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1702127177681:start...
1702127177706:进入A
1702127177706:进入B
1702127177706:进入C
1702127177706:进入D
1702127178710:D2
1702127178710:A2
1702127178710:B2
1702127180714:完成B
1702127180715:进入E
1702127181725:A3
1702127181725:完成D
超时出间范围了
1702127182726:E2
超时出间范围了
超时出间范围了
1702127182773:finish~
结果:B,D

示例说明

  • 总体执行时间要求为5s,其中B和D在要求的时间范围内,A、C和E超时了
  • A任务执行总时间为7s,其中A2和A3在要求的时间范围内,所以输出了A2A3,最的的那一个sleep 3s超过了要求时间范围,所以未输出
  • B任务执行总时间为3s,
  • C任务执行总时间为6s,整个任务超时了,所以未输出
  • D任务执行总时间为4s
  • E任务执行总时间为6s,其中E2在要求的时间范围内,所以输出E2,最的的那一个sleep 4s超过了要求时间范围,所以未输出

其它调用示例

如果把调用示例中的第43行中的.execute()改成如下代码

1
.execute("B"::equals); //表示获取值为"B"以后就不再执行剩下的任务了

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
1702127975977:start...
1702127976004:进入A
1702127976004:进入B
1702127976004:进入C
1702127976005:进入D
1702127977008:D2
1702127977008:B2
1702127977008:A2
1702127979020:完成B
1702127979021:进入E
1702127979021:finish~
结果:B

执行结果说明

  • 总体执行时间要求为5s,但到B值以后就无需往下执行其它任务了,由于B任务执行总时间为3s,所以日志中只输出了时间在3s以内的任务,而结果也只有B

代码的隐患

  • 因为使用的线程池,当同一时间最多只能有4个线程在执行, 但当submit了5个任务的时候,第5个任务会进入队列,等待前面的任务有执行完成的以后再执行,就像调用示例中的一样,只有B执行完以后,E任务才会进入,这样就会出现一个问题,当把E任务的执行改为小于总时间5s也不会执行,只有当B任务的时间 + E任务的时间 < 5s 的时候,才会执行完,才能得到E的值。

代码优化

为了规避前述所说的隐患,在构建线程池的时候,可以使用如下代码

1
2
3
4
5
6
//定义线程池
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() / 4,
Runtime.getRuntime().availableProcessors() * 2, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>(), //这里是关键
new NamedThreadFactory("parallel", false),
(r, executor) -> r.run());

扫描二维码,分享此文章