[jboss-svn-commits] JBL Code SVN: r24033 - in labs/jbossesb/workspace/skeagh: routing/jms/src/main/java/org/jboss/esb/jms and 6 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Nov 21 09:05:34 EST 2008
Author: tfennelly
Date: 2008-11-21 09:05:34 -0500 (Fri, 21 Nov 2008)
New Revision: 24033
Added:
labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/bus/
labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/bus/JMSBus.java
labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/bus/package.html
labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/bus/
labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/bus/JMSBus_Standalone_Test.java
labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/bus/jmsbus.properties
Removed:
labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/jms/
labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/jms/
labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/bus/jms/JMSBus_Standalone_Test.java
labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/bus/jms/jmsbus.properties
Modified:
labs/jbossesb/workspace/skeagh/performance/src/test/resources/META-INF/jbossesb/jbossesb-default.properties
labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/jbossesb-default.properties
Log:
https://jira.jboss.org/jira/browse/JBESB-2182
Modified: labs/jbossesb/workspace/skeagh/performance/src/test/resources/META-INF/jbossesb/jbossesb-default.properties
===================================================================
--- labs/jbossesb/workspace/skeagh/performance/src/test/resources/META-INF/jbossesb/jbossesb-default.properties 2008-11-21 13:47:36 UTC (rev 24032)
+++ labs/jbossesb/workspace/skeagh/performance/src/test/resources/META-INF/jbossesb/jbossesb-default.properties 2008-11-21 14:05:34 UTC (rev 24033)
@@ -6,7 +6,7 @@
coordinator.heartbeat.frequency=700
# Buses to be deployed...
-bus.jms=org.jboss.esb.federate.bus.jms.JMSBus
+bus.jms=org.jboss.esb.jms.bus.JMSBus
# Pre-installed deployment units...
unit.deadletter=deadletter.xml
Copied: labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/bus/JMSBus.java (from rev 24032, labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/jms/JMSBus.java)
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/bus/JMSBus.java (rev 0)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/bus/JMSBus.java 2008-11-21 14:05:34 UTC (rev 24033)
@@ -0,0 +1,559 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2008, JBoss Inc.
+ */
+package org.jboss.esb.jms.bus;
+
+import org.apache.log4j.Logger;
+import org.jboss.esb.api.bus.AbstractNotification;
+import org.jboss.esb.api.bus.BusMessage;
+import org.jboss.esb.api.context.DeploymentContext;
+import org.jboss.esb.api.context.ResourceLocator;
+import org.jboss.esb.api.routing.RoutingException;
+import org.jboss.esb.deploy.DeploymentException;
+import org.jboss.esb.deploy.DeploymentRuntime;
+import org.jboss.esb.api.bus.AbstractBus;
+import org.jboss.esb.jms.AbstractMessageHandler;
+import org.jboss.esb.jms.AbstractMessageListener;
+import org.jboss.esb.jms.JMSSession;
+import org.jboss.esb.jms.MessageSender;
+
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+import java.util.Properties;
+
+/**
+ * ESB Bus implementation for JMS.
+ * <p/>
+ * Acts as the interface to the JMS Bus for a deployment.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class JMSBus extends AbstractBus
+{
+ /**
+ * Logger.
+ */
+ private static Logger logger = Logger.getLogger(JMSBus.class);
+ /**
+ * Deployment coordination topic property key name.
+ */
+ public static final String DEPLOYMENT_COORDINTATION_TOPIC_KEY = "deployment.coordintation.topic";
+ /**
+ * Default deployment coordination topic name.
+ */
+ public static final String DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME = "jbossesb.deployment.coordintation.topic";
+ /**
+ * Deployment JMS Bus Queue name config key.
+ */
+ public static final String BUS_QUEUE_KEY = "deployment.bus.queue";
+ /**
+ * Default Deployment JMS Bus Queue name.
+ */
+ public static final String DEFAULT_BUS_QUEUE_NAME = "jbossesb.jms.bus";
+ /**
+ * Deployment ID key.
+ */
+ public static final String DEPLOYMENT_ID = "jbossesb_deployment_id";
+ /**
+ * Shared JMS Topic Session.
+ */
+ private JMSSession topicSession;
+ /**
+ * Shared JMS Topic Session.
+ */
+ private JMSSession queueSession;
+ /**
+ * JMS Deployment coordination listener.
+ */
+ private JMSCoordinationListener notificationListener;
+ /**
+ * JMS Bus Message Listener.
+ */
+ private JMSQueueMessageListener messageListener;
+ /**
+ * Deployment notification broadcaster.
+ */
+ private MessageSender notificationSender;
+ /**
+ * JMS Bus Message Sender.
+ */
+ private MessageSender messageSender;
+
+ /**
+ * Connect the bus.
+ *
+ * @throws RoutingException Connection exception.
+ */
+ public final void connect() throws RoutingException
+ {
+ assertIsConfigured();
+
+ /*
+ * Need to set the context classloader to this classes classloader. When deployed/included
+ * as an osgi bundle this calls make by this class will involve jndi which uses
+ * the thread context classloader. The application classloader will the the context classloader
+ * for the bundle and will not have the ability to use the bundles classloader to find the
+ * resources is needs.
+ */
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ try
+ {
+ try
+ {
+ connectJMSSessions(getProperties());
+ }
+ catch (DeploymentException e)
+ {
+ throw new RoutingException("Unable to connect JMS Bus interface for deployment '" + getDeploymentName() + "'.", e);
+ }
+
+ if (topicSession != null && queueSession != null)
+ {
+ try
+ {
+ connectCoordinationListener(getProperties());
+ connectCoordintationSender(getProperties());
+ connectBusListener(getProperties());
+ connectBusSender(getProperties());
+ }
+ catch (DeploymentException e)
+ {
+ try
+ {
+ if (notificationListener == null && notificationSender == null && messageListener == null && messageSender == null)
+ {
+ if (!logger.isDebugEnabled())
+ {
+ logger.info("Deployment '" + getDeploymentName() + "' is not being coordinated with any other deployment. Turn on deug logging for more details.");
+ }
+ else
+ {
+ logger.debug("Deployment '" + getDeploymentName() + "' is not being coordinated with any other deployment.", e);
+ }
+ }
+ else
+ {
+ throw new RoutingException("Unable to connect JMS Bus interface for deployment '" + getDeploymentName() + "'.", e);
+ }
+ }
+ finally
+ {
+ closeJMSSessions();
+ }
+ }
+ catch (Throwable t)
+ {
+ closeJMSSessions();
+ throw new RoutingException("Unable to connect JMS Bus interface for deployment '" + getDeploymentName() + "'.", t);
+ }
+ }
+ }
+ finally
+ {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+
+ /**
+ * Is the bus connected.
+ *
+ * @return True if the bus is connected, otherwise false.
+ */
+ public final boolean isConnected()
+ {
+ return (topicSession != null && queueSession != null);
+ }
+
+ /**
+ * Disconnect from the Bus.
+ */
+ public final void disconnect()
+ {
+ try
+ {
+ closeJMSHandler("Deployment Coordination Listener", notificationListener);
+ closeJMSHandler("Deployment Coordination Sender", notificationSender);
+ closeJMSHandler("JMS Bus Listener", messageListener);
+ closeJMSHandler("JMS Bus Sender", messageSender);
+ }
+ finally
+ {
+ closeJMSSessions();
+ }
+ }
+
+ /**
+ * Send the supplied message to the specified deployment via the bus.
+ *
+ * @param message The message.
+ * @param targetDeploymentId The target deployment ID.
+ * @throws RoutingException Error sending message onto the Bus.
+ */
+ public final void sendMessage(final BusMessage message, final String targetDeploymentId) throws RoutingException
+ {
+ try
+ {
+ ObjectMessage jmsMessage = messageSender.getSession().createObjectMessage(message);
+
+ jmsMessage.setStringProperty(DEPLOYMENT_ID, targetDeploymentId);
+ messageSender.send(jmsMessage);
+ }
+ catch (JMSException e)
+ {
+ throw new RoutingException("Error sending message to JMS Bus for deployment ID '" + targetDeploymentId + "'.", e);
+ }
+ }
+
+ /**
+ * Send a notification message onto the Bus.
+ * <p/>
+ * This allows notification messages to be exchanged with other deployments
+ * using the underlying Bus (deployment details, heartbeat, undeploy etc).
+ *
+ * @param notification The notification message to be sent onto the Bus.
+ * @throws RoutingException Error sending notification onto the Bus.
+ */
+ public final void sendNotification(final AbstractNotification notification) throws RoutingException
+ {
+ if (notificationSender != null)
+ {
+ try
+ {
+ notificationSender.send(notificationSender.getSession().createObjectMessage(notification));
+ }
+ catch (JMSException e)
+ {
+ throw new RoutingException("Unable to send deployment notification onto JMS Bus.", e);
+ }
+ }
+ }
+
+ /**
+ * Stop listening for ESB messages on the bus.
+ */
+ public final void stopListening()
+ {
+ closeJMSHandler("JMS Bus Listener", messageListener);
+ messageListener = null;
+ }
+
+ /**
+ * Connect the deployment coordination listener.
+ *
+ * @param busProperties Bus configuration properties.
+ * @throws org.jboss.esb.deploy.DeploymentException
+ * Failed to connect listener.
+ */
+ private void connectCoordinationListener(final Properties busProperties) throws DeploymentException
+ {
+ String coordinationTopicName = busProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
+
+ notificationListener = new JMSCoordinationListener(coordinationTopicName, busProperties);
+ try
+ {
+ notificationListener.connect();
+ }
+ catch (JMSException e)
+ {
+ notificationListener = null;
+ throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment. A JMSException occured while trying "
+ + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
+ + "JNDI properties used: " + busProperties, e);
+ }
+ }
+
+ /**
+ * Connect the deployment coordination sender.
+ *
+ * @param busProperties Bus configuration properties.
+ * @throws org.jboss.esb.deploy.DeploymentException
+ * Failed to connect sender.
+ */
+ private void connectCoordintationSender(final Properties busProperties) throws DeploymentException
+ {
+ String coordinationTopicName = busProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
+
+ notificationSender = new MessageSender(coordinationTopicName, topicSession);
+ try
+ {
+ notificationSender.connect();
+ }
+ catch (JMSException e)
+ {
+ notificationSender = null;
+ throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment. A JMSException occured while trying "
+ + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
+ + "JNDI properties used: " + busProperties, e);
+ }
+ }
+
+ /**
+ * Connect the {@link BusMessage} listener.
+ *
+ * @param busProperties Bus configuration properties.
+ * @throws org.jboss.esb.deploy.DeploymentException
+ * Failed to connect listener.
+ */
+ private void connectBusListener(final Properties busProperties) throws DeploymentException
+ {
+ String busQueueName = busProperties.getProperty(BUS_QUEUE_KEY, DEFAULT_BUS_QUEUE_NAME);
+
+ messageListener = new JMSQueueMessageListener(busQueueName, busProperties);
+ try
+ {
+ messageListener.connect();
+ }
+ catch (JMSException e)
+ {
+ messageListener = null;
+ throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment. A JMSException occured while trying "
+ + "to connect to the JMS Bus Queue '" + busQueueName + "'. The JMS Server must be running and this Queue must be deployed. "
+ + "JNDI properties used: " + busProperties, e);
+ }
+ }
+
+ /**
+ * Connect the {@link BusMessage} sender.
+ *
+ * @param busProperties Bus configuration properties.
+ * @throws org.jboss.esb.deploy.DeploymentException
+ * Failed to connect sender.
+ */
+ private void connectBusSender(final Properties busProperties) throws DeploymentException
+ {
+ String busQueueName = busProperties.getProperty(BUS_QUEUE_KEY, DEFAULT_BUS_QUEUE_NAME);
+
+ messageSender = new MessageSender(busQueueName, queueSession);
+ try
+ {
+ messageSender.connect();
+ }
+ catch (JMSException e)
+ {
+ messageSender = null;
+ throw new DeploymentException("Deployment '" + getDeploymentName() + "' is not being coordinated with any other local deployment. A JMSException occured while trying "
+ + "to connect to the JMS Bus Queue '" + busQueueName + "'. The JMS Server must be running and this Queue must be deployed. "
+ + "JNDI properties used: " + busProperties, e);
+ }
+ }
+
+ /**
+ * Initialize the shared JMS Sessions.
+ *
+ * @param localBusProperties Local JMS Bus properties.
+ */
+ private void connectJMSSessions(final Properties localBusProperties) throws DeploymentException
+ {
+ ResourceLocator resourceLocator = DeploymentRuntime.getResourceLocator(DeploymentContext.getContext());
+
+ topicSession = new JMSSession(JMSSession.Type.TOPIC, localBusProperties);
+ try
+ {
+ if(resourceLocator != null)
+ {
+ topicSession.setClassLoader(resourceLocator.getClassLoader());
+ }
+ topicSession.connect();
+ }
+ catch (Throwable t)
+ {
+ closeJMSSessions();
+ logger.debug("Failed to connect shared deployment JMS Topic Session.", t);
+ return;
+ }
+ try
+ {
+ queueSession = new JMSSession(JMSSession.Type.QUEUE, localBusProperties);
+ if(resourceLocator != null)
+ {
+ queueSession.setClassLoader(resourceLocator.getClassLoader());
+ }
+ queueSession.connect();
+ }
+ catch (Throwable t)
+ {
+ closeJMSSessions();
+ // OK... this is an error... we were able to connect to the topic, but not the queue???
+ throw new DeploymentException("Failed to connect shared deployment JMS Queue Session.", t);
+ }
+ }
+
+ /**
+ * Close the JMS Sessions.
+ */
+ private void closeJMSSessions()
+ {
+ // Close the sessions...
+ if (topicSession != null)
+ {
+ if (topicSession.isConnected())
+ {
+ topicSession.close();
+ }
+ topicSession = null;
+ }
+ if (queueSession != null)
+ {
+ if (queueSession.isConnected())
+ {
+ queueSession.close();
+ }
+ queueSession = null;
+ }
+ }
+
+ /**
+ * Deployment Coordination Bus Notification Listener.
+ */
+ private class JMSCoordinationListener extends AbstractMessageListener
+ {
+
+ /**
+ * Constructor.
+ *
+ * @param destinationName Destination name.
+ * @param jndiProperties JNDI properties.
+ */
+ protected JMSCoordinationListener(final String destinationName, final Properties jndiProperties)
+ {
+ super(destinationName, topicSession, jndiProperties);
+ }
+
+ /**
+ * Handle a coordination event from other deployments.
+ *
+ * @param message Coordination message.
+ */
+ public final void onMessage(final javax.jms.Message message)
+ {
+ if (message instanceof ObjectMessage)
+ {
+ try
+ {
+ Object objectMessage = ((ObjectMessage) message).getObject();
+
+ if (objectMessage instanceof AbstractNotification)
+ {
+ AbstractNotification notification = (AbstractNotification) objectMessage;
+ try
+ {
+ getNotificationListener().onNotification(notification);
+ }
+ catch (RoutingException e)
+ {
+ logger.error("Deployment '" + getDeploymentId() + "' failed to accept notification '" + notification.getClass().getName() + "' from '" + notification.getDeploymentId() + "'.", e);
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ logger.error("Unable to get Object from JMS ObjectMessage.", e);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * JMS Bus message listener.
+ */
+ private class JMSQueueMessageListener extends AbstractMessageListener
+ {
+
+ /**
+ * Constructor.
+ *
+ * @param destinationName Destination name.
+ * @param jndiProperties JNDI properties.
+ */
+ protected JMSQueueMessageListener(final String destinationName, final Properties jndiProperties)
+ {
+ super(destinationName, queueSession, addDeploymentSelector(jndiProperties, getDeploymentId()));
+ }
+
+ /**
+ * Process a BusMessage for this deployment.
+ *
+ * @param message The bus message.
+ */
+ public final void onMessage(final javax.jms.Message message)
+ {
+ if (message instanceof ObjectMessage)
+ {
+ try
+ {
+ Object objectMessage = ((ObjectMessage) message).getObject();
+
+ if (objectMessage instanceof BusMessage)
+ {
+ try
+ {
+ getMessageListener().onMessage((BusMessage) objectMessage);
+ }
+ catch (RoutingException e)
+ {
+ logger.error("Error processing ESB Message: " + objectMessage, e);
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ logger.warn("Unable to get Object from JMS ObjectMessage.", e);
+ }
+ }
+ }
+
+
+ }
+
+ /**
+ * Set the deployment ID selector on the specified properties.
+ *
+ * @param jmsProperties JMS connection properties.
+ * @param deploymentId Deployment ID.
+ * @return The new properties with the JMS Selector set.
+ */
+ private static Properties addDeploymentSelector(final Properties jmsProperties, final String deploymentId)
+ {
+ Properties properties = (Properties) jmsProperties.clone();
+ properties.setProperty(AbstractMessageListener.MESSAGE_SELECTOR, DEPLOYMENT_ID + "='" + deploymentId + "'");
+ return properties;
+ }
+
+ /**
+ * Close JMS Handler.
+ *
+ * @param name The handler name.
+ * @param handler The handler.
+ */
+ private void closeJMSHandler(final String name, final AbstractMessageHandler handler)
+ {
+ if (handler != null && handler.isConnected())
+ {
+ try
+ {
+ handler.close();
+ }
+ catch (Throwable t)
+ {
+ logger.warn("Error closing " + name + " for deployment '" + getDeploymentName() + "'.", t);
+ }
+ }
+ }
+}
Property changes on: labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/bus/JMSBus.java
___________________________________________________________________
Name: svn:eol-style
+ native
Copied: labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/bus/package.html (from rev 24031, labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/jms/package.html)
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/bus/package.html (rev 0)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/bus/package.html 2008-11-21 14:05:34 UTC (rev 24033)
@@ -0,0 +1,8 @@
+<html>
+<head></head>
+<body>
+JMS Bus.
+
+<h2>Package Specification</h2>
+</body>
+</html>
\ No newline at end of file
Property changes on: labs/jbossesb/workspace/skeagh/routing/jms/src/main/java/org/jboss/esb/jms/bus/package.html
___________________________________________________________________
Name: svn:mergeinfo
+
Copied: labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/bus/JMSBus_Standalone_Test.java (from rev 24032, labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/bus/jms/JMSBus_Standalone_Test.java)
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/bus/JMSBus_Standalone_Test.java (rev 0)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/bus/JMSBus_Standalone_Test.java 2008-11-21 14:05:34 UTC (rev 24033)
@@ -0,0 +1,235 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2008, JBoss Inc.
+ */
+package org.jboss.esb.jms.bus;
+
+import junit.framework.TestCase;
+import org.jboss.esb.api.bus.AbstractNotification;
+import org.jboss.esb.api.bus.BusMessage;
+import org.jboss.esb.api.bus.BusMessageListener;
+import org.jboss.esb.api.bus.BusNotificationListener;
+import org.jboss.esb.api.routing.RoutingException;
+import org.jboss.esb.deploy.DeploymentException;
+import org.jboss.esb.federate.notify.DeploymentHeartbeatNotification;
+import org.jboss.esb.properties.ApplicationProperties;
+import org.jboss.esb.test.JmsTestRunner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class JMSBus_Standalone_Test extends TestCase
+{
+ public void test_send_message() throws Exception
+ {
+ new JmsTestRunner()
+ {
+ @Override
+ public void test() throws Exception
+ {
+ JMSBus interface1 = createBusInterfaceInstance("deployment1");
+ TestBusMessageListener interface1Listener = new TestBusMessageListener();
+ interface1.setMessageListener(interface1Listener);
+ try
+ {
+ JMSBus interface2 = createBusInterfaceInstance("deployment2");
+ TestBusMessageListener interface2Listener = new TestBusMessageListener();
+ interface2.setMessageListener(interface2Listener);
+ try
+ {
+ JMSBus interface3 = createBusInterfaceInstance("deployment3");
+ TestBusMessageListener interface3Listener = new TestBusMessageListener();
+ interface3.setMessageListener(interface3Listener);
+ try
+ {
+ BusMessage message = new BusMessage();
+
+ // Send a message from deployment1 to deployment2
+ assertEquals(0, interface1Listener.messagesReceived.size());
+ assertEquals(0, interface2Listener.messagesReceived.size());
+ assertEquals(0, interface3Listener.messagesReceived.size());
+ interface1.sendMessage(message, "deployment2");
+ Thread.sleep(100);
+ assertEquals(0, interface1Listener.messagesReceived.size());
+ assertEquals(1, interface2Listener.messagesReceived.size());
+ assertEquals(0, interface3Listener.messagesReceived.size());
+ interface2Listener.messagesReceived.clear();
+
+ // Send a message from deployment2 to deployment3
+ assertEquals(0, interface1Listener.messagesReceived.size());
+ assertEquals(0, interface2Listener.messagesReceived.size());
+ assertEquals(0, interface3Listener.messagesReceived.size());
+ interface2.sendMessage(message, "deployment3");
+ Thread.sleep(100);
+ assertEquals(0, interface1Listener.messagesReceived.size());
+ assertEquals(0, interface2Listener.messagesReceived.size());
+ assertEquals(1, interface3Listener.messagesReceived.size());
+ interface3Listener.messagesReceived.clear();
+
+ // Send a message from deployment3 to deployment1
+ assertEquals(0, interface1Listener.messagesReceived.size());
+ assertEquals(0, interface2Listener.messagesReceived.size());
+ assertEquals(0, interface3Listener.messagesReceived.size());
+ interface3.sendMessage(message, "deployment1");
+ Thread.sleep(100);
+ assertEquals(1, interface1Listener.messagesReceived.size());
+ assertEquals(0, interface2Listener.messagesReceived.size());
+ assertEquals(0, interface3Listener.messagesReceived.size());
+ interface1Listener.messagesReceived.clear();
+ }
+ finally
+ {
+ interface3.disconnect();
+ }
+ }
+ finally
+ {
+ interface2.disconnect();
+ }
+ }
+ finally
+ {
+ interface1.disconnect();
+ }
+ }
+ }.run();
+ }
+
+ public void test_send_notification() throws Exception
+ {
+ new JmsTestRunner()
+ {
+ @Override
+ public void test() throws Exception
+ {
+ JMSBus interface1 = createBusInterfaceInstance("deployment1");
+ TestNotificationListener interface1Listener = new TestNotificationListener();
+ interface1.setNotificationListener(interface1Listener);
+ try
+ {
+ JMSBus interface2 = createBusInterfaceInstance("deployment2");
+ TestNotificationListener interface2Listener = new TestNotificationListener();
+ interface2.setNotificationListener(interface2Listener);
+ try
+ {
+ JMSBus interface3 = createBusInterfaceInstance("deployment3");
+ TestNotificationListener interface3Listener = new TestNotificationListener();
+ interface3.setNotificationListener(interface3Listener);
+ try
+ {
+ DeploymentHeartbeatNotification notification = new DeploymentHeartbeatNotification();
+
+ // deployment1 send a notification...
+ assertEquals(0, interface1Listener.notificationsReceived.size());
+ assertEquals(0, interface2Listener.notificationsReceived.size());
+ assertEquals(0, interface3Listener.notificationsReceived.size());
+ interface1.sendNotification(notification);
+ Thread.sleep(100);
+ assertEquals(0, interface1Listener.notificationsReceived.size());
+ assertEquals(1, interface2Listener.notificationsReceived.size());
+ assertEquals(1, interface3Listener.notificationsReceived.size());
+ interface2Listener.notificationsReceived.clear();
+ interface3Listener.notificationsReceived.clear();
+
+ // deployment2 send a notification...
+ assertEquals(0, interface1Listener.notificationsReceived.size());
+ assertEquals(0, interface2Listener.notificationsReceived.size());
+ assertEquals(0, interface3Listener.notificationsReceived.size());
+ interface2.sendNotification(notification);
+ Thread.sleep(100);
+ assertEquals(1, interface1Listener.notificationsReceived.size());
+ assertEquals(0, interface2Listener.notificationsReceived.size());
+ assertEquals(1, interface3Listener.notificationsReceived.size());
+ interface1Listener.notificationsReceived.clear();
+ interface3Listener.notificationsReceived.clear();
+
+ // deployment3 send a notification...
+ assertEquals(0, interface1Listener.notificationsReceived.size());
+ assertEquals(0, interface2Listener.notificationsReceived.size());
+ assertEquals(0, interface3Listener.notificationsReceived.size());
+ interface3.sendNotification(notification);
+ Thread.sleep(100);
+ assertEquals(1, interface1Listener.notificationsReceived.size());
+ assertEquals(1, interface2Listener.notificationsReceived.size());
+ assertEquals(0, interface3Listener.notificationsReceived.size());
+ interface1Listener.notificationsReceived.clear();
+ interface3Listener.notificationsReceived.clear();
+ }
+ finally
+ {
+ interface3.disconnect();
+ }
+ }
+ finally
+ {
+ interface2.disconnect();
+ }
+ }
+ finally
+ {
+ interface1.disconnect();
+ }
+ }
+ }.run();
+ }
+
+ private JMSBus createBusInterfaceInstance(String name) throws IOException, DeploymentException, InterruptedException
+ {
+ JMSBus busInterface1 = new JMSBus();
+ ApplicationProperties properties = ApplicationProperties.loadProperties(getClass().getResourceAsStream("jmsbus.properties"));
+
+ busInterface1.setDeploymentName(name);
+ busInterface1.setDeploymentId(name); // using the name as the ID for testing
+ busInterface1.setProperties(properties);
+ try
+ {
+ busInterface1.connect();
+ }
+ catch (org.jboss.esb.api.routing.RoutingException e)
+ {
+ e.printStackTrace();
+ }
+
+ Thread.sleep(500);
+
+ return busInterface1;
+ }
+
+ private class TestBusMessageListener implements BusMessageListener
+ {
+ private List<BusMessage> messagesReceived = new ArrayList<BusMessage>();
+ public void onMessage(BusMessage busMessage)
+ {
+ messagesReceived.add(busMessage);
+ }
+ }
+
+ private class TestNotificationListener implements BusNotificationListener
+ {
+ private List<AbstractNotification> notificationsReceived = new ArrayList<AbstractNotification>();
+
+ public void onNotification(AbstractNotification notification) throws RoutingException
+ {
+ notificationsReceived.add(notification);
+ }
+ }
+}
Property changes on: labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/bus/JMSBus_Standalone_Test.java
___________________________________________________________________
Name: svn:eol-style
+ native
Copied: labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/bus/jmsbus.properties (from rev 24031, labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/bus/jms/jmsbus.properties)
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/bus/jmsbus.properties (rev 0)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/bus/jmsbus.properties 2008-11-21 14:05:34 UTC (rev 24033)
@@ -0,0 +1,18 @@
+###########################################################################################
+# Using ActiveMQ because it's so easy to embed. Apparently JBM v2.0 will also be
+# easy to embed. We might be able to switch to that then!!
+##########################################################################################
+
+# JNDI Settings...
+# NOTE: The JNDI settings in this config file must be the same as those
+# set in the JMSTestRunner class!!
+java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
+java.naming.provider.url=tcp://localhost:61717
+
+# Bus Queues and Topics...
+deployment.coordintation.topic=jbossesb.deployment.coordintation.topic
+deployment.bus.queue=jbossesb.jms.bus
+
+# ActiveMQ Queue and Topic deployments...
+topic.jbossesb.deployment.coordintation.topic=jbossesb.deployment.coordintation.topic
+queue.jbossesb.jms.bus=jbossesb.jms.bus
Property changes on: labs/jbossesb/workspace/skeagh/routing/jms/src/test/java/org/jboss/esb/jms/bus/jmsbus.properties
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Name: svn:mergeinfo
+
Name: svn:eol-style
+ native
Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/jbossesb-default.properties
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/jbossesb-default.properties 2008-11-21 13:47:36 UTC (rev 24032)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/jbossesb-default.properties 2008-11-21 14:05:34 UTC (rev 24033)
@@ -13,7 +13,7 @@
coordinator.heartbeat.frequency=5000
# Buses to be deployed...
-bus.jms=org.jboss.esb.federate.bus.jms.JMSBus
+bus.jms=org.jboss.esb.jms.bus.JMSBus
# Pre-installed deployment units...
unit.deadletter=deadletter.xml
\ No newline at end of file
Deleted: labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/bus/jms/JMSBus_Standalone_Test.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/bus/jms/JMSBus_Standalone_Test.java 2008-11-21 13:47:36 UTC (rev 24032)
+++ labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/bus/jms/JMSBus_Standalone_Test.java 2008-11-21 14:05:34 UTC (rev 24033)
@@ -1,235 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, Red Hat Middleware LLC, and others contributors as indicated
- * by the @authors tag. All rights reserved.
- * See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- * This copyrighted material is made available to anyone wishing to use,
- * modify, copy, or redistribute it subject to the terms and conditions
- * of the GNU Lesser General Public License, v. 2.1.
- * This program is distributed in the hope that it will be useful, but WITHOUT A
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
- * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
- * You should have received a copy of the GNU Lesser General Public License,
- * v.2.1 along with this distribution; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
- * MA 02110-1301, USA.
- *
- * (C) 2005-2008, JBoss Inc.
- */
-package org.jboss.esb.federate.bus.jms;
-
-import junit.framework.TestCase;
-import org.jboss.esb.deploy.DeploymentException;
-import org.jboss.esb.api.bus.BusMessage;
-import org.jboss.esb.api.bus.BusMessageListener;
-import org.jboss.esb.api.bus.AbstractNotification;
-import org.jboss.esb.api.routing.RoutingException;
-import org.jboss.esb.api.bus.BusNotificationListener;
-import org.jboss.esb.federate.notify.DeploymentHeartbeatNotification;
-import org.jboss.esb.test.JmsTestRunner;
-import org.jboss.esb.properties.ApplicationProperties;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
- */
-public class JMSBus_Standalone_Test extends TestCase
-{
- public void test_send_message() throws Exception
- {
- new JmsTestRunner()
- {
- @Override
- public void test() throws Exception
- {
- JMSBus interface1 = createBusInterfaceInstance("deployment1");
- TestBusMessageListener interface1Listener = new TestBusMessageListener();
- interface1.setMessageListener(interface1Listener);
- try
- {
- JMSBus interface2 = createBusInterfaceInstance("deployment2");
- TestBusMessageListener interface2Listener = new TestBusMessageListener();
- interface2.setMessageListener(interface2Listener);
- try
- {
- JMSBus interface3 = createBusInterfaceInstance("deployment3");
- TestBusMessageListener interface3Listener = new TestBusMessageListener();
- interface3.setMessageListener(interface3Listener);
- try
- {
- BusMessage message = new BusMessage();
-
- // Send a message from deployment1 to deployment2
- assertEquals(0, interface1Listener.messagesReceived.size());
- assertEquals(0, interface2Listener.messagesReceived.size());
- assertEquals(0, interface3Listener.messagesReceived.size());
- interface1.sendMessage(message, "deployment2");
- Thread.sleep(100);
- assertEquals(0, interface1Listener.messagesReceived.size());
- assertEquals(1, interface2Listener.messagesReceived.size());
- assertEquals(0, interface3Listener.messagesReceived.size());
- interface2Listener.messagesReceived.clear();
-
- // Send a message from deployment2 to deployment3
- assertEquals(0, interface1Listener.messagesReceived.size());
- assertEquals(0, interface2Listener.messagesReceived.size());
- assertEquals(0, interface3Listener.messagesReceived.size());
- interface2.sendMessage(message, "deployment3");
- Thread.sleep(100);
- assertEquals(0, interface1Listener.messagesReceived.size());
- assertEquals(0, interface2Listener.messagesReceived.size());
- assertEquals(1, interface3Listener.messagesReceived.size());
- interface3Listener.messagesReceived.clear();
-
- // Send a message from deployment3 to deployment1
- assertEquals(0, interface1Listener.messagesReceived.size());
- assertEquals(0, interface2Listener.messagesReceived.size());
- assertEquals(0, interface3Listener.messagesReceived.size());
- interface3.sendMessage(message, "deployment1");
- Thread.sleep(100);
- assertEquals(1, interface1Listener.messagesReceived.size());
- assertEquals(0, interface2Listener.messagesReceived.size());
- assertEquals(0, interface3Listener.messagesReceived.size());
- interface1Listener.messagesReceived.clear();
- }
- finally
- {
- interface3.disconnect();
- }
- }
- finally
- {
- interface2.disconnect();
- }
- }
- finally
- {
- interface1.disconnect();
- }
- }
- }.run();
- }
-
- public void test_send_notification() throws Exception
- {
- new JmsTestRunner()
- {
- @Override
- public void test() throws Exception
- {
- JMSBus interface1 = createBusInterfaceInstance("deployment1");
- TestNotificationListener interface1Listener = new TestNotificationListener();
- interface1.setNotificationListener(interface1Listener);
- try
- {
- JMSBus interface2 = createBusInterfaceInstance("deployment2");
- TestNotificationListener interface2Listener = new TestNotificationListener();
- interface2.setNotificationListener(interface2Listener);
- try
- {
- JMSBus interface3 = createBusInterfaceInstance("deployment3");
- TestNotificationListener interface3Listener = new TestNotificationListener();
- interface3.setNotificationListener(interface3Listener);
- try
- {
- DeploymentHeartbeatNotification notification = new DeploymentHeartbeatNotification();
-
- // deployment1 send a notification...
- assertEquals(0, interface1Listener.notificationsReceived.size());
- assertEquals(0, interface2Listener.notificationsReceived.size());
- assertEquals(0, interface3Listener.notificationsReceived.size());
- interface1.sendNotification(notification);
- Thread.sleep(100);
- assertEquals(0, interface1Listener.notificationsReceived.size());
- assertEquals(1, interface2Listener.notificationsReceived.size());
- assertEquals(1, interface3Listener.notificationsReceived.size());
- interface2Listener.notificationsReceived.clear();
- interface3Listener.notificationsReceived.clear();
-
- // deployment2 send a notification...
- assertEquals(0, interface1Listener.notificationsReceived.size());
- assertEquals(0, interface2Listener.notificationsReceived.size());
- assertEquals(0, interface3Listener.notificationsReceived.size());
- interface2.sendNotification(notification);
- Thread.sleep(100);
- assertEquals(1, interface1Listener.notificationsReceived.size());
- assertEquals(0, interface2Listener.notificationsReceived.size());
- assertEquals(1, interface3Listener.notificationsReceived.size());
- interface1Listener.notificationsReceived.clear();
- interface3Listener.notificationsReceived.clear();
-
- // deployment3 send a notification...
- assertEquals(0, interface1Listener.notificationsReceived.size());
- assertEquals(0, interface2Listener.notificationsReceived.size());
- assertEquals(0, interface3Listener.notificationsReceived.size());
- interface3.sendNotification(notification);
- Thread.sleep(100);
- assertEquals(1, interface1Listener.notificationsReceived.size());
- assertEquals(1, interface2Listener.notificationsReceived.size());
- assertEquals(0, interface3Listener.notificationsReceived.size());
- interface1Listener.notificationsReceived.clear();
- interface3Listener.notificationsReceived.clear();
- }
- finally
- {
- interface3.disconnect();
- }
- }
- finally
- {
- interface2.disconnect();
- }
- }
- finally
- {
- interface1.disconnect();
- }
- }
- }.run();
- }
-
- private JMSBus createBusInterfaceInstance(String name) throws IOException, DeploymentException, InterruptedException
- {
- JMSBus busInterface1 = new JMSBus();
- ApplicationProperties properties = ApplicationProperties.loadProperties(getClass().getResourceAsStream("jmsbus.properties"));
-
- busInterface1.setDeploymentName(name);
- busInterface1.setDeploymentId(name); // using the name as the ID for testing
- busInterface1.setProperties(properties);
- try
- {
- busInterface1.connect();
- }
- catch (org.jboss.esb.api.routing.RoutingException e)
- {
- e.printStackTrace();
- }
-
- Thread.sleep(500);
-
- return busInterface1;
- }
-
- private class TestBusMessageListener implements BusMessageListener
- {
- private List<BusMessage> messagesReceived = new ArrayList<BusMessage>();
- public void onMessage(BusMessage busMessage)
- {
- messagesReceived.add(busMessage);
- }
- }
-
- private class TestNotificationListener implements BusNotificationListener
- {
- private List<AbstractNotification> notificationsReceived = new ArrayList<AbstractNotification>();
-
- public void onNotification(AbstractNotification notification) throws RoutingException
- {
- notificationsReceived.add(notification);
- }
- }
-}
Deleted: labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/bus/jms/jmsbus.properties
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/bus/jms/jmsbus.properties 2008-11-21 13:47:36 UTC (rev 24032)
+++ labs/jbossesb/workspace/skeagh/runtime/src/test/java/org/jboss/esb/federate/bus/jms/jmsbus.properties 2008-11-21 14:05:34 UTC (rev 24033)
@@ -1,18 +0,0 @@
-###########################################################################################
-# Using ActiveMQ because it's so easy to embed. Apparently JBM v2.0 will also be
-# easy to embed. We might be able to switch to that then!!
-##########################################################################################
-
-# JNDI Settings...
-# NOTE: The JNDI settings in this config file must be the same as those
-# set in the JMSTestRunner class!!
-java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
-java.naming.provider.url=tcp://localhost:61717
-
-# Bus Queues and Topics...
-deployment.coordintation.topic=jbossesb.deployment.coordintation.topic
-deployment.bus.queue=jbossesb.jms.bus
-
-# ActiveMQ Queue and Topic deployments...
-topic.jbossesb.deployment.coordintation.topic=jbossesb.deployment.coordintation.topic
-queue.jbossesb.jms.bus=jbossesb.jms.bus
More information about the jboss-svn-commits
mailing list