[jboss-svn-commits] JBL Code SVN: r22724 - in labs/jbossesb/workspace/skeagh/runtime/src: main/java/org/jboss/esb/federate/bus and 1 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Sep 12 15:57:15 EDT 2008


Author: tfennelly
Date: 2008-09-12 15:57:15 -0400 (Fri, 12 Sep 2008)
New Revision: 22724

Added:
   labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/bus/
Modified:
   labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java
   labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/Bus.java
   labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/DeploymentCoordinatorTest.java
Log:
Pulling the JMS Bus up out of the DeploymentCoordinator. It now initializes the Buses from the config.

Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java	2008-09-12 17:44:38 UTC (rev 22723)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java	2008-09-12 19:57:15 UTC (rev 22724)
@@ -24,9 +24,8 @@
 import org.jboss.esb.annotations.Uninitialize;
 import org.jboss.esb.deploy.DeploymentException;
 import org.jboss.esb.deploy.DeploymentRuntime;
+import org.jboss.esb.deploy.config.PropertiesUtil;
 import org.jboss.esb.federate.bus.Bus;
-import org.jboss.esb.deploy.config.PropertiesUtil;
-import org.jboss.esb.federate.bus.jms.JMSBus;
 import org.jboss.esb.federate.notify.AbstractDeploymentNotification;
 import org.jboss.esb.federate.notify.DeploymentDetailsNotification;
 import org.jboss.esb.federate.notify.DeploymentHeartbeatNotification;
@@ -38,6 +37,8 @@
 import org.jboss.esb.schedule.SchedulingException;
 import org.jboss.esb.util.AssertArgument;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -94,9 +95,9 @@
      */
     private DeploymentDetailsNotification detailsNotification;
     /**
-     * JMS Bus.  Will be dynamically loading this soon!  Will support different bus types.
+     * Bus list.
      */
-    private BusDeployment jmsBusDeployment;
+    private List<BusDeployment> busDeployments = new ArrayList<BusDeployment>();
     /**
      * Coordinator monitor timeout.  The number of milliseconds before a deployment is marked as offline.
      */
@@ -125,33 +126,55 @@
     @Initialize
     public final void initialize() throws DeploymentException
     {
-        // TODO: JMSBus is temporarily hardwired in... pull it out and generalise later...
-        JMSBus bus = new JMSBus();
-        bus.setDeploymentName(runtime.getDeploymentName());
-        bus.setDeploymentId(runtime.getDeploymentId());
+        ApplicationProperties deploymentProperties = PropertiesUtil.getDeploymentConfig();
 
-        bus.connect();
+        // Set the monitoring timeout - 4 heartbeats...
+        monitorTimeout = (deploymentProperties.getLongProperty(COORDINATOR_HEARTBEAT_FREQUENCY_KEY, COORDINATOR_HEARTBEAT_DEFAULT_FREQUENCY) * 4);
 
-        if (bus.isConnected())
+        // Connect all configured buses...
+        Set<Map.Entry<Object, Object>> configEntries = deploymentProperties.entrySet();
+        for (Map.Entry<Object, Object> configEntry : configEntries)
         {
-            ApplicationProperties deploymentProperties = PropertiesUtil.getDeploymentConfig();
+            String propertyName = (String) configEntry.getKey();
+            if(propertyName.startsWith("bus."))
+            {
+                String busClassName = (String) configEntry.getValue();
+                try
+                {
+                    Bus bus = Bus.Factory.newInstance(busClassName);
 
-            try
-            {
-                // Tell the other deployments using this bus that this deployment is online...
-                bus.sendNotification(detailsNotification);
+                    bus.setDeploymentName(runtime.getDeploymentName());
+                    bus.setDeploymentId(runtime.getDeploymentId());
+                    bus.connect();
+
+                    if (bus.isConnected())
+                    {
+                        busDeployments.add(new BusDeployment(bus));
+
+                        try
+                        {
+                            // Tell the other deployments using this bus that this deployment is online...
+                            bus.sendNotification(detailsNotification);
+                        }
+                        catch (RoutingException e)
+                        {
+                            bus.close();
+                            logger.error("Unable to start Bus.  Failed to deliver Deployment Details Notification (on statrup) to  bus '" + bus.getClass().getName() + "'.", e);
+                            return;
+                        }
+                    }
+                }
+                catch (DeploymentException e)
+                {
+                    uninitialize();
+                    throw e;
+                }
+                catch(Throwable t)
+                {
+                    uninitialize();
+                    throw new DeploymentException("Unexpected exception while starting Deployment Bus interfaces.", t);
+                }
             }
-            catch (RoutingException e)
-            {
-                bus.close();
-                logger.error("Unable to start Bus.  Failed to deliver Deployment Details Notification (on statrup) to  bus '" + bus.getClass().getName() + "'.", e);
-                return;
-            }
-
-            // Set the monitoring timeout - 4 heartbeats...
-            // TODO: Move this to a deployment related config i.e. a non-bus related config
-            monitorTimeout = (deploymentProperties.getLongProperty(COORDINATOR_HEARTBEAT_FREQUENCY_KEY, COORDINATOR_HEARTBEAT_DEFAULT_FREQUENCY) * 4);
-            jmsBusDeployment = new BusDeployment(bus);
         }
     }
 
@@ -161,19 +184,27 @@
     @Uninitialize
     public final void uninitialize()
     {
-        if (jmsBusDeployment != null)
+        if (!busDeployments.isEmpty())
         {
-            try
+            for (BusDeployment busDeployment : busDeployments)
             {
-                // Tell all other deployments we're no longer online...
-                jmsBusDeployment.bus.sendNotification(DeploymentUndeployNotification.toNotification(runtime));
+                try
+                {
+                    busDeployment.getBus().sendNotification(DeploymentUndeployNotification.toNotification(runtime));
+                }
+                catch(Throwable t)
+                {
+                    logger.error("Error sending undeploy notification on Bus '" + busDeployment.getBus().getClass().getName() + "'.", t);
+                }
+                try
+                {
+                    busDeployment.getBus().close();
+                }
+                catch(Throwable t)
+                {
+                    logger.error("Error closing Bus '" + busDeployment.getBus().getClass().getName() + "'.", t);
+                }
             }
-            catch (Throwable t)
-            {
-                logger.warn("Error sending undeploy notification'.", t);
-            }
-
-            jmsBusDeployment.bus.close();
         }
     }
 
@@ -184,22 +215,30 @@
      */
     public final void onSchedule() throws SchedulingException
     {
-        if (jmsBusDeployment != null)
+        if (!busDeployments.isEmpty())
         {
-            try
+            for (BusDeployment busDeployment : busDeployments)
             {
-                if (broadcastHeartbeat)
+                try
                 {
-                    // Tell all other deployments we're still alive...
-                    jmsBusDeployment.bus.sendNotification(heartbeatNotification);
+                    if(broadcastHeartbeat)
+                    {
+                        busDeployment.getBus().sendNotification(heartbeatNotification);
+                    }
                 }
-
-                jmsBusDeployment.checkMonitors();
+                catch(Throwable t)
+                {
+                    logger.error("Error sending heartbeat notification on Bus '" + busDeployment.getBus().getClass().getName() + "'.", t);
+                }
+                try
+                {
+                    busDeployment.checkMonitors();
+                }
+                catch(Throwable t)
+                {
+                    logger.error("Error checking monitors on Bus '" + busDeployment.getBus().getClass().getName() + "'.", t);
+                }
             }
-            catch (Throwable t)
-            {
-                logger.warn("Error sending deployment heartbeat notification'.", t);
-            }
         }
     }
 
@@ -214,19 +253,19 @@
     }
 
     /**
-     * Get the deployment monitors being managed by this deployment.
+     * Get the {@link BusDeployment} instances being managed by this deployment.
      *
-     * @return The deployment monitors being managed by this deployment.
+     * @return The {@link BusDeployment} instances being managed by this deployment.
      */
-    public final Map<String, DeploymentMonitor> getDeploymentMonitors()
+    public final List<BusDeployment> getBusDeployments()
     {
-        return jmsBusDeployment.deploymentMonitors;
+        return busDeployments;
     }
 
     /**
      * JBoss ESB Deployment.
      */
-    private final class BusDeployment
+    public final class BusDeployment
     {
         /**
          * The Bus.
@@ -252,6 +291,16 @@
             bus.setNotificationListener(notificationListener);
         }
 
+        public Bus getBus()
+        {
+            return bus;
+        }
+
+        public Map<String, DeploymentMonitor> getDeploymentMonitors()
+        {
+            return deploymentMonitors;
+        }
+
         /**
          * Bus notification listener.
          */

Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/Bus.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/Bus.java	2008-09-12 17:44:38 UTC (rev 22723)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/Bus.java	2008-09-12 19:57:15 UTC (rev 22724)
@@ -19,13 +19,14 @@
  */
 package org.jboss.esb.federate.bus;
 
+import org.jboss.esb.classpath.ClassUtil;
+import org.jboss.esb.deploy.DeploymentException;
 import org.jboss.esb.federate.notify.AbstractDeploymentNotification;
 import org.jboss.esb.federate.notify.NotificationListener;
 import org.jboss.esb.message.Message;
 import org.jboss.esb.routing.MessageDispatcher;
 import org.jboss.esb.routing.RoutingException;
 import org.jboss.esb.service.ServiceName;
-import org.jboss.esb.deploy.DeploymentException;
 
 /**
  * JBoss ESB Bus definition.
@@ -105,4 +106,36 @@
      * @throws RoutingException Error sending notification onto the Bus.
      */
     void sendNotification(AbstractDeploymentNotification notification) throws RoutingException;
+
+    /**
+     * Bus Factory class.
+     */
+    public static class Factory
+    {
+        /**
+         * New Bus instance.
+         * @param className Bus class name.
+         * @return Bus instance.
+         * @throws DeploymentException Error creating Bus instance.
+         */
+        public static Bus newInstance(String className) throws DeploymentException
+        {
+            try
+            {
+                return (Bus) ClassUtil.forName(className, Bus.class).newInstance();
+            }
+            catch (InstantiationException e)
+            {
+                throw new DeploymentException("Failed to create instance of Bus class '" + className + "'.  Class must define a public default constructor.", e);
+            }
+            catch (IllegalAccessException e)
+            {
+                throw new DeploymentException("Failed to create instance of Bus class '" + className + "'.", e);
+            }
+            catch (ClassNotFoundException e)
+            {
+                throw new DeploymentException("Failed to create instance of Bus class '" + className + "'.  Class not found on classpath.", e);
+            }
+        }
+    }
 }

Modified: labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/DeploymentCoordinatorTest.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/DeploymentCoordinatorTest.java	2008-09-12 17:44:38 UTC (rev 22723)
+++ labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/DeploymentCoordinatorTest.java	2008-09-12 19:57:15 UTC (rev 22724)
@@ -48,16 +48,16 @@
                         DeploymentCoordinator coordinator2 = deployment2.getDeploymentCoordinator();
                         DeploymentMonitor monitor;
 
-                        assertEquals(1, coordinator1.getDeploymentMonitors().size());
-                        assertEquals(1, coordinator2.getDeploymentMonitors().size());
+                        assertEquals(1, coordinator1.getBusDeployments().get(0).getDeploymentMonitors().size());
+                        assertEquals(1, coordinator2.getBusDeployments().get(0).getDeploymentMonitors().size());
 
                         // So deployment1 should be monitoring deployment2...
-                        monitor = (DeploymentMonitor) coordinator1.getDeploymentMonitors().values().toArray()[0];
+                        monitor = (DeploymentMonitor) coordinator1.getBusDeployments().get(0).getDeploymentMonitors().values().toArray()[0];
                         monitor.getServiceSets().setDeploymentId("x");
                         assertEquals("{online=true} deployment2:x[Services: [hello:goodbye]][OutboundRoutedServices: [hello:hello]]", monitor.toString());
 
                         // And deployment2 should be monitoring deployment1...
-                        monitor = (DeploymentMonitor) coordinator2.getDeploymentMonitors().values().toArray()[0];
+                        monitor = (DeploymentMonitor) coordinator2.getBusDeployments().get(0).getDeploymentMonitors().values().toArray()[0];
                         monitor.getServiceSets().setDeploymentId("x");
                         assertEquals("{online=true} deployment1:x[Services: [hello:hello]][OutboundRoutedServices: [hello:hello, hello:goodbye]]", monitor.toString());
 
@@ -70,25 +70,25 @@
                             DeploymentCoordinator coordinator3 = deployment3.getDeploymentCoordinator();
 
                             // Each of the 3 deployments should be monitoring the other 2 deployments...
-                            assertEquals(2, coordinator1.getDeploymentMonitors().size());
-                            assertEquals(2, coordinator2.getDeploymentMonitors().size());
-                            assertEquals(2, coordinator3.getDeploymentMonitors().size());
+                            assertEquals(2, coordinator1.getBusDeployments().get(0).getDeploymentMonitors().size());
+                            assertEquals(2, coordinator2.getBusDeployments().get(0).getDeploymentMonitors().size());
+                            assertEquals(2, coordinator3.getBusDeployments().get(0).getDeploymentMonitors().size());
 
                             // This part of the test will just use toStrings because the monitor maps are not linked i.e. not ordered...
                             String monitorsTOString;
 
                             // deployment1 should be monitoring deployment2 and deployment3...
-                            monitorsTOString = coordinator1.getDeploymentMonitors().values().toString();
+                            monitorsTOString = coordinator1.getBusDeployments().get(0).getDeploymentMonitors().values().toString();
                             assertTrue(monitorsTOString.indexOf("{online=true} deployment2") != -1);
                             assertTrue(monitorsTOString.indexOf("{online=true} deployment3") != -1);
 
                             // deployment2 should be monitoring deployment1 and deployment3...
-                            monitorsTOString = coordinator2.getDeploymentMonitors().values().toString();
+                            monitorsTOString = coordinator2.getBusDeployments().get(0).getDeploymentMonitors().values().toString();
                             assertTrue(monitorsTOString.indexOf("{online=true} deployment1") != -1);
                             assertTrue(monitorsTOString.indexOf("{online=true} deployment3") != -1);
 
                             // deployment3 should be monitoring deployment1 and deployment2...
-                            monitorsTOString = coordinator3.getDeploymentMonitors().values().toString();
+                            monitorsTOString = coordinator3.getBusDeployments().get(0).getDeploymentMonitors().values().toString();
                             assertTrue(monitorsTOString.indexOf("{online=true} deployment1") != -1);
                             assertTrue(monitorsTOString.indexOf("{online=true} deployment2") != -1);
                         } finally {
@@ -96,16 +96,16 @@
                         }
                         Thread.sleep(500);
 
-                        assertEquals(1, coordinator1.getDeploymentMonitors().size());
-                        assertEquals(1, coordinator2.getDeploymentMonitors().size());
+                        assertEquals(1, coordinator1.getBusDeployments().get(0).getDeploymentMonitors().size());
+                        assertEquals(1, coordinator2.getBusDeployments().get(0).getDeploymentMonitors().size());
 
                         // So deployment1 should be monitoring deployment2...
-                        monitor = (DeploymentMonitor) coordinator1.getDeploymentMonitors().values().toArray()[0];
+                        monitor = (DeploymentMonitor) coordinator1.getBusDeployments().get(0).getDeploymentMonitors().values().toArray()[0];
                         monitor.getServiceSets().setDeploymentId("x");
                         assertEquals("{online=true} deployment2:x[Services: [hello:goodbye]][OutboundRoutedServices: [hello:hello]]", monitor.toString());
 
                         // And deployment2 should be monitoring deployment1...
-                        monitor = (DeploymentMonitor) coordinator2.getDeploymentMonitors().values().toArray()[0];
+                        monitor = (DeploymentMonitor) coordinator2.getBusDeployments().get(0).getDeploymentMonitors().values().toArray()[0];
                         monitor.getServiceSets().setDeploymentId("x");
                         assertEquals("{online=true} deployment1:x[Services: [hello:hello]][OutboundRoutedServices: [hello:hello, hello:goodbye]]", monitor.toString());
                     } finally {
@@ -114,7 +114,7 @@
                     Thread.sleep(500);
 
                     // deployment1 should no longer be be monitoring deployment2...
-                    assertEquals(0, coordinator1.getDeploymentMonitors().size());
+                    assertEquals(0, coordinator1.getBusDeployments().get(0).getDeploymentMonitors().size());
                 } finally {
                     deployment1.undeploy();
                 }
@@ -141,16 +141,16 @@
                         DeploymentCoordinator coordinator2 = deployment2.getDeploymentCoordinator();
                         DeploymentMonitor monitor;
 
-                        assertEquals(1, coordinator1.getDeploymentMonitors().size());
-                        assertEquals(1, coordinator2.getDeploymentMonitors().size());
+                        assertEquals(1, coordinator1.getBusDeployments().get(0).getDeploymentMonitors().size());
+                        assertEquals(1, coordinator2.getBusDeployments().get(0).getDeploymentMonitors().size());
 
                         // So deployment1 should be monitoring deployment2...
-                        monitor = (DeploymentMonitor) coordinator1.getDeploymentMonitors().values().toArray()[0];
+                        monitor = (DeploymentMonitor) coordinator1.getBusDeployments().get(0).getDeploymentMonitors().values().toArray()[0];
                         monitor.getServiceSets().setDeploymentId("x");
                         assertEquals("{online=true} deployment2:x[Services: [hello:goodbye]][OutboundRoutedServices: [hello:hello]]", monitor.toString());
 
                         // And deployment2 should be monitoring deployment1...
-                        monitor = (DeploymentMonitor) coordinator2.getDeploymentMonitors().values().toArray()[0];
+                        monitor = (DeploymentMonitor) coordinator2.getBusDeployments().get(0).getDeploymentMonitors().values().toArray()[0];
                         monitor.getServiceSets().setDeploymentId("x");
                         assertEquals("{online=true} deployment1:x[Services: [hello:hello]][OutboundRoutedServices: [hello:hello, hello:goodbye]]", monitor.toString());
 
@@ -159,7 +159,7 @@
                         Thread.sleep(6000);
 
                         // deployment2 should see deployment1 as being offline now...
-                        monitor = (DeploymentMonitor) coordinator2.getDeploymentMonitors().values().toArray()[0];
+                        monitor = (DeploymentMonitor) coordinator2.getBusDeployments().get(0).getDeploymentMonitors().values().toArray()[0];
                         monitor.getServiceSets().setDeploymentId("x");
                         assertEquals("{online=false} deployment1:x[Services: [hello:hello]][OutboundRoutedServices: [hello:hello, hello:goodbye]]", monitor.toString());
 
@@ -168,7 +168,7 @@
                         Thread.sleep(6000);
 
                         // deployment2 should see deployment1 as being online again...
-                        monitor = (DeploymentMonitor) coordinator2.getDeploymentMonitors().values().toArray()[0];
+                        monitor = (DeploymentMonitor) coordinator2.getBusDeployments().get(0).getDeploymentMonitors().values().toArray()[0];
                         monitor.getServiceSets().setDeploymentId("x");
                         assertEquals("{online=true} deployment1:x[Services: [hello:hello]][OutboundRoutedServices: [hello:hello, hello:goodbye]]", monitor.toString());
 
@@ -178,7 +178,7 @@
                     Thread.sleep(100);
 
                     // deployment1 should no longer be be monitoring deployment2...
-                    assertEquals(0, coordinator1.getDeploymentMonitors().size());
+                    assertEquals(0, coordinator1.getBusDeployments().get(0).getDeploymentMonitors().size());
                 } finally {
                     deployment1.undeploy();
                 }




More information about the jboss-svn-commits mailing list