[jboss-cvs] JBoss Messaging SVN: r2842 - in trunk: docs/examples/bridge/etc and 9 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jul 4 19:45:50 EDT 2007
Author: timfox
Date: 2007-07-04 19:45:50 -0400 (Wed, 04 Jul 2007)
New Revision: 2842
Modified:
trunk/build-messaging.xml
trunk/docs/examples/bridge/etc/test-bridge-service.xml
trunk/docs/examples/distributed-queue/src/org/jboss/example/jms/distributedqueue/DistributedQueueExample.java
trunk/docs/examples/distributed-topic/src/org/jboss/example/jms/distributedtopic/DistributedTopicExample.java
trunk/src/etc/xmdesc/Bridge-xmbean.xml
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
Updates to examples and various tweaks
Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml 2007-07-04 20:30:19 UTC (rev 2841)
+++ trunk/build-messaging.xml 2007-07-04 23:45:50 UTC (rev 2842)
@@ -386,7 +386,8 @@
<include name="org/jboss/jms/exception/*.class"/>
<include name="org/jboss/jms/wireformat/*.class"/>
<include name="org/jboss/messaging/util/**/*.class"/>
- <include name="org/jboss/messaging/core/message/**/*.class"/>
+ <include name="org/jboss/messaging/core/impl/message/**/*.class"/>
+ <include name="org/jboss/messaging/core/contract/**/*.class"/>
<include name="org/jboss/jms/server/remoting/**/*.class"/>
</fileset>
<fileset dir="${build.etc}">
Modified: trunk/docs/examples/bridge/etc/test-bridge-service.xml
===================================================================
--- trunk/docs/examples/bridge/etc/test-bridge-service.xml 2007-07-04 20:30:19 UTC (rev 2841)
+++ trunk/docs/examples/bridge/etc/test-bridge-service.xml 2007-07-04 23:45:50 UTC (rev 2842)
@@ -72,7 +72,11 @@
<!-- The maximum number of connection retries to make in case of failure, before giving up
-1 means try forever-->
<attribute name="MaxRetries">-1</attribute>
+
+ <!-- If true, the id the message had before bridging is added as a header on the bridged message so that it is available
+ to the receiver. The receiver can then send it back as the correlation id to correlate a distributed request-response -->
+ <attribute name="AddMessageIDInHeader">false</attribute>
</mbean>
- </server>
\ No newline at end of file
+ </server>
Modified: trunk/docs/examples/distributed-queue/src/org/jboss/example/jms/distributedqueue/DistributedQueueExample.java
===================================================================
--- trunk/docs/examples/distributed-queue/src/org/jboss/example/jms/distributedqueue/DistributedQueueExample.java 2007-07-04 20:30:19 UTC (rev 2841)
+++ trunk/docs/examples/distributed-queue/src/org/jboss/example/jms/distributedqueue/DistributedQueueExample.java 2007-07-04 23:45:50 UTC (rev 2842)
@@ -35,9 +35,8 @@
/**
* The example creates two connections to two distinct cluster nodes on which we have previously
- * deployed a distributed queue. The example sends and receives messages using both connections.
+ * deployed a distributed queue. The example sends a message on one node and consumes it from another.
*
- *
* Since this example is also used as a smoke test, it is essential that the VM exits with exit
* code 0 in case of successful execution and a non-zero value on failure.
*
@@ -81,18 +80,14 @@
// Let's make sure that (this example is also a smoke test)
assertNotEquals(getServerID(connection0), getServerID(connection1));
- // Create a session, a producer and a consumer on the first connection
+ // Create a session and a producer on the first connection
Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer publisher0 = session0.createProducer(distributedQueue);
- MessageConsumer consumer0 = session0.createConsumer(distributedQueue);
- ExampleListener messageListener0 = new ExampleListener("MessageListener0");
- consumer0.setMessageListener(messageListener0);
+
+ // Create another session and a consumer on the second connection
- // Create another session, producer and consumer on the second connection
-
- Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer publisher1 = session1.createProducer(distributedQueue);
+ Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer1 = session1.createConsumer(distributedQueue);
ExampleListener messageListener1 = new ExampleListener("MessageListener1");
consumer1.setMessageListener(messageListener1);
@@ -107,25 +102,15 @@
TextMessage message = session0.createTextMessage("Hello!");
publisher0.send(message);
- message = session1.createTextMessage("Another Hello!");
- publisher1.send(message);
-
- log("The messages were successfully sent to the distributed queue");
-
- messageListener0.waitForMessage(3000);
-
- message = (TextMessage)messageListener0.getMessage();
- log(messageListener0.getName() + " received message: " + message.getText());
- assertEquals("Hello!", message.getText());
-
+ log("The message was successfully sent to the distributed queue");
+
messageListener1.waitForMessage(3000);
message = (TextMessage)messageListener1.getMessage();
log(messageListener1.getName() + " received message: " + message.getText());
- assertEquals("Another Hello!", message.getText());
-
+ assertEquals("Hello!", message.getText());
+
displayProviderInfo(connection0.getMetaData());
-
}
finally
{
Modified: trunk/docs/examples/distributed-topic/src/org/jboss/example/jms/distributedtopic/DistributedTopicExample.java
===================================================================
--- trunk/docs/examples/distributed-topic/src/org/jboss/example/jms/distributedtopic/DistributedTopicExample.java 2007-07-04 20:30:19 UTC (rev 2841)
+++ trunk/docs/examples/distributed-topic/src/org/jboss/example/jms/distributedtopic/DistributedTopicExample.java 2007-07-04 23:45:50 UTC (rev 2842)
@@ -34,7 +34,7 @@
import org.jboss.example.jms.common.ExampleSupport;
/**
- * The example sends a message to a distributed topic depolyed on the JMS cluster. The message is
+ * The example sends a message to a distributed topic deployed on the JMS cluster. The message is
* subsequently received by two different subscribers, connected to two distinct cluster nodes.
*
* Since this example is also used as a smoke test, it is essential that the VM exits with exit
Modified: trunk/src/etc/xmdesc/Bridge-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/Bridge-xmbean.xml 2007-07-04 20:30:19 UTC (rev 2841)
+++ trunk/src/etc/xmdesc/Bridge-xmbean.xml 2007-07-04 23:45:50 UTC (rev 2842)
@@ -129,6 +129,12 @@
<type>int</type>
</attribute>
+ <attribute access="read-write" getMethod="isAddMessageIDInHeader" setMethod="setAddMessageIDInHeader">
+ <description>Should the message id be added as a header in the message when bridging?</description>
+ <name>AddMessageIDInHeader</name>
+ <type>boolean</type>
+ </attribute>
+
<attribute access="read-only" getMethod="isFailed">
<description>Has the bridge failed?</description>
<name>Failed</name>
@@ -147,6 +153,7 @@
<type>boolean</type>
</attribute>
+
<!-- Managed operations -->
<operation>
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-07-04 20:30:19 UTC (rev 2841)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-07-04 23:45:50 UTC (rev 2842)
@@ -1085,7 +1085,7 @@
//Ok
}
- if (dest != null)
+ if (dest != null && dest.getName() != null)
{
Binding binding = postOffice.getBindingForQueueName(dest.getName());
@@ -1123,7 +1123,7 @@
//Ok
}
- if (dest != null)
+ if (dest != null && dest.getName() != null)
{
Binding binding = postOffice.getBindingForQueueName(dest.getName());
Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-07-04 20:30:19 UTC (rev 2841)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-07-04 23:45:50 UTC (rev 2842)
@@ -254,21 +254,14 @@
throw new IllegalArgumentException("Cannot find factory with name " + uniqueName);
}
- if (supportsFailover || supportsLoadBalancing)
+ if (replicator != null)
{
- setupReplicator();
-
- // Remove from replicants
- if (replicator != null)
- {
- //There may be no clustered post office deployed
- if (!replicator.remove(Replicator.CF_PREFIX + uniqueName))
- {
- throw new IllegalStateException("Cannot find replicant to remove: " +
- Replicator.CF_PREFIX + uniqueName);
- }
+ if (!replicator.remove(Replicator.CF_PREFIX + uniqueName))
+ {
+ throw new IllegalStateException("Cannot find replicant to remove: " +
+ Replicator.CF_PREFIX + uniqueName);
}
- }
+ }
Dispatcher.instance.unregisterTarget(endpoint.getID(), endpoint);
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2007-07-04 20:30:19 UTC (rev 2841)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2007-07-04 23:45:50 UTC (rev 2842)
@@ -177,7 +177,7 @@
try
{
if (notification.type == ClusterNotification.TYPE_REPLICATOR_PUT && notification.data instanceof String)
- {
+ {
String key = (String)notification.data;
if (key.startsWith(Replicator.CF_PREFIX))
@@ -189,8 +189,8 @@
if (uniqueName.equals(connectionFactoryUniqueName))
{
- log.trace(this + " deployment of ClusterConnectionFactory");
-
+ log.trace(this + " deployment of ClusterConnectionFactory");
+
synchronized (this)
{
ensureAllConnectionsCreated();
@@ -215,9 +215,7 @@
String uniqueName = key.substring(Replicator.CF_PREFIX.length());
if (uniqueName.equals(connectionFactoryUniqueName))
- {
- log.trace(this + " undeployment of ClusterConnectionFactory");
-
+ {
Map updatedReplicantMap = replicator.get(key);
List toRemove = new ArrayList();
@@ -298,7 +296,7 @@
//Look for local queue
Binding localBinding = postOffice.getBindingForQueueName(queueName);
-
+
if (localBinding == null)
{
//This is ok - the queue was deployed on the remote node before being deployed on the local node - do nothing for now
@@ -311,11 +309,11 @@
if (trace) { log.trace(this + " Creating sucker"); }
createSucker(queueName, notification.nodeID);
- }
+ }
}
}
else if (notification.type == ClusterNotification.TYPE_UNBIND)
- {
+ {
String queueName = (String)notification.data;
if (notification.nodeID == this.nodeID)
@@ -332,8 +330,7 @@
//We need to remove the sucker corresponding to the remote queue
- removeSucker(queueName, notification.nodeID);
-
+ removeSucker(queueName, notification.nodeID);
}
}
}
@@ -402,6 +399,8 @@
if (info == null)
{
if (trace) { log.trace("Cluster pull connection factory has not yet been deployed on local node"); }
+
+ return;
}
//Only create if it isn't already there
@@ -415,14 +414,17 @@
Binding binding = this.postOffice.getBindingForQueueName(queueName);
Queue localQueue = binding.queue;
+
+ if (localQueue.isClustered())
+ {
+ MessageSucker sucker = new MessageSucker(localQueue, info.connection, localInfo.connection, xa, preserveOrdering);
+
+ info.addSucker(sucker);
+
+ sucker.start();
- MessageSucker sucker = new MessageSucker(localQueue, info.connection, localInfo.connection, xa, preserveOrdering);
-
- info.addSucker(sucker);
-
- sucker.start();
-
- if (trace) { log.trace("Started it"); }
+ if (trace) { log.trace("Started it"); }
+ }
}
else
{
@@ -478,23 +480,26 @@
Iterator iter = allBindings.iterator();
Map nameMap = new HashMap();
-
+
//This can probably be greatly optimised
while (iter.hasNext())
{
Binding binding = (Binding)iter.next();
- List queues = (List)nameMap.get(binding.queue.getName());
-
- if (queues == null)
- {
- queues = new ArrayList();
+ if (binding.queue.isClustered())
+ {
+ List queues = (List)nameMap.get(binding.queue.getName());
- nameMap.put(binding.queue.getName(), queues);
+ if (queues == null)
+ {
+ queues = new ArrayList();
+
+ nameMap.put(binding.queue.getName(), queues);
+ }
+
+ queues.add(binding.queue);
}
-
- queues.add(binding.queue);
}
iter = nameMap.entrySet().iterator();
@@ -529,14 +534,14 @@
{
iter2 = queues.iterator();
- while (iter.hasNext())
+ while (iter2.hasNext())
{
Queue queue = (Queue)iter2.next();
- if (queue.getNodeID() != this.nodeID)
+ if (queue.getNodeID() != this.nodeID && queue.isClustered())
{
//Now we have found a local and remote with matching names - so we can create a sucker
-
+
createSucker(queueName, queue.getNodeID());
}
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-04 20:30:19 UTC (rev 2841)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-04 23:45:50 UTC (rev 2842)
@@ -967,6 +967,7 @@
}
}
+ log.info("*** sending remove notification");
ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_REPLICATOR_REMOVE, originatorNodeID, key);
clusterNotifier.sendNotification(notification);
Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java 2007-07-04 20:30:19 UTC (rev 2841)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java 2007-07-04 23:45:50 UTC (rev 2842)
@@ -72,12 +72,10 @@
protected void tearDown() throws Exception
{
- super.tearDown();
-
+ super.tearDown();
}
-
-
+
public void testStopStartPauseResume() throws Exception
{
ServerManagement.deployQueue("sourceQueue", 1);
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java 2007-07-04 20:30:19 UTC (rev 2841)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java 2007-07-04 23:45:50 UTC (rev 2842)
@@ -65,70 +65,220 @@
// Public ---------------------------------------------------------------------------------------
public void test1() throws Exception
- {
- deployCF();
-
+ {
+ deployCFLocal();
+ deployCFRemote();
deployLocal();
-
deployRemote();
-
suck();
}
-
+
public void test2() throws Exception
- {
- deployCF();
-
+ {
+ deployCFLocal();
+ deployCFRemote();
deployRemote();
-
deployLocal();
-
suck();
}
-
+
public void test3() throws Exception
- {
+ {
+ deployCFLocal();
+ deployLocal();
+ deployCFRemote();
deployRemote();
-
- deployCF();
-
- deployLocal();
-
suck();
}
-
+
public void test4() throws Exception
- {
+ {
+ deployCFLocal();
+ deployLocal();
deployRemote();
-
- deployLocal();
-
- deployCF();
-
+ deployCFRemote();
suck();
}
-
+
public void test5() throws Exception
- {
+ {
+ deployCFLocal();
+ deployRemote();
+ deployCFRemote();
deployLocal();
-
- deployRemote();
-
- deployCF();
-
suck();
}
-
+
public void test6() throws Exception
- {
+ {
+ deployCFLocal();
+ deployRemote();
deployLocal();
-
- deployCF();
-
+ deployCFRemote();
+ suck();
+ }
+
+ public void test7() throws Exception
+ {
+ deployCFRemote();
+ deployCFLocal();
+ deployLocal();
deployRemote();
-
suck();
}
+
+ public void test8() throws Exception
+ {
+ deployCFRemote();
+ deployCFLocal();
+ deployRemote();
+ deployLocal();
+ suck();
+ }
+
+ public void test9() throws Exception
+ {
+ deployCFRemote();
+ deployLocal();
+ deployCFLocal();
+ deployRemote();
+ suck();
+ }
+
+ public void test10() throws Exception
+ {
+ deployCFRemote();
+ deployLocal();
+ deployRemote();
+ deployCFLocal();
+ suck();
+ }
+
+ public void test11() throws Exception
+ {
+ deployCFRemote();
+ deployRemote();
+ deployCFLocal();
+ deployLocal();
+ suck();
+ }
+
+ public void test12() throws Exception
+ {
+ deployCFRemote();
+ deployRemote();
+ deployLocal();
+ deployCFLocal();
+ suck();
+ }
+
+ public void test13() throws Exception
+ {
+ deployLocal();
+ deployCFLocal();
+ deployCFRemote();
+ deployRemote();
+ suck();
+ }
+
+ public void test14() throws Exception
+ {
+ deployLocal();
+ deployCFLocal();
+ deployRemote();
+ deployCFRemote();
+ suck();
+ }
+
+ public void test15() throws Exception
+ {
+ deployLocal();
+ deployCFRemote();
+ deployCFLocal();
+ deployRemote();
+ suck();
+ }
+
+ public void test16() throws Exception
+ {
+ deployLocal();
+ deployCFRemote();
+ deployRemote();
+ deployCFLocal();
+ suck();
+ }
+
+ public void test17() throws Exception
+ {
+ deployLocal();
+ deployRemote();
+ deployCFLocal();
+ deployCFRemote();
+ suck();
+ }
+
+ public void test18() throws Exception
+ {
+ deployLocal();
+ deployRemote();
+ deployCFRemote();
+ deployCFLocal();
+ suck();
+ }
+
+ public void test19() throws Exception
+ {
+ deployRemote();
+ deployCFLocal();
+ deployCFRemote();
+ deployLocal();
+ suck();
+ }
+
+ public void test20() throws Exception
+ {
+ deployRemote();
+ deployCFLocal();
+ deployLocal();
+ deployCFRemote();
+ suck();
+ }
+
+ public void test21() throws Exception
+ {
+ deployRemote();
+ deployCFRemote();
+ deployCFLocal();
+ deployLocal();
+ suck();
+ }
+
+ public void test22() throws Exception
+ {
+ deployRemote();
+ deployCFRemote();
+ deployLocal();
+ deployCFLocal();
+ suck();
+ }
+
+ public void test23() throws Exception
+ {
+ deployRemote();
+ deployLocal();
+ deployCFLocal();
+ deployCFRemote();
+ suck();
+ }
+
+ public void test24() throws Exception
+ {
+ deployRemote();
+ deployLocal();
+ deployCFRemote();
+ deployCFLocal();
+ suck();
+ }
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
@@ -136,41 +286,72 @@
protected void setUp() throws Exception
{
nodeCount = 2;
+
super.setUp();
log.debug("setup done");
//undeploy CF
+ undeployAll();
+ }
+
+ private void undeployAll() throws Exception
+ {
+ ServerManagement.undeployQueue("suckQueue", 0);
+
+ ServerManagement.undeployQueue("suckQueue", 1);
+
String cfName =
(String)ServerManagement.getServer(1).getAttribute(ServerManagement.getServerPeerObjectName(), "ClusterPullConnectionFactoryName");
- //undeploy cf on node 1
- ServerManagement.undeployConnectionFactory(new ObjectName(cfName));
+ log.info("CF name is " + cfName);
+
+
+ //undeploy cf on node 0
+ try
+ {
+ ServerManagement.getServer(0).undeployConnectionFactory(new ObjectName(cfName));
+ }
+ catch (Exception ignore)
+ {}
+
+ //undeploy cf on node 1
+ try
+ {
+ ServerManagement.getServer(1).undeployConnectionFactory(new ObjectName(cfName));
+ }
+ catch (Exception ignore)
+ {}
}
-
+
protected void tearDown() throws Exception
{
- ServerManagement.undeployQueue("suckQueue", 0);
+ undeployAll();
- ServerManagement.undeployQueue("suckQueue", 1);
-
super.tearDown();
}
// Private --------------------------------------------------------------------------------------
-
-
-
- private void deployCF() throws Exception
+
+ private void deployCFRemote() throws Exception
{
String cfName =
(String)ServerManagement.getServer(1).getAttribute(ServerManagement.getServerPeerObjectName(), "ClusterPullConnectionFactoryName");
//Deploy cf on node 1
- ServerManagement.deployConnectionFactory(cfName, null, 150);
+ ServerManagement.getServer(1).deployConnectionFactory(cfName, null, 150);
}
+ private void deployCFLocal() throws Exception
+ {
+ String cfName =
+ (String)ServerManagement.getServer(0).getAttribute(ServerManagement.getServerPeerObjectName(), "ClusterPullConnectionFactoryName");
+
+ //Deploy cf on node 0
+ ServerManagement.getServer(0).deployConnectionFactory(cfName, null, 150);
+ }
+
private void deployLocal() throws Exception
{
ServerManagement.deployQueue("suckQueue", 1);
@@ -203,6 +384,8 @@
{
conn0 = this.createConnectionOnServer(cf0, 0);
+ assertEquals(0, this.getServerId(conn0));
+
//Send some messages on node 0
final int NUM_MESSAGES = 100;
@@ -222,6 +405,8 @@
conn1 = this.createConnectionOnServer(cf1, 1);
+ assertEquals(1, this.getServerId(conn1));
+
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons1 = sess1.createConsumer(queue1);
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2007-07-04 20:30:19 UTC (rev 2841)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2007-07-04 23:45:50 UTC (rev 2842)
@@ -322,6 +322,8 @@
log.info("************** STOPPING SERVER 0");
ServerManagement.stop(0);
+
+ Thread.sleep(10000);
log.info("server stopped");
@@ -372,6 +374,8 @@
log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
ServerManagement.stop(1);
+
+ Thread.sleep(10000);
assertEquals(1, ServerManagement.getServer(2).getNodeIDView().size());
More information about the jboss-cvs-commits
mailing list