SpringCloud - Hystrix的执行流程
发布日期:2021-06-30 12:35:51 浏览次数:2 分类:技术文章

本文共 11701 字,大约阅读时间需要 39 分钟。

0 Hystrix执行原理图

1 创建HystrixCommand/HystrixObservableCommand

一个HystrixCommandHystrixObservableCommand对象,代表对某个依赖服务发起的一次请求或者调用

构造的时候,可在构造器中传入任何需要的参数。

  • HystrixCommand仅返回一个结果的调用。
  • HystrixObservableCommand可能会返回多条结果的调用。

直接继承HystrixCommand并实现run方法即可。

public class GetUserAccountCommand extends HystrixCommand
{
... @Override protected UserAccount run() {
/* 模拟执行网络调用以检索用户信息 */ try {
Thread.sleep((int) (Math.random() * 10) + 2); } catch (InterruptedException e) {
} /* 5%的时间失败来说明fallback的工作原理 */ if (Math.random() > 0.95) {
throw new RuntimeException("random failure processing UserAccount network response"); } /* 延迟会增加5%的时间,因此有时会触发超时 */ if (Math.random() > 0.95) {
// 随机等待时间尖峰 try {
Thread.sleep((int) (Math.random() * 300) + 25); } catch (InterruptedException e) {
} } /* 成功...使用远程服务响应的数据创建UserAccount */ return new UserAccount(86975, "John James", 2, true, false, true); } ...}

2 调用command的执行方法

执行Command就可以发起一次对依赖服务的调用

要执行Command,需要在4个方法中选择其中的一个

  • 前两种是HystrixCommand独有的哦

2.1 execute()

/**     * 用于同步执行 command.     *      * @return R     *         如果command由于任何原因失败,则执行 #run 或从 #getFallback() fallback的结果.     *     * @throws HystrixRuntimeException     *             如果发生故障并且无法检索fallback     * @throws HystrixBadRequestException     *             如果使用了无效的参数或状态来表示用户故障,而不是系统故障     *     * @throws IllegalStateException     *             如果多次调用     */    public R execute() {
try {
return queue().get(); } catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e)); } }

调用后直接阻塞,直到依赖服务返回单条结果,或抛异常

2.2 queue()

/**     * 用于异步执行命令 command.     *     * 这将使该command在线程池上排队,并在完成后返回一个 Future 以获取结果.     * 注意:如果配置为不在单独的线程中运行,则其效果与 #execute() 相同,并会阻塞.     * 不会抛出异常,而只是切换为同步执行,因此无需更改代码即可 将command从运行在单独的线程切换到调用线程.     * (switch a command from running on a separate thread to the calling thread.)     *      * @return {@code Future 
}执行 #run() 的结果,或者如果command由于任何原因失败,则返回 #getFallback() 的结果. * @throws HystrixRuntimeException * 如果不存在fallback * 如果通过 ExecutionException#getCause() 中的{@code Future.get(), 如果不存在失败发生的话 * 或者如果无法将命令排队(如,短路,线程池/信号被拒绝),则立即返回 * @throws HystrixBadRequestException * 通过 ExecutionException#getCause() 中的 Future.get() 如果使用了无效的参数或状态来表示用户故障而不是系统故障 * @throws IllegalStateException * 如果多次调用 */ public Future
queue() {
/* * 当Future.cancel(boolean)的“ mayInterrupt”标志设为true时 * 由Observable.toBlocking().toFuture() 返回的Future不实现执行线程的中断 * 因此,为了遵守Future的约定,我们必须围绕它. */ final Future
delegate = toObservable().toBlocking().toFuture(); final Future
f = new Future
() {
@Override public boolean cancel(boolean mayInterruptIfRunning) {
if (delegate.isCancelled()) {
return false; } if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
/* * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption, * than that interruption request cannot be taken back. * 这里唯一有效的转换是false -> true. * 如果存在由该命令创建(这很奇怪,但从未禁止过)的两个futures,例如f1和f2, * 并且对f1.cancel(true)和f2.cancel(false)的调用是由不同的线程发起, * 尚不清楚在检查mayInterruptOnCancel时将使用什么值. * 处理这种情况的最一致的方法是说,如果在中断的情况下调用了任何cancellation,则无法撤回该中断请求. */ interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning); } final boolean res = delegate.cancel(interruptOnFutureCancel.get()); if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
final Thread t = executionThread.get(); if (t != null && !t.equals(Thread.currentThread())) {
t.interrupt(); } } return res; } @Override public boolean isCancelled() {
return delegate.isCancelled(); } @Override public boolean isDone() {
return delegate.isDone(); } @Override public R get() throws InterruptedException, ExecutionException {
return delegate.get(); } @Override public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit); } }; /* 对立即抛出的错误状态的特殊处理 */ if (f.isDone()) {
try {
f.get(); return f; } catch (Exception e) {
Throwable t = decomposeException(e); if (t instanceof HystrixBadRequestException) {
return f; } else if (t instanceof HystrixRuntimeException) {
HystrixRuntimeException hre = (HystrixRuntimeException) t; switch (hre.getFailureType()) {
case COMMAND_EXCEPTION: case TIMEOUT: // 不会仅从 queue().get() 中将这些类型从 queue() 中抛出, 因为它们是执行错误 return f; default: // these are errors we throw from queue() as they as rejection type errors // 这些是从 queue() 抛出的错误,因为它们是拒绝类型错误 throw hre; } } else {
throw Exceptions.sneakyThrow(t); } } } return f; }

调用,返回一个Future,后面可以通过Future获取单条结果

2.3 observe()

订阅一个Observable对象,Observable代表的是依赖服务返回的结果,获取到一个那个代表结果的Observable对象的拷贝对象

  • toObservable()
    返回一个Observable对象,如果我们订阅这个对象,就会执行command并且获取返回结果

其中execute()和queue()仅对HystrixCommand适用

K             value   = command.execute();Future
fValue = command.queue();Observable
ohValue = command.observe(); Observable
ocValue = command.toObservable();
  • execute()实际上会调用queue().get()
  • 在 queue() 方法中,会调用toObservable().toBlocking().toFuture()

即,无论是哪种执行command的方式,最终都是依赖toObservable()

也就是说同步的HystrixCommand最终都会依赖Observable,尽管HystrixCommand是用来发射单个事件的

3 检查是否开启缓存

如果这个command开启了请求缓存(request cache),而且这个调用的结果在缓存中存在,那么直接从缓存中返回结果

否则,继续往后

return Observable.defer(new Func0
>() {
@Override public Observable
call() {
/* 这是一个有状态的对象,因此只能使用一次 */ if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); // TODO 为此创建新的错误类型 throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null); } commandStartTimestamp = System.currentTimeMillis(); if (properties.requestLogEnabled().get()) {
// 记录此命令执行,无论发生什么情况 if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd); } } final boolean requestCacheEnabled = isRequestCachingEnabled(); final String cacheKey = getCacheKey(); /* 首先尝试从缓存 */ if (requestCacheEnabled) {
HystrixCommandResponseFromCache
fromCache = (HystrixCommandResponseFromCache
) requestCache.get(cacheKey); if (fromCache != null) {
isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } Observable
hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks); Observable
afterCache; // 放入缓存 if (requestCacheEnabled && cacheKey != null) { // 包装以缓存 HystrixCachedObservable
toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); HystrixCommandResponseFromCache
fromCache = (HystrixCommandResponseFromCache
) requestCache.putIfAbsent(cacheKey, toCache); if (fromCache != null) { // 另一个线程击败了我们,因此使用缓存值 toCache.unsubscribe(); isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } else { // 我们刚刚创建了一个ObservableCommand,所以我们进行了强制转换并返回了它 afterCache = toCache.toObservable(); } } else { afterCache = hystrixObservable; } return afterCache .doOnTerminate(terminateCommandCleanup) // 进行一次清理(在正常终端状态(此行)或退订(下一行)) .doOnUnsubscribe(unsubscribeCommandCleanup) // 进行一次清理 .doOnCompleted(fireOnCompletedHook); } });

上述代码中有个判断final boolean requestCacheEnabled = isRequestCachingEnabled();可以看到如果我们的Command实现了getCacheKey方法,并且requestCacheEnabled(这个属性默认是true,可以通过调用HystrixCommand的构造方法传入一个setter对象修改默认属性)这样就不会执行后续的run方法,就会直接返回一个缓存的Observable。(必须是同一个request context里面的两个command才能用到缓存)

4 检查是否开启短路器

检查这个command对应的依赖服务是否开启短路器

如果断路器被打开了,那么hystrix就不会执行这个command,而是直接执行fallback降级

5 检查线程池/队列/semaphore是否已满

如果command对应的线程池/队列/semaphore已满,那么也不会执行command,而是直接去调用fallback降级机制,同时发送 reject 信息给断路器统计

6 执行command

调用HystrixObservableCommand.construct()或HystrixCommand.run()来实际执行这个command

  • HystrixCommand.run()
    返回一个单条结果,或者抛出一个异常
  • HystrixObservableCommand.construct()
    返回一个Observable对象,可以获取多条结果

如果HystrixCommand.run()或HystrixObservableCommand.construct()的执行,超过了timeout时长的话,那么command所在的线程就会抛出一个TimeoutException

如果timeout了,也会去执行fallback降级机制,而且就不会管run()或construct()返回的值

我们是不可能终止掉一个调用严重延迟的依赖服务的线程的,只能说给你抛出来一个TimeoutException,但是还是可能会因为严重延迟的调用线程占满整个线程池的

即使这个时候新来的流量都被限流了。。。

如果没有timeout的话,那么就会拿到一些调用依赖服务获取到的结果,然后hystrix会做一些logging记录和metric统计

7 短路健康检查

Hystrix会将每一个依赖服务的调用成功,失败,拒绝,超时,等事件,都会发送给circuit breaker断路器

短路器就会对调用成功/失败/拒绝/超时等事件的次数进行统计

短路器会根据这些统计次数来决定,是否要进行短路,如果打开了短路器,那么在一段时间内就会直接短路,然后如果在之后第一次检查发现调用成功了,就关闭断路器

8 调用fallback降级机制

failfast

在 run 方法中直接抛异常快速失败。

fail silent

run 里有个降级方法,内部返回一个空

return null ;return new Option
();return Collections . emptyList();return Collections . emptyMap( );

static fallback

  • 降级方法中返回一个默认值。
return true;return DEFAULT_OBJECT;

fallback by network

  • 主服务挂了,调用辅助服务的一个降级方法(需通过网络传输请求了)

primary + sencondary with fallback

主次降级。将新功能封装在老功能后面。

请求合并

将请求通过时间窗口都合并在一个队列中。

请求缓存

在以下几种情况中,hystrix会调用fallback降级机制

  • run()或construct()抛出一个异常
  • 短路器打开
  • 线程池/队列/semaphore满了
  • command执行超时了

一般在降级机制中,都建议给出一些默认的返回值,比如静态的一些代码逻辑,或者从内存中的缓存中提取一些数据,尽量在这里不要再进行网络请求了

即使在降级中,一定要进行网络调用,也应该将那个调用放在一个HystrixCommand中,进行隔离

  • 在HystrixCommand中,实现getFallback()方法,可以提供降级机制
  • 在HystirxObservableCommand中,实现一个resumeWithFallback()方法,返回一个Observable对象,可以提供降级结果

如果fallback返回了结果,那么hystrix就会返回这个结果

  • 对于HystrixCommand,会返回一个Observable对象,其中会发返回对应的结果
  • 对于HystrixObservableCommand,会返回一个原始的Observable对象

如果没有实现fallback,或者是fallback抛出了异常,Hystrix会返回一个Observable,但是不会返回任何数据

不同的command执行方式,其fallback为空或者异常时的返回结果不同

  • 对于execute(),直接抛出异常
  • 对于queue(),返回一个Future,调用get()时抛出异常
  • 对于observe(),返回一个Observable对象,但是调用subscribe()方法订阅它时,理解抛出调用者的onError方法
  • 对于toObservable(),返回一个Observable对象,但是调用subscribe()方法订阅它时,理解抛出调用者的onError方法

9 不同的执行方式

  • execute(),获取一个Future.get(),然后拿到单个结果
  • queue(),返回一个Future
  • observer(),立即订阅Observable,然后启动8大执行步骤,返回一个拷贝的Observable,订阅时理解回调给你结果
  • toObservable(),返回一个原始的Observable,必须手动订阅才会去执行8大步骤

转载地址:https://javaedge.blog.csdn.net/article/details/95924436 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:高可用后端架构设计实战-利用request cache请求缓存优化批量查询接口
下一篇:高可用架构(10)-Hystrix隔离策略、Command及资源池大小控制

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年05月02日 09时02分28秒