19.4. 接收消息

Spring Framework

19.4. 接收消息

19.4.1. 同步接收

虽然JMS一般都和异步处理相关,但它也可以同步的方式使用消息。可重载的receive(..)方法提供了这种功能。在同步接收中,接收线程被阻塞直至获得一个消息,有可能出现线程被无限阻塞的危险情况。属性receiveTimeout指定了接收器可等待消息的延时时间。

19.4.2. 异步接收 - 消息驱动的POJOs

类似于EJB世界里流行的消息驱动bean(MDB),消息驱动POJO(MDP)作为JMS消息的接收器。MDP的一个约束(但也请看下面的有关javax.jms.MessageListener类的讨论)是它必须实现javax.jms.MessageListener接口。另外当你的POJO将以多线程的方式接收消息时必须确保你的代码是线程-安全的。

以下是MDP的一个简单实现:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ExampleListener implements MessageListener {

  public void onMessage(Message message) {
    if (message instanceof TextMessage) {
      try {
        System.out.println(((TextMessage) message).getText());
      } catch (JMSException ex) {
        throw new RuntimeException(ex);
      }
    } else {
      throw new IllegalArgumentException("Message must be of type TextMessage");
    }
  }
}

一旦你实现了MessageListener后就可以创建一个消息侦听容器。

请看下面例子是如何定义和配置一个随Sping发行的消息侦听容器的(这个例子用DefaultMessageListenerContainer)

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener" />

<!-- and this is the attendant message listener container -->
<bean id="listenerContainer"
  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="concurrentConsumers" value="5"/>
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="destination" ref="destination" />
  <property name="messageListener" ref="messageListener" />
</bean>

关于各个消息侦听容器实现的特色请参阅相关的Spring Javadoc文档。

19.4.3. SessionAwareMessageListener 接口

SessionAwareMessageListener接口是一个Spring专门用来提供类似于JMS MessageListener的接口,也提供了从接收Message来访问JMS Session的消息处理方法。

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

    void onMessage(Message message, Session session) throws JMSException;
}

如果你希望你的MDP可以响应所有接收到的消息(使用onMessage(Message, Session)方法提供的Session)那么你可以选择让你的MDP实现这个接口(优先于标准的JMS MessageListener接口)。所有随Spring发行的支持MDP的消息侦听容器都支持MessageListenerSessionAwareMessageListener接口的实现。要注意的是实现了SessionAwareMessageListener接口的类通过接口和Spring有了耦合。是否选择使用它完全取决于开发者或架构师。

请注意SessionAwareMessageListener接口的'onMessage(..)'方法会抛出JMSException异常。和标准JMS MessageListener接口相反,当使用SessionAwareMessageListener接口时,客户端代码负责处理任何抛出的异常。

19.4.4. MessageListenerAdapter

MessageListenerAdapter类是Spring的异步支持消息类中的不变类(final class):简而言之,它允许你几乎将任意一个类做为MDP显露出来(当然有某些限制)。

注意

如果你使用JMS 1.0.2 API,你将使用和MessageListenerAdapter一样功能的类MessageListenerAdapter102

考虑如下接口定义。注意虽然这个接口既不是从MessageListener也不是从SessionAwareMessageListener继承来得,但通过MessageListenerAdapter类依然可以当作一个MDP来使用。同时也请注意各种消息处理方法是如何根据他们可以接收并处理消息的内容来进行强类型匹配的。

public interface MessageDelegate {

    void handleMessage(String message);

    void handleMessage(Map message);

    void handleMessage(byte[] message);

    void handleMessage(Serializable message);
}
public class DefaultMessageDelegate implements MessageDelegate {
    // implementation elided for clarity...
}

特别请注意,上面的MessageDelegate接口(上文中DefaultMessageDelegate类)的实现完全依赖于JMS。它是一个真正的POJO,我们可以通过如下配置把它设置成MDP。

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultMessageDelegate"/>
    </constructor-arg>
</bean>

<!-- and this is the attendant message listener container... -->
<bean id="listenerContainer"
  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="concurrentConsumers" value="5"/>
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="destination" ref="destination" />
  <property name="messageListener" ref="messageListener" />
</bean>

下面是另外一个只能处理接收JMSTextMessage消息的MDP示例。注意消息处理方法是如何实际调用'receive'(在MessageListenerAdapter中默认的消息处理方法的名字是'handleMessage')的,但是它是可配置的(你下面就将看到)。注意'receive(..)'方法是如何使用强制类型来只接收和处理JMS TextMessage消息的。

public interface TextMessageDelegate {

    void receive(TextMessage message);
}
public class DefaultTextMessageDelegate implements TextMessageDelegate {
    // implementation elided for clarity...
}

辅助的MessageListenerAdapter类配置文件类似如下:

<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultTextMessageDelegate"/>
    </constructor-arg>
    <property name="defaultListenerMethod" value="receive"/>
    <!-- we don't want automatic message context extraction -->
    <property name="messageConverter">
        <null/>
    </property>
</bean>

请注意,如果上面的'messageListener'收到一个不是TextMessage类型的JMS Message,将会产生一个IllegalStateException异常(随之产生的其他异常只被捕获而不处理)。

MessageListenerAdapter还有一个功能就是如果处理方法返回一个非空值,它将自动返回一个响应消息

请看下面的接口及其实现:

public interface ResponsiveTextMessageDelegate {

    // notice the return type...
    String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
    // implementation elided for clarity...
}

如果上面的DefaultResponsiveTextMessageDelegateMessageListenerAdapter联合使用,那么任意从执行'receive(..)'方法返回的非空值都将(缺省情况下)转换成一个TextMessage。这个返回的TextMessage将被发送到原来的Message中JMS Reply-To属性定义的目的地(如果存在),或者是MessageListenerAdapter设置(如果配置了)的缺省目的地;如果没有定义目的地,那么将产生一个InvalidDestinationException异常(此异常将不会只被捕获而不处理,它沿着调用堆栈上传)。

19.4.5. 事务中的多方参与

参与到事务中只需要一点微小的改动。你需要创建一个事务管理器,并且注册到一个可以参与事务的子类中(DefaultMessageListenerContainerServerSessionMessageListenerContainer)。

为了创建事务管理器,你需要创建一个JmsTransactionManager的实例并提供给它一个支持XA事务功能的连接工厂。

<bean id="transactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
  <property name="connectionFactory" ref="connectionFactory" />
</bean>

然后你只需要把它加入到我们先前的容器配置中。容器会处理其他的事情。

<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="concurrentConsumers" value="5" />
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="destination" ref="destination" />
  <property name="messageListener" ref="messageListener" />
  <property name="transactionManager" ref="transactionManager" />
</bean>