如何停止去抖动的 Rxjs Observable?

我创建了一个 observable,它将在最后一次更改后 3 秒触发,并调用publishChange服务。它可以工作,但我想创建一个doImmediateChange函数,它publishChange立即调用并停止去抖动的 observable。这怎么可能?


我的组件:


class MyComponent {

    private updateSubject = new Subject<string>();


    ngOnInit() {

        this.updateSubject.pipe(

            debounceTime(3000),

            distinctUntilChanged()

        ).subscribe(val => {

            this.srv.publishChange(val);

        });

    }


    doChange(val: string) {

        this.updateSubject.next(val);

    }


    doImmediateChange(val: string) {


        // Stop the current updateSubject if debounce is in progress and call publish immediately

        // ??

        this.srv.publishChange(val);


    }


}


慕桂英3389331
浏览 134回答 3
3回答

墨色风雨

您可以debounceTime使用switchMap和进行模拟delay。然后取消内部 ObservabletakeUntil以防止发出等待值。private updateSubject = new Subject<string>();private interrupt = new Subject();ngOnInit() {&nbsp; this.updateSubject.pipe(&nbsp; &nbsp; switchMap(val => of(val).pipe(&nbsp; &nbsp; &nbsp; delay(3000),&nbsp; &nbsp; &nbsp; takeUntil(this.interrupt)&nbsp; &nbsp; ))&nbsp; ).subscribe(val => publish(val));}doChange(val: string) {&nbsp; this.updateSubject.next(val);}doImmediateChange(val: string) {&nbsp; this.interrupt.next();&nbsp; publish(val);}https://stackblitz.com/edit/rxjs-ya93fb

拉莫斯之舞

使用竞赛运算符:第一个完成的observable成为唯一订阅的observable,因此这个递归函数将在一次发射后完成take(1),然后重新订阅() => this.raceRecursive()。private timed$ = new Subject<string>();private event$ = new Subject<string>();ngOnInit() {&nbsp; this.raceRecursive()}raceRecursive() {&nbsp; race(&nbsp; &nbsp; this.timed$.pipe(debounceTime(1000)),&nbsp; &nbsp; this.event$&nbsp; )&nbsp; &nbsp; .pipe(take(1)) // force it to complete&nbsp; &nbsp; .subscribe(&nbsp; &nbsp; &nbsp; val => console.log(val), // srv call here&nbsp; &nbsp; &nbsp; err => console.error(err),&nbsp; &nbsp; &nbsp; () => this.raceRecursive() // reset it once complete&nbsp; &nbsp; )}doChange(val: string) {&nbsp; this.timed$.next(val)}doImmediateChange(val: string) {&nbsp; this.event$.next(val)}

茅侃侃

您可以使用debounce和race实现此行为:使用您提供的代码private destroy$ = new Subject<void>();private immediate$ = new Subject<void>();private updateSubject$ = new Subject<string>();constructor(private srv: PubSubService) {}ngOnInit() {&nbsp; this.updateSubject$.pipe(&nbsp; &nbsp; &nbsp; takeUntil(this.destroy$),&nbsp; &nbsp; &nbsp; debounce(() => race(timer(3000), this.immediate$))&nbsp; ).subscribe(val => {&nbsp; &nbsp; &nbsp; this.srv.publishChange(val);&nbsp; });}doChange(val: string, immediate?: boolean) {&nbsp; this.updateSubject$.next(val);&nbsp; if (immediate) this.immediate$.next();}// don't forget to unsubscribengOnDestroy() {&nbsp; this.destroy$.next();}立即发出更改将替换之前的正常更改(即去抖 3 秒),而不会延迟(感谢我们的种族可观察)。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

JavaScript