继续在 RxJs 中使用 mergeMap 可管道化的错误

我正在用 RxJs 管道和 mergeMap 操作符做一些并行的 HTTP get。


在第一个请求失败时(让我们想象 /urlnotexists 抛出 404 错误)它会停止所有其他请求。


我希望它继续查询所有剩余的 url,而不为这个失败的请求调用所有剩余的 mergeMap。


我尝试使用来自 RxJs 的 throwError 和 catchError 但没有成功。


索引.js


const { from } = require('rxjs');

const { mergeMap, scan } = require('rxjs/operators');


const request = {

  get: url => {

    return new Promise((resolve, reject) => {

      setTimeout(() => {

        if (url === '/urlnotexists') { return reject(new Error(url)); }

        return resolve(url);

      }, 1000);

    });

  }

};


(async function() {

  await from([

    '/urlexists',

    '/urlnotexists',

    '/urlexists2',

    '/urlexists3',

  ])

    .pipe(

      mergeMap(async url => {

        try {

          console.log('mergeMap 1:', url);

          const val = await request.get(url);

          return val;

        } catch(err) {

          console.log('err:', err.message);

          // a throw here prevent all remaining request.get() to be tried

        }

      }),

      mergeMap(async val => {

        // should not pass here if previous request.get() failed 

        console.log('mergeMap 2:', val);

        return val;

      }),

      scan((acc, val) => {

        // should not pass here if previous request.get() failed 

        acc.push(val);

        return acc;

      }, []),

    )

    .toPromise()

    .then(merged => {

      // should have merged /urlexists, /urlexists2 and /urlexists3

      // even if /urlnotexists failed

      console.log('merged:', merged);

    })

    .catch(err => {

      console.log('catched err:', err);

    });

})();

$ node index.js

mergeMap 1: /urlexists

mergeMap 1: /urlnotexists

mergeMap 1: /urlexists2

mergeMap 1: /urlexists3

err: /urlnotexists

mergeMap 2: /urlexists

mergeMap 2: undefined <- I didn't wanted this mergeMap to have been called

mergeMap 2: /urlexists2

mergeMap 2: /urlexists3

merged: [ '/urlexists', undefined, '/urlexists2', '/urlexists3' ]

我希望在最后发出并发 GET 请求并减少它们各自在一个对象中的值。


但是如果发生一些错误,我希望他们不要中断我的管道,而是记录它们。


有什么建议吗?


慕村225694
浏览 110回答 2
2回答

慕码人2483693

如果你想使用 RxJS,你应该在catchError与forkJoin.const { of, from, forkJoin } = rxjs;const { catchError, tap } = rxjs.operators;// your promise factory, unchanged (just shorter)const request = {&nbsp; get: url => {&nbsp; &nbsp; return new Promise((resolve, reject) => setTimeout(&nbsp; &nbsp; &nbsp; () => url === '/urlnotexists' ? reject(new Error(url)) : resolve(url), 1000&nbsp; &nbsp; ));&nbsp; }};// a single rxjs request with error handlingconst fetch$ = url => {&nbsp; console.log('before:', url);&nbsp; return from(request.get(url)).pipe(&nbsp; &nbsp; // add any additional operator that should be executed for each request here&nbsp; &nbsp; tap(val => console.log('after:', val)),&nbsp; &nbsp; catchError(error => {&nbsp; &nbsp; &nbsp; console.log('err:', error.message);&nbsp; &nbsp; &nbsp; return of(undefined);&nbsp; &nbsp; })&nbsp; );};// concurrently executed rxjs requestsforkJoin(["/urlexists", "/urlnotexists", "/urlexists2", "/urlexists3"].map(fetch$))&nbsp; .subscribe(merged => console.log("merged:", merged));<script src="https://unpkg.com/@reactivex/rxjs@6.5.3/dist/global/rxjs.umd.js"></script>

至尊宝的传说

如果你愿意放弃 RXJS 而只是用 async/await 解决它是非常简单的:const urls = ['/urlexists', '/urlnotexists', '/urlexists2', '/urlexists3']const promises = urls.map(url => request(url)const resolved = await Promise.allSettled(promises)// print out errorsresolved.forEach((r, i) => {&nbsp; if (r.status === "rejected') {&nbsp; &nbsp; console.log(`${urls[i]} failed: ${r.reason})&nbsp; }})// get the success resultsconst merged = resolved.filter(r => r.status === "resolved").map(r => r.value)console.log('merged', merged)这利用了Promise.allSettled提议的辅助方法。如果您的环境没有此方法,则可以按照此答案中所示实现它。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

JavaScript