dubbo之集群
发布日期:2021-05-16 10:23:30 浏览次数:11 分类:技术文章

本文共 14977 字,大约阅读时间需要 49 分钟。

前言

集群容错源码包含四个部分,分别是服务目录 Directory、服务路由 Router、集群 Cluster和 负载均衡 LoadBalance。

快速上手

根据集群容源码,编写了例子:

public class ClusterTest {
public static void main(String[] args) throws Exception {
ServiceRepository repository = ApplicationModel.getServiceRepository(); ServiceDescriptor serviceDescriptor = repository.registerService(GreetingsService.class); MethodDescriptor methodDescriptor = serviceDescriptor.getMethod("sayHi", new Class[]{
String.class}); Method method = methodDescriptor.getMethod(); String serviceName = serviceDescriptor.getServiceName(); URL url = URL.valueOf("dubbo://127.0.0.1:28092/"+serviceName+"?timeout=12000&monitor=mm"); Protocol protocol = new ProtocolFilterWrapper(new DubboProtocol()); protocol.export(new Invoker
() {
// ... @Override public Result invoke(Invocation invocation) throws RpcException {
System.out.println("yoyoy111111111"); return AsyncRpcResult.newDefaultAsyncResult(invocation); } }); Invoker invoker = protocol.refer(GreetingsService.class, url); // 把远程对象invoker加入StaticDirectory // 创建集群invoker代码Cluster // Cluster调用的invoke,会从StaticDirectory加载可能的远程对象执行 invoker = Cluster.getCluster(null, false).join(new StaticDirectory(Lists.newArrayList(invoker))); invoker.invoke(new RpcInvocation(method, serviceName, new String[]{
"yoyo"})); }}

分析invoke源码得出:

在这里插入图片描述
集群工作过程可分为两个阶段:

  1. 在服务消费者初始化期间,集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的 merge 操作。
  2. 第二个阶段是在服务消费者进行远程调用时。以 FailoverClusterInvoker 为例
    1. 该类型 Cluster Invoker 首先会调用 Directory 的 list 方法列举 Invoker 列表(可将 Invoker 简单理解为服务提供者)。
    2. Directory 的用途是保存 Invoker,可简单类比为 List。
    3. 当 FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表后,它会通过 LoadBalance 从 Invoker 列表中选择一个 Invoker。
    4. FailoverClusterInvoker 会将参数传给 LoadBalance 选择出的 Invoker 实例的 invoke 方法,进行真正的远程调用。

服务目录

StaticDirectory即为服务目录,它的作用维护远程调用Invoker集合的。分析其源码

public abstract class AbstractDirectory
implements Directory
{
// 注册中心url 例: // zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=test2&dubbo=2.0.2&pid=7861&release=2.7.8&timestamp=1616255181499 private final URL url; // 注册中心url + refer中参数 // zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=test2&dubbo=2.0.2&interface=dubbo.GreetingsService&metadata-type=remote&methods=sayHi&pid=7861&protocol=dubbo&register.ip=192.168.3.5&release=2.7.8&side=consumer&sticky=false&timestamp=1616255078292 private volatile URL consumerUrl; // 路由规则,用于过滤不符合要求的远程Invoker private volatile List
routers;} // 静态目录实现类public class StaticDirectory
extends AbstractDirectory
{
// consumerUrl 对应的远程对象集合 private final List
> invokers; // 目录对应的接口 public Class
getInterface() { return invokers.get(0).getInterface(); } // 根据url利用spi提取router对象并创建路由链对象 public void buildRouterChain() { RouterChain
routerChain = RouterChain.buildChain(getUrl()); routerChain.setInvokers(invokers); this.setRouterChain(routerChain); } // 根据传入运行时会话invocation通过Router规则对invokers进行过滤 // 返回符合要求的 @Override protected List
> doList(Invocation invocation) throws RpcException { List
> finalInvokers = invokers; if (routerChain != null) { finalInvokers = routerChain.route(getConsumerUrl(), invocation); } return finalInvokers == null ? Collections.emptyList() : finalInvokers; }}

服务路由

上述的routerChain对象就是由服务路由Router组成的一条链路。其作用是

在刷新 Invoker 列表的过程中,会通过 Router 进行服务路由,筛选出符合路由规则的服务提供者。Dubbo 目前提供了三种服务路由实现。

条件路由 ConditionRouter

条件路由规则由两个条件组成,分别用于对服务消费者和提供者进行匹配。 比如有这样一条规则:

  • host = 10.20.153.10 => host = 10.20.153.11
  • 该条规则表示 IP 为 10.20.153.10 的服务消费者只可调用 IP 为 10.20.153.11 机器上的服务

如果服务消费者匹配条件为空,表示不对服务消费者进行限制。如果服务提供者匹配条件为空,表示对某些服务消费者禁用服务。

脚本路由 ScriptRouter

支持 JDK 脚本引擎的所有脚本,比如:javascript, jruby, groovy 等,通过 type=javascript 参数设置脚本类型,缺省为 javascript。如:

  • 所有包含get的方法全部调用192.168.0.23这台提供者
(function(invokers,invocation,context){
var methodName = invocation.methodName;var resultInvokers=[];for(var i=0;i
=0&&providerHost=='192.168.0.23'){
resultInvokers.push(invoker); }}//该方法返回值必须是一个Invoker的集合或者数组return resultInvokers;})(invokers,invocation,context);

其实就是用脚本的方式实现写个方法执行

标签路由 TagRouter

标签路由通过将某一个或多个服务的提供者划分到同一个分组,约束流量只在指定分组中流转,从而实现流量隔离的目的。

  • 对于provider端,可以通过静态或动态的方式对服务进行打标签,静态打标可以在JVM的启动参数上增加-Ddubbo.provider.tag={env_tag}来实现,动态打标则是直接修改provider在注册中心上的地址实现
  • 对于consumer端,请求标签的作用域为每一次 invocation,使用attachment来传递请求标签.目前仅仅支持硬编码的方式设置dubbo.tag

它与dubbo的version、group机制有什么区别?

  • dubbo的version与group是静态的路由,如果URL中带了不匹配的version,在上图代码中的invokers就根本不存在该服务提供者;
  • tag路由是动态的,就算带了不匹配的tag,也是会放在invokers中,每次调用时都执行一次路由逻辑。

标签路由降级约定

  • consumer dubbo.tag=tag1 时优先选择标记了tag=tag1的provider。若集群中不存在与请求标记对应的服务,默认将降级请求 tag为空的provider;如果要改变这种默认行为,即找不到匹配tag1的provider返回异常,需设置request.tag.force=true;
  • consumer dubbo.tag未设置时,只会匹配tag为空的provider。即使集群中存在可用的服务,若tag不匹配也就无法调用,这与约定1不同,携带标签的请求可以降级访问到无标签的服务,但不携带标签/携带其他种类标签的请求永远无法访问到其他标签的服务。

负载均衡

FailoverClusterInvoker中的select()方法最近会使用到负载均衡,何为负载均衡?它的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况。Dubbo 提供了4种负载均衡实现

基于权重随机算法的 RandomLoadBalance
  1. 假设我们有一组服务器 servers = [A, B, C] 对应的权重为 weights = [5, 3, 2],权重总和为10。
  2. 现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。
  3. 接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器 A 对应的区间上,此时返回服务器 A 即可。

权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。比如,经过一万次选择后,服务器 A 被选中的次数大约为5000次,服务器 B 被选中的次数约为3000次,服务器 C 被选中的次数约为2000次。

基于最少活跃调用数算法的 LeastActiveLoadBalance
  1. 每个服务提供者对应一个活跃数 active。
  2. 初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。
  3. 某一时刻它们的活跃数相同,此时 Dubbo 会根据它们的权重去分配请求,权重越大,获取到新请求的概率就越大。

在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求

基于 hash 一致性的 ConsistentHashLoadBalance

一致性 hash 算法由麻省理工学院的 Karger 及其合作者于1997年提出的,算法提出之初是用于大规模缓存系统的负载均衡。它的工作过程是这样的

  1. 首先根据 ip 或者其他的信息为缓存节点生成一个 hash,并将这个 hash 投射到 [0, 232 - 1] 的圆环上。
  2. 当有查询或写入请求时,则为缓存项的 key 生成一个 hash 值。然后查找第一个大于或等于该 hash 值的缓存节点,并到这个节点中查询或写入缓存项。
  3. 如果当前节点挂了,则在下一次查询或写入缓存时,为缓存项查找另一个大于其 hash 值的缓存节点即可。
    在这里插入图片描述

大致效果如上图所示,每个缓存节点在圆环上占据一个位置。如果缓存项的 key 的 hash 值小于缓存节点 hash 值,则到该缓存节点中存储或读取缓存项。比如下面绿色点对应的缓存项将会被存储到 cache-2 节点中。由于 cache-3 挂了,原本应该存到该节点中的缓存项最终会存储到 cache-4 节点中。

我们把上图的缓存节点替换成 Dubbo 的服务提供者

在这里插入图片描述

这里相同颜色的节点均属于同一个服务提供者,比如 Invoker1-1,Invoker1-2,……, Invoker1-160。这样做的目的是通过引入虚拟节点,让 Invoker 在圆环上分散开来,避免数据倾斜问题。所谓数据倾斜是指,由于节点不够分散,导致大量请求落到了同一个节点上,而其他节点只会接收到了少量请求的情况

基于加权轮询算法的 RoundRobinLoadBalance
  1. 假设我们有三台服务器 servers = [A, B, C],对应的权重为 weights = [2, 5, 1]
  2. 三次请求分别返回A,B,C,weight=[1,4,0]
  3. 二次请求分别反回A,B,weight=[0,3,0]
  4. 三次请求全返回B
  5. 恢复新的一轮,weights = [2, 5, 1]
  • 我们有三台服务器 A、B、C。我们将第一个请求分配给服务器 A,第二个请求分配给服务器 B,第三个请求分配给服务器 C,第四个请求再次分配给服务器 A。这个过程就叫做轮询。
  • 但现实情况下,我们并不能保证每台服务器性能均相近。如果我们将等量的请求分配给性能较差的服务器,这显然是不合理的。因此,这个时候我们需要对轮询过程进行加权,以调控每台服务器的负载。经过加权后,每台服务器能够得到的请求数比例,接近或等于他们的权重比。比如服务器 A、B、C 权重比为 5:2:1。那么在8次请求中,服务器 A 将收到其中的5次请求,服务器 B 会收到其中的2次请求,服务器 C 则收到其中的1次请求。

以默认的RandomLoadBalance为例子源码分析

public class RandomLoadBalance extends AbstractLoadBalance {
public static final String NAME = "random"; private final Random random = new Random(); @Override protected
Invoker
doSelect(List
> invokers, URL url, Invocation invocation) {
int length = invokers.size(); int totalWeight = 0; boolean sameWeight = true; // 下面这个循环有两个作用,第一是计算总权重 totalWeight, // 第二是检测每个服务提供者的权重是否相同 for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation); // 累加权重 totalWeight += weight; // 检测当前服务提供者的权重与上一个服务提供者的权重是否相同, // 不相同的话,则将 sameWeight 置为 false。 if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false; } } // 下面的 if 分支主要用于获取随机数,并计算随机数落在哪个区间上 if (totalWeight > 0 && !sameWeight) {
// 随机获取一个 [0, totalWeight) 区间内的数字 int offset = random.nextInt(totalWeight); // 循环让 offset 数减去服务提供者权重值,当 offset 小于0时,返回相应的 Invoker。 // 举例说明一下,我们有 servers = [A, B, C],weights = [5, 3, 2],offset = 7。 // 第一次循环,offset - 5 = 2 > 0,即 offset > 5, // 表明其不会落在服务器 A 对应的区间上。 // 第二次循环,offset - 3 = -1 < 0,即 5 < offset < 8, // 表明其会落在服务器 B 对应的区间上 for (int i = 0; i < length; i++) {
// 让随机值 offset 减去权重值 offset -= getWeight(invokers.get(i), invocation); if (offset < 0) {
// 返回相应的 Invoker return invokers.get(i); } } } // 如果所有服务提供者权重值相同,此时直接随机返回一个即可 return invokers.get(random.nextInt(length)); }}

集群

为了避免单点故障,现在的应用通常至少会部署在两台服务器上。Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker。集群 Cluster 用途是将多个服务提供者合并为一个 Cluster Invoker,并将这个 Invoker 暴露给服务消费者。Dubbo 主要提供了这样几种容错方式:

  • FailoverClusterInvoker - 失败自动切换,在调用失败时,会自动切换 Invoker 进行重试。默认配置。
  • FailfastClusterInvoker - 快速失败,只会进行一次调用,失败后立即抛出异常。适用于幂等操作,比如新增记录。
  • FailsafeClusterInvoker - 失败安全,当调用过程中出现异常时,FailsafeClusterInvoker 仅会打印异常,而不会抛出异常。适用于写入审计日志等操作。
  • FailbackClusterInvoke - 失败自动恢复,会在调用失败后,返回一个空结果给服务消费者。并通过定时任务对失败的调用进行重传,适合执行消息通知等操作。
  • ForkingClusterInvoker - 并行调用多个服务提供者,会在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。

以默认的FailoverClusterInvoker为例子进行源码分析

public class
extends AbstractClusterInvoker
{
// 省略部分代码 @Override public Result doInvoke(Invocation invocation, final List
> invokers, LoadBalance loadbalance) throws RpcException {
List
> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); // 获取重试次数 int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } RpcException le = null; List
> invoked = new ArrayList
>(copyinvokers.size()); Set
providers = new HashSet
(len); // 循环调用,失败重试 for (int i = 0; i < len; i++) { if (i > 0) { checkWhetherDestroyed(); // 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了 // 通过调用list可得到最新可用符合路由规则的Invoker列表 copyinvokers = list(invocation); // 对 copyinvokers 进行判空检查 checkInvokers(copyinvokers, invocation); } // 1. 如果粘滞连接开启,则使用粘滞连接 // 2. 用均衡算法选取一个 // 3. 发现2选取的是已用过的排除 // 4. 如果3步聚全部排序了,则在已用过的选取一个 Invoker
invoker = select(loadbalance, invocation, copyinvokers, invoked); // 添加到 invoker 到 invoked 列表中 invoked.add(invoker); // 设置 invoked 到 RPC 上下文中 RpcContext.getContext().setInvokers((List) invoked); try { // 调用目标 Invoker 的 invoke 方法 Result result = invoker.invoke(invocation); return result; } catch (RpcException e) { if (e.isBiz()) { throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } // 若重试失败,则抛出异常 throw new RpcException(..., "Failed to invoke the method ..."); }}

注册中心

如果一直使用静态服务目录,dubbo集群将变的很死板。因此dubbo提供了动态RegistryDirectory动态服务目录。

public class RegistryDirectory
extends AbstractDirectory
implements NotifyListener {
// 集群策略,默认为failover。 private static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension(); // 路由工厂,如果注册中心那边发过来路由策略变化,则根据此工厂类创建新路由 private static final RouterFactory ROUTER_FACTORY = ExtensionLoader.getExtensionLoader(RouterFactory.class) .getAdaptiveExtension(); // 服务key唯一标识,{group}/{interfaceName}:{version} private final String serviceKey; // 服务提供者接口类interfaceName private final Class
serviceType; // refer中所有的参数 private final Map
queryMap; // 与父亲consumerUrl相同, 注册中心url + refer参数 private final URL directoryUrl; // 是否引用多个服务组, group = aa,bb 或者 * // @see toMergeInvokerList private final boolean multiGroup; // 与服务端通讯所使用的协议,将注册中新提交的url转变成Invoker private Protocol protocol; // 注册中心,上面的url发生变动都会通知本类 private Registry registry; // 是否要把自己的registeredConsumerUrl信息注册到注册中心去 private boolean shouldRegister; // registeredConsumerUrl包括必要的服务治理数据 private boolean shouldSimplified; // 最终url = 注册中心url + exportUrl参数 + referUrl参数 + configurators 配置 private volatile URL overrideDirectoryUrl; // refer Url private volatile URL registeredConsumerUrl; // 注册中心,手动配置的参数信息信息 private volatile List
configurators; // 服务提供者, exportUrl - Invoker private volatile Map
> urlInvokerMap; // 服务提供者, Invoker集合 private volatile List
> invokers; // 服务提供者, 所有的exportUrl private volatile Set
cachedInvokerUrls; }

由属性可知其完成依赖于注册中心的服务发现.Demo如下

public class RegistryTest  {
public static void main(String[] args) throws Exception {
ApplicationModel.setApplication("app"); ServiceRepository repository = ApplicationModel.getServiceRepository(); ServiceDescriptor serviceDescriptor = repository.registerService(GreetingsService.class); MethodDescriptor methodDescriptor = serviceDescriptor.getMethod("sayHi", new Class[]{
String.class}); Method method = methodDescriptor.getMethod(); String serviceName = serviceDescriptor.getServiceName(); repository.registerProvider(serviceName, new GreetingsServiceImpl(),serviceDescriptor, null, null ); // 用于创建ZookeeperRegistry RegistryFactory registryFactory = new ZookeeperRegistryFactory(); String exportUrl = URL.decode("dubbo://127.0.0.1:28092/dubbo.GreetingsService?monitor=mm&timeout=12000"); String referUrl = URL.decode("register.ip=127.0.0.1:28092"); URL url = URL.valueOf("zookeeper://127.0.0.1:2181/dubbo.GreetingsService?registry=multicast&refer="+referUrl+"&export=" + exportUrl); // 通过RegistryProtocol可以把注册中心与服务目录关联起来 RegistryProtocol protocol = new RegistryProtocol(); protocol.setProtocol(new DubboProtocol()); protocol.setRegistryFactory(registryFactory); protocol.export(new Invoker
() {
// ... @Override public Result invoke(Invocation invocation) throws RpcException {
System.out.println("22yoyoyoy"); return AsyncRpcResult.newDefaultAsyncResult(invocation); } }); RegistryProtocol protocol2 = new RegistryProtocol(); protocol2.setProtocol(new DubboProtocol()); protocol2.setRegistryFactory(registryFactory); Invoker invoker = protocol2.refer(GreetingsService.class, url); invoker.invoke(new RpcInvocation(method, serviceName, new String[]{
"yoyo"})); }}
  1. RegistryProtocol.export方法会把url注册到注册中心上去
  2. RegistryProtocol.doRefer会根据url订阅注册中心变更事件,通过NotifyListener反通知RegistryDirectory
  3. 注册中心通知事件不仅仅总共有三种:
    1. configuratorURL ,在管理人员,人工的配置信息
    2. routerURLs, 管理人员,设置的路由信息
    3. providerURLs 提供者,由于代码修改,启停所导致的变动

关于ZookeeperRegistry注册中心的细节可参考《 》

主要参考

《》

《》
《》
《》
《》
《 》

转载地址:https://blog.csdn.net/y3over/article/details/114991588 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:dubbo之服务配置及启停
下一篇:dubbo之Filter监控&拦截

发表评论

最新留言

留言是一种美德,欢迎回访!
[***.207.175.100]2024年03月28日 10时38分24秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

【Leetcode刷题篇】leetcode312 戳气球 2019-04-26
前后端分离如何使用spring boot处理跨域请求 2019-04-26
【Leetcode刷题篇】leetcode283 移动零 2019-04-26
【Leetcode刷题篇】leetcode611 有效三角形的个数 2019-04-26
【Leetcode刷题篇】leetcode26 删除排序数组中的重复项 2019-04-26
【大话Java面试】-如何通俗易懂的理解Redis的分布式寻址算法hash slot? 2019-04-26
【大话Java面试】-如何通俗易懂的理解单例模式? 2019-04-26
【大话Java面试】请列出Java中几个常用的设计模式? 2019-04-26
【大话Java面试】-如何通俗易懂的理解Java异常以及Java异常处理? 2019-04-26
【大话Mysql面试】-Mysql的索引为什么要使用B+树,而不是B树,红黑树等之类? 2019-04-26
【大话Mysql面试】-如何通俗易懂的了解Mysql的索引最左前缀匹配原则 2019-04-26
【大话Mysql面试】-MYSQL的两种存储引擎MyISAM与InnoDB的区别是什么? 2019-04-26
【大话Mysql面试】-InnoDB可重复读隔离级别下如何避免幻读?MVCC和next-key是什么 2019-04-26
【大话Mysql面试】-Mysql如何恢复数据?如何进行主从复制?Binlog日志到底是什么? 2019-04-26
理解String.intern()和String类常量池疑难解析例子 2019-04-26
python flask打造前后端分离的口罩检测 2019-04-26
【大话Mysql面试】-MySQL基础知识 2019-04-26
【大话Mysql面试】-MySQL数据类型有哪些 2019-04-26
【大话Mysql面试】-MySQL数据引擎 2019-04-26
【大话Mysql面试】-常见SQL语句书写 2019-04-26