猿问

C# 带槽的多线程

我有一个检查代理服务器的函数,目前它仅检查多个线程并等待所有线程完成,直到下一组线程开始。是否可以在允许的最大线程完成后立即启动一个新线程?


for (int i = 0; i < listProxies.Count(); i+=nThreadsNum)

{                              

    for (nCurrentThread = 0; nCurrentThread < nThreadsNum; nCurrentThread++)

    {

        if (nCurrentThread < nThreadsNum)

        {

           string strProxyIP = listProxies[i + nCurrentThread].sIPAddress;

           int nPort = listProxies[i + nCurrentThread].nPort;

                    tasks.Add(Task.Factory.StartNew<ProxyAddress>(() => CheckProxyServer(strProxyIP, nPort, nCurrentThread)));

        }

     }                


     Task.WaitAll(tasks.ToArray());


     foreach (var tsk in tasks)

     {

        ProxyAddress result = tsk.Result;

        UpdateProxyDBRecord(result.sIPAddress, result.bOnlineStatus);

     }


     tasks.Clear();                

}


莫回无
浏览 114回答 4
4回答

素胚勾勒不出你

这看起来简单得多:int numberProcessed = 0;Parallel.ForEach(listProxies,&nbsp; new ParallelOptions { MaxDegreeOfParallelism = nThreadsNum },&nbsp; (p)=> {&nbsp; &nbsp; var result = CheckProxyServer(p.sIPAddress, s.nPort, Thread.CurrentThread.ManagedThreadId);&nbsp; &nbsp; UpdateProxyDBRecord(result.sIPAddress, result.bOnlineStatus);&nbsp; &nbsp; Interlocked.Increment(numberProcessed);});带插槽:var obj = new Object();var slots = new List<int>();Parallel.ForEach(listProxies,&nbsp; new ParallelOptions { MaxDegreeOfParallelism = nThreadsNum },&nbsp; (p)=> {&nbsp; &nbsp; int threadId = Thread.CurrentThread.ManagedThreadId;&nbsp; &nbsp; int slot = slots.IndexOf(threadId);&nbsp; &nbsp; if (slot == -1)&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; lock(obj)&nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; slots.Add(threadId);&nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; slot = slots.IndexOf(threadId);&nbsp; &nbsp; }&nbsp; &nbsp; var result = CheckProxyServer(p.sIPAddress, s.nPort, slot);&nbsp; &nbsp; UpdateProxyDBRecord(result.sIPAddress, result.bOnlineStatus);});我在那里采取了一些捷径来保证线程安全。您不必执行正常的检查-锁定-检查舞蹈,因为永远不会有两个线程尝试将相同的 threadid 添加到列表中,因此第二次检查将始终失败并且不需要。其次,出于同样的原因,我认为您也不需要锁定外部 IndexOf 。这使得它成为一个非常高效的并发例程,无论可枚举中有多少项,都很少锁定(它应该只锁定 nThreadsNum 次)。

人到中年有点甜

另一个解决方案是使用 aSemaphoreSlim或使用 的生产者-消费者模式BlockinCollection<T>。两种解决方案都支持取消。信号量瘦身private async Task CheckProxyServerAsync(IEnumerable<object> proxies){  var tasks = new List<Task>();  int currentThreadNumber = 0;  int maxNumberOfThreads = 8;  using (semaphore = new SemaphoreSlim(maxNumberOfThreads, maxNumberOfThreads))  {    foreach (var proxy in proxies)    {      // Asynchronously wait until thread is available if thread limit reached      await semaphore.WaitAsync();      string proxyIP = proxy.IPAddress;      int port = proxy.Port;      tasks.Add(Task.Run(() => CheckProxyServer(proxyIP, port, Interlocked.Increment(ref currentThreadNumber)))        .ContinueWith(          (task) =>          {            ProxyAddress result = task.Result;            // Method call must be thread-safe!            UpdateProxyDbRecord(result.IPAddress, result.OnlineStatus);            Interlocked.Decrement(ref currentThreadNumber);            // Allow to start next thread if thread limit was reached            semaphore.Release();          },          TaskContinuationOptions.OnlyOnRanToCompletion));    }    // Asynchronously wait until all tasks are completed    // to prevent premature disposal of semaphore    await Task.WhenAll(tasks);  }}生产者-消费者模式// Uses a fixed number of same threadsprivate async Task CheckProxyServerAsync(IEnumerable<ProxyInfo> proxies){  var pipe = new BlockingCollection<ProxyInfo>();  int maxNumberOfThreads = 8;  var tasks = new List<Task>();  // Create all threads (count == maxNumberOfThreads)  for (int currentThreadNumber = 0; currentThreadNumber < maxNumberOfThreads; currentThreadNumber++)  {    tasks.Add(      Task.Run(() => ConsumeProxyInfo(pipe, currentThreadNumber)));  }  proxies.ToList().ForEach(pipe.Add);  pipe.CompleteAdding();  await Task.WhenAll(tasks);}private void ConsumeProxyInfo(BlockingCollection<ProxyInfo> proxiesPipe, int currentThreadNumber){  while (!proxiesPipe.IsCompleted)  {    if (proxiesPipe.TryTake(out ProxyInfo proxy))    {      int port = proxy.Port;      string proxyIP = proxy.IPAddress;      ProxyAddress result = CheckProxyServer(proxyIP, port, currentThreadNumber);       // Method call must be thread-safe!      UpdateProxyDbRecord(result.IPAddress, result.OnlineStatus);    }  }}

30秒到达战场

我建议稍微改变一下你的方法。不要启动和停止线程,而是将代理服务器数据放入并发队列中,每个代理服务器对应一个项目。然后创建固定数量的线程(或异步任务)来处理队列。在我看来,这更有可能提供平稳的性能(您不会一遍又一遍地启动和停止线程,这会产生开销)并且更容易编码。一个简单的例子:class ProxyChecker{&nbsp; &nbsp; private ConcurrentQueue<ProxyInfo> _masterQueue = new ConcurrentQueue<ProxyInfo>();&nbsp; &nbsp; public ProxyChecker(IEnumerable<ProxyInfo> listProxies)&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; foreach (var proxy in listProxies)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _masterQueue.Enqueue(proxy);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; public async Task RunChecks(int maximumConcurrency)&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; var count = Math.Max(maximumConcurrency, _masterQueue.Count);&nbsp; &nbsp; &nbsp; &nbsp; var tasks = Enumerable.Range(0, count).Select( i => WorkerTask() ).ToList();&nbsp; &nbsp; &nbsp; &nbsp; await Task.WhenAll(tasks);&nbsp; &nbsp; }&nbsp; &nbsp; private async Task WorkerTask()&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; ProxyInfo proxyInfo;&nbsp; &nbsp; &nbsp; &nbsp; while ( _masterList.TryDequeue(out proxyInfo))&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; DoTheTest(proxyInfo.IP, proxyInfo.Port)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}&nbsp;

胡说叔叔

如果我正确理解你的问题,这实际上是相当简单的await Task.WhenAny。基本上,您保留所有正在运行的任务的集合。一旦运行的任务达到一定数量,您将等待一个或多个任务完成,然后从集合中删除已完成的任务并继续添加更多任务。下面是我的意思的一个例子:&nbsp; &nbsp; &nbsp; &nbsp; var tasks = new List<Task>();&nbsp; &nbsp; &nbsp; &nbsp; for (int i = 0; i < 20; i++)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // I want my list of tasks to contain at most 5 tasks at once&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (tasks.Count == 5)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Wait for at least one of the tasks to complete&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; await Task.WhenAny(tasks.ToArray());&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Remove all of the completed tasks from the list&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; tasks = tasks.Where(t => !t.IsCompleted).ToList();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Add some task to the list&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; tasks.Add(Task.Factory.StartNew(async delegate ()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; await Task.Delay(1000);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }));&nbsp; &nbsp; &nbsp; &nbsp; }
随时随地看视频慕课网APP
我要回答