[jboss-svn-commits] JBL Code SVN: r15404 - in labs/jbossesb/trunk/product: rosetta/src/org/jboss/soa/esb/actions/routing and 3 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Sep 27 08:36:53 EDT 2007
Author: beve
Date: 2007-09-27 08:36:53 -0400 (Thu, 27 Sep 2007)
New Revision: 15404
Modified:
labs/jbossesb/trunk/product/docs/MessageActionGuide.odt
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyJMSUnitTest.java
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyQueuesUnitTest.java
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyTopicsUnitTest.java
Log:
Work for JBESB-1065 "Add Persistent, Priority, and TimeToLive to NotifyJMS and JMSRouter".
Modified: labs/jbossesb/trunk/product/docs/MessageActionGuide.odt
===================================================================
(Binary files differ)
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java 2007-09-27 12:23:21 UTC (rev 15403)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java 2007-09-27 12:36:53 UTC (rev 15404)
@@ -28,6 +28,7 @@
import java.util.Properties;
import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -66,6 +67,9 @@
* jndiName="queue/A"
* message-prop-<i>>prop-name<</i>="<i>>prop-value<</i>" >!-- (Optional)--<
* unwrap="true/false" >!-- (Optional - default false)--<
+ * persistent="true/false" >!-- (Optional - default true)--<
+ * priority="integer" >!-- (Optional - default Message.DEFAULT_PRIORITY)--<
+ * time-to-live="long" >!-- (Optional - default Message.DEFAULT_TIME_TO_LIVE)--<
* />
* </pre>
* Note how properties to be set on the message are prefixed with "message-prop-".
@@ -87,6 +91,18 @@
*/
private static Logger logger = Logger.getLogger(JMSRouter.class);
/**
+ * Constant used in configuration
+ */
+ public static final String PERSISTENT_ATTR = "persistent";
+ /**
+ * Constant used in configuration
+ */
+ public static final String PRIORITY_ATTR = "priority";
+ /**
+ * Constant used in configuration
+ */
+ public static final String TIME_TO_LIVE_ATTR = "time-to-live";
+ /**
* Routing properties.
*/
private List<KeyValuePair> properties;
@@ -102,6 +118,20 @@
* Strategy for setting JMSProperties
*/
private JMSPropertiesSetter jmsPropertiesStrategy = new DefaultJMSPropertiesSetter();
+ /**
+ * Whether messages sent by this router should be sent with delivery mode
+ * DeliveryMode.PERSISTENT or DeliveryMode.NON_PERSISTENT
+ * Default is to send messages persistently
+ */
+ private int deliveryMode = DeliveryMode.PERSISTENT;
+ /**
+ * The priority for messages sent with this router
+ */
+ private int priority = Message.DEFAULT_PRIORITY;
+ /**
+ * The time-to-live for messages sent with this router
+ */
+ private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
/**
* Public constructor.
@@ -126,11 +156,24 @@
super(actionName, properties);
this.properties = properties;
+ logger.debug(properties);
queueName = KeyValuePair.getValue("jndiName", properties);
if(queueName == null) {
throw new ConfigurationException("JMSRouter must specify a 'jndiName' property.");
}
+
+ boolean persistent = Boolean.parseBoolean( KeyValuePair.getValue( PERSISTENT_ATTR, properties, "true") );
+ deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+
+ String priorityStr = KeyValuePair.getValue( PRIORITY_ATTR, properties );
+ if ( priorityStr != null )
+ priority = Integer.parseInt( priorityStr );
+
+ final String ttlStr = KeyValuePair.getValue( TIME_TO_LIVE_ATTR, properties );
+ if ( ttlStr != null )
+ timeToLive = Long.parseLong( ttlStr );
+
createQueueSetup( queueName );
}
@@ -281,6 +324,15 @@
{
try {
queueSetup = new JMSSendQueueSetup(queueName);
+ queueSetup.setDeliveryMode( deliveryMode );
+ queueSetup.setPriority( priority );
+ queueSetup.setTimeToLive( timeToLive );
+ if ( logger.isDebugEnabled() )
+ {
+ logger.debug( "JMSRouter DeliveryMode : " + deliveryMode);
+ logger.debug( "JMSRouter Priority : " + priority);
+ logger.debug( "JMSRouter TimeToLive : " + timeToLive);
+ }
} catch (Throwable t) {
throw new ConfigurationException("Failed to configure JMS Queue for routing.", t);
}
@@ -327,8 +379,27 @@
pool.closeSession(jmsSession) ;
}
}
+
}
+ public void setDeliveryMode(final int deliveryMode ) throws JMSException
+ {
+ if ( jmsProducer != null )
+ jmsProducer.setDeliveryMode( deliveryMode );
+ }
+
+ public void setPriority(final int priority ) throws JMSException
+ {
+ if ( jmsProducer != null )
+ jmsProducer.setPriority( priority );
+ }
+
+ public void setTimeToLive(final long ttl ) throws JMSException
+ {
+ if ( jmsProducer != null )
+ jmsProducer.setTimeToLive( ttl );
+ }
+
private void close() {
try {
if (jmsProducer!=null) {
@@ -369,4 +440,33 @@
}
+ /**
+ * The delivery mode in use.
+ * @return true if the delivery mode is DeliveryMode.PERSISTENT
+ */
+ public boolean isDeliveryModePersistent()
+ {
+ return deliveryMode == DeliveryMode.PERSISTENT ;
+ }
+
+ /**
+ * The priority used when sending messages.
+ *
+ * @return int the priority
+ */
+ public int getPriority()
+ {
+ return priority;
+ }
+
+ /**
+ * The time-to-live used when sending messages.
+ *
+ * @return long the time-to-live for messages
+ */
+ public long getTimeToLive()
+ {
+ return timeToLive;
+ }
+
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java 2007-09-27 12:23:21 UTC (rev 15403)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyJMS.java 2007-09-27 12:36:53 UTC (rev 15404)
@@ -23,8 +23,10 @@
package org.jboss.soa.esb.notification;
import java.util.Iterator;
+import java.util.List;
import java.util.Properties;
+import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
@@ -33,9 +35,13 @@
import javax.naming.NamingException;
import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.helpers.KeyValuePair;
import org.jboss.soa.esb.message.body.content.BytesBody;
import org.jboss.soa.esb.message.format.MessageType;
import org.jboss.soa.esb.notification.jms.DefaultJMSPropertiesSetter;
@@ -57,6 +63,11 @@
public abstract class NotifyJMS extends NotificationTarget
{
protected Logger log = Logger.getLogger(this.getClass());
+
+ /**
+ * Connection Factory JNDI name.
+ */
+ public static final String CONNECTION_FACTORY = "ConnectionFactory";
/**
* Strategy for setting JMSProperties
@@ -72,6 +83,36 @@
* @throws JMSException if the send was not successful
*/
protected abstract void send(final Message p_oMsg, MessageProducer msgProducer ) throws JMSException;
+
+ /**
+ * Get a session specific to the subclasses implementation
+ * i.e Queue or Topic.
+ *
+ * @param pool
+ * @return Session
+ * @throws NamingException
+ * @throws JMSException
+ * @throws ConnectionException
+ */
+ protected abstract Session getSession( JmsConnectionPool pool ) throws NamingException, JMSException, ConnectionException;
+
+ /**
+ * Creates a message producer specific to the subclasses implementation
+ * i.e QueueSender or TopicPublisher.
+ *
+ * @param pool
+ * @param destinationName
+ * @param session
+ * @param environment
+ * @return MessageProducer
+ * @throws NamingException
+ * @throws JMSException
+ * @throws ConnectionException
+ */
+ protected abstract MessageProducer createProducer( final JmsConnectionPool pool,
+ final String destinationName,
+ final Session session,
+ final Properties environment) throws NamingException, JMSException, ConnectionException;
/**
* Element name mnemonic to search for child elements in the ConfigTree at
@@ -99,29 +140,60 @@
public static final String ATT_DEST_NAME = "jndiName";
/**
+ * Constant used in configuration
+ */
+ public static final String PERSISTENT_ATTR = "persistent";
+
+ /**
+ * Constant used in configuration
+ */
+ public static final String PRIORITY_ATTR = "priority";
+
+ /**
+ * Constant used in configuration
+ */
+ public static final String TIME_TO_LIVE_ATTR = "time-to-live";
+
+ /**
* This object holds the JNDI naming context that will be used to obtain a
* JMS destination (javax.jms.Destination) to send/publish the notification
* at sendNotification() time
*/
- protected Context[] m_oCtx;
+ protected Context[] contexts;
private Properties m_oProps = new Properties();
/**
* The javax.jms.Connection instance used to talk to JMS
*/
- protected JmsConnectionPool[] mPool;
+ protected JmsConnectionPool[] connectionPools;
/**
* The javax.jms.Session instance used to talk to JMS
*/
- protected Session[] m_oSess;
+ protected Session[] sessions;
/**
* Array with an instance of javax.jms.MessageProducer on each entry that
* will be used to send the notification at sendNotification() time
*/
- protected MessageProducer[] m_oaMssProd;
+ protected MessageProducer[] producers;
+
+ /**
+ * Delivery mode for JMS Messages, either DeliveryMode.PERSISTENT
+ * or DeliveryMode.NON_PERSISTENT
+ */
+ protected int[] deliveryModes;
+
+ /**
+ * Priorities for JMS Messages sent by this notifier
+ */
+ protected int[] priorities;
+
+ /**
+ * Time-to-lives for JMS Messages sent by this notifier
+ */
+ protected long[] timeToLives;
/**
* Constructor that will be called by child classes to perform
@@ -158,11 +230,11 @@
*/
public void release ()
{
- for (int i=0; i<mPool.length; i++) {
- mPool[i].closeSession(m_oSess[i]);
- if (m_oCtx[i]!=null) {
+ for (int i=0; i<connectionPools.length; i++) {
+ connectionPools[i].closeSession(sessions[i]);
+ if (contexts[i]!=null) {
try {
- m_oCtx[i].close();
+ contexts[i].close();
} catch (NamingException ne) {
log.error(ne.getMessage(), ne);
}
@@ -187,7 +259,7 @@
if (MessageType.JAVA_SERIALIZED.equals(esbMessage.getType()))
{
- jmsMessage = m_oSess[0].createObjectMessage((byte[]) esbMessage.getBody().get(BytesBody.BYTES_LOCATION));
+ jmsMessage = sessions[0].createObjectMessage((byte[]) esbMessage.getBody().get(BytesBody.BYTES_LOCATION));
}
else
{
@@ -195,7 +267,7 @@
if (esbMessage.getBody().get(BytesBody.BYTES_LOCATION)!=null) {
content = new String((byte[]) esbMessage.getBody().get(BytesBody.BYTES_LOCATION));
}
- jmsMessage = m_oSess[0].createTextMessage(content);
+ jmsMessage = sessions[0].createTextMessage(content);
}
setJMSProperties( esbMessage, jmsMessage );
@@ -242,15 +314,15 @@
try
{
final StringBuilder jmsExceptions = new StringBuilder();
- for (int i1 = 0; i1 < m_oaMssProd.length; i1++)
+ for (int i1 = 0; i1 < producers.length; i1++)
{
try
{
- send( p_oMsg, m_oaMssProd[i1]);
+ send( p_oMsg, producers[i1]);
}
catch (final JMSException e)
{
- final String msg = "[JMSException while sending to : " + m_oaMssProd[i1].getDestination();
+ final String msg = "[JMSException while sending to : " + producers[i1].getDestination();
log.error(msg, e);
jmsExceptions.append( NotifyUtil.createExceptionErrorString( msg, e ));
}
@@ -263,6 +335,77 @@
release();
}
}
+
+ /**
+ * Will setup/create JMS connections, sessions, producers.
+ * One producer is created per configuration element, and the optional
+ * 'persistent', 'priority' and 'time-to-live' attributes of each element
+ * are applied to the corresponding producer.
+ *
+ * @param configTrees one configuration element per queue or topic
+ * @param destinationType destination type handed to the connection pool lookup
+ * @throws ConfigurationException if the destination name is missing, if the
+ * priority or time-to-live attribute is not a valid number, or if a
+ * NamingException occurs while setting up a producer
+ * @throws JMSException
+ * @throws ConnectionException
+ */
+ protected void setUpProducers (final ConfigTree[] configTrees, final String destinationType) throws ConfigurationException, JMSException, ConnectionException
+ {
+ // REVIEW: The connection factory name is hardcoded and is the same as
+ // that of the queue connection factory.
+ final int nrQueuesOrTopics = configTrees.length;
+ producers = new MessageProducer[nrQueuesOrTopics];
+ connectionPools = new JmsConnectionPool[nrQueuesOrTopics];
+ contexts = new Context[nrQueuesOrTopics];
+ sessions = new Session[nrQueuesOrTopics];
+ deliveryModes = new int[nrQueuesOrTopics];
+ priorities = new int[nrQueuesOrTopics];
+ timeToLives = new long[nrQueuesOrTopics];
+
+ try
+ {
+ for (int i = 0; i < nrQueuesOrTopics; i++)
+ {
+ Properties environment = new Properties();
+ String sAtt = configTrees[i].getAttribute(ATT_DEST_NAME);
+ if (null == sAtt) throw new ConfigurationException("Missing attribute '" + ATT_DEST_NAME + "'");
+
+ String jndiURL = configTrees[i].getAttribute(JMSEpr.JNDI_URL_TAG);
+ if (jndiURL!=null) environment.setProperty(Context.PROVIDER_URL, jndiURL);
+
+ String contextFactory = configTrees[i].getAttribute(JMSEpr.JNDI_CONTEXT_FACTORY_TAG);
+ if (contextFactory!=null) environment.setProperty(Context.INITIAL_CONTEXT_FACTORY, contextFactory);
+
+ String prefix = configTrees[i].getAttribute(JMSEpr.JNDI_PKG_PREFIX_TAG);
+ if (prefix!=null) environment.setProperty(Context.URL_PKG_PREFIXES, prefix);
+ List<KeyValuePair> properties=configTrees[i].childPropertyList();
+ for (KeyValuePair property : properties) {
+ if (property.getKey().startsWith("java.naming.")) {
+ environment.setProperty(property.getKey(), property.getValue());
+ }
+ }
+
+ String connectionFactory = configTrees[i].getAttribute(JMSEpr.CONNECTION_FACTORY_TAG, CONNECTION_FACTORY);
+
+ // Parse and validate the delivery settings before acquiring any JMS
+ // resources for this destination, so that a malformed attribute is
+ // reported as a ConfigurationException without a session checked out.
+ final String persistentStr = configTrees[i].getAttribute( PERSISTENT_ATTR, "true" );
+ deliveryModes[i] = persistentStr.equalsIgnoreCase( "true" ) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+
+ final String priorityStr = configTrees[i].getAttribute( PRIORITY_ATTR );
+ final String ttlStr = configTrees[i].getAttribute( TIME_TO_LIVE_ATTR );
+ try
+ {
+ priorities[i] = priorityStr == null ? Message.DEFAULT_PRIORITY : Integer.parseInt( priorityStr );
+ timeToLives[i] = ttlStr == null ? Message.DEFAULT_TIME_TO_LIVE : Long.parseLong( ttlStr );
+ }
+ catch (NumberFormatException ex)
+ {
+ throw new ConfigurationException("Invalid '" + PRIORITY_ATTR + "' or '" + TIME_TO_LIVE_ATTR + "' attribute for destination '" + sAtt + "'", ex);
+ }
+
+ connectionPools[i] = JmsConnectionPoolContainer.getPool(environment, connectionFactory, destinationType);
+ sessions[i] = getSession( connectionPools[i] );
+ producers[i] = createProducer( connectionPools[i], sAtt, sessions[i], environment );
+
+ producers[i].setDeliveryMode( deliveryModes[i] );
+ producers[i].setPriority( priorities[i] );
+ producers[i].setTimeToLive( timeToLives[i] );
+ }
+ }
+ catch (NamingException ex)
+ {
+ throw new ConfigurationException(ex);
+ }
+ }
/**
* Sets the strategy for handling the setting of properties on an outgoing
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java 2007-09-27 12:23:21 UTC (rev 15403)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyQueues.java 2007-09-27 12:36:53 UTC (rev 15404)
@@ -22,7 +22,6 @@
package org.jboss.soa.esb.notification;
-import java.util.List;
import java.util.Properties;
import javax.jms.JMSException;
@@ -35,13 +34,12 @@
import javax.naming.Context;
import javax.naming.NamingException;
+import org.apache.log4j.Logger;
import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
-import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.helpers.ConfigTree;
-import org.jboss.soa.esb.helpers.KeyValuePair;
import org.jboss.soa.esb.helpers.NamingContext;
;
@@ -60,12 +58,10 @@
*/
public class NotifyQueues extends NotifyJMS
{
+ @SuppressWarnings("unused")
+ private Logger log = Logger.getLogger( NotifyQueues.class );
+
/**
- * Connection Factory JNDI name.
- */
- public static final String CONNECTION_FACTORY = "ConnectionFactory";
-
- /**
* Element name mnemonic to search for child elements in the ConfigTree at
* constructor time, that will hold a "jndiName" attribute specifying the
* value to look up in the JNDI context in order to obtain a queue
@@ -73,7 +69,7 @@
* @see NotifyJMS#ATT_DEST_NAME
*/
public static final String CHILD_QUEUE = "queue";
-
+
public NotifyQueues (ConfigTree p_oP) throws ConfigurationException, JMSException, ConnectionException
{
super(p_oP);
@@ -82,68 +78,60 @@
protected void setQueues (ConfigTree[] p_oaP) throws ConfigurationException, JMSException, ConnectionException
{
- try
- {
-
- m_oaMssProd = new MessageProducer[p_oaP.length];
- mPool = new JmsConnectionPool[p_oaP.length];
- m_oCtx = new Context[p_oaP.length];
- m_oSess = new Session[p_oaP.length];
-
- for (int i = 0; i < p_oaP.length; i++)
- {
- String sAtt = p_oaP[i].getAttribute(ATT_DEST_NAME);
- if (null == sAtt)
- throw new ConfigurationException("Missing queue jndiName");
-
- Properties environment = new Properties();
-
- String jndiURL = p_oaP[i].getAttribute(JMSEpr.JNDI_URL_TAG);
- if (jndiURL!=null) environment.setProperty(Context.PROVIDER_URL, jndiURL);
- String contextFactory = p_oaP[i].getAttribute(JMSEpr.JNDI_CONTEXT_FACTORY_TAG);
- if (contextFactory!=null) environment.setProperty(Context.INITIAL_CONTEXT_FACTORY, contextFactory);
- String prefix = p_oaP[i].getAttribute(JMSEpr.JNDI_PKG_PREFIX_TAG);
- if (prefix!=null) environment.setProperty(Context.URL_PKG_PREFIXES, prefix);
- List<KeyValuePair> properties=p_oaP[i].childPropertyList();
- for (KeyValuePair property : properties) {
- if (property.getKey().startsWith("java.naming.")) {
- environment.setProperty(property.getKey(), property.getValue());
- }
- }
-
- String connectionFactory = p_oaP[i].getAttribute(JMSEpr.CONNECTION_FACTORY_TAG);
- if (connectionFactory==null) {
- connectionFactory = CONNECTION_FACTORY;
- }
- mPool[i] = JmsConnectionPoolContainer.getPool(environment, connectionFactory, JMSEpr.QUEUE_TYPE);
- m_oCtx[i] = NamingContext.getServerContext(environment);
- Queue oQ=null;
- QueueSession queueSession = mPool[i].getQueueSession();
- try {
- oQ = (Queue) m_oCtx[i].lookup(sAtt);
- } catch (NamingException ne) {
- try {
- m_oCtx[i] = NamingContext.getFreshServerContext(environment);
- oQ = (Queue) m_oCtx[i].lookup(sAtt);
- } catch (NamingException nex) {
- //ActiveMQ
- oQ = queueSession.createQueue(sAtt);
- }
- }
- m_oaMssProd[i] = queueSession.createSender(oQ);
- m_oSess[i] = queueSession;
- }
- }
- catch (NamingException ex)
- {
- throw new ConfigurationException(ex);
- }
+ setUpProducers( p_oaP, JMSEpr.QUEUE_TYPE );
}
protected void send (final Message p_oMsg, MessageProducer msgProducer ) throws JMSException
{
QueueSender oCurr = (QueueSender) msgProducer;
+ if ( log.isDebugEnabled() )
+ {
+ log.debug( "Sending to queue with DeliveryMode : " + msgProducer.getDeliveryMode());
+ log.debug( "Sending to queue with Priority : " + msgProducer.getPriority());
+ log.debug( "Sending to queue with TTL : " + msgProducer.getTimeToLive());
+ }
oCurr.send(p_oMsg);
}
+
+ protected QueueSession getSession(final JmsConnectionPool pool) throws NamingException, JMSException, ConnectionException
+ {
+ return pool.getQueueSession();
+ }
+
+ /**
+ * Creates a QueueSender for the named destination.
+ * The destination is looked up in JNDI first; if that lookup fails, the
+ * lookup is retried with a fresh naming context, and as a last resort the
+ * queue is created directly from the session (the fallback is marked
+ * "ActiveMQ" in the code).
+ *
+ * @param pool the connection pool (not used by this implementation)
+ * @param destinationName JNDI name of the queue
+ * @param session the session to create the sender from, cast to QueueSession
+ * @param environment JNDI environment used to obtain the naming context
+ * @return MessageProducer a QueueSender for the resolved queue
+ * @throws NamingException
+ * @throws JMSException
+ * @throws ConnectionException
+ */
+ protected MessageProducer createProducer(
+ final JmsConnectionPool pool,
+ final String destinationName,
+ final Session session,
+ final Properties environment) throws NamingException, JMSException, ConnectionException
+ {
+
+ QueueSession queueSession = (QueueSession) session;
+
+ Context context = NamingContext.getServerContext(environment);
+ Queue queue=null;
+ try
+ {
+ queue = (Queue) context.lookup(destinationName);
+ }
+ catch (NamingException ne)
+ {
+ if ( context != null ) context.close();
+ context = NamingContext.getFreshServerContext(environment);
+ try
+ {
+ queue = (Queue) context.lookup(destinationName);
+ }
+ catch (NamingException nex)
+ {
+ //ActiveMQ
+ // Keep the created queue: discarding it would leave 'queue' null
+ // and the sender would be created without a destination.
+ queue = queueSession.createQueue(destinationName);
+ }
+ }
+ finally
+ {
+ if ( context != null ) context.close();
+ }
+ return queueSession.createSender(queue);
+ }
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java 2007-09-27 12:23:21 UTC (rev 15403)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/NotifyTopics.java 2007-09-27 12:36:53 UTC (rev 15404)
@@ -22,7 +22,6 @@
package org.jboss.soa.esb.notification;
-import java.util.List;
import java.util.Properties;
import javax.jms.JMSException;
@@ -37,20 +36,18 @@
import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
-import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.helpers.ConfigTree;
-import org.jboss.soa.esb.helpers.KeyValuePair;
import org.jboss.soa.esb.helpers.NamingContext;
/**
*
* KS: It is silly we open and close the jms connection with each request.
+ * @author John Doe
*
*/
public class NotifyTopics extends NotifyJMS
{
- public static final String CONNECTION_FACTORY = "ConnectionFactory";
public static final String CHILD_TOPIC = "topic";
@@ -61,62 +58,9 @@
setTopics(p_oP.getChildren(CHILD_TOPIC));
} // __________________________________
- protected void setTopics (ConfigTree[] p_oaP) throws ConfigurationException, JMSException, ConnectionException
+ protected void setTopics (ConfigTree[] configTrees) throws ConfigurationException, JMSException, ConnectionException
{
- // REVIEW: The connection factory name is hardcoded and is the same as
- // that of the queue connection factory.
- m_oaMssProd = new MessageProducer[p_oaP.length];
- mPool = new JmsConnectionPool[p_oaP.length];
- m_oCtx = new Context[p_oaP.length];
- m_oSess = new Session[p_oaP.length];
-
- try
- {
- for (int i = 0; i < p_oaP.length; i++)
- {
- Properties environment = new Properties();
- String sAtt = p_oaP[i].getAttribute(ATT_DEST_NAME);
- if (null == sAtt) throw new ConfigurationException("Missing topic jndiName");
- String jndiURL = p_oaP[i].getAttribute(JMSEpr.JNDI_URL_TAG);
- if (jndiURL!=null) environment.setProperty(Context.PROVIDER_URL, jndiURL);
- String contextFactory = p_oaP[i].getAttribute(JMSEpr.JNDI_CONTEXT_FACTORY_TAG);
- if (contextFactory!=null) environment.setProperty(Context.INITIAL_CONTEXT_FACTORY, contextFactory);
- String prefix = p_oaP[i].getAttribute(JMSEpr.JNDI_PKG_PREFIX_TAG);
- if (prefix!=null) environment.setProperty(Context.URL_PKG_PREFIXES, prefix);
- List<KeyValuePair> properties=p_oaP[i].childPropertyList();
- for (KeyValuePair property : properties) {
- if (property.getKey().startsWith("java.naming.")) {
- environment.setProperty(property.getKey(), property.getValue());
- }
- }
-
- String connectionFactory = p_oaP[i].getAttribute(JMSEpr.CONNECTION_FACTORY_TAG);
- if (connectionFactory==null) {
- connectionFactory = CONNECTION_FACTORY;
- }
- mPool[i] = JmsConnectionPoolContainer.getPool(environment, connectionFactory, JMSEpr.TOPIC_TYPE);
- TopicSession topicSession = mPool[i].getTopicSession();
- m_oCtx[i] = NamingContext.getServerContext(environment);
- Topic oT=null;
- try {
- oT = (Topic) m_oCtx[i].lookup(sAtt);
- } catch (NamingException ne) {
- m_oCtx[i] = NamingContext.getFreshServerContext(environment);
- try {
- oT = (Topic) m_oCtx[i].lookup(sAtt);
- } catch (NamingException nex) {
- //ActiveMQ
- topicSession.createTopic(sAtt);
- }
- }
- m_oaMssProd[i] = topicSession.createPublisher(oT);
- m_oSess[i] = topicSession;
- }
- }
- catch (NamingException ex)
- {
- throw new ConfigurationException(ex);
- }
+ setUpProducers( configTrees, JMSEpr.TOPIC_TYPE );
}
protected void send (final Message p_oMsg, MessageProducer msgProducer ) throws JMSException
@@ -125,4 +69,45 @@
oCurr.publish(p_oMsg);
}
+ protected TopicSession getSession(final JmsConnectionPool pool) throws NamingException, JMSException, ConnectionException
+ {
+ return pool.getTopicSession();
+ }
+
+ /**
+ * Creates a TopicPublisher for the named destination.
+ * The destination is looked up in JNDI first; if that lookup fails, the
+ * lookup is retried with a fresh naming context, and as a last resort the
+ * topic is created directly from the session (the fallback is marked
+ * "ActiveMQ" in the code).
+ *
+ * @param pool the connection pool (not used by this implementation)
+ * @param destinationName JNDI name of the topic
+ * @param session the session to create the publisher from, cast to TopicSession
+ * @param environment JNDI environment used to obtain the naming context
+ * @return MessageProducer a TopicPublisher for the resolved topic
+ * @throws NamingException
+ * @throws JMSException
+ * @throws ConnectionException
+ */
+ protected MessageProducer createProducer(
+ final JmsConnectionPool pool,
+ final String destinationName,
+ final Session session,
+ final Properties environment) throws NamingException, JMSException, ConnectionException
+ {
+
+ TopicSession topicSession = (TopicSession) session;
+
+ Context context = NamingContext.getServerContext(environment);
+ Topic topic=null;
+ try
+ {
+ topic = (Topic) context.lookup(destinationName);
+ }
+ catch (NamingException ne)
+ {
+ if ( context != null ) context.close();
+ context = NamingContext.getFreshServerContext(environment);
+ try
+ {
+ topic = (Topic) context.lookup(destinationName);
+ }
+ catch (NamingException nex)
+ {
+ //ActiveMQ
+ // Keep the created topic: discarding it would leave 'topic' null
+ // and the publisher would be created without a destination.
+ topic = topicSession.createTopic(destinationName);
+ }
+ }
+ finally
+ {
+ if ( context != null ) context.close();
+ }
+ return topicSession.createPublisher(topic);
+ }
+
} // ____________________________________________________________________________
Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java 2007-09-27 12:23:21 UTC (rev 15403)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/actions/routing/JmsRouterIntegrationTest.java 2007-09-27 12:36:53 UTC (rev 15404)
@@ -23,6 +23,7 @@
package org.jboss.soa.esb.actions.routing;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -199,6 +200,62 @@
router.getOkNotification(null);
}
+ @Test
+ public void construct_with_default_persitent_attribute() throws ConfigurationException, NamingException, JMSException
+ {
+ ConfigTree config = createConfigTree();
+ JMSRouter router = new JMSRouter( config );
+ assertTrue( router.isDeliveryModePersistent() );
+ }
+
+ @Test
+ public void construct_with_persitent_attribute() throws ConfigurationException, NamingException, JMSException
+ {
+ ConfigTree config = createConfigTree();
+ config.setAttribute( JMSRouter.PERSISTENT_ATTR, "false" );
+ JMSRouter router = new JMSRouter( config );
+
+ assertFalse ( router.isDeliveryModePersistent() );
+ }
+
+ @Test
+ public void construct_with_default_priority_attribute() throws ConfigurationException, NamingException, JMSException
+ {
+ ConfigTree config = createConfigTree();
+ JMSRouter router = new JMSRouter( config );
+ assertEquals( javax.jms.Message.DEFAULT_PRIORITY, router.getPriority() );
+ }
+
+ @Test
+ public void construct_with_priority_attribute() throws ConfigurationException, NamingException, JMSException
+ {
+ final int expectedPriority = 10;
+ ConfigTree config = createConfigTree();
+ config.setAttribute( JMSRouter.PRIORITY_ATTR, String.valueOf( expectedPriority ) );
+ JMSRouter router = new JMSRouter( config );
+
+ assertEquals ( expectedPriority, router.getPriority() );
+ }
+
+ @Test
+ public void construct_with_default_time_to_live_attribute() throws ConfigurationException, NamingException, JMSException
+ {
+ ConfigTree config = createConfigTree();
+ JMSRouter router = new JMSRouter( config );
+ assertEquals( javax.jms.Message.DEFAULT_TIME_TO_LIVE, router.getTimeToLive() );
+ }
+
+ @Test
+ public void construct_with_time_to_live_attribute() throws ConfigurationException, NamingException, JMSException
+ {
+ final long ttl = 6000l;
+ ConfigTree config = createConfigTree();
+ config.setAttribute( JMSRouter.TIME_TO_LIVE_ATTR, String.valueOf( ttl ) );
+ JMSRouter router = new JMSRouter( config );
+
+ assertEquals ( ttl, router.getTimeToLive() );
+ }
+
private void assertProcessContract( final String messageID, final Message msg, JMSRouter router ) throws ActionProcessingException, JMSException
{
final Message message = router.process ( msg );
Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyJMSUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyJMSUnitTest.java 2007-09-27 12:23:21 UTC (rev 15403)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyJMSUnitTest.java 2007-09-27 12:36:53 UTC (rev 15404)
@@ -26,6 +26,7 @@
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Properties;
import javax.jms.BytesMessage;
import javax.jms.Destination;
@@ -45,9 +46,12 @@
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
+import javax.naming.NamingException;
import junit.framework.JUnit4TestAdapter;
+import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
+import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.message.format.MessageFactory;
@@ -102,8 +106,8 @@
protected MockNotifyJMS(ConfigTree p_oP) throws ConfigurationException
{
super( p_oP );
- m_oSess = new Session[1];
- m_oSess[0] = new MockJMSSession();
+ sessions = new Session[1];
+ sessions[0] = new MockJMSSession();
}
@Override
@@ -119,6 +123,21 @@
{
return message;
}
+
+ @Override
+ protected MessageProducer createProducer( JmsConnectionPool pool,
+ String sAtt, Session session, Properties environment )
+ throws NamingException, JMSException, ConnectionException
+ {
+ return null;
+ }
+
+ @Override
+ protected Session getSession( JmsConnectionPool pool )
+ throws NamingException, JMSException, ConnectionException
+ {
+ return null;
+ }
}
Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyQueuesUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyQueuesUnitTest.java 2007-09-27 12:23:21 UTC (rev 15403)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyQueuesUnitTest.java 2007-09-27 12:36:53 UTC (rev 15404)
@@ -32,6 +32,7 @@
import java.nio.ByteBuffer;
import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
@@ -73,12 +74,12 @@
import org.mockejb.jms.MockTopic;
import org.mockejb.jms.ObjectMessageImpl;
import org.mockejb.jms.QueueConnectionFactoryImpl;
-import org.mockejb.jms.TextMessageImpl;
import org.mockejb.jndi.MockContextFactory;
/**
* NotifyQueues unit tests.
* @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @author <a href="mailto:daniel.bevenius at gmail.com">Daniel Bevenius</a>
*/
public class NotifyQueuesUnitTest {
@@ -86,7 +87,7 @@
private MockQueue mockQueue1;
private MockQueue mockQueue2;
- private NotifyQueues notifyQueues;
+ private NotifyJMS notifyQueues;
private ConfigTree rootEl;
@@ -100,8 +101,8 @@
rootEl = new ConfigTree("rootEl");
addMessagePropertyConfigs(rootEl);
- addQueueConfig(rootEl, "queue1");
- addQueueConfig(rootEl, "queue2");
+ addQueueConfig(rootEl, "queue1", "false");
+ addQueueConfig(rootEl, "queue2", "true");
mockQueue1 = createAndBindQueue("queue1");
mockQueue2 = createAndBindQueue("queue2");
@@ -151,26 +152,29 @@
}
@Test
- public void release()
+ public void sendNotification_persistent() throws ConfigurationException, JMSException, ConnectionException
{
- notifyQueues.release();
+ assertEquals( DeliveryMode.NON_PERSISTENT, notifyQueues.deliveryModes[0] );
}
- @Test ( expected = JMSException.class )
- public void sendToAll_negative() throws ConfigurationException, JMSException, ConnectionException
+ @Test
+ public void sendNotification_priority() throws ConfigurationException, JMSException, ConnectionException
{
- NotifyQueues notifyQueues = new MockNofityQueues(rootEl);
- notifyQueues.sendToAll(new TextMessageImpl("junit test"));
+ assertEquals( 10, notifyQueues.priorities[0] );
}
- @Test ( expected = NotificationException.class )
- public void sendNotification_negative() throws ConfigurationException, JMSException, ConnectionException, NotificationException
+ @Test
+ public void sendNotification_ttl() throws ConfigurationException, JMSException, ConnectionException
{
- NotifyQueues notifyQueues = new MockNofityQueues(rootEl);
- org.jboss.soa.esb.message.Message message = MessageFactory.getInstance().getMessage(MessageType.JAVA_SERIALIZED);
- notifyQueues.sendNotification(message);
+ assertEquals( 600l, notifyQueues.timeToLives[0] );
}
+ @Test
+ public void release()
+ {
+ notifyQueues.release();
+ }
+
private void checkQueueTextMessage(MockQueue mockQueue, int messageIdx, String expectedText) throws JMSException {
assertTrue(mockQueue.getMessages().size() > messageIdx);
Message message = mockQueue.getMessageAt(0);
@@ -198,15 +202,18 @@
private void addMessagePropertyConfigs(ConfigTree rootEl) {
- ConfigTree propEl = new ConfigTree(NotifyQueues.CHILD_MSG_PROP,rootEl);
+ ConfigTree propEl = new ConfigTree(NotifyJMS.CHILD_MSG_PROP,rootEl);
propEl.setAttribute(NotifyJMS.ATT_PROP_NAME, "testpropname");
propEl.setAttribute(NotifyJMS.ATT_PROP_VALUE, "testpropvalue");
}
- private void addQueueConfig(ConfigTree rootEl, String queueName) {
+ private void addQueueConfig(ConfigTree rootEl, String queueName, String persistent) {
ConfigTree queueEl = new ConfigTree(NotifyQueues.CHILD_QUEUE,rootEl);
+ queueEl.setAttribute( NotifyJMS.PERSISTENT_ATTR, persistent);
+ queueEl.setAttribute( NotifyJMS.PRIORITY_ATTR, "10");
+ queueEl.setAttribute( NotifyJMS.TIME_TO_LIVE_ATTR, "600");
queueEl.setAttribute(NotifyJMS.ATT_DEST_NAME, queueName);
}
@@ -269,92 +276,6 @@
}
}
- private static class MockNofityQueues extends NotifyQueues
- {
-
- public MockNofityQueues(ConfigTree p_oP) throws ConfigurationException, JMSException, ConnectionException
- {
- super( p_oP );
- }
-
- @Override
- protected void setQueues (ConfigTree[] p_oaP) throws ConfigurationException, JMSException, ConnectionException
- {
- m_oaMssProd = new MessageProducer[1];
- mPool = new JmsConnectionPool[0];
- m_oSess = new Session[1];
- m_oaMssProd[0] = new MockSender();
- m_oSess[0] = new MockSession();
-
- }
- }
- private static class MockSender implements QueueSender
- {
- public void send( Message arg0 ) throws JMSException
- {
- throw new JMSException( "MockJMSException");
- }
- public Destination getDestination() throws JMSException { return null; }
-
- public Queue getQueue() throws JMSException { return null; }
- public void send( Queue arg0, Message arg1 ) throws JMSException { }
- public void send( Message arg0, int arg1, int arg2, long arg3 ) throws JMSException { }
- public void send( Queue arg0, Message arg1, int arg2, int arg3, long arg4 ) throws JMSException { }
- public void close() throws JMSException { }
- public int getDeliveryMode() throws JMSException { return 0; }
- public boolean getDisableMessageID() throws JMSException { return false; }
- public boolean getDisableMessageTimestamp() throws JMSException { return false; }
- public int getPriority() throws JMSException { return 0; }
- public long getTimeToLive() throws JMSException { return 0; }
- public void send( Destination arg0, Message arg1 ) throws JMSException { }
- public void send( Destination arg0, Message arg1, int arg2, int arg3, long arg4 ) throws JMSException { }
- public void setDeliveryMode( int arg0 ) throws JMSException { }
- public void setDisableMessageID( boolean arg0 ) throws JMSException { }
- public void setDisableMessageTimestamp( boolean arg0 ) throws JMSException { }
- public void setPriority( int arg0 ) throws JMSException { }
- public void setTimeToLive( long arg0 ) throws JMSException { }
- }
- private static class MockSession implements Session
- {
- public ObjectMessage createObjectMessage() throws JMSException
- {
- return null;
- }
- public ObjectMessage createObjectMessage( Serializable arg0 ) throws JMSException
- {
- return new ObjectMessageImpl( arg0 );
- }
- public void close() throws JMSException { }
- public void commit() throws JMSException { }
- public QueueBrowser createBrowser( Queue arg0 ) throws JMSException { return null; }
- public QueueBrowser createBrowser( Queue arg0, String arg1 ) throws JMSException { return null; }
- public BytesMessage createBytesMessage() throws JMSException { return null; }
- public MessageConsumer createConsumer( Destination arg0 ) throws JMSException { return null; }
- public MessageConsumer createConsumer( Destination arg0, String arg1 ) throws JMSException { return null; }
- public MessageConsumer createConsumer( Destination arg0, String arg1, boolean arg2 ) throws JMSException { return null; }
- public TopicSubscriber createDurableSubscriber( Topic arg0, String arg1 ) throws JMSException { return null; }
- public TopicSubscriber createDurableSubscriber( Topic arg0, String arg1, String arg2, boolean arg3 ) throws JMSException { return null; }
- public MapMessage createMapMessage() throws JMSException { return null; }
- public Message createMessage() throws JMSException { return null; }
- public MessageProducer createProducer( Destination arg0 ) throws JMSException { return null; }
- public Queue createQueue( String arg0 ) throws JMSException { return null; }
- public StreamMessage createStreamMessage() throws JMSException { return null; }
- public TemporaryQueue createTemporaryQueue() throws JMSException { return null; }
- public TemporaryTopic createTemporaryTopic() throws JMSException { return null; }
- public TextMessage createTextMessage() throws JMSException { return null; }
- public TextMessage createTextMessage( String arg0 ) throws JMSException { return null; }
- public Topic createTopic( String arg0 ) throws JMSException { return null; }
- public int getAcknowledgeMode() throws JMSException { return 0; }
- public MessageListener getMessageListener() throws JMSException { return null; }
- public boolean getTransacted() throws JMSException { return false; }
- public void recover() throws JMSException { }
- public void rollback() throws JMSException { }
- public void run() { }
- public void setMessageListener( MessageListener arg0 ) throws JMSException { }
- public void unsubscribe( String arg0 ) throws JMSException { }
-
- }
-
public static junit.framework.Test suite()
{
return new JUnit4TestAdapter(NotifyQueuesUnitTest.class);
Modified: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyTopicsUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyTopicsUnitTest.java 2007-09-27 12:23:21 UTC (rev 15403)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/notification/NotifyTopicsUnitTest.java 2007-09-27 12:36:53 UTC (rev 15404)
@@ -26,6 +26,7 @@
import java.lang.reflect.Proxy;
import java.nio.ByteBuffer;
+import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -38,6 +39,8 @@
import junit.framework.TestCase;
+import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
+import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.message.body.content.BytesBody;
import org.jboss.soa.esb.message.format.MessageFactory;
@@ -95,6 +98,21 @@
checkTopicObjectMessage(mockTopic2, 0, new Integer(123).toString().getBytes());
}
+ public void test_sendNotification_persistent() throws ConfigurationException, JMSException, ConnectionException
+ {
+ assertEquals( DeliveryMode.PERSISTENT, notifyTopics.deliveryModes[0] );
+ }
+
+ public void test_sendNotification_priority() throws ConfigurationException, JMSException, ConnectionException
+ {
+ assertEquals( Message.DEFAULT_PRIORITY, notifyTopics.priorities[0] );
+ }
+
+ public void test_sendNotification_ttl() throws ConfigurationException, JMSException, ConnectionException
+ {
+ assertEquals( Message.DEFAULT_TIME_TO_LIVE, notifyTopics.timeToLives[0] );
+ }
+
public void test_createException()
{
final String msg = "[JMSException while publishing to : /topic/SomeName";
More information about the jboss-svn-commits mailing list