[jboss-svn-commits] JBL Code SVN: r22770 - in labs/jbossesb/workspace/skeagh: commons/src/main/java/org/jboss/esb/jms and 13 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Sep 15 05:07:42 EDT 2008


Author: tfennelly
Date: 2008-09-15 05:07:42 -0400 (Mon, 15 Sep 2008)
New Revision: 22770

Added:
   labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessage.java
   labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessageListener.java
   labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/default.properties
   labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/default.properties
Removed:
   labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/main.properties
   labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/main.properties
Modified:
   labs/jbossesb/workspace/skeagh/api/service/src/main/java/org/jboss/esb/message/Message.java
   labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageListener.java
   labs/jbossesb/workspace/skeagh/commons/src/test/java/org/jboss/esb/jms/MessageSendAndListenTest.java
   labs/jbossesb/workspace/skeagh/routing/jms/src/test/resources/META-INF/jbossesb/busconfig/jms/default.properties
   labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/deploy/DeploymentRuntime.java
   labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/deploy/config/PropertiesUtil.java
   labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java
   labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/Bus.java
   labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/jms/JMSBus.java
   labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/jms/default.properties
   labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/jms/default.properties
Log:
More Bus work.

Modified: labs/jbossesb/workspace/skeagh/api/service/src/main/java/org/jboss/esb/message/Message.java
===================================================================
--- labs/jbossesb/workspace/skeagh/api/service/src/main/java/org/jboss/esb/message/Message.java	2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/api/service/src/main/java/org/jboss/esb/message/Message.java	2008-09-15 09:07:42 UTC (rev 22770)
@@ -21,6 +21,7 @@
 
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.io.Serializable;
 
 /**
  * JBoss ESB Message.
@@ -33,15 +34,21 @@
  * @author <a href="mailto:dbevenius at redhat.com">Daniel Bevenius</a>
  * @author <a href="mailto:tom.fennelly at jboss.com">Tom Fennelly</a>
  */
-public class Message
+public class Message implements Serializable
 {
 
     /**
      * Primary Payload.
+     * <p/>
+     * Don't change this to be Serializable.  The Message only needs to be
+     * Serializable if it needs to go onto a Bus, which is not always the case.
      */
     private Object payload;
     /**
      * Attachments.
+     * <p/>
+     * Don't change these to be Serializable.  The Message only needs to be
+     * Serializable if it needs to go onto a Bus, which is not always the case.
      */
     private Map<String, Object> attachments = new LinkedHashMap<String, Object>();
 
@@ -58,6 +65,10 @@
      */
     public Message(final Object payload)
     {
+        /*
+         * Don't change this to be Serializable.  The Message only needs to be
+         * Serializable if it needs to go onto a Bus, which is not always the case.
+         */
         this.payload = payload;
     }
 
@@ -68,6 +79,10 @@
      */
     public final Object getPayload()
     {
+        /*
+         * Don't change this to be Serializable.  The Message only needs to be
+         * Serializable if it needs to go onto a Bus, which is not always the case.
+         */
         return payload;
     }
 
@@ -78,6 +93,10 @@
      */
     public final void setPayload(final Object payload)
     {
+        /*
+         * Don't change this to be Serializable.  The Message only needs to be
+         * Serializable if it needs to go onto a Bus, which is not always the case.
+         */
         this.payload = payload;
     }
 
@@ -88,6 +107,10 @@
      */
     public final Map<String, Object> getAttachments()
     {
+        /*
+         * Don't change these to be Serializable.  The Message only needs to be
+         * Serializable if it needs to go onto a Bus, which is not always the case.
+         */
         return attachments;
     }
 
@@ -98,6 +121,10 @@
      */
     public final void setAttachments(final Map<String, Object> attachments)
     {
+        /*
+         * Don't change these to be Serializable.  The Message only needs to be
+         * Serializable if it needs to go onto a Bus, which is not always the case.
+         */
         this.attachments = attachments;
     }
 }

Modified: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageListener.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageListener.java	2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageListener.java	2008-09-15 09:07:42 UTC (rev 22770)
@@ -98,15 +98,10 @@
             // Bind "this" listener to the destination...
             if (getDestination() instanceof Topic)
             {
-                if (messageSelector == null)
-                {
-                    messageConsumer = ((TopicSession) getSession()).createSubscriber((Topic) getDestination());
-                }
-                else
-                {
-                    boolean noLocal = !properties.getProperty(NO_LOCAL, "true").equals("false");
-                    messageConsumer = ((TopicSession) getSession()).createSubscriber((Topic) getDestination(), messageSelector, noLocal);
-                }
+                boolean noLocal = !properties.getProperty(NO_LOCAL, "true").equals("false");
+
+                // A null 'messageSelector' is valid i.e. no selector on the consumer. 'noLocal' is on by default... 
+                messageConsumer = ((TopicSession) getSession()).createSubscriber((Topic) getDestination(), messageSelector, noLocal);
                 messageConsumer.setMessageListener(this);
             }
             else

Modified: labs/jbossesb/workspace/skeagh/commons/src/test/java/org/jboss/esb/jms/MessageSendAndListenTest.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/test/java/org/jboss/esb/jms/MessageSendAndListenTest.java	2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/commons/src/test/java/org/jboss/esb/jms/MessageSendAndListenTest.java	2008-09-15 09:07:42 UTC (rev 22770)
@@ -59,9 +59,11 @@
             public void test() throws Exception
             {
                 Properties jndiProperties = getJndiProperties();
-                JMSSession jmsSession = new JMSSession(destType, jndiProperties);
+                JMSSession jmsSession1 = new JMSSession(destType, jndiProperties);
+                JMSSession jmsSession2 = new JMSSession(destType, jndiProperties);
 
-                jmsSession.connect();
+                jmsSession1.connect();
+                jmsSession2.connect();
                 try
                 {
                     if(destType == Topic.class) {
@@ -70,12 +72,12 @@
                         jndiProperties.setProperty("queue." + destName, destName);
                     }
 
-                    MockMessageListener<TextMessage> listener = new MockMessageListener<TextMessage>(destName, jmsSession, null);
+                    MockMessageListener<TextMessage> listener = new MockMessageListener<TextMessage>(destName, jmsSession1, null);
 
                     listener.connect();
                     assertTrue(destType.isAssignableFrom(listener.getDestination().getClass()));
                     try {
-                        MessageSender sender = new MessageSender(destName, jmsSession);
+                        MessageSender sender = new MessageSender(destName, jmsSession2);
                         List<String> messagesSent = Arrays.asList(new String[] {"message 1", "message 2", "message 3"});
 
                         sender.connect();
@@ -98,7 +100,8 @@
                         listener.close();
                     }
                 } finally {
-                    jmsSession.close();
+                    jmsSession1.close();
+                    jmsSession2.close();
                 }
             }
         }.run();
@@ -110,20 +113,21 @@
             public void test() throws Exception
             {
                 Properties jndiProperties = getJndiProperties();
-                JMSSession jmsSession = new JMSSession(destType, jndiProperties);
+                JMSSession jmsSession1 = new JMSSession(destType, jndiProperties);
+                JMSSession jmsSession2 = new JMSSession(destType, jndiProperties);
 
-                jmsSession.connect();
+                jmsSession1.connect();
+                jmsSession2.connect();
                 try
                 {
                     if(destType == Topic.class) {
                         jndiProperties.setProperty("topic." + destName, destName);
-                        jndiProperties.setProperty(AbstractMessageListener.NO_LOCAL, "false");
                     } else {
                         jndiProperties.setProperty("queue." + destName, destName);
                     }
 
-                    MockMessageListener<TextMessage> service1 = new MockMessageListener<TextMessage>(destName, jmsSession, "serviceId='service1'");
-                    MockMessageListener<TextMessage> service2 = new MockMessageListener<TextMessage>(destName, jmsSession, "serviceId='service2'");
+                    MockMessageListener<TextMessage> service1 = new MockMessageListener<TextMessage>(destName, jmsSession1, "serviceId='service1'");
+                    MockMessageListener<TextMessage> service2 = new MockMessageListener<TextMessage>(destName, jmsSession1, "serviceId='service2'");
                     List<String> service1Messages = Arrays.asList(new String[] {"message 1", "message 2", "message 3"});
                     List<String> service2Messages = Arrays.asList(new String[] {"message 4", "message 5", "message 6"});
 
@@ -131,7 +135,7 @@
                     try {
                         service2.connect();
                         try {
-                            MessageSender sender = new MessageSender(destName, jmsSession);
+                            MessageSender sender = new MessageSender(destName, jmsSession2);
 
                             sender.connect();
                             try {
@@ -165,7 +169,8 @@
                         service1.close();
                     }
                 } finally {
-                    jmsSession.close();
+                    jmsSession1.close();
+                    jmsSession2.close();
                 }
             }
         }.run();

Modified: labs/jbossesb/workspace/skeagh/routing/jms/src/test/resources/META-INF/jbossesb/busconfig/jms/default.properties
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/test/resources/META-INF/jbossesb/busconfig/jms/default.properties	2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/test/resources/META-INF/jbossesb/busconfig/jms/default.properties	2008-09-15 09:07:42 UTC (rev 22770)
@@ -13,6 +13,8 @@
 
 # Bus Queues and Topics...
 deployment.coordintation.topic=jbossesb.deployment.coordintation.topic
+deployment.bus.queue=jbossesb.jms.bus
 
 # ActiveMQ Queue and Topic deployments...
 topic.jbossesb.deployment.coordintation.topic=jbossesb.deployment.coordintation.topic
+queue.jbossesb.jms.bus=jbossesb.jms.bus

Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/deploy/DeploymentRuntime.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/deploy/DeploymentRuntime.java	2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/deploy/DeploymentRuntime.java	2008-09-15 09:07:42 UTC (rev 22770)
@@ -456,7 +456,7 @@
         try
         {
             SimpleSchedule heartbeatSchedule = new SimpleSchedule();
-            ApplicationProperties deploymentProperties = PropertiesUtil.getDeploymentConfig();
+            ApplicationProperties deploymentProperties = PropertiesUtil.getDeploymentConfig(deploymentName);
 
             heartbeatSchedule.setFrequency(deploymentProperties.getLongProperty(DeploymentCoordinator.COORDINATOR_HEARTBEAT_FREQUENCY_KEY, DeploymentCoordinator.COORDINATOR_HEARTBEAT_DEFAULT_FREQUENCY));
             heartbeatSchedule.setExecCount(-1);

Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/deploy/config/PropertiesUtil.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/deploy/config/PropertiesUtil.java	2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/deploy/config/PropertiesUtil.java	2008-09-15 09:07:42 UTC (rev 22770)
@@ -21,14 +21,15 @@
 
 import org.apache.log4j.Logger;
 import org.jboss.esb.classpath.ClassUtil;
+import org.jboss.esb.deploy.DeploymentException;
 import org.jboss.esb.federate.DeploymentCoordinator;
 import org.jboss.esb.properties.ApplicationProperties;
-import org.jboss.esb.deploy.DeploymentException;
 
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 
 /**
@@ -53,7 +54,7 @@
     /**
      * Classpath dir name for ESB configs.
      */
-    private static final String BUSCONFIG_CP_DIR = "/META-INF/jbossesb/" + BUSCONFIG_FILE_DIR;
+    private static final String ESBCONFIG_CP_DIR = "/META-INF/jbossesb/";
 
     /**
      * Private constructor.
@@ -79,39 +80,74 @@
      * @param protocol       The protocol name e.g. "jms".
      * @param deploymentName The deployment name e.g. "Order_Service".
      * @return The bus configuration as a {@link java.util.Properties} instance.
-     * @throws java.io.IOException Unable to read configuration.
+     * @throws DeploymentException Unable to read configuration.
      */
-    public static ApplicationProperties getBusConfig(final String protocol, final String deploymentName) throws IOException
+    public static ApplicationProperties getBusConfig(final String protocol, final String deploymentName) throws DeploymentException
     {
-        String firstCheckPath = protocol + "/" + URLEncoder.encode(deploymentName, "UTF-8") + ".properties";
-        String secondCheckPath = protocol + "/" + "default.properties";
+        String firstCheckPath = BUSCONFIG_FILE_DIR + protocol + "/" + urlEncode(deploymentName) + ".properties";
+        String secondCheckPath = BUSCONFIG_FILE_DIR + protocol + "/" + "default.properties";
 
         ApplicationProperties properties;
 
         // First check for a deployment specific config ...
-        properties = getBusConfig(firstCheckPath);
+        properties = getConfig(firstCheckPath);
         if (properties != null)
         {
             return properties;
         }
 
         // Then check for a default ...
-        properties = getBusConfig(secondCheckPath);
+        properties = getConfig(secondCheckPath);
         if (properties != null)
         {
             return properties;
         }
 
         logger.debug("Unable to find JBoss ESB Bus configuration for protocol '" + protocol + "', deployment '" + deploymentName + "'. Tried:\n"
-                + "\t1. File: " + BUSCONFIG_FILE_DIR + firstCheckPath + "\n"
-                + "\t2. Classpath: " + BUSCONFIG_CP_DIR + firstCheckPath + "\n"
-                + "\t3. File: " + BUSCONFIG_FILE_DIR + secondCheckPath + "\n"
-                + "\t4. Classpath: " + BUSCONFIG_CP_DIR + secondCheckPath);
+                + "\t1. File: " + firstCheckPath + "\n"
+                + "\t2. Classpath: " + ESBCONFIG_CP_DIR + firstCheckPath + "\n"
+                + "\t3. File: " + secondCheckPath + "\n"
+                + "\t4. Classpath: " + ESBCONFIG_CP_DIR + secondCheckPath);
 
         return null;
     }
 
     /**
+     * Get the main Deployment properties.
+     *
+     * @param deploymentName The deployment name.
+     * @return The main Deployment properties.
+     * @throws DeploymentException Error reading properties.
+     */
+    public static ApplicationProperties getDeploymentConfig(final String deploymentName) throws DeploymentException
+    {
+        String firstCheckPath = urlEncode(deploymentName) + ".properties";
+        String secondCheckPath = "default.properties";
+
+        ApplicationProperties properties;
+
+        // First check for a deployment specific config ...
+        properties = getConfig(firstCheckPath);
+        if (properties != null)
+        {
+            return properties;
+        }
+
+        // Then check for a default ...
+        properties = getConfig(secondCheckPath);
+        if (properties != null)
+        {
+            return properties;
+        }
+
+        throw new DeploymentException("Unable to find JBoss ESB Deployment configuration for deployment '" + deploymentName + "'. Tried:\n"
+                + "\t1. File: " + firstCheckPath + "\n"
+                + "\t2. Classpath: " + ESBCONFIG_CP_DIR + firstCheckPath + "\n"
+                + "\t3. File: " + secondCheckPath + "\n"
+                + "\t4. Classpath: " + ESBCONFIG_CP_DIR + secondCheckPath);
+    }
+
+    /**
      * Get the specified bus configuration from the bus configurations directory.
      * <p/>
      * Checks in the following order:
@@ -122,45 +158,60 @@
      *
      * @param configPath The configuration file path.
      * @return The bus configuration as a {@link java.util.Properties} instance.
-     * @throws java.io.IOException Unable to read configuration.
+     * @throws DeploymentException Unable to read configuration.
      */
-    public static ApplicationProperties getBusConfig(final String configPath) throws IOException
+    public static ApplicationProperties getConfig(final String configPath) throws DeploymentException
     {
-        String firstCheckPath = BUSCONFIG_FILE_DIR + configPath;
-        String secondCheckPath = BUSCONFIG_CP_DIR + configPath;
+        String fileCheckPath = configPath;
+        String cpCheckPath = ESBCONFIG_CP_DIR + configPath;
         InputStream configStream;
 
         // 1st: Check for a deployment specific config on the local file system...
-        File checkFile = new File(firstCheckPath);
+        File checkFile = new File(fileCheckPath);
         if (checkFile.exists() && !checkFile.isDirectory())
         {
-            return ApplicationProperties.loadProperties(new FileInputStream(checkFile));
+            try
+            {
+                return ApplicationProperties.loadProperties(new FileInputStream(checkFile));
+            }
+            catch (IOException e)
+            {
+                throw new DeploymentException("Error reading properties file '" + checkFile.getAbsolutePath() + "'.", e);
+            }
         }
 
         // 2nd: Check for a deployment specific config on the classpath...
-        configStream = ClassUtil.getResourceAsStream(secondCheckPath, DeploymentCoordinator.class);
+        configStream = ClassUtil.getResourceAsStream(cpCheckPath, DeploymentCoordinator.class);
         if (configStream != null)
         {
-            return ApplicationProperties.loadProperties(configStream);
+            try
+            {
+                return ApplicationProperties.loadProperties(configStream);
+            }
+            catch (IOException e)
+            {
+                throw new DeploymentException("Error reading properties classpath file '" + cpCheckPath + "'.", e);
+            }
         }
 
         return null;
     }
 
     /**
-     * Get the main Deployment properties.
-     * @return The main Deployment properties.
-     * @throws DeploymentException Error reading properties.
+     * URL encode the supplied string.
+     *
+     * @param string The String.
+     * @return The URL encoded string.
      */
-    public static ApplicationProperties getDeploymentConfig() throws DeploymentException
+    private static String urlEncode(final String string)
     {
         try
         {
-            return getBusConfig("main.properties");
+            return URLEncoder.encode(string, "UTF-8");
         }
-        catch (IOException e)
+        catch (UnsupportedEncodingException e)
         {
-            throw new DeploymentException("Error reading main.properties.", e);
+            throw new RuntimeException("Unexpected runtime exception.  Character encoding 'UTF-8' not supported on runtime VM.");
         }
     }
 }

Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java	2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java	2008-09-15 09:07:42 UTC (rev 22770)
@@ -47,7 +47,7 @@
  * Deployment Coordinator.
  * <p/>
  * Manages relationships between local deployments e.g. deployed in the same
- * container, on the same machine etc.
+ * container, on the same machine etc.  Deploys the local Bus interface for the local deployment.
  *
  * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
  */
@@ -58,14 +58,6 @@
      */
     public static final String DEPLOYMENT_COORDINTATION_SCHEDULE_KEY = "deployment.coordintation.schedule";
     /**
-     * Deployment coordination topic property key name.
-     */
-    public static final String DEPLOYMENT_COORDINTATION_TOPIC_KEY = "deployment.coordintation.topic";
-    /**
-     * Default deployment coordination topic name.
-     */
-    public static final String DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME = "jbossesb.deployment.coordintation.topic";
-    /**
      * Heartbeat frequency config key.
      */
     public static final String COORDINATOR_HEARTBEAT_FREQUENCY_KEY = "coordinator.heartbeat.frequency";
@@ -126,7 +118,7 @@
     @Initialize
     public final void initialize() throws DeploymentException
     {
-        ApplicationProperties deploymentProperties = PropertiesUtil.getDeploymentConfig();
+        ApplicationProperties deploymentProperties = PropertiesUtil.getDeploymentConfig(runtime.getDeploymentName());
 
         // Set the monitoring timeout - 4 heartbeats...
         monitorTimeout = (deploymentProperties.getLongProperty(COORDINATOR_HEARTBEAT_FREQUENCY_KEY, COORDINATOR_HEARTBEAT_DEFAULT_FREQUENCY) * 4);

Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/Bus.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/Bus.java	2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/Bus.java	2008-09-15 09:07:42 UTC (rev 22770)
@@ -23,10 +23,7 @@
 import org.jboss.esb.deploy.DeploymentException;
 import org.jboss.esb.federate.notify.AbstractDeploymentNotification;
 import org.jboss.esb.federate.notify.NotificationListener;
-import org.jboss.esb.message.Message;
-import org.jboss.esb.routing.MessageDispatcher;
 import org.jboss.esb.routing.RoutingException;
-import org.jboss.esb.service.ServiceName;
 
 /**
  * JBoss ESB Bus definition.
@@ -54,12 +51,14 @@
 
     /**
      * Connect the bus.
+     *
      * @throws DeploymentException Connection exception.
      */
     void connect() throws DeploymentException;
 
     /**
      * Is the bus connected.
+     *
      * @return True if the bus is connected, otherwise false.
      */
     boolean isConnected();
@@ -70,24 +69,20 @@
     void close();
 
     /**
-     * Send the supplied message to the specified service on the specified
-     * deployment via the bus.
+     * Send the supplied message to the specified deployment via the bus.
      *
-     * @param message             The message.
-     * @param service             The target service.
-     * @param serviceDeploymentId The deployment ID on which the Service is located.
+     * @param message            The message.
+     * @param targetDeploymentId The target deployment ID.
      * @throws RoutingException Error sending message onto the Bus.
      */
-    void send(Message message, ServiceName service, String serviceDeploymentId) throws RoutingException;
+    void send(BusMessage message, String targetDeploymentId) throws RoutingException;
 
     /**
-     * Add a message dispatcher for receiving messages from the bus for
-     * the specified local service.
+     * Set the bus message listener on the {@link Bus} implementation.
      *
-     * @param dispatcher The message dispatcher.
-     * @param service    The target service.
+     * @param listener The message listener.
      */
-    void addDispatcher(MessageDispatcher dispatcher, ServiceName service);
+    void setBusMessageListener(BusMessageListener listener);
 
     /**
      * Set the bus notification listener.
@@ -114,6 +109,7 @@
     {
         /**
          * New Bus instance.
+         *
          * @param className Bus class name.
          * @return Bus instance.
          * @throws DeploymentException Error creating Bus instance.

Added: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessage.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessage.java	                        (rev 0)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessage.java	2008-09-15 09:07:42 UTC (rev 22770)
@@ -0,0 +1,104 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright XXXX, Red Hat Middleware LLC, and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2008, JBoss Inc.
+ */
+package org.jboss.esb.federate.bus;
+
+import org.jboss.esb.context.AddressingContext;
+import org.jboss.esb.context.InvocationContext;
+import org.jboss.esb.message.Message;
+
+import java.io.Serializable;
+
+/**
+ * ESB Bus Message.
+ * <p/>
+ * This class encapsulates the ESB {@link org.jboss.esb.message.Message} for transportation
+ * on a Bus.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public final class BusMessage implements Serializable
+{
+    /**
+     * Invocation Context.
+     */
+    private InvocationContext invocationContext;
+    /**
+     * Addressing Context.
+     */
+    private AddressingContext addressingContext;
+    /**
+     * Message.
+     */
+    private Message message;
+
+    /**
+     * Get the message Invocation Context.
+     * @return The message Invocation Context.
+     */
+    public InvocationContext getInvocationContext()
+    {
+        return invocationContext;
+    }
+
+    /**
+     * Set the message Invocation Context.
+     * @param invocationContext The message Invocation Context.
+     */
+    public void setInvocationContext(final InvocationContext invocationContext)
+    {
+        this.invocationContext = invocationContext;
+    }
+
+    /**
+     * Get the message Addressing Context.
+     * @return The message Addressing Context.
+     */
+    public AddressingContext getAddressingContext()
+    {
+        return addressingContext;
+    }
+
+    /**
+     * Set the message Addressing Context.
+     * @param addressingContext The message Addressing Context.
+     */
+    public void setAddressingContext(final AddressingContext addressingContext)
+    {
+        this.addressingContext = addressingContext;
+    }
+
+    /**
+     * Get the message.
+     * @return The message.
+     */
+    public Message getMessage()
+    {
+        return message;
+    }
+
+    /**
+     * Set the message.
+     * @param message The message.
+     */
+    public void setMessage(final Message message)
+    {
+        this.message = message;
+    }
+}


Property changes on: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessage.java
___________________________________________________________________
Name: svn:eol-style
   + native

Copied: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessageListener.java (from rev 22750, labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/notify/NotificationListener.java)
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessageListener.java	                        (rev 0)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessageListener.java	2008-09-15 09:07:42 UTC (rev 22770)
@@ -0,0 +1,35 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright XXXX, Red Hat Middleware LLC, and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2008, JBoss Inc.
+ */
+package org.jboss.esb.federate.bus;
+
+/**
+ * {@link BusMessage} Listener interface.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public interface BusMessageListener
+{
+    /**
+     * Message handler.
+     *
+     * @param busMessage The bus message.
+     */
+    void onMessage(BusMessage busMessage);
+}
\ No newline at end of file


Property changes on: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessageListener.java
___________________________________________________________________
Name: svn:mergeinfo
   + 
Name: svn:eol-style
   + native

Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/jms/JMSBus.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/jms/JMSBus.java	2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/jms/JMSBus.java	2008-09-15 09:07:42 UTC (rev 22770)
@@ -21,25 +21,23 @@
 
 import org.apache.log4j.Logger;
 import org.jboss.esb.deploy.DeploymentException;
-import org.jboss.esb.federate.DeploymentCoordinator;
+import org.jboss.esb.deploy.config.PropertiesUtil;
 import org.jboss.esb.federate.bus.Bus;
-import org.jboss.esb.deploy.config.PropertiesUtil;
+import org.jboss.esb.federate.bus.BusMessage;
+import org.jboss.esb.federate.bus.BusMessageListener;
 import org.jboss.esb.federate.notify.AbstractDeploymentNotification;
 import org.jboss.esb.federate.notify.NotificationListener;
+import org.jboss.esb.jms.AbstractMessageHandler;
 import org.jboss.esb.jms.AbstractMessageListener;
 import org.jboss.esb.jms.JMSSession;
 import org.jboss.esb.jms.MessageSender;
-import org.jboss.esb.message.Message;
 import org.jboss.esb.properties.ApplicationProperties;
-import org.jboss.esb.routing.MessageDispatcher;
 import org.jboss.esb.routing.RoutingException;
-import org.jboss.esb.service.ServiceName;
 
 import javax.jms.JMSException;
 import javax.jms.ObjectMessage;
 import javax.jms.Queue;
 import javax.jms.Topic;
-import java.io.IOException;
 import java.util.Properties;
 
 /**
@@ -56,6 +54,26 @@
      */
     private static Logger logger = Logger.getLogger(JMSBus.class);
     /**
+     * Deployment coordination topic property key name.
+     */
+    public static final String DEPLOYMENT_COORDINTATION_TOPIC_KEY = "deployment.coordintation.topic";
+    /**
+     * Default deployment coordination topic name.
+     */
+    public static final String DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME = "jbossesb.deployment.coordintation.topic";
+    /**
+     * Deployment JMS Bus Queue name config key.
+     */
+    public static final String BUS_QUEUE_KEY = "deployment.bus.queue";
+    /**
+     * Default Deployment JMS Bus Queue name.
+     */
+    public static final String DEFAULT_BUS_QUEUE_NAME = "jbossesb.jms.bus";
+    /**
+     * Deployment ID key.
+     */
+    public static final String DEPLOYMENT_ID = "jbossesb_deployment_id";
+    /**
      * Bus Properties.
      */
     private ApplicationProperties busProperties;
@@ -76,17 +94,29 @@
      */
     private JMSSession queueSession;
     /**
-     * Deployment coordination listener.
+     * JMS Deployment coordination listener.
      */
-    private DeploymentNotificationListener deploymentNotificationListener;
+    private JMSCoordinationListener jmsCoordinationListener;
     /**
      * Deployment notification broadcaster.
      */
-    private MessageSender notificationBroadcaster;
+    private MessageSender jmsCoordinationSender;
     /**
-     * Notification listener.
+     * Notification callback.
      */
-    private NotificationListener notificationListener;
+    private NotificationListener coordinationCallback;
+    /**
+     * Bus message listener.
+     */
+    private BusMessageListener busMessageCallback;
+    /**
+     * JMS Bus Message Listener.
+     */
+    private JMSQueueMessageListener jmsBusListener;
+    /**
+     * JMS Bus Message Sender.
+     */
+    private MessageSender jmsBusSender;
 
     /**
      * Get the Bus configuration properties.
@@ -132,42 +162,51 @@
             throw new IllegalStateException("'deploymentId' not set on Bus.");
         }
 
-        try
-        {
-            busProperties = PropertiesUtil.getBusConfig("jms", deploymentName);
-        }
-        catch (IOException e)
-        {
-            throw new DeploymentException("Failed to read local bus deployment configuration", e);
-        }
+        busProperties = PropertiesUtil.getBusConfig("jms", deploymentName);
 
         if (busProperties != null)
         {
-            intialiseJMSResources(busProperties);
+            connectJMSSessions(busProperties);
 
-            if (topicSession != null)
+            if (topicSession != null && queueSession != null)
             {
                 try
                 {
-                    connectNotificationListener(busProperties);
+                    connectCoordinationListener(busProperties);
+                    connectCoordintationSender(busProperties);
+                    connectBusListener(busProperties);
+                    connectBusSender(busProperties);
                 }
-                catch (Throwable t)
+                catch (DeploymentException e)
                 {
-                    closeJMSResources();
-                    throw new DeploymentException("Failed to connect coordination listener", t);
-                }
-                if (deploymentNotificationListener != null && deploymentNotificationListener.isConnected())
-                {
                     try
                     {
-                        connectNotificationBroadcaster(busProperties);
+                        if(jmsCoordinationListener == null && jmsCoordinationSender == null && jmsBusListener == null && jmsBusSender == null)
+                        {
+                            if(!logger.isDebugEnabled())
+                            {
+                                logger.info("Deployment '" + deploymentName + "' is not being coordinated with any other local deployment.  Turn on deug logging for more details.");
+                            }
+                            else
+                            {
+                                logger.debug("Deployment '" + deploymentName + "' is not being coordinated with any other local deployment.", e);                                
+                            }
+                        }
+                        else
+                        {
+                            throw e;
+                        }
                     }
-                    catch (Throwable t)
+                    finally
                     {
-                        closeJMSResources();
-                        throw new DeploymentException("Unable to initialize Deployment Coordinator.", t);
+                        closeJMSSessions();
                     }
                 }
+                catch (Throwable t)
+                {
+                    closeJMSSessions();
+                    throw new DeploymentException("Failed to connect coordination listener", t);
+                }
             }
         }
     }
@@ -177,57 +216,50 @@
      */
     public final void close()
     {
-        if (deploymentNotificationListener != null && deploymentNotificationListener.isConnected())
+        try
         {
-            try
-            {
-                deploymentNotificationListener.close();
-            }
-            catch (Throwable t)
-            {
-                logger.warn("Error closing deployment coordination listener for deployment '" + deploymentName + "'.", t);
-            }
+            closeJMSHandler("Deployment Coordination Listener", jmsCoordinationListener);
+            closeJMSHandler("Deployment Coordination Sender", jmsCoordinationSender);
+            closeJMSHandler("JMS Bus Listener", jmsBusListener);
+            closeJMSHandler("JMS Bus Sender", jmsBusSender);
         }
-
-        if (notificationBroadcaster != null && notificationBroadcaster.isConnected())
+        finally
         {
-            try
-            {
-                notificationBroadcaster.close();
-            }
-            catch (Throwable t)
-            {
-                logger.warn("Error closing deployment coordination broadcaster for deployment '" + deploymentName + "'.", t);
-            }
+            closeJMSSessions();
         }
-
-        closeJMSResources();
     }
 
     /**
-     * Send the supplied message to the specified service on the specified
-     * deployment via the bus.
+     * Send the supplied message to the specified deployment via the bus.
      *
      * @param message      The message.
-     * @param service      The target service.
-     * @param serviceDeploymentId The deployment ID on which the Service is located.
+     * @param targetDeploymentId The target deployment ID.
      * @throws RoutingException Error sending message onto the Bus.
      */
-    public final void send(final Message message, final ServiceName service, final String serviceDeploymentId) throws RoutingException
+    public final void send(final BusMessage message, final String targetDeploymentId) throws RoutingException
     {
+        try
+        {
+            ObjectMessage jmsMessage = jmsBusSender.getSession().createObjectMessage(message);
 
+            jmsMessage.setStringProperty(DEPLOYMENT_ID, targetDeploymentId);
+            jmsBusSender.send(jmsMessage);
+        }
+        catch (JMSException e)
+        {
+            throw new RoutingException("Error sending message to JMS Bus for deployment ID '" + targetDeploymentId + "'.", e);
+        }
     }
 
     /**
      * Add a message dispatcher for receiving messages from the bus for
      * the specified local service.
      *
-     * @param dispatcher The message dispatcher.
-     * @param service    The target service.
+     * @param listener The message dispatcher.
      */
-    public final void addDispatcher(final MessageDispatcher dispatcher, final ServiceName service)
+    public final void setBusMessageListener(final BusMessageListener listener)
     {
-
+        this.busMessageCallback = listener;
     }
 
     /**
@@ -237,7 +269,7 @@
      */
     public final void setNotificationListener(final NotificationListener listener)
     {
-        this.notificationListener = listener;
+        this.coordinationCallback = listener;
     }
 
     /**
@@ -251,11 +283,11 @@
      */
     public final void sendNotification(final AbstractDeploymentNotification notification) throws RoutingException
     {
-        if (notificationBroadcaster != null)
+        if (jmsCoordinationSender != null)
         {
             try
             {
-                notificationBroadcaster.send(notificationBroadcaster.getSession().createObjectMessage(notification));
+                jmsCoordinationSender.send(jmsCoordinationSender.getSession().createObjectMessage(notification));
             }
             catch (JMSException e)
             {
@@ -274,103 +306,98 @@
     }
 
     /**
-     * Deployment Notification Listener.
+     * Connect the deployment coordination listener.
+     *
+     * @param busProperties Bus configuration properties.
+     * @throws org.jboss.esb.deploy.DeploymentException Failed to connect listener.
      */
-    private class DeploymentNotificationListener extends AbstractMessageListener
+    private void connectCoordinationListener(final Properties busProperties) throws DeploymentException
     {
-        /**
-         * Constructor.
-         *
-         * @param destinationName Destination name.
-         * @param jndiProperties  JNDI properties.
-         */
-        protected DeploymentNotificationListener(final String destinationName, final Properties jndiProperties)
+        String coordinationTopicName = busProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
+
+        jmsCoordinationListener = new JMSCoordinationListener(coordinationTopicName, busProperties);
+        try
         {
-            super(destinationName, topicSession, jndiProperties);
+            jmsCoordinationListener.connect();
         }
-
-        /**
-         * Handle a coordination event from other deployments.
-         *
-         * @param message Coordination message.
-         */
-        public final void onMessage(final javax.jms.Message message)
+        catch (JMSException e)
         {
-            if (message instanceof ObjectMessage)
-            {
-                try
-                {
-                    Object objectMessage = ((ObjectMessage) message).getObject();
+            jmsCoordinationListener = null;
+            throw new DeploymentException("Deployment '" + deploymentName + "' is not being coordinated with any other local deployment.  A JMSException occured while trying "
+                    + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
+                    + "JNDI properties used: " + busProperties, e);
+        }
+    }
 
-                    if (objectMessage instanceof AbstractDeploymentNotification)
-                    {
-                        AbstractDeploymentNotification notification = (AbstractDeploymentNotification) objectMessage;
-                        String messageDeploymentId = notification.getId();
+    /**
+     * Connect the deployment coordination sender.
+     *
+     * @param busProperties Bus configuration properties.
+     * @throws org.jboss.esb.deploy.DeploymentException Failed to connect sender.
+     */
+    private void connectCoordintationSender(final Properties busProperties) throws DeploymentException
+    {
+        String coordinationTopicName = busProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
 
-                        // If it's not a notification from this deployment...
-                        if (!deploymentId.equals(messageDeploymentId))
-                        {
-                            notificationListener.onNotification(notification);
-                        }
-                    }
-                }
-                catch (JMSException e)
-                {
-                    logger.warn("Unable to get Object from JMS ObjectMessage.", e);
-                }
-            }
+        jmsCoordinationSender = new MessageSender(coordinationTopicName, topicSession);
+        try
+        {
+            jmsCoordinationSender.connect();
         }
+        catch (JMSException e)
+        {
+            jmsCoordinationSender = null;
+            throw new DeploymentException("Deployment '" + deploymentName + "' is not being coordinated with any other local deployment.  A JMSException occured while trying "
+                    + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
+                    + "JNDI properties used: " + busProperties, e);
+        }
+    }
 
-    }    
-
     /**
-     * Connect the deployment coordination listener.
+     * Connect the {@link BusMessage} listener.
      *
-     * @param localBusProperties Bus configuration properties.
+     * @param busProperties Bus configuration properties.
+     * @throws org.jboss.esb.deploy.DeploymentException Failed to connect listener.
      */
-    private void connectNotificationListener(final Properties localBusProperties)
+    private void connectBusListener(final Properties busProperties) throws DeploymentException
     {
-        String coordinationTopicName = localBusProperties.getProperty(DeploymentCoordinator.DEPLOYMENT_COORDINTATION_TOPIC_KEY, DeploymentCoordinator.DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
+        String busQueueName = busProperties.getProperty(BUS_QUEUE_KEY, DEFAULT_BUS_QUEUE_NAME);
 
-        deploymentNotificationListener = new DeploymentNotificationListener(coordinationTopicName, localBusProperties);
+        jmsBusListener = new JMSQueueMessageListener(busQueueName, busProperties);
         try
         {
-            deploymentNotificationListener.connect();
+            jmsBusListener.connect();
         }
         catch (JMSException e)
         {
-            deploymentNotificationListener = null;
-            logger.info("Deployment '" + deploymentName + "' is not being coordinated with any other local deployment.  Turn on deug logging for more details.");
-            if (logger.isDebugEnabled())
-            {
-                logger.debug("Deployment '" + deploymentName + "' is not being coordinated with any other local deployment.  A JMSException occured while trying "
-                        + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
-                        + "JNDI properties used: " + localBusProperties);
-            }
+            jmsBusListener = null;
+            throw new DeploymentException("Deployment '" + deploymentName + "' is not being coordinated with any other local deployment.  A JMSException occured while trying "
+                    + "to connect to the JMS Bus Queue '" + busQueueName + "'. The JMS Server must be running and this Queue must be deployed. "
+                    + "JNDI properties used: " + busProperties, e);
         }
     }
 
     /**
-     * Connect the deployment coordination broadcaster.
+     * Connect the {@link BusMessage} sender.
      *
-     * @param localBusProperties Bus configuration properties.
-     * @throws org.jboss.esb.deploy.DeploymentException Failed to connect deployment coordination broadcaster.
+     * @param busProperties Bus configuration properties.
+     * @throws org.jboss.esb.deploy.DeploymentException Failed to connect sender.
      */
-    private void connectNotificationBroadcaster(final Properties localBusProperties) throws DeploymentException
+    private void connectBusSender(final Properties busProperties) throws DeploymentException
     {
-        String coordinationTopicName = localBusProperties.getProperty(DeploymentCoordinator.DEPLOYMENT_COORDINTATION_TOPIC_KEY, DeploymentCoordinator.DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
+        String busQueueName = busProperties.getProperty(BUS_QUEUE_KEY, DEFAULT_BUS_QUEUE_NAME);
 
-        notificationBroadcaster = new MessageSender(coordinationTopicName, topicSession);
+        jmsBusSender = new MessageSender(busQueueName, queueSession);
         try
         {
-            notificationBroadcaster.connect();
+            jmsBusSender.connect();
         }
         catch (JMSException e)
         {
-            notificationBroadcaster = null;
+            jmsBusSender = null;
             throw new DeploymentException("Deployment '" + deploymentName + "' is not being coordinated with any other local deployment.  A JMSException occured while trying "
-                    + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
-                    + "JNDI properties used: " + localBusProperties, e);
+                    + "to connect to the JMS Bus Queue '" + busQueueName + "'. The JMS Server must be running and this Queue must be deployed. "
+                    + "JNDI properties used: " + busProperties, e);
         }
     }
 
@@ -379,7 +406,7 @@
      *
      * @param localBusProperties Local JMS Bus properties.
      */
-    private void intialiseJMSResources(final ApplicationProperties localBusProperties)
+    private void connectJMSSessions(final ApplicationProperties localBusProperties)
     {
         topicSession = new JMSSession(Topic.class, localBusProperties);
         try
@@ -388,7 +415,7 @@
         }
         catch (Throwable t)
         {
-            closeJMSResources();
+            closeJMSSessions();
             logger.debug("Failed to connect shared deployment JMS Topic Session.", t);
             return;
         }
@@ -399,7 +426,7 @@
         }
         catch (Throwable t)
         {
-            closeJMSResources();
+            closeJMSSessions();
             logger.debug("Failed to connect shared deployment JMS Queue Session.", t);
         }
     }
@@ -407,7 +434,7 @@
     /**
      * Close the JMS Sessions.
      */
-    private void closeJMSResources()
+    private void closeJMSSessions()
     {
         // Close the sessions...
         if (topicSession != null)
@@ -427,4 +454,126 @@
             queueSession = null;
         }
     }
+
+    /**
+     * Deployment Notification Listener.
+     */
+    private class JMSCoordinationListener extends AbstractMessageListener
+    {
+
+        /**
+         * Constructor.
+         *
+         * @param destinationName Destination name.
+         * @param jndiProperties  JNDI properties.
+         */
+        protected JMSCoordinationListener(final String destinationName, final Properties jndiProperties)
+        {
+            super(destinationName, topicSession, jndiProperties);
+        }
+
+        /**
+         * Handle a coordination event from other deployments.
+         *
+         * @param message Coordination message.
+         */
+        public final void onMessage(final javax.jms.Message message)
+        {
+            if (message instanceof ObjectMessage)
+            {
+                try
+                {
+                    Object objectMessage = ((ObjectMessage) message).getObject();
+
+                    if (objectMessage instanceof AbstractDeploymentNotification)
+                    {
+                        coordinationCallback.onNotification((AbstractDeploymentNotification) objectMessage);
+                    }
+                }
+                catch (JMSException e)
+                {
+                    logger.warn("Unable to get Object from JMS ObjectMessage.", e);
+                }
+            }
+        }
+
+    }
+
+    /**
+     * JMS Bus message lsitener.
+     */
+    private class JMSQueueMessageListener extends AbstractMessageListener
+    {
+
+        /**
+         * Constructor.
+         *
+         * @param destinationName Destination name.
+         * @param jndiProperties  JNDI properties.
+         */
+        protected JMSQueueMessageListener(final String destinationName, final Properties jndiProperties)
+        {
+            super(destinationName, queueSession, addDeploymentSelector(jndiProperties, deploymentId));
+        }
+
+        /**
+         * Process a BusMessage for this deployment.
+         *
+         * @param message The bus message.
+         */
+        public final void onMessage(final javax.jms.Message message)
+        {
+            if (message instanceof ObjectMessage)
+            {
+                try
+                {
+                    Object objectMessage = ((ObjectMessage) message).getObject();
+
+                    if (objectMessage instanceof BusMessage)
+                    {
+                        busMessageCallback.onMessage((BusMessage) objectMessage);
+                    }
+                }
+                catch (JMSException e)
+                {
+                    logger.warn("Unable to get Object from JMS ObjectMessage.", e);
+                }
+            }
+        }
+
+
+    }
+
+    /**
+     * Set the deployment ID selector on the specified properties.
+     * @param jmsProperties JMS connection properties.
+     * @param deploymentId Deployment ID.
+     * @return The new properties with the JMS Selector set.
+     */
+    private static Properties addDeploymentSelector(Properties jmsProperties, String deploymentId)
+    {
+        Properties properties = (Properties) jmsProperties.clone();
+        properties.setProperty(AbstractMessageListener.MESSAGE_SELECTOR, DEPLOYMENT_ID + "='" + deploymentId + "'");
+        return properties;
+    }
+
+    /**
+     * Close JMS Handler.
+     * @param name The handler name.
+     * @param handler The handler.
+     */
+    private void closeJMSHandler(final String name, final AbstractMessageHandler handler)
+    {
+        if (handler != null && handler.isConnected())
+        {
+            try
+            {
+                handler.close();
+            }
+            catch (Throwable t)
+            {
+                logger.warn("Error closing " + name + " for deployment '" + deploymentName + "'.", t);
+            }
+        }
+    }
 }

Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/jms/default.properties
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/jms/default.properties	2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/jms/default.properties	2008-09-15 09:07:42 UTC (rev 22770)
@@ -13,3 +13,4 @@
 
 # Bus Queues and Topics...
 deployment.coordintation.topic=jbossesb.deployment.coordintation.topic
+deployment.bus.queue=jbossesb.jms.bus

Deleted: labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/main.properties
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/main.properties	2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/main.properties	2008-09-15 09:07:42 UTC (rev 22770)
@@ -1,9 +0,0 @@
-###########################################################################################
-# Default JMS ESB config.
-##########################################################################################
-
-# Coordination settings...
-coordinator.heartbeat.frequency=5000
-
-# Buses to be deployed...
-bus.jms=org.jboss.esb.federate.bus.jms.JMSBus
\ No newline at end of file

Copied: labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/default.properties (from rev 22750, labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/main.properties)
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/default.properties	                        (rev 0)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/default.properties	2008-09-15 09:07:42 UTC (rev 22770)
@@ -0,0 +1,9 @@
+###########################################################################################
+# Default JMS ESB config.
+##########################################################################################
+
+# Coordination settings...
+coordinator.heartbeat.frequency=5000
+
+# Buses to be deployed...
+bus.jms=org.jboss.esb.federate.bus.jms.JMSBus
\ No newline at end of file


Property changes on: labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/default.properties
___________________________________________________________________
Name: svn:mime-type
   + text/plain
Name: svn:eol-style
   + native

Modified: labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/jms/default.properties
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/jms/default.properties	2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/jms/default.properties	2008-09-15 09:07:42 UTC (rev 22770)
@@ -13,6 +13,8 @@
 
 # Bus Queues and Topics...
 deployment.coordintation.topic=jbossesb.deployment.coordintation.topic
+deployment.bus.queue=jbossesb.jms.bus
 
 # ActiveMQ Queue and Topic deployments...
 topic.jbossesb.deployment.coordintation.topic=jbossesb.deployment.coordintation.topic
+queue.jbossesb.jms.bus=jbossesb.jms.bus

Deleted: labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/main.properties
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/main.properties	2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/main.properties	2008-09-15 09:07:42 UTC (rev 22770)
@@ -1,9 +0,0 @@
-###########################################################################################
-# Default JMS ESB config.
-##########################################################################################
-
-# Coordination settings...
-coordinator.heartbeat.frequency=700
-
-# Buses to be deployed...
-bus.jms=org.jboss.esb.federate.bus.jms.JMSBus
\ No newline at end of file

Copied: labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/default.properties (from rev 22750, labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/main.properties)
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/default.properties	                        (rev 0)
+++ labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/default.properties	2008-09-15 09:07:42 UTC (rev 22770)
@@ -0,0 +1,9 @@
+###########################################################################################
+# Default JMS ESB config.
+##########################################################################################
+
+# Coordination settings...
+coordinator.heartbeat.frequency=700
+
+# Buses to be deployed...
+bus.jms=org.jboss.esb.federate.bus.jms.JMSBus
\ No newline at end of file


Property changes on: labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/default.properties
___________________________________________________________________
Name: svn:mime-type
   + text/plain
Name: svn:eol-style
   + native




More information about the jboss-svn-commits mailing list