[jboss-svn-commits] JBL Code SVN: r22724 - in labs/jbossesb/workspace/skeagh/runtime/src: main/java/org/jboss/esb/federate/bus and 1 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Sep 12 15:57:15 EDT 2008
Author: tfennelly
Date: 2008-09-12 15:57:15 -0400 (Fri, 12 Sep 2008)
New Revision: 22724
Added:
labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/bus/
Modified:
labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java
labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/Bus.java
labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/DeploymentCoordinatorTest.java
Log:
Pulling the JMS Bus up out of the DeploymentCoordinator. It now initializes the Buses from the config.
Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java 2008-09-12 17:44:38 UTC (rev 22723)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java 2008-09-12 19:57:15 UTC (rev 22724)
@@ -24,9 +24,8 @@
import org.jboss.esb.annotations.Uninitialize;
import org.jboss.esb.deploy.DeploymentException;
import org.jboss.esb.deploy.DeploymentRuntime;
+import org.jboss.esb.deploy.config.PropertiesUtil;
import org.jboss.esb.federate.bus.Bus;
-import org.jboss.esb.deploy.config.PropertiesUtil;
-import org.jboss.esb.federate.bus.jms.JMSBus;
import org.jboss.esb.federate.notify.AbstractDeploymentNotification;
import org.jboss.esb.federate.notify.DeploymentDetailsNotification;
import org.jboss.esb.federate.notify.DeploymentHeartbeatNotification;
@@ -38,6 +37,8 @@
import org.jboss.esb.schedule.SchedulingException;
import org.jboss.esb.util.AssertArgument;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -94,9 +95,9 @@
*/
private DeploymentDetailsNotification detailsNotification;
/**
- * JMS Bus. Will be dynamically loading this soon! Will support different bus types.
+ * Bus list.
*/
- private BusDeployment jmsBusDeployment;
+ private List<BusDeployment> busDeployments = new ArrayList<BusDeployment>();
/**
* Coordinator monitor timeout. The number of milliseconds before a deployment is marked as offline.
*/
@@ -125,33 +126,55 @@
@Initialize
public final void initialize() throws DeploymentException
{
- // TODO: JMSBus is temporarily hardwired in... pull it out and generalise later...
- JMSBus bus = new JMSBus();
- bus.setDeploymentName(runtime.getDeploymentName());
- bus.setDeploymentId(runtime.getDeploymentId());
+ ApplicationProperties deploymentProperties = PropertiesUtil.getDeploymentConfig();
- bus.connect();
+ // Set the monitoring timeout - 4 heartbeats...
+ monitorTimeout = (deploymentProperties.getLongProperty(COORDINATOR_HEARTBEAT_FREQUENCY_KEY, COORDINATOR_HEARTBEAT_DEFAULT_FREQUENCY) * 4);
- if (bus.isConnected())
+ // Connect all configured buses...
+ Set<Map.Entry<Object, Object>> configEntries = deploymentProperties.entrySet();
+ for (Map.Entry<Object, Object> configEntry : configEntries)
{
- ApplicationProperties deploymentProperties = PropertiesUtil.getDeploymentConfig();
+ String propertyName = (String) configEntry.getKey();
+ if(propertyName.startsWith("bus."))
+ {
+ String busClassName = (String) configEntry.getValue();
+ try
+ {
+ Bus bus = Bus.Factory.newInstance(busClassName);
- try
- {
- // Tell the other deployments using this bus that this deployment is online...
- bus.sendNotification(detailsNotification);
+ bus.setDeploymentName(runtime.getDeploymentName());
+ bus.setDeploymentId(runtime.getDeploymentId());
+ bus.connect();
+
+ if (bus.isConnected())
+ {
+ busDeployments.add(new BusDeployment(bus));
+
+ try
+ {
+ // Tell the other deployments using this bus that this deployment is online...
+ bus.sendNotification(detailsNotification);
+ }
+ catch (RoutingException e)
+ {
+ bus.close();
+ logger.error("Unable to start Bus. Failed to deliver Deployment Details Notification (on statrup) to bus '" + bus.getClass().getName() + "'.", e);
+ return;
+ }
+ }
+ }
+ catch (DeploymentException e)
+ {
+ uninitialize();
+ throw e;
+ }
+ catch(Throwable t)
+ {
+ uninitialize();
+ throw new DeploymentException("Unexpected exception while starting Deployment Bus interfaces.", t);
+ }
}
- catch (RoutingException e)
- {
- bus.close();
- logger.error("Unable to start Bus. Failed to deliver Deployment Details Notification (on statrup) to bus '" + bus.getClass().getName() + "'.", e);
- return;
- }
-
- // Set the monitoring timeout - 4 heartbeats...
- // TODO: Move this to a deployment related config i.e. a non-bus related config
- monitorTimeout = (deploymentProperties.getLongProperty(COORDINATOR_HEARTBEAT_FREQUENCY_KEY, COORDINATOR_HEARTBEAT_DEFAULT_FREQUENCY) * 4);
- jmsBusDeployment = new BusDeployment(bus);
}
}
@@ -161,19 +184,27 @@
@Uninitialize
public final void uninitialize()
{
- if (jmsBusDeployment != null)
+ if (!busDeployments.isEmpty())
{
- try
+ for (BusDeployment busDeployment : busDeployments)
{
- // Tell all other deployments we're no longer online...
- jmsBusDeployment.bus.sendNotification(DeploymentUndeployNotification.toNotification(runtime));
+ try
+ {
+ busDeployment.getBus().sendNotification(DeploymentUndeployNotification.toNotification(runtime));
+ }
+ catch(Throwable t)
+ {
+ logger.error("Error sending undeploy notification on Bus '" + busDeployment.getBus().getClass().getName() + "'.", t);
+ }
+ try
+ {
+ busDeployment.getBus().close();
+ }
+ catch(Throwable t)
+ {
+ logger.error("Error closing Bus '" + busDeployment.getBus().getClass().getName() + "'.", t);
+ }
}
- catch (Throwable t)
- {
- logger.warn("Error sending undeploy notification'.", t);
- }
-
- jmsBusDeployment.bus.close();
}
}
@@ -184,22 +215,30 @@
*/
public final void onSchedule() throws SchedulingException
{
- if (jmsBusDeployment != null)
+ if (!busDeployments.isEmpty())
{
- try
+ for (BusDeployment busDeployment : busDeployments)
{
- if (broadcastHeartbeat)
+ try
{
- // Tell all other deployments we're still alive...
- jmsBusDeployment.bus.sendNotification(heartbeatNotification);
+ if(broadcastHeartbeat)
+ {
+ busDeployment.getBus().sendNotification(heartbeatNotification);
+ }
}
-
- jmsBusDeployment.checkMonitors();
+ catch(Throwable t)
+ {
+ logger.error("Error sending heartbeat notification on Bus '" + busDeployment.getBus().getClass().getName() + "'.", t);
+ }
+ try
+ {
+ busDeployment.checkMonitors();
+ }
+ catch(Throwable t)
+ {
+ logger.error("Error checking monitors on Bus '" + busDeployment.getBus().getClass().getName() + "'.", t);
+ }
}
- catch (Throwable t)
- {
- logger.warn("Error sending deployment heartbeat notification'.", t);
- }
}
}
@@ -214,19 +253,19 @@
}
/**
- * Get the deployment monitors being managed by this deployment.
+ * Get the {@link BusDeployment} instances being managed by this deployment.
*
- * @return The deployment monitors being managed by this deployment.
+ * @return The {@link BusDeployment} instances being managed by this deployment.
*/
- public final Map<String, DeploymentMonitor> getDeploymentMonitors()
+ public final List<BusDeployment> getBusDeployments()
{
- return jmsBusDeployment.deploymentMonitors;
+ return busDeployments;
}
/**
* JBoss ESB Deployment.
*/
- private final class BusDeployment
+ public final class BusDeployment
{
/**
* The Bus.
@@ -252,6 +291,16 @@
bus.setNotificationListener(notificationListener);
}
+ public Bus getBus()
+ {
+ return bus;
+ }
+
+ public Map<String, DeploymentMonitor> getDeploymentMonitors()
+ {
+ return deploymentMonitors;
+ }
+
/**
* Bus notification listener.
*/
Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/Bus.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/Bus.java 2008-09-12 17:44:38 UTC (rev 22723)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/Bus.java 2008-09-12 19:57:15 UTC (rev 22724)
@@ -19,13 +19,14 @@
*/
package org.jboss.esb.federate.bus;
+import org.jboss.esb.classpath.ClassUtil;
+import org.jboss.esb.deploy.DeploymentException;
import org.jboss.esb.federate.notify.AbstractDeploymentNotification;
import org.jboss.esb.federate.notify.NotificationListener;
import org.jboss.esb.message.Message;
import org.jboss.esb.routing.MessageDispatcher;
import org.jboss.esb.routing.RoutingException;
import org.jboss.esb.service.ServiceName;
-import org.jboss.esb.deploy.DeploymentException;
/**
* JBoss ESB Bus definition.
@@ -105,4 +106,36 @@
* @throws RoutingException Error sending notification onto the Bus.
*/
void sendNotification(AbstractDeploymentNotification notification) throws RoutingException;
+
+ /**
+ * Bus Factory class.
+ */
+ public static class Factory
+ {
+ /**
+ * New Bus instance.
+ * @param className Bus class name.
+ * @return Bus instance.
+ * @throws DeploymentException Error creating Bus instance.
+ */
+ public static Bus newInstance(String className) throws DeploymentException
+ {
+ try
+ {
+ return (Bus) ClassUtil.forName(className, Bus.class).newInstance();
+ }
+ catch (InstantiationException e)
+ {
+ throw new DeploymentException("Failed to create instance of Bus class '" + className + "'. Class must define a public default constructor.", e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new DeploymentException("Failed to create instance of Bus class '" + className + "'.", e);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new DeploymentException("Failed to create instance of Bus class '" + className + "'. Class not found on classpath.", e);
+ }
+ }
+ }
}
Modified: labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/DeploymentCoordinatorTest.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/DeploymentCoordinatorTest.java 2008-09-12 17:44:38 UTC (rev 22723)
+++ labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/DeploymentCoordinatorTest.java 2008-09-12 19:57:15 UTC (rev 22724)
@@ -48,16 +48,16 @@
DeploymentCoordinator coordinator2 = deployment2.getDeploymentCoordinator();
DeploymentMonitor monitor;
- assertEquals(1, coordinator1.getDeploymentMonitors().size());
- assertEquals(1, coordinator2.getDeploymentMonitors().size());
+ assertEquals(1, coordinator1.getBusDeployments().get(0).getDeploymentMonitors().size());
+ assertEquals(1, coordinator2.getBusDeployments().get(0).getDeploymentMonitors().size());
// So deployment1 should be monitoring deployment2...
- monitor = (DeploymentMonitor) coordinator1.getDeploymentMonitors().values().toArray()[0];
+ monitor = (DeploymentMonitor) coordinator1.getBusDeployments().get(0).getDeploymentMonitors().values().toArray()[0];
monitor.getServiceSets().setDeploymentId("x");
assertEquals("{online=true} deployment2:x[Services: [hello:goodbye]][OutboundRoutedServices: [hello:hello]]", monitor.toString());
// And deployment2 should be monitoring deployment1...
- monitor = (DeploymentMonitor) coordinator2.getDeploymentMonitors().values().toArray()[0];
+ monitor = (DeploymentMonitor) coordinator2.getBusDeployments().get(0).getDeploymentMonitors().values().toArray()[0];
monitor.getServiceSets().setDeploymentId("x");
assertEquals("{online=true} deployment1:x[Services: [hello:hello]][OutboundRoutedServices: [hello:hello, hello:goodbye]]", monitor.toString());
@@ -70,25 +70,25 @@
DeploymentCoordinator coordinator3 = deployment3.getDeploymentCoordinator();
// Each of the 3 deployments should be monitoring the other 2 deployments...
- assertEquals(2, coordinator1.getDeploymentMonitors().size());
- assertEquals(2, coordinator2.getDeploymentMonitors().size());
- assertEquals(2, coordinator3.getDeploymentMonitors().size());
+ assertEquals(2, coordinator1.getBusDeployments().get(0).getDeploymentMonitors().size());
+ assertEquals(2, coordinator2.getBusDeployments().get(0).getDeploymentMonitors().size());
+ assertEquals(2, coordinator3.getBusDeployments().get(0).getDeploymentMonitors().size());
// This part of the test will just use toStrings because the monitor maps are not linked i.e. not ordered...
String monitorsTOString;
// deployment1 should be monitoring deployment2 and deployment3...
- monitorsTOString = coordinator1.getDeploymentMonitors().values().toString();
+ monitorsTOString = coordinator1.getBusDeployments().get(0).getDeploymentMonitors().values().toString();
assertTrue(monitorsTOString.indexOf("{online=true} deployment2") != -1);
assertTrue(monitorsTOString.indexOf("{online=true} deployment3") != -1);
// deployment2 should be monitoring deployment1 and deployment3...
- monitorsTOString = coordinator2.getDeploymentMonitors().values().toString();
+ monitorsTOString = coordinator2.getBusDeployments().get(0).getDeploymentMonitors().values().toString();
assertTrue(monitorsTOString.indexOf("{online=true} deployment1") != -1);
assertTrue(monitorsTOString.indexOf("{online=true} deployment3") != -1);
// deployment3 should be monitoring deployment1 and deployment2...
- monitorsTOString = coordinator3.getDeploymentMonitors().values().toString();
+ monitorsTOString = coordinator3.getBusDeployments().get(0).getDeploymentMonitors().values().toString();
assertTrue(monitorsTOString.indexOf("{online=true} deployment1") != -1);
assertTrue(monitorsTOString.indexOf("{online=true} deployment2") != -1);
} finally {
@@ -96,16 +96,16 @@
}
Thread.sleep(500);
- assertEquals(1, coordinator1.getDeploymentMonitors().size());
- assertEquals(1, coordinator2.getDeploymentMonitors().size());
+ assertEquals(1, coordinator1.getBusDeployments().get(0).getDeploymentMonitors().size());
+ assertEquals(1, coordinator2.getBusDeployments().get(0).getDeploymentMonitors().size());
// So deployment1 should be monitoring deployment2...
- monitor = (DeploymentMonitor) coordinator1.getDeploymentMonitors().values().toArray()[0];
+ monitor = (DeploymentMonitor) coordinator1.getBusDeployments().get(0).getDeploymentMonitors().values().toArray()[0];
monitor.getServiceSets().setDeploymentId("x");
assertEquals("{online=true} deployment2:x[Services: [hello:goodbye]][OutboundRoutedServices: [hello:hello]]", monitor.toString());
// And deployment2 should be monitoring deployment1...
- monitor = (DeploymentMonitor) coordinator2.getDeploymentMonitors().values().toArray()[0];
+ monitor = (DeploymentMonitor) coordinator2.getBusDeployments().get(0).getDeploymentMonitors().values().toArray()[0];
monitor.getServiceSets().setDeploymentId("x");
assertEquals("{online=true} deployment1:x[Services: [hello:hello]][OutboundRoutedServices: [hello:hello, hello:goodbye]]", monitor.toString());
} finally {
@@ -114,7 +114,7 @@
Thread.sleep(500);
// deployment1 should no longer be be monitoring deployment2...
- assertEquals(0, coordinator1.getDeploymentMonitors().size());
+ assertEquals(0, coordinator1.getBusDeployments().get(0).getDeploymentMonitors().size());
} finally {
deployment1.undeploy();
}
@@ -141,16 +141,16 @@
DeploymentCoordinator coordinator2 = deployment2.getDeploymentCoordinator();
DeploymentMonitor monitor;
- assertEquals(1, coordinator1.getDeploymentMonitors().size());
- assertEquals(1, coordinator2.getDeploymentMonitors().size());
+ assertEquals(1, coordinator1.getBusDeployments().get(0).getDeploymentMonitors().size());
+ assertEquals(1, coordinator2.getBusDeployments().get(0).getDeploymentMonitors().size());
// So deployment1 should be monitoring deployment2...
- monitor = (DeploymentMonitor) coordinator1.getDeploymentMonitors().values().toArray()[0];
+ monitor = (DeploymentMonitor) coordinator1.getBusDeployments().get(0).getDeploymentMonitors().values().toArray()[0];
monitor.getServiceSets().setDeploymentId("x");
assertEquals("{online=true} deployment2:x[Services: [hello:goodbye]][OutboundRoutedServices: [hello:hello]]", monitor.toString());
// And deployment2 should be monitoring deployment1...
- monitor = (DeploymentMonitor) coordinator2.getDeploymentMonitors().values().toArray()[0];
+ monitor = (DeploymentMonitor) coordinator2.getBusDeployments().get(0).getDeploymentMonitors().values().toArray()[0];
monitor.getServiceSets().setDeploymentId("x");
assertEquals("{online=true} deployment1:x[Services: [hello:hello]][OutboundRoutedServices: [hello:hello, hello:goodbye]]", monitor.toString());
@@ -159,7 +159,7 @@
Thread.sleep(6000);
// deployment2 should see deployment1 as being offline now...
- monitor = (DeploymentMonitor) coordinator2.getDeploymentMonitors().values().toArray()[0];
+ monitor = (DeploymentMonitor) coordinator2.getBusDeployments().get(0).getDeploymentMonitors().values().toArray()[0];
monitor.getServiceSets().setDeploymentId("x");
assertEquals("{online=false} deployment1:x[Services: [hello:hello]][OutboundRoutedServices: [hello:hello, hello:goodbye]]", monitor.toString());
@@ -168,7 +168,7 @@
Thread.sleep(6000);
// deployment2 should see deployment1 as being online again...
- monitor = (DeploymentMonitor) coordinator2.getDeploymentMonitors().values().toArray()[0];
+ monitor = (DeploymentMonitor) coordinator2.getBusDeployments().get(0).getDeploymentMonitors().values().toArray()[0];
monitor.getServiceSets().setDeploymentId("x");
assertEquals("{online=true} deployment1:x[Services: [hello:hello]][OutboundRoutedServices: [hello:hello, hello:goodbye]]", monitor.toString());
@@ -178,7 +178,7 @@
Thread.sleep(100);
// deployment1 should no longer be be monitoring deployment2...
- assertEquals(0, coordinator1.getDeploymentMonitors().size());
+ assertEquals(0, coordinator1.getBusDeployments().get(0).getDeploymentMonitors().size());
} finally {
deployment1.undeploy();
}
More information about the jboss-svn-commits
mailing list