executeCommandAndObserve方法处理onerror异常。
return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext);
handleFallback方法处理执行过程中的各种异常
final Func1> handleFallback = new Func1 >() { @Override public Observable call(Throwable t) { Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e); if (e instanceof RejectedExecutionException) { return handleThreadPoolRejectionViaFallback(e); } else if (t instanceof HystrixTimeoutException) { return handleTimeoutViaFallback(); } else if (t instanceof HystrixBadRequestException) { return handleBadRequestByEmittingError(e); } else { /* * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException. */ if (e instanceof HystrixBadRequestException) { eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); return Observable.error(e); } return handleFailureViaFallback(e); } } };
handleThreadPoolRejectionViaFallback、handleTimeoutViaFallback、handleBadRequestByEmittingError、handleFailureViaFallback最终都会调用getFallbackOrThrowException来处理各种异常。
getFallbackOrThrowException方法执行fallback方法并返回结果,如果执行过程中异常,返回异常信息。
private ObservablegetFallbackOrThrowException(final AbstractCommand _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) { ... Observable fallbackExecutionChain; // acquire a permit if (fallbackSemaphore.tryAcquire()) { try { if (isFallbackUserDefined()) { executionHook.onFallbackStart(this); fallbackExecutionChain = getFallbackObservable(); } else { //same logic as above without the hook invocation fallbackExecutionChain = getFallbackObservable(); } } catch (Throwable ex) { //If hook or user-fallback throws, then use that as the result of the fallback lookup fallbackExecutionChain = Observable.error(ex); } return fallbackExecutionChain .doOnEach(setRequestContext) .lift(new FallbackHookApplication(_cmd)) .doOnNext(markFallbackEmit) .doOnCompleted(markFallbackCompleted) .onErrorResumeNext(handleFallbackError) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } else { return handleFallbackRejectionByEmittingError(); } } else { return handleFallbackDisabledByEmittingError(originalException, failureType, message); } } }
handleFallbackError方法,返回异常
final Func1> handleFallbackError = new Func1 >() { @Override public Observable call(Throwable t) { Exception e = originalException; Exception fe = getExceptionFromThrowable(t); if (fe instanceof UnsupportedOperationException) { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING); /* executionHook for all errors */ e = wrapWithOnErrorHook(failureType, e); return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe)); } else { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe); eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE); /* executionHook for all errors */ e = wrapWithOnErrorHook(failureType, e); return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe)); } } };