[jboss-cvs] JBoss Messaging SVN: r2862 - in trunk: src/main/org/jboss/jms/server/endpoint and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jul 9 17:57:24 EDT 2007
Author: timfox
Date: 2007-07-09 17:57:24 -0400 (Mon, 09 Jul 2007)
New Revision: 2862
Modified:
trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java
trunk/src/main/org/jboss/messaging/core/contract/Queue.java
trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/ChangeFailoverNodeTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/RecoverDeliveriesTest.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
Log:
more stuff
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2007-07-09 21:57:24 UTC (rev 2862)
@@ -170,6 +170,12 @@
return supportsFailover;
}
+ //Only used in testing
+ public void setSupportsFailover(boolean failover)
+ {
+ this.supportsFailover = failover;
+ }
+
/** Method used to update the delegate and failoverMap during viewChange */
public synchronized void updateFailoverInfo(ClientConnectionFactoryDelegate[] delegates,
Map failoverMap)
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-07-09 21:57:24 UTC (rev 2862)
@@ -36,7 +36,6 @@
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
-import javax.jms.TextMessage;
import org.jboss.aop.AspectManager;
import org.jboss.jms.client.delegate.ClientBrowserDelegate;
Modified: trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java 2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java 2007-07-09 21:57:24 UTC (rev 2862)
@@ -149,5 +149,11 @@
void sendReplicateAckMessage(String queueName, long messageID) throws Exception;
boolean isFirstNode();
+
+
+ //For testing only
+ Map getRecoveryArea(String queueName);
+
+ int getRecoveryMapSize(String queueName);
}
Modified: trunk/src/main/org/jboss/messaging/core/contract/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/Queue.java 2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/src/main/org/jboss/messaging/core/contract/Queue.java 2007-07-09 21:57:24 UTC (rev 2862)
@@ -86,4 +86,9 @@
List recoverDeliveries(List messageIds);
void removeStrandedReferences(String sessionID);
+
+ //For testing only
+ Map getRecoveryArea();
+
+ int getRecoveryMapSize();
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2007-07-09 21:57:24 UTC (rev 2862)
@@ -475,6 +475,8 @@
ids = new ConcurrentHashMap(ids);
}
+ if (trace) { log.trace("Adding " + ids.size() + " ids to recovery area for node " + nodeID); }
+
recoveryArea.put(nid, ids);
if (trace) { log.trace("Added"); }
@@ -485,6 +487,18 @@
return recoverDeliveriesTimeout;
}
+ //testing only
+
+ public Map getRecoveryArea()
+ {
+ return this.recoveryArea;
+ }
+
+ public int getRecoveryMapSize()
+ {
+ return this.recoveryMap.size();
+ }
+
// ChannelSupport overrides --------------------------------------
protected void deliverInternal()
@@ -642,9 +656,10 @@
}
}
- private class RecoveryEntry
+ static class RecoveryEntry
{
String sessionID;
+
MessageReference ref;
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-09 21:57:24 UTC (rev 2862)
@@ -64,6 +64,7 @@
import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.core.contract.MessageReference;
import org.jboss.messaging.core.contract.MessageStore;
+import org.jboss.messaging.core.contract.MessagingComponent;
import org.jboss.messaging.core.contract.PersistenceManager;
import org.jboss.messaging.core.contract.PostOffice;
import org.jboss.messaging.core.contract.Queue;
@@ -306,6 +307,11 @@
}
// MessagingComponent overrides -----------------------------------------------------------------
+
+ public MessagingComponent getInstance()
+ {
+ return this;
+ }
public void start() throws Exception
{
@@ -570,6 +576,7 @@
}
}
+
public Collection getAllBindingsForQueueName(String queueName) throws Exception
{
return getBindings(queueName);
@@ -641,7 +648,40 @@
{
return firstNode;
}
+
+
+ // Testing only
+ public Map getRecoveryArea(String queueName)
+ {
+ Binding binding = (Binding)localNameMap.get(queueName);
+
+ if (binding != null)
+ {
+ return binding.queue.getRecoveryArea();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public int getRecoveryMapSize(String queueName)
+ {
+ Binding binding = (Binding)localNameMap.get(queueName);
+
+ if (binding != null)
+ {
+ return binding.queue.getRecoveryMapSize();
+ }
+ else
+ {
+ return 0;
+ }
+ }
+
+ //End testing only
+
// GroupListener implementation -------------------------------------------------------------
public void setState(byte[] bytes) throws Exception
@@ -1255,9 +1295,17 @@
try
{
+ log.info("local name map is " + localNameMap);
+
+ if (localNameMap == null)
+ {
+ throw new IllegalStateException("Cannot add all replicated deliveries since there are no bindings - probably the queues aren't deployed");
+ }
+
if (localNameMap != null)
{
Iterator iter = deliveries.entrySet().iterator();
+ log.info("deliveries is " + deliveries);
while (iter.hasNext())
{
@@ -1267,14 +1315,18 @@
Map ids = (Map)entry.getValue();
+ log.info("queue;" + queueName + " ids: " + ids.size());
+
Binding binding = (Binding)localNameMap.get(queueName);
if (binding == null)
{
- throw new IllegalStateException("Cannot find binding with name " + queueName);
+ throw new IllegalStateException("Cannot find binding with name " + queueName + " maybe it hasn't been deployed");
}
+ log.info("adding");
binding.queue.addAllToRecoveryArea(nodeID, ids);
+ log.info("added");
}
}
}
@@ -2615,7 +2667,7 @@
{
if (trace) { log.trace("Old failover node still exists, telling it remove replicated deliveries"); }
- ClusterRequest request = new AckAllReplicatedDeliveriesMessage(oldFailoverNodeID);
+ ClusterRequest request = new AckAllReplicatedDeliveriesMessage(thisNodeID);
groupMember.unicastControl(request, info.getControlChannelAddress(), true);
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ChangeFailoverNodeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ChangeFailoverNodeTest.java 2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ChangeFailoverNodeTest.java 2007-07-09 21:57:24 UTC (rev 2862)
@@ -22,6 +22,9 @@
package org.jboss.test.messaging.jms.clustering;
+import java.util.Iterator;
+import java.util.Map;
+
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -52,9 +55,8 @@
// Attributes ----------------------------------------------------
// Constructors --------------------------------------------------
-
+
-
public ChangeFailoverNodeTest(String name)
{
super(name);
@@ -62,18 +64,16 @@
// Public --------------------------------------------------------
-// public void testKillFailoverNodeTransactional() throws Exception
-// {
-// this.killFailoverNode(true);
-// }
-//
-// public void testKillFailoverNodeNonTransactional() throws Exception
-// {
-// this.killFailoverNode(false);
-// }
+ public void testKillFailoverNodeTransactional() throws Exception
+ {
+ this.killFailoverNode(true);
+ }
+ public void testKillFailoverNodeNonTransactional() throws Exception
+ {
+ this.killFailoverNode(false);
+ }
-
public void testStopFailoverNodeTransactional() throws Exception
{
this.stopFailoverNode(true);
@@ -84,6 +84,30 @@
this.stopFailoverNode(false);
}
+ public void testAddNodeToGetNewFailoverNodeNonTransactional() throws Exception
+ {
+ this.addNodeToGetNewFailoverNode(false);
+ }
+
+ public void testkillTwoFailoverNodesNonTransactional() throws Exception
+ {
+ this.killTwoFailoverNodes(false);
+ }
+
+ public void testkillTwoFailoverNodesTransactional() throws Exception
+ {
+ this.killTwoFailoverNodes(true);
+ }
+
+ public void testKillAllTooOneAndBackAgainNonTransactional() throws Exception
+ {
+ this.killAllTooOneAndBackAgain(false);
+ }
+
+ public void testKillAllTooOneAndBackAgainTransactional() throws Exception
+ {
+ this.killAllTooOneAndBackAgain(true);
+ }
// Package protected ---------------------------------------------
@@ -106,6 +130,653 @@
// Private -------------------------------------------------------
+ private void killAllTooOneAndBackAgain(boolean transactional) throws Exception
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
+
+ Connection conn0 = createConnectionOnServer(factory, 0);
+
+ try
+ {
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)conn0).registerFailoverListener(failoverListener);
+
+ Session sessSend = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod0 = sessSend.createProducer(queue[0]);
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage tm = sessSend.createTextMessage("message" + i);
+
+ prod0.send(tm);
+ }
+
+ Session sess0 = conn0.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons0 = sess0.createConsumer(queue[0]);
+
+
+ conn0.start();
+
+ TextMessage tm = null;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ tm = (TextMessage)cons0.receive(2000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ //Don't ack
+
+ int failoverNodeId = this.getFailoverNodeForNode(factory, 0);
+
+ log.info("Failover node for node 0 is " + failoverNodeId);
+
+ //Now kill the failover node
+
+ log.info("killing node " + failoverNodeId);
+ ServerManagement.kill(failoverNodeId);
+
+ Thread.sleep(5000);
+
+ int newFailoverNodeId = this.getFailoverNodeForNode(factory, 0);
+
+ //Now kill the second failover node
+
+ log.info("killing node " + newFailoverNodeId);
+ ServerManagement.kill(newFailoverNodeId);
+
+ Thread.sleep(5000);
+
+ int evennewerFailoverNodeId = this.getFailoverNodeForNode(factory, 0);
+
+ //Now kill the third failover node
+
+ log.info("killing node " + evennewerFailoverNodeId);
+ ServerManagement.kill(evennewerFailoverNodeId);
+
+ //This just leaves the current node
+
+ //Add a node
+
+ ServerManagement.start(1, "all", false);
+
+ log.info("started node 1");
+
+ //Now kill the node itself
+
+ ServerManagement.kill(0);
+
+ log.info("########");
+ log.info("######## KILLED NODE 0");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ log.info("Waiting for failover to complete");
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ log.info("Failover completed");
+
+ assertEquals(1, getServerId(conn0));
+
+ //Now ack
+ if (transactional)
+ {
+ sess0.commit();
+ }
+ else
+ {
+ tm.acknowledge();
+ }
+
+ log.info("acked");
+
+ sess0.close();
+
+ log.info("closed");
+
+ sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ log.info("created new session");
+
+ cons0 = sess0.createConsumer(queue[0]);
+
+ log.info("Created consumer");
+
+ //Messages should be gone
+
+ tm = (TextMessage)cons0.receive(5000);
+
+ assertNull(tm);
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+ }
+ }
+
+
+ private void killTwoFailoverNodes(boolean transactional) throws Exception
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
+
+ Connection conn0 = createConnectionOnServer(factory, 0);
+
+ try
+ {
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)conn0).registerFailoverListener(failoverListener);
+
+ Session sessSend = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod0 = sessSend.createProducer(queue[0]);
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage tm = sessSend.createTextMessage("message" + i);
+
+ prod0.send(tm);
+ }
+
+ Session sess0 = conn0.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons0 = sess0.createConsumer(queue[0]);
+
+
+ conn0.start();
+
+ TextMessage tm = null;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ tm = (TextMessage)cons0.receive(2000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ //Don't ack
+
+ int failoverNodeId = this.getFailoverNodeForNode(factory, 0);
+
+ log.info("Failover node for node 0 is " + failoverNodeId);
+
+ dumpFailoverMap(ServerManagement.getServer(0).getFailoverMap());
+
+ int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+ assertEquals(0, recoveryMapSize);
+ Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+ Map ids = (Map)recoveryArea.get(new Integer(0));
+ assertNotNull(ids);
+ assertEquals(numMessages, ids.size());
+
+ //Now kill the failover node
+
+ log.info("killing node " + failoverNodeId);
+ ServerManagement.kill(failoverNodeId);
+
+ Thread.sleep(5000);
+
+ int newFailoverNodeId = this.getFailoverNodeForNode(factory, 0);
+
+ log.info("New Failover node for node 0 is " + newFailoverNodeId);
+
+ recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+ assertEquals(0, recoveryMapSize);
+ recoveryArea = ServerManagement.getServer(newFailoverNodeId).getRecoveryArea(queue[newFailoverNodeId].getQueueName());
+ ids = (Map)recoveryArea.get(new Integer(0));
+ assertNotNull(ids);
+ assertEquals(numMessages, ids.size());
+
+ //Now kill the second failover node
+
+ log.info("killing node " + newFailoverNodeId);
+ ServerManagement.kill(newFailoverNodeId);
+
+ Thread.sleep(5000);
+
+ int evennewerFailoverNodeId = this.getFailoverNodeForNode(factory, 0);
+
+ recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+ assertEquals(0, recoveryMapSize);
+ recoveryArea = ServerManagement.getServer(evennewerFailoverNodeId).getRecoveryArea(queue[evennewerFailoverNodeId].getQueueName());
+ ids = (Map)recoveryArea.get(new Integer(0));
+ assertNotNull(ids);
+ assertEquals(numMessages, ids.size());
+
+ log.info("New Failover node for node 0 is " + evennewerFailoverNodeId);
+
+ //Now kill the node itself
+
+ ServerManagement.kill(0);
+
+ log.info("########");
+ log.info("######## KILLED NODE 0");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ log.info("Waiting for failover to complete");
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ log.info("Failover completed");
+
+ assertEquals(evennewerFailoverNodeId, getServerId(conn0));
+
+ recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+ assertEquals(0, recoveryMapSize);
+ recoveryArea = ServerManagement.getServer(evennewerFailoverNodeId).getRecoveryArea(queue[evennewerFailoverNodeId].getQueueName());
+ ids = (Map)recoveryArea.get(new Integer(3));
+ assertNull(ids);
+
+
+
+ //Now ack
+ if (transactional)
+ {
+ sess0.commit();
+ }
+ else
+ {
+ tm.acknowledge();
+ }
+
+ log.info("acked");
+
+ sess0.close();
+
+ log.info("closed");
+
+ sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ log.info("created new session");
+
+ cons0 = sess0.createConsumer(queue[0]);
+
+ log.info("Created consumer");
+
+ //Messages should be gone
+
+ tm = (TextMessage)cons0.receive(5000);
+
+ assertNull(tm);
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+ }
+ }
+
+
+ private void addNodeToGetNewFailoverNode(boolean transactional) throws Exception
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
+
+ Connection conn3 = createConnectionOnServer(factory, 3);
+
+ try
+ {
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)conn3).registerFailoverListener(failoverListener);
+
+ Session sessSend = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod2 = sessSend.createProducer(queue[2]);
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage tm = sessSend.createTextMessage("message" + i);
+
+ prod2.send(tm);
+ }
+
+ Session sess3 = conn3.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons3 = sess3.createConsumer(queue[3]);
+
+
+ conn3.start();
+
+ TextMessage tm = null;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ tm = (TextMessage)cons3.receive(2000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ //Don't ack
+
+ int failoverNodeId = this.getFailoverNodeForNode(factory, 3);
+
+ log.info("Failover node for node 3 is " + failoverNodeId);
+
+ dumpFailoverMap(ServerManagement.getServer(3).getFailoverMap());
+
+ int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+ assertEquals(0, recoveryMapSize);
+ Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+ Map ids = (Map)recoveryArea.get(new Integer(3));
+ assertNotNull(ids);
+ assertEquals(numMessages, ids.size());
+
+
+
+ //We now add a new node - this should cause the failover node to change
+
+ ServerManagement.start(4, "all", false);
+
+ ServerManagement.deployQueue("testDistributedQueue", 4);
+
+ Thread.sleep(5000);
+
+
+ recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+ assertEquals(0, recoveryMapSize);
+ recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+ ids = (Map)recoveryArea.get(new Integer(3));
+ assertNull(ids);
+
+
+
+ dumpFailoverMap(ServerManagement.getServer(3).getFailoverMap());
+
+ int newFailoverNodeId = this.getFailoverNodeForNode(factory, 3);
+
+ recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+ assertEquals(0, recoveryMapSize);
+ recoveryArea = ServerManagement.getServer(newFailoverNodeId).getRecoveryArea(queue[3].getQueueName());
+ ids = (Map)recoveryArea.get(new Integer(3));
+ assertNotNull(ids);
+ assertEquals(numMessages, ids.size());
+
+
+ log.info("New failover node is " + newFailoverNodeId);
+
+ assertTrue(failoverNodeId != newFailoverNodeId);
+
+ //Now kill the node
+
+ ServerManagement.kill(3);
+
+ log.info("########");
+ log.info("######## KILLED NODE 3");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ log.info("Waiting for failover to complete");
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ log.info("Failover completed");
+
+ assertEquals(newFailoverNodeId, getServerId(conn3));
+
+ recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+ assertEquals(0, recoveryMapSize);
+ recoveryArea = ServerManagement.getServer(newFailoverNodeId).getRecoveryArea(queue[3].getQueueName());
+ ids = (Map)recoveryArea.get(new Integer(3));
+ assertNull(ids);
+
+
+
+ //Now ack
+ if (transactional)
+ {
+ sess3.commit();
+ }
+ else
+ {
+ tm.acknowledge();
+ }
+
+ log.info("acked");
+
+ sess3.close();
+
+ log.info("closed");
+
+ sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ log.info("created new session");
+
+ cons3 = sess3.createConsumer(queue[3]);
+
+ log.info("Created consumer");
+
+ //Messages should be gone
+
+ tm = (TextMessage)cons3.receive(5000);
+
+ assertNull(tm);
+ }
+ finally
+ {
+ if (conn3 != null)
+ {
+ conn3.close();
+ }
+
+ try
+ {
+ ServerManagement.stop(4);
+ }
+ catch (Exception e)
+ {}
+ }
+ }
+
+ public void testFailoverToNodeWithNoQueueDeployed() throws Exception
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
+
+ Connection conn3 = createConnectionOnServer(factory, 3);
+
+ try
+ {
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)conn3).registerFailoverListener(failoverListener);
+
+ Session sessSend = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod2 = sessSend.createProducer(queue[2]);
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage tm = sessSend.createTextMessage("message" + i);
+
+ prod2.send(tm);
+ }
+
+ Session sess3 = conn3.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons3 = sess3.createConsumer(queue[3]);
+
+
+ conn3.start();
+
+ TextMessage tm = null;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ tm = (TextMessage)cons3.receive(2000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ //Don't ack
+
+ int failoverNodeId = this.getFailoverNodeForNode(factory, 3);
+
+ log.info("Failover node for node 3 is " + failoverNodeId);
+
+ dumpFailoverMap(ServerManagement.getServer(3).getFailoverMap());
+
+ //We now add a new node - this should cause the failover node to change
+
+ ServerManagement.start(4, "all", false);
+
+ //DO NOT deploy the queue on it
+
+ Thread.sleep(5000);
+
+ dumpFailoverMap(ServerManagement.getServer(3).getFailoverMap());
+
+ int newFailoverNodeId = this.getFailoverNodeForNode(factory, 3);
+
+ log.info("New failover node is " + newFailoverNodeId);
+
+ assertTrue(failoverNodeId != newFailoverNodeId);
+
+ //Now kill the node
+
+ // The queue does not exist on the new node so it tests the case where queue merging DOES NOT occur
+
+ ServerManagement.kill(3);
+
+ log.info("########");
+ log.info("######## KILLED NODE 3");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ log.info("Waiting for failover to complete");
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ log.info("Failover completed");
+
+ assertEquals(newFailoverNodeId, getServerId(conn3));
+
+ //Now ack
+
+ tm.acknowledge();
+
+
+ log.info("acked");
+
+ sess3.close();
+
+ log.info("closed");
+
+ sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ log.info("created new session");
+
+ cons3 = sess3.createConsumer(queue[3]);
+
+ log.info("Created consumer");
+
+ //Messages should be gone
+
+ tm = (TextMessage)cons3.receive(5000);
+
+ assertNull(tm);
+ }
+ finally
+ {
+ if (conn3 != null)
+ {
+ conn3.close();
+ }
+
+ try
+ {
+ ServerManagement.stop(4);
+ }
+ catch (Exception e)
+ {}
+ }
+ }
+
+ private void dumpFailoverMap(Map map)
+ {
+ Iterator iter = map.entrySet().iterator();
+
+ log.info("*** dumping failover map ***");
+
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+
+ log.info(entry.getKey() + "-->" + entry.getValue());
+ }
+
+ log.info("*** end dump ***");
+ }
+
private void killFailoverNode(boolean transactional) throws Exception
{
JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/RecoverDeliveriesTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/RecoverDeliveriesTest.java 2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/RecoverDeliveriesTest.java 2007-07-09 21:57:24 UTC (rev 2862)
@@ -23,18 +23,22 @@
package org.jboss.test.messaging.jms.clustering;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.jboss.jms.client.FailoverEvent;
import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
+import org.jboss.messaging.util.MessageQueueNameHelper;
import org.jboss.test.messaging.tools.ServerManagement;
@@ -63,8 +67,6 @@
*
* Test timeout
*
- * Test multiple failover
- *
* Test nothing left in map or area
*
* Multiple queues in same session
@@ -92,6 +94,16 @@
{
this.simple(false);
}
+
+ public void testTempQueueTransactional() throws Exception
+ {
+ this.temporaryQueue(true);
+ }
+
+ public void testTempQueueNonTransactional() throws Exception
+ {
+ this.temporaryQueue(false);
+ }
public void testWithConnectionOnNewNodeTransactional() throws Exception
{
@@ -113,6 +125,177 @@
connectionsOnAllNodes(false);
}
+ public void testCancelTransactional() throws Exception
+ {
+ cancel(true);
+ }
+
+ public void testCancelNonTransactional() throws Exception
+ {
+ cancel(false);
+ }
+
+ public void testDurableSubTransactional() throws Exception
+ {
+ durableSub(true);
+ }
+
+ public void testDurableSubNonTransactional() throws Exception
+ {
+ durableSub(false);
+ }
+
+
+ public void testTimeout() throws Exception
+ {
+ final long timeout = 20 * 1000;
+
+ JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
+
+ ((ClientClusteredConnectionFactoryDelegate)factory.getDelegate()).setSupportsFailover(false);
+
+ Connection conn1 = createConnectionOnServer(factory,1);
+
+ Connection conn2 = null;
+
+ try
+ {
+ ServerManagement.getServer(2).setAttribute(ServerManagement.getServerPeerObjectName(), "RecoverDeliveriesTimeout", String.valueOf(timeout));
+
+ ServerManagement.deployQueue("timeoutQueue", 0);
+ ServerManagement.deployQueue("timeoutQueue", 1);
+ ServerManagement.deployQueue("timeoutQueue", 2);
+
+ Queue timeoutQueue = (Queue)ic[1].lookup("/queue/timeoutQueue");
+
+ Session sessSend = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod1 = sessSend.createProducer(timeoutQueue);
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage tm = sessSend.createTextMessage("message" + i);
+
+ prod1.send(tm);
+ }
+
+ Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons1 = sess1.createConsumer(timeoutQueue);
+
+
+ conn1.start();
+
+ TextMessage tm = null;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ tm = (TextMessage)cons1.receive(2000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ int failoverNodeId = this.getFailoverNodeForNode(factory, 1);
+
+ assertEquals(2, failoverNodeId);
+
+ int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(timeoutQueue.getQueueName());
+ assertEquals(0, recoveryMapSize);
+ Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(timeoutQueue.getQueueName());
+ Map ids = (Map)recoveryArea.get(new Integer(1));
+ assertNotNull(ids);
+ assertEquals(numMessages, ids.size());
+
+ //First turn OFF failover on the connection factory
+
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED NODE 1");
+ log.info("########");
+
+ //Failover won't occur on the client
+
+ //Let's give it enough time to happen on the server
+
+ Thread.sleep(10000);
+
+ recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(timeoutQueue.getQueueName());
+ assertEquals(numMessages, recoveryMapSize);
+ recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(timeoutQueue.getQueueName());
+ ids = (Map)recoveryArea.get(new Integer(1));
+ assertNull(ids);
+
+ //Now we wait for the timeout period
+
+ log.info("Waiting for timeout");
+ Thread.sleep(timeout + 1000);
+ log.info("Waited");
+
+ recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(timeoutQueue.getQueueName());
+ assertEquals(0, recoveryMapSize);
+ recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(timeoutQueue.getQueueName());
+ ids = (Map)recoveryArea.get(new Integer(1));
+ assertNull(ids);
+
+ //Now we should be able to consume the refs again
+
+ conn1.close();
+
+ log.info("Creating connection");
+
+ conn2 = createConnectionOnServer(factory, failoverNodeId);
+
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons2 = sess2.createConsumer(timeoutQueue);
+
+ conn2.start();
+
+ Set msgs = new HashSet();
+ for (int i = 0; i < numMessages; i++)
+ {
+ tm = (TextMessage)cons2.receive(2000);
+
+ assertNotNull(tm);
+
+ //assertEquals("message" + i, tm.getText());
+
+ log.info("Got message:" + tm.getText());
+
+ msgs.add(tm.getText());
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ assertTrue(msgs.contains("message" + i));
+ }
+
+ tm = (TextMessage)cons2.receive(5000);
+
+ assertNull(tm);
+
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -181,6 +364,13 @@
int failoverNodeId = this.getFailoverNodeForNode(factory, 1);
+ int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+ assertEquals(0, recoveryMapSize);
+ Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+ Map ids = (Map)recoveryArea.get(new Integer(1));
+ assertNotNull(ids);
+ assertEquals(numMessages, ids.size());
+
ServerManagement.kill(1);
log.info("########");
@@ -218,6 +408,12 @@
tm.acknowledge();
}
+ recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+ assertEquals(0, recoveryMapSize);
+ recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+ ids = (Map)recoveryArea.get(new Integer(1));
+ assertNull(ids);
+
log.info("acked");
sess1.close();
@@ -247,8 +443,8 @@
}
}
-
- private void killFailoverNode(boolean transactional) throws Exception
+
+ private void temporaryQueue(boolean transactional) throws Exception
{
JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
@@ -260,8 +456,10 @@
((JBossConnection)conn1).registerFailoverListener(failoverListener);
Session sessSend = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue tempQueue1 = sessSend.createTemporaryQueue();
- MessageProducer prod1 = sessSend.createProducer(queue[1]);
+ MessageProducer prod1 = sessSend.createProducer(tempQueue1);
final int numMessages = 10;
@@ -274,7 +472,7 @@
Session sess1 = conn1.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+ MessageConsumer cons1 = sess1.createConsumer(tempQueue1);
conn1.start();
@@ -292,23 +490,150 @@
//Don't ack
- //We kill the failover node for node 1
+ //Now kill server
+
int failoverNodeId = this.getFailoverNodeForNode(factory, 1);
- log.info("Killing failover node:" + failoverNodeId);
+ int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(tempQueue1.getQueueName());
+ assertEquals(0, recoveryMapSize);
+ Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(tempQueue1.getQueueName());
+ Map ids = (Map)recoveryArea.get(new Integer(1));
+ assertNotNull(ids);
+ assertEquals(numMessages, ids.size());
- ServerManagement.kill(failoverNodeId);
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED NODE 1");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ log.info("Waiting for failover to complete");
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ log.info("Failover completed");
+
+ assertEquals(failoverNodeId, getServerId(conn1));
+
+ //Now ack
+ if (transactional)
+ {
+ sess1.commit();
+ }
+ else
+ {
+ tm.acknowledge();
+ }
+
+ recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(tempQueue1.getQueueName());
+ assertEquals(0, recoveryMapSize);
+ recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(tempQueue1.getQueueName());
+ ids = (Map)recoveryArea.get(new Integer(1));
+ assertNull(ids);
+
+ log.info("acked");
+
+ sess1.close();
+
+ log.info("closed");
+
+ sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ log.info("created new session");
- log.info("Killed failover node");
+ cons1 = sess1.createConsumer(tempQueue1);
- Thread.sleep(5000);
+ log.info("Created consumer");
- //Now kill
+ //Messages should be gone
- failoverNodeId = this.getFailoverNodeForNode(factory, 1);
+ tm = (TextMessage)cons1.receive(5000);
+
+ assertNull(tm);
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ }
+ }
+
+ private void durableSub(boolean transactional) throws Exception
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
+
+ Connection conn1 = createConnectionOnServer(factory,1);
+
+ try
+ {
+ String clientID = "I am sick of writing these fucking tests!!! AAAAAAAAAARRRRRRRRRRGGGGGGGHHHHHHH";
+ conn1.setClientID(clientID);
- log.info("Failover node id is now " + failoverNodeId);
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)conn1).registerFailoverListener(failoverListener);
+ Session sessSend = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String subName = "ooooooooo matron!!";
+
+ MessageConsumer sub = sessSend.createDurableSubscriber(topic[1], subName);
+
+ String queueName = MessageQueueNameHelper.createSubscriptionName(clientID, subName);
+
+ MessageProducer prod1 = sessSend.createProducer(topic[1]);
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage tm = sessSend.createTextMessage("message" + i);
+
+ prod1.send(tm);
+ }
+
+ Session sess1 = conn1.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+
+ conn1.start();
+
+ TextMessage tm = null;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ tm = (TextMessage)sub.receive(2000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ //Don't ack
+
+ //Now kill server
+
+ int failoverNodeId = this.getFailoverNodeForNode(factory, 1);
+
+ int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
+ assertEquals(0, recoveryMapSize);
+ Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queueName);
+ Map ids = (Map)recoveryArea.get(new Integer(1));
+ assertNotNull(ids);
+ assertEquals(numMessages, ids.size());
+
ServerManagement.kill(1);
log.info("########");
@@ -346,6 +671,12 @@
tm.acknowledge();
}
+ recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
+ assertEquals(0, recoveryMapSize);
+ recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queueName);
+ ids = (Map)recoveryArea.get(new Integer(1));
+ assertNull(ids);
+
log.info("acked");
sess1.close();
@@ -356,7 +687,7 @@
log.info("created new session");
- cons1 = sess1.createConsumer(queue[1]);
+ MessageConsumer cons1 = sess1.createConsumer(topic[1]);
log.info("Created consumer");
@@ -375,8 +706,86 @@
}
}
+ private void cancel(boolean transactional) throws Exception
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
+
+ Connection conn1 = createConnectionOnServer(factory,1);
+
+ try
+ {
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)conn1).registerFailoverListener(failoverListener);
+
+ Session sessSend = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod1 = sessSend.createProducer(queue[1]);
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage tm = sessSend.createTextMessage("message" + i);
+
+ prod1.send(tm);
+ }
+
+ Session sess1 = conn1.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+
+
+ conn1.start();
+
+ TextMessage tm = null;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ tm = (TextMessage)cons1.receive(2000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ //Don't ack
+
+ int failoverNodeId = this.getFailoverNodeForNode(factory, 1);
+
+ int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+ assertEquals(0, recoveryMapSize);
+ Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+ Map ids = (Map)recoveryArea.get(new Integer(1));
+ assertNotNull(ids);
+ assertEquals(numMessages, ids.size());
+
+ //Now cancel the session
+
+ sess1.close();
+
+ Thread.sleep(5000);
+
+ //Ensure the dels are removed from the backup
+
+ recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+ assertEquals(0, recoveryMapSize);
+ recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+ ids = (Map)recoveryArea.get(new Integer(1));
+ assertNull(ids);
+
+
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ }
+ }
+
private void connectionOnNewNode(boolean transactional) throws Exception
{
JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
@@ -434,6 +843,13 @@
int failoverNodeId = this.getFailoverNodeForNode(factory, 1);
+ int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+ assertEquals(0, recoveryMapSize);
+ Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+ Map ids = (Map)recoveryArea.get(new Integer(1));
+ assertNotNull(ids);
+ assertEquals(numMessages, ids.size());
+
ServerManagement.kill(1);
log.info("########");
@@ -473,6 +889,12 @@
log.info("acked");
+ recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+ assertEquals(0, recoveryMapSize);
+ recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+ ids = (Map)recoveryArea.get(new Integer(1));
+ assertNull(ids);
+
sess1.close();
log.info("closed");
@@ -763,6 +1185,24 @@
}
}
+ /*
+ * Test recoveryArea timeout
+ *
+ * Create session, consume messages but don't ack
+ *
+ * kill the server
+ *
+ * somehow prevent the client failing over. ??
+ *
+ * assert recovery area is full
+ *
+ * wait for timeout
+ *
+ * assert messages can be consumed
+ *
+ * assert recovery area is empty
+ */
+
// Inner classes -------------------------------------------------
}
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2007-07-09 21:57:24 UTC (rev 2862)
@@ -40,6 +40,7 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.MessageStore;
import org.jboss.messaging.core.contract.PersistenceManager;
+import org.jboss.messaging.core.contract.PostOffice;
import org.jboss.messaging.util.XMLUtil;
import org.jboss.remoting.ServerInvocationHandler;
import org.jboss.test.messaging.tools.ServerManagement;
@@ -819,8 +820,31 @@
public Set getNodeIDView() throws Exception
{
- return (Set)sc.getAttribute(postOfficeObjectName, "NodeIDView");
+ PostOffice postOffice = (PostOffice)sc.getAttribute(postOfficeObjectName, "Instance");
+
+ return postOffice.nodeIDView();
}
+
+ public Map getFailoverMap() throws Exception
+ {
+ PostOffice postOffice = (PostOffice)sc.getAttribute(postOfficeObjectName, "Instance");
+
+ return postOffice.getFailoverMap();
+ }
+
+ public Map getRecoveryArea(String queueName) throws Exception
+ {
+ PostOffice postOffice = (PostOffice)sc.getAttribute(postOfficeObjectName, "Instance");
+
+ return postOffice.getRecoveryArea(queueName);
+ }
+
+ public int getRecoveryMapSize(String queueName) throws Exception
+ {
+ PostOffice postOffice = (PostOffice)sc.getAttribute(postOfficeObjectName, "Instance");
+
+ return postOffice.getRecoveryMapSize(queueName);
+ }
public List pollNotificationListener(long listenerID) throws Exception
{
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java 2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java 2007-07-09 21:57:24 UTC (rev 2862)
@@ -24,16 +24,16 @@
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
-import java.util.Set;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.HashMap;
-import java.util.Collections;
+import java.util.Set;
+
+import javax.management.NotificationListener;
import javax.management.ObjectName;
-import javax.management.NotificationListener;
import javax.transaction.UserTransaction;
-import org.jboss.jms.jndi.JMSProviderAdapter;
import org.jboss.jms.server.DestinationManager;
import org.jboss.jms.server.ServerPeer;
import org.jboss.logging.Logger;
@@ -457,7 +457,22 @@
{
return server.getNodeIDView();
}
-
+
+ public Map getFailoverMap() throws Exception
+ {
+ return server.getFailoverMap();
+ }
+
+ public Map getRecoveryArea(String queueName) throws Exception
+ {
+ return server.getRecoveryArea(queueName);
+ }
+
+ public int getRecoveryMapSize(String queueName) throws Exception
+ {
+ return server.getRecoveryMapSize(queueName);
+ }
+
public List pollNotificationListener(long listenerID) throws Exception
{
ProxyNotificationListener pl = null;
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java 2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java 2007-07-09 21:57:24 UTC (rev 2862)
@@ -23,6 +23,7 @@
import java.rmi.Remote;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import javax.management.NotificationListener;
@@ -272,6 +273,12 @@
* USE IT ONLY FOR CLUSTERING TESTS!
*/
Set getNodeIDView() throws Exception;
+
+ Map getFailoverMap() throws Exception;
+
+ Map getRecoveryArea(String queueName) throws Exception;
+
+ int getRecoveryMapSize(String queueName) throws Exception;
/**
* @return List<Notification>
More information about the jboss-cvs-commits
mailing list