@ThreadSafe public abstract class HystrixCommand<R> implements HystrixExecutable<R> {
// 1. toObservable()方法中为核心实现 // 未做订阅,返回干净的 Observable 。这就是为什么上文说“未调用” 。 public Observable<R> toObservable() { if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) { return toObservable(Schedulers.computation()); } else { // 如果隔离策略是信号量,则所有请求阻塞同步执行 // 在当前调用该方法的线程中执行 // semaphore isolation is all blocking, no new threads involved // so we'll use the calling thread return toObservable(Schedulers.immediate()); } } // 2. observe()方法 public Observable<R> observe() { // us a ReplaySubject to buffer the eagerly subscribed-to Observable ReplaySubject<R> subject = ReplaySubject.create(); // 利用toObservable()方法返回的Observable让subject订阅,并返回其 // ReplaySubject 会发射所有来自原始 Observable 的数据给观察者,无论它们是何时订阅的。 // eagerly kick off subscription toObservable().subscribe(subject); // return the subject that can be subscribed to later while the execution has already started return subject; } // 3. queue()方法 public Future<R> queue() { // 利用toObservable()的toBlockingObservable()阻塞到Future中返回 final ObservableCommand<R> o = toObservable(Schedulers.immediate(), false); final Future<R> f = o.toBlockingObservable().toFuture(); // ... return f; } // 4. execute()方法 public R execute() { try { // 调用queue()方法返回的Future阻塞的get()方法获取结果 return queue().get(); } catch (Exception e) { throw decomposeException(e); } } }