[hornetq-commits] JBoss hornetq SVN: r10455 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/cursor and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Apr 5 12:17:19 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-04-05 12:17:19 -0400 (Tue, 05 Apr 2011)
New Revision: 10455

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/Page.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCache.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
JBPAPP-6237 - delaying delete of large messages when sent through paging

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/Page.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/Page.java	2011-04-05 14:47:00 UTC (rev 10454)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/Page.java	2011-04-05 16:17:19 UTC (rev 10455)
@@ -16,6 +16,7 @@
 import java.util.List;
 
 import org.hornetq.core.paging.cursor.LivePageCache;
+import org.hornetq.core.persistence.StorageManager;
 
 /**
  * 
@@ -29,7 +30,7 @@
 
    void write(PagedMessage message) throws Exception;
 
-   List<PagedMessage> read() throws Exception;
+   List<PagedMessage> read(StorageManager storage) throws Exception;
    
    void setLiveCache(LivePageCache pageCache);
 
@@ -43,5 +44,5 @@
 
    void close() throws Exception;
 
-   boolean delete() throws Exception;
+   boolean delete(PagedMessage[] messages) throws Exception;
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java	2011-04-05 14:47:00 UTC (rev 10454)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java	2011-04-05 16:17:19 UTC (rev 10455)
@@ -108,7 +108,7 @@
                System.out.println("*******   Page " + pgid);
                Page page = pgStore.createPage(pgid);
                page.open();
-               List<PagedMessage> msgs = page.read();
+               List<PagedMessage> msgs = page.read(sm);
                page.close();
 
                int msgID = 0;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCache.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCache.java	2011-04-05 14:47:00 UTC (rev 10454)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCache.java	2011-04-05 16:17:19 UTC (rev 10455)
@@ -34,6 +34,8 @@
    
    void setMessages(PagedMessage[] messages);
    
+   PagedMessage[] getMessages();
+   
    /**
     * If this cache is still being updated
     * @return

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java	2011-04-05 14:47:00 UTC (rev 10454)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java	2011-04-05 16:17:19 UTC (rev 10455)
@@ -143,7 +143,15 @@
       this.isLive = false;
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.PageCache#getMessages()
+    */
+   public PagedMessage[] getMessages()
+   {
+      return messages.toArray(new PagedMessage[messages.size()]);
+   }
 
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java	2011-04-05 14:47:00 UTC (rev 10454)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java	2011-04-05 16:17:19 UTC (rev 10455)
@@ -133,6 +133,14 @@
       return "PageCacheImpl::page=" + page.getPageId() + " numberOfMessages = " + messages.length;
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.PageCache#getMessages()
+    */
+   public PagedMessage[] getMessages()
+   {
+      return messages;
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-04-05 14:47:00 UTC (rev 10454)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-04-05 16:17:19 UTC (rev 10455)
@@ -143,6 +143,74 @@
       return getPageCache(pos.getPageNr());
    }
 
+   public PageCache getPageCache(final long pageId)
+   {
+      try
+      {
+         boolean needToRead = false;
+         PageCache cache = null;
+         synchronized (softCache)
+         {
+            if (pageId > pagingStore.getCurrentWritingPage())
+            {
+               return null;
+            }
+
+            cache = softCache.get(pageId);
+            if (cache == null)
+            {
+               if (!pagingStore.checkPage((int)pageId))
+               {
+                  return null;
+               }
+
+               cache = createPageCache(pageId);
+               needToRead = true;
+               // anyone reading from this cache will have to wait reading to finish first
+               // we also want only one thread reading this cache
+               cache.lock();
+               softCache.put(pageId, cache);
+            }
+         }
+
+         // Reading is done outside of the synchronized block, however
+         // the page stays locked until the entire reading is finished
+         if (needToRead)
+         {
+            Page page = null;
+            try
+            {
+               page = pagingStore.createPage((int)pageId);
+
+               page.open();
+
+               List<PagedMessage> pgdMessages = page.read(storageManager);
+               cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()]));
+            }
+            finally
+            {
+               try
+               {
+                  if (page != null)
+                  {
+                     page.close();
+                  }
+               }
+               catch (Throwable ignored)
+               {
+               }
+               cache.unlock();
+            }
+         }
+
+         return cache;
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException("Couldn't complete paging due to an IO Exception on Paging - " + e.getMessage(), e);
+      }
+   }
+
    public void addPageCache(PageCache cache)
    {
       synchronized (softCache)
@@ -337,9 +405,31 @@
       {
          for (Page depagedPage : depagedPages)
          {
-            depagedPage.delete();
+            PageCache cache;
+            PagedMessage[] pgdMessages;
             synchronized (softCache)
             {
+               cache = softCache.remove((long)depagedPage.getPageId());
+            }
+            
+            if (cache == null)
+            {
+               // The page is not on cache any more
+               // We need to read the page-file before deleting it
+               // to make sure we remove any large-messages pending
+               depagedPage.open();
+               List<PagedMessage> pgdMessagesList = depagedPage.read(storageManager);
+               depagedPage.close();
+               pgdMessages = pgdMessagesList.toArray(new PagedMessage[pgdMessagesList.size()]);
+            }
+            else
+            {
+               pgdMessages = cache.getMessages();
+            }
+            
+            depagedPage.delete(pgdMessages);
+            synchronized (softCache)
+            {
                softCache.remove((long)depagedPage.getPageId());
             }
          }
@@ -422,79 +512,6 @@
 
    }
 
-   private PageCache getPageCache(final long pageId)
-   {
-      try
-      {
-         boolean needToRead = false;
-         PageCache cache = null;
-         synchronized (softCache)
-         {
-            if (pageId > pagingStore.getCurrentWritingPage())
-            {
-               return null;
-            }
-
-            cache = softCache.get(pageId);
-            if (cache == null)
-            {
-               if (!pagingStore.checkPage((int)pageId))
-               {
-                  return null;
-               }
-
-               cache = createPageCache(pageId);
-               needToRead = true;
-               // anyone reading from this cache will have to wait reading to finish first
-               // we also want only one thread reading this cache
-               cache.lock();
-               softCache.put(pageId, cache);
-            }
-         }
-
-         // Reading is done outside of the synchronized block, however
-         // the page stays locked until the entire reading is finished
-         if (needToRead)
-         {
-            Page page = null;
-            try
-            {
-               page = pagingStore.createPage((int)pageId);
-
-               page.open();
-
-               List<PagedMessage> pgdMessages = page.read();
-
-               for (PagedMessage pdgMessage : pgdMessages)
-               {
-                  pdgMessage.initMessage(storageManager);
-               }
-               cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()]));
-            }
-            finally
-            {
-               try
-               {
-                  if (page != null)
-                  {
-                     page.close();
-                  }
-               }
-               catch (Throwable ignored)
-               {
-               }
-               cache.unlock();
-            }
-         }
-
-         return cache;
-      }
-      catch (Exception e)
-      {
-         throw new RuntimeException("Couldn't complete paging due to an IO Exception on Paging - " + e.getMessage(), e);
-      }
-   }
-
    // Inner classes -------------------------------------------------
 
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageImpl.java	2011-04-05 14:47:00 UTC (rev 10454)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageImpl.java	2011-04-05 16:17:19 UTC (rev 10455)
@@ -28,6 +28,7 @@
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.cursor.LivePageCache;
 import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.utils.DataConstants;
 
 /**
@@ -101,7 +102,7 @@
       this.pageCache = pageCache;
    }
 
-   public List<PagedMessage> read() throws Exception
+   public List<PagedMessage> read(StorageManager storage) throws Exception
    {
       ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
 
@@ -140,6 +141,7 @@
                      // constraint was already checked
                      throw new IllegalStateException("Internal error, it wasn't possible to locate END_BYTE " + b);
                   }
+                  msg.initMessage(storage);
                   messages.add(msg);
                }
                else
@@ -218,13 +220,29 @@
       file.close();
    }
 
-   public boolean delete() throws Exception
+   public boolean delete(final PagedMessage[] messages) throws Exception
    {
       if (storageManager != null)
       {
          storageManager.pageDeleted(storeName, pageId);
       }
 
+      if (messages != null)
+      {
+         for (PagedMessage msg : messages)
+         {
+            if (msg.getMessage().isLargeMessage())
+            {
+               LargeServerMessage lmsg = (LargeServerMessage)msg.getMessage();
+               
+               // Remember, cannot call delete directly here
+               // Because the large-message may be linked to another message
+               // or it may still be being delivered even though it has been acked already
+               lmsg.decrementDelayDeletionCount();
+            }
+         }
+      }
+
       try
       {
          if (suspiciousRecords)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java	2011-04-05 14:47:00 UTC (rev 10454)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java	2011-04-05 16:17:19 UTC (rev 10455)
@@ -79,9 +79,11 @@
    {
       if (largeMessageLazyData != null)
       {
-         message = storage.createLargeMessage();
+         LargeServerMessage lgMessage = storage.createLargeMessage();
+         message = lgMessage;
          HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(largeMessageLazyData);
          message.decodeHeadersAndProperties(buffer);
+         lgMessage.incrementDelayDeletionCount();
          largeMessageLazyData = null;
       }
    }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-04-05 14:47:00 UTC (rev 10454)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-04-05 16:17:19 UTC (rev 10455)
@@ -462,13 +462,12 @@
                   currentPage = createPage(currentPageId);
                   currentPage.open();
 
-                  List<PagedMessage> messages = currentPage.read();
+                  List<PagedMessage> messages = currentPage.read(storageManager);
 
                   LivePageCache pageCache = new LivePageCacheImpl(currentPage);
 
                   for (PagedMessage msg : messages)
                   {
-                     msg.initMessage(storageManager);
                      pageCache.addLiveMessage(msg);
                   }
 
@@ -646,7 +645,7 @@
                {
                   stopPaging();
                   returnPage.open();
-                  returnPage.delete();
+                  returnPage.delete(null);
 
                   // This will trigger this address to exit the page mode,
                   // and this will make HornetQ start using the journal again

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-04-05 14:47:00 UTC (rev 10454)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-04-05 16:17:19 UTC (rev 10455)
@@ -533,7 +533,7 @@
          {
             if (deletePages)
             {
-               page.delete();
+               page.delete(null);
             }
          }
          else

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2011-04-05 14:47:00 UTC (rev 10454)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2011-04-05 16:17:19 UTC (rev 10455)
@@ -13,6 +13,7 @@
 
 package org.hornetq.tests.integration.client;
 
+import java.io.ByteArrayOutputStream;
 import java.util.HashMap;
 
 import javax.transaction.xa.XAResource;
@@ -139,17 +140,16 @@
       }
    }
 
-
    public void testLargeBufferTransacted() throws Exception
    {
       doTestLargeBuffer(true);
    }
-   
+
    public void testLargeBufferNotTransacted() throws Exception
    {
       doTestLargeBuffer(false);
    }
-   
+
    public void doTestLargeBuffer(boolean transacted) throws Exception
    {
       final int journalsize = 100 * 1024;
@@ -162,10 +162,10 @@
       {
          Configuration config = createDefaultConfig(isNetty());
          config.setJournalFileSize(journalsize);
-         
+
          config.setJournalBufferSize_AIO(10 * 1024);
          config.setJournalBufferSize_NIO(10 * 1024);
-         
+
          server = createServer(true, config);
 
          server.start();
@@ -179,11 +179,10 @@
          ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
 
          Message clientFile = session.createMessage(true);
-         for (int i = 0 ; i < messageSize; i++)
+         for (int i = 0; i < messageSize; i++)
          {
             clientFile.getBodyBuffer().writeByte(getSamplebyte(i));
          }
-         
 
          producer.send(clientFile);
 
@@ -197,26 +196,24 @@
          ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
          ClientMessage msg1 = consumer.receive(5000);
          assertNotNull(msg1);
-         
+
          Assert.assertNotNull(msg1);
-         
-         for (int i = 0 ; i < messageSize; i++)
+
+         for (int i = 0; i < messageSize; i++)
          {
-            //System.out.print(msg1.getBodyBuffer().readByte() + "  ");
-            //if (i % 100 == 0) System.out.println();
-            assertEquals("position = "  + i, getSamplebyte(i), msg1.getBodyBuffer().readByte());
+            // System.out.print(msg1.getBodyBuffer().readByte() + "  ");
+            // if (i % 100 == 0) System.out.println();
+            assertEquals("position = " + i, getSamplebyte(i), msg1.getBodyBuffer().readByte());
          }
-       
+
          msg1.acknowledge();
-        
+
          consumer.close();
-         
-         
+
          if (transacted)
          {
             session.commit();
          }
-         
 
          session.close();
 
@@ -880,7 +877,6 @@
 
          producer2.send(msg1);
 
-
          session.commit();
 
          ClientMessage msg2 = consumer2.receive(10000);
@@ -939,9 +935,8 @@
 
          server.start();
 
-         
          locator.setMinLargeMessageSize(200);
-         
+
          locator.setCacheLargeMessagesClient(true);
 
          ClientSessionFactory sf = locator.createSessionFactory();
@@ -968,19 +963,19 @@
          session.commit();
 
          compareString(messageSize, msgReceived);
-         
+
          msgReceived.getBodyBuffer().readerIndex(0);
-         
+
          producer.send(msgReceived);
 
          session.commit();
-         
+
          ClientMessage msgReceived2 = consumer.receive(10000);
 
          msgReceived2.acknowledge();
 
          compareString(messageSize, msgReceived2);
-         
+
          session.commit();
 
          session.close();
@@ -1016,7 +1011,7 @@
       assertNotNull(msg);
       for (long i = 0; i < messageSize; i++)
       {
-         Assert.assertEquals("position "  + i, UnitTestCase.getSamplebyte(i), msg.getBodyBuffer().readByte());
+         Assert.assertEquals("position " + i, UnitTestCase.getSamplebyte(i), msg.getBodyBuffer().readByte());
       }
    }
 
@@ -2365,7 +2360,6 @@
 
          server.start();
 
-
          locator.setMinLargeMessageSize(1024);
          locator.setConsumerWindowSize(1024 * 1024);
 
@@ -2469,7 +2463,6 @@
 
          server.start();
 
-
          locator.setMinLargeMessageSize(100 * 1024);
 
          ClientSessionFactory sf = locator.createSessionFactory();
@@ -2629,18 +2622,17 @@
       try
       {
          LargeServerMessageImpl fileMessage = new LargeServerMessageImpl((JournalStorageManager)server.getStorageManager());
-         
+
          fileMessage.setMessageID(1005);
 
          for (int i = 0; i < LARGE_MESSAGE_SIZE; i++)
          {
             fileMessage.addBytes(new byte[] { UnitTestCase.getSamplebyte(i) });
          }
-         
+
          // The server would be doing this
          fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, LARGE_MESSAGE_SIZE);
 
- 
          fileMessage.releaseResources();
 
          session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
@@ -2821,6 +2813,158 @@
 
    }
 
+   public void testPageOnLargeMessageMultipleQueues() throws Exception
+   {
+      Configuration config = createDefaultConfig(isNetty());
+
+      final int PAGE_MAX = 20 * 1024;
+
+      final int PAGE_SIZE = 10 * 1024;
+
+      HashMap<String, AddressSettings> map = new HashMap<String, AddressSettings>();
+
+      AddressSettings value = new AddressSettings();
+      map.put(LargeMessageTest.ADDRESS.toString(), value);
+      server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+      server.start();
+
+      final int numberOfBytes = 1024;
+
+      final int numberOfBytesBigMessage = 400000;
+
+      try
+      {
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-0"), null, true);
+         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-1"), null, true);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         for (int i = 0; i < 100; i++)
+         {
+            message = session.createMessage(true);
+
+            message.getBodyBuffer().writerIndex(0);
+
+            message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
+
+            for (int j = 1; j <= numberOfBytes; j++)
+            {
+               message.getBodyBuffer().writeInt(j);
+            }
+
+            producer.send(message);
+         }
+
+         ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
+         clientFile.putBooleanProperty("TestLarge", true);
+         producer.send(clientFile);
+
+         for (int i = 0; i < 100; i++)
+         {
+            message = session.createMessage(true);
+
+            message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
+
+            producer.send(message);
+         }
+
+         session.close();
+
+         server.stop();
+
+         server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+         server.start();
+
+         sf = locator.createSessionFactory();
+
+         for (int ad = 0; ad < 2; ad++)
+         {
+            session = sf.createSession(false, false, false);
+
+            ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
+
+            session.start();
+
+            for (int i = 0; i < 100; i++)
+            {
+               ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+
+               Assert.assertNotNull(message2);
+
+               message2.acknowledge();
+
+               Assert.assertNotNull(message2);
+            }
+
+            session.commit();
+
+            for (int i = 0; i < 5; i++)
+            {
+               ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
+
+               assertTrue(messageLarge.getBooleanProperty("TestLarge"));
+
+               Assert.assertNotNull(messageLarge);
+
+               ByteArrayOutputStream bout = new ByteArrayOutputStream();
+               messageLarge.acknowledge();
+               messageLarge.saveToOutputStream(bout);
+               byte[] body = bout.toByteArray();
+               assertEquals(numberOfBytesBigMessage, body.length);
+               for (int bi = 0; bi < body.length; bi++)
+               {
+                  assertEquals(getSamplebyte(bi), body[bi]);
+               }
+
+               if (i < 4)
+                  session.rollback();
+               else
+                  session.commit();
+            }
+
+            for (int i = 0; i < 100; i++)
+            {
+               ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+
+               Assert.assertNotNull(message2);
+
+               message2.acknowledge();
+
+               Assert.assertNotNull(message2);
+            }
+
+            session.commit();
+
+            consumer.close();
+
+            session.close();
+
+         }
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java	2011-04-05 14:47:00 UTC (rev 10454)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java	2011-04-05 16:17:19 UTC (rev 10455)
@@ -98,7 +98,7 @@
       file.open();
       impl = new PageImpl(new SimpleString("something"), new NullStorageManager(), factory, file, 10);
 
-      List<PagedMessage> msgs = impl.read();
+      List<PagedMessage> msgs = impl.read(new NullStorageManager());
 
       Assert.assertEquals(numberOfElements, msgs.size());
 
@@ -115,7 +115,7 @@
                                                                                         .array());
       }
 
-      impl.delete();
+      impl.delete(null);
 
       Assert.assertEquals(0, factory.listFiles(".page").size());
 
@@ -170,7 +170,7 @@
       file.open();
       impl = new PageImpl(new SimpleString("something"), new NullStorageManager(), factory, file, 10);
 
-      List<PagedMessage> msgs = impl.read();
+      List<PagedMessage> msgs = impl.read(new NullStorageManager());
 
       Assert.assertEquals(numberOfElements, msgs.size());
 
@@ -187,7 +187,7 @@
                                                                                         .array());
       }
 
-      impl.delete();
+      impl.delete(null);
 
       Assert.assertEquals(0, factory.listFiles("page").size());
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java	2011-04-05 14:47:00 UTC (rev 10454)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java	2011-04-05 16:17:19 UTC (rev 10455)
@@ -93,7 +93,7 @@
 
       page.open();
 
-      List<PagedMessage> msgs = page.read();
+      List<PagedMessage> msgs = page.read(new NullStorageManager());
 
       page.close();
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-04-05 14:47:00 UTC (rev 10454)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-04-05 16:17:19 UTC (rev 10455)
@@ -13,7 +13,6 @@
 
 package org.hornetq.tests.unit.core.paging.impl;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -58,6 +57,7 @@
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.config.PersistedAddressSetting;
 import org.hornetq.core.persistence.config.PersistedRoles;
+import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.replication.ReplicationManager;
@@ -286,7 +286,7 @@
 
       page.open();
 
-      List<PagedMessage> msg = page.read();
+      List<PagedMessage> msg = page.read(new NullStorageManager());
 
       Assert.assertEquals(numMessages, msg.size());
       Assert.assertEquals(1, storeImpl.getNumberOfPages());
@@ -374,7 +374,7 @@
 
          page.open();
 
-         List<PagedMessage> msg = page.read();
+         List<PagedMessage> msg = page.read(new NullStorageManager());
 
          page.close();
 
@@ -399,9 +399,9 @@
 
       newPage.open();
 
-      Assert.assertEquals(1, newPage.read().size());
+      Assert.assertEquals(1, newPage.read(new NullStorageManager()).size());
 
-      newPage.delete();
+      newPage.delete(null);
 
       Assert.assertEquals(1, storeImpl.getNumberOfPages());
 
@@ -421,7 +421,7 @@
 
       page.open();
 
-      List<PagedMessage> msgs = page.read();
+      List<PagedMessage> msgs = page.read(new NullStorageManager());
 
       Assert.assertEquals(1, msgs.size());
 
@@ -603,7 +603,7 @@
       for (Page page : readPages)
       {
          page.open();
-         List<PagedMessage> msgs = page.read();
+         List<PagedMessage> msgs = page.read(new NullStorageManager());
          page.close();
 
          for (PagedMessage msg : msgs)
@@ -678,7 +678,7 @@
 
          page.open();
 
-         List<PagedMessage> msgs = page.read();
+         List<PagedMessage> msgs = page.read(new NullStorageManager());
 
          page.close();
 
@@ -696,7 +696,7 @@
       }
 
       lastPage.open();
-      List<PagedMessage> lastMessages = lastPage.read();
+      List<PagedMessage> lastMessages = lastPage.read(new NullStorageManager());
       lastPage.close();
       Assert.assertEquals(1, lastMessages.size());
 
@@ -856,7 +856,7 @@
                   if (page != null)
                   {
                      page.open();
-                     List<PagedMessage> messages = page.read();
+                     List<PagedMessage> messages = page.read(new NullStorageManager());
 
                      for (PagedMessage pgmsg : messages)
                      {
@@ -868,7 +868,7 @@
                      }
 
                      page.close();
-                     page.delete();
+                     page.delete(null);
                   }
                   else
                   {



More information about the hornetq-commits mailing list