[jboss-svn-commits] JBL Code SVN: r24118 - labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Nov 27 01:24:28 EST 2008
Author: beve
Date: 2008-11-27 01:24:28 -0500 (Thu, 27 Nov 2008)
New Revision: 24118
Added:
labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsBus.java
Removed:
labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JMSBus.java
Log:
Updated the java filename. The class has been renamed but not the source file.
Deleted: labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JMSBus.java
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JMSBus.java 2008-11-27 05:22:21 UTC (rev 24117)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JMSBus.java 2008-11-27 06:24:28 UTC (rev 24118)
@@ -1,555 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, Red Hat Middleware LLC, and others contributors as indicated
- * by the @authors tag. All rights reserved.
- * See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- * This copyrighted material is made available to anyone wishing to use,
- * modify, copy, or redistribute it subject to the terms and conditions
- * of the GNU Lesser General Public License, v. 2.1.
- * This program is distributed in the hope that it will be useful, but WITHOUT A
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
- * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
- * You should have received a copy of the GNU Lesser General Public License,
- * v.2.1 along with this distribution; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
- * MA 02110-1301, USA.
- *
- * (C) 2005-2008, JBoss Inc.
- */
-package org.jboss.esb.jms;
-
-import org.apache.log4j.Logger;
-import org.jboss.esb.api.bus.AbstractBus;
-import org.jboss.esb.api.bus.AbstractNotification;
-import org.jboss.esb.api.bus.BusMessage;
-import org.jboss.esb.api.context.DeploymentContext;
-import org.jboss.esb.api.context.ResourceLocator;
-import org.jboss.esb.api.routing.RoutingException;
-import org.jboss.esb.api.exception.DeploymentException;
-import org.jboss.esb.deploy.DeploymentRuntime;
-
-import javax.jms.JMSException;
-import javax.jms.ObjectMessage;
-import java.util.Properties;
-
-/**
- * ESB Bus implementation for JMS.
- * <p/>
- * Acts as the interface to the JMS Bus for a deployment.
- *
- * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
- */
-public class JmsBus extends AbstractBus
-{
- /**
- * Logger.
- */
- private static Logger logger = Logger.getLogger(JmsBus.class);
- /**
- * Deployment coordination topic property key name.
- */
- public static final String DEPLOYMENT_COORDINTATION_TOPIC_KEY = "deployment.coordintation.topic";
- /**
- * Default deployment coordination topic name.
- */
- public static final String DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME = "jbossesb.deployment.coordintation.topic";
- /**
- * Deployment JMS Bus Queue name config key.
- */
- public static final String BUS_QUEUE_KEY = "deployment.bus.queue";
- /**
- * Default Deployment JMS Bus Queue name.
- */
- public static final String DEFAULT_BUS_QUEUE_NAME = "jbossesb.jms.bus";
- /**
- * Deployment ID key.
- */
- public static final String DEPLOYMENT_ID = "jbossesb_deployment_id";
- /**
- * Shared JMS Topic Session.
- */
- private JMSSession topicSession;
- /**
- * Shared JMS Topic Session.
- */
- private JMSSession queueSession;
- /**
- * JMS Deployment coordination listener.
- */
- private JMSCoordinationListener notificationListener;
- /**
- * JMS Bus Message Listener.
- */
- private JMSQueueMessageListener messageListener;
- /**
- * Deployment notification broadcaster.
- */
- private MessageSender notificationSender;
- /**
- * JMS Bus Message Sender.
- */
- private MessageSender messageSender;
-
- /**
- * Connect the bus.
- *
- * @throws RoutingException Connection exception.
- */
- public final void connect() throws RoutingException
- {
- assertIsConfigured();
-
- /*
- * Need to set the context classloader to this classes classloader. When deployed/included
- * as an osgi bundle this calls make by this class will involve jndi which uses
- * the thread context classloader. The application classloader will the the context classloader
- * for the bundle and will not have the ability to use the bundles classloader to find the
- * resources is needs.
- */
- ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
- try
- {
- try
- {
- connectJMSSessions(getProperties());
- }
- catch (DeploymentException e)
- {
- throw new RoutingException("Unable to connect JMS Bus interface for deployment '" + getDeploymentName() + "'.", e);
- }
-
- if (topicSession != null && queueSession != null)
- {
- try
- {
- connectCoordinationListener(getProperties());
- connectCoordintationSender(getProperties());
- connectBusListener(getProperties());
- connectBusSender(getProperties());
- }
- catch (DeploymentException e)
- {
- try
- {
- if (notificationListener == null && notificationSender == null && messageListener == null && messageSender == null)
- {
- if (!logger.isDebugEnabled())
- {
- logger.info("Deployment '" + getDeploymentName() + "' is not being coordinated with any other deployment. Turn on deug logging for more details.");
- }
- else
- {
- logger.debug("Deployment '" + getDeploymentName() + "' is not being coordinated with any other deployment.", e);
- }
- }
- else
- {
- throw new RoutingException("Unable to connect JMS Bus interface for deployment '" + getDeploymentName() + "'.", e);
- }
- }
- finally
- {
- closeJMSSessions();
- }
- }
- catch (Throwable t)
- {
- closeJMSSessions();
- throw new RoutingException("Unable to connect JMS Bus interface for deployment '" + getDeploymentName() + "'.", t);
- }
- }
- }
- finally
- {
- Thread.currentThread().setContextClassLoader(contextClassLoader);
- }
- }
-
- /**
- * Is the bus connected.
- *
- * @return True if the bus is connected, otherwise false.
- */
- public final boolean isConnected()
- {
- return (topicSession != null && queueSession != null);
- }
-
- /**
- * Disconnect from the Bus.
- */
- public final void disconnect()
- {
- try
- {
- closeJMSHandler("Deployment Coordination Listener", notificationListener);
- closeJMSHandler("Deployment Coordination Sender", notificationSender);
- closeJMSHandler("JMS Bus Listener", messageListener);
- closeJMSHandler("JMS Bus Sender", messageSender);
- }
- finally
- {
- closeJMSSessions();
- }
- }
-
- /**
- * Send the supplied message to the specified deployment via the bus.
- *
- * @param message The message.
- * @param targetDeploymentId The target deployment ID.
- * @throws RoutingException Error sending message onto the Bus.
- */
- public final void sendMessage(final BusMessage message, final String targetDeploymentId) throws RoutingException
- {
- try
- {
- ObjectMessage jmsMessage = messageSender.getSession().createObjectMessage(message);
-
- jmsMessage.setStringProperty(DEPLOYMENT_ID, targetDeploymentId);
- messageSender.send(jmsMessage);
- }
- catch (JMSException e)
- {
- throw new RoutingException("Error sending message to JMS Bus for deployment ID '" + targetDeploymentId + "'.", e);
- }
- }
-
- /**
- * Send a notification message onto the Bus.
- * <p/>
- * This allows notification messages to be exchanged with other deployments
- * using the underlying Bus (deployment details, heartbeat, undeploy etc).
- *
- * @param notification The notification message to be sent onto the Bus.
- * @throws RoutingException Error sending notification onto the Bus.
- */
- public final void sendNotification(final AbstractNotification notification) throws RoutingException
- {
- if (notificationSender != null)
- {
- try
- {
- notificationSender.send(notificationSender.getSession().createObjectMessage(notification));
- }
- catch (JMSException e)
- {
- throw new RoutingException("Unable to send deployment notification onto JMS Bus.", e);
- }
- }
- }
-
- /**
- * Stop listening for ESB messages on the bus.
- */
- public final void stopListening()
- {
- closeJMSHandler("JMS Bus Listener", messageListener);
- messageListener = null;
- }
-
- /**
- * Connect the deployment coordination listener.
- *
- * @param busProperties Bus configuration properties.
- * @throws org.jboss.esb.api.exception.DeploymentException
- * Failed to connect listener.
- */
- private void connectCoordinationListener(final Properties busProperties) throws DeploymentException
- {
- String coordinationTopicName = busProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
-
- notificationListener = new JMSCoordinationListener(coordinationTopicName, busProperties);
- try
- {
- notificationListener.connect();
- }
- catch (JMSException e)
- {
- notificationListener = null;
- throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment. A JMSException occured while trying "
- + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
- + "JNDI properties used: " + busProperties, e);
- }
- }
-
- /**
- * Connect the deployment coordination sender.
- *
- * @param busProperties Bus configuration properties.
- * @throws org.jboss.esb.api.exception.DeploymentException
- * Failed to connect sender.
- */
- private void connectCoordintationSender(final Properties busProperties) throws DeploymentException
- {
- String coordinationTopicName = busProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
-
- notificationSender = new MessageSender(coordinationTopicName, topicSession);
- try
- {
- notificationSender.connect();
- }
- catch (JMSException e)
- {
- notificationSender = null;
- throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment. A JMSException occured while trying "
- + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
- + "JNDI properties used: " + busProperties, e);
- }
- }
-
- /**
- * Connect the {@link BusMessage} listener.
- *
- * @param busProperties Bus configuration properties.
- * @throws org.jboss.esb.api.exception.DeploymentException
- * Failed to connect listener.
- */
- private void connectBusListener(final Properties busProperties) throws DeploymentException
- {
- String busQueueName = busProperties.getProperty(BUS_QUEUE_KEY, DEFAULT_BUS_QUEUE_NAME);
-
- messageListener = new JMSQueueMessageListener(busQueueName, busProperties);
- try
- {
- messageListener.connect();
- }
- catch (JMSException e)
- {
- messageListener = null;
- throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment. A JMSException occured while trying "
- + "to connect to the JMS Bus Queue '" + busQueueName + "'. The JMS Server must be running and this Queue must be deployed. "
- + "JNDI properties used: " + busProperties, e);
- }
- }
-
- /**
- * Connect the {@link BusMessage} sender.
- *
- * @param busProperties Bus configuration properties.
- * @throws org.jboss.esb.api.exception.DeploymentException
- * Failed to connect sender.
- */
- private void connectBusSender(final Properties busProperties) throws DeploymentException
- {
- String busQueueName = busProperties.getProperty(BUS_QUEUE_KEY, DEFAULT_BUS_QUEUE_NAME);
-
- messageSender = new MessageSender(busQueueName, queueSession);
- try
- {
- messageSender.connect();
- }
- catch (JMSException e)
- {
- messageSender = null;
- throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment. A JMSException occured while trying "
- + "to connect to the JMS Bus Queue '" + busQueueName + "'. The JMS Server must be running and this Queue must be deployed. "
- + "JNDI properties used: " + busProperties, e);
- }
- }
-
- /**
- * Initialize the shared JMS Sessions.
- *
- * @param localBusProperties Local JMS Bus properties.
- */
- private void connectJMSSessions(final Properties localBusProperties) throws DeploymentException
- {
- ResourceLocator resourceLocator = DeploymentRuntime.getResourceLocator(DeploymentContext.getContext());
-
- topicSession = new JMSSession(JMSSession.Type.TOPIC, localBusProperties);
- try
- {
- if(resourceLocator != null)
- {
- topicSession.setClassLoader(resourceLocator.getClassLoader());
- }
- topicSession.connect();
- }
- catch (Throwable t)
- {
- closeJMSSessions();
- logger.debug("Failed to connect shared deployment JMS Topic Session.", t);
- return;
- }
- try
- {
- queueSession = new JMSSession(JMSSession.Type.QUEUE, localBusProperties);
- if(resourceLocator != null)
- {
- queueSession.setClassLoader(resourceLocator.getClassLoader());
- }
- queueSession.connect();
- }
- catch (Throwable t)
- {
- closeJMSSessions();
- // OK... this is an error... we were able to connect to the topic, but not the queue???
- throw new DeploymentException("Failed to connect shared deployment JMS Queue Session.", t);
- }
- }
-
- /**
- * Close the JMS Sessions.
- */
- private void closeJMSSessions()
- {
- // Close the sessions...
- if (topicSession != null)
- {
- if (topicSession.isConnected())
- {
- topicSession.close();
- }
- topicSession = null;
- }
- if (queueSession != null)
- {
- if (queueSession.isConnected())
- {
- queueSession.close();
- }
- queueSession = null;
- }
- }
-
- /**
- * Deployment Coordination Bus Notification Listener.
- */
- private class JMSCoordinationListener extends AbstractMessageListener
- {
-
- /**
- * Constructor.
- *
- * @param destinationName Destination name.
- * @param jndiProperties JNDI properties.
- */
- protected JMSCoordinationListener(final String destinationName, final Properties jndiProperties)
- {
- super(destinationName, topicSession, jndiProperties);
- }
-
- /**
- * Handle a coordination event from other deployments.
- *
- * @param message Coordination message.
- */
- public final void onMessage(final javax.jms.Message message)
- {
- if (message instanceof ObjectMessage)
- {
- try
- {
- Object objectMessage = ((ObjectMessage) message).getObject();
-
- if (objectMessage instanceof AbstractNotification)
- {
- AbstractNotification notification = (AbstractNotification) objectMessage;
- try
- {
- getNotificationListener().onNotification(notification);
- }
- catch (RoutingException e)
- {
- logger.error("Deployment '" + getDeploymentId() + "' failed to accept notification '" + notification.getClass().getName() + "' from '" + notification.getDeploymentId() + "'.", e);
- }
- }
- }
- catch (JMSException e)
- {
- logger.error("Unable to get Object from JMS ObjectMessage.", e);
- }
- }
- }
-
- }
-
- /**
- * JMS Bus message listener.
- */
- private class JMSQueueMessageListener extends AbstractMessageListener
- {
-
- /**
- * Constructor.
- *
- * @param destinationName Destination name.
- * @param jndiProperties JNDI properties.
- */
- protected JMSQueueMessageListener(final String destinationName, final Properties jndiProperties)
- {
- super(destinationName, queueSession, addDeploymentSelector(jndiProperties, getDeploymentId()));
- }
-
- /**
- * Process a BusMessage for this deployment.
- *
- * @param message The bus message.
- */
- public final void onMessage(final javax.jms.Message message)
- {
- if (message instanceof ObjectMessage)
- {
- try
- {
- Object objectMessage = ((ObjectMessage) message).getObject();
-
- if (objectMessage instanceof BusMessage)
- {
- try
- {
- getMessageListener().onMessage((BusMessage) objectMessage);
- }
- catch (RoutingException e)
- {
- logger.error("Error processing ESB Message: " + objectMessage, e);
- }
- }
- }
- catch (JMSException e)
- {
- logger.warn("Unable to get Object from JMS ObjectMessage.", e);
- }
- }
- }
-
-
- }
-
- /**
- * Set the deployment ID selector on the specified properties.
- *
- * @param jmsProperties JMS connection properties.
- * @param deploymentId Deployment ID.
- * @return The new properties with the JMS Selector set.
- */
- private static Properties addDeploymentSelector(final Properties jmsProperties, final String deploymentId)
- {
- Properties properties = (Properties) jmsProperties.clone();
- properties.setProperty(AbstractMessageListener.MESSAGE_SELECTOR, DEPLOYMENT_ID + "='" + deploymentId + "'");
- return properties;
- }
-
- /**
- * Close JMS Handler.
- *
- * @param name The handler name.
- * @param handler The handler.
- */
- private void closeJMSHandler(final String name, final AbstractMessageHandler handler)
- {
- if (handler != null && handler.isConnected())
- {
- try
- {
- handler.close();
- }
- catch (Throwable t)
- {
- logger.warn("Error closing " + name + " for deployment '" + getDeploymentName() + "'.", t);
- }
- }
- }
-}
Copied: labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsBus.java (from rev 24117, labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JMSBus.java)
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsBus.java (rev 0)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsBus.java 2008-11-27 06:24:28 UTC (rev 24118)
@@ -0,0 +1,555 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2008, JBoss Inc.
+ */
+package org.jboss.esb.jms;
+
+import org.apache.log4j.Logger;
+import org.jboss.esb.api.bus.AbstractBus;
+import org.jboss.esb.api.bus.AbstractNotification;
+import org.jboss.esb.api.bus.BusMessage;
+import org.jboss.esb.api.context.DeploymentContext;
+import org.jboss.esb.api.context.ResourceLocator;
+import org.jboss.esb.api.routing.RoutingException;
+import org.jboss.esb.api.exception.DeploymentException;
+import org.jboss.esb.deploy.DeploymentRuntime;
+
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+import java.util.Properties;
+
+/**
+ * ESB Bus implementation for JMS.
+ * <p/>
+ * Acts as the interface to the JMS Bus for a deployment.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class JmsBus extends AbstractBus
+{
+    /**
+     * Class logger, shared by the bus and its nested JMS listeners.
+     */
+    private static final Logger logger = Logger.getLogger(JmsBus.class);
+    /**
+     * Bus property key used to look up the deployment coordination topic name.
+     * <p/>
+     * NOTE(review): the "COORDINTATION" spelling in the constant name and in its
+     * value is left untouched; both are public, so existing callers and
+     * configurations may depend on them.
+     */
+    public static final String DEPLOYMENT_COORDINTATION_TOPIC_KEY = "deployment.coordintation.topic";
+    /**
+     * Coordination topic name used when {@link #DEPLOYMENT_COORDINTATION_TOPIC_KEY}
+     * is not present in the bus properties.
+     */
+    public static final String DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME = "jbossesb.deployment.coordintation.topic";
+    /**
+     * Bus property key used to look up the JMS Bus queue name.
+     */
+    public static final String BUS_QUEUE_KEY = "deployment.bus.queue";
+    /**
+     * JMS Bus queue name used when {@link #BUS_QUEUE_KEY} is not present in the
+     * bus properties.
+     */
+    public static final String DEFAULT_BUS_QUEUE_NAME = "jbossesb.jms.bus";
+    /**
+     * Name of the JMS String property that carries the target deployment ID.
+     * <p/>
+     * It is set on every outgoing bus message and is the property the queue
+     * listener's message selector filters on.
+     */
+    public static final String DEPLOYMENT_ID = "jbossesb_deployment_id";
+    /**
+     * Shared JMS Topic Session (coordination listener and notification sender).
+     */
+    private JMSSession topicSession;
+    /**
+     * Shared JMS Queue Session (bus message listener and bus message sender).
+     */
+    private JMSSession queueSession;
+    /**
+     * JMS Deployment coordination listener.
+     */
+    private JMSCoordinationListener notificationListener;
+    /**
+     * JMS Bus Message Listener.
+     */
+    private JMSQueueMessageListener messageListener;
+    /**
+     * Deployment notification broadcaster (sends on the coordination topic).
+     */
+    private MessageSender notificationSender;
+    /**
+     * JMS Bus Message Sender (sends on the bus queue).
+     */
+    private MessageSender messageSender;
+
+    /**
+     * Connect this deployment to the JMS Bus.
+     * <p/>
+     * The shared topic and queue sessions are connected first. When both are
+     * available, the coordination listener, coordination sender, bus listener
+     * and bus sender are connected in that order. If the sessions are not
+     * available the method returns without connecting any handler.
+     *
+     * @throws RoutingException The sessions or handlers could not be connected.
+     */
+    public final void connect() throws RoutingException
+    {
+        assertIsConfigured();
+
+        /*
+         * Remember the thread context classloader so it can be reinstated once
+         * connecting is done. Background: when deployed as an OSGi bundle the
+         * JNDI calls made while connecting use the thread context classloader,
+         * which is the application classloader and cannot see the bundle's
+         * resources.
+         * NOTE(review): nothing in this method changes the context classloader
+         * itself; presumably the JMS sessions do so - confirm.
+         */
+        final Thread currentThread = Thread.currentThread();
+        final ClassLoader originalContextLoader = currentThread.getContextClassLoader();
+        try
+        {
+            try
+            {
+                connectJMSSessions(getProperties());
+            }
+            catch (DeploymentException sessionFailure)
+            {
+                throw new RoutingException("Unable to connect JMS Bus interface for deployment '" + getDeploymentName() + "'.", sessionFailure);
+            }
+
+            if (topicSession == null || queueSession == null)
+            {
+                // No usable sessions: there is nothing to attach handlers to.
+                return;
+            }
+
+            try
+            {
+                connectCoordinationListener(getProperties());
+                connectCoordintationSender(getProperties());
+                connectBusListener(getProperties());
+                connectBusSender(getProperties());
+            }
+            catch (DeploymentException handlerFailure)
+            {
+                try
+                {
+                    final boolean nothingConnected = notificationListener == null && notificationSender == null
+                            && messageListener == null && messageSender == null;
+
+                    if (!nothingConnected)
+                    {
+                        // Some handlers are set while another failed: report it as an error.
+                        throw new RoutingException("Unable to connect JMS Bus interface for deployment '" + getDeploymentName() + "'.", handlerFailure);
+                    }
+
+                    // No handler is set: log and carry on without coordination.
+                    if (logger.isDebugEnabled())
+                    {
+                        logger.debug("Deployment '" + getDeploymentName() + "' is not being coordinated with any other deployment.", handlerFailure);
+                    }
+                    else
+                    {
+                        logger.info("Deployment '" + getDeploymentName() + "' is not being coordinated with any other deployment. Turn on deug logging for more details.");
+                    }
+                }
+                finally
+                {
+                    // The sessions are released on every DeploymentException path.
+                    closeJMSSessions();
+                }
+            }
+            catch (Throwable unexpected)
+            {
+                closeJMSSessions();
+                throw new RoutingException("Unable to connect JMS Bus interface for deployment '" + getDeploymentName() + "'.", unexpected);
+            }
+        }
+        finally
+        {
+            currentThread.setContextClassLoader(originalContextLoader);
+        }
+    }
+
+    /**
+     * Report whether the bus currently holds both of its shared JMS sessions.
+     *
+     * @return True if both the topic session and the queue session are set,
+     * otherwise false.
+     */
+    public final boolean isConnected()
+    {
+        if (topicSession == null)
+        {
+            return false;
+        }
+        return queueSession != null;
+    }
+
+    /**
+     * Disconnect from the Bus.
+     * <p/>
+     * Closes the four JMS handlers (listeners and senders) and then the shared
+     * sessions. The sessions are closed even if closing a handler fails.
+     */
+    public final void disconnect()
+    {
+        final String[] handlerNames = {
+            "Deployment Coordination Listener",
+            "Deployment Coordination Sender",
+            "JMS Bus Listener",
+            "JMS Bus Sender",
+        };
+        final AbstractMessageHandler[] handlers = {
+            notificationListener,
+            notificationSender,
+            messageListener,
+            messageSender,
+        };
+
+        try
+        {
+            for (int i = 0; i < handlers.length; i++)
+            {
+                closeJMSHandler(handlerNames[i], handlers[i]);
+            }
+        }
+        finally
+        {
+            closeJMSSessions();
+        }
+    }
+
+    /**
+     * Send the supplied message to the specified deployment via the bus.
+     * <p/>
+     * The target deployment ID is attached to the JMS message as the
+     * {@link #DEPLOYMENT_ID} String property.
+     *
+     * @param message The message.
+     * @param targetDeploymentId The target deployment ID.
+     * @throws RoutingException Error sending message onto the Bus, including
+     * the case where the bus message sender has not been connected.
+     */
+    public final void sendMessage(final BusMessage message, final String targetDeploymentId) throws RoutingException
+    {
+        final MessageSender sender = messageSender;
+
+        if (sender == null)
+        {
+            // The sender is only set once connect() has attached the bus handlers.
+            // Fail with the declared exception type rather than a NullPointerException.
+            throw new RoutingException("Error sending message to JMS Bus for deployment ID '" + targetDeploymentId + "'.",
+                    new IllegalStateException("JMS Bus message sender is not connected."));
+        }
+
+        try
+        {
+            ObjectMessage jmsMessage = sender.getSession().createObjectMessage(message);
+
+            jmsMessage.setStringProperty(DEPLOYMENT_ID, targetDeploymentId);
+            sender.send(jmsMessage);
+        }
+        catch (JMSException e)
+        {
+            throw new RoutingException("Error sending message to JMS Bus for deployment ID '" + targetDeploymentId + "'.", e);
+        }
+    }
+
+    /**
+     * Broadcast a notification message onto the Bus.
+     * <p/>
+     * Notifications let deployments exchange coordination messages over the
+     * underlying Bus (deployment details, heartbeat, undeploy etc). When no
+     * notification sender is set the call does nothing.
+     *
+     * @param notification The notification message to be sent onto the Bus.
+     * @throws RoutingException Error sending notification onto the Bus.
+     */
+    public final void sendNotification(final AbstractNotification notification) throws RoutingException
+    {
+        if (notificationSender == null)
+        {
+            return;
+        }
+
+        try
+        {
+            final ObjectMessage jmsNotification = notificationSender.getSession().createObjectMessage(notification);
+
+            notificationSender.send(jmsNotification);
+        }
+        catch (JMSException sendFailure)
+        {
+            throw new RoutingException("Unable to send deployment notification onto JMS Bus.", sendFailure);
+        }
+    }
+
+    /**
+     * Stop listening for ESB messages on the bus.
+     * <p/>
+     * Closes the JMS Bus message listener and drops the reference to it. The
+     * coordination listener and both senders are left as they are.
+     */
+    public final void stopListening()
+    {
+        closeJMSHandler("JMS Bus Listener", this.messageListener);
+        this.messageListener = null;
+    }
+
+ /**
+ * Connect the deployment coordination listener.
+ *
+ * @param busProperties Bus configuration properties.
+ * @throws org.jboss.esb.api.exception.DeploymentException
+ * Failed to connect listener.
+ */
+ private void connectCoordinationListener(final Properties busProperties) throws DeploymentException
+ {
+ String coordinationTopicName = busProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
+
+ notificationListener = new JMSCoordinationListener(coordinationTopicName, busProperties);
+ try
+ {
+ notificationListener.connect();
+ }
+ catch (JMSException e)
+ {
+ notificationListener = null;
+ throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment. A JMSException occured while trying "
+ + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
+ + "JNDI properties used: " + busProperties, e);
+ }
+ }
+
+ /**
+ * Connect the deployment coordination sender.
+ *
+ * @param busProperties Bus configuration properties.
+ * @throws org.jboss.esb.api.exception.DeploymentException
+ * Failed to connect sender.
+ */
+ private void connectCoordintationSender(final Properties busProperties) throws DeploymentException
+ {
+ String coordinationTopicName = busProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
+
+ notificationSender = new MessageSender(coordinationTopicName, topicSession);
+ try
+ {
+ notificationSender.connect();
+ }
+ catch (JMSException e)
+ {
+ notificationSender = null;
+ throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment. A JMSException occured while trying "
+ + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
+ + "JNDI properties used: " + busProperties, e);
+ }
+ }
+
+ /**
+ * Connect the {@link BusMessage} listener.
+ *
+ * @param busProperties Bus configuration properties.
+ * @throws org.jboss.esb.api.exception.DeploymentException
+ * Failed to connect listener.
+ */
+ private void connectBusListener(final Properties busProperties) throws DeploymentException
+ {
+ String busQueueName = busProperties.getProperty(BUS_QUEUE_KEY, DEFAULT_BUS_QUEUE_NAME);
+
+ messageListener = new JMSQueueMessageListener(busQueueName, busProperties);
+ try
+ {
+ messageListener.connect();
+ }
+ catch (JMSException e)
+ {
+ messageListener = null;
+ throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment. A JMSException occured while trying "
+ + "to connect to the JMS Bus Queue '" + busQueueName + "'. The JMS Server must be running and this Queue must be deployed. "
+ + "JNDI properties used: " + busProperties, e);
+ }
+ }
+
+ /**
+ * Connect the {@link BusMessage} sender.
+ *
+ * @param busProperties Bus configuration properties.
+ * @throws org.jboss.esb.api.exception.DeploymentException
+ * Failed to connect sender.
+ */
+ private void connectBusSender(final Properties busProperties) throws DeploymentException
+ {
+ String busQueueName = busProperties.getProperty(BUS_QUEUE_KEY, DEFAULT_BUS_QUEUE_NAME);
+
+ messageSender = new MessageSender(busQueueName, queueSession);
+ try
+ {
+ messageSender.connect();
+ }
+ catch (JMSException e)
+ {
+ messageSender = null;
+ throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment. A JMSException occured while trying "
+ + "to connect to the JMS Bus Queue '" + busQueueName + "'. The JMS Server must be running and this Queue must be deployed. "
+ + "JNDI properties used: " + busProperties, e);
+ }
+ }
+
+ /**
+ * Initialize the shared JMS Sessions.
+ *
+ * @param localBusProperties Local JMS Bus properties.
+ */
+ private void connectJMSSessions(final Properties localBusProperties) throws DeploymentException
+ {
+ ResourceLocator resourceLocator = DeploymentRuntime.getResourceLocator(DeploymentContext.getContext());
+
+ topicSession = new JMSSession(JMSSession.Type.TOPIC, localBusProperties);
+ try
+ {
+ if(resourceLocator != null)
+ {
+ topicSession.setClassLoader(resourceLocator.getClassLoader());
+ }
+ topicSession.connect();
+ }
+ catch (Throwable t)
+ {
+ closeJMSSessions();
+ logger.debug("Failed to connect shared deployment JMS Topic Session.", t);
+ return;
+ }
+ try
+ {
+ queueSession = new JMSSession(JMSSession.Type.QUEUE, localBusProperties);
+ if(resourceLocator != null)
+ {
+ queueSession.setClassLoader(resourceLocator.getClassLoader());
+ }
+ queueSession.connect();
+ }
+ catch (Throwable t)
+ {
+ closeJMSSessions();
+ // OK... this is an error... we were able to connect to the topic, but not the queue???
+ throw new DeploymentException("Failed to connect shared deployment JMS Queue Session.", t);
+ }
+ }
+
+    /**
+     * Close the shared JMS topic and queue sessions and drop the references.
+     * <p/>
+     * Each session is closed only if it is set and reports itself as
+     * connected. Both fields are always reset to null, and the queue session is
+     * still closed when closing the topic session throws; the exception from
+     * the close call is not swallowed.
+     */
+    private void closeJMSSessions()
+    {
+        try
+        {
+            if (topicSession != null && topicSession.isConnected())
+            {
+                topicSession.close();
+            }
+        }
+        finally
+        {
+            topicSession = null;
+            try
+            {
+                if (queueSession != null && queueSession.isConnected())
+                {
+                    queueSession.close();
+                }
+            }
+            finally
+            {
+                queueSession = null;
+            }
+        }
+    }
+
+ /**
+ * Deployment Coordination Bus Notification Listener.
+ */
+ private class JMSCoordinationListener extends AbstractMessageListener
+ {
+
+ /**
+ * Constructor.
+ *
+ * @param destinationName Destination name.
+ * @param jndiProperties JNDI properties.
+ */
+ protected JMSCoordinationListener(final String destinationName, final Properties jndiProperties)
+ {
+ super(destinationName, topicSession, jndiProperties);
+ }
+
+ /**
+ * Handle a coordination event from other deployments.
+ *
+ * @param message Coordination message.
+ */
+ public final void onMessage(final javax.jms.Message message)
+ {
+ if (message instanceof ObjectMessage)
+ {
+ try
+ {
+ Object objectMessage = ((ObjectMessage) message).getObject();
+
+ if (objectMessage instanceof AbstractNotification)
+ {
+ AbstractNotification notification = (AbstractNotification) objectMessage;
+ try
+ {
+ getNotificationListener().onNotification(notification);
+ }
+ catch (RoutingException e)
+ {
+ logger.error("Deployment '" + getDeploymentId() + "' failed to accept notification '" + notification.getClass().getName() + "' from '" + notification.getDeploymentId() + "'.", e);
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ logger.error("Unable to get Object from JMS ObjectMessage.", e);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * JMS Bus message listener.
+ */
+ private class JMSQueueMessageListener extends AbstractMessageListener
+ {
+
+ /**
+ * Constructor.
+ *
+ * @param destinationName Destination name.
+ * @param jndiProperties JNDI properties.
+ */
+ protected JMSQueueMessageListener(final String destinationName, final Properties jndiProperties)
+ {
+ super(destinationName, queueSession, addDeploymentSelector(jndiProperties, getDeploymentId()));
+ }
+
+ /**
+ * Process a BusMessage for this deployment.
+ *
+ * @param message The bus message.
+ */
+ public final void onMessage(final javax.jms.Message message)
+ {
+ if (message instanceof ObjectMessage)
+ {
+ try
+ {
+ Object objectMessage = ((ObjectMessage) message).getObject();
+
+ if (objectMessage instanceof BusMessage)
+ {
+ try
+ {
+ getMessageListener().onMessage((BusMessage) objectMessage);
+ }
+ catch (RoutingException e)
+ {
+ logger.error("Error processing ESB Message: " + objectMessage, e);
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ logger.warn("Unable to get Object from JMS ObjectMessage.", e);
+ }
+ }
+ }
+
+
+ }
+
+ /**
+ * Set the deployment ID selector on the specified properties.
+ *
+ * @param jmsProperties JMS connection properties.
+ * @param deploymentId Deployment ID.
+ * @return The new properties with the JMS Selector set.
+ */
+ private static Properties addDeploymentSelector(final Properties jmsProperties, final String deploymentId)
+ {
+ Properties properties = (Properties) jmsProperties.clone();
+ properties.setProperty(AbstractMessageListener.MESSAGE_SELECTOR, DEPLOYMENT_ID + "='" + deploymentId + "'");
+ return properties;
+ }
+
+    /**
+     * Close a JMS handler on a best-effort basis.
+     * <p/>
+     * Nothing happens when the handler is null or not connected. A failure
+     * while closing is logged as a warning and not propagated.
+     *
+     * @param name The handler name (used in the log message only).
+     * @param handler The handler.
+     */
+    private void closeJMSHandler(final String name, final AbstractMessageHandler handler)
+    {
+        if (handler == null || !handler.isConnected())
+        {
+            return;
+        }
+
+        try
+        {
+            handler.close();
+        }
+        catch (Throwable closeFailure)
+        {
+            logger.warn("Error closing " + name + " for deployment '" + getDeploymentName() + "'.", closeFailure);
+        }
+    }
+}
Property changes on: labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/JmsBus.java
___________________________________________________________________
Name: svn:mergeinfo
+
Name: svn:eol-style
+ native
More information about the jboss-svn-commits
mailing list