我有以下类,它基本上订阅int observable 并将值乘以 2。出于现实目的,我添加了 Thread.Sleep 来模拟繁重的处理。
public class WorkingClass
{
private BlockingCollection<int> _collection = new BlockingCollection<int>(1);
public WorkingClass(IObservable<int> rawValues)
{
rawValues.Subscribe(x => _collection.Add(x));
}
public IObservable<int> ProcessedValues()
{
return Observable.Create<int>(observer =>
{
while (true)
{
int value;
try
{
value = _collection.Take();
}
catch (Exception ex)
{
observer.OnError(ex);
break;
}
Thread.Sleep(1000); //Simulate long work
observer.OnNext(value * 2);
}
return Disposable.Empty;
});
}
}
我在测试它时遇到了麻烦,在下面的测试中我只想断言如果源流发出值 1,SUT 将发出值 2:
[Test]
public void SimpleTest()
{
var sourceValuesScheduler = new TestScheduler();
var newThreadScheduler = new TestScheduler();
var source = sourceValuesScheduler.CreateHotObservable(
new Recorded<Notification<int>>(1000, Notification.CreateOnNext(1)));
var sut = new WorkingClass(source);
var observer = sourceValuesScheduler.CreateObserver<int>();
sut.ProcessedValues()
.SubscribeOn(newThreadScheduler) //The cold part (i.e, the while loop) of the ProcessedValues Observable should run in a different thread
.Subscribe(observer);
sourceValuesScheduler.AdvanceTo(1000);
observer.Messages.AssertEqual(new Recorded<Notification<int>>(1000, Notification.CreateOnNext(2)));
}
如果我运行此测试,则断言会失败,因为 newThreadScheduler 从未启动,因此从未创建 ProcessedValues observable。如果我这样做:
sourceValuesScheduler.AdvanceTo(1000);
newThreadScheduler.AdvanceTo(1000);
它也不起作用,因为 newThreadScheduler 使用与 sourceValuesScheduler 相同的线程,因此测试将在处理后的值被发出后立即挂起,在以下行:
value = _collection.Take();
有没有办法让多个 TestScheduler 在不同的线程上运行?否则我怎么能测试这样的课程呢?
料青山看我应如是
相关分类