[jboss-svn-commits] JBL Code SVN: r22609 - in labs/jbossesb/workspace/skeagh: commons/src/main/java/org/jboss/esb/util and 5 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Sep 10 12:10:35 EDT 2008


Author: tfennelly
Date: 2008-09-10 12:10:35 -0400 (Wed, 10 Sep 2008)
New Revision: 22609

Added:
   labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/JMSSession.java
   labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/util/JNDIUtil.java
   labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/
   labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/JMSBusDispatcher.java
Modified:
   labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageHandler.java
   labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageListener.java
   labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/MessageSender.java
   labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/MockMessageListener.java
   labs/jbossesb/workspace/skeagh/commons/src/test/java/org/jboss/esb/jms/MessageSendAndListenTest.java
   labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsInboundRouter.java
   labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/JmsInboundRouterTest.java
   labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java
Log:
Extracted the JMSSession object from the AbstractMessageHandler.  This allows multiple message consumers and producers to share these resources, the lifecycle of which is managed around the deployment.

Modified: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageHandler.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageHandler.java	2008-09-10 16:10:28 UTC (rev 22608)
+++ labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageHandler.java	2008-09-10 16:10:35 UTC (rev 22609)
@@ -21,10 +21,13 @@
 
 import org.apache.log4j.Logger;
 import org.jboss.esb.util.AssertArgument;
+import org.jboss.esb.util.JNDIUtil;
 
-import javax.jms.*;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Session;
 import javax.naming.Context;
-import javax.naming.InitialContext;
 import javax.naming.NamingException;
 import java.util.Properties;
 
@@ -33,7 +36,7 @@
  *
  * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
  */
-public abstract class AbstractMessageHandler
+public abstract class AbstractMessageHandler implements ExceptionListener
 {
     /**
      * Logger.
@@ -44,18 +47,10 @@
      */
     private String destinationName;
     /**
-     * JNDI Properties.
+     * The JMS Session.
      */
-    private Properties jndiProperties;
+    private JMSSession jmsSession;
     /**
-     * JMS Connection.
-     */
-    private Connection conn = null;
-    /**
-     * JMS Session.
-     */
-    private Session session = null;
-    /**
      * JMS Destination.
      */
     private Destination destination = null;
@@ -64,16 +59,17 @@
      * Public vonstructor.
      *
      * @param destinationName The JMS destination name.
-     * @param jndiProperties  The JNDI properties for connecting to the JMS destination.
+     * @param jmsSession      The The JMS Session on which this handler is created.
+     * @throws JMSException Failed to connect.
      */
-    public AbstractMessageHandler(final String destinationName, final Properties jndiProperties)
+    public AbstractMessageHandler(final String destinationName, final JMSSession jmsSession) throws JMSException
     {
         AssertArgument.isNotNullAndNotEmpty(destinationName, "destinationName");
-        AssertArgument.isNotNull(jndiProperties, "jndiProperties");
+        AssertArgument.isNotNull(jmsSession, "jmsSession");
 
         logger = Logger.getLogger(getClass());
-        this.jndiProperties = jndiProperties;
         this.destinationName = destinationName;
+        this.jmsSession = jmsSession;
     }
 
     /**
@@ -93,7 +89,7 @@
      */
     public final Session getSession()
     {
-        return session;
+        return jmsSession.getSession();
     }
 
     /**
@@ -113,61 +109,16 @@
      */
     public final void connect() throws JMSException
     {
-        ConnectionFactory connectionFactory;
-
-        // Lookup the destination...
-        try
+        destination = lookupDestination(destinationName, jmsSession.getProperties());
+        if (!jmsSession.getDestinationType().isAssignableFrom(destination.getClass()))
         {
-            Context context = getNamingContext();
-            try
-            {
-                destination = (Destination) context.lookup(destinationName);
-            } finally
-            {
-                context.close();
-            }
-        } catch (NamingException e)
-        {
-            throw (JMSException) (new JMSException("Destination lookup failed.  Destination name '" + destinationName + "'.").initCause(e));
+            throw new JMSException("Handler '" + getClass().getName() + "' destination '" + destinationName + "' cannot be created.  The destination type does not match that of the associate JMSSession instance.  Session type is '" + jmsSession.getDestinationType().getName() + "'.");
         }
 
-        // Get the Destination ConnectionFactory...
-        try
-        {
-            connectionFactory = getJmsConnectionFactory();
-        } catch (ClassCastException e)
-        {
-            throw (JMSException) (new JMSException("Invalid JMS ConnectionFactory config.  ConnectionFactory doesn't implement " + TopicConnectionFactory.class.getName() + ".").initCause(e));
-        }
-
-        // Create the destination connection...
-        if (destination instanceof Topic)
-        {
-            conn = ((TopicConnectionFactory) connectionFactory).createTopicConnection();
-        } else
-        {
-            conn = ((QueueConnectionFactory) connectionFactory).createQueueConnection();
-        }
-
-        // Create the Destination Session...
-        if (destination instanceof Topic)
-        {
-            session = ((TopicConnection) conn).createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
-        } else
-        {
-            session = ((QueueConnection) conn).createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
-        }
-
-        // Start the connection...
-        conn.start();
-
-        // Listen for exceptions on the connection...
+        // Listen for exceptions on the session connection...
         if (this instanceof ExceptionListener)
         {
-            conn.setExceptionListener((ExceptionListener) this);
-        } else
-        {
-            conn.setExceptionListener(new DefaultExceptionListener());
+            jmsSession.registerExceptionListener((ExceptionListener) this);
         }
 
         // And perform the handler specific connection ops...
@@ -183,54 +134,42 @@
         {
             // And perform the handler specific close ops...
             handlerClose();
-        } catch (Throwable e)
+        }
+        catch (Throwable e)
         {
             logger.debug("Handler '" + getClass().getName() + "' close method threw unexpected exception while closing JMS destination '" + destinationName + "'.", e);
             // Handler close failures should be taken care of by the handler impl.  Continue closing...
         }
+        destination = null;
+        logger.debug("JMS handler '" + getClass().getName() + "' on JMS destination '" + destinationName + "' is now closed.");
+    }
 
-        try
+    /**
+     * JMS Exception handler.
+     * @param e The JMS Exception.
+     */
+    public void onException(JMSException e)
+    {
+        close();
+        while (!isConnected())
         {
-            if (conn != null)
+            try
             {
-                conn.stop();
-                logger.debug("Stopping JMS Connection for connected handler '" + getClass().getName() + "' on JMS destination '" + destinationName + "'.");
+                connect();
             }
-        } catch (Throwable e)
-        {
-            logger.error("Failed to stop JMS connection.", e);
-            conn = null;
-        }
-        try
-        {
-            if (session != null)
+            catch (JMSException e1)
             {
-                session.close();
-                logger.debug("Closing JMS Session for connected handler '" + getClass().getName() + "' on JMS destination '" + destinationName + "'.");
+                // Failed to reconnect, wait and try again...
+                try
+                {
+                    Thread.sleep(5000);
+                }
+                catch (InterruptedException e2)
+                {
+                    logger.debug("Interrupted during reconnect attempt.  Aborting reconnect!  Will need restart to reconnect.", e);
+                }
             }
-        } catch (Throwable e)
-        {
-            logger.error("Failed to close JMS session.", e);
-        } finally
-        {
-            session = null;
         }
-        try
-        {
-            if (conn != null)
-            {
-                conn.close();
-                logger.debug("Closing JMS Connection for connected handler '" + getClass().getName() + "' on JMS destination '" + destinationName + "'.");
-            }
-        } catch (Throwable e)
-        {
-            logger.error("Failed to close JMS connection.", e);
-        } finally
-        {
-            conn = null;
-        }
-        destination = null;
-        logger.debug("JMS handler '" + getClass().getName() + "' on JMS destination '" + destinationName + "' is now closed.");
     }
 
     /**
@@ -250,139 +189,38 @@
     protected abstract void handlerClose();
 
     /**
-     * Get the JMS Connection factory.
+     * Is the handler connected.
      *
-     * @return Connection Factory.
-     * @throws JMSException Error getting factory.
+     * @return True if the connector is connected, otherwise false.
      */
-    private ConnectionFactory getJmsConnectionFactory() throws JMSException
-    {
-        ConnectionFactory factory = null;
-        Context context;
-        String connectionFactoryRuntime = jndiProperties.getProperty(ConnectionFactory.class.getName(), "ConnectionFactory");
+    public abstract boolean isConnected();
 
-        context = getNamingContext();
-        try
-        {
-            factory = (ConnectionFactory) context.lookup(connectionFactoryRuntime);
-        } catch (NamingException e)
-        {
-            throw (JMSException) (new JMSException("JNDI lookup of JMS Connection Factory [" + connectionFactoryRuntime + "] failed.").initCause(e));
-        } catch (ClassCastException e)
-        {
-            throw (JMSException) (new JMSException("JNDI lookup of JMS Connection Factory failed.  Class [" + connectionFactoryRuntime + "] is not an instance of [" + ConnectionFactory.class.getName() + "].").initCause(e));
-        } finally
-        {
-            if (context != null)
-            {
-                try
-                {
-                    context.close();
-                } catch (NamingException ne)
-                {
-                    logger.error("Failed to close Naming Context.", ne);
-                }
-            }
-        }
-
-        return factory;
-    }
-
     /**
-     * Get the JNDI Context.
+     * Lookup the destination.
      *
-     * @return The context.
-     * @throws JMSException Error getting context.
+     * @param destinationName Destination name.
+     * @param jndiProperties  JNDI properties.
+     * @return The JMS Destination.
+     * @throws JMSException Error looking up destination.
      */
-    private Context getNamingContext() throws JMSException
+    public static Destination lookupDestination(final String destinationName, final Properties jndiProperties) throws JMSException
     {
-        Context context;
-
+        // Lookup the destination...
         try
         {
-            context = new InitialContext(jndiProperties);
-        } catch (NamingException e)
-        {
-            throw new JMSException("Failed to load InitialContext: " + jndiProperties);
+            Context context = JNDIUtil.getNamingContext(jndiProperties);
+            try
+            {
+                return (Destination) context.lookup(destinationName);
+            }
+            finally
+            {
+                context.close();
+            }
         }
-        if (context == null)
+        catch (NamingException e)
         {
-            throw new JMSException("Failed to create JMS Server JNDI context.  Check that '" + Context.PROVIDER_URL + "', '" + Context.INITIAL_CONTEXT_FACTORY + "', '" + Context.URL_PKG_PREFIXES + "' are correctly configured in the supplied JNDI properties.");
+            throw (JMSException) (new JMSException("Destination lookup failed.  Destination name '" + destinationName + "'.").initCause(e));
         }
-
-        return context;
     }
-
-    /**
-     * The Default JMS Exception Listener.
-     *
-     * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
-     */
-    private class DefaultExceptionListener implements ExceptionListener
-    {
-
-        /**
-         * We want this handler to handle only one exception.
-         * It will close all existing resources and create a new instance
-         * once it successfully reconnects.
-         */
-        private boolean hasHandledOneException = false;
-
-        /**
-         * JMS Exception handler.
-         *
-         * @param e The exception.
-         */
-        public final void onException(final JMSException e)
-        {
-            synchronized (AbstractMessageHandler.DefaultExceptionListener.class)
-            {
-                if (!hasHandledOneException)
-                {
-                    if (!logger.isDebugEnabled())
-                    {
-                        logger.warn("JMS Exception on '" + getClass().getName() + "' on JMS destination '" + destinationName + "' is now closed (turn on debugging for more): " + e.getMessage());
-                    } else
-                    {
-                        logger.warn("JMS Exception on '" + getClass().getName() + "' on JMS destination '" + destinationName + "' is now closed.", e);
-                    }
-                    close();
-                    while (!isConnected())
-                    {
-                        try
-                        {
-                            connect();
-                        } catch (JMSException e1)
-                        {
-                            // Keep trying...
-                            continue;
-                        }
-
-                        // We've reconnected...
-                        try
-                        {
-                            Thread.sleep(5000);
-                        } catch (InterruptedException e1)
-                        {
-                            if (!logger.isDebugEnabled())
-                            {
-                                logger.warn("Interrupted during reconnect attempt.  Aborting reconnect!  Will need restart to reconnect (turn on debugging for more): " + e.getMessage());
-                            } else
-                            {
-                                logger.debug("Interrupted during reconnect attempt.  Aborting reconnect!  Will need restart to reconnect.", e);
-                            }
-                        }
-                    }
-                    hasHandledOneException = true;
-                }
-            }
-        }
-    }
-
-    /**
-     * Is the handler connected.
-     *
-     * @return True if the connector is connected, otherwise false.
-     */
-    public abstract boolean isConnected();
 }

Modified: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageListener.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageListener.java	2008-09-10 16:10:28 UTC (rev 22608)
+++ labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageListener.java	2008-09-10 16:10:35 UTC (rev 22609)
@@ -21,7 +21,13 @@
 
 import org.apache.log4j.Logger;
 
-import javax.jms.*;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
 import java.util.Properties;
 
 /**
@@ -66,13 +72,15 @@
      * Connects the listener to the destination.
      *
      * @param destinationName The destination name.
-     * @param jndiProperties  The JNDI properties.
+     * @param jmsSession      The JMS Session on which this handler is created.
+     * @param properties      Listener properties.
+     * @throws JMSException Failed to connect.
      */
-    protected AbstractMessageListener(final String destinationName, final Properties jndiProperties)
+    protected AbstractMessageListener(final String destinationName, final JMSSession jmsSession, final Properties properties) throws JMSException
     {
-        super(destinationName, jndiProperties);
+        super(destinationName, jmsSession);
         logger = Logger.getLogger(getClass());
-        this.properties = jndiProperties;
+        this.properties = properties;
     }
 
     /**
@@ -94,18 +102,21 @@
                 if (messageSelector == null)
                 {
                     messageConsumer = ((TopicSession) getSession()).createSubscriber((Topic) getDestination());
-                } else
+                }
+                else
                 {
                     boolean noLocal = !properties.getProperty(NO_LOCAL, "true").equals("false");
                     messageConsumer = ((TopicSession) getSession()).createSubscriber((Topic) getDestination(), messageSelector, noLocal);
                 }
                 messageConsumer.setMessageListener(this);
-            } else
+            }
+            else
             {
                 if (messageSelector == null)
                 {
                     messageConsumer = ((QueueSession) getSession()).createReceiver((Queue) getDestination());
-                } else
+                }
+                else
                 {
                     messageConsumer = ((QueueSession) getSession()).createReceiver((Queue) getDestination(), messageSelector);
                 }
@@ -113,7 +124,8 @@
             }
 
             logger.debug("Successfully connected listener '" + getClass().getName() + "' to JMS destination '" + getDestinationName() + "'.");
-        } finally
+        }
+        finally
         {
             // If we failed to create the message listener, close and clean up....
             if (messageConsumer == null)
@@ -135,10 +147,12 @@
                 messageConsumer.close();
                 logger.debug("Closed JMS MessageConsumer for connected listener '" + getClass().getName() + "' on JMS destination '" + getDestinationName() + "'.");
             }
-        } catch (Throwable e)
+        }
+        catch (Throwable e)
         {
             logger.error("Failed to close JMS Consumer.", e);
-        } finally
+        }
+        finally
         {
             messageConsumer = null;
         }

Copied: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/JMSSession.java (from rev 22552, labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageHandler.java)
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/JMSSession.java	                        (rev 0)
+++ labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/JMSSession.java	2008-09-10 16:10:35 UTC (rev 22609)
@@ -0,0 +1,369 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright XXXX, Red Hat Middleware LLC, and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2008, JBoss Inc.
+ */
+package org.jboss.esb.jms;
+
+import org.apache.log4j.Logger;
+import org.jboss.esb.util.AssertArgument;
+import org.jboss.esb.util.JNDIUtil;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicSession;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * JMS Session.
+ * <p/>
+ * This object allows sharing of a JMS session across multiple consumers/producers etc.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class JMSSession
+{
+    /**
+     * Logger.
+     */
+    private Logger logger;
+    /**
+     * JNDI Properties.
+     */
+    private Properties jndiProperties;
+    /**
+     * JMS Connection.
+     */
+    private Connection conn = null;
+    /**
+     * JMS Session.
+     */
+    private Session session = null;
+    /**
+     * JMS Destination Type.
+     */
+    private Class<? extends Destination> destinationType = null;
+    /**
+     * Exception listener.
+     */
+    private ConnectionExceptionListener exceptionListener = new ConnectionExceptionListener();
+
+    /**
+     * Public constructor.
+     *
+     * @param destinationType The JMS destination type.
+     * @param jndiProperties  The JNDI properties for creating the JMS session.
+     */
+    public JMSSession(Class<? extends Destination> destinationType, final Properties jndiProperties)
+    {
+        AssertArgument.isNotNull(destinationType, "destinationType");
+        AssertArgument.isNotNull(jndiProperties, "jndiProperties");
+
+        logger = Logger.getLogger(getClass());
+        this.destinationType = destinationType;
+        this.jndiProperties = jndiProperties;
+    }
+
+    /**
+     * Get the JMS Destination type associated with the session.
+     *
+     * @return The JMS Destination type.
+     */
+    public final Class<? extends Destination> getDestinationType()
+    {
+        return destinationType;
+    }
+
+    /**
+     * Get the JNDI Properties from which this session was created.
+     *
+     * @return
+     */
+    public final Properties getProperties()
+    {
+        return jndiProperties;
+    }
+
+    /**
+     * Get the JMS Session.
+     *
+     * @return The JMS Session.
+     */
+    public final Session getSession()
+    {
+        return session;
+    }
+
+    /**
+     * Connect to the configured destination.
+     *
+     * @throws javax.jms.JMSException Failed to connect.
+     */
+    public final void connect() throws JMSException
+    {
+        ConnectionFactory connectionFactory;
+
+        try
+        {
+            // Get the Destination ConnectionFactory...
+            try
+            {
+                connectionFactory = getJmsConnectionFactory();
+            }
+            catch (ClassCastException e)
+            {
+                throw (JMSException) (new JMSException("Invalid JMS ConnectionFactory config.  ConnectionFactory doesn't implement " + TopicConnectionFactory.class.getName() + ".").initCause(e));
+            }
+
+            // Create the destination connection...
+            if (destinationType == Topic.class)
+            {
+                conn = ((TopicConnectionFactory) connectionFactory).createTopicConnection();
+            }
+            else
+            {
+                conn = ((QueueConnectionFactory) connectionFactory).createQueueConnection();
+            }
+
+            // Create the Destination Session...
+            if (destinationType == Topic.class)
+            {
+                session = ((TopicConnection) conn).createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
+            }
+            else
+            {
+                session = ((QueueConnection) conn).createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
+            }
+
+            // Start the connection...
+            conn.start();
+
+            // Listen for exceptions on the connection...
+            conn.setExceptionListener(exceptionListener);
+        }
+        catch (JMSException e)
+        {
+            close();
+            throw e;
+        }
+        catch (Throwable t)
+        {
+            close();
+            throw (JMSException) (new JMSException("Unexpected exception while creating JMS Session.").initCause(t));
+        }
+    }
+
+    /**
+     * Close out the session and all it's resources.
+     */
+    public final void close()
+    {
+        try
+        {
+            if (conn != null)
+            {
+                conn.stop();
+                logger.debug("Stopping JMS Connection for connected session.");
+            }
+        }
+        catch (Throwable e)
+        {
+            logger.error("Failed to stop JMS connection.", e);
+            conn = null;
+        }
+        try
+        {
+            if (session != null)
+            {
+                session.close();
+                logger.debug("Closing JMS Session.");
+            }
+        }
+        catch (Throwable e)
+        {
+            logger.error("Failed to close JMS session.", e);
+        }
+        finally
+        {
+            session = null;
+        }
+        try
+        {
+            if (conn != null)
+            {
+                conn.close();
+                logger.debug("Closing JMS Connection for connected session.");
+            }
+        }
+        catch (Throwable e)
+        {
+            logger.error("Failed to close JMS connection.", e);
+        }
+        finally
+        {
+            conn = null;
+        }
+    }
+
+    /**
+     * Get the JMS Connection factory.
+     *
+     * @return Connection Factory.
+     * @throws javax.jms.JMSException Error getting factory.
+     */
+    private ConnectionFactory getJmsConnectionFactory() throws JMSException
+    {
+        ConnectionFactory factory = null;
+        Context context;
+        String connectionFactoryRuntime = jndiProperties.getProperty(ConnectionFactory.class.getName(), "ConnectionFactory");
+
+        context = JNDIUtil.getNamingContext(jndiProperties);
+        try
+        {
+            factory = (ConnectionFactory) context.lookup(connectionFactoryRuntime);
+        }
+        catch (NamingException e)
+        {
+            throw (JMSException) (new JMSException("JNDI lookup of JMS Connection Factory [" + connectionFactoryRuntime + "] failed.").initCause(e));
+        }
+        catch (ClassCastException e)
+        {
+            throw (JMSException) (new JMSException("JNDI lookup of JMS Connection Factory failed.  Class [" + connectionFactoryRuntime + "] is not an instance of [" + ConnectionFactory.class.getName() + "].").initCause(e));
+        }
+        finally
+        {
+            if (context != null)
+            {
+                try
+                {
+                    context.close();
+                }
+                catch (NamingException ne)
+                {
+                    logger.error("Failed to close Naming Context.", ne);
+                }
+            }
+        }
+
+        return factory;
+    }
+
+    /**
+     * Register an exception listener.
+     *
+     * @param listener The listener instance.
+     */
+    public final void registerExceptionListener(final ExceptionListener listener)
+    {
+        exceptionListener.registerExceptionListener(listener);
+    }
+
+    /**
+     * The Default JMS Exception Listener.
+     *
+     * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+     */
+    private class ConnectionExceptionListener implements ExceptionListener
+    {
+        /**
+         * Exception listeners.
+         */
+        private Set<ExceptionListener> listeners = new HashSet<ExceptionListener>();
+
+        /**
+         * Register an exception listener.
+         *
+         * @param listener The listener instance.
+         */
+        public final void registerExceptionListener(final ExceptionListener listener)
+        {
+            listeners.add(listener);
+        }
+
+        /**
+         * JMS Exception handler.
+         *
+         * @param e The exception.
+         */
+        public final void onException(final JMSException e)
+        {
+            synchronized (ConnectionExceptionListener.class)
+            {
+                // Close the session on behalf of the session owner...
+                close();
+
+                // Reconnect on behalf of the session owner...
+                while (!isConnected())
+                {
+                    try
+                    {
+                        connect();
+                    }
+                    catch (JMSException e1)
+                    {
+                        // Failed to reconnect, wait and try again...
+                        try
+                        {
+                            Thread.sleep(5000);
+                        }
+                        catch (InterruptedException e2)
+                        {
+                            logger.debug("Interrupted during reconnect attempt.  Aborting reconnect!  Will need restart to reconnect.", e);
+                        }
+                    }
+                }
+
+                // Tell all listeners...
+                for (ExceptionListener listener : listeners)
+                {
+                    try
+                    {
+                        listener.onException(e);
+                    }
+                    catch (Throwable t)
+                    {
+                        logger.warn("Exception during exception processing from JMS ExceptionListener '" + listener.getClass().getName() + "'.", t);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Is the session connected.
+     *
+     * @return True if the connector is connected, otherwise false.
+     */
+    public final boolean isConnected()
+    {
+        return (session != null);
+    }
+}
\ No newline at end of file


Property changes on: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/JMSSession.java
___________________________________________________________________
Name: svn:mergeinfo
   + 
Name: svn:eol-style
   + native

Modified: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/MessageSender.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/MessageSender.java	2008-09-10 16:10:28 UTC (rev 22608)
+++ labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/MessageSender.java	2008-09-10 16:10:35 UTC (rev 22609)
@@ -21,8 +21,13 @@
 
 import org.apache.log4j.Logger;
 
-import javax.jms.*;
-import java.util.Properties;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
 
 /**
  * JMS Message sender.
@@ -48,11 +53,11 @@
      * Connects the sender to the destination.
      *
      * @param destinationName The destination name.
-     * @param jndiProperties  The JNDI properties.
+     * @param jmsSession  The JMS Session instance.
      */
-    public MessageSender(final String destinationName, final Properties jndiProperties)
+    public MessageSender(final String destinationName, final JMSSession jmsSession) throws JMSException
     {
-        super(destinationName, jndiProperties);
+        super(destinationName, jmsSession);
         logger = Logger.getLogger(getClass());
     }
 
@@ -103,7 +108,6 @@
         messageProducer.send(message);
     }
 
-
     /**
      * Close out the sender and all it's resources.
      */
@@ -125,6 +129,7 @@
         }
     }
 
+
     /**
      * Is the handler connected.
      *

Modified: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/MockMessageListener.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/MockMessageListener.java	2008-09-10 16:10:28 UTC (rev 22608)
+++ labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/MockMessageListener.java	2008-09-10 16:10:35 UTC (rev 22609)
@@ -21,6 +21,7 @@
 
 import javax.jms.TextMessage;
 import javax.jms.Message;
+import javax.jms.JMSException;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Properties;
@@ -33,9 +34,9 @@
     
     public final List<TextMessage> messagesReceived = new ArrayList<TextMessage>();
 
-    protected MockMessageListener(final String destinationName, final Properties jndiProperties, final String messageSelector)
+    protected MockMessageListener(final String destinationName, final JMSSession jmsSession, final Properties jndiProperties, final String messageSelector) throws JMSException
     {
-        super(destinationName, addMessageSelector(jndiProperties, messageSelector));
+        super(destinationName, jmsSession, addMessageSelector(jndiProperties, messageSelector));
     }
 
     public void onMessage(Message message)

Added: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/util/JNDIUtil.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/util/JNDIUtil.java	                        (rev 0)
+++ labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/util/JNDIUtil.java	2008-09-10 16:10:35 UTC (rev 22609)
@@ -0,0 +1,69 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright XXXX, Red Hat Middleware LLC, and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2008, JBoss Inc.
+ */
+package org.jboss.esb.util;
+
+import javax.jms.JMSException;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.Properties;
+
+/**
+ * JNDI Utilitiy methods.
+ * 
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public abstract class JNDIUtil
+{
+    /**
+     * Private default constructor.
+     */
+    private JNDIUtil()
+    {
+    }
+
+    /**
+     * Get the JNDI Context.
+     * <p/>
+     * Don't forget to close it when done!
+     *
+     * @return The context.
+     * @throws javax.jms.JMSException Error getting context.
+     */
+    public static Context getNamingContext(final Properties jndiProperties) throws JMSException
+    {
+        Context context;
+
+        try
+        {
+            context = new InitialContext(jndiProperties);
+        }
+        catch (NamingException e)
+        {
+            throw new JMSException("Failed to load InitialContext: " + jndiProperties);
+        }
+        if (context == null)
+        {
+            throw new JMSException("Failed to create JNDI context.  Check that '" + Context.PROVIDER_URL + "', '" + Context.INITIAL_CONTEXT_FACTORY + "', '" + Context.URL_PKG_PREFIXES + "' are correctly configured in the supplied JNDI properties.");
+        }
+
+        return context;
+    }    
+}


Property changes on: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/util/JNDIUtil.java
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: labs/jbossesb/workspace/skeagh/commons/src/test/java/org/jboss/esb/jms/MessageSendAndListenTest.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/test/java/org/jboss/esb/jms/MessageSendAndListenTest.java	2008-09-10 16:10:28 UTC (rev 22608)
+++ labs/jbossesb/workspace/skeagh/commons/src/test/java/org/jboss/esb/jms/MessageSendAndListenTest.java	2008-09-10 16:10:35 UTC (rev 22609)
@@ -59,39 +59,46 @@
             public void test() throws Exception
             {
                 Properties jndiProperties = getJndiProperties();
+                JMSSession jmsSession = new JMSSession(destType, jndiProperties);
 
-                if(destType == Topic.class) {
-                    jndiProperties.setProperty("topic." + destName, destName);
-                } else {
-                    jndiProperties.setProperty("queue." + destName, destName);
-                }
+                jmsSession.connect();
+                try
+                {
+                    if(destType == Topic.class) {
+                        jndiProperties.setProperty("topic." + destName, destName);
+                    } else {
+                        jndiProperties.setProperty("queue." + destName, destName);
+                    }
 
-                MockMessageListener listener = new MockMessageListener(destName, jndiProperties, null);
+                    MockMessageListener listener = new MockMessageListener(destName, jmsSession, jndiProperties, null);
 
-                listener.connect();
-                assertTrue(destType.isAssignableFrom(listener.getDestination().getClass()));
-                try {
-                    MessageSender sender = new MessageSender(destName, jndiProperties);
-                    List<String> messagesSent = Arrays.asList(new String[] {"message 1", "message 2", "message 3"});
+                    listener.connect();
+                    assertTrue(destType.isAssignableFrom(listener.getDestination().getClass()));
+                    try {
+                        MessageSender sender = new MessageSender(destName, jmsSession);
+                        List<String> messagesSent = Arrays.asList(new String[] {"message 1", "message 2", "message 3"});
 
-                    sender.connect();
-                    assertTrue(destType.isAssignableFrom(sender.getDestination().getClass()));
-                    try {
-                        for (String message : messagesSent)
-                        {
-                            sender.send(sender.getSession().createTextMessage(message));
+                        sender.connect();
+                        assertTrue(destType.isAssignableFrom(sender.getDestination().getClass()));
+                        try {
+                            for (String message : messagesSent)
+                            {
+                                sender.send(sender.getSession().createTextMessage(message));
+                            }
+                        } finally {
+                            sender.close();
                         }
+
+                        Thread.sleep(1000);
+                        assertEquals(3, listener.messagesReceived.size());
+                        assertTrue(messagesSent.contains(listener.messagesReceived.get(0).getText()));
+                        assertTrue(messagesSent.contains(listener.messagesReceived.get(1).getText()));
+                        assertTrue(messagesSent.contains(listener.messagesReceived.get(2).getText()));
                     } finally {
-                        sender.close();
+                        listener.close();
                     }
-
-                    Thread.sleep(1000);
-                    assertEquals(3, listener.messagesReceived.size());
-                    assertTrue(messagesSent.contains(listener.messagesReceived.get(0).getText()));
-                    assertTrue(messagesSent.contains(listener.messagesReceived.get(1).getText()));
-                    assertTrue(messagesSent.contains(listener.messagesReceived.get(2).getText()));
                 } finally {
-                    listener.close();
+                    jmsSession.close();
                 }
             }
         }.run();
@@ -103,54 +110,62 @@
             public void test() throws Exception
             {
                 Properties jndiProperties = getJndiProperties();
+                JMSSession jmsSession = new JMSSession(destType, jndiProperties);
 
-                if(destType == Topic.class) {
-                    jndiProperties.setProperty("topic." + destName, destName);
-                } else {
-                    jndiProperties.setProperty("queue." + destName, destName);
-                }
+                jmsSession.connect();
+                try
+                {
+                    if(destType == Topic.class) {
+                        jndiProperties.setProperty("topic." + destName, destName);
+                        jndiProperties.setProperty(AbstractMessageListener.NO_LOCAL, "false");
+                    } else {
+                        jndiProperties.setProperty("queue." + destName, destName);
+                    }
 
-                MockMessageListener service1 = new MockMessageListener(destName, jndiProperties, "serviceId='service1'");
-                MockMessageListener service2 = new MockMessageListener(destName, jndiProperties, "serviceId='service2'");
-                List<String> service1Messages = Arrays.asList(new String[] {"message 1", "message 2", "message 3"});
-                List<String> service2Messages = Arrays.asList(new String[] {"message 4", "message 5", "message 6"});
+                    MockMessageListener service1 = new MockMessageListener(destName, jmsSession, jndiProperties, "serviceId='service1'");
+                    MockMessageListener service2 = new MockMessageListener(destName, jmsSession, jndiProperties, "serviceId='service2'");
+                    List<String> service1Messages = Arrays.asList(new String[] {"message 1", "message 2", "message 3"});
+                    List<String> service2Messages = Arrays.asList(new String[] {"message 4", "message 5", "message 6"});
 
-                service1.connect();
-                try {
-                    service2.connect();
+                    service1.connect();
                     try {
-                        MessageSender sender = new MessageSender(destName, jndiProperties);
+                        service2.connect();
+                        try {
+                            MessageSender sender = new MessageSender(destName, jmsSession);
 
-                        sender.connect();
-                        try {
-                            for (String message : service1Messages)
-                            {
-                                send(sender, message, "service1");
+                            sender.connect();
+                            try {
+                                for (String message : service1Messages)
+                                {
+                                    send(sender, message, "service1");
+                                }
+                                for (String message : service2Messages)
+                                {
+                                    send(sender, message, "service2");
+                                }
+                            } finally {
+                                sender.close();
                             }
-                            for (String message : service2Messages)
-                            {
-                                send(sender, message, "service2");
-                            }
-                        } finally {
-                            sender.close();
-                        }
 
-                        Thread.sleep(1000);
+                            Thread.sleep(1000);
 
-                        assertEquals(3, service1.messagesReceived.size());
-                        assertTrue(service1Messages.contains(service1.messagesReceived.get(0).getText()));
-                        assertTrue(service1Messages.contains(service1.messagesReceived.get(1).getText()));
-                        assertTrue(service1Messages.contains(service1.messagesReceived.get(2).getText()));
+                            assertEquals(3, service1.messagesReceived.size());
+                            assertTrue(service1Messages.contains(service1.messagesReceived.get(0).getText()));
+                            assertTrue(service1Messages.contains(service1.messagesReceived.get(1).getText()));
+                            assertTrue(service1Messages.contains(service1.messagesReceived.get(2).getText()));
 
-                        assertEquals(3, service2.messagesReceived.size());
-                        assertTrue(service2Messages.contains(service2.messagesReceived.get(0).getText()));
-                        assertTrue(service2Messages.contains(service2.messagesReceived.get(1).getText()));
-                        assertTrue(service2Messages.contains(service2.messagesReceived.get(2).getText()));
+                            assertEquals(3, service2.messagesReceived.size());
+                            assertTrue(service2Messages.contains(service2.messagesReceived.get(0).getText()));
+                            assertTrue(service2Messages.contains(service2.messagesReceived.get(1).getText()));
+                            assertTrue(service2Messages.contains(service2.messagesReceived.get(2).getText()));
+                        } finally {
+                            service2.close();
+                        }
                     } finally {
-                        service2.close();
+                        service1.close();
                     }
                 } finally {
-                    service1.close();
+                    jmsSession.close();
                 }
             }
         }.run();

Modified: labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsInboundRouter.java
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsInboundRouter.java	2008-09-10 16:10:28 UTC (rev 22608)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsInboundRouter.java	2008-09-10 16:10:35 UTC (rev 22609)
@@ -20,29 +20,27 @@
  */
 package org.jboss.esb.jms;
 
-import static org.jboss.esb.jms.JmsConstants.CONNECTION_FACTORY;
-
-import java.util.Properties;
-
-import javax.jms.JMSException;
-import javax.naming.Context;
-
 import org.apache.log4j.Logger;
 import org.jboss.esb.annotations.Initialize;
 import org.jboss.esb.annotations.Property;
+import org.jboss.esb.annotations.Property.Use;
 import org.jboss.esb.annotations.Uninitialize;
-import org.jboss.esb.annotations.Property.Use;
 import org.jboss.esb.context.InvocationContext;
+import static org.jboss.esb.jms.JmsConstants.CONNECTION_FACTORY;
 import org.jboss.esb.message.Message;
 import org.jboss.esb.routing.InboundRouter;
 import org.jboss.esb.routing.MessageDispatcher;
 import org.jboss.esb.routing.RoutingException;
 
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.naming.Context;
+import java.util.Properties;
+
 /**
  * Inbound router for JMS.
  *
  * @author <a href="mailto:dbevenius at jboss.com">Daniel Bevenius</a>
- *
  */
 public class JmsInboundRouter implements InboundRouter
 {
@@ -71,10 +69,13 @@
     /**
      * Should JMSHeaders and user properties be extrated from the JMS Message
      */
-    @Property(use = Use.OPTIONAL, defaultVal = "true" )
+    @Property(use = Use.OPTIONAL, defaultVal = "true")
     private boolean extractProperties;
-
     /**
+     * The JMS Session on which the listener is created.
+     */
+    private JMSSession jmsSession;
+    /**
      * JmsMessageListener that takes care of the JMS connection.
      */
     private JmsMessageListener messageListener;
@@ -90,9 +91,24 @@
         log.info("JMSProperties : " + jmsProperties + "," + destination);
 
         // add this class as a JMS message listener...
-        messageListener = new JmsMessageListener(destination, jmsProperties, dispatcher);
-        messageListener.connect();
-
+        Destination jmsDestination = AbstractMessageHandler.lookupDestination(destination, jmsProperties);
+        jmsSession = new JMSSession(jmsDestination.getClass(), jmsProperties);
+        jmsSession.connect();
+        try
+        {
+            messageListener = new JmsMessageListener(destination, jmsSession, jmsProperties, dispatcher);
+            messageListener.connect();
+        }
+        catch (JMSException e)
+        {
+            jmsSession.close();
+            throw e;
+        }
+        catch (Throwable t)
+        {
+            jmsSession.close();
+            throw (JMSException) (new JMSException("Unexpected exception while connecting JmsMesssageListener.").initCause(t));
+        }
     }
 
     /**
@@ -101,7 +117,15 @@
     @Uninitialize
     public final void uninitialize()
     {
-       messageListener.close();
+        try
+        {
+            messageListener.close();
+        }
+        finally
+        {
+            jmsSession.close();
+        }
+
     }
 
     /**
@@ -116,6 +140,7 @@
 
     /**
      * Set the dispatcher used by this instance.
+     *
      * @param dispatcher - the dispatcher to use
      */
     public final void setDispatcher(final MessageDispatcher dispatcher)
@@ -125,6 +150,7 @@
 
     /**
      * Will display the dispatcher and the properties for this instance.
+     *
      * @return String - string representation of this instance.
      */
     @Override
@@ -164,7 +190,6 @@
      * the {@link MessageDispatcher} to pass the message along though the ESB.
      *
      * @author <a href="mailto:dbevenius at redhat.com">Daniel Bevenius</a>
-     *
      */
     private class JmsMessageListener extends AbstractMessageListener
     {
@@ -176,13 +201,13 @@
         /**
          * Protected contstructor.
          *
-         * @param destinationName   the JMS destination name.
-         * @param jndiProperties    the jndi properties to be used for jndi lookups.
-         * @param dispatcher        the {@link MessageDispatcher} to be used.
+         * @param destinationName the JMS destination name.
+         * @param jndiProperties  the jndi properties to be used for jndi lookups.
+         * @param dispatcher      the {@link MessageDispatcher} to be used.
          */
-        protected JmsMessageListener(final String destinationName, final Properties jndiProperties, final MessageDispatcher dispatcher)
+        protected JmsMessageListener(final String destinationName, final JMSSession jmsSession, final Properties jndiProperties, final MessageDispatcher dispatcher) throws JMSException
         {
-            super(destinationName, jndiProperties);
+            super(destinationName, jmsSession, jndiProperties);
             this.dispatcher = dispatcher;
         }
 
@@ -190,7 +215,8 @@
          * Handles a single JMSMessage, extracts the contents and creates
          * and populates an ESB Message object instance and dispataches
          * the ESB Message object instance to the ESB.
-         * @param jmsMessage    the JMS Message object instance
+         *
+         * @param jmsMessage the JMS Message object instance
          */
         public void onMessage(final javax.jms.Message jmsMessage)
         {

Modified: labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/JmsInboundRouterTest.java
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/JmsInboundRouterTest.java	2008-09-10 16:10:28 UTC (rev 22608)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/JmsInboundRouterTest.java	2008-09-10 16:10:35 UTC (rev 22609)
@@ -20,20 +20,19 @@
  */
 package org.jboss.esb.jms;
 
-import java.util.Properties;
+import org.jboss.esb.deploy.DeploymentRuntime;
+import org.jboss.esb.deploy.config.digest.DigestUtil;
+import org.junit.Test;
 
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import java.util.Properties;
 
-import org.jboss.esb.deploy.DeploymentRuntime;
-import org.jboss.esb.deploy.config.digest.DigestUtil;
-import org.junit.Test;
-
 /**
  * Test for {@link JmsInboundRouter}.
  *
  * @author <a href="mailto:dbevenius at redhat.com">Daniel Bevenius</a>
- *
  */
 public class JmsInboundRouterTest
 {
@@ -42,32 +41,46 @@
     @Test
     public final void initializeConfOverride() throws Exception
     {
-        new JMSTestRunner(JMSTestRunner.DEFAULT_PROVIDER_URL) {
+        new JMSTestRunner()
+        {
             @Override
             public void test() throws Exception
             {
                 DeploymentRuntime runtime = DigestUtil.digestConfig(getClass().getResourceAsStream("jms-inbound-router_01.xml"));
 
                 runtime.deploy();
-                try {
+                try
+                {
                     Thread.sleep(3000);
 
                     Properties jndiProperties = getJndiProperties();
                     jndiProperties.setProperty("queue." + DESTINATION_NAME, DESTINATION_NAME);
-                    MessageSender sender = new MessageSender(DESTINATION_NAME, jndiProperties);
+                    JMSSession jmsSession = new JMSSession(Queue.class, jndiProperties);
+
+                    jmsSession.connect();
                     try
                     {
-                        sender.connect();
-                        Session session = sender.getSession();
-                        TextMessage textMessage = session.createTextMessage("test");
-                        sender.send(textMessage);
-                        Thread.sleep(3000);
+                        MessageSender sender = new MessageSender(DESTINATION_NAME, jmsSession);
+                        try
+                        {
+                            sender.connect();
+                            Session session = sender.getSession();
+                            TextMessage textMessage = session.createTextMessage("test");
+                            sender.send(textMessage);
+                            Thread.sleep(3000);
+                        }
+                        finally
+                        {
+                            sender.close();
+                        }
                     }
                     finally
                     {
-                        sender.close();
+                        jmsSession.close();
                     }
-                } finally {
+                }
+                finally
+                {
                     runtime.undeploy();
                 }
             }

Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java	2008-09-10 16:10:28 UTC (rev 22608)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java	2008-09-10 16:10:35 UTC (rev 22609)
@@ -30,6 +30,7 @@
 import org.jboss.esb.federate.notify.DeploymentHeartbeatNotification;
 import org.jboss.esb.federate.notify.DeploymentUndeployNotification;
 import org.jboss.esb.jms.AbstractMessageListener;
+import org.jboss.esb.jms.JMSSession;
 import org.jboss.esb.jms.MessageSender;
 import org.jboss.esb.properties.ApplicationProperties;
 import org.jboss.esb.schedule.AbstractScheduleListener;
@@ -39,6 +40,8 @@
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Topic;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -100,6 +103,14 @@
      */
     private DeploymentDetailsNotification detailsNotification;
     /**
+     * Shared JMS Topic Session.
+     */
+    private JMSSession topicSession;
+    /**
+     * Shared JMS Topic Session.
+     */
+    private JMSSession queueSession;
+    /**
      * Deployment coordination listener.
      */
     private CoordinationListener coordinationListener;
@@ -149,23 +160,37 @@
 
         if (localBusProperties != null)
         {
-            connectCoordinationListener(localBusProperties);
-            if (coordinationListener != null && coordinationListener.isConnected())
+            intialiseJMSSessions(localBusProperties);
+
+            if(topicSession != null)
             {
                 try
                 {
-                    connectCoordinationBroadcaster(localBusProperties);
-
-                    // Tell all other deployments we're online, what services we have etc...
-                    sendNotification(detailsNotification);
+                    connectCoordinationListener(localBusProperties);
                 }
                 catch (Throwable t)
                 {
-                    throw new DeploymentException("Unable to initialize Deployment Coordinator.", t);
+                    closeJMSSessions();
+                    throw new DeploymentException("Failed to connect coordination listener", t);
                 }
+                if (coordinationListener != null && coordinationListener.isConnected())
+                {
+                    try
+                    {
+                        connectCoordinationBroadcaster(localBusProperties);
 
-                // Set the monitoring timeout - 4 heartbeats...                
-                monitorTimeout = (localBusProperties.getLongProperty(COORDINATOR_HEARTBEAT_FREQUENCY_KEY, COORDINATOR_HEARTBEAT_DEFAULT_FREQUENCY) * 4);
+                        // Tell all other deployments we're online, what services we have etc...
+                        sendNotification(detailsNotification);
+                    }
+                    catch (Throwable t)
+                    {
+                        closeJMSSessions();
+                        throw new DeploymentException("Unable to initialize Deployment Coordinator.", t);
+                    }
+
+                    // Set the monitoring timeout - 4 heartbeats...
+                    monitorTimeout = (localBusProperties.getLongProperty(COORDINATOR_HEARTBEAT_FREQUENCY_KEY, COORDINATOR_HEARTBEAT_DEFAULT_FREQUENCY) * 4);
+                }
             }
         }
     }
@@ -209,6 +234,8 @@
                 logger.warn("Error closing deployment coordination broadcaster for deployment '" + runtime.getDeploymentName() + "'.", t);
             }
         }
+
+        closeJMSSessions();
     }
 
     /**
@@ -272,15 +299,16 @@
      */
     private class CoordinationListener extends AbstractMessageListener
     {
+
         /**
          * Constructor.
          *
          * @param destinationName Destination name.
          * @param jndiProperties  JNDI properties.
          */
-        protected CoordinationListener(final String destinationName, final Properties jndiProperties)
+        protected CoordinationListener(final String destinationName, final Properties jndiProperties) throws JMSException
         {
-            super(destinationName, jndiProperties);
+            super(destinationName, topicSession, jndiProperties);
         }
 
         /**
@@ -343,6 +371,7 @@
                 }
             }
         }
+
     }
 
     /**
@@ -449,7 +478,7 @@
      *
      * @param localBusProperties Bus configuration properties.
      */
-    private void connectCoordinationListener(Properties localBusProperties)
+    private void connectCoordinationListener(Properties localBusProperties) throws JMSException
     {
         String coordinationTopicName = localBusProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
 
@@ -476,11 +505,11 @@
      *
      * @param localBusProperties Bus configuration properties.
      */
-    private void connectCoordinationBroadcaster(Properties localBusProperties) throws DeploymentException
+    private void connectCoordinationBroadcaster(Properties localBusProperties) throws DeploymentException, JMSException
     {
         String coordinationTopicName = localBusProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
 
-        coordinationBroadcaster = new MessageSender(coordinationTopicName, localBusProperties);
+        coordinationBroadcaster = new MessageSender(coordinationTopicName, topicSession);
         try
         {
             coordinationBroadcaster.connect();
@@ -499,14 +528,17 @@
      */
     public static class DeploymentMonitor
     {
+
         /**
          * Time of last received heartbeat.
          */
         private long lastHeartbeat;
+
         /**
          * Deployment Service Sets.
          */
         private DeploymentServiceSets serviceSets;
+
         /**
          * Deployment online/offline flag.
          */
@@ -553,5 +585,60 @@
         {
             return "{online=" + online + "} " + serviceSets.toString();
         }
+
     }
+
+    /**
+     * Initialize the shared JMS Sessions.
+     * 
+     * @param localBusProperties Local JMS Bus properties.
+     */
+    private void intialiseJMSSessions(final ApplicationProperties localBusProperties)
+    {
+        topicSession = new JMSSession(Topic.class, localBusProperties);
+        try
+        {
+            topicSession.connect();
+        }
+        catch (Throwable t)
+        {
+            closeJMSSessions();
+            logger.debug("Failed to connect shared deployment JMS Topic Session.", t);
+            return;
+        }
+        queueSession = new JMSSession(Queue.class, localBusProperties);
+        try
+        {
+            queueSession.connect();
+        }
+        catch (Throwable t)
+        {
+            closeJMSSessions();
+            logger.debug("Failed to connect shared deployment JMS Queue Session.", t);
+        }
+    }
+
+    /**
+     * Close the JMS Sessions.
+     */
+    private void closeJMSSessions()
+    {
+        // Close the sessions...
+        if(topicSession != null)
+        {
+            if(topicSession.isConnected())
+            {
+                topicSession.close();
+            }
+            topicSession = null;
+        }
+        if(queueSession != null)
+        {
+            if(queueSession.isConnected())
+            {
+                queueSession.close();
+            }
+            queueSession = null;
+        }
+    }
 }

Added: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/JMSBusDispatcher.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/JMSBusDispatcher.java	                        (rev 0)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/JMSBusDispatcher.java	2008-09-10 16:10:35 UTC (rev 22609)
@@ -0,0 +1,36 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright XXXX, Red Hat Middleware LLC, and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2008, JBoss Inc.
+ */
+package org.jboss.esb.federate.bus;
+
+import org.jboss.esb.routing.MessageDispatcher;
+import org.jboss.esb.routing.RoutingException;
+import org.jboss.esb.message.Message;
+import org.jboss.esb.context.InvocationContext;
+
+/**
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class JMSBusDispatcher implements MessageDispatcher
+{
+    public void dispatch(Message message, InvocationContext invocationContext) throws RoutingException
+    {
+    }
+}


Property changes on: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/JMSBusDispatcher.java
___________________________________________________________________
Name: svn:eol-style
   + native




More information about the jboss-svn-commits mailing list