如何从 Action<T> 创建 Observable?

我试图从一个 Action 创建一个 Observable,但是 andFromEventPattern语法FromEvent让我很难理解。


这是我的尝试:


Observable

    .FromEventPattern<Action<IStreamTrade>, IStreamTrade>(

        x => _polygonSocket.TradeReceived += x,

        x => _polygonSocket.TradeReceived -= x)

    .Select(x => x?.EventArgs)

    .IsNotNull()

    .Subscribe( /** do stuff **/)

    .DisposeWith(Bindings);

我觉得FromEventPattern这里可能不是正确的选择,但我无法编译其他任何内容。


会发生什么:


我收到一个Parameter count mismatch.异常,如下所示:


Parameter count mismatch.

Thread started:  #26

  at System.Delegate.CreateDelegate (System.Type type, System.Object firstArgument, System.Reflection.MethodInfo method, System.Boolean throwOnBindFailure, System.Boolean allowClosed) [0x000ee] in /Users/builder/jenkins/workspace/xamarin-macios/xamarin-macios/external/mono/mcs/class/corlib/System/Delegate.cs:230 

  at System.Delegate.CreateDelegate (System.Type type, System.Object firstArgument, System.Reflection.MethodInfo method) [0x00000] in /Users/builder/jenkins/workspace/xamarin-macios/xamarin-macios/external/mono/mcs/class/corlib/System/Delegate.cs:296 

  at System.Reactive.ReflectionUtils.CreateDelegate[TDelegate] (System.Object o, System.Reflection.MethodInfo method) [0x00000] in <370f6a6bb34048878534065376a195cb>:0 

  at System.Reactive.Linq.ObservableImpl.FromEventPattern+Impl`2[TDelegate,TEventArgs].GetHandler (System.Action`1[T] onNext) [0x0003d] in <370f6a6bb34048878534065376a195cb>:0 

  at System.Reactive.Linq.ObservableImpl.EventProducer`2+Session[TDelegate,TArgs].Initialize () [0x00023] in <370f6a6bb34048878534065376a195cb>:0 

  at System.Reactive.Linq.ObservableImpl.EventProducer`2+Session[TDelegate,TArgs].Connect (System.IObserver`1[T] observer) [0x00033] in <370f6a6bb34048878534065376a195cb>:0 

我正在寻找什么:


我想学习如何从Action<IStreamTrade>. 我对使用的解决方案持开放态度Observable.Create,但更喜欢使用FromEvent或的解决方案FromEventPattern(如果可能的话)。

GitHub 源代码


蛊毒传说
浏览 42回答 2
2回答

ITMISS

尝试使用Observable.FromEvent,它将通用的基于 Action 的 .NET 事件转换为可观察的序列:Observable.FromEvent<IStreamTrade>(&nbsp; &nbsp; x => _polygonSocket.TradeReceived += x,&nbsp; &nbsp; x => _polygonSocket.TradeReceived -= x).Subscribe( /** do stuff **/).DisposeWith(Bindings);

繁星coding

虽然我更喜欢,并使用了维塔利的方法。这是使用的另一种方法Observable.Create。Observable&nbsp; &nbsp; .Create<IStreamTrade>(&nbsp; &nbsp; x =>&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; Action<IStreamTrade> aist = st => x.OnNext(st);&nbsp; &nbsp; &nbsp; &nbsp; _polygonSocket.TradeReceived += aist;&nbsp; &nbsp; &nbsp; &nbsp; return Disposable.Create(() => _polygonSocket.TradeReceived -= aist);&nbsp; &nbsp; })&nbsp; &nbsp; .Subscribe()&nbsp; &nbsp; .DisposeWith(Bindings);
打开App,查看更多内容
随时随地看视频慕课网APP