[hornetq-commits] JBoss hornetq SVN: r9788 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Oct 14 21:18:34 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-14 21:18:33 -0400 (Thu, 14 Oct 2010)
New Revision: 9788

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Treating PageTransactions over the cursors

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -24,8 +24,6 @@
  */
 public interface PageTransactionInfo extends EncodingSupport
 {
-   boolean waitCompletion(int timeoutMilliSeconds) throws Exception;
-
    boolean isCommit();
 
    boolean isRollback();
@@ -45,11 +43,9 @@
    void storeUpdate(StorageManager storageManager, PagingManager pagingManager, Transaction tx, int depages) throws Exception;
 
    // To be used after the update was stored or reload
-   void update(int update, StorageManager storageManager, PagingManager pagingManager);
+   void onUpdate(int update, StorageManager storageManager, PagingManager pagingManager);
 
    void increment();
 
    int getNumberOfMessages();
-
-   void markIncomplete();
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -28,7 +28,9 @@
  */
 public interface PagedMessage extends EncodingSupport
 {
-   ServerMessage getMessage(StorageManager storageManager);
+   ServerMessage getMessage();
+   
+   void initMessage(StorageManager storageManager);
 
    long getTransactionID();
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -13,7 +13,7 @@
 
 package org.hornetq.core.paging.cursor;
 
-import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.paging.PagedMessage;
 
 /**
  * A LivePageCache
@@ -25,7 +25,5 @@
 public interface LivePageCache extends PageCache
 {
    
-   void addLiveMessage(ServerMessage message);
-
-   void close();
+   void addLiveMessage(PagedMessage message);
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -14,7 +14,7 @@
 package org.hornetq.core.paging.cursor;
 
 import org.hornetq.core.paging.Page;
-import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.paging.PagedMessage;
 
 /**
  * A PageCache
@@ -29,7 +29,7 @@
 
    int getNumberOfMessages();
    
-   void setMessages(ServerMessage[] messages);
+   void setMessages(PagedMessage[] messages);
    
    /**
     * If this cache is still being updated
@@ -42,7 +42,7 @@
     * @param messageNumber The order of the message on the page
     * @return
     */
-   ServerMessage getMessage(int messageNumber);
+   PagedMessage getMessage(int messageNumber);
 
    /**
     * When the cache is being created,
@@ -54,4 +54,7 @@
     * You have to call this method within the same thread you called lock
     */
    void unlock();
+   
+   void close();
+
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -14,6 +14,7 @@
 package org.hornetq.core.paging.cursor;
 
 import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.transaction.Transaction;
 
@@ -31,7 +32,7 @@
    
    void stop();
    
-   Pair<PagePosition, ServerMessage> moveNext() throws Exception;
+   Pair<PagePosition, PagedMessage> moveNext() throws Exception;
 
    void ack(PagePosition position) throws Exception;
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -14,6 +14,7 @@
 package org.hornetq.core.paging.cursor;
 
 import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.server.ServerMessage;
 
@@ -56,9 +57,9 @@
     */
    PageCursor createCursor();
 
-   Pair<PagePosition, ServerMessage> getAfter(PagePosition pos) throws Exception;
+   Pair<PagePosition, PagedMessage> getAfter(PageCursor cursor, PagePosition pos) throws Exception;
    
-   ServerMessage getMessage(PagePosition pos) throws Exception;
+   PagedMessage getMessage(PagePosition pos) throws Exception;
 
    void processReload() throws Exception;
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -17,6 +17,7 @@
 import java.util.List;
 
 import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.cursor.LivePageCache;
 import org.hornetq.core.server.ServerMessage;
 
@@ -33,7 +34,7 @@
 
    // Attributes ----------------------------------------------------
    
-   private final List<ServerMessage> messages = new LinkedList<ServerMessage>();
+   private final List<PagedMessage> messages = new LinkedList<PagedMessage>();
    
    private final Page page;
    
@@ -74,10 +75,10 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCache#setMessages(org.hornetq.core.server.ServerMessage[])
     */
-   public synchronized void setMessages(ServerMessage[] messages)
+   public synchronized void setMessages(PagedMessage[] messages)
    {
       // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
-      for (ServerMessage msg : messages)
+      for (PagedMessage msg : messages)
       {
          addLiveMessage(msg);
       }
@@ -86,7 +87,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCache#getMessage(int)
     */
-   public synchronized ServerMessage getMessage(int messageNumber)
+   public synchronized PagedMessage getMessage(int messageNumber)
    {
       if (messageNumber < messages.size())
       {
@@ -125,7 +126,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.LivePageCache#addLiveMessage(org.hornetq.core.server.ServerMessage)
     */
-   public synchronized void addLiveMessage(ServerMessage message)
+   public synchronized void addLiveMessage(PagedMessage message)
    {
       this.messages.add(message);
    }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -17,8 +17,8 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.server.ServerMessage;
 
 /**
  * The caching associated to a single page.
@@ -29,28 +29,28 @@
  */
 public class PageCacheImpl implements PageCache
 {
-   
+
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------
 
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-   
-   private ServerMessage[] messages;
-   
+
+   private PagedMessage[] messages;
+
    private final Page page;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
-   
-   public PageCacheImpl(Page page)
+
+   public PageCacheImpl(final Page page)
    {
       this.page = page;
    }
 
    // Public --------------------------------------------------------
-   
+
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCache#getPage()
     */
@@ -62,7 +62,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCache#getMessage(int)
     */
-   public ServerMessage getMessage(int messageNumber)
+   public PagedMessage getMessage(final int messageNumber)
    {
       lock.readLock().lock();
       try
@@ -81,22 +81,22 @@
          lock.readLock().unlock();
       }
    }
-   
+
    public void lock()
    {
       lock.writeLock().lock();
    }
-   
+
    public void unlock()
    {
       lock.writeLock().unlock();
    }
-   
-   public void setMessages(ServerMessage[] messages)
+
+   public void setMessages(final PagedMessage[] messages)
    {
       this.messages = messages;
    }
-   
+
    public int getNumberOfMessages()
    {
       lock.readLock().lock();
@@ -110,6 +110,10 @@
       }
    }
 
+   public void close()
+   {
+   }
+
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCache#isLive()
     */
@@ -117,13 +121,13 @@
    {
       return false;
    }
-   
+
+   @Override
    public String toString()
    {
       return "PageCacheImpl::page=" + page.getPageId() + " numberOfMessages = " + messages.length;
    }
 
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -29,6 +29,7 @@
 import org.hornetq.api.core.Pair;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.cursor.PageCache;
 import org.hornetq.core.paging.cursor.PageCursor;
@@ -109,14 +110,14 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
     */
-   public synchronized Pair<PagePosition, ServerMessage> moveNext() throws Exception
+   public synchronized Pair<PagePosition, PagedMessage> moveNext() throws Exception
    {
       PagePosition redeliveryPos = null;
 
       // Redeliveries will take precedence
       if ((redeliveryPos = redeliveries.poll()) != null)
       {
-         return new Pair<PagePosition, ServerMessage>(redeliveryPos, cursorProvider.getMessage(redeliveryPos));
+         return new Pair<PagePosition, PagedMessage>(redeliveryPos, cursorProvider.getMessage(redeliveryPos));
       }
 
       if (lastPosition == null)
@@ -128,17 +129,17 @@
 
       boolean match = false;
 
-      Pair<PagePosition, ServerMessage> message = null;
+      Pair<PagePosition, PagedMessage> message = null;
 
       do
       {
-         message = cursorProvider.getAfter(lastPosition);
+         message = cursorProvider.getAfter(this, lastPosition);
 
          if (message != null)
          {
             lastPosition = message.a;
 
-            match = match(message.b);
+            match = match(message.b.getMessage());
 
             if (!match)
             {
@@ -246,7 +247,7 @@
                   // looking for holes on the ack list for redelivery
                   while (true)
                   {
-                     Pair<PagePosition, ServerMessage> msgCheck = cursorProvider.getAfter(tmpPos);
+                     Pair<PagePosition, PagedMessage> msgCheck = cursorProvider.getAfter(this, tmpPos);
 
                      positions = getPageInfo(tmpPos);
 
@@ -258,7 +259,7 @@
                      }
                      else
                      {
-                        if (match(msgCheck.b))
+                        if (match(msgCheck.b.getMessage()))
                         {
                            redeliver(msgCheck.a);
                         }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -106,8 +106,21 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
     */
-   public Pair<PagePosition, ServerMessage> getAfter(final PagePosition pos) throws Exception
+   public Pair<PagePosition, PagedMessage> getAfter(PageCursor cursor, final PagePosition pos) throws Exception
    {
+
+      while(true)
+      {
+         Pair<PagePosition, PagedMessage> retPos = internalAfter(pos);
+         
+         
+         
+         return retPos;
+      }
+   }
+   
+   private Pair<PagePosition, PagedMessage> internalAfter(final PagePosition pos)
+   {
       // TODO: consider page transactions here to avoid receiving an uncommitted message
       // TODO: consider the case where a full page is ignored because of a TX
       PagePosition retPos = pos.nextMessage();
@@ -131,11 +144,11 @@
          }
       }
       
-      ServerMessage serverMessage = cache.getMessage(retPos.getMessageNr());
+      PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
       
       if (serverMessage != null)
       {
-         return new Pair<PagePosition, ServerMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
+         return new Pair<PagePosition, PagedMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
       }
       else
       {
@@ -143,7 +156,7 @@
       }
    }
 
-   public ServerMessage getMessage(final PagePosition pos) throws Exception
+   public PagedMessage getMessage(final PagePosition pos) throws Exception
    {
       PageCache cache = getPageCache(pos);
 
@@ -257,16 +270,13 @@
 
                List<PagedMessage> pgdMessages = page.read();
 
-               ServerMessage srvMessages[] = new ServerMessage[pgdMessages.size()];
-
                int i = 0;
                for (PagedMessage pdgMessage : pgdMessages)
                {
-                  ServerMessage message = pdgMessage.getMessage(storageManager);
-                  srvMessages[i++] = message;
+                  pdgMessage.initMessage(storageManager);
                }
 
-               cache.setMessages(srvMessages);
+               cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()]));
 
             }
             finally

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -181,7 +181,7 @@
       
       if (pageCache != null)
       {
-         pageCache.addLiveMessage(message.getMessage(storageManager));
+         pageCache.addLiveMessage(message);
       }
 
       numberOfMessages.incrementAndGet();

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -13,8 +13,6 @@
 
 package org.hornetq.core.paging.impl;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.api.core.HornetQBuffer;
@@ -43,12 +41,10 @@
 
    private volatile long recordID = -1;
 
-   private volatile CountDownLatch countDownCompleted;
+   private volatile boolean committed = false;
 
-   private volatile boolean committed;
+   private volatile boolean rolledback = false;
 
-   private volatile boolean rolledback;
-
    private AtomicInteger numberOfMessages = new AtomicInteger(0);
 
    // Static --------------------------------------------------------
@@ -59,7 +55,6 @@
    {
       this();
       this.transactionID = transactionID;
-      countDownCompleted = new CountDownLatch(1);
    }
 
    public PageTransactionInfoImpl()
@@ -83,7 +78,7 @@
       return transactionID;
    }
 
-   public void update(final int update, final StorageManager storageManager, PagingManager pagingManager)
+   public void onUpdate(final int update, final StorageManager storageManager, PagingManager pagingManager)
    {
       int sizeAfterUpdate = numberOfMessages.addAndGet(-update);
       if (sizeAfterUpdate == 0 && storageManager != null)
@@ -120,7 +115,6 @@
    {
       transactionID = buffer.readLong();
       numberOfMessages.set(buffer.readInt());
-      countDownCompleted = null;
       committed = true;
    }
 
@@ -135,27 +129,11 @@
       return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
    }
 
-   public void commit()
+   public synchronized void commit()
    {
       committed = true;
-      /** 
-       * this is to avoid a race condition where the transaction still being committed while another thread is depaging messages
-       */
-      countDownCompleted.countDown();
    }
 
-   public boolean waitCompletion(final int timeoutMilliseconds) throws InterruptedException
-   {
-      if (countDownCompleted == null)
-      {
-         return true;
-      }
-      else
-      {
-         return countDownCompleted.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
-      }
-   }
-
    public void store(final StorageManager storageManager, PagingManager pagingManager, final Transaction tx) throws Exception
    {
       storageManager.storePageTransaction(tx.getID(), this);
@@ -194,7 +172,7 @@
          
          public void afterCommit(Transaction tx)
          {
-            pgToUpdate.update(depages, storageManager, pagingManager);
+            pgToUpdate.onUpdate(depages, storageManager, pagingManager);
          }
       });
    }
@@ -209,21 +187,12 @@
       return rolledback;
    }
 
-   public void rollback()
+   public synchronized void rollback()
    {
       rolledback = true;
       committed = false;
-      countDownCompleted.countDown();
    }
 
-   public void markIncomplete()
-   {
-      committed = false;
-      rolledback = false;
-
-      countDownCompleted = new CountDownLatch(1);
-   }
-
    public String toString()
    {
       return "PageTransactionInfoImpl(transactionID=" + transactionID +

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -67,8 +67,13 @@
    {
    }
 
-   public ServerMessage getMessage(final StorageManager storage)
+   public ServerMessage getMessage()
    {
+      return message;
+   }
+   
+   public void initMessage(StorageManager storage)
+   {
       if (largeMessageLazyData != null)
       {
          message = storage.createLargeMessage();
@@ -76,7 +81,6 @@
          message.decodeHeadersAndProperties(buffer);
          largeMessageLazyData = null;
       }
-      return message;
    }
 
    public long getTransactionID()

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -1019,7 +1019,7 @@
 
       for (PagedMessage pagedMessage : pagedMessages)
       {
-         ServerMessage message = pagedMessage.getMessage(storageManager);
+         ServerMessage message = pagedMessage.getMessage();
 
          if (message.isLargeMessage())
          {
@@ -1060,7 +1060,7 @@
                // This is to avoid a race condition where messages are depaged
                // before the commit arrived
 
-               while (running && !pageUserTransaction.waitCompletion(500))
+               while (running)
                {
                   // This is just to give us a chance to interrupt the process..
                   // if we start a shutdown in the middle of transactions, the commit/rollback may never come, delaying

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -948,7 +948,7 @@
 
                   PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX);
 
-                  pageTX.update(pageUpdate.recods, null, null);
+                  pageTX.onUpdate(pageUpdate.recods, null, null);
                }
                else
                {
@@ -1534,8 +1534,6 @@
 
                   pageTransactionInfo.decode(buff);
 
-                  pageTransactionInfo.markIncomplete();
-
                   tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
 
                   pagingManager.addTransaction(pageTransactionInfo);

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -548,7 +548,8 @@
    private void handlePageWrite(final ReplicationPageWriteMessage packet) throws Exception
    {
       PagedMessage pgdMessage = packet.getPagedMessage();
-      ServerMessage msg = pgdMessage.getMessage(storage);
+      pgdMessage.initMessage(storage);
+      ServerMessage msg = pgdMessage.getMessage();
       Page page = getPage(msg.getAddress(), packet.getPageNumber());
       page.write(pgdMessage);
    }

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -21,6 +21,7 @@
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.config.Configuration;
+import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.cursor.PageCache;
 import org.hornetq.core.paging.cursor.PageCursor;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
@@ -110,12 +111,12 @@
       
       PageCursor cursor = cursorProvider.createCursor();
       
-      Pair<PagePosition, ServerMessage> msg;
+      Pair<PagePosition, PagedMessage> msg;
       
       int key = 0;
       while ((msg = cursor.moveNext()) != null)
       {
-         assertEquals(key++, msg.b.getIntProperty("key").intValue());
+         assertEquals(key++, msg.b.getMessage().getIntProperty("key").intValue());
          cursor.ack(msg.a);
       }
       assertEquals(NUM_MESSAGES, key);
@@ -169,9 +170,9 @@
 
       for (int i = 0 ; i < 1000 ; i++)
       {
-         Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg =  cursor.moveNext();
          assertNotNull(msg);
-         assertEquals(i, msg.b.getIntProperty("key").intValue());
+         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          
          if (i < firstPageSize)
          {
@@ -193,9 +194,9 @@
       for (int i = firstPageSize; i < NUM_MESSAGES; i++)
       {
          System.out.println("Received " + i);
-         Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
+         Pair<PagePosition, PagedMessage> msg =  cursor.moveNext();
          assertNotNull(msg);
-         assertEquals(i, msg.b.getIntProperty("key").intValue());
+         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          
          cursor.ack(msg.a);
          
@@ -226,8 +227,8 @@
       System.out.println("Cursor: " + cursor);
       for (int i = 0 ; i < 100 ; i++)
       {
-         Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
-         assertEquals(i, msg.b.getIntProperty("key").intValue());
+         Pair<PagePosition, PagedMessage> msg =  cursor.moveNext();
+         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          if (i < 10 || i > 20)
          {
             cursor.ack(msg.a);
@@ -242,16 +243,16 @@
       
       for (int i = 10; i <= 20; i++)
       {
-         Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
-         assertEquals(i, msg.b.getIntProperty("key").intValue());
+         Pair<PagePosition, PagedMessage> msg =  cursor.moveNext();
+         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          cursor.ack(msg.a);
       }
     
       
       for (int i = 100; i < NUM_MESSAGES; i++)
       {
-         Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
-         assertEquals(i, msg.b.getIntProperty("key").intValue());
+         Pair<PagePosition, PagedMessage> msg =  cursor.moveNext();
+         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          cursor.ack(msg.a);
       }
       
@@ -276,8 +277,8 @@
       Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
       for (int i = 0 ; i < 100 ; i++)
       {
-         Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
-         assertEquals(i, msg.b.getIntProperty("key").intValue());
+         Pair<PagePosition, PagedMessage> msg =  cursor.moveNext();
+         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          if (i < 10 || i > 20)
          {
             cursor.ackTx(tx, msg.a);
@@ -296,15 +297,15 @@
       
       for (int i = 10; i <= 20; i++)
       {
-         Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
-         assertEquals(i, msg.b.getIntProperty("key").intValue());
+         Pair<PagePosition, PagedMessage> msg =  cursor.moveNext();
+         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          cursor.ackTx(tx,msg.a);
       }
     
       for (int i = 100; i < NUM_MESSAGES; i++)
       {
-         Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
-         assertEquals(i, msg.b.getIntProperty("key").intValue());
+         Pair<PagePosition, PagedMessage> msg =  cursor.moveNext();
+         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
          cursor.ackTx(tx,msg.a);
       }
       
@@ -344,13 +345,13 @@
 
          Assert.assertTrue(pageStore.page(msg));
          
-         Pair<PagePosition, ServerMessage> readMessage = cursor.moveNext();
+         Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
          
          assertNotNull(readMessage);
          
          cursor.ack(readMessage.a);
          
-         assertEquals(i, readMessage.b.getIntProperty("key").intValue());
+         assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
          
          assertNull(cursor.moveNext());
       }

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -106,10 +106,10 @@
 
       for (int i = 0; i < msgs.size(); i++)
       {
-         Assert.assertEquals(simpleDestination, msgs.get(i).getMessage(null).getAddress());
+         Assert.assertEquals(simpleDestination, msgs.get(i).getMessage().getAddress());
 
          UnitTestCase.assertEqualsByteArrays(buffers.get(i).toByteBuffer().array(), msgs.get(i)
-                                                                                        .getMessage(null)
+                                                                                        .getMessage()
                                                                                         .getBodyBuffer()
                                                                                         .toByteBuffer()
                                                                                         .array());
@@ -178,10 +178,10 @@
 
       for (int i = 0; i < msgs.size(); i++)
       {
-         Assert.assertEquals(simpleDestination, msgs.get(i).getMessage(null).getAddress());
+         Assert.assertEquals(simpleDestination, msgs.get(i).getMessage().getAddress());
 
          UnitTestCase.assertEqualsByteArrays(buffers.get(i).toByteBuffer().array(), msgs.get(i)
-                                                                                        .getMessage(null)
+                                                                                        .getMessage()
                                                                                         .getBodyBuffer()
                                                                                         .toByteBuffer()
                                                                                         .array());

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -98,7 +98,7 @@
       Assert.assertEquals(1, msgs.size());
 
       UnitTestCase.assertEqualsByteArrays(msg.getBodyBuffer().writerIndex(), msg.getBodyBuffer().toByteBuffer().array(), msgs.get(0)
-                                                                                          .getMessage(null)
+                                                                                          .getMessage()
                                                                                           .getBodyBuffer()
                                                                                           .toByteBuffer()
                                                                                           .array());

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-10-15 01:18:33 UTC (rev 9788)
@@ -290,7 +290,7 @@
       for (int i = 0; i < numMessages; i++)
       {
          HornetQBuffer horn1 = buffers.get(i);
-         HornetQBuffer horn2 = msg.get(i).getMessage(null).getBodyBuffer();
+         HornetQBuffer horn2 = msg.get(i).getMessage().getBodyBuffer();
          horn1.resetReaderIndex();
          horn2.resetReaderIndex();
          for (int j = 0; j < horn1.writerIndex(); j++)
@@ -368,9 +368,9 @@
 
          for (int i = 0; i < 5; i++)
          {
-            Assert.assertEquals(sequence++, msg.get(i).getMessage(null).getMessageID());
+            Assert.assertEquals(sequence++, msg.get(i).getMessage().getMessageID());
             UnitTestCase.assertEqualsBuffers(18, buffers.get(pageNr * 5 + i), msg.get(i)
-                                                                                 .getMessage(null)
+                                                                                 .getMessage()
                                                                                  .getBodyBuffer());
          }
       }
@@ -413,9 +413,9 @@
 
       Assert.assertEquals(1, msgs.size());
 
-      Assert.assertEquals(1l, msgs.get(0).getMessage(null).getMessageID());
+      Assert.assertEquals(1l, msgs.get(0).getMessage().getMessageID());
 
-      UnitTestCase.assertEqualsBuffers(18, buffers.get(0), msgs.get(0).getMessage(null).getBodyBuffer());
+      UnitTestCase.assertEqualsBuffers(18, buffers.get(0), msgs.get(0).getMessage().getBodyBuffer());
 
       Assert.assertEquals(1, storeImpl.getNumberOfPages());
 
@@ -594,14 +594,14 @@
 
          for (PagedMessage msg : msgs)
          {
-            long id = msg.getMessage(null).getBodyBuffer().readLong();
-            msg.getMessage(null).getBodyBuffer().resetReaderIndex();
+            long id = msg.getMessage().getBodyBuffer().readLong();
+            msg.getMessage().getBodyBuffer().resetReaderIndex();
 
             ServerMessage msgWritten = buffers.remove(id);
-            buffers2.put(id, msg.getMessage(null));
+            buffers2.put(id, msg.getMessage());
             Assert.assertNotNull(msgWritten);
-            Assert.assertEquals(msg.getMessage(null).getAddress(), msgWritten.getAddress());
-            UnitTestCase.assertEqualsBuffers(10, msgWritten.getBodyBuffer(), msg.getMessage(null).getBodyBuffer());
+            Assert.assertEquals(msg.getMessage().getAddress(), msgWritten.getAddress());
+            UnitTestCase.assertEqualsBuffers(10, msgWritten.getBodyBuffer(), msg.getMessage().getBodyBuffer());
          }
       }
 
@@ -667,13 +667,13 @@
          for (PagedMessage msg : msgs)
          {
 
-            long id = msg.getMessage(null).getBodyBuffer().readLong();
+            long id = msg.getMessage().getBodyBuffer().readLong();
             ServerMessage msgWritten = buffers2.remove(id);
             Assert.assertNotNull(msgWritten);
-            Assert.assertEquals(msg.getMessage(null).getAddress(), msgWritten.getAddress());
+            Assert.assertEquals(msg.getMessage().getAddress(), msgWritten.getAddress());
             UnitTestCase.assertEqualsByteArrays(msgWritten.getBodyBuffer().writerIndex(),
                                                 msgWritten.getBodyBuffer().toByteBuffer().array(),
-                                                msg.getMessage(null).getBodyBuffer().toByteBuffer().array());
+                                                msg.getMessage().getBodyBuffer().toByteBuffer().array());
          }
       }
 
@@ -682,8 +682,8 @@
       lastPage.close();
       Assert.assertEquals(1, lastMessages.size());
 
-      lastMessages.get(0).getMessage(null).getBodyBuffer().resetReaderIndex();
-      Assert.assertEquals(lastMessages.get(0).getMessage(null).getBodyBuffer().readLong(), lastMessageId);
+      lastMessages.get(0).getMessage().getBodyBuffer().resetReaderIndex();
+      Assert.assertEquals(lastMessages.get(0).getMessage().getBodyBuffer().readLong(), lastMessageId);
 
       Assert.assertEquals(0, buffers2.size());
 
@@ -796,7 +796,7 @@
  
                      for (PagedMessage pgmsg : messages)
                      {
-                        ServerMessage msg = pgmsg.getMessage(null);
+                        ServerMessage msg = pgmsg.getMessage();
 
                         assertEquals(msgsRead++, msg.getMessageID());
 



More information about the hornetq-commits mailing list