[jboss-cvs] JBoss Messaging SVN: r7528 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jul 6 19:33:05 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-07-06 19:33:05 -0400 (Mon, 06 Jul 2009)
New Revision: 7528

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/client/CompactingTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java
Log:
Fixes on compactors

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-07-06 20:00:55 UTC (rev 7527)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-07-06 23:33:05 UTC (rev 7528)
@@ -499,17 +499,17 @@
       try
       {
 
-         wholeFileBuffer = fileFactory.newBuffer((int)file.getFile().size());
+         final int filesize = (int)file.getFile().size();
 
+         wholeFileBuffer = fileFactory.newBuffer((int)filesize);
+
          final int journalFileSize = file.getFile().read(wholeFileBuffer);
 
-         if (journalFileSize != file.getFile().size())
+         if (journalFileSize != filesize)
          {
             throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
          }
 
-         wholeFileBuffer.position(0);
-
          // First long is the ordering timestamp, we just jump its position
          wholeFileBuffer.position(SIZE_HEADER);
 
@@ -1397,12 +1397,11 @@
          compactingLock.writeLock().lock();
          try
          {
+            autoReclaim = false;
 
             // We need to move to the next file, as we need a clear start for negatives and positives counts
-            moveNextFile();
+            moveNextFile(true);
 
-            autoReclaim = false;
-
             // Take the snapshots and replace the structures
 
             dataFilesToProcess.addAll(dataFiles);
@@ -1417,6 +1416,11 @@
 
             dataFiles.clear();
 
+            if (dataFilesToProcess.size() == 0)
+            {
+               return;
+            }
+
             compactor = new JournalCompactor(fileFactory, this, records.keySet(), dataFilesToProcess.get(0).getFileID());
 
             for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
@@ -1438,19 +1442,8 @@
 
          // Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as
          // well
-         JournalFile previousFile = null;
          for (final JournalFile file : dataFilesToProcess)
          {
-            if (previousFile != null)
-            {
-               if (file.getFileID() < previousFile.getFileID())
-               {
-                  // Sanity check, this should never happen
-                  throw new IllegalStateException("DataFiles out of order!");
-               }
-            }
-            previousFile = file;
-
             log.info("Compacting file " + file.getFile().getFileName() + ", internalID = " + file.getFileID());
             readJournalFile(fileFactory, file, compactor);
          }
@@ -1493,6 +1486,8 @@
                dataFiles.addFirst(fileToAdd);
             }
 
+            trace("There are " + dataFiles.size() + " datafiles Now");
+
             // Replay pending commands (including updates, deletes and commits)
 
             localCompactor.replayPendingCommands();
@@ -2001,21 +1996,7 @@
                   log.warn("Could not remove file " + file);
                }
 
-               // FIXME - size() involves a scan!!!
-               if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
-               {
-                  // Re-initialise it
-
-                  JournalFile jf = reinitializeFile(file);
-
-                  freeFiles.add(jf);
-               }
-               else
-               {
-                  file.getFile().open(1);
-
-                  file.getFile().delete();
-               }
+               addFreeFile(file);
             }
          }
       }
@@ -2046,7 +2027,7 @@
 
       long compactMargin = (long)(totalBytes * compactPercentage);
 
-      if (totalLiveSize < compactMargin && compactor == null && dataFiles.length > compactMinFiles)
+      if (totalLiveSize < compactMargin && compactorWait.getCount() == 0 && dataFiles.length > compactMinFiles)
       {
 
          log.info("Compacting being started, numberOfDataFiles = " + dataFiles.length +
@@ -2055,6 +2036,8 @@
                   ", margin to start compacting = " +
                   compactMargin);
 
+         compactorWait.waitCompletion();
+
          compactorWait.up();
 
          // We can't use the executor for the compacting... or we would lock files opening and creation (besides other
@@ -2148,6 +2131,8 @@
     *  It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
    public void debugWait() throws Exception
    {
+      compactorWait.waitCompletion();
+
       fileFactory.testFlush();
 
       for (JournalTransaction tx : transactions.values())
@@ -2230,7 +2215,7 @@
       lock.acquire();
       try
       {
-         moveNextFile();
+         moveNextFile(true);
          debugWait();
       }
       finally
@@ -2279,6 +2264,11 @@
 
       try
       {
+         while (!compactorWait.waitCompletion(60000))
+         {
+            log.warn("Waiting the compactor to finish its operations");
+         }
+
          filesExecutor.shutdown();
 
          if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
@@ -2286,14 +2276,9 @@
             log.warn("Couldn't stop journal executor after 60 seconds");
          }
 
-         while (!compactorWait.waitCompletion(60000))
-         {
-            log.warn("Waiting the compactor to finish its operations");
-         }
-
          fileFactory.stop();
 
-         if (currentFile != null)
+         if (currentFile != null && currentFile.getFile().isOpen())
          {
             currentFile.getFile().close();
          }
@@ -2316,13 +2301,6 @@
       finally
       {
          lock.release();
-         try
-         {
-            compactingLock.writeLock().unlock();
-         }
-         catch (Throwable ignored)
-         {
-         }
       }
    }
 
@@ -2342,8 +2320,7 @@
    {
       for (JournalFile file : oldFiles)
       {
-         dataFiles.remove(file);
-         freeFiles.add(reinitializeFile(file));
+         addFreeFile(file);
       }
 
       for (JournalFile file : newFiles)
@@ -2371,6 +2348,29 @@
    // Private
    // -----------------------------------------------------------------------------
 
+   /**
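+    * @param file the file to recycle: re-initialised and added to freeFiles while the free, data and opened file counts plus one are below minFiles, otherwise opened and deleted
+    * @throws Exception propagated from re-initialising, opening or deleting the file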
+    */
+   private void addFreeFile(JournalFile file) throws Exception
+   {
+      // FIXME - size() involves a scan!!!
+      if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
+      {
+         // Re-initialise it
+
+         JournalFile jf = reinitializeFile(file);
+
+         freeFiles.add(jf);
+      }
+      else
+      {
+         file.getFile().open(1);
+
+         file.getFile().delete();
+      }
+   }
+
    private void checkReclaimStatus() throws Exception
    {
       reclaimer.scan(getDataFiles());
@@ -2558,7 +2558,7 @@
          if (!currentFile.getFile().fits(size))
          {
             currentFile.getFile().unlockBuffer();
-            moveNextFile();
+            moveNextFile(false);
             currentFile.getFile().lockBuffer();
 
             // The same check needs to be done at the new file also
@@ -2699,12 +2699,11 @@
    }
 
    // You need to guarantee lock.acquire() before calling this method
-   private void moveNextFile() throws InterruptedException
+   private void moveNextFile(boolean synchronous) throws InterruptedException
    {
-      // Asynchronously close the file
-      closeFile(currentFile);
+      closeFile(currentFile, synchronous);
 
-      currentFile = enqueueOpenFile();
+      currentFile = enqueueOpenFile(synchronous);
 
       if (trace)
       {
@@ -2719,14 +2718,14 @@
     * <p>In case there are no cached opened files, this method will block until the file was opened,
     * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as JBM).</p> 
     * */
-   private JournalFile enqueueOpenFile() throws InterruptedException
+   private JournalFile enqueueOpenFile(boolean synchronous) throws InterruptedException
    {
       if (trace)
       {
          trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
       }
 
-      filesExecutor.execute(new Runnable()
+      Runnable run = new Runnable()
       {
          public void run()
          {
@@ -2739,10 +2738,19 @@
                log.error(e.getMessage(), e);
             }
          }
-      });
+      };
 
-      if (autoReclaim)
+      if (synchronous)
       {
+         run.run();
+      }
+      else
+      {
+         filesExecutor.execute(run);
+      }
+
+      if (autoReclaim && !synchronous)
+      {
          filesExecutor.execute(new Runnable()
          {
             public void run()
@@ -2827,12 +2835,12 @@
       return nextOpenedFile;
    }
 
-   private void closeFile(final JournalFile file)
+   private void closeFile(final JournalFile file, boolean synchronous)
    {
       fileFactory.deactivate(file.getFile());
       pendingCloseFiles.add(file);
 
-      filesExecutor.execute(new Runnable()
+      Runnable run = new Runnable()
       {
          public void run()
          {
@@ -2855,7 +2863,17 @@
                compactingLock.readLock().unlock();
             }
          }
-      });
+      };
+
+      if (synchronous)
+      {
+         run.run();
+      }
+      else
+      {
+         filesExecutor.execute(run);
+      }
+
    }
 
    private JournalTransaction getTransactionInfo(final long txID)

Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/CompactingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/CompactingTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/CompactingTest.java	2009-07-06 23:33:05 UTC (rev 7528)
@@ -0,0 +1,326 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+
+/**
+ * A CompactingTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class CompactingTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private static final String AD1 = "ad1";
+
+   private static final String AD2 = "ad2";
+
+   private static final String Q1 = "q1";
+
+   private static final String Q2 = "q2";
+
+   private MessagingServer server;
+
+   private ClientSessionFactory sf;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testMultiProducerAndCompactNIO() throws Throwable
+   {
+      internalTestMultiProducer(JournalType.NIO);
+   }
+
+   public void testMultiProducerAndCompactAIO() throws Throwable
+   {
+      internalTestMultiProducer(JournalType.ASYNCIO);
+   }
+
+   public void internalTestMultiProducer(JournalType journalType) throws Throwable
+   {
+
+      setupServer(journalType);
+
+      final AtomicInteger numberOfMessages = new AtomicInteger(0);
+      final int NUMBER_OF_FAST_MESSAGES = 100000;
+      final int SLOW_INTERVAL = 100;
+
+      final CountDownLatch latchReady = new CountDownLatch(2);
+      final CountDownLatch latchStart = new CountDownLatch(1);
+
+      class FastProducer extends Thread
+      {
+         Throwable e;
+
+         FastProducer()
+         {
+            super("Fast-Thread");
+         }
+
+         public void run()
+         {
+            ClientSession session = null;
+            ClientSession sessionSlow = null;
+            latchReady.countDown();
+            try
+            {
+               latchStart.await();
+               session = sf.createSession(true, true);
+               sessionSlow = sf.createSession(false, false);
+               ClientProducer prod = session.createProducer(AD2);
+               ClientProducer slowProd = sessionSlow.createProducer(AD1);
+               for (int i = 0; i < NUMBER_OF_FAST_MESSAGES; i++)
+               {
+                  if (i % SLOW_INTERVAL == 0)
+                  {
+                     if (numberOfMessages.incrementAndGet() % 5 == 0)
+                     {
+                        sessionSlow.commit();
+                     }
+                     slowProd.send(session.createClientMessage(true));
+                  }
+                  ClientMessage msg = session.createClientMessage(true);
+                  msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
+                  prod.send(msg);
+               }
+               sessionSlow.commit();
+            }
+            catch (Throwable e)
+            {
+               this.e = e;
+            }
+            finally
+            {
+               try
+               {
+                  session.close();
+               }
+               catch (Throwable e)
+               {
+                  this.e = e;
+               }
+               try
+               {
+                  sessionSlow.close();
+               }
+               catch (Throwable e)
+               {
+                  this.e = e;
+               }
+            }
+         }
+      }
+
+      class FastConsumer extends Thread
+      {
+         Throwable e;
+
+         FastConsumer()
+         {
+            super("Fast-Consumer");
+         }
+
+         public void run()
+         {
+            ClientSession session = null;
+            latchReady.countDown();
+            try
+            {
+               latchStart.await();
+               session = sf.createSession(true, true);
+               session.start();
+               ClientConsumer cons = session.createConsumer(Q2);
+               for (int i = 0; i < NUMBER_OF_FAST_MESSAGES; i++)
+               {
+                  ClientMessage msg = cons.receive(60 * 1000);
+                  msg.acknowledge();
+               }
+
+               assertNull(cons.receiveImmediate());
+            }
+            catch (Throwable e)
+            {
+               this.e = e;
+            }
+            finally
+            {
+               try
+               {
+                  session.close();
+               }
+               catch (Throwable e)
+               {
+                  this.e = e;
+               }
+            }
+         }
+      }
+
+      FastConsumer f1 = new FastConsumer();
+      f1.start();
+
+      FastProducer p1 = new FastProducer();
+      p1.start();
+
+      latchReady.await();
+      latchStart.countDown();
+
+      p1.join();
+
+      if (p1.e != null)
+      {
+         throw p1.e;
+      }
+
+      f1.join();
+
+      if (f1.e != null)
+      {
+         throw f1.e;
+      }
+
+      sf.close();
+
+      server.stop();
+
+      setupServer(journalType);
+
+      ClientSession sess = sf.createSession(true, true);
+
+      ClientConsumer cons = sess.createConsumer(Q1);
+
+      sess.start();
+
+      for (int i = 0; i < numberOfMessages.intValue(); i++)
+      {
+         ClientMessage msg = cons.receive(10000);
+         assertNotNull(msg);
+         msg.acknowledge();
+      }
+
+      assertNull(cons.receiveImmediate());
+
+      cons.close();
+
+      cons = sess.createConsumer(Q2);
+
+      assertNull(cons.receive(100));
+
+      sess.close();
+
+   }
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      clearData();
+   }
+
+   /**
+    * @throws Exception
+    * @throws MessagingException
+    */
+   private void setupServer(JournalType journalType) throws Exception, MessagingException
+   {
+      Configuration config = createDefaultConfig();
+      config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
+
+      config.setJournalType(journalType);
+
+      config.setJournalCompactMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_MIN_FILES);
+      config.setJournalCompactPercentage(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE);
+
+      server = createServer(true, config);
+
+      server.start();
+
+      sf = createInVMFactory();
+
+      ClientSession sess = sf.createSession();
+
+      try
+      {
+         sess.createQueue(AD1, Q1, true);
+      }
+      catch (Exception ignored)
+      {
+      }
+
+      try
+      {
+         sess.createQueue(AD2, Q2, true);
+      }
+      catch (Exception ignored)
+      {
+      }
+
+      sess.close();
+
+      sf = createInVMFactory();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      sf.close();
+
+      server.stop();
+
+      // super.tearDown();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java	2009-07-06 20:00:55 UTC (rev 7527)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java	2009-07-06 23:33:05 UTC (rev 7528)
@@ -301,8 +301,6 @@
          }
       }
 
-      System.out.println("Number of Files: " + journal.getDataFilesCount());
-
       if (regularAdd)
       {
          for (int i = 0; i < NUMBER_OF_RECORDS; i++)
@@ -555,8 +553,6 @@
          update(i);
       }
 
-      System.out.println("Number of Files: " + journal.getDataFilesCount());
-
       for (int i = 0; i < NUMBER_OF_RECORDS; i++)
       {
          if (!(i % 10 == 0))
@@ -696,15 +692,8 @@
          journal.forceMoveNextFile();
       }
 
-      System.out.println("DataFiles = " + journal.getDataFilesCount());
-
       JournalFile files[] = journal.getDataFiles();
 
-      for (JournalFile file : files)
-      {
-         System.out.println("Size: " + file.getLiveSize());
-      }
-
       stopJournal();
       createJournal();
       startJournal();
@@ -712,17 +701,10 @@
 
       journal.forceMoveNextFile();
 
-      System.out.println("DataFiles = " + journal.getDataFilesCount());
-
       JournalFile files2[] = journal.getDataFiles();
 
       assertEquals(files.length, files2.length);
 
-      for (JournalFile file : files2)
-      {
-         System.out.println("Size: " + file.getLiveSize());
-      }
-
       for (int i = 0; i < files.length; i++)
       {
          assertEquals(expectedSizes.get(i).intValue(), files[i].getLiveSize());
@@ -742,11 +724,6 @@
 
       for (JournalFile file : files3)
       {
-         System.out.println("Size: " + file.getLiveSize());
-      }
-
-      for (JournalFile file : files3)
-      {
          assertEquals(0, file.getLiveSize());
       }
 




More information about the jboss-cvs-commits mailing list