手记

.NetCore利用BlockingCollection实现简易消息队列

 

消息队列现今的应用场景越来越大,常用的有RabbmitMQ和KafKa。
我们用BlockingCollection来实现简单的消息队列。


实现消息队列

用Vs2017创建一个控制台应用程序。创建DemoQueueBlock类,封装一些常用判断。

HasEle,判断是否有元素

Add向队列中添加元素

Take从队列中取出元素

为了不把BlockingCollection直接暴漏给使用者,我们封装一个DemoQueueBlock类

    /// <summary>
    /// BlockingCollection演示消息队列
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class DemoQueueBlock<T> where T : class
    {
        private static BlockingCollection<T> Colls;        public DemoQueueBlock()
        {

        }        public static bool IsComleted() {            if (Colls != null && Colls.IsCompleted) {                return true;
            }            return false;
        }        public static bool HasEle()
        {            if (Colls != null && Colls.Count>0)
            {                return true;
            }            return false;
        }       
        public static bool Add(T msg)
        {            if (Colls == null)
            {
                Colls = new BlockingCollection<T>();
            }
            Colls.Add(msg);            return true;
        }        public static T Take()
        {            if (Colls == null)
            {
                Colls = new BlockingCollection<T>();
            }            return Colls.Take();
        }
    }    /// <summary>
    /// 消息体
    /// </summary>
    public class DemoMessage
    {
        public string BusinessType { get; set; }        public string BusinessId { get; set; }        public string Body { get; set; }
    }

添加元素进队列

通过控制台,添加元素

           //添加元素            while (true)
            {
                Console.WriteLine("请输入队列");
                var read = Console.ReadLine();                if (read == "exit")
                {                    return;
                }

                DemoQueueBlock<DemoMessage>.Add(new DemoMessage() { BusinessId = read });
            }

消费队列

通过判断IsComleted,来确定是否获取队列

  Task.Factory.StartNew(() =>
            {                //从队列中取元素。
                while (!DemoQueueBlock<DemoMessage>.IsComleted())
                {                    try
                    {                        var m = DemoQueueBlock<DemoMessage>.Take();
                      Console.WriteLine("已消费:" + m.BusinessId);
                    }                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }
                }
            });

查看运行结果

运行结果

这样我们就实现了简易的消息队列。

示例源码

简易队列

参考链接

BlockingCollection
Orleans源码分析

原文出处:https://www.cnblogs.com/fancunwei/p/9568291.html

0人推荐
随时随地看视频
慕课网APP