[jboss-svn-commits] JBL Code SVN: r22770 - in labs/jbossesb/workspace/skeagh: commons/src/main/java/org/jboss/esb/jms and 13 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Sep 15 05:07:42 EDT 2008
Author: tfennelly
Date: 2008-09-15 05:07:42 -0400 (Mon, 15 Sep 2008)
New Revision: 22770
Added:
labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessage.java
labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessageListener.java
labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/default.properties
labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/default.properties
Removed:
labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/main.properties
labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/main.properties
Modified:
labs/jbossesb/workspace/skeagh/api/service/src/main/java/org/jboss/esb/message/Message.java
labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageListener.java
labs/jbossesb/workspace/skeagh/commons/src/test/java/org/jboss/esb/jms/MessageSendAndListenTest.java
labs/jbossesb/workspace/skeagh/routing/jms/src/test/resources/META-INF/jbossesb/busconfig/jms/default.properties
labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/deploy/DeploymentRuntime.java
labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/deploy/config/PropertiesUtil.java
labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java
labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/Bus.java
labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/jms/JMSBus.java
labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/jms/default.properties
labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/jms/default.properties
Log:
More Bus work.
Modified: labs/jbossesb/workspace/skeagh/api/service/src/main/java/org/jboss/esb/message/Message.java
===================================================================
--- labs/jbossesb/workspace/skeagh/api/service/src/main/java/org/jboss/esb/message/Message.java 2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/api/service/src/main/java/org/jboss/esb/message/Message.java 2008-09-15 09:07:42 UTC (rev 22770)
@@ -21,6 +21,7 @@
import java.util.LinkedHashMap;
import java.util.Map;
+import java.io.Serializable;
/**
* JBoss ESB Message.
@@ -33,15 +34,21 @@
* @author <a href="mailto:dbevenius at redhat.com">Daniel Bevenius</a>
* @author <a href="mailto:tom.fennelly at jboss.com">Tom Fennelly</a>
*/
-public class Message
+public class Message implements Serializable
{
/**
* Primary Payload.
+ * <p/>
+ * Don't change this to be Serializable. The Message only needs to be
+ * Serializable if it needs to go onto a Bus, which is not always the case.
*/
private Object payload;
/**
* Attachments.
+ * <p/>
+ * Don't change these to be Serializable. The Message only needs to be
+ * Serializable if it needs to go onto a Bus, which is not always the case.
*/
private Map<String, Object> attachments = new LinkedHashMap<String, Object>();
@@ -58,6 +65,10 @@
*/
public Message(final Object payload)
{
+ /*
+ * Don't change this to be Serializable. The Message only needs to be
+ * Serializable if it needs to go onto a Bus, which is not always the case.
+ */
this.payload = payload;
}
@@ -68,6 +79,10 @@
*/
public final Object getPayload()
{
+ /*
+ * Don't change this to be Serializable. The Message only needs to be
+ * Serializable if it needs to go onto a Bus, which is not always the case.
+ */
return payload;
}
@@ -78,6 +93,10 @@
*/
public final void setPayload(final Object payload)
{
+ /*
+ * Don't change this to be Serializable. The Message only needs to be
+ * Serializable if it needs to go onto a Bus, which is not always the case.
+ */
this.payload = payload;
}
@@ -88,6 +107,10 @@
*/
public final Map<String, Object> getAttachments()
{
+ /*
+ * Don't change these to be Serializable. The Message only needs to be
+ * Serializable if it needs to go onto a Bus, which is not always the case.
+ */
return attachments;
}
@@ -98,6 +121,10 @@
*/
public final void setAttachments(final Map<String, Object> attachments)
{
+ /*
+ * Don't change these to be Serializable. The Message only needs to be
+ * Serializable if it needs to go onto a Bus, which is not always the case.
+ */
this.attachments = attachments;
}
}
Modified: labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageListener.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageListener.java 2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/commons/src/main/java/org/jboss/esb/jms/AbstractMessageListener.java 2008-09-15 09:07:42 UTC (rev 22770)
@@ -98,15 +98,10 @@
// Bind "this" listener to the destination...
if (getDestination() instanceof Topic)
{
- if (messageSelector == null)
- {
- messageConsumer = ((TopicSession) getSession()).createSubscriber((Topic) getDestination());
- }
- else
- {
- boolean noLocal = !properties.getProperty(NO_LOCAL, "true").equals("false");
- messageConsumer = ((TopicSession) getSession()).createSubscriber((Topic) getDestination(), messageSelector, noLocal);
- }
+ boolean noLocal = !properties.getProperty(NO_LOCAL, "true").equals("false");
+
+ // A null 'messageSelector' is valid i.e. no selector on the consumer. 'noLocal' is on by default...
+ messageConsumer = ((TopicSession) getSession()).createSubscriber((Topic) getDestination(), messageSelector, noLocal);
messageConsumer.setMessageListener(this);
}
else
Modified: labs/jbossesb/workspace/skeagh/commons/src/test/java/org/jboss/esb/jms/MessageSendAndListenTest.java
===================================================================
--- labs/jbossesb/workspace/skeagh/commons/src/test/java/org/jboss/esb/jms/MessageSendAndListenTest.java 2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/commons/src/test/java/org/jboss/esb/jms/MessageSendAndListenTest.java 2008-09-15 09:07:42 UTC (rev 22770)
@@ -59,9 +59,11 @@
public void test() throws Exception
{
Properties jndiProperties = getJndiProperties();
- JMSSession jmsSession = new JMSSession(destType, jndiProperties);
+ JMSSession jmsSession1 = new JMSSession(destType, jndiProperties);
+ JMSSession jmsSession2 = new JMSSession(destType, jndiProperties);
- jmsSession.connect();
+ jmsSession1.connect();
+ jmsSession2.connect();
try
{
if(destType == Topic.class) {
@@ -70,12 +72,12 @@
jndiProperties.setProperty("queue." + destName, destName);
}
- MockMessageListener<TextMessage> listener = new MockMessageListener<TextMessage>(destName, jmsSession, null);
+ MockMessageListener<TextMessage> listener = new MockMessageListener<TextMessage>(destName, jmsSession1, null);
listener.connect();
assertTrue(destType.isAssignableFrom(listener.getDestination().getClass()));
try {
- MessageSender sender = new MessageSender(destName, jmsSession);
+ MessageSender sender = new MessageSender(destName, jmsSession2);
List<String> messagesSent = Arrays.asList(new String[] {"message 1", "message 2", "message 3"});
sender.connect();
@@ -98,7 +100,8 @@
listener.close();
}
} finally {
- jmsSession.close();
+ jmsSession1.close();
+ jmsSession2.close();
}
}
}.run();
@@ -110,20 +113,21 @@
public void test() throws Exception
{
Properties jndiProperties = getJndiProperties();
- JMSSession jmsSession = new JMSSession(destType, jndiProperties);
+ JMSSession jmsSession1 = new JMSSession(destType, jndiProperties);
+ JMSSession jmsSession2 = new JMSSession(destType, jndiProperties);
- jmsSession.connect();
+ jmsSession1.connect();
+ jmsSession2.connect();
try
{
if(destType == Topic.class) {
jndiProperties.setProperty("topic." + destName, destName);
- jndiProperties.setProperty(AbstractMessageListener.NO_LOCAL, "false");
} else {
jndiProperties.setProperty("queue." + destName, destName);
}
- MockMessageListener<TextMessage> service1 = new MockMessageListener<TextMessage>(destName, jmsSession, "serviceId='service1'");
- MockMessageListener<TextMessage> service2 = new MockMessageListener<TextMessage>(destName, jmsSession, "serviceId='service2'");
+ MockMessageListener<TextMessage> service1 = new MockMessageListener<TextMessage>(destName, jmsSession1, "serviceId='service1'");
+ MockMessageListener<TextMessage> service2 = new MockMessageListener<TextMessage>(destName, jmsSession1, "serviceId='service2'");
List<String> service1Messages = Arrays.asList(new String[] {"message 1", "message 2", "message 3"});
List<String> service2Messages = Arrays.asList(new String[] {"message 4", "message 5", "message 6"});
@@ -131,7 +135,7 @@
try {
service2.connect();
try {
- MessageSender sender = new MessageSender(destName, jmsSession);
+ MessageSender sender = new MessageSender(destName, jmsSession2);
sender.connect();
try {
@@ -165,7 +169,8 @@
service1.close();
}
} finally {
- jmsSession.close();
+ jmsSession1.close();
+ jmsSession2.close();
}
}
}.run();
Modified: labs/jbossesb/workspace/skeagh/routing/jms/src/test/resources/META-INF/jbossesb/busconfig/jms/default.properties
===================================================================
--- labs/jbossesb/workspace/skeagh/routing/jms/src/test/resources/META-INF/jbossesb/busconfig/jms/default.properties 2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/routing/jms/src/test/resources/META-INF/jbossesb/busconfig/jms/default.properties 2008-09-15 09:07:42 UTC (rev 22770)
@@ -13,6 +13,8 @@
# Bus Queues and Topics...
deployment.coordintation.topic=jbossesb.deployment.coordintation.topic
+deployment.bus.queue=jbossesb.jms.bus
# ActiveMQ Queue and Topic deployments...
topic.jbossesb.deployment.coordintation.topic=jbossesb.deployment.coordintation.topic
+queue.jbossesb.jms.bus=jbossesb.jms.bus
Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/deploy/DeploymentRuntime.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/deploy/DeploymentRuntime.java 2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/deploy/DeploymentRuntime.java 2008-09-15 09:07:42 UTC (rev 22770)
@@ -456,7 +456,7 @@
try
{
SimpleSchedule heartbeatSchedule = new SimpleSchedule();
- ApplicationProperties deploymentProperties = PropertiesUtil.getDeploymentConfig();
+ ApplicationProperties deploymentProperties = PropertiesUtil.getDeploymentConfig(deploymentName);
heartbeatSchedule.setFrequency(deploymentProperties.getLongProperty(DeploymentCoordinator.COORDINATOR_HEARTBEAT_FREQUENCY_KEY, DeploymentCoordinator.COORDINATOR_HEARTBEAT_DEFAULT_FREQUENCY));
heartbeatSchedule.setExecCount(-1);
Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/deploy/config/PropertiesUtil.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/deploy/config/PropertiesUtil.java 2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/deploy/config/PropertiesUtil.java 2008-09-15 09:07:42 UTC (rev 22770)
@@ -21,14 +21,15 @@
import org.apache.log4j.Logger;
import org.jboss.esb.classpath.ClassUtil;
+import org.jboss.esb.deploy.DeploymentException;
import org.jboss.esb.federate.DeploymentCoordinator;
import org.jboss.esb.properties.ApplicationProperties;
-import org.jboss.esb.deploy.DeploymentException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
/**
@@ -53,7 +54,7 @@
/**
* Classpath dir name for ESB configs.
*/
- private static final String BUSCONFIG_CP_DIR = "/META-INF/jbossesb/" + BUSCONFIG_FILE_DIR;
+ private static final String ESBCONFIG_CP_DIR = "/META-INF/jbossesb/";
/**
* Private constructor.
@@ -79,39 +80,74 @@
* @param protocol The protocol name e.g. "jms".
* @param deploymentName The deployment name e.g. "Order_Service".
* @return The bus configuration as a {@link java.util.Properties} instance.
- * @throws java.io.IOException Unable to read configuration.
+ * @throws DeploymentException Unable to read configuration.
*/
- public static ApplicationProperties getBusConfig(final String protocol, final String deploymentName) throws IOException
+ public static ApplicationProperties getBusConfig(final String protocol, final String deploymentName) throws DeploymentException
{
- String firstCheckPath = protocol + "/" + URLEncoder.encode(deploymentName, "UTF-8") + ".properties";
- String secondCheckPath = protocol + "/" + "default.properties";
+ String firstCheckPath = BUSCONFIG_FILE_DIR + protocol + "/" + urlEncode(deploymentName) + ".properties";
+ String secondCheckPath = BUSCONFIG_FILE_DIR + protocol + "/" + "default.properties";
ApplicationProperties properties;
// First check for a deployment specific config ...
- properties = getBusConfig(firstCheckPath);
+ properties = getConfig(firstCheckPath);
if (properties != null)
{
return properties;
}
// Then check for a default ...
- properties = getBusConfig(secondCheckPath);
+ properties = getConfig(secondCheckPath);
if (properties != null)
{
return properties;
}
logger.debug("Unable to find JBoss ESB Bus configuration for protocol '" + protocol + "', deployment '" + deploymentName + "'. Tried:\n"
- + "\t1. File: " + BUSCONFIG_FILE_DIR + firstCheckPath + "\n"
- + "\t2. Classpath: " + BUSCONFIG_CP_DIR + firstCheckPath + "\n"
- + "\t3. File: " + BUSCONFIG_FILE_DIR + secondCheckPath + "\n"
- + "\t4. Classpath: " + BUSCONFIG_CP_DIR + secondCheckPath);
+ + "\t1. File: " + firstCheckPath + "\n"
+ + "\t2. Classpath: " + ESBCONFIG_CP_DIR + firstCheckPath + "\n"
+ + "\t3. File: " + secondCheckPath + "\n"
+ + "\t4. Classpath: " + ESBCONFIG_CP_DIR + secondCheckPath);
return null;
}
/**
+ * Get the main Deployment properties.
+ *
+ * @param deploymentName The deployment name.
+ * @return The main Deployment properties.
+ * @throws DeploymentException Error reading properties.
+ */
+ public static ApplicationProperties getDeploymentConfig(final String deploymentName) throws DeploymentException
+ {
+ String firstCheckPath = urlEncode(deploymentName) + ".properties";
+ String secondCheckPath = "default.properties";
+
+ ApplicationProperties properties;
+
+ // First check for a deployment specific config ...
+ properties = getConfig(firstCheckPath);
+ if (properties != null)
+ {
+ return properties;
+ }
+
+ // Then check for a default ...
+ properties = getConfig(secondCheckPath);
+ if (properties != null)
+ {
+ return properties;
+ }
+
+ throw new DeploymentException("Unable to find JBoss ESB Deployment configuration for deployment '" + deploymentName + "'. Tried:\n"
+ + "\t1. File: " + firstCheckPath + "\n"
+ + "\t2. Classpath: " + ESBCONFIG_CP_DIR + firstCheckPath + "\n"
+ + "\t3. File: " + secondCheckPath + "\n"
+ + "\t4. Classpath: " + ESBCONFIG_CP_DIR + secondCheckPath);
+ }
+
+ /**
* Get the specified bus configuration from the bus configurations directory.
* <p/>
* Checks in the following order:
@@ -122,45 +158,60 @@
*
* @param configPath The configuration file path.
* @return The bus configuration as a {@link java.util.Properties} instance.
- * @throws java.io.IOException Unable to read configuration.
+ * @throws DeploymentException Unable to read configuration.
*/
- public static ApplicationProperties getBusConfig(final String configPath) throws IOException
+ public static ApplicationProperties getConfig(final String configPath) throws DeploymentException
{
- String firstCheckPath = BUSCONFIG_FILE_DIR + configPath;
- String secondCheckPath = BUSCONFIG_CP_DIR + configPath;
+ String fileCheckPath = configPath;
+ String cpCheckPath = ESBCONFIG_CP_DIR + configPath;
InputStream configStream;
// 1st: Check for a deployment specific config on the local file system...
- File checkFile = new File(firstCheckPath);
+ File checkFile = new File(fileCheckPath);
if (checkFile.exists() && !checkFile.isDirectory())
{
- return ApplicationProperties.loadProperties(new FileInputStream(checkFile));
+ try
+ {
+ return ApplicationProperties.loadProperties(new FileInputStream(checkFile));
+ }
+ catch (IOException e)
+ {
+ throw new DeploymentException("Error reading properties file '" + checkFile.getAbsolutePath() + "'.", e);
+ }
}
// 2nd: Check for a deployment specific config on the classpath...
- configStream = ClassUtil.getResourceAsStream(secondCheckPath, DeploymentCoordinator.class);
+ configStream = ClassUtil.getResourceAsStream(cpCheckPath, DeploymentCoordinator.class);
if (configStream != null)
{
- return ApplicationProperties.loadProperties(configStream);
+ try
+ {
+ return ApplicationProperties.loadProperties(configStream);
+ }
+ catch (IOException e)
+ {
+ throw new DeploymentException("Error reading properties classpath file '" + cpCheckPath + "'.", e);
+ }
}
return null;
}
/**
- * Get the main Deployment properties.
- * @return The main Deployment properties.
- * @throws DeploymentException Error reading properties.
+ * URL encode the supplied string.
+ *
+ * @param string The String.
+ * @return The URL encoded string.
*/
- public static ApplicationProperties getDeploymentConfig() throws DeploymentException
+ private static String urlEncode(final String string)
{
try
{
- return getBusConfig("main.properties");
+ return URLEncoder.encode(string, "UTF-8");
}
- catch (IOException e)
+ catch (UnsupportedEncodingException e)
{
- throw new DeploymentException("Error reading main.properties.", e);
+ throw new RuntimeException("Unexpected runtime exception. Character encoding 'UTF-8' not supported on runtime VM.");
}
}
}
Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java 2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/DeploymentCoordinator.java 2008-09-15 09:07:42 UTC (rev 22770)
@@ -47,7 +47,7 @@
* Deployment Coordinator.
* <p/>
* Manages relationships between local deployments e.g. deployed in the same
- * container, on the same machine etc.
+ * container, on the same machine etc. Deploys the local Bus interface for the local deployment.
*
* @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
*/
@@ -58,14 +58,6 @@
*/
public static final String DEPLOYMENT_COORDINTATION_SCHEDULE_KEY = "deployment.coordintation.schedule";
/**
- * Deployment coordination topic property key name.
- */
- public static final String DEPLOYMENT_COORDINTATION_TOPIC_KEY = "deployment.coordintation.topic";
- /**
- * Default deployment coordination topic name.
- */
- public static final String DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME = "jbossesb.deployment.coordintation.topic";
- /**
* Heartbeat frequency config key.
*/
public static final String COORDINATOR_HEARTBEAT_FREQUENCY_KEY = "coordinator.heartbeat.frequency";
@@ -126,7 +118,7 @@
@Initialize
public final void initialize() throws DeploymentException
{
- ApplicationProperties deploymentProperties = PropertiesUtil.getDeploymentConfig();
+ ApplicationProperties deploymentProperties = PropertiesUtil.getDeploymentConfig(runtime.getDeploymentName());
// Set the monitoring timeout - 4 heartbeats...
monitorTimeout = (deploymentProperties.getLongProperty(COORDINATOR_HEARTBEAT_FREQUENCY_KEY, COORDINATOR_HEARTBEAT_DEFAULT_FREQUENCY) * 4);
Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/Bus.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/Bus.java 2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/Bus.java 2008-09-15 09:07:42 UTC (rev 22770)
@@ -23,10 +23,7 @@
import org.jboss.esb.deploy.DeploymentException;
import org.jboss.esb.federate.notify.AbstractDeploymentNotification;
import org.jboss.esb.federate.notify.NotificationListener;
-import org.jboss.esb.message.Message;
-import org.jboss.esb.routing.MessageDispatcher;
import org.jboss.esb.routing.RoutingException;
-import org.jboss.esb.service.ServiceName;
/**
* JBoss ESB Bus definition.
@@ -54,12 +51,14 @@
/**
* Connect the bus.
+ *
* @throws DeploymentException Connection exception.
*/
void connect() throws DeploymentException;
/**
* Is the bus connected.
+ *
* @return True if the bus is connected, otherwise false.
*/
boolean isConnected();
@@ -70,24 +69,20 @@
void close();
/**
- * Send the supplied message to the specified service on the specified
- * deployment via the bus.
+ * Send the supplied message to the specified deployment via the bus.
*
- * @param message The message.
- * @param service The target service.
- * @param serviceDeploymentId The deployment ID on which the Service is located.
+ * @param message The message.
+ * @param targetDeploymentId The target deployment ID.
* @throws RoutingException Error sending message onto the Bus.
*/
- void send(Message message, ServiceName service, String serviceDeploymentId) throws RoutingException;
+ void send(BusMessage message, String targetDeploymentId) throws RoutingException;
/**
- * Add a message dispatcher for receiving messages from the bus for
- * the specified local service.
+ * Set the bus message listener on the {@link Bus} implementation.
*
- * @param dispatcher The message dispatcher.
- * @param service The target service.
+ * @param listener The message listener.
*/
- void addDispatcher(MessageDispatcher dispatcher, ServiceName service);
+ void setBusMessageListener(BusMessageListener listener);
/**
* Set the bus notification listener.
@@ -114,6 +109,7 @@
{
/**
* New Bus instance.
+ *
* @param className Bus class name.
* @return Bus instance.
* @throws DeploymentException Error creating Bus instance.
Added: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessage.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessage.java (rev 0)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessage.java 2008-09-15 09:07:42 UTC (rev 22770)
@@ -0,0 +1,104 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright XXXX, Red Hat Middleware LLC, and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2008, JBoss Inc.
+ */
+package org.jboss.esb.federate.bus;
+
+import org.jboss.esb.context.AddressingContext;
+import org.jboss.esb.context.InvocationContext;
+import org.jboss.esb.message.Message;
+
+import java.io.Serializable;
+
+/**
+ * ESB Bus Message.
+ * <p/>
+ * This class encapsulates the ESB {@link org.jboss.esb.message.Message} for transportation
+ * on a Bus.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public final class BusMessage implements Serializable
+{
+ /**
+ * Invocation Context.
+ */
+ private InvocationContext invocationContext;
+ /**
+ * Addressing Context.
+ */
+ private AddressingContext addressingContext;
+ /**
+ * Message.
+ */
+ private Message message;
+
+ /**
+ * Get the message Invocation Context.
+ * @return The message Invocation Context.
+ */
+ public InvocationContext getInvocationContext()
+ {
+ return invocationContext;
+ }
+
+ /**
+ * Set the message Invocation Context.
+ * @param invocationContext The message Invocation Context.
+ */
+ public void setInvocationContext(final InvocationContext invocationContext)
+ {
+ this.invocationContext = invocationContext;
+ }
+
+ /**
+ * Get the message Addressing Context.
+ * @return The message Addressing Context.
+ */
+ public AddressingContext getAddressingContext()
+ {
+ return addressingContext;
+ }
+
+ /**
+ * Set the message Addressing Context.
+ * @param addressingContext The message Addressing Context.
+ */
+ public void setAddressingContext(final AddressingContext addressingContext)
+ {
+ this.addressingContext = addressingContext;
+ }
+
+ /**
+ * Get the message.
+ * @return The message.
+ */
+ public Message getMessage()
+ {
+ return message;
+ }
+
+ /**
+ * Set the message.
+ * @param message The message.
+ */
+ public void setMessage(final Message message)
+ {
+ this.message = message;
+ }
+}
Property changes on: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessage.java
___________________________________________________________________
Name: svn:eol-style
+ native
Copied: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessageListener.java (from rev 22750, labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/notify/NotificationListener.java)
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessageListener.java (rev 0)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessageListener.java 2008-09-15 09:07:42 UTC (rev 22770)
@@ -0,0 +1,35 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright XXXX, Red Hat Middleware LLC, and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2008, JBoss Inc.
+ */
+package org.jboss.esb.federate.bus;
+
+/**
+ * {@link BusMessage} Listener interface.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public interface BusMessageListener
+{
+ /**
+ * Message handler.
+ *
+ * @param busMessage The bus message.
+ */
+ void onMessage(BusMessage busMessage);
+}
\ No newline at end of file
Property changes on: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/BusMessageListener.java
___________________________________________________________________
Name: svn:mergeinfo
+
Name: svn:eol-style
+ native
Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/jms/JMSBus.java
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/jms/JMSBus.java 2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/java/org/jboss/esb/federate/bus/jms/JMSBus.java 2008-09-15 09:07:42 UTC (rev 22770)
@@ -21,25 +21,23 @@
import org.apache.log4j.Logger;
import org.jboss.esb.deploy.DeploymentException;
-import org.jboss.esb.federate.DeploymentCoordinator;
+import org.jboss.esb.deploy.config.PropertiesUtil;
import org.jboss.esb.federate.bus.Bus;
-import org.jboss.esb.deploy.config.PropertiesUtil;
+import org.jboss.esb.federate.bus.BusMessage;
+import org.jboss.esb.federate.bus.BusMessageListener;
import org.jboss.esb.federate.notify.AbstractDeploymentNotification;
import org.jboss.esb.federate.notify.NotificationListener;
+import org.jboss.esb.jms.AbstractMessageHandler;
import org.jboss.esb.jms.AbstractMessageListener;
import org.jboss.esb.jms.JMSSession;
import org.jboss.esb.jms.MessageSender;
-import org.jboss.esb.message.Message;
import org.jboss.esb.properties.ApplicationProperties;
-import org.jboss.esb.routing.MessageDispatcher;
import org.jboss.esb.routing.RoutingException;
-import org.jboss.esb.service.ServiceName;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Topic;
-import java.io.IOException;
import java.util.Properties;
/**
@@ -56,6 +54,26 @@
*/
private static Logger logger = Logger.getLogger(JMSBus.class);
/**
+ * Deployment coordination topic property key name.
+ */
+ public static final String DEPLOYMENT_COORDINTATION_TOPIC_KEY = "deployment.coordintation.topic";
+ /**
+ * Default deployment coordination topic name.
+ */
+ public static final String DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME = "jbossesb.deployment.coordintation.topic";
+ /**
+ * Deployment JMS Bus Queue name config key.
+ */
+ public static final String BUS_QUEUE_KEY = "deployment.bus.queue";
+ /**
+ * Default Deployment JMS Bus Queue name.
+ */
+ public static final String DEFAULT_BUS_QUEUE_NAME = "jbossesb.jms.bus";
+ /**
+ * Deployment ID key.
+ */
+ public static final String DEPLOYMENT_ID = "jbossesb_deployment_id";
+ /**
* Bus Properties.
*/
private ApplicationProperties busProperties;
@@ -76,17 +94,29 @@
*/
private JMSSession queueSession;
/**
- * Deployment coordination listener.
+ * JMS Deployment coordination listener.
*/
- private DeploymentNotificationListener deploymentNotificationListener;
+ private JMSCoordinationListener jmsCoordinationListener;
/**
* Deployment notification broadcaster.
*/
- private MessageSender notificationBroadcaster;
+ private MessageSender jmsCoordinationSender;
/**
- * Notification listener.
+ * Notification callback.
*/
- private NotificationListener notificationListener;
+ private NotificationListener coordinationCallback;
+ /**
+ * Bus message listener.
+ */
+ private BusMessageListener busMessageCallback;
+ /**
+ * JMS Bus Message Listener.
+ */
+ private JMSQueueMessageListener jmsBusListener;
+ /**
+ * JMS Bus Message Sender.
+ */
+ private MessageSender jmsBusSender;
/**
* Get the Bus configuration properties.
@@ -132,42 +162,51 @@
throw new IllegalStateException("'deploymentId' not set on Bus.");
}
- try
- {
- busProperties = PropertiesUtil.getBusConfig("jms", deploymentName);
- }
- catch (IOException e)
- {
- throw new DeploymentException("Failed to read local bus deployment configuration", e);
- }
+ busProperties = PropertiesUtil.getBusConfig("jms", deploymentName);
if (busProperties != null)
{
- intialiseJMSResources(busProperties);
+ connectJMSSessions(busProperties);
- if (topicSession != null)
+ if (topicSession != null && queueSession != null)
{
try
{
- connectNotificationListener(busProperties);
+ connectCoordinationListener(busProperties);
+ connectCoordintationSender(busProperties);
+ connectBusListener(busProperties);
+ connectBusSender(busProperties);
}
- catch (Throwable t)
+ catch (DeploymentException e)
{
- closeJMSResources();
- throw new DeploymentException("Failed to connect coordination listener", t);
- }
- if (deploymentNotificationListener != null && deploymentNotificationListener.isConnected())
- {
try
{
- connectNotificationBroadcaster(busProperties);
+ if(jmsCoordinationListener == null && jmsCoordinationSender == null && jmsBusListener == null && jmsBusSender == null)
+ {
+ if(!logger.isDebugEnabled())
+ {
+ logger.info("Deployment '" + deploymentName + "' is not being coordinated with any other local deployment. Turn on deug logging for more details.");
+ }
+ else
+ {
+ logger.debug("Deployment '" + deploymentName + "' is not being coordinated with any other local deployment.", e);
+ }
+ }
+ else
+ {
+ throw e;
+ }
}
- catch (Throwable t)
+ finally
{
- closeJMSResources();
- throw new DeploymentException("Unable to initialize Deployment Coordinator.", t);
+ closeJMSSessions();
}
}
+ catch (Throwable t)
+ {
+ closeJMSSessions();
+ throw new DeploymentException("Failed to connect coordination listener", t);
+ }
}
}
}
@@ -177,57 +216,50 @@
*/
public final void close()
{
- if (deploymentNotificationListener != null && deploymentNotificationListener.isConnected())
+ try
{
- try
- {
- deploymentNotificationListener.close();
- }
- catch (Throwable t)
- {
- logger.warn("Error closing deployment coordination listener for deployment '" + deploymentName + "'.", t);
- }
+ closeJMSHandler("Deployment Coordination Listener", jmsCoordinationListener);
+ closeJMSHandler("Deployment Coordination Sender", jmsCoordinationSender);
+ closeJMSHandler("JMS Bus Listener", jmsBusListener);
+ closeJMSHandler("JMS Bus Sender", jmsBusSender);
}
-
- if (notificationBroadcaster != null && notificationBroadcaster.isConnected())
+ finally
{
- try
- {
- notificationBroadcaster.close();
- }
- catch (Throwable t)
- {
- logger.warn("Error closing deployment coordination broadcaster for deployment '" + deploymentName + "'.", t);
- }
+ closeJMSSessions();
}
-
- closeJMSResources();
}
/**
- * Send the supplied message to the specified service on the specified
- * deployment via the bus.
+ * Send the supplied message to the specified deployment via the bus.
*
* @param message The message.
- * @param service The target service.
- * @param serviceDeploymentId The deployment ID on which the Service is located.
+ * @param targetDeploymentId The target deployment ID.
* @throws RoutingException Error sending message onto the Bus.
*/
- public final void send(final Message message, final ServiceName service, final String serviceDeploymentId) throws RoutingException
+ public final void send(final BusMessage message, final String targetDeploymentId) throws RoutingException
{
+ try
+ {
+ ObjectMessage jmsMessage = jmsBusSender.getSession().createObjectMessage(message);
+ jmsMessage.setStringProperty(DEPLOYMENT_ID, targetDeploymentId);
+ jmsBusSender.send(jmsMessage);
+ }
+ catch (JMSException e)
+ {
+ throw new RoutingException("Error sending message to JMS Bus for deployment ID '" + targetDeploymentId + "'.", e);
+ }
}
/**
* Add a message dispatcher for receiving messages from the bus for
* the specified local service.
*
- * @param dispatcher The message dispatcher.
- * @param service The target service.
+ * @param listener The bus message listener.
*/
- public final void addDispatcher(final MessageDispatcher dispatcher, final ServiceName service)
+ public final void setBusMessageListener(final BusMessageListener listener)
{
-
+ this.busMessageCallback = listener;
}
/**
@@ -237,7 +269,7 @@
*/
public final void setNotificationListener(final NotificationListener listener)
{
- this.notificationListener = listener;
+ this.coordinationCallback = listener;
}
/**
@@ -251,11 +283,11 @@
*/
public final void sendNotification(final AbstractDeploymentNotification notification) throws RoutingException
{
- if (notificationBroadcaster != null)
+ if (jmsCoordinationSender != null)
{
try
{
- notificationBroadcaster.send(notificationBroadcaster.getSession().createObjectMessage(notification));
+ jmsCoordinationSender.send(jmsCoordinationSender.getSession().createObjectMessage(notification));
}
catch (JMSException e)
{
@@ -274,103 +306,98 @@
}
/**
- * Deployment Notification Listener.
+ * Connect the deployment coordination listener.
+ *
+ * @param busProperties Bus configuration properties.
+ * @throws org.jboss.esb.deploy.DeploymentException Failed to connect listener.
*/
- private class DeploymentNotificationListener extends AbstractMessageListener
+ private void connectCoordinationListener(final Properties busProperties) throws DeploymentException
{
- /**
- * Constructor.
- *
- * @param destinationName Destination name.
- * @param jndiProperties JNDI properties.
- */
- protected DeploymentNotificationListener(final String destinationName, final Properties jndiProperties)
+ String coordinationTopicName = busProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
+
+ jmsCoordinationListener = new JMSCoordinationListener(coordinationTopicName, busProperties);
+ try
{
- super(destinationName, topicSession, jndiProperties);
+ jmsCoordinationListener.connect();
}
-
- /**
- * Handle a coordination event from other deployments.
- *
- * @param message Coordination message.
- */
- public final void onMessage(final javax.jms.Message message)
+ catch (JMSException e)
{
- if (message instanceof ObjectMessage)
- {
- try
- {
- Object objectMessage = ((ObjectMessage) message).getObject();
+ jmsCoordinationListener = null;
+ throw new DeploymentException("Deployment '" + deploymentName + "' is not being coordinated with any other local deployment. A JMSException occured while trying "
+ + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
+ + "JNDI properties used: " + busProperties, e);
+ }
+ }
- if (objectMessage instanceof AbstractDeploymentNotification)
- {
- AbstractDeploymentNotification notification = (AbstractDeploymentNotification) objectMessage;
- String messageDeploymentId = notification.getId();
+ /**
+ * Connect the deployment coordination sender.
+ *
+ * @param busProperties Bus configuration properties.
+ * @throws org.jboss.esb.deploy.DeploymentException Failed to connect sender.
+ */
+ private void connectCoordintationSender(final Properties busProperties) throws DeploymentException
+ {
+ String coordinationTopicName = busProperties.getProperty(DEPLOYMENT_COORDINTATION_TOPIC_KEY, DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
- // If it's not a notification from this deployment...
- if (!deploymentId.equals(messageDeploymentId))
- {
- notificationListener.onNotification(notification);
- }
- }
- }
- catch (JMSException e)
- {
- logger.warn("Unable to get Object from JMS ObjectMessage.", e);
- }
- }
+ jmsCoordinationSender = new MessageSender(coordinationTopicName, topicSession);
+ try
+ {
+ jmsCoordinationSender.connect();
}
+ catch (JMSException e)
+ {
+ jmsCoordinationSender = null;
+ throw new DeploymentException("Deployment '" + deploymentName + "' is not being coordinated with any other local deployment. A JMSException occured while trying "
+ + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
+ + "JNDI properties used: " + busProperties, e);
+ }
+ }
- }
-
/**
- * Connect the deployment coordination listener.
+ * Connect the {@link BusMessage} listener.
*
- * @param localBusProperties Bus configuration properties.
+ * @param busProperties Bus configuration properties.
+ * @throws org.jboss.esb.deploy.DeploymentException Failed to connect listener.
*/
- private void connectNotificationListener(final Properties localBusProperties)
+ private void connectBusListener(final Properties busProperties) throws DeploymentException
{
- String coordinationTopicName = localBusProperties.getProperty(DeploymentCoordinator.DEPLOYMENT_COORDINTATION_TOPIC_KEY, DeploymentCoordinator.DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
+ String busQueueName = busProperties.getProperty(BUS_QUEUE_KEY, DEFAULT_BUS_QUEUE_NAME);
- deploymentNotificationListener = new DeploymentNotificationListener(coordinationTopicName, localBusProperties);
+ jmsBusListener = new JMSQueueMessageListener(busQueueName, busProperties);
try
{
- deploymentNotificationListener.connect();
+ jmsBusListener.connect();
}
catch (JMSException e)
{
- deploymentNotificationListener = null;
- logger.info("Deployment '" + deploymentName + "' is not being coordinated with any other local deployment. Turn on deug logging for more details.");
- if (logger.isDebugEnabled())
- {
- logger.debug("Deployment '" + deploymentName + "' is not being coordinated with any other local deployment. A JMSException occured while trying "
- + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
- + "JNDI properties used: " + localBusProperties);
- }
+ jmsBusListener = null;
+ throw new DeploymentException("Deployment '" + deploymentName + "' is not being coordinated with any other local deployment. A JMSException occured while trying "
+ + "to connect to the JMS Bus Queue '" + busQueueName + "'. The JMS Server must be running and this Queue must be deployed. "
+ + "JNDI properties used: " + busProperties, e);
}
}
/**
- * Connect the deployment coordination broadcaster.
+ * Connect the {@link BusMessage} sender.
*
- * @param localBusProperties Bus configuration properties.
- * @throws org.jboss.esb.deploy.DeploymentException Failed to connect deployment coordination broadcaster.
+ * @param busProperties Bus configuration properties.
+ * @throws org.jboss.esb.deploy.DeploymentException Failed to connect sender.
*/
- private void connectNotificationBroadcaster(final Properties localBusProperties) throws DeploymentException
+ private void connectBusSender(final Properties busProperties) throws DeploymentException
{
- String coordinationTopicName = localBusProperties.getProperty(DeploymentCoordinator.DEPLOYMENT_COORDINTATION_TOPIC_KEY, DeploymentCoordinator.DEFAULT_DEPLOYMENT_COORDINTATION_TOPIC_NAME);
+ String busQueueName = busProperties.getProperty(BUS_QUEUE_KEY, DEFAULT_BUS_QUEUE_NAME);
- notificationBroadcaster = new MessageSender(coordinationTopicName, topicSession);
+ jmsBusSender = new MessageSender(busQueueName, queueSession);
try
{
- notificationBroadcaster.connect();
+ jmsBusSender.connect();
}
catch (JMSException e)
{
- notificationBroadcaster = null;
+ jmsBusSender = null;
throw new DeploymentException("Deployment '" + deploymentName + "' is not being coordinated with any other local deployment. A JMSException occured while trying "
- + "to connect to the deployment coordination Topic '" + coordinationTopicName + "'. The JMS Server must be running and this Topic must be deployed. "
- + "JNDI properties used: " + localBusProperties, e);
+ + "to connect to the JMS Bus Queue '" + busQueueName + "'. The JMS Server must be running and this Queue must be deployed. "
+ + "JNDI properties used: " + busProperties, e);
}
}
@@ -379,7 +406,7 @@
*
* @param localBusProperties Local JMS Bus properties.
*/
- private void intialiseJMSResources(final ApplicationProperties localBusProperties)
+ private void connectJMSSessions(final ApplicationProperties localBusProperties)
{
topicSession = new JMSSession(Topic.class, localBusProperties);
try
@@ -388,7 +415,7 @@
}
catch (Throwable t)
{
- closeJMSResources();
+ closeJMSSessions();
logger.debug("Failed to connect shared deployment JMS Topic Session.", t);
return;
}
@@ -399,7 +426,7 @@
}
catch (Throwable t)
{
- closeJMSResources();
+ closeJMSSessions();
logger.debug("Failed to connect shared deployment JMS Queue Session.", t);
}
}
@@ -407,7 +434,7 @@
/**
* Close the JMS Sessions.
*/
- private void closeJMSResources()
+ private void closeJMSSessions()
{
// Close the sessions...
if (topicSession != null)
@@ -427,4 +454,126 @@
queueSession = null;
}
}
+
+ /**
+ * Deployment Notification Listener.
+ */
+ private class JMSCoordinationListener extends AbstractMessageListener
+ {
+
+ /**
+ * Constructor.
+ *
+ * @param destinationName Destination name.
+ * @param jndiProperties JNDI properties.
+ */
+ protected JMSCoordinationListener(final String destinationName, final Properties jndiProperties)
+ {
+ super(destinationName, topicSession, jndiProperties);
+ }
+
+ /**
+ * Handle a coordination event from other deployments.
+ *
+ * @param message Coordination message.
+ */
+ public final void onMessage(final javax.jms.Message message)
+ {
+ if (message instanceof ObjectMessage)
+ {
+ try
+ {
+ Object objectMessage = ((ObjectMessage) message).getObject();
+
+ if (objectMessage instanceof AbstractDeploymentNotification)
+ {
+ coordinationCallback.onNotification((AbstractDeploymentNotification) objectMessage);
+ }
+ }
+ catch (JMSException e)
+ {
+ logger.warn("Unable to get Object from JMS ObjectMessage.", e);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * JMS Bus message listener.
+ */
+ private class JMSQueueMessageListener extends AbstractMessageListener
+ {
+
+ /**
+ * Constructor.
+ *
+ * @param destinationName Destination name.
+ * @param jndiProperties JNDI properties.
+ */
+ protected JMSQueueMessageListener(final String destinationName, final Properties jndiProperties)
+ {
+ super(destinationName, queueSession, addDeploymentSelector(jndiProperties, deploymentId));
+ }
+
+ /**
+ * Process a BusMessage for this deployment.
+ *
+ * @param message The bus message.
+ */
+ public final void onMessage(final javax.jms.Message message)
+ {
+ if (message instanceof ObjectMessage)
+ {
+ try
+ {
+ Object objectMessage = ((ObjectMessage) message).getObject();
+
+ if (objectMessage instanceof BusMessage)
+ {
+ busMessageCallback.onMessage((BusMessage) objectMessage);
+ }
+ }
+ catch (JMSException e)
+ {
+ logger.warn("Unable to get Object from JMS ObjectMessage.", e);
+ }
+ }
+ }
+
+
+ }
+
+ /**
+ * Set the deployment ID selector on the specified properties.
+ * @param jmsProperties JMS connection properties.
+ * @param deploymentId Deployment ID.
+ * @return The new properties with the JMS Selector set.
+ */
+ private static Properties addDeploymentSelector(Properties jmsProperties, String deploymentId)
+ {
+ Properties properties = (Properties) jmsProperties.clone();
+ properties.setProperty(AbstractMessageListener.MESSAGE_SELECTOR, DEPLOYMENT_ID + "='" + deploymentId + "'");
+ return properties;
+ }
+
+ /**
+ * Close JMS Handler.
+ * @param name The handler name.
+ * @param handler The handler.
+ */
+ private void closeJMSHandler(final String name, final AbstractMessageHandler handler)
+ {
+ if (handler != null && handler.isConnected())
+ {
+ try
+ {
+ handler.close();
+ }
+ catch (Throwable t)
+ {
+ logger.warn("Error closing " + name + " for deployment '" + deploymentName + "'.", t);
+ }
+ }
+ }
}
Modified: labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/jms/default.properties
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/jms/default.properties 2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/jms/default.properties 2008-09-15 09:07:42 UTC (rev 22770)
@@ -13,3 +13,4 @@
# Bus Queues and Topics...
deployment.coordintation.topic=jbossesb.deployment.coordintation.topic
+deployment.bus.queue=jbossesb.jms.bus
Deleted: labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/main.properties
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/main.properties 2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/main.properties 2008-09-15 09:07:42 UTC (rev 22770)
@@ -1,9 +0,0 @@
-###########################################################################################
-# Default JMS ESB config.
-##########################################################################################
-
-# Coordination settings...
-coordinator.heartbeat.frequency=5000
-
-# Buses to be deployed...
-bus.jms=org.jboss.esb.federate.bus.jms.JMSBus
\ No newline at end of file
Copied: labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/default.properties (from rev 22750, labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/busconfig/main.properties)
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/default.properties (rev 0)
+++ labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/default.properties 2008-09-15 09:07:42 UTC (rev 22770)
@@ -0,0 +1,9 @@
+###########################################################################################
+# Default JMS ESB config.
+##########################################################################################
+
+# Coordination settings...
+coordinator.heartbeat.frequency=5000
+
+# Buses to be deployed...
+bus.jms=org.jboss.esb.federate.bus.jms.JMSBus
\ No newline at end of file
Property changes on: labs/jbossesb/workspace/skeagh/runtime/src/main/resources/META-INF/jbossesb/default.properties
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Name: svn:eol-style
+ native
Modified: labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/jms/default.properties
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/jms/default.properties 2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/jms/default.properties 2008-09-15 09:07:42 UTC (rev 22770)
@@ -13,6 +13,8 @@
# Bus Queues and Topics...
deployment.coordintation.topic=jbossesb.deployment.coordintation.topic
+deployment.bus.queue=jbossesb.jms.bus
# ActiveMQ Queue and Topic deployments...
topic.jbossesb.deployment.coordintation.topic=jbossesb.deployment.coordintation.topic
+queue.jbossesb.jms.bus=jbossesb.jms.bus
Deleted: labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/main.properties
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/main.properties 2008-09-15 06:50:32 UTC (rev 22769)
+++ labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/main.properties 2008-09-15 09:07:42 UTC (rev 22770)
@@ -1,9 +0,0 @@
-###########################################################################################
-# Default JMS ESB config.
-##########################################################################################
-
-# Coordination settings...
-coordinator.heartbeat.frequency=700
-
-# Buses to be deployed...
-bus.jms=org.jboss.esb.federate.bus.jms.JMSBus
\ No newline at end of file
Copied: labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/default.properties (from rev 22750, labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/busconfig/main.properties)
===================================================================
--- labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/default.properties (rev 0)
+++ labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/default.properties 2008-09-15 09:07:42 UTC (rev 22770)
@@ -0,0 +1,9 @@
+###########################################################################################
+# Default JMS ESB config.
+##########################################################################################
+
+# Coordination settings...
+coordinator.heartbeat.frequency=700
+
+# Buses to be deployed...
+bus.jms=org.jboss.esb.federate.bus.jms.JMSBus
\ No newline at end of file
Property changes on: labs/jbossesb/workspace/skeagh/runtime/src/test/resources/META-INF/jbossesb/default.properties
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Name: svn:eol-style
+ native
More information about the jboss-svn-commits
mailing list