[hornetq-commits] JBoss hornetq SVN: r9680 - in branches/Branch_2_1: src/main/org/hornetq/core/paging/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Sep 13 18:39:15 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-09-13 18:39:14 -0400 (Mon, 13 Sep 2010)
New Revision: 9680

Modified:
   branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java
   branches/Branch_2_1/src/main/org/hornetq/core/paging/PagingManager.java
   branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
   branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
   branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
Log:
Fixing a leak on PageTransaction and adding tests on ordering for paging

Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2010-09-13 22:39:14 UTC (rev 9680)
@@ -40,12 +40,12 @@
 
    long getTransactionID();
    
-   void store(StorageManager storageManager,Transaction tx) throws Exception;
+   void store(StorageManager storageManager, PagingManager pagingManager, Transaction tx) throws Exception;
    
-   void storeUpdate(StorageManager storageManager, Transaction tx, int depages) throws Exception;
+   void storeUpdate(StorageManager storageManager, PagingManager pagingManager, Transaction tx, int depages) throws Exception;
 
    // To be used after the update was stored or reload
-   void update(int update, StorageManager storageManager);
+   void update(int update, StorageManager storageManager, PagingManager pagingManager);
 
    void increment();
 

Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/PagingManager.java	2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/PagingManager.java	2010-09-13 22:39:14 UTC (rev 9680)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.paging;
 
+import java.util.Map;
+
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.postoffice.PostOffice;
@@ -67,6 +69,8 @@
     * @param transactionID
     */
    void removeTransaction(long transactionID);
+   
+   Map<Long, PageTransactionInfo> getTransactions();
 
    /**
     * Reload previously created PagingStores into memory

Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-09-13 22:39:14 UTC (rev 9680)
@@ -20,6 +20,7 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.TransactionOperation;
@@ -56,6 +57,7 @@
 
    public PageTransactionInfoImpl(final long transactionID)
    {
+      this();
       this.transactionID = transactionID;
       countDownCompleted = new CountDownLatch(1);
    }
@@ -81,7 +83,7 @@
       return transactionID;
    }
 
-   public void update(final int update, final StorageManager storageManager)
+   public void update(final int update, final StorageManager storageManager, PagingManager pagingManager)
    {
       int sizeAfterUpdate = numberOfMessages.addAndGet(-update);
       if (sizeAfterUpdate == 0 && storageManager != null)
@@ -95,6 +97,11 @@
             log.warn("Can't delete page transaction id=" + this.recordID);
          }
       }
+      
+      if (sizeAfterUpdate == 0 && pagingManager != null)
+      {
+         pagingManager.removeTransaction(this.transactionID);
+      }
    }
 
    public void increment()
@@ -149,7 +156,7 @@
       }
    }
 
-   public void store(final StorageManager storageManager, final Transaction tx) throws Exception
+   public void store(final StorageManager storageManager, PagingManager pagingManager, final Transaction tx) throws Exception
    {
       storageManager.storePageTransaction(tx.getID(), this);
    }
@@ -157,7 +164,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.PageTransactionInfo#storeUpdate(org.hornetq.core.persistence.StorageManager, org.hornetq.core.transaction.Transaction, int)
     */
-   public void storeUpdate(final StorageManager storageManager, final Transaction tx, final int depages) throws Exception
+   public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx, final int depages) throws Exception
    {
       storageManager.updatePageTransaction(tx.getID(), this, depages);
       
@@ -187,7 +194,7 @@
          
          public void afterCommit(Transaction tx)
          {
-            pgToUpdate.update(depages, storageManager);
+            pgToUpdate.update(depages, storageManager, pagingManager);
          }
       });
    }

Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java	2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java	2010-09-13 22:39:14 UTC (rev 9680)
@@ -14,6 +14,7 @@
 package org.hornetq.core.paging.impl;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -155,7 +156,16 @@
    {
       return transactions.get(id);
    }
+   
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.PagingManager#getTransactions()
+    */
+   public Map<Long, PageTransactionInfo> getTransactions()
+   {
+      return transactions;
+   }
 
+
    // HornetQComponent implementation
    // ------------------------------------------------------------------------------------------------
 

Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-09-13 22:39:14 UTC (rev 9680)
@@ -1059,7 +1059,7 @@
          // This will set the journal transaction to commit;
          depageTransaction.setContainsPersistent();
          
-         entry.getKey().storeUpdate(storageManager, depageTransaction, entry.getValue().intValue());
+         entry.getKey().storeUpdate(storageManager, this.pagingManager, depageTransaction, entry.getValue().intValue());
       }
 
       depageTransaction.commit();

Modified: branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-09-13 22:39:14 UTC (rev 9680)
@@ -913,7 +913,7 @@
                   
                   PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX);
                   
-                  pageTX.update(pageUpdate.recods, null);
+                  pageTX.update(pageUpdate.recods, null, null);
                }
                else
                {

Modified: branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-09-13 22:39:14 UTC (rev 9680)
@@ -1267,7 +1267,7 @@
                      store.sync();
                   }
 
-                  pageTransaction.store(storageManager, tx);
+                  pageTransaction.store(storageManager, pagingManager, tx);
                }
             }
          }

Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-09-13 22:39:14 UTC (rev 9680)
@@ -34,6 +34,7 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.DivertConfiguration;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
 import org.hornetq.core.paging.impl.TestSupportPageStore;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
@@ -163,6 +164,10 @@
 
             for (int i = 0; i < numberOfMessages; i++)
             {
+               if (i % 500 == 0)
+               {
+                  session.commit();
+               }
                message = session.createMessage(true);
 
                HornetQBuffer bodyLocal = message.getBodyBuffer();
@@ -240,6 +245,8 @@
                      }
 
                      consumer.close();
+                     
+                     session.close();
                   }
                   catch (Throwable e)
                   {
@@ -260,6 +267,9 @@
          {
             threads[i].join();
          }
+         
+         assertEquals(0, server.getPostOffice().getPagingManager().getTransactions().size());
+
       }
       finally
       {
@@ -812,6 +822,136 @@
 
    }
 
+
+   public void testDepageDuringTransaction4() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+      
+      server.getConfiguration().setJournalSyncNonTransactional(false);
+      server.getConfiguration().setJournalSyncTransactional(false);
+
+      server.start();
+
+      final AtomicInteger errors = new AtomicInteger(0);
+      
+      final int messageSize = 1024; // 1k
+      final int numberOfMessages = 10000;
+
+      try
+      {
+         final ClientSessionFactory sf = createInVMFactory();
+         
+
+         sf.setBlockOnNonDurableSend(true);
+         sf.setBlockOnDurableSend(true);
+         sf.setBlockOnAcknowledge(false);
+
+         final byte[] body = new byte[messageSize];
+         
+         
+         Thread producerThread = new Thread()
+         {
+           public void run()
+           {
+              ClientSession sessionProducer = null;
+              try
+              {
+                 sessionProducer = sf.createSession(false, false);
+                 ClientProducer producer = sessionProducer.createProducer(ADDRESS);
+                 
+                 for (int i = 0 ; i < numberOfMessages; i++)
+                 {
+                    ClientMessage msg = sessionProducer.createMessage(true);
+                    msg.getBodyBuffer().writeBytes(body);
+                    msg.putIntProperty("count", i);
+                    producer.send(msg);
+                    
+                    if (i % 50 == 0 && i != 0)
+                    {
+                       sessionProducer.commit();
+                       //Thread.sleep(500);
+                    }
+                 }
+                 
+                 sessionProducer.commit();
+                 
+                 System.out.println("Producer gone");
+                 
+                 
+                 
+              }
+              catch (Throwable e)
+              {
+                 e.printStackTrace(); // >> junit report
+                 errors.incrementAndGet();
+              }
+              finally
+              {
+                 try
+                 {
+                    if (sessionProducer != null)
+                    {
+                       sessionProducer.close();
+                    }
+                 }
+                 catch (Throwable e)
+                 {
+                    e.printStackTrace();
+                    errors.incrementAndGet();
+                 }
+              }
+           }
+         };
+         
+         ClientSession session = sf.createSession(true, true, 0);
+         session.start();
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+         
+         producerThread.start();
+ 
+         ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+         
+         
+         for (int i = 0 ; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = consumer.receive(500000);
+            assertNotNull(msg);
+            assertEquals(i, msg.getIntProperty("count").intValue());
+            msg.acknowledge();
+            if (i > 0 && i % 10 == 0)
+            {
+               //session.commit();
+            }
+         }
+         //session.commit();
+         
+         session.close();
+         
+         producerThread.join();
+         
+         assertEquals(0, errors.get());
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    public void testPageOnSchedulingNoRestart() throws Exception
    {
       internalTestPageOnScheduling(false);

Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-09-13 22:39:14 UTC (rev 9680)
@@ -688,6 +688,154 @@
       Assert.assertEquals(0, storeImpl.getAddressSize());
    }
 
+   public void testOrderOnPaging() throws Throwable
+   {
+      clearData();
+      SequentialFileFactory factory = new NIOSequentialFileFactory(this.getPageDir());
+
+      PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
+
+      final int MAX_SIZE = 1024 * 10;
+
+      AddressSettings settings = new AddressSettings();
+      settings.setPageSizeBytes(MAX_SIZE);
+      settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+
+      final TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+                                                                 createMockManager(),
+                                                                 createStorageManagerMock(),
+                                                                 createPostOfficeMock(),
+                                                                 factory,
+                                                                 storeFactory,
+                                                                 new SimpleString("test"),
+                                                                 settings,
+                                                                 executor,
+                                                                 true);
+
+      storeImpl.start();
+
+      Assert.assertEquals(0, storeImpl.getNumberOfPages());
+
+      // Marked the store to be paged
+      storeImpl.startPaging();
+
+      final CountDownLatch producedLatch = new CountDownLatch(1);
+
+      Assert.assertEquals(1, storeImpl.getNumberOfPages());
+
+      final SimpleString destination = new SimpleString("test");
+
+      final long NUMBER_OF_MESSAGES = 100000;
+
+      final List<Throwable> errors = new ArrayList<Throwable>();
+
+      class WriterThread extends Thread
+      {
+
+         public WriterThread()
+         {
+            super("PageWriter");
+         }
+
+         @Override
+         public void run()
+         {
+
+            try
+            {
+               for (long i = 0; i < NUMBER_OF_MESSAGES; i++)
+               {
+                  // Each thread will Keep paging until all the messages are depaged.
+                  // This is possible because the depage thread is not actually reading the pages.
+                  // Just using the internal API to remove it from the page file system
+                  ServerMessage msg = createMessage(i, storeImpl, destination, createRandomBuffer(i, 1024));
+                  msg.putLongProperty("count", i);
+                  while (!storeImpl.page(msg))
+                  {
+                     storeImpl.startPaging();
+                  }
+
+                  if (i == 0)
+                  {
+                     producedLatch.countDown();
+                  }
+               }
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+               errors.add(e);
+            }
+         }
+      }
+
+      class ReaderThread extends Thread
+      {
+         public ReaderThread()
+         {
+            super("PageReader");
+         }
+
+         @Override
+         public void run()
+         {
+            try
+            {
+
+               long msgsRead = 0;
+
+               while (msgsRead < NUMBER_OF_MESSAGES)
+               {
+                  Page page = storeImpl.depage();
+                  if (page != null)
+                  {
+                     page.open();
+                     List<PagedMessage> messages = page.read();
+ 
+                     for (PagedMessage pgmsg : messages)
+                     {
+                        ServerMessage msg = pgmsg.getMessage(null);
+
+                        assertEquals(msgsRead++, msg.getMessageID());
+
+                        assertEquals(msg.getMessageID(), msg.getLongProperty("count").longValue());
+                     }
+ 
+                     page.close();
+                     page.delete();
+                  }
+                  else
+                  {
+                     System.out.println("Depaged!!!!");
+                     Thread.sleep(500);
+                  }
+               }
+
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+               errors.add(e);
+            }
+         }
+      }
+
+      WriterThread producerThread = new WriterThread();
+      producerThread.start();
+      ReaderThread consumer = new ReaderThread();
+      consumer.start();
+
+      producerThread.join();
+      consumer.join();
+
+      storeImpl.stop();
+
+      for (Throwable e: errors)
+      {
+         throw e;
+      }
+   }
+
    /**
    * @return
    */
@@ -878,6 +1026,15 @@
          return false;
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.paging.PagingManager#getTransactions()
+       */
+      public Map<Long, PageTransactionInfo> getTransactions()
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
    }
 
    class FakeStorageManager implements StorageManager

Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2010-09-13 22:39:14 UTC (rev 9680)
@@ -17,6 +17,7 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -35,7 +36,6 @@
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
-import org.hornetq.core.server.JournalType;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.transaction.impl.ResourceManagerImpl;
@@ -312,6 +312,14 @@
          return false;
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.paging.PagingManager#getTransactions()
+       */
+      public Map<Long, PageTransactionInfo> getTransactions()
+      {
+         return null;
+      }
+
    }
 
 }



More information about the hornetq-commits mailing list