[hornetq-commits] JBoss hornetq SVN: r10243 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/cursor/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Feb 22 20:30:43 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-02-22 20:30:42 -0500 (Tue, 22 Feb 2011)
New Revision: 10243

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingStore.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
JBPAPP-5973 - fixing empty files after paging

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingStore.java	2011-02-23 00:58:50 UTC (rev 10242)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingStore.java	2011-02-23 01:30:42 UTC (rev 10243)
@@ -72,6 +72,8 @@
 
    Page createPage(final int page) throws Exception;
    
+   boolean checkPage(final int page) throws Exception;
+   
    PagingManager getPagingManager();
    
    PageCursorProvider getCursorProvier();

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-02-23 00:58:50 UTC (rev 10242)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-02-23 01:30:42 UTC (rev 10243)
@@ -438,6 +438,11 @@
             cache = softCache.get(pageId);
             if (cache == null)
             {
+               if (!pagingStore.checkPage((int)pageId))
+               {
+                  return null;
+               }
+
                cache = createPageCache(pageId);
                needToRead = true;
                // anyone reading from this cache will have to wait reading to finish first
@@ -464,9 +469,7 @@
                {
                   pdgMessage.initMessage(storageManager);
                }
-
                cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()]));
-
             }
             finally
             {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-02-23 00:58:50 UTC (rev 10242)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-02-23 01:30:42 UTC (rev 10243)
@@ -319,13 +319,8 @@
 
       PageCache cache = cursorProvider.getPageCache(pos);
 
-      if (cache == null)
+      if (cache == null || (!cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages()))
       {
-         return null;
-      }
-
-      if (!cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages())
-      {
          retPos = pos.nextPage();
 
          cache = cursorProvider.getPageCache(retPos);

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-02-23 00:58:50 UTC (rev 10242)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-02-23 01:30:42 UTC (rev 10243)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.paging.impl;
 
+import java.io.File;
 import java.text.DecimalFormat;
 import java.util.Collections;
 import java.util.HashSet;
@@ -560,6 +561,13 @@
    {
       return currentPage;
    }
+   
+   public boolean checkPage(final int pageNumber)
+   {
+      String fileName = createFileName(pageNumber);
+      SequentialFile file = fileFactory.createSequentialFile(fileName, 1);
+      return file.exists();
+   }
 
    public Page createPage(final int pageNumber) throws Exception
    {

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-02-23 00:58:50 UTC (rev 10242)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-02-23 01:30:42 UTC (rev 10243)
@@ -29,6 +29,7 @@
 import junit.framework.AssertionFailedError;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientConsumer;
@@ -454,7 +455,6 @@
          try
          {
             server.stop();
-            // System.exit(-1);
          }
          catch (Throwable ignored)
          {
@@ -2787,7 +2787,154 @@
          }
       }
    }
+   
+   
+   public void testPageAndDepageRapidly() throws Exception
+   {
+      boolean persistentMessages = true;
 
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+      config.setJournalFileSize(10 * 1024 * 1024);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          512 * 1024,
+                                          1024 * 1024,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 51527;
+
+      final int numberOfMessages = 200;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         final ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(true, true);
+
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+         
+         final AtomicInteger errors = new AtomicInteger(0);
+         
+         Thread consumeThread = new Thread()
+         {
+            public void run()
+            {
+               ClientSession sessionConsumer = null;
+               try
+               {
+                  sessionConsumer = sf.createSession(false, false);
+                  sessionConsumer.start();
+                  
+                  ClientConsumer cons = sessionConsumer.createConsumer(ADDRESS);
+                  
+                  for (int i = 0; i < numberOfMessages; i++)
+                  {
+                     ClientMessage msg = cons.receive(PagingTest.RECEIVE_TIMEOUT);
+                     System.out.println("Message " + i + " consumed");
+                     assertNotNull(msg);
+                     msg.acknowledge();
+                     
+                     if (i % 20 == 0)
+                     {
+                        System.out.println("Commit consumer");
+                        sessionConsumer.commit();
+                     }
+                  }
+                  sessionConsumer.commit();
+               }
+               catch (Throwable e)
+               {
+                  e.printStackTrace();
+                  errors.incrementAndGet();
+               }
+               finally
+               {
+                  try
+                  {
+                     sessionConsumer.close();
+                  }
+                  catch (HornetQException e)
+                  {
+                     e.printStackTrace();
+                     errors.incrementAndGet();
+                  }
+               }
+               
+            }
+         };
+         
+         consumeThread.start();
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(persistentMessages);
+            
+            System.out.println("Message " + i + " sent");
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+            
+            Thread.sleep(50);
+         }
+
+         
+         consumeThread.join();
+         
+         long timeout = System.currentTimeMillis() + 5000;
+         
+         while (System.currentTimeMillis() < timeout && (server.getPagingManager().getPageStore(ADDRESS).isPaging() || server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages() != 1))
+         {
+            Thread.sleep(1);
+         }
+
+         // It's async, so need to wait a bit for it happening
+         assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+         
+         assertEquals(1, server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages());
+
+         sf.close();
+
+         locator.close();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------



More information about the hornetq-commits mailing list