[jboss-cvs] JBoss Messaging SVN: r6771 - in trunk: src/main/org/jboss/messaging/core/persistence/impl/journal and 10 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed May 13 11:16:39 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-05-13 11:16:38 -0400 (Wed, 13 May 2009)
New Revision: 6771

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/OrderingOnBackupTest.java
Removed:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
   trunk/src/main/org/jboss/messaging/core/transaction/TransactionOperation.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PreserveOrderDuringFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/util/SpawnedVMSupport.java
   trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1558 - Order of Queues on backup node during rollback and session.close (redelivery)

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -129,6 +129,11 @@
       log.trace(message);
    }
 
+   private static void trace(final String message, Exception t)
+   {
+      log.trace(message, t);
+   }
+
    // Constructors --------------------------------------------------
 
    public PagingStoreImpl(final PagingManager pagingManager,
@@ -389,6 +394,17 @@
          }
 
          currentPageLock.readLock().lock();
+         if (isTrace)
+         {
+         	if (pagingManager.isBackup())
+         	{
+            	trace("Paging Reference[" + message.getMessage(null).getMessageID() + "] on Backup");
+         	}
+         	else
+         	{
+            	trace("Paging Reference[" + message.getMessage(null).getMessageID() + "] on Live");
+         	}
+         }
 
          try
          {
@@ -469,6 +485,11 @@
             {
                if (!depaging.get())
                {
+                  if (isTrace)
+                  {
+                     trace("Starting depaging for " + this.getStoreName(), new Exception ("trace"));
+                  }
+                     		
                   depaging.set(true);
                   Runnable depageAction = new DepageRunnable(executor);
                   executor.execute(depageAction);
@@ -936,6 +957,11 @@
       final boolean globalFull = isGlobalFull(getPageSizeBytes());
       if (pageFull || globalFull || !isPaging())
       {
+         if (isTrace) 
+         {
+            trace("clearDepage::true");
+         }
+         
          depaging.set(false);
          if (!globalFull)
          {
@@ -945,6 +971,11 @@
       }
       else
       {
+         if (isTrace) 
+         {
+            trace("clearDepage::false");
+         }
+
          return false;
       }
    }
@@ -1033,13 +1064,16 @@
                // the lock and this would dead lock
                if (running && !clearDepage())
                {
+                  if (isTrace)
+                  {
+                     trace("Scheduling to depage " + PagingStoreImpl.this.getStoreName());
+                  }
                   followingExecutor.execute(this);
                }
             }
          }
          catch (Throwable e)
          {
-            e.printStackTrace();
             log.error(e, e);
          }
       }

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -28,6 +28,8 @@
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -1584,6 +1586,14 @@
       {
       }
 
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.transaction.TransactionOperation#getDistinctQueues()
+       */
+      public Collection<Queue> getDistinctQueues()
+      {
+         return Collections.emptySet();
+      }
+
    }
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -23,12 +23,15 @@
 package org.jboss.messaging.core.postoffice.impl;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.DuplicateIDCache;
+import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.TransactionOperation;
 import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
@@ -242,6 +245,14 @@
       {
       }
 
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.transaction.TransactionOperation#getDistinctQueues()
+       */
+      public Collection<Queue> getDistinctQueues()
+      {
+         return Collections.emptySet();
+      }
+
    }
    
    private static final class ByteArrayHolder

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -23,6 +23,8 @@
 package org.jboss.messaging.core.postoffice.impl;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -969,6 +971,16 @@
          messagesToPage.add(message);
       }
 
+      
+
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.transaction.TransactionOperation#getDistinctQueues()
+       */
+      public Collection<Queue> getDistinctQueues()
+      {
+         return Collections.emptySet();
+      }
+
       public void afterCommit(final Transaction tx) throws Exception
       {
          // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the

Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -147,4 +147,8 @@
    void deliverNow();
 
    boolean checkDLQ(MessageReference ref) throws Exception;
+   
+   void lockDelivery();
+   
+   void unlockDelivery();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -40,6 +40,8 @@
 	//void handleClose(Packet packet);
 	
 	void close() throws Exception;
+	
+	int getCountOfPendingDeliveries();
 
 	List<MessageReference> cancelRefs(boolean lastConsumedAsDelivered, Transaction tx) throws Exception;
 	

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -17,6 +17,7 @@
 import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ORIG_MESSAGE_ID;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -27,6 +28,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -112,6 +114,8 @@
 
    private final PagingManager pagingManager;
 
+   private final Semaphore lock = new Semaphore(1);
+
    private volatile PagingStore pagingStore;
 
    private final StorageManager storageManager;
@@ -328,6 +332,48 @@
 
    // Queue implementation ----------------------------------------------------------------------------------------
 
+   public void lockDelivery()
+   {
+      if (backup)
+      {
+         return;
+      }
+      
+      if (trace)
+      {
+         log.trace("Trying to lock queue=" + this.name + ", backup=" + this.backup, new Exception("trace"));
+      }
+      
+      try
+      {
+         lock.acquire();
+      }
+      catch (InterruptedException e)
+      {
+         log.warn(e.getMessage(), e);
+      }
+
+      if (trace)
+      {
+         log.trace("Locked, queue=" + this.name + ", backup=" + this.backup, new Exception("trace"));
+      }
+   }
+
+   public void unlockDelivery()
+   {
+      if (backup)
+      {
+         return;
+      }
+      
+      lock.release();
+
+      if (trace)
+      {
+         log.trace("UN-Locked, queue=" + this.name + ", backup = " + this.backup, new Exception("trace"));
+      }
+   }
+
    public boolean isDurable()
    {
       return durable;
@@ -359,8 +405,13 @@
    }
 
    public void addLast(final MessageReference ref)
-   {    
-      add(ref, false);      
+   {
+   
+      if (trace)
+      {
+      	log.trace("AddLast(" + this.getName() +  (backup?"@Backup":"@Live") + "::" + ref);
+      }
+      add(ref, false);
    }
 
    public void addFirst(final MessageReference ref)
@@ -425,11 +476,11 @@
       if (delay > 0)
       {
          if (consumers.size() == 0 && messageReferences.size() > 0)
-         {         
+         {
             DelayedAddRedistributor dar = new DelayedAddRedistributor(executor, replicatingChannel);
-   
+
             future = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
-   
+
             futures.add(future);
          }
       }
@@ -989,6 +1040,12 @@
    // Public
    // -----------------------------------------------------------------------------
 
+   /** To be used on tests only. Do not use it otherwise */
+   public PriorityLinkedList<MessageReference> getReferencesList()
+   {
+      return this.messageReferences;
+   }
+
    @Override
    public boolean equals(final Object other)
    {
@@ -1196,7 +1253,7 @@
    /*
     * Attempt to deliver all the messages in the queue
     */
-   private void deliver()
+   private synchronized void deliver()
    {
       // We don't do actual delivery if the queue is on a backup node - this is
       // because it's async and could get out of step
@@ -1470,10 +1527,15 @@
          // Must be set to false *before* executing to avoid race
          waitingToDeliver.set(false);
 
-         synchronized (QueueImpl.this)
+         QueueImpl.this.lockDelivery();
+         try
          {
             deliver();
          }
+         finally
+         {
+            QueueImpl.this.unlockDelivery();
+         }
       }
    }
 
@@ -1535,6 +1597,26 @@
          }
       }
 
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.transaction.TransactionOperation#getDistinctQueues()
+       */
+      public synchronized Collection<Queue> getDistinctQueues()
+      {
+         HashSet<Queue> queues = new HashSet<Queue>();
+
+         for (MessageReference ref : refsToAck)
+         {
+            queues.add(ref.getQueue());
+         }
+
+         for (MessageReference ref : refsToAdd)
+         {
+            queues.add(ref.getQueue());
+         }
+
+         return queues;
+      }
+
       public void afterCommit(final Transaction tx) throws Exception
       {
          for (MessageReference ref : refsToAdd)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -240,9 +240,18 @@
       {
          MessageReference ref = iter.next();
 
+         if (trace)
+         {
+            log.trace("Adding reference " + ref + " into a Transaction for close/cancel");
+         }
+
          ref.getQueue().cancel(tx, ref);
       }
 
+      if (trace)
+      {
+         log.trace("***************** tx.rollback being called now *****************");
+      }
       tx.rollback();
 
       if (!browseOnly)
@@ -267,6 +276,11 @@
       }
    }
 
+   public int getCountOfPendingDeliveries()
+   {
+      return deliveringRefs.size();
+   }
+   
    public LinkedList<MessageReference> cancelRefs(final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
    {
       boolean performACK = lastConsumedAsDelivered;
@@ -363,11 +377,12 @@
          {
             throw new IllegalStateException(System.identityHashCode(this) + " Could not find reference on consumerID=" +
                                             id +
-                                            ", messageId " +
+                                            ", messageId = " +
                                             messageID +
-                                            " backup " +
+                                            " backup = " +
                                             messageQueue.isBackup() +
-                                            " closed " +
+                                            " queue = " + messageQueue.getName() + 
+                                            " closed = " +
                                             closed);
          }
 
@@ -424,6 +439,11 @@
 
    public void deliverReplicated(final long messageID) throws Exception
    {
+      if (trace)
+      {
+         log.trace("Replicating delivery Reference[" + messageID + "] queueOnConsumer=" + messageQueue.getName());
+      }
+
       MessageReference ref = messageQueue.removeFirstReference(messageID);
 
       if (ref == null)
@@ -434,8 +454,8 @@
          // force a depage
          if (!store.readPage()) // This returns false if there are no pages
          {
-            throw new IllegalStateException("Cannot find ref " + messageID +                                          
-                                            " in queue " +
+            throw new IllegalStateException("Cannot find Reference[" + messageID +
+                                            "] in queue " +
                                             messageQueue.getName());
          }
          else
@@ -444,7 +464,8 @@
 
             if (ref == null)
             {
-               throw new IllegalStateException("Cannot find ref after depaging");
+               throw new IllegalStateException("Cannot find Reference[" + messageID +
+                                            "] after depaging on Queue " + messageQueue.getName());
             }
          }
       }
@@ -486,12 +507,30 @@
 
    // Public ---------------------------------------------------------------------------------------
 
-   
-   /** Only use this on tests */
+   /** To be used on tests only */
    public AtomicInteger getAvailableCredits()
    {
       return availableCredits;
    }
+
+   /** To be used on tests only */
+   public java.util.Queue<MessageReference> getDeliveringRefs()
+   {
+      return deliveringRefs;
+   }
+
+   /** To be used on tests only */
+   public ServerSession getSession()
+   {
+      return session;
+   }
+
+   /** To be used on tests only */
+   public long getReplicatedSessionID()
+   {
+      return replicatedSessionID;
+   }
+
    // Private --------------------------------------------------------------------------------------
 
    private void promptDelivery()
@@ -642,7 +681,7 @@
 
       if (replicatingChannel == null)
       {
-         // it doesn't need lock because deliverLargeMesasge is already inside the lock.lock()
+         // it doesn't need lock because deliverLargeMessage is already inside the lock()
          largeMessageDeliverer = localDeliverer;
          largeMessageDeliverer.deliver();
       }
@@ -697,6 +736,10 @@
       {
          // Not replicated - just send now
 
+         if (trace)
+         {
+            log.trace("delivering Message " + ref + " on backup");
+         }
          channel.send(packet);
       }
       else
@@ -708,6 +751,10 @@
          {
             public void run()
             {
+               if (trace)
+               {
+                  log.trace("delivering Message " + ref + " on live");
+               }
                channel.send(packet);
             }
          });

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -178,7 +178,7 @@
    private final QueueFactory queueFactory;
 
    private final SimpleString nodeID;
-   
+
    private boolean backup;
 
    // The current currentLargeMessage being processed
@@ -263,7 +263,7 @@
       this.nodeID = server.getNodeID();
 
       this.replicatingChannel = replicatingChannel;
-      
+
       this.backup = backup;
    }
 
@@ -316,7 +316,7 @@
          throw new IllegalStateException("Cannot find consumer with id " + consumer.getID() + " to remove");
       }
    }
-   
+
    public void close() throws Exception
    {
       if (tx != null && tx.getXid() == null)
@@ -348,7 +348,7 @@
             log.error("Failed to delete large message file", error);
          }
       }
-      
+
       remotingConnection.removeFailureListener(this);
    }
 
@@ -378,15 +378,15 @@
    public void handleCreateQueue(final CreateQueueMessage packet)
    {
       if (replicatingChannel == null)
-      {         
+      {
          doHandleCreateQueue(packet);
       }
       else
-      {       
+      {
          replicatingChannel.replicatePacket(packet, oppositeChannelID, new Runnable()
          {
             public void run()
-            {              
+            {
                doHandleCreateQueue(packet);
             }
          });
@@ -487,7 +487,7 @@
    {
       if (replicatingChannel == null)
       {
-         doHandleCommit(packet);
+            doHandleCommit(packet);
       }
       else
       {
@@ -502,18 +502,32 @@
    }
 
    public void handleRollback(final RollbackMessage packet)
-   {            
+   {
+      
+      
       if (replicatingChannel == null)
       {
          doHandleRollback(packet);
       }
       else
       {
+         final HashSet<Queue> queues = lockUsedQueues();
+
          replicatingChannel.replicatePacket(packet, oppositeChannelID, new Runnable()
          {
             public void run()
             {
-               doHandleRollback(packet);              
+               try
+               {
+                  doHandleRollback(packet);
+               }
+               finally
+               {
+                  for (Queue queue : queues)
+                  {
+                     queue.unlockDelivery();
+                  }
+               }
             }
          });
       }
@@ -734,7 +748,7 @@
          });
       }
    }
-   
+
    private void lockConsumers()
    {
       for (ServerConsumer consumer : consumers.values())
@@ -750,23 +764,29 @@
          consumer.unlock();
       }
    }
-   
+
    public void handleStart(final Packet packet)
    {
-      if (replicatingChannel != null)
-      {         
+      if (replicatingChannel == null)
+      {
+         setStarted(true);
+
+         channel.confirm(packet);
+      }
+      else
+      {
          lockConsumers();
-         
+
          try
-         {         
+         {
             setStarted(true);
-                        
+
             replicatingChannel.replicatePacket(packet, oppositeChannelID, new Runnable()
             {
                public void run()
                {
-                  //setStarted(true);
-   
+                  // setStarted(true);
+
                   channel.confirm(packet);
                }
             });
@@ -776,12 +796,6 @@
             unlockConsumers();
          }
       }
-      else
-      {
-         setStarted(true);
-
-         channel.confirm(packet);
-      }
    }
 
    public void handleStop(final Packet packet)
@@ -797,23 +811,31 @@
       // delivery is processed on backup
       // it's stopped so barfs and cannot process delivery
 
-      if (replicatingChannel != null)
+      if (replicatingChannel == null)
       {
+         setStarted(false);
+
+         channel.confirm(packet);
+
+         channel.send(response);
+      }
+      else
+      {
          lockConsumers();
-         
+
          try
          {
-         
+
             setStarted(false);
-            
+
             replicatingChannel.replicatePacket(packet, oppositeChannelID, new Runnable()
             {
                public void run()
                {
                   channel.confirm(packet);
-   
+
                   channel.send(response);
-      
+
                }
             });
          }
@@ -822,14 +844,6 @@
             unlockConsumers();
          }
       }
-      else
-      {
-         setStarted(false);
-         
-         channel.confirm(packet);
-
-         channel.send(response);
-      }
    }
 
    public void handleFailedOver(final Packet packet)
@@ -844,12 +858,15 @@
 
    public void handleClose(final Packet packet)
    {
+      
       if (replicatingChannel == null)
       {
          doHandleClose(packet);
       }
       else
       {
+         final HashSet<Queue> queues = lockUsedQueues();
+
          // We need to stop the consumers first before replicating, to ensure no deliveries occur after this,
          // but we need to process the actual close() when the replication response returns, otherwise things
          // can happen like acks can come in after close
@@ -863,22 +880,46 @@
          {
             public void run()
             {
-               doHandleClose(packet);
+               try
+               {
+                  doHandleClose(packet);
+               }
+               finally
+               {
+                  for (Queue queue : queues)
+                  {
+                     queue.unlockDelivery();
+                  }
+               }
             }
          });
       }
    }
 
    public void handleCloseConsumer(final SessionConsumerCloseMessage packet)
-   {      
+   {
       final ServerConsumer consumer = consumers.get(packet.getConsumerID());
-           
+      
+      consumer.setStarted(false);
+
       if (replicatingChannel == null)
-      {         
+      {
          doHandleCloseConsumer(packet, consumer);
       }
       else
       {
+         final Queue queue;
+         
+         if (consumer.getCountOfPendingDeliveries() > 0)
+         {
+            queue = consumer.getQueue();
+            queue.lockDelivery();
+         }
+         else
+         {
+            queue = null;
+         }
+         
          // We need to stop the consumer first before replicating, to ensure no deliveries occur after this,
          // but we need to process the actual close() when the replication response returns, otherwise things
          // can happen like acks can come in after close
@@ -889,14 +930,22 @@
          {
             public void run()
             {
-               doHandleCloseConsumer(packet, consumer);
+               try
+               {
+                  doHandleCloseConsumer(packet, consumer);
+               }
+               finally
+               {
+                  if (queue != null)
+                  {
+                     queue.unlockDelivery();
+                  }
+               }
             }
          });
       }
    }
 
-   
-
    public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage packet)
    {
       if (replicatingChannel == null)
@@ -957,17 +1006,17 @@
                }
 
                currentLargeMessage = msg;
-               
+
                doSendLargeMessage(packet);
             }
          });
       }
    }
-   
+
    public void handleSend(final SessionSendMessage packet)
    {
       if (replicatingChannel == null)
-      {                  
+      {
          doSend(packet);
       }
       else
@@ -1003,7 +1052,7 @@
    public void handleReplicatedDelivery(final SessionReplicateDeliveryMessage packet)
    {
       ServerConsumer consumer = consumers.get(packet.getConsumerID());
-      
+
       if (consumer == null)
       {
          throw new IllegalStateException("Cannot handle replicated delivery, consumer is closed " + packet.getConsumerID() +
@@ -1044,21 +1093,21 @@
       {
          // Put the id back to the original client session id
          this.id = this.oppositeChannelID;
-         
+
          this.oppositeChannelID = -1;
-         
+
          backup = false;
       }
-      
+
       remotingConnection.removeFailureListener(this);
-      
-      //Note. We do not destroy the replicating connection here. In the case the live server has really crashed
-      //then the connection will get cleaned up anyway when the server ping timeout kicks in.
-      //In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing
-      //the replicating connection will cause the outstanding responses to be be replayed on the live server,
-      //if these reach the client who then subsequently fails over, on reconnection to backup, it will have
-      //received responses that the backup did not know about.
 
+      // Note. We do not destroy the replicating connection here. In the case the live server has really crashed
+      // then the connection will get cleaned up anyway when the server ping timeout kicks in.
+      // In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing
+      // the replicating connection will cause the outstanding responses to be replayed on the live server,
+      // if these reach the client who then subsequently fails over, on reconnection to backup, it will have
+      // received responses that the backup did not know about.
+
       channel.transferConnection(newConnection, this.id, replicatingChannel);
 
       newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
@@ -1068,7 +1117,7 @@
       remotingConnection.addFailureListener(this);
 
       int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
- 
+
       channel.replayCommands(lastReceivedCommandID, this.id);
 
       if (wasStarted)
@@ -1078,7 +1127,7 @@
 
       return serverLastReceivedCommandID;
    }
-   
+
    public Channel getChannel()
    {
       return channel;
@@ -1139,6 +1188,7 @@
       }
 
    }
+
    // Public
    // ----------------------------------------------------------------------------
 
@@ -1146,7 +1196,7 @@
    {
       return tx;
    }
-   
+
    // Private
    // ----------------------------------------------------------------------------
 
@@ -1176,9 +1226,9 @@
 
       channel.confirm(packet);
 
-      channel.send(response);           
+      channel.send(response);
    }
-   
+
    private void doHandleCreateConsumer(final SessionCreateConsumerMessage packet)
    {
       SimpleString name = packet.getQueueName();
@@ -1188,7 +1238,7 @@
       boolean browseOnly = packet.isBrowseOnly();
 
       Packet response = null;
-      
+
       try
       {
          Binding binding = postOffice.getBinding(name);
@@ -1231,7 +1281,8 @@
             theQueue = (Queue)binding.getBindable();
          }
 
-         ServerConsumer consumer = new ServerConsumerImpl(server, idGenerator.generateID(),
+         ServerConsumer consumer = new ServerConsumerImpl(server,
+                                                          idGenerator.generateID(),
                                                           oppositeChannelID,
                                                           this,
                                                           (QueueBinding)binding,
@@ -1262,7 +1313,7 @@
             props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
 
             props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
-            
+
             if (filterString != null)
             {
                props.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
@@ -1473,7 +1524,7 @@
          List<SimpleString> names = new ArrayList<SimpleString>();
 
          Bindings bindings = postOffice.getMatchingBindings(address);
-         
+
          for (Binding binding : bindings.getBindings())
          {
             if (binding.getType() == BindingType.LOCAL_QUEUE)
@@ -1510,7 +1561,7 @@
       try
       {
          ServerConsumer consumer = consumers.get(packet.getConsumerID());
-         
+
          consumer.acknowledge(autoCommitAcks, tx, packet.getMessageID());
 
          if (packet.isRequiresResponse())
@@ -2163,7 +2214,7 @@
 
       channel.confirm(packet);
 
-      //We flush the confirmations to make sure any send confirmations get handled on the client side
+      // We flush the confirmations to make sure any send confirmations get handled on the client side
       channel.flushConfirmations();
 
       channel.send(response);
@@ -2200,7 +2251,6 @@
          log.error("Failed to create large message", e);
          Packet response = null;
 
-         
          channel.confirm(packet);
          if (response != null)
          {
@@ -2228,7 +2278,7 @@
       try
       {
          long id = storageManager.generateUniqueID();
-                 
+
          currentLargeMessage.setMessageID(id);
       }
       catch (Exception e)
@@ -2246,9 +2296,9 @@
       try
       {
          ServerMessage message = packet.getServerMessage();
-         
+
          long id = storageManager.generateUniqueID();
-         
+
          message.setMessageID(id);
 
          if (message.getDestination().equals(managementAddress))
@@ -2315,7 +2365,7 @@
             currentLargeMessage = null;
 
             message.complete();
-                       
+
             send(message);
          }
 
@@ -2380,7 +2430,7 @@
    {
       LargeServerMessage largeMessage = storageManager.createLargeMessage();
 
-      MessagingBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header); 
+      MessagingBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
 
       largeMessage.decodeProperties(headerBuffer);
 
@@ -2458,7 +2508,33 @@
          postOffice.route(msg, tx);
       }
    }
+   
+   /**
+    * Locks delivery on every queue used by the consumers held in {@code consumers} and by the current transaction, if any.
+    * We need to avoid delivery when rolling back while doing replication, or the backup node could end up in a different order.
+    * @return the distinct queues on which lockDelivery() was called (NOTE(review): presumably the caller unlocks them afterwards - confirm)
+    */
+   private HashSet<Queue> lockUsedQueues()
+   {
+      final HashSet<Queue> queues = new HashSet<Queue>();
+      for (ServerConsumer consumer : consumers.values())
+      {
+         queues.add(consumer.getQueue());
+      }
+      // also include the queues reported by the current transaction, when there is one
+      if (tx != null)
+      {
+         queues.addAll(tx.getDistinctQueues());
+      }
+      // queues is a set, so a queue shared by several consumers is locked only once
+      for (Queue queue : queues)
+      {
+         queue.lockDelivery();
+      }
+      return queues;
+   }
 
+
    private void doSecurity(final ServerMessage msg) throws Exception
    {
       try

Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -22,9 +22,12 @@
 
 package org.jboss.messaging.core.transaction;
 
+import java.util.Set;
+
 import javax.transaction.xa.Xid;
 
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.Queue;
 
 /**
  * A JBoss Messaging internal transaction
@@ -67,6 +70,8 @@
    void putProperty(int index, Object property);
    
    Object getProperty(int index);
+   
+   Set<Queue> getDistinctQueues();
 
    static enum State
    {

Modified: trunk/src/main/org/jboss/messaging/core/transaction/TransactionOperation.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/TransactionOperation.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/transaction/TransactionOperation.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -22,6 +22,10 @@
 
 package org.jboss.messaging.core.transaction;
 
+import java.util.Collection;
+
+import org.jboss.messaging.core.server.Queue;
+
 /**
  * 
  * A TransactionOperation
@@ -31,6 +35,10 @@
  */
 public interface TransactionOperation
 {
+   
+   /** rollback will need a distinct list of Queues in order to lock those queues before calling rollback */
+   Collection<Queue> getDistinctQueues();
+   
    void beforePrepare(Transaction tx) throws Exception;
    
    void beforeCommit(Transaction tx) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -13,7 +13,10 @@
 package org.jboss.messaging.core.transaction.impl;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import javax.transaction.xa.Xid;
 
@@ -21,6 +24,7 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.TransactionOperation;
 import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
@@ -37,11 +41,11 @@
    private List<TransactionOperation> operations;
 
    private static final Logger log = Logger.getLogger(TransactionImpl.class);
-   
+
    private static final int INITIAL_NUM_PROPERTIES = 10;
-      
+
    private Object[] properties = new Object[INITIAL_NUM_PROPERTIES];
-      
+
    private final StorageManager storageManager;
 
    private final Xid xid;
@@ -55,7 +59,7 @@
    private final Object timeoutLock = new Object();
 
    private final long createTime;
-   
+
    public TransactionImpl(final StorageManager storageManager)
    {
       this.storageManager = storageManager;
@@ -92,6 +96,29 @@
    // Transaction implementation
    // -----------------------------------------------------------
 
+   /** Returns the union of the queues reported by every operation of this transaction (empty when operations is null); an operation returning null is logged and skipped */
+   public Set<Queue> getDistinctQueues()
+   {
+      HashSet<Queue> queues = new HashSet<Queue>();
+      if (operations != null)
+      {
+         for (TransactionOperation op : operations)
+         {
+            Collection<Queue> q = op.getDistinctQueues();
+            if (q == null)
+            {
+               log.warn("Operation " + op + " returned null getDistinctQueues");
+            }
+            else
+            {
+               queues.addAll(q);
+            }
+         }
+      }
+
+      return queues;
+   }
+
    public long getID()
    {
       return id;
@@ -114,7 +141,7 @@
             }
             else
             {
-               //Do nothing
+               // Do nothing
                return;
             }
          }
@@ -127,7 +154,7 @@
          {
             throw new IllegalStateException("Cannot prepare non XA transaction");
          }
-         
+
          if (operations != null)
          {
             for (TransactionOperation operation : operations)
@@ -146,12 +173,12 @@
             {
                operation.afterPrepare(this);
             }
-         }                 
+         }
       }
    }
 
    public void commit() throws Exception
-   {           
+   {
       commit(true);
    }
 
@@ -167,7 +194,7 @@
             }
             else
             {
-               //Do nothing
+               // Do nothing
                return;
             }
          }
@@ -176,7 +203,7 @@
          {
             if (onePhase)
             {
-               if(state == State.ACTIVE)
+               if (state == State.ACTIVE)
                {
                   prepare();
                }
@@ -202,7 +229,7 @@
             }
          }
 
-         if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || (xid != null && state == State.PREPARED) )
+         if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || (xid != null && state == State.PREPARED))
          {
             storageManager.commit(id);
          }
@@ -237,7 +264,7 @@
                throw new IllegalStateException("Transaction is in invalid state " + state);
             }
          }
-         
+
          if (operations != null)
          {
             for (TransactionOperation operation : operations)
@@ -256,7 +283,7 @@
             {
                operation.afterRollback(this);
             }
-         }                  
+         }
       }
    }
 
@@ -282,7 +309,7 @@
    {
       return state;
    }
-   
+
    public void setState(final State state)
    {
       this.state = state;
@@ -313,7 +340,7 @@
 
       operations.remove(operation);
    }
-   
+
    public int getOperationsCount()
    {
       return operations.size();
@@ -324,20 +351,20 @@
       if (index >= properties.length)
       {
          Object[] newProperties = new Object[index];
-         
+
          System.arraycopy(properties, 0, newProperties, 0, properties.length);
-         
+
          properties = newProperties;
       }
-      
-      properties[index] = property;      
+
+      properties[index] = property;
    }
-   
+
    public Object getProperty(int index)
    {
       return properties[index];
    }
-   
+
    // Private
    // -------------------------------------------------------------------
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -929,6 +929,8 @@
                   {
                      Thread.sleep(10);
                   }
+                  
+                  assertNull(consumerImpl.getAvailableCredits());
                }
             }
          }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -1281,7 +1281,7 @@
 
    protected int getLatchWait()
    {
-      return 20000;
+      return 60000;
    }
 
    protected int getNumIterations()

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/OrderingOnBackupTest.java (from rev 6694, trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/OrderingOnBackupTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/OrderingOnBackupTest.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -0,0 +1,924 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.PagedMessage;
+import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
+import org.jboss.messaging.core.postoffice.QueueBinding;
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.QueueImpl;
+import org.jboss.messaging.core.server.impl.ServerConsumerImpl;
+import org.jboss.messaging.tests.util.RandomUtil;
+import org.jboss.messaging.tests.util.SpawnedVMSupport;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * 
+ * It validates that the messages are in the same order in the paging system on the backup and live nodes.
+ * 
+ * This test is valid as long as we want to guarantee strict ordering of paged messages between the backup and live nodes.
+ * 
+ * If we change this concept in any way, this test may become invalid and we would need to delete it.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class OrderingOnBackupTest extends FailoverTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(OrderingOnBackupTest.class);
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   private static void debug(String message)
+   {
+      log.info(message);
+   }
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   public void testPageOrderingLiveAndBackupProducerOnly() throws Exception
+   {
+      internalTestPageOrderingLiveAndBackup(false);
+   }
+
+   public void testPageOrderingLiveAndBackupConsume() throws Exception
+   {
+      internalTestPageOrderingLiveAndBackup(true);
+   }
+
+   /**
+    * Sends messages from NUMBER_OF_THREADS concurrent producers to ADDRESS and then verifies, page by page,
+    * that the live and the backup paging stores contain the same messages in the same order.
+    * @param consumeMessages when true, a MyHandler is created on each queue before the producers start and checked for failures afterwards
+    */
+   private void internalTestPageOrderingLiveAndBackup(boolean consumeMessages) throws Exception
+   {
+      final SimpleString threadIDKey = new SimpleString("THREAD_ID");
+      final SimpleString sequenceIDKey = new SimpleString("SEQUENCE_ID");
+      final SimpleString ADDRESS = new SimpleString("SOME_QUEUE");
+
+      final int NUMBER_OF_THREADS = 100;
+      final int NUMBER_OF_MESSAGES = 200;
+      final int NUMBER_OF_HANDLERS = consumeMessages ? NUMBER_OF_THREADS : 0;
+
+      setUpFailoverServers(true, 100 * 1024, 50 * 1024);
+
+      final ClientSessionFactory factory = createFailoverFactory();
+
+      ClientSession session = factory.createSession(false, true, true);
+      for (int i = 0; i < NUMBER_OF_THREADS; i++)
+      {
+         session.createQueue(ADDRESS, ADDRESS.concat("-" + i), true);
+      }
+      session.close();
+
+      MyHandler handlers[] = new MyHandler[NUMBER_OF_HANDLERS];
+
+      for (int i = 0; i < handlers.length; i++)
+      {
+         handlers[i] = new MyHandler(factory, ADDRESS.concat("-" + i), NUMBER_OF_MESSAGES * 10);
+      }
+
+      final CountDownLatch flagAlign = new CountDownLatch(NUMBER_OF_THREADS);
+      final CountDownLatch flagStart = new CountDownLatch(1);
+
+      class ProducerThread extends Thread
+      {
+         Throwable e;
+
+         final int threadID;
+
+         ProducerThread(int threadID)
+         {
+            this.threadID = threadID;
+         }
+
+         public void run()
+         {
+            try
+            {
+               ClientSession session = factory.createSession(false, true, true);
+               ClientProducer producer = session.createProducer(ADDRESS);
+
+               // I want to jinx all this by having everybody start sending at the same time
+               flagAlign.countDown();
+               flagStart.await();
+
+               for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+               {
+                  ClientMessage msg = session.createClientMessage(true);
+                  msg.setBody(ChannelBuffers.wrappedBuffer(new byte[512]));
+                  msg.getProperties().putIntProperty(threadIDKey, this.threadID);
+                  msg.getProperties().putIntProperty(sequenceIDKey, i);
+                  producer.send(msg);
+               }
+
+               session.close();
+            }
+            catch (Throwable e)
+            {
+               // System.out => Hudson/JUNIT reports
+               e.printStackTrace();
+               this.e = e;
+            }
+         }
+      }
+
+      ProducerThread threads[] = new ProducerThread[NUMBER_OF_THREADS];
+
+      for (int i = 0; i < threads.length; i++)
+      {
+         threads[i] = new ProducerThread(i);
+         threads[i].start();
+      }
+
+      assertTrue("Error initializing some of the threads", flagAlign.await(10, TimeUnit.SECONDS));
+
+      flagStart.countDown();
+
+      for (ProducerThread t : threads)
+      {
+         t.join();
+      }
+
+      for (ProducerThread t : threads)
+      {
+         if (t.e != null)
+         {
+            throw new Exception("Test Failed", t.e);
+         }
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         handler.close();
+         if (handler.failure != null)
+         {
+            throw new Exception("Failure on consumer", handler.failure);
+         }
+      }
+
+      PagingManager livePagingManager = liveServer.getPostOffice().getPagingManager();
+      PagingManager backupPagingManager = backupServer.getPostOffice().getPagingManager();
+
+      TestSupportPageStore livePagingStore = (TestSupportPageStore)livePagingManager.getPageStore(ADDRESS);
+      TestSupportPageStore backupPagingStore = (TestSupportPageStore)backupPagingManager.getPageStore(ADDRESS);
+
+      debug("Pages: " + livePagingStore.getNumberOfPages() + " on backup: " + backupPagingStore.getNumberOfPages());
+
+      if (consumeMessages)
+      {
+         if (livePagingStore.getNumberOfPages() == backupPagingStore.getNumberOfPages() - 1)
+         {
+            // The live node may have one extra page in front of the backup
+            backupPagingStore.depage();
+         }
+      }
+
+      assertEquals(livePagingStore.getNumberOfPages(), backupPagingStore.getNumberOfPages());
+
+      Page livePage = null;
+      Page backupPage = null;
+      // depage both stores in lock-step until the live store has no more pages
+      while (true)
+      {
+         livePage = livePagingStore.depage();
+         if (livePage == null)
+         {
+            assertNull(backupPagingStore.depage());
+            break;
+         }
+
+         backupPage = backupPagingStore.depage();
+
+         assertNotNull(backupPage);
+
+         livePage.open();
+         backupPage.open();
+
+         List<PagedMessage> liveMessages = livePage.read();
+         List<PagedMessage> backupMessages = backupPage.read();
+
+         livePage.close();
+         backupPage.close();
+
+         assertEquals(liveMessages.size(), backupMessages.size());
+
+         Iterator<PagedMessage> backupIterator = backupMessages.iterator();
+
+         for (PagedMessage liveMsg : liveMessages)
+         {
+            PagedMessage backupMsg = backupIterator.next();
+            assertNotNull(backupMsg);
+            // take the backup side from backupMsg, so the assertions really compare live against backup
+            ServerMessage liveSrvMsg = liveMsg.getMessage(null);
+            ServerMessage backupSrvMsg = backupMsg.getMessage(null);
+
+            assertEquals(liveSrvMsg.getMessageID(), backupSrvMsg.getMessageID());
+            assertEquals(liveSrvMsg.getProperty(threadIDKey), backupSrvMsg.getProperty(threadIDKey));
+            assertEquals(liveSrvMsg.getProperty(sequenceIDKey), backupSrvMsg.getProperty(sequenceIDKey));
+         }
+      }
+   }
+
+   public void testDeliveryOrderOnTransactionalRollbackMultiThread() throws Exception
+   {
+
+      final SimpleString ADDRESS = new SimpleString("TEST");
+      final SimpleString PROPERTY_KEY = new SimpleString("KEY-STR");
+
+      final AtomicInteger errors = new AtomicInteger(0);
+
+      int NTHREADS = 30;
+      final int NMESSAGES = 1000;
+
+      class ProdThread extends Thread
+      {
+         final CountDownLatch latchAlign;
+
+         final CountDownLatch latchStart;
+
+         final ClientSessionFactory sf;
+
+         ProdThread(final CountDownLatch latchAlign, final CountDownLatch latchStart, final ClientSessionFactory sf)
+         {
+            this.latchAlign = latchAlign;
+            this.latchStart = latchStart;
+            this.sf = sf;
+         }
+
+         @Override
+         public void run()
+         {
+            ClientSession sess = null;
+            try
+            {
+               latchAlign.countDown();
+               latchStart.await();
+
+               sess = sf.createSession(false, false, false);
+
+               ClientProducer prod = sess.createProducer(ADDRESS);
+
+               for (int i = 0; i < NMESSAGES; i++)
+               {
+                  ClientMessage msg = createTextMessage(sess, "test" + i, false);
+                  msg.putStringProperty(PROPERTY_KEY, RandomUtil.randomSimpleString());
+                  prod.send(msg);
+               }
+               
+               sess.commit();
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+               errors.incrementAndGet();
+            }
+            finally
+            {
+               try
+               {
+                  sess.close();
+               }
+               catch (Throwable ignored)
+               {
+               }
+            }
+
+         }
+      };
+
+      class ConsumerThread extends Thread
+      {
+         final ClientSessionFactory sf;
+
+         volatile ClientSession sess;
+
+         final CountDownLatch latchAlign;
+
+         final CountDownLatch latchStart;
+
+         final boolean rollback;
+
+         ConsumerThread(final ClientSessionFactory sf,
+                        final CountDownLatch latchAlign,
+                        final CountDownLatch latchStart,
+                        final boolean rollback)
+         {
+            this.sf = sf;
+            this.latchAlign = latchAlign;
+            this.latchStart = latchStart;
+            this.rollback = rollback;
+         }
+
+         @Override
+         public void run()
+         {
+            ClientConsumer cons = null;
+            try
+            {
+               latchAlign.countDown();
+               latchStart.await();
+
+               sess = sf.createSession(false, false, false);
+
+               cons = sess.createConsumer(ADDRESS);
+
+               sess.start();
+
+               ClientMessage msg = null;
+
+               while ((msg = cons.receive(1000)) != null)
+               {
+                  msg.acknowledge();
+               }
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+               errors.incrementAndGet();
+            }
+            finally
+            {
+               try
+               {
+                  if (rollback)
+                  {
+                     sess.rollback();
+                     cons.close();
+                  }
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace();
+                  errors.incrementAndGet();
+               }
+
+            }
+         }
+      };
+
+      this.setUpFailoverServers(false, -1, 512);
+
+      ClientSessionFactory sf = createFailoverFactory();
+      sf.setConsumerWindowSize(-1);
+
+      ClientSession s = sf.createSession(false, true, true);
+
+      s.createQueue(ADDRESS, ADDRESS, true);
+
+      s.close();
+
+      CountDownLatch latchAlign = new CountDownLatch(NTHREADS);
+
+      CountDownLatch latchStart = new CountDownLatch(1);
+
+      ProdThread pthreads[] = new ProdThread[NTHREADS];
+
+      for (int i = 0; i < NTHREADS; i++)
+      {
+         pthreads[i] = new ProdThread(latchAlign, latchStart, sf);
+         pthreads[i].start();
+      }
+
+      latchAlign.await();
+      latchStart.countDown();
+
+      for (Thread t : pthreads)
+      {
+         t.join();
+      }
+
+      assertEquals(0, errors.get());
+
+      compareQueues(ADDRESS, PROPERTY_KEY, NTHREADS * NMESSAGES);
+
+      ConsumerThread cthreads[] = new ConsumerThread[NTHREADS];
+
+      log.info("**********************  Consuming messages ****************************");
+
+      latchAlign = new CountDownLatch(NTHREADS);
+
+      latchStart = new CountDownLatch(1);
+
+      for (int i = 0; i < NTHREADS; i++)
+      {
+         // 50% of the consumers will roll back the session and close the consumer, which causes the messages to be redelivered.
+         // This shouldn't affect delivery on backup
+         cthreads[i] = new ConsumerThread(sf, latchAlign, latchStart, i % 2 == 0);
+         cthreads[i].start();
+      }
+
+      latchAlign.await();
+      latchStart.countDown();
+
+      for (ConsumerThread t : cthreads)
+      {
+         t.join();
+      }
+
+      assertEquals(0, errors.get());
+
+      compareConsumers(ADDRESS, PROPERTY_KEY, NTHREADS / 2);
+
+      for (ConsumerThread t : cthreads)
+      {
+         if (t.sess != null)
+         {
+            t.sess.close();
+         }
+      }
+
+      sf.close();
+
+      compareQueues(ADDRESS, PROPERTY_KEY, NTHREADS * NMESSAGES);
+
+      stopServers();
+      // ClientProducer p = s
+
+   }
+
+   public void testDeliveryOrderOnRedeliveryMultiThread() throws Exception
+   {
+
+      final SimpleString ADDRESS = new SimpleString("TEST");
+      final SimpleString PROPERTY_KEY = new SimpleString("KEY-STR");
+
+      final AtomicInteger errors = new AtomicInteger(0);
+
+      int NTHREADS = 30;
+      final int NMESSAGES = 1000;
+
+      class ProdThread extends Thread
+      {
+         final CountDownLatch latchAlign;
+
+         final CountDownLatch latchStart;
+
+         final ClientSessionFactory sf;
+
+         ProdThread(final CountDownLatch latchAlign, final CountDownLatch latchStart, final ClientSessionFactory sf)
+         {
+            this.latchAlign = latchAlign;
+            this.latchStart = latchStart;
+            this.sf = sf;
+         }
+
+         @Override
+         public void run()
+         {
+            ClientSession sess = null;
+            try
+            {
+               latchAlign.countDown();
+               latchStart.await();
+
+               sess = sf.createSession(false, true, true);
+
+               ClientProducer prod = sess.createProducer(ADDRESS);
+
+               for (int i = 0; i < NMESSAGES; i++)
+               {
+                  ClientMessage msg = createTextMessage(sess, "test" + i, false);
+                  msg.putStringProperty(PROPERTY_KEY, RandomUtil.randomSimpleString());
+                  prod.send(msg);
+               }
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+               errors.incrementAndGet();
+            }
+            finally
+            {
+               try
+               {
+                  sess.close();
+               }
+               catch (Throwable ignored)
+               {
+               }
+            }
+
+         }
+      };
+
+      class ConsumerThread extends Thread
+      {
+         final ClientSessionFactory sf;
+
+         volatile ClientSession sess;
+
+         final CountDownLatch latchAlign;
+
+         final CountDownLatch latchStart;
+
+         final boolean closeSession;
+
+         ConsumerThread(final ClientSessionFactory sf,
+                        final CountDownLatch latchAlign,
+                        final CountDownLatch latchStart,
+                        final boolean closeSession)
+         {
+            this.sf = sf;
+            this.latchAlign = latchAlign;
+            this.latchStart = latchStart;
+            this.closeSession = closeSession;
+         }
+
+         @Override
+         public void run()
+         {
+            ClientConsumer cons = null;
+            try
+            {
+               latchAlign.countDown();
+               latchStart.await();
+
+               sess = sf.createSession(false, true, true);
+
+               cons = sess.createConsumer(ADDRESS);
+
+               sess.start();
+
+               ClientMessage msg = null;
+
+               while ((msg = cons.receive(1000)) != null)
+               {
+                  // do not ack. Forcing it to come back to head of queue through cancel & rollback
+                  // debug("Received Msg = " + getTextMessage(msg));
+               }
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+               errors.incrementAndGet();
+            }
+            finally
+            {
+               try
+               {
+                  sess.commit();
+               }
+               catch (MessagingException e)
+               {
+                  e.printStackTrace();
+               }
+               if (closeSession)
+               {
+                  try
+                  {
+                     cons.close();
+                  }
+                  catch (Throwable ignored)
+                  {
+                  }
+               }
+            }
+         }
+      };
+
+      this.setUpFailoverServers(false, -1, 512);
+
+      ClientSessionFactory sf = createFailoverFactory();
+      sf.setConsumerWindowSize(-1);
+
+      ClientSession s = sf.createSession(false, true, true);
+
+      s.createQueue(ADDRESS, ADDRESS, true);
+
+      s.close();
+
+      CountDownLatch latchAlign = new CountDownLatch(NTHREADS);
+
+      CountDownLatch latchStart = new CountDownLatch(1);
+
+      ProdThread pthreads[] = new ProdThread[NTHREADS];
+
+      for (int i = 0; i < NTHREADS; i++)
+      {
+         pthreads[i] = new ProdThread(latchAlign, latchStart, sf);
+         pthreads[i].start();
+      }
+
+      latchAlign.await();
+      latchStart.countDown();
+
+      for (Thread t : pthreads)
+      {
+         t.join();
+      }
+
+      assertEquals(0, errors.get());
+
+      compareQueues(ADDRESS, PROPERTY_KEY, NTHREADS * NMESSAGES);
+
+      ConsumerThread cthreads[] = new ConsumerThread[NTHREADS];
+
+      log.info("**********************  Consuming messages ****************************");
+
+      latchAlign = new CountDownLatch(NTHREADS);
+
+      latchStart = new CountDownLatch(1);
+
+      for (int i = 0; i < NTHREADS; i++)
+      {
+         // 50% of the consumers will close the consumer without ACKing messages, which causes the messages to be redelivered.
+         // This shouldn't affect delivery on backup
+         cthreads[i] = new ConsumerThread(sf, latchAlign, latchStart, i % 2 == 0);
+         cthreads[i].start();
+      }
+
+      latchAlign.await();
+      latchStart.countDown();
+
+      for (ConsumerThread t : cthreads)
+      {
+         t.join();
+      }
+
+      assertEquals(0, errors.get());
+
+      compareConsumers(ADDRESS, PROPERTY_KEY, NTHREADS / 2);
+
+      for (ConsumerThread t : cthreads)
+      {
+         if (t.sess != null)
+         {
+            t.sess.close();
+         }
+      }
+
+      sf.close();
+
+      compareQueues(ADDRESS, PROPERTY_KEY, NTHREADS * NMESSAGES);
+
+      stopServers();
+      // ClientProducer p = s
+
+   }
+
+   /**
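+    * Compares the consumers of the queue on the live and backup servers and asserts they are identical
+    * @param ADDRESS address whose single local queue binding is looked up on each server
+    * @param propertyToAssert message property that must be present and equal on both servers
+    * @param expectedNumberOfConsumers number of consumers expected on the queue,
+    *        on both the live and the backup server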
+    * @throws Exception
+    */
+   private void compareConsumers(final SimpleString ADDRESS,
+                                 final SimpleString propertyToAssert,
+                                 int expectedNumberOfConsumers) throws Exception
+   {
+      List<QueueBinding> blive = getLocalQueueBindings(liveServer.getPostOffice(), ADDRESS.toString());
+      List<QueueBinding> bbackup = getLocalQueueBindings(backupServer.getPostOffice(), ADDRESS.toString());
+
+      assertEquals(1, blive.size());
+      assertEquals(1, bbackup.size());
+
+      QueueImpl qlive = (QueueImpl)blive.get(0).getQueue();
+      QueueImpl qbackup = (QueueImpl)bbackup.get(0).getQueue();
+
+      assertEquals(expectedNumberOfConsumers, qlive.getConsumerCount());
+
+      assertEquals(expectedNumberOfConsumers, qbackup.getConsumerCount());
+
+      debug("*****************************************************************************************");
+      debug("LiveConsumers:");
+      for (Consumer c : qlive.getConsumers())
+      {
+         ServerConsumerImpl sc = ((ServerConsumerImpl)c);
+         debug("ID: " + sc.getID() +
+               " SessionID = " +
+               sc.getSession().getID() +
+               " ReplicateSession = " +
+               sc.getReplicatedSessionID());
+      }
+
+      debug("*****************************************************************************************");
+      debug("BackupConsumers:");
+      for (Consumer c : qbackup.getConsumers())
+      {
+         ServerConsumerImpl sc = ((ServerConsumerImpl)c);
+         debug("ID: " + sc.getID() +
+               " SessionID = " +
+               sc.getSession().getID() +
+               " ReplicateSession = " +
+               sc.getReplicatedSessionID());
+      }
+
+      for (Consumer c1 : qlive.getConsumers())
+      {
+         ServerConsumerImpl liveConsumer = (ServerConsumerImpl)c1;
+
+         ServerConsumerImpl backupConsumer = null;
+
+         for (Consumer c2 : qbackup.getConsumers())
+         {
+            ServerConsumerImpl tmp2 = (ServerConsumerImpl)c2;
+            if (liveConsumer.getID() == tmp2.getID() && tmp2.getSession().getID() == liveConsumer.getReplicatedSessionID())
+            {
+               backupConsumer = tmp2;
+               break;
+            }
+         }
+
+         assertNotNull("Couldn't find a consumerID=" + liveConsumer.getID() + " on the backup node", backupConsumer);
+
+         long timeout = System.currentTimeMillis() + 5000;
+
+         // This is async, so poll until the sizes match or the timeout expires
+         while (System.currentTimeMillis() < timeout && liveConsumer.getDeliveringRefs().size() != backupConsumer.getDeliveringRefs()
+                                                                                                                 .size())
+         {
+            Thread.sleep(10);
+         }
+
+         assertEquals("Consumer ID = " + liveConsumer.getID() +
+                               " didn't have the same number of deliveries between live and backup node",
+                      liveConsumer.getDeliveringRefs().size(),
+                      backupConsumer.getDeliveringRefs().size());
+
+         Iterator<MessageReference> iterBackup = backupConsumer.getDeliveringRefs().iterator();
+         for (MessageReference refLive : liveConsumer.getDeliveringRefs())
+         {
+            MessageReference refBackup = iterBackup.next();
+
+            assertEquals(refLive.getMessage().getMessageID(), refBackup.getMessage().getMessageID());
+
+            // debug("Property on live = " + refLive.getMessage().getProperty(propertyToAssert));
+            // debug("Property on backup = " + refBackup.getMessage().getProperty(propertyToAssert));
+
+            assertNotNull(refLive.getMessage().getProperty(propertyToAssert));
+            assertTrue(refLive.getMessage()
+                              .getProperty(propertyToAssert)
+                              .equals(refBackup.getMessage().getProperty(propertyToAssert)));
+         }
+
+         assertFalse(iterBackup.hasNext());
+      }
+   }
+
+   /**
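+    * Compares the queue on the live and backup servers and asserts they hold the same references in the same order
+    * @param ADDRESS address whose single local queue binding is looked up on each server
+    * @param propertyToAssert message property that must be present and equal on both servers
+    * @param expectedNumberOfMessages number of message references expected in the queue,
+    *        on both the live and the backup server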
+    * @throws Exception
+    */
+   private void compareQueues(final SimpleString ADDRESS,
+                              final SimpleString propertyToAssert,
+                              int expectedNumberOfMessages) throws Exception
+   {
+      List<QueueBinding> blive = getLocalQueueBindings(liveServer.getPostOffice(), ADDRESS.toString());
+      List<QueueBinding> bbackup = getLocalQueueBindings(backupServer.getPostOffice(), ADDRESS.toString());
+
+      assertEquals(1, blive.size());
+      assertEquals(1, bbackup.size());
+
+      QueueImpl qlive = (QueueImpl)blive.get(0).getQueue();
+      QueueImpl qbackup = (QueueImpl)bbackup.get(0).getQueue();
+
+      assertEquals(expectedNumberOfMessages, qlive.getReferencesList().size());
+
+      assertEquals(expectedNumberOfMessages, qbackup.getReferencesList().size());
+
+      Iterator<MessageReference> iterBackup = qbackup.getReferencesList().iterator();
+
+      for (MessageReference refLive : qlive.getReferencesList())
+      {
+         assertTrue(iterBackup.hasNext());
+         MessageReference refBackup = iterBackup.next();
+
+         assertEquals(refLive.getMessage().getMessageID(), refBackup.getMessage().getMessageID());
+         assertNotNull(refLive.getMessage().getProperty(propertyToAssert));
+         assertEquals(refLive.getMessage().getProperty(propertyToAssert), refBackup.getMessage()
+                                                                                   .getProperty(propertyToAssert));
+      }
+
+      assertFalse(iterBackup.hasNext());
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+   class MyHandler implements MessageHandler
+   {
+      final ClientSession session;
+
+      final ClientConsumer consumer;
+
+      volatile boolean started = true;
+
+      final int msgs;
+
+      volatile int receivedMsgs = 0;
+
+      final CountDownLatch latch;
+
+      Throwable failure;
+
+      MyHandler(ClientSessionFactory sf, SimpleString address, final int msgs) throws Exception
+      {
+         this.session = sf.createSession(null, null, false, true, true, false, 0);
+         this.consumer = session.createConsumer(address);
+         consumer.setMessageHandler(this);
+         this.session.start();
+         this.msgs = msgs;
+         latch = new CountDownLatch(msgs);
+      }
+
+      public synchronized void close() throws Exception
+      {
+         session.close();
+      }
+
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.client.MessageHandler#onMessage(org.jboss.messaging.core.client.ClientMessage)
+       */
+      public synchronized void onMessage(ClientMessage message)
+      {
+         try
+         {
+            if (!started)
+            {
+               throw new IllegalStateException("Stopped Handler received message");
+            }
+
+            if (receivedMsgs++ == msgs)
+            {
+               debug("done");
+               started = false;
+               session.stop();
+            }
+
+            message.acknowledge();
+
+            if (!started)
+            {
+               latch.countDown();
+            }
+
+         }
+         catch (Throwable e)
+         {
+            this.failure = e;
+         }
+      }
+
+   }
+}

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -1,331 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster.failover;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.jboss.messaging.core.buffers.ChannelBuffers;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.MessageHandler;
-import org.jboss.messaging.core.paging.Page;
-import org.jboss.messaging.core.paging.PagedMessage;
-import org.jboss.messaging.core.paging.PagingManager;
-import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * 
- * It validates if the messages are in the same ordering on the page system between the backup and live nodes.
- * 
- * This test is valid as long as we want to guarantee strict ordering on both nodes for paged messages between backup and live nodes.
- * 
- * If we change this concept anyway this test may become invalid and we would need to delete it.
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class PageOrderingOnBackupTest extends FailoverTestBase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-   public void testPageOrderingLiveAndBackupProducerOnly() throws Exception
-   {
-      internalTestPageOrderingLiveAndBackup(false);
-   }
-
-   public void testPageOrderingLiveAndBackupConsume() throws Exception
-   {
-      internalTestPageOrderingLiveAndBackup(true);
-   }
-
-   private void internalTestPageOrderingLiveAndBackup(boolean consumeMessages) throws Exception
-   {
-      final SimpleString threadIDKey = new SimpleString("THREAD_ID");
-      final SimpleString sequenceIDKey = new SimpleString("SEQUENCE_ID");
-      final SimpleString ADDRESS = new SimpleString("SOME_QUEUE");
-
-      final int NUMBER_OF_THREADS = 100;
-      final int NUMBER_OF_MESSAGES = 200;
-
-      final int NUMBER_OF_HANDLERS = consumeMessages ? NUMBER_OF_THREADS : 0;
-
-      setUpFailoverServers(true, 100 * 1024, 50 * 1024);
-
-      final ClientSessionFactory factory = createFailoverFactory();
-
-      ClientSession session = factory.createSession(false, true, true);
-      for (int i = 0; i < NUMBER_OF_THREADS; i++)
-      {
-         session.createQueue(ADDRESS, ADDRESS.concat("-" + i), true);
-      }
-      session.close();
-
-      MyHandler handlers[] = new MyHandler[NUMBER_OF_HANDLERS];
-
-      for (int i = 0; i < handlers.length; i++)
-      {
-         handlers[i] = new MyHandler(factory, ADDRESS.concat("-" + i), NUMBER_OF_MESSAGES * 10);
-      }
-
-      final CountDownLatch flagAlign = new CountDownLatch(NUMBER_OF_THREADS);
-      final CountDownLatch flagStart = new CountDownLatch(1);
-
-      class ProducerThread extends Thread
-      {
-         Throwable e;
-
-         final int threadID;
-
-         ProducerThread(int threadID)
-         {
-            this.threadID = threadID;
-         }
-
-         public void run()
-         {
-            try
-            {
-               ClientSession session = factory.createSession(false, true, true);
-               ClientProducer producer = session.createProducer(ADDRESS);
-
-               // I want to jinx all this by having everybody start sending at the same time
-               flagAlign.countDown();
-               flagStart.await();
-
-               for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
-               {
-                  ClientMessage msg = session.createClientMessage(true);
-                  msg.setBody(ChannelBuffers.wrappedBuffer(new byte[512]));
-                  msg.getProperties().putIntProperty(threadIDKey, this.threadID);
-                  msg.getProperties().putIntProperty(sequenceIDKey, i);
-                  producer.send(msg);
-               }
-
-               session.close();
-
-            }
-            catch (Throwable e)
-            {
-               // System.out => Hudson/JUNIT reports
-               e.printStackTrace();
-               this.e = e;
-            }
-
-         }
-      }
-
-      ProducerThread threads[] = new ProducerThread[NUMBER_OF_THREADS];
-
-      for (int i = 0; i < threads.length; i++)
-      {
-         threads[i] = new ProducerThread(i);
-         threads[i].start();
-      }
-
-      assertTrue("Error initializing some of the threads", flagAlign.await(10, TimeUnit.SECONDS));
-
-      flagStart.countDown();
-
-      for (ProducerThread t : threads)
-      {
-         t.join();
-      }
-
-      for (ProducerThread t : threads)
-      {
-         if (t.e != null)
-         {
-            throw new Exception("Test Failed", t.e);
-         }
-      }
-
-      Thread.sleep(5000);
-
-      for (MyHandler handler : handlers)
-      {
-         handler.close();
-         if (handler.failure != null)
-         {
-            throw new Exception("Failure on consumer", handler.failure);
-         }
-      }
-
-      PagingManager livePagingManager = liveServer.getPostOffice().getPagingManager();
-      PagingManager backupPagingManager = backupServer.getPostOffice().getPagingManager();
-
-      TestSupportPageStore livePagingStore = (TestSupportPageStore)livePagingManager.getPageStore(ADDRESS);
-      TestSupportPageStore backupPagingStore = (TestSupportPageStore)backupPagingManager.getPageStore(ADDRESS);
-
-      System.out.println("Pages: " + livePagingStore.getNumberOfPages() +
-                         " on backup: " +
-                         backupPagingStore.getNumberOfPages());
-
-
-      if (consumeMessages)
-      {
-         if (livePagingStore.getNumberOfPages() == backupPagingStore.getNumberOfPages() - 1)
-         {
-            // The live node may have one extra page in front of the backup
-            backupPagingStore.depage();
-         }
-      }
-
-      assertEquals(livePagingStore.getNumberOfPages(), backupPagingStore.getNumberOfPages());
-
-      Page livePage = null;
-      Page backupPage = null;
-
-      while (true)
-      {
-         livePage = livePagingStore.depage();
-
-         if (livePage == null)
-         {
-            assertNull(backupPagingStore.depage());
-            break;
-         }
-
-         backupPage = backupPagingStore.depage();
-
-         assertNotNull(backupPage);
-
-         livePage.open();
-         backupPage.open();
-
-         List<PagedMessage> liveMessages = livePage.read();
-         List<PagedMessage> backupMessages = backupPage.read();
-
-         livePage.close();
-         backupPage.close();
-
-         assertEquals(liveMessages.size(), backupMessages.size());
-
-         Iterator<PagedMessage> backupIterator = backupMessages.iterator();
-
-         for (PagedMessage liveMsg : liveMessages)
-         {
-            PagedMessage backupMsg = backupIterator.next();
-            assertNotNull(backupMsg);
-
-            ServerMessage liveSrvMsg = liveMsg.getMessage(null);
-            ServerMessage backupSrvMsg = liveMsg.getMessage(null);
-
-            assertEquals(liveSrvMsg.getMessageID(), backupSrvMsg.getMessageID());
-            assertEquals(liveSrvMsg.getProperty(threadIDKey), backupSrvMsg.getProperty(threadIDKey));
-            assertEquals(liveSrvMsg.getProperty(sequenceIDKey), backupSrvMsg.getProperty(sequenceIDKey));
-         }
-      }
-
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-   class MyHandler implements MessageHandler
-   {
-      final ClientSession session;
-
-      final ClientConsumer consumer;
-
-      volatile boolean started = true;
-
-      final int msgs;
-
-      volatile int receivedMsgs = 0;
-
-      final CountDownLatch latch;
-
-      Throwable failure;
-
-      MyHandler(ClientSessionFactory sf, SimpleString address, final int msgs) throws Exception
-      {
-         this.session = sf.createSession(null, null, false, true, true, false, 0);
-         this.consumer = session.createConsumer(address);
-         consumer.setMessageHandler(this);
-         this.session.start();
-         this.msgs = msgs;
-         latch = new CountDownLatch(msgs);
-      }
-
-      public synchronized void close() throws Exception
-      {
-         session.close();
-      }
-
-      /* (non-Javadoc)
-       * @see org.jboss.messaging.core.client.MessageHandler#onMessage(org.jboss.messaging.core.client.ClientMessage)
-       */
-      public synchronized void onMessage(ClientMessage message)
-      {
-         try
-         {
-            if (!started)
-            {
-               throw new IllegalStateException("Stopped Handler received message");
-            }
-
-            if (receivedMsgs++ == msgs)
-            {
-               System.out.println("done");
-               started = false;
-               session.stop();
-            }
-
-            message.acknowledge();
-
-            if (!started)
-            {
-               latch.countDown();
-            }
-
-         }
-         catch (Throwable e)
-         {
-            this.failure = e;
-         }
-      }
-
-   }
-}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -25,6 +25,8 @@
 import java.io.File;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -45,12 +47,15 @@
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.postoffice.QueueBinding;
 import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
 import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
 import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Messaging;
 import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.impl.QueueImpl;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.jms.client.JBossBytesMessage;
 import org.jboss.messaging.utils.SimpleString;
@@ -68,7 +73,6 @@
    // Constants -----------------------------------------------------
    private static final int RECEIVE_TIMEOUT = 20000;
 
-
    final int PAGE_SIZE = 512;
 
    final int MAX_GLOBAL = 40 * PAGE_SIZE;
@@ -87,9 +91,9 @@
 
    protected static final SimpleString ADDRESS_GLOBAL = new SimpleString("FailoverTestAddress");
 
-   protected MessagingServer liveService;
+   protected MessagingServer liveServer;
 
-   protected MessagingServer backupService;
+   protected MessagingServer backupServer;
 
    protected final Map<String, Object> backupParams = new HashMap<String, Object>();
 
@@ -228,7 +232,7 @@
          for (int i = 0; i < numSessions; i++)
          {
             SimpleString subName = new SimpleString(threadNum + "sub" + i);
-   
+
             s.deleteQueue(subName);
          }
       }
@@ -243,9 +247,9 @@
 
    protected void stop() throws Exception
    {
-      backupService.stop();
+      backupServer.stop();
 
-      liveService.stop();
+      liveServer.stop();
 
       assertEquals(0, InVMRegistry.instance.size());
 
@@ -386,14 +390,14 @@
 
          backupConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
          backupConf.setPagingGlobalWatermarkSize(pageSize);
-         backupService = Messaging.newMessagingServer(backupConf);
+         backupServer = Messaging.newMessagingServer(backupConf);
       }
       else
       {
-         backupService = Messaging.newMessagingServer(backupConf, false);
+         backupServer = Messaging.newMessagingServer(backupConf, false);
       }
 
-      backupService.start();
+      backupServer.start();
 
       Configuration liveConf = new ConfigurationImpl();
       liveConf.setSecurityEnabled(false);
@@ -427,22 +431,22 @@
 
       if (fileBased)
       {
-         liveService = Messaging.newMessagingServer(liveConf);
+         liveServer = Messaging.newMessagingServer(liveConf);
       }
       else
       {
-         liveService = Messaging.newMessagingServer(liveConf, false);
+         liveServer = Messaging.newMessagingServer(liveConf, false);
       }
 
       AddressSettings settings = new AddressSettings();
       settings.setPageSizeBytes(pageSize);
 
-      liveService.getAddressSettingsRepository().addMatch("#", settings);
-      backupService.getAddressSettingsRepository().addMatch("#", settings);
+      liveServer.getAddressSettingsRepository().addMatch("#", settings);
+      backupServer.getAddressSettingsRepository().addMatch("#", settings);
 
       clearData(getTestDir() + "/live");
 
-      liveService.start();
+      liveServer.start();
    }
 
    // Private -------------------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -57,7 +57,7 @@
 
    private final Logger log = Logger.getLogger(PagingFailoverTest.class);
   
-   final int RECEIVE_TIMEOUT = 25000;
+   final int RECEIVE_TIMEOUT = 50000;
 
    // Attributes ----------------------------------------------------
 
@@ -81,6 +81,8 @@
       int numberOfConsumedMessages = multiThreadConsumer(getNumberOfThreads(), false, false);
 
       assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+      
+      System.out.println("Done!");
 
    }
 
@@ -130,6 +132,10 @@
       try
       {
          ClientSessionFactory sf1 = createFailoverFactory();
+         
+         sf1.setBlockOnAcknowledge(true);
+         sf1.setBlockOnNonPersistentSend(true);
+         sf1.setBlockOnPersistentSend(true);
 
          session = sf1.createSession(null, null, false, true, true, false, 0);
 
@@ -283,6 +289,10 @@
             factory = createFailoverFactory();
             store = liveServer.getPostOffice().getPagingManager().getPageStore(ADDRESS);
          }
+         
+         factory.setBlockOnNonPersistentSend(true);
+         factory.setBlockOnAcknowledge(true);
+         factory.setBlockOnPersistentSend(true);
 
          session = factory.createSession(false, true, true, false);
 
@@ -436,6 +446,10 @@
       final PagingStore store = liveServer.getPostOffice().getPagingManager().getPageStore(ADDRESS);
 
       final ClientSessionFactory factory = createFailoverFactory();
+      
+      factory.setBlockOnNonPersistentSend(true);
+      factory.setBlockOnAcknowledge(true);
+      factory.setBlockOnPersistentSend(true);
 
       ClientSession session = factory.createSession(false, true, true, false);
       try

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PreserveOrderDuringFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PreserveOrderDuringFailoverTest.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PreserveOrderDuringFailoverTest.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -68,6 +68,7 @@
    {
       for (int i = 0; i < 20; i++)
       {
+         log.info("testOrdering # " + i);
          setUpFailoverServers(false, -1, -1);
          failoverOrderTest();
          stopServers();

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -32,6 +32,7 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.server.Messaging;
 import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.TransactionOperation;
@@ -43,6 +44,9 @@
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
+
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -545,5 +549,10 @@
       {
          latch.countDown();
       }
+
+      public Collection<Queue> getDistinctQueues()
+      {
+         return Collections.emptySet();
+      }
    }
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.tests.unit.core.postoffice.impl;
 
 import java.io.InputStream;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -275,6 +276,14 @@
 
       }
 
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.transaction.Transaction#getDistinctQueues()
+       */
+      public Set<Queue> getDistinctQueues()
+      {
+         return Collections.emptySet();
+      }
+
    }
 
    class FakeMessage implements ServerMessage
@@ -1370,6 +1379,20 @@
 
       }
 
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.server.Queue#lock()
+       */
+      public void lockDelivery()
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.server.Queue#unlockDelivery()
+       */
+      public void unlockDelivery()
+      {
+      }
+
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -38,11 +38,6 @@
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.Bindings;
-import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.QueueBinding;
-import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
 import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
 import org.jboss.messaging.core.security.JBMSecurityManager;
@@ -54,7 +49,6 @@
 import org.jboss.messaging.integration.transports.netty.NettyConnectorFactory;
 import org.jboss.messaging.jms.client.JBossBytesMessage;
 import org.jboss.messaging.jms.client.JBossTextMessage;
-import org.jboss.messaging.utils.SimpleString;
 
 /**
  * 
@@ -366,38 +360,6 @@
       message.getBody().writeBytes(b);
       return message;
    }
-
-   protected int getMessageCount(final MessagingServer service, final String address) throws Exception
-   {
-      return getMessageCount(service.getPostOffice(), address);
-   }
-
-   /**
-    * @param address
-    * @param postOffice
-    * @return
-    * @throws Exception
-    */
-   protected int getMessageCount(final PostOffice postOffice, final String address) throws Exception
-   {
-      int messageCount;
-      messageCount = 0;
-
-      Bindings bindings = postOffice.getBindingsForAddress(new SimpleString(address));
-
-      for (Binding binding : bindings.getBindings())
-      {
-         if ((binding instanceof LocalQueueBinding))
-         {
-            QueueBinding qBinding = (QueueBinding)binding;
-
-            messageCount += qBinding.getQueue().getMessageCount();
-
-         }
-      }
-      return messageCount;
-   }
-
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/util/SpawnedVMSupport.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/SpawnedVMSupport.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/util/SpawnedVMSupport.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -71,8 +71,17 @@
    {
       return spawnVM(className, vmargs, true, args);
    }
+   // Overload that omits memoryArgs: delegates with "-Xms512m -Xmx512m " as the default memory arguments.
+   public static Process spawnVM(final String className,
+                                 final String[] vmargs,
+                                 final boolean logOutput,
+                                 final String... args) throws Exception
+   { // the callee appends memoryArgs verbatim, so the default literal keeps its trailing space
+      return spawnVM(className, "-Xms512m -Xmx512m ", vmargs, logOutput, args);
+   }
 
    public static Process spawnVM(final String className,
+                                 final String memoryArgs,
                                  final String[] vmargs,
                                  final boolean logOutput,
                                  final String... args) throws Exception
@@ -81,7 +90,7 @@
 
       sb.append("java").append(' ');
 
-      sb.append("-Xms512m -Xmx512m ");
+      sb.append(memoryArgs);
 
       for (String vmarg : vmargs)
       {

Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2009-05-13 15:16:38 UTC (rev 6771)
@@ -36,6 +36,7 @@
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -53,8 +54,14 @@
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.Bindings;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.postoffice.QueueBinding;
+import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
 import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
 import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
@@ -737,6 +744,51 @@
       return new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
    }
    
+   // Convenience overload: delegates to getMessageCount(PostOffice, String) with service.getPostOffice().
+   protected int getMessageCount(final MessagingServer service, final String address) throws Exception
+   {
+      return getMessageCount(service.getPostOffice(), address);
+   }
+
+   /** Sums getMessageCount() over the queues of the bindings returned by getLocalQueueBindings.
+    * @param postOffice post office passed through to getLocalQueueBindings
+    * @param address address name passed through to getLocalQueueBindings
+    * @return total of the per-queue message counts; 0 when no bindings are returned
+    * @throws Exception declared because getLocalQueueBindings declares it
+    */
+   protected int getMessageCount(final PostOffice postOffice, final String address) throws Exception
+   {
+      int messageCount = 0;
+      // The helper keeps only LocalQueueBinding instances, so other binding types do not contribute.
+      List<QueueBinding> bindings = getLocalQueueBindings(postOffice, address);
+      
+      for (QueueBinding qBinding: bindings)
+      {
+         messageCount += qBinding.getQueue().getMessageCount();
+      }
+
+      return messageCount;
+   }
+   // Returns, in iteration order, the bindings for the address that are LocalQueueBinding instances.
+   protected List<QueueBinding> getLocalQueueBindings(final PostOffice postOffice, final String address) throws Exception
+   {
+      ArrayList<QueueBinding> bindingsFound = new ArrayList<QueueBinding>();
+      // The String address is wrapped in a SimpleString for the post office lookup.
+      Bindings bindings = postOffice.getBindingsForAddress(new SimpleString(address));
+      // NOTE(review): bindings is dereferenced without a null check - confirm the lookup never returns null.
+      for (Binding binding : bindings.getBindings())
+      {
+         if ((binding instanceof LocalQueueBinding))
+         {
+            bindingsFound.add((QueueBinding)binding); // cast presumes LocalQueueBinding is a QueueBinding - TODO confirm
+         }
+      }
+      return bindingsFound;
+   }
+
+
+   
+   
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------




More information about the jboss-cvs-commits mailing list