[hornetq-commits] JBoss hornetq SVN: r7948 - in trunk: src/main/org/hornetq/core/journal and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Sep 9 23:14:43 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-09-09 23:14:38 -0400 (Wed, 09 Sep 2009)
New Revision: 7948

Added:
   trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
Modified:
   trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java
   trunk/src/main/org/hornetq/core/journal/SequentialFile.java
   trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
   trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTestBase.java
   trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageTestBase.java
   trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
HORNETQ-124 - Fixing races on compacting

Modified: trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java	2009-09-09 16:37:43 UTC (rev 7947)
+++ trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java	2009-09-10 03:14:38 UTC (rev 7948)
@@ -179,12 +179,12 @@
       this.bufferObserver = observer;
    }
 
-   public void lock()
+   public void disableAutoFlush()
    {
       lock.lock();
    }
 
-   public void unlock()
+   public void enableAutoFlush()
    {
       lock.unlock();
    }

Modified: trunk/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFile.java	2009-09-09 16:37:43 UTC (rev 7947)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFile.java	2009-09-10 03:14:38 UTC (rev 7948)
@@ -81,8 +81,8 @@
    
    void renameTo(String newFileName) throws Exception;
 
-   void lockBuffer();
+   void disableAutoFlush();
 
-   void unlockBuffer();
+   void enableAutoFlush();
 
 }

Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-09-09 16:37:43 UTC (rev 7947)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-09-10 03:14:38 UTC (rev 7948)
@@ -127,14 +127,14 @@
       return timedBuffer.checkSize(size);
    }
 
-   public void lockBuffer()
+   public void disableAutoFlush()
    {
-      timedBuffer.lock();
+      timedBuffer.disableAutoFlush();
    }
 
-   public void unlockBuffer()
+   public void enableAutoFlush()
    {
-      timedBuffer.unlock();
+      timedBuffer.enableAutoFlush();
    }
 
    public synchronized void close() throws Exception

Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2009-09-09 16:37:43 UTC (rev 7947)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2009-09-10 03:14:38 UTC (rev 7948)
@@ -91,14 +91,14 @@
    public void activate(SequentialFile file)
    {
       final AIOSequentialFile sequentialFile = (AIOSequentialFile)file;
-      timedBuffer.lock();
+      timedBuffer.disableAutoFlush();
       try
       {
          sequentialFile.setTimedBuffer(timedBuffer);
       }
       finally
       {
-         timedBuffer.unlock();
+         timedBuffer.enableAutoFlush();
       }
    }
 

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java	2009-09-09 16:37:43 UTC (rev 7947)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java	2009-09-10 03:14:38 UTC (rev 7948)
@@ -548,9 +548,8 @@
       ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize());
       writingChannel = ChannelBuffers.wrappedBuffer(bufferWrite);
 
-      currentFile = journal.getFile(false, false, false);
+      currentFile = journal.getFile(false, false, false, true);
       sequentialFile = currentFile.getFile();
-      sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
             
       sequentialFile.open(1);
       fileID = nextOrderingID++;

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-09-09 16:37:43 UTC (rev 7947)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-09-10 03:14:38 UTC (rev 7948)
@@ -202,7 +202,8 @@
 
    private ExecutorService filesExecutor = null;
 
-   private final Semaphore lock = new Semaphore(1);
+   // Lock used during the append of records
+   private final Semaphore lockAppend = new Semaphore(1);
 
    /** We don't lock the journal while compacting, however we need to lock it while taking and updating snapshots */
    private final ReadWriteLock compactingLock = new ReentrantReadWriteLock();
@@ -835,7 +836,7 @@
 
          callback = getSyncCallback(sync);
 
-         lock.acquire();
+         lockAppend.acquire();
          try
          {
             JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
@@ -844,7 +845,7 @@
          }
          finally
          {
-            lock.release();
+            lockAppend.release();
          }
       }
       finally
@@ -895,7 +896,7 @@
 
          callback = getSyncCallback(sync);
 
-         lock.acquire();
+         lockAppend.acquire();
          try
          {
             JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
@@ -913,7 +914,7 @@
          }
          finally
          {
-            lock.release();
+            lockAppend.release();
          }
       }
       finally
@@ -962,7 +963,7 @@
 
          callback = getSyncCallback(sync);
 
-         lock.acquire();
+         lockAppend.acquire();
          try
          {
             JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
@@ -981,7 +982,7 @@
          }
          finally
          {
-            lock.release();
+            lockAppend.release();
          }
       }
       finally
@@ -1024,7 +1025,7 @@
 
          JournalTransaction tx = getTransactionInfo(txID);
 
-         lock.acquire();
+         lockAppend.acquire();
          try
          {
             JournalFile usedFile = appendRecord(bb, false, false, tx, null);
@@ -1033,7 +1034,7 @@
          }
          finally
          {
-            lock.release();
+            lockAppend.release();
          }
       }
       finally
@@ -1073,7 +1074,7 @@
 
          JournalTransaction tx = getTransactionInfo(txID);
 
-         lock.acquire();
+         lockAppend.acquire();
          try
          {
             JournalFile usedFile = appendRecord(bb, false, false, tx, null);
@@ -1082,7 +1083,7 @@
          }
          finally
          {
-            lock.release();
+            lockAppend.release();
          }
       }
       finally
@@ -1115,7 +1116,7 @@
 
          JournalTransaction tx = getTransactionInfo(txID);
 
-         lock.acquire();
+         lockAppend.acquire();
          try
          {
             JournalFile usedFile = appendRecord(bb, false, false, tx, null);
@@ -1124,7 +1125,7 @@
          }
          finally
          {
-            lock.release();
+            lockAppend.release();
          }
       }
       finally
@@ -1158,6 +1159,8 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
+      compactingLock.readLock().lock();
+
       JournalTransaction tx = getTransactionInfo(txID);
 
       if (sync)
@@ -1165,8 +1168,6 @@
          tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
       }
 
-      compactingLock.readLock().lock();
-
       try
       {
 
@@ -1175,7 +1176,7 @@
 
          writeTransaction(-1, PREPARE_RECORD, txID, tx, transactionData, size, -1, bb);
 
-         lock.acquire();
+         lockAppend.acquire();
          try
          {
             JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
@@ -1184,7 +1185,7 @@
          }
          finally
          {
-            lock.release();
+            lockAppend.release();
          }
 
       }
@@ -1221,10 +1222,10 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
+      compactingLock.readLock().lock();
+
       JournalTransaction tx = transactions.remove(txID);
 
-      compactingLock.readLock().lock();
-
       try
       {
 
@@ -1237,7 +1238,7 @@
 
          writeTransaction(-1, COMMIT_RECORD, txID, tx, null, SIZE_COMPLETE_TRANSACTION_RECORD, -1, bb);
 
-         lock.acquire();
+         lockAppend.acquire();
          try
          {
             JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
@@ -1246,7 +1247,7 @@
          }
          finally
          {
-            lock.release();
+            lockAppend.release();
          }
 
       }
@@ -1291,7 +1292,7 @@
          bb.writeLong(txID);
          bb.writeInt(size);
 
-         lock.acquire();
+         lockAppend.acquire();
          try
          {
             JournalFile usedFile = appendRecord(bb, false, sync, tx, null);
@@ -1300,7 +1301,7 @@
          }
          finally
          {
-            lock.release();
+            lockAppend.release();
          }
 
       }
@@ -1406,6 +1407,8 @@
       try
       {
 
+         log.info("Starting compacting operation on journal");
+
          // We need to guarantee that the journal is frozen for this short time
          // We don't freeze the journal as we compact, only for the short time where we replace records
          compactingLock.writeLock().lock();
@@ -1459,10 +1462,13 @@
 
          Collections.sort(dataFilesToProcess, new JournalFileComparator());
 
+         // This is where most of the work is done, taking most of the time of the compacting routine.
+         // Notice there are no locks while this is being done.
+         
          // Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as
          // well
          for (final JournalFile file : dataFilesToProcess)
-         {            
+         {
             readJournalFile(fileFactory, file, compactor);
          }
 
@@ -1518,7 +1524,7 @@
             {
                if (trace)
                {
-                  trace("Merging pending transaction " + newTransaction + " after compacting to the journal");
+                  trace("Merging pending transaction " + newTransaction + " after compacting the journal");
                }
                JournalTransaction liveTransaction = transactions.get(newTransaction.getId());
                if (liveTransaction == null)
@@ -1541,6 +1547,8 @@
          renameFiles(dataFilesToProcess, newDatafiles);
          deleteControlFile(controlFile);
 
+         log.info("Finished compacting on journal");
+
       }
       finally
       {
@@ -1929,7 +1937,7 @@
          for (int i = 0; i < filesToCreate; i++)
          {
             // Keeping all files opened can be very costly (mainly on AIO)
-            freeFiles.add(createFile(false, false, true));
+            freeFiles.add(createFile(false, false, true, false));
          }
       }
 
@@ -2234,19 +2242,27 @@
    // In some tests we need to force the journal to move to a next file
    public void forceMoveNextFile() throws Exception
    {
-      lock.acquire();
+      compactingLock.readLock().lock();
       try
       {
-         moveNextFile(true);
-         if (autoReclaim)
+         lockAppend.acquire();
+         try
          {
-            checkAndReclaimFiles();
+            moveNextFile(true);
+            if (autoReclaim)
+            {
+               checkAndReclaimFiles();
+            }
+            debugWait();
          }
-         debugWait();
+         finally
+         {
+            lockAppend.release();
+         }
       }
       finally
       {
-         lock.release();
+         compactingLock.readLock().unlock();
       }
    }
 
@@ -2286,7 +2302,7 @@
          throw new IllegalStateException("Journal is already stopped");
       }
 
-      lock.acquire();
+      lockAppend.acquire();
 
       try
       {
@@ -2321,7 +2337,7 @@
       }
       finally
       {
-         lock.release();
+         lockAppend.release();
       }
    }
 
@@ -2501,9 +2517,6 @@
       return recordSize;
    }
 
-   /** 
-    * This method requires bufferControl disabled, or the reads are going to be invalid
-    * */
    private List<JournalFile> orderFiles() throws Exception
    {
 
@@ -2522,7 +2535,7 @@
          file.read(bb);
 
          int fileID = bb.getInt();
-
+         
          fileFactory.releaseBuffer(bb);
 
          bb = null;
@@ -2532,6 +2545,16 @@
             nextFileID.set(fileID);
          }
 
+         int fileNameID = getFileNameID(fileName);
+         
+         // The compactor could create a fileName but use a previously assigned ID.
+         // Because of that we need to take both parts into account
+         if (nextFileID.get() < fileNameID)
+         {
+            nextFileID.set(fileNameID);
+         }
+
+
          orderedFiles.add(new JournalFileImpl(file, fileID));
 
          file.close();
@@ -2571,14 +2594,14 @@
             throw new IllegalArgumentException("Record is too large to store " + size);
          }
 
-         // The buffer on the file can't be flushed or the currentFile could be affected
-         currentFile.getFile().lockBuffer();
+         // Disable auto flush on the timer. The Timer should'nt flush anything 
+         currentFile.getFile().disableAutoFlush();
 
          if (!currentFile.getFile().fits(size))
          {
-            currentFile.getFile().unlockBuffer();
+            currentFile.getFile().enableAutoFlush();
             moveNextFile(false);
-            currentFile.getFile().lockBuffer();
+            currentFile.getFile().disableAutoFlush();
 
             // The same check needs to be done at the new file also
             if (!currentFile.getFile().fits(size))
@@ -2642,10 +2665,24 @@
       }
       finally
       {
-         currentFile.getFile().unlockBuffer();
+         currentFile.getFile().enableAutoFlush();
       }
 
    }
+   
+   /** Get the ID part of the name */
+   private int getFileNameID(String fileName)
+   {
+      try
+      {
+         return Integer.parseInt(fileName.substring(filePrefix.length()+1, fileName.indexOf('.')));
+      }
+      catch (Throwable e)
+      {
+         log.warn("Impossible to get the ID part of the file name " + fileName, e);
+         return 0;
+      }
+   }
 
    /**
     * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER
@@ -2653,12 +2690,24 @@
     * @return
     * @throws Exception
     */
-   private JournalFile createFile(final boolean keepOpened, final boolean multiAIO, final boolean fill) throws Exception
+   private JournalFile createFile(final boolean keepOpened,
+                                  final boolean multiAIO,
+                                  final boolean fill,
+                                  final boolean tmpCompact) throws Exception
    {
       int fileID = generateFileID();
 
-      String fileName = filePrefix + "-" + fileID + "." + fileExtension;
+      String fileName;
 
+      if (tmpCompact)
+      {
+         fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
+      }
+      else
+      {
+         fileName = filePrefix + "-" + fileID + "." + fileExtension;
+      }
+
       if (trace)
       {
          trace("Creating file " + fileName);
@@ -2820,7 +2869,7 @@
     * */
    private void pushOpenedFile() throws Exception
    {
-      JournalFile nextOpenedFile = getFile(true, true, true);
+      JournalFile nextOpenedFile = getFile(true, true, true, false);
 
       openedFiles.offer(nextOpenedFile);
    }
@@ -2829,12 +2878,20 @@
     * @return
     * @throws Exception
     */
-   JournalFile getFile(final boolean keepOpened, final boolean multiAIO, final boolean fill) throws Exception
+   JournalFile getFile(final boolean keepOpened,
+                       final boolean multiAIO,
+                       final boolean fill,
+                       final boolean tmpCompactExtension) throws Exception
    {
       JournalFile nextOpenedFile = null;
       try
       {
          nextOpenedFile = freeFiles.remove();
+         if (tmpCompactExtension)
+         {
+            SequentialFile sequentialFile = nextOpenedFile.getFile();
+            sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+         }
       }
       catch (NoSuchElementException ignored)
       {
@@ -2842,7 +2899,7 @@
 
       if (nextOpenedFile == null)
       {
-         nextOpenedFile = createFile(keepOpened, multiAIO, fill);
+         nextOpenedFile = createFile(keepOpened, multiAIO, fill, tmpCompactExtension);
       }
       else
       {
@@ -2951,7 +3008,7 @@
       {
          for (String dataFile : dataFiles)
          {
-            SequentialFile file = fileFactory.createSequentialFile(dataFile, 1);           
+            SequentialFile file = fileFactory.createSequentialFile(dataFile, 1);
             if (file.exists())
             {
                file.delete();
@@ -2960,7 +3017,7 @@
 
          for (String newFile : newFiles)
          {
-            SequentialFile file = fileFactory.createSequentialFile(newFile, 1);           
+            SequentialFile file = fileFactory.createSequentialFile(newFile, 1);
             if (file.exists())
             {
                final String originalName = file.getFileName();
@@ -3179,7 +3236,7 @@
       {
          try
          {
-            lock.acquire();
+            lockAppend.acquire();
 
             HornetQBuffer bb = newBuffer(128 * 1024);
 
@@ -3188,7 +3245,7 @@
                appendRecord(bb, false, false, null, null);
             }
 
-            lock.release();
+            lockAppend.release();
          }
          catch (Exception e)
          {

Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-09-09 16:37:43 UTC (rev 7947)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-09-10 03:14:38 UTC (rev 7948)
@@ -292,14 +292,14 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.journal.SequentialFile#lockBuffer()
     */
-   public void lockBuffer()
+   public void disableAutoFlush()
    {
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.journal.SequentialFile#unlockBuffer()
     */
-   public void unlockBuffer()
+   public void enableAutoFlush()
    {
    }
 

Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTestBase.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTestBase.java	2009-09-09 16:37:43 UTC (rev 7947)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTestBase.java	2009-09-10 03:14:38 UTC (rev 7948)
@@ -47,7 +47,7 @@
  *
  * $Id: MessageImplTestBase.java 2883 2007-07-12 23:36:16Z timfox $
  */
-public class MessageHeaderTestBase extends HornetQServerTestCase
+public abstract class MessageHeaderTestBase extends HornetQServerTestCase
 {
    // Constants -----------------------------------------------------
 

Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageTestBase.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageTestBase.java	2009-09-09 16:37:43 UTC (rev 7947)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageTestBase.java	2009-09-10 03:14:38 UTC (rev 7948)
@@ -29,7 +29,7 @@
  *
  * $Id$
  */
-public class MessageTestBase extends HornetQServerTestCase
+public abstract class MessageTestBase extends HornetQServerTestCase
 {
    // Constants -----------------------------------------------------
 

Modified: trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java	2009-09-09 16:37:43 UTC (rev 7947)
+++ trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java	2009-09-10 03:14:38 UTC (rev 7948)
@@ -47,10 +47,16 @@
 
    private static final String AD2 = "ad2";
 
+   private static final String AD3 = "ad3";
+
    private static final String Q1 = "q1";
 
    private static final String Q2 = "q2";
 
+   private static final String Q3 = "q3";
+
+   private static final int TOT_AD3 = 5000;
+
    private HornetQServer server;
 
    private ClientSessionFactory sf;
@@ -76,6 +82,36 @@
 
       setupServer(journalType);
 
+      ClientSession session = sf.createSession(false, false);
+
+      try
+      {
+         ClientProducer producer = session.createProducer(AD3);
+
+         byte[] buffer = new byte[10 * 1024];
+
+         ClientMessage msg = session.createClientMessage(true);
+         msg.setBody(ChannelBuffers.wrappedBuffer(buffer));
+         for (int i = 0; i < TOT_AD3; i++)
+         {
+            producer.send(msg);
+            if (i % 100 == 0)
+            {
+               session.commit();
+            }
+         }
+
+         session.commit();
+      }
+      finally
+      {
+         session.close();
+      }
+      
+      server.stop();
+      
+      setupServer(journalType);
+      
       final AtomicInteger numberOfMessages = new AtomicInteger(0);
       final int NUMBER_OF_FAST_MESSAGES = 100000;
       final int SLOW_INTERVAL = 100;
@@ -224,7 +260,7 @@
 
       try
       {
-         
+
          sess = sf.createSession(true, true);
 
          ClientConsumer cons = sess.createConsumer(Q1);
@@ -246,6 +282,19 @@
 
          assertNull(cons.receive(100));
 
+         cons.close();
+
+         cons = sess.createConsumer(Q3);
+
+         for (int i = 0; i < TOT_AD3; i++)
+         {
+            ClientMessage msg = cons.receive(60000);
+            assertNotNull(msg);
+            msg.acknowledge();
+         }
+         
+         assertNull(cons.receiveImmediate());
+
       }
       finally
       {
@@ -305,6 +354,14 @@
       {
       }
 
+      try
+      {
+         sess.createQueue(AD3, Q3, true);
+      }
+      catch (Exception ignored)
+      {
+      }
+
       sess.close();
 
       sf = createInVMFactory();

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java	2009-09-09 16:37:43 UTC (rev 7947)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java	2009-09-10 03:14:38 UTC (rev 7948)
@@ -13,9 +13,30 @@
 
 package org.hornetq.tests.integration.jms.client;
 
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
+
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.List;
 
+import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.MapMessage;
 import javax.jms.Message;
@@ -26,7 +47,11 @@
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.jms.client.HornetQMessage;
 import org.hornetq.tests.util.JMSTestBase;
+import org.hornetq.utils.Pair;
 
 /**
  * Receive Messages and resend them, like the bridge would do
@@ -62,6 +87,11 @@
 
          for (int i = 0; i < 10; i++)
          {
+            BytesMessage bm = sess.createBytesMessage();
+            bm.setObjectProperty(HornetQMessage.JMS_HORNETQ_INPUT_STREAM,
+                                 createFakeLargeStream(2 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE));
+            msgs.add(bm);
+
             MapMessage mm = sess.createMapMessage();
             mm.setBoolean("boolean", true);
             mm.setByte("byte", (byte)3);
@@ -125,8 +155,17 @@
 
          sess.commit();
 
-         if (copiedMessage instanceof MapMessage)
+         if (copiedMessage instanceof BytesMessage)
          {
+            BytesMessage copiedBytes = (BytesMessage)copiedMessage;
+
+            for (int i = 0; i < copiedBytes.getBodyLength(); i++)
+            {
+               assertEquals(getSamplebyte(i), copiedBytes.readByte());
+            }
+         }
+         else if (copiedMessage instanceof MapMessage)
+         {
             MapMessage copiedMap = (MapMessage)copiedMessage;
             MapMessage originalMap = (MapMessage)originalMessage;
             assertEquals(originalMap.getString("str"), copiedMap.getString("str"));
@@ -209,7 +248,46 @@
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
+   protected void createCF(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
+                           List<String> jndiBindings) throws Exception
+   {
+      int retryInterval = 1000;
+      double retryIntervalMultiplier = 1.0;
+      int reconnectAttempts = -1;
+      boolean failoverOnServerShutdown = true;
+      int callTimeout = 30000;
 
+      jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest",
+                                        connectorConfigs,
+                                        null,
+                                        DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                        DEFAULT_CONNECTION_TTL,
+                                        callTimeout,
+                                        DEFAULT_MAX_CONNECTIONS,
+                                        true,
+                                        DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                        DEFAULT_CONSUMER_WINDOW_SIZE,
+                                        DEFAULT_CONSUMER_MAX_RATE,
+                                        DEFAULT_PRODUCER_WINDOW_SIZE,
+                                        DEFAULT_PRODUCER_MAX_RATE,
+                                        DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+                                        DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+                                        DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+                                        DEFAULT_AUTO_GROUP,
+                                        DEFAULT_PRE_ACKNOWLEDGE,
+                                        DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+                                        DEFAULT_ACK_BATCH_SIZE,
+                                        DEFAULT_ACK_BATCH_SIZE,
+                                        DEFAULT_USE_GLOBAL_POOLS,
+                                        DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+                                        DEFAULT_THREAD_POOL_MAX_SIZE,
+                                        retryInterval,
+                                        retryIntervalMultiplier,
+                                        reconnectAttempts,
+                                        failoverOnServerShutdown,
+                                        jndiBindings);
+   }
+
    @Override
    protected void setUp() throws Exception
    {

Added: trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java	2009-09-10 03:14:38 UTC (rev 7948)
@@ -0,0 +1,493 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.journal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A MultiThreadConsumerStressTest
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class MultiThreadCompactorTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   final SimpleString ADDRESS = new SimpleString("SomeAddress");
+
+   final SimpleString QUEUE = new SimpleString("SomeQueue");
+
+   private HornetQServer server;
+
+   private ClientSessionFactory sf;
+
+   protected int getNumberOfIterations()
+   {
+      return 3;
+   }
+   
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      stopServer();
+      super.tearDown();
+   }
+
+   public void testMultiThreadCompact() throws Throwable
+   {
+      setupServer(JournalType.ASYNCIO);
+      for (int i = 0; i < getNumberOfIterations(); i++)
+      {
+         System.out.println("######################################");
+         System.out.println("test # " + i);
+         internalTestProduceAndConsume();
+         stopServer();
+
+         AIOSequentialFileFactory factory = new AIOSequentialFileFactory(getJournalDir());
+         JournalImpl journal = new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE,
+                                               2,
+                                               0,
+                                               0,
+                                               factory,
+                                               "hornetq-data",
+                                               "hq",
+                                               100);
+         List<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
+         List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
+         journal.start();
+         journal.load(committedRecords, preparedTransactions);
+
+         assertEquals(0, committedRecords.size());
+         assertEquals(0, preparedTransactions.size());
+
+         System.out.println("DataFiles = " + journal.getDataFilesCount());
+
+         if (i % 2 == 0 && i > 0)
+         {
+            System.out.println("DataFiles = " + journal.getDataFilesCount());
+            journal.forceMoveNextFile();
+            assertEquals(0, journal.getDataFilesCount());
+         }
+
+         journal.stop();
+         journal = null;
+
+         setupServer(JournalType.ASYNCIO);
+      }
+   }
+
+   public void internalTestProduceAndConsume() throws Throwable
+   {
+
+      addBogusData(100, "LAZY-QUEUE");
+
+      System.out.println(getTemporaryDir());
+      boolean transactionalOnConsume = true;
+      boolean transactionalOnProduce = true;
+      int numberOfConsumers = 30;
+      // this test assumes numberOfConsumers == numberOfProducers
+      int numberOfProducers = numberOfConsumers;
+      int produceMessage = 5000;
+      int commitIntervalProduce = 100;
+      int consumeMessage = (int)(produceMessage * 0.9);
+      int commitIntervalConsume = 100;
+
+      System.out.println("ConsumeMessages = " + consumeMessage + " produceMessage = " + produceMessage);
+
+      // Number of messages expected to be received after restart
+      int numberOfMessagesExpected = (produceMessage - consumeMessage) * numberOfConsumers;
+
+      CountDownLatch latchReady = new CountDownLatch(numberOfConsumers + numberOfProducers);
+
+      CountDownLatch latchStart = new CountDownLatch(1);
+
+      ArrayList<BaseThread> threads = new ArrayList<BaseThread>();
+
+      ProducerThread[] prod = new ProducerThread[numberOfProducers];
+      for (int i = 0; i < numberOfProducers; i++)
+      {
+         prod[i] = new ProducerThread(i,
+                                      latchReady,
+                                      latchStart,
+                                      transactionalOnConsume,
+                                      produceMessage,
+                                      commitIntervalProduce);
+         prod[i].start();
+         threads.add(prod[i]);
+      }
+
+      ConsumerThread[] cons = new ConsumerThread[numberOfConsumers];
+
+      for (int i = 0; i < numberOfConsumers; i++)
+      {
+         cons[i] = new ConsumerThread(i,
+                                      latchReady,
+                                      latchStart,
+                                      transactionalOnProduce,
+                                      consumeMessage,
+                                      commitIntervalConsume);
+         cons[i].start();
+         threads.add(cons[i]);
+      }
+
+      latchReady.await();
+      latchStart.countDown();
+
+      for (BaseThread t : threads)
+      {
+         t.join();
+         if (t.e != null)
+         {
+            throw t.e;
+         }
+      }
+
+      server.stop();
+
+      setupServer(JournalType.ASYNCIO);
+
+      drainQueue(numberOfMessagesExpected, QUEUE);
+      drainQueue(100, new SimpleString("LAZY-QUEUE"));
+
+      server.stop();
+
+      setupServer(JournalType.ASYNCIO);
+      drainQueue(0, QUEUE);
+      drainQueue(0, new SimpleString("LAZY-QUEUE"));
+
+   }
+
+   /**
+    * @param numberOfMessagesExpected
+    * @param queue
+    * @throws HornetQException
+    */
+   private void drainQueue(int numberOfMessagesExpected, SimpleString queue) throws HornetQException
+   {
+      ClientSession sess = sf.createSession(true, true);
+
+      ClientConsumer consumer = sess.createConsumer(queue);
+
+      sess.start();
+
+      for (int i = 0; i < numberOfMessagesExpected; i++)
+      {
+         ClientMessage msg = consumer.receive(5000);
+         assertNotNull(msg);
+
+         if (i % 100 == 0)
+         {
+            System.out.println("Received #" + i + "  on thread after start");
+         }
+         msg.acknowledge();
+      }
+
+      assertNull(consumer.receiveImmediate());
+
+      sess.close();
+   }
+
+   /**
+    * @throws HornetQException
+    */
+   private void addBogusData(int nmessages, String queue) throws HornetQException
+   {
+      ClientSession session = sf.createSession(false, false);
+      try
+      {
+         session.createQueue(queue, queue, true);
+      }
+      catch (Exception ignored)
+      {
+      }
+
+      ClientProducer prod = session.createProducer(queue);
+      for (int i = 0; i < nmessages; i++)
+      {
+         ClientMessage msg = session.createClientMessage(true);
+         msg.getBody().writeBytes(new byte[1024]);
+         prod.send(msg);
+      }
+      session.commit();
+
+      session.start();
+
+      ClientConsumer cons = session.createConsumer(queue);
+      assertNotNull(cons.receive(1000));
+      session.rollback();
+      session.close();
+   }
+
+   protected void stopServer() throws Exception
+   {
+      try
+      {
+         if (server != null && server.isStarted())
+         {
+            server.stop();
+         }
+      }
+      catch (Throwable e)
+      {
+         e.printStackTrace(System.out); // System.out => junit reports
+      }
+
+      sf = null;
+   }
+
+   private void setupServer(JournalType journalType) throws Exception, HornetQException
+   {
+      if (server == null)
+      {
+         Configuration config = createDefaultConfig(true);
+         config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
+
+         config.setJournalType(journalType);
+         config.setJMXManagementEnabled(false);
+
+         config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
+         config.setJournalMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_MIN_FILES);
+
+         config.setJournalCompactMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_MIN_FILES);
+         config.setJournalCompactPercentage(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE);
+
+         // config.setJournalCompactMinFiles(0);
+         // config.setJournalCompactPercentage(0);
+
+         server = createServer(true, config);
+      }
+
+      server.start();
+
+      sf = createNettyFactory();
+
+      ClientSession sess = sf.createSession();
+
+      try
+      {
+         sess.createQueue(ADDRESS, QUEUE, true);
+      }
+      catch (Exception ignored)
+      {
+      }
+
+      sess.close();
+
+      sf = createInVMFactory();
+   }
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   class BaseThread extends Thread
+   {
+      Throwable e;
+
+      final CountDownLatch latchReady;
+
+      final CountDownLatch latchStart;
+
+      final int numberOfMessages;
+
+      final int commitInterval;
+
+      final boolean transactional;
+
+      BaseThread(String name,
+                 CountDownLatch latchReady,
+                 CountDownLatch latchStart,
+                 boolean transactional,
+                 int numberOfMessages,
+                 int commitInterval)
+      {
+         super(name);
+         this.transactional = transactional;
+         this.latchReady = latchReady;
+         this.latchStart = latchStart;
+         this.commitInterval = commitInterval;
+         this.numberOfMessages = numberOfMessages;
+      }
+
+   }
+
+   class ProducerThread extends BaseThread
+   {
+      ProducerThread(int id,
+                     CountDownLatch latchReady,
+                     CountDownLatch latchStart,
+                     boolean transactional,
+                     int numberOfMessages,
+                     int commitInterval)
+      {
+         super("ClientProducer:" + id, latchReady, latchStart, transactional, numberOfMessages, commitInterval);
+      }
+
+      public void run()
+      {
+         ClientSession session = null;
+         latchReady.countDown();
+         try
+         {
+            latchStart.await();
+            session = sf.createSession(!transactional, !transactional);
+            ClientProducer prod = session.createProducer(ADDRESS);
+            for (int i = 0; i < numberOfMessages; i++)
+            {
+               if (transactional)
+               {
+                  if (i % commitInterval == 0)
+                  {
+                     session.commit();
+                  }
+               }
+               if (i % 100 == 0)
+               {
+                  // System.out.println(Thread.currentThread().getName() + "::sent #" + i);
+               }
+               ClientMessage msg = session.createClientMessage(true);
+               msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
+               prod.send(msg);
+            }
+
+            if (transactional)
+            {
+               session.commit();
+            }
+
+            System.out.println("Thread " + Thread.currentThread().getName() +
+                               " sent " +
+                               numberOfMessages +
+                               "  messages");
+         }
+         catch (Throwable e)
+         {
+            e.printStackTrace();
+            this.e = e;
+         }
+         finally
+         {
+            try
+            {
+               session.close();
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+            }
+         }
+      }
+   }
+
+   class ConsumerThread extends BaseThread
+   {
+      ConsumerThread(int id,
+                     CountDownLatch latchReady,
+                     CountDownLatch latchStart,
+                     boolean transactional,
+                     int numberOfMessages,
+                     int commitInterval)
+      {
+         super("ClientConsumer:" + id, latchReady, latchStart, transactional, numberOfMessages, commitInterval);
+      }
+
+      public void run()
+      {
+         ClientSession session = null;
+         latchReady.countDown();
+         try
+         {
+            latchStart.await();
+            session = sf.createSession(!transactional, !transactional);
+            session.start();
+            ClientConsumer cons = session.createConsumer(QUEUE);
+            for (int i = 0; i < numberOfMessages; i++)
+            {
+               ClientMessage msg = cons.receive(60 * 1000);
+               msg.acknowledge();
+               if (i % commitInterval == 0)
+               {
+                  session.commit();
+               }
+               if (i % 100 == 0)
+               {
+                  // System.out.println(Thread.currentThread().getName() + "::received #" + i);
+               }
+            }
+
+            System.out.println("Thread " + Thread.currentThread().getName() +
+                               " received " +
+                               numberOfMessages +
+                               " messages");
+
+            session.commit();
+         }
+         catch (Throwable e)
+         {
+            this.e = e;
+         }
+         finally
+         {
+            try
+            {
+               session.close();
+            }
+            catch (Throwable e)
+            {
+               this.e = e;
+            }
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-09-09 16:37:43 UTC (rev 7947)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-09-10 03:14:38 UTC (rev 7948)
@@ -585,14 +585,14 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.journal.SequentialFile#lockBuffer()
        */
-      public void lockBuffer()
+      public void disableAutoFlush()
       {
       }
 
       /* (non-Javadoc)
        * @see org.hornetq.core.journal.SequentialFile#unlockBuffer()
        */
-      public void unlockBuffer()
+      public void enableAutoFlush()
       {
       }
 



More information about the hornetq-commits mailing list