The NMS quick start application demonstrates how to use asynchronous messaging to implement a system for purchasing a stock. To purchase a stock, a client application sends a stock request message containing information about the stock, such as the ticker symbol and quantity. The server receives the request message and performs business processing on it, for example to determine whether the user has sufficient credit to purchase the stock, or whether the user is allowed to make the purchase at all given existing account restrictions. These checks are typically performed by external processes as well. Usually the server application will persist state about the request and forward it to an execution venue, where the actual execution of the stock request is performed. In addition, market data for the stock is sent from the server process to the client. The high level exchange of information is shown below.
This example was developed with ActiveMQ 5.1 and the ActiveMQ NMS library at Subversion revision 685750.
To implement this flow using messaging, the following queues and topics will be used. All requests from the client to the server will be sent on the queue named APP.STOCK.REQUEST. Responses to the requests will be sent from the server to the client on a queue unique to each client. In this example the queue name is of the form APP.STOCK.<UserName>, and more specifically is configured to be APP.STOCK.JOE. Market data does not need to be delivered to an individual client, as many client applications are interested in this shared information. As such, the server will send market data information on a topic named APP.STOCK.MARKETDATA. The messaging communication between the server and the execution venue is not included as part of the application. A local implementation of the service interface that represents the execution venue is used instead of one based on messaging or another middleware technology. The messaging flow showing the queues and topics used is shown below.
Queues are shown in red and topics in green.
Gateways represent the service operation to send a message. The client will send a stock request to the server based on the contract defined by the IStockService interface.
public interface IStockService
{
    void Send(TradeRequest tradeRequest);
}
The server will send market data to the clients based on the contract defined by the IMarketDataService interface.
public interface IMarketDataService
{
    void SendMarketData();
}
The market data gateway has no method parameters, as it is assumed that implementations will manage the data to send internally. The TradeRequest object is one of the data objects that will be exchanged in the application and is discussed in the next section.
The use of interfaces allows for multiple implementations to be created. Implementations that use messaging to communicate will be based on Spring's NmsGatewaySupport class and will be discussed later. Stub or mock implementations can be used for testing purposes.
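As an illustration of the last point, the following is a minimal sketch of a stub gateway that records requests in memory instead of sending them. RecordingStockService and the trimmed TradeRequest class are hypothetical names introduced for this example only; they are not part of the quick start.

```csharp
using System;
using System.Collections.Generic;

// Trimmed stand-in for the generated TradeRequest class (two properties only).
public class TradeRequest
{
    public string Ticker { get; set; }
    public long Quantity { get; set; }
}

// Same contract as the quick start's gateway interface.
public interface IStockService
{
    void Send(TradeRequest tradeRequest);
}

// Hypothetical test double: keeps requests in a list rather than using NMS.
public class RecordingStockService : IStockService
{
    private readonly List<TradeRequest> sent = new List<TradeRequest>();

    public IList<TradeRequest> Sent
    {
        get { return sent; }
    }

    public void Send(TradeRequest tradeRequest)
    {
        if (tradeRequest == null) throw new ArgumentNullException("tradeRequest");
        sent.Add(tradeRequest);
    }
}

public static class StubGatewayDemo
{
    public static void Main()
    {
        RecordingStockService gateway = new RecordingStockService();
        IStockService service = gateway; // client code only sees the interface

        service.Send(new TradeRequest { Ticker = "CSCO", Quantity = 100 });

        Console.WriteLine("Recorded {0} request(s) for {1}",
            gateway.Sent.Count, gateway.Sent[0].Ticker);
    }
}
```

Because client code depends only on IStockService, a stub like this can be swapped in through configuration without touching the code that creates trade requests.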
The TradeRequest object shown above contains all the information required to process a stock order. To promote the interoperability of this data across different platforms, the TradeRequest class is generated from an XML Schema using Microsoft's Schema Definition Tool (xsd.exe). The schema for the trade request is shown below.
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
           elementFormDefault="qualified"
           targetNamespace="http://www.springframework.net/nms/common/2008-08-05">
  <xs:element name="TradeRequest">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="Ticker" type="xs:string"/>
        <xs:element name="Quantity" type="xs:long"/>
        <xs:element name="Price" type="xs:decimal"/>
        <xs:element name="OrderType" type="xs:string"/>
        <xs:element name="AccountName" type="xs:string"/>
        <xs:element name="BuyRequest" type="xs:boolean"/>
        <xs:element name="UserName" type="xs:string"/>
        <xs:element name="RequestID" type="xs:string"/>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>
Running xsd.exe on this schema produces a class that contains a property for each of the element names. A partial code listing of the TradeRequest class is shown below.
// This code was generated by a tool.
public partial class TradeRequest
{
    public string Ticker
    {
        get { return this.tickerField; }
        set { this.tickerField = value; }
    }

    public long Quantity
    {
        get { return this.quantityField; }
        set { this.quantityField = value; }
    }

    // Additional properties not shown for brevity.
}
The schema and the TradeRequest class are located in the project Spring.NmsQuickStart.Common. This common project will be shared between the server and client for convenience.
When sending a response back to the client, the type TradeResponse will be used. The schema for the TradeResponse is shown below.
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
           elementFormDefault="qualified"
           targetNamespace="http://www.springframework.net/nms/common/2008-08-05">
  <xs:element name="TradeResponse">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="Ticker" type="xs:string"/>
        <xs:element name="Quantity" type="xs:integer"/>
        <xs:element name="Price" type="xs:decimal"/>
        <xs:element name="OrderType" type="xs:string"/>
        <xs:element name="Error" type="xs:boolean"/>
        <xs:element name="ErrorMessage" type="xs:string"/>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>
The TradeResponse type is also generated from a schema using xsd.exe. A partial code listing is shown below.
// This code was generated by a tool.
public partial class TradeResponse
{
    public string Ticker
    {
        get { return this.tickerField; }
        set { this.tickerField = value; }
    }

    public long Quantity
    {
        get { return this.quantityField; }
        set { this.quantityField = value; }
    }

    // Additional properties not shown for brevity.
}
The market data information will be sent using a Hashtable data structure.
When the TradeRequest message is received by the server, it will be handled by the class Spring.NmsQuickStart.Server.Handlers.StockAppHandler, shown below.
public class StockAppHandler
{
    private IExecutionVenueService executionVenueService;
    private ICreditCheckService creditCheckService;
    private ITradingService tradingService;

    public TradeResponse Handle(TradeRequest tradeRequest)
    {
        TradeResponse tradeResponse;
        IList errors = new ArrayList();
        if (creditCheckService.CanExecute(tradeRequest, errors))
        {
            tradeResponse = executionVenueService.ExecuteTradeRequest(tradeRequest);
            tradingService.ProcessTrade(tradeRequest, tradeResponse);
        }
        else
        {
            tradeResponse = new TradeResponse();
            tradeResponse.Error = true;
            tradeResponse.ErrorMessage = errors[0].ToString();
        }
        return tradeResponse;
    }
}
The stub implementations of the services, located in the namespace Spring.NmsQuickStart.Server.Services.Stubs, always result in an error-free trade response message being sent back. A realistic implementation would likely have the execution venue and credit check service be remote services, while the trading service could be implemented as a local transactional service layer that uses Spring's declarative transaction management features.
The client will receive a TradeResponse message as well as a Hashtable of data representing the market data. The message handler for the client is the class Spring.NmsQuickStart.Client.Handlers.StockAppHandler and is shown below.
public class StockAppHandler
{
    // definition of stockController omitted for brevity.

    public void Handle(Hashtable data)
    {
        // forward to controller to update view
        stockController.UpdateMarketData(data);
    }

    public void Handle(TradeResponse tradeResponse)
    {
        // forward to controller to update view
        stockController.UpdateTrade(tradeResponse);
    }
}
What is important to note about these handlers is that they contain no messaging API artifacts. As such, you can write unit and integration tests against these classes independent of the middleware. The missing link between the messaging world and the objects processed by the message handlers is the message converter. Spring's messaging helper classes, such as SimpleMessageListenerContainer and NmsTemplate, use message converters to pass data to the handlers and to send data via messaging for gateway implementations.
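To make the testability point concrete, here is a minimal sketch that exercises the server-side handler logic with no messaging infrastructure at all. The service interfaces are inferred from the calls the handler makes. The constructor injection, the trimmed data classes, and the stub classes (FixedCreditCheck, EchoExecutionVenue, CountingTradingService) are assumptions introduced only for this example.

```csharp
using System;
using System.Collections;

// Trimmed stand-ins for the generated data classes.
public class TradeRequest
{
    public string Ticker { get; set; }
    public long Quantity { get; set; }
}

public class TradeResponse
{
    public string Ticker { get; set; }
    public bool Error { get; set; }
    public string ErrorMessage { get; set; }
}

// Contracts inferred from how the handler calls its collaborators.
public interface IExecutionVenueService { TradeResponse ExecuteTradeRequest(TradeRequest tradeRequest); }
public interface ICreditCheckService { bool CanExecute(TradeRequest tradeRequest, IList errors); }
public interface ITradingService { void ProcessTrade(TradeRequest tradeRequest, TradeResponse tradeResponse); }

public class StockAppHandler
{
    private readonly IExecutionVenueService executionVenueService;
    private readonly ICreditCheckService creditCheckService;
    private readonly ITradingService tradingService;

    // Constructor injection is an assumption made to keep the sketch short.
    public StockAppHandler(IExecutionVenueService venue, ICreditCheckService credit, ITradingService trading)
    {
        executionVenueService = venue;
        creditCheckService = credit;
        tradingService = trading;
    }

    public TradeResponse Handle(TradeRequest tradeRequest)
    {
        TradeResponse tradeResponse;
        IList errors = new ArrayList();
        if (creditCheckService.CanExecute(tradeRequest, errors))
        {
            tradeResponse = executionVenueService.ExecuteTradeRequest(tradeRequest);
            tradingService.ProcessTrade(tradeRequest, tradeResponse);
        }
        else
        {
            tradeResponse = new TradeResponse();
            tradeResponse.Error = true;
            tradeResponse.ErrorMessage = errors[0].ToString();
        }
        return tradeResponse;
    }
}

// Hypothetical stub: approves or rejects every request, as configured.
public class FixedCreditCheck : ICreditCheckService
{
    private readonly bool approve;
    public FixedCreditCheck(bool approve) { this.approve = approve; }

    public bool CanExecute(TradeRequest tradeRequest, IList errors)
    {
        if (!approve) errors.Add("Insufficient credit");
        return approve;
    }
}

// Hypothetical stub: echoes the ticker back in an error-free response.
public class EchoExecutionVenue : IExecutionVenueService
{
    public TradeResponse ExecuteTradeRequest(TradeRequest tradeRequest)
    {
        return new TradeResponse { Ticker = tradeRequest.Ticker, Error = false };
    }
}

// Hypothetical stub: counts how often a trade was processed.
public class CountingTradingService : ITradingService
{
    public int Calls;
    public void ProcessTrade(TradeRequest tradeRequest, TradeResponse tradeResponse) { Calls++; }
}

public static class HandlerTestSketch
{
    public static void Main()
    {
        TradeRequest request = new TradeRequest { Ticker = "CSCO", Quantity = 100 };
        CountingTradingService trading = new CountingTradingService();

        TradeResponse rejected = new StockAppHandler(
            new EchoExecutionVenue(), new FixedCreditCheck(false), trading).Handle(request);
        Console.WriteLine("Rejected: Error={0}, ProcessTrade calls={1}", rejected.Error, trading.Calls);

        TradeResponse accepted = new StockAppHandler(
            new EchoExecutionVenue(), new FixedCreditCheck(true), trading).Handle(request);
        Console.WriteLine("Accepted: Error={0}, ProcessTrade calls={1}", accepted.Error, trading.Calls);
    }
}
```

In a real project the same checks would normally live in a unit test framework rather than a Main method; the structure of the test stays the same.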
The implementation of IMessageConverter used is Spring.NmsQuickStart.Common.Converters.XmlMessageConverter. This converter adds the ability to marshal and unmarshal objects to and from XML strings. It also uses Spring's SimpleMessageConverter to convert Hashtables, strings, and byte arrays. In order to pass information about the serialized type, type information is put in the message properties. The type information can be either the class name or an integer value identifying the type. In systems where the client and server are deployed together and are tightly coupled, sharing the class name is a convenient shortcut. The alternative is to register a type for a given integer value. The XML configuration used to configure these objects is shown below.
<object name="XmlMessageConverter"
        type="Spring.NmsQuickStart.Common.Converters.XmlMessageConverter, Spring.NmsQuickStart.Common">
  <property name="TypeMapper" ref="TypeMapper"/>
</object>

<object name="TypeMapper"
        type="Spring.NmsQuickStart.Common.Converters.TypeMapper, Spring.NmsQuickStart.Common">
  <!-- use simple configuration style -->
  <property name="DefaultNamespace" value="Spring.NmsQuickStart.Common.Data"/>
  <property name="DefaultAssemblyName" value="Spring.NmsQuickStart.Common"/>
</object>
This configuration is common between the server and the client.
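The general idea of marshalling an object to an XML string and carrying its type name alongside the body can be sketched as follows. This is not the quick start's XmlMessageConverter: FakeTextMessage, XmlMarshalSketch, and the "TypeName" property key are hypothetical stand-ins, and a plain dictionary plays the role of the message properties. Only the standard .NET XmlSerializer is used.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Xml.Serialization;

// Trimmed stand-in for the generated TradeRequest class.
[XmlRoot(Namespace = "http://www.springframework.net/nms/common/2008-08-05")]
public class TradeRequest
{
    public string Ticker { get; set; }
    public long Quantity { get; set; }
}

// Hypothetical stand-in for a text message: an XML body plus string properties.
public class FakeTextMessage
{
    public string Text;
    public readonly Dictionary<string, string> Properties = new Dictionary<string, string>();
}

public static class XmlMarshalSketch
{
    // Assumed property key; the key used by the quick start may differ.
    private const string TypeKey = "TypeName";

    public static FakeTextMessage ToMessage(object payload)
    {
        XmlSerializer serializer = new XmlSerializer(payload.GetType());
        using (StringWriter writer = new StringWriter())
        {
            serializer.Serialize(writer, payload);
            FakeTextMessage message = new FakeTextMessage();
            message.Text = writer.ToString();
            // Record the type so the receiver knows what to deserialize.
            message.Properties[TypeKey] = payload.GetType().AssemblyQualifiedName;
            return message;
        }
    }

    public static object FromMessage(FakeTextMessage message)
    {
        // Resolve the type from the property; throws if it cannot be found.
        Type type = Type.GetType(message.Properties[TypeKey], true);
        XmlSerializer serializer = new XmlSerializer(type);
        using (StringReader reader = new StringReader(message.Text))
        {
            return serializer.Deserialize(reader);
        }
    }

    public static void Main()
    {
        TradeRequest original = new TradeRequest { Ticker = "CSCO", Quantity = 100 };
        FakeTextMessage message = ToMessage(original);
        TradeRequest copy = (TradeRequest)FromMessage(message);
        Console.WriteLine("{0} x {1}", copy.Ticker, copy.Quantity);
    }
}
```

Sending a class name couples both sides to the same assembly, which is why it suits tightly coupled deployments. Mapping integer identifiers to registered types avoids that coupling and also restricts which types a receiver will instantiate.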
The implementations of the gateway interfaces inherit from Spring's helper class NmsGatewaySupport in order to get easy access to an NmsTemplate for sending. The implementation of the IStockService interface is shown below.
public class NmsStockServiceGateway : NmsGatewaySupport, IStockService
{
    private IDestination defaultReplyToQueue;

    public IDestination DefaultReplyToQueue
    {
        set { defaultReplyToQueue = value; }
    }

    public void Send(TradeRequest tradeRequest)
    {
        // post process message
        NmsTemplate.ConvertAndSendWithDelegate(tradeRequest, delegate(IMessage message)
        {
            message.NMSReplyTo = defaultReplyToQueue;
            // Guid.NewGuid() creates a unique ID; new Guid() always yields the all-zero GUID.
            message.NMSCorrelationID = Guid.NewGuid().ToString();
            return message;
        });
    }
}
The Send method uses NmsTemplate's ConvertAndSendWithDelegate(object obj, MessagePostProcessorDelegate messagePostProcessorDelegate) method. The anonymous delegate allows you to modify message properties, such as NMSReplyTo and NMSCorrelationID, after the message has been converted from an object but before it has been sent. The use of an anonymous delegate makes it very easy to apply any post-processing logic to the converted message.
The object definition for the NmsStockServiceGateway is shown below, along with its dependent object definitions of NmsTemplate and the ConnectionFactory.
<object name="StockServiceGateway"
        type="Spring.NmsQuickStart.Client.Gateways.NmsStockServiceGateway, Spring.NmsQuickStart.Client">
  <property name="NmsTemplate" ref="NmsTemplate"/>
  <property name="DefaultReplyToQueue">
    <object type="Apache.NMS.ActiveMQ.Commands.ActiveMQQueue, Apache.NMS.ActiveMQ">
      <constructor-arg value="APP.STOCK.JOE"/>
    </object>
  </property>
</object>

<object name="NmsTemplate" type="Spring.Messaging.Nms.Core.NmsTemplate, Spring.Messaging.Nms">
  <property name="ConnectionFactory" ref="ConnectionFactory"/>
  <property name="DefaultDestinationName" value="APP.STOCK.REQUEST"/>
  <property name="MessageConverter" ref="XmlMessageConverter"/>
</object>

<object id="ConnectionFactory" type="Apache.NMS.ActiveMQ.ConnectionFactory, Apache.NMS.ActiveMQ">
  <constructor-arg index="0" value="tcp://localhost:61616"/>
</object>
In this example the 'raw' Apache.NMS.ActiveMQ.ConnectionFactory connection factory was used. It would be more resource-efficient to use Spring's CachingConnectionFactory wrapper class, so that connections are not opened and closed for each message send, and so that other intermediate NMS API objects such as sessions and message producers can be cached.
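A cached variant of the connection factory definition might look like the following sketch. It assumes that the wrapper's full type name is Spring.Messaging.Nms.Connections.CachingConnectionFactory and that it exposes TargetConnectionFactory and SessionCacheSize properties. The type name, the property names, and the cache size of 10 are all assumptions that should be checked against the Spring.NET version in use.

```xml
<!-- Sketch only: type and property names are assumptions, verify before use. -->
<object id="ConnectionFactory"
        type="Spring.Messaging.Nms.Connections.CachingConnectionFactory, Spring.Messaging.Nms">
  <!-- Illustrative value, not a recommendation. -->
  <property name="SessionCacheSize" value="10"/>
  <property name="TargetConnectionFactory">
    <!-- The same 'raw' ActiveMQ factory used in the quick start. -->
    <object type="Apache.NMS.ActiveMQ.ConnectionFactory, Apache.NMS.ActiveMQ">
      <constructor-arg index="0" value="tcp://localhost:61616"/>
    </object>
  </property>
</object>
```

Because the object keeps the id ConnectionFactory, the NmsTemplate definition that references it would not need to change.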
A similar configuration is used on the server to configure the class Spring.NmsQuickStart.Server.Gateways.MarketDataServiceGateway that implements the IMarketDataService interface.
Since the client is also a consumer of messages, on the topic APP.STOCK.MARKETDATA and the queue APP.STOCK.JOE (for Trader Joe!), two message listener containers are defined as shown below.
<nms:listener-container connection-factory="ConnectionFactory">
  <nms:listener ref="MessageListenerAdapter" destination="APP.STOCK.JOE"/>
  <nms:listener ref="MessageListenerAdapter" destination="APP.STOCK.MARKETDATA" pubsub-domain="true"/>
</nms:listener-container>
Refer to the messaging reference docs for all the available attributes to configure the container, and also to the section on registering the NMS schema with Spring.
On the server we define a message listener container for the queue APP.STOCK.REQUEST but set the concurrency property to 10 so that 10 threads will be consuming messages from the queue.
<nms:listener-container connection-factory="ConnectionFactory" concurrency="10">
  <nms:listener ref="MessageListenerAdapter" destination="APP.STOCK.REQUEST"/>
</nms:listener-container>
To run both the client and server, make sure that you select 'Multiple Startup Projects' within VS.NET. The GUI has a button to make a hardcoded trade request and show the confirmation in a text box. A text area is used to display the market data. There is a 'Get Portfolio' button that is not implemented at the moment. A picture of the GUI after it has been running for a while and a trade has been sent and responded to is shown below.