手记

基于kafka-net实现的可以长链接的消息生产者

 今天有点时间,我就来说两句。最近接触的Kafka相关的东西要多一些,其实以前也接触过,但是在项目使用中的经验不是很多。最近公司的项目里面使用了Kafka消息中间件,由于以前的人员编写的客户端的类不是很好,没有设计的概念,就是一个简单类的功能罗列,没有考虑到后期的扩展和维护(以后可能会兼容其他形式的消息队列,要做到无缝衔接),所以这个重构的任务就落到我的身上。

      先说说我的感受,然后再贴出代码的实现吧。我第一次是基于Confluent.Kafka编写的Kafka消息生产者,后来经过测试,同步操作的时间比较长,要完成20万数据发送消息并更新到数据库的时间大概是16-18分钟,这个结果有点让人不能接受。为了提高性能,也做了很多测试,都没有办法解决这个问题。后来抱着试试看的想法,我又基于kafka-net重新实现了Kafka消息的生产者。经过测试,完成同样的任务,时间大概需要3分钟左右。两种实现方法完成同样的任务,都是以同步的方式生产消息,并将消息成功发送到Broker后,再将数据插入到数据库做记录。大家不要纠结为什么这样使用消息队列,这是上头的做法,我还不能做大的改动,我也无奈。

      目前看,基于kafka-net实现的消息生产者在生产消息并发送成功所需要的时间要比基于Confluent.Kafka实现的消息生产者的所需要的时间要少,尤其是发送的数据越多,这个时间的差距越大。具体的原因还不清楚,如果有高手可以不吝赐教。好了,我该上代码了。

      开始代码之前,要说明一点:Confluent.Kafka的Broker是不需要带Http://这个前缀的,但是 kafka-net 的Broker是有http://这个前缀的,大家要注意这个,刚开始的时候我也被坑了一下子。
   

using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;namespace Enterprise.Framework.MessageQueue
{    /// <summary>
    /// 消息生产者的接口定义,所有消息生产者的实现必须继承该接口    /// </summary>
    public interface IMessageProducer
    {        /// <summary>
        /// 将指定的消息内容发送到消息服务器并存放在指定的主题名称里        /// </summary>
        /// <param name="topic">发送消息的主题名称,这个主题就是对消息的分类,不同的主题存放不同的消息,该参数不能为空,空值会抛出异常</param>
        /// <param name="message">需要发送的消息内容,该参数不能为空,空值会抛出异常</param>        
        void Produce(string topic, string message);
    }
}

 1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6  7 namespace Enterprise.Framework.MessageQueue 8 { 9     /// <summary>10     /// Kafka消息生产者的接口定义,所有Kafka消息生产者的实现必须继承该接口11     /// </summary>12     public interface IKafkaMessageProducer : IMessageProducer13     {14         /// <summary>15         /// 将指定的消息内容发送到消息服务器并存放在指定的主题名称里16         /// </summary>17         /// <param name="topic">发送消息的主题名称,这个主题就是对消息的分类,不同的主题存放不同的消息,该参数不能为空,空值会抛出异常</param>18         /// <param name="message">需要发送的消息内容,该参数不能为空,空值会抛出异常</param>19         /// <param name="producedAction">当消息生产完成并成功发送到服务器后,可以对成功生产并发送的消息执行代理所封装方法的操作,默认值为空</param>20         void Produce(string topic, string message, Action<MessageResult> producedAction = null);21     }22 }

 1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Runtime.InteropServices; 5 using System.Text; 6  7 namespace Enterprise.Framework.AbstractInterface 8 { 9     /// <summary>10     /// 该抽象类定义了所有需要释放资源类型的抽象类11     /// </summary>12     public abstract class DisposableBase : IDisposable13     {14         private bool disposed = false;15 16         /// <summary>17         /// 实现IDisposable中的Dispose方法18         /// </summary>19         public void Dispose()20         {21             //必须为true22             Dispose(true);23             //通知垃圾回收机制不再调用终结器(析构器)24             GC.SuppressFinalize(this);25         }26 27         /// <summary>28         /// 不是必要的,提供一个Close方法仅仅是为了更符合其他语言(如C++)的规范29         /// </summary>30         public void Close()31         {32             Dispose();33         }34 35         /// <summary>36         /// 必须,以备程序员忘记了显式调用Dispose方法37         /// </summary>38         ~DisposableBase()39         {40             //必须为false41             Dispose(false);42         }43 44         /// <summary>45         /// 非密封类修饰用protected virtual46         /// 密封类修饰用private47         /// </summary>48         /// <param name="disposing">是否要清理托管资源,true表示需要清理托管资源,false表示不需要清理托管资源</param>49         protected virtual void Dispose(bool disposing)50         {51             if (disposed)52             {53                 return;54             }55             if (disposing)56             {57                 // 清理托管资源                58                 DisposeManagedResources();59             }60             // 清理非托管资源61             DisposeUnmanagedResource();62             //让类型知道自己已经被释放63             disposed = true;64         }65 66         /// <summary>67         /// 释放托管资源68         /// </summary>69         protected abstract void DisposeManagedResources();70 71         /// <summary>72         /// 释放非托管资源73         /// </summary>74         protected abstract void DisposeUnmanagedResource();75     }76 }

  1 using KafkaNet;  2 using KafkaNet.Model;  3 using KafkaNet.Protocol;  4 using System;  5 using System.Collections.Generic;  6 using System.IO;  7 using System.Linq;  8 using System.Text;  9 using System.Threading; 10 using System.Threading.Tasks; 11 using ThreeSoft.Framework.AbstractInterface; 12  13 namespace Enterprise.Framework.MessageQueue 14 { 15     /// <summary> 16     /// Kafka消息生产者具体的实现类,可以针对长链接进行消息发送处理,不用频繁进行消息组件的创建和销毁的工作 17     /// </summary> 18     public sealed class KafkaMessageKeepAliveProducer : DisposableBase, IDisposable, IKafkaMessageProducer 19     { 20         #region 私有字段 21  22         private KafkaNet.Producer _producer; 23         private BrokerRouter _brokerRouter; 24         private string _broker; 25  26         #endregion 27  28         #region 构造函数 29  30         /// <summary> 31         /// 通过构造函数初始化消息队列的服务器 32         /// </summary> 33         /// <param name="broker">消息队列服务器地址,该值不能为空</param> 34         public KafkaMessageKeepAliveProducer(string broker) 35         { 36             if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker)) 37             { 38                 throw new ArgumentNullException("消息队列服务器的地址不可以为空!"); 39             } 40  41             #region kafka-net实现 42  43             Uri[] brokerUriList = null; 44  45             if (broker.IndexOf(',') >= 0) 46             { 47                 string[] brokers = broker.Split(','); 48                 brokerUriList = new Uri[brokers.Length]; 49  50                 for (int i = 0; i < brokers.Length; i++) 51                 { 52                     brokerUriList[i] = new Uri(brokers[i]); 53                 } 54             } 55             else 56             { 57                 brokerUriList = new Uri[] { new Uri(broker) }; 58             } 59  60             var kafkaOptions = new KafkaOptions(brokerUriList); 61             _brokerRouter = new BrokerRouter(kafkaOptions); 62             _producer = new KafkaNet.Producer(_brokerRouter); 63  64             #endregion 65  66             _broker = broker; 67         } 68  69         #endregion 70  71         #region 实例属性 72  73         /// <summary> 74         /// 获取消息服务器的地址 75         /// </summary> 76         public string Broker 77         { 78             get { return _broker; } 79         } 80  81         #endregion 82  83         #region 发送消息的方法 84  85         /// <summary> 86         /// 将指定的消息内容发送到消息服务器并存放在指定的主题名称里 87         /// </summary>         88         /// <param name="topic">发送消息的主题名称,这个主题就是对消息的分类,不同的主题存放不同的消息,该参数不能为空,空值会抛出异常</param> 89         /// <param name="message">需要发送的消息内容,该参数不能为空,空值会抛出异常</param> 90         /// <param name="producedAction">当消息生产完成并成功发送到服务器后,可以对成功生产并发送的消息执行代理所封装方法的操作</param> 91         public void Produce(string topic, string message, Action<MessageResult> producedAction = null) 92         { 93             #region 同步实现 94  95             var currentDatetime = DateTime.Now; 96             var key = currentDatetime.Second.ToString(); 97             var events = new[] { new KafkaNet.Protocol.Message(message, key) }; 98             List<ProduceResponse> result = _producer.SendMessageAsync(topic, events).Result; 99 100             if (producedAction != null && result != null && result.Count > 0)101             {102                 MessageResult messageResult = new MessageResult { Broker = Broker, GroupID = null, Message = message, Offset = result[0].Offset, Partition = result[0].PartitionId, Topic = result[0].Topic };103                 producedAction(messageResult);104             }105 106             #endregion107         }108 109         /// <summary>110         /// 将指定的消息内容发送到消息服务器并存放在指定的主题名称里111         /// </summary>        112         /// <param name="topic">发送消息的主题名称,这个主题就是对消息的分类,不同的主题存放不同的消息,该参数不能为空,空值会抛出异常</param>113         /// <param name="message">需要发送的消息内容,该参数不能为空,空值会抛出异常</param>        114         public void Produce(string topic, string message)115         {116             Produce(topic, message, null);117         }118 119         #endregion120 121         #region 实现消息队列资源的释放122 123         /// <summary>124         /// 析构函数释放资源125         /// </summary>126         ~KafkaMessageKeepAliveProducer()127         {128             Dispose(false);129         }130 131         /// <summary>132         /// 释放托管资源133         /// </summary>134         protected override void DisposeManagedResources()135         {136             if (_producer != null)137             {138                 _producer.Dispose();139             }140             if (_brokerRouter != null)141             {142                 _brokerRouter.Dispose();143             }144         }145 146         /// <summary>147         /// 释放非托管资源148         /// </summary>149         protected override void DisposeUnmanagedResource(){}150 151         #endregion152     }153 }


    好了,今天就写到这里了,每天进步一点点,努力坚持。不忘初心,继续努力吧,欢迎大家前来讨论。

原文出处:https://www.cnblogs.com/PatrickLiu/p/9413626.html

0人推荐
随时随地看视频
慕课网APP