Hystrix
<h1>高可用架构</h1>
<h2>1 Hystrix介绍</h2>
<h3>1.1 Hystrix 是什么?</h3>
<p>在分布式系统中,每个服务都可能会调用很多其他服务,被调用的那些服务就是依赖服务,有的时候某些依赖服务
出现故障也是很正常的。</p>
<p>Hystrix 可以让我们在分布式系统中对服务间的调用进行控制,加入一些调用延迟或者依赖故障的容错机制。</p>
<p>Hystrix 通过将依赖服务进行资源隔离,进而阻止某个依赖服务出现故障时在整个系统所有的依赖服务调用中进行
蔓延;同时Hystrix 还提供故障时的 fallback 降级机制。</p>
<p>总而言之,Hystrix 通过这些方法帮助我们提升分布式系统的可用性和稳定性。</p>
<h3>1.2 Hystrix的设计原则</h3>
<ul>
<li>对依赖服务调用时出现的调用延迟和调用失败进行控制和容错保护。</li>
<li>在复杂的分布式系统中,阻止某一个依赖服务的故障在整个系统中蔓延。比如某一个服务故障了,导致其它服务
也跟着故障。</li>
<li>提供 fail-fast(快速失败)和快速恢复的支持。</li>
<li>提供 fallback 优雅降级的支持。</li>
<li>支持近实时的监控、报警以及运维操作。</li>
</ul>
<p>举个栗子。</p>
<p>有这样一个分布式系统,服务 A 依赖于服务 B,服务 B 依赖于服务 C/D/E。在这样一个成熟的系统内,比如说最
多可能只有 100 个线程资源。正常情况下,40 个线程并发调用服务 C,各 30 个线程并发调用 D/E。</p>
<p>调用服务 C,只需要 20ms,现在因为服务 C 故障了,比如延迟,或者挂了,此时线程会 hang 住 2s 左右。40
个线程全部被卡住,由于请求不断涌入,其它的线程也用来调用服务 C,
同样也会被卡住。这样导致服务 B 的线程资源被耗尽,无法接收新的请求,甚至可能因为大量线程不断的运转,
导致自己宕机。服务 A 也挂。
<img src="https://s2.ax1x.com/2019/12/17/QIUaCT.png" alt="资源隔离示例图" />
Hystrix 可以对其进行资源隔离,比如限制服务 B 只有 40 个线程调用服务 C。当此 40 个线程被 hang 住时,
其它 60 个线程依然能正常调用工作。从而确保整个系统不会被拖垮。</p>
<h3>1.3 Hystrix 更加细节的设计原则</h3>
<ul>
<li>阻止任何一个依赖服务耗尽所有的资源,比如 tomcat 中的所有线程资源。</li>
<li>避免请求排队和积压,采用限流和 fail fast 来控制故障。</li>
<li>提供 fallback 降级机制来应对故障。</li>
<li>使用资源隔离技术,比如 bulkhead(舱壁隔离技术)、swimlane(泳道技术)、circuit breaker(断路技术)
来限制任何一个依赖服务的故障的影响。</li>
<li>通过近实时的统计/监控/报警功能,来提高故障发现的速度。</li>
<li>通过近实时的属性和配置热修改功能,来提高故障处理和恢复的速度。</li>
<li>保护依赖服务调用的所有故障情况,而不仅仅只是网络故障情况。
<h2>2 基于 Hystrix 线程池技术实现资源隔离</h2>
<p>Hystrix 里面核心的一项功能,其实就是所谓的资源隔离,要解决的最最核心的问题,就是将多个依赖服务的调
用分别隔离到各自的资源池内,不会去用其它资源。避免说对某一个依赖服务的调用,因为依赖服务的接口调用的延迟或者失败,导
致服务所有的线程资源全部耗费在这个服务的接口调用上。一旦说某个服务的线程资源全部耗尽的话,就可能导
致服务崩溃,甚至说这种故障会不断蔓延。</p></li>
</ul>
<p>比如说商品服务,现在同时发起的调用量已经到了 1000,但是线程池内就 10 个线程,最多就只会用这 10 个线
程去执行,不会说,对商品服务的请求,因为接口调用延时,将 tomcat 内部所有的线程资源全部耗尽。</p>
<p>Hystrix 实现资源隔离,主要有两种技术:</p>
<ul>
<li>线程池</li>
<li>信号量</li>
</ul>
<p>默认情况下,Hystrix 使用线程池模式。这里讲解的是线程池隔离技术。</p>
<p>Hystrix 进行资源隔离,其实是提供了一个抽象,叫做 command。这也是 Hystrix 最最基本的资源隔离技术。</p>
<h3>2.1 利用 HystrixCommand 获取单条数据</h3>
<p>我们通过将调用商品服务的操作封装在 HystrixCommand 中,限定一个 key,比如下面的
GetProductInfoCommandGroup,在这里我们可以简单认为这是一个线程池,每次调用商品服务,就只会用该线程
池中的资源,不会再去用其它线程资源了。</p>
<pre><code class="language-java">public class GetProductInfoCommand extends HystrixCommand<ProductInfo> {
private Long productId;
public GetProductInfoCommand(Long productId) {
super(HystrixCommandGroupKey.Factory.asKey("GetProductInfoCommandGroup"));
this.productId = productId;
}
@Override
protected ProductInfo run() {
String url = "http://localhost:8081/getProductInfo?productId=" + productId;
// 调用商品服务接口
String response = HttpClientUtils.sendGetRequest(url);
return JSONObject.parseObject(response, ProductInfo.class);
}
}</code></pre>
<p>我们在缓存服务接口中,根据 productId 创建 command 并执行,获取到商品数据。</p>
<pre><code>@RequestMapping("/getProductInfo")
@ResponseBody
public String getProductInfo(Long productId) {
HystrixCommand<ProductInfo> getProductInfoCommand = new GetProductInfoCommand(productId);
// 通过command执行,获取最新商品数据
ProductInfo productInfo = getProductInfoCommand.execute();
System.out.println(productInfo);
return "success";
}</code></pre>
<p>上面执行的是 execute() 方法,其实是同步的。也可以对 command 调用 queue() 方法,它仅仅是将 command
放入线程池的一个等待队列,就立即返回,拿到一个 Future 对象,后面可以继续做其它一些事情,然后过一段
时间对 Future 调用 get() 方法获取数据。这是异步的。</p>
<h3>2.2 利用 HystrixObservableCommand 批量获取数据</h3>
<p>只要是获取商品数据,全部都绑定到同一个线程池里面去,我们通过 HystrixObservableCommand 的一个线程去
执行,而在这个线程里面,批量把多个 productId 的 productInfo 拉回来。</p>
<pre><code class="language-java">public class GetProductInfosCommand extends HystrixObservableCommand<ProductInfo> {
private String[] productIds;
public GetProductInfosCommand(String[] productIds) {
// 还是绑定在同一个线程池
super(HystrixCommandGroupKey.Factory.asKey("GetProductInfoGroup"));
this.productIds = productIds;
}
@Override
protected Observable<ProductInfo> construct() {
return Observable.unsafeCreate((Observable.OnSubscribe<ProductInfo>) subscriber -> {
for (String productId : productIds) {
// 批量获取商品数据
String url = "http://localhost:8081/getProductInfo?productId=" + productId;
String response = HttpClientUtils.sendGetRequest(url);
ProductInfo productInfo = JSONObject.parseObject(response, ProductInfo.class);
subscriber.onNext(productInfo);
}
subscriber.onCompleted();
}).subscribeOn(Schedulers.io());
}
}</code></pre>
<p>在缓存服务接口中,根据传来的 id 列表,比如是以 , 分隔的 id 串,通过上面的 HystrixObservableCommand,
执行 Hystrix 的一些 API 方法,获取到所有商品数据。</p>
<pre><code>public String getProductInfos(String productIds) {
String[] productIdArray = productIds.split(",");
HystrixObservableCommand<ProductInfo> getProductInfosCommand = new GetProductInfosCommand(productIdArray);
Observable<ProductInfo> observable = getProductInfosCommand.observe();
observable.subscribe(new Observer<ProductInfo>() {
@Override
public void onCompleted() {
System.out.println("获取完了所有的商品数据");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
/**
* 获取完一条数据,就回调一次这个方法
* @param productInfo
*/
@Override
public void onNext(ProductInfo productInfo) {
System.out.println(productInfo);
}
});
return "success";
}</code></pre>
<p>我们回过头来,看看 Hystrix 线程池技术是如何实现资源隔离的。
<img src="https://s2.ax1x.com/2019/12/17/QIUtU0.png" alt="线程池实现资源隔离" /></p>
<p>从 Nginx 开始,缓存都失效了,那么 Nginx 通过缓存服务去调用商品服务。缓存服务默认的线程大小是 10 个,
最多就只有 10 个线程去调用商品服务的接口。即使商品服务接口故障了,最多就只有 10 个线程会 hang 死在
调用商品服务接口的路上,缓存服务的 tomcat 内其它的线程还是可以用来调用其它的服务,干其它的事情。</p>
<h2>3 基于 Hystrix 信号量机制实现资源隔离</h2>
<h3>3.1 信号量机制</h3>
<p>信号量的资源隔离只是起到一个开关的作用,比如,服务 A 的信号量大小为 10,那么就是说它同时只允许有 10
个 tomcat 线程来访问服务 A,其它的请求都会被拒绝,从而达到资源隔离和限流保护的作用。
<img src="https://s2.ax1x.com/2019/12/17/QIU8Ds.png" alt="信号量机制" /></p>
<h3>3.2 线程池与信号量的区别</h3>
<p>线程池隔离技术,并不是说去控制类似 tomcat 这种 web 容器的线程。更加严格的意义上来说,Hystrix 的线
程池隔离技术,控制的是 tomcat 线程的执行。Hystrix 线程池满后,会确保说,tomcat 的线程不会因为依赖
服务的接口调用延迟或故障而被 hang 住,tomcat 其它的线程不会卡死,可以快速返回,然后支撑其它的事情。</p>
<p>线程池隔离技术,是用 Hystrix 自己的线程去执行调用;而信号量隔离技术,是直接让 tomcat 线程去调用依
赖服务。信号量隔离,只是一道关卡,信号量有多少,就允许多少个 tomcat 线程通过它,然后去执行。
<img src="https://s2.ax1x.com/2019/12/17/QIUlvQ.png" alt="线程池与信号量" /></p>
<h4>3.2.1 适用场景</h4>
<ul>
<li>线程池技术,适合绝大多数场景,比如说我们对依赖服务的网络请求的调用和访问、需要对调用的 timeout
进行控制(捕捉 timeout 超时异常)。</li>
<li>信号量技术,适合说你的访问不是对外部依赖的访问,而是对内部的一些比较复杂的业务逻辑的访问,并且
系统内部的代码,其实不涉及任何的网络请求,那么只要做信号量的普通限流就可以了,因为不需要去捕获
timeout 类似的问题。</li>
</ul>
<p>信号量隔离的优点:不用自己管理线程池啦,不用 care timeout 超时啦,也不需要进行线程的上下文切换啦。
信号量做隔离的话,性能相对来说会高一些。</p>
<h2>4 Hystrix 隔离策略细粒度控制</h2>
<p>对资源隔离这一块东西,其实可以做一定细粒度的一些控制。</p>
<ul>
<li>execution.isolation.strategy:
指定了 HystrixCommand.run() 的资源隔离策略:THREAD or SEMAPHORE,一种基于线程池,一种基于信号量。</li>
</ul>
<pre><code>// to use thread isolation
HystrixCommandProperties.Setter().withExecutionIsolationStrategy(ExecutionIsolationStrategy.THREAD)
// to use semaphore isolation
HystrixCommandProperties.Setter().withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)</code></pre>
<p>线程池机制,每个 command 运行在一个线程中,限流是通过线程池的大小来控制的;信号量机制,command 是
运行在调用线程中,通过信号量的容量来进行限流。Hystrix默认的策略就是线程池。</p>
<p>线程池机制其实最大的好处就是对于网络访问请求,如果有超时的话,可以避免调用线程阻塞住。</p>
<p>信号量机制通常是针对超大并发量的场景下,每个服务实例每秒都几百的 QPS,那么此时你用线程池的话,
线程一般不会太多,可能撑不住那么高的并发,如果要撑住,可能要耗费大量的线程资源,那么就是用信号量,
来进行限流保护。一般用信号量常见于那种基于纯内存的一些业务逻辑服务,而不涉及到任何网络访问请求。</p>
<ul>
<li>command key & command group:
使用线程池隔离,需要对依赖服务、依赖服务接口、线程池进行划分。</li>
</ul>
<p>每一个 command,都可以设置一个自己的名称 command key,同时可以设置一个自己的组 command group。</p>
<pre><code>private static final Setter cachedSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"));
public CommandHelloWorld(String name) {
super(cachedSetter);
this.name = name;
}</code></pre>
<p>command group 是一个非常重要的概念,默认情况下,就是通过 command group 来定义一个线程池的,而且还会
通过 command group 来聚合一些监控和报警信息。同一个 command group 中的请求,都会进入同一个线程池中。</p>
<ul>
<li>command thread pool</li>
</ul>
<p>ThreadPoolKey 代表了一个 HystrixThreadPool,用来进行统一监控、统计、缓存。默认的 ThreadPoolKey 就是
command group 的名称。每个 command 都会跟它的 ThreadPoolKey 对应的 ThreadPool 绑定在一起。如果不想
直接用 command group,也可以手动设置 ThreadPool 的名称。</p>
<pre><code>private static final Setter cachedSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool"));
public CommandHelloWorld(String name) {
super(cachedSetter);
this.name = name;
}</code></pre>
<ul>
<li>command key & command group & command thread pool</li>
</ul>
<p>command key ,代表了一类 command,一般来说,代表了底层的依赖服务的一个接口。</p>
<p>command group ,代表了某一个底层的依赖服务,这是很合理的,一个依赖服务可能会暴露出来多个接口,每个
接口就是一个 command key。command group 在逻辑上去组织起来一堆 command key 的调用、统计信息、成功
次数、timeout 超时次数、失败次数等,可以看到某一个服务整体的一些访问情况。一般来说,推荐根据一个服
务区划分出一个线程池,command key 默认都是属于同一个线程池的。</p>
<p>比如说你以一个服务为粒度,估算出来这个服务每秒的所有接口加起来的整体 QPS 在 100 左右,你调用这个服
务,当前这个服务部署了 10 个服务实例,每个服务实例上,其实用这个 command group 对应这个服务,给一个
线程池,量大概在 10 个左右就可以了,你对整个服务的整体的访问 QPS 就大概在每秒 100 左右。</p>
<p>但是,如果说 command group 对应了一个服务,而这个服务暴露出来的几个接口,访问量很不一样,差异非常之
大。你可能就希望在这个服务 command group 内部,包含的对应多个接口的 command key,做一些细粒度的资
源隔离。就是说,对同一个服务的不同接口,使用不同的线程池。</p>
<pre><code>command key -> command group
command key -> 自己的 thread pool key</code></pre>
<p>辑上来说,多个 command key 属于一个command group,在做统计的时候,会放在一起统计。每个 command key
有自己的线程池,每个接口有自己的线程池,去做资源隔离和限流。</p>
<p>说白点,就是说如果你的 command key 要用自己的线程池,可以定义自己的 thread pool key,就 ok 了。</p>
<ul>
<li>coreSize:设置线程池的大小,默认是 10。一般来说,用这个默认的 10 个线程大小就够了。
<pre><code>HystrixThreadPoolProperties.Setter().withCoreSize(int value);</code></pre></li>
<li>queueSizeRejectionThreshold
如果说线程池中的 10 个线程都在工作中,没有空闲的线程来做其它的事情,此时再有请求过来,会先进入队列
积压。如果说队列积压满了,再有请求过来,就直接 reject,拒绝请求,执行 fallback 降级的逻辑,快速返
回。因为 maxQueueSize 不允许热修改,因此提供这个参数可以热修改,控制队列的最大大小。
<pre><code>HystrixThreadPoolProperties.Setter().withQueueSizeRejectionThreshold(int value);</code></pre></li>
<li>execution.isolation.semaphore.maxConcurrentRequests
设置使用 SEMAPHORE 隔离策略的时候允许访问的最大并发量,超过这个最大并发量,请求直接被 reject。</li>
</ul>
<p>这个并发量的设置,跟线程池大小的设置,应该是类似的,但是基于信号量的话,性能会好很多,而且 Hystrix
框架本身的开销会小很多。</p>
<pre><code>HystrixCommandProperties.Setter().withExecutionIsolationSemaphoreMaxConcurrentRequests(int value);</code></pre>
<h2>5 深入 Hystrix 执行时内部原理</h2>
<p>前面我们了解了 Hystrix 最基本的支持高可用的技术:资源隔离 + 限流。</p>
<ul>
<li>创建 command;</li>
<li>执行这个 command;</li>
<li>配置这个 command 对应的 group 和线程池。</li>
</ul>
<p>这里,我们要讲一下,你开始执行这个 command,调用了这个 command 的 execute() 方法之后,Hystrix 底层
的执行流程和步骤以及原理是什么。具体流程图如下:
<img src="https://s2.ax1x.com/2019/12/17/QIUYEq.png" alt="command执行流程图" /></p>
<h3>5.1 创建command</h3>
<p>一个 HystrixCommand 或 HystrixObservableCommand 对象,代表了对某个依赖服务发起的一次请求或者调用。
创建的时候,可以在构造函数中传入任何需要的参数。</p>
<ul>
<li>HystrixCommand 主要用于仅仅会返回一个结果的调用。</li>
<li>HystrixObservableCommand 主要用于可能会返回多条结果的调用。</li>
</ul>
<pre><code>// 创建 HystrixCommand
HystrixCommand hystrixCommand = new HystrixCommand(arg1, arg2);
// 创建 HystrixObservableCommand
HystrixObservableCommand hystrixObservableCommand = new HystrixObservableCommand(arg1, arg2);</code></pre>
<h3>5.2 调用 command 执行方法</h3>
<p>执行 command,就可以发起一次对依赖服务的调用。</p>
<p>要执行 command,可以在 4 个方法中选择其中的一个:execute()、queue()、observe()、toObservable()。</p>
<p>其中 execute() 和 queue() 方法仅仅对 HystrixCommand 适用。</p>
<ul>
<li>execute():调用后直接 block 住,属于同步调用,直到依赖服务返回单条结果,或者抛出异常。</li>
<li>queue():返回一个 Future,属于异步调用,后面可以通过 Future 获取单条结果。</li>
<li>observe():订阅一个 Observable 对象,Observable 代表的是依赖服务返回的结果,获取到一个那个代表结
果的 Observable 对象的拷贝对象。</li>
<li>toObservable():返回一个 Observable 对象,如果我们订阅这个对象,就会执行 command 并且获取返回结果。</li>
</ul>
<pre><code>K value = hystrixCommand.execute();
Future<K> fValue = hystrixCommand.queue();
Observable<K> oValue = hystrixObservableCommand.observe();
Observable<K> toOValue = hystrixObservableCommand.toObservable();</code></pre>
<p>execute() 实际上会调用 queue().get() 方法,可以看一下 Hystrix 源码。</p>
<pre><code>public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}</code></pre>
<p>而在 queue() 方法中,会调用 toObservable().toBlocking().toFuture()。</p>
<pre><code>final Future<R> delegate = toObservable().toBlocking().toFuture();</code></pre>
<p>也就是说,先通过 toObservable() 获得 Future 对象,然后调用 Future 的 get() 方法。那么,其实无论是哪
种方式执行 command,最终都是依赖于 toObservable() 去执行的。</p>
<h3>5.3 检查是否开启缓存</h3>
<p>从这一步开始,就进入到 Hystrix 底层运行原理啦,看一下 Hystrix 一些更高级的功能和特性。</p>
<p>如果这个 command 开启了请求缓存 Request Cache,而且这个调用的结果在缓存中存在,那么直接从缓存中返
回结果。否则,继续往后的步骤。</p>
<h3>5.4 检查是否开启了断路器</h3>
<p>检查这个 command 对应的依赖服务是否开启了断路器。如果断路器被打开了,那么 Hystrix 就不会执行这个
command,而是直接去执行 fallback 降级机制,返回降级结果。</p>
<h3>5.5 检查线程池、队列/信号量是否已满</h3>
<p>如果这个 command 线程池和队列已满,或者 semaphore 信号量已满,那么也不会执行 command,而是直接去
调用 fallback 降级机制,同时发送 reject 信息给断路器统计。</p>
<h3>5.6 执行 command</h3>
<p>调用 HystrixObservableCommand 对象的 construct() 方法,或者 HystrixCommand 的 run() 方法来实际执
行这个 command。</p>
<ul>
<li>HystrixCommand.run() 返回单条结果,或者抛出异常。
<pre><code>// 通过command执行,获取最新一条商品数据
ProductInfo productInfo = getProductInfoCommand.execute();</code></pre></li>
<li>HystrixObservableCommand.construct() 返回一个 Observable 对象,可以获取多条结果。</li>
</ul>
<pre><code>Observable<ProductInfo> observable = getProductInfosCommand.observe();
// 订阅获取多条结果
observable.subscribe(new Observer<ProductInfo>() {
@Override
public void onCompleted() {
System.out.println("获取完了所有的商品数据");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
/**
* 获取完一条数据,就回调一次这个方法
*
* @param productInfo 商品信息
*/
@Override
public void onNext(ProductInfo productInfo) {
System.out.println(productInfo);
}
});</code></pre>
<p>如果是采用线程池方式,并且 HystrixCommand.run() 或者 HystrixObservableCommand.construct() 的执行时
间超过了 timeout 时长的话,那么 command 所在的线程会抛出一个 TimeoutException,这时会执行 fallback
降级机制,不会去管 run() 或 construct() 返回的值了。另一种情况,如果 command 执行出错抛出了其它异
常,那么也会走 fallback 降级。这两种情况下,Hystrix 都会发送异常事件给断路器统计。</p>
<p>注意,我们是不可能终止掉一个调用严重延迟的依赖服务的线程的,只能说给你抛出来一个 TimeoutException。</p>
<p>如果没有 timeout,也正常执行的话,那么调用线程就会拿到一些调用依赖服务获取到的结果,然后 Hystrix
也会做一些 logging 记录和 metric 度量统计。</p>
<h3>5.7 断路健康检查</h3>
<p>Hystrix 会把每一个依赖服务的调用成功、失败、Reject、Timeout 等事件发送给 circuit breaker 断路器。
断路器就会对这些事件的次数进行统计,根据异常事件发生的比例来决定是否要进行断路(熔断)。如果打开了
断路器,那么在接下来一段时间内,会直接断路,返回降级结果。</p>
<p>如果在之后,断路器尝试执行 command,调用没有出错,返回了正常结果,那么 Hystrix 就会把断路器关闭。</p>
<h3>5.8 调用 fallback 降级机制</h3>
<p>在以下几种情况中,Hystrix 会调用 fallback 降级机制。</p>
<ul>
<li>断路器处于打开状态;</li>
<li>线程池/队列/semaphore满了;</li>
<li>command 执行超时;</li>
<li>run() 或者 construct() 抛出异常。</li>
</ul>
<p>一般在降级机制中,都建议给出一些默认的返回值,比如静态的一些代码逻辑,或者从内存中的缓存中提取一些
数据,在这里尽量不要再进行网络请求了。</p>
<p>在降级中,如果一定要进行网络调用的话,也应该将那个调用放在一个 HystrixCommand 中进行隔离。</p>
<ul>
<li>HystrixCommand 中,实现 getFallback() 方法,可以提供降级机制。</li>
<li>HystrixObservableCommand 中,实现 resumeWithFallback() 方法,返回一个 Observable 对象,可以提供
降级结果。</li>
</ul>
<p>如果没有实现 fallback,或者 fallback 抛出了异常,Hystrix 会返回一个 Observable,但是不会返回任何数
据。不同的 command 执行方式,其 fallback 为空或者异常时的返回结果不同。</p>
<ul>
<li>对于 execute(),直接抛出异常。</li>
<li>对于 queue(),返回一个 Future,调用 get() 时抛出异常。</li>
<li>对于 observe(),返回一个 Observable 对象,但是调用 subscribe() 方法订阅它时,立即抛出调用者的
onError() 方法。</li>
<li>对于 toObservable(),返回一个 Observable 对象,但是调用 subscribe() 方法订阅它时,立即抛出调用者
的 onError() 方法。
<h3>5.9 不同的执行方式</h3></li>
<li>execute(),获取一个 Future.get(),然后拿到单个结果。</li>
<li>queue(),返回一个 Future。</li>
<li>observe(),立即订阅 Observable,然后启动 8 大执行步骤,返回一个拷贝的 Observable,订阅时立即回调给你结果。</li>
<li>toObservable(),返回一个原始的 Observable,必须手动订阅才会去执行 8 大步骤。
<h2>6 基于 request cache 请求缓存技术优化批量商品数据查询接口</h2>
<p>在一次请求上下文中,如果有多个 command,参数都是一样的,调用的接口也是一样的,而结果可以认为也是一
样的。那么这个时候,我们可以让第一个 command 执行返回的结果缓存在内存中,然后这个请求上下文后续的其
它对这个依赖的调用全部从内存中取出缓存结果就可以了。</p></li>
</ul>
<p>这样的话,好处在于不用在一次请求上下文中反复多次执行一样的 command,避免重复执行网络请求,提升整个
请求的性能。</p>
<p>举个栗子。比如说我们在一次请求上下文中,请求获取 productId 为 1 的数据,第一次缓存中没有,那么会从
商品服务中获取数据,返回最新数据结果,同时将数据缓存在内存中。后续同一次请求上下文中,如果还有获取
productId 为 1 的数据的请求,直接从缓存中取就好了。
<img src="https://s2.ax1x.com/2019/12/17/QIUGbn.png" alt="请求缓存" />
HystrixCommand 和 HystrixObservableCommand 都可以指定一个缓存 key,然后 Hystrix 会自动进行缓存,接
着在同一个 request context 内,再次访问的话,就会直接取用缓存。</p>
<h3>6.1 实现 Hystrix 请求上下文过滤器并注册</h3>
<p>定义 HystrixRequestContextFilter 类,实现 Filter 接口。</p>
<pre><code class="language-java">/**
* Hystrix 请求上下文过滤器
*/
public class HystrixRequestContextFilter implements Filter {
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
filterChain.doFilter(servletRequest, servletResponse);
} catch (IOException | ServletException e) {
e.printStackTrace();
} finally {
context.shutdown();
}
}
@Override
public void destroy() {
}
}</code></pre>
<p>然后将该 filter 对象注册到 SpringBoot Application 中。</p>
<pre><code class="language-java">@SpringBootApplication
public class EshopApplication {
public static void main(String[] args) {
SpringApplication.run(EshopApplication.class, args);
}
@Bean
public FilterRegistrationBean filterRegistrationBean() {
FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean(new HystrixRequestContextFilter());
filterRegistrationBean.addUrlPatterns("/*");
return filterRegistrationBean;
}
}</code></pre>
<h3>6.2 command 重写 getCacheKey() 方法</h3>
<p>在 GetProductInfoCommand 中,重写 getCacheKey() 方法,这样的话,每一次请求的结果,都会放在 Hystrix
请求上下文中。下一次同一个 productId 的数据请求,直接取缓存,无须再调用 run() 方法。</p>
<pre><code class="language-java">public class GetProductInfoCommand extends HystrixCommand<ProductInfo> {
private Long productId;
private static final HystrixCommandKey KEY = HystrixCommandKey.Factory.asKey("GetProductInfoCommand");
public GetProductInfoCommand(Long productId) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ProductInfoService"))
.andCommandKey(KEY));
this.productId = productId;
}
@Override
protected ProductInfo run() {
String url = "http://localhost:8081/getProductInfo?productId=" + productId;
String response = HttpClientUtils.sendGetRequest(url);
System.out.println("调用接口查询商品数据,productId=" + productId);
return JSONObject.parseObject(response, ProductInfo.class);
}
/**
* 每次请求的结果,都会放在Hystrix绑定的请求上下文上
*
* @return cacheKey 缓存key
*/
@Override
public String getCacheKey() {
return "product_info_" + productId;
}
/**
* 将某个商品id的缓存清空
*
* @param productId 商品id
*/
public static void flushCache(Long productId) {
HystrixRequestCache.getInstance(KEY,
HystrixConcurrencyStrategyDefault.getInstance()).clear("product_info_" + productId);
}
}</code></pre>
<h2>7 基于本地缓存的 fallback 降级机制</h2>
<p>Hystrix 出现以下四种情况,都会去调用 fallback 降级机制:</p>
<ul>
<li>断路器处于打开的状态。</li>
<li>资源池已满(线程池+队列 / 信号量)。</li>
<li>Hystrix 调用各种接口,或者访问外部依赖,比如 MySQL、Redis、Zookeeper、Kafka 等等,出现了任何异常
的情况。</li>
<li>访问外部依赖的时候,访问时间过长,报了 TimeoutException 异常。
<h3>7.1 两种最经典的降级机制</h3></li>
<li>纯内存数据:在降级逻辑中,你可以在内存中维护一个 ehcache,作为一个纯内存的基于 LRU 自动清理的缓存,让数据放在
缓存内。如果说外部依赖有异常,fallback 这里直接尝试从 ehcache 中获取数据。</li>
<li>默认值:fallback 降级逻辑中,也可以直接返回一个默认值。</li>
</ul>
<p>在 HystrixCommand,降级逻辑的书写,是通过实现 getFallback() 接口;而在 HystrixObservableCommand 中,
则是实现 resumeWithFallback() 方法。</p>
<h2>8 深入 Hystrix 断路器执行原理</h2>
<ul>
<li>RequestVolumeThreshold:
表示在滑动窗口中,至少有多少个请求,才可能触发断路。
<pre><code>HystrixCommandProperties.Setter()
.withCircuitBreakerRequestVolumeThreshold(int)</code></pre>
<p>*ErrorThresholdPercentage:表示异常(报错/超时/reject)比例达到多少,才会触发断路,默认值是 50(%)。</p>
<pre><code>HystrixCommandProperties.Setter()
.withCircuitBreakerErrorThresholdPercentage(int)</code></pre></li>
<li>SleepWindowInMilliseconds:表示断路器开启后多长时间内请求都走降级。
<pre><code>HystrixCommandProperties.Setter()
.withCircuitBreakerSleepWindowInMilliseconds(int)</code></pre>
<p>断路开启,也就是由 close 转换到 open 状态(close -> open)。那么之后在 SleepWindowInMilliseconds 时
间内,所有经过该断路器的请求全部都会被断路,不调用后端服务,直接走 fallback 降级机制。</p></li>
</ul>
<p>而在该参数时间过后,断路器会变为 half-open 半开闭状态,尝试让一条请求经过断路器,看能不能正常调用。
如果调用成功了,那么就自动恢复,断路器转为 close 状态。</p>
<ul>
<li>Enabled:控制是否允许断路器工作,包括跟踪依赖服务调用的健康状况,以及对异常情况过多时是否允许触
发断路。默认值是 true。</li>
<li>ForceOpen:设置为 true 的话,直接强迫打开断路器,相当于是手动断路了,手动降级,默认值是 false。
<pre><code>HystrixCommandProperties.Setter()
.withCircuitBreakerForceOpen(boolean)</code></pre>
<p>ForceClosed:设置为 true,直接强迫关闭断路器,相当于手动停止断路了,手动升级,默认值是 false。</p>
<pre><code>HystrixCommandProperties.Setter()
.withCircuitBreakerForceClosed(boolean)</code></pre>
<h2>9 深入 Hystrix 线程池隔离与接口限流</h2>
<p>Hystrix 通过判断线程池或者信号量是否已满,超出容量的请求,直接 Reject 走降级,从而达到限流的作用。</p></li>
</ul>
<p>限流是限制对后端的服务的访问量,比如说你对 MySQL、Redis、Zookeeper 以及其它各种后端中间件的资源的
访问的限制,其实是为了避免过大的流量直接打死后端的服务。</p>
<h3>9.1 线程池隔离技术的设计</h3>
<p>Hystrix 采用了 Bulkhead Partition 舱壁隔离技术,来将外部依赖进行资源隔离,进而避免任何外部依赖的故
障导致本服务崩溃。</p>
<p>舱壁隔离,是说将船体内部空间区隔划分成若干个隔舱,一旦某几个隔舱发生破损进水,水流不会在其间相互流
动,如此一来船舶在受损时,依然能具有足够的浮力和稳定性,进而减低立即沉船的危险。
<img src="https://s2.ax1x.com/2019/12/17/QIU3uj.jpg" alt="舱壁隔离" /></p>
<p>Hystrix 对每个外部依赖用一个单独的线程池,这样的话,如果对那个外部依赖调用延迟很严重,最多就是耗尽
那个依赖自己的线程池而已,不会影响其他的依赖调用。</p>
<h3>9.2 Hystrix 应用线程池机制的场景</h3>
<ul>
<li>每个服务都会调用几十个后端依赖服务,那些后端依赖服务通常是由很多不同的团队开发的。</li>
<li>每个后端依赖服务都会提供它自己的 client 调用库,比如说用 thrift 的话,就会提供对应的 thrift 依赖。</li>
<li>client 调用库随时会变更。</li>
<li>client 调用库随时可能会增加新的网络请求的逻辑。</li>
<li>client 调用库可能会包含诸如自动重试、数据解析、内存中缓存等逻辑。</li>
<li>client 调用库一般都对调用者来说是个黑盒,包括实现细节、网络访问、默认配置等等。</li>
<li>在真实的生产环境中,经常会出现调用者,突然间惊讶的发现,client 调用库发生了某些变化。</li>
<li>即使 client 调用库没有改变,依赖服务本身可能有会发生逻辑上的变化。</li>
<li>有些依赖的 client 调用库可能还会拉取其他的依赖库,而且可能那些依赖库配置的不正确。</li>
<li>大多数网络请求都是同步调用的。</li>
<li>调用失败和延迟,也有可能会发生在 client 调用库本身的代码中,不一定就是发生在网络请求中。</li>
</ul>
<p>简单来说,就是你必须默认 client 调用库很不靠谱,而且随时可能发生各种变化,所以就要用强制
隔离的方式来确保任何服务的故障不会影响当前服务。</p>
<h3>9.3 线程池机制的优点</h3>
<ul>
<li>任何一个依赖服务都可以被隔离在自己的线程池内,即使自己的线程池资源填满了,也不会影响任何其他的服
务调用。</li>
<li>服务可以随时引入一个新的依赖服务,因为即使这个新的依赖服务有问题,也不会影响其他任何服务的调用。</li>
<li>当一个故障的依赖服务重新变好的时候,可以通过清理掉线程池,瞬间恢复该服务的调用,而如果是 tomcat
线程池被占满,再恢复就很麻烦。</li>
<li>如果一个 client 调用库配置有问题,线程池的健康状况随时会报告,比如成功/失败/拒绝/超时的次数统计,
然后可以近实时热修改依赖服务的调用配置,而不用停机。</li>
<li>基于线程池的异步本质,可以在同步的调用之上,构建一层异步调用层。</li>
</ul>
<p>简单来说,最大的好处,就是资源隔离,确保说任何一个依赖服务故障,不会拖垮当前的这个服务。</p>
<h3>9.4 线程池机制的缺点</h3>
<ul>
<li>线程池机制最大的缺点就是增加了 CPU 的开销。除了 tomcat 本身的调用线程之外,还有 Hystrix 自己管理的
线程池。</li>
<li>每个 command 的执行都依托一个独立的线程,会进行排队,调度,还有上下文切换。</li>
</ul>
<p>我们可以用 Hystrix semaphore 技术来实现对某个依赖服务的并发访问量的限制,而不是通过线程池/队列的大
小来限制流量。</p>
<p>semaphore 技术可以用来限流和削峰,但是不能用来对调用延迟的服务进行 timeout 和隔离。</p>
<h2>10 基于 timeout 机制为服务接口调用超时提供安全保护</h2>
<ul>
<li>TimeoutMilliseconds</li>
</ul>
<p>在 Hystrix 中,我们可以手动设置 timeout 时长,如果一个 command 运行时间超过了设定的时长,那么就被认
为是 timeout,然后 Hystrix command 标识为 timeout,同时执行 fallback 降级逻辑。</p>
<p>TimeoutMilliseconds 默认值是 1000,也就是 1000ms。</p>
<pre><code>HystrixCommandProperties.Setter()
..withExecutionTimeoutInMilliseconds(int)</code></pre>
<ul>
<li>TimeoutEnabled:这个参数用于控制是否要打开 timeout 机制,默认值是 true。</li>
</ul>
<pre><code>HystrixCommandProperties.Setter()
.withExecutionTimeoutEnabled(boolean)</code></pre>