[hornetq-commits] JBoss hornetq SVN: r9300 - in trunk: tests/src/org/hornetq/tests/integration/journal and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Jun 5 03:02:37 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-06-05 03:02:37 -0400 (Sat, 05 Jun 2010)
New Revision: 9300

Modified:
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
   trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-407 improving journal shutdown during compacting and avoiding a rare test failure in JournalRestartStressTest

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-06-03 22:15:48 UTC (rev 9299)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-06-05 07:02:37 UTC (rev 9300)
@@ -2277,6 +2277,11 @@
          // compacting is disabled
          return;
       }
+      
+      if (state != JournalImpl.STATE_LOADED)
+      {
+         return;
+      }
 
       JournalFile[] dataFiles = getDataFiles();
 
@@ -2536,6 +2541,16 @@
 
       try
       {
+
+         state = JournalImpl.STATE_STOPPED;
+         
+         compactorExecutor.shutdown();
+         
+         if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS))
+         {
+            JournalImpl.log.warn("Couldn't stop compactor executor after 120 seconds");
+         }
+         
          filesExecutor.shutdown();
 
          if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
@@ -2564,8 +2579,6 @@
          freeFiles.clear();
 
          openedFiles.clear();
-
-         state = JournalImpl.STATE_STOPPED;
       }
       finally
       {

Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2010-06-03 22:15:48 UTC (rev 9299)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2010-06-05 07:02:37 UTC (rev 9300)
@@ -23,6 +23,8 @@
 import junit.framework.Assert;
 
 import org.hornetq.api.core.Pair;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.AbstractJournalUpdateTask;
@@ -181,7 +183,58 @@
    {
       internalCompactTest(false, false, true, true, false, false, false, false, false, false, true, true, true);
    }
+   
+   public void testCompactFirstFileReclaimed() throws Exception
+   {
 
+      setup(2, 60 * 1024, false);
+
+      final byte recordType = (byte)0;
+      
+      journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO);
+      
+      journal.start();
+      
+      journal.loadInternalOnly();
+      
+      journal.appendAddRecord(1, recordType, "test".getBytes(), true);
+      
+      journal.forceMoveNextFile();
+      
+      
+      journal.appendUpdateRecord(1, recordType, "update".getBytes(), true);
+      
+      journal.appendDeleteRecord(1, true);
+      
+      journal.appendAddRecord(2, recordType, "finalRecord".getBytes(), true);
+
+      
+      for (int i = 10 ; i < 100; i++)
+      {
+         journal.appendAddRecord(i, recordType, ("tst" + i).getBytes(), true);
+         journal.forceMoveNextFile();
+         journal.appendUpdateRecord(i, recordType, ("uptst" + i).getBytes(), true);
+         journal.appendDeleteRecord(i, true);
+      }
+      
+      journal.compact();
+      
+      journal.stop();
+      
+      List<RecordInfo> records = new ArrayList<RecordInfo>();
+      
+      List<PreparedTransactionInfo> preparedRecords = new ArrayList<PreparedTransactionInfo>();
+      
+      journal.start();
+
+      journal.load(records, preparedRecords, null);
+      
+      assertEquals(1, records.size());
+         
+
+   
+   }
+
    private void internalCompactTest(final boolean preXA, // prepare before compact
                                     final boolean postXA, // prepare after compact
                                     final boolean regularAdd,

Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java	2010-06-03 22:15:48 UTC (rev 9299)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java	2010-06-05 07:02:37 UTC (rev 9300)
@@ -57,18 +57,16 @@
       server2.getConfiguration().setJournalCompactMinFiles(3);
       server2.getConfiguration().setJournalCompactPercentage(50);
 
-      
-      for (int i = 0 ; i < 10; i++)
+      for (int i = 0; i < 10; i++)
       {
          server2.start();
-         
+
          ClientSessionFactory sf = createFactory(false);
          sf.setMinLargeMessageSize(1024 * 1024);
          sf.setBlockOnDurableSend(false);
-   
 
          ClientSession session = sf.createSession(true, true);
-         
+
          try
          {
             session.createQueue("slow-queue", "slow-queue");
@@ -79,8 +77,7 @@
 
          session.start();
          ClientConsumer consumer = session.createConsumer("slow-queue");
-         
-         
+
          while (true)
          {
             System.out.println("Received message from previous");
@@ -91,15 +88,16 @@
             }
             msg.acknowledge();
          }
-            
-          
-   
+
+         session.close();
+
          produceMessages(sf, 30000);
-         
+
          server2.stop();
       }
 
    }
+
    // Package protected ---------------------------------------------
 
    /**
@@ -110,19 +108,17 @@
     * @throws Throwable
     */
    private void produceMessages(final ClientSessionFactory sf, final int NMSGS) throws HornetQException,
-                                                                   InterruptedException,
-                                                                   Throwable
+                                                                               InterruptedException,
+                                                                               Throwable
    {
- 
+
       final int TIMEOUT = 5000;
-      
+
       System.out.println("sending " + NMSGS + " messages");
 
+      final ClientSession sessionSend = sf.createSession(true, true);
 
-      final ClientSession sessionSend = sf.createSession(true, true);
-      
       ClientProducer prod2 = sessionSend.createProducer("slow-queue");
-      
 
       try
       {
@@ -139,6 +135,7 @@
 
       Thread tReceive = new Thread()
       {
+         @Override
          public void run()
          {
             try
@@ -149,8 +146,8 @@
                {
                   if (i % 500 == 0)
                   {
-                     double percent = (double)i / (double) NMSGS;
-                     System.out.println("msgs " + i + " of "  + NMSGS +  ", " + (int)(percent * 100) + "%");
+                     double percent = (double)i / (double)NMSGS;
+                     System.out.println("msgs " + i + " of " + NMSGS + ", " + (int)(percent * 100) + "%");
                      Thread.sleep(100);
                   }
 
@@ -179,11 +176,14 @@
       for (int i = 0; i < NMSGS; i++)
       {
          ClientMessage msg = sessionSend.createMessage(true);
-         
+
          int size = RandomUtil.randomPositiveInt() % 10024;
 
-         if (size == 0) size = 10 * 1024;
-         
+         if (size == 0)
+         {
+            size = 10 * 1024;
+         }
+
          byte[] buffer = new byte[size];
 
          random.nextBytes(buffer);
@@ -191,7 +191,7 @@
          msg.getBodyBuffer().writeBytes(buffer);
 
          prod.send(msg);
-         
+
          if (i % 5000 == 0)
          {
             prod2.send(msg);
@@ -203,6 +203,7 @@
 
       sessionReceive.close();
       sessionSend.close();
+      sf.close();
 
       for (Throwable e : errors)
       {



More information about the hornetq-commits mailing list