[jboss-cvs] JBoss Messaging SVN: r2947 - in trunk: src/main/org/jboss/jms/server/endpoint and 10 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Aug 3 05:51:24 EDT 2007
Author: timfox
Date: 2007-08-03 05:51:23 -0400 (Fri, 03 Aug 2007)
New Revision: 2947
Modified:
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/impl/FailoverWaiter.java
trunk/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
trunk/tests/build.xml
trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java
trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/ChangeFailoverNodeTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterViewUpdateTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java
trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java
trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
trunk/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
trunk/tests/src/org/jboss/test/messaging/tools/container/Server.java
trunk/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java
trunk/tests/src/org/jboss/test/thirdparty/remoting/InvokerReferenceCountTest.java
Log:
Interim fix for clustering tests
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -1042,6 +1042,11 @@
// Public ---------------------------------------------------------------------------------------
+ public void resetAllSuckers()
+ {
+ clusterConnectionManager.resetAllSuckers();
+ }
+
public byte[] getClientAOPStack()
{
return clientAOPStack;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -1126,7 +1126,7 @@
JBossMessage copy = makeCopyForDLQOrExpiry(true, del);
- moveInTransaction(copy, del, expiryQueue);
+ moveInTransaction(copy, del, expiryQueue, true);
}
else
{
@@ -1655,7 +1655,7 @@
JBossMessage copy = makeCopyForDLQOrExpiry(true, del);
- moveInTransaction(copy, del, rec.expiryQueue);
+ moveInTransaction(copy, del, rec.expiryQueue, false);
}
else
{
@@ -1663,7 +1663,7 @@
JBossMessage copy = makeCopyForDLQOrExpiry(false, del);
- moveInTransaction(copy, del, rec.dlq);
+ moveInTransaction(copy, del, rec.dlq, true);
}
}
@@ -1712,7 +1712,7 @@
return copy;
}
- private void moveInTransaction(JBossMessage msg, Delivery del, Queue queue) throws Throwable
+ private void moveInTransaction(JBossMessage msg, Delivery del, Queue queue, boolean dlq) throws Throwable
{
Transaction tx = tr.createTransaction();
@@ -1727,8 +1727,7 @@
}
else
{
- log.warn("Cannot move to destination since destination has not been deployed! " +
- "The message will be removed");
+ log.warn("No " + (dlq ? "DLQ" : "expiry queue") + " has been specified so the message will be removed");
del.acknowledge(tx);
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/FailoverWaiter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/FailoverWaiter.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/src/main/org/jboss/messaging/core/impl/FailoverWaiter.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -173,5 +173,20 @@
failoverStatusLock.notifyAll();
}
}
+ else if (notification.type == ClusterNotification.TYPE_NODE_JOIN)
+ {
+ synchronized (failoverStatusLock)
+ {
+ //A node that we previously failed over for has been restarted so we wipe the failover status
+ //It is vital that we do this otherwise if the resurrected node subsequently fails again
+ //when connections try to reconnect they will think that failover is already complete
+ if (notification.nodeID == failedOverFor)
+ {
+ failedOverFor = -1;
+
+ failoverStatusLock.notifyAll();
+ }
+ }
+ }
}
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -155,13 +155,11 @@
if (createTablesOnStartup)
{
createSchema();
- }
-
- log.debug(this + " started");
+ }
}
+
public void stop() throws Exception
{
- log.debug(this + " stopped");
}
// Protected ----------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -254,6 +254,8 @@
recoveryMap.put(new Long(message.getMessageID()), re);
+ deliveringCount.increment();
+
iter.remove();
toTimeout.addLast(ref);
@@ -265,8 +267,7 @@
//This can happen if a delivery is replicated, the message delivered, then acked, then the node crashes
//before the ack is replicated.
//This is ok
-
-
+
//Set up a timeout to put the refs back in the queue if they don't get claimed by failed over consumers
MessagingTimeoutFactory.instance.getFactory().
@@ -283,6 +284,8 @@
{
if (trace) { log.trace("Recovering deliveries"); }
+ log.info("There are " + recoveryMap.size() + " entries in map");
+
List refs = new ArrayList();
Iterator iter = messageIds.iterator();
@@ -303,6 +306,10 @@
refs.add(del);
}
+ else
+ {
+ log.info("Can't find entry for message id " + messageID + " in map");
+ }
}
return refs;
@@ -693,6 +700,8 @@
synchronized (lock)
{
messageRefs.addFirst(ref, ref.getMessage().getPriority());
+
+ deliveringCount.decrement();
}
added = true;
Modified: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -142,6 +142,18 @@
return connections;
}
+ public void resetAllSuckers()
+ {
+ Iterator iter = connections.values().iterator();
+
+ while (iter.hasNext())
+ {
+ ConnectionInfo conn = (ConnectionInfo)iter.next();
+
+ conn.resetAllSuckers();
+ }
+ }
+
/*
* We respond to two types of events -
*
@@ -595,6 +607,20 @@
started = false;
}
+ synchronized void resetAllSuckers()
+ {
+ Iterator iter = suckers.values().iterator();
+
+ while (iter.hasNext())
+ {
+ MessageSucker sucker = (MessageSucker)iter.next();
+
+ sucker.setConsuming(false);
+ }
+
+
+ }
+
synchronized void close()
{
Iterator iter = suckers.values().iterator();
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/tests/build.xml 2007-08-03 09:51:23 UTC (rev 2947)
@@ -708,7 +708,8 @@
<echo message=""/>
<junit printsummary="${junit.printsummary}"
- fork="${junit.fork}"
+ fork="on"
+ forkMode="once"
includeantruntime="yes"
haltonerror="${junit.haltonerror}"
haltonfailure="${junit.haltonfailure}"
@@ -737,8 +738,7 @@
<formatter classname="org.jboss.ant.taskdefs.XMLJUnitMultipleResultFormatter"
usefile="${junit.formatter.usefile}" extension="-Clustering.xml"/>
- <batchtest fork="${junit.batchtest.fork}"
- todir="${junit.batchtest.todir}"
+ <batchtest todir="${junit.batchtest.todir}"
haltonfailure="${junit.batchtest.haltonfailure}"
haltonerror="${junit.batchtest.haltonerror}">
<formatter type="plain" usefile="${junit.formatter.usefile}"/>
@@ -750,8 +750,14 @@
<formatter classname="org.jboss.test.messaging.tools.ant.JUnitTestSuiteListener"/>
<fileset dir="${build.tests.classes}">
- <include name="**/jms/clustering/${test-mask}.class"/>
+ <!-- <include name="**/jms/clustering/${test-mask}.class"/> -->
<exclude name="**/jms/clustering/ClusterLeakTest.class"/>
+<include name="**/jms/clustering/ChangeFailoverNodeTest.class"/>
+<include name="**/jms/clustering/ClusterConnectionManagerTest.class"/>
+<include name="**/jms/clustering/ClusteredConnectionFactoryTest.class"/>
+<!--
+<include name="**/jms/clustering/ClusterViewUpdateTest.class"/>
+-->
</fileset>
</batchtest>
</junit>
Modified: trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -126,17 +126,6 @@
ServerManagement.getServer(server).invoke(new ObjectName(on), "removeAllMessages", null, null);
}
- protected void checkEmpty2(Queue queue) throws Exception
- {
- ObjectName destObjectName = new ObjectName("jboss.messaging.destination:service=Queue,name=" + queue.getQueueName());
-
- Integer messageCount = (Integer)ServerManagement.getAttribute(destObjectName, "MessageCount");
-
- Integer deliveringCount = (Integer)ServerManagement.getAttribute(destObjectName, "DeliveringCount");
-
- assertEquals(0, messageCount.intValue() - deliveringCount.intValue());
- }
-
protected void checkEmpty(Queue queue) throws Exception
{
ObjectName destObjectName = new ObjectName("jboss.messaging.destination:service=Queue,name=" + queue.getQueueName());
@@ -151,8 +140,11 @@
ObjectName destObjectName = new ObjectName("jboss.messaging.destination:service=Queue,name=" + queue.getQueueName());
Integer messageCount = (Integer)ServerManagement.getServer(server).getAttribute(destObjectName, "MessageCount");
-
- assertEquals(0, messageCount.intValue());
+
+ if (messageCount.intValue() != 0)
+ {
+ fail("Message count for queue " + queue.getQueueName() + " on server " + server + " is " + messageCount);
+ }
}
protected void checkEmpty(Topic topic) throws Exception
@@ -163,21 +155,21 @@
assertEquals(0, messageCount.intValue());
}
-
- protected void checkEmpty(Topic topic, String subName) throws Exception
+
+ protected void checkNoSubscriptions(Topic topic) throws Exception
{
ObjectName destObjectName = new ObjectName("jboss.messaging.destination:service=Topic,name=" + topic.getTopicName());
- List msgs = (List)ServerManagement.invoke(destObjectName, "listAllMessages", new Object[] { subName }, new String[] { "java.lang.String" });
-
- assertEquals(0, msgs.size());
+ Integer messageCount = (Integer)ServerManagement.getAttribute(destObjectName, "AllSubscriptionsCount");
+
+ assertEquals(0, messageCount.intValue());
}
- protected void checkNoSubscriptions(Topic topic) throws Exception
+ protected void checkNoSubscriptions(Topic topic, int server) throws Exception
{
ObjectName destObjectName = new ObjectName("jboss.messaging.destination:service=Topic,name=" + topic.getTopicName());
- Integer messageCount = (Integer)ServerManagement.getAttribute(destObjectName, "AllSubscriptionsCount");
+ Integer messageCount = (Integer)ServerManagement.getServer(server).getAttribute(destObjectName, "AllSubscriptionsCount");
assertEquals(0, messageCount.intValue());
}
@@ -194,14 +186,6 @@
return expected == messageCount.intValue();
}
- protected int getMessageCount(String queueName) throws Exception
- {
- ObjectName destObjectName =
- new ObjectName("jboss.messaging.destination:service=Queue,name=" + queueName);
- Integer messageCount = (Integer)ServerManagement.getAttribute(destObjectName, "MessageCount");
- return messageCount.intValue();
- }
-
protected void drainDestination(ConnectionFactory cf, Destination dest) throws Exception
{
Connection conn = null;
Modified: trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -168,12 +168,6 @@
Queue overrideExpiry = (Queue)ic.lookup("/queue/OverrideExpiry");
- drainDestination(cf, testQueue);
-
- drainDestination(cf, defaultExpiry);
-
- drainDestination(cf, overrideExpiry);
-
conn = cf.createConnection();
{
@@ -300,8 +294,7 @@
Connection conn = null;
try
- {
-
+ {
ServerManagement.deployQueue("ExpiryQueue");
String defaultExpiryObjectName = "jboss.messaging.destination:service=Queue,name=ExpiryQueue";
@@ -312,8 +305,6 @@
Queue defaultExpiry = (Queue)ic.lookup("/queue/ExpiryQueue");
- drainDestination(cf, defaultExpiry);
-
conn = cf.createConnection();
conn.setClientID("wib1");
@@ -453,8 +444,6 @@
Queue expiryQueue = (Queue)ic.lookup("/queue/ExpiryQueue");
- drainDestination(cf, expiryQueue);
-
final int NUM_MESSAGES = 5;
conn = cf.createConnection();
@@ -536,8 +525,6 @@
Queue expiryQueue = (Queue)ic.lookup("/queue/ExpiryQueue");
- drainDestination(cf, expiryQueue);
-
final int NUM_MESSAGES = 5;
conn = cf.createConnection();
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ChangeFailoverNodeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ChangeFailoverNodeTest.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ChangeFailoverNodeTest.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -45,7 +45,7 @@
* $Id: $
*
*/
-public class ChangeFailoverNodeTest extends ClusteringTestBase
+public class ChangeFailoverNodeTest extends NewClusteringTestBase
{
// Constants -----------------------------------------------------
@@ -53,6 +53,8 @@
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
+
+ private String queueName = "testDistributedQueue";
// Constructors --------------------------------------------------
@@ -92,18 +94,8 @@
public void testAddNodeToGetNewFailoverNodeTransactional() throws Exception
{
this.addNodeToGetNewFailoverNode(true);
- }
+ }
- public void testKillTwoFailoverNodesNonTransactional() throws Exception
- {
- this.killTwoFailoverNodes(false);
- }
-
- public void testKillTwoFailoverNodesTransactional() throws Exception
- {
- this.killTwoFailoverNodes(true);
- }
-
public void testKillAllToOneAndBackAgainNonTransactional() throws Exception
{
this.killAllToOneAndBackAgain(false);
@@ -118,45 +110,20 @@
// Protected -----------------------------------------------------
- protected void setUp() throws Exception
- {
- nodeCount = 4;
-
- super.setUp();
-
- log.debug("setup done");
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
-
- for (int i = 0; i < nodeCount; i++)
- {
- if (ServerManagement.isStarted(i))
- {
- ServerManagement.log(ServerManagement.INFO, "Undeploying Server " + i, i);
- ServerManagement.stop(i);
- }
- }
- }
-
// Private -------------------------------------------------------
private void killAllToOneAndBackAgain(boolean transactional) throws Exception
{
- JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
-
- Connection conn0 = createConnectionOnServer(factory, 0);
+ Connection conn1 = createConnectionOnServer(cf, 1);
try
{
SimpleFailoverListener failoverListener = new SimpleFailoverListener();
- ((JBossConnection)conn0).registerFailoverListener(failoverListener);
+ ((JBossConnection)conn1).registerFailoverListener(failoverListener);
- Session sessSend = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sessSend = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod0 = sessSend.createProducer(queue[0]);
+ MessageProducer prod1 = sessSend.createProducer(queue[1]);
final int numMessages = 10;
@@ -164,23 +131,23 @@
{
TextMessage tm = sessSend.createTextMessage("message" + i);
- prod0.send(tm);
+ prod1.send(tm);
}
sessSend.close();
- Session sess0 = conn0.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+ Session sess1 = conn1.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer cons0 = sess0.createConsumer(queue[0]);
+ MessageConsumer cons1 = sess1.createConsumer(queue[1]);
- conn0.start();
+ conn1.start();
TextMessage tm = null;
for (int i = 0; i < numMessages; i++)
{
- tm = (TextMessage)cons0.receive(2000);
+ tm = (TextMessage)cons1.receive(2000);
assertNotNull(tm);
@@ -189,87 +156,107 @@
//Don't ack
- int failoverNodeId = this.getFailoverNodeForNode(factory, 0);
+ int failoverNodeId = this.getFailoverNodeForNode(cf, 1);
- log.info("Failover node for node 0 is " + failoverNodeId);
+ log.info("Failover node for node 1 is " + failoverNodeId);
- int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+ int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
assertEquals(0, recoveryMapSize);
- Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
- Map ids = (Map)recoveryArea.get(new Integer(0));
+ Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queueName);
+ Map ids = (Map)recoveryArea.get(new Integer(1));
assertNotNull(ids);
assertEquals(numMessages, ids.size());
- //Now kill the failover node
+ //Now kill/stop the failover node
- log.info("killing node " + failoverNodeId);
- ServerManagement.kill(failoverNodeId);
+ log.info("killing/stoppin node " + failoverNodeId);
+ if (failoverNodeId != 0)
+ {
+ ServerManagement.kill(failoverNodeId);
+ }
+ else
+ {
+ ServerManagement.stop(failoverNodeId);
+ }
Thread.sleep(5000);
- int newFailoverNodeId = this.getFailoverNodeForNode(factory, 0);
+ int newFailoverNodeId = this.getFailoverNodeForNode(cf, 1);
- recoveryMapSize = ServerManagement.getServer(newFailoverNodeId).getRecoveryMapSize(queue[newFailoverNodeId].getQueueName());
+ recoveryMapSize = ServerManagement.getServer(newFailoverNodeId).getRecoveryMapSize(queueName);
assertEquals(0, recoveryMapSize);
- recoveryArea = ServerManagement.getServer(newFailoverNodeId).getRecoveryArea(queue[newFailoverNodeId].getQueueName());
- ids = (Map)recoveryArea.get(new Integer(0));
+ recoveryArea = ServerManagement.getServer(newFailoverNodeId).getRecoveryArea(queueName);
+ ids = (Map)recoveryArea.get(new Integer(1));
assertNotNull(ids);
assertEquals(numMessages, ids.size());
//Now kill the second failover node
- log.info("killing node " + newFailoverNodeId);
- ServerManagement.kill(newFailoverNodeId);
+ log.info("killing/stoppin node " + newFailoverNodeId);
+ if (newFailoverNodeId != 0)
+ {
+ ServerManagement.kill(newFailoverNodeId);
+ }
+ else
+ {
+ ServerManagement.stop(newFailoverNodeId);
+ }
Thread.sleep(5000);
- int evennewerFailoverNodeId = this.getFailoverNodeForNode(factory, 0);
+ int evennewerFailoverNodeId = this.getFailoverNodeForNode(cf, 1);
- recoveryMapSize = ServerManagement.getServer(evennewerFailoverNodeId).getRecoveryMapSize(queue[evennewerFailoverNodeId].getQueueName());
+ recoveryMapSize = ServerManagement.getServer(evennewerFailoverNodeId).getRecoveryMapSize(queueName);
assertEquals(0, recoveryMapSize);
- recoveryArea = ServerManagement.getServer(evennewerFailoverNodeId).getRecoveryArea(queue[evennewerFailoverNodeId].getQueueName());
- ids = (Map)recoveryArea.get(new Integer(0));
+ recoveryArea = ServerManagement.getServer(evennewerFailoverNodeId).getRecoveryArea(queueName);
+ ids = (Map)recoveryArea.get(new Integer(1));
assertNotNull(ids);
assertEquals(numMessages, ids.size());
//Now kill the third failover node
- log.info("killing node " + evennewerFailoverNodeId);
- ServerManagement.kill(evennewerFailoverNodeId);
+ log.info("killing/stoppin node " + evennewerFailoverNodeId);
+ if (evennewerFailoverNodeId != 0)
+ {
+ ServerManagement.kill(evennewerFailoverNodeId);
+ }
+ else
+ {
+ ServerManagement.stop(evennewerFailoverNodeId);
+ }
//This just leaves the current node
Thread.sleep(5000);
- int evenevennewerFailoverNodeId = this.getFailoverNodeForNode(factory, 0);
+ int evenevennewerFailoverNodeId = this.getFailoverNodeForNode(cf, 1);
- assertEquals(0, evenevennewerFailoverNodeId);
+ assertEquals(1, evenevennewerFailoverNodeId);
//Add a node
- ServerManagement.start(1, "all", false);
+ ServerManagement.start(4, "all", false);
- ServerManagement.deployQueue("testDistributedQueue", 1);
+ ServerManagement.deployQueue("testDistributedQueue", 4);
Thread.sleep(5000);
- log.info("started node 1");
+ log.info("started node 4");
- int evenevenevennewerFailoverNodeId = this.getFailoverNodeForNode(factory, 0);
+ int evenevenevennewerFailoverNodeId = this.getFailoverNodeForNode(cf, 1);
- recoveryMapSize = ServerManagement.getServer(evenevenevennewerFailoverNodeId).getRecoveryMapSize(queue[evenevenevennewerFailoverNodeId].getQueueName());
+ recoveryMapSize = ServerManagement.getServer(evenevenevennewerFailoverNodeId).getRecoveryMapSize(queueName);
assertEquals(0, recoveryMapSize);
- recoveryArea = ServerManagement.getServer(evenevenevennewerFailoverNodeId).getRecoveryArea(queue[evenevenevennewerFailoverNodeId].getQueueName());
- ids = (Map)recoveryArea.get(new Integer(0));
+ recoveryArea = ServerManagement.getServer(evenevenevennewerFailoverNodeId).getRecoveryArea(queueName);
+ ids = (Map)recoveryArea.get(new Integer(1));
assertNotNull(ids);
assertEquals(numMessages, ids.size());
- //Now kill the node itself
-
- ServerManagement.kill(0);
+ ServerManagement.kill(1);
+
log.info("########");
- log.info("######## KILLED NODE 0");
+ log.info("######## KILLED NODE 1");
log.info("########");
// wait for the client-side failover to complete
@@ -278,7 +265,7 @@
while(true)
{
- FailoverEvent event = failoverListener.getEvent(30000);
+ FailoverEvent event = failoverListener.getEvent(120000);
if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
{
break;
@@ -291,13 +278,13 @@
log.info("Failover completed");
- assertEquals(evenevenevennewerFailoverNodeId, getServerId(conn0));
+ assertEquals(evenevenevennewerFailoverNodeId, getServerId(conn1));
//Now ack
if (transactional)
{
- sess0.commit();
+ sess1.commit();
}
else
{
@@ -306,238 +293,66 @@
log.info("acked");
- sess0.close();
+ sess1.close();
log.info("closed");
- sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
log.info("created new session");
- cons0 = sess0.createConsumer(queue[0]);
+ cons1 = sess1.createConsumer(queue[0]);
log.info("Created consumer");
//Messages should be gone
- tm = (TextMessage)cons0.receive(5000);
+ tm = (TextMessage)cons1.receive(5000);
assertNull(tm);
- recoveryMapSize = ServerManagement.getServer(evenevenevennewerFailoverNodeId).getRecoveryMapSize(queue[evenevenevennewerFailoverNodeId].getQueueName());
+ recoveryMapSize = ServerManagement.getServer(evenevenevennewerFailoverNodeId).getRecoveryMapSize(queueName);
assertEquals(0, recoveryMapSize);
- recoveryArea = ServerManagement.getServer(evenevenevennewerFailoverNodeId).getRecoveryArea(queue[evenevenevennewerFailoverNodeId].getQueueName());
- ids = (Map)recoveryArea.get(new Integer(0));
+ recoveryArea = ServerManagement.getServer(evenevenevennewerFailoverNodeId).getRecoveryArea(queueName);
+ ids = (Map)recoveryArea.get(new Integer(1));
assertNull(ids);
}
finally
{
- if (conn0 != null)
+ if (conn1 != null)
{
- conn0.close();
- }
+ conn1.close();
+ }
- // Since we kill the rmi server in this test, we must kill the other servers too
-
- for (int i = nodeCount - 1; i >= 0; i--)
- {
- ServerManagement.kill(i);
- }
+ ServerManagement.kill(4);
}
}
- private void killTwoFailoverNodes(boolean transactional) throws Exception
+ private void addNodeToGetNewFailoverNode(boolean transactional) throws Exception
{
- JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
-
- Connection conn0 = createConnectionOnServer(factory, 0);
+ Connection conn = null;
try
- {
- SimpleFailoverListener failoverListener = new SimpleFailoverListener();
- ((JBossConnection)conn0).registerFailoverListener(failoverListener);
+ {
+ //First we must find the node that fails over onto zero since this is the one that will change when
+ //we add a node
+
+ int nodeID = this.getNodeThatFailsOverOnto(cf, 0);
- Session sessSend = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod0 = sessSend.createProducer(queue[0]);
+ conn = createConnectionOnServer(cf, nodeID);
- final int numMessages = 10;
-
- for (int i = 0; i < numMessages; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod0.send(tm);
- }
-
- Session sess0 = conn0.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
-
- MessageConsumer cons0 = sess0.createConsumer(queue[0]);
-
- conn0.start();
-
- TextMessage tm = null;
-
- for (int i = 0; i < numMessages; i++)
- {
- tm = (TextMessage)cons0.receive(2000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- //Don't ack
-
- int failoverNodeId = this.getFailoverNodeForNode(factory, 0);
-
- log.info("Failover node for node 0 is " + failoverNodeId);
-
- dumpFailoverMap(ServerManagement.getServer(0).getFailoverMap());
-
- int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
- assertEquals(0, recoveryMapSize);
- Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
- Map ids = (Map)recoveryArea.get(new Integer(0));
- assertNotNull(ids);
- assertEquals(numMessages, ids.size());
-
- //Now kill the failover node
-
- log.info("killing node " + failoverNodeId);
- ServerManagement.kill(failoverNodeId);
-
- Thread.sleep(5000);
-
- int newFailoverNodeId = this.getFailoverNodeForNode(factory, 0);
-
- log.info("New Failover node for node 0 is " + newFailoverNodeId);
-
- recoveryMapSize = ServerManagement.getServer(newFailoverNodeId).getRecoveryMapSize(queue[newFailoverNodeId].getQueueName());
- assertEquals(0, recoveryMapSize);
- recoveryArea = ServerManagement.getServer(newFailoverNodeId).getRecoveryArea(queue[newFailoverNodeId].getQueueName());
- ids = (Map)recoveryArea.get(new Integer(0));
- assertNotNull(ids);
- assertEquals(numMessages, ids.size());
-
- //Now kill the second failover node
-
- log.info("killing node " + newFailoverNodeId);
- ServerManagement.kill(newFailoverNodeId);
-
- Thread.sleep(5000);
-
- int evennewerFailoverNodeId = this.getFailoverNodeForNode(factory, 0);
-
- recoveryMapSize = ServerManagement.getServer(evennewerFailoverNodeId).getRecoveryMapSize(queue[evennewerFailoverNodeId].getQueueName());
- assertEquals(0, recoveryMapSize);
- recoveryArea = ServerManagement.getServer(evennewerFailoverNodeId).getRecoveryArea(queue[evennewerFailoverNodeId].getQueueName());
- ids = (Map)recoveryArea.get(new Integer(0));
- assertNotNull(ids);
- assertEquals(numMessages, ids.size());
-
- log.info("New Failover node for node 0 is " + evennewerFailoverNodeId);
-
- //Now kill the node itself
-
- ServerManagement.kill(0);
-
- log.info("########");
- log.info("######## KILLED NODE 0");
- log.info("########");
-
- // wait for the client-side failover to complete
-
- log.info("Waiting for failover to complete");
-
- while(true)
- {
- FailoverEvent event = failoverListener.getEvent(30000);
- if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
- {
- break;
- }
- if (event == null)
- {
- fail("Did not get expected FAILOVER_COMPLETED event");
- }
- }
-
- log.info("Failover completed");
-
- assertEquals(evennewerFailoverNodeId, getServerId(conn0));
-
- recoveryMapSize = ServerManagement.getServer(evennewerFailoverNodeId).getRecoveryMapSize(queue[evennewerFailoverNodeId].getQueueName());
- assertEquals(0, recoveryMapSize);
- recoveryArea = ServerManagement.getServer(evennewerFailoverNodeId).getRecoveryArea(queue[evennewerFailoverNodeId].getQueueName());
- ids = (Map)recoveryArea.get(new Integer(0));
- assertNull(ids);
-
- //Now ack
- if (transactional)
- {
- sess0.commit();
- }
- else
- {
- tm.acknowledge();
- }
-
- log.info("acked");
-
- sess0.close();
-
- log.info("closed");
-
- sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- log.info("created new session");
-
- cons0 = sess0.createConsumer(queue[0]);
-
- log.info("Created consumer");
-
- //Messages should be gone
-
- tm = (TextMessage)cons0.receive(5000);
-
- assertNull(tm);
- }
- finally
- {
- if (conn0 != null)
- {
- conn0.close();
- }
-
- // Since we kill the rmi server in this test, we must kill the other servers too
-
- for (int i = nodeCount - 1; i >= 0; i--)
- {
- ServerManagement.kill(i);
- }
- }
- }
-
-
- private void addNodeToGetNewFailoverNode(boolean transactional) throws Exception
- {
- JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
-
- Connection conn3 = createConnectionOnServer(factory, 3);
-
- try
- {
SimpleFailoverListener failoverListener = new SimpleFailoverListener();
- ((JBossConnection)conn3).registerFailoverListener(failoverListener);
+ ((JBossConnection)conn).registerFailoverListener(failoverListener);
- Session sessSend = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod2 = sessSend.createProducer(queue[2]);
+ MessageProducer prod2 = sessSend.createProducer(queue[nodeID]);
final int numMessages = 10;
+ //Send some messages at this node
+
for (int i = 0; i < numMessages; i++)
{
TextMessage tm = sessSend.createTextMessage("message" + i);
@@ -545,12 +360,12 @@
prod2.send(tm);
}
- Session sess3 = conn3.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+ Session sess3 = conn.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer cons3 = sess3.createConsumer(queue[3]);
-
+ MessageConsumer cons3 = sess3.createConsumer(queue[nodeID]);
+
- conn3.start();
+ conn.start();
TextMessage tm = null;
@@ -565,16 +380,16 @@
//Don't ack
- int failoverNodeId = this.getFailoverNodeForNode(factory, 3);
+ int failoverNodeId = this.getFailoverNodeForNode(cf, nodeID);
- log.info("Failover node for node 3 is " + failoverNodeId);
+ log.info("Failover node for node is " + failoverNodeId);
- dumpFailoverMap(ServerManagement.getServer(3).getFailoverMap());
+ dumpFailoverMap(ServerManagement.getServer(nodeID).getFailoverMap());
- int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+ int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
assertEquals(0, recoveryMapSize);
- Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
- Map ids = (Map)recoveryArea.get(new Integer(3));
+ Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queueName);
+ Map ids = (Map)recoveryArea.get(new Integer(nodeID));
assertNotNull(ids);
assertEquals(numMessages, ids.size());
@@ -586,33 +401,35 @@
Thread.sleep(5000);
- recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
- assertEquals(0, recoveryMapSize);
- recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
- ids = (Map)recoveryArea.get(new Integer(3));
- assertNull(ids);
-
- dumpFailoverMap(ServerManagement.getServer(3).getFailoverMap());
-
- int newFailoverNodeId = this.getFailoverNodeForNode(factory, 3);
+ dumpFailoverMap(ServerManagement.getServer(nodeID).getFailoverMap());
+ int newFailoverNodeId = this.getFailoverNodeForNode(cf, nodeID);
+
assertTrue(failoverNodeId != newFailoverNodeId);
log.info("New failover node is " + newFailoverNodeId);
-
- recoveryMapSize = ServerManagement.getServer(newFailoverNodeId).getRecoveryMapSize(queue[3].getQueueName());
+
+ recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
assertEquals(0, recoveryMapSize);
- recoveryArea = ServerManagement.getServer(newFailoverNodeId).getRecoveryArea(queue[3].getQueueName());
- ids = (Map)recoveryArea.get(new Integer(3));
+ recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queueName);
+ ids = (Map)recoveryArea.get(new Integer(nodeID));
+ assertNull(ids);
+
+ dumpFailoverMap(ServerManagement.getServer(nodeID).getFailoverMap());
+
+ recoveryMapSize = ServerManagement.getServer(newFailoverNodeId).getRecoveryMapSize(queueName);
+ assertEquals(0, recoveryMapSize);
+ recoveryArea = ServerManagement.getServer(newFailoverNodeId).getRecoveryArea(queueName);
+ ids = (Map)recoveryArea.get(new Integer(nodeID));
assertNotNull(ids);
assertEquals(numMessages, ids.size());
//Now kill the node
- ServerManagement.kill(3);
+ ServerManagement.kill(nodeID);
log.info("########");
- log.info("######## KILLED NODE 3");
+ log.info("######## KILLED NODE");
log.info("########");
// wait for the client-side failover to complete
@@ -621,7 +438,7 @@
while(true)
{
- FailoverEvent event = failoverListener.getEvent(30000);
+ FailoverEvent event = failoverListener.getEvent(120000);
if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
{
break;
@@ -634,12 +451,12 @@
log.info("Failover completed");
- assertEquals(newFailoverNodeId, getServerId(conn3));
+ assertEquals(newFailoverNodeId, getServerId(conn));
- recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[3].getQueueName());
+ recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
assertEquals(0, recoveryMapSize);
- recoveryArea = ServerManagement.getServer(newFailoverNodeId).getRecoveryArea(queue[3].getQueueName());
- ids = (Map)recoveryArea.get(new Integer(3));
+ recoveryArea = ServerManagement.getServer(newFailoverNodeId).getRecoveryArea(queueName);
+ ids = (Map)recoveryArea.get(new Integer(nodeID));
assertNull(ids);
//Now ack
@@ -658,11 +475,11 @@
log.info("closed");
- sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ sess3 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
log.info("created new session");
- cons3 = sess3.createConsumer(queue[3]);
+ cons3 = sess3.createConsumer(queue[nodeID]);
log.info("Created consumer");
@@ -674,17 +491,12 @@
}
finally
{
- if (conn3 != null)
+ if (conn != null)
{
- conn3.close();
+ conn.close();
}
- try
- {
- ServerManagement.stop(4);
- }
- catch (Exception e)
- {}
+ ServerManagement.kill(4);
}
}
@@ -706,6 +518,9 @@
log.info("*** end dump ***");
}
+
+ private static int counter;
+
private void killFailoverNode(boolean transactional) throws Exception
{
JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
@@ -727,7 +542,11 @@
{
TextMessage tm = sessSend.createTextMessage("message" + i);
- prod1.send(tm);
+ tm.setIntProperty("counter", counter++);
+
+ prod1.send(tm);
+
+ log.info("Sent " + tm.getJMSMessageID());
}
Session sess1 = conn1.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
@@ -753,6 +572,14 @@
//We kill the failover node for node 1
int failoverNodeId = this.getFailoverNodeForNode(factory, 1);
+
+ int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
+ assertEquals(0, recoveryMapSize);
+ Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queueName);
+ Map ids = (Map)recoveryArea.get(new Integer(1));
+ assertNotNull(ids);
+ assertEquals(numMessages, ids.size());
+
log.info("Killing failover node:" + failoverNodeId);
ServerManagement.kill(failoverNodeId);
@@ -765,6 +592,13 @@
failoverNodeId = this.getFailoverNodeForNode(factory, 1);
+ recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
+ assertEquals(0, recoveryMapSize);
+ recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queueName);
+ ids = (Map)recoveryArea.get(new Integer(1));
+ assertNotNull(ids);
+ assertEquals(numMessages, ids.size());
+
log.info("Failover node id is now " + failoverNodeId);
ServerManagement.kill(1);
@@ -793,6 +627,12 @@
log.info("Failover completed");
assertEquals(failoverNodeId, getServerId(conn1));
+
+ recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
+ assertEquals(0, recoveryMapSize);
+ recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queueName);
+ ids = (Map)recoveryArea.get(new Integer(1));
+ assertNull(ids);
//Now ack
if (transactional)
@@ -855,7 +695,10 @@
{
TextMessage tm = sessSend.createTextMessage("message" + i);
- prod1.send(tm);
+ prod1.send(tm);
+
+ log.info("Sent " + tm.getJMSMessageID());
+
}
Session sess1 = conn1.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
@@ -881,24 +724,28 @@
//We stop the failover node for node 1
int failoverNodeId = this.getFailoverNodeForNode(factory, 1);
- int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+ int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
assertEquals(0, recoveryMapSize);
- Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+ Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queueName);
Map ids = (Map)recoveryArea.get(new Integer(1));
assertNotNull(ids);
assertEquals(numMessages, ids.size());
- log.info("Killing failover node:" + failoverNodeId);
+ log.info("Stopping failover node:" + failoverNodeId);
ServerManagement.stop(failoverNodeId);
- log.info("Killed failover node");
+ log.info("Stopped failover node");
+ Thread.sleep(5000);
+
int newfailoverNode = this.getFailoverNodeForNode(factory, 1);
- recoveryMapSize = ServerManagement.getServer(newfailoverNode).getRecoveryMapSize(queue[newfailoverNode].getQueueName());
+ log.info("New failover node is " + newfailoverNode);
+
+ recoveryMapSize = ServerManagement.getServer(newfailoverNode).getRecoveryMapSize(queueName);
assertEquals(0, recoveryMapSize);
- recoveryArea = ServerManagement.getServer(newfailoverNode).getRecoveryArea(queue[newfailoverNode].getQueueName());
+ recoveryArea = ServerManagement.getServer(newfailoverNode).getRecoveryArea(queueName);
ids = (Map)recoveryArea.get(new Integer(1));
assertNotNull(ids);
assertEquals(numMessages, ids.size());
@@ -938,12 +785,25 @@
assertEquals(newfailoverNode, getServerId(conn1));
- recoveryMapSize = ServerManagement.getServer(newfailoverNode).getRecoveryMapSize(queue[newfailoverNode].getQueueName());
+ recoveryMapSize = ServerManagement.getServer(newfailoverNode).getRecoveryMapSize(queueName);
assertEquals(0, recoveryMapSize);
- recoveryArea = ServerManagement.getServer(newfailoverNode).getRecoveryArea(queue[newfailoverNode].getQueueName());
+ recoveryArea = ServerManagement.getServer(newfailoverNode).getRecoveryArea(queueName);
ids = (Map)recoveryArea.get(new Integer(1));
+
+ log.info("Final failover");
+
+ if (ids != null)
+ {
+ Iterator iter = ids.entrySet().iterator();
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+
+ log.info(entry.getKey() + "--->" + entry.getValue());
+ }
+ }
+
assertNull(ids);
-
//Now ack
if (transactional)
@@ -983,9 +843,7 @@
}
}
}
-
-
-
+
// Inner classes -------------------------------------------------
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -23,14 +23,13 @@
package org.jboss.test.messaging.jms.clustering;
import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.ObjectName;
-import javax.naming.InitialContext;
import org.jboss.test.messaging.tools.ServerManagement;
@@ -46,7 +45,7 @@
* $Id: $
*
*/
-public class ClusterConnectionManagerTest extends ClusteringTestBase
+public class ClusterConnectionManagerTest extends NewClusteringTestBase
{
// Constants ------------------------------------------------------------------------------------
@@ -285,16 +284,17 @@
protected void setUp() throws Exception
{
- nodeCount = 2;
-
super.setUp();
- log.debug("setup done");
-
- //undeploy CF
-
- undeployAll();
+ undeployAll();
}
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ undeployAll();
+ }
private void undeployAll() throws Exception
{
@@ -325,13 +325,6 @@
{}
}
- protected void tearDown() throws Exception
- {
- undeployAll();
-
- super.tearDown();
- }
-
// Private --------------------------------------------------------------------------------------
private void deployCFRemote() throws Exception
@@ -354,37 +347,29 @@
private void deployLocal() throws Exception
{
- ServerManagement.deployQueue("suckQueue", 1);
+ ServerManagement.deployQueue("suckQueue", 0);
}
private void deployRemote() throws Exception
{
- ServerManagement.deployQueue("suckQueue", 0);
+ ServerManagement.deployQueue("suckQueue", 1);
}
private void suck() throws Exception
- {
- InitialContext ic0 = new InitialContext(ServerManagement.getJNDIEnvironment(0));
+ {
+ Queue queue0 = (Queue)ic[0].lookup("/queue/suckQueue");
- Queue queue0 = (Queue)ic0.lookup("/queue/suckQueue");
+ Queue queue1 = (Queue)ic[1].lookup("/queue/suckQueue");
- InitialContext ic1 = new InitialContext(ServerManagement.getJNDIEnvironment(1));
-
- Queue queue1 = (Queue)ic1.lookup("/queue/suckQueue");
-
- ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
-
- ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
Connection conn0 = null;
Connection conn1 = null;
try
{
- conn0 = this.createConnectionOnServer(cf0, 0);
+ conn0 = this.createConnectionOnServer(cf, 0);
- assertEquals(0, this.getServerId(conn0));
+ assertEquals(0, getServerId(conn0));
//Send some messages on node 0
@@ -394,6 +379,13 @@
MessageProducer prod = sess0.createProducer(queue0);
+ //Note! The message must be sent as non persistent for this test
+ //Since we have not deployed suckQueue on all nodes of the cluster
+ //this would cause persistent messages to not be delivered since they would
+ //fail to replicate to their backup (since suckQueue is not deployed on it)
+
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = sess0.createTextMessage("message" + i);
@@ -403,9 +395,9 @@
//Consume them on node 1
- conn1 = this.createConnectionOnServer(cf1, 1);
+ conn1 = this.createConnectionOnServer(cf, 1);
- assertEquals(1, this.getServerId(conn1));
+ assertEquals(1, getServerId(conn1));
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterViewUpdateTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterViewUpdateTest.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterViewUpdateTest.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -38,7 +38,7 @@
*
* $Id$
*/
-public class ClusterViewUpdateTest extends ClusteringTestBase
+public class ClusterViewUpdateTest extends NewClusteringTestBase
{
// Constants ------------------------------------------------------------------------------------
@@ -56,7 +56,7 @@
// Public ---------------------------------------------------------------------------------------
- public void testUpdateConnectionFactory() throws Exception
+ public void testUpdateConnectionFactoryOnKill() throws Exception
{
Connection conn = createConnectionOnServer(cf, 0);
@@ -65,8 +65,54 @@
ClientClusteredConnectionFactoryDelegate cfDelegate =
(ClientClusteredConnectionFactoryDelegate)jbcf.getDelegate();
+ assertEquals(4, cfDelegate.getDelegates().length);
+
+ Connection conn1 = cf.createConnection();
+
+ assertEquals(1, getServerId(conn1));
+
+ log.info("*** killing server");
+ ServerManagement.kill(1);
+ log.info("killed server");
+
+ log.info("sleeping 5 secs ...");
+ Thread.sleep(5000);
+
+ // first part of the test, verifies if the CF was updated
+ assertEquals(2, cfDelegate.getDelegates().length);
+ conn.close();
+
+ log.info("sleeping 5 secs ...");
+ Thread.sleep(5000);
+
+ // Second part, verifies a possible race condition on failoverMap and handleFailover
+
+ log.info("ServerId=" + getServerId(conn1));
+ assertTrue(1 != getServerId(conn1));
+
+ conn1.close();
+
+ //restart
+ ServerManagement.start(1, "all");
+
+ Thread.sleep(5000);
+
assertEquals(3, cfDelegate.getDelegates().length);
+
+ log.info("Done!!");
+ }
+
+ public void testUpdateConnectionFactoryOnStop() throws Exception
+ {
+ Connection conn = createConnectionOnServer(cf, 0);
+ JBossConnectionFactory jbcf = (JBossConnectionFactory)cf;
+
+ ClientClusteredConnectionFactoryDelegate cfDelegate =
+ (ClientClusteredConnectionFactoryDelegate)jbcf.getDelegate();
+
+ assertEquals(3, cfDelegate.getDelegates().length);
+
Connection conn1 = cf.createConnection();
assertEquals(1, getServerId(conn1));
@@ -89,10 +135,16 @@
log.info("ServerId=" + getServerId(conn1));
assertTrue(1 != getServerId(conn1));
-
- //Session sess = conn1.createSession(true, Session.SESSION_TRANSACTED);
+
conn1.close();
+ //restart
+ ServerManagement.start(1, "all");
+
+ Thread.sleep(5000);
+
+ assertEquals(3, cfDelegate.getDelegates().length);
+
log.info("Done!!");
}
@@ -199,19 +251,6 @@
// Protected ------------------------------------------------------------------------------------
- protected void setUp() throws Exception
- {
- config = "all+http";
- nodeCount = 3;
- super.setUp();
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- config="all";
- }
-
// Private --------------------------------------------------------------------------------------
// Validate if two distinct CFs are valid
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -22,6 +22,9 @@
package org.jboss.test.messaging.jms.clustering;
+import java.util.HashSet;
+import java.util.Set;
+
import javax.jms.Connection;
import org.jboss.jms.client.JBossConnectionFactory;
@@ -34,7 +37,7 @@
* @version <tt>$Revision$</tt>
* $Id$
*/
-public class ClusteredConnectionFactoryTest extends ClusteringTestBase
+public class ClusteredConnectionFactoryTest extends NewClusteringTestBase
{
// Constants ------------------------------------------------------------------------------------
@@ -53,44 +56,35 @@
// Public ---------------------------------------------------------------------------------------
public void testGetAOPBroken() throws Exception
- {
+ {
+ ServerManagement.kill(2);
+ ServerManagement.kill(1);
+ ServerManagement.kill(0);
+
try
{
- ServerManagement.kill(0);
- ServerManagement.kill(1);
- ServerManagement.kill(2);
-
- try
- {
- assertNotNull(((JBossConnectionFactory)cf).getDelegate().getClientAOPStack());
- fail ("This should try an exception as every server is down");
- }
- catch (MessagingNetworkFailureException e)
- {
- log.trace(e.toString(), e);
- }
+ assertNotNull(((JBossConnectionFactory)cf).getDelegate().getClientAOPStack());
+ fail("This should throw an exception as every server is down");
}
- finally
+ catch (MessagingNetworkFailureException e)
{
- // need to re-start 0, it's the RMI server the other servers use
- ServerManagement.start(0, "all", true);
+ log.trace(e.toString(), e);
}
}
-
+
public void testLoadAOP() throws Exception
{
-
Connection conn = null;
try
{
- ServerManagement.kill(0);
+ ServerManagement.kill(2);
ServerManagement.kill(1);
-
+
assertNotNull(((JBossConnectionFactory)cf).getDelegate().getClientAOPStack());
conn = cf.createConnection();
- assertEquals(2, getServerId(conn));
+ assertEquals(0, getServerId(conn));
}
finally
{
@@ -104,13 +98,9 @@
{
}
}
-
- ServerManagement.kill(2);
- // need to re-start 0, it's the RMI server the other servers use
- ServerManagement.start(0, "all", true);
}
}
-
+
public void testCreateConnectionOnBrokenServer() throws Exception
{
Connection conn = null;
@@ -120,11 +110,23 @@
conn = createConnectionOnServer(cf, 0);
conn.close();
conn = null;
-
+
ServerManagement.kill(1);
+
conn = cf.createConnection();
+
+ Set ids = new HashSet();
+
+ ids.add(new Integer(getServerId(conn)));
+
+ conn.close();
+
+ conn = cf.createConnection();
- assertEquals(2, getServerId(conn));
+ ids.add(new Integer(getServerId(conn)));
+
+ assertTrue(ids.contains(new Integer(0)));
+ assertTrue(ids.contains(new Integer(2)));
}
finally
{
@@ -141,18 +143,27 @@
try
{
- // Poison each server with a different pointcut crash
- ServerManagement.poisonTheServer(1, PoisonInterceptor.CF_CREATE_CONNECTION);
-
conn = createConnectionOnServer(cf, 0);
conn.close();
- conn = null;
+
+ // Poison the server
+ ServerManagement.poisonTheServer(1, PoisonInterceptor.CF_CREATE_CONNECTION);
// this should break on server1
- log.info("creating connection on server 1");
conn = cf.createConnection();
- assertEquals(2, getServerId(conn));
+ Set ids = new HashSet();
+
+ ids.add(new Integer(getServerId(conn)));
+
+ conn.close();
+
+ conn = cf.createConnection();
+
+ ids.add(new Integer(getServerId(conn)));
+
+ assertTrue(ids.contains(new Integer(0)));
+ assertTrue(ids.contains(new Integer(2)));
}
finally
{
@@ -177,27 +188,9 @@
conn = cf.createConnection();
assertEquals(0, getServerId(conn));
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
- public void testFailureOnGetBlockId() throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = createConnectionOnServer(cf, 0);
+
conn.close();
-
- ServerManagement.kill(1);
- ServerManagement.kill(2);
+
conn = cf.createConnection();
assertEquals(0, getServerId(conn));
@@ -211,23 +204,11 @@
}
}
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
- protected void setUp() throws Exception
- {
- nodeCount = 3;
- super.setUp();
-
- log.debug("setup done");
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
// Private --------------------------------------------------------------------------------------
// Inner classes --------------------------------------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -690,10 +690,7 @@
conn.close();
}
- drainDestination(cf, queue);
-
- ServerManagement.undeployQueue("QueueListMessages");
-
+ ServerManagement.undeployQueue("QueueListMessages");
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -247,13 +247,24 @@
return stopped;
}
}
-
-
+
public static synchronized void kill(int i) throws Exception
{
log.info("Attempting to kill server " + i);
ServerHolder holder = servers[i];
+
+ if (i == 0)
+ {
+ //Cannot kill server 0 if there are any other servers since it has the rmi registry in it
+ for (int j = 1; j < servers.length; j++)
+ {
+ if (servers[j] != null)
+ {
+ throw new IllegalStateException("Cannot kill server 0, since server[" + j + "] still exists");
+ }
+ }
+ }
if (holder == null)
{
Modified: trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -896,6 +896,11 @@
{
sc.flushManagedConnectionPool();
}
+
+ public void resetAllSuckers() throws Exception
+ {
+ getServerPeer().resetAllSuckers();
+ }
// Public ---------------------------------------------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -102,7 +102,6 @@
registry = LocateRegistry.getRegistry(DEFAULT_REGISTRY_PORT);
registry.bind(RMI_SERVER_PREFIX + serverIndex, testServer);
registry.bind(NAMING_SERVER_PREFIX + serverIndex, testServer.getNamingDelegate());
-
}
catch(Exception e)
{
@@ -115,31 +114,13 @@
registry.bind(NAMING_SERVER_PREFIX + serverIndex, testServer.getNamingDelegate());
}
- log.info("RMI server " + serverIndex + " bound");
-
- // there is one crash test that needs to start the server, as an external VM...
- // as one client will crash while another will be alive
- boolean startAll=false;
-
- for (int i=0;i<args.length;i++)
- {
- if (args[i].equals("-startAll"))
- {
- startAll=true;
- }
- }
-
- if (startAll)
- {
- testServer.start("all", true);
- }
+ log.info("RMI server " + serverIndex + " bound");
}
// Attributes ----------------------------------------------------
protected RemoteTestServer server;
private RMINamingDelegate namingDelegate;
- // Map<Long-ProxyNotificationListener>
private Map proxyListeners;
// Constructors --------------------------------------------------
@@ -178,17 +159,24 @@
public synchronized void kill() throws Exception
{
- //We deregister in another thread, them pause, then kill the VM
- //This ensures if the deregister hangs (which can happen if the RMI registry is dead) then it doesn't prevent
- //the kill
- //We always kill on this thread to ensure the kill completes in a timely manner which may not occur if it occurs
- //on its own thread due to thread scheduling differences
- new Thread(new Deregisterer(), "Deregisterer").start();
-
+ log.info("kill() invoked - first deregistering from the rmi registry");
+
+ // unregister myself from the RMI registry
+
+ Registry registry = LocateRegistry.getRegistry(DEFAULT_REGISTRY_PORT);
+
+ String name = RMI_SERVER_PREFIX + server.getServerID();
+ registry.unbind(name);
+ log.info("unregistered " + name + " from registry");
+
+ // unregister myself from the RMI registry
+
+ name = NAMING_SERVER_PREFIX + server.getServerID();
+ registry.unbind(name);
+ log.info("unregistered " + name + " from registry");
+
log.info("Killing VM!!!!");
- Thread.sleep(250);
-
Runtime.getRuntime().halt(1);
}
@@ -508,6 +496,11 @@
{
server.flushManagedConnectionPool();
}
+
+ public void resetAllSuckers() throws Exception
+ {
+ server.resetAllSuckers();
+ }
// Public --------------------------------------------------------
@@ -523,42 +516,4 @@
}
// Inner classes -------------------------------------------------
-
- public class Deregisterer implements Runnable
- {
- public void run()
- {
- log.info("Deregistering from RMI");
-
- try
- {
- // unregister myself from the RMI registry
-
- Registry registry = LocateRegistry.getRegistry(DEFAULT_REGISTRY_PORT);
-
- String name = RMI_SERVER_PREFIX + server.getServerID();
- registry.unbind(name);
- log.info("unregistered " + name + " from registry");
- }
- catch (Throwable t)
- {
- log.error("Failed to unregister", t);
- }
-
- try
- {
- // unregister myself from the RMI registry
-
- Registry registry = LocateRegistry.getRegistry(DEFAULT_REGISTRY_PORT);
-
- String name = NAMING_SERVER_PREFIX + server.getServerID();
- registry.unbind(name);
- log.info("unregistered " + name + " from registry");
- }
- catch (Throwable t)
- {
- log.error("Failed to unregister", t);
- }
- }
- }
}
Modified: trunk/tests/src/org/jboss/test/messaging/tools/container/Server.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/container/Server.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/tests/src/org/jboss/test/messaging/tools/container/Server.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -288,4 +288,6 @@
void flushManagedConnectionPool() throws Exception;
+ void resetAllSuckers() throws Exception;
+
}
Modified: trunk/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -1681,9 +1681,7 @@
connFactoryObjectNames.add(on);
}
}
-
-
-
+
private void parseConfig(String config)
{
config = config.toLowerCase();
Modified: trunk/tests/src/org/jboss/test/thirdparty/remoting/InvokerReferenceCountTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/thirdparty/remoting/InvokerReferenceCountTest.java 2007-08-02 06:27:36 UTC (rev 2946)
+++ trunk/tests/src/org/jboss/test/thirdparty/remoting/InvokerReferenceCountTest.java 2007-08-03 09:51:23 UTC (rev 2947)
@@ -59,10 +59,7 @@
Queue queue = (Queue)ic.lookup("/queue/testQueue");
- this.drainDestination(cf, queue);
-
- ic.close();
-
+ ic.close();
}
protected void tearDown() throws Exception
More information about the jboss-cvs-commits
mailing list