如何以同步方式合并两个 TPL DataFlow 管道?

我想编写一个应用程序来评估来自两个传感器的传感器数据。两个传感器都将它们的数据发送到Package对象中,这些对象被分成多个Frame对象。APackage本质上是a Tuple<Timestamp, Data[]>,aFrame是a Tuple<Timestamp, Data>。然后我需要始终Frame使用来自两个来源的最早时间戳。


所以基本上我的对象流是


Package -(1:n)-> Frame \

                        }-pair synchronized-> Tuple<Frame, Frame>

Package -(1:n)-> Frame /

例子

假设每个Package包含 2 或 3 个值(现实:5-7)和递增 1 的整数时间戳(现实:~200Hz => ~5ms 增量)。“数据”只是timestamp * 100为了简单起见。


Packages (timestamp, values[])


Source 1:

{(19, [1700, 1800, 1900]), (22, [2000, 2100, 2200]), (26, [2500, 2600]),

 (29, [2700, 2800, 2900]), ...}


Source 2:

{(17, [1500, 1600, 1700]), (19, [1800, 1900]), (21, [2000, 2100]),

 (26, [2400, 2500, 2600]), ...}

步骤后(1:n):


Frames (timestamp, value)


Source 1:

{(17, 1700), (18, 1800), (19, 1900), (20, 2000), (21, 2100),

 (22, 2200), (25, 2500), (26, 2600), (27, 2700), (28, 2800),

 (29, 2900), ...}


Source 2:

{(15, 1500), (16, 1600), (17, 1700), (18, 1800), (19, 1900),

 (20, 2000), (21, 2100), (24, 2400), (25, 2500), (26, 2600), ...}

步骤后pair synchronized:


Merged tuples (timestamp, source1, source2)


{(15, null, 1500), (16, null, 1600), (17, 1700, 1700), (18, 1800, 1800),

 (19, 1900, 1900), (20, 2000, 2000), (21, 2100, 2100), (22, 2200, null),

 (24, null, 2400), (25, 2500, 2500), (26, 2600, 2600), ...}

请注意,23缺少时间戳,因为两个源都没有发送值。那只是一个副作用。我可以放入或不放入一个空元组,这无关紧要。(27, 2700, 2700)元组是or也没有关系((27, 2700), (27, 2700)),即Tuple<Timestamp, Data, Data>or Tuple<Frame, Frame>。


吃鸡游戏
浏览 85回答 2
2回答

牛魔王的故事

TPL DataFlow API 的问题在于,一切都是内部/私有和/或密封的。这给您扩展 API 的可能性不大。无论如何,对于您的问题,实现一个新的 SynchronizedJoinBlock 类可能是个好主意。实际的业务逻辑位于 GetMessagesRecursive 方法中:&nbsp; &nbsp; public sealed class SynchronizedJoinBlock<T1, T2>&nbsp; &nbsp; &nbsp; &nbsp; : IReceivableSourceBlock<Tuple<T1, T2>>&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; private readonly object _syncObject = new object();&nbsp; &nbsp; &nbsp; &nbsp; private readonly Func<T1, T2, int> _compareFunction;&nbsp; &nbsp; &nbsp; &nbsp; private readonly Queue<T1> _target1Messages;&nbsp; &nbsp; &nbsp; &nbsp; private readonly Queue<T2> _target2Messages;&nbsp; &nbsp; &nbsp; &nbsp; private readonly TransformManyBlock<T1, Tuple<T1, T2>> _target1;&nbsp; &nbsp; &nbsp; &nbsp; private readonly TransformManyBlock<T2, Tuple<T1, T2>> _target2;&nbsp; &nbsp; &nbsp; &nbsp; private readonly BatchedJoinBlock<Tuple<T1, T2>, Tuple<T1, T2>> _batchedJoinBlock;&nbsp; &nbsp; &nbsp; &nbsp; private readonly TransformManyBlock<Tuple<IList<Tuple<T1, T2>>, IList<Tuple<T1, T2>>>, Tuple<T1, T2>> _transformManyBlock;&nbsp; &nbsp; &nbsp; &nbsp; public ITargetBlock<T1> Target1 => _target1;&nbsp; &nbsp; &nbsp; &nbsp; public ITargetBlock<T2> Target2 => _target2;&nbsp; &nbsp; &nbsp; &nbsp; public Task Completion => _transformManyBlock.Completion;&nbsp; &nbsp; &nbsp; &nbsp; public SynchronizedJoinBlock(Func<T1, T2, int> compareFunction)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _compareFunction = compareFunction&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ?? throw new ArgumentNullException(nameof(compareFunction));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _batchedJoinBlock = new BatchedJoinBlock<Tuple<T1, T2>, Tuple<T1, T2>>(1);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _target1Messages = new Queue<T1>();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _target2Messages = new Queue<T2>();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Func<ICollection<Tuple<T1, T2>>> getMessagesFunction = () =>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; lock (_syncObject)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (_target1Messages.Count > 0 && _target2Messages.Count > 0)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return GetMessagesRecursive(_target1Messages.Peek(), _target2Messages.Peek()).ToArray();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; else&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return new Tuple<T1, T2>[0];&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; };&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _target1 = new TransformManyBlock<T1, Tuple<T1, T2>>((element) =>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _target1Messages.Enqueue(element);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return getMessagesFunction();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _target1.LinkTo(_batchedJoinBlock.Target1, new DataflowLinkOptions() { PropagateCompletion = true });&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _target2 = new TransformManyBlock<T2, Tuple<T1, T2>>((element) =>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _target2Messages.Enqueue(element);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return getMessagesFunction();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _target2.LinkTo(_batchedJoinBlock.Target2, new DataflowLinkOptions() { PropagateCompletion = true });&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _transformManyBlock = new TransformManyBlock<Tuple<IList<Tuple<T1, T2>>, IList<Tuple<T1, T2>>>, Tuple<T1, T2>>(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; element => element.Item1.Concat(element.Item2)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; );&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _batchedJoinBlock.LinkTo(_transformManyBlock, new DataflowLinkOptions() { PropagateCompletion = true });&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; private IEnumerable<Tuple<T1, T2>> GetMessagesRecursive(T1 value1, T2 value2)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; int result = _compareFunction(value1, value2);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (result == 0)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; yield return Tuple.Create(_target1Messages.Dequeue(), _target2Messages.Dequeue());&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; else if (result < 0)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; yield return Tuple.Create(_target1Messages.Dequeue(), default(T2));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (_target1Messages.Count > 0)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; foreach (var item in GetMessagesRecursive(_target1Messages.Peek(), value2))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; yield return item;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; else&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; yield return Tuple.Create(default(T1), _target2Messages.Dequeue());&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (_target2Messages.Count > 0)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; foreach (var item in GetMessagesRecursive(value1, _target2Messages.Peek()))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; yield return item;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; public void Complete()&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _target1.Complete();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _target2.Complete();&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; Tuple<T1, T2> ISourceBlock<Tuple<T1, T2>>.ConsumeMessage(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; DataflowMessageHeader messageHeader,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .ConsumeMessage(messageHeader, target, out messageConsumed);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; void IDataflowBlock.Fault(Exception exception)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ((IDataflowBlock)_transformManyBlock).Fault(exception);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; public IDisposable LinkTo(ITargetBlock<Tuple<T1, T2>> target,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; DataflowLinkOptions linkOptions)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return _transformManyBlock.LinkTo(target, linkOptions);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; void ISourceBlock<Tuple<T1, T2>>.ReleaseReservation(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .ReleaseReservation(messageHeader, target);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; bool ISourceBlock<Tuple<T1, T2>>.ReserveMessage(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .ReserveMessage(messageHeader, target);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; public bool TryReceive(Predicate<Tuple<T1, T2>> filter, out Tuple<T1, T2> item)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return _transformManyBlock.TryReceive(filter, out item);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; public bool TryReceiveAll(out IList<Tuple<T1, T2>> items)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return _transformManyBlock.TryReceiveAll(out items);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }

叮当猫咪

这是一个SynchronizedJoinBlock块的实现,类似于 Hardy Hobeck's answer中提供的那个。Target1这个负责处理一些次要的细节,例如取消、处理异常,以及在输入块Target2被标记为已完成时处理传播剩余项目。此外,合并逻辑不涉及递归,这应该使其性能更好(希望我没有测量它)并且不易受到堆栈溢出异常的影响。小偏差:输出是一个ValueTuple<T1, T2>而不是Tuple<T1, T2>(目的是减少分配)。public sealed class SynchronizedJoinBlock<T1, T2> : IReceivableSourceBlock<(T1, T2)>{&nbsp; &nbsp; private readonly Func<T1, T2, int> _comparison;&nbsp; &nbsp; private readonly Queue<T1> _queue1 = new Queue<T1>();&nbsp; &nbsp; private readonly Queue<T2> _queue2 = new Queue<T2>();&nbsp; &nbsp; private readonly ActionBlock<T1> _input1;&nbsp; &nbsp; private readonly ActionBlock<T2> _input2;&nbsp; &nbsp; private readonly BufferBlock<(T1, T2)> _output;&nbsp; &nbsp; private readonly object _locker = new object();&nbsp; &nbsp; public SynchronizedJoinBlock(Func<T1, T2, int> comparison,&nbsp; &nbsp; &nbsp; &nbsp; CancellationToken cancellationToken = default)&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; _comparison = comparison ?? throw new ArgumentNullException(nameof(comparison));&nbsp; &nbsp; &nbsp; &nbsp; // Create the three internal blocks&nbsp; &nbsp; &nbsp; &nbsp; var options = new ExecutionDataflowBlockOptions()&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; CancellationToken = cancellationToken&nbsp; &nbsp; &nbsp; &nbsp; };&nbsp; &nbsp; &nbsp; &nbsp; _input1 = new ActionBlock<T1>(Add1, options);&nbsp; &nbsp; &nbsp; &nbsp; _input2 = new ActionBlock<T2>(Add2, options);&nbsp; &nbsp; &nbsp; &nbsp; _output = new BufferBlock<(T1, T2)>(options);&nbsp; &nbsp; &nbsp; &nbsp; // Link the input blocks with the output block&nbsp; &nbsp; &nbsp; &nbsp; var inputTasks = new Task[] { _input1.Completion, _input2.Completion };&nbsp; &nbsp; &nbsp; &nbsp; Task.WhenAny(inputTasks).Unwrap().ContinueWith(t =>&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // If ANY input block fails, then the whole block has failed&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ((IDataflowBlock)_output).Fault(t.Exception.InnerException);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (!_input1.Completion.IsCompleted) _input1.Complete();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (!_input2.Completion.IsCompleted) _input2.Complete();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ClearQueues();&nbsp; &nbsp; &nbsp; &nbsp; }, default, TaskContinuationOptions.OnlyOnFaulted |&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; TaskContinuationOptions.RunContinuationsAsynchronously,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; TaskScheduler.Default);&nbsp; &nbsp; &nbsp; &nbsp; Task.WhenAll(inputTasks).ContinueWith(t =>&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // If ALL input blocks succeeded, then the whole block has succeeded&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (!t.IsCanceled) PostRemaining(); // Post what's left&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; catch (Exception ex)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ((IDataflowBlock)_output).Fault(ex);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _output.Complete();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ClearQueues();&nbsp; &nbsp; &nbsp; &nbsp; }, default, TaskContinuationOptions.NotOnFaulted |&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; TaskContinuationOptions.RunContinuationsAsynchronously,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; TaskScheduler.Default);&nbsp; &nbsp; }&nbsp; &nbsp; public ITargetBlock<T1> Target1 => _input1;&nbsp; &nbsp; public ITargetBlock<T2> Target2 => _input2;&nbsp; &nbsp; public Task Completion => _output.Completion;&nbsp; &nbsp; private void Add1(T1 value1)&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; lock (_locker)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _queue1.Enqueue(value1);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; FindAndPostMatched_Unsafe();&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; private void Add2(T2 value2)&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; lock (_locker)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _queue2.Enqueue(value2);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; FindAndPostMatched_Unsafe();&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; private void FindAndPostMatched_Unsafe()&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; while (_queue1.Count > 0 && _queue2.Count > 0)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var result = _comparison(_queue1.Peek(), _queue2.Peek());&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (result < 0)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _output.Post((_queue1.Dequeue(), default));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; else if (result > 0)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _output.Post((default, _queue2.Dequeue()));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; else // result == 0&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _output.Post((_queue1.Dequeue(), _queue2.Dequeue()));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; private void PostRemaining()&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; lock (_locker)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (_queue1.Count > 0)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _output.Post((_queue1.Dequeue(), default));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (_queue2.Count > 0)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _output.Post((default, _queue2.Dequeue()));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; private void ClearQueues()&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; lock (_locker)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _queue1.Clear();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _queue2.Clear();&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; public void Complete() => _output.Complete();&nbsp; &nbsp; public void Fault(Exception exception)&nbsp; &nbsp; &nbsp; &nbsp; => ((IDataflowBlock)_output).Fault(exception);&nbsp; &nbsp; public IDisposable LinkTo(ITargetBlock<(T1, T2)> target,&nbsp; &nbsp; &nbsp; &nbsp; DataflowLinkOptions linkOptions)&nbsp; &nbsp; &nbsp; &nbsp; => _output.LinkTo(target, linkOptions);&nbsp; &nbsp; public bool TryReceive(Predicate<(T1, T2)> filter, out (T1, T2) item)&nbsp; &nbsp; &nbsp; &nbsp; => _output.TryReceive(filter, out item);&nbsp; &nbsp; public bool TryReceiveAll(out IList<(T1, T2)> items)&nbsp; &nbsp; &nbsp; &nbsp; => _output.TryReceiveAll(out items);&nbsp; &nbsp; (T1, T2) ISourceBlock<(T1, T2)>.ConsumeMessage(&nbsp; &nbsp; &nbsp; &nbsp; DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target,&nbsp; &nbsp; &nbsp; &nbsp; out bool messageConsumed)&nbsp; &nbsp; &nbsp; &nbsp; => ((ISourceBlock<(T1, T2)>)_output).ConsumeMessage(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; messageHeader, target, out messageConsumed);&nbsp; &nbsp; void ISourceBlock<(T1, T2)>.ReleaseReservation(&nbsp; &nbsp; &nbsp; &nbsp; DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target)&nbsp; &nbsp; &nbsp; &nbsp; => ((ISourceBlock<(T1, T2)>)_output).ReleaseReservation(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; messageHeader, target);&nbsp; &nbsp; bool ISourceBlock<(T1, T2)>.ReserveMessage(&nbsp; &nbsp; &nbsp; &nbsp; DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target)&nbsp; &nbsp; &nbsp; &nbsp; => ((ISourceBlock<(T1, T2)>)_output).ReserveMessage(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; messageHeader, target);}使用示例:var joinBlock = new SynchronizedJoinBlock<(int, int), (int, int)>(&nbsp; &nbsp; (x, y) => Comparer<int>.Default.Compare(x.Item1, y.Item1));var source1 = new (int, int)[] {(17, 1700), (18, 1800), (19, 1900),&nbsp; &nbsp; (20, 2000), (21, 2100), (22, 2200), (25, 2500), (26, 2600),&nbsp; &nbsp; (27, 2700), (28, 2800), (29, 2900)};var source2 = new (int, int)[] {(15, 1500), (16, 1600), (17, 1700),&nbsp; &nbsp; (18, 1800), (19, 1900), (20, 2000), (21, 2100), (24, 2400),&nbsp; &nbsp; (25, 2500), (26, 2600)};Array.ForEach(source1, x => joinBlock.Target1.Post(x));Array.ForEach(source2, x => joinBlock.Target2.Post(x));joinBlock.Target1.Complete();joinBlock.Target2.Complete();while (joinBlock.OutputAvailableAsync().Result){&nbsp; &nbsp; Console.WriteLine($"> Received: {joinBlock.Receive()}");}输出:收到:((0, 0), (15, 1500))收到:((0, 0), (16, 1600))收到:((17, 1700), (17, 1700))收到:((18 , 1800), (18, 1800))收到: ((19, 1900), (19, 1900))收到: ((20, 2000), (20, 2000))收到: ((21, 2100), ( 21, 2100))收到:((22, 2200), (0, 0))收到:((0, 0), (24, 2400))收到:((25, 2500), (25, 2500))收到:((26, 2600), (26, 2600))收到:((27, 2700), (0, 0))收到:((28, 2800), (0, 0))收到:((29 , 2900), (0, 0))假定传入数据是有序的。这个类与JoinDependencyBlock我之前在一个有点相关的问题中发布的类具有相似的结构。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go