A Brief Look at the Hystrix Fallback Mechanism

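Hystrix is a powerful tool for building resilient backend architectures and is very widely used. I recently came across the framework while writing code, so I spent some spare time skimming its source. There is a lot in it worth learning, so I plan to write a few articles to record what I found.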

Hystrix has many features. This article starts with one of the simpler ones: how fallback is actually implemented.

How to use it

Let's first look at a concrete usage scenario for fallback.

@HystrixCommand(
    fallbackMethod = "fallbackFunc",
    commandProperties = {
        // Beyond this time the HystrixCommand is marked as TIMEOUT and the fallback logic runs; default 1000 ms
        @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "500"),
        // How long the open circuit rejects requests before a trial request decides whether it stays open; default 5000 ms
        @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "60000"),
        // Error percentage at which the circuit opens and the fallback logic kicks in; default 50%
        @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "50")
    },
    threadPoolProperties = {
        @HystrixProperty(name = "coreSize", value = "200")
    })
@Override
public Map<String, Boolean> someFunc() {

    .......
}

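As you can see, once a method carries the HystrixCommand annotation, Hystrix monitors it. The annotation has a parameter named fallbackMethod: when the annotated method fails, for example by throwing an exception or timing out, Hystrix calls the fallbackMethod, which is fallbackFunc in the example above. The HystrixCommand annotation has many other parameters, and they are where the core of Hystrix's functionality lies; for details on them, see this article.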
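To make the semantics concrete, here is a minimal plain-Java sketch of what the annotation buys you, ignoring timeouts, thread pools and circuit breaking. It does not use the Hystrix API at all: callWithFallback is a hypothetical helper, and the bodies of someFunc and fallbackFunc are made up for illustration.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

class FallbackSemantics {
    // Hypothetical helper: run the primary call, and switch to the fallback if it throws.
    static <T> T callWithFallback(Supplier<T> primary, Supplier<T> fallback) {
        try {
            return primary.get();
        } catch (RuntimeException e) {
            return fallback.get();
        }
    }

    // Stands in for the annotated method; here it always fails.
    static Map<String, Boolean> someFunc() {
        throw new IllegalStateException("downstream unavailable");
    }

    // Stands in for the method named by fallbackMethod; returns a safe default.
    static Map<String, Boolean> fallbackFunc() {
        return Collections.singletonMap("available", Boolean.FALSE);
    }

    public static void main(String[] args) {
        Map<String, Boolean> result =
                callWithFallback(FallbackSemantics::someFunc, FallbackSemantics::fallbackFunc);
        System.out.println(result); // prints {available=false}
    }
}
```

The rest of this article is about how Hystrix gets from the annotation to the equivalent of that catch block.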

Source code analysis

Since this is an annotation, and we are in Spring MVC, AOP naturally comes to mind. A look through the source confirms it: there is a HystrixCommandAspect class.

@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}

@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}

@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
    Method method = getMethodFromTarget(joinPoint);
    Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
    if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
        throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                "annotations at the same time");
    }
    MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
    MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
    ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
            metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

    Object result;
    try {
        if (!metaHolder.isObservable()) {
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } else {
            result = executeObservable(invokable, executionType, metaHolder);
        }
    } catch (HystrixBadRequestException e) {
        throw e.getCause() != null ? e.getCause() : e;
    } catch (HystrixRuntimeException e) {
        throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
    }
    return result;
}

The pointcuts target two annotations, one of which is the HystrixCommand we used.
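If the idea of an around advice keyed on an annotation is unfamiliar, the following sketch imitates it with a JDK dynamic proxy instead of AspectJ. Everything in it is an assumption made for illustration: @Guarded is a hypothetical stand-in for @HystrixCommand, Service is a made-up interface, and where the real aspect builds and executes a command, the handler only records the method name.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class AroundAdviceSketch {
    // Hypothetical marker annotation standing in for @HystrixCommand.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Guarded {}

    interface Service {
        @Guarded
        String load(String key);

        String ping();
    }

    // Names of the methods that went through the "advice", in call order.
    static final List<String> intercepted = new ArrayList<>();

    static Service wrap(Service target) {
        return (Service) Proxy.newProxyInstance(
                Service.class.getClassLoader(),
                new Class<?>[] {Service.class},
                (proxy, method, args) -> {
                    if (method.isAnnotationPresent(Guarded.class)) {
                        // The real aspect would build and execute a command here.
                        intercepted.add(method.getName());
                    }
                    try {
                        return method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        throw e.getCause(); // surface the target's own exception
                    }
                });
    }

    public static void main(String[] args) {
        Service proxied = wrap(new Service() {
            public String load(String key) { return "value-of-" + key; }
            public String ping() { return "pong"; }
        });
        System.out.println(proxied.load("a")); // prints value-of-a
        System.out.println(proxied.ping());    // prints pong
        System.out.println(intercepted);       // prints [load]
    }
}
```

Only the annotated method passes through the extra logic, which is the same selection the two @Pointcut expressions perform.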

For the actual weaving logic, look at this part:

MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
        metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

Object result;
try {
    if (!metaHolder.isObservable()) {
        result = CommandExecutor.execute(invokable, executionType, metaHolder);
    } else {
        result = executeObservable(invokable, executionType, metaHolder);
    }
} catch (HystrixBadRequestException e) {
    throw e.getCause() != null ? e.getCause() : e;
} catch (HystrixRuntimeException e) {
    throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;

  1. Create a metaHolder from the joinPoint; the metaHolder stores the information related to the annotated method.
  2. Create an invokable from the metaHolder; this invokable is our command.
  3. Get an executionType according to the type of annotation.
  4. Check whether the method's return type is Observable, and execute according to the return type.

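The fourth point is an interesting one: there is RxJava code in here. In fact, if you have read the source you will know that Hystrix is deeply integrated with RxJava internally, which was a pleasant surprise for me as an Android developer. My earlier blog posts include a few articles on RxJava, too.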

Let's follow the case where the return value is not an Observable, that is, this line:

result = CommandExecutor.execute(invokable, executionType, metaHolder);

public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
    Validate.notNull(invokable);
    Validate.notNull(metaHolder);

    switch (executionType) {
        case SYNCHRONOUS: {
            return castToExecutable(invokable, executionType).execute();
        }
        case ASYNCHRONOUS: {
            HystrixExecutable executable = castToExecutable(invokable, executionType);
            if (metaHolder.hasFallbackMethodCommand()
                    && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                return new FutureDecorator(executable.queue());
            }
            return executable.queue();
        }
        case OBSERVABLE: {
            HystrixObservable observable = castToObservable(invokable);
            return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
        }
        default:
            throw new RuntimeException("unsupported execution type: " + executionType);
    }
}

Execution is dispatched on executionType. Let's look at the synchronous case:

private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
    if (invokable instanceof HystrixExecutable) {
        return (HystrixExecutable) invokable;
    }
    throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
}

It is really quite simple: the invokable is cast to HystrixExecutable, and the command's execute method is called.

The command is the core concept in Hystrix, and its base class is AbstractCommand. Let's look at the execute method of its subclass HystrixCommand:

public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

It calls the queue method, and the most important line in queue is:

final Future<R> delegate = toObservable().toBlocking().toFuture();

RxJava shows up again; the integration really is deep.
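The pattern here, a synchronous execute() built as "start asynchronously, then block on the Future", is easy to reproduce without RxJava. The sketch below uses CompletableFuture in place of toObservable().toBlocking().toFuture(); the class BlockingOverAsync and its exception handling are my own assumptions for illustration, not Hystrix code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

class BlockingOverAsync<R> {
    private final Supplier<R> work;

    BlockingOverAsync(Supplier<R> work) {
        this.work = work;
    }

    // Asynchronous entry point: start the work and hand back a Future.
    Future<R> queue() {
        return CompletableFuture.supplyAsync(work);
    }

    // Synchronous entry point: the asynchronous path plus a blocking get().
    R execute() {
        try {
            return queue().get();
        } catch (ExecutionException e) {
            // Unwrap so the caller sees the original failure, not the Future's wrapper.
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(new BlockingOverAsync<Integer>(() -> 21 * 2).execute()); // prints 42
    }
}
```

Having one asynchronous core and deriving the blocking call from it means the fallback logic only has to live in one place.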

The code of toObservable is very long, but if you know some RxJava its logic is quite clear. Here are the key parts:

return Observable.defer(new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
        ......
        Observable<R> hystrixObservable =
                Observable.defer(applyHystrixSemantics)
                        .map(wrapWithAllOnNextHooks);

        Observable<R> afterCache;

        // put in cache
        if (requestCacheEnabled && cacheKey != null) {
            // wrap it for caching
            HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
            HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
            if (fromCache != null) {
                // another thread beat us so we'll use the cached value instead
                toCache.unsubscribe();
                isResponseFromCache = true;
                return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
            } else {
                // we just created an ObservableCommand so we cast and return it
                afterCache = toCache.toObservable();
            }
        } else {
            afterCache = hystrixObservable;
        }

        return afterCache
                .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                .doOnCompleted(fireOnCompletedHook);
    }
});

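defer is used for lazy binding, and there is some cache-handling logic as well. This article does not cover caching, so afterCache is simply hystrixObservable. Finally, doOnTerminate, doOnUnsubscribe and doOnCompleted do some cleanup work.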

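This shows how Hystrix combines with RxJava internally to do asynchronous work. The logic is very clear, and I have to give RxJava some credit here: it really is pleasant to use!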

Next, let's look at the applyHystrixSemantics observable.

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // mark that we're starting execution on the ExecutionHook
    // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
    executionHook.onStart(_cmd);

    /* determine if we're allowed to execute */
    if (circuitBreaker.attemptExecution()) {
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };

        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };

        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        return handleShortCircuitViaFallback();
    }
}

This is RxJava code again. The key part is:

return executeCommandAndObserve(_cmd)
        .doOnError(markExceptionThrown)
        .doOnTerminate(singleSemaphoreRelease)
        .doOnUnsubscribe(singleSemaphoreRelease);
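One detail worth pausing on is that singleSemaphoreRelease is registered twice, on terminate and on unsubscribe, yet the permit must only be returned once. The sketch below shows that release-once guard with a plain java.util.concurrent.Semaphore. SemaphoreGuardSketch and its run method are hypothetical names of mine, and calling the release action twice in finally is only a stand-in for the two RxJava callbacks.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

class SemaphoreGuardSketch {
    private final Semaphore permits;

    SemaphoreGuardSketch(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    <T> T run(Supplier<T> work, Supplier<T> rejectionFallback) {
        if (!permits.tryAcquire()) {
            // No permit left: reject immediately and use the fallback.
            return rejectionFallback.get();
        }
        AtomicBoolean released = new AtomicBoolean(false);
        Runnable releaseOnce = () -> {
            // Only the first caller flips the flag, so the permit is returned once.
            if (released.compareAndSet(false, true)) {
                permits.release();
            }
        };
        try {
            return work.get();
        } finally {
            releaseOnce.run(); // stands in for doOnTerminate
            releaseOnce.run(); // stands in for doOnUnsubscribe; a no-op the second time
        }
    }

    public static void main(String[] args) {
        SemaphoreGuardSketch guard = new SemaphoreGuardSketch(1);
        // The inner call runs while the outer one still holds the only permit.
        String result = guard.run(
                () -> "outer+" + guard.run(() -> "inner", () -> "rejected"),
                () -> "rejected");
        System.out.println(result);                   // prints outer+rejected
        System.out.println(guard.availablePermits()); // prints 1
    }
}
```

Without the AtomicBoolean, the second release would hand back a permit that was never taken and quietly raise the concurrency limit.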

We won't go through these one by one, since the names say it all. executeCommandAndObserve performs the actual method execution:

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

    .....

    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            circuitBreaker.markNonSuccess();
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                /*
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */

                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }

                return handleFailureViaFallback(e);
            }
        }
    };

    final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
        @Override
        public void call(Notification<? super R> rNotification) {
            setRequestContextIfNeeded(currentRequestContext);
        }
    };

    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
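The shape of handleFallback above, "on error, pick a recovery path by exception type, or let the error through", can be tried out with CompletableFuture.exceptionally as a rough analogue of onErrorResumeNext. This is only a sketch under my own assumptions: the fallback strings are invented, and IllegalArgumentException stands in for HystrixBadRequestException, the one case that is propagated instead of recovered.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;

class ErrorDispatchSketch {
    // Maps a failure to a fallback value, or rethrows for "bad request" style errors.
    static String handleFallback(Throwable t) {
        // Failures from upstream stages arrive wrapped in CompletionException.
        Throwable cause = (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
        if (cause instanceof RejectedExecutionException) {
            return "fallback after rejection";
        } else if (cause instanceof TimeoutException) {
            return "fallback after timeout";
        } else if (cause instanceof IllegalArgumentException) {
            // Stand-in for HystrixBadRequestException: no fallback, propagate the error.
            throw new CompletionException(cause);
        } else {
            return "fallback after failure";
        }
    }

    // Attaches the recovery step and waits for the final value.
    static String resolve(CompletableFuture<String> execution) {
        return execution.exceptionally(ErrorDispatchSketch::handleFallback).join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> timedOut = new CompletableFuture<>();
        timedOut.completeExceptionally(new TimeoutException());
        System.out.println(resolve(timedOut));                                // prints fallback after timeout
        System.out.println(resolve(CompletableFuture.completedFuture("ok"))); // prints ok
    }
}
```

A successful execution never reaches the handler, which matches how onErrorResumeNext only takes over once the upstream signals an error.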

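The call chain on execution includes onErrorResumeNext, with handleFallback passed in, and handleFallback deals with the specific kinds of error, such as timeouts. Speaking of timeouts, let's look at the related code. When execution is built, a HystrixObservableTimeoutOperator is applied: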

@Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
    final CompositeSubscription s = new CompositeSubscription();
    // if the child unsubscribes we unsubscribe our parent as well
    child.add(s);

    //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later
    final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();

    TimerListener listener = new TimerListener() {

        @Override
        public void tick() {
            // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
            // otherwise it means we lost a race and the run() execution completed or did not start
            if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                // report timeout failure
                originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

                // shut down the original request
                s.unsubscribe();

                final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

                    @Override
                    public void run() {
                        child.onError(new HystrixTimeoutException());
                    }
                });


                timeoutRunnable.run();
                //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
            }
        }

        @Override
        public int getIntervalTimeInMilliseconds() {
            return originalCommand.properties.executionTimeoutInMilliseconds().get();
        }
    };

    ......
}

Its call method sets up the timing; on timeout, it signals a HystrixTimeoutException to the subscriber through onError.
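The interesting part of tick() is the compareAndSet: the timer and the normal completion race for the same state, and whichever moves it away from NOT_EXECUTED first wins. Below is a small deterministic sketch of that race without any real timer. NOT_EXECUTED and TIMED_OUT come from the excerpt; the COMPLETED state, the markCompleted method and the class name are assumptions of mine.

```java
import java.util.concurrent.atomic.AtomicReference;

class TimeoutRaceSketch {
    // COMPLETED is assumed here; the excerpt only shows NOT_EXECUTED and TIMED_OUT.
    enum Status { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.NOT_EXECUTED);

    // Called by the timer. Returns true only if the timeout won the race.
    boolean tick() {
        return status.compareAndSet(Status.NOT_EXECUTED, Status.TIMED_OUT);
    }

    // Called when the work finishes. Returns true only if completion won the race.
    boolean markCompleted() {
        return status.compareAndSet(Status.NOT_EXECUTED, Status.COMPLETED);
    }

    Status status() {
        return status.get();
    }

    public static void main(String[] args) {
        TimeoutRaceSketch slow = new TimeoutRaceSketch();
        System.out.println(slow.tick());          // prints true: the timer fired first
        System.out.println(slow.markCompleted()); // prints false: the late result is ignored
        System.out.println(slow.status());        // prints TIMED_OUT

        TimeoutRaceSketch fast = new TimeoutRaceSketch();
        System.out.println(fast.markCompleted()); // prints true: finished in time
        System.out.println(fast.tick());          // prints false: the timer lost the race
        System.out.println(fast.status());        // prints COMPLETED
    }
}
```

Because both sides go through the same atomic transition, a command can never be reported as both timed out and completed.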

Now let's look at handleTimeoutViaFallback, which handles HystrixTimeoutException.

private Observable<R> handleTimeoutViaFallback() {
    return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
}

It calls getFallbackOrThrowException, which in turn calls getFallbackAction.

protected CommandAction getFallbackAction() {
    return commandActions.getFallbackAction();
}

How does commandActions get passed in? Back to the aspect class:

HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);

The command is created by a factory class.

public HystrixInvokable create(MetaHolder metaHolder) {
    HystrixInvokable executable;
    if (metaHolder.isCollapserAnnotationPresent()) {
        executable = new CommandCollapser(metaHolder);
    } else if (metaHolder.isObservable()) {
        executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
    } else {
        executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
    }
    return executable;
}

The last else branch is where a new command is created. Its base-class constructor looks like this:

protected AbstractHystrixCommand(HystrixCommandBuilder builder) {
    super(builder.getSetterBuilder().build());
    this.commandActions = builder.getCommandActions();
    this.collapsedRequests = builder.getCollapsedRequests();
    this.cacheResultInvocationContext = builder.getCacheResultInvocationContext();
    this.cacheRemoveInvocationContext = builder.getCacheRemoveInvocationContext();
    this.ignoreExceptions = builder.getIgnoreExceptions();
    this.executionType = builder.getExecutionType();
}

The constructor takes commandActions from the builder. So how is the HystrixCommandBuilder created?

public <ResponseType> HystrixCommandBuilder create(MetaHolder metaHolder, Collection<HystrixCollapser.CollapsedRequest<ResponseType, Object>> collapsedRequests) {
    validateMetaHolder(metaHolder);

    return HystrixCommandBuilder.builder()
            .setterBuilder(createGenericSetterBuilder(metaHolder))
            .commandActions(createCommandActions(metaHolder))
            .collapsedRequests(collapsedRequests)
            .cacheResultInvocationContext(createCacheResultInvocationContext(metaHolder))
            .cacheRemoveInvocationContext(createCacheRemoveInvocationContext(metaHolder))
            .ignoreExceptions(metaHolder.getCommandIgnoreExceptions())
            .executionType(metaHolder.getExecutionType())
            .build();
}

The commandActions in the builder are created from the metaHolder:

private CommandActions createCommandActions(MetaHolder metaHolder) {
    CommandAction commandAction = createCommandAction(metaHolder);
    CommandAction fallbackAction = createFallbackAction(metaHolder);
    return CommandActions.builder().commandAction(commandAction)
            .fallbackAction(fallbackAction).build();
}

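createFallbackAction uses the metaHolder to check whether the annotation on the annotated method specifies a fallbackMethod (fallbackFunc in the first section); if it does, that method is passed in as the fallback action.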
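I did not paste createFallbackAction itself, but the idea of resolving a fallback from a method name stored in an annotation can be sketched with plain reflection. Note that this is not the javanica implementation: @WithFallback, Service and findFallback are hypothetical names, and I simply assume the fallback lives in the same class and takes the same parameters.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Optional;

class FallbackLookupSketch {
    // Hypothetical annotation standing in for @HystrixCommand(fallbackMethod = ...).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface WithFallback {
        String fallbackMethod() default "";
    }

    static class Service {
        @WithFallback(fallbackMethod = "fallbackFunc")
        public String someFunc(String id) {
            throw new IllegalStateException("primary failed");
        }

        public String fallbackFunc(String id) {
            return "default for " + id;
        }

        @WithFallback
        public String noFallback(String id) {
            return id;
        }
    }

    // Resolves the fallback named by the annotation, if one is configured.
    static Optional<Method> findFallback(Method annotated) throws NoSuchMethodException {
        WithFallback meta = annotated.getAnnotation(WithFallback.class);
        if (meta == null || meta.fallbackMethod().isEmpty()) {
            return Optional.empty();
        }
        // Assumption: same declaring class, same parameter types as the primary method.
        return Optional.of(annotated.getDeclaringClass()
                .getMethod(meta.fallbackMethod(), annotated.getParameterTypes()));
    }

    public static void main(String[] args) throws Exception {
        Method primary = Service.class.getMethod("someFunc", String.class);
        Method fallback = findFallback(primary).orElseThrow(IllegalStateException::new);
        System.out.println(fallback.invoke(new Service(), "42")); // prints default for 42
    }
}
```

Resolving the fallback up front, when the command is built, means the error path only has to invoke an action that is already prepared.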

Summary

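That completes our analysis of the fallback mechanism in Hystrix. In short, Hystrix uses RxJava for the asynchronous execution, reads the fallbackMethod from the annotation, and calls that method once a failure has been caught.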