C# 并发:使用多个 AutoResetEvent 是个好主意吗?

假设有很多线程调用Do(),只有一个工作线程处理实际工作。


void Do(Job job)

{

    concurrentQueue.Enqueue(job);

    // wait for job done

}


void workerThread()

{

    while (true)

    {

        Job job;

        if (concurrentQueue.TryDequeue(out job))

        {

            // do job

        }

    }

}

Do() 应该等到作业完成后再返回。所以我写了下面的代码:


class Task 

{

    public Job job;

    public AutoResetEvent ev;

}


void Do(Job job)

{

    using (var ev = new AutoResetEvent(false))

    {

        concurrentQueue.Enqueue(new Task { job = job, ev = ev }));

        ev.WaitOne();

    }

}


void workerThread()

{

    while (true)

    {

        Task task;

        if (concurrentQueue.TryDequeue(out task))

        {

            // do job

            task.ev.Set();

        }

    }

}

经过一些测试,我发现它按预期工作。但是我不确定这是分配许多 AutoResetEvents 的好方法,还是有更好的方法来完成?


汪汪一只猫
浏览 114回答 3
3回答

ABOUTYOU

由于所有客户端都必须等待一个线程来完成这项工作,因此实际上没有必要使用队列。所以我建议改用这个Monitor类,特别是Wait/Pulse功能。虽然它有点低级和冗长。class Worker<TResult> : IDisposable{&nbsp; &nbsp; private readonly object _outerLock = new object();&nbsp; &nbsp; private readonly object _innerLock = new object();&nbsp; &nbsp; private Func<TResult> _currentJob;&nbsp; &nbsp; private TResult _currentResult;&nbsp; &nbsp; private Exception _currentException;&nbsp; &nbsp; private bool _disposed;&nbsp; &nbsp; public Worker()&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; var thread = new Thread(MainLoop);&nbsp; &nbsp; &nbsp; &nbsp; thread.IsBackground = true;&nbsp; &nbsp; &nbsp; &nbsp; thread.Start();&nbsp; &nbsp; }&nbsp; &nbsp; private void MainLoop()&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; lock (_innerLock)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (true)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Monitor.Wait(_innerLock); // Wait for client requests&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (_disposed) break;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _currentResult = _currentJob.Invoke();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _currentException = null;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; catch (Exception ex)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _currentException = ex;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _currentResult = default;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Monitor.Pulse(_innerLock); // Notify the waiting client that the job is done&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; } // We are done&nbsp; &nbsp; }&nbsp; &nbsp; public TResult DoWork(Func<TResult> job)&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; TResult result;&nbsp; &nbsp; &nbsp; &nbsp; Exception exception;&nbsp; &nbsp; &nbsp; &nbsp; lock (_outerLock) // Accept only one client at a time&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; lock (_innerLock) // Acquire inner lock&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (_disposed) throw new InvalidOperationException();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _currentJob = job;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Monitor.Pulse(_innerLock); // Notify worker thread about the new job&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Monitor.Wait(_innerLock); // Wait for worker thread to process the job&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; result = _currentResult;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; exception = _currentException;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Clean up&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _currentJob = null;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _currentResult = default;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _currentException = null;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; // Throw the exception, if occurred, preserving the stack trace&nbsp; &nbsp; &nbsp; &nbsp; if (exception != null) ExceptionDispatchInfo.Capture(exception).Throw();&nbsp; &nbsp; &nbsp; &nbsp; return result;&nbsp; &nbsp; }&nbsp; &nbsp; public void Dispose()&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; lock (_outerLock)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; lock (_innerLock)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _disposed = true;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Monitor.Pulse(_innerLock); // Notify worker thread to exit loop&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}使用示例:var worker = new Worker<int>();int result = worker.DoWork(() => 1); // Accepts a function as argumentConsole.WriteLine($"Result: {result}");worker.Dispose();输出:Result: 1更新:之前的解决方案对等待不友好,所以这里有一个允许适当等待的解决方案。它TaskCompletionSource为每个作业使用一个,存储在一个BlockingCollection.class Worker<TResult> : IDisposable{&nbsp; &nbsp; private BlockingCollection<TaskCompletionSource<TResult>> _blockingCollection&nbsp; &nbsp; &nbsp; &nbsp; = new BlockingCollection<TaskCompletionSource<TResult>>();&nbsp; &nbsp; public Worker()&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; var thread = new Thread(MainLoop);&nbsp; &nbsp; &nbsp; &nbsp; thread.IsBackground = true;&nbsp; &nbsp; &nbsp; &nbsp; thread.Start();&nbsp; &nbsp; }&nbsp; &nbsp; private void MainLoop()&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; foreach (var tcs in _blockingCollection.GetConsumingEnumerable())&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var job = (Func<TResult>)tcs.Task.AsyncState;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var result = job.Invoke();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; tcs.SetResult(result);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; catch (Exception ex)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; tcs.TrySetException(ex);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; public Task<TResult> DoWorkAsync(Func<TResult> job)&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; var tcs = new TaskCompletionSource<TResult>(job,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; TaskCreationOptions.RunContinuationsAsynchronously);&nbsp; &nbsp; &nbsp; &nbsp; _blockingCollection.Add(tcs);&nbsp; &nbsp; &nbsp; &nbsp; return tcs.Task;&nbsp; &nbsp; }&nbsp; &nbsp; public TResult DoWork(Func<TResult> job) // Synchronous call&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; var task = DoWorkAsync(job);&nbsp; &nbsp; &nbsp; &nbsp; try { task.Wait(); } catch { } // Swallow the AggregateException&nbsp; &nbsp; &nbsp; &nbsp; // Throw the original exception, if occurred, preserving the stack trace&nbsp; &nbsp; &nbsp; &nbsp; if (task.IsFaulted) ExceptionDispatchInfo.Capture(task.Exception.InnerException).Throw();&nbsp; &nbsp; &nbsp; &nbsp; return task.Result;&nbsp; &nbsp; }&nbsp; &nbsp; public void Dispose()&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; _blockingCollection.CompleteAdding();&nbsp; &nbsp; }}使用示例var worker = new Worker<int>();int result = await worker.DoWorkAsync(() => 1); // Accepts a function as argumentConsole.WriteLine($"Result: {result}");worker.Dispose();输出:Result: 1

Smart猫小萌

从同步的角度来看,这工作正常。但是这样做似乎没有用。如果你想一个接一个地执行作业,你可以使用锁:lock&nbsp;(lockObject)&nbsp;{ &nbsp;&nbsp;RunJob(); }您对这段代码的意图是什么?还有一个效率问题,因为每个任务都会创建一个操作系统事件并等待它。如果您使用更现代TaskCompletionSource的,如果您同步等待该任务,这将在引擎盖下使用相同的东西。您可以使用异步等待 (&nbsp;await myTCS.Task;) 来稍微提高效率。当然,这会用 async/await 感染整个调用堆栈。如果这是一个相当低的交易量操作,您将不会获得太多收益。

吃鸡游戏

总的来说,我认为可行,尽管当您说“许多”线程正在调用 Do() 时,这可能无法很好地扩展……挂起的线程使用资源。这段代码的另一个问题是,在空闲时间,您将在“workerThread”中出现“硬循环”,这将导致您的应用程序返回高 CPU 使用率时间。您可能希望将此代码添加到“workerThread”:if&nbsp;(concurrentQueue.IsEmpty)&nbsp;Thread.Sleep(1);您可能还想为 WaitOne 调用引入超时以避免日志堵塞。
打开App,查看更多内容
随时随地看视频慕课网APP