19.4. 接收消息

Spring Framework

19.4. 接收消息

19.4.1. 同步接收

虽然JMS一般都和异步处理相关,但它也可以同步的方式使用消息。可重载的 receive(..) 方法提供了这种功能。在同步接收中,接收线程被阻塞直至获得一个消息,有可能出现线程被无限阻塞的危险情况。属性 receiveTimeout 指定了接收器可等待消息的延时时间。

19.4.2. 异步接收 - 消息驱动的POJO

类似于EJB世界里流行的消息驱动Bean(MDB),消息驱动POJO(MDP)作为JMS消息的接收器。MDP的一个约束(但也请看下面的有关 javax.jms.MessageListener 类的讨论)是它必须实现 javax.jms.MessageListener 接口。另外当你的POJO将以多线程的方式接收消息时必须确保你的代码是线程-安全的。

以下是MDP的一个简单实现:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ExampleListener implements MessageListener {

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                System.out.println(((TextMessage) message).getText());
            }
            catch (JMSException ex) {
                throw new RuntimeException(ex);
            }
        }
        else {
            throw new IllegalArgumentException("Message must be of type TextMessage");
        }
    }
}

一旦你实现了 MessageListener 后就可以创建一个消息侦听容器。

请看下面例子是如何定义和配置一个随Sping发行的消息侦听容器的(这个例子用 DefaultMessageListenerContainer

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener" />

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener" />
</bean>

关于各个消息侦听容器实现的特色请参阅相关的Spring Javadoc文档。

19.4.3. SessionAwareMessageListener接口

SessionAwareMessageListener 接口是一个Spring专门用来提供类似于JMS MessageListener 的接口,也提供了从接收 Message 来访问JMS Session 的消息处理方法。

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

    void onMessage(Message message, Session session) throws JMSException;
}

如果你希望你的MDP可以响应所有接收到的消息(使用 onMessage(Message, Session) 方法提供的 Session)那么你可以选择让你的MDP实现这个接口(优先于标准的JMS MessageListener 接口)。所有随Spring发行的支持MDP的消息侦听容器都支持 MessageListenerSessionAwareMessageListener 接口的实现。要注意的是实现了 SessionAwareMessageListener 接口的类通过接口和Spring有了耦合。是否选择使用它完全取决于开发者或架构师。

请注意 SessionAwareMessageListener 接口的 'onMessage(..)' 方法会抛出 JMSException异常。和标准JMS MessageListener 接口相反,当使用 SessionAwareMessageListener 接口时,客户端代码负责处理任何抛出的异常。

19.4.4. MessageListenerAdapter

MessageListenerAdapter 类是Spring的异步支持消息类中的不变类(final class):简而言之,它允许你几乎将 任意 一个类做为MDP显露出来(当然有某些限制)。

注意

如果你使用JMS 1.0.2 API,你将使用和 MessageListenerAdapter 一样功能的类 MessageListenerAdapter102

考虑如下接口定义。注意虽然这个接口既不是从 MessageListener 也不是从 SessionAwareMessageListener 继承来得,但通过 MessageListenerAdapter 类依然可以当作一个MDP来使用。同时也请注意各种消息处理方法是如何根据他们可以接收并处理消息的内容来进行强类型匹配的。

public interface MessageDelegate {

    void handleMessage(String message);

    void handleMessage(Map message);

    void handleMessage(byte[] message);

    void handleMessage(Serializable message);
}
public class DefaultMessageDelegate implements MessageDelegate {
    // implementation elided for clarity...
}

特别请注意,上面的 MessageDelegate 接口(上文中 DefaultMessageDelegate 类)的实现完全 依赖于JMS。它是一个真正的POJO,我们可以通过如下配置把它设置成MDP。

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultMessageDelegate"/>
    </constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener" />
</bean>

下面是另外一个只能处理接收JMS TextMessage 消息的MDP示例。注意消息处理方法是如何实际调用 'receive' (在 MessageListenerAdapter 中默认的消息处理方法的名字是 'handleMessage')的,但是它是可配置的(你下面就将看到)。注意 'receive(..)' 方法是如何使用强制类型来只接收和处理JMS TextMessage消息的。

public interface TextMessageDelegate {

    void receive(TextMessage message);
}
public class DefaultTextMessageDelegate implements TextMessageDelegate {
    // implementation elided for clarity...
}

辅助的 MessageListenerAdapter 类配置文件类似如下:

<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultTextMessageDelegate"/>
    </constructor-arg>
    <property name="defaultListenerMethod" value="receive"/>
    <!-- we don't want automatic message context extraction -->
    <property name="messageConverter">
        <null/>
    </property>
</bean>

请注意,如果上面的 'messageListener' 收到一个不是 TextMessage 类型的JMS Message,将会产生一个 IllegalStateException 异常(随之产生的其他异常只被捕获而不处理)。

MessageListenerAdapter 还有一个功能就是如果处理方法返回一个非空值,它将自动返回一个响应 消息

请看下面的接口及其实现:

public interface ResponsiveTextMessageDelegate {

    // notice the return type...
    String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
    // implementation elided for clarity...
}

如果上面的 DefaultResponsiveTextMessageDelegateMessageListenerAdapter 联合使用,那么任意从执行 'receive(..)' 方法返回的非空值都将(缺省情况下)转换成一个 TextMessage。这个返回的 TextMessage 将被发送到原来的 Message 中JMS Reply-To属性定义的 目的地(如果存在),或者是 MessageListenerAdapter 设置(如果配置了)的缺省 目的地;如果没有定义 目的地,那么将产生一个 InvalidDestinationException 异常(此异常将不会只被捕获而不处理,它 沿着调用堆栈上传)。

19.4.5. 事务中的消息处理

在消息监听器的调用中使用事务只需要重新配置监听器容器

通过监听器容器定义中的 sessionTransacted 标记可以轻松的激活本地资源事务。每次消息监听器的调用都在激活的JMS事务中执行,执行失败时,消息接收将发生回滚。这个本地事务还将包含响应信息的发送(通过 SessionAwareMessageListener),但其它资源的操作(例如访问数据库)是独立的。经常会发生类似于数据库处理已提交但消息处理提交失败的情况,因此需要在监听器的实现中进行重复消息的检测。

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
    <property name="sessionTransacted" value="true"/>
</bean>

当参与外部管理的事务时,你需要使用支持外来事务的监听器容器:通常是 DefaultMessageListenerContainer 来配置事务管理器。

参与XA事务时,消息监听器容器需要配置 JtaTransactionManager(默认会委托给J2EE服务器事务子系统)。注意以下JMS ConnectionFactory需要具有XA能力并注册JTA事务协调器!(参考你所使用的J2EE服务器中JNDI资源的配置。)这样,消息接收就像数据库访问一样作为同一个事务的一部分(具有统一提交的语义,仅仅增加了XA事务日志的额外开销)。

<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然后你只需要把它添加到早先配置好的容器中。这个容器将处理剩下的事情。

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
    <property name="transactionManager" ref="transactionManager"/>
</bean>