[jboss-cvs] JBoss Messaging SVN: r1769 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/client/state src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/jms/server/remoting src/main/org/jboss/jms/tx src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests tests/src/org/jboss/test/messaging/core tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Dec 12 04:11:54 EST 2006


Author: timfox
Date: 2006-12-12 04:11:39 -0500 (Tue, 12 Dec 2006)
New Revision: 1769

Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/SessionState.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/AckInfo.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/Channel.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/ChannelSupport.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
   branches/Branch_Client_Failover_Experiment/tests/build.xml
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/SimpleChannel.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
Final commit on this branch



Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -49,6 +49,7 @@
 import org.jboss.jms.client.state.SessionState;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.server.endpoint.CreateConnectionResult;
+import org.jboss.jms.tx.AckInfo;
 import org.jboss.logging.Logger;
 import org.jboss.remoting.Client;
 import org.jboss.remoting.ConnectionListener;
@@ -328,8 +329,23 @@
          failedSessionDelegate.copyState(newSessionDelegate);
          
          log.info("copied state");
+         
+         //Now we remove any unacked np messages - this is because we don't want to ack them
+         //since the server won't know about them and will barf
+         Iterator iter = failedSessionState.getToAck().iterator();
+         
+         while (iter.hasNext())
+         {
+            AckInfo info = (AckInfo)iter.next();
+            
+            if (!info.getMessage().getMessage().isReliable())
+            {
+               iter.remove();
+            }            
+         }
+         
+         //TODO remove any unacked from the resource manager
 
-
          if (trace) { log.trace("replacing session (" + failedSessionDelegate + ") with a new failover session " + newSessionDelegate); }
 
          //TODO Clebert please add comment as to why this clone is necessary
@@ -359,8 +375,32 @@
                 handleFailoverOnBrowser((BrowserState)sessionChild, newSessionDelegate);
             }
          }
+         
+         /* Now we must send the list of unacked AckInfos to the server - so the consumers'
+          * delivery lists can be repopulated
+          */
+         List ackInfos = null;
+         
+         if (!failedSessionState.isTransacted())
+         {
+            //Get the ack infos from the list in the session state
+            ackInfos = failedSessionState.getToAck();
+         }
+         else
+         {
+            //Transacted session - we need to get the acks
+            //TODO
+         }
+         
+         //TODO for a transacted session the ackinfos will be in the resource manager!!
+         
+         if (!ackInfos.isEmpty())
+         {
+            newSessionDelegate.sendUnackedAckInfos(ackInfos);
+         }
+                  
       }
-      
+            
       //We must not start the connection until the end
       if (failedState.isStarted())
       {
@@ -369,6 +409,8 @@
       
       log.info("Failover done");
    }
+   
+   
 
    private void handleFailoverOnConsumer(ClientConnectionDelegate connectionDelegate,
                                          ConnectionState failedConnectionState,
@@ -387,11 +429,11 @@
 
       ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)failedSessionDelegate.
          failOverConsumer((JBossDestination)failedConsumerState.getDestination(),
-                                failedConsumerState.getSelector(),
-                                failedConsumerState.isNoLocal(),
-                                failedConsumerState.getSubscriptionName(),
-                                failedConsumerState.isConnectionConsumer(),
-                                failedConsumerDelegate.getChannelId());
+                           failedConsumerState.getSelector(),
+                           failedConsumerState.isNoLocal(),
+                           failedConsumerState.getSubscriptionName(),
+                           failedConsumerState.isConnectionConsumer(),
+                           failedConsumerDelegate.getChannelId());
 
       if (trace) { log.trace("handleFailoverOnConsumer: alternate consumer created"); }
 
@@ -418,6 +460,9 @@
 
       MessageCallbackHandler handler = cm.unregisterHandler(oldServerID, oldConsumerID);
       handler.setConsumerId(failedConsumerState.getConsumerID());
+      
+      //Clear the buffer of the handler
+      handler.clearBuffer();
 
       cm.registerHandler(failedConnectionState.getServerID(),
                          failedConsumerState.getConsumerID(),
@@ -426,8 +471,8 @@
       failedSessionState.addCallbackHandler(handler);
       
       log.info("failed over consumer");
-
    }
+   
 
    private void handleFailoverOnProducer(ProducerState failedProducerState,
                                          ClientSessionDelegate failedSessionDelegate)

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -441,6 +441,15 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
    
+   /**
+    * This invocation should either be handled by the client-side interceptor chain or by the
+    * server-side endpoint.
+    */
+   public void sendUnackedAckInfos(List ackInfos) throws JMSException
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+   
 
    // Public --------------------------------------------------------
 
@@ -457,6 +466,7 @@
       return ((ConnectionState)state.getParent()).getRemotingConnection().getInvokingClient();
    }
 
+
    // Package Private -----------------------------------------------
 
    // Private -------------------------------------------------------

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -514,6 +514,11 @@
       }
    }
    
+   public void clearBuffer()
+   {
+      buffer.clear();
+   }
+   
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/SessionState.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/SessionState.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -48,7 +48,6 @@
  */
 public class SessionState extends HierarchicalStateSupport
 {
-
    protected static Logger log = Logger.getLogger(SessionState.class);
 
    private int acknowledgeMode;

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -31,9 +31,9 @@
 import org.jboss.jms.selector.Selector;
 import org.jboss.jms.server.remoting.JMSDispatcher;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Channel;
 import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.Routable;
-import org.jboss.messaging.core.local.PagingFilteredQueue;
 
 /**
  * Concrete implementation of BrowserEndpoint.
@@ -66,7 +66,7 @@
    // Constructors --------------------------------------------------
 
    protected ServerBrowserEndpoint(ServerSessionEndpoint session, int id,
-                                   PagingFilteredQueue destination, String messageSelector)
+                                   Channel destination, String messageSelector)
       throws JMSException
    {     
       this.session = session;

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -42,6 +42,7 @@
 import org.jboss.jms.server.remoting.MessagingMarshallable;
 import org.jboss.jms.util.ExceptionUtil;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Channel;
 import org.jboss.messaging.core.Delivery;
 import org.jboss.messaging.core.DeliveryObserver;
 import org.jboss.messaging.core.MessageReference;
@@ -49,7 +50,6 @@
 import org.jboss.messaging.core.Receiver;
 import org.jboss.messaging.core.Routable;
 import org.jboss.messaging.core.SimpleDelivery;
-import org.jboss.messaging.core.local.PagingFilteredQueue;
 import org.jboss.messaging.core.plugin.contract.PostOffice;
 import org.jboss.messaging.core.plugin.postoffice.Binding;
 import org.jboss.messaging.core.tx.Transaction;
@@ -87,7 +87,7 @@
 
    private int id;
 
-   private PagingFilteredQueue messageQueue;
+   private Channel messageQueue;
    
    private String queueName;
 
@@ -131,7 +131,7 @@
    
    // Constructors --------------------------------------------------
 
-   protected ServerConsumerEndpoint(int id, PagingFilteredQueue messageQueue, String queueName,
+   protected ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
                                     ServerSessionEndpoint sessionEndpoint,
                                     String selector, boolean noLocal, JBossDestination dest,
                                     int prefetchSize, Queue dlq)
@@ -633,7 +633,24 @@
       }
       
    }
-   
+
+   protected void createDeliveries(List messageIds) throws Throwable
+   {
+      List dels = messageQueue.createDeliveries(messageIds);
+            
+      synchronized (lock)
+      {      
+         Iterator iter = dels.iterator();
+         
+         while (iter.hasNext())
+         {
+            Delivery del = (Delivery)iter.next();
+            
+            deliveries.put(new Long(del.getReference().getMessageID()), del);
+         }
+      }
+   }
+
    protected void cancelDelivery(Long messageID, int deliveryCount) throws Throwable
    {
       Delivery del = (Delivery)deliveries.remove(messageID);
@@ -653,7 +670,7 @@
          throw new IllegalStateException("Cannot find delivery to cancel:" + id);
       }
    }
-               
+           
    protected void start()
    {             
       synchronized (lock)

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -27,6 +27,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -186,11 +187,11 @@
          int prefetchSize = connectionEndpoint.getPrefetchSize();
          
          ServerConsumerEndpoint ep =
-            new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(),
+
+            new ServerConsumerEndpoint(consumerID, binding.getQueue(),
                                        binding.getQueue().getName(), this, selectorString, noLocal,
                                        jmsDestination, prefetchSize, dlq);
 
-         
          JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
          
          ClientConsumerDelegate stub =
@@ -755,6 +756,56 @@
          throw ExceptionUtil.handleJMSInvocation(t, this + " cancelDeliveries");
       }
    }
+   
+   public void sendUnackedAckInfos(List ackInfos) throws JMSException
+   {
+      try
+      {
+         //Sort into different list for each consumer
+         Map ackMap = new HashMap();
+                  
+         for (int i = ackInfos.size() - 1; i >= 0; i--)
+         {
+            AckInfo ack = (AckInfo)ackInfos.get(i);
+            
+            ServerConsumerEndpoint consumer =
+               this.connectionEndpoint.getConsumerEndpoint(ack.getConsumerID());
+   
+            if (consumer == null)
+            {
+               throw new IllegalArgumentException("Cannot find consumer id: " + ack.getConsumerID());
+            }
+            
+            LinkedList acks = (LinkedList)ackMap.get(consumer);
+            
+            if (acks == null)
+            {
+               acks = new LinkedList();
+               
+               ackMap.put(consumer, acks);
+            }
+            
+            acks.addFirst(new Long(ack.getMessageID()));
+         }  
+         
+         Iterator iter = ackMap.entrySet().iterator();
+         
+         while (iter.hasNext())
+         {
+            Map.Entry entry = (Map.Entry)iter.next();
+            
+            ServerConsumerEndpoint consumer = (ServerConsumerEndpoint)entry.getKey();
+            
+            List acks = (List)entry.getValue();
+            
+            consumer.createDeliveries(acks);
+         }
+      }
+      catch (Throwable t)
+      {
+         throw ExceptionUtil.handleJMSInvocation(t, this + " sendUnackedAckInfos");
+      }
+   }
 
    public void addTemporaryDestination(JBossDestination dest) throws JMSException
    {

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -120,5 +120,14 @@
     * @param ackInfos
     */
    void cancelDeliveries(List ackInfos) throws JMSException;
+   
+   
+   /**
+    * Send a list of unacked ackInfos to the server so the delivery lists can be repopulated.
+    * Used at failover.
+    * @param ackInfos
+    * @throws JMSException
+    */
+   void sendUnackedAckInfos(List ackInfos) throws JMSException;
 }
 

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -147,6 +147,11 @@
    {
       endpoint.cancelDeliveries(ackInfos);
    }
+   
+   public void sendUnackedAckInfos(List ackInfos) throws JMSException
+   {
+      endpoint.sendUnackedAckInfos(ackInfos);
+   }
 
 
    // AdvisedSupport overrides --------------------------------------
@@ -161,6 +166,7 @@
       return "SessionAdvised->" + endpoint;
    }
 
+
    // Public --------------------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -37,8 +37,8 @@
 
 import org.jboss.aop.Dispatcher;
 import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.jms.client.remoting.CallbackManager;
 import org.jboss.jms.client.remoting.HandleMessageResponse;
-import org.jboss.jms.client.remoting.CallbackManager;
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.server.Version;
@@ -95,6 +95,7 @@
    protected static final byte MORE = 5;
    protected static final byte SEND_TRANSACTION = 6;
    protected static final byte GET_ID_BLOCK = 7;
+   protected static final byte UNACKED_ACKINFOS = 8;
  
 
    // The response codes - start from 100
@@ -312,6 +313,28 @@
    
                   if (trace) { log.trace("wrote cancelDeliveries()"); }
                }
+               else if ("sendUnackedAckInfos".equals(methodName) && mi.getArguments() != null)
+               {
+                  dos.writeByte(UNACKED_ACKINFOS);
+   
+                  writeHeader(mi, dos);
+   
+                  List ids = (List)mi.getArguments()[0];
+   
+                  dos.writeInt(ids.size());
+   
+                  Iterator iter = ids.iterator();
+   
+                  while (iter.hasNext())
+                  {
+                     AckInfo ack = (AckInfo)iter.next();
+                     ack.write(dos);
+                  }
+   
+                  dos.flush();
+   
+                  if (trace) { log.trace("wrote sendUnackedAckInfos()"); }
+               }
                else
                {
                   dos.write(SERIALIZED);
@@ -700,6 +723,35 @@
    
                return request;
             }
+            case UNACKED_ACKINFOS:
+            {
+               MethodInvocation mi = readHeader(dis);
+   
+               int size = dis.readInt();
+   
+               List acks = new ArrayList(size);
+   
+               for (int i = 0; i < size; i++)
+               {
+                  AckInfo ack = new AckInfo();
+                  
+                  ack.read(dis);
+                  
+                  acks.add(ack);
+               }
+   
+               Object[] args = new Object[] {acks};
+   
+               mi.setArguments(args);
+   
+               InvocationRequest request =
+                  new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
+                                        new MessagingMarshallable(version, mi), null, null, null);
+   
+               if (trace) { log.trace("read unackedAckInfos()"); }
+   
+               return request;
+            }
             case ID_BLOCK_RESPONSE:
             {
                IdBlock block = new IdBlock();

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/AckInfo.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/AckInfo.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/AckInfo.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -86,7 +86,7 @@
    /** Used to change ack's id during failover */
    public void setConsumerID(int consumerID)
    {
-       this.consumerID=consumerID;
+       this.consumerID = consumerID;
    }
    
    public MessageProxy getMessage()

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/Channel.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/Channel.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/Channel.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -145,6 +145,8 @@
    void deactivate();
    
    boolean isActive();
+   
+   List createDeliveries(List messageIds);
 
 }
 

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -491,6 +491,58 @@
          }
       }
    }
+   
+   public List createDeliveries(List messageIds)
+   {
+      //debug
+      Iterator iter = messageIds.iterator();
+      
+      log.info("***** createdeliveries");
+      while (iter.hasNext())
+      {
+         Long l = (Long)iter.next();
+         
+         log.info("Creating delivery for " + l);
+      }
+      log.info("**** end dump");
+      
+      iter = messageIds.iterator();
+      
+      List dels = new ArrayList();
+      
+      synchronized (refLock)
+      {
+         ListIterator liter = messageRefs.iterator();
+                           
+         while (iter.hasNext())
+         {
+            Long id = (Long)iter.next();
+            
+            if (!liter.hasNext())
+            {
+               throw new IllegalStateException("Cannot find ref in queue! (Might be paged!)");
+            }
+            
+            MessageReference ref = (MessageReference)liter.next();
+            
+            if (ref.getMessageID() == id.longValue())
+            {
+               liter.remove();
+               
+               Delivery del = new SimpleDelivery(this, ref);
+               
+               dels.add(del);
+                              
+               this.deliveries.add(del);
+               
+            }
+         }                  
+      }
+      
+      //TODO we need to look in paging state too - currently not supported
+      
+      return dels;
+   }
 
    // Public --------------------------------------------------------
 

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -272,4 +272,9 @@
    {
       return "RemoteQueueStub(node=" + this.nodeId + " name=" + this.name + " channelId=" + this.id + ")";
    }
+
+   public List createDeliveries(List messageIds)
+   {
+      throw new UnsupportedOperationException();
+   }
 }

Modified: branches/Branch_Client_Failover_Experiment/tests/build.xml
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/build.xml	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/tests/build.xml	2006-12-12 09:11:39 UTC (rev 1769)
@@ -745,7 +745,7 @@
                     haltonerror="${junit.batchtest.haltonerror}">
             <formatter type="plain" usefile="${junit.formatter.usefile}"/>
             <fileset dir="${build.tests.classes}">
-               <include name="**/jms/clustering/*Test.class"/>
+               <include name="**/jms/clustering/HATest.class"/>
                <!--
                <include name="**/jms/clustering/SimpleClusteringTest.class"/>
                -->

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/SimpleChannel.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/SimpleChannel.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/SimpleChannel.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -216,6 +216,11 @@
       throw new UnsupportedOperationException();
    }
 
+   public List createDeliveries(List messageIds)
+   {
+      throw new UnsupportedOperationException();
+   }
+
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -369,6 +369,12 @@
          // TODO Auto-generated method stub
          return false;
       }
+
+      public List createDeliveries(List messageIds)
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
       
    }
 

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+
 import org.jboss.messaging.core.Delivery;
 import org.jboss.messaging.core.DeliveryObserver;
 import org.jboss.messaging.core.Filter;
@@ -608,6 +609,12 @@
          // TODO Auto-generated method stub
          return false;
       }
+
+      public List createDeliveries(List messageIds)
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
       
    }
    

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-12 08:12:41 UTC (rev 1768)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-12 09:11:39 UTC (rev 1769)
@@ -26,6 +26,7 @@
 import java.util.Set;
 
 import javax.jms.Connection;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -69,355 +70,661 @@
    }
    
    // Public --------------------------------------------------------
+//   
+//   /*
+//    * Test that connections created using a clustered connection factory are created round robin on
+//    * different servers
+//    */
+//   public void testRoundRobinConnectionCreation() throws Exception
+//   {
+//      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//      
+//      ClusteredClientConnectionFactoryDelegate delegate =
+//         (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//      
+//      log.info ("number of delegates = " + delegate.getDelegates().length);
+//      log.info ("number of servers = " + ServerManagement.getServer(0).getNodeIDView().size());
+//      
+//      assertEquals(3, delegate.getDelegates().length);
+//      
+//      ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//      
+//      ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//      
+//      ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+//      
+//      assertEquals(0, cf1.getServerId());
+//      
+//      assertEquals(1, cf2.getServerId());
+//      
+//      assertEquals(2, cf3.getServerId());
+//      
+//      assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
+//      
+//      Connection conn1 = null;
+//      
+//      Connection conn2 = null;
+//      
+//      Connection conn3 = null;
+//      
+//      Connection conn4 = null;
+//      
+//      Connection conn5 = null;
+//      
+//      try
+//      {         
+//         conn1 = factory.createConnection();
+//         
+//         conn2 = factory.createConnection();
+//         
+//         conn3 = factory.createConnection();
+//         
+//         conn4 = factory.createConnection();
+//         
+//         conn5 = factory.createConnection();
+//         
+//         ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
+//         
+//         ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
+//         
+//         ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
+//         
+//         ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
+//         
+//         ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
+//         
+//         int serverID1 = state1.getServerID();
+//         
+//         int serverID2 = state2.getServerID();
+//         
+//         int serverID3 = state3.getServerID();
+//         
+//         int serverID4 = state4.getServerID();
+//         
+//         int serverID5 = state5.getServerID();
+//         
+//         log.info("server id 1: " + serverID1);
+//         
+//         log.info("server id 2: " + serverID2);
+//         
+//         log.info("server id 3: " + serverID3);
+//         
+//         log.info("server id 4: " + serverID4);
+//         
+//         log.info("server id 5: " + serverID5);
+//         
+//         assertEquals(0, serverID1);
+//         
+//         assertEquals(1, serverID2);
+//         
+//         assertEquals(2, serverID3);
+//         
+//         assertEquals(0, serverID4);
+//         
+//         assertEquals(1, serverID5);
+//      }
+//      finally
+//      {
+//         if (conn1 != null)
+//         {
+//            conn1.close();
+//         }
+//         
+//         if (conn2 != null)
+//         {
+//            conn2.close();
+//         }
+//         
+//         if (conn3 != null)
+//         {
+//            conn3.close();
+//         }
+//         
+//         if (conn4 != null)
+//         {
+//            conn4.close();
+//         }
+//         
+//         if (conn5 != null)
+//         {
+//            conn5.close();
+//         }
+//      }
+//      
+//   }
+// 
+//   /*
+//    * Test that the failover mapping is created correctly and updated properly when nodes leave
+//    * or join
+//    */
+//   public void testDefaultFailoverMap() throws Exception
+//   {     
+//      {
+//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//         
+//         ClusteredClientConnectionFactoryDelegate delegate =
+//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//         
+//         assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
+//         
+//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//         
+//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//         
+//         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//         
+//         ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+//         
+//         //The order here depends on the order the servers were started in
+//         
+//         //If any servers get stopped and then started then the order will change
+//    
+//         log.info("cf1 serverid=" + cf1.getServerId());
+//         
+//         log.info("cf2 serverid=" + cf2.getServerId());
+//         
+//         log.info("cf3 serverid=" + cf3.getServerId());
+//         
+//         
+//         assertEquals(0, cf1.getServerId());
+//         
+//         assertEquals(1, cf2.getServerId());
+//         
+//         assertEquals(2, cf3.getServerId());
+//         
+//         Map failoverMap = delegate.getFailoverMap();
+//         
+//         assertEquals(3, delegates.length);
+//         
+//         assertEquals(3, failoverMap.size());
+//         
+//         // Default failover policy just chooses the node to the right
+//         
+//         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//         
+//         assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+//         
+//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+//      }
+//      
+//      //Now cleanly stop one of the servers
+//            
+//      log.info("************** STOPPING SERVER 0");
+//      ServerManagement.stop(0);
+//      
+//      log.info("server stopped");
+//      
+//      assertEquals(2, ServerManagement.getServer(1).getNodeIDView().size());
+//      
+//      {         
+//         //Lookup another connection factory
+//         
+//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+//         
+//         log.info("Got connection factory");
+//         
+//         ClusteredClientConnectionFactoryDelegate delegate =
+//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//         
+//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//         
+//         Map failoverMap = delegate.getFailoverMap();
+//         
+//         log.info("Got failover map");
+//         
+//         assertEquals(2, delegates.length);
+//         
+//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//         
+//         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//         
+//         //Order here depends on order servers were started in
+//         
+//         log.info("cf1 serverid=" + cf1.getServerId());
+//         
+//         log.info("cf2 serverid=" + cf2.getServerId());
+//         
+//         assertEquals(1, cf1.getServerId());
+//         
+//         assertEquals(2, cf2.getServerId());
+//         
+//         
+//         assertEquals(2, failoverMap.size());
+//         
+//         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//         
+//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+//      }
+//      
+//      //Cleanly stop another server
+//      
+//      log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
+//      
+//      ServerManagement.stop(1);
+//      
+//      assertEquals(1, ServerManagement.getServer(2).getNodeIDView().size());
+//      
+//      {         
+//         //Lookup another connection factory
+//         
+//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
+//         
+//         ClusteredClientConnectionFactoryDelegate delegate =
+//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//         
+//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//         
+//         Map failoverMap = delegate.getFailoverMap();
+//         
+//         assertEquals(1, delegates.length);
+//         
+//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//         
+//         assertEquals(2, cf1.getServerId());
+//         
+//         
+//         assertEquals(1, failoverMap.size());
+//         
+//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//      }
+//            
+//      //Restart server 0
+//      
+//      ServerManagement.start("all", 0);
+//      
+//      {
+//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//         
+//         log.info("Got connection factory");
+//         
+//         ClusteredClientConnectionFactoryDelegate delegate =
+//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//         
+//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//         
+//         Map failoverMap = delegate.getFailoverMap();
+//         
+//         log.info("Got failover map");
+//         
+//         assertEquals(2, delegates.length);
+//         
+//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//         
+//         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//         
+//         log.info("cf1 serverid=" + cf1.getServerId());
+//         
+//         log.info("cf2 serverid=" + cf2.getServerId());
+//         
+//         assertEquals(2, cf1.getServerId());
+//         
+//         assertEquals(0, cf2.getServerId());
+//         
+//         
+//         assertEquals(2, failoverMap.size());
+//         
+//         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//         
+//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+//      }
+//      
+//      
+//      //Restart server 1
+//      
+//      ServerManagement.start("all", 1);
+//      
+//      {
+//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+//         
+//         log.info("Got connection factory");
+//         
+//         ClusteredClientConnectionFactoryDelegate delegate =
+//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//         
+//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//         
+//         Map failoverMap = delegate.getFailoverMap();
+//         
+//         log.info("Got failover map");
+//         
+//         assertEquals(3, delegates.length);
+//         
+//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//         
+//         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//         
+//         ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+//         
+//         log.info("cf1 serverid=" + cf1.getServerId());
+//         
+//         log.info("cf2 serverid=" + cf2.getServerId());
+//         
+//         log.info("cf3 serverid=" + cf3.getServerId());
+//         
+//         assertEquals(2, cf1.getServerId());
+//         
+//         assertEquals(0, cf2.getServerId());
+//         
+//         assertEquals(1, cf3.getServerId());
+//         
+//         
+//         assertEquals(3, failoverMap.size());
+//         
+//         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//         
+//         assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+//         
+//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+//      }            
+//   }
+//   
+//   public void testSimpleFailover() throws Exception
+//   {
+//      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//      
+//      ClusteredClientConnectionFactoryDelegate delegate =
+//         (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+//      Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
+//      assertEquals(3, nodeIDView.size());
+//      
+//      ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//      
+//      ClientConnectionFactoryDelegate cf1 = delegates[0];
+//      
+//      ClientConnectionFactoryDelegate cf2 = delegates[1];
+//      
+//      ClientConnectionFactoryDelegate cf3 = delegates[2];
+//      
+//      int server0Id = cf1.getServerId();
+//      
+//      int server1Id = cf2.getServerId();
+//      
+//      int server2Id = cf3.getServerId();
+//      
+//      log.info("server 0 id: " + server0Id);
+//      
+//      log.info("server 1 id: " + server1Id);
+//      
+//      log.info("server 2 id: " + server2Id);
+//                  
+//      Map failoverMap = delegate.getFailoverMap();
+//      
+//      log.info(failoverMap.get(new Integer(server0Id)));
+//      log.info(failoverMap.get(new Integer(server1Id)));
+//      log.info(failoverMap.get(new Integer(server2Id)));
+//      
+//      int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
+//      
+//      // server 1 should failover onto server 2
+//      
+//      assertEquals(server2Id, server1FailoverId);
+//      
+//      Connection conn = null;
+//      
+//      try
+//      {
+//      
+//         //Get a connection on server 1
+//         conn = factory.createConnection(); //connection on server 0
+//         
+//         conn.close();
+//         
+//         conn = factory.createConnection(); //connection on server 1
+//         
+//         JBossConnection jbc = (JBossConnection)conn;
+//         
+//         ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+//         
+//         ConnectionState state = (ConnectionState)del.getState();
+//         
+//         int initialServerID = state.getServerID();
+//         
+//         assertEquals(1, initialServerID);
+//                           
+//         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//         
+//         MessageProducer prod = sess.createProducer(queue1);
+//         
+//         MessageConsumer cons = sess.createConsumer(queue1);
+//         
+//         final int NUM_MESSAGES = 100;
+//         
+//         for (int i = 0; i < NUM_MESSAGES; i++)
+//         {
+//            TextMessage tm = sess.createTextMessage("message:" + i);
+//            
+//            prod.send(tm);
+//         }
+//         
+//         //So now, messages should be in queue1 on server 1
+//         //So we now kill server 1
+//         //Which should cause transparent failover of connection conn onto server 2
+//         
+//         log.info("************ KILLING (CRASHING) SERVER 1");
+//         
+//         ServerManagement.getServer(1).destroy();
+//         
+//         log.info("killed server, now waiting");
+//         
+//         Thread.sleep(5000);
+//         
+//         log.info("done wait");
+//         
+//         state = (ConnectionState)del.getState();
+//         
+//         int finalServerID = state.getServerID();
+//         
+//         log.info("final server id= " + finalServerID);
+//         
+//         //server id should now be 2
+//         
+//         assertEquals(2, finalServerID);
+//         
+//         conn.start();
+//         
+//         for (int i = 0; i < NUM_MESSAGES; i++)
+//         {
+//            TextMessage tm = (TextMessage)cons.receive(1000);
+//            
+//            log.info("message is " + tm);
+//            
+//            assertNotNull(tm);
+//            
+//            assertEquals("message:" + i, tm.getText());
+//         }
+//         log.info("done");
+//      }
+//      finally
+//      {         
+//         if (conn != null)
+//         {
+//            try
+//            {
+//               conn.close();
+//            }
+//            catch (Exception e)
+//            {
+//               e.printStackTrace();
+//            }
+//         }
+//      }
+//      
+//   }
    
-   /*
-    * Test that connections created using a clustered connection factory are created round robin on
-    * different servers
-    */
-   public void testRoundRobinConnectionCreation() throws Exception
-   {
-      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-      
-      ClusteredClientConnectionFactoryDelegate delegate =
-         (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-      
-      log.info ("number of delegates = " + delegate.getDelegates().length);
-      log.info ("number of servers = " + ServerManagement.getServer(0).getNodeIDView().size());
-      
-      assertEquals(3, delegate.getDelegates().length);
-      
-      ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-      
-      ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-      
-      ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-      
-      assertEquals(0, cf1.getServerId());
-      
-      assertEquals(1, cf2.getServerId());
-      
-      assertEquals(2, cf3.getServerId());
-      
-      assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
-      
-      Connection conn1 = null;
-      
-      Connection conn2 = null;
-      
-      Connection conn3 = null;
-      
-      Connection conn4 = null;
-      
-      Connection conn5 = null;
-      
-      try
-      {         
-         conn1 = factory.createConnection();
-         
-         conn2 = factory.createConnection();
-         
-         conn3 = factory.createConnection();
-         
-         conn4 = factory.createConnection();
-         
-         conn5 = factory.createConnection();
-         
-         ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
-         
-         ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
-         
-         ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
-         
-         ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
-         
-         ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
-         
-         int serverID1 = state1.getServerID();
-         
-         int serverID2 = state2.getServerID();
-         
-         int serverID3 = state3.getServerID();
-         
-         int serverID4 = state4.getServerID();
-         
-         int serverID5 = state5.getServerID();
-         
-         log.info("server id 1: " + serverID1);
-         
-         log.info("server id 2: " + serverID2);
-         
-         log.info("server id 3: " + serverID3);
-         
-         log.info("server id 4: " + serverID4);
-         
-         log.info("server id 5: " + serverID5);
-         
-         assertEquals(0, serverID1);
-         
-         assertEquals(1, serverID2);
-         
-         assertEquals(2, serverID3);
-         
-         assertEquals(0, serverID4);
-         
-         assertEquals(1, serverID5);
-      }
-      finally
-      {
-         if (conn1 != null)
-         {
-            conn1.close();
-         }
-         
-         if (conn2 != null)
-         {
-            conn2.close();
-         }
-         
-         if (conn3 != null)
-         {
-            conn3.close();
-         }
-         
-         if (conn4 != null)
-         {
-            conn4.close();
-         }
-         
-         if (conn5 != null)
-         {
-            conn5.close();
-         }
-      }
-      
-   }
- 
-   /*
-    * Test that the failover mapping is created correctly and updated properly when nodes leave
-    * or join
-    */
-   public void testDefaultFailoverMap() throws Exception
-   {     
-      {
-         JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-         
-         ClusteredClientConnectionFactoryDelegate delegate =
-            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-         
-         assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
-         
-         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-         
-         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-         
-         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-         
-         ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-         
-         //The order here depends on the order the servers were started in
-         
-         //If any servers get stopped and then started then the order will change
-    
-         log.info("cf1 serverid=" + cf1.getServerId());
-         
-         log.info("cf2 serverid=" + cf2.getServerId());
-         
-         log.info("cf3 serverid=" + cf3.getServerId());
-         
-         
-         assertEquals(0, cf1.getServerId());
-         
-         assertEquals(1, cf2.getServerId());
-         
-         assertEquals(2, cf3.getServerId());
-         
-         Map failoverMap = delegate.getFailoverMap();
-         
-         assertEquals(3, delegates.length);
-         
-         assertEquals(3, failoverMap.size());
-         
-         // Default failover policy just chooses the node to the right
-         
-         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-         
-         assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-         
-         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
-      }
-      
-      //Now cleanly stop one of the servers
-            
-      log.info("************** STOPPING SERVER 0");
-      ServerManagement.stop(0);
-      
-      log.info("server stopped");
-      
-      assertEquals(2, ServerManagement.getServer(1).getNodeIDView().size());
-      
-      {         
-         //Lookup another connection factory
-         
-         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-         
-         log.info("Got connection factory");
-         
-         ClusteredClientConnectionFactoryDelegate delegate =
-            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-         
-         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-         
-         Map failoverMap = delegate.getFailoverMap();
-         
-         log.info("Got failover map");
-         
-         assertEquals(2, delegates.length);
-         
-         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-         
-         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-         
-         //Order here depends on order servers were started in
-         
-         log.info("cf1 serverid=" + cf1.getServerId());
-         
-         log.info("cf2 serverid=" + cf2.getServerId());
-         
-         assertEquals(1, cf1.getServerId());
-         
-         assertEquals(2, cf2.getServerId());
-         
-         
-         assertEquals(2, failoverMap.size());
-         
-         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-         
-         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-      }
-      
-      //Cleanly stop another server
-      
-      log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
-      
-      ServerManagement.stop(1);
-      
-      assertEquals(1, ServerManagement.getServer(2).getNodeIDView().size());
-      
-      {         
-         //Lookup another connection factory
-         
-         JBossConnectionFactory factory =  (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
-         
-         ClusteredClientConnectionFactoryDelegate delegate =
-            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-         
-         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-         
-         Map failoverMap = delegate.getFailoverMap();
-         
-         assertEquals(1, delegates.length);
-         
-         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-         
-         assertEquals(2, cf1.getServerId());
-         
-         
-         assertEquals(1, failoverMap.size());
-         
-         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-      }
-            
-      //Restart server 0
-      
-      ServerManagement.start("all", 0);
-      
-      {
-         JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-         
-         log.info("Got connection factory");
-         
-         ClusteredClientConnectionFactoryDelegate delegate =
-            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-         
-         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-         
-         Map failoverMap = delegate.getFailoverMap();
-         
-         log.info("Got failover map");
-         
-         assertEquals(2, delegates.length);
-         
-         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-         
-         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-         
-         log.info("cf1 serverid=" + cf1.getServerId());
-         
-         log.info("cf2 serverid=" + cf2.getServerId());
-         
-         assertEquals(2, cf1.getServerId());
-         
-         assertEquals(0, cf2.getServerId());
-         
-         
-         assertEquals(2, failoverMap.size());
-         
-         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-         
-         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-      }
-      
-      
-      //Restart server 1
-      
-      ServerManagement.start("all", 1);
-      
-      {
-         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-         
-         log.info("Got connection factory");
-         
-         ClusteredClientConnectionFactoryDelegate delegate =
-            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-         
-         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-         
-         Map failoverMap = delegate.getFailoverMap();
-         
-         log.info("Got failover map");
-         
-         assertEquals(3, delegates.length);
-         
-         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-         
-         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-         
-         ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-         
-         log.info("cf1 serverid=" + cf1.getServerId());
-         
-         log.info("cf2 serverid=" + cf2.getServerId());
-         
-         log.info("cf3 serverid=" + cf3.getServerId());
-         
-         assertEquals(2, cf1.getServerId());
-         
-         assertEquals(0, cf2.getServerId());
-         
-         assertEquals(1, cf3.getServerId());
-         
-         
-         assertEquals(3, failoverMap.size());
-         
-         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-         
-         assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-         
-         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
-      }            
-   }
    
-   public void testSimpleFailover() throws Exception
+//   public void testFailoverWithUnackedMessagesClientAcknowledge() throws Exception
+//   {
+//      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//      
+//      ClusteredClientConnectionFactoryDelegate delegate =
+//         (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+//      Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
+//      assertEquals(3, nodeIDView.size());
+//      
+//      ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//      
+//      ClientConnectionFactoryDelegate cf1 = delegates[0];
+//      
+//      ClientConnectionFactoryDelegate cf2 = delegates[1];
+//      
+//      ClientConnectionFactoryDelegate cf3 = delegates[2];
+//      
+//      int server0Id = cf1.getServerId();
+//      
+//      int server1Id = cf2.getServerId();
+//      
+//      int server2Id = cf3.getServerId();
+//      
+//      log.info("server 0 id: " + server0Id);
+//      
+//      log.info("server 1 id: " + server1Id);
+//      
+//      log.info("server 2 id: " + server2Id);
+//                  
+//      Map failoverMap = delegate.getFailoverMap();
+//      
+//      log.info(failoverMap.get(new Integer(server0Id)));
+//      log.info(failoverMap.get(new Integer(server1Id)));
+//      log.info(failoverMap.get(new Integer(server2Id)));
+//      
+//      int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
+//      
+//      // server 1 should failover onto server 2
+//      
+//      assertEquals(server2Id, server1FailoverId);
+//      
+//      Connection conn = null;
+//      
+//      try
+//      {      
+//         //Get a connection on server 1
+//         conn = factory.createConnection(); //connection on server 0
+//         
+//         conn.close();
+//         
+//         conn = factory.createConnection(); //connection on server 1
+//         
+//         JBossConnection jbc = (JBossConnection)conn;
+//         
+//         ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+//         
+//         ConnectionState state = (ConnectionState)del.getState();
+//         
+//         int initialServerID = state.getServerID();
+//         
+//         assertEquals(1, initialServerID);
+//                           
+//         Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+//         
+//         MessageProducer prod = sess.createProducer(queue1);
+//         
+//         MessageConsumer cons = sess.createConsumer(queue1);
+//         
+//         final int NUM_MESSAGES = 100;
+//         
+//         for (int i = 0; i < NUM_MESSAGES; i++)
+//         {
+//            TextMessage tm = sess.createTextMessage("message:" + i);
+//            
+//            prod.send(tm);
+//         }
+//         
+//         conn.start();
+//         
+//         //Now consume half of the messages but don't ack them; these will end up in
+//         //the client-side toAck list
+//         
+//         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+//         {
+//            TextMessage tm = (TextMessage)cons.receive(500);
+//            
+//            assertNotNull(tm);
+//            
+//            assertEquals("message:" + i, tm.getText());
+//         }
+//         
+//         //So now, messages should be in queue1 on server 1
+//         //So we now kill server 1
+//         //Which should cause transparent failover of connection conn onto server 2
+//         
+//         log.info("************ KILLING (CRASHING) SERVER 1");
+//         
+//         ServerManagement.getServer(1).destroy();
+//         
+//         log.info("killed server, now waiting");
+//         
+//         Thread.sleep(5000);
+//         
+//         log.info("done wait");
+//         
+//         state = (ConnectionState)del.getState();
+//         
+//         int finalServerID = state.getServerID();
+//         
+//         log.info("final server id= " + finalServerID);
+//         
+//         //server id should now be 2
+//         
+//         assertEquals(2, finalServerID);
+//         
+//         conn.start();
+//         
+//         //Now should be able to consume the rest of the messages
+//         
+//         log.info("here1");
+//         
+//         TextMessage tm = null;
+//         
+//         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+//         {
+//            tm = (TextMessage)cons.receive(500);
+//                                    
+//            log.info("message is " + tm.getText());
+//            
+//            assertNotNull(tm);
+//            
+//            assertEquals("message:" + i, tm.getText());
+//         }
+//         
+//         log.info("here2");
+//         
+//         //Now should be able to acknowledge them
+//         
+//         tm.acknowledge();
+//         
+//         //Now check there are no more messages there
+//         sess.close();
+//         
+//         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//         
+//         cons = sess.createConsumer(queue1);
+//         
+//         Message m = cons.receive(500);
+//         
+//         assertNull(m);
+//         
+//         log.info("got to end of test");
+//      }
+//      finally
+//      {         
+//         if (conn != null)
+//         {
+//            try
+//            {
+//               conn.close();
+//            }
+//            catch (Exception e)
+//            {
+//               e.printStackTrace();
+//            }
+//         }
+//      }
+//      
+//   }
+   
+   public void testFailoverWithUnackedMessagesTransactional() throws Exception
    {
       JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
       
@@ -462,8 +769,7 @@
       Connection conn = null;
       
       try
-      {
-      
+      {      
          //Get a connection on server 1
          conn = factory.createConnection(); //connection on server 0
          
@@ -481,7 +787,7 @@
          
          assertEquals(1, initialServerID);
                            
-         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
          
          MessageProducer prod = sess.createProducer(queue1);
          
@@ -496,6 +802,22 @@
             prod.send(tm);
          }
          
+         sess.commit();
+         
+         conn.start();
+         
+         //Now consume half of the messages but don't commit them; these will end up in
+         //the client-side resource manager
+         
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+         {
+            TextMessage tm = (TextMessage)cons.receive(500);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message:" + i, tm.getText());
+         }
+         
          //So now, messages should be in queue1 on server 1
          //So we now kill server 1
          //Which should cause transparent failover of connection conn onto server 2
@@ -522,17 +844,41 @@
          
          conn.start();
          
-         for (int i = 0; i < NUM_MESSAGES; i++)
+         //Now should be able to consume the rest of the messages
+         
+         log.info("here1");
+         
+         TextMessage tm = null;
+         
+         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
          {
-            TextMessage tm = (TextMessage)cons.receive(1000);
+            tm = (TextMessage)cons.receive(500);
+                                    
+            log.info("message is " + tm.getText());
             
-            log.info("message is " + tm);
-            
             assertNotNull(tm);
             
             assertEquals("message:" + i, tm.getText());
          }
-         log.info("done");
+         
+         log.info("here2");
+         
+         //Now should be able to commit them
+         
+         sess.commit();
+         
+         //Now check there are no more messages there
+         sess.close();
+         
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         cons = sess.createConsumer(queue1);
+         
+         Message m = cons.receive(500);
+         
+         assertNull(m);
+         
+         log.info("got to end of test");
       }
       finally
       {         
@@ -551,6 +897,9 @@
       
    }
    
+   
+   
+   
 //   public void testEvenSimplerFailover() throws Exception
 //   {
 //      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");




More information about the jboss-cvs-commits mailing list