博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Apache-ActiveMQ transport XmlMessage
阅读量:6294 次
发布时间:2019-06-22

本文共 5891 字,大约阅读时间需要 19 分钟。

前言

本文原创,转载请说明出处!

Apache-ActiveMQ:是Apache提供的一款开源的消息传送服务,不论是安装还是使用都非常简单实用,可以作为消息缓存管理器来使用(个人观点),看看ActiveMQ提供了什么。

ActiveMQ提供两种消息传递方式:Queues(队列)和Topics(主题)。

这两者的区别主要体现在获取消息端,Queues方式下消息只能被获取一次,也就是说多个消息获取端不会获得重复的信息。

Topics方式相当于订阅,所有消息获取端都可以获得相同的消息。

相比之下,Queues更适合做负载均衡,让多个消息获取端处理一组消息;Topic适合作为消息分发或者消息多用途时使用。

ActiveMQ传递消息的种类包括:BytesMessage、ObjectMessage、TextMessage、XmlMessage,还有些不常用的MapMessagee、StreamMessage等。

接下来看具体的用法。

一、配置Apache-ActiveMQ

Apache-ActiveMQ的安装和配置还是比较简单的,先到下载Apache-ActiveMQ,本文使用的版本是5.5.1,据说比较稳定。

另外到下载两个bin包,里面有对应的DLL,稍后会在项目中用到。

下载好以后,如果没有安装过java SDK,把这个也安装一下,否则无法运行。配置环境变量可以参考

以上步骤完成后,在目录\apache-activemq-5.5.1-bin\apache-activemq-5.5.1\bin下双击activemq.bat启动服务。

Apache-ActiveMQ默认有一个监控网站页面,可以查看当前Apache-ActiveMQ的各项参数。

如何配置Apache-ActiveMQ可以参考,Apache-ActiveMQ提供了丰富的配置,包括分发策略、数据恢复策略、负载均衡、持久化策略。

大家有兴趣可以Google一下,很多写配置的文档,本篇博客重点不在配置,就不赘述了。

二、消息传递

Apache-ActiveMQ把消息传递的过程分为2个部分,提供消息称为Producer(可以理解为制作消息方),接收消息称为Consumer(消息消费方)。

Producter向Apache-ActiveMQ Topic(Queue)中添加消息,Consumer从Apache-ActiveMQ Topic(Queue)取出消息,看到其他人大多写TextMessage,此次示例是用XMLMessage作为消息对象,希望可以给需要用XML传递消息的读者提供一些帮助,下面用代码示例说明。

新建一个Producer用来发送消息:

class Program    {        static void Main(string[] args)        {            try            {                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");                using (IConnection connection = factory.CreateConnection())                {                    using (ISession session = connection.CreateSession())                    {                        IMessageProducer prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("Producer01"));                         int i = 0;                        while (i<=10)                        {                            Employee Em = new Employee() { name = "Employee" + i.ToString(), num = i };                            ITextMessage msg = prod.CreateXmlMessage(Em);                            Console.WriteLine("Sending: " +  Em.name);                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent,Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);                            System.Threading.Thread.Sleep(3000);                            i++;                        }                    }                }                Console.ReadLine();            }            catch (System.Exception e)            {                Console.WriteLine("{0}", e.Message);                Console.ReadLine();            }          }    }    public class Employee    {        public string name { set; get; }        public int num { set; get; }    }
Producer

代码分析:

  • 新建一个工厂类示例,端口61616是默认端口,可以在Apache-ActiveMQ配置文件中配置
  • 创建连接,接下来创建会话Session,用Session创建一个ActiveMQTopic的Producer,还可以选择ActiveMQQueue方式
  • Employee是自定义的员工类,包含一个Name(姓名)属性和一个Num(编号)属性
  • CreateXmlMessage新建消息,传入参数Employee实例,此方法会自动此类型转化为XML形式
  • Send方法发送消息,在此方法中包含四个参数
    • 第一个参数是要发送的消息
    • 第二个参数用来设置消息是否持久化,持久化的方式有多种(文件、数据库等),可以在ActiveMQQueue配置文件中定义,默认为文件
    • 第三个参数设置消息优先级
    • 第四个参数设置消息过期时间,如果是MaxValue表示永不过期,根据实际情况选择

 发送消息很简单,接下来是接受消息的代码:

class Program    {        static void Main(string[] args)        {            try            {                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");                using (IConnection connection = factory.CreateConnection())                {                    connection.ClientId = "Listener01";                    connection.Start();                    using (ISession session = connection.CreateSession())                    {                        IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("Producer01"), "Consumer01", null, false);                        consumer.Listener += new MessageListener(consumer_Listener);                        Console.ReadLine();                    }                    connection.Stop();                    connection.Close();                }            }            catch (System.Exception e)            {                Console.WriteLine(e.Message);            }        }        static void consumer_Listener(IMessage message)        {            try            {                ITextMessage msg = (ITextMessage)message;                XmlDocument xml = new XmlDocument();                xml.LoadXml(msg.Text);                Employee employee = (Employee)xml;                Console.WriteLine("Receive: " + employee["name"]+"-"+employee["num"]);            }            catch (System.Exception e)            {                Console.WriteLine(e.Message);            }        }    }    public class Employee    {        public Employee()        {            dic = new Dictionary
(); } public Dictionary
dic { set; get; } public string this[string key] { get { return dic[key]; } set { if (dic.ContainsKey(key)) { dic[key] = value; } else { dic.Add(key, value); } } } public static implicit operator Employee(XmlDocument xml) { XmlNodeList col = xml.LastChild.ChildNodes; Employee employee = new Employee(); foreach (XmlNode node in col) { employee[node.Name] = node.InnerText; } return employee; } }
Consumer

代码分析:

  • 与发送消息相同,先与Apache-ActiveMQ建立连接
  • 与发送消息不同,接受消息需要通过Start方法显式打开连接
  • 新建会话Session,CreateDurableConsumer创建消费者(订阅者)
  • 设置consumer的接受事件,当接收消息后就会执行consumer_Listener方法
  • Employee类对应Producer中的Employee类,用Dictionary<string,string>保存员工信息,并实现一个隐式类型转换

 

此时我们就可以运行一下我们的示例程序,要记得先运行Apache-ActiveMQ服务程序

以下是运行截图:

 

总结:

Apache-ActiveMQ个人认为是一个很好的工具,用以定时处理一些消息提供支持,也解决了消息发送与消息处理速度不同的问题。

欢迎各位提出宝贵的意见和建议,如有问题可以加我的QQ:54474314讨论

转载于:https://www.cnblogs.com/renzh/p/Apache-ActiveMQ.html

你可能感兴趣的文章
以太坊ERC20代币合约优化版
查看>>
Why I Began
查看>>
同一台电脑上Windows 7和Ubuntu 14.04的CPU温度和GPU温度对比
查看>>
js数组的操作
查看>>
springmvc Could not write content: No serializer
查看>>
Python系语言发展综述
查看>>
新手 开博
查看>>
借助开源工具高效完成Java应用的运行分析
查看>>
163 yum
查看>>
第三章:Shiro的配置——深入浅出学Shiro细粒度权限开发框架
查看>>
80后创业的经验谈(转,朴实但实用!推荐)
查看>>
让Windows图片查看器和windows资源管理器显示WebP格式
查看>>
我的友情链接
查看>>
vim使用点滴
查看>>
embedded linux学习中几个需要明确的概念
查看>>
mysql常用语法
查看>>
Morris ajax
查看>>
【Docker学习笔记(四)】通过Nginx镜像快速搭建静态网站
查看>>
ORA-12514: TNS: 监听程序当前无法识别连接描述符中请求的服务
查看>>
<转>云主机配置OpenStack使用spice的方法
查看>>