[hornetq-commits] JBoss hornetq SVN: r10649 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/journal/impl and 11 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri May 13 00:10:27 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-05-13 00:10:25 -0400 (Fri, 13 May 2011)
New Revision: 10649

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFile.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
JBPAPP-6466 - improving IO on depaging

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFile.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/SequentialFile.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -94,6 +94,8 @@
    void renameTo(String newFileName) throws Exception;
 
    SequentialFile copy();
+   
+   void copyTo(SequentialFile newFileName);
 
    void setTimedBuffer(TimedBuffer buffer);
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -104,6 +104,37 @@
 
       file.delete();
    }
+   
+   public void copyTo(SequentialFile newFileName)
+   {
+      try
+      {
+         log.debug("Copying "  + this + " as " + newFileName);
+         newFileName.open();
+         this.open();
+         
+         
+         ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
+         
+         for (;;)
+         {
+            buffer.rewind();
+            int size = this.read(buffer);
+            newFileName.writeInternal(buffer);
+            if (size < 10 * 1024)
+            {
+               break;
+            }
+         }
+         newFileName.close();
+         this.close();
+      }
+      catch (Exception e)
+      {
+         log.warn("Error on copying file " + this + " as " + newFileName, e);
+      }
+      
+   }
 
    public void position(final long pos) throws Exception
    {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -36,6 +36,7 @@
 import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
 import org.hornetq.core.paging.impl.PagingManagerImpl;
 import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
 import org.hornetq.core.persistence.StorageManager;
@@ -79,6 +80,8 @@
       {
 
          Pair<Map<Long, Set<PagePosition>>, Set<Long>> cursorACKs = PrintPages.loadCursorACKs(arg[1]);
+         
+         Set<Long> pgTXs = cursorACKs.b;
 
          ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
          final ExecutorService executor = Executors.newFixedThreadPool(10);
@@ -146,7 +149,7 @@
                         System.out.print(",");
                      }
                   }
-                  if (msg.getTransactionID() != 0 && ! cursorACKs.b.contains(msg.getTransactionID()));
+                  if (msg.getTransactionID() >= 0 && !pgTXs.contains(msg.getTransactionID()))
                   {
                      System.out.print(", **PG_TX_NOT_FOUND**");
                   }
@@ -230,7 +233,12 @@
             }
             else
             {
-               pgTXs.add(record.id);
+               PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+
+               pageTransactionInfo.decode(buff);
+
+               pageTransactionInfo.setRecordID(record.id);
+               pgTXs.add(pageTransactionInfo.getTransactionID());
             }
          }
       }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.paging.cursor;
 
+import java.util.concurrent.Executor;
+
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.server.Queue;
@@ -131,4 +133,9 @@
     * @return
     */
    PagedMessage queryMessage(PagePosition pos);
+
+   /**
+    * @return the executor associated with this subscription
+    */
+   Executor getExecutor();
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -14,8 +14,10 @@
 package org.hornetq.core.paging.cursor;
 
 import java.lang.ref.WeakReference;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.api.core.Message;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
@@ -34,6 +36,10 @@
 
    private static final long serialVersionUID = -8640232251318264710L;
 
+   private static final Logger log = Logger.getLogger(PagedReferenceImpl.class);
+
+   private static final boolean isTrace = log.isTraceEnabled();
+
    private final PagePosition position;
 
    private WeakReference<PagedMessage> message;
@@ -42,6 +48,8 @@
 
    private int persistedCount;
 
+   private AtomicInteger deliveryCount = new AtomicInteger(0);
+
    private final PageSubscription subscription;
 
    public ServerMessage getMessage()
@@ -84,12 +92,12 @@
    {
       return true;
    }
-   
+
    public void setPersistedCount(int count)
    {
       this.persistedCount = count;
    }
-   
+
    public int getPersistedCount()
    {
       return persistedCount;
@@ -100,8 +108,7 @@
     */
    public MessageReference copy(final Queue queue)
    {
-      // TODO Auto-generated method stub
-      return null;
+      return new PagedReferenceImpl(this.position, this.getPagedMessage(), this.subscription);
    }
 
    /* (non-Javadoc)
@@ -137,8 +144,7 @@
     */
    public int getDeliveryCount()
    {
-      // TODO Auto-generated method stub
-      return 0;
+      return deliveryCount.get();
    }
 
    /* (non-Javadoc)
@@ -146,8 +152,7 @@
     */
    public void setDeliveryCount(final int deliveryCount)
    {
-      // TODO Auto-generated method stub
-
+      this.deliveryCount.set(deliveryCount);
    }
 
    /* (non-Javadoc)
@@ -155,7 +160,11 @@
     */
    public void incrementDeliveryCount()
    {
-      // TODO Auto-generated method stub
+      deliveryCount.incrementAndGet();
+      if (isTrace)
+      {
+         log.trace("deliveryCount = " + deliveryCount + " for " + this);
+      }
 
    }
 
@@ -164,8 +173,7 @@
     */
    public void decrementDeliveryCount()
    {
-      // TODO Auto-generated method stub
-
+      deliveryCount.decrementAndGet();
    }
 
    /* (non-Javadoc)
@@ -199,4 +207,25 @@
    {
       subscription.ackTx(tx, this);
    }
+
+   /* (non-Javadoc)
+    * @see java.lang.Object#toString()
+    */
+   @Override
+   public String toString()
+   {
+      return "PagedReferenceImpl [position=" + position +
+             ", message=" +
+             message +
+             ", deliveryTime=" +
+             deliveryTime +
+             ", persistedCount=" +
+             persistedCount +
+             ", deliveryCount=" +
+             deliveryCount +
+             ", subscription=" +
+             subscription +
+             "]";
+   }
+
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -30,7 +30,6 @@
 import org.hornetq.core.paging.cursor.PagedReference;
 import org.hornetq.core.paging.cursor.PagedReferenceImpl;
 import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.Future;
 import org.hornetq.utils.SoftValueHashMap;
 import org.jboss.netty.util.internal.ConcurrentHashMap;
@@ -57,8 +56,7 @@
 
    private final StorageManager storageManager;
 
-   private final ExecutorFactory executorFactory;
-
+   // This is the same executor used by PagingStoreImpl: one Executor per paging store
    private final Executor executor;
 
    private final SoftValueHashMap<Long, PageCache> softCache;
@@ -71,13 +69,12 @@
 
    public PageCursorProviderImpl(final PagingStore pagingStore,
                                  final StorageManager storageManager,
-                                 final ExecutorFactory executorFactory,
+                                 final Executor executor,
                                  final int maxCacheSize)
    {
       this.pagingStore = pagingStore;
       this.storageManager = storageManager;
-      this.executorFactory = executorFactory;
-      this.executor = executorFactory.getExecutor();
+      this.executor = executor;
       this.softCache = new SoftValueHashMap<Long, PageCache>(maxCacheSize);
    }
 
@@ -96,13 +93,7 @@
          throw new IllegalStateException("Cursor " + cursorID + " had already been created");
       }
 
-      activeCursor = new PageSubscriptionImpl(this,
-                                              pagingStore,
-                                              storageManager,
-                                              executorFactory.getExecutor(),
-                                              filter,
-                                              cursorID,
-                                              persistent);
+      activeCursor = new PageSubscriptionImpl(this, pagingStore, storageManager, executor, filter, cursorID, persistent);
       activeCursors.put(cursorID, activeCursor);
       return activeCursor;
    }
@@ -389,6 +380,17 @@
             {
                pagingStore.stopPaging();
             }
+            else
+            {
+               if (log.isTraceEnabled())
+               {
+                  log.trace("Couldn't cleanup page on address " + this.pagingStore.getAddress() +
+                            " as numberOfPages == " +
+                            pagingStore.getNumberOfPages() +
+                            " and currentPage.numberOfMessages = " +
+                            pagingStore.getCurrentPage().getNumberOfMessages());
+               }
+            }
          }
          catch (Exception ex)
          {
@@ -411,7 +413,7 @@
             {
                cache = softCache.remove((long)depagedPage.getPageId());
             }
-            
+
             if (cache == null)
             {
                // The page is not on cache any more
@@ -426,7 +428,7 @@
             {
                pgdMessages = cache.getMessages();
             }
-            
+
             depagedPage.delete(pgdMessages);
             synchronized (softCache)
             {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -21,6 +21,7 @@
 
 import org.hornetq.api.core.Pair;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.MessageReference;
@@ -40,6 +41,8 @@
 
    // Constants -----------------------------------------------------
    static final Logger log = Logger.getLogger(PageSubscriptionCounterImpl.class);
+   
+   static final boolean isTrace = log.isTraceEnabled();
 
    // Attributes ----------------------------------------------------
 
@@ -51,8 +54,12 @@
    private long recordID = -1;
 
    private boolean persistent;
+   
+   private final PageSubscription subscription;
 
    private final StorageManager storage;
+   
+   private final Executor executor;
 
    private final AtomicLong value = new AtomicLong(0);
 
@@ -60,8 +67,6 @@
 
    private LinkedList<Pair<Long, Integer>> loadList;
 
-   private final Executor executor;
-
    private final Runnable cleanupCheck = new Runnable()
    {
       public void run()
@@ -77,14 +82,16 @@
    // Constructors --------------------------------------------------
 
    public PageSubscriptionCounterImpl(final StorageManager storage,
+                                      final PageSubscription subscription,
+                                      final Executor executor,
                                       final boolean persistent,
-                                      final long subscriptionID,
-                                      final Executor executor)
+                                      final long subscriptionID)
    {
       this.subscriptionID = subscriptionID;
+      this.executor = executor;
       this.storage = storage;
-      this.executor = executor;
       this.persistent = persistent;
+      this.subscription = subscription;
    }
 
    /* (non-Javadoc)
@@ -253,10 +260,13 @@
          }
 
          newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace);
+         
+         if (isTrace)
+         {
+            log.trace("Replacing page-counter record = "  + recordID + " by record = " + newRecordID + " on subscriptionID = " + this.subscriptionID + " for queue = " + this.subscription.getQueue().getName());
+         }
 
          storage.commit(txCleanup);
-
-         storage.waitOnOperations();
      }
       catch (Exception e)
       {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -92,8 +92,6 @@
 
    private final PageCursorProvider cursorProvider;
 
-   private final Executor executor;
-
    private volatile PagePosition lastAckedPosition;
 
    private List<PagePosition> recoveredACK;
@@ -101,6 +99,8 @@
    private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
 
    private final PageSubscriptionCounter counter;
+   
+   private final Executor executor;
 
    private final AtomicLong deliveredCount = new AtomicLong(0);
 
@@ -126,7 +126,7 @@
       this.executor = executor;
       this.filter = filter;
       this.persistent = persistent;
-      this.counter = new PageSubscriptionCounterImpl(store, persistent, cursorId, executor);
+      this.counter = new PageSubscriptionCounterImpl(store, this, executor, persistent, cursorId);
    }
 
    // Public --------------------------------------------------------
@@ -224,7 +224,7 @@
       // First get the completed pages using a lock
       synchronized (this)
       {
-         for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
+         for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet()) 
          {
             PageCursorInfo info = entry.getValue();
             if (info.isDone() && !info.isPendingDelete() && lastAckedPosition != null)
@@ -687,6 +687,14 @@
       }
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.PageSubscription#getExecutor()
+    */
+   public Executor getExecutor()
+   {
+      return executor;
+   }
+
    private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
    {
       return getPageInfo(pos, true);
@@ -734,8 +742,17 @@
    {
       if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0)
       {
+         if (isTrace)
+         {
+            log.trace("a new position is being processed as ACK");
+         }
          if (lastAckedPosition != null && lastAckedPosition.getPageNr() != pos.getPageNr())
          {
+            if (isTrace)
+            {
+               log.trace("Scheduling cleanup on pageSubscription for address = " + pageStore.getAddress() + " queue = " + this.getQueue().getName());
+            }
+            
             // there's a different page being acked, we will do the check right away
             if (autoCleanup)
             {
@@ -780,7 +797,7 @@
 
    private PageTransactionInfo getPageTransaction(final PagedReference reference)
    {
-      if (reference.getPagedMessage().getTransactionID() != 0)
+      if (reference.getPagedMessage().getTransactionID() >= 0)
       {
          return pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID());
       }
@@ -816,6 +833,7 @@
 
       private final long pageId;
 
+      // TODO: Merge removedReferences and acks into a single structure
       // Confirmed ACKs on this page
       private final Set<PagePosition> acks = Collections.synchronizedSet(new LinkedHashSet<PagePosition>());
 
@@ -1135,6 +1153,13 @@
                {
                   ignored = true;
                }
+               
+               PageCursorInfo info = getPageInfo(message.getPosition(), false);
+               
+               if (info != null && info.isRemoved(message.getPosition()))
+               {
+                  continue;
+               }
 
                // 2nd ... if TX, is it committed?
                if (valid && message.getPagedMessage().getTransactionID() >= 0)
@@ -1145,7 +1170,7 @@
                   {
                      log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
                               ", ignoring message on position " +
-                              message.getPosition());
+                              message.getPosition() + " on address=" + pageStore.getAddress() + " queue=" + queue.getName());
                      valid = false;
                      ignored = true;
                   }
@@ -1166,7 +1191,7 @@
                   // Say you have a Browser that will only read the files... there's no need to control PageCursors is
                   // nothing
                   // is being changed. That's why the false is passed as a parameter here
-                  PageCursorInfo info = getPageInfo(message.getPosition(), false);
+                 
                   if (info != null && info.isRemoved(message.getPosition()))
                   {
                      valid = false;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -84,6 +84,7 @@
          HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(largeMessageLazyData);
          message.decodeHeadersAndProperties(buffer);
          lgMessage.incrementDelayDeletionCount();
+         lgMessage.setPaged();
          largeMessageLazyData = null;
       }
    }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -46,6 +46,7 @@
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.RouteContextList;
 import org.hornetq.core.server.RoutingContext;
@@ -200,7 +201,7 @@
          this.syncTimer = null;
       }
 
-      this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executorFactory, addressSettings.getPageCacheMaxSize());
+      this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executor, addressSettings.getPageCacheMaxSize());
 
    }
 
@@ -870,6 +871,11 @@
 
 
          PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), tx == null ? -1 : tx.getID());
+         
+         if (message.isLargeMessage())
+         {
+            ((LargeServerMessage)message).setPaged();
+         }
 
          int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -18,6 +18,7 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.BodyEncoder;
@@ -48,6 +49,8 @@
    private final JournalStorageManager storageManager;
 
    private LargeServerMessage linkMessage;
+   
+   private boolean paged;
 
    // We should only use the NIO implementation on the Journal
    private SequentialFile file;
@@ -82,6 +85,11 @@
 
    // Public --------------------------------------------------------
 
+   public void setPaged()
+   {
+      paged = true;
+   }
+   
    /* (non-Javadoc)
     * @see org.hornetq.core.server.LargeServerMessage#addBytes(byte[])
     */
@@ -260,27 +268,59 @@
          }
       }
    }
+   
 
+   public void setOriginalHeaders(final ServerMessage other, final boolean expiry)
+   {
+      super.setOriginalHeaders(other, expiry);
+      
+      LargeServerMessageImpl otherLM = (LargeServerMessageImpl)other;
+      this.paged = otherLM.paged;
+      if (this.paged)
+      {
+         this.removeProperty(Message.HDR_ORIG_MESSAGE_ID); 
+      }
+   }
+
+
    @Override
    public synchronized ServerMessage copy(final long newID)
    {
-      incrementDelayDeletionCount();
-
-      long idToUse = messageID;
-
-      if (linkMessage != null)
+      if (!paged)
       {
-         idToUse = linkMessage.getMessageID();
+         incrementDelayDeletionCount();
+   
+         long idToUse = messageID;
+   
+         if (linkMessage != null)
+         {
+            idToUse = linkMessage.getMessageID();
+         }
+   
+         SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
+   
+         ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
+                                                                                  : (LargeServerMessageImpl)linkMessage,
+                                                               newfile,
+                                                               newID);
+         return newMessage;
       }
-
-      SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
-
-      ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
-                                                                               : (LargeServerMessageImpl)linkMessage,
-                                                            newfile,
-                                                            newID);
-
-      return newMessage;
+      else
+      {
+         SequentialFile file = this.file;
+         
+         SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
+         
+         file.copyTo(newFile);
+         
+         LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
+         
+         newMessage.linkMessage = null;
+         
+         newMessage.setPaged();
+         
+         return newMessage;
+      }
    }
 
    public SequentialFile getFile()

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -157,7 +157,14 @@
       return "LargeServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress()  + ",properties=" + properties.toString() + "]";
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.LargeServerMessage#setPaged()
+    */
+   public void setPaged()
+   {
+   }
 
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -30,7 +30,13 @@
    void setLinkedMessage(LargeServerMessage message);
 
    boolean isFileExists() throws Exception;
-
+   
+   /**
+    * We have to copy the large message content in case of DLQ and paged messages
+    * For that we need to pre-mark the LargeMessage with a flag when it is paged
+    */
+   void setPaged();
+   
    /** Close the files if opened */
    void releaseResources();
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -72,6 +72,8 @@
 public class QueueImpl implements Queue
 {
    private static final Logger log = Logger.getLogger(QueueImpl.class);
+   
+   private static final boolean isTrace = log.isTraceEnabled();
 
    public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
 
@@ -396,7 +398,7 @@
 
    public void deliverAsync()
    {
-      executor.execute(deliverRunner);
+      getExecutor().execute(deliverRunner);
    }
 
    public void close() throws Exception
@@ -411,7 +413,15 @@
 
    public Executor getExecutor()
    {
-      return executor;
+      if (pageSubscription.isPaging())
+      {
+         // When in page mode, we don't want to have concurrent IO on the same PageStore
+         return pageSubscription.getExecutor();
+      }
+      else
+      {
+         return executor;
+      }
    }
 
    /* Only used on tests */
@@ -432,7 +442,7 @@
 
       if (!ok)
       {
-         log.warn("Couldn't finish waiting executors. Try increasing the thread pool size");
+         log.warn("Couldn't finish waiting executors. Try increasing the thread pool size", new Exception ("trace"));
       }
       
       return ok;
@@ -1440,6 +1450,8 @@
       int numRefs = messageReferences.size();
 
       int handled = 0;
+      
+      long timeout = System.currentTimeMillis() + 1000;
 
       while (handled < numRefs)
       {
@@ -1451,6 +1463,19 @@
 
             return;
          }
+         
+         if (pageSubscription != null && pageSubscription.isPaging() && System.currentTimeMillis() > timeout)
+         {
+            if (isTrace)
+            {
+               log.trace("Page delivery has been running for too long. Scheduling another delivery task now");
+            }
+            
+            deliverAsync();
+            
+            return;
+         }
+         
 
          ConsumerHolder holder = consumerList.get(pos);
 
@@ -1549,7 +1574,7 @@
          }
       }
 
-      if (pageIterator != null && messageReferences.size() == 0 && pageIterator.hasNext())
+      if (pageIterator != null && messageReferences.size() == 0 && pageSubscription.isPaging() && pageIterator.hasNext()) 
       {
          scheduleDepage();
       }
@@ -1580,7 +1605,7 @@
 
    private void scheduleDepage()
    {
-      executor.execute(depageRunner);
+      pageSubscription.getExecutor().execute(depageRunner);
    }
 
    private void depage()
@@ -1629,11 +1654,13 @@
 
       if (internalQueue)
       {
+         if (isTrace)
+         {
+            log.trace("Queue " + this.getName() + " is an internal queue, no checkRedelivery");
+         }
          // no DLQ check on internal queues
          return true;
       }
-
-      // TODO: DeliveryCount on paging
       
       if (!internalQueue && message.isDurable() && durable && !reference.isPaged())
       {
@@ -1644,9 +1671,18 @@
 
       int maxDeliveries = addressSettings.getMaxDeliveryAttempts();
 
+      if (isTrace)
+      {
+         log.trace("Checking redelivery for reference = " + reference + " with maxDeliveries = " + maxDeliveries + " on queue " + address);
+      }
+
       // First check DLA
       if (maxDeliveries > 0 && reference.getDeliveryCount() >= maxDeliveries)
       {
+         if (isTrace)
+         {
+            log.trace("Sending reference " + reference + " to DLA");
+         }
          sendToDeadLetterAddress(reference);
 
          return false;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -16,7 +16,6 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -84,8 +83,6 @@
 
    private final ServerSession session;
 
-   private final Executor executor;
-
    private final Object lock = new Object();
 
    private volatile AtomicInteger availableCredits = new AtomicInteger(0);
@@ -153,8 +150,6 @@
 
       messageQueue = binding.getQueue();
 
-      this.executor = messageQueue.getExecutor();
-
       this.started = browseOnly || started;
 
       this.browseOnly = browseOnly;
@@ -376,7 +371,7 @@
 
       Future future = new Future();
 
-      executor.execute(future);
+      messageQueue.getExecutor().execute(future);
 
       boolean ok = future.await(10000);
 
@@ -483,7 +478,7 @@
 
          Future future = new Future();
 
-         executor.execute(future);
+         messageQueue.getExecutor().execute(future);
 
          boolean ok = future.await(10000);
 
@@ -668,7 +663,7 @@
          {
             if (browseOnly)
             {
-               executor.execute(browserDeliverer);
+               messageQueue.getExecutor().execute(browserDeliverer);
             }
             else
             {
@@ -680,7 +675,7 @@
 
    private void resumeLargeMessage()
    {
-      executor.execute(resumeLargeMessageRunnable);
+      messageQueue.getExecutor().execute(resumeLargeMessageRunnable);
    }
 
    private void deliverLargeMessage(final MessageReference ref, final ServerMessage message) throws Exception
@@ -723,7 +718,7 @@
                {
                   if (browseOnly)
                   {
-                     executor.execute(browserDeliverer);
+                     messageQueue.getExecutor().execute(browserDeliverer);
                   }
                   else
                   {

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -13,10 +13,13 @@
 
 package org.hornetq.tests.integration.client;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -42,9 +45,16 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.DivertConfiguration;
 import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
 import org.hornetq.core.paging.impl.TestSupportPageStore;
 import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
@@ -262,8 +272,7 @@
          consumer = session.createConsumer(PagingTest.ADDRESS);
 
          session.start();
-         
-         
+
          assertEquals(numberOfMessages, queue.getMessageCount());
 
          ClientMessage msg = consumer.receive(5000);
@@ -284,7 +293,7 @@
          }
          assertNull(msg);
 
-         for (int i = xids.size() -1 ; i >= 0; i--)
+         for (int i = xids.size() - 1; i >= 0; i--)
          {
             Xid xid = xids.get(i);
             session.rollback(xid);
@@ -298,25 +307,25 @@
          session = sf.createSession(false, false, false);
 
          session.start();
-         
+
          consumer = session.createConsumer(PagingTest.ADDRESS);
-         
+
          for (int i = 0; i < numberOfMessages; i++)
          {
             msg = consumer.receive(1000);
             assertNotNull(msg);
             msg.acknowledge();
-            
+
             assertEquals(i, msg.getIntProperty("id").intValue());
-            
+
             if (i % 500 == 0)
             {
                session.commit();
             }
          }
-         
+
          session.commit();
-         
+
          session.close();
 
          sf.close();
@@ -324,14 +333,13 @@
          locator.close();
 
          assertEquals(0, queue.getMessageCount());
-         
+
          long timeout = System.currentTimeMillis() + 5000;
          while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
          {
             Thread.sleep(100);
          }
-         assertFalse (queue.getPageSubscription().getPagingStore().isPaging());
-         // assertEquals(numberOfMessages, queue.getMessageCount());
+         assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
       }
       finally
       {
@@ -346,6 +354,350 @@
 
    }
 
+   public void testMissingTXEverythingAcked() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 5000;
+
+      final int numberOfTX = 10;
+
+      final int messagesPerTX = numberOfMessages / numberOfTX;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(ADDRESS.toString(), "q1", true);
+
+         session.createQueue(ADDRESS.toString(), "q2", true);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         ByteBuffer bb = ByteBuffer.wrap(body);
+
+         for (int j = 1; j <= messageSize; j++)
+         {
+            bb.put(getSamplebyte(j));
+         }
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(true);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+            if (i % messagesPerTX == 0)
+            {
+               session.commit();
+            }
+         }
+         session.commit();
+         session.close();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+      ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
+
+      List<PreparedTransactionInfo> list = new ArrayList<PreparedTransactionInfo>();
+
+      JournalImpl jrn = new JournalImpl(config.getJournalFileSize(),
+                                        2,
+                                        0,
+                                        0,
+                                        new NIOSequentialFileFactory(getJournalDir()),
+                                        "hornetq-data",
+                                        "hq",
+                                        1);
+      jrn.start();
+      jrn.load(records, list, null);
+
+      // Delete everything from the journal
+      for (RecordInfo info : records)
+      {
+         if (!info.isUpdate)
+         {
+            jrn.appendDeleteRecord(info.id, false);
+         }
+      }
+
+      jrn.stop();
+
+      server = createServer(true,
+                            config,
+                            PagingTest.PAGE_SIZE,
+                            PagingTest.PAGE_MAX,
+                            new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      Page pg = server.getPagingManager().getPageStore(ADDRESS).getCurrentPage();
+
+      pg.open();
+
+      List<PagedMessage> msgs = pg.read(server.getStorageManager());
+
+      pg.close();
+
+      long queues[] = new long[] { server.locateQueue(new SimpleString("q1")).getID() };
+
+      for (long q : queues)
+      {
+         for (int i = 0; i < msgs.size(); i++)
+         {
+            server.getStorageManager().storeCursorAcknowledge(q, new PagePositionImpl(pg.getPageId(), i));
+         }
+      }
+
+      server.stop();
+
+      ServerLocator locator = createInVMNonHALocator();
+
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(true);
+
+      server = createServer(true,
+                            config,
+                            PagingTest.PAGE_SIZE,
+                            PagingTest.PAGE_MAX,
+                            new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      ClientSessionFactory csf = locator.createSessionFactory();
+
+      ClientSession sess = csf.createSession();
+
+      sess.start();
+
+      ClientConsumer cons = sess.createConsumer("q1");
+
+      assertNull(cons.receive(500));
+
+      Thread.sleep(5000);
+
+      ClientConsumer cons2 = sess.createConsumer("q2");
+      assertNull(cons2.receive(500));
+
+      long timeout = System.currentTimeMillis() + 5000;
+
+      while (System.currentTimeMillis() < timeout && server.getPagingManager().getPageStore(ADDRESS).isPaging())
+      {
+         Thread.sleep(100);
+      }
+
+      assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+      sess.close();
+
+      locator.close();
+
+      server.stop();
+   }
+
+   public void testMissingTXEverythingAcked2() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 6;
+
+      final int numberOfTX = 2;
+
+      final int messagesPerTX = numberOfMessages / numberOfTX;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(ADDRESS.toString(), "q1", true);
+
+         session.createQueue(ADDRESS.toString(), "q2", true);
+
+         server.getPagingManager().getPageStore(ADDRESS).startPaging();
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         ByteBuffer bb = ByteBuffer.wrap(body);
+
+         for (int j = 1; j <= messageSize; j++)
+         {
+            bb.put(getSamplebyte(j));
+         }
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(true);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putStringProperty("id", "str-" + i);
+
+            producer.send(message);
+            if ((i + 1) % messagesPerTX == 0)
+            {
+               session.commit();
+            }
+         }
+         session.commit();
+
+         session.start();
+
+         for (int i = 1; i <= 2; i++)
+         {
+            ClientConsumer cons = session.createConsumer("q" + i);
+
+            for (int j = 0; j < 3; j++)
+            {
+               ClientMessage msg = cons.receive(5000);
+
+               assertNotNull(msg);
+
+               assertEquals("str-" + j, msg.getStringProperty("id"));
+
+               msg.acknowledge();
+            }
+
+            session.commit();
+
+         }
+
+         session.close();
+      }
+      finally
+      {
+         locator.close();
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+      server = createServer(true,
+                            config,
+                            PagingTest.PAGE_SIZE,
+                            PagingTest.PAGE_MAX,
+                            new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      ServerLocator locator = createInVMNonHALocator();
+
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(true);
+
+      ClientSessionFactory csf = locator.createSessionFactory();
+
+      ClientSession session = csf.createSession();
+
+      session.start();
+
+      for (int i = 1; i <= 2; i++)
+      {
+         ClientConsumer cons = session.createConsumer("q" + i);
+
+         for (int j = 3; j < 6; j++)
+         {
+            ClientMessage msg = cons.receive(5000);
+
+            assertNotNull(msg);
+
+            assertEquals("str-" + j, msg.getStringProperty("id"));
+
+            msg.acknowledge();
+         }
+
+         session.commit();
+         assertNull(cons.receive(500));
+
+      }
+
+      session.close();
+
+      long timeout = System.currentTimeMillis() + 5000;
+
+      while (System.currentTimeMillis() < timeout && server.getPagingManager().getPageStore(ADDRESS).isPaging())
+      {
+         Thread.sleep(100);
+      }
+
+      locator.close();
+
+      server.stop();
+   }
+
    public void testTwoQueuesOneNoRouting() throws Exception
    {
       boolean persistentMessages = true;
@@ -2787,8 +3139,7 @@
          }
       }
    }
-   
-   
+
    public void testPageAndDepageRapidly() throws Exception
    {
       boolean persistentMessages = true;
@@ -2800,11 +3151,7 @@
       config.setJournalSyncNonTransactional(false);
       config.setJournalFileSize(10 * 1024 * 1024);
 
-      HornetQServer server = createServer(true,
-                                          config,
-                                          512 * 1024,
-                                          1024 * 1024,
-                                          new HashMap<String, AddressSettings>());
+      HornetQServer server = createServer(true, config, 512 * 1024, 1024 * 1024, new HashMap<String, AddressSettings>());
 
       server.start();
 
@@ -2827,9 +3174,9 @@
          session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
 
          ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
-         
+
          final AtomicInteger errors = new AtomicInteger(0);
-         
+
          Thread consumeThread = new Thread()
          {
             public void run()
@@ -2839,16 +3186,16 @@
                {
                   sessionConsumer = sf.createSession(false, false);
                   sessionConsumer.start();
-                  
+
                   ClientConsumer cons = sessionConsumer.createConsumer(ADDRESS);
-                  
+
                   for (int i = 0; i < numberOfMessages; i++)
                   {
                      ClientMessage msg = cons.receive(PagingTest.RECEIVE_TIMEOUT);
                      System.out.println("Message " + i + " consumed");
                      assertNotNull(msg);
                      msg.acknowledge();
-                     
+
                      if (i % 20 == 0)
                      {
                         System.out.println("Commit consumer");
@@ -2874,10 +3221,10 @@
                      errors.incrementAndGet();
                   }
                }
-               
+
             }
          };
-         
+
          consumeThread.start();
 
          ClientMessage message = null;
@@ -2887,7 +3234,7 @@
          for (int i = 0; i < numberOfMessages; i++)
          {
             message = session.createMessage(persistentMessages);
-            
+
             System.out.println("Message " + i + " sent");
 
             HornetQBuffer bodyLocal = message.getBodyBuffer();
@@ -2897,23 +3244,24 @@
             message.putIntProperty(new SimpleString("id"), i);
 
             producer.send(message);
-            
+
             Thread.sleep(50);
          }
 
-         
          consumeThread.join();
-         
+
          long timeout = System.currentTimeMillis() + 5000;
-         
-         while (System.currentTimeMillis() < timeout && (server.getPagingManager().getPageStore(ADDRESS).isPaging() || server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages() != 1))
+
+         while (System.currentTimeMillis() < timeout && (server.getPagingManager().getPageStore(ADDRESS).isPaging() || server.getPagingManager()
+                                                                                                                             .getPageStore(ADDRESS)
+                                                                                                                             .getNumberOfPages() != 1))
          {
             Thread.sleep(1);
          }
 
          // It's async, so need to wait a bit for it happening
          assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
-         
+
          assertEquals(1, server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages());
 
          sf.close();
@@ -2933,7 +3281,6 @@
 
    }
 
-
    public void testTwoQueuesDifferentFilters() throws Exception
    {
       boolean persistentMessages = true;
@@ -2959,7 +3306,7 @@
       try
       {
          ServerLocator locator = createInVMNonHALocator();
-         
+
          locator.setClientFailureCheckPeriod(120000);
          locator.setConnectionTTL(5000000);
          locator.setCallTimeout(120000);
@@ -2971,14 +3318,16 @@
          ClientSessionFactory sf = locator.createSessionFactory();
 
          ClientSession session = sf.createSession(false, false, false);
-         
+
          // note: if you want to change this, numberOfMessages has to be a multiple of NQUEUES
          int NQUEUES = 2;
-         
 
-         for (int i = 0 ; i < NQUEUES; i++)
+         for (int i = 0; i < NQUEUES; i++)
          {
-            session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=" + i), new SimpleString("propTest=" + i), true);
+            session.createQueue(PagingTest.ADDRESS,
+                                PagingTest.ADDRESS.concat("=" + i),
+                                new SimpleString("propTest=" + i),
+                                true);
          }
 
          ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
@@ -3012,20 +3361,20 @@
          for (int nqueue = 0; nqueue < NQUEUES; nqueue++)
          {
             ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=" + nqueue));
-   
-            for (int i = 0; i < (numberOfMessages /NQUEUES); i++)
+
+            for (int i = 0; i < (numberOfMessages / NQUEUES); i++)
             {
                message = consumer.receive(500000);
                assertNotNull(message);
                message.acknowledge();
-   
+
                assertEquals(nqueue, message.getIntProperty("propTest").intValue());
             }
-            
+
             assertNull(consumer.receiveImmediate());
-            
+
             consumer.close();
-   
+
             session.commit();
          }
 
@@ -3038,7 +3387,6 @@
             Thread.sleep(100);
          }
 
-         
          // It's async, so need to wait a bit for it happening
          assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
 
@@ -3056,11 +3404,8 @@
          {
          }
       }
-
    }
 
-
-
    public void testTwoQueues() throws Exception
    {
       boolean persistentMessages = true;
@@ -3086,7 +3431,7 @@
       try
       {
          ServerLocator locator = createInVMNonHALocator();
-         
+
          locator.setClientFailureCheckPeriod(120000);
          locator.setConnectionTTL(5000000);
          locator.setCallTimeout(120000);
@@ -3098,7 +3443,6 @@
          ClientSessionFactory sf = locator.createSessionFactory();
 
          ClientSession session = sf.createSession(false, false, false);
-         
 
          session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), null, true);
          session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), null, true);
@@ -3133,22 +3477,22 @@
          for (int msg = 1; msg <= 2; msg++)
          {
             ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=" + msg));
-   
+
             for (int i = 0; i < numberOfMessages; i++)
             {
                message = consumer.receive(500000);
                assertNotNull(message);
                message.acknowledge();
-   
-               //assertEquals(msg, message.getIntProperty("propTest").intValue());
-               
+
+               // assertEquals(msg, message.getIntProperty("propTest").intValue());
+
                System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
             }
-   
+
             session.commit();
-            
+
             assertNull(consumer.receiveImmediate());
-            
+
             consumer.close();
          }
 
@@ -3162,10 +3506,9 @@
          }
 
          store.getCursorProvier().cleanup();
-         
+
          Thread.sleep(1000);
-         
-         
+
          // It's async, so need to wait a bit for it happening
          assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
 
@@ -3183,11 +3526,221 @@
          {
          }
       }
-
    }
 
+   public void testDLAOnLargeMessageAndPaging() throws Exception
+   {
+      clearData();
 
+      Configuration config = createDefaultConfig();
 
+      config.setJournalSyncNonTransactional(false);
+
+      Map<String, AddressSettings> settings = new HashMap<String, AddressSettings>();
+      AddressSettings dla = new AddressSettings();
+      dla.setMaxDeliveryAttempts(5);
+      dla.setDeadLetterAddress(new SimpleString("DLA"));
+      settings.put(ADDRESS.toString(), dla);
+
+      final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         session.createQueue("DLA", "DLA");
+
+         PagingStore pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
+         pgStoreAddress.startPaging();
+         PagingStore pgStoreDLA = server.getPagingManager().getPageStore(new SimpleString("DLA"));
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         for (int i = 0; i < 500; i++)
+         {
+            log.info("send message #" + i);
+            message = session.createMessage(true);
+
+            message.putStringProperty("id", "str" + i);
+
+            message.setBodyInputStream(createFakeLargeStream(messageSize));
+
+            producer.send(message);
+            
+            if ((i + 1) % 2 == 0)
+            {
+               session.commit();
+               if (i < 400)
+               {
+                  pgStoreAddress.forceAnotherPage();
+               }
+            }
+         }
+
+         session.commit();
+
+         session.start();
+
+         ClientConsumer cons = session.createConsumer(ADDRESS);
+
+         ClientMessage msg = null;
+         
+         for (int msgNr = 0 ; msgNr < 2; msgNr++)
+         {
+            for (int i = 0 ; i < 5; i++)
+            {
+               msg = cons.receive(5000);
+      
+               assertNotNull(msg);
+      
+               msg.acknowledge();
+      
+               assertEquals("str" + msgNr, msg.getStringProperty("id"));
+   
+               for (int j = 0; j < messageSize; j++)
+               {
+                  assertEquals(getSamplebyte(j), msg.getBodyBuffer().readByte());
+               }
+   
+               session.rollback();
+            }
+            
+            pgStoreDLA.startPaging();
+         }
+
+         for (int i = 2; i < 500; i++)
+         {
+            log.info("Received message " + i);
+            message = cons.receive(5000);
+            assertNotNull(message);
+            message.acknowledge();
+
+            message.saveToOutputStream(new OutputStream()
+            {
+               @Override
+               public void write(int b) throws IOException
+               {
+
+               }
+            });
+
+         }
+         
+         assertNull(cons.receiveImmediate());
+
+         cons.close();
+         
+         sf.close();
+         
+         locator.close();
+         
+         server.stop();
+         
+         server.start();
+         
+         locator = createInVMNonHALocator();
+         
+         sf = locator.createSessionFactory();
+         
+         session = sf.createSession(false, false);
+         
+         session.start();
+         
+         cons = session.createConsumer(ADDRESS);
+
+         for (int i = 2; i < 500; i++)
+         {
+            log.info("Received message " + i);
+            message = cons.receive(5000);
+            assertNotNull(message);
+            message.acknowledge();
+
+            message.saveToOutputStream(new OutputStream()
+            {
+               @Override
+               public void write(int b) throws IOException
+               {
+
+               }
+            });
+         }
+         
+         cons.close();
+         
+         cons = session.createConsumer("DLA");
+
+         for (int msgNr = 0 ; msgNr < 2; msgNr++)
+         {
+            msg = cons.receive(5000);
+
+            assertNotNull(msg);
+            
+            assertEquals("str" + msgNr, msg.getStringProperty("id"));
+
+            for (int i = 0; i < messageSize; i++)
+            {
+               assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte());
+            }
+   
+            msg.acknowledge();
+         }
+         
+         cons.close();
+         
+         cons = session.createConsumer(ADDRESS);
+         
+         session.commit();
+         
+         assertNull(cons.receiveImmediate());
+         
+         long timeout = System.currentTimeMillis() + 5000;
+         
+         pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
+         
+         pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries();
+         
+         pgStoreAddress.getCursorProvier().cleanup();
+         
+         while (timeout > System.currentTimeMillis() && pgStoreAddress.isPaging())
+         {
+            Thread.sleep(50);
+         }
+         
+         assertFalse(pgStoreAddress.isPaging());
+
+         session.commit();
+
+         session.close();
+      }
+      finally
+      {
+         locator.close();
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -97,7 +97,7 @@
 
       PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS),
                                                                          server.getStorageManager(),
-                                                                         server.getExecutorFactory(),
+                                                                         server.getExecutorFactory().getExecutor(),
                                                                          5);
 
       for (int i = 0; i < numberOfPages; i++)

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2011-05-12 15:53:41 UTC (rev 10648)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2011-05-13 04:10:25 UTC (rev 10649)
@@ -674,6 +674,15 @@
       {
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.SequentialFile#copyTo(org.hornetq.core.journal.SequentialFile)
+       */
+      public void copyTo(SequentialFile newFileName)
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
    }
 
    /* (non-Javadoc)



More information about the hornetq-commits mailing list