猿问

在 RxJs 中实现具有固定堆栈的排队系统

假设我想要一个队列,其中任何时候只异步处理 3 个项目,我该怎么做?

这就是我的意思:如果我有一组项目要上传到后端,即将一些人工制品上传到云存储,然后创建/更新一个文档以反映每个人工制品的 url,我不想:

  1. 在下一次之前异步/等待每个上传操作 - 因为这会很慢

  2. 同时发送所有内容 - 这可能导致写入热点或速率限制

  3. 做一个 promise.race - 这最终导致(2)

  4. 做一个 promise.all - 如果有一个长时间运行的上传,这个过程会变慢。

我想做的是:

  1. 有一个所有上传的队列,比如使用 RxJs 创建方法,例如from(array-of-upload-items)在任何时候处理 3 个项目的堆栈。

  2. 当一个项目离开堆栈即完成时,我们将一个新项目添加到队列中

  3. 确保在任何一点,堆栈中始终有 3 个项目正在处理,直到队列中没有更多项目等待放入堆栈。

我将如何使用 RxJs 来解决这个问题?

编辑:2020 年 6 月 27 日

这就是我的想法:

const rxQueue = from(filesArray) // this is the list of files to upload say like 25 files or so


      rxQueue

        .pipe(

          mergeMap((item) =>

            of(item).pipe(

              tap(async (item) => {

                  await Promise.race([

                      processUpload(item[0]),

                      processUpload(item[1]),

                      processUpload(item[2]),

                  ])

              }),

            ),

            3

          ),

        )

        .subscribe()

目标是确保在任何时候都处理(上传)3 个文件,以至于如果一个文件上传过程结束,则添加另一个文件以将堆栈保持在 3 个上传过程中。同理,如果 2 个文件上传同时结束,则将 2 个新文件添加到堆栈中,依此类推,直到文件数组中的所有文件都上传完毕。


慕标5832272
浏览 108回答 2
2回答

ABOUTYOU

我想你可以试试这个:from(filesArray)  .pipe(    mergeMap(file => service.uploadFile(file), 3)  )这假设service.uploadFile返回一个承诺或一个可观察的。假设您有 5 个文件,那么将从前 3 个文件创建 3 个可观察对象,当其中一个完成时,将获取第 4 个文件并从中创建一个新的可观察对象,依此类推。

偶然的你

用作Subject队列,并且mergeMap有一个并发参数,您可以限制最大并发数const queue=new Subject()queque.asObservable().pipe(mergeMap(item=>httpCall(item),3)queue.next(item)
随时随地看视频慕课网APP

相关分类

JavaScript
我要回答