[jboss-svn-commits] JBL Code SVN: r24118 - labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Nov 27 01:24:28 EST 2008


Author: beve
Date: 2008-11-27 01:24:28 -0500 (Thu, 27 Nov 2008)
New Revision: 24118

Added:
   labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsBus.java
Removed:
   labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JMSBus.java
Log:
Updated the Java filename. The class had already been renamed to JmsBus, but the source file had not.


Deleted: labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JMSBus.java
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JMSBus.java	2008-11-27 05:22:21 UTC (rev 24117)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JMSBus.java	2008-11-27 06:24:28 UTC (rev 24118)
@@ -1,555 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, Red Hat Middleware LLC, and others contributors as indicated
- * by the @authors tag. All rights reserved.
- * See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- * This copyrighted material is made available to anyone wishing to use,
- * modify, copy, or redistribute it subject to the terms and conditions
- * of the GNU Lesser General Public License, v. 2.1.
- * This program is distributed in the hope that it will be useful, but WITHOUT A
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
- * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
- * You should have received a copy of the GNU Lesser General Public License,
- * v.2.1 along with this distribution; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
- * MA 02110-1301, USA.
- *
- * (C) 2005-2008, JBoss Inc.
- */
-package org.jboss.esb.jms;
-
-import org.apache.log4j.Logger;
-import org.jboss.esb.api.bus.AbstractBus;
-import org.jboss.esb.api.bus.AbstractNotification;
-import org.jboss.esb.api.bus.BusMessage;
-import org.jboss.esb.api.context.DeploymentContext;
-import org.jboss.esb.api.context.ResourceLocator;
-import org.jboss.esb.api.routing.RoutingException;
-import org.jboss.esb.api.exception.DeploymentException;
-import org.jboss.esb.deploy.DeploymentRuntime;
-
-import javax.jms.JMSException;
-import javax.jms.ObjectMessage;
-import java.util.Properties;
-
-/**
- * ESB Bus implementation for JMS.
- * <p/>
- * Acts as the interface to the JMS Bus for a deployment.
- *
- * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
- */
-public class JmsBus extends AbstractBus
-{
-    /**
-     * Logger.
-     */
-    private static Logger logger = Logger.getLogger(JmsBus.class);
-    /**
-     * Deployment coordination topic property key name.
-     */
-    public static final String DEPLOYMENT_COORDINTATION_TOPIC_KEY = "deployment.coordintation.topic";
-    /**
-     * Default deployment coordination topic name.
-     */
-    public static final String DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME = "jbossesb.deployment.coordintation.topic";
-    /**
-     * Deployment JMS Bus Queue name config key.
-     */
-    public static final String BUS_QUEUE_KEY = "deployment.bus.queue";
-    /**
-     * Default Deployment JMS Bus Queue name.
-     */
-    public static final String DEFAULT_BUS_QUEUE_NAME = "jbossesb.jms.bus";
-    /**
-     * Deployment ID key.
-     */
-    public static final String DEPLOYMENT_ID = "jbossesb_deployment_id";
-    /**
-     * Shared JMS Topic Session.
-     */
-    private JMSSession topicSession;
-    /**
-     * Shared JMS Topic Session.
-     */
-    private JMSSession queueSession;
-    /**
-     * JMS Deployment coordination listener.
-     */
-    private JMSCoordinationListener notificationListener;
-    /**
-     * JMS Bus Message Listener.
-     */
-    private JMSQueueMessageListener messageListener;
-    /**
-     * Deployment notification broadcaster.
-     */
-    private MessageSender notificationSender;
-    /**
-     * JMS Bus Message Sender.
-     */
-    private MessageSender messageSender;
-
-    /**
-     * Connect the bus.
-     *
-     * @throws RoutingException Connection exception.
-     */
-    public final void connect() throws RoutingException
-    {
-        assertIsConfigured();
-
-        /*
-         * Need to set the context classloader to this classes classloader. When deployed/included
-         * as an osgi bundle this calls make by this class will involve jndi which uses
-         * the thread context classloader. The application classloader will the the context classloader
-         * for the bundle and will not have the ability to use the bundles classloader to find the
-         * resources is needs.
-         */
-        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
-        try
-        {
-            try
-            {
-                connectJMSSessions(getProperties());
-            }
-            catch (DeploymentException e)
-            {
-                throw new RoutingException("Unable to connect JMS Bus interface for deployment '" + getDeploymentName() + "'.", e);
-            }
-
-            if (topicSession != null && queueSession != null)
-            {
-                try
-                {
-                    connectCoordinationListener(getProperties());
-                    connectCoordintationSender(getProperties());
-                    connectBusListener(getProperties());
-                    connectBusSender(getProperties());
-                }
-                catch (DeploymentException e)
-                {
-                    try
-                    {
-                        if (notificationListener == null && notificationSender == null && messageListener == null && messageSender == null)
-                        {
-                            if (!logger.isDebugEnabled())
-                            {
-                                logger.info("Deployment '" + getDeploymentName() + "' is not being coordinated with any other deployment.  Turn on deug logging for more details.");
-                            }
-                            else
-                            {
-                                logger.debug("Deployment '" + getDeploymentName() + "' is not being coordinated with any other deployment.", e);
-                            }
-                        }
-                        else
-                        {
-                            throw new RoutingException("Unable to connect JMS Bus interface for deployment '" + getDeploymentName() + "'.", e);
-                        }
-                    }
-                    finally
-                    {
-                        closeJMSSessions();
-                    }
-                }
-                catch (Throwable t)
-                {
-                    closeJMSSessions();
-                    throw new RoutingException("Unable to connect JMS Bus interface for deployment '" + getDeploymentName() + "'.", t);
-                }
-            }
-        }
-        finally
-        {
-            Thread.currentThread().setContextClassLoader(contextClassLoader);
-        }
-    }
-
-    /**
-     * Is the bus connected.
-     *
-     * @return True if the bus is connected, otherwise false.
-     */
-    public final boolean isConnected()
-    {
-        return (topicSession != null && queueSession != null);
-    }
-
-    /**
-     * Disconnect from the Bus.
-     */
-    public final void disconnect()
-    {
-        try
-        {
-            closeJMSHandler("Deployment Coordination Listener", notificationListener);
-            closeJMSHandler("Deployment Coordination Sender", notificationSender);
-            closeJMSHandler("JMS Bus Listener", messageListener);
-            closeJMSHandler("JMS Bus Sender", messageSender);
-        }
-        finally
-        {
-            closeJMSSessions();
-        }
-    }
-
-    /**
-     * Send the supplied message to the specified deployment via the bus.
-     *
-     * @param message            The message.
-     * @param targetDeploymentId The target deployment ID.
-     * @throws RoutingException Error sending message onto the Bus.
-     */
-    public final void sendMessage(final BusMessage message, final String targetDeploymentId) throws RoutingException
-    {
-        try
-        {
-            ObjectMessage jmsMessage = messageSender.getSession().createObjectMessage(message);
-
-            jmsMessage.setStringProperty(DEPLOYMENT_ID, targetDeploymentId);
-            messageSender.send(jmsMessage);
-        }
-        catch (JMSException e)
-        {
-            throw new RoutingException("Error sending message to JMS Bus for deployment ID '" + targetDeploymentId + "'.", e);
-        }
-    }
-
-    /**
-     * Send a notification message onto the Bus.
-     * <p/>
-     * This allows notification messages to be exchanged with other deployments
-     * using the underlying Bus (deployment details, heartbeat, undeploy etc).
-     *
-     * @param notification The notification message to be sent onto the Bus.
-     * @throws RoutingException Error sending notification onto the Bus.
-     */
-    public final void sendNotification(final AbstractNotification notification) throws RoutingException
-    {
-        if (notificationSender != null)
-        {
-            try
-            {
-                notificationSender.send(notificationSender.getSession().createObjectMessage(notification));
-            }
-            catch (JMSException e)
-            {
-                throw new RoutingException("Unable to send deployment notification onto JMS Bus.", e);
-            }
-        }
-    }
-
-    /**
-     * Stop listening for ESB messages on the bus.
-     */
-    public final void stopListening()
-    {
-        closeJMSHandler("JMS Bus Listener", messageListener);
-        messageListener = null;
-    }
-
-    /**
-     * Connect the deployment coordination listener.
-     *
-     * @param busProperties Bus configuration properties.
-     * @throws org.jboss.esb.api.exception.DeploymentException
-     *          Failed to connect listener.
-     */
-    private void connectCoordinationListener(final Properties busProperties) throws DeploymentException
-    {
-        String coordinationTopicName = busProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
-
-        notificationListener = new JMSCoordinationListener(coordinationTopicName, busProperties);
-        try
-        {
-            notificationListener.connect();
-        }
-        catch (JMSException e)
-        {
-            notificationListener = null;
-            throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment.  A JMSException occured while trying "
-                    + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
-                    + "JNDI properties used: " + busProperties, e);
-        }
-    }
-
-    /**
-     * Connect the deployment coordination sender.
-     *
-     * @param busProperties Bus configuration properties.
-     * @throws org.jboss.esb.api.exception.DeploymentException
-     *          Failed to connect sender.
-     */
-    private void connectCoordintationSender(final Properties busProperties) throws DeploymentException
-    {
-        String coordinationTopicName = busProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
-
-        notificationSender = new MessageSender(coordinationTopicName, topicSession);
-        try
-        {
-            notificationSender.connect();
-        }
-        catch (JMSException e)
-        {
-            notificationSender = null;
-            throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment.  A JMSException occured while trying "
-                    + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
-                    + "JNDI properties used: " + busProperties, e);
-        }
-    }
-
-    /**
-     * Connect the {@link BusMessage} listener.
-     *
-     * @param busProperties Bus configuration properties.
-     * @throws org.jboss.esb.api.exception.DeploymentException
-     *          Failed to connect listener.
-     */
-    private void connectBusListener(final Properties busProperties) throws DeploymentException
-    {
-        String busQueueName = busProperties.getProperty(BUS_QUEUE_KEY, DEFAULT_BUS_QUEUE_NAME);
-
-        messageListener = new JMSQueueMessageListener(busQueueName, busProperties);
-        try
-        {
-            messageListener.connect();
-        }
-        catch (JMSException e)
-        {
-            messageListener = null;
-            throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment.  A JMSException occured while trying "
-                    + "to connect to the JMS Bus Queue '" + busQueueName + "'. The JMS Server must be running and this Queue must be deployed. "
-                    + "JNDI properties used: " + busProperties, e);
-        }
-    }
-
-    /**
-     * Connect the {@link BusMessage} sender.
-     *
-     * @param busProperties Bus configuration properties.
-     * @throws org.jboss.esb.api.exception.DeploymentException
-     *          Failed to connect sender.
-     */
-    private void connectBusSender(final Properties busProperties) throws DeploymentException
-    {
-        String busQueueName = busProperties.getProperty(BUS_QUEUE_KEY, DEFAULT_BUS_QUEUE_NAME);
-
-        messageSender = new MessageSender(busQueueName, queueSession);
-        try
-        {
-            messageSender.connect();
-        }
-        catch (JMSException e)
-        {
-            messageSender = null;
-            throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment.  A JMSException occured while trying "
-                    + "to connect to the JMS Bus Queue '" + busQueueName + "'. The JMS Server must be running and this Queue must be deployed. "
-                    + "JNDI properties used: " + busProperties, e);
-        }
-    }
-
-    /**
-     * Initialize the shared JMS Sessions.
-     *
-     * @param localBusProperties Local JMS Bus properties.
-     */
-    private void connectJMSSessions(final Properties localBusProperties) throws DeploymentException
-    {
-        ResourceLocator resourceLocator = DeploymentRuntime.getResourceLocator(DeploymentContext.getContext());
-
-        topicSession = new JMSSession(JMSSession.Type.TOPIC, localBusProperties);
-        try
-        {
-            if(resourceLocator != null)
-            {
-                topicSession.setClassLoader(resourceLocator.getClassLoader());
-            }
-            topicSession.connect();
-        }
-        catch (Throwable t)
-        {
-            closeJMSSessions();
-            logger.debug("Failed to connect shared deployment JMS Topic Session.", t);
-            return;
-        }
-        try
-        {
-            queueSession = new JMSSession(JMSSession.Type.QUEUE, localBusProperties);
-            if(resourceLocator != null)
-            {
-                queueSession.setClassLoader(resourceLocator.getClassLoader());
-            }
-            queueSession.connect();
-        }
-        catch (Throwable t)
-        {
-            closeJMSSessions();
-            // OK... this is an error... we were able to connect to the topic, but not the queue???
-            throw new DeploymentException("Failed to connect shared deployment JMS Queue Session.", t);
-        }
-    }
-
-    /**
-     * Close the JMS Sessions.
-     */
-    private void closeJMSSessions()
-    {
-        // Close the sessions...
-        if (topicSession != null)
-        {
-            if (topicSession.isConnected())
-            {
-                topicSession.close();
-            }
-            topicSession = null;
-        }
-        if (queueSession != null)
-        {
-            if (queueSession.isConnected())
-            {
-                queueSession.close();
-            }
-            queueSession = null;
-        }
-    }
-
-    /**
-     * Deployment Coordination Bus Notification Listener.
-     */
-    private class JMSCoordinationListener extends AbstractMessageListener
-    {
-
-        /**
-         * Constructor.
-         *
-         * @param destinationName Destination name.
-         * @param jndiProperties  JNDI properties.
-         */
-        protected JMSCoordinationListener(final String destinationName, final Properties jndiProperties)
-        {
-            super(destinationName, topicSession, jndiProperties);
-        }
-
-        /**
-         * Handle a coordination event from other deployments.
-         *
-         * @param message Coordination message.
-         */
-        public final void onMessage(final javax.jms.Message message)
-        {
-            if (message instanceof ObjectMessage)
-            {
-                try
-                {
-                    Object objectMessage = ((ObjectMessage) message).getObject();
-
-                    if (objectMessage instanceof AbstractNotification)
-                    {
-                        AbstractNotification notification = (AbstractNotification) objectMessage;
-                        try
-                        {
-                            getNotificationListener().onNotification(notification);
-                        }
-                        catch (RoutingException e)
-                        {
-                            logger.error("Deployment '" + getDeploymentId() + "' failed to accept notification '" + notification.getClass().getName() + "' from '" + notification.getDeploymentId() +  "'.", e);
-                        }
-                    }
-                }
-                catch (JMSException e)
-                {
-                    logger.error("Unable to get Object from JMS ObjectMessage.", e);
-                }
-            }
-        }
-
-    }
-
-    /**
-     * JMS Bus message listener.
-     */
-    private class JMSQueueMessageListener extends AbstractMessageListener
-    {
-
-        /**
-         * Constructor.
-         *
-         * @param destinationName Destination name.
-         * @param jndiProperties  JNDI properties.
-         */
-        protected JMSQueueMessageListener(final String destinationName, final Properties jndiProperties)
-        {
-            super(destinationName, queueSession, addDeploymentSelector(jndiProperties, getDeploymentId()));
-        }
-
-        /**
-         * Process a BusMessage for this deployment.
-         *
-         * @param message The bus message.
-         */
-        public final void onMessage(final javax.jms.Message message)
-        {
-            if (message instanceof ObjectMessage)
-            {
-                try
-                {
-                    Object objectMessage = ((ObjectMessage) message).getObject();
-
-                    if (objectMessage instanceof BusMessage)
-                    {
-                        try
-                        {
-                            getMessageListener().onMessage((BusMessage) objectMessage);
-                        }
-                        catch (RoutingException e)
-                        {
-                            logger.error("Error processing ESB Message: " + objectMessage, e);
-                        }
-                    }
-                }
-                catch (JMSException e)
-                {
-                    logger.warn("Unable to get Object from JMS ObjectMessage.", e);
-                }
-            }
-        }
-
-
-    }
-
-    /**
-     * Set the deployment ID selector on the specified properties.
-     *
-     * @param jmsProperties JMS connection properties.
-     * @param deploymentId  Deployment ID.
-     * @return The new properties with the JMS Selector set.
-     */
-    private static Properties addDeploymentSelector(final Properties jmsProperties, final String deploymentId)
-    {
-        Properties properties = (Properties) jmsProperties.clone();
-        properties.setProperty(AbstractMessageListener.MESSAGE_SELECTOR, DEPLOYMENT_ID + "='" + deploymentId + "'");
-        return properties;
-    }
-
-    /**
-     * Close JMS Handler.
-     *
-     * @param name    The handler name.
-     * @param handler The handler.
-     */
-    private void closeJMSHandler(final String name, final AbstractMessageHandler handler)
-    {
-        if (handler != null && handler.isConnected())
-        {
-            try
-            {
-                handler.close();
-            }
-            catch (Throwable t)
-            {
-                logger.warn("Error closing " + name + " for deployment '" + getDeploymentName() + "'.", t);
-            }
-        }
-    }
-}

Copied: labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsBus.java (from rev 24117, labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JMSBus.java)
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsBus.java	                        (rev 0)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsBus.java	2008-11-27 06:24:28 UTC (rev 24118)
@@ -0,0 +1,555 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2008, JBoss Inc.
+ */
+package org.jboss.esb.jms;
+
+import org.apache.log4j.Logger;
+import org.jboss.esb.api.bus.AbstractBus;
+import org.jboss.esb.api.bus.AbstractNotification;
+import org.jboss.esb.api.bus.BusMessage;
+import org.jboss.esb.api.context.DeploymentContext;
+import org.jboss.esb.api.context.ResourceLocator;
+import org.jboss.esb.api.routing.RoutingException;
+import org.jboss.esb.api.exception.DeploymentException;
+import org.jboss.esb.deploy.DeploymentRuntime;
+
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+import java.util.Properties;
+
+/**
+ * ESB Bus implementation for JMS.
+ * <p/>
+ * Acts as the interface to the JMS Bus for a deployment.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class JmsBus extends AbstractBus
+{
+    /**
+     * Logger.
+     */
+    private static Logger logger = Logger.getLogger(JmsBus.class);
+    /**
+     * Deployment coordination topic property key name.
+     */
+    public static final String DEPLOYMENT_COORDINTATION_TOPIC_KEY = "deployment.coordintation.topic";
+    /**
+     * Default deployment coordination topic name.
+     */
+    public static final String DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME = "jbossesb.deployment.coordintation.topic";
+    /**
+     * Deployment JMS Bus Queue name config key.
+     */
+    public static final String BUS_QUEUE_KEY = "deployment.bus.queue";
+    /**
+     * Default Deployment JMS Bus Queue name.
+     */
+    public static final String DEFAULT_BUS_QUEUE_NAME = "jbossesb.jms.bus";
+    /**
+     * Deployment ID key.
+     */
+    public static final String DEPLOYMENT_ID = "jbossesb_deployment_id";
+    /**
+     * Shared JMS Topic Session.
+     */
+    private JMSSession topicSession;
+    /**
+     * Shared JMS Queue Session.
+     */
+    private JMSSession queueSession;
+    /**
+     * JMS Deployment coordination listener.
+     */
+    private JMSCoordinationListener notificationListener;
+    /**
+     * JMS Bus Message Listener.
+     */
+    private JMSQueueMessageListener messageListener;
+    /**
+     * Deployment notification broadcaster.
+     */
+    private MessageSender notificationSender;
+    /**
+     * JMS Bus Message Sender.
+     */
+    private MessageSender messageSender;
+
+    /**
+     * Connect the bus.
+     *
+     * @throws RoutingException Connection exception.
+     */
+    public final void connect() throws RoutingException
+    {
+        assertIsConfigured();
+
+        /*
+         * Need to set the context classloader to this class's classloader. When deployed/included
+         * as an OSGi bundle, the calls made by this class will involve JNDI, which uses
+         * the thread context classloader. The application classloader will be the context classloader
+         * for the bundle and will not have the ability to use the bundle's classloader to find the
+         * resources it needs.
+         */
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try
+        {
+            try
+            {
+                connectJMSSessions(getProperties());
+            }
+            catch (DeploymentException e)
+            {
+                throw new RoutingException("Unable to connect JMS Bus interface for deployment '" + getDeploymentName() + "'.", e);
+            }
+
+            if (topicSession != null && queueSession != null)
+            {
+                try
+                {
+                    connectCoordinationListener(getProperties());
+                    connectCoordintationSender(getProperties());
+                    connectBusListener(getProperties());
+                    connectBusSender(getProperties());
+                }
+                catch (DeploymentException e)
+                {
+                    try
+                    {
+                        if (notificationListener == null && notificationSender == null && messageListener == null && messageSender == null)
+                        {
+                            if (!logger.isDebugEnabled())
+                            {
+                                logger.info("Deployment '" + getDeploymentName() + "' is not being coordinated with any other deployment.  Turn on deug logging for more details.");
+                            }
+                            else
+                            {
+                                logger.debug("Deployment '" + getDeploymentName() + "' is not being coordinated with any other deployment.", e);
+                            }
+                        }
+                        else
+                        {
+                            throw new RoutingException("Unable to connect JMS Bus interface for deployment '" + getDeploymentName() + "'.", e);
+                        }
+                    }
+                    finally
+                    {
+                        closeJMSSessions();
+                    }
+                }
+                catch (Throwable t)
+                {
+                    closeJMSSessions();
+                    throw new RoutingException("Unable to connect JMS Bus interface for deployment '" + getDeploymentName() + "'.", t);
+                }
+            }
+        }
+        finally
+        {
+            Thread.currentThread().setContextClassLoader(contextClassLoader);
+        }
+    }
+
+    /**
+     * Is the bus connected.
+     *
+     * @return True if the bus is connected, otherwise false.
+     */
+    public final boolean isConnected()
+    {
+        return (topicSession != null && queueSession != null);
+    }
+
+    /**
+     * Disconnect from the Bus.
+     */
+    public final void disconnect()
+    {
+        try
+        {
+            closeJMSHandler("Deployment Coordination Listener", notificationListener);
+            closeJMSHandler("Deployment Coordination Sender", notificationSender);
+            closeJMSHandler("JMS Bus Listener", messageListener);
+            closeJMSHandler("JMS Bus Sender", messageSender);
+        }
+        finally
+        {
+            closeJMSSessions();
+        }
+    }
+
+    /**
+     * Send the supplied message to the specified deployment via the bus.
+     *
+     * @param message            The message.
+     * @param targetDeploymentId The target deployment ID.
+     * @throws RoutingException Error sending message onto the Bus.
+     */
+    public final void sendMessage(final BusMessage message, final String targetDeploymentId) throws RoutingException
+    {
+        try
+        {
+            ObjectMessage jmsMessage = messageSender.getSession().createObjectMessage(message);
+
+            jmsMessage.setStringProperty(DEPLOYMENT_ID, targetDeploymentId);
+            messageSender.send(jmsMessage);
+        }
+        catch (JMSException e)
+        {
+            throw new RoutingException("Error sending message to JMS Bus for deployment ID '" + targetDeploymentId + "'.", e);
+        }
+    }
+
+    /**
+     * Send a notification message onto the Bus.
+     * <p/>
+     * This allows notification messages to be exchanged with other deployments
+     * using the underlying Bus (deployment details, heartbeat, undeploy etc).
+     *
+     * @param notification The notification message to be sent onto the Bus.
+     * @throws RoutingException Error sending notification onto the Bus.
+     */
+    public final void sendNotification(final AbstractNotification notification) throws RoutingException
+    {
+        if (notificationSender != null)
+        {
+            try
+            {
+                notificationSender.send(notificationSender.getSession().createObjectMessage(notification));
+            }
+            catch (JMSException e)
+            {
+                throw new RoutingException("Unable to send deployment notification onto JMS Bus.", e);
+            }
+        }
+    }
+
+    /**
+     * Stop listening for ESB messages on the bus.
+     */
+    public final void stopListening()
+    {
+        closeJMSHandler("JMS Bus Listener", messageListener);
+        messageListener = null;
+    }
+
+    /**
+     * Connect the deployment coordination listener.
+     *
+     * @param busProperties Bus configuration properties.
+     * @throws org.jboss.esb.api.exception.DeploymentException
+     *          Failed to connect listener.
+     */
+    private void connectCoordinationListener(final Properties busProperties) throws DeploymentException
+    {
+        String coordinationTopicName = busProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
+
+        notificationListener = new JMSCoordinationListener(coordinationTopicName, busProperties);
+        try
+        {
+            notificationListener.connect();
+        }
+        catch (JMSException e)
+        {
+            notificationListener = null;
+            throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment.  A JMSException occured while trying "
+                    + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
+                    + "JNDI properties used: " + busProperties, e);
+        }
+    }
+
+    /**
+     * Connect the deployment coordination sender.
+     *
+     * @param busProperties Bus configuration properties.
+     * @throws org.jboss.esb.api.exception.DeploymentException
+     *          Failed to connect sender.
+     */
+    private void connectCoordintationSender(final Properties busProperties) throws DeploymentException
+    {
+        String coordinationTopicName = busProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
+
+        notificationSender = new MessageSender(coordinationTopicName, topicSession);
+        try
+        {
+            notificationSender.connect();
+        }
+        catch (JMSException e)
+        {
+            notificationSender = null;
+            throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment.  A JMSException occured while trying "
+                    + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
+                    + "JNDI properties used: " + busProperties, e);
+        }
+    }
+
+    /**
+     * Connect the {@link BusMessage} listener.
+     *
+     * @param busProperties Bus configuration properties.
+     * @throws org.jboss.esb.api.exception.DeploymentException
+     *          Failed to connect listener.
+     */
+    private void connectBusListener(final Properties busProperties) throws DeploymentException
+    {
+        String busQueueName = busProperties.getProperty(BUS_QUEUE_KEY, DEFAULT_BUS_QUEUE_NAME);
+
+        messageListener = new JMSQueueMessageListener(busQueueName, busProperties);
+        try
+        {
+            messageListener.connect();
+        }
+        catch (JMSException e)
+        {
+            messageListener = null;
+            throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment.  A JMSException occured while trying "
+                    + "to connect to the JMS Bus Queue '" + busQueueName + "'. The JMS Server must be running and this Queue must be deployed. "
+                    + "JNDI properties used: " + busProperties, e);
+        }
+    }
+
+    /**
+     * Connect the {@link BusMessage} sender.
+     *
+     * @param busProperties Bus configuration properties.
+     * @throws org.jboss.esb.api.exception.DeploymentException
+     *          Failed to connect sender.
+     */
+    private void connectBusSender(final Properties busProperties) throws DeploymentException
+    {
+        String busQueueName = busProperties.getProperty(BUS_QUEUE_KEY, DEFAULT_BUS_QUEUE_NAME);
+
+        messageSender = new MessageSender(busQueueName, queueSession);
+        try
+        {
+            messageSender.connect();
+        }
+        catch (JMSException e)
+        {
+            messageSender = null;
+            throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment.  A JMSException occured while trying "
+                    + "to connect to the JMS Bus Queue '" + busQueueName + "'. The JMS Server must be running and this Queue must be deployed. "
+                    + "JNDI properties used: " + busProperties, e);
+        }
+    }
+
+    /**
+     * Initialize the shared JMS Sessions.
+     *
+     * @param localBusProperties Local JMS Bus properties.
+     */
+    private void connectJMSSessions(final Properties localBusProperties) throws DeploymentException
+    {
+        ResourceLocator resourceLocator = DeploymentRuntime.getResourceLocator(DeploymentContext.getContext());
+
+        topicSession = new JMSSession(JMSSession.Type.TOPIC, localBusProperties);
+        try
+        {
+            if(resourceLocator != null)
+            {
+                topicSession.setClassLoader(resourceLocator.getClassLoader());
+            }
+            topicSession.connect();
+        }
+        catch (Throwable t)
+        {
+            closeJMSSessions();
+            logger.debug("Failed to connect shared deployment JMS Topic Session.", t);
+            return;
+        }
+        try
+        {
+            queueSession = new JMSSession(JMSSession.Type.QUEUE, localBusProperties);
+            if(resourceLocator != null)
+            {
+                queueSession.setClassLoader(resourceLocator.getClassLoader());
+            }
+            queueSession.connect();
+        }
+        catch (Throwable t)
+        {
+            closeJMSSessions();
+            // OK... this is an error... we were able to connect to the topic, but not the queue???
+            throw new DeploymentException("Failed to connect shared deployment JMS Queue Session.", t);
+        }
+    }
+
+    /**
+     * Close the JMS Sessions.
+     * <p/>
+     * Each of the Topic and Queue sessions is closed if it is currently connected,
+     * and its reference is then cleared.  Sessions that were never created are skipped.
+     */
+    private void closeJMSSessions()
+    {
+        // Topic session first...
+        if (topicSession != null && topicSession.isConnected())
+        {
+            topicSession.close();
+        }
+        topicSession = null;
+
+        // ... then the Queue session.
+        if (queueSession != null && queueSession.isConnected())
+        {
+            queueSession.close();
+        }
+        queueSession = null;
+    }
+
+    /**
+     * Deployment Coordination Bus Notification Listener.
+     * <p/>
+     * Receives JMS messages from the coordination destination on the shared Topic
+     * session and passes any {@link AbstractNotification} payloads to the
+     * notification listener.
+     */
+    private class JMSCoordinationListener extends AbstractMessageListener
+    {
+        /**
+         * Constructor.
+         *
+         * @param destinationName Destination name.
+         * @param jndiProperties  JNDI properties.
+         */
+        protected JMSCoordinationListener(final String destinationName, final Properties jndiProperties)
+        {
+            super(destinationName, topicSession, jndiProperties);
+        }
+
+        /**
+         * Handle a coordination event from other deployments.
+         * <p/>
+         * Messages that are not ObjectMessages, and payloads that are not
+         * notifications, are ignored.  Failures are logged, not rethrown.
+         *
+         * @param message Coordination message.
+         */
+        public final void onMessage(final javax.jms.Message message)
+        {
+            if (!(message instanceof ObjectMessage))
+            {
+                return;
+            }
+
+            try
+            {
+                final Object payload = ((ObjectMessage) message).getObject();
+
+                if (payload instanceof AbstractNotification)
+                {
+                    final AbstractNotification notification = (AbstractNotification) payload;
+                    try
+                    {
+                        getNotificationListener().onNotification(notification);
+                    }
+                    catch (RoutingException e)
+                    {
+                        logger.error("Deployment '" + getDeploymentId() + "' failed to accept notification '" + notification.getClass().getName() + "' from '" + notification.getDeploymentId() +  "'.", e);
+                    }
+                }
+            }
+            catch (JMSException e)
+            {
+                logger.error("Unable to get Object from JMS ObjectMessage.", e);
+            }
+        }
+    }
+
+    /**
+     * JMS Bus message listener.
+     */
+    private class JMSQueueMessageListener extends AbstractMessageListener
+    {
+
+        /**
+         * Constructor.
+         *
+         * @param destinationName Destination name.
+         * @param jndiProperties  JNDI properties.
+         */
+        protected JMSQueueMessageListener(final String destinationName, final Properties jndiProperties)
+        {
+            super(destinationName, queueSession, addDeploymentSelector(jndiProperties, getDeploymentId()));
+        }
+
+        /**
+         * Process a BusMessage for this deployment.
+         *
+         * @param message The bus message.
+         */
+        public final void onMessage(final javax.jms.Message message)
+        {
+            if (message instanceof ObjectMessage)
+            {
+                try
+                {
+                    Object objectMessage = ((ObjectMessage) message).getObject();
+
+                    if (objectMessage instanceof BusMessage)
+                    {
+                        try
+                        {
+                            getMessageListener().onMessage((BusMessage) objectMessage);
+                        }
+                        catch (RoutingException e)
+                        {
+                            logger.error("Error processing ESB Message: " + objectMessage, e);
+                        }
+                    }
+                }
+                catch (JMSException e)
+                {
+                    logger.warn("Unable to get Object from JMS ObjectMessage.", e);
+                }
+            }
+        }
+
+
+    }
+
+    /**
+     * Set the deployment ID selector on the specified properties.
+     * <p/>
+     * The supplied properties are not modified: a clone is returned with the
+     * {@link AbstractMessageListener#MESSAGE_SELECTOR} property set to a selector
+     * that matches the deployment ID.  Single quotes in the deployment ID are
+     * doubled, as required for string literals in the JMS message selector syntax,
+     * so that an ID containing a quote cannot produce a malformed selector.
+     *
+     * @param jmsProperties JMS connection properties.
+     * @param deploymentId  Deployment ID.
+     * @return The new properties with the JMS Selector set.
+     */
+    private static Properties addDeploymentSelector(final Properties jmsProperties, final String deploymentId)
+    {
+        Properties properties = (Properties) jmsProperties.clone();
+
+        // String.valueOf keeps the previous rendering ("null") for a null ID; the
+        // replace escapes embedded single quotes for use inside the selector literal.
+        String selectorLiteral = String.valueOf(deploymentId).replace("'", "''");
+
+        properties.setProperty(AbstractMessageListener.MESSAGE_SELECTOR, DEPLOYMENT_ID + "='" + selectorLiteral + "'");
+        return properties;
+    }
+
+    /**
+     * Close JMS Handler.
+     * <p/>
+     * Does nothing if there is no handler or the handler is not connected.  Any
+     * error raised while closing is logged as a warning and not rethrown.
+     *
+     * @param name    The handler name (used in the warning message).
+     * @param handler The handler.
+     */
+    private void closeJMSHandler(final String name, final AbstractMessageHandler handler)
+    {
+        if (handler == null || !handler.isConnected())
+        {
+            return;
+        }
+
+        try
+        {
+            handler.close();
+        }
+        catch (Throwable t)
+        {
+            logger.warn("Error closing " + name + " for deployment '" + getDeploymentName() + "'.", t);
+        }
+    }
+}


Property changes on: labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsBus.java
___________________________________________________________________
Name: svn:mergeinfo
   + 
Name: svn:eol-style
   + native




More information about the jboss-svn-commits mailing list