一、JMS基本概念 1.1 P2P通信
1.2 Pub/Sub通信
二、JMS消息
三、JMS P2P编程
3.1 使用JMS QueueConnection对象
3.2 处理回退事件
3.3 关闭JMS对象
3.4 接收消息
3.5 消息驱动的Bean
3.6 消息持久化
3.7 消息选择器
四、JMS Pub/Sub编程
五、二阶段提交的事务
━━━━━━━━━━━━━━━━━━━━━━━━━━
EJB 2.0和J2EE 1.3规范开始提供对Java消息服务(JMS)的支持。在J2EE 1.3加入JMS之前,J2EE环境中的组件通过RMI-IIOP协议通信,J2EE是一个完全同步的平台。由于在J2EE 1.3规范中引入了JMS,J2EE环境开始具备一项极其重要的功能--异步通信。
● 说明:RMI-IIOP是Java远程方法调用(RMI,Remote Method Invocation)的一个兼容CORBA的版本,CORBA是Common Object Request Broker Architecture的缩写,即公用对象请求代理(调度)体系结构。RMI-IIOP通过CORBA平台和语言中立的通信协议发送RMI消息。
为了支持异步通信,J2EE 1.3规范还引入了一种新的EJB类型:消息驱动的Bean,即Message Driven Bean,简称MDB。如前所述,在JMS之前,J2EE原来是一个建立在Java RMI-IIOP通信协议基础上的同步环境,但MDB却具有接收异步消息的能力。
异步通信使得企业应用能够建立在一种全新的通信机制之上,它具有如下重要优点:
■ 异步通信程序不直接交换信息。由于这个原因,通信的任意一方能够在对方停止响应时继续自己的操作,例如当接收消息的一方不能响应时,发送方仍能够发出消息。
■ 异步通信有助于提高可靠性。支持异步通信的中间件软件提供了有保证的(可靠的)消息递送机制,即使整个环境出现问题也能够保证消息传递到目的地。 Internet是一种不可靠的通信媒体,对于通过Internet通信的应用来说,异步通信的这一特点非常富有吸引力。
■ 发送异步消息的程序不会在等待应答的时候被锁定,从而极大地有利于提高性能。
JMS本身不是一个通信软件,而是一个标准的应用编程接口(API),用来建立厂商中立的异步通信机制。从这个意义上说,JMS类似于JDBC和 JNDI,例如就JDBC来说,它要求有一个底层的数据库提供者,JMS则要求有一个支持JMS标准的底层异步通信中间件提供者,一般称为面向消息的中间件(Message-Oriented Middleware,MOM)。
MOM是一种支持应用程序通过交换消息实现异步通信的技术。在某种程度上,异步通信有点象是人们通过email进行通信;类似地,同步通信的程序就象是人们通过电话进行通信。在异步通信过程中,程序的结合方式是宽松的,换句话说,异步通信的程序并不直接相互联系,而是通过称为队列(Queue)或主题(Topic)的虚拟通道联系。
同时,异步通信还意味着消息通过一个中转站,按照存储-转发的方式传递,即使通信的接收方当时并未运行,另一方(发送方)的程序也能够顺利发出消息,一旦接收方的程序后来开始运行,消息就会传递给它。
JMS通信主要的优点是提供了一个环境,在这个环境中各个程序通过标准的API进行通信,开发者不必再面对各种不同操作系统、数据表示方式、底层协议所带来的复杂性。
本文讨论了JMS编程的基本概念以及两种JMS通信方式:第一种是端对端通信(Point-to-Point,P2P)方式,第二种是出版/订阅(Publish/Subscribe,Pub/Sub)方式,另外还涵盖了JMS消息结构、JMS主要对象、MDB、JMS和WebSphere编程、消息持久化以及JMS事务支持方面的问题。
一、JMS基本概念
鉴于JMS是一种比较新的技术,所以本文将首先详细介绍JMS编程的一般概念,然后再探讨WSAD 5.0环境下的JMS开发。如前所示,JMS程序本身并不直接相互通信,消息被发送给一个临时中转站--队列或主题。队列和主题都是能够收集消息的中转对象,但两者支持的消息传递机制又有所不同,分别对应两种不同的通信方式--P2P和Pub/Sub。
1.1 P2P通信
P2P消息传递又可以按照推(Push)和拉(Pull)两种方式运作。在P2P拉方式中,程序通过称为队列的虚拟通道通信:在通信会话的发送方,发送程序把一个消息"放入"队列,在接收方,接收程序定期扫描队列,寻找它希望接收和处理的消息。和推方式相比,拉方式的消息传递效率较低,因为它需要周而复始地检查消息是否到达,这个过程会消耗一定的资源。另外必须注意的一点是,当接收方发现一个需要处理的消息时,它就会"提取"消息,从效果上看等于从队列删除了消息。
因此,即使有多个接收程序在处理同一个队列,对于某一特定的消息来说,总是只有一个接收者。JMS程序可以使用多个队列,每一个队列可以由多个程序处理,但是只有一个程序才会收到某个特定的消息。
在P2P推方式的消息传递中,发送程序的工作原理也一样,它把消息发送到队列,但接收程序的工作原理有所不同。接收程序实现了一个Listener接口,包括实现了该接口中的onMessage回调方法,在J2EE环境中监听队列接收消息的任务交给了容器,每当一个新的消息达到队列,容器就调用 onMessage方法,将消息作为参数传递给onMessage方法。
P2P通信最重要的特点(不管是推还是拉)是:每一个消息总是只由一个程序接收。一般而言,P2P程序在通信过程中参与的活动较多,例如,发送程序可以向接收程序指出应答消息应当放入哪一个队列,它还可以要求返回一个确认或报告消息。
1.2 Pub/Sub通信
在Pub/Sub通信方式中,程序之间通过一个主题(Topic)实现通信,用主题作为通信媒介要求有Pub/Sub代理程序的支持(稍后详细介绍)。在消息发送方,生产消息的程序向主题发送消息;在接收方,消息的消费程序向感兴趣的主题订阅消息。当一个消息到达主题,所有向该主题订阅的消费程序都会通过onMessage方法的参数收到消息。
这是一种推式的消息传递机制。可以设想,会有一个以上的消费程序收到同一消息的副本。相比之下,程序在Pub/Sub通信过程中参与的活动较少,当生产者程序向某个特定的队列发送消息,它不知道到底会有多少程序接收到该消息(可能有多个,可能没有)。通过订阅主题实现的通信是一种比较灵活的通信方式,主题的订阅者可以动态地改变,却不需要改动底层的通信机制,而且它对整个通信过程完全透明。
Pub/Sub方式的通信要求有Pub/Sub代理的支持。Pub/Sub代理是一种协调、控制消息传递过程,保证消息传递到接收方的程序。P2P通信方式中程序利用一个队列作为通信的中转站,相比之下,Pub/Sub通信方式中程序直接交互的是特殊的代理队列,这就是我们要在 WebSphere MQ常规安装方式之上再装一个MA0C的原因(只有用WebSphere MQ作为JMS提供者时才必须如此。要求使用MQ 5.3.1或更高的版本),MA0C就是一个支持Pub/Sub方式通信的代理软件。
QueueConnectionFactory和TopicConnectionFactory对象是创建相应的QueueConnection对象和 TopicConnection对象的JMS工厂类对象,JMS程序正是通过QueueConnection对象和TopicConnection对象连接到底层的MOM技术。
二、JMS消息
基于JMS的程序通过交换JMS消息实现通信,JMS消息由三部分构成:消息头,消息属性(可选的),消息正文。消息头有多个域构成,包含了消息递送信息和元数据。
消息属性部分包含一些标准的以及应用特定的域,消息选择器依据这些信息来过滤收到的消息。JMS定义了一组标准的属性,要求MOM提供者支持(如表1所示)。消息正文部分包含应用特定的数据(也就是要求递送到目标位置的内容)。
表1 JMS标准消息属性
可选的属性包括JMSXUserID、JMSXAppID、JMSXProducerTXID、ConsumerTXID、 JMSXRcvTimestamp、JMSXDeliveryCount以及JMSXState。消息头为JMS消息中间件提供了描述性信息,诸如消息递送的目标、消息的创建者、保留消息的时间,等等,如表2所示。
表2 JMS消息头的域
*在所有这些消息类型中,TextMessage是最常用的,它不仅简单,而且能够用来传递XML数据。
JMS支持多种消息正文类型,包括:
■ textMessage:消息正文包含一个java.lang.String对象。这是最简单的消息格式,但可以用来传递XML文档。
■ ObjectMessage:消息正文中包含了一个串行化之后的Java对象,这类Java对象必须是可串行化的。
■ MapMessage:消息正文包含一系列"名字-值"形式的数据元素,通常用来传送一些按键-值形式编码的数据。设置数据元素可以用setInt、 setFloat、setString等方法;在接收方,提取数据元素使用相应的getInt、getFloat、getString等方法。
■ BytesMessage:消息正文包含一个字节数组。如果需要发送应用生成的原始数据,通常采用这一消息类型。
■ StreamMessage:消息正文包含一个Java基本数据类型(int,char,double,等等)的流。接收方读取数据的次序跟发送方写入数据的次序一样,读写消息元素分别使用readInt和writeInt、readString和writeString之类的方法。
JMS消息对象可以用JMS会话对象(参见本文"使用JMS QueueConnection对象"部分)创建。下面的例子显示了如何创建各类消息对象:
TextMessage textMsg = session.createTextMessage(); MapMessage mapMsg = session.createMapMessage(); ObjectMessage objectMsg = session.createObjectMessage(); BytesMessage byteMsg = session.createBytesMessage(); |
消息对象提供了设置、提取各个消息头域的方法,下面是几个设置和提取JMS消息头域的例子:
// 获取和设置消息ID String messageID = testMsg.getJMSMessageID(); testMsg.setJMSCorrelationID(messageID); // 获取和设置消息优先级 int messagePriority = mapMsg.getJMSPriority(); mapMsg.setJMSPriority(1); |
另外,消息对象还为标准属性和应用特有的属性提供了类似的设置和提取方法。下面的例子显示了如何设置、提取JMS消息标准属性和应用特有属性域:
int groupSeq = objectMsg.getIntProperty("JMSGroupSeq"); objectMsg.setStringProperty("myName", "孙悟空"); |
JMS为读写消息正文内容提供了许多方法。下面略举数例,说明如何操作不同类型的消息正文:
// Text Message TextMessage textMsg = session.createTextMessage(); textMsg.setText("消息内容文本");// Map Message MapMessage mapMsg = session.createMapMessage(); mapMsg.setInt(BookCatalogNumber, 100); mapMsg.setString(BookTitle, "书籍题目"); mapMsg.setLong(BookCost, 50.00); String bookTitle = mapMsg.getString("BookTitle"); // Object Message ObjectMessage objectMsg = session.createObjectMessage(); Book book = new Book("WinSocks 2.0"); objectMsg.setObject(book); |
三、JMS P2P编程
在JMS P2P通信方式中,发送程序将消息放入一个队列,根据通信要求,发送程序可以要求一个应答信息(请求-应答模式),也可以不要求立即获得应答(发送-遗忘模式)。如果需要应答信息,发送程序通过消息头的JMSReplayTo域向消息的接收程序声明应答信息应当放入哪一个本地队列。
在请求 -应答模式中,发送程序可以按照两种方式操作。一种称为伪同步方式,发送程序在等待应答消息时会被阻塞;另一种是异步方式,发送程序发送消息后不被阻塞,可以照常执行其他处理任务,它可以在以后适当的时机检查队列,查看有没有它希望得到的应答信息。下面的代码片断显示了JMS程序发送消息的过程。
// 发送JMS消息 import javax.jms.Message; import javax.jms.TextMessage; import javax.jms.QueueConnectionFactory; import javax.jms.QueueConnection; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Queue; import javax.jms.JMSException; import javax.naming.InitialContext; import javax.naming.Context;public class MyJMSSender { private String requestMessage; private String messageID; private int requestRetCode = 1; private QueueConnectionFactory queueConnectionFactory = null; private Queue requestQueue = null; private Queue responseQueue = null; private QueueConnection queueConnection = null; private QueueSession queueSession = null; private QueueSender queueSender = null; private TextMessage textMsg = null; // 其他代码... public int processOutputMessages(String myMessage) { // 查找管理对象 try { InitialContext initContext = new InitialContext(); Context env = (Context) initContext.lookup ("java:comp/env"); queueConnectionFactory = (QueueConnectionFactory) env.lookup("tlQCF"); requestQueue = (Queue) env.lookup("tlReqQueue"); responseQueue = (Queue) env.lookup("tlResQueue"); queueConnection = queueConnectionFactory. createQueueConnection(); queueConnection.start(); queueSession = queueConnection.createQueueSession (true, 0); queueSender = queueSession.createSender(requestQueue); textMsg = queueSession.createTextMessage(); textMsg.setText(myMessage); textMsg.setJMSReplyTo(responseQueue); // 处理消息的代码逻辑... queueSender.send(textMsg); queueConnection.stop(); queueConnection.close(); queueConnection = null; } catch (Exception e) { // 异常处理代码... } return requestRetCode = 0; } } |
下面来分析一下这段代码。JMS程序的第一个任务是找到JNDI名称上下文的位置。对于WSAD开发环境来说,如果JMS程序属于J2EE项目,则 JNDI名称空间的位置由WSAD测试服务器管理,运行时环境能够自动获知这些信息。在这种情况下,我们只要调用InitialContext类默认的构造函数创建一个实例就可以了,即:
InitialContext initContext = new InitialContext(); |
对于WSAD环境之外的程序,或者程序不是使用WSAD JNDI名称空间,例如使用LDAP(轻量级目录访问协议),程序寻找JNDI名称的操作稍微复杂一点,必须在一个Properties或 Hashtable对象中设定INITIAL_CONTEXT_FACTORY和PROVIDER_URL,然后将该Properties对象或 Hashtable对象作为参数调用InitialContext的构造函数。下面我们来看几个创建InitialContext对象的例子,第一个例子显示了运行在WSAD之外的程序如何找到WSAD InitialContext对象。
// 例一:运行在WSAD之外的程序寻找WSAD InitialContext对象 // 说明:将localhost替换为JNDI服务运行的服务器名称 Properties props = new Properties(); props.put(Context.INITIAL_CONTEXT_FACTORY, "com.ibm.websphere.naming.WsnInitialContextFactory"); props.put(Context.PROVIDER_URL, "iiop://localhost/"); InitialContext initialContext = InitialContext(props);// 例二:下面的例子显示了如何找到基于文件的JNDI InitialContext Hashtable hashTab = new Hashtable (); hashTab.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.fscontext.RefFSContextFactory"); hashTab.put(Context.PROVIDER_URL, "file://c:/temp"); InitialContext initialContext = InitialContext(hashTab); // 例三:下面的例子显示了如何找到基于LDAP的JNDI InitialContext Hashtable hashTab = new Hashtable (); hashTab.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory"); hashTab.put(Context.PROVIDER_URL, "file://server.company.com/o=provider_name, c=us"); InitialContext initialContext = InitialContext(hashTab); |
获得InitialContext对象之后,下一步是要查找java:comp/env子上下文(Subcontext),例如:
Context env = (Context) initContext.lookup("java:comp/env"; |
java:comp/env是J2EE规范推荐的保存环境变量的JNDI名称子上下文。在这个子上下文中,JMS程序需要找到几个由JMS管理的对象,包括QueueConnectionFactory对象、Queue对象等。
下面的代码寻找由JMS管理的几个对象,这些对象是JMS程序执行操作所必需的。
queueConnectionFactory = (QueueConnectionFactory) env.lookup("QCF"); requestQueue = (Queue) env.lookup("requestQueue"); |
接下来,用QueueConnectionFactory对象构造出QueueConnection对象。
queueConnection = queueConnectionFactory.createQueueConnection(); |
3.1 使用JMS QueueConnection对象
JMS QueueConnection提供了一个通向底层MOM(就本文而言,它是指WebSphere MQ队列管理器)的连接,按照这种方式创建的连接使用默认的Java绑定传输端口来连接到本地的队列管理器。
对于MQ客户程序来说(运行在不带本地队列管理器的机器上的MQ程序),QueueConnectionFactory对象需要稍作调整,以便使用客户端传输端口:
QueueConn.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP); |
另外,刚刚创建的QueueConnection对象总是处于停止状态,需要调用"queueConnection.start();"将它启动。
建立连接之后,用QueueConnection对象的createQueueSession方法来创建一次会话。必须注意的是,QueueSession对象有一个单线程的上下文,不是线程安全的,因此会话对象本身以及在会话对象基础上创建的对象都不是线程安全的,在多线程环境中必须加以保护。createQueueSession方法有两个参数,构造一个QueueSession对象的语句类如:
queueSession = queueConnection.createQueueSession (false, Session.AUTO_ACKNOWLEDGE); |
createQueueSession方法的第一个参数是boolean类型,它指定了JMS事务类型--也就是说,当前的队列会话是事务化的(true)还是非事务化的(false)。JMS事务类型主要用于控制消息传递机制,不要将它与EJB的事务类型(NotSupported,Required,等等)混淆了,EJB的事务类型设定的是EJB模块本身的事务上下文。 createQueueSession方法的第二个参数是一个整数,指定了确认模式,也就是决定了如何向服务器证实消息已经传到。
如果队列会话是事务化的(调用createQueueSession方法的第一个参数是true),第二个参数的值将被忽略,因为提交事务时应答总是自动执行的。如果由于某种原因事务被回退,则不会有应答出现,视同消息尚未递送,JMS服务器将尝试再次发送消息。如果有多个消息参与到同一会话上下文,它们被作为一个组处理,确认最后一个消息也就自动确认了此前所有尚未确认的消息。如果发生回退,情况也相似,整个消息组将被视为尚未递送,JMS服务器将试图再次递送这些消息。
下面说明一下底层的工作机制。当发送程序发出一个消息,JMS服务器接收该消息;如果消息是持久性的,服务器先将消息写入磁盘,然后确认该消息。自此之后,JMS服务器负责把消息发送到目的地,除非它从客户程序收到了确认信息,否则不会从临时存储位置删除消息。对于非持久性的消息,收到消息并保存到内存之后确认信息就立即发出了。
如果队列会话是非事务化的(调用 createQueueSession方法的第一个参数是false),则应答模式由第二个参数决定。第二个参数的可能取值包括:AUTO_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE,以及CLIENT_ACKNOWLEDGE。
■ 对于非事务化的会话来说,AUTO_ACKNOWLEDGE确认模式是最常用的确认模式。对于事务化的会话,系统总是假定使用AUTO_ACKNOWLEDGE确认模式。
■ DUPS_OK_ACKNOWLEDGE模式是一种"懒惰的"确认方式。可以想到,这种模式可能导致消息提供者传递的一些重复消息出错。这种确认模式只用于程序可以容忍重复消息存在的情况。
■ 在CLIENT_ACKNOWLEDGE模式中,消息的传递通过调用消息对象的acknowledge方法获得确认。
在AUTO_ACKNOWLEDGE模式中,消息的确认通常在事务结束处完成。CLIENT_ACKNOWLEDGE使得应用程序能够加快这一过程,只要处理过程一结束就立即予以确认,恰好在整个事务结束之前。当程序正在处理多个消息时,这种确认模式也是非常有用的,因为在这种模式中程序可以在收到所有必需的消息后立即予以确认。
对于非事务化的会话,一旦把消息放入了队列,所有接收程序立即能够看到该消息,且不能回退。对于事务化的会话,JMS事务化上下文把多个消息组织成一个工作单元,消息的发送和接收都是成组执行。事务化上下文会保存事务执行期间产生的所有消息,但不会把消息立即发送到目的地。
只有当事务化的队列会话执行提交时,保存的消息才会作为一个整体发送给目的地,这时接收程序才可以看到这些消息。如果在事务化队列会话期间出现错误,在错误出现之前已经成功处理的消息也会被撤销。定义成事务化的队列会话总是有一个当前事务,不需要显式地用代码来开始一个事务。当然,事务化的环境总是存在一定的代价--事务化会话总是要比非事务化会话慢。
● 注意:JMS队列会话的事务化是一个概念,实现了JMS逻辑的Java方法的事务属性是另一个概念,不要将两者混淆了。TX_REQUIRED属性表示的是方法在一个事务上下文之内运行,从而确保数据库更新和队列中消息的处理作为一个不可分割的工作单元执行(要么都成功提交,要么都回退)。顺便说明一下,在容器管理的事务环境中,一个全局性的两阶段提交(Two-Phase Commit)事务上下文会被激活(参见本文后面的详细说明),这时参与全局性事务的数据源应当用XA版的数据库驱动程序构建。
相比之下,在createQueueSession方法的第一个参数中指定true建立的是JMS事务上下文:多个消息被视为一个工作单元。在消息的接收方,执行queueSession.commit()之前,即使已经收到了多个消息也不会给出确认信息;一旦queueSession.commit() 执行,它就确认收到了在此之前尚未提交的所有消息;消息发送方的情况也相似。
3.2 处理回退事件
如前所述,如果事务异常终止,收到的消息会被发回到原来的队列。接收消息的程序下一次再来处理该队列时,它还会再次得到同一个消息,而且这一次事务很有可能还是要失败,必须再次把消息发送回输入队列--如此不断重复,就形成了无限循环的情况。
为防止这种情况出现,我们可以设置监听端口上的Max_retry计数器,超过Max_retry计数器规定的值,接收程序就不会再收到该消息;或者对于推式的会话,消息不会再被传递。另外,在推式会话中,重新传递的事务会被设置JMSRedelivered标记,程序可以调用消息对象的 getJMSRedelivered方法来检查该标记。
消息通过QueueSender JMS对象发送,QueueSender对象利用QueueSession对象的createSender方法创建,每一个队列都要创建一个QueueSender对象:
queueSender = queueSession.createSender(requestQueue); |
接下来创建一个消息(TextMessage类型),根据myMessage字符串的值设置消息的内容,myMessage变量作为输入参数传递给queueSession.createTextMessag方法。
textMsg = queueSession.createTextMessage(myMessage); |
指定接收队列,告诉接收消息的程序要把应答放入哪一个队列:
textMsg.setJMSReplyTo(responseQueue); |
最后,用Sender对象发送消息,然后停止并关闭连接。
queueSender.send(textMsg); queueConnection.stop(); queueConnection.close(); |
发出消息之后,可以调用消息对象的getJMSMessageID方法获得JMS赋予消息的ID(即提取JMSMessageID消息头域),以后就可以通过这一ID寻找应答消息:
String messageID = message.getJMSMessageID(); |
如果有JMS连接池,释放后的会话不会被拆除,而是被返回给连接池以供重用。
3.3 关闭JMS对象
垃圾收集器不一定会及时关闭JMS对象,如果程序要创建大量短期生存的对象,可能会引起问题,至少会浪费大量宝贵的资源,所以显式地释放所有不用的资源是很重要的。
if (queueConn != null) { queueConn.stop(); queueConn.close(); queueConn = null; } |
关闭队列连接对象将自动关闭所有利用该连接对象创建的对象。但个别JMS提供者例外,这时请按照下面代码所示的次序关闭所有JMS对象。
// 关闭JMS对象 if (queueReceiver != null) { queueReceiver.close(); queueReceiver = null; } if (queueSender != null) { queueSender.close(); queueSender = null; } if (queueSession != null) { queueSession.close(); queueSession = null; } if (queueConn != null) { queueConn.stop(); queueConn.close(); queueConn = null; } |
3.4 接收消息
消息接收方的处理逻辑也和发送方的相似。消息由JMS QueueReceiver对象接收,QueueReceiver对象建立在为特定队列创建的QueueSession对象的基础上。差别在于 QueueReceiver接收消息的方式--QueueReceiver对象能够按照伪同步或异步方式接收消息。下面的代码显示了如何用伪同步方式接收消息。
// 伪同步方式接收消息 import javax.jms.Message; import javax.jms.TextMessage; import javax.jms.QueueConnectionFactory; import javax.jms.QueueConnection; import javax.jms.QueueSender; import javax.jms.Queue; import javax.jms.Exception; import javax.naming.InitialContext; public class MyJMSReceiver { private String responseMessage; private String messageID; private int replyRetCode = 1; private QueueConnectionFactory queueConnectionFactory = null; private Queue inputQueue = null; private QueueConnection queueConnection = null; private QueueSession queueSession = null; private QueueReceiver queueReceiver = null; private TextMessage textMsg = null; public void processIncomingMessages() { // 查找管理对象 InitialContext initContext = new InitialContext(); Context env = (Context) initContext.lookup("java:comp/env"); queueConnectionFactory = (QueueConnectionFactory) env.lookup("tlQCF"); inputQueue = (Queue) env.lookup("tlQueue"); queueConnection = queueConnectionFactory. createQueueConnection(); queueConnection.start(); queueSession=queueConnection.createQueueSession(true, 0); queueReceiver = queueSession.createReceiver(inputQueue); // 等一秒钟,看看是否有消息到达 TextMessage inputMessage = queueReceiver.receive(1000); // 其他处理代码... queueConnection.stop(); queueConnection.close(); } } |
下面分析一下上面的代码。消息由QueueReceiver对象执行receive方法接收。receive方法有一个参数,它指定了接收消息的等待时间(以毫秒计)。在上面的代码中,QueueReceiver对象在一秒之后超出期限,解除锁定,把控制返回给程序。如果调用receive方法时指定了等待时间,QueueReceiver对象在指定的时间内被锁定并等待消息,如果超出了等待时间仍无消息到达,QueueReceiver对象超时并解除锁定,把控制返回给程序。
接收消息的方法还有一个"不等待"的版本,使用这个方法时QueueReceiver对象检查是否有消息之后立即返回,将控制交还给程序。下面是一个例子:
TextMessage message = queueReceiver.receiveNoWait(); |
如果调用receive方法时不指定参数,QueueReceiver对象会无限期地等待消息。采用这种用法时应当非常小心,因为程序会被无限期地锁定。下面是一个无限期等待消息的例子:
TextMessage message = queueReceiver.receive(); |
不管等待期限的参数设置了多少,这都属于拉式消息传递,如前所述,这种消息传递方式的效率较低。不仅如此,这种方法还不适合于J2EE EJB层,不能用于EJB组件之内,原因稍后就会说明。不过,这种处理方式适合在Servlet、JSP和普通Java JMS应用中使用。
接收消息的第二种办法是异步接收。用异步接收方式时,QueueReceiver对象必须用 setMessageListener(class_name)方法注册一个MessageListener类,其中class_name参数可以是任何实现了onMessage接口方法的类。在下面这个例子中,onMessage方法由同一个类实现(为简单计,这里没有给出try/catch块的代码)。
● 注意:接下来的消息接收方式不适用于EJB组件,这些代码仅适用于Servlet、JSP和普通的Java JMS应用程序。
// 消息监听类的例子 import javax.jms.Message; import javax.jms.TextMessage; import javax.jms.QueueConnectionFactory; import javax.jms.QueueConnection; import javax.jms.QueueReceiver; import javax.jms.Queue; import javax.jms.Exception; import javax.naming.InitialContext; public class MyListenerClass implements javax.jms.MessageListener { private int responseRetCode = 1; private boolean loopFlag = true; private QueueConnectionFactory queueConnectionFactory = null; private Queue responseQueue = null; private QueueConnection queueConnection = null; private QueueSession queueSession = null; private QueueSender queueSender = null; public void prepareEnvironment(String myMessage) { // 查找管理对象 InitialContext initContext = new InitialContext(); Context env = (Context) initContext.lookup("java:comp/env"); queueConnectionFactory = (QueueConnectionFactory) env.lookup("tlQCF"); responseQueue = (Queue) env.lookup("tlResQueue"); queueConnection = queueConnectionFactory. createQueueConnection(); queueSession = queueConnection.createQueueSession (true, 0); queueReceiver = queueSession.createReceiver (responseQueue); queueReceiver.setMessageListener(this) queueConnection.start(); }public void onMessage(Message message) { // 希望收到一个文本消息... if (message instanceof TextMessage) { String responseText = "确认消息传递:" + ((TextMessage) message).getText(); // 当收到一个以@字符开头的消息时,循环结束, // MessageListener终止 if (responseText.charAt(0) == '@') { loopFlag = 1; // 结束处理; } else {t // 继续处理消息 // 本例中我们知道应答消息的队列,并非真的要用到 //message.getJMSReplyTo // 这只是一个如何获取应答消息队列的例子 Destination replyToQueue=message.getJMSReplyTo(); // 设置应答消息 TextMessage responseMessage = responseSession.createTextMessage(responseText); // 使CorrelationID等于消息ID, //这样客户程序就能将应答消息和原来的请求 // 消息对应起来 messageID = message.getJMSMessageID(); responseMessage.setJMSCorrelationID(messageID); // 设置消息的目的地 responseMessage.setJMSDestination( replyToQueue) queueSender.send( responseMessage); } } } // 保持监听器活动 while (loopFlag) { // 将控制转移到其他任务(休眠2秒) System.out.println("正处于监听器循环之内..."); Thread.currentThread().sleep(2000); } // 当loopFlag域设置成flase时,结束处理过程 queueConn.stop(); queueConnection.close(); } |
注册一个MessageListener对象时,一个实现了MessageListener逻辑的新线程被创建。我们要保持这个线程处于活动状态,因此使用了一个while循环,首先让线程休眠一定的时间(这里是2秒),将处理器的控制权转让给其他任务。当线程被唤醒时,它检查队列,然后再次进入休眠状态。当一个消息到达当前注册的MessageListener对象所监视的队列时,JMS调用MessageListener对象的 onMessage(message)方法,将消息作为参数传递给onMessage方法。
这是一种推式的消息接收方式,程序效率较高,但仍不适合在EJB组件之内使用。下面将探讨为什么这些接收消息的方式都不能用于EJB组件之内,然后给出解决办法。虽然这部分内容放入了P2P通信方式中讨论,但其基本思路同样适用于Pub/Sub通信方式。
3.5 消息驱动的Bean
在前文讨论JMS消息接收处理逻辑的过程中,我们看到的代码仅仅适用于Servlet、JSP以及普通的Java应用程序,但不适用于EJB,因为在JMS的接收端使用EJB存在一些技术问题。一般地,JMS程序的交互模式分两种:
■ 发送-遗忘:JMS客户程序发出消息,但不需要应答。从性能的角度来看,这是最理想的处理模式,发送程序不需要等待对方响应请求,可以继续执行自己的任务。
■ 同步请求-答复:JMS客户程序发出消息并等待答复。在JMS中,这种交互方式通过执行一个伪同步的接收方法(前文已经讨论)实现。然而,这里可能出现问题。如果EJB模块在一个事务上下文之内执行(通常总是如此),单一事务之内无法执行请求-答复处理,这是因为发送者发出一个消息之后,只有当发送者提交了事务,接收者才能收到消息。因此,在单一事务之内是不可能得到应答的,因为在一个未提交的事务上下文之内接收程序永远收不到消息,当然也不可能作出应答了。解决的办法是请求-答复必须通过两个不同的事务执行。
对于EJB来说,在JMS通信的接收端还可能出现另一个EJB特有的问题。在异步通信中,何时能够获得应答是不可预料的,异步通信的主要优点之一就是发送方能够在发出消息之后继续当前的工作,即使接收方当时根本不能接收消息也一样,但请求-答复模式却隐含地假定了EJB组件(假定是一个会话Bean)应该在发出消息之后等待答复。J2EE实际上是一个基于组件技术的事务处理环境,它的设计目标是处理大量短期生存的任务,却不是针对那些可能会被长期阻塞来等待应答的任务。
为了解决这个问题,Sun在 EJB 2.0规范中加入了一种新的EJB类型,即MDB。MDB是专门为JMS异步消息环境中(接收端)EJB组件面临的问题而设计的,其设计思路是把监听消息是否到达的任务从EJB组件移到容器,由于这个原因,MDB组件总是在容器的控制之下运行,容器代替MDB监听着特定的队列或主题,当消息到达该队列或者主题,容器就激活MDB,调用MDB的onMessage方法,将收到的消息作为参数传递给onMessage方法。
MDB是一种异步组件,它的工作方式和其他同步工作的EJB组件(会话Bean,实体Bean)不同,MDB没有Remote接口和Home接口,客户程序不能直接激活MDB,MDB只能通过收到的消息激活。MDB的另一个重要特点是它对事务和安全上下文的处理方式与众不同,MDB已经彻底脱离了客户程序,因此也不使用客户程序的事务和安全上下文。
发送JMS消息的远程客户程序完全有可能运行在不同的环境之中--非J2EE的环境,例如普通的Java应用程序,可能根本没有任何安全或事务上下文。因此,发送方的事务和安全上下文永远不会延续到接收方的MDB组件。由于MDB永远不会由客户程序直接激活,所以它们也永远不可能在客户程序的事务上下文之内运行。由于这个原因,下面这些事务属性对于MDB来说没有任何意义--Supports,RequiresNew,Mandatory,以及 None,因为这些事务属性隐含地意味着延用客户程序的事务上下文。MDB可以使用的事务属性只有两个:NotSupported和Required。如果一个MDB组件有NotSupported事务属性,则它的消息处理过程不属于任何事务上下文。
就象其他类型的EJB一样,MDB可能参与到两种类型的事务:Bean管理的事务,容器管理的事务。MDB组件的所有方法中,只有onMessage方法可以参与到事务上下文。如果让MDB参与到Bean管理的事务上下文,则表示允许MDB在onMessage方法之内开始和结束一个事务。这种策略的问题在于,收到的消息总是位于onMessage方法之内开始的事务之外(要让消息参与其中已经太迟了)。在这种情况下,如果由于某种原因必须回退,则消息必须手工处理。
如果让MDB参与到容器管理的事务上下文,情况就完全不同了。只要设置了Required事务属性,容器就可以在收到消息时开始一个事务,这样,消息就成为事务的一部分,当事务成功提交时,或者当事务被回退而消息被返回到发送队列时,都可以获得明确的确认信息。
事务可能在两种情形下回退:第一种情形是程序显式地调用setRollBackOnly方法,第二种情形是在onMessage方法之内抛出一个系统异常(注意,抛出应用程序异常不会触发回退动作)。当事务回退时,消息就被返回到原来的队列,监听器将再次发送消息要求处理。一般地,这种情况会引起无限循环,最后导致应用运行不正常。Max_retries属性可以用来控制监听器再次提取重发消息的次数(这个属性在配置监听器端口的时候设置),超过Max_retries规定的限制值之后,监听器将停止处理该消息(显然,这算不上最理想的处理方式)。
如果用WebSphere MQ作为JMS提供者,还有一种更好的解决办法。我们可以配置WebSphere MQ,使其尝试一定的次数之后停止发送同一消息,并将消息放入一个特定的错误队列或Dead.Letter.Queue。记住,MDB是无状态的组件,也就是说它不会在两次不同的方法调用之间维持任何状态信息(就象Web服务器处理HTTP请求的情况一样),同时,由于客户程序永远不会直接调用MDB组件,所以MDB组件不能识别任何特定的客户程序,在这个意义上,可以认为MDB组件是"匿名"运行的。所有MDB组件都必须实现javax.ejb.MessageDrivenBean接口和javax.jms.MessageListener接口。
除了onMessage方法之外,MDB还有其他几个回调方法--由容器调用的方法:
■ ejbCreate方法:容器调用该方法来创建MDB的实例。在ejbCreate方法中可以放入一些初始化的逻辑。
■ setMessageDrivenContext方法:当Bean第一次被加入到MDB Bean的缓冲池时容器将调用该方法。setMessageDrivenContext方法通常用来捕获MessageDrivenContext,并将 setMessageDrivenContext保存到类里面的变量,例如:
public void setMessageDrivenContext(java.ejb. MessageDrivenContext mdbContext) { messageDrivenContext = mdbContext; } |
■ ejbRemove方法:容器把Bean从缓冲池移出并拆除时会调用该方法。一般地,ejbRemove方法会执行一些清理操作。
一般而言,在onMessage方法之内执行业务逻辑是不推荐的,业务方面的操作最好委派给其他EJB组件执行。
MDB容器会自动控制好并发处理多个消息的情形。每一个MDB实例处理一个消息,在onMessage方法把控制返回给容器之前,不会要求MDB同时处理另一个消息。如果有多个消息必须并行处理,容器会激活同一MDB的多个实例。
从WebSphere 5.0开始,开发环境(WSAD 5.0)和运行环境(WAS 5.0)都开始全面支持MDB。
下面的代码片断显示了一个MDB的概念框架。
// MDB概念框架 // package 声明略 import javax.jms.Message; import javax.jms.MapMessage; import javax.naming.InitialContext; import java.util.*; public class LibraryNotificationBean implements javax.ejb.MessageDrivenBean, javax.jms.MessageListener { MessageDrivenContext messageDrivenContext; Context jndiContext; public void setMessageDrivenContext(MessageDrivenContext msgDrivenContext) { messageDrivenContext = msgDrivenContext; try { jndiContext = new InitialContext(); } catch (NamingException ne) { throw new EJBException(ne); } }public void ejbCreate() { } public void onMessage(Message notifyMsg) { try { MapMessage notifyMessage = (MapMessage) notifyMsg; String bookTitle = (String) notifyMessage. getString("BookTitle"); String bookAuthor = (String) notifyMessage. getString("BookAuthor"); String bookCatalogNumber = (String) notifyMessage.getString ("bookCatalogNumber"); Integer bookQuantity = (Integer) notifyMessage.getInteger("BookQuantity"); // 处理消息...(调用其他EJB组件) } catch (Exception e) { throw new EJBException(e); } } public void ejbRemove() { try { jndiContext.close(); jndiContext = null; } catch (NamingException ne) { // 异常处理 } } } |
3.6 消息持久化
消息可以是持久性的,也可以是非持久性的,持久性的消息和非持久性的消息可以放入同一个队列。持久性的消息会被写入磁盘,即使系统出现故障也可以恢复。当然,正象其他场合的持久化操作一样,消息的持久化也会增加一定的开销,持久性消息大约要慢7%。控制消息持久化的办法之一是定义队列时设置其属性,如果没有显式地设置持久化属性,系统将采用默认值。另外,JMS应用程序本身也可以定义持久化属性:
■ PERSISTENCE(QDEF):继承队列的默认持久化属性值。
■ PERSISTENCE(PERS):持久性的消息。
■ PERSISTENCE(NON):非持久性的消息。
消息的持久性还可以通过消息属性头JMSDeliveryMode来调节,JMSDeliveryMode可以取值DeliveryMode.PERSISTENT或DeliveryMode.NON_PERSISTENT,前者表示消息必须持久化,后者表示消息不需持久化。在事务化会话中处理的消息总是持久性的。
3.7 消息选择器
JMS提供了从队列选取一个消息子集的机制,能够过滤掉所有不满足选择条件的消息。选择消息的条件不仅可以引用消息头的域,还可以引用消息属性的域。下面是如何利用这一功能的例子:
QueueReceiver queueReceiver = queueSession.createReceiver(requestQueue, "BookTitle = 'Windows 2000'"); QueueBrowser queueBrowser = queueSession.createBrowser (requestQueue, "BookTitle = 'Windows 2000' AND BookAuthor = 'Robert Lee'"); |
注意,字符串(例如'Windows 2000')被一个双引号之内的单引号对包围。
四、JMS Pub/Sub编程
Pub/Sub通信方式的编程与P2P通信编程相似,两者最主要的差别是消息发送的目的地对象。在Pub/Sub通信方式中,发布消息的目的地和消费消息的消息源是一个称为主题(Topic)的JMS对象。主题对象的作用就象是一个虚拟通道,其中封装了一个Pub/Sub的目的地(Destination)对象。
在P2P通信方式中,(发送到队列的)消息只能由一个消息消费者接收;但在Pub/Sub通信方式中,消息生产者发布到主题的消息可以分发到多个消息消费者,而且,消息的生产者和消费者之间的结合是如此宽松,以至于生产者根本不必了解任何有关消息消费者的情况,消息生产者和消费者都只要知道一个公共的目的地(也就是双方"交谈"的主题)。
由于这个原因,消息的生产者通常称为出版者,消息的消费者则相应地称为订阅者。出版者为某一主题发布的所有消息都会传递到该主题的所有订阅者,订阅者将收到它订阅的主题上的所有消息,每一个订阅者都会收到一个消息的副本。订阅可以是耐久性的(Durable)或非耐久性(Nondurable)。非耐久性的订阅者只能收到订阅之后才发布的消息。
耐久性订阅者则不同,它可以中断之后再连接,仍能收到在它断线期间发布的消息。Put/Sub通信方式中耐久性连接(在某种程度上)类似于P2P通信中的持久性消息/队列。出版者和订阅者永远不会直接通信,Pub/Sub代理的作用就象是一个信息发布中心,把所有消息发布给它们的订阅者。
● 注意:从WebSphere MQ 5.2开始,加装了MA88和MA0C扩展的WebSphere MQ可以当作JMS Pub/Sub通信代理。此外,WebSphere MQ Integrator也能够作为一个代理用。从MQ 5.3开始,MA88成了MQ基本软件包的一部分,因此只要在MQ 5.3的基础上安装MA0C就可以了。在MQ JMS环境中,要让Pub/Sub通信顺利运行,运行Pub/Sub代理的队列管理器上必须创建一组系统队列。
MQ JMS MA0C扩展模块提供了一个工具来构造所有必需的Pub/Sub系统队列,这个工具就是MQJMS_PSQ.mqsc,位于\java\bin目录之下。要构造Pub/Sub通信方式所需的系统队列,只需从上述目录执行命令:runmqsc < MQJMS_PSQ.mqsc。
多个主题可以组织成一种树形的层次结构,树形结构中主题的名称之间用一个斜杠(/)分隔--例如,Books/UnixBooks /SolarisBooks。如果要订阅一个以上的主题,可以在指定主题名字的时候使用通配符,例如,"Books/#"就是一个使用通配符的例子。
下面给出了一个JMS Pub/Sub通信的例子(为简单计,这里省略了try/catch代码块)。在这个例子中,Books/ UnixBooks/SolarisBooks主题的订阅者将收到所有发布到SolarisBooks的消息,Books/#主题的订阅者将收到所有有关 Books的消息(包括发布到UnixBooks和SolarisBooks主题的消息)。
// JMS Pub/Sub通信 import javsx.jms.*; import javax.naming.*; import javax.ejb.*; public class PublisherExample implements javax.ejb.SessionBean { private TopicConnectionFactory topicConnFactory = null; private TopicConnection topicConnection = null; private TopicPublisher topicPublisher = null; private TopicSession topicSession = null; private SessionContext sessionContext = null; public void setSessionContext(SessionContext ctx) { sessionContext = cts; } public void ejbCreate() throws CreateException { InitialContext initContext = new InitialContext(); // 从JNDI查找主题的连接工厂 topicConnFactory =(TopicConnectionFactory) initContext.lookup("java:comp/env/TCF"); // 从JNDI查找主题 Topic unixBooksTopic = (Topic) initContext.lookup("java:comp/env/UnixBooks"); Topic javaBooksTopic = (Topic) initContext.lookup("java:comp/env/JavaBooks"); Topic linuxBooksTopic = (Topic) initContext.lookup("java:comp/env/LinuxBooks"); Topic windowsBooksTopic = (Topic) initContext.lookup("java:comp/env/WindowsBooks"); Topic allBooksTopic = (Topic) initContext.lookup("java:comp/env/AllBooks"); // 创建连接 topicConnection = topicConnFactory.createTopicConnection(); topicConn.start(); // 创建会话 topicSession = topicConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); } public void publishMessage(String workMessage, String topicToPublish) { // 创建一个消息 TextMessage message = topicSession.createTextMessage (workMessage); // 创建出版者,发布消息 if ((topicToPublish.toLowerCase()).equals("java")) { TopicPublisher javaBooksPublisher = topicSession.createPublisher(javaBooksTopic); javaBooksPublisher.publish(message); } if ((topicToPublish.toLowerCase()).equals("unix")) { TopicPublisher unixBooksPublisher = topicSession.createPublisher(unixBooksTopic); J2EE Enterprise Messaging 475 unixBooksPublisher. publish(message); } if ((topicToPublish.toLowerCase()).equals("linux")) { TopicPublisher linuxBooksPublisher = topicSession.createPublisher(linuxBooksTopic); linuxBooksPublisher.publish(message); } if ((topicToPublish.toLowerCase()).equals("windows")) { TopicPublisher windowsBooksPublisher = topicSession.createPublisher(windowsBooksTopic); windowsBooksPublisher.publish(message); } TopicPublisher allBooksPublisher = topicSession.createPublisher(allBooksTopic); allBooksPublisher.publish(message); } public void ejbActivate() { } public void ejbPassivate() { } public void ejbRemove() { // 清理工作... if (javaBooksPublisher != null) { javaBooksPublisher.close(); javaBooksPublisher = null; } if (unixBooksPublisher != null) { unixBooksPublisher.close(); Chapter 9 476 unixBooksPublisher = null; } if (linuxBooksPublisher != null) { linuxBooksPublisher.close(); linuxBooksPublisher = null; } if (windowsBooksPublisher != null) { windowsBooksPublisher.close(); windowsBooksPublisher = null; } if (allBooksPublisher != null) { allBooksPublisher.close(); allBooksPublisher = null; } if (topicSession != null) { topicSession.close(); topicSession = null; } if (topicConnection != null) { topicConnection.stop(); topicConnection.close(); topicConnection = null; } } |
这段代码比较简单,想来不需要多余的解释了。唯一值得一提的地方是如何将一个消息发布到不同的主题:对于每一个特定的主题,分别创建一个对应的出版者,然后用它将消息发布到主题。
如果一个MDB组件只负责接收消息,把所有其他的消息处理操作都委托给专用业务组件(这意味着MDB之内不包含消息发送或发布逻辑),MDB的代码就相当于P2P通信方式中的处理代码,使用MDB唯一的改变之处是将监听端口从监听一个队列改为监听一个主题。有兴趣的读者可以自己试验一下双重监听的实现方式。
五、二阶段提交的事务
在企业级处理任务中,为了保证JMS或非JMS代码处理业务逻辑过程的完整性(对于一个单元的工作,要么成功提交所有的处理步骤,要么全部回退所有处理步骤),操作一般要在一个事务上下文之内进行。除了将消息放入队列的过程之外,如果还要向数据库插入记录(二阶段提交,要么全部成功,要么全部失败),事务上下文的重要性尤其突出。
为了支持二阶段提交,JMS 规范定义了下列XA版的JMS对象:XAConnectionFactory、XAQueueConnectionFactory、XASession、 XAQueueSession、XATopicConnectionFactory、XATopicConnection,以及 XATopicSession。另外,凡是参与全局事务的所有资源均应该使用其XA版。特别地,对于JDBC资源,必须使用JDBC XADataSource。最后一点是,全局事务由JTA TransactionManager控制。下面的代码显示了建立全局事务所需的步骤。
// 配置全局事务 // 从JNDI名称空间获取一个JTA TransactionManager TransactionManager globalTxnManager = jndiContext.lookup("java:comp/env/txt/txnmgr"); // 启动全局事务 globalTxnManager.begin(); // 获取事务对象 Transaction globalTxn = globalTxnManager.getTransaction(); // 获取XA数据资源 XADatasource xaDatasource = jndiContext.lookup ("java:comp/env/jdbc/datasource"); // 获取一个连接 XAConnection jdbcXAConn = xaDatasource.getConnection(); // 从XA连接获取XAResource XAResource jdbcXAResource = jdbcXAConn.getXAResource(); // 在全局事务中"征募"XAResource globalTxn.enlist(jdbcXAResource); // 获取XA队列连接 XAQueueConnectionFactory xaQueueConnectionFactory = JndiContext.lookup("java:comp/env/jms/xaQCF") XAQueueConnection xaQueueConnection = XaQueueConnectionFactory.createXAQueueConnection(); // 获取XA队列会话 XAQueueSession xaQueueSession = xaQueueConnection. createXAQueueSession(); // 从会话获取XA资源 XAResource jmsXAResource = xaQueueSession.getXAResource(); // 在全局事务中"征募"该XAResource globalTxn.enlist(jmsXAResource); // 其他处理工作... // 提交全局事务 globalTxn.commit(); |
总结:本文介绍了JMS及其在WSAD环境下的编程,探讨了JMS异步通信的主要优点,以及两种通信方式(P2P和Pub/Sub)、MDB、JMS事务、二阶段提交全局事务等方面的概念。希望本文的介绍对你有所帮助。