[jboss-cvs] JBoss Messaging SVN: r5605 - in branches/Branch_Failover_Page: src/main/org/jboss/messaging/core/persistence/impl/journal and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jan 8 22:25:05 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-08 22:25:04 -0500 (Thu, 08 Jan 2009)
New Revision: 5605

Added:
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
Removed:
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/remote/
Modified:
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/Queue.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
Log:
Part I

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-09 03:25:04 UTC (rev 5605)
@@ -365,17 +365,19 @@
             // We set the duplicate detection header to prevent the message being depaged more than once in case of
             // failure during depage
 
+            ServerMessage msg = message.getMessage(storageManager);
+
+            // FIXME: This code is duplicated on QueueImpl::removeOrCacheReference
+            
             byte[] bytes = new byte[8];
 
             ByteBuffer buff = ByteBuffer.wrap(bytes);
 
-            ServerMessage msg = message.getMessage(storageManager);
-
             buff.putLong(msg.getMessageID());
 
             SimpleString duplID = new SimpleString(bytes);
 
-            message.getMessage(storageManager).putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, duplID);
+            msg.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, duplID);
          }
 
          int bytesToWrite = message.getEncodeSize() + PageImpl.SIZE_RECORD;
@@ -611,6 +613,8 @@
          currentPageLock.readLock().unlock();
       }
 
+      System.out.println("Paging " + this.getStoreName());
+      
       // if the first check failed, we do it again under a global currentPageLock
       // (writeLock) this time
       writeLock.lock();

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-09 03:25:04 UTC (rev 5605)
@@ -474,7 +474,7 @@
                   throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
                }
 
-               MessageReference removed = queue.removeReferenceWithID(messageID);
+               MessageReference removed = queue.removeReferenceWithID(messageID, true);
 
                if (removed == null)
                {
@@ -894,7 +894,7 @@
                      throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
                   }
 
-                  MessageReference removed = queue.removeReferenceWithID(messageID);
+                  MessageReference removed = queue.removeReferenceWithID(messageID, true);
 
                   referencesToAck.add(removed);
 
@@ -976,7 +976,7 @@
                throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
             }
 
-            MessageReference removed = queue.removeReferenceWithID(messageID);
+            MessageReference removed = queue.removeReferenceWithID(messageID, true);
 
             referencesToAck.add(removed);
 

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-09 03:25:04 UTC (rev 5605)
@@ -26,7 +26,9 @@
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.AddressManager;
 import org.jboss.messaging.core.postoffice.Binding;
@@ -45,6 +47,8 @@
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.transaction.TransactionOperation;
+import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
 import org.jboss.messaging.util.JBMThreadFactory;
 import org.jboss.messaging.util.Pair;
 import org.jboss.messaging.util.SimpleString;
@@ -52,6 +56,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -293,7 +298,14 @@
                                               final SimpleString linkAddress,
                                               final boolean duplicateDetection) throws Exception
    {
-      Binding binding = createLinkBinding(name, address, filter, durable, temporary, exclusive, linkAddress, duplicateDetection);
+      Binding binding = createLinkBinding(name,
+                                          address,
+                                          filter,
+                                          durable,
+                                          temporary,
+                                          exclusive,
+                                          linkAddress,
+                                          duplicateDetection);
 
       addBindingInMemory(binding);
 
@@ -356,7 +368,50 @@
 
       if (bindings != null)
       {
-         bindings.route(message, tx);
+         // TODO: Why is this method being called with tx == null?
+         if (tx == null)
+         {
+            bindings.route(message, null);
+         }
+         else
+         {
+            SimpleString destination = message.getDestination();
+   
+            // TODO - this can all be optimised
+            Set<SimpleString> pagingAddresses = (Set<SimpleString>)tx.getProperty(TransactionPropertyIndexes.DESTINATIONS_IN_PAGE_MODE);
+   
+            if (pagingAddresses == null)
+            {
+               pagingAddresses = new HashSet<SimpleString>();
+   
+               tx.putProperty(TransactionPropertyIndexes.DESTINATIONS_IN_PAGE_MODE, pagingAddresses);
+            }
+   
+            boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
+   
+            if (!depage && !message.isReload() &&
+                (pagingAddresses.contains(destination) || pagingManager.isPaging(destination)))
+            {
+               pagingAddresses.add(destination);
+   
+               List<ServerMessage> messages = (List<ServerMessage>)tx.getProperty(TransactionPropertyIndexes.PAGED_MESSAGES);
+   
+               if (messages == null)
+               {
+                  messages = new ArrayList<ServerMessage>();
+   
+                  tx.putProperty(TransactionPropertyIndexes.PAGED_MESSAGES, messages);
+   
+                  tx.addOperation(new PageMessageOperation());
+               }
+   
+               messages.add(message);
+            }
+            else
+            {
+               bindings.route(message, tx);
+            }
+         }
       }
    }
 
@@ -480,7 +535,13 @@
                                      final SimpleString linkAddress,
                                      final boolean duplicateDetection) throws Exception
    {
-      Bindable bindable = bindableFactory.createLink(-1, name, filter, durable, temporary, linkAddress, duplicateDetection);
+      Bindable bindable = bindableFactory.createLink(-1,
+                                                     name,
+                                                     filter,
+                                                     durable,
+                                                     temporary,
+                                                     linkAddress,
+                                                     duplicateDetection);
 
       Binding binding = new BindingImpl(BindingType.LINK, address, bindable, exclusive);
 
@@ -544,10 +605,15 @@
       }
 
       pagingManager.reloadStores();
-      
+
       Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<SimpleString, Long>>>();
 
-      storageManager.loadMessageJournal(this, storageManager, queueSettingsRepository, queues, resourceManager, duplicateIDMap);
+      storageManager.loadMessageJournal(this,
+                                        storageManager,
+                                        queueSettingsRepository,
+                                        queues,
+                                        resourceManager,
+                                        duplicateIDMap);
 
       for (Map.Entry<SimpleString, List<Pair<SimpleString, Long>>> entry : duplicateIDMap.entrySet())
       {
@@ -562,7 +628,7 @@
       }
 
       // This is necessary as if the server was previously stopped while a depage was being executed,
-      // it needs to resume the depage process on those destinations      
+      // it needs to resume the depage process on those destinations
       pagingManager.startGlobalDepage();
    }
 
@@ -598,4 +664,106 @@
          }
       }
    }
+
+   // TODO - this can be further optimised to have one PageMessageOperation per message, NOT one which uses a shared
+   // list
+   private class PageMessageOperation implements TransactionOperation
+   {
+
+      public void afterCommit(final Transaction tx) throws Exception
+      {
+      }
+
+      public void afterPrepare(final Transaction tx) throws Exception
+      {
+      }
+
+      public void afterRollback(final Transaction tx) throws Exception
+      {
+      }
+
+      public void beforeCommit(final Transaction tx) throws Exception
+      {
+         if (tx.getState() != Transaction.State.PREPARED)
+         {
+            pageMessages(tx);
+         }
+      }
+
+      public void beforePrepare(final Transaction tx) throws Exception
+      {
+         pageMessages(tx);
+      }
+
+      public void beforeRollback(final Transaction tx) throws Exception
+      {
+      }
+
+      private void pageMessages(final Transaction tx) throws Exception
+      {
+         List<ServerMessage> messages = (List<ServerMessage>)tx.getProperty(TransactionPropertyIndexes.PAGED_MESSAGES);
+
+         if (messages != null && !messages.isEmpty())
+         {
+            PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+
+            if (pageTransaction == null)
+            {
+               pageTransaction = new PageTransactionInfoImpl(tx.getID());
+
+               tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransaction);
+
+               // To avoid a race condition where depage happens before the transaction is completed,
+               // we need to inform the pager
+               // that this transaction is being processed
+               pagingManager.addTransaction(pageTransaction);
+            }
+
+            boolean pagingPersistent = false;
+
+            HashSet<SimpleString> pagedDestinationsToSync = new HashSet<SimpleString>();
+
+            // We only need to add the dupl id header once per transaction
+            boolean first = true;
+            for (ServerMessage message : messages)
+            {
+               // http://wiki.jboss.org/wiki/JBossMessaging2Paging
+               // Explained under Transaction On Paging. (This is the item B)
+               if (pagingManager.page(message, tx.getID(), first))
+               {
+                  if (message.isDurable())
+                  {
+                     // We only create pageTransactions if using persistent messages
+                     pageTransaction.increment();
+                     pagingPersistent = true;
+                     pagedDestinationsToSync.add(message.getDestination());
+                  }
+               }
+               else
+               {
+                  // This could happen when the PageStore left the pageState
+
+                  // TODO is this correct - don't we lose transactionality here???
+                  route(message, null);
+               }
+               first = false;
+            }
+
+            if (pagingPersistent)
+            {
+               tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+
+               if (!pagedDestinationsToSync.isEmpty())
+               {
+                  pagingManager.sync(pagedDestinationsToSync);
+                  storageManager.storePageTransaction(tx.getID(), pageTransaction);
+               }
+            }
+
+            messages.clear();
+         }
+      }
+
+   }
+
 }

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/Queue.java	2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/Queue.java	2009-01-09 03:25:04 UTC (rev 5605)
@@ -93,8 +93,26 @@
 
    int getMessagesAdded();
 
-   MessageReference removeReferenceWithID(long id) throws Exception;
+   /**
+    * When replicating deliveries, we are just replaying the removal of the messages from the Queue,
+    * and in that case the size removal comes from the acks, so acknowledged should be passed as false.
+    * 
+    * @param id
+    * @param acknowledged
+    * @return
+    * @throws Exception
+    */
+   MessageReference removeReferenceWithID(long id, boolean acknowledged) throws Exception;
 
+   
+   /**
+    * Remove the reference, or add it to the DuplicateIDCache in case it is not found
+    * @param id
+    * @return
+    * @throws Exception
+    */
+   MessageReference removeOrCacheReference(long id) throws Exception;
+
    /** Remove message from queue, add it to the scheduled delivery list without affect reference counting */
    void rescheduleDelivery(long id, long scheduledDeliveryTime);
 

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-09 03:25:04 UTC (rev 5605)
@@ -12,27 +12,26 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
-import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.list.PriorityLinkedList;
 import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
-import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.DuplicateIDCache;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -81,8 +80,6 @@
 
    private final boolean temporary;
 
-   private final PostOffice postOffice;
-
    private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
 
    private final ConcurrentSet<MessageReference> expiringMessageReferences = new ConcurrentHashSet<MessageReference>();
@@ -91,6 +88,13 @@
 
    private volatile Distributor distributionPolicy = new RoundRobinDistributor();
 
+   private final DuplicateIDCache duplicateIDCache;
+
+   /** 
+    * We need to Lock the duplicateIDCache
+    * */
+   private final Lock duplicateIDLock = new ReentrantLock();
+
    private boolean direct;
 
    private boolean promptDelivery;
@@ -133,7 +137,7 @@
 
       this.temporary = temporary;
 
-      this.postOffice = postOffice;
+      this.duplicateIDCache = postOffice.getDuplicateIDCache(this.name);
 
       this.storageManager = storageManager;
 
@@ -154,154 +158,129 @@
    // Bindable implementation -------------------------------------------------------------------------------------
 
    public boolean route(final ServerMessage message, Transaction tx) throws Exception
-   {     
+   {
       if (filter != null && !filter.match(message))
       {
          return false;
       }
-      
+
       SimpleString duplicateID = (SimpleString)message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
 
-      DuplicateIDCache cache = null;
+      boolean startedTx = false;
 
-      if (!message.isReload() && duplicateID != null)
-      {
-         cache = postOffice.getDuplicateIDCache(message.getDestination());
+      boolean durableRef = message.isDurable() && durable;
 
-         if (cache.contains(duplicateID))
-         {
-            if (tx == null)
-            {
-               log.warn("Duplicate message detected - message will not be routed");
-            }
-            else
-            {
-               log.warn("Duplicate message detected - transaction will be rejected");
-               
-               tx.markAsRollbackOnly(null);
-            }
+      boolean startLock = false;
 
-            return true;
-         }
-      }
-      
-      boolean durableRef = message.isDurable() && durable;
-      
-      boolean startedTx = false;
-      
-      if (cache != null && tx == null && durableRef)
+      try
       {
-         //We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
-         
-         tx = new TransactionImpl(storageManager);
-         
-         startedTx = true;
-      }
-      
-      // There is no way to cache the Store, since a Queue may belong to multiple addresses,
-      // so we aways need this lookup
-      PagingStore store = pagingManager.getPageStore(message.getDestination());
 
-      if (tx == null)
-      {
-         // If durable, must be persisted before anything is routed
-         MessageReference ref = message.createReference(this);
-
-         if (!message.isReload())
+         if (!message.isReload() && duplicateID != null)
          {
-            if (message.getRefCount() == 1)
+            if (duplicateIDCache.contains(duplicateID))
             {
-               if (durableRef)
+               if (tx == null)
                {
-                  storageManager.storeMessage(message);
+                  log.warn("Duplicate message detected - message will not be routed");
                }
-               
-               if (cache != null)
+               else
                {
-                  cache.addToCache(duplicateID);
+                  log.warn("Duplicate message detected - transaction will be rejected");
+
+                  //tx.markAsRollbackOnly(null);
                }
+
+               return true;
             }
 
-            Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
-
-            if (scheduledDeliveryTime != null)
+            if (tx == null && durableRef)
             {
-               ref.setScheduledDeliveryTime(scheduledDeliveryTime);
+               tx = new TransactionImpl(storageManager);
+               startedTx = true;
 
-               if (durableRef)
-               {
-                  storageManager.updateScheduledDeliveryTime(ref);
-               }
             }
+
+            if (isBackup())
+            {
+               // We need to lock access to route when dealing with backupNodes
+               // Paging will add IDs for records not found during replicateACK,
+               // and that needs to be atomic between route and addID
+               duplicateIDLock.lock();
+               startLock = true;
+            }
          }
 
-         if (message.getRefCount() == 1)
+         // There is no way to cache the Store, since a Queue may belong to multiple addresses,
+         // so we always need this lookup
+         PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+         if (tx == null)
          {
-            store.addSize(message.getMemoryEstimate());
-         }
+            // If durable, must be persisted before anything is routed
+            MessageReference ref = message.createReference(this);
 
-         store.addSize(ref.getMemoryEstimate());
+            if (!message.isReload())
+            {
+               if (message.getRefCount() == 1)
+               {
+                  if (durableRef)
+                  {
+                     storageManager.storeMessage(message);
+                  }
 
-         // TODO addLast never currently returns anything other than STATUS_HANDLED
+                  if (duplicateID != null)
+                  {
+                     duplicateIDCache.addToCache(duplicateID);
+                  }
+               }
 
-         addLast(ref);
-      }
-      else
-      {
-         // TODO combine this similar logic with the non transactional case
+               Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
 
-         SimpleString destination = message.getDestination();
+               if (scheduledDeliveryTime != null)
+               {
+                  ref.setScheduledDeliveryTime(scheduledDeliveryTime);
 
-         //TODO - this can all be optimised
-         Set<SimpleString> pagingAddresses = (Set<SimpleString>)tx.getProperty(TransactionPropertyIndexes.DESTINATIONS_IN_PAGE_MODE);
-         
-         if (pagingAddresses == null)
-         {
-            pagingAddresses = new HashSet<SimpleString>();
-            
-            tx.putProperty(TransactionPropertyIndexes.DESTINATIONS_IN_PAGE_MODE, pagingAddresses);
-         }
-         
-         boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
-         
-         if (!depage && !message.isReload() && (pagingAddresses.contains(destination) || pagingManager.isPaging(destination)))
-         {
-            pagingAddresses.add(destination);
-                        
-            List<ServerMessage> messages = (List<ServerMessage>)tx.getProperty(TransactionPropertyIndexes.PAGED_MESSAGES);
-            
-            if (messages == null)
+                  if (durableRef)
+                  {
+                     storageManager.updateScheduledDeliveryTime(ref);
+                  }
+               }
+            }
+
+            if (message.getRefCount() == 1)
             {
-               messages = new ArrayList<ServerMessage>();
-               
-               tx.putProperty(TransactionPropertyIndexes.PAGED_MESSAGES, messages);
-               
-               tx.addOperation(new PageMessageOperation());
+               store.addSize(message.getMemoryEstimate());
             }
-            
-            messages.add(message);
+
+            store.addSize(ref.getMemoryEstimate());
+
+            // TODO addLast never currently returns anything other than STATUS_HANDLED
+
+            addLast(ref);
          }
          else
          {
+            // TODO combine this similar logic with the non transactional case
+
             MessageReference ref = message.createReference(this);
 
             boolean first = message.getRefCount() == 1;
-            
-            if (!message.isReload() &&  message.getRefCount() == 1)
+
+            if (!message.isReload() && message.getRefCount() == 1)
             {
                if (durableRef)
                {
                   storageManager.storeMessageTransactional(tx.getID(), message);
                }
-               
-               if (cache != null)
+
+               if (duplicateID != null)
                {
-                  cache.addToCache(duplicateID, tx);
+                  duplicateIDCache.addToCache(duplicateID, tx);
                }
             }
 
             Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
-           
+
             if (scheduledDeliveryTime != null)
             {
                ref.setScheduledDeliveryTime(scheduledDeliveryTime);
@@ -318,25 +297,38 @@
             }
 
             store.addSize(ref.getMemoryEstimate());
-            
+
             tx.addOperation(new AddMessageOperation(ref, first));
 
             if (durableRef)
-            {               
+            {
                tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
             }
          }
+
+         if (startedTx)
+         {
+            tx.commit();
+            tx = null;
+         }
       }
-      
-      if (startedTx)
+      finally
       {
-         tx.commit();
+         if (startedTx && tx != null)
+         {
+            // some exception happened during routing, the tx needs to be rolled back
+            tx.rollback();
+         }
+
+         if (startLock)
+         {
+            duplicateIDLock.unlock();
+         }
       }
-      
+
       return true;
    }
-   
-   
+
    // Queue implementation ----------------------------------------------------------------------------------------
 
    public boolean isClustered()
@@ -360,7 +352,7 @@
    }
 
    public void addLast(final MessageReference ref)
-   {          
+   {
       add(ref, false);
    }
 
@@ -448,8 +440,66 @@
       }
    }
 
-   public synchronized MessageReference removeReferenceWithID(final long id) throws Exception
+   public MessageReference removeOrCacheReference(long id) throws Exception
    {
+      // most of the time the remove will succeed, so we first try it without any locks
+      MessageReference ref = removeReferenceWithID(id, false);
+      
+      if (ref == null)
+      {
+         for (int i = 0; i < 10; i++)
+         {
+            System.out.println("Retry " + i);
+            Thread.sleep(100);
+            
+            ref = removeReferenceWithID(id, false);
+            
+            if (ref != null)
+            {
+               System.out.println("Finally found it:");
+               break;
+            }
+         }
+//         duplicateIDLock.lock();
+//         try
+//         {
+//            // we need to do it again under a lock.
+//            // Depage could still be routing the Message, so we need to lock the duplicateIDLock
+//            // to avoid a race with route
+//
+//            ref = removeReferenceWithID(id, false);
+//
+//            if (ref == null)
+//            {
+//               
+//               System.out.println("Didn't find the reference " + id + " on backup. Storing it!");
+//
+//               
+//               // FIXME: This code is duplicated in PagingStoreImpl
+//
+//               byte[] bytes = new byte[8];
+//
+//               ByteBuffer buff = ByteBuffer.wrap(bytes);
+//
+//               buff.putLong(id);
+//
+//               SimpleString duplID = new SimpleString(bytes);
+//
+//               duplicateIDCache.addToCache(duplID);
+//            }
+//         }
+//         finally
+//         {
+//            duplicateIDLock.unlock();
+//         }
+      }
+
+      return ref;
+
+   }
+
+   public synchronized MessageReference removeReferenceWithID(final long id, final boolean acknowledged) throws Exception
+   {
       Iterator<MessageReference> iterator = messageReferences.iterator();
 
       MessageReference removed = null;
@@ -464,7 +514,10 @@
 
             removed = ref;
 
-            referenceRemoved(removed);
+            if (acknowledged)
+            {
+               referenceRemoved(removed);
+            }
 
             break;
          }
@@ -519,14 +572,14 @@
 
       return null;
    }
-   
+
    public long getPersistenceID()
    {
       return persistenceID;
    }
 
    public void setPersistenceID(final long id)
-   {      
+   {
       this.persistenceID = id;
    }
 
@@ -588,11 +641,12 @@
    public synchronized int deleteAllReferences(final StorageManager storageManager,
                                                final PostOffice postOffice,
                                                final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
-   {      
+   {
       return deleteMatchingReferences(null, storageManager, postOffice, queueSettingsRepository);
    }
 
-   public synchronized int deleteMatchingReferences(final Filter filter, final StorageManager storageManager,
+   public synchronized int deleteMatchingReferences(final Filter filter,
+                                                    final StorageManager storageManager,
                                                     final PostOffice postOffice,
                                                     final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
    {
@@ -631,7 +685,8 @@
       return count;
    }
 
-   public synchronized boolean deleteReference(final long messageID, final StorageManager storageManager,
+   public synchronized boolean deleteReference(final long messageID,
+                                               final StorageManager storageManager,
                                                final PostOffice postOffice,
                                                final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
    {
@@ -1132,115 +1187,17 @@
          }
       }
    }
-   
-   //TODO - this can be further optimised to have one PageMessageOperation per message, NOT one which uses a shared list
-   private class PageMessageOperation implements TransactionOperation
-   {
-      public void afterCommit(final Transaction tx) throws Exception
-      { 
-      }
 
-      public void afterPrepare(final Transaction tx) throws Exception
-      {  
-      }
-
-      public void afterRollback(final Transaction tx) throws Exception
-      {
-      }
-
-      public void beforeCommit(final Transaction tx) throws Exception
-      { 
-         if (tx.getState() != Transaction.State.PREPARED)
-         {
-            pageMessages(tx);
-         }
-      }
-
-      public void beforePrepare(final Transaction tx) throws Exception
-      { 
-         pageMessages(tx);
-      }
-
-      public void beforeRollback(final Transaction tx) throws Exception
-      {
-      }
-      
-      private void pageMessages(final Transaction tx) throws Exception
-      {
-         List<ServerMessage> messages = (List<ServerMessage>)tx.getProperty(TransactionPropertyIndexes.PAGED_MESSAGES);
-         
-         if (messages != null && !messages.isEmpty())
-         {
-            PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
-            
-            if (pageTransaction == null)
-            {
-               pageTransaction = new PageTransactionInfoImpl(tx.getID());
-               
-               tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransaction);
-               
-               // To avoid a race condition where depage happens before the transaction is completed, we need to inform the
-               // pager about this transaction is being processed
-               pagingManager.addTransaction(pageTransaction);
-            }
-   
-            boolean pagingPersistent = false;
-   
-            HashSet<SimpleString> pagedDestinationsToSync = new HashSet<SimpleString>();
-   
-            // We only need to add the dupl id header once per transaction
-            boolean first = true;
-            for (ServerMessage message : messages)
-            {
-               // http://wiki.jboss.org/wiki/JBossMessaging2Paging
-               // Explained under Transaction On Paging. (This is the item B)
-               if (pagingManager.page(message, tx.getID(), first))
-               {
-                  if (message.isDurable())
-                  {
-                     // We only create pageTransactions if using persistent messages
-                     pageTransaction.increment();
-                     pagingPersistent = true;
-                     pagedDestinationsToSync.add(message.getDestination());
-                  }
-               }
-               else
-               {
-                  // This could happen when the PageStore left the pageState
-                                     
-                  //TODO is this correct - don't we lose transactionality here???
-                  postOffice.route(message, null);
-               }
-               first = false;
-            }
-   
-            if (pagingPersistent)
-            {
-               tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
-               
-               if (!pagedDestinationsToSync.isEmpty())
-               {
-                  pagingManager.sync(pagedDestinationsToSync);
-                  storageManager.storePageTransaction(tx.getID(), pageTransaction);
-               }
-            }
-            
-            messages.clear();
-         }
-      }
-      
-   }
-
    private class AddMessageOperation implements TransactionOperation
    {
       private final MessageReference ref;
-      
+
       private final boolean first;
 
       AddMessageOperation(final MessageReference ref, final boolean first)
       {
          this.ref = ref;
-         
+
          this.first = first;
       }
 
@@ -1258,7 +1215,7 @@
       }
 
       public void beforeCommit(final Transaction tx) throws Exception
-      {         
+      {
       }
 
       public void beforePrepare(final Transaction tx) throws Exception
@@ -1268,11 +1225,11 @@
       public void beforeRollback(final Transaction tx) throws Exception
       {
          ServerMessage msg = ref.getMessage();
-         
+
          PagingStore store = pagingManager.getPageStore(msg.getDestination());
-         
+
          store.addSize(-ref.getMemoryEstimate());
-         
+
          if (first)
          {
             store.addSize(-msg.getMemoryEstimate());
@@ -1281,5 +1238,4 @@
 
    }
 
-
 }

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-09 03:25:04 UTC (rev 5605)
@@ -406,23 +406,22 @@
    {
       // It may not be the first in the queue - since there may be multiple producers
       // sending to the queue
-      MessageReference ref = messageQueue.removeReferenceWithID(messageID);
+      MessageReference ref = messageQueue.removeOrCacheReference(messageID);
 
-      if (ref == null)
+      if (ref != null)
       {
-         throw new IllegalStateException("Cannot find ref when replicating delivery " + messageID);
-      }
+         // We call doHandle rather than handle, since we don't want to check available credits
+         // This is because delivery and receive credits can be processed in different order on live
+         // and backup, and otherwise we could have a situation where the delivery is replicated
+         // but the credits haven't arrived yet, so the delivery gets rejected on backup
+         HandleStatus handled = doHandle(ref);
 
-      // We call doHandle rather than handle, since we don't want to check available credits
-      // This is because delivery and receive credits can be processed in different order on live
-      // and backup, and otherwise we could have a situation where the delivery is replicated
-      // but the credits haven't arrived yet, so the delivery gets rejected on backup
-      HandleStatus handled = doHandle(ref);
-
-      if (handled != HandleStatus.HANDLED)
-      {
-         throw new IllegalStateException("Reference was not handled " + ref + " " + handled);
+         if (handled != HandleStatus.HANDLED)
+         {
+            throw new IllegalStateException("Reference was not handled " + ref + " " + handled);
+         }
       }
+
    }
 
    public void failedOver()

Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java	2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java	2009-01-09 03:25:04 UTC (rev 5605)
@@ -56,9 +56,9 @@
 
    protected final Map<String, Object> backupParams = new HashMap<String, Object>();
 
-   private MessagingService liveService;
+   protected MessagingService liveService;
 
-   private MessagingService backupService;
+   protected MessagingService backupService;
 
    // Static --------------------------------------------------------
 
@@ -73,7 +73,7 @@
                                                                      backupParams));
    }
 
-   protected void setUpFileBased() throws Exception
+   protected void setUpFileBased(long maxGlobalSize) throws Exception
    {
 
       deleteDirectory(new File(getTestDir()));
@@ -86,8 +86,8 @@
       backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
       backupConf.setJournalFileSize(100 * 1024);
 
-      backupConf.setPagingMaxGlobalSizeBytes(100 * 1024 * 1024);
-      backupConf.setPagingDefaultSize(10 * 1024);
+      backupConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
+      backupConf.setPagingDefaultSize(20 * 1024);
 
       backupConf.setSecurityEnabled(false);
       backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -108,8 +108,8 @@
       liveConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/live"));
       liveConf.setPagingDirectory(getPageDir(getTestDir() + "/live"));
 
-      liveConf.setPagingMaxGlobalSizeBytes(100 * 1024 * 1024);
-      liveConf.setPagingDefaultSize(10 * 1024);
+      liveConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
+      liveConf.setPagingDefaultSize(20 * 1024);
       liveConf.setJournalFileSize(100 * 1024);
 
       liveConf.setSecurityEnabled(false);

Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java	2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java	2009-01-09 03:25:04 UTC (rev 5605)
@@ -68,12 +68,11 @@
       factory.setBlockOnNonPersistentSend(true);
       factory.setBlockOnPersistentSend(true);
       
-      // Enable this and the test will fail
-      //factory.setMinLargeMessageSize(10 * 1024);
+      factory.setMinLargeMessageSize(10 * 1024);
 
       ClientSession session = factory.createSession(null, null, false, true, true, false, 0);
 
-      final int numberOfMessages = 500;
+      final int numberOfMessages = 1;
 
       final int numberOfBytes = 15000;
       
@@ -145,7 +144,7 @@
    @Override
    protected void setUp() throws Exception
    {
-      setUpFileBased();
+      setUpFileBased(100*1024*1024);
    }
 
    protected void tearDown() throws Exception

Added: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java	                        (rev 0)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java	2009-01-09 03:25:04 UTC (rev 5605)
@@ -0,0 +1,264 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A LargeMessageFailoverTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Dec 8, 2008 7:09:38 PM
+ *
+ *
+ */
+public class LargeMessageFailoverTest extends FailoverTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testLargeMessageReplicatedNoFailover() throws Exception
+   {
+      testLargeMessage(-1, 500);
+   }
+
+   public void testLargeMessageFailOnProducing() throws Exception
+   {
+      testLargeMessage(1, 500);
+   }
+
+   
+//   public void testFail() throws Exception
+//   {
+//      for (int i = 0; i < 100; i++)
+//      {
+//         System.out.println ("****************** " + i);
+//         testLargeMessageFailOnConsume();
+//         tearDown();
+//         setUp();
+//      }
+//   }
+//   
+   public void testLargeMessageFailOnConsume() throws Exception
+   {
+      testLargeMessage(2, 500);
+   }
+
+   private void testLargeMessage(final int placeToFail, final int numberOfMessages) throws Exception
+   {
+      ClientSessionFactory factory = createFailoverFactory();
+
+      // factory.setMinLargeMessageSize(10 * 1024);
+
+      final int messageSize = 25000;
+
+      try
+      {
+
+         sendMessages(factory, placeToFail == 1, numberOfMessages, messageSize);
+
+         receiveMessages(factory, placeToFail == 2, numberOfMessages, messageSize);
+
+      }
+      finally
+      {
+         System.out.println("Giving up!!!!!!");
+      }
+
+   }
+
+   /**
+    * @param factory
+    * @param placeToFail
+    * @param fail whether to trigger a connection failure after roughly a third of the messages have been received
+    * @param messageSize
+    * @throws MessagingException
+    */
+   private void receiveMessages(final ClientSessionFactory factory,
+                                final boolean fail,
+                                final int numberOfMessages,
+                                final int messageSize) throws MessagingException
+   {
+      ClientSession session = factory.createSession(null, null, false, true, true, false, 0);
+
+      try
+      {
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         session.start();
+
+         final RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            if (fail)
+            {
+               if (i == numberOfMessages / 3)
+               {
+                  forceFailure(conn);
+               }
+            }
+
+            ClientMessage message = consumer.receive(5000);
+
+            assertNotNull(message);
+
+            message.acknowledge();
+
+            MessagingBuffer buffer = message.getBody();
+
+            buffer.rewind();
+
+            assertEquals(messageSize, buffer.limit());
+
+            assertEquals(i, buffer.getInt());
+         }
+         assertNull(consumer.receive(500));
+      }
+      finally
+      {
+         session.close();
+      }
+   }
+
+   /**
+    * @param conn
+    */
+   private void forceFailure(final RemotingConnection conn)
+   {
+      new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               Thread.sleep(100);
+            }
+            catch (InterruptedException e)
+            {
+               e.printStackTrace();
+            }
+            
+            System.out.println("***************************************** Forcing failure");
+            conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+         }
+      }.start();
+   }
+
+   /**
+    * @param factory
+    * @param placeToFail
+    * @param fail whether to trigger a connection failure after roughly a third of the messages have been sent
+    * @param messageSize
+    * @throws MessagingException
+    */
+   private void sendMessages(final ClientSessionFactory factory,
+                             final boolean fail,
+                             final int numberOfMessages,
+                             final int messageSize) throws MessagingException
+   {
+      ClientSession session = factory.createSession(null, null, false, true, true, false, 0);
+
+      try
+      {
+         session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         final RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            if (fail)
+            {
+               if (i == numberOfMessages / 3)
+               {
+                  forceFailure(conn);
+               }
+           }
+
+            ClientMessage message = session.createClientMessage(true);
+
+            ByteBuffer buffer = ByteBuffer.allocate(messageSize);
+
+            buffer.putInt(i);
+
+            buffer.rewind();
+
+            message.setBody(new ByteBufferWrapper(buffer));
+
+            producer.send(message);
+
+         }
+      }
+      finally
+      {
+         session.close();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      setUpFileBased(100 * 1024 * 1024);
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	                        (rev 0)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	2009-01-09 03:25:04 UTC (rev 5605)
@@ -0,0 +1,186 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A PagingFailoverTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Dec 8, 2008 10:53:16 AM
+ *
+ *
+ */
+public class PagingFailoverTest extends FailoverTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   public void testPaging() throws Exception
+   {
+      ClientSession session = null;
+      try
+      {
+         ClientSessionFactory sf1 = createFailoverFactory();
+         
+         sf1.setBlockOnAcknowledge(true);
+         sf1.setBlockOnNonPersistentSend(true);
+         sf1.setBlockOnPersistentSend(true);
+         
+         //sf1.setMinLargeMessageSize(10 * 1024);
+
+         session = sf1.createSession(null, null, false, true, true, false, 0);
+
+         session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+         session.start();
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         final int numMessages = 5000;
+         
+         PagingManager pmLive = liveService.getServer().getPostOffice().getPagingManager();
+         PagingStore storeLive = pmLive.getPageStore(ADDRESS);
+         
+         PagingManager pmBackup = backupService.getServer().getPostOffice().getPagingManager();
+         PagingStore storeBackup = pmBackup.getPageStore(ADDRESS); 
+         
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = session.createClientMessage(true);
+            ByteBuffer buffer = ByteBuffer.allocate(1000);
+
+            buffer.putInt(i);
+
+            buffer.rewind();
+
+            message.setBody(new ByteBufferWrapper(buffer));
+
+            producer.send(message);
+            
+            if (storeLive.isPaging())
+            {
+               assertTrue(storeBackup.isPaging());
+            }
+        }
+
+         final RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+         assertEquals("GloblSize", pmLive.getGlobalSize(), pmBackup.getGlobalSize());
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            
+            if (i == numMessages / 2)
+            {
+//               assertEquals("GlobalSize", pmLive.getGlobalSize(), pmBackup.getGlobalSize());
+               System.out.println("Failing");
+               conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+            }
+
+            ClientMessage message = consumer.receive(1000);
+            
+            //System.out.println("message = " + message);
+            
+            assertNotNull(message);
+
+            message.acknowledge();
+            //session.commit();
+
+            message.getBody().rewind();
+            
+            assertEquals(i, message.getBody().getInt());
+
+         }
+
+         assertNull(consumer.receive(100));
+         
+         
+//         assertEquals(0, pmLive.getGlobalSize());
+         assertEquals(0, pmBackup.getGlobalSize());
+
+      }
+      finally
+      {
+         if (session != null)
+         {
+            try
+            {
+               session.close();
+            }
+            catch (Exception ignored)
+            {
+               // eat it
+            }
+         }
+      }
+
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void tearDown() throws Exception
+   {
+      //super.tearDown();
+   }
+   
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      setUpFileBased(100 * 1024);
+      
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java	2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java	2009-01-09 03:25:04 UTC (rev 5605)
@@ -23,17 +23,44 @@
 package org.jboss.messaging.tests.integration.paging;
 
 import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Executor;
 
 import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.management.impl.ManagementServiceImpl;
+import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.PagedMessage;
+import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
+import org.jboss.messaging.core.paging.impl.PagingStoreFactoryNIO;
+import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.persistence.impl.journal.JournalStorageManager;
+import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.RemotingServiceImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.security.JBMSecurityManager;
+import org.jboss.messaging.core.security.impl.JBMSecurityManagerImpl;
+import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServerImpl;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.tests.integration.paging.remote.RemotePageCrashExecution;
 import org.jboss.messaging.tests.util.ServiceTestBase;
-import org.jboss.messaging.tests.util.SpawnedVMSupport;
+import org.jboss.messaging.util.OrderedExecutorFactory;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * This test will make sure that a failing depage won't cause duplicated messages
@@ -49,6 +76,8 @@
 
    // Constants -----------------------------------------------------
 
+   public static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
    // Attributes ----------------------------------------------------
 
    // Static --------------------------------------------------------
@@ -59,12 +88,9 @@
 
    public void testCrashDuringDeleteFile() throws Exception
    {
-      clearData();
+      
+      pageAndFail();
 
-      Process process = SpawnedVMSupport.spawnVM(RemotePageCrashExecution.class.getCanonicalName());
-      process.waitFor();
-      assertEquals("The remote process failed, test is invalid", RemotePageCrashExecution.OK, process.exitValue());
-
       File pageDir = new File(getPageDir());
 
       File directories[] = pageDir.listFiles();
@@ -97,7 +123,7 @@
 
          session.start();
 
-         ClientConsumer consumer = session.createConsumer(RemotePageCrashExecution.ADDRESS);
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
 
          assertNull(consumer.receive(200));
 
@@ -120,8 +146,330 @@
       super.tearDown();
    }
 
+   // Private -------------------------------------------------------
+   
+   
+   /** This method leaves garbage page files behind.
+    *  The page files are not deleted, as if the server had crashed right after the commit
+    *  and before removing the files. */
+   private void pageAndFail() throws Exception
+   {
+      clearData();
+      Configuration config = createDefaultConfig();
+
+      config.setPagingMaxGlobalSizeBytes(100 * 1024);
+      config.setPagingDefaultSize(10 * 1024);
+
+      MessagingService service = newMessagingService(config);
+
+      service.start();
+
+      try
+      {
+         ClientSessionFactory sf = createInVMFactory();
+
+         sf.setBlockOnNonPersistentSend(true);
+         sf.setBlockOnPersistentSend(true);
+         sf.setBlockOnAcknowledge(true);
+
+         ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+         session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         ByteBuffer ioBuffer = ByteBuffer.allocate(1024);
+
+         ClientMessage message = null;
+
+         MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+         message = session.createClientMessage(true);
+         message.setBody(bodyLocal);
+
+         PagingStore store = service.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+
+         int messages = 0;
+         while (!store.isPaging())
+         {
+            producer.send(message);
+            messages++;
+         }
+
+         for (int i = 0; i < 2; i++)
+         {
+            messages++;
+            producer.send(message);
+         }
+
+         session.close();
+
+         assertTrue(service.getServer().getPostOffice().getPagingManager().getGlobalSize() > 0);
+
+         session = sf.createSession(null, null, false, true, true, false, 0);
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         session.start();
+
+         for (int i = 0; i < messages; i++)
+         {
+            ClientMessage message2 = consumer.receive(10000);
+
+            assertNotNull(message2);
+
+            message2.acknowledge();
+         }
+
+         consumer.close();
+
+         session.close();
+
+         assertEquals(0, service.getServer().getPostOffice().getPagingManager().getGlobalSize());
+
+      }
+      finally
+      {
+         try
+         {
+            service.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
    // Private -------------------------------------------------------
 
+   private MessagingServiceImpl newMessagingService(final Configuration configuration)
+   {
+
+      StorageManager storageManager = new JournalStorageManager(configuration);
+
+      RemotingService remotingService = new RemotingServiceImpl(configuration);
+
+      JBMSecurityManager securityManager = new JBMSecurityManagerImpl(true);
+
+      ManagementService managementService = new ManagementServiceImpl(ManagementFactory.getPlatformMBeanServer(), false);
+
+      remotingService.setManagementService(managementService);
+
+      MessagingServer server = new FailingMessagingServiceImpl();
+
+      server.setConfiguration(configuration);
+
+      server.setStorageManager(storageManager);
+
+      server.setRemotingService(remotingService);
+
+      server.setSecurityManager(securityManager);
+
+      server.setManagementService(managementService);
+
+      return new MessagingServiceImpl(server, storageManager, remotingService);
+   }
+
    // Inner classes -------------------------------------------------
 
+   /** This is hacking MessagingServerImpl
+    *  to simulate a server that fails right
+    *  before the page file is removed */
+   class FailingMessagingServiceImpl extends MessagingServerImpl
+   {
+      /**
+       * Method could be replaced for test purposes 
+       */
+      @Override
+      protected PagingManager createPagingManager()
+      {
+         return new PagingManagerImpl(new FailurePagingStoreFactoryNIO(super.getConfiguration().getPagingDirectory(),
+                                                                       super.getConfiguration().getPagingMaxThreads()),
+                                      super.getStorageManager(),
+                                      super.getQueueSettingsRepository(),
+                                      super.getConfiguration().getPagingMaxGlobalSizeBytes(),
+                                      super.getConfiguration().getPagingDefaultSize(),
+                                      super.getConfiguration().isJournalSyncNonTransactional());
+      }
+
+      class FailurePagingStoreFactoryNIO extends PagingStoreFactoryNIO
+
+      {
+         /**
+          * @param directory
+          * @param maxThreads
+          */
+         public FailurePagingStoreFactoryNIO(final String directory, final int maxThreads)
+         {
+            super(directory, maxThreads);
+         }
+
+         // Constants -----------------------------------------------------
+
+         // Attributes ----------------------------------------------------
+
+         // Static --------------------------------------------------------
+
+         // Constructors --------------------------------------------------
+
+         // Public --------------------------------------------------------
+
+         @Override
+         public synchronized PagingStore newStore(final SimpleString destinationName, final QueueSettings settings) throws Exception
+         {
+            Field factoryField = PagingStoreFactoryNIO.class.getDeclaredField("executorFactory");
+            factoryField.setAccessible(true);
+
+            OrderedExecutorFactory factory = (OrderedExecutorFactory)factoryField.get(this);
+            return new FailingPagingStore(destinationName, settings, factory.getExecutor());
+         }
+
+         // Package protected ---------------------------------------------
+
+         // Protected -----------------------------------------------------
+
+         // Private -------------------------------------------------------
+
+         // Inner classes -------------------------------------------------
+         class FailingPagingStore extends PagingStoreImpl
+         {
+
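+            /**
+             * Creates a store wired to the enclosing server and factory: the server's
+             * paging manager, storage manager and post office are handed to the
+             * superclass together with the enclosing factory, and null is passed for
+             * the fourth (file factory) argument.
+             *
+             * @param storeName passed through to the superclass as the store name
+             * @param queueSettings passed through to the superclass
+             * @param executor passed through to the superclass
+             */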
+            public FailingPagingStore(final SimpleString storeName,
+                                      final QueueSettings queueSettings,
+                                      final Executor executor)
+            {
+               super(getPostOffice().getPagingManager(),
+                     getStorageManager(),
+                     getPostOffice(),
+                     null,
+                     FailurePagingStoreFactoryNIO.this,
+                     storeName,
+                     queueSettings,
+                     executor);
+            }
+
+            @Override
+            protected Page createPage(final int page) throws Exception
+            {
+
+               Page originalPage = super.createPage(page);
+
+               return new FailingPage(originalPage);
+            }
+
+         }
+
+      }
+
+      class FailingPage implements Page
+      {
+         Page delegatedPage;
+
+         /**
+          * @throws Exception
+          * @see org.jboss.messaging.core.paging.Page#close()
+          */
+         public void close() throws Exception
+         {
+            delegatedPage.close();
+         }
+
+         /**
+          * @throws Exception
+          * @see org.jboss.messaging.core.paging.Page#delete()
+          */
+         public void delete() throws Exception
+         {
+            // This will let the file stay, simulating a system failure
+         }
+
+         /**
+          * @return
+          * @see org.jboss.messaging.core.paging.Page#getNumberOfMessages()
+          */
+         public int getNumberOfMessages()
+         {
+            return delegatedPage.getNumberOfMessages();
+         }
+
+         /**
+          * @return
+          * @see org.jboss.messaging.core.paging.Page#getPageId()
+          */
+         public int getPageId()
+         {
+            return delegatedPage.getPageId();
+         }
+
+         /**
+          * @return
+          * @see org.jboss.messaging.core.paging.Page#getSize()
+          */
+         public int getSize()
+         {
+            return delegatedPage.getSize();
+         }
+
+         /**
+          * @throws Exception
+          * @see org.jboss.messaging.core.paging.Page#open()
+          */
+         public void open() throws Exception
+         {
+            delegatedPage.open();
+         }
+
+         /**
+          * @return
+          * @throws Exception
+          * @see org.jboss.messaging.core.paging.Page#read()
+          */
+         public List<PagedMessage> read() throws Exception
+         {
+            return delegatedPage.read();
+         }
+
+         /**
+          * @throws Exception
+          * @see org.jboss.messaging.core.paging.Page#sync()
+          */
+         public void sync() throws Exception
+         {
+            delegatedPage.sync();
+         }
+
+         /**
+          * @param message
+          * @throws Exception
+          * @see org.jboss.messaging.core.paging.Page#write(org.jboss.messaging.core.paging.PagedMessage)
+          */
+         public void write(final PagedMessage message) throws Exception
+         {
+            delegatedPage.write(message);
+         }
+
+         public FailingPage(final Page delegatePage)
+         {
+            delegatedPage = delegatePage;
+         }
+      }
+
+   }
+
+   // Inner classes -------------------------------------------------
+
 }

Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java	2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java	2009-01-09 03:25:04 UTC (rev 5605)
@@ -637,12 +637,21 @@
       }
 
    }
+   
+   public void testPageMultipleDestinations() throws Exception
+   {
+      internalTestPageMultipleDestinations(false);
+   }
 
+   
+   public void testPageMultipleDestinationsTransacted() throws Exception
+   {
+      internalTestPageMultipleDestinations(true);
+   }
 
-   public void testPageMultipleDestinations() throws Exception
+
+   private void internalTestPageMultipleDestinations(boolean transacted) throws Exception
    {
-      clearData();
-
       Configuration config = createDefaultConfig();
 
       final int MAX_SIZE = 90 * 1024; // this must be lower than minlargeMessageSize on the SessionFactory
@@ -666,7 +675,7 @@
          sf.setBlockOnPersistentSend(true);
          sf.setBlockOnAcknowledge(true);
 
-         ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+         ClientSession session = sf.createSession(null, null, false, !transacted, true, false, 0);
 
          for (int i = 0; i < NUMBER_OF_BINDINGS; i++)
          {
@@ -687,6 +696,11 @@
          for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
          {
             producer.send(message);
+            
+            if (transacted) 
+            {
+               session.commit();
+            }
          }
 
          session.close();

Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2009-01-09 03:25:04 UTC (rev 5605)
@@ -1164,7 +1164,7 @@
       EasyMock.expect(consumer.handle(messageReference3)).andReturn(HandleStatus.HANDLED);
       EasyMock.replay(consumer);
       queue.addListFirst(messageReferences);
-      queue.removeReferenceWithID(2);
+      queue.removeReferenceWithID(2, true);
       queue.addConsumer(consumer);
       queue.deliverNow();
       EasyMock.verify(consumer);




More information about the jboss-cvs-commits mailing list