将 Single 应用于 ObservableSource 而不是过度阅读

总的来说,我对 RX 很陌生,尤其是 rxjava,请原谅我的错误。


此操作依赖于两个异步操作。


第一个使用过滤器函数尝试从异步 Observable 返回的列表中获取单个实体。


第二个是与设备通信并生成状态更新的 Observable 的异步操作。


我想获取从过滤器函数创建的 Single,将其应用于pairReader(...),并订阅其 Observable 以获取更新。我可以让它如图所示工作,但前提是我包含注释take(1),否则我会得到一个异常,因为链试图从 Single 中提取另一个值。


  Observable<DeviceCredential> getCredentials() {

    return deviceCredentialService()

            .getCredentials()

            .flatMapIterable(event -> event.getData());

  }


  Single<Organization> getOrgFromCreds(String orgid) {

    return getCredentials()

      // A device is logically constrained to only have a single cred per org

      .map(DeviceCredential::getOrganization)

      .filter(org -> org.getId().equals(orgid))

      .take(1)  // Without this I get an exception

      .singleOrError();

  }


  Function<Organization, Observable<Reader.EnrollmentState>> pairReader(String name) {

    return org -> readerService().pair(name, org);

  }


getOrgFromCreds(orgid)

  .flatMapObservable(pairReader(readerid))

  .subscribe(state -> {

     switch(state) {

       case BEGUN:

         LOG.d(TAG, "Pairing begun");

         break;

       case PAIRED:

         LOG.d(TAG, "Pairing success");

         callback.success();

         break;

       case NOTIFIED_SERVER:

         LOG.d(TAG, "Pairing server notified");

         break;

     }},

     error -> {

       Crashlytics.logException(error);

       callback.error(error.getLocalizedMessage());

     });


波斯汪
浏览 122回答 3
3回答

噜噜哒

如果我没看错,您需要使用之前检索到的异步数据执行一些操作。因此,您可以使用.zip()运算符。这是一个例子:Observable.zip( &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;getOrgFromCreds().toObservable(), &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;getCredentials(), &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;(first,&nbsp;second)&nbsp;->&nbsp;/*create&nbsp;output&nbsp;object&nbsp;here*/) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.subscribe( &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;(n)&nbsp;->&nbsp;/*do&nbsp;onNext*/, &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;(e)&nbsp;->&nbsp;/*do&nbsp;onError*/ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;);请注意,该.zip()运算符将等待两个流的发射,然后它将使用您在“此处创建输出对象”中提供的函数创建外部发射。如果您不想等待这两个项目 - 您可以使用.combineLatest()。

SMILET

如果源流发出不止一项,singleOrError()则应该发出错误。文档对于您的情况,请使用first()或firstOrError()代替。&nbsp;&nbsp;Single<Organization>&nbsp;getOrgFromCreds(String&nbsp;orgid)&nbsp;{&nbsp; &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;getCredentials() &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.map(DeviceCredential::getOrganization) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.filter(org&nbsp;->&nbsp;org.getId().equals(orgid)) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.firstOrError(); &nbsp;&nbsp;}

回首忆惘然

这里的问题原来是 API 的设计方式很奇怪(不幸的是,文档非常糟糕)。我不明白为什么我得到重复项,并认为我使用flatMapIterable不正确。该deviceCredentialService.getCredentials()调用实际创建的是一个可观察对象,它发出DataEvent对象,这些对象是对结果列表的简单包装,并带有结果来源的标志。API 设计者希望允许用户使用本地缓存的数据立即填充 UI,同时执行对 REST API 的较长请求。该DataEvent.from属性是一个枚举,用于标记来自本地设备缓存或来自远程 API 调用的来源。我解决这个问题的方法是简单地忽略来自本地缓存的结果,只从 API 发出结果:&nbsp; Observable<DeviceCredential> getCredentials() {&nbsp; &nbsp; return deviceCredentialService()&nbsp; &nbsp; &nbsp; .getCredentials()&nbsp; &nbsp; &nbsp; // Only get creds from network&nbsp; &nbsp; &nbsp; .filter(e -> e.getFrom() == SyncedDataSourceObservableFactory.From.SOURCE)&nbsp; &nbsp; &nbsp; .flatMapIterable(e -> e.getData());&nbsp; }&nbsp; Single<Organization> getOrgFromCreds(String orgid) {&nbsp; &nbsp; return getCredentials()&nbsp; &nbsp; &nbsp; // A device is logically constrained to only have a single cred per org&nbsp; &nbsp; &nbsp; .map(DeviceCredential::getOrganization)&nbsp; &nbsp; &nbsp; .filter(org -> org.getId().equals(orgid))&nbsp; &nbsp; &nbsp; .singleOrError();&nbsp; }然后计划是使用记忆化缓存实体,使实施应用程序能够访问缓存失效。由于提供的接口不允许抑制 API 调用,因此如果应用程序感觉它是新鲜的,则无法仅使用缓存。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java