如何并行延迟多个消息?

我正在做一些TCP编程,并且我想模拟一些延迟。


每个“消息”(代表序列化对象的byte [])必须延迟一段时间t。我以为我可以使用一种功能来收集原始消息:


private Queue<byte[]> rawMessages = new Queue<byte[]>();

private void OnDataReceived(object sender, byte[] data)

{

    rawMessages.Enqueue(data);

}

带有while循环的另一种方法,该方法可以连续读取rawMessages并延迟每个循环:


private Queue<byte[]> delayedRawMessages = new Queue<byte[]>();

delayMessagesInTask = Task.Factory.StartNew(() =>

{

    while (true) //TODO: Swap a cancellation token

    {

        if(rawMessages.Count > 0){

            var rawMessage = rawMessages.Dequeue();

            //???

            //Once the message has been delayed, enqueue it into another buffer.

            delayedRawMessages.Enqueue(rawMessage);

        }

    }

});

我以为要延迟每条消息,我可以有另一种方法来生成线程,该线程Thread.Sleep(t)用于等待时间t然后排队delayedRawMessages。我敢肯定这会奏效,但我认为必须有更好的方法。该Thread.Sleep方法的问题在于,消息2可能会在消息1之前完成延迟……我显然需要将消息延迟并以正确的顺序执行,否则我将不会使用TCP。


我正在寻找一种方法来做到这一点,该方法将t尽可能长时间地延迟,并且不会通过降低速度来影响应用程序的其余部分。


这里有人知道更好的方法吗?


江户川乱折腾
浏览 180回答 1
1回答

慕运维8079593

我决定采用生产者/多个消费者的方法。using System.Collections.Concurrent;using System.Collections.Generic;using System.Threading;using System.Threading.Tasks;public class LatencySimulator {&nbsp; &nbsp; public enum SimulatorType { UP, DOWN };&nbsp; &nbsp; public SimulatorType type;&nbsp; &nbsp; public int latency = 0;&nbsp; &nbsp; public int maxConsumers = 50;&nbsp; &nbsp; public BlockingCollection<byte[]> inputQueue = new BlockingCollection<byte[]>(new ConcurrentQueue<byte[]>());&nbsp; &nbsp; public Queue<byte[]> delayedMessagesQueue = new Queue<byte[]>();&nbsp; &nbsp; void CreateConsumers()&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; for (int i = 0; i < maxConsumers; i++)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Task.Factory.StartNew(() => Consumer(),TaskCreationOptions.LongRunning);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; private void Consumer()&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; foreach (var item in inputQueue.GetConsumingEnumerable())&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Thread.Sleep(latency);&nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; delayedMessagesQueue.Enqueue(item);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}要使用它,请创建一个new LatencySimulator,设置其“类型”,最大使用者和要模拟的等待时间。致电CreateConsmers(),然后填充inputQueue。邮件被延迟后,它们将出现在中delayedMessagesQueue。我真的不知道这是否是实现我的目标的理想方法,但目前仍有效。
打开App,查看更多内容
随时随地看视频慕课网APP