小松的技术博客

六和敬

若今生迷局深陷,射影含沙。便许你来世袖手天下,一幕繁华。 你可愿转身落座,掌间朱砂,共我温酒煮茶。

Agera源码探索

Agera是google最近才开源的一个响应式框架。Android界已经存在一个目前非常流行的RxJava响应式框架,因而Agera这个框架不会那么快的火起来,与Rxjava的push data model不同,它采用push event, pull data model的模式,这意味着event传递不携带任何数据,订阅者在收到notify后自己去拉取数据。这种模式与前端的单向事件流很相似并且在前端已经被证实为比较成功的模式了,但这种模式在Android的应用则还要等待时间的校验。就目前而言,这个框架刚刚开源,如果想学习Android响应式框架,那么解读这个框架的源码应该是上乘选择了。

函数式Function接口

java中的方法并不是一等公民,无法直接用函数传参,但我们可以通过接口定义一系列行为来达到函数式编程的效果,RxJava提供了一些列无于语义的抽象接口,如Action0,Action1...系列和Function0,Function1...系列,Action代表无返回值,Function代表有返回值,接口的数字代表传参个数,而在Agera中,开发者提供的接口比较少,但却是非常语义化的接口:

  • Predicate:决定输入是否是期望值

    public interface Predicate<T>
        bool apply(@NonNull T value)
    }
    
  • Function: 转化一个值为另一个值

    public interface Function<TFrom, TTo> {
        @NonNull
        TTo apply(@NonNull TFrom input);
    }
    
  • Merge:merge输入的两个值,产出一个值

    public interface Merger<TFirst, TSecond, TTo> {
         @NonNull
         TTo merge(@NonNull TFirst first, @NonNull TSecond second);
    } 
    
  • Receiver: 接受一个值,无返回值

    public interface Receiver<T> {
        void accept(@NonNull T value);
    }
    
  • Binder: 接受两个输入,无返回值

    public interface Binder<TFirst, TSecond> {
        void bind(@NonNull TFirst first, @NonNull TSecond second);
    }
    
  • Condition: 无输入,返回bool值,决定是否应用相关条件

    public interface Condition {
        boolean applies();
    }
    

了解了这些接口,我们在业务上通过操作符的思维去处理我们的事件流了

repository = Repositories.repositoryWithInitialValue("default")
            .observe()
            .onUpdatesPerLoop()
            .getFrom(intSupplier)
            .transform(new Function<Int, String>() {
                @NonNull
                @Override
                public String apply(@NonNull String input) {
                    return "Input is" + String.valueOf(input);
                }
            })
            .thenMergeIn(integerSupplier,new Merger<String, Integer, String>() {
                @NonNull
                @Override
                public String merge(@NonNull String s, @NonNull Integer integer) {
                    return s + " plus " + String.valueOf(integer);
                }
            })
            .compile();

Agera观察者模式

Agera运用了观察者模式,提供了两个抽象的接口

public interface Observable {
    void addUpdatable(@NonNull Updatable updatable);
    void removeUpdatable(@NonNull Updatable updatable);
 }

 public interface Updatable {
     void update();
 }

从接口设计上也可以看出,这个Observable是完全不带任何数据的,我们暂且先不考虑数据相关的东西,先看看Observable是如何和Updatable相关联的。

首先看看BaseObservale的实现

public abstract class BaseObservable implements Observable {
  @NonNull
  private final Worker worker;

  protected BaseObservable() {
    checkState(Looper.myLooper() != null, "Can only be created on a Looper thread");
    worker = new Worker(this);
  }

  @Override
  public final void addUpdatable(@NonNull final Updatable updatable) {
    checkState(Looper.myLooper() != null, "Can only be added on a Looper thread");
    worker.addUpdatable(updatable);
  }

  @Override
  public final void removeUpdatable(@NonNull final Updatable updatable) {
    checkState(Looper.myLooper() != null, "Can only be removed on a Looper thread");
    worker.removeUpdatable(updatable);
  }

  protected final void dispatchUpdate() {
    worker.dispatchUpdate();
  }

  // observable被激活
  protected void observableActivated() {}

  // observable停用
  protected void observableDeactivated() {}
}

其实现比较简单,具体逻辑都交给Worker去做了,并且提供了observableActivatedobservableDeactivated两个沟子。WorkerBaseObservable的一个内部类:

static final class Worker {
  @NonNull
  private static final Object[] NO_UPDATABLES_OR_HANDLERS = new Object[0];
  @NonNull
  private final BaseObservable baseObservable;
  @NonNull
  private final WorkerHandler handler;
  private int size;

  Worker(@NonNull final BaseObservable baseObservable) {
    this.baseObservable = baseObservable;
    this.handler = workerHandler();
    this.size = 0;
  }

  synchronized void addUpdatable(@NonNull final Updatable updatable) {
    add(updatable, workerHandler());
    if (size == 1) {
      //添加第一个观察者时
      handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget();
    }
  }

  synchronized void removeUpdatable(@NonNull final Updatable updatable) {
    remove(updatable);
    if (size == 0) {
      //所有观察者都被remove时
      handler.obtainMessage(MSG_LAST_REMOVED, this).sendToTarget();
    }
  }

  void dispatchUpdate() {
    handler.obtainMessage(MSG_UPDATE, this).sendToTarget();
  }

  synchronized void sendUpdate() {
    for (int index = 0; index < updatablesAndHandlers.length; index = index + 2) {
      final Updatable updatable = (Updatable) updatablesAndHandlers[index];
      final WorkerHandler handler =
        (WorkerHandler) updatablesAndHandlers[index + 1];
      if (updatable != null) {
        if (handler.getLooper() == Looper.myLooper()) {
          //如果是在当前线程,直接执行
          updatable.update();
        } else {
          //非当前线程,则通过Handler发送给目标线程
          handler.obtainMessage(WorkerHandler.MSG_CALL_UPDATABLE, updatable).sendToTarget();
        }
      }
    }
  }

  void callFirstUpdatableAdded() {
    baseObservable.observableActivated();
  }

  void callLastUpdatableRemoved() {
    baseObservable.observableDeactivated();
  }
}

Worker可以看出,Agera的事件通知是通过Handler来实现的,Observable会在addObservableremoveObservable时把自己的状态信息Handler,并且提供了dispatchUpdate方法通过Handler来通知Updatable更新自己,这里需要关注两个问题:Handler是如何处理这些消息的?Handler会在什么线程处理这些消息?

要知道Handler如何处理消息,我们看Handler的具体实现即可:

static final class WorkerHandler extends Handler {
   static final int MSG_FIRST_ADDED = 0; //添加第一个观察者
   static final int MSG_LAST_REMOVED = 1; //最后一个观察者被移除时
   static final int MSG_UPDATE = 2; // dispatch update
   static final int MSG_CALL_UPDATABLE = 3; //通知UpdateAble更新
   static final int MSG_CALL_MAYBE_START_FLOW = 4;
   static final int MSG_CALL_ACKNOWLEDGE_CANCEL = 5;
   static final int MSG_CALL_LOW_PASS_UPDATE = 6;

   @Override
   public void handleMessage(final Message message) {
     switch (message.what) {
       case MSG_UPDATE:
         ((Worker) message.obj).sendUpdate();
         break;
       case MSG_FIRST_ADDED:
         ((Worker) message.obj).callFirstUpdatableAdded();
         break;
       case MSG_LAST_REMOVED:
         ((Worker) message.obj).callLastUpdatableRemoved();
         break;
       case MSG_CALL_UPDATABLE:
         ((Updatable) message.obj).update();
         break;
       case MSG_CALL_MAYBE_START_FLOW:
         ((CompiledRepository) message.obj).maybeStartFlow();
         break;
       case MSG_CALL_ACKNOWLEDGE_CANCEL:
         ((CompiledRepository) message.obj).acknowledgeCancel();
         break;
       case MSG_CALL_LOW_PASS_UPDATE:
         ((LowPassFilterObservable) message.obj).lowPassUpdate();
         break;
       default:
     }
   }
}

WorkerHandler定义了一些列消息类型,根据消息类型进行不同处理,这里只要跟踪处理就可以知道Handler是如何处理各种消息的了。

另一个问题是Handler会在哪一个线程处理这些消息,这是让我们聚焦在Worker的构造方法:

Worker(@NonNull final BaseObservable baseObservable) {
  ...
  this.handler = workerHandler();
  ...
}

以及addUpdatable方法:

synchronized void addUpdatable(@NonNull final Updatable updatable) {
  add(updatable, workerHandler());
  ...
}

Worker初始化时会被绑定一个Handler,addUpdatable时也会为添加进来的updatable绑定Handler,那我们来看看workerHandler()的实现:

private static final ThreadLocal<WeakReference<WorkerHandler>> handlers = new ThreadLocal<>();

@NonNull
static WorkerHandler workerHandler() {
  final WeakReference<WorkerHandler> handlerReference = handlers.get();
  WorkerHandler handler = handlerReference != null ? handlerReference.get() : null;
  if (handler == null) {
    handler = new WorkerHandler();
    handlers.set(new WeakReference<>(handler));
  }
  return handler;
}

这里运用到了ThreadLocal,TheadLocal会在每个线程中存在单独的副本,因而这里实际是取和当前Looper线程的Handler,初始化之后就存起来,这样在每个线程就可以用一个Handler去处理各种消息了,避免创建过多的Handler,这里很好的利用了ThreadLocal的特性完成相关的功能。

那我们也就可以得到这样的结论:

  • MSG_FIRST_ADDEDMSG_LAST_REMOVEDMSG_UPDATE是在Observable创建的线程处理的,这也就是说Observable创建的线程必须是Looper线程
  • MSG_CALL_UPDATABLE即Updatable更新的线程是Updatable被添加到Observable时的线程

在理解了BaseObservable后,我们其实就可以继承BaseObservable来完成一些事件派发的功能

public class MainActivity extends AppCompatActivity implements Updatable{

    private Button observableBtn;
    private TextView show;
    private ClickObservable clickObservable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        observableBtn = (Button)findViewById(R.id.observable_btn);
        show = (TextView)findViewById(R.id.show);

        clickObservable = new ClickObservable();
        clickObservable.addUpdatable(this);
        observableBtn.setOnClickListener(clickObservable);
    }

    @Override
    public void update() {
        show.setText("update!!");
    }    

    public static class ClickObservable extends BaseObservable implements View.OnClickListener{

        @Override
        public void onClick(View v) {
            dispatchUpdate();
        }
    }
}

但目前为止,我们的Observable和Updatable都没有和数据挂钩。那Agera是如何处理数据的呢?Agera提供了Repository来帮助我们实现更多的功能

Repository

Repository是一个接口:

public interface Repository<T> extends Observable, Supplier<T> {}

我们可以看到它集成自ObservableSupplier,那么Supplier又是什么?

public interface Supplier<T> {
    @NonNull
    T get();
}

从命名和get方法可以看出这是用于提供数据的,因而这是用于数据的接口。那我们该如何使用这个接口呢?我们可以从官方给出的实例源码来追踪一下

public class AgeraActivity extends Activity implements Receiver<Bitmap>, Updatable {
    private static final ExecutorService NETWORK_EXECUTOR =  newSingleThreadExecutor();
    private static final ExecutorService DECODE_EXECUTOR = newSingleThreadExecutor();
    private Repository<Result<Bitmap>> background;
    //...
    @Override
    protected void onCreate(final Bundle savedInstanceState) {
        //...
        background = repositoryWithInitialValue(Result.<Bitmap>absent())
            .observe() 
            .onUpdatesPerLoop() 
            .getFrom(new Supplier<HttpRequest>() {
                @NonNull
                @Override
                public HttpRequest get() {
                    DisplayMetrics displayMetrics = getResources().getDisplayMetrics();
                    int size = Math.max(displayMetrics.heightPixels, displayMetrics.widthPixels);
                    return httpGetRequest(BACKGROUND_BASE_URL + size).compile();
                }    
            })
            .goTo(NETWORK_EXECUTOR) 
            .attemptTransform(httpFunction())
            .orSkip()
            .goTo(DECODE_EXECUTOR)
            .thenTransform(new Function<HttpResponse, Result<Bitmap>>() {
                @NonNull
                @Override
                public Result<Bitmap> apply(@NonNull HttpResponse response) {
                    byte[] body = response.getBody();
                    return absentIfNull(decodeByteArray(body, 0, body.length));
                }
            })
            .onDeactivation(SEND_INTERRUPT)
            .compile();
    }

    @Override
    protected void onResume() {
        super.onResume();
        background.addUpdatable(this);
    }

    @Override
    protected void onPause() {
        super.onPause();
        background.removeUpdatable(this);
    }

    @Override
    public void update() {
        background.get().ifSucceededSendTo(this);
    }

    @Override
    public void accept(@NonNull Bitmap background) {
        backgroundView.setImageBitmap(background);
    }
}

这个例子是一个网络请求的一个完整的例子,我们可以跟随这个例子来探索Agera的实现。

在这个例子中Updatable的实现是Activity,Repository的实现是activity,而Repository继承自Observable,我们可以看到onResume时background.removeUpdatable(this)完成了订阅关系,而Updatable的方法update的实现是background.get().ifSucceededSendTo(this),这里我们可以看到,在Updatable中我们要主动的去get数据。接下来的重点就是Repository的实现了。

background = Repositories
            .repositoryWithInitialValue(Result.<Bitmap>absent())
            .observe() 
            .onUpdatesPerLoop() 
            .getFrom(...)
            .goTo(...) 
            .attemptTransform(...)
            .orSkip()
            .goTo(...) 
            .thenTransform(...)
            .onDeactivation(...) 
            .compile(); 

我们聚焦到最后的compile(),可以看出这引入了编译的机制,这其实也就是Builder模式的运用,这里首先调用了Repositories的静态方法repositoryWithInitialValue, 在Java规范中,很多以s结尾的类都是对应没有s的类的工具类,即RepositoriesRepository的工具类,ObservablesObservable的工具类等等。

public static <T> REventSource<T, T> repositoryWithInitialValue(@NonNull final T initialValue) {
    return RepositoryCompiler.repositoryWithInitialValue(initialValue);
}

这里直接调用了RepositoryCompiler的静态方法repositoryWithInitialValue.

private static final ThreadLocal<RepositoryCompiler> compilers = new ThreadLocal<>();

@NonNull
static <TVal> RepositoryCompilerStates.REventSource<TVal, TVal> repositoryWithInitialValue(@NonNull final TVal initialValue) {
    checkNotNull(Looper.myLooper());
    RepositoryCompiler compiler = compilers.get();
    if (compiler == null) {
       compiler = new RepositoryCompiler();
    } else {
       // Remove compiler from the ThreadLocal to prevent reuse in the middle of a compilation.
       // recycle(), called by compile(), will return the compiler here. ThreadLocal.set(null) keeps
       // the entry (with a null value) whereas remove() removes the entry; because we expect the
       // return of the compiler, don't use the heavier remove().
       compilers.set(null);
    }
    return compiler.start(initialValue);
}

private static void recycle(@NonNull final RepositoryCompiler compiler) {
    compilers.set(compiler);
}

这里有出现了我们熟悉的ThreadLocal,存储的是RepositoryCompiler的实例,这里需要注意的是,当从ThreadLocal取出实例后,源码调用了compilers.set(null),其注释也说得很清楚,这是为了防止在编译期间再次调用这个类去编译别的Repository,当编译完成后,调用recycle将这个实例放入ThreadLocal用于后期复用。

这个方法的返回值是RepositoryCompilerStates.REventSource,当我们去看RepositoryCompilerStates这个类时,会发现还有RFrequency、RFlow、RTermination、RConfig。这些其实Repository的编译状态,也可以说是单向事件流。

  • REventSource: 事件源
  • RFrequency: 消息通知间隔
  • RFlow: 继承自RsyncFlow,事件中间流
  • RTermination: 也是事件中间流,但是可能会失败,它可以用于错误处理
  • RConifg:配置一些通知Updatable的条件,如notifyIf,onDeactivation(repo停用时),onConcurrentUpdate(有新的事件流时)

我们再回头看RepositoryCompiler

final class RepositoryCompiler implements
    RepositoryCompilerStates.RFrequency,
    RepositoryCompilerStates.RFlow,
    RepositoryCompilerStates.RTermination,
    RepositoryCompilerStates.RConfig {
    //...
}

它实现了这些状态,也就是它这个单向事件流的管理者。此外Agera有一些规则:那些以then开头的方法代表要结束这个事件流如thenGetFrom, thenMergeIn,那些包含attempt的方法代表执行的操作可能会失败,使用它们后状态会切换到RTermination,如attemptGetFrom,thenAttemptMergeIn

当然Agera也为我们提供了线程切换的功能goTo,以及直接通知Updatable,等到调用get()方法时才去执行事件流的方法goLazy

我们简单看看getFrom的具体实现,其它的实现与其类似:

private final ArrayList<Object> directives = new ArrayList<>();

public RepositoryCompiler getFrom(@NonNull final Supplier supplier) {
    checkExpect(FLOW);
    addGetFrom(supplier, directives);
    return this;
}

private void checkExpect(@Expect final int accept) {
    checkState(expect == accept, "Unexpected compiler state");
}

addGetFrom的实现在CompileRepository中:

static void addGetFrom(@NonNull final Supplier supplier,
    @NonNull final List<Object> directives) {
    directives.add(GET_FROM);
    directives.add(supplier);
}

所以RepositoryCompiler的各种方法都是把系列行为以指令的形式保存在一个ArrayList中,我们可以猜想,事件就可以从源头按照给定的指令一步步执行,然后通知Updatable

那我们看看最后的compile()方法:

public Repository compile() {
    Repository repository = compileRepositoryAndReset();
    recycle(this);
    return repository;
}

private Repository compileRepositoryAndReset() {
    checkExpect(CONFIG);
    Repository repository = compiledRepository(initialValue, eventSources, frequency, directives, notifyChecker, concurrentUpdateConfig, deactivationConfig);
    expect = NOTHING;
    initialValue = null;
    eventSources.clear();
    frequency = 0;
    directives.clear();
    goLazyUsed = false;
    notifyChecker = objectsUnequal();
    deactivationConfig = RepositoryConfig.CONTINUE_FLOW;
    concurrentUpdateConfig = RepositoryConfig.CONTINUE_FLOW;
    return repository;
}

static Repository compiledRepository(
    @NonNull final Object initialValue,
    @NonNull final List<Observable> eventSources,
    final int frequency,
    @NonNull final List<Object> directives,
    @NonNull final Merger<Object, Object, Boolean> notifyChecker,
    @RepositoryConfig final int concurrentUpdateConfig,
    @RepositoryConfig final int deactivationConfig) {
    Observable eventSource = perMillisecondObservable(frequency, compositeObservable(eventSources.toArray(new Observable[eventSources.size()])));
    Object[] directiveArray = directives.toArray();
    return new CompiledRepository(initialValue, eventSource, directiveArray, notifyChecker, deactivationConfig, concurrentUpdateConfig);
}

从上我们可以知道,最终返回的是CompiledRepository。除此我们需要注意的是compiledRepository会将RepositoryCompiler保存的eventSources合成为一个并限制其通知频率:

Observable eventSource = perMillisecondObservable(frequency, compositeObservable(eventSources.toArray(new Observable[eventSources.size()])));

先看看CompositeObservable的实现:

private static final class CompositeObservable extends BaseObservable implements Updatable {
    @NonNull
    private final Observable[] observables;

    CompositeObservable(@NonNull final Observable... observables) {
      this.observables = observables;
    }

    @Override
    protected void observableActivated() {
      for (final Observable observable : observables) {
        observable.addUpdatable(this);
      }
    }

    @Override
    protected void observableDeactivated() {
      for (final Observable observable : observables) {
        observable.removeUpdatable(this);
      }
    }

    @Override
    public void update() {
      dispatchUpdate();
    }
}

CompositeObservable继承自BaseObservable并实现了Updatable,通过之前对BaseObservable的分析,我们就很清晰的了解到它的作用就是,观察传入的可观察对象,并将其派发出去。但这个类是私有的,我们只能通过静态方法compositeObservable,其目的是做一些优化和排重的工作,这里就不展示源码了。

接下来就该分析如何限制消息通知间隔了,跟踪perMillisecondObservable方法:

public static Observable perMillisecondObservable(
    final int shortestUpdateWindowMillis, @NonNull final Observable observable) {
    return new LowPassFilterObservable(shortestUpdateWindowMillis, observable);
}

static final class LowPassFilterObservable extends BaseObservable implements Updatable {
    @NonNull
    private final Observable observable;
    @NonNull
    private final WorkerHandler workerHandler;
    private final int shortestUpdateWindowMillis;

    private long lastUpdateTimestamp; //记录事件戳

    LowPassFilterObservable(final int shortestUpdateWindowMillis, @NonNull final Observable observable) {
      this.shortestUpdateWindowMillis = shortestUpdateWindowMillis;
      this.observable = checkNotNull(observable);
      this.workerHandler = workerHandler();
    }

    @Override
    protected void observableActivated() {
      observable.addUpdatable(this);
    }

    @Override
    protected void observableDeactivated() {
      observable.removeUpdatable(this);
      //消除未通知的消息
      workerHandler.removeMessages(WorkerHandler.MSG_CALL_LOW_PASS_UPDATE, this);
    }

    @Override
    public void update() {
      workerHandler.sendMessageDelayed(
          workerHandler.obtainMessage(WorkerHandler.MSG_CALL_LOW_PASS_UPDATE, this), (long) 0);
    }

    void lowPassUpdate() {
      workerHandler.removeMessages(WorkerHandler.MSG_CALL_LOW_PASS_UPDATE, this);
      final long elapsedRealtimeMillis = SystemClock.elapsedRealtime();
      final long timeFromLastUpdate = elapsedRealtimeMillis - lastUpdateTimestamp;
      if (timeFromLastUpdate >= shortestUpdateWindowMillis) {
        //大于给定间隔,更改事件戳的记录,派发消息
        lastUpdateTimestamp = elapsedRealtimeMillis;
        dispatchUpdate();
      } else {
        //利用Handler延迟处理消息
        workerHandler.sendMessageDelayed(
            workerHandler.obtainMessage(WorkerHandler.MSG_CALL_LOW_PASS_UPDATE, this),
            shortestUpdateWindowMillis - timeFromLastUpdate);
      }
    }
}

其实现就是记录时间戳,然后利用Handler延迟发送消息来实现消息通知间隔的。

通过以上的流程,我们就得到了一个CompiledRepository了。接下来就是对它的分析了,先看看它的声明:

final class CompiledRepository extends BaseObservable implements Repository, Updatable, Runnable {
//...
}

CompiledRepository继承自BaseObservable,同时实现了UpdatableRunnable

接下来我们看看当我们添加Updatable激活CompiledRepository会发生什么:

protected void observableActivated() {
    eventSource.addUpdatable(this);
    maybeStartFlow();
}

它会把自己注册到eventSource上去,当eventSource发出通知时,会调用自己的update方法,当然如果我们在编译时调用的是observe()而没有传入任何参数,实际上最终的eventSource是一个空的CompositeObservable,update方法也就不会出发了。

void maybeStartFlow() {
    synchronized (this) {
      if (runState == IDLE || runState == PAUSED_AT_GO_LAZY) {
        runState = RUNNING;
        lastDirectiveIndex = -1; // this could be pointing at the goLazy directive
        restartNeeded = false;
      } else {
        return; // flow already running, do not continue.
      }
    }
    intermediateValue = currentValue;
    runFlowFrom(0, false);
  }

这个方法会更改运行状态,然后调用到了runFlowFrom:

private void runFlowFrom(final int index, final boolean asynchronously) {
    final Object[] directives = this.directives;
    final int length = directives.length;
    int i = index;
    while (0 <= i && i < length) {
      int directiveType = (Integer) directives[i];
      if (asynchronously || directiveType == GO_TO || directiveType == GO_LAZY) {
        synchronized (this) {
          //检查flows是否被cancel掉
          if (checkCancellationLocked()) {
            break;
          }
          if (directiveType == GO_TO) {
            // 如果为GO_TO,切换状态
            setPausedAtGoToLocked(i);
          } else if (directiveType == GO_LAZY) {
            // 如果为GO_LAZY,切换状态,然后会执行dispatchUpdate,这里就直接返回了,等待get()时再继续流程
            setLazyAndEndFlowLocked(i);
            return;
          }
        }
      }
      switch (directiveType) {
        case GET_FROM:
          i = runGetFrom(directives, i);
          break;
        case MERGE_IN:
          i = runMergeIn(directives, i);
          break;
        case TRANSFORM:
          i = runTransform(directives, i);
          break;
        case CHECK:
          i = runCheck(directives, i);
          break;
        case GO_TO:
          i = runGoTo(directives, i);
          break;
        case SEND_TO:
          i = runSendTo(directives, i);
          break;
        case BIND:
          i = runBindWith(directives, i);
          break;
        case FILTER_SUCCESS:
          i = runFilterSuccess(directives, i);
          break;
        case END:
          i = runEnd(directives, i);
          break;
      }
    }
}

这个方法会根据已有的directives,一步一步完成事件的传递。每个directive会执行不同的方法,以runMergeIn为例:

private int runMergeIn(@NonNull final Object[] directives, final int index) {
    Supplier supplier = (Supplier) directives[index + 1];
    Merger merger = (Merger) directives[index + 2];
    intermediateValue = checkNotNull(merger.merge(intermediateValue, supplier.get()));
    return index + 3;
}

我们可以看到这些指令都是更改或者更改的是的CompiledRepository的成员变量intermediateValue。我们需要重点关注的是runGoTo,看看它是如何实现线程切换的。

private int runGoTo(@NonNull final Object[] directives, final int index) {
    Executor executor = (Executor) directives[index + 1];
    executor.execute(this);
    return -1;
}

这个方法调用的是将自己传入到directives的Executor的execute方法中,这也是为什么CompiledRepository要实现Runnable接口。所以我们来看一下run方法:

public void run() {
    //...
    runFlowFrom(continueFromGoTo(directives, index), true);
    //...
}

这里除去状态变更,也只是简单的将执行逻辑交给了Executor。那我们看看最终结束时会做些什么:

private int runEnd(@NonNull final Object[] directives, final int index) {
    boolean skip = (Boolean) directives[index + 1];
    if (skip) {
      skipAndEndFlow();
    } else {
      setNewValueAndEndFlow(intermediateValue);
    }
    return -1;
}

skip一般是attemt操作后可以执行orSkip:当发生错误时,结束事件流但不触发Updatable更新:

private synchronized void skipAndEndFlow() {
    //还原状态
    runState = IDLE;
    intermediateValue = initialValue; 
    checkRestartLocked();
}

如果不忽略则将intermediateValue保存到currentValue并dispatch更新:

private synchronized void setNewValueAndEndFlow(@NonNull final Object newValue) {
    boolean wasRunningLazily = runState == RUNNING_LAZILY;
    runState = IDLE;
    intermediateValue = initialValue;
    if (wasRunningLazily) {
      currentValue = newValue;
    } else {
      setNewValueLocked(newValue);
    }
    checkRestartLocked();
}

private void setNewValueLocked(@NonNull final Object newValue) {
    boolean shouldNotify = notifyChecker.merge(currentValue, newValue);
    currentValue = newValue;
    if (shouldNotify) {
      dispatchUpdate();
    }
}

到这里基本上CompiledRepository的事件传递以及涉及到的线程切换都讲明白了,那么Updatable调用update时get到的数据是什么呢?其实上面的分析我们基本可以确定是currentValue,我们来看看源码:

public synchronized Object get() {
    //如果是GO_LAZY,那么会在get之后才开始执行GO_LAZY后面的事件流。
    if (runState == PAUSED_AT_GO_LAZY) {
      int index = lastDirectiveIndex;
      runState = RUNNING_LAZILY;
      runFlowFrom(continueFromGoLazy(directives, index), false);
    }
    return currentValue;
}

至此,我们基本上把Agera的push event, pull data model的流程说完了,这样在使用Agera时也会清晰很多。目前来看Agera很好的利用了Android平台的相关技术来实现自己的响应式风格,短小而精悍,个人很看好其发展。当然Agera还为我们带来了其它的特性,比如返回结果的封装Result,一个生产者消费者队列模型Reservoir,还有其对网络、数据库的扩展。这些都值得我们继续探索与学习。

←支付宝← →微信 →
comments powered by Disqus