如何在基于 db 的多线程通知/电子邮件发件人中减少 CPU 使用率

我正在尝试开发一个 Windows 服务来向订阅发送通知。数据保存在 SQL 服务器数据库中。


通知是通过向 REST API 端点发出 Web POST 请求来创建的,并保存在数据库表中。


该服务启动一个任务,该任务不断从该数据库表中读取通知并将它们添加到队列中。


该服务还启动了一些任务,这些任务不断从队列中读取并执行实际的发送过程。


代码运行良好并完成了所需的工作,但问题是运行服务时 CPU 使用率为 100%。


我尝试使用 Thread.Sleep 或 Task.Delay 但都没有帮助我减少 CPU 使用率。


我在这篇 CodeProject 文章中读到,我需要使用等待句柄(wait handle),并在某些条件下进行等待,但我没能让它正常工作。


那么,谁能建议我可以做些什么来减少 EnqueueTask 和 DequeueTask 的 CPU 使用率?


这是发件人代码:


static class NotificationSender

{

    static ConcurrentQueue<NotificationDelivery> deliveryQueue = null;

    static Task enqueueTask = null;

    static Task[] dequeueTasks = null;


    public static void StartSending(ServiceState serviceState)

    {

        PushService.InitServices();


        enqueueTask = Task.Factory.StartNew(EnqueueTask, serviceState);


        deliveryQueue = new ConcurrentQueue<NotificationDelivery>();


        int dequeueTasksCount = 10;

        dequeueTasks = new Task[dequeueTasksCount];

        for (int i = 0; i < dequeueTasksCount; i++)

        {

            dequeueTasks[i] = Task.Factory.StartNew(DequeueTask, serviceState);

        }

    }


    public static void EnqueueTask(object state)

    {

        ServiceState serviceState = (ServiceState)state;


        using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())

        {

            while (!serviceState.CancellationTokenSource.Token.IsCancellationRequested)

            {

                int toEnqueue = 100 - deliveryQueue.Count;



慕妹3242003
浏览 113回答 1
1回答

绝地无双

我想我终于可以使用等待句柄和计时器进行良好的更改以控制 CPU 使用率。EnqueueTask 如果没有获取到通知,将等待 5 秒,然后再尝试从通知表中获取数据。如果没有获取到通知,它将启动计时器并重置等待句柄。然后计时器到期的回调将设置等待句柄。现在 DequeueTask 也在使用等待句柄。如果队列中没有更多项目,它将重置等待句柄以停止对空队列的出队操作。EnqueueTask 将项目添加到队列时将设置此等待句柄。CPU 使用率现在 <= 10%。这是更新的 NotificationSender 代码:

/// <summary>
/// Producer/consumer pipeline that moves pending notification deliveries from the
/// database into an in-memory queue (one producer task) and pushes them
/// (several consumer tasks). Both sides block on wait handles instead of polling.
/// </summary>
static class NotificationSender
{
    // Maximum number of deliveries kept in the in-memory queue.
    const int MaxQueueLength = 100;

    // Number of consumer tasks started by StartSending.
    const int DequeueTaskCount = 10;

    // How long the producer sleeps after finding no pending deliveries.
    const double EnqueueRetryIntervalMs = 5000;

    // How long the producer backs off while the queue is full.
    const int QueueFullBackoffMs = 100;

    static ConcurrentQueue<NotificationDelivery> deliveryQueue = null;
    static Task enqueueTask = null;
    static Task[] dequeueTasks = null;

    // Signaled while the producer is allowed to query the database.
    static ManualResetEvent enqueueSignal = null;

    // Signaled while the queue may contain items for the consumers.
    static ManualResetEvent dequeueSignal = null;

    // One-shot timer that re-signals enqueueSignal after the retry interval.
    static System.Timers.Timer enqueueTimer = null;

    /// <summary>
    /// Resets deliveries stored as Queued back to Pending, then starts the consumer
    /// tasks and the producer task.
    /// </summary>
    /// <param name="token">Cancellation token that stops all started tasks.</param>
    public static void StartSending(CancellationToken token)
    {
        PushService.InitServices();

        // Deliveries still marked Queued are not in the (new, empty) in-memory queue,
        // so they are made Pending again to be picked up by the producer.
        using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())
        {
            NotificationDelivery[] queuedDeliveries = db.NotificationDeliveries
                .Where(nd => nd.Status == NotificationDeliveryStatus.Queued)
                .ToArray();

            foreach (NotificationDelivery delivery in queuedDeliveries)
            {
                delivery.Status = NotificationDeliveryStatus.Pending;
            }

            db.SaveChanges();
        }

        // All shared state is created before any task starts. Previously the queue
        // was created after the producer had been started, so the producer could
        // dereference a null deliveryQueue.
        deliveryQueue = new ConcurrentQueue<NotificationDelivery>();
        enqueueSignal = new ManualResetEvent(true);
        dequeueSignal = new ManualResetEvent(false);

        enqueueTimer = new System.Timers.Timer();
        enqueueTimer.Elapsed += EnqueueTimerCallback;
        enqueueTimer.Interval = EnqueueRetryIntervalMs;
        enqueueTimer.AutoReset = false;
        enqueueTimer.Stop();

        // Consumers first: the producer waits on the complete dequeueTasks array
        // when it shuts down.
        dequeueTasks = new Task[DequeueTaskCount];
        for (int i = 0; i < dequeueTasks.Length; i++)
        {
            dequeueTasks[i] = new Task(DequeueTask, token, TaskCreationOptions.LongRunning);
            dequeueTasks[i].Start();
        }

        enqueueTask = new Task(EnqueueTask, token, TaskCreationOptions.LongRunning);
        enqueueTask.Start();
    }

    /// <summary>
    /// Timer callback: lets the producer query the database again.
    /// </summary>
    public static void EnqueueTimerCallback(Object source, ElapsedEventArgs e)
    {
        enqueueSignal.Set();
        enqueueTimer.Stop();
    }

    /// <summary>
    /// Producer loop. Fetches pending deliveries, marks them Queued, enqueues them and
    /// signals the consumers; also persists the status changes made by the consumers.
    /// </summary>
    /// <param name="state">A boxed <see cref="CancellationToken"/>.</param>
    public static void EnqueueTask(object state)
    {
        CancellationToken token = (CancellationToken)state;

        // Waiting on the token's handle as well lets cancellation interrupt the wait;
        // a bare WaitOne() would block forever once the timer is no longer running.
        WaitHandle[] wakeHandles = { enqueueSignal, token.WaitHandle };

        // NOTE(review): the entities tracked by this context are modified by the
        // consumer threads while this thread calls SaveChanges. DbContext is not
        // documented as thread-safe - confirm this is acceptable or give the
        // consumers their own context.
        using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())
        {
            while (!token.IsCancellationRequested)
            {
                if (WaitHandle.WaitAny(wakeHandles) != 0)
                {
                    // Woken by cancellation only.
                    break;
                }

                int toEnqueue = MaxQueueLength - deliveryQueue.Count;
                if (toEnqueue > 0)
                {
                    // fetch some records from db to be enqueued
                    NotificationDelivery[] deliveries = db.NotificationDeliveries
                        .Include("Subscription")
                        .Include("Notification")
                        .Include("Notification.NotificationLanguages")
                        .Include("Notification.NotificationLanguages.Language")
                        .Where(nd => nd.Status == NotificationDeliveryStatus.Pending && DateTime.Now >= nd.StartSendingAt)
                        .OrderBy(nd => nd.StartSendingAt)
                        .Take(toEnqueue)
                        .ToArray();

                    foreach (NotificationDelivery delivery in deliveries)
                    {
                        delivery.Status = NotificationDeliveryStatus.Queued;
                        deliveryQueue.Enqueue(delivery);
                    }

                    if (deliveries.Length > 0)
                    {
                        // save Queued state, so not fetched again the next loop
                        db.SaveChanges();

                        // signal the DequeueTask
                        dequeueSignal.Set();
                    }
                    else
                    {
                        // no more notifications, wait 5 seconds before try fetching again
                        enqueueSignal.Reset();
                        enqueueTimer.Start();
                    }
                }
                else
                {
                    // Queue is full: back off briefly instead of spinning on
                    // SaveChanges while the consumers drain it.
                    token.WaitHandle.WaitOne(QueueFullBackoffMs);
                }

                // save any changes made by the DequeueTask
                db.SaveChanges();
            }

            try
            {
                // Let in-flight deliveries finish so their final status is persisted.
                Task.WaitAll(dequeueTasks);
            }
            finally
            {
                db.SaveChanges();
            }
        }
    }

    /// <summary>
    /// Consumer loop. Takes deliveries from the queue, pushes them and records the
    /// resulting status on the delivery entity.
    /// </summary>
    /// <param name="state">A boxed <see cref="CancellationToken"/>.</param>
    /// <remarks>
    /// Deliberately synchronous. As <c>async void</c> the method returned at its first
    /// <c>await</c>, so its <see cref="Task"/> completed immediately, waiting on
    /// <c>dequeueTasks</c> did not wait for anything, and exceptions were unobservable.
    /// The task is created with <see cref="TaskCreationOptions.LongRunning"/>, and the
    /// loop already blocks on a wait handle, so blocking on the push is acceptable here.
    /// </remarks>
    public static void DequeueTask(object state)
    {
        CancellationToken token = (CancellationToken)state;
        WaitHandle[] wakeHandles = { dequeueSignal, token.WaitHandle };

        while (!token.IsCancellationRequested)
        {
            // block until we have items in the queue or cancellation is requested
            if (WaitHandle.WaitAny(wakeHandles) != 0)
            {
                break;
            }

            NotificationDelivery delivery = null;
            if (!deliveryQueue.TryDequeue(out delivery))
            {
                // empty queue, stop dequeueing until new items are added by EnqueueTask
                dequeueSignal.Reset();

                // The producer may have enqueued and signaled between TryDequeue and
                // Reset; re-arm the signal so those items are not stranded.
                if (!deliveryQueue.IsEmpty)
                {
                    dequeueSignal.Set();
                }

                continue;
            }

            NotificationDeliveryStatus ns = NotificationDeliveryStatus.Pending;
            if (delivery.Subscription.Status == SubscriptionStatus.Subscribed)
            {
                PushResult result = PushService.DoPushAsync(delivery).GetAwaiter().GetResult();
                switch (result)
                {
                    case PushResult.Pushed:
                        ns = NotificationDeliveryStatus.Delivered;
                        break;
                    case PushResult.Error:
                        ns = NotificationDeliveryStatus.FailureError;
                        break;
                    case PushResult.NotSupported:
                        ns = NotificationDeliveryStatus.FailureNotSupported;
                        break;
                    case PushResult.UnSubscribed:
                        ns = NotificationDeliveryStatus.FailureUnSubscribed;
                        delivery.Subscription.Status = SubscriptionStatus.UnSubscribed;
                        break;
                }
            }
            else
            {
                ns = NotificationDeliveryStatus.FailureUnSubscribed;
            }

            delivery.Status = ns;
            delivery.DeliveredAt = DateTime.Now;
        }
    }

    /// <summary>
    /// Blocks until the producer and all consumers have finished, then releases the
    /// timer and the task objects.
    /// </summary>
    public static void Wait()
    {
        Task.WaitAll(enqueueTask);
        Task.WaitAll(dequeueTasks);

        enqueueTimer.Dispose();
        enqueueTask.Dispose();
        for (int i = 0; i < dequeueTasks.Length; i++)
        {
            dequeueTasks[i].Dispose();
        }
    }
}
打开App,查看更多内容
随时随地看视频慕课网APP