Deadlock testing with TestSchedulers, Rx and BlockingCollection

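I have the following class, which basically subscribes to an int observable and multiplies each value by 2. To make it more realistic, I added a Thread.Sleep to simulate heavy processing.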


public class WorkingClass
{
    private BlockingCollection<int> _collection = new BlockingCollection<int>(1);

    public WorkingClass(IObservable<int> rawValues)
    {
        rawValues.Subscribe(x => _collection.Add(x));
    }

    public IObservable<int> ProcessedValues()
    {
        return Observable.Create<int>(observer =>
        {
            while (true)
            {
                int value;

                try
                {
                    value = _collection.Take();
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    break;
                }

                Thread.Sleep(1000); //Simulate long work
                observer.OnNext(value * 2);
            }

            return Disposable.Empty;
        });
    }
}

I'm having trouble testing it. In the test below I only want to assert that if the source stream emits the value 1, the SUT emits the value 2:


[Test]
public void SimpleTest()
{
    var sourceValuesScheduler = new TestScheduler();
    var newThreadScheduler = new TestScheduler();

    var source = sourceValuesScheduler.CreateHotObservable(
        new Recorded<Notification<int>>(1000, Notification.CreateOnNext(1)));

    var sut = new WorkingClass(source);

    var observer = sourceValuesScheduler.CreateObserver<int>();

    sut.ProcessedValues()
        //The cold part (i.e., the while loop) of the ProcessedValues observable should run in a different thread
        .SubscribeOn(newThreadScheduler)
        .Subscribe(observer);

    sourceValuesScheduler.AdvanceTo(1000);

    observer.Messages.AssertEqual(new Recorded<Notification<int>>(1000, Notification.CreateOnNext(2)));
}

If I run this test, the assertion fails because newThreadScheduler is never started, so the ProcessedValues observable is never created. If I do this:


sourceValuesScheduler.AdvanceTo(1000);
newThreadScheduler.AdvanceTo(1000);

it doesn't work either, because newThreadScheduler uses the same thread as sourceValuesScheduler, so the test hangs right after the processed value is emitted, at the following line:


value = _collection.Take();
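
To illustrate the blocking behaviour behind this hang without involving Rx, here is a minimal sketch that uses only the base class library. The names TakeBlockingSketch and Run are hypothetical and the 200 ms wait is an arbitrary choice. It suggests that Take() on an empty collection does not return until some other thread adds an item, which cannot happen when producer and consumer share one thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original question.
public static class TakeBlockingSketch
{
    // Returns true if Take() finished before anything was added (not expected),
    // and passes back the value Take() eventually produced.
    public static bool Run(out int value)
    {
        using (var collection = new BlockingCollection<int>(1))
        {
            // Take() runs on a thread-pool thread and blocks there while the collection is empty.
            Task<int> taker = Task.Run(() => collection.Take());

            // Wait briefly: the task should still be blocked, so Wait returns false.
            bool finishedEarly = taker.Wait(TimeSpan.FromMilliseconds(200));

            // Adding from a different thread is what unblocks Take().
            collection.Add(1);
            value = taker.Result;
            return finishedEarly;
        }
    }
}
```

In the failing test, the thread that would have to call Add is the same one that is stuck inside Take(), so nothing can ever unblock it.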

Is there a way to have multiple TestSchedulers running on different threads? Otherwise, how can I test a class like this?


莫回无
1 Answer

料青山看我应如是

Take() blocks until there is an item to remove from the BlockingCollection<int> or until you call CompleteAdding() on it. Given your current implementation, the thread on which you subscribe to ProcessedValues() and run the while loop will never finish.

You should consume the BlockingCollection<int> on a separate thread. For example, you can create a consuming Task when ProcessedValues() is called. Consider the following implementation, which also disposes the BlockingCollection<int>:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public sealed class WorkingClass : IDisposable
{
    private BlockingCollection<int> _collection = new BlockingCollection<int>(1);
    private List<Task> _consumerTasks = new List<Task>();

    public WorkingClass(IObservable<int> rawValues)
    {
        rawValues.Subscribe(x => _collection.Add(x));
    }

    public IObservable<int> ProcessedValues()
    {
        return Observable.Create<int>(observer =>
        {
            // Start the consuming loop on its own task so that subscribing does not block the caller
            _consumerTasks.Add(Task.Factory.StartNew(() => Consume(observer), TaskCreationOptions.LongRunning));
            return Disposable.Empty;
        });
    }

    private void Consume(IObserver<int> observer)
    {
        try
        {
            // The enumeration ends once CompleteAdding() has been called and the collection is empty
            foreach (int value in _collection.GetConsumingEnumerable())
            {
                Thread.Sleep(1000); //Simulate long work
                observer.OnNext(value * 2);
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
        }
    }

    public void Dispose()
    {
        _collection.CompleteAdding();
        Task.WaitAll(_consumerTasks.ToArray());
        _collection.Dispose();
    }
}

It can be tested with the following code:

var sourceValuesScheduler = new TestScheduler();

var source = sourceValuesScheduler.CreateHotObservable(
    new Recorded<Notification<int>>(1000, Notification.CreateOnNext(1)));

var observer = sourceValuesScheduler.CreateObserver<int>();

using (var sut = new WorkingClass(source))
{
    sourceValuesScheduler.AdvanceTo(1000); //add to collection
    sut.ProcessedValues().Subscribe(observer); //consume
} //...and wait until the loop exits

observer.Messages.AssertEqual(new Recorded<Notification<int>>(1000, Notification.CreateOnNext(2)));
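
The consume-on-a-separate-thread pattern this answer relies on can also be sketched without Rx, which may help when reasoning about shutdown order. The following is a minimal illustration, not the answer's implementation: ConsumerSketch and DoubleAll are hypothetical names and the simulated delay is left out. It assumes a single consumer, so results come back in the order the items were added.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper illustrating the producer/consumer shutdown sequence.
public static class ConsumerSketch
{
    public static List<int> DoubleAll(IEnumerable<int> inputs)
    {
        var results = new List<int>();

        using (var collection = new BlockingCollection<int>(1))
        {
            // Consumer: runs on its own task and loops until CompleteAdding() is called
            // and the collection has been drained.
            Task consumer = Task.Factory.StartNew(() =>
            {
                foreach (int value in collection.GetConsumingEnumerable())
                {
                    results.Add(value * 2);
                }
            }, TaskCreationOptions.LongRunning);

            // Producer: Add blocks while the bounded collection (capacity 1) is full.
            foreach (int input in inputs)
            {
                collection.Add(input);
            }

            // Signal that no more items will arrive, then wait for the consumer to finish
            // before the collection is disposed.
            collection.CompleteAdding();
            consumer.Wait();
        }

        return results;
    }
}
```

Calling CompleteAdding() before waiting on the task mirrors the order used in Dispose() above: waiting first would block forever, because the consuming enumeration only ends once adding has been marked complete.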