[jboss-cvs] JBoss Messaging SVN: r2862 - in trunk: src/main/org/jboss/jms/server/endpoint and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jul 9 17:57:24 EDT 2007


Author: timfox
Date: 2007-07-09 17:57:24 -0400 (Mon, 09 Jul 2007)
New Revision: 2862

Modified:
   trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java
   trunk/src/main/org/jboss/messaging/core/contract/Queue.java
   trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/ChangeFailoverNodeTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/RecoverDeliveriesTest.java
   trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
   trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
   trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
Log:
more stuff


Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java	2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java	2007-07-09 21:57:24 UTC (rev 2862)
@@ -170,6 +170,12 @@
    	return supportsFailover;
    }
    
+   //Only used in testing: overwrites the supportsFailover flag held by this delegate
+   public void setSupportsFailover(boolean failover)
+   {
+   	this.supportsFailover = failover;
+   }
+   
    /** Method used to update the delegate and failoverMap during viewChange */
    public synchronized void updateFailoverInfo(ClientConnectionFactoryDelegate[] delegates,
                                                Map failoverMap)

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-07-09 21:57:24 UTC (rev 2862)
@@ -36,7 +36,6 @@
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
-import javax.jms.TextMessage;
 
 import org.jboss.aop.AspectManager;
 import org.jboss.jms.client.delegate.ClientBrowserDelegate;

Modified: trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java	2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java	2007-07-09 21:57:24 UTC (rev 2862)
@@ -149,5 +149,11 @@
 	void sendReplicateAckMessage(String queueName, long messageID) throws Exception;
 	
 	boolean isFirstNode();
+	
+	
+	//For testing only: recovery area of the queue with the given name
+	Map getRecoveryArea(String queueName);
+   //For testing only: size of the recovery map of the queue with the given name
+   int getRecoveryMapSize(String queueName);
 }
 

Modified: trunk/src/main/org/jboss/messaging/core/contract/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/Queue.java	2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/src/main/org/jboss/messaging/core/contract/Queue.java	2007-07-09 21:57:24 UTC (rev 2862)
@@ -86,4 +86,9 @@
    List recoverDeliveries(List messageIds);  
    
    void removeStrandedReferences(String sessionID);
+   
+   //For testing only: the recovery area of this queue
+   Map getRecoveryArea();
+   //For testing only: the size of the recovery map of this queue
+   int getRecoveryMapSize();
 }

Modified: trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2007-07-09 21:57:24 UTC (rev 2862)
@@ -475,6 +475,8 @@
    		ids = new ConcurrentHashMap(ids);
    	}
    	
+   	if (trace) { log.trace("Adding " + ids.size() + " ids to recovery area for node " + nodeID); }
+   	
    	recoveryArea.put(nid, ids);
    	
    	if (trace) { log.trace("Added"); }
@@ -485,6 +487,18 @@
    	return recoverDeliveriesTimeout;
    }
    
+   //testing only
+   //Returns the recoveryArea map itself; no copy is made
+   public Map getRecoveryArea()
+   {
+   	return this.recoveryArea;
+   }
+   //testing only: current size of recoveryMap
+   public int getRecoveryMapSize()
+   {
+   	return this.recoveryMap.size();
+   }
+   
    // ChannelSupport overrides --------------------------------------
    
    protected void deliverInternal()
@@ -642,9 +656,10 @@
 		}   	
    }
    
-   private class RecoveryEntry
+   static class RecoveryEntry
    {
    	String sessionID;
+   	
    	MessageReference ref;
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-07-09 21:57:24 UTC (rev 2862)
@@ -64,6 +64,7 @@
 import org.jboss.messaging.core.contract.Message;
 import org.jboss.messaging.core.contract.MessageReference;
 import org.jboss.messaging.core.contract.MessageStore;
+import org.jboss.messaging.core.contract.MessagingComponent;
 import org.jboss.messaging.core.contract.PersistenceManager;
 import org.jboss.messaging.core.contract.PostOffice;
 import org.jboss.messaging.core.contract.Queue;
@@ -306,6 +307,11 @@
    }
       
    // MessagingComponent overrides -----------------------------------------------------------------
+   //Returns this post office itself
+   public MessagingComponent getInstance()
+   {
+   	return this;
+   }
 
    public void start() throws Exception
    {
@@ -570,6 +576,7 @@
    	}
    }
    
+   
    public Collection getAllBindingsForQueueName(String queueName) throws Exception
    {
    	return getBindings(queueName);
@@ -641,7 +648,40 @@
 	{
 		return firstNode;
 	}
+	
+	
+	//	Testing only
    
+   /**
+    * Testing only: recovery area of the queue bound locally under queueName.
+    *
+    * @return the queue's recovery area, or null if there is no such binding
+    */
+   public Map getRecoveryArea(String queueName)
+   {
+   	// localNameMap is null-checked elsewhere in this class, so guard it here
+   	// too instead of failing with a NullPointerException
+   	Binding binding = localNameMap == null ? null : (Binding)localNameMap.get(queueName);
+   	
+   	return binding == null ? null : binding.queue.getRecoveryArea();
+   }
+   
+   /**
+    * Testing only: size of the recovery map of the queue bound locally under queueName.
+    *
+    * @return the recovery map size, or 0 if there is no such binding
+    */
+   public int getRecoveryMapSize(String queueName)
+   {
+   	// localNameMap is null-checked elsewhere in this class, so guard it here
+   	// too instead of failing with a NullPointerException
+   	Binding binding = localNameMap == null ? null : (Binding)localNameMap.get(queueName);
+   	
+   	return binding == null ? 0 : binding.queue.getRecoveryMapSize();
+   }
+   
+   //End testing only
+   
    // GroupListener implementation -------------------------------------------------------------
  
    public void setState(byte[] bytes) throws Exception
@@ -1255,9 +1295,17 @@
    	
    	try
    	{
+   		log.info("local name map is " + localNameMap);
+   		
+   		if (localNameMap == null)
+   		{
+   			throw new IllegalStateException("Cannot add all replicated deliveries since there are no bindings - probably the queues aren't deployed");
+   		}
+   		
    		if (localNameMap != null)
    		{
    			Iterator iter = deliveries.entrySet().iterator();
+   			log.info("deliveries is " + deliveries);
    			
    			while (iter.hasNext())
    			{
@@ -1267,14 +1315,18 @@
    				
    				Map ids = (Map)entry.getValue();
    				
+   				log.info("queue;" + queueName + " ids: " + ids.size());
+   				
    				Binding binding = (Binding)localNameMap.get(queueName);
    				
    				if (binding == null)
    				{
-   					throw new IllegalStateException("Cannot find binding with name " + queueName);
+   					throw new IllegalStateException("Cannot find binding with name " + queueName + " maybe it hasn't been deployed");
    				}
    				
+   				log.info("adding");
    				binding.queue.addAllToRecoveryArea(nodeID, ids);
+   				log.info("added");
    			}   			   			
    		}
    	}
@@ -2615,7 +2667,7 @@
 	   	{
 	   		if (trace) { log.trace("Old failover node still exists, telling it remove replicated deliveries"); }
 	   		
-	   		ClusterRequest request = new AckAllReplicatedDeliveriesMessage(oldFailoverNodeID);
+	   		ClusterRequest request = new AckAllReplicatedDeliveriesMessage(thisNodeID);
 	   		
 	   		groupMember.unicastControl(request, info.getControlChannelAddress(), true);
 	   		

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ChangeFailoverNodeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ChangeFailoverNodeTest.java	2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ChangeFailoverNodeTest.java	2007-07-09 21:57:24 UTC (rev 2862)
@@ -22,6 +22,9 @@
 
 package org.jboss.test.messaging.jms.clustering;
 
+import java.util.Iterator;
+import java.util.Map;
+
 import javax.jms.Connection;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -52,9 +55,8 @@
    // Attributes ----------------------------------------------------
    
    // Constructors --------------------------------------------------
-   
+   	
 	
-	
    public ChangeFailoverNodeTest(String name)
    {
       super(name);
@@ -62,18 +64,16 @@
    
    // Public --------------------------------------------------------
    
-//   public void testKillFailoverNodeTransactional() throws Exception
-//   {
-//   	this.killFailoverNode(true);
-//   }
-//   
-//   public void testKillFailoverNodeNonTransactional() throws Exception
-//   {
-//   	this.killFailoverNode(false);
-//   }
+   public void testKillFailoverNodeTransactional() throws Exception
+   {
+   	this.killFailoverNode(true);
+   }
    
+   public void testKillFailoverNodeNonTransactional() throws Exception
+   {
+   	this.killFailoverNode(false);
+   }      
    
-   
    public void testStopFailoverNodeTransactional() throws Exception
    {
    	this.stopFailoverNode(true);
@@ -84,6 +84,30 @@
    	this.stopFailoverNode(false);
    }
       
+   public void testAddNodeToGetNewFailoverNodeNonTransactional() throws Exception
+   {
+   	this.addNodeToGetNewFailoverNode(false);
+   }
+   
+   public void testkillTwoFailoverNodesNonTransactional() throws Exception
+   {
+   	this.killTwoFailoverNodes(false);
+   }
+   
+   public void testkillTwoFailoverNodesTransactional() throws Exception
+   {
+   	this.killTwoFailoverNodes(true);
+   }
+   
+   public void testKillAllTooOneAndBackAgainNonTransactional() throws Exception
+   {
+   	this.killAllTooOneAndBackAgain(false);
+   }
+   
+   public void testKillAllTooOneAndBackAgainTransactional() throws Exception
+   {
+   	this.killAllTooOneAndBackAgain(true);
+   }
   
    
    // Package protected ---------------------------------------------
@@ -106,6 +130,653 @@
    
    // Private -------------------------------------------------------
 
+   private void killAllTooOneAndBackAgain(boolean transactional) throws Exception
+   {
+   	//Consumes messages on node 0 without acking, kills three successive failover nodes of node 0,
+   	//restarts node 1, kills node 0, and after client failover acks and expects no redelivery
+   	JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
+      Connection conn0 = createConnectionOnServer(factory, 0);
+      try
+      {
+      	SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn0).registerFailoverListener(failoverListener);
+      	
+         Session sessSend = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      		
+      	MessageProducer prod0 = sessSend.createProducer(queue[0]);
+      	
+      	final int numMessages = 10;
+      	
+      	for (int i = 0; i < numMessages; i++)
+      	{
+      		TextMessage tm = sessSend.createTextMessage("message" + i);
+      		
+      		prod0.send(tm);      		
+      	}
+      	
+      	Session sess0 = conn0.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+      	
+      	MessageConsumer cons0 = sess0.createConsumer(queue[0]);
+      
+      	
+      	conn0.start();
+      	
+      	TextMessage tm = null;
+      	
+      	for (int i = 0; i < numMessages; i++)
+      	{
+      		tm = (TextMessage)cons0.receive(2000);
+      		
+      		assertNotNull(tm);
+      		
+      		assertEquals("message" + i, tm.getText());
+      	}
+      	
+      	//Don't ack
+      	
+      	int failoverNodeId = this.getFailoverNodeForNode(factory, 0);
+      	
+      	log.info("Failover node for node 0 is " + failoverNodeId);
+      	
+      	//Now kill the failover node
+      	
+      	log.info("killing node " + failoverNodeId);
+      	ServerManagement.kill(failoverNodeId);
+      	
+      	Thread.sleep(5000);
+      	
+      	int newFailoverNodeId = this.getFailoverNodeForNode(factory, 0);      	    
+      	
+      	//Now kill the second failover node
+      	
+      	log.info("killing node " + newFailoverNodeId);
+      	ServerManagement.kill(newFailoverNodeId);
+      	
+      	Thread.sleep(5000);
+      	
+      	int evennewerFailoverNodeId = this.getFailoverNodeForNode(factory, 0);
+      	
+      	//Now kill the third failover node
+      	
+      	log.info("killing node " + evennewerFailoverNodeId);
+      	ServerManagement.kill(evennewerFailoverNodeId);
+      	
+      	//This just leaves the current node
+      	//NOTE(review): unlike the two earlier kills there is no sleep after this one - confirm that is intentional
+      	//Add a node
+      	
+      	ServerManagement.start(1, "all", false);
+      	
+      	log.info("started node 1");
+      	
+         //Now kill the node itself
+      	
+      	ServerManagement.kill(0);
+
+         log.info("########");
+         log.info("######## KILLED NODE 0");
+         log.info("########");
+
+         // wait for the client-side failover to complete
+
+         log.info("Waiting for failover to complete");
+         
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(120000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+         
+         log.info("Failover completed");
+         
+         assertEquals(1, getServerId(conn0));
+                            
+         //Now ack
+         if (transactional)
+         {
+         	sess0.commit();
+         }
+         else
+         {
+         	tm.acknowledge();
+         }
+         
+         log.info("acked");
+         
+         sess0.close();
+         
+         log.info("closed");
+         
+	      sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+	      
+	      log.info("created new session");
+      	
+      	cons0 = sess0.createConsumer(queue[0]);
+      	
+      	log.info("Created consumer");
+      	
+         //Messages should be gone
+      	
+      	tm = (TextMessage)cons0.receive(5000);
+      	
+      	assertNull(tm); 	
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+      }
+   }
+   
+   
+   private void killTwoFailoverNodes(boolean transactional) throws Exception
+   {
+   	//Consumes messages on node 0 without acking, kills two successive failover nodes of node 0 while checking
+   	//that the replicated delivery ids follow to each new failover node, then kills node 0, fails over and acks
+   	JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
+      Connection conn0 = createConnectionOnServer(factory, 0);
+      try
+      {
+      	SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn0).registerFailoverListener(failoverListener);
+      	
+         Session sessSend = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      		
+      	MessageProducer prod0 = sessSend.createProducer(queue[0]);
+      	
+      	final int numMessages = 10;
+      	
+      	for (int i = 0; i < numMessages; i++)
+      	{
+      		TextMessage tm = sessSend.createTextMessage("message" + i);
+      		
+      		prod0.send(tm);      		
+      	}
+      	
+      	Session sess0 = conn0.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+      	
+      	MessageConsumer cons0 = sess0.createConsumer(queue[0]);
+      
+      	
+      	conn0.start();
+      	
+      	TextMessage tm = null;
+      	
+      	for (int i = 0; i < numMessages; i++)
+      	{
+      		tm = (TextMessage)cons0.receive(2000);
+      		
+      		assertNotNull(tm);
+      		
+      		assertEquals("message" + i, tm.getText());
+      	}
+      	
+      	//Don't ack
+      	
+      	int failoverNodeId = this.getFailoverNodeForNode(factory, 0);
+      	
+      	log.info("Failover node for node 0 is " + failoverNodeId);
+      	
+      	dumpFailoverMap(ServerManagement.getServer(0).getFailoverMap());
+      	
+      	int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+      	Map ids = (Map)recoveryArea.get(new Integer(0));
+      	assertNotNull(ids);
+      	assertEquals(numMessages, ids.size());
+      	
+      	//Now kill the failover node
+      	
+      	log.info("killing node " + failoverNodeId);
+      	ServerManagement.kill(failoverNodeId);
+      	
+      	Thread.sleep(5000);
+      	
+      	int newFailoverNodeId = this.getFailoverNodeForNode(factory, 0);
+      	
+      	log.info("New Failover node for node 0 is " + newFailoverNodeId);
+      	//The old failover node has been killed, so the checks must target the new failover node
+      	recoveryMapSize = ServerManagement.getServer(newFailoverNodeId).getRecoveryMapSize(queue[newFailoverNodeId].getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	recoveryArea = ServerManagement.getServer(newFailoverNodeId).getRecoveryArea(queue[newFailoverNodeId].getQueueName());
+      	ids = (Map)recoveryArea.get(new Integer(0));
+      	assertNotNull(ids);
+      	assertEquals(numMessages, ids.size());
+      	
+      	//Now kill the second failover node
+      	
+      	log.info("killing node " + newFailoverNodeId);
+      	ServerManagement.kill(newFailoverNodeId);
+      	
+      	Thread.sleep(5000);
+      	
+      	int evennewerFailoverNodeId = this.getFailoverNodeForNode(factory, 0);
+      	//Again query the live (newest) failover node rather than the first one, which is dead
+      	recoveryMapSize = ServerManagement.getServer(evennewerFailoverNodeId).getRecoveryMapSize(queue[evennewerFailoverNodeId].getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	recoveryArea = ServerManagement.getServer(evennewerFailoverNodeId).getRecoveryArea(queue[evennewerFailoverNodeId].getQueueName());
+      	ids = (Map)recoveryArea.get(new Integer(0));
+      	assertNotNull(ids);
+      	assertEquals(numMessages, ids.size());
+      	
+      	log.info("New Failover node for node 0 is " + evennewerFailoverNodeId);
+      	      	         
+         //Now kill the node itself
+      	
+      	ServerManagement.kill(0);
+
+         log.info("########");
+         log.info("######## KILLED NODE 0");
+         log.info("########");
+
+         // wait for the client-side failover to complete
+
+         log.info("Waiting for failover to complete");
+         
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(120000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+         
+         log.info("Failover completed");
+         
+         assertEquals(evennewerFailoverNodeId, getServerId(conn0));
+         //After failover the entry for the failed node (node 0) must have left the recovery area
+         recoveryMapSize = ServerManagement.getServer(evennewerFailoverNodeId).getRecoveryMapSize(queue[evennewerFailoverNodeId].getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	recoveryArea = ServerManagement.getServer(evennewerFailoverNodeId).getRecoveryArea(queue[evennewerFailoverNodeId].getQueueName());
+      	ids = (Map)recoveryArea.get(new Integer(0));
+      	assertNull(ids);
+                           
+         
+                  
+         //Now ack
+         if (transactional)
+         {
+         	sess0.commit();
+         }
+         else
+         {
+         	tm.acknowledge();
+         }
+         
+         log.info("acked");
+         
+         sess0.close();
+         
+         log.info("closed");
+         
+	      sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+	      
+	      log.info("created new session");
+      	
+      	cons0 = sess0.createConsumer(queue[0]);
+      	
+      	log.info("Created consumer");
+      	
+         //Messages should be gone
+      	
+      	tm = (TextMessage)cons0.receive(5000);
+      	
+      	assertNull(tm); 	
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+      }
+   }
+   
+   
+   private void addNodeToGetNewFailoverNode(boolean transactional) throws Exception
+   {
+   	//Consumes messages on node 3 without acking, starts node 4 so that node 3's failover node changes,
+   	//checks the replicated delivery ids moved to the new failover node, then kills node 3, fails over and acks
+   	JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
+      Connection conn3 = createConnectionOnServer(factory, 3);
+      try
+      {
+      	SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn3).registerFailoverListener(failoverListener);
+      	
+         Session sessSend = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      		
+      	MessageProducer prod2 = sessSend.createProducer(queue[2]);
+      	
+      	final int numMessages = 10;
+      	
+      	for (int i = 0; i < numMessages; i++)
+      	{
+      		TextMessage tm = sessSend.createTextMessage("message" + i);
+      		
+      		prod2.send(tm);      		
+      	}
+      	
+      	Session sess3 = conn3.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+      	
+      	MessageConsumer cons3 = sess3.createConsumer(queue[3]);
+      
+      	
+      	conn3.start();
+      	
+      	TextMessage tm = null;
+      	
+      	for (int i = 0; i < numMessages; i++)
+      	{
+      		tm = (TextMessage)cons3.receive(2000);
+      		
+      		assertNotNull(tm);
+      		
+      		assertEquals("message" + i, tm.getText());
+      	}
+      	
+      	//Don't ack
+      	
+      	int failoverNodeId = this.getFailoverNodeForNode(factory, 3);
+      	
+      	log.info("Failover node for node 3 is " + failoverNodeId);
+      	
+      	dumpFailoverMap(ServerManagement.getServer(3).getFailoverMap());
+      	
+      	int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+      	Map ids = (Map)recoveryArea.get(new Integer(3));
+      	assertNotNull(ids);
+      	assertEquals(numMessages, ids.size());
+      	
+      	
+      	
+      	//We now add a new node - this should cause the failover node to change
+      	
+         ServerManagement.start(4, "all", false);
+         
+         ServerManagement.deployQueue("testDistributedQueue", 4);
+         
+         Thread.sleep(5000);
+         
+         
+         recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+      	ids = (Map)recoveryArea.get(new Integer(3));
+      	assertNull(ids);
+
+         
+         
+         dumpFailoverMap(ServerManagement.getServer(3).getFailoverMap());
+      	
+         int newFailoverNodeId = this.getFailoverNodeForNode(factory, 3);
+         //NOTE(review): this re-checks the old failover node; the new failover node may have been intended - confirm
+         recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	recoveryArea = ServerManagement.getServer(newFailoverNodeId).getRecoveryArea(queue[3].getQueueName());
+      	ids = (Map)recoveryArea.get(new Integer(3));
+      	assertNotNull(ids);
+      	assertEquals(numMessages, ids.size());
+         
+         
+         log.info("New failover node is " + newFailoverNodeId);
+         
+         assertTrue(failoverNodeId != newFailoverNodeId);
+         
+         //Now kill the node
+      	
+      	ServerManagement.kill(3);
+
+         log.info("########");
+         log.info("######## KILLED NODE 3");
+         log.info("########");
+
+         // wait for the client-side failover to complete
+
+         log.info("Waiting for failover to complete");
+         
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(120000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+         
+         log.info("Failover completed");
+         
+         assertEquals(newFailoverNodeId, getServerId(conn3));
+         //NOTE(review): this re-checks the old failover node; the new failover node may have been intended - confirm
+         recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	recoveryArea = ServerManagement.getServer(newFailoverNodeId).getRecoveryArea(queue[3].getQueueName());
+      	ids = (Map)recoveryArea.get(new Integer(3));
+      	assertNull(ids);
+                           
+         
+                  
+         //Now ack
+         if (transactional)
+         {
+         	sess3.commit();
+         }
+         else
+         {
+         	tm.acknowledge();
+         }
+         
+         log.info("acked");
+         
+         sess3.close();
+         
+         log.info("closed");
+         
+	      sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+	      
+	      log.info("created new session");
+      	
+      	cons3 = sess3.createConsumer(queue[3]);
+      	
+      	log.info("Created consumer");
+      	
+         //Messages should be gone
+      	
+      	tm = (TextMessage)cons3.receive(5000);
+      	
+      	assertNull(tm);      		  	
+      }
+      finally
+      {
+         if (conn3 != null)
+         {
+            conn3.close();
+         }
+         
+         try
+         {
+         	ServerManagement.stop(4);
+         }
+         catch (Exception e)
+         { log.info("Failed to stop node 4 during cleanup: " + e); }
+      }
+   }
+   
+   public void testFailoverToNodeWithNoQueueDeployed() throws Exception
+   {
+   	//Consumes messages on node 3 without acking, starts node 4 WITHOUT deploying the queue on it (which should
+   	//change node 3's failover node), then kills node 3 and checks the client fails over, acks and sees no redelivery
+   	JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
+      Connection conn3 = createConnectionOnServer(factory, 3);
+      try
+      {
+      	SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn3).registerFailoverListener(failoverListener);
+      	
+         Session sessSend = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      		
+      	MessageProducer prod2 = sessSend.createProducer(queue[2]);
+      	
+      	final int numMessages = 10;
+      	
+      	for (int i = 0; i < numMessages; i++)
+      	{
+      		TextMessage tm = sessSend.createTextMessage("message" + i);
+      		
+      		prod2.send(tm);      		
+      	}
+      	
+      	Session sess3 = conn3.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      	
+      	MessageConsumer cons3 = sess3.createConsumer(queue[3]);
+      
+      	
+      	conn3.start();
+      	
+      	TextMessage tm = null;
+      	
+      	for (int i = 0; i < numMessages; i++)
+      	{
+      		tm = (TextMessage)cons3.receive(2000);
+      		
+      		assertNotNull(tm);
+      		
+      		assertEquals("message" + i, tm.getText());
+      	}
+      	
+      	//Don't ack
+      	
+      	int failoverNodeId = this.getFailoverNodeForNode(factory, 3);
+      	
+      	log.info("Failover node for node 3 is " + failoverNodeId);
+      	
+      	dumpFailoverMap(ServerManagement.getServer(3).getFailoverMap());
+      	
+      	//We now add a new node - this should cause the failover node to change
+      	
+         ServerManagement.start(4, "all", false);
+         
+         //DO NOT deploy the queue on it
+         
+         Thread.sleep(5000);
+         
+         dumpFailoverMap(ServerManagement.getServer(3).getFailoverMap());
+      	
+         int newFailoverNodeId = this.getFailoverNodeForNode(factory, 3);
+         
+         log.info("New failover node is " + newFailoverNodeId);
+         
+         assertTrue(failoverNodeId != newFailoverNodeId);
+         
+         //Now kill the node
+      	
+         // The queue does not exist on the new node so it tests the case where queue merging DOES NOT occur
+         
+         ServerManagement.kill(3);
+
+         log.info("########");
+         log.info("######## KILLED NODE 3");
+         log.info("########");
+
+         // wait for the client-side failover to complete
+
+         log.info("Waiting for failover to complete");
+         
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(120000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+         
+         log.info("Failover completed");
+         
+         assertEquals(newFailoverNodeId, getServerId(conn3));
+                  
+         //Now ack
+         
+         tm.acknowledge();
+         
+         
+         log.info("acked");
+         
+         sess3.close();
+         
+         log.info("closed");
+         
+	      sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+	      
+	      log.info("created new session");
+      	
+      	cons3 = sess3.createConsumer(queue[3]);
+      	
+      	log.info("Created consumer");
+      	
+         //Messages should be gone
+      	
+         tm = (TextMessage)cons3.receive(5000);
+      		
+      	assertNull(tm);      		
+      }
+      finally
+      {
+         if (conn3 != null)
+         {
+            conn3.close();
+         }
+         
+         try
+         {
+         	ServerManagement.stop(4);
+         }
+         catch (Exception e)
+         { log.info("Failed to stop node 4 during cleanup: " + e); }
+      }
+   }
+   
+   //Logs each key-->value pair of the given failover map between two banner lines
+   private void dumpFailoverMap(Map map)
+   {
+   	Iterator entries = map.entrySet().iterator();
+   	
+   	log.info("*** dumping failover map ***");
+   	while (entries.hasNext())
+   	{
+   		Map.Entry mapping = (Map.Entry)entries.next();
+   		String line = mapping.getKey() + "-->" + mapping.getValue();
+   		log.info(line);
+   	}
+   	
+   	log.info("*** end dump ***");
+   }
+   
    private void killFailoverNode(boolean transactional) throws Exception
    {
    	JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/RecoverDeliveriesTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/RecoverDeliveriesTest.java	2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/RecoverDeliveriesTest.java	2007-07-09 21:57:24 UTC (rev 2862)
@@ -23,18 +23,22 @@
 package org.jboss.test.messaging.jms.clustering;
 
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import javax.jms.Connection;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import org.jboss.jms.client.FailoverEvent;
 import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
+import org.jboss.messaging.util.MessageQueueNameHelper;
 import org.jboss.test.messaging.tools.ServerManagement;
 
 
@@ -63,8 +67,6 @@
 	 * 
 	 * Test timeout
 	 * 
-	 * Test multiple failover
-	 * 
 	 * Test nothing left in map or area
 	 * 
 	 * Multiple queues in same session
@@ -92,6 +94,16 @@
    {
    	this.simple(false);
    }
+   
+   public void testTempQueueTransactional() throws Exception
+   {
+   	this.temporaryQueue(true);
+   }
+   
+   public void testTempQueueNonTransactional() throws Exception
+   {
+   	this.temporaryQueue(false);
+   }
       
    public void testWithConnectionOnNewNodeTransactional() throws Exception
    {
@@ -113,6 +125,177 @@
    	connectionsOnAllNodes(false);
    }     
    
+   public void testCancelTransactional() throws Exception
+   {
+   	cancel(true);
+   }
+   
+   public void testCancelNonTransactional() throws Exception
+   {
+   	cancel(false);
+   }
+   
+   public void testDurableSubTransactional() throws Exception
+   {
+   	durableSub(true);
+   }
+   
+   public void testDurableSubNonTransactional() throws Exception
+   {
+   	durableSub(false);
+   }
+   
+   
+   public void testTimeout() throws Exception
+   {
+   	final long timeout = 20 * 1000;
+   	
+   	JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
+   	
+   	((ClientClusteredConnectionFactoryDelegate)factory.getDelegate()).setSupportsFailover(false);
+   	
+      Connection conn1 = createConnectionOnServer(factory,1);
+      
+      Connection conn2 = null;
+ 
+      try
+      {      	
+      	ServerManagement.getServer(2).setAttribute(ServerManagement.getServerPeerObjectName(), "RecoverDeliveriesTimeout", String.valueOf(timeout));      	      	
+      	
+      	ServerManagement.deployQueue("timeoutQueue", 0);
+      	ServerManagement.deployQueue("timeoutQueue", 1);
+      	ServerManagement.deployQueue("timeoutQueue", 2);
+      	
+      	Queue timeoutQueue = (Queue)ic[1].lookup("/queue/timeoutQueue");
+      	
+         Session sessSend = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      		
+      	MessageProducer prod1 = sessSend.createProducer(timeoutQueue);
+      	
+      	final int numMessages = 10;
+      	
+      	for (int i = 0; i < numMessages; i++)
+      	{
+      		TextMessage tm = sessSend.createTextMessage("message" + i);
+      		
+      		prod1.send(tm);      		
+      	}
+      	
+      	Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      	
+      	MessageConsumer cons1 = sess1.createConsumer(timeoutQueue);
+      
+      	
+      	conn1.start();
+      	
+      	TextMessage tm = null;
+      	
+      	for (int i = 0; i < numMessages; i++)
+      	{
+      		tm = (TextMessage)cons1.receive(2000);
+      		
+      		assertNotNull(tm);
+      		
+      		assertEquals("message" + i, tm.getText());
+      	}
+      		
+      	int failoverNodeId = this.getFailoverNodeForNode(factory, 1);
+      	
+      	assertEquals(2, failoverNodeId);
+      	
+      	int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(timeoutQueue.getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(timeoutQueue.getQueueName());
+      	Map ids = (Map)recoveryArea.get(new Integer(1));
+      	assertNotNull(ids);
+      	assertEquals(numMessages, ids.size());
+      	
+      	//Failover was already turned OFF on the connection factory (see setSupportsFailover(false) above)
+      	
+      	ServerManagement.kill(1);
+
+         log.info("########");
+         log.info("######## KILLED NODE 1");
+         log.info("########");
+         
+         //Failover won't occur on the client
+         
+         //Let's give it enough time to happen on the server
+         
+         Thread.sleep(10000);
+         
+         recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(timeoutQueue.getQueueName());
+      	assertEquals(numMessages, recoveryMapSize);
+      	recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(timeoutQueue.getQueueName());
+      	ids = (Map)recoveryArea.get(new Integer(1));
+      	assertNull(ids);
+ 
+      	//Now we wait for the timeout period
+      	
+      	log.info("Waiting for timeout");
+      	Thread.sleep(timeout + 1000);
+      	log.info("Waited");
+      	
+      	recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(timeoutQueue.getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(timeoutQueue.getQueueName());
+      	ids = (Map)recoveryArea.get(new Integer(1));
+      	assertNull(ids);
+      	
+      	//Now we should be able to consume the refs again
+      	
+      	conn1.close();
+      	
+      	log.info("Creating connection");
+      	
+      	conn2 = createConnectionOnServer(factory, failoverNodeId);
+                     
+       	Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       	
+       	MessageConsumer cons2 = sess2.createConsumer(timeoutQueue);
+              	
+       	conn2.start();
+       	
+       	Set msgs = new HashSet();
+       	for (int i = 0; i < numMessages; i++)
+       	{
+       		tm = (TextMessage)cons2.receive(2000);
+       		
+       		assertNotNull(tm);
+       		
+       		//assertEquals("message" + i, tm.getText());
+       		
+       		log.info("Got message:" + tm.getText());
+       		
+       		msgs.add(tm.getText());
+       	}
+       	
+       	for (int i = 0; i < numMessages; i++)
+       	{
+       		assertTrue(msgs.contains("message" + i));
+       	}
+       	
+       	tm = (TextMessage)cons2.receive(5000);
+       	
+       	assertNull(tm);
+      	
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+         
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+   
+   
+
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------
@@ -181,6 +364,13 @@
       	
       	int failoverNodeId = this.getFailoverNodeForNode(factory, 1);
       	
+      	int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+      	Map ids = (Map)recoveryArea.get(new Integer(1));
+      	assertNotNull(ids);
+      	assertEquals(numMessages, ids.size());
+      	
       	ServerManagement.kill(1);
 
          log.info("########");
@@ -218,6 +408,12 @@
          	tm.acknowledge();
          }
          
+         recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+      	ids = (Map)recoveryArea.get(new Integer(1));
+      	assertNull(ids);
+         
          log.info("acked");
          
          sess1.close();
@@ -247,8 +443,8 @@
       }
    }
    
-   
-   private void killFailoverNode(boolean transactional) throws Exception
+  
+   private void temporaryQueue(boolean transactional) throws Exception
    {
    	JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
 
@@ -260,8 +456,10 @@
          ((JBossConnection)conn1).registerFailoverListener(failoverListener);
       	
          Session sessSend = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Queue tempQueue1 = sessSend.createTemporaryQueue();
       		
-      	MessageProducer prod1 = sessSend.createProducer(queue[1]);
+      	MessageProducer prod1 = sessSend.createProducer(tempQueue1);
       	
       	final int numMessages = 10;
       	
@@ -274,7 +472,7 @@
       	
       	Session sess1 = conn1.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
       	
-      	MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+      	MessageConsumer cons1 = sess1.createConsumer(tempQueue1);
       
       	
       	conn1.start();
@@ -292,23 +490,150 @@
       	
       	//Don't ack
       	
-      	//We kill the failover node for node 1
+      	//Now kill server
+      	
       	int failoverNodeId = this.getFailoverNodeForNode(factory, 1);
       	
-      	log.info("Killing failover node:" + failoverNodeId);
+      	int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(tempQueue1.getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(tempQueue1.getQueueName());
+      	Map ids = (Map)recoveryArea.get(new Integer(1));
+      	assertNotNull(ids);
+      	assertEquals(numMessages, ids.size());
       	
-      	ServerManagement.kill(failoverNodeId);
+      	ServerManagement.kill(1);
+
+         log.info("########");
+         log.info("######## KILLED NODE 1");
+         log.info("########");
+
+         // wait for the client-side failover to complete
+
+         log.info("Waiting for failover to complete");
+         
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(120000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+         
+         log.info("Failover completed");
+         
+         assertEquals(failoverNodeId, getServerId(conn1));
+                  
+         //Now ack
+         if (transactional)
+         {
+         	sess1.commit();
+         }
+         else
+         {
+         	tm.acknowledge();
+         }
+         
+         recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(tempQueue1.getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(tempQueue1.getQueueName());
+      	ids = (Map)recoveryArea.get(new Integer(1));
+      	assertNull(ids);
+         
+         log.info("acked");
+         
+         sess1.close();
+         
+         log.info("closed");
+         
+	      sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+	      
+	      log.info("created new session");
       	
-      	log.info("Killed failover node");
+      	cons1 = sess1.createConsumer(tempQueue1);
       	
-      	Thread.sleep(5000);
+      	log.info("Created consumer");
       	
-      	//Now kill 
+         //Messages should be gone
       	
-      	failoverNodeId = this.getFailoverNodeForNode(factory, 1);
+         tm = (TextMessage)cons1.receive(5000);
+      		
+      	assertNull(tm);      		
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+      }
+   }
+   
+   private void durableSub(boolean transactional) throws Exception
+   {
+   	JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
+
+      Connection conn1 = createConnectionOnServer(factory,1);
+ 
+      try
+      {
+      	String clientID = "I am sick of writing these fucking tests!!! AAAAAAAAAARRRRRRRRRRGGGGGGGHHHHHHH";
+      	conn1.setClientID(clientID);
       	
-      	log.info("Failover node id is now " + failoverNodeId);
+      	SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn1).registerFailoverListener(failoverListener);
       	
+         Session sessSend = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               		
+         String subName = "ooooooooo matron!!";
+         
+         MessageConsumer sub = sessSend.createDurableSubscriber(topic[1], subName);
+         
+         String queueName = MessageQueueNameHelper.createSubscriptionName(clientID, subName);
+         
+      	MessageProducer prod1 = sessSend.createProducer(topic[1]);
+      	
+      	final int numMessages = 10;
+      	
+      	for (int i = 0; i < numMessages; i++)
+      	{
+      		TextMessage tm = sessSend.createTextMessage("message" + i);
+      		
+      		prod1.send(tm);      		
+      	}
+      	
+      	Session sess1 = conn1.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+      	
+      	conn1.start();
+      	
+      	TextMessage tm = null;
+      	
+      	for (int i = 0; i < numMessages; i++)
+      	{
+      		tm = (TextMessage)sub.receive(2000);
+      		
+      		assertNotNull(tm);
+      		
+      		assertEquals("message" + i, tm.getText());
+      	}
+      	
+      	//Don't ack
+      	
+      	//Now kill server
+      	
+      	int failoverNodeId = this.getFailoverNodeForNode(factory, 1);
+      	
+      	int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
+      	assertEquals(0, recoveryMapSize);
+      	Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queueName);
+      	Map ids = (Map)recoveryArea.get(new Integer(1));
+      	assertNotNull(ids);
+      	assertEquals(numMessages, ids.size());
+      	
       	ServerManagement.kill(1);
 
          log.info("########");
@@ -346,6 +671,12 @@
          	tm.acknowledge();
          }
          
+         recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queueName);
+      	assertEquals(0, recoveryMapSize);
+      	recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queueName);
+      	ids = (Map)recoveryArea.get(new Integer(1));
+      	assertNull(ids);
+         
          log.info("acked");
          
          sess1.close();
@@ -356,7 +687,7 @@
 	      
 	      log.info("created new session");
       	
-      	cons1 = sess1.createConsumer(queue[1]);
+      	MessageConsumer cons1 = sess1.createConsumer(topic[1]);
       	
       	log.info("Created consumer");
       	
@@ -375,8 +706,86 @@
       }
    }
    
+   private void cancel(boolean transactional) throws Exception
+   {
+   	JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
+
+      Connection conn1 = createConnectionOnServer(factory,1);
+ 
+      try
+      {
+      	SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn1).registerFailoverListener(failoverListener);
+      	
+         Session sessSend = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      		
+      	MessageProducer prod1 = sessSend.createProducer(queue[1]);
+      	
+      	final int numMessages = 10;
+      	
+      	for (int i = 0; i < numMessages; i++)
+      	{
+      		TextMessage tm = sessSend.createTextMessage("message" + i);
+      		
+      		prod1.send(tm);      		
+      	}
+      	
+      	Session sess1 = conn1.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+      	
+      	MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+      
+      	
+      	conn1.start();
+      	
+      	TextMessage tm = null;
+      	
+      	for (int i = 0; i < numMessages; i++)
+      	{
+      		tm = (TextMessage)cons1.receive(2000);
+      		
+      		assertNotNull(tm);
+      		
+      		assertEquals("message" + i, tm.getText());
+      	}
+      	
+      	//Don't ack
+      	
+      	int failoverNodeId = this.getFailoverNodeForNode(factory, 1);      	      	
+
+      	int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+      	Map ids = (Map)recoveryArea.get(new Integer(1));
+      	assertNotNull(ids);
+      	assertEquals(numMessages, ids.size());
+      	
+      	//Now cancel the session
+      	
+      	sess1.close();
+      	
+      	Thread.sleep(5000);
+      	
+      	//Ensure the deliveries are removed from the backup node's recovery map and recovery area
+      	
+      	recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+      	ids = (Map)recoveryArea.get(new Integer(1));
+      	assertNull(ids);
+ 
+      
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+      }
+   }
    
    
+   
    private void connectionOnNewNode(boolean transactional) throws Exception
    {
    	JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
@@ -434,6 +843,13 @@
       	
       	int failoverNodeId = this.getFailoverNodeForNode(factory, 1);      	
       	
+      	int recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	Map recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+      	Map ids = (Map)recoveryArea.get(new Integer(1));
+      	assertNotNull(ids);
+      	assertEquals(numMessages, ids.size());
+      	
       	ServerManagement.kill(1);
 
          log.info("########");
@@ -473,6 +889,12 @@
          
          log.info("acked");
          
+         recoveryMapSize = ServerManagement.getServer(failoverNodeId).getRecoveryMapSize(queue[failoverNodeId].getQueueName());
+      	assertEquals(0, recoveryMapSize);
+      	recoveryArea = ServerManagement.getServer(failoverNodeId).getRecoveryArea(queue[failoverNodeId].getQueueName());
+      	ids = (Map)recoveryArea.get(new Integer(1));
+      	assertNull(ids);
+         
          sess1.close();
          
          log.info("closed");
@@ -763,6 +1185,24 @@
       }
    }
    
+   /*
+    * Test recoveryArea timeout
+    * 
+    * Create session, consume messages but don't ack
+    * 
+    * kill the server
+    * 
+    * prevent the client from failing over (e.g. disable failover support on the connection factory)
+    * 
+    * assert recovery area is full
+    * 
+    * wait for timeout
+    * 
+    * assert messages can be consumed
+    * 
+    * assert recovery area is empty
+    */
+   
    // Inner classes -------------------------------------------------
    
 }

Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2007-07-09 21:57:24 UTC (rev 2862)
@@ -40,6 +40,7 @@
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.MessageStore;
 import org.jboss.messaging.core.contract.PersistenceManager;
+import org.jboss.messaging.core.contract.PostOffice;
 import org.jboss.messaging.util.XMLUtil;
 import org.jboss.remoting.ServerInvocationHandler;
 import org.jboss.test.messaging.tools.ServerManagement;
@@ -819,8 +820,31 @@
 
    public Set getNodeIDView() throws Exception
    {
-      return (Set)sc.getAttribute(postOfficeObjectName, "NodeIDView");
+   	PostOffice postOffice = (PostOffice)sc.getAttribute(postOfficeObjectName, "Instance");
+   	
+   	return postOffice.nodeIDView();
    }
+   
+   public Map getFailoverMap() throws Exception
+   {
+   	PostOffice postOffice = (PostOffice)sc.getAttribute(postOfficeObjectName, "Instance");
+   	
+   	return postOffice.getFailoverMap();
+   }
+   
+   public Map getRecoveryArea(String queueName) throws Exception
+   {
+   	PostOffice postOffice = (PostOffice)sc.getAttribute(postOfficeObjectName, "Instance");
+   	
+   	return postOffice.getRecoveryArea(queueName);
+   }
+   
+   public int getRecoveryMapSize(String queueName) throws Exception
+   {
+   	PostOffice postOffice = (PostOffice)sc.getAttribute(postOfficeObjectName, "Instance");
+   	
+   	return postOffice.getRecoveryMapSize(queueName);
+   }
 
    public List pollNotificationListener(long listenerID) throws Exception
    {

Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java	2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java	2007-07-09 21:57:24 UTC (rev 2862)
@@ -24,16 +24,16 @@
 import java.rmi.registry.LocateRegistry;
 import java.rmi.registry.Registry;
 import java.rmi.server.UnicastRemoteObject;
-import java.util.Set;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
-import java.util.Collections;
+import java.util.Set;
+
+import javax.management.NotificationListener;
 import javax.management.ObjectName;
-import javax.management.NotificationListener;
 import javax.transaction.UserTransaction;
 
-import org.jboss.jms.jndi.JMSProviderAdapter;
 import org.jboss.jms.server.DestinationManager;
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.logging.Logger;
@@ -457,7 +457,22 @@
    {
       return server.getNodeIDView();
    }
-
+   
+   public Map getFailoverMap() throws Exception
+   {
+   	return server.getFailoverMap();
+   }
+   
+   public Map getRecoveryArea(String queueName) throws Exception
+   {
+   	return server.getRecoveryArea(queueName);
+   }
+   
+   public int getRecoveryMapSize(String queueName) throws Exception
+   {
+   	return server.getRecoveryMapSize(queueName);
+   }
+   
    public List pollNotificationListener(long listenerID) throws Exception
    {
       ProxyNotificationListener pl = null;

Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java	2007-07-09 21:24:59 UTC (rev 2861)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java	2007-07-09 21:57:24 UTC (rev 2862)
@@ -23,6 +23,7 @@
 
 import java.rmi.Remote;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import javax.management.NotificationListener;
@@ -272,6 +273,12 @@
     * USE IT ONLY FOR CLUSTERING TESTS!
     */
    Set getNodeIDView() throws Exception;
+   
+   Map getFailoverMap() throws Exception;
+   
+   Map getRecoveryArea(String queueName) throws Exception;
+   
+   int getRecoveryMapSize(String queueName) throws Exception;
 
    /**
     * @return List<Notification>




More information about the jboss-cvs-commits mailing list