[hornetq-commits] JBoss hornetq SVN: r10322 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/journal and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Mar 11 16:33:07 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-03-11 16:33:07 -0500 (Fri, 11 Mar 2011)
New Revision: 10322

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
Log:
Adding new test

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2011-03-11 21:32:40 UTC (rev 10321)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2011-03-11 21:33:07 UTC (rev 10322)
@@ -1610,6 +1610,7 @@
 
             if (dataFilesToProcess.size() == 0)
             {
+               trace("Finishing compacting, nothing to process");
                return;
             }
 
@@ -1740,7 +1741,7 @@
 
          if (JournalImpl.trace)
          {
-            JournalImpl.log.debug("Finished compacting on journal");
+            trace("Finished compacting on journal");
          }
 
          if (JournalImpl.TRACE_RECORDS)

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2011-03-11 21:32:40 UTC (rev 10321)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2011-03-11 21:33:07 UTC (rev 10322)
@@ -17,13 +17,21 @@
 import java.io.FilenameFilter;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import junit.framework.Assert;
 
 import org.hornetq.api.core.Pair;
-import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.PreparedTransactionInfo;
 import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.SequentialFile;
@@ -36,9 +44,13 @@
 import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase;
 import org.hornetq.tests.unit.core.journal.impl.fakes.SimpleEncoding;
 import org.hornetq.utils.IDGenerator;
+import org.hornetq.utils.OrderedExecutorFactory;
 import org.hornetq.utils.SimpleIDGenerator;
 
 /**
@@ -238,41 +250,41 @@
       setup(2, 60 * 1024, false);
 
       createJournal();
-      
+
       startJournal();
-      
+
       load();
 
       startCompact();
-      
+
       addTx(1, 2);
-      
+
       prepare(1, new SimpleEncoding(10, (byte)0));
-      
+
       finishCompact();
-      
+
       stopJournal();
-      
+
       createJournal();
-      
+
       startJournal();
-      
+
       loadAndCheck();
-      
+
       startCompact();
-      
+
       commit(1);
-      
+
       finishCompact();
-      
+
       journal.compact();
-      
+
       stopJournal();
-      
+
       createJournal();
 
       startJournal();
-      
+
       loadAndCheck();
    }
 
@@ -281,37 +293,37 @@
       setup(2, 60 * 1024, false);
 
       createJournal();
-      
+
       startJournal();
-      
+
       load();
 
       addTx(1, 2);
-      
+
       prepare(1, new SimpleEncoding(10, (byte)0));
-      
+
       stopJournal();
-      
+
       createJournal();
-      
+
       startJournal();
-      
+
       loadAndCheck();
-      
+
       startCompact();
-      
+
       commit(1);
-      
+
       finishCompact();
-      
+
       journal.compact();
-      
+
       stopJournal();
-      
+
       createJournal();
 
       startJournal();
-      
+
       loadAndCheck();
    }
 
@@ -320,29 +332,29 @@
       setup(2, 60 * 1024, false);
 
       createJournal();
-      
+
       startJournal();
-      
+
       load();
 
       addTx(1, 2, 3);
-      
+
       prepare(1, new SimpleEncoding(10, (byte)0));
-      
+
       startCompact();
-      
+
       commit(1);
-      
+
       finishCompact();
-      
+
       journal.compact();
-      
+
       stopJournal();
-      
+
       createJournal();
 
       startJournal();
-      
+
       loadAndCheck();
    }
 
@@ -1446,7 +1458,7 @@
       }
 
       long tx1 = idGenerator.generateID();
-      
+
       journal.forceMoveNextFile();
 
       ArrayList<Long> listToDelete = new ArrayList<Long>();
@@ -1469,7 +1481,7 @@
       startCompact();
       System.out.println("Committing TX " + tx1);
       rollback(tx0);
-      for (int i = 0 ; i < 10; i++)
+      for (int i = 0; i < 10; i++)
       {
          addTx(tx1, ids[i]);
       }
@@ -1506,7 +1518,7 @@
       }
 
       long tx1 = idGenerator.generateID();
-      
+
       journal.forceMoveNextFile();
 
       ArrayList<Long> listToDelete = new ArrayList<Long>();
@@ -1529,7 +1541,7 @@
       startCompact();
       System.out.println("Committing TX " + tx1);
       rollback(tx0);
-      for (int i = 0 ; i < 10; i++)
+      for (int i = 0; i < 10; i++)
       {
          addTx(tx1, ids[i]);
       }
@@ -1564,11 +1576,11 @@
          ids[i] = idGenerator.generateID();
          addTx(tx0, ids[i]);
       }
-      
+
       commit(tx0);
 
       startCompact();
-      for (int i = 0 ; i < 10; i++)
+      for (int i = 0; i < 10; i++)
       {
          delete(ids[i]);
       }
@@ -1591,22 +1603,22 @@
       long tx0 = idGenerator.generateID();
       add(idGenerator.generateID());
 
-      long ids[] = new long[]{idGenerator.generateID(), idGenerator.generateID()};
+      long ids[] = new long[] { idGenerator.generateID(), idGenerator.generateID() };
 
       addTx(tx0, ids[0]);
       addTx(tx0, ids[1]);
-      
+
       journal.forceMoveNextFile();
-      
+
       commit(tx0);
-      
+
       journal.forceMoveNextFile();
-      
+
       delete(ids[0]);
       delete(ids[1]);
-      
+
       journal.forceMoveNextFile();
-      
+
       journal.compact();
 
       stopJournal();
@@ -1700,6 +1712,161 @@
 
    }
 
+   public void testStressDeletesNoSync() throws Exception
+   {
+      Configuration config = createBasicConfig();
+      config.setJournalFileSize(100 * 1024);
+      System.out.println(config.getJournalDirectory());
+      config.setJournalSyncNonTransactional(false);
+      config.setJournalSyncTransactional(false);
+//      config.setJournalBufferTimeout_NIO(2000000000);
+//      config.setJournalBufferTimeout_AIO(2000000000);
+      config.setJournalCompactMinFiles(0);
+      config.setJournalCompactPercentage(0);
+
+      final AtomicInteger errors = new AtomicInteger(0);
+
+      final AtomicBoolean running = new AtomicBoolean(true);
+
+      final AtomicLong seqGenerator = new AtomicLong(1);
+
+      final ExecutorService executor = Executors.newCachedThreadPool();
+      
+      OrderedExecutorFactory factory = new OrderedExecutorFactory(executor);
+      
+      final ExecutorService deleteExecutor = Executors.newCachedThreadPool();
+
+      final JournalStorageManager storage = new JournalStorageManager(config, factory);
+
+      storage.start();
+      storage.loadInternalOnly();
+      
+      ((JournalImpl)storage.getMessageJournal()).setAutoReclaim(false);
+      final LinkedList<Long> survivingMsgs = new LinkedList<Long>();
+
+      Runnable producerRunnable = new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               while (running.get())
+               {
+                  final long[] values = new long[100];
+                  long tx = seqGenerator.incrementAndGet();
+
+                  OperationContextImpl ctx = new OperationContextImpl(executor);
+                  storage.setContext(ctx);
+
+                  for (int i = 0; i < 100; i++)
+                  {
+                     long id = seqGenerator.incrementAndGet();
+                     values[i] = id;
+
+                     ServerMessageImpl message = new ServerMessageImpl(id, 100);
+
+                     message.getBodyBuffer().writeBytes(new byte[1024]);
+
+                     storage.storeMessageTransactional(tx, message);
+                  }
+                  ServerMessageImpl message = new ServerMessageImpl(seqGenerator.incrementAndGet(), 100);
+                  
+                  survivingMsgs.add(message.getMessageID());
+                  
+                  // This one will stay here forever
+                  storage.storeMessage(message);
+
+                  storage.commit(tx);
+
+                  ctx.executeOnCompletion(new IOAsyncTask()
+                  {
+                     public void onError(int errorCode, String errorMessage)
+                     {
+                     }
+
+                     public void done()
+                     {
+                        deleteExecutor.execute(new Runnable()
+                        {
+                           public void run()
+                           {
+                              try
+                              {
+                                 for (long messageID : values)
+                                 {
+                                    storage.deleteMessage(messageID);
+                                 }
+                              }
+                              catch (Exception e)
+                              {
+                                 e.printStackTrace();
+                                 errors.incrementAndGet();
+                              }
+
+                           }
+                        });
+                     }
+                  });
+
+               }
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+               errors.incrementAndGet();
+            }
+         }
+      };
+
+      Runnable compressRunnable = new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               int i = 0;
+               while (running.get())
+               {
+                  Thread.sleep(500);
+                  System.out.println("Compacting");
+                  ((JournalImpl)storage.getMessageJournal()).compact();
+                  ((JournalImpl)storage.getMessageJournal()).checkReclaimStatus();
+               }
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+               errors.incrementAndGet();
+            }
+
+         }
+      };
+
+      Thread producerThread = new Thread(producerRunnable);
+      producerThread.start();
+
+      Thread compactorThread = new Thread(compressRunnable);
+      compactorThread.start();
+
+      Thread.sleep(10000);
+
+      running.set(false);
+
+      producerThread.join();
+
+      compactorThread.join();
+
+      executor.shutdown();
+
+      assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+      
+      deleteExecutor.shutdown();
+      
+      assertTrue(deleteExecutor.awaitTermination(30, TimeUnit.SECONDS));
+
+      storage.stop();
+   }
+
    @Override
    protected void setUp() throws Exception
    {



More information about the hornetq-commits mailing list