[hornetq-commits] JBoss hornetq SVN: r9709 - in trunk: src/main/org/hornetq/core/paging/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Sep 21 18:25:30 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-09-21 18:25:29 -0400 (Tue, 21 Sep 2010)
New Revision: 9709

Modified:
   trunk/src/main/org/hornetq/core/paging/PagingStore.java
   trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/server/HornetQServer.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
HORNETQ-523 - Ordering issue with TX and paging

Modified: trunk/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingStore.java	2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/paging/PagingStore.java	2010-09-21 22:25:29 UTC (rev 9709)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.paging;
 
+import java.util.List;
+
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.ServerMessage;
@@ -49,7 +51,7 @@
 
    void sync() throws Exception;
 
-   boolean page(ServerMessage message, long transactionId) throws Exception;
+   boolean page(List<ServerMessage> messages, long transactionId) throws Exception;
 
    boolean page(ServerMessage message) throws Exception;
 

Modified: trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java	2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java	2010-09-21 22:25:29 UTC (rev 9709)
@@ -236,6 +236,11 @@
    {
       return size.intValue();
    }
+   
+   public String toString()
+   {
+      return "PageImpl::pageID="  + this.pageId + ", file=" + this.file;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java	2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java	2010-09-21 22:25:29 UTC (rev 9709)
@@ -235,11 +235,16 @@
 
    // Private -------------------------------------------------------
 
-   private PagingStore newStore(final SimpleString address) throws Exception
+   protected PagingStore newStore(final SimpleString address) throws Exception
    {
       return pagingStoreFactory.newStore(address,
                                          addressSettingsRepository.getMatch(address.toString()));
    }
+   
+   protected PagingStoreFactory getStoreFactory()
+   {
+      return pagingStoreFactory;
+   }
 
    // Inner classes -------------------------------------------------
 

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java	2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java	2010-09-21 22:25:29 UTC (rev 9709)
@@ -220,6 +220,26 @@
    {
       return new NIOSequentialFileFactory(directory + File.separatorChar + directoryName, false);
    }
+   
+   protected PagingManager getPagingManager()
+   {
+      return pagingManager;
+   }
+   
+   protected StorageManager getStorageManager()
+   {
+      return storageManager;
+   }
+   
+   protected PostOffice getPostOffice()
+   {
+      return postOffice;
+   }
+   
+   protected ExecutorFactory getExecutorFactory()
+   {
+      return executorFactory;
+   }
 
    // Private -------------------------------------------------------
 

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-09-21 22:25:29 UTC (rev 9709)
@@ -14,6 +14,7 @@
 package org.hornetq.core.paging.impl;
 
 import java.text.DecimalFormat;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -110,7 +111,7 @@
    private volatile Page currentPage;
 
    private final ReentrantLock writeLock = new ReentrantLock();
-   
+
    /** duplicate cache used at this address */
    private final DuplicateIDCache duplicateCache;
 
@@ -186,7 +187,7 @@
       this.storeFactory = storeFactory;
 
       this.syncNonTransactional = syncNonTransactional;
-      
+
       // Post office could be null on the backup node
       if (postOffice == null)
       {
@@ -196,7 +197,7 @@
       {
          this.duplicateCache = postOffice.getDuplicateIDCache(storeName);
       }
-      
+
    }
 
    // Public --------------------------------------------------------
@@ -263,7 +264,7 @@
       return storeName;
    }
 
-   public boolean page(final ServerMessage message, final long transactionID) throws Exception
+   public boolean page(final List<ServerMessage> message, final long transactionID) throws Exception
    {
       // The sync on transactions is done on commit only
       return page(message, transactionID, false);
@@ -273,7 +274,7 @@
    {
       // If non Durable, there is no need to sync as there is no requirement for persistence for those messages in case
       // of crash
-      return page(message, -1, syncNonTransactional && message.isDurable());
+      return page(Arrays.asList(message), -1, syncNonTransactional && message.isDurable());
    }
 
    public void sync() throws Exception
@@ -541,7 +542,6 @@
       writeLock.lock();
 
       currentPageLock.writeLock().lock(); // Make sure no checks are done on currentPage while we are depaging
-
       try
       {
          if (!running)
@@ -597,6 +597,7 @@
             {
                returnPage = createPage(firstPageId++);
             }
+
             return returnPage;
          }
       }
@@ -619,10 +620,14 @@
     * @return
     * @throws Exception
     */
-   private boolean readPage() throws Exception
+   protected boolean readPage() throws Exception
    {
       Page page = depage();
 
+      // It's important that only depage should happen while locked
+      // or we would be holding a lock for a long time
+      // The reading (IO part) should happen outside of any locks
+
       if (page == null)
       {
          return false;
@@ -630,8 +635,8 @@
 
       page.open();
 
-      List<PagedMessage> messages =  null;
-      
+      List<PagedMessage> messages = null;
+
       try
       {
          messages = page.read();
@@ -688,25 +693,25 @@
    class OurRunnable implements Runnable
    {
       boolean ran;
-      
+
       final Runnable runnable;
-      
+
       OurRunnable(final Runnable runnable)
       {
          this.runnable = runnable;
       }
-      
+
       public synchronized void run()
       {
          if (!ran)
          {
             runnable.run();
-            
+
             ran = true;
          }
       }
    }
-   
+
    public void executeRunnableWhenMemoryAvailable(final Runnable runnable)
    {
       if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && maxSize != -1)
@@ -714,23 +719,23 @@
          if (sizeInBytes.get() > maxSize)
          {
             OurRunnable ourRunnable = new OurRunnable(runnable);
-            
+
             onMemoryFreedRunnables.add(ourRunnable);
-            
-            //We check again to avoid a race condition where the size can come down just after the element
-            //has been added, but the check to execute was done before the element was added
-            //NOTE! We do not fix this race by locking the whole thing, doing this check provides
-            //MUCH better performance in a highly concurrent environment
+
+            // We check again to avoid a race condition where the size can come down just after the element
+            // has been added, but the check to execute was done before the element was added
+            // NOTE! We do not fix this race by locking the whole thing, doing this check provides
+            // MUCH better performance in a highly concurrent environment
             if (sizeInBytes.get() <= maxSize)
             {
-               //run it now
+               // run it now
                ourRunnable.run();
             }
 
             return;
          }
       }
-      
+
       runnable.run();
    }
 
@@ -797,9 +802,7 @@
 
    }
 
-   private boolean page(final ServerMessage message,
-                        final long transactionID,
-                        final boolean sync) throws Exception
+   protected boolean page(final List<ServerMessage> messages, final long transactionID, final boolean sync) throws Exception
    {
       if (!running)
       {
@@ -857,60 +860,63 @@
             return false;
          }
 
-         PagedMessage pagedMessage;
-         
-         if (!message.isDurable())
+         for (ServerMessage message : messages)
          {
-            // The address should never be transient when paging (even for non-persistent messages when paging)
-            // This will force everything to be persisted
-            message.bodyChanged();
-         }
+            PagedMessage pagedMessage;
 
-         if (transactionID != -1)
-         {
-            pagedMessage = new PagedMessageImpl(message, transactionID);
-         }
-         else
-         {
-            pagedMessage = new PagedMessageImpl(message);
-         }
+            if (!message.isDurable())
+            {
+               // The address should never be transient when paging (even for non-persistent messages when paging)
+               // This will force everything to be persisted
+               message.bodyChanged();
+            }
 
-         int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
-
-         if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
-         {
-            // Make sure nothing is currently validating or using currentPage
-            currentPageLock.writeLock().lock();
-            try
+            if (transactionID != -1)
             {
-               openNewPage();
-
-               // openNewPage will set currentPageSize to zero, we need to set it again
-               currentPageSize.addAndGet(bytesToWrite);
+               pagedMessage = new PagedMessageImpl(message, transactionID);
             }
-            finally
+            else
             {
-               currentPageLock.writeLock().unlock();
+               pagedMessage = new PagedMessageImpl(message);
             }
-         }
 
-         currentPageLock.readLock().lock();
+            int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
 
-         try
-         {
-            currentPage.write(pagedMessage);
+            if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
+            {
+               // Make sure nothing is currently validating or using currentPage
+               currentPageLock.writeLock().lock();
+               try
+               {
+                  openNewPage();
 
-            if (sync)
+                  // openNewPage will set currentPageSize to zero, we need to set it again
+                  currentPageSize.addAndGet(bytesToWrite);
+               }
+               finally
+               {
+                  currentPageLock.writeLock().unlock();
+               }
+            }
+
+            currentPageLock.readLock().lock();
+
+            try
             {
-               currentPage.sync();
+               currentPage.write(pagedMessage);
+ 
+               if (sync)
+               {
+                  currentPage.sync();
+               }
             }
+            finally
+            {
+               currentPageLock.readLock().unlock();
+            }
+         }
 
-            return true;
-         }
-         finally
-         {
-            currentPageLock.readLock().unlock();
-         }
+         return true;
       }
       finally
       {
@@ -940,9 +946,9 @@
 
       // Depage has to be done atomically, in case of failure it should be
       // back to where it was
-      
+
       byte[] duplicateIdForPage = generateDuplicateID(pageId);
-      
+
       Transaction depageTransaction = new TransactionImpl(storageManager);
 
       // DuplicateCache could be null during replication
@@ -950,7 +956,8 @@
       {
          if (duplicateCache.contains(duplicateIdForPage))
          {
-            log.warn("Page " + pageId + " had been processed already but the file wasn't removed as a crash happened. Ignoring this page");
+            log.warn("Page " + pageId +
+                     " had been processed already but the file wasn't removed as a crash happened. Ignoring this page");
             return true;
          }
 
@@ -1058,7 +1065,7 @@
       {
          // This will set the journal transaction to commit;
          depageTransaction.setContainsPersistent();
-         
+
          entry.getKey().storeUpdate(storageManager, this.pagingManager, depageTransaction, entry.getValue().intValue());
       }
 

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-09-21 22:25:29 UTC (rev 9709)
@@ -15,16 +15,15 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Message;
+import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
@@ -476,10 +475,10 @@
       if (addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null)
       {
          pagingManager.deletePageStore(binding.getAddress());
-         
+
          managementService.unregisterAddress(binding.getAddress());
       }
-      
+
       if (binding.getType() == BindingType.LOCAL_QUEUE)
       {
          managementService.unregisterQueue(uniqueName, binding.getAddress());
@@ -502,7 +501,7 @@
       }
 
       binding.close();
-      
+
       return binding;
    }
 
@@ -537,7 +536,7 @@
    {
       route(message, new RoutingContextImpl(tx), direct);
    }
-   
+
    public void route(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception
    {
       // Sanity check
@@ -547,7 +546,7 @@
       }
 
       SimpleString address = message.getAddress();
-      
+
       setPagingStore(message);
 
       Object duplicateID = message.getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID);
@@ -614,17 +613,18 @@
       else
       {
          Transaction tx = context.getTransaction();
-         
+
          boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
-         
-         // if the TX paged at least one message on a give address, all the other addresses should also go towards paging cache now 
+
+         // if the TX paged at least one message on a give address, all the other addresses should also go towards
+         // paging cache now
          boolean alreadyPaging = false;
-         
+
          if (tx.isPaging())
          {
-            alreadyPaging = getPageOperation(tx).isPaging(message.getAddress()); 
+            alreadyPaging = getPageOperation(tx).isPaging(message.getAddress());
          }
-         
+
          if (!depage && message.storeIsPaging() || alreadyPaging)
          {
             tx.setPaging(true);
@@ -633,7 +633,7 @@
             {
                tx.commit();
             }
-            
+
             return;
          }
       }
@@ -849,7 +849,7 @@
    private void setPagingStore(final ServerMessage message) throws Exception
    {
       PagingStore store = pagingManager.getPageStore(message.getAddress());
-      
+
       message.setPagingStore(store);
    }
 
@@ -1113,21 +1113,26 @@
 
    private class PageMessageOperation implements TransactionOperation
    {
-      private final List<ServerMessage> messagesToPage = new ArrayList<ServerMessage>();
-      
-      private final HashSet<SimpleString> addressesPaging = new HashSet<SimpleString>();
-      
+      private final HashMap<SimpleString, Pair<PagingStore, List<ServerMessage>>> pagingData = new HashMap<SimpleString, Pair<PagingStore, List<ServerMessage>>>();
+
       private Transaction subTX = null;
-      
+
       void addMessageToPage(final ServerMessage message)
       {
-         messagesToPage.add(message);
-         addressesPaging.add(message.getAddress());
+         Pair<PagingStore, List<ServerMessage>> pagePair = pagingData.get(message.getAddress());
+         if (pagePair == null)
+         {
+            pagePair = new Pair<PagingStore, List<ServerMessage>>(message.getPagingStore(),
+                                                                  new ArrayList<ServerMessage>());
+            pagingData.put(message.getAddress(), pagePair);
+         }
+
+         pagePair.b.add(message);
       }
-      
+
       boolean isPaging(final SimpleString address)
       {
-         return addressesPaging.contains(address);
+         return pagingData.get(address) != null;
       }
 
       public void afterCommit(final Transaction tx)
@@ -1142,7 +1147,7 @@
          {
             pageTransaction.commit();
          }
-         
+
          if (subTX != null)
          {
             subTX.afterCommit();
@@ -1178,18 +1183,18 @@
          {
             pageMessages(tx);
          }
-         
+
          if (subTX != null)
          {
             subTX.beforeCommit();
          }
-         
+
       }
 
       public void beforePrepare(final Transaction tx) throws Exception
       {
          pageMessages(tx);
-         
+
          if (subTX != null)
          {
             subTX.beforePrepare();
@@ -1206,7 +1211,7 @@
 
       private void pageMessages(final Transaction tx) throws Exception
       {
-         if (!messagesToPage.isEmpty())
+         if (!pagingData.isEmpty())
          {
             PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
 
@@ -1223,21 +1228,33 @@
 
             boolean pagingPersistent = false;
 
-            Set<PagingStore> pagingStoresToSync = new HashSet<PagingStore>();
+            ArrayList<ServerMessage> nonPagedMessages = null;
 
-            for (ServerMessage message : messagesToPage)
+            for (Pair<PagingStore, List<ServerMessage>> pair : pagingData.values())
             {
-               if (message.page(tx.getID()))
+               
+               if (!pair.a.page(pair.b, tx.getID()))
                {
-                  if (message.isDurable())
+                  if (nonPagedMessages == null)
                   {
-                     // We only create pageTransactions if using persistent messages
+                     nonPagedMessages = new ArrayList<ServerMessage>();
+                  }
+                  nonPagedMessages.addAll(pair.b);
+               }
+               
+               for (ServerMessage msg : pair.b)
+               {
+                  if (msg.isDurable())
+                  {
                      pageTransaction.increment();
                      pagingPersistent = true;
-                     pagingStoresToSync.add(message.getPagingStore());
                   }
                }
-               else
+            }
+
+            if (nonPagedMessages != null)
+            {
+               for (ServerMessage message : nonPagedMessages)
                {
                   // This could happen when the PageStore left the pageState
                   // we create a copy of the transaction so that messages are routed with the same tx ID.
@@ -1246,9 +1263,9 @@
                   {
                      subTX = tx.copy();
                   }
-                  
+
                   route(message, subTX, false);
-                  
+
                   if (subTX.isContainsPersistent())
                   {
                      // The route wouldn't be able to update the persistent flag on the main TX
@@ -1261,16 +1278,12 @@
             if (pagingPersistent)
             {
                tx.setContainsPersistent();
-
-               if (!pagingStoresToSync.isEmpty())
+               for (Pair<PagingStore, List<ServerMessage>> pair : pagingData.values())
                {
-                  for (PagingStore store : pagingStoresToSync)
-                  {
-                     store.sync();
-                  }
+                  pair.a.sync();
+               }
 
-                  pageTransaction.store(storageManager, pagingManager, tx);
-               }
+               pageTransaction.store(storageManager, pagingManager, tx);
             }
          }
       }

Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java	2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java	2010-09-21 22:25:29 UTC (rev 9709)
@@ -23,6 +23,7 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.DivertConfiguration;
 import org.hornetq.core.management.impl.HornetQServerControlImpl;
+import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.protocol.core.Channel;
@@ -58,6 +59,8 @@
    RemotingService getRemotingService();
 
    StorageManager getStorageManager();
+   
+   PagingManager getPagingManager();
 
    ManagementService getManagementService();
 

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-09-21 22:25:29 UTC (rev 9709)
@@ -502,6 +502,11 @@
    {
       return mbeanServer;
    }
+   
+   public PagingManager getPagingManager()
+   {
+      return pagingManager;
+   }
 
    public RemotingService getRemotingService()
    {

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2010-09-21 22:25:29 UTC (rev 9709)
@@ -14,6 +14,7 @@
 package org.hornetq.core.server.impl;
 
 import java.io.InputStream;
+import java.util.Arrays;
 
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
@@ -269,7 +270,7 @@
    {
       if (pagingStore != null)
       {
-         return pagingStore.page(this, transactionID);
+         return pagingStore.page(Arrays.asList((ServerMessage)this), transactionID);
       }
       else
       {

Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-09-21 22:25:29 UTC (rev 9709)
@@ -13,10 +13,17 @@
 
 package org.hornetq.tests.integration.client;
 
+import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
@@ -33,14 +40,30 @@
 import org.hornetq.api.core.client.MessageHandler;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.DivertConfiguration;
+import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.PagingStoreFactory;
+import org.hornetq.core.paging.impl.PageImpl;
+import org.hornetq.core.paging.impl.PagingManagerImpl;
+import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.paging.impl.PagingStoreImpl;
 import org.hornetq.core.paging.impl.TestSupportPageStore;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.HornetQServerImpl;
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.ExecutorFactory;
 
 /**
  * A PagingTest
@@ -129,17 +152,17 @@
 
       server.start();
 
-      final int numberOfIntegers = 256;
+      final int messageSize = 1024;
 
       final int numberOfMessages = 30000;
 
-      final byte[] body = new byte[numberOfIntegers * 4];
+      final byte[] body = new byte[messageSize];
 
       ByteBuffer bb = ByteBuffer.wrap(body);
 
-      for (int j = 1; j <= numberOfIntegers; j++)
+      for (int j = 1; j <= messageSize; j++)
       {
-         bb.putInt(j);
+         bb.put(getSamplebyte(j));
       }
 
       try
@@ -244,7 +267,7 @@
                      }
 
                      consumer.close();
-                     
+
                      session.close();
                   }
                   catch (Throwable e)
@@ -266,11 +289,20 @@
          {
             threads[i].join();
          }
-         
+
          assertEquals(0, errors.get());
          
+         for (int i = 0 ; i < 20 && server.getPostOffice().getPagingManager().getTransactions().size() != 0; i++)
+         {
+            if (server.getPostOffice().getPagingManager().getTransactions().size() != 0)
+            {
+               // The delete may be asynchronous, giving some time case it eventually happen asynchronously
+               Thread.sleep(500);
+            }
+         }
+
          assertEquals(0, server.getPostOffice().getPagingManager().getTransactions().size());
-         
+
       }
       finally
       {
@@ -740,11 +772,11 @@
             message.putIntProperty(new SimpleString("id"), i);
 
             producerTransacted.send(message);
-            
+
             if (i % 2 == 0)
             {
                System.out.println("Sending 20 msgs to make it page");
-               for (int j = 0 ; j < 20; j++)
+               for (int j = 0; j < 20; j++)
                {
                   ClientMessage msgSend = sessionNonTX.createMessage(true);
                   msgSend.getBodyBuffer().writeBytes(new byte[10 * 1024]);
@@ -756,7 +788,7 @@
             {
                System.out.println("Consuming 20 msgs to make it page");
                ClientConsumer consumer = sessionNonTX.createConsumer(PagingTest.ADDRESS);
-               for (int j = 0 ; j < 20; j++)
+               for (int j = 0; j < 20; j++)
                {
                   ClientMessage msgReceived = consumer.receive(10000);
                   assertNotNull(msgReceived);
@@ -765,7 +797,7 @@
                consumer.close();
             }
          }
-         
+
          ClientConsumer consumerNonTX = sessionNonTX.createConsumer(PagingTest.ADDRESS);
          while (true)
          {
@@ -777,7 +809,6 @@
             msgReceived.acknowledge();
          }
          consumerNonTX.close();
-         
 
          ClientConsumer consumer = sessionNonTX.createConsumer(PagingTest.ADDRESS);
 
@@ -798,7 +829,7 @@
             // System.out.println(messageID);
             Assert.assertNotNull(messageID);
             Assert.assertEquals("message received out of order", i, messageID.intValue());
-            
+
             System.out.println("MessageID = " + messageID);
 
             message.acknowledge();
@@ -823,7 +854,6 @@
 
    }
 
-
    public void testDepageDuringTransaction4() throws Exception
    {
       clearData();
@@ -835,93 +865,88 @@
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
                                           new HashMap<String, AddressSettings>());
-      
+
       server.getConfiguration().setJournalSyncNonTransactional(false);
       server.getConfiguration().setJournalSyncTransactional(false);
 
       server.start();
 
       final AtomicInteger errors = new AtomicInteger(0);
-      
+
       final int messageSize = 1024; // 1k
       final int numberOfMessages = 10000;
 
       try
       {
          final ClientSessionFactory sf = createInVMFactory();
-         
 
          sf.setBlockOnNonDurableSend(true);
          sf.setBlockOnDurableSend(true);
          sf.setBlockOnAcknowledge(false);
 
          final byte[] body = new byte[messageSize];
-         
-         
+
          Thread producerThread = new Thread()
          {
-           public void run()
-           {
-              ClientSession sessionProducer = null;
-              try
-              {
-                 sessionProducer = sf.createSession(false, false);
-                 ClientProducer producer = sessionProducer.createProducer(ADDRESS);
-                 
-                 for (int i = 0 ; i < numberOfMessages; i++)
-                 {
-                    ClientMessage msg = sessionProducer.createMessage(true);
-                    msg.getBodyBuffer().writeBytes(body);
-                    msg.putIntProperty("count", i);
-                    producer.send(msg);
-                    
-                    if (i % 50 == 0 && i != 0)
-                    {
-                       sessionProducer.commit();
-                       //Thread.sleep(500);
-                    }
-                 }
-                 
-                 sessionProducer.commit();
-                 
-                 System.out.println("Producer gone");
-                 
-                 
-                 
-              }
-              catch (Throwable e)
-              {
-                 e.printStackTrace(); // >> junit report
-                 errors.incrementAndGet();
-              }
-              finally
-              {
-                 try
-                 {
-                    if (sessionProducer != null)
-                    {
-                       sessionProducer.close();
-                    }
-                 }
-                 catch (Throwable e)
-                 {
-                    e.printStackTrace();
-                    errors.incrementAndGet();
-                 }
-              }
-           }
+            public void run()
+            {
+               ClientSession sessionProducer = null;
+               try
+               {
+                  sessionProducer = sf.createSession(false, false);
+                  ClientProducer producer = sessionProducer.createProducer(ADDRESS);
+
+                  for (int i = 0; i < numberOfMessages; i++)
+                  {
+                     ClientMessage msg = sessionProducer.createMessage(true);
+                     msg.getBodyBuffer().writeBytes(body);
+                     msg.putIntProperty("count", i);
+                     producer.send(msg);
+
+                     if (i % 50 == 0 && i != 0)
+                     {
+                        sessionProducer.commit();
+                        // Thread.sleep(500);
+                     }
+                  }
+
+                  sessionProducer.commit();
+
+                  System.out.println("Producer gone");
+
+               }
+               catch (Throwable e)
+               {
+                  e.printStackTrace(); // >> junit report
+                  errors.incrementAndGet();
+               }
+               finally
+               {
+                  try
+                  {
+                     if (sessionProducer != null)
+                     {
+                        sessionProducer.close();
+                     }
+                  }
+                  catch (Throwable e)
+                  {
+                     e.printStackTrace();
+                     errors.incrementAndGet();
+                  }
+               }
+            }
          };
-         
+
          ClientSession session = sf.createSession(true, true, 0);
          session.start();
          session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
-         
+
          producerThread.start();
- 
+
          ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
-         
-         
-         for (int i = 0 ; i < numberOfMessages; i++)
+
+         for (int i = 0; i < numberOfMessages; i++)
          {
             ClientMessage msg = consumer.receive(500000);
             assertNotNull(msg);
@@ -929,15 +954,15 @@
             msg.acknowledge();
             if (i > 0 && i % 10 == 0)
             {
-               //session.commit();
+               // session.commit();
             }
          }
-         //session.commit();
-         
+         // session.commit();
+
          session.close();
-         
+
          producerThread.join();
-         
+
          assertEquals(0, errors.get());
       }
       finally
@@ -953,6 +978,361 @@
 
    }
 
+   // This test will force a depage thread as soon as the first message hits the page
+   public void testDepageOnTX5() throws Exception
+   {
+      clearData();
+
+      final Configuration config = createDefaultConfig();
+      HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
+
+      final Executor executor = Executors.newSingleThreadExecutor();
+
+      final AtomicInteger countDepage = new AtomicInteger(0);
+      class HackPagingStore extends PagingStoreImpl
+      {
+         HackPagingStore(final SimpleString address,
+                         final PagingManager pagingManager,
+                         final StorageManager storageManager,
+                         final PostOffice postOffice,
+                         final SequentialFileFactory fileFactory,
+                         final PagingStoreFactory storeFactory,
+                         final SimpleString storeName,
+                         final AddressSettings addressSettings,
+                         final Executor executor,
+                         final boolean syncNonTransactional)
+         {
+            super(address,
+                  pagingManager,
+                  storageManager,
+                  postOffice,
+                  fileFactory,
+                  storeFactory,
+                  storeName,
+                  addressSettings,
+                  executor,
+                  syncNonTransactional);
+         }
+
+         protected boolean page(final List<ServerMessage> messages, final long transactionID, final boolean sync) throws Exception
+         {
+            boolean paged = super.page(messages, transactionID, sync);
+
+            if (paged)
+            {
+
+               if (countDepage.incrementAndGet() == 1)
+               {
+                  countDepage.set(0);
+
+                  executor.execute(new Runnable()
+                  {
+                     public void run()
+                     {
+                        try
+                        {
+                           while (isStarted() && readPage());
+                        }
+                        catch (Exception e)
+                        {
+                           e.printStackTrace();
+                        }
+                     }
+                  });
+               }
+            }
+
+            return paged;
+         }
+
+         public boolean startDepaging()
+         {
+            // do nothing, we are hacking depage right in between paging
+            return false;
+         }
+
+      };
+
+      class HackStoreFactory extends PagingStoreFactoryNIO
+      {
+         HackStoreFactory(final String directory,
+                          final ExecutorFactory executorFactory,
+                          final boolean syncNonTransactional)
+         {
+            super(directory, executorFactory, syncNonTransactional);
+         }
+
+         public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) throws Exception
+         {
+
+            return new HackPagingStore(address,
+                                       getPagingManager(),
+                                       getStorageManager(),
+                                       getPostOffice(),
+                                       null,
+                                       this,
+                                       address,
+                                       settings,
+                                       getExecutorFactory().getExecutor(),
+                                       syncNonTransactional);
+         }
+
+      };
+
+      HornetQServer server = new HornetQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), securityManager)
+
+      {
+         protected PagingManager createPagingManager()
+         {
+            return new PagingManagerImpl(new HackStoreFactory(config.getPagingDirectory(),
+                                                              getExecutorFactory(),
+                                                              config.isJournalSyncNonTransactional()),
+                                         getStorageManager(),
+                                         getAddressSettingsRepository());
+         }
+      };
+
+      AddressSettings defaultSetting = new AddressSettings();
+      defaultSetting.setPageSizeBytes(PAGE_SIZE);
+      defaultSetting.setMaxSizeBytes(PAGE_MAX);
+      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+      server.start();
+
+      final AtomicInteger errors = new AtomicInteger(0);
+
+      final int messageSize = 1024; // 1k
+      final int numberOfMessages = 2000;
+
+      try
+      {
+         final ClientSessionFactory sf = createInVMFactory();
+
+         sf.setBlockOnNonDurableSend(true);
+         sf.setBlockOnDurableSend(true);
+         sf.setBlockOnAcknowledge(false);
+
+         final byte[] body = new byte[messageSize];
+
+         Thread producerThread = new Thread()
+         {
+            public void run()
+            {
+               ClientSession sessionProducer = null;
+               try
+               {
+                  sessionProducer = sf.createSession(false, false);
+                  ClientProducer producer = sessionProducer.createProducer(ADDRESS);
+
+                  for (int i = 0; i < numberOfMessages; i++)
+                  {
+                     ClientMessage msg = sessionProducer.createMessage(true);
+                     msg.getBodyBuffer().writeBytes(body);
+                     msg.putIntProperty("count", i);
+                     producer.send(msg);
+
+                     if (i % 500 == 0 && i != 0)
+                     {
+                        sessionProducer.commit();
+                        // Thread.sleep(500);
+                     }
+                  }
+
+                  sessionProducer.commit();
+
+                  System.out.println("Producer gone");
+
+               }
+               catch (Throwable e)
+               {
+                  e.printStackTrace(); // >> junit report
+                  errors.incrementAndGet();
+               }
+               finally
+               {
+                  try
+                  {
+                     if (sessionProducer != null)
+                     {
+                        sessionProducer.close();
+                     }
+                  }
+                  catch (Throwable e)
+                  {
+                     e.printStackTrace();
+                     errors.incrementAndGet();
+                  }
+               }
+            }
+         };
+
+         ClientSession session = sf.createSession(true, true, 0);
+         session.start();
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+         producerThread.start();
+
+         ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = consumer.receive(500000);
+            assertNotNull(msg);
+            assertEquals(i, msg.getIntProperty("count").intValue());
+            msg.acknowledge();
+            if (i > 0 && i % 10 == 0)
+            {
+               // session.commit();
+            }
+         }
+         // session.commit();
+
+         session.close();
+
+         producerThread.join();
+
+         assertEquals(0, errors.get());
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+   public void testOrderingNonTX() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      final HornetQServer server = createServer(true,
+                                                config,
+                                                PagingTest.PAGE_SIZE,
+                                                PagingTest.PAGE_SIZE * 2,
+                                                new HashMap<String, AddressSettings>());
+
+      server.getConfiguration().setJournalSyncNonTransactional(false);
+      server.getConfiguration().setJournalSyncTransactional(false);
+
+      server.start();
+
+      final AtomicInteger errors = new AtomicInteger(0);
+
+      final int messageSize = 1024; // 1k
+      final int numberOfMessages = 2000;
+
+      try
+      {
+         final ClientSessionFactory sf = createInVMFactory();
+
+         sf.setBlockOnNonDurableSend(false);
+         sf.setBlockOnDurableSend(true);
+         sf.setBlockOnAcknowledge(false);
+
+         final CountDownLatch ready = new CountDownLatch(1);
+
+         final byte[] body = new byte[messageSize];
+
+         Thread producerThread = new Thread()
+         {
+            public void run()
+            {
+               ClientSession sessionProducer = null;
+               try
+               {
+                  sessionProducer = sf.createSession(true, true);
+                  ClientProducer producer = sessionProducer.createProducer(ADDRESS);
+
+                  for (int i = 0; i < numberOfMessages; i++)
+                  {
+                     ClientMessage msg = sessionProducer.createMessage(true);
+                     msg.getBodyBuffer().writeBytes(body);
+                     msg.putIntProperty("count", i);
+                     producer.send(msg);
+
+                     if (i == 1000)
+                     {
+                        // The session is not TX, but we do this just to perform a round trip to the server
+                        // and make sure there are no pending messages
+                        sessionProducer.commit();
+
+                        assertTrue(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+                        ready.countDown();
+                     }
+                  }
+
+                  sessionProducer.commit();
+
+                  System.out.println("Producer gone");
+
+               }
+               catch (Throwable e)
+               {
+                  e.printStackTrace(); // >> junit report
+                  errors.incrementAndGet();
+               }
+               finally
+               {
+                  try
+                  {
+                     if (sessionProducer != null)
+                     {
+                        sessionProducer.close();
+                     }
+                  }
+                  catch (Throwable e)
+                  {
+                     e.printStackTrace();
+                     errors.incrementAndGet();
+                  }
+               }
+            }
+         };
+
+         ClientSession session = sf.createSession(true, true, 0);
+         session.start();
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+         producerThread.start();
+
+         assertTrue(ready.await(10, TimeUnit.SECONDS));
+
+         ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = consumer.receive(500000);
+            assertNotNull(msg);
+            assertEquals(i, msg.getIntProperty("count").intValue());
+            msg.acknowledge();
+         }
+
+         session.close();
+
+         producerThread.join();
+
+         assertEquals(0, errors.get());
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    public void testPageOnSchedulingNoRestart() throws Exception
    {
       internalTestPageOnScheduling(false);



More information about the hornetq-commits mailing list