Dubbo 是如何控制并发数和限流的?
发布日期:2021-06-30 12:58:03 浏览次数:2 分类:技术文章

本文共 5579 字,大约阅读时间需要 18 分钟。

点击关注公众号,Java干货及时送达

ExecuteLimitFilter

ExecuteLimitFilter执行流程:

ExecuteLimitFilter

@Overridepublic Result invoke(Invoker
 invoker, Invocation invocation) throws RpcException {    URL url = invoker.getUrl();    String methodName = invocation.getMethodName();    Semaphore executesLimit = null;    boolean acquireResult = false;    int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);    if (max > 0) {        RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());        //            if (count.getActive() >= max) {        /**             * http://manzhizhen.iteye.com/blog/2386408             * use semaphore for concurrency control (to limit thread number)             */        executesLimit = count.getSemaphore(max);        if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {            throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than 
 limited.");        }    }    long begin = System.currentTimeMillis();    boolean isSuccess = true;    RpcStatus.beginCount(url, methodName);    try {        Result result = invoker.invoke(invocation);        return result;    } catch (Throwable t) {        isSuccess = false;        if (t instanceof RuntimeException) {            throw (RuntimeException) t;        } else {            throw new RpcException("unexpected exception when ExecuteLimitFilter", t);        }    } finally {        RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);        if(acquireResult) {            executesLimit.release();        }    }}

我们接下来看看RpcStatus这个类

private static final ConcurrentMap
> METHOD_STATISTICS = new ConcurrentHashMap
>();public static RpcStatus getStatus(URL url, String methodName) {    String uri = url.toIdentityString();    ConcurrentMap
 map = METHOD_STATISTICS.get(uri);    if (map == null) {        METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap
());        map = METHOD_STATISTICS.get(uri);    }    RpcStatus status = map.get(methodName);    if (status == null) {        map.putIfAbsent(methodName, new RpcStatus());        status = map.get(methodName);    }    return status;}

这个方法很简单,大概就是给RpcStatus这个类里面的静态属性METHOD_STATISTICS里面设值。外层的map是以url为key,里层的map是以方法名为key。

private volatile int executesPermits;public Semaphore getSemaphore(int maxThreadNum) {    if(maxThreadNum <= 0) {        return null;    }    if (executesLimit == null || executesPermits != maxThreadNum) {        synchronized (this) {            if (executesLimit == null || executesPermits != maxThreadNum) {                executesLimit = new Semaphore(maxThreadNum);                executesPermits = maxThreadNum;            }        }    }    return executesLimit;}

这个方法是获取信号量,如果这个实例里面的信号量是空的,那么就添加一个,如果不是空的就返回。另外,关注公众号Java技术栈,在后台回复:面试,可以获取我整理的 Dubbo 系列面试题和答案。

TPSLimiter

TpsLimitFilter 过滤器,用于服务提供者中,提供限流的功能。

配置方式:

通过 <dubbo:parameter key="tps" value="" /> 配置项,添加到 <dubbo:service /> 或 <dubbo:provider /> 或 <dubbo:protocol /> 中开启,例如:

dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoServiceImpl" protocol="injvm" >

通过 <dubbo:parameter key="tps.interval" value="" /> 配置项,设置 TPS 周期。

源码分析

TpsLimitFilter

private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();@Overridepublic Result invoke(Invoker
 invoker, Invocation invocation) throws RpcException {    if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {        throw new RpcException(            "Failed to invoke service " +            invoker.getInterface().getName() +            "." +            invocation.getMethodName() +            " because exceed max service tps.");    }    return invoker.invoke(invocation);}

invoke方法调用了DefaultTPSLimiter的isAllowable,我们进入到isAllowable方法看一下

DefaultTPSLimiter

private final ConcurrentMap
 stats    = new ConcurrentHashMap
();@Overridepublic boolean isAllowable(URL url, Invocation invocation) {    //获取tps这个参数设置的大小    int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);    //获取tps.interval这个参数设置的大小,默认60秒    long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,                                     Constants.DEFAULT_TPS_LIMIT_INTERVAL);    String serviceKey = url.getServiceKey();    if (rate > 0) {        StatItem statItem = stats.get(serviceKey);        if (statItem == null) {            stats.putIfAbsent(serviceKey,                              new StatItem(serviceKey, rate, interval));            statItem = stats.get(serviceKey);        }        return statItem.isAllowable();    } else {        StatItem statItem = stats.get(serviceKey);        if (statItem != null) {            stats.remove(serviceKey);        }    }    return true;}

若要限流,调用 StatItem#isAllowable(url, invocation) 方法,根据 TPS 限流规则判断是否限制此次调用。

StatItem

private long lastResetTime;private long interval;private AtomicInteger token;private int rate;public boolean isAllowable() {    long now = System.currentTimeMillis();    // 若到达下一个周期,恢复可用种子数,设置最后重置时间。    if (now > lastResetTime + interval) {        token.set(rate);// 回复可用种子数        lastResetTime = now;// 最后重置时间    }    // CAS ,直到或得到一个种子,或者没有足够种子    int value = token.get();    boolean flag = false;    while (value > 0 && !flag) {        flag = token.compareAndSet(value, value - 1);        value = token.get();    }    return flag;}

关注公众号Java技术栈,在后台回复:面试,可以获取我整理的 Dubbo 系列面试题和答案。

作者:luozhiyun

出处:www.cnblogs.com/luozhiyun/p/10960593.html

关注Java技术栈看更多干货

戳原文,获取精选面试题!

转载地址:https://javastack.blog.csdn.net/article/details/114274938 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Spring 中的重试机制,简单、实用!
下一篇:Linux 服务器必备的安全设置,建议收藏!

发表评论

最新留言

逛到本站,mark一下
[***.202.152.39]2024年04月24日 09时30分16秒