[jboss-svn-commits] JBL Code SVN: r22609 - in labs/jbossesb/workspace/skeagh: commons/src/main/java/org/jboss/esb/util and 5 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Sep 10 12:10:35 EDT 2008
Author: tfennelly
Date: 2008-09-10 12:10:35 -0400 (Wed, 10 Sep 2008)
New Revision: 22609
Added:
labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/JMSSession.java
labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/util/JNDIUtil.java
labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/
labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/JMSBusDispatcher.java
Modified:
labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageHandler.java
labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageListener.java
labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/MessageSender.java
labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/MockMessageListener.java
labs/jbossesb/workspace/skeagh/commons/src/test/java/org/jboss/esb/jms/MessageSendAndListenTest.java
labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsInboundRouter.java
labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/JmsInboundRouterTest.java
labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java
Log:
Extracted the JMSSession object from the AbstractMessageHandler. This allows multiple message consumers and producers to share these resources, the lifecycle of which is managed around the deployment.
Modified: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageHandler.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageHandler.java 2008-09-10 16:10:28 UTC (rev 22608)
+++ labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageHandler.java 2008-09-10 16:10:35 UTC (rev 22609)
@@ -21,10 +21,13 @@
import org.apache.log4j.Logger;
import org.jboss.esb.util.AssertArgument;
+import org.jboss.esb.util.JNDIUtil;
-import javax.jms.*;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Session;
import javax.naming.Context;
-import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
@@ -33,7 +36,7 @@
*
* @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
*/
-public abstract class AbstractMessageHandler
+public abstract class AbstractMessageHandler implements ExceptionListener
{
/**
* Logger.
@@ -44,18 +47,10 @@
*/
private String destinationName;
/**
- * JNDI Properties.
+ * The JMS Session.
*/
- private Properties jndiProperties;
+ private JMSSession jmsSession;
/**
- * JMS Connection.
- */
- private Connection conn = null;
- /**
- * JMS Session.
- */
- private Session session = null;
- /**
* JMS Destination.
*/
private Destination destination = null;
@@ -64,16 +59,17 @@
* Public vonstructor.
*
* @param destinationName The JMS destination name.
- * @param jndiProperties The JNDI properties for connecting to the JMS destination.
+ * @param jmsSession The The JMS Session on which this handler is created.
+ * @throws JMSException Failed to connect.
*/
- public AbstractMessageHandler(final String destinationName, final Properties jndiProperties)
+ public AbstractMessageHandler(final String destinationName, final JMSSession jmsSession) throws JMSException
{
AssertArgument.isNotNullAndNotEmpty(destinationName, "destinationName");
- AssertArgument.isNotNull(jndiProperties, "jndiProperties");
+ AssertArgument.isNotNull(jmsSession, "jmsSession");
logger = Logger.getLogger(getClass());
- this.jndiProperties = jndiProperties;
this.destinationName = destinationName;
+ this.jmsSession = jmsSession;
}
/**
@@ -93,7 +89,7 @@
*/
public final Session getSession()
{
- return session;
+ return jmsSession.getSession();
}
/**
@@ -113,61 +109,16 @@
*/
public final void connect() throws JMSException
{
- ConnectionFactory connectionFactory;
-
- // Lookup the destination...
- try
+ destination = lookupDestination(destinationName, jmsSession.getProperties());
+ if (!jmsSession.getDestinationType().isAssignableFrom(destination.getClass()))
{
- Context context = getNamingContext();
- try
- {
- destination = (Destination) context.lookup(destinationName);
- } finally
- {
- context.close();
- }
- } catch (NamingException e)
- {
- throw (JMSException) (new JMSException("Destination lookup failed. Destination name '" + destinationName + "'.").initCause(e));
+ throw new JMSException("Handler '" + getClass().getName() + "' destination '" + destinationName + "' cannot be created. The destination type does not match that of the associate JMSSession instance. Session type is '" + jmsSession.getDestinationType().getName() + "'.");
}
- // Get the Destination ConnectionFactory...
- try
- {
- connectionFactory = getJmsConnectionFactory();
- } catch (ClassCastException e)
- {
- throw (JMSException) (new JMSException("Invalid JMS ConnectionFactory config. ConnectionFactory doesn't implement " + TopicConnectionFactory.class.getName() + ".").initCause(e));
- }
-
- // Create the destination connection...
- if (destination instanceof Topic)
- {
- conn = ((TopicConnectionFactory) connectionFactory).createTopicConnection();
- } else
- {
- conn = ((QueueConnectionFactory) connectionFactory).createQueueConnection();
- }
-
- // Create the Destination Session...
- if (destination instanceof Topic)
- {
- session = ((TopicConnection) conn).createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
- } else
- {
- session = ((QueueConnection) conn).createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
- }
-
- // Start the connection...
- conn.start();
-
- // Listen for exceptions on the connection...
+ // Listen for exceptions on the session connection...
if (this instanceof ExceptionListener)
{
- conn.setExceptionListener((ExceptionListener) this);
- } else
- {
- conn.setExceptionListener(new DefaultExceptionListener());
+ jmsSession.registerExceptionListener((ExceptionListener) this);
}
// And perform the handler specific connection ops...
@@ -183,54 +134,42 @@
{
// And perform the handler specific close ops...
handlerClose();
- } catch (Throwable e)
+ }
+ catch (Throwable e)
{
logger.debug("Handler '" + getClass().getName() + "' close method threw unexpected exception while closing JMS destination '" + destinationName + "'.", e);
// Handler close failures should be taken care of by the handler impl. Continue closing...
}
+ destination = null;
+ logger.debug("JMS handler '" + getClass().getName() + "' on JMS destination '" + destinationName + "' is now closed.");
+ }
- try
+ /**
+ * JMS Exception handler.
+ * @param e The JMS Exception.
+ */
+ public void onException(JMSException e)
+ {
+ close();
+ while (!isConnected())
{
- if (conn != null)
+ try
{
- conn.stop();
- logger.debug("Stopping JMS Connection for connected handler '" + getClass().getName() + "' on JMS destination '" + destinationName + "'.");
+ connect();
}
- } catch (Throwable e)
- {
- logger.error("Failed to stop JMS connection.", e);
- conn = null;
- }
- try
- {
- if (session != null)
+ catch (JMSException e1)
{
- session.close();
- logger.debug("Closing JMS Session for connected handler '" + getClass().getName() + "' on JMS destination '" + destinationName + "'.");
+ // Failed to reconnect, wait and try again...
+ try
+ {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e2)
+ {
+ logger.debug("Interrupted during reconnect attempt. Aborting reconnect! Will need restart to reconnect.", e);
+ }
}
- } catch (Throwable e)
- {
- logger.error("Failed to close JMS session.", e);
- } finally
- {
- session = null;
}
- try
- {
- if (conn != null)
- {
- conn.close();
- logger.debug("Closing JMS Connection for connected handler '" + getClass().getName() + "' on JMS destination '" + destinationName + "'.");
- }
- } catch (Throwable e)
- {
- logger.error("Failed to close JMS connection.", e);
- } finally
- {
- conn = null;
- }
- destination = null;
- logger.debug("JMS handler '" + getClass().getName() + "' on JMS destination '" + destinationName + "' is now closed.");
}
/**
@@ -250,139 +189,38 @@
protected abstract void handlerClose();
/**
- * Get the JMS Connection factory.
+ * Is the handler connected.
*
- * @return Connection Factory.
- * @throws JMSException Error getting factory.
+ * @return True if the connector is connected, otherwise false.
*/
- private ConnectionFactory getJmsConnectionFactory() throws JMSException
- {
- ConnectionFactory factory = null;
- Context context;
- String connectionFactoryRuntime = jndiProperties.getProperty(ConnectionFactory.class.getName(), "ConnectionFactory");
+ public abstract boolean isConnected();
- context = getNamingContext();
- try
- {
- factory = (ConnectionFactory) context.lookup(connectionFactoryRuntime);
- } catch (NamingException e)
- {
- throw (JMSException) (new JMSException("JNDI lookup of JMS Connection Factory [" + connectionFactoryRuntime + "] failed.").initCause(e));
- } catch (ClassCastException e)
- {
- throw (JMSException) (new JMSException("JNDI lookup of JMS Connection Factory failed. Class [" + connectionFactoryRuntime + "] is not an instance of [" + ConnectionFactory.class.getName() + "].").initCause(e));
- } finally
- {
- if (context != null)
- {
- try
- {
- context.close();
- } catch (NamingException ne)
- {
- logger.error("Failed to close Naming Context.", ne);
- }
- }
- }
-
- return factory;
- }
-
/**
- * Get the JNDI Context.
+ * Lookup the destination.
*
- * @return The context.
- * @throws JMSException Error getting context.
+ * @param destinationName Destination name.
+ * @param jndiProperties JNDI properties.
+ * @return The JMS Destination.
+ * @throws JMSException Error looking up destination.
*/
- private Context getNamingContext() throws JMSException
+ public static Destination lookupDestination(final String destinationName, final Properties jndiProperties) throws JMSException
{
- Context context;
-
+ // Lookup the destination...
try
{
- context = new InitialContext(jndiProperties);
- } catch (NamingException e)
- {
- throw new JMSException("Failed to load InitialContext: " + jndiProperties);
+ Context context = JNDIUtil.getNamingContext(jndiProperties);
+ try
+ {
+ return (Destination) context.lookup(destinationName);
+ }
+ finally
+ {
+ context.close();
+ }
}
- if (context == null)
+ catch (NamingException e)
{
- throw new JMSException("Failed to create JMS Server JNDI context. Check that '" + Context.PROVIDER_URL + "', '" + Context.INITIAL_CONTEXT_FACTORY + "', '" + Context.URL_PKG_PREFIXES + "' are correctly configured in the supplied JNDI properties.");
+ throw (JMSException) (new JMSException("Destination lookup failed. Destination name '" + destinationName + "'.").initCause(e));
}
-
- return context;
}
-
- /**
- * The Default JMS Exception Listener.
- *
- * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
- */
- private class DefaultExceptionListener implements ExceptionListener
- {
-
- /**
- * We want this handler to handle only one exception.
- * It will close all existing resources and create a new instance
- * once it successfully reconnects.
- */
- private boolean hasHandledOneException = false;
-
- /**
- * JMS Exception handler.
- *
- * @param e The exception.
- */
- public final void onException(final JMSException e)
- {
- synchronized (AbstractMessageHandler.DefaultExceptionListener.class)
- {
- if (!hasHandledOneException)
- {
- if (!logger.isDebugEnabled())
- {
- logger.warn("JMS Exception on '" + getClass().getName() + "' on JMS destination '" + destinationName + "' is now closed (turn on debugging for more): " + e.getMessage());
- } else
- {
- logger.warn("JMS Exception on '" + getClass().getName() + "' on JMS destination '" + destinationName + "' is now closed.", e);
- }
- close();
- while (!isConnected())
- {
- try
- {
- connect();
- } catch (JMSException e1)
- {
- // Keep trying...
- continue;
- }
-
- // We've reconnected...
- try
- {
- Thread.sleep(5000);
- } catch (InterruptedException e1)
- {
- if (!logger.isDebugEnabled())
- {
- logger.warn("Interrupted during reconnect attempt. Aborting reconnect! Will need restart to reconnect (turn on debugging for more): " + e.getMessage());
- } else
- {
- logger.debug("Interrupted during reconnect attempt. Aborting reconnect! Will need restart to reconnect.", e);
- }
- }
- }
- hasHandledOneException = true;
- }
- }
- }
- }
-
- /**
- * Is the handler connected.
- *
- * @return True if the connector is connected, otherwise false.
- */
- public abstract boolean isConnected();
}
Modified: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageListener.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageListener.java 2008-09-10 16:10:28 UTC (rev 22608)
+++ labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageListener.java 2008-09-10 16:10:35 UTC (rev 22609)
@@ -21,7 +21,13 @@
import org.apache.log4j.Logger;
-import javax.jms.*;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
import java.util.Properties;
/**
@@ -66,13 +72,15 @@
* Connects the listener to the destination.
*
* @param destinationName The destination name.
- * @param jndiProperties The JNDI properties.
+ * @param jmsSession The JMS Session on which this handler is created.
+ * @param properties Listener properties.
+ * @throws JMSException Failed to connect.
*/
- protected AbstractMessageListener(final String destinationName, final Properties jndiProperties)
+ protected AbstractMessageListener(final String destinationName, final JMSSession jmsSession, final Properties properties) throws JMSException
{
- super(destinationName, jndiProperties);
+ super(destinationName, jmsSession);
logger = Logger.getLogger(getClass());
- this.properties = jndiProperties;
+ this.properties = properties;
}
/**
@@ -94,18 +102,21 @@
if (messageSelector == null)
{
messageConsumer = ((TopicSession) getSession()).createSubscriber((Topic) getDestination());
- } else
+ }
+ else
{
boolean noLocal = !properties.getProperty(NO_LOCAL, "true").equals("false");
messageConsumer = ((TopicSession) getSession()).createSubscriber((Topic) getDestination(), messageSelector, noLocal);
}
messageConsumer.setMessageListener(this);
- } else
+ }
+ else
{
if (messageSelector == null)
{
messageConsumer = ((QueueSession) getSession()).createReceiver((Queue) getDestination());
- } else
+ }
+ else
{
messageConsumer = ((QueueSession) getSession()).createReceiver((Queue) getDestination(), messageSelector);
}
@@ -113,7 +124,8 @@
}
logger.debug("Successfully connected listener '" + getClass().getName() + "' to JMS destination '" + getDestinationName() + "'.");
- } finally
+ }
+ finally
{
// If we failed to create the message listener, close and clean up....
if (messageConsumer == null)
@@ -135,10 +147,12 @@
messageConsumer.close();
logger.debug("Closed JMS MessageConsumer for connected listener '" + getClass().getName() + "' on JMS destination '" + getDestinationName() + "'.");
}
- } catch (Throwable e)
+ }
+ catch (Throwable e)
{
logger.error("Failed to close JMS Consumer.", e);
- } finally
+ }
+ finally
{
messageConsumer = null;
}
Copied: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/JMSSession.java (from rev 22552, labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageHandler.java)
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/JMSSession.java (rev 0)
+++ labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/JMSSession.java 2008-09-10 16:10:35 UTC (rev 22609)
@@ -0,0 +1,369 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright XXXX, Red Hat Middleware LLC, and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2008, JBoss Inc.
+ */
+package org.jboss.esb.jms;
+
+import org.apache.log4j.Logger;
+import org.jboss.esb.util.AssertArgument;
+import org.jboss.esb.util.JNDIUtil;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicSession;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * JMS Session.
+ * <p/>
+ * This object allows sharing of a JMS session across multiple consumers/producers etc.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class JMSSession
+{
+ /**
+ * Logger.
+ */
+ private Logger logger;
+ /**
+ * JNDI Properties.
+ */
+ private Properties jndiProperties;
+ /**
+ * JMS Connection.
+ */
+ private Connection conn = null;
+ /**
+ * JMS Session.
+ */
+ private Session session = null;
+ /**
+ * JMS Destination Type.
+ */
+ private Class<? extends Destination> destinationType = null;
+ /**
+ * Exception listener.
+ */
+ private ConnectionExceptionListener exceptionListener = new ConnectionExceptionListener();
+
+ /**
+ * Public constructor.
+ *
+ * @param destinationType The JMS destination type.
+ * @param jndiProperties The JNDI properties for creating the JMS session.
+ */
+ public JMSSession(Class<? extends Destination> destinationType, final Properties jndiProperties)
+ {
+ AssertArgument.isNotNull(destinationType, "destinationType");
+ AssertArgument.isNotNull(jndiProperties, "jndiProperties");
+
+ logger = Logger.getLogger(getClass());
+ this.destinationType = destinationType;
+ this.jndiProperties = jndiProperties;
+ }
+
+ /**
+ * Get the JMS Destination type associated with the session.
+ *
+ * @return The JMS Destination type.
+ */
+ public final Class<? extends Destination> getDestinationType()
+ {
+ return destinationType;
+ }
+
+ /**
+ * Get the JNDI Properties from which this session was created.
+ *
+ * @return
+ */
+ public final Properties getProperties()
+ {
+ return jndiProperties;
+ }
+
+ /**
+ * Get the JMS Session.
+ *
+ * @return The JMS Session.
+ */
+ public final Session getSession()
+ {
+ return session;
+ }
+
+ /**
+ * Connect to the configured destination.
+ *
+ * @throws javax.jms.JMSException Failed to connect.
+ */
+ public final void connect() throws JMSException
+ {
+ ConnectionFactory connectionFactory;
+
+ try
+ {
+ // Get the Destination ConnectionFactory...
+ try
+ {
+ connectionFactory = getJmsConnectionFactory();
+ }
+ catch (ClassCastException e)
+ {
+ throw (JMSException) (new JMSException("Invalid JMS ConnectionFactory config. ConnectionFactory doesn't implement " + TopicConnectionFactory.class.getName() + ".").initCause(e));
+ }
+
+ // Create the destination connection...
+ if (destinationType == Topic.class)
+ {
+ conn = ((TopicConnectionFactory) connectionFactory).createTopicConnection();
+ }
+ else
+ {
+ conn = ((QueueConnectionFactory) connectionFactory).createQueueConnection();
+ }
+
+ // Create the Destination Session...
+ if (destinationType == Topic.class)
+ {
+ session = ((TopicConnection) conn).createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
+ }
+ else
+ {
+ session = ((QueueConnection) conn).createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
+ }
+
+ // Start the connection...
+ conn.start();
+
+ // Listen for exceptions on the connection...
+ conn.setExceptionListener(exceptionListener);
+ }
+ catch (JMSException e)
+ {
+ close();
+ throw e;
+ }
+ catch (Throwable t)
+ {
+ close();
+ throw (JMSException) (new JMSException("Unexpected exception while creating JMS Session.").initCause(t));
+ }
+ }
+
+ /**
+ * Close out the session and all it's resources.
+ */
+ public final void close()
+ {
+ try
+ {
+ if (conn != null)
+ {
+ conn.stop();
+ logger.debug("Stopping JMS Connection for connected session.");
+ }
+ }
+ catch (Throwable e)
+ {
+ logger.error("Failed to stop JMS connection.", e);
+ conn = null;
+ }
+ try
+ {
+ if (session != null)
+ {
+ session.close();
+ logger.debug("Closing JMS Session.");
+ }
+ }
+ catch (Throwable e)
+ {
+ logger.error("Failed to close JMS session.", e);
+ }
+ finally
+ {
+ session = null;
+ }
+ try
+ {
+ if (conn != null)
+ {
+ conn.close();
+ logger.debug("Closing JMS Connection for connected session.");
+ }
+ }
+ catch (Throwable e)
+ {
+ logger.error("Failed to close JMS connection.", e);
+ }
+ finally
+ {
+ conn = null;
+ }
+ }
+
+ /**
+ * Get the JMS Connection factory.
+ *
+ * @return Connection Factory.
+ * @throws javax.jms.JMSException Error getting factory.
+ */
+ private ConnectionFactory getJmsConnectionFactory() throws JMSException
+ {
+ ConnectionFactory factory = null;
+ Context context;
+ String connectionFactoryRuntime = jndiProperties.getProperty(ConnectionFactory.class.getName(), "ConnectionFactory");
+
+ context = JNDIUtil.getNamingContext(jndiProperties);
+ try
+ {
+ factory = (ConnectionFactory) context.lookup(connectionFactoryRuntime);
+ }
+ catch (NamingException e)
+ {
+ throw (JMSException) (new JMSException("JNDI lookup of JMS Connection Factory [" + connectionFactoryRuntime + "] failed.").initCause(e));
+ }
+ catch (ClassCastException e)
+ {
+ throw (JMSException) (new JMSException("JNDI lookup of JMS Connection Factory failed. Class [" + connectionFactoryRuntime + "] is not an instance of [" + ConnectionFactory.class.getName() + "].").initCause(e));
+ }
+ finally
+ {
+ if (context != null)
+ {
+ try
+ {
+ context.close();
+ }
+ catch (NamingException ne)
+ {
+ logger.error("Failed to close Naming Context.", ne);
+ }
+ }
+ }
+
+ return factory;
+ }
+
+ /**
+ * Register an exception listener.
+ *
+ * @param listener The listener instance.
+ */
+ public final void registerExceptionListener(final ExceptionListener listener)
+ {
+ exceptionListener.registerExceptionListener(listener);
+ }
+
+ /**
+ * The Default JMS Exception Listener.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+ private class ConnectionExceptionListener implements ExceptionListener
+ {
+ /**
+ * Exception listeners.
+ */
+ private Set<ExceptionListener> listeners = new HashSet<ExceptionListener>();
+
+ /**
+ * Register an exception listener.
+ *
+ * @param listener The listener instance.
+ */
+ public final void registerExceptionListener(final ExceptionListener listener)
+ {
+ listeners.add(listener);
+ }
+
+ /**
+ * JMS Exception handler.
+ *
+ * @param e The exception.
+ */
+ public final void onException(final JMSException e)
+ {
+ synchronized (ConnectionExceptionListener.class)
+ {
+ // Close the session on behalf of the session owner...
+ close();
+
+ // Reconnect on behalf of the session owner...
+ while (!isConnected())
+ {
+ try
+ {
+ connect();
+ }
+ catch (JMSException e1)
+ {
+ // Failed to reconnect, wait and try again...
+ try
+ {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e2)
+ {
+ logger.debug("Interrupted during reconnect attempt. Aborting reconnect! Will need restart to reconnect.", e);
+ }
+ }
+ }
+
+ // Tell all listeners...
+ for (ExceptionListener listener : listeners)
+ {
+ try
+ {
+ listener.onException(e);
+ }
+ catch (Throwable t)
+ {
+ logger.warn("Exception during exception processing from JMS ExceptionListener '" + listener.getClass().getName() + "'.", t);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Is the session connected.
+ *
+ * @return True if the connector is connected, otherwise false.
+ */
+ public final boolean isConnected()
+ {
+ return (session != null);
+ }
+}
\ No newline at end of file
Property changes on: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/JMSSession.java
___________________________________________________________________
Name: svn:mergeinfo
+
Name: svn:eol-style
+ native
Modified: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/MessageSender.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/MessageSender.java 2008-09-10 16:10:28 UTC (rev 22608)
+++ labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/MessageSender.java 2008-09-10 16:10:35 UTC (rev 22609)
@@ -21,8 +21,13 @@
import org.apache.log4j.Logger;
-import javax.jms.*;
-import java.util.Properties;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
/**
* JMS Message sender.
@@ -48,11 +53,11 @@
* Connects the sender to the destination.
*
* @param destinationName The destination name.
- * @param jndiProperties The JNDI properties.
+ * @param jmsSession The JMS Session instance.
*/
- public MessageSender(final String destinationName, final Properties jndiProperties)
+ public MessageSender(final String destinationName, final JMSSession jmsSession) throws JMSException
{
- super(destinationName, jndiProperties);
+ super(destinationName, jmsSession);
logger = Logger.getLogger(getClass());
}
@@ -103,7 +108,6 @@
messageProducer.send(message);
}
-
/**
* Close out the sender and all it's resources.
*/
@@ -125,6 +129,7 @@
}
}
+
/**
* Is the handler connected.
*
Modified: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/MockMessageListener.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/MockMessageListener.java 2008-09-10 16:10:28 UTC (rev 22608)
+++ labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/MockMessageListener.java 2008-09-10 16:10:35 UTC (rev 22609)
@@ -21,6 +21,7 @@
import javax.jms.TextMessage;
import javax.jms.Message;
+import javax.jms.JMSException;
import java.util.List;
import java.util.ArrayList;
import java.util.Properties;
@@ -33,9 +34,9 @@
public final List<TextMessage> messagesReceived = new ArrayList<TextMessage>();
- protected MockMessageListener(final String destinationName, final Properties jndiProperties, final String messageSelector)
+ protected MockMessageListener(final String destinationName, final JMSSession jmsSession, final Properties jndiProperties, final String messageSelector) throws JMSException
{
- super(destinationName, addMessageSelector(jndiProperties, messageSelector));
+ super(destinationName, jmsSession, addMessageSelector(jndiProperties, messageSelector));
}
public void onMessage(Message message)
Added: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/util/JNDIUtil.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/util/JNDIUtil.java (rev 0)
+++ labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/util/JNDIUtil.java 2008-09-10 16:10:35 UTC (rev 22609)
@@ -0,0 +1,69 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright XXXX, Red Hat Middleware LLC, and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2008, JBoss Inc.
+ */
+package org.jboss.esb.util;
+
+import javax.jms.JMSException;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.Properties;
+
+/**
+ * JNDI Utilitiy methods.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public abstract class JNDIUtil
+{
+ /**
+ * Private default constructor.
+ */
+ private JNDIUtil()
+ {
+ }
+
+ /**
+ * Get the JNDI Context.
+ * <p/>
+ * Don't forget to close it when done!
+ *
+ * @return The context.
+ * @throws javax.jms.JMSException Error getting context.
+ */
+ public static Context getNamingContext(final Properties jndiProperties) throws JMSException
+ {
+ Context context;
+
+ try
+ {
+ context = new InitialContext(jndiProperties);
+ }
+ catch (NamingException e)
+ {
+ throw new JMSException("Failed to load InitialContext: " + jndiProperties);
+ }
+ if (context == null)
+ {
+ throw new JMSException("Failed to create JNDI context. Check that '" + Context.PROVIDER_URL + "', '" + Context.INITIAL_CONTEXT_FACTORY + "', '" + Context.URL_PKG_PREFIXES + "' are correctly configured in the supplied JNDI properties.");
+ }
+
+ return context;
+ }
+}
Property changes on: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/util/JNDIUtil.java
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: labs/jbossesb/workspace/skeagh/commons/src/test/java/org/jboss/esb/jms/MessageSendAndListenTest.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/test/java/org/jboss/esb/jms/MessageSendAndListenTest.java 2008-09-10 16:10:28 UTC (rev 22608)
+++ labs/jbossesb/workspace/skeagh/commons/src/test/java/org/jboss/esb/jms/MessageSendAndListenTest.java 2008-09-10 16:10:35 UTC (rev 22609)
@@ -59,39 +59,46 @@
public void test() throws Exception
{
Properties jndiProperties = getJndiProperties();
+ JMSSession jmsSession = new JMSSession(destType, jndiProperties);
- if(destType == Topic.class) {
- jndiProperties.setProperty("topic." + destName, destName);
- } else {
- jndiProperties.setProperty("queue." + destName, destName);
- }
+ jmsSession.connect();
+ try
+ {
+ if(destType == Topic.class) {
+ jndiProperties.setProperty("topic." + destName, destName);
+ } else {
+ jndiProperties.setProperty("queue." + destName, destName);
+ }
- MockMessageListener listener = new MockMessageListener(destName, jndiProperties, null);
+ MockMessageListener listener = new MockMessageListener(destName, jmsSession, jndiProperties, null);
- listener.connect();
- assertTrue(destType.isAssignableFrom(listener.getDestination().getClass()));
- try {
- MessageSender sender = new MessageSender(destName, jndiProperties);
- List<String> messagesSent = Arrays.asList(new String[] {"message 1", "message 2", "message 3"});
+ listener.connect();
+ assertTrue(destType.isAssignableFrom(listener.getDestination().getClass()));
+ try {
+ MessageSender sender = new MessageSender(destName, jmsSession);
+ List<String> messagesSent = Arrays.asList(new String[] {"message 1", "message 2", "message 3"});
- sender.connect();
- assertTrue(destType.isAssignableFrom(sender.getDestination().getClass()));
- try {
- for (String message : messagesSent)
- {
- sender.send(sender.getSession().createTextMessage(message));
+ sender.connect();
+ assertTrue(destType.isAssignableFrom(sender.getDestination().getClass()));
+ try {
+ for (String message : messagesSent)
+ {
+ sender.send(sender.getSession().createTextMessage(message));
+ }
+ } finally {
+ sender.close();
}
+
+ Thread.sleep(1000);
+ assertEquals(3, listener.messagesReceived.size());
+ assertTrue(messagesSent.contains(listener.messagesReceived.get(0).getText()));
+ assertTrue(messagesSent.contains(listener.messagesReceived.get(1).getText()));
+ assertTrue(messagesSent.contains(listener.messagesReceived.get(2).getText()));
} finally {
- sender.close();
+ listener.close();
}
-
- Thread.sleep(1000);
- assertEquals(3, listener.messagesReceived.size());
- assertTrue(messagesSent.contains(listener.messagesReceived.get(0).getText()));
- assertTrue(messagesSent.contains(listener.messagesReceived.get(1).getText()));
- assertTrue(messagesSent.contains(listener.messagesReceived.get(2).getText()));
} finally {
- listener.close();
+ jmsSession.close();
}
}
}.run();
@@ -103,54 +110,62 @@
public void test() throws Exception
{
Properties jndiProperties = getJndiProperties();
+ JMSSession jmsSession = new JMSSession(destType, jndiProperties);
- if(destType == Topic.class) {
- jndiProperties.setProperty("topic." + destName, destName);
- } else {
- jndiProperties.setProperty("queue." + destName, destName);
- }
+ jmsSession.connect();
+ try
+ {
+ if(destType == Topic.class) {
+ jndiProperties.setProperty("topic." + destName, destName);
+ jndiProperties.setProperty(AbstractMessageListener.NO_LOCAL, "false");
+ } else {
+ jndiProperties.setProperty("queue." + destName, destName);
+ }
- MockMessageListener service1 = new MockMessageListener(destName, jndiProperties, "serviceId='service1'");
- MockMessageListener service2 = new MockMessageListener(destName, jndiProperties, "serviceId='service2'");
- List<String> service1Messages = Arrays.asList(new String[] {"message 1", "message 2", "message 3"});
- List<String> service2Messages = Arrays.asList(new String[] {"message 4", "message 5", "message 6"});
+ MockMessageListener service1 = new MockMessageListener(destName, jmsSession, jndiProperties, "serviceId='service1'");
+ MockMessageListener service2 = new MockMessageListener(destName, jmsSession, jndiProperties, "serviceId='service2'");
+ List<String> service1Messages = Arrays.asList(new String[] {"message 1", "message 2", "message 3"});
+ List<String> service2Messages = Arrays.asList(new String[] {"message 4", "message 5", "message 6"});
- service1.connect();
- try {
- service2.connect();
+ service1.connect();
try {
- MessageSender sender = new MessageSender(destName, jndiProperties);
+ service2.connect();
+ try {
+ MessageSender sender = new MessageSender(destName, jmsSession);
- sender.connect();
- try {
- for (String message : service1Messages)
- {
- send(sender, message, "service1");
+ sender.connect();
+ try {
+ for (String message : service1Messages)
+ {
+ send(sender, message, "service1");
+ }
+ for (String message : service2Messages)
+ {
+ send(sender, message, "service2");
+ }
+ } finally {
+ sender.close();
}
- for (String message : service2Messages)
- {
- send(sender, message, "service2");
- }
- } finally {
- sender.close();
- }
- Thread.sleep(1000);
+ Thread.sleep(1000);
- assertEquals(3, service1.messagesReceived.size());
- assertTrue(service1Messages.contains(service1.messagesReceived.get(0).getText()));
- assertTrue(service1Messages.contains(service1.messagesReceived.get(1).getText()));
- assertTrue(service1Messages.contains(service1.messagesReceived.get(2).getText()));
+ assertEquals(3, service1.messagesReceived.size());
+ assertTrue(service1Messages.contains(service1.messagesReceived.get(0).getText()));
+ assertTrue(service1Messages.contains(service1.messagesReceived.get(1).getText()));
+ assertTrue(service1Messages.contains(service1.messagesReceived.get(2).getText()));
- assertEquals(3, service2.messagesReceived.size());
- assertTrue(service2Messages.contains(service2.messagesReceived.get(0).getText()));
- assertTrue(service2Messages.contains(service2.messagesReceived.get(1).getText()));
- assertTrue(service2Messages.contains(service2.messagesReceived.get(2).getText()));
+ assertEquals(3, service2.messagesReceived.size());
+ assertTrue(service2Messages.contains(service2.messagesReceived.get(0).getText()));
+ assertTrue(service2Messages.contains(service2.messagesReceived.get(1).getText()));
+ assertTrue(service2Messages.contains(service2.messagesReceived.get(2).getText()));
+ } finally {
+ service2.close();
+ }
} finally {
- service2.close();
+ service1.close();
}
} finally {
- service1.close();
+ jmsSession.close();
}
}
}.run();
Modified: labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsInboundRouter.java
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsInboundRouter.java 2008-09-10 16:10:28 UTC (rev 22608)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsInboundRouter.java 2008-09-10 16:10:35 UTC (rev 22609)
@@ -20,29 +20,27 @@
*/
package org.jboss.esb.jms;
-import static org.jboss.esb.jms.JmsConstants.CONNECTION_FACTORY;
-
-import java.util.Properties;
-
-import javax.jms.JMSException;
-import javax.naming.Context;
-
import org.apache.log4j.Logger;
import org.jboss.esb.annotations.Initialize;
import org.jboss.esb.annotations.Property;
+import org.jboss.esb.annotations.Property.Use;
import org.jboss.esb.annotations.Uninitialize;
-import org.jboss.esb.annotations.Property.Use;
import org.jboss.esb.context.InvocationContext;
+import static org.jboss.esb.jms.JmsConstants.CONNECTION_FACTORY;
import org.jboss.esb.message.Message;
import org.jboss.esb.routing.InboundRouter;
import org.jboss.esb.routing.MessageDispatcher;
import org.jboss.esb.routing.RoutingException;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.naming.Context;
+import java.util.Properties;
+
/**
* Inbound router for JMS.
*
* @author <a href="mailto:dbevenius at jboss.com">Daniel Bevenius</a>
- *
*/
public class JmsInboundRouter implements InboundRouter
{
@@ -71,10 +69,13 @@
/**
* Should JMSHeaders and user properties be extrated from the JMS Message
*/
- @Property(use = Use.OPTIONAL, defaultVal = "true" )
+ @Property(use = Use.OPTIONAL, defaultVal = "true")
private boolean extractProperties;
-
/**
+ * The JMS Session on which the listener is created.
+ */
+ private JMSSession jmsSession;
+ /**
* JmsMessageListener that takes care of the JMS connection.
*/
private JmsMessageListener messageListener;
@@ -90,9 +91,24 @@
log.info("JMSProperties : " + jmsProperties + "," + destination);
// add this class as a JMS message listener...
- messageListener = new JmsMessageListener(destination, jmsProperties, dispatcher);
- messageListener.connect();
-
+ Destination jmsDestination = AbstractMessageHandler.lookupDestination(destination, jmsProperties);
+ jmsSession = new JMSSession(jmsDestination.getClass(), jmsProperties);
+ jmsSession.connect();
+ try
+ {
+ messageListener = new JmsMessageListener(destination, jmsSession, jmsProperties, dispatcher);
+ messageListener.connect();
+ }
+ catch (JMSException e)
+ {
+ jmsSession.close();
+ throw e;
+ }
+ catch (Throwable t)
+ {
+ jmsSession.close();
+ throw (JMSException) (new JMSException("Unexpected exception while connecting JmsMesssageListener.").initCause(t));
+ }
}
/**
@@ -101,7 +117,15 @@
@Uninitialize
public final void uninitialize()
{
- messageListener.close();
+ try
+ {
+ messageListener.close();
+ }
+ finally
+ {
+ jmsSession.close();
+ }
+
}
/**
@@ -116,6 +140,7 @@
/**
* Set the dispatcher used by this instance.
+ *
* @param dispatcher - the dispatcher to use
*/
public final void setDispatcher(final MessageDispatcher dispatcher)
@@ -125,6 +150,7 @@
/**
* Will display the dispatcher and the properties for this instance.
+ *
* @return String - string representation of this instance.
*/
@Override
@@ -164,7 +190,6 @@
* the {@link MessageDispatcher} to pass the message along though the ESB.
*
* @author <a href="mailto:dbevenius at redhat.com">Daniel Bevenius</a>
- *
*/
private class JmsMessageListener extends AbstractMessageListener
{
@@ -176,13 +201,13 @@
/**
* Protected contstructor.
*
- * @param destinationName the JMS destination name.
- * @param jndiProperties the jndi properties to be used for jndi lookups.
- * @param dispatcher the {@link MessageDispatcher} to be used.
+ * @param destinationName the JMS destination name.
+ * @param jndiProperties the jndi properties to be used for jndi lookups.
+ * @param dispatcher the {@link MessageDispatcher} to be used.
*/
- protected JmsMessageListener(final String destinationName, final Properties jndiProperties, final MessageDispatcher dispatcher)
+ protected JmsMessageListener(final String destinationName, final JMSSession jmsSession, final Properties jndiProperties, final MessageDispatcher dispatcher) throws JMSException
{
- super(destinationName, jndiProperties);
+ super(destinationName, jmsSession, jndiProperties);
this.dispatcher = dispatcher;
}
@@ -190,7 +215,8 @@
* Handles a single JMSMessage, extracts the contents and creates
* and populates an ESB Message object instance and dispataches
* the ESB Message object instance to the ESB.
- * @param jmsMessage the JMS Message object instance
+ *
+ * @param jmsMessage the JMS Message object instance
*/
public void onMessage(final javax.jms.Message jmsMessage)
{
Modified: labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/JmsInboundRouterTest.java
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/JmsInboundRouterTest.java 2008-09-10 16:10:28 UTC (rev 22608)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/JmsInboundRouterTest.java 2008-09-10 16:10:35 UTC (rev 22609)
@@ -20,20 +20,19 @@
*/
package org.jboss.esb.jms;
-import java.util.Properties;
+import org.jboss.esb.deploy.DeploymentRuntime;
+import org.jboss.esb.deploy.config.digest.DigestUtil;
+import org.junit.Test;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import java.util.Properties;
-import org.jboss.esb.deploy.DeploymentRuntime;
-import org.jboss.esb.deploy.config.digest.DigestUtil;
-import org.junit.Test;
-
/**
* Test for {@link JmsInboundRouter}.
*
* @author <a href="mailto:dbevenius at redhat.com">Daniel Bevenius</a>
- *
*/
public class JmsInboundRouterTest
{
@@ -42,32 +41,46 @@
@Test
public final void initializeConfOverride() throws Exception
{
- new JMSTestRunner(JMSTestRunner.DEFAULT_PROVIDER_URL) {
+ new JMSTestRunner()
+ {
@Override
public void test() throws Exception
{
DeploymentRuntime runtime = DigestUtil.digestConfig(getClass().getResourceAsStream("jms-inbound-router_01.xml"));
runtime.deploy();
- try {
+ try
+ {
Thread.sleep(3000);
Properties jndiProperties = getJndiProperties();
jndiProperties.setProperty("queue." + DESTINATION_NAME, DESTINATION_NAME);
- MessageSender sender = new MessageSender(DESTINATION_NAME, jndiProperties);
+ JMSSession jmsSession = new JMSSession(Queue.class, jndiProperties);
+
+ jmsSession.connect();
try
{
- sender.connect();
- Session session = sender.getSession();
- TextMessage textMessage = session.createTextMessage("test");
- sender.send(textMessage);
- Thread.sleep(3000);
+ MessageSender sender = new MessageSender(DESTINATION_NAME, jmsSession);
+ try
+ {
+ sender.connect();
+ Session session = sender.getSession();
+ TextMessage textMessage = session.createTextMessage("test");
+ sender.send(textMessage);
+ Thread.sleep(3000);
+ }
+ finally
+ {
+ sender.close();
+ }
}
finally
{
- sender.close();
+ jmsSession.close();
}
- } finally {
+ }
+ finally
+ {
runtime.undeploy();
}
}
Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java 2008-09-10 16:10:28 UTC (rev 22608)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java 2008-09-10 16:10:35 UTC (rev 22609)
@@ -30,6 +30,7 @@
import org.jboss.esb.federate.notify.DeploymentHeartbeatNotification;
import org.jboss.esb.federate.notify.DeploymentUndeployNotification;
import org.jboss.esb.jms.AbstractMessageListener;
+import org.jboss.esb.jms.JMSSession;
import org.jboss.esb.jms.MessageSender;
import org.jboss.esb.properties.ApplicationProperties;
import org.jboss.esb.schedule.AbstractScheduleListener;
@@ -39,6 +40,8 @@
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Topic;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -100,6 +103,14 @@
*/
private DeploymentDetailsNotification detailsNotification;
/**
+ * Shared JMS Topic Session.
+ */
+ private JMSSession topicSession;
+ /**
+ * Shared JMS Topic Session.
+ */
+ private JMSSession queueSession;
+ /**
* Deployment coordination listener.
*/
private CoordinationListener coordinationListener;
@@ -149,23 +160,37 @@
if (localBusProperties != null)
{
- connectCoordinationListener(localBusProperties);
- if (coordinationListener != null && coordinationListener.isConnected())
+ intialiseJMSSessions(localBusProperties);
+
+ if(topicSession != null)
{
try
{
- connectCoordinationBroadcaster(localBusProperties);
-
- // Tell all other deployments we're online, what services we have etc...
- sendNotification(detailsNotification);
+ connectCoordinationListener(localBusProperties);
}
catch (Throwable t)
{
- throw new DeploymentException("Unable to initialize Deployment Coordinator.", t);
+ closeJMSSessions();
+ throw new DeploymentException("Failed to connect coordination listener", t);
}
+ if (coordinationListener != null && coordinationListener.isConnected())
+ {
+ try
+ {
+ connectCoordinationBroadcaster(localBusProperties);
- // Set the monitoring timeout - 4 heartbeats...
- monitorTimeout = (localBusProperties.getLongProperty(COORDINATOR_HEARTBEAT_FREQUENCY_KEY, COORDINATOR_HEARTBEAT_DEFAULT_FREQUENCY) * 4);
+ // Tell all other deployments we're online, what services we have etc...
+ sendNotification(detailsNotification);
+ }
+ catch (Throwable t)
+ {
+ closeJMSSessions();
+ throw new DeploymentException("Unable to initialize Deployment Coordinator.", t);
+ }
+
+ // Set the monitoring timeout - 4 heartbeats...
+ monitorTimeout = (localBusProperties.getLongProperty(COORDINATOR_HEARTBEAT_FREQUENCY_KEY, COORDINATOR_HEARTBEAT_DEFAULT_FREQUENCY) * 4);
+ }
}
}
}
@@ -209,6 +234,8 @@
logger.warn("Error closing deployment coordination broadcaster for deployment '" + runtime.getDeploymentName() + "'.", t);
}
}
+
+ closeJMSSessions();
}
/**
@@ -272,15 +299,16 @@
*/
private class CoordinationListener extends AbstractMessageListener
{
+
/**
* Constructor.
*
* @param destinationName Destination name.
* @param jndiProperties JNDI properties.
*/
- protected CoordinationListener(final String destinationName, final Properties jndiProperties)
+ protected CoordinationListener(final String destinationName, final Properties jndiProperties) throws JMSException
{
- super(destinationName, jndiProperties);
+ super(destinationName, topicSession, jndiProperties);
}
/**
@@ -343,6 +371,7 @@
}
}
}
+
}
/**
@@ -449,7 +478,7 @@
*
* @param localBusProperties Bus configuration properties.
*/
- private void connectCoordinationListener(Properties localBusProperties)
+ private void connectCoordinationListener(Properties localBusProperties) throws JMSException
{
String coordinationTopicName = localBusProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
@@ -476,11 +505,11 @@
*
* @param localBusProperties Bus configuration properties.
*/
- private void connectCoordinationBroadcaster(Properties localBusProperties) throws DeploymentException
+ private void connectCoordinationBroadcaster(Properties localBusProperties) throws DeploymentException, JMSException
{
String coordinationTopicName = localBusProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
- coordinationBroadcaster = new MessageSender(coordinationTopicName, localBusProperties);
+ coordinationBroadcaster = new MessageSender(coordinationTopicName, topicSession);
try
{
coordinationBroadcaster.connect();
@@ -499,14 +528,17 @@
*/
public static class DeploymentMonitor
{
+
/**
* Time of last received heartbeat.
*/
private long lastHeartbeat;
+
/**
* Deployment Service Sets.
*/
private DeploymentServiceSets serviceSets;
+
/**
* Deployment online/offline flag.
*/
@@ -553,5 +585,60 @@
{
return "{online=" + online + "} " + serviceSets.toString();
}
+
}
+
+ /**
+ * Initialize the shared JMS Sessions.
+ *
+ * @param localBusProperties Local JMS Bus properties.
+ */
+ private void intialiseJMSSessions(final ApplicationProperties localBusProperties)
+ {
+ topicSession = new JMSSession(Topic.class, localBusProperties);
+ try
+ {
+ topicSession.connect();
+ }
+ catch (Throwable t)
+ {
+ closeJMSSessions();
+ logger.debug("Failed to connect shared deployment JMS Topic Session.", t);
+ return;
+ }
+ queueSession = new JMSSession(Queue.class, localBusProperties);
+ try
+ {
+ queueSession.connect();
+ }
+ catch (Throwable t)
+ {
+ closeJMSSessions();
+ logger.debug("Failed to connect shared deployment JMS Queue Session.", t);
+ }
+ }
+
+ /**
+ * Close the JMS Sessions.
+ */
+ private void closeJMSSessions()
+ {
+ // Close the sessions...
+ if(topicSession != null)
+ {
+ if(topicSession.isConnected())
+ {
+ topicSession.close();
+ }
+ topicSession = null;
+ }
+ if(queueSession != null)
+ {
+ if(queueSession.isConnected())
+ {
+ queueSession.close();
+ }
+ queueSession = null;
+ }
+ }
}
Added: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/JMSBusDispatcher.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/JMSBusDispatcher.java (rev 0)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/JMSBusDispatcher.java 2008-09-10 16:10:35 UTC (rev 22609)
@@ -0,0 +1,36 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright XXXX, Red Hat Middleware LLC, and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2008, JBoss Inc.
+ */
+package org.jboss.esb.federate.bus;
+
+import org.jboss.esb.routing.MessageDispatcher;
+import org.jboss.esb.routing.RoutingException;
+import org.jboss.esb.message.Message;
+import org.jboss.esb.context.InvocationContext;
+
+/**
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class JMSBusDispatcher implements MessageDispatcher
+{
+ public void dispatch(Message message, InvocationContext invocationContext) throws RoutingException
+ {
+ }
+}
Property changes on: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/JMSBusDispatcher.java
___________________________________________________________________
Name: svn:eol-style
+ native
More information about the jboss-svn-commits
mailing list