前言
本文原创,转载请说明出处!
Apache-ActiveMQ:是Apache提供的一款开源的消息传送服务,不论是安装还是使用都非常简单实用,可以作为消息缓存管理器来使用(个人观点),看看ActiveMQ提供了什么。
ActiveMQ提供两种消息传递方式:Queues(队列)和Topics(主题)。
这两者的区别主要体现在获取消息端,Queues方式下消息只能被获取一次,也就是说多个消息获取端不会获得重复的信息。
Topics方式相当于订阅,所有消息获取端都可以获得相同的消息。
相比之下,Queues更适合做负载均衡,让多个消息获取端处理一组消息;Topic适合作为消息分发或者消息多用途时使用。
ActiveMQ传递消息的种类包括:BytesMessage、ObjectMessage、TextMessage、XmlMessage,还有些不常用的MapMessagee、StreamMessage等。
接下来看具体的用法。
一、配置Apache-ActiveMQ
Apache-ActiveMQ的安装和配置还是比较简单的,先到下载Apache-ActiveMQ,本文使用的版本是5.5.1,据说比较稳定。
另外到下载两个bin包,里面有对应的DLL,稍后会在项目中用到。
下载好以后,如果没有安装过java SDK,把这个也安装一下,否则无法运行。配置环境变量可以参考
以上步骤完成后,在目录\apache-activemq-5.5.1-bin\apache-activemq-5.5.1\bin下双击activemq.bat启动服务。
Apache-ActiveMQ默认有一个监控网站页面,可以查看当前Apache-ActiveMQ的各项参数。
如何配置Apache-ActiveMQ可以参考,Apache-ActiveMQ提供了丰富的配置,包括分发策略、数据恢复策略、负载均衡、持久化策略。
大家有兴趣可以Google一下,很多写配置的文档,本篇博客重点不在配置,就不赘述了。
二、消息传递
Apache-ActiveMQ把消息传递的过程分为2个部分,提供消息称为Producer(可以理解为制作消息方),接收消息称为Consumer(消息消费方)。
Producter向Apache-ActiveMQ Topic(Queue)中添加消息,Consumer从Apache-ActiveMQ Topic(Queue)取出消息,看到其他人大多写TextMessage,此次示例是用XMLMessage作为消息对象,希望可以给需要用XML传递消息的读者提供一些帮助,下面用代码示例说明。
新建一个Producer用来发送消息:
class Program { static void Main(string[] args) { try { IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/"); using (IConnection connection = factory.CreateConnection()) { using (ISession session = connection.CreateSession()) { IMessageProducer prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("Producer01")); int i = 0; while (i<=10) { Employee Em = new Employee() { name = "Employee" + i.ToString(), num = i }; ITextMessage msg = prod.CreateXmlMessage(Em); Console.WriteLine("Sending: " + Em.name); prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent,Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue); System.Threading.Thread.Sleep(3000); i++; } } } Console.ReadLine(); } catch (System.Exception e) { Console.WriteLine("{0}", e.Message); Console.ReadLine(); } } } public class Employee { public string name { set; get; } public int num { set; get; } }
代码分析:
- 新建一个工厂类示例,端口61616是默认端口,可以在Apache-ActiveMQ配置文件中配置
- 创建连接,接下来创建会话Session,用Session创建一个ActiveMQTopic的Producer,还可以选择ActiveMQQueue方式
- Employee是自定义的员工类,包含一个Name(姓名)属性和一个Num(编号)属性
- CreateXmlMessage新建消息,传入参数Employee实例,此方法会自动此类型转化为XML形式
- Send方法发送消息,在此方法中包含四个参数
- 第一个参数是要发送的消息
- 第二个参数用来设置消息是否持久化,持久化的方式有多种(文件、数据库等),可以在ActiveMQQueue配置文件中定义,默认为文件
- 第三个参数设置消息优先级
- 第四个参数设置消息过期时间,如果是MaxValue表示永不过期,根据实际情况选择
发送消息很简单,接下来是接受消息的代码:
class Program { static void Main(string[] args) { try { IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/"); using (IConnection connection = factory.CreateConnection()) { connection.ClientId = "Listener01"; connection.Start(); using (ISession session = connection.CreateSession()) { IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("Producer01"), "Consumer01", null, false); consumer.Listener += new MessageListener(consumer_Listener); Console.ReadLine(); } connection.Stop(); connection.Close(); } } catch (System.Exception e) { Console.WriteLine(e.Message); } } static void consumer_Listener(IMessage message) { try { ITextMessage msg = (ITextMessage)message; XmlDocument xml = new XmlDocument(); xml.LoadXml(msg.Text); Employee employee = (Employee)xml; Console.WriteLine("Receive: " + employee["name"]+"-"+employee["num"]); } catch (System.Exception e) { Console.WriteLine(e.Message); } } } public class Employee { public Employee() { dic = new Dictionary(); } public Dictionary dic { set; get; } public string this[string key] { get { return dic[key]; } set { if (dic.ContainsKey(key)) { dic[key] = value; } else { dic.Add(key, value); } } } public static implicit operator Employee(XmlDocument xml) { XmlNodeList col = xml.LastChild.ChildNodes; Employee employee = new Employee(); foreach (XmlNode node in col) { employee[node.Name] = node.InnerText; } return employee; } }
代码分析:
- 与发送消息相同,先与Apache-ActiveMQ建立连接
- 与发送消息不同,接受消息需要通过Start方法显式打开连接
- 新建会话Session,CreateDurableConsumer创建消费者(订阅者)
- 设置consumer的接受事件,当接收消息后就会执行consumer_Listener方法
- Employee类对应Producer中的Employee类,用Dictionary<string,string>保存员工信息,并实现一个隐式类型转换
此时我们就可以运行一下我们的示例程序,要记得先运行Apache-ActiveMQ服务程序
以下是运行截图:
总结:
Apache-ActiveMQ个人认为是一个很好的工具,用以定时处理一些消息提供支持,也解决了消息发送与消息处理速度不同的问题。
欢迎各位提出宝贵的意见和建议,如有问题可以加我的QQ:54474314讨论