small_925_ant
2019-01-20 21:18:03浏览 2896
上一篇主要介绍了基于zookeeper的服务注册实现:
https://www.imooc.com/article/274027
对于我们的系统还需要一些策略,比如worker机器的选择策略,请求失败的策略,让我们的系统更加的可靠。
客户端负载均衡
对于loadbalance,主要是实现几个方法,让我们的manager客户端进行worker机器进行合适的选择。比如轮训,随机,或者权重。
具体实现
0-工具方法
private int getNextNonNegative() {
return MathUtil.getNonNegative(idx.getAndIncrement());
}
public static int getNonNegative(int originValue){
return 0x7fffffff & originValue;
}
1-随机
@Override
public Endpoint doSelect(RequestPacket request) {
List<Endpoint> endpoints = getEndpoints();
int idx = (int) (ThreadLocalRandom.current().nextDouble() * endpoints.size());
for (int i = 0; i < endpoints.size(); i++) {
Endpoint ref = endpoints.get((i + idx) % endpoints.size());
return ref;
}
return null;
}
2-轮训
@Override
public Endpoint doSelect(RequestPacket request) {
List<Endpoint> endpoints = getEndpoints();
int index = getNextNonNegative();
for (int i = 0; i < endpoints.size(); i++) {
Endpoint ref = endpoints.get((i + index) % endpoints.size());
return ref;
}
return null;
}
3-低并发度优先: referer的某时刻的call数越小优先级越高
低并发referer获取策略:
由于Referer List可能很多,比如上百台,如果每次都要从这上百个Referer或者最低并 发的几个,性能有些损耗,因此 random.nextInt(list.size()) 获取一个起始index,然后获取最多不超过MAX_REFERER_COUNT的状态是isAvailable的referer进行判断activeCount.
protected Endpoint doSelect(RequestPacket request) {
List<Endpoint> endpoints = getEndpoints();
int refererSize = endpoints.size();
int startIndex = ThreadLocalRandom.current().nextInt(refererSize);
int currentCursor = 0;
int currentAvailableCursor = 0;
Endpoint endpoint = null;
while (currentAvailableCursor < MAX_REFERER_COUNT && currentCursor < refererSize) {
Endpoint temp = endpoints.get((startIndex + currentCursor) % refererSize);
currentCursor++;
currentAvailableCursor++;
if (endpoint == null) {
endpoint = temp;
} else {
if (compare(endpoint, temp) > 0) {
endpoint = temp;
}
}
}
return endpoint;
}
4 ......更多
Ha策略
这里就简单实现两种:
1- FailfastHaStrategy 快速失败
我们通过loadBalance选择一个worker机器地址,然后发起请求,如果出现错误就立刻报错失败。
2- FailoverHaStrategy 会带有重试功能
我们通过loadBalance会选择一组worker机器,如果第一个失败,会轮训调用后面的机器,直到成功,或者全部失败
实现
1-FailfastHaStrategy
public ResponsePacket call(RequestPacket request, LoadBalance loadBalance) {
Endpoint endpoint = loadBalance.select(request);
log.info("{}FailfastHaStrategy start to call {},request:{}", Constants.LOG_PREFIX, endpoint.getHost(), request.toString());
return request(endpoint, request);
}
2-FailoverHaStrategy
public class FailoverHaStrategy extends AbstractHaStrategy {
protected ThreadLocal<List<Endpoint>> endpointHolder = new ThreadLocal<List<Endpoint>>() {
@Override
protected java.util.List<Endpoint> initialValue() {
return new ArrayList<Endpoint>();
}
};
public ResponsePacket call(RequestPacket request, LoadBalance loadBalance) {
List<Endpoint> endpointList = selectReferers(request, loadBalance);
if (endpointList.isEmpty()) {
throw new CommonException(999999, String.format("FailoverHaStrategy No Endpoint loadbalance:%s", loadBalance));
}
int tryCount = request.getRetries();
if (tryCount < 0) {
tryCount = 0;
}
for (int i = 0; i <= tryCount; i++) {
Endpoint endpoint = endpointList.get(i % endpointList.size());
log.info("{}FailoverHaStrategy start to call ......{},tryCount:{},request:{}", Constants.LOG_PREFIX, endpoint.getHost(), (i + 1), request.toString());
try {
return request(endpoint, request);
} catch (RuntimeException e) {
if (e instanceof CommonException) {
throw e;
} else if (i >= tryCount) {
log.info("{}tryCount is over......throw e", Constants.LOG_PREFIX);
throw e;
}
log.info("{}try run ,tryCount:{}", Constants.LOG_PREFIX, (i + 1));
}
}
throw new CommonException(999999, "FailoverHaStrategy.call should not come here!");
}
protected List<Endpoint> selectReferers(RequestPacket request, LoadBalance loadBalance) {
List<Endpoint> endpoints = endpointHolder.get();
endpoints.clear();
loadBalance.selectToHolder(request, endpoints);
return endpoints;
}
}
总结
后续我们会继续介绍一些核心功能,比如代理的实现和transport层的实现
完整代码见github: https://github.com/lizu18xz/faya-job