猿问

反应式服务。分组和缓存流

新增内容:带有测试的完整源代码现在位于https://github.com/bboyle1234/ReactiveTest

假设我们有一个视图状态对象,可以通过小的部分视图更改事件进行更新。以下是总视图、增量视图更新事件和Update构建总视图的累加器函数的一些示例模型:

interface IDeviceView : ICloneable {

    Guid DeviceId { get; }

}


class DeviceTotalView : IDeviceView {

    public Guid DeviceId { get; set; }

    public int Voltage { get; set; }

    public int Currents { get; set; }

    public object Clone() => this.MemberwiseClone();

}


class DeviceVoltagesUpdateView : IDeviceView {

    public Guid DeviceId { get; set; }

    public int Voltage { get; set; }

    public object Clone() => this.MemberwiseClone();

}


class DeviceCurrentsUpdateView : IDeviceView {

    public Guid DeviceId { get; set; }

    public int Current { get; set; }

    public object Clone() => this.MemberwiseClone();

}


class DeviceUpdateEvent {

    public DeviceTotalView View;

    public IDeviceView LastUpdate;

}


static DeviceUpdateEvent Update(DeviceUpdateEvent previousUpdate, IDeviceView update) {

    if (update.DeviceId != previousUpdate.View.DeviceId) throw new InvalidOperationException("Device ids do not match (numskull exception).");

    var view = (DeviceTotalView)previousUpdate.View.Clone();

    switch (update) {

        case DeviceVoltagesUpdateView x: {

            view.Voltage = x.Voltage;

            break;

        }

        case DeviceCurrentsUpdateView x: {

            view.Currents = x.Current;

            break;

        }

    }

    return new DeviceUpdateEvent { View = view, LastUpdate = update };

}

接下来,假设我们已经有一个可注入服务,能够为所有设备生成小更新事件的可观察流,并且我们希望创建一个可以为各个设备生成聚合视图流的服务。


这是我们要创建的服务的接口:


interface IDeviceService {

    /// <summary>

    /// Gets an observable that produces aggregated update events for the device with the given deviceId.

    /// On subscription, the most recent event is immediately pushed to the subscriber.

    /// There can be multiple subscribers.

    /// </summary>

    IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId);

}

如何使用System.Reactive v4库中的反应式扩展(目标)来实现此接口及其要求.netstandard2.0?这是我的带有注释的锅炉代码,这是我所能得到的。


红糖糍粑
浏览 110回答 2
2回答

杨__羊羊

接受的答案中给出的实现虽然对我来说是一次神奇的教育,但也有一些问题需要依次解决。第一个是线程竞争问题,第二个是系统中存在大量设备时的性能问题。我最终解决了线程竞争并通过这个修改后的实现显着提高了性能:在构造函数中,分组和扫描的设备流直接订阅到 a BehaviorSubject,它实现了Replay(1).RefCount()立即通知新订阅者流中最新值所需的功能。在该方法中,我们继续使用字典查找来查找设备流,如果字典中尚不存在则GetDeviceStream创建预加载。BehaviorSubject我们已经删除了Where上述问题中先前实现中存在的搜索。使用 where 搜索会导致线程竞争问题,该问题通过使分组流可重播得以解决。这导致了指数性能问题。替换它,将FirstOrDefault花费的时间减少一半,然后完全删除它以支持GetCreate字典技术,从而获得完美的性能 O(1) 而不是 O(n2)。GetCreateSubject使用Lazy代理对象作为字典值,因为有时可以针对单个键多次ConcurrentDictionary调用该方法。向字典Create提供 a可确保仅在其中一个懒惰者上调用该属性,因此每个设备只创建一个。LazyValueBehaviorSubjectclass DeviceService : IDeviceService, IDisposable {&nbsp; &nbsp; readonly CompositeDisposable _disposable = new CompositeDisposable();&nbsp; &nbsp; readonly ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>> _streams = new ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>>();&nbsp; &nbsp; BehaviorSubject<DeviceUpdateEvent> GetCreateSubject(Guid deviceId) {&nbsp; &nbsp; &nbsp; &nbsp; return _streams.GetOrAdd(deviceId, Create).Value;&nbsp; &nbsp; &nbsp; &nbsp; Lazy<BehaviorSubject<DeviceUpdateEvent>> Create(Guid id) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return new Lazy<BehaviorSubject<DeviceUpdateEvent>>(() => {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var subject = new BehaviorSubject<DeviceUpdateEvent>(DeviceUpdateEvent.GetInitialView(deviceId));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _disposable.Add(subject);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return subject;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; public DeviceService(IConnectableObservable<IDeviceView> source) {&nbsp; &nbsp; &nbsp; &nbsp; _disposable.Add(source&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .GroupBy(x => x.DeviceId)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .Subscribe(deviceStream => {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _disposable.Add(deviceStream&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .Scan(DeviceUpdateEvent.GetInitialView(deviceStream.Key), DeviceUtils.Update)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .Subscribe(GetCreateSubject(deviceStream.Key)));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }));&nbsp; &nbsp; &nbsp; &nbsp; _disposable.Add(source.Connect());&nbsp; &nbsp; }&nbsp; &nbsp; public void Dispose() {&nbsp; &nbsp; &nbsp; &nbsp; _disposable.Dispose();&nbsp; &nbsp; }&nbsp; &nbsp; public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId) {&nbsp; &nbsp; &nbsp; &nbsp; return GetCreateSubject(deviceId).AsObservable();&nbsp; &nbsp; }}[TestMethod]public async Task Test2() {&nbsp; &nbsp; var input = new AsyncProducerConsumerQueue<IDeviceView>();&nbsp; &nbsp; var source = new ConnectableObservableForAsyncProducerConsumerQueue<IDeviceView>(input);&nbsp; &nbsp; var service = new DeviceService(source);&nbsp; &nbsp; var ids = Enumerable.Range(0, 100000).Select(i => Guid.NewGuid()).ToArray();&nbsp; &nbsp; var idsRemaining = ids.ToHashSet();&nbsp; &nbsp; var t1 = Task.Run(async () => {&nbsp; &nbsp; &nbsp; &nbsp; foreach (var id in ids) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; await input.EnqueueAsync(new DeviceVoltagesUpdateView { DeviceId = id, Voltage = 1 });&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; });&nbsp; &nbsp; var t2 = Task.Run(() => {&nbsp; &nbsp; &nbsp; &nbsp; foreach (var id in ids) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; service.GetDeviceStream(id).Subscribe(x => idsRemaining.Remove(x.View.DeviceId));&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; });&nbsp; &nbsp; await Task.WhenAll(t1, t2);&nbsp; &nbsp; var sw = Stopwatch.StartNew();&nbsp; &nbsp; while (idsRemaining.Count > 0) {&nbsp; &nbsp; &nbsp; &nbsp; if (sw.Elapsed.TotalSeconds > 600) throw new Exception("Failed");&nbsp; &nbsp; &nbsp; &nbsp; await Task.Delay(100);&nbsp; &nbsp; }}在这里查看整个问题源代码和测试代码: https:&nbsp;//github.com/bboyle1234/ReactiveTest

九州编程

你的要点中有一些奇怪的代码。这是我的工作内容:public class DeviceService : IDeviceService, IDisposable{&nbsp; &nbsp; readonly IObservable<IDeviceView> Source;&nbsp; &nbsp; private readonly Dictionary<Guid, IObservable<DeviceUpdateEvent>> _updateStreams = new Dictionary<Guid, IObservable<DeviceUpdateEvent>>();&nbsp; &nbsp; private readonly IObservable<(Guid, IObservable<DeviceUpdateEvent>)> _groupedStream;&nbsp; &nbsp; private readonly CompositeDisposable _disposable = new CompositeDisposable();&nbsp; &nbsp; public DeviceService(IObservable<IDeviceView> source)&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; Source = source;&nbsp; &nbsp; &nbsp; &nbsp; _groupedStream = source&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .GroupBy(v => v.DeviceId)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .Select(o => (o.Key, o&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .Scan(new DeviceUpdateEvent { View = DeviceTotalView.GetInitialView(o.Key), LastUpdate = null }, (lastTotalView, newView) => lastTotalView.Update(newView))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .Replay(1)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .RefCount()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ));&nbsp; &nbsp; &nbsp; &nbsp; var groupSubscription = _groupedStream.Subscribe(t =>&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _updateStreams[t.Item1] = t.Item2;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _disposable.Add(t.Item2.Subscribe());&nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; _disposable.Add(groupSubscription);&nbsp; &nbsp; }&nbsp; &nbsp; public void Dispose()&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; _disposable.Dispose();&nbsp; &nbsp; }&nbsp; &nbsp; public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId)&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; /// How do we implement this? The observable that we return must be pre-loaded with the latest update&nbsp; &nbsp; &nbsp; &nbsp; if(this._updateStreams.ContainsKey(deviceId))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return this._updateStreams[deviceId];&nbsp; &nbsp; &nbsp; &nbsp; return _groupedStream&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .Where(t => t.Item1 == deviceId)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .Select(t => t.Item2)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .Switch();&nbsp; &nbsp; }}这里的肉就是_groupedStream一块。正如您所说,您按 DeviceId 进行分组,然后用于Scan更新状态。我还转移Update到静态类并使其成为扩展方法。你需要一个初始状态,所以我修改了你的DeviceTotalView类来获得它。相应修改:public class DeviceTotalView : IDeviceView{&nbsp; &nbsp; public Guid DeviceId { get; set; }&nbsp; &nbsp; public int Voltage { get; set; }&nbsp; &nbsp; public int Currents { get; set; }&nbsp; &nbsp; public object Clone() => this.MemberwiseClone();&nbsp; &nbsp; public static DeviceTotalView GetInitialView(Guid deviceId)&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; return new DeviceTotalView&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; DeviceId = deviceId,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Voltage = 0,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Currents = 0&nbsp; &nbsp; &nbsp; &nbsp; };&nbsp; &nbsp; }}接下来,它.Replay(1).Refcount()会记住最新的更新,然后在订阅时提供该更新。然后,我们将所有这些子可观察量填充到字典中,以便在方法调用时轻松检索。虚拟订阅 ( _disposable.Add(t.Item2.Subscribe())) 是Replay正常工作所必需的。如果早期请求尚未更新的 DeviceId,我们会订阅该设备,它将_groupedStream等待第一次更新,生成该 Id 的可观察值,然后.Switch订阅该子可观察值。然而,所有这些都在你的测试代码中失败了,我猜是因为这个ConnectableObservableForAsyncProducerConsumerQueue类。我不想调试它,因为我不建议这样做。一般来说,不建议混合 TPL 和 Rx 代码。他们解决的问题大部分是重叠的,而且互相妨碍。因此,我修改了您的测试代码,用重播主题替换了可连接的可观察队列。我还添加了早期请求的测试用例(在该设备的更新到达之前):DeviceUpdateEvent deviceView1 = null;DeviceUpdateEvent deviceView2 = null;DeviceUpdateEvent deviceView3 = null;var subject = new ReplaySubject<IDeviceView>();var id1 = Guid.NewGuid();var id2 = Guid.NewGuid();var id3 = Guid.NewGuid();subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 1 });subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 2 });subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 100 });var service = new DeviceService(subject);service.GetDeviceStream(id1).Subscribe(x => deviceView1 = x);service.GetDeviceStream(id2).Subscribe(x => deviceView2 = x);service.GetDeviceStream(id3).Subscribe(x => deviceView3 = x);/// I believe there is no need to pause here because the Subscribe method calls above&nbsp;/// block until the events have all been pushed into the subscribers above.Assert.AreEqual(deviceView1.View.DeviceId, id1);Assert.AreEqual(deviceView2.View.DeviceId, id2);Assert.AreEqual(deviceView1.View.Voltage, 2);Assert.AreEqual(deviceView2.View.Voltage, 100);Assert.IsNull(deviceView3);subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 101 });Assert.AreEqual(deviceView2.View.Voltage, 101);subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id3, Voltage = 101 });Assert.AreEqual(deviceView3.View.DeviceId, id3);Assert.AreEqual(deviceView3.View.Voltage, 101);这一切都很好,并且可以在没有异步的情况下运行。另外,作为一般提示,我建议使用Microsoft.Reactive.Testing包对 Rx 代码进行单元测试,而不是进行时间间隔测试。
随时随地看视频慕课网APP
我要回答