参考文献

随机算法

  • 随机算法,顾名思义就是从可用的服务节点中,随机挑选一个节点来访问.
  • 在实现时,随机算法通常是通过生成一个随机数来实现,比如服务有 10 个节点,那么就每一次生成一个 1~10 之间的随机数,假设生成的是 2,那么就访问编号为 2 的节点.
  • 采用随机算法,在节点数量足够多,并且访问量比较大的情况下,各个节点被访问的概率是基本相同的.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/*
* Copyright 2009-2016 Weibo, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.weibo.api.motan.cluster.loadbalance;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.rpc.Referer;
import com.weibo.api.motan.rpc.Request;

/**
*
* random load balance.
*
* @author fishermen
* @version V1.0 created at: 2013-5-21
*/
@SpiMeta(name = "random")
public class RandomLoadBalance<T> extends AbstractLoadBalance<T> {

@Override
protected Referer<T> doSelect(Request request) {
List<Referer<T>> referers = getReferers();

int idx = (int) (ThreadLocalRandom.current().nextDouble() * referers.size());
for (int i = 0; i < referers.size(); i++) {
Referer<T> ref = referers.get((i + idx) % referers.size());
if (ref.isAvailable()) {
return ref;
}
}
return null;
}

@Override
protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder) {
List<Referer<T>> referers = getReferers();

int idx = (int) (ThreadLocalRandom.current().nextDouble() * referers.size());
for (int i = 0; i < referers.size(); i++) {
Referer<T> referer = referers.get((i + idx) % referers.size());
if (referer.isAvailable()) {
refersHolder.add(referer);
}
}
}
}

轮询算法

  • 轮询算法,顾名思义就是按照固定的顺序,把可用的服务节点,挨个访问一次.
  • 在实现时,轮询算法通常是把所有可用节点放到一个数组里,然后按照数组编号,挨个访问.比如服务有 10 个节点,放到数组里就是一个大小为 10 的数组,这样的话就可以从序号为 0 的节点开始访问,访问后序号自动加 1,下一次就会访问序号为 1 的节点,以此类推.
  • 轮询算法能够保证所有节点被访问到的概率是相同的.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/*
* Copyright 2009-2016 Weibo, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.weibo.api.motan.cluster.loadbalance;

import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.rpc.Referer;
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.util.MathUtil;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
*
* Round robin loadbalance.
*
* @author fishermen
* @version V1.0 created at: 2013-6-13
*/
@SpiMeta(name = "roundrobin")
public class RoundRobinLoadBalance<T> extends AbstractLoadBalance<T> {

private AtomicInteger idx = new AtomicInteger(0);

@Override
protected Referer<T> doSelect(Request request) {
List<Referer<T>> referers = getReferers();

int index = getNextNonNegative();
for (int i = 0; i < referers.size(); i++) {
Referer<T> ref = referers.get((i + index) % referers.size());
if (ref.isAvailable()) {
return ref;
}
}
return null;
}

@Override
protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder) {
List<Referer<T>> referers = getReferers();

int index = getNextNonNegative();
for (int i = 0, count = 0; i < referers.size() && count < MAX_REFERER_COUNT; i++) {
Referer<T> referer = referers.get((i + index) % referers.size());
if (referer.isAvailable()) {
refersHolder.add(referer);
count++;
}
}
}

// get non-negative int
private int getNextNonNegative() {
return MathUtil.getNonNegative(idx.incrementAndGet());
}
}

加权轮询算法

  • 轮询算法能够保证所有节点被访问的概率相同,而加权轮询算法是在此基础上,给每个节点赋予一个权重,从而使每个节点被访问到的概率不同,权重大的节点被访问的概率就高,权重小的节点被访问的概率就小.
  • 在实现时,加权轮询算法是生成一个节点序列,该序列里有 n 个节点,n 是所有节点的权重之和.在这个序列中,每个节点出现的次数,就是它的权重值.比如有三个节点:a、b、c,权重分别是 3、2、1,那么生成的序列就是{a、a、b、c、b、a},这样的话按照这个序列访问,前 6 次请求就会分别访问节点 a 三次,节点 b 两次,节点 c 一次.从第 7 个请求开始,又重新按照这个序列的顺序来访问节点.
  • 在应用加权轮询算法的时候,根据我的经验,要尽可能保证生产的序列的均匀,如果生成的不均匀会造成节点访问失衡,比如刚才的例子,如果生成的序列是{a、a、a、b、b、c},就会导致前 3 次访问的节点都是 a.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
/*
* Copyright 2009-2016 Weibo, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.weibo.api.motan.cluster.loadbalance;

import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.rpc.Referer;
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.util.CollectionUtil;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.MathUtil;

import org.apache.commons.lang3.StringUtils;

import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 权重可配置的负载均衡器
*
* @author chengya1
*/
@SpiMeta(name = "configurableWeight")
public class ConfigurableWeightLoadBalance<T> extends ActiveWeightLoadBalance<T> {

@SuppressWarnings("rawtypes")
private static final RefererListCacheHolder emptyHolder = new EmptyHolder();

@SuppressWarnings("unchecked")
private volatile RefererListCacheHolder<T> holder = emptyHolder;

private String weightString;

@SuppressWarnings("unchecked")
@Override
public void onRefresh(List<Referer<T>> referers) {
super.onRefresh(referers);

if (CollectionUtil.isEmpty(referers)) {
holder = emptyHolder;
} else if (StringUtils.isEmpty(weightString)) {
holder = new SingleGroupHolder<T>(referers);
} else {
holder = new MultiGroupHolder<T>(weightString, referers);
}
}

@Override
protected Referer<T> doSelect(Request request) {
if (holder == emptyHolder) {
return null;
}

RefererListCacheHolder<T> h = this.holder;
Referer<T> r = h.next();
if (!r.isAvailable()) {
int retryTimes = getReferers().size() - 1;
for (int i = 0; i < retryTimes; i++) {
r = h.next();
if (r.isAvailable()) {
break;
}
}
}
if (r.isAvailable()) {
return r;
} else {
noAvailableReferer();
return null;
}
}

@Override
protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder) {
if (holder == emptyHolder) {
return;
}

RefererListCacheHolder<T> h = this.holder;
int i = 0, j = 0;
while (i++ < getReferers().size()) {
Referer<T> r = h.next();
if (r.isAvailable()) {
refersHolder.add(r);
if (++j == MAX_REFERER_COUNT) {
return;
}
}
}
if (refersHolder.isEmpty()) {
noAvailableReferer();
}
}

private void noAvailableReferer() {
LoggerUtil.error(this.getClass().getSimpleName() + " 当前没有可用连接, pool.size=" + getReferers().size());
}

@Override
public void setWeightString(String weightString) {
this.weightString = weightString;
}


/*****************************************************************************************
* ************************************************************************************* *
*****************************************************************************************/
static abstract class RefererListCacheHolder<T> {
abstract Referer<T> next();
}

static class EmptyHolder<T> extends RefererListCacheHolder<T> {
@Override
Referer<T> next() {
return null;
}
}

@SuppressWarnings("hiding")
class SingleGroupHolder<T> extends RefererListCacheHolder<T> {

private int size;
private List<Referer<T>> cache;

SingleGroupHolder(List<Referer<T>> list) {
cache = list;
size = list.size();
LoggerUtil.info("ConfigurableWeightLoadBalance build new SingleGroupHolder.");
}

@Override
Referer<T> next() {
return cache.get(ThreadLocalRandom.current().nextInt(size));
}
}

@SuppressWarnings("hiding")
class MultiGroupHolder<T> extends RefererListCacheHolder<T> {

private int randomKeySize = 0;
private List<String> randomKeyList = new ArrayList<String>();
private Map<String, AtomicInteger> cursors = new HashMap<String, AtomicInteger>();
private Map<String, List<Referer<T>>> groupReferers = new HashMap<String, List<Referer<T>>>();

MultiGroupHolder(String weights, List<Referer<T>> list) {
LoggerUtil.info("ConfigurableWeightLoadBalance build new MultiGroupHolder. weights:" + weights);
String[] groupsAndWeights = weights.split(",");
int[] weightsArr = new int[groupsAndWeights.length];
Map<String, Integer> weightsMap = new HashMap<String, Integer>(groupsAndWeights.length);
int i = 0;
for (String groupAndWeight : groupsAndWeights) {
String[] gw = groupAndWeight.split(":");
if (gw.length == 2) {
Integer w = Integer.valueOf(gw[1]);
weightsMap.put(gw[0], w);
groupReferers.put(gw[0], new ArrayList<Referer<T>>());
weightsArr[i++] = w;
}
}

// 求出最大公约数,若不为1,对权重做除法
int weightGcd = findGcd(weightsArr);
if (weightGcd != 1) {
for(Map.Entry<String,Integer> entry: weightsMap.entrySet()) {
weightsMap.put(entry.getKey(),entry.getValue()/weightGcd);
}
}

for (Map.Entry<String, Integer> entry : weightsMap.entrySet()) {
for (int j = 0; j < entry.getValue(); j++) {
randomKeyList.add(entry.getKey());
}
}
Collections.shuffle(randomKeyList);
randomKeySize = randomKeyList.size();

for (String key : weightsMap.keySet()) {
cursors.put(key, new AtomicInteger(0));
}

for (Referer<T> referer : list) {
groupReferers.get(referer.getServiceUrl().getGroup()).add(referer);
}
}

@Override
Referer<T> next() {
String group = randomKeyList.get(ThreadLocalRandom.current().nextInt(randomKeySize));
AtomicInteger ai = cursors.get(group);
List<Referer<T>> referers = groupReferers.get(group);
return referers.get(MathUtil.getNonNegative(ai.getAndIncrement()) % referers.size());
}

// 求最大公约数
private int findGcd(int n, int m) {
return (n == 0 || m == 0) ? n + m : findGcd(m, n % m);
}

// 求最大公约数
private int findGcd(int[] arr) {
int i = 0;
for (; i < arr.length - 1; i++) {
arr[i + 1] = findGcd(arr[i], arr[i + 1]);
}
return findGcd(arr[i], arr[i - 1]);
}
}

}

最少活跃连接算法

  • 最少活跃连接算法,顾名思义就是每一次访问都选择连接数最少的节点.因为不同节点处理请求的速度不同,使得同一个服务消费者同每一个节点的连接数都不相同.连接数大的节点,可以认为是处理请求慢,而连接数小的节点,可以认为是处理请求快.所以在挑选节点时,可以以连接数为依据,选择连接数最少的节点访问.
  • 在实现时,需要记录跟每一个节点的连接数,这样在选择节点时,才能比较出连接数最小的节点.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/*
* Copyright 2009-2016 Weibo, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.weibo.api.motan.cluster.loadbalance;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.rpc.Referer;
import com.weibo.api.motan.rpc.Request;

/**
* "低并发优化" 负载均衡
*
* <pre>
* 1) 低并发度优先: referer的某时刻的call数越小优先级越高
*
* 2) 低并发referer获取策略:
* 由于Referer List可能很多,比如上百台,如果每次都要从这上百个Referer获取最低并发的几个,性能有些损耗,
* 因此 random.nextInt(list.size()) 获取一个起始的index,然后获取最多不超过MAX_REFERER_COUNT的
* 状态是isAvailable的referer进行判断activeCount.
* </pre>
*
* @author maijunsheng
* @version 创建时间:2013-6-14
*
*/
@SpiMeta(name = "activeWeight")
public class ActiveWeightLoadBalance<T> extends AbstractLoadBalance<T> {

@Override
protected Referer<T> doSelect(Request request) {
List<Referer<T>> referers = getReferers();

int refererSize = referers.size();
int startIndex = ThreadLocalRandom.current().nextInt(refererSize);
int currentCursor = 0;
int currentAvailableCursor = 0;

Referer<T> referer = null;

while (currentAvailableCursor < MAX_REFERER_COUNT && currentCursor < refererSize) {
Referer<T> temp = referers.get((startIndex + currentCursor) % refererSize);
currentCursor++;

if (!temp.isAvailable()) {
continue;
}

currentAvailableCursor++;

if (referer == null) {
referer = temp;
} else {
if (compare(referer, temp) > 0) {
referer = temp;
}
}
}

return referer;
}

@Override
protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder) {
List<Referer<T>> referers = getReferers();

int refererSize = referers.size();
int startIndex = ThreadLocalRandom.current().nextInt(refererSize);
int currentCursor = 0;
int currentAvailableCursor = 0;

while (currentAvailableCursor < MAX_REFERER_COUNT && currentCursor < refererSize) {
Referer<T> temp = referers.get((startIndex + currentCursor) % refererSize);
currentCursor++;

if (!temp.isAvailable()) {
continue;
}

currentAvailableCursor++;

refersHolder.add(temp);
}

Collections.sort(refersHolder, new LowActivePriorityComparator<T>());
}

private int compare(Referer<T> referer1, Referer<T> referer2) {
return referer1.activeRefererCount() - referer2.activeRefererCount();
}

static class LowActivePriorityComparator<T> implements Comparator<Referer<T>> {
@Override
public int compare(Referer<T> referer1, Referer<T> referer2) {
return referer1.activeRefererCount() - referer2.activeRefererCount();
}
}

}

一致性hash算法

  • 一致性 hash 算法,是通过某个 hash 函数,把同一个来源的请求都映射到同一个节点上.一致性 hash 算法最大的特点就是同一个来源的请求,只会映射到同一个节点上,可以说是具有记忆功能.只有当这个节点不可用时,请求才会被分配到相邻的可用节点上.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/*
* Copyright 2009-2016 Weibo, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.weibo.api.motan.cluster.loadbalance;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import com.weibo.api.motan.common.MotanConstants;
import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.rpc.Referer;
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.util.MathUtil;

/**
*
* Use consistent hash to choose referer
*
* @author fishermen
* @version V1.0 created at: 2013-5-21
*/
@SpiMeta(name = "consistent")
public class ConsistentHashLoadBalance<T> extends AbstractLoadBalance<T> {

private List<Referer<T>> consistentHashReferers;

@Override
public void onRefresh(List<Referer<T>> referers) {
super.onRefresh(referers);

List<Referer<T>> copyReferers = new ArrayList<Referer<T>>(referers);
List<Referer<T>> tempRefers = new ArrayList<Referer<T>>();
for (int i = 0; i < MotanConstants.DEFAULT_CONSISTENT_HASH_BASE_LOOP; i++) {
Collections.shuffle(copyReferers);
for (Referer<T> ref : copyReferers) {
tempRefers.add(ref);
}
}

consistentHashReferers = tempRefers;
}

@Override
protected Referer<T> doSelect(Request request) {

int hash = getHash(request);
Referer<T> ref;
for (int i = 0; i < getReferers().size(); i++) {
ref = consistentHashReferers.get((hash + i) % consistentHashReferers.size());
if (ref.isAvailable()) {
return ref;
}
}
return null;
}

@Override
protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder) {
List<Referer<T>> referers = getReferers();

int hash = getHash(request);
for (int i = 0; i < referers.size(); i++) {
Referer<T> ref = consistentHashReferers.get((hash + i) % consistentHashReferers.size());
if (ref.isAvailable()) {
refersHolder.add(ref);
}
}
}

private int getHash(Request request) {
int hashcode;
if (request.getArguments() == null || request.getArguments().length == 0) {
hashcode = request.hashCode();
} else {
hashcode = Arrays.hashCode(request.getArguments());
}
return MathUtil.getNonNegative(hashcode);
}
}

负载均衡算法的使用场景

  • 随机算法:实现比较简单,在请求量远超可用服务节点数量的情况下,各个服务节点被访问的概率基本相同,主要应用在各个服务节点的性能差异不大的情况下.
  • 轮询算法:跟随机算法类似,各个服务节点被访问的概率也基本相同,也主要应用在各个服务节点性能差异不大的情况下.
  • 加权轮询算法:在轮询算法基础上的改进,可以通过给每个节点设置不同的权重来控制访问的概率,因此主要被用在服务节点性能差异比较大的情况.比如经常会出现一种情况,因为采购时间的不同,新的服务节点的性能往往要高于旧的节点,这个时候可以给新的节点设置更高的权重,让它承担更多的请求,充分发挥新节点的性能优势.
  • 最少活跃连接算法:与加权轮询算法预先定义好每个节点的访问权重不同,采用最少活跃连接算法,客户端同服务端节点的连接数是在时刻变化的,理论上连接数越少代表此时服务端节点越空闲,选择最空闲的节点发起请求,能获取更快的响应速度.尤其在服务端节点性能差异较大,而又不好做到预先定义权重时,采用最少活跃连接算法是比较好的选择.
  • 一致性 hash 算法:因为它能够保证同一个客户端的请求始终访问同一个服务节点,所以适合服务端节点处理不同客户端请求差异较大的场景.比如服务端缓存里保存着客户端的请求结果,如果同一客户端一直访问一个服务节点,那么就可以一直从缓存中获取数据