dubbo之Filter监控&拦截
发布日期:2021-05-16 10:23:28 浏览次数:10 分类:技术文章

本文共 9122 字,大约阅读时间需要 30 分钟。

前言

在《》中完成 Protocol 服务化。监控,统计将成首要任务。Dubbo引入ExporterListener,Filter两大概念。

ExporterListener

dubbo通过《》wraper的方式进行注入ProtocolListenerWrapper

相当于 new ProtocolListenerWrapper(new DubboProtocol())

// 主要对export,refer两方法进行代理,然后通知监听者public class ProtocolListenerWrapper implements Protocol {
@Override public
Exporter
export(Invoker
invoker) throws RpcException {
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker); } // 在spi容器中查找Activate注解符合当前url条件的ExporterListener的实现类 return new ListenerExporterWrapper
(protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY))); } @Override public
Invoker
refer(Class
type, URL url) throws RpcException { if (UrlUtils.isRegistry(url)) { return protocol.refer(type, url); } return new ListenerInvokerWrapper
(protocol.refer(type, url), Collections.unmodifiableList( ExtensionLoader.getExtensionLoader(InvokerListener.class) .getActivateExtension(url, INVOKER_LISTENER_KEY))); }}

关于Activate注解

  • group: 所属组,String[],例如消费端、服务端。
  • value String[],如果指定该值,只有当消费者或服务提供者URL中包含属性名为value的键值对,该过滤器才处于激活状态。
  • before:String[],用于指定执行顺序,before指定的过滤器在该过滤器之前执行(弃用)。
  • after:string[],用于指定执行顺序,after指定的过滤器在该过滤器之后执行(弃用)。
  • order:用户指定顺序,值越小,越先执行。

Filter

同理注入ProtocolFilterWrapper

// 主要通过代理Invoker完成public class ProtocolFilterWrapper implements Protocol {
private static
Invoker
buildInvokerChain(final Invoker
invoker, String key, String group) {
Invoker
last = invoker; // 在 spi窗口中获取Activate注解符合当前url条件Filter List
filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i); final Invoker
next = last; last = new Invoker
() { // ... @Override public Result invoke(Invocation invocation) throws RpcException { Result asyncResult; try { // 过滤调用 asyncResult = filter.invoke(next, invocation); } catch (Exception e) { // 异常结果监听 if (filter instanceof ListenableFilter) { ListenableFilter listenableFilter = ((ListenableFilter) filter); try { Filter.Listener listener = listenableFilter.listener(invocation); if (listener != null) { listener.onError(e, invoker, invocation); } } finally { listenableFilter.removeListener(invocation); } } else if (filter instanceof Filter.Listener) { Filter.Listener listener = (Filter.Listener) filter; listener.onError(e, invoker, invocation); } throw e; } finally { } return asyncResult.whenCompleteWithContext((r, t) -> { // 成功监听 if (filter instanceof ListenableFilter) { ListenableFilter listenableFilter = ((ListenableFilter) filter); Filter.Listener listener = listenableFilter.listener(invocation); try { if (listener != null) { if (t == null) { listener.onResponse(r, invoker, invocation); } else { listener.onError(t, invoker, invocation); } } } finally { listenableFilter.removeListener(invocation); } } else if (filter instanceof Filter.Listener) { Filter.Listener listener = (Filter.Listener) filter; if (t == null) { listener.onResponse(r, invoker, invocation); } else { listener.onError(t, invoker, invocation); } } }); } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; }}

系统默认的Filter在/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter文件中定义

cache=org.apache.dubbo.cache.filter.CacheFiltervalidation=org.apache.dubbo.validation.filter.ValidationFilterecho=org.apache.dubbo.rpc.filter.EchoFiltergeneric=org.apache.dubbo.rpc.filter.GenericFiltergenericimpl=org.apache.dubbo.rpc.filter.GenericImplFiltertoken=org.apache.dubbo.rpc.filter.TokenFilter// 日志accesslog=org.apache.dubbo.rpc.filter.AccessLogFilteractivelimit=org.apache.dubbo.rpc.filter.ActiveLimitFilterclassloader=org.apache.dubbo.rpc.filter.ClassLoaderFiltercontext=org.apache.dubbo.rpc.filter.ContextFilterconsumercontext=org.apache.dubbo.rpc.filter.ConsumerContextFilterexception=org.apache.dubbo.rpc.filter.ExceptionFilterexecutelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilterdeprecated=org.apache.dubbo.rpc.filter.DeprecatedFiltercompatible=org.apache.dubbo.rpc.filter.CompatibleFiltertimeout=org.apache.dubbo.rpc.filter.TimeoutFiltertps=org.apache.dubbo.rpc.filter.TpsLimitFilter// 链路trace=org.apache.dubbo.rpc.protocol.dubbo.filter.TraceFilterfuture=org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter// 监控monitor=org.apache.dubbo.monitor.support.MonitorFiltermetrics=org.apache.dubbo.monitor.dubbo.MetricsFilter

Dubbo监控中心实现原理

声明MonitorFilter

// 表示支持提供者和消费者@Activate(group = {
Constants.PROVIDER, Constants.CONSUMER})public class MonitorFilter implements Filter {
@Override public Result invoke(Invoker
invoker, Invocation invocation) throws RpcException {
// 如果url中存在monitor,则设置了监控中心,收集调用信息 if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis()); // 相关计数 getConcurrent(invoker, invocation).incrementAndGet(); } return invoker.invoke(invocation); // proceed invocation chain } // concurrent counter private AtomicInteger getConcurrent(Invoker
invoker, Invocation invocation) {
String key = invoker.getInterface().getName() + "." + invocation.getMethodName(); return concurrents.computeIfAbsent(key, k -> new AtomicInteger()); } // 成功时 @Override public void onResponse(Result result, Invoker
invoker, Invocation invocation) {
if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
// 收集成功数据 collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), (long) invocation.get(MONITOR_FILTER_START_TIME), false); // 计数减一 getConcurrent(invoker, invocation).decrementAndGet(); // count down } } // 失败时 @Override public void onError(Throwable t, Invoker
invoker, Invocation invocation) {
if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
// 收集失败数据 collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), (long) invocation.get(MONITOR_FILTER_START_TIME), true); getConcurrent(invoker, invocation).decrementAndGet(); // count down } } private void collect(Invoker
invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
try {
URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY); // monitorFactory是从spi容器中创建。默认是发个监控中心 Monitor monitor = monitorFactory.getMonitor(monitorUrl); if (monitor == null) {
return; } // 统计数据 URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error); // 发送 monitor.collect(statisticsURL); } catch (Throwable t) {
logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t); } }}

以下根据MonitorFactory扩展点对Monitor的Demo实现

// 增加spi扩展配置// META-INF/services/org.apache.dubbo.monitor.MonitorFactory// dubbo=dubbo.test.MonitorTestpublic class MonitorTest implements MonitorFactory {
public static void main(String[] args) throws Exception {
ServiceRepository repository = ApplicationModel.getServiceRepository(); ServiceDescriptor serviceDescriptor = repository.registerService(GreetingsService.class); MethodDescriptor methodDescriptor = serviceDescriptor.getMethod("sayHi", new Class[]{
String.class}); Method method = methodDescriptor.getMethod(); String serviceName = serviceDescriptor.getServiceName(); URL url = URL.valueOf("dubbo://127.0.0.1:28092/"+serviceName+"?timeout=12000&monitor=mm"); Protocol protocol = new ProtocolFilterWrapper(new DubboProtocol()); protocol.export(new Invoker
() {
// .. @Override public Result invoke(Invocation invocation) throws RpcException {
System.out.println("yoyoy"); return AsyncRpcResult.newDefaultAsyncResult(invocation); } }); Invoker invoker = protocol.refer(GreetingsService.class, url); invoker.invoke(new RpcInvocation(method, serviceName, new String[]{
"yoyo"})); Thread.sleep(60000); } @Override public Monitor getMonitor(URL url) {
return new Monitor() {
@Override public void collect(URL statistics) {
System.out.println("statistics---------" + statistics.toString()); } // .. }; }}

转载地址:https://blog.csdn.net/y3over/article/details/114952373 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:dubbo之集群
下一篇:dubbo之网络通讯

发表评论

最新留言

不错!
[***.144.177.141]2024年03月15日 13时33分05秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

c#引入dll文件报错_(windows平台下)深入详解C++创建动态链接库DLL以及如何使用它(一)... 2019-04-21
ae2020不支持的视频驱动程序_AE脚本一键快速输出GIF动图格式插件脚本 GifGun 1.7.15 Win/Mac 支持AE 2020【资源分享1177】... 2019-04-21
和包支付的钱哪里来_支付宝年度账单来啦!看看你的钱都去哪了? 2019-04-21
testng执行参数_Jenkin+maven+testng动态入参构建自动化 2019-04-21
丁腈橡胶自然老化时间_做一天紫外光老化试验相当于户外环境作用的多少天?... 2019-04-21
注解报错_“步步精心”-Java Annotation注解继承说明 2019-04-21
wdcp安装mysql5.6_安装mysql5.6 2019-04-21
php加载mysql模块_PHP没有加载MySQL扩展模块的解决办法 2019-04-21
mha实现mysql读写分离_MySQL高可用及读写分离(MHA) 2019-04-21
mysql自动提交的概念_MySQL中的事务 2019-04-21
mysql323 mysql40_mysql5的sql文件导入到mysql4的方法 2019-04-21
mysql6.0配置环境变量_linux中 jdk tomcat mysql安装及环境变量配置 2019-04-21
apache远程访问mysql_远程服务器开启https协议,Apache+php+mysql证书配置 2019-04-21
mysql 简朝阳_聚积宝联合创始人& CTO简朝阳——MySQL8核心特性体验 2019-04-21
mapreduce执行过程流程图_Hadoop的MapReduce执行流程图 2019-04-21
mysql查询后生成一张表_mysql 查询记录并插入另一张表 2019-04-21
shell脚本for循环mysql_shell脚本for循环 2019-04-21
mysql mariadb知乎_在 Linux 上检查 MySQL/MariaDB 数据库正常运行时间的三种方法 2019-04-21
python以第一列为索引列_Python-Pandas-DataFrame 如何把df变为以数据中的某一列为index... 2019-04-21
Mysql和JSP之间的联系_建立JSP,MYSQL,FLEX之间的联系 2019-04-21