查看: 1182|回复: 0

[ASP.NET教程] 基于RabbitMQ.Client组件实现RabbitMQ可复用的 ConnectionPool(连接池)

发表于 2018-4-25 08:00:02

一、本文产生原由:  

 之前文章《总结消息队列RabbitMQ的基本用法》已对RabbitMQ的安装、用法都做了详细说明,而本文主要是针对在高并发且单次从RabbitMQ中消费消息时,出现了连接数不足、连接响应较慢、RabbitMQ服务器崩溃等各种性能问题的解方案,之所以会出现我列举的这些问题,究基根源,其实是TCP连接创建与断开太过频繁所致,这与我们使用ADO.NET来访问常规的关系型DB(如:SQL SERVER、MYSQL)有所不同,在访问DB时,我们一般都建议大家使用using包裹,目的是每次创建完DB连接,使用完成后自动释放连接,避免不必要的连接数及资源占用。可能有人会问,为何访问DB,可以每次创建再断开连接,都没有问题,而同样访问MQ(本文所指的MQ均是RabbitMQ),每次创建再断开连接,如果在高并发且创建与断开频率高的时候,会出现性能问题呢?其实如果了解了DB的连接创建与断开以及MQ的连接创建与断开原理就知道其中的区别了。这里我简要说明一下,DB连接与MQ连接 其实底层都是基于TCP连接,创建TCP连接肯定是有资源消耗的,是非常昂贵的,原则上尽可能少的去创建与断开TCP连接,DB创建连接、MQ创建连接可以说是一样的,但在断开销毁连接上就有很大的不同,DB创建连接再断开时,默认情况下是把该连接回收到连接池中,下次如果再有DB连接创建请求,则先判断DB连接池中是否有空闲的连接,若有则直接复用,若没有才创建连接,这样就达到了TCP连接的复用,而MQ创建连接都是新创建的TCP连接,断开时则直接断开TCP连接,简单粗暴,看似资源清理更彻底,但若在高并发高频率每次都重新创建与断开MQ连接,则性能只会越来越差(上面说过TCP连接是非常昂贵的),我在公司项目中就出现了该问题,后面在技术总监的指导下,对MQ的连接创建与断开作了优化,实现了类似DB连接池的概念。

连接池,故名思义,连接的池子,所有的连接作为一种资源集中存放在池中,需要使用时就可以到池中获取空闲连接资源,用完后再放回池中,以此达到连接资源的有效重用,同时也控制了资源的过度消耗与浪费(资源多少取决于池子的容量)

二、源代码奉献(可直接复制应用到大家的项目中)

下面就先贴出实现MQHelper(含连接池)的源代码:

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading.Tasks;
  6. using RabbitMQ.Util;
  7. using RabbitMQ.Client;
  8. using RabbitMQ.Client.Events;
  9. using System.Web.Caching;
  10. using System.Web;
  11. using System.Configuration;
  12. using System.IO;
  13. using System.Collections.Concurrent;
  14. using System.Threading;
  15. using System.Runtime.CompilerServices;
  16. namespace Zuowj.Core
  17. {
  18. public class MQHelper
  19. {
  20. private const string CacheKey_MQConnectionSetting = "MQConnectionSetting";
  21. private const string CacheKey_MQMaxConnectionCount = "MQMaxConnectionCount";
  22. private readonly static ConcurrentQueue<IConnection> FreeConnectionQueue;//空闲连接对象队列
  23. private readonly static ConcurrentDictionary<IConnection, bool> BusyConnectionDic;//使用中(忙)连接对象集合
  24. private readonly static ConcurrentDictionary<IConnection, int> MQConnectionPoolUsingDicNew;//连接池使用率
  25. private readonly static Semaphore MQConnectionPoolSemaphore;
  26. private readonly static object freeConnLock = new object(), addConnLock = new object();
  27. private static int connCount = 0;
  28. public const int DefaultMaxConnectionCount = 30;//默认最大保持可用连接数
  29. public const int DefaultMaxConnectionUsingCount = 10000;//默认最大连接可访问次数
  30. private static int MaxConnectionCount
  31. {
  32. get
  33. {
  34. if (HttpRuntime.Cache[CacheKey_MQMaxConnectionCount] != null)
  35. {
  36. return Convert.ToInt32(HttpRuntime.Cache[CacheKey_MQMaxConnectionCount]);
  37. }
  38. else
  39. {
  40. int mqMaxConnectionCount = 0;
  41. string mqMaxConnectionCountStr = ConfigurationManager.AppSettings[CacheKey_MQMaxConnectionCount];
  42. if (!int.TryParse(mqMaxConnectionCountStr, out mqMaxConnectionCount) || mqMaxConnectionCount <= 0)
  43. {
  44. mqMaxConnectionCount = DefaultMaxConnectionCount;
  45. }
  46. string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config");
  47. HttpRuntime.Cache.Insert(CacheKey_MQMaxConnectionCount, mqMaxConnectionCount, new CacheDependency(appConfigPath));
  48. return mqMaxConnectionCount;
  49. }
  50. }
  51. }
  52. /// <summary>
  53. /// 建立连接
  54. /// </summary>
  55. /// <param name="hostName">服务器地址</param>
  56. /// <param name="userName">登录账号</param>
  57. /// <param name="passWord">登录密码</param>
  58. /// <returns></returns>
  59. private static ConnectionFactory CrateFactory()
  60. {
  61. var mqConnectionSetting = GetMQConnectionSetting();
  62. var connectionfactory = new ConnectionFactory();
  63. connectionfactory.HostName = mqConnectionSetting[0];
  64. connectionfactory.UserName = mqConnectionSetting[1];
  65. connectionfactory.Password = mqConnectionSetting[2];
  66. if (mqConnectionSetting.Length > 3) //增加端口号
  67. {
  68. connectionfactory.Port = Convert.ToInt32(mqConnectionSetting[3]);
  69. }
  70. return connectionfactory;
  71. }
  72. private static string[] GetMQConnectionSetting()
  73. {
  74. string[] mqConnectionSetting = null;
  75. if (HttpRuntime.Cache[CacheKey_MQConnectionSetting] == null)
  76. {
  77. //MQConnectionSetting=Host IP|;userid;|;password
  78. string mqConnSettingStr = ConfigurationManager.AppSettings[CacheKey_MQConnectionSetting];
  79. if (!string.IsNullOrWhiteSpace(mqConnSettingStr))
  80. {
  81. mqConnSettingStr = EncryptUtility.Decrypt(mqConnSettingStr);//解密MQ连接字符串,若项目中无此需求可移除,EncryptUtility是一个AES的加解密工具类,大家网上可自行查找
  82. if (mqConnSettingStr.Contains(";|;"))
  83. {
  84. mqConnectionSetting = mqConnSettingStr.Split(new[] { ";|;" }, StringSplitOptions.RemoveEmptyEntries);
  85. }
  86. }
  87. if (mqConnectionSetting == null || mqConnectionSetting.Length < 3)
  88. {
  89. throw new Exception("MQConnectionSetting未配置或配置不正确");
  90. }
  91. string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config");
  92. HttpRuntime.Cache.Insert(CacheKey_MQConnectionSetting, mqConnectionSetting, new CacheDependency(appConfigPath));
  93. }
  94. else
  95. {
  96. mqConnectionSetting = HttpRuntime.Cache[CacheKey_MQConnectionSetting] as string[];
  97. }
  98. return mqConnectionSetting;
  99. }
  100. public static IConnection CreateMQConnection()
  101. {
  102. var factory = CrateFactory();
  103. factory.AutomaticRecoveryEnabled = true;//自动重连
  104. var connection = factory.CreateConnection();
  105. connection.AutoClose = false;
  106. return connection;
  107. }
  108. static MQHelper()
  109. {
  110. FreeConnectionQueue = new ConcurrentQueue<IConnection>();
  111. BusyConnectionDic = new ConcurrentDictionary<IConnection, bool>();
  112. MQConnectionPoolUsingDicNew = new ConcurrentDictionary<IConnection, int>();//连接池使用率
  113. MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, "MQConnectionPoolSemaphore");//信号量,控制同时并发可用线程数
  114. }
  115. public static IConnection CreateMQConnectionInPoolNew()
  116. {
  117. SelectMQConnectionLine:
  118. MQConnectionPoolSemaphore.WaitOne();//当<MaxConnectionCount时,会直接进入,否则会等待直到空闲连接出现
  119. IConnection mqConnection = null;
  120. if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)//如果已有连接数小于最大可用连接数,则直接创建新连接
  121. {
  122. lock (addConnLock)
  123. {
  124. if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)
  125. {
  126. mqConnection = CreateMQConnection();
  127. BusyConnectionDic[mqConnection] = true;//加入到忙连接集合中
  128. MQConnectionPoolUsingDicNew[mqConnection] = 1;
  129. // BaseUtil.Logger.DebugFormat("Create a MQConnection:{0},FreeConnectionCount:小贝, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
  130. return mqConnection;
  131. }
  132. }
  133. }
  134. if (!FreeConnectionQueue.TryDequeue(out mqConnection)) //如果没有可用空闲连接,则重新进入等待排队
  135. {
  136. // BaseUtil.Logger.DebugFormat("no FreeConnection,FreeConnectionCount:{0}, BusyConnectionCount:小贝", FreeConnectionQueue.Count, BusyConnectionDic.Count);
  137. goto SelectMQConnectionLine;
  138. }
  139. else if (MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen) //如果取到空闲连接,判断是否使用次数是否超过最大限制,超过则释放连接并重新创建
  140. {
  141. mqConnection.Close();
  142. mqConnection.Dispose();
  143. // BaseUtil.Logger.DebugFormat("close > DefaultMaxConnectionUsingCount mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:小贝", FreeConnectionQueue.Count, BusyConnectionDic.Count);
  144. mqConnection = CreateMQConnection();
  145. MQConnectionPoolUsingDicNew[mqConnection] = 0;
  146. // BaseUtil.Logger.DebugFormat("create new mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:小贝", FreeConnectionQueue.Count, BusyConnectionDic.Count);
  147. }
  148. BusyConnectionDic[mqConnection] = true;//加入到忙连接集合中
  149. MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1;//使用次数加1
  150. // BaseUtil.Logger.DebugFormat("set BusyConnectionDic:{0},FreeConnectionCount:小贝, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
  151. return mqConnection;
  152. }
  153. private static void ResetMQConnectionToFree(IConnection connection)
  154. {
  155. lock (freeConnLock)
  156. {
  157. bool result = false;
  158. if (BusyConnectionDic.TryRemove(connection, out result)) //从忙队列中取出
  159. {
  160. // BaseUtil.Logger.DebugFormat("set FreeConnectionQueue:{0},FreeConnectionCount:小贝, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
  161. }
  162. else
  163. {
  164. // BaseUtil.Logger.DebugFormat("failed TryRemove BusyConnectionDic:{0},FreeConnectionCount:小贝, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
  165. }
  166. if (FreeConnectionQueue.Count + BusyConnectionDic.Count > MaxConnectionCount)//如果因为高并发出现极少概率的>MaxConnectionCount,则直接释放该连接
  167. {
  168. connection.Close();
  169. connection.Dispose();
  170. }
  171. else
  172. {
  173. FreeConnectionQueue.Enqueue(connection);//加入到空闲队列,以便持续提供连接服务
  174. }
  175. MQConnectionPoolSemaphore.Release();//释放一个空闲连接信号
  176. //Interlocked.Decrement(ref connCount);
  177. //BaseUtil.Logger.DebugFormat("Enqueue FreeConnectionQueue:{0},FreeConnectionCount:小贝, BusyConnectionCount:{2},thread count:{3}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count,connCount);
  178. }
  179. }
  180. /// <summary>
  181. /// 发送消息
  182. /// </summary>
  183. /// <param name="connection">消息队列连接对象</param>
  184. /// <typeparam name="T">消息类型</typeparam>
  185. /// <param name="queueName">队列名称</param>
  186. /// <param name="durable">是否持久化</param>
  187. /// <param name="msg">消息</param>
  188. /// <returns></returns>
  189. public static string SendMsg(IConnection connection, string queueName, string msg, bool durable = true)
  190. {
  191. try
  192. {
  193. using (var channel = connection.CreateModel())//建立通讯信道
  194. {
  195. // 参数从前面开始分别意思为:队列名称,是否持久化,独占的队列,不使用时是否自动删除,其他参数
  196. channel.QueueDeclare(queueName, durable, false, false, null);
  197. var properties = channel.CreateBasicProperties();
  198. properties.DeliveryMode = 2;//1表示不持久,2.表示持久化
  199. if (!durable)
  200. properties = null;
  201. var body = Encoding.UTF8.GetBytes(msg);
  202. channel.BasicPublish("", queueName, properties, body);
  203. }
  204. return string.Empty;
  205. }
  206. catch (Exception ex)
  207. {
  208. return ex.ToString();
  209. }
  210. finally
  211. {
  212. ResetMQConnectionToFree(connection);
  213. }
  214. }
  215. /// <summary>
  216. /// 消费消息
  217. /// </summary>
  218. /// <param name="connection">消息队列连接对象</param>
  219. /// <param name="queueName">队列名称</param>
  220. /// <param name="durable">是否持久化</param>
  221. /// <param name="dealMessage">消息处理函数</param>
  222. /// <param name="saveLog">保存日志方法,可选</param>
  223. public static void ConsumeMsg(IConnection connection, string queueName, bool durable, Func<string, ConsumeAction> dealMessage, Action<string, Exception> saveLog = null)
  224. {
  225. try
  226. {
  227. using (var channel = connection.CreateModel())
  228. {
  229. channel.QueueDeclare(queueName, durable, false, false, null); //获取队列
  230. channel.BasicQos(0, 1, false); //分发机制为触发式
  231. var consumer = new QueueingBasicConsumer(channel); //建立消费者
  232. // 从左到右参数意思分别是:队列名称、是否读取消息后直接删除消息,消费者
  233. channel.BasicConsume(queueName, false, consumer);
  234. while (true) //如果队列中有消息
  235. {
  236. ConsumeAction consumeResult = ConsumeAction.RETRY;
  237. var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //获取消息
  238. string message = null;
  239. try
  240. {
  241. var body = ea.Body;
  242. message = Encoding.UTF8.GetString(body);
  243. consumeResult = dealMessage(message);
  244. }
  245. catch (Exception ex)
  246. {
  247. if (saveLog != null)
  248. {
  249. saveLog(message, ex);
  250. }
  251. }
  252. if (consumeResult == ConsumeAction.ACCEPT)
  253. {
  254. channel.BasicAck(ea.DeliveryTag, false); //消息从队列中删除
  255. }
  256. else if (consumeResult == ConsumeAction.RETRY)
  257. {
  258. channel.BasicNack(ea.DeliveryTag, false, true); //消息重回队列
  259. }
  260. else
  261. {
  262. channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丢弃
  263. }
  264. }
  265. }
  266. }
  267. catch (Exception ex)
  268. {
  269. if (saveLog != null)
  270. {
  271. saveLog("QueueName:" + queueName, ex);
  272. }
  273. throw ex;
  274. }
  275. finally
  276. {
  277. ResetMQConnectionToFree(connection);
  278. }
  279. }
  280. /// <summary>
  281. /// 依次获取单个消息
  282. /// </summary>
  283. /// <param name="connection">消息队列连接对象</param>
  284. /// <param name="QueueName">队列名称</param>
  285. /// <param name="durable">持久化</param>
  286. /// <param name="dealMessage">处理消息委托</param>
  287. public static void ConsumeMsgSingle(IConnection connection, string QueueName, bool durable, Func<string, ConsumeAction> dealMessage)
  288. {
  289. try
  290. {
  291. using (var channel = connection.CreateModel())
  292. {
  293. channel.QueueDeclare(QueueName, durable, false, false, null); //获取队列
  294. channel.BasicQos(0, 1, false); //分发机制为触发式
  295. uint msgCount = channel.MessageCount(QueueName);
  296. if (msgCount > 0)
  297. {
  298. var consumer = new QueueingBasicConsumer(channel); //建立消费者
  299. // 从左到右参数意思分别是:队列名称、是否读取消息后直接删除消息,消费者
  300. channel.BasicConsume(QueueName, false, consumer);
  301. ConsumeAction consumeResult = ConsumeAction.RETRY;
  302. var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //获取消息
  303. try
  304. {
  305. var body = ea.Body;
  306. var message = Encoding.UTF8.GetString(body);
  307. consumeResult = dealMessage(message);
  308. }
  309. catch (Exception ex)
  310. {
  311. throw ex;
  312. }
  313. finally
  314. {
  315. if (consumeResult == ConsumeAction.ACCEPT)
  316. {
  317. channel.BasicAck(ea.DeliveryTag, false); //消息从队列中删除
  318. }
  319. else if (consumeResult == ConsumeAction.RETRY)
  320. {
  321. channel.BasicNack(ea.DeliveryTag, false, true); //消息重回队列
  322. }
  323. else
  324. {
  325. channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丢弃
  326. }
  327. }
  328. }
  329. else
  330. {
  331. dealMessage(string.Empty);
  332. }
  333. }
  334. }
  335. catch (Exception ex)
  336. {
  337. throw ex;
  338. }
  339. finally
  340. {
  341. ResetMQConnectionToFree(connection);
  342. }
  343. }
  344. /// <summary>
  345. /// 获取队列消息数
  346. /// </summary>
  347. /// <param name="connection"></param>
  348. /// <param name="QueueName"></param>
  349. /// <returns></returns>
  350. public static int GetMessageCount(IConnection connection, string QueueName)
  351. {
  352. int msgCount = 0;
  353. try
  354. {
  355. using (var channel = connection.CreateModel())
  356. {
  357. channel.QueueDeclare(QueueName, true, false, false, null); //获取队列
  358. msgCount = (int)channel.MessageCount(QueueName);
  359. }
  360. }
  361. catch (Exception ex)
  362. {
  363. throw ex;
  364. }
  365. finally
  366. {
  367. ResetMQConnectionToFree(connection);
  368. }
  369. return msgCount;
  370. }
  371. }
  372. public enum ConsumeAction
  373. {
  374. ACCEPT, // 消费成功
  375. RETRY, // 消费失败,可以放回队列重新消费
  376. REJECT, // 消费失败,直接丢弃
  377. }
  378. }
复制代码

现在对上述代码的核心点作一个简要的说明:

先说一下静态构造函数:

FreeConnectionQueue 用于存放空闲连接对象队列,为何使用Queue,因为当我从中取出1个空闲连接后,空闲连接数就应该少1个,这个Queue很好满足这个需求,而且这个Queue是并发安全的Queue哦(ConcurrentQueue)

BusyConnectionDic 忙(使用中)连接对象集合,为何这里使用字典对象呢,因为当我用完后,需要能够快速的找出使用中的连接对象,并能快速移出,同时重新放入到空闲队列FreeConnectionQueue ,达到连接复用

MQConnectionPoolUsingDicNew 连接使用次数记录集合,这个只是辅助记录连接使用次数,以便可以计算一个连接的已使用次数,当达到最大使用次数时,则应断开重新创建

MQConnectionPoolSemaphore 这个是信号量,这是控制并发连接的重要手段,连接池的容量等同于这个信号量的最大可并行数,保证同时使用的连接数不超过连接池的容量,若超过则会等待;

具体步骤说明:

1.MaxConnectionCount:最大保持可用连接数(可以理解为连接池的容量),可以通过CONFIG配置,默认为30;

2.DefaultMaxConnectionUsingCount:默认最大连接可访问次数,我这里没有使用配置,而是直接使用常量固定为1000,大家若有需要可以改成从CONFIG配置,参考MaxConnectionCount的属性设置(采取了依赖缓存)

3.CreateMQConnectionInPoolNew:从连接池中创建MQ连接对象,这个是核心方法,是实现连接池的地方,代码中已注释了重要的步骤逻辑,这里说一下实现思路:

  3.1 通过MQConnectionPoolSemaphore.WaitOne() 利用信号量的并行等待方法,如果当前并发超过信号量的最大并行度(也就是作为连接池的最大容量),则需要等待空闲连接池,防止连接数超过池的容量,如果并发没有超过池的容量,则可以进入获取连接的逻辑;

  3.2FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount,如果空闲连接队列+忙连接集合的总数小于连接池的容量,则可以直接创建新的MQ连接,否则FreeConnectionQueue.TryDequeue(out mqConnection) 尝试从空闲连接队列中获取一个可用的空闲连接使用,若空闲连接都没有,则需要返回到方法首行,重新等待空闲连接;

  3.3MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen 如果取到空闲连接,则先判断使用次数是否超过最大限制,超过则释放连接或空闲连接已断开连接也需要重新创建,否则该连接可用;

  3.4BusyConnectionDic[mqConnection] = true;加入到忙连接集合中,MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1; 使用次数加1,确保每使用一次连接,连接次数能记录

4.ResetMQConnectionToFree:重置释放连接对象,这个是保证MQ连接用完后能够回收到空闲连接队列中(即:回到连接池中),而不是直接断开连接,这个方法很简单就不作作过多说明。

好了,都说明了如何实现含连接池的MQHelper,现在再来举几个例子来说明如何用:

三、实际应用(简单易上手)

获取并消费一个消息:

  1. public string GetMessage(string queueName)
  2. {
  3. string message = null;
  4. try
  5. {
  6. var connection = MQHelper.CreateMQConnectionInPoolNew();
  7. MQHelper.ConsumeMsgSingle(connection, queueName, true, (msg) =>
  8. {
  9. message = msg;
  10. return ConsumeAction.ACCEPT;
  11. });
  12. }
  13. catch (Exception ex)
  14. {
  15. BaseUtil.Logger.Error(string.Format("MQHelper.ConsumeMsgSingle Error:{0}", ex.Message), ex);
  16. message = "ERROR:" + ex.Message;
  17. }
  18. //BaseUtil.Logger.InfoFormat("第{0}次请求,从消息队列(队列名称:小贝)中获取消息值为:{2}", Interlocked.Increment(ref requestCount), queueName, message);
  19. return message;
  20. }
复制代码

 发送一个消息:

  1. public string SendMessage(string queueName, string msg)
  2. {
  3. string result = null;
  4. try
  5. {
  6. var connection = MQHelper.CreateMQConnectionInPoolNew();
  7. result = MQHelper.SendMsg(connection, queueName, msg);
  8. }
  9. catch (Exception ex)
  10. {
  11. BaseUtil.Logger.Error(string.Format("MQHelper.SendMessage Error:{0}", ex.Message), ex);
  12. result = ex.Message;
  13. }
  14. return result;
  15. }
复制代码

 获取消息队列消息数:

  1. public int GetMessageCount(string queueName)
  2. {
  3. int result = -1;
  4. try
  5. {
  6. var connection = MQHelper.CreateMQConnectionInPoolNew();
  7. result = MQHelper.GetMessageCount(connection, queueName);
  8. }
  9. catch (Exception ex)
  10. {
  11. BaseUtil.Logger.Error(string.Format("MQHelper.GetMessageCount Error:{0}", ex.Message), ex);
  12. result = -1;
  13. }
  14. return result;
  15. }
复制代码

 这里说一下:BaseUtil.Logger 是Log4Net的实例对象,另外上面没有针对持续订阅消费消息(ConsumeMsg)作说明,因为这个其实可以不用连接池也不会有问题,因为它是一个持久订阅并持久消费的过程,不会出现频繁创建连接对象的情况。

最后要说的是,虽说代码贴出来,大家一看就觉得很简单,好像没有什么技术含量,但如果没有完整的思路也还是需要花费一些时间和精力的,代码中核心是如何简单高效的解决并发及连接复用的的问题,该MQHelper有经过压力测试并顺利在我司项目中使用,完美解决了之前的问题,由于这个方案是我在公司通宵实现的,可能有一些方面的不足,大家可以相互交流或完善后入到自己的项目中。



回复

使用道具 举报