[hornetq-commits] JBoss hornetq SVN: r9288 - in trunk: tests/src/org/hornetq/tests/integration/journal and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jun 3 02:00:02 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-06-03 02:00:01 -0400 (Thu, 03 Jun 2010)
New Revision: 9288

Added:
   trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java
Modified:
   trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
   trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-399 - fixing file size and adding tests

Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2010-06-01 14:55:59 UTC (rev 9287)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2010-06-03 06:00:01 UTC (rev 9288)
@@ -181,6 +181,10 @@
       {
          sequentialFile.position(0);
          SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+         
+         // Move the writer index to the buffer's capacity so the write below covers the whole buffer, keeping the file at its full size
+         writingChannel.writerIndex(writingChannel.capacity());
+         
          sequentialFile.writeDirect(writingChannel.toByteBuffer(), true, completion);
          completion.waitCompletion();
          sequentialFile.close();

Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2010-06-01 14:55:59 UTC (rev 9287)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2010-06-03 06:00:01 UTC (rev 9288)
@@ -14,6 +14,7 @@
 package org.hornetq.tests.integration.journal;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -204,7 +205,7 @@
          performNonTransactionalDelete = false;
       }
 
-      setup(50, 60 * 1024, false);
+      setup(2, 60 * 1024, false);
 
       ArrayList<Long> liveIDs = new ArrayList<Long>();
 
@@ -765,6 +766,30 @@
 
       file.mkdir();
    }
+   
+   protected void tearDown() throws Exception
+   {
+      
+      File testDir = new File(getTestDir());
+      
+      File files[] = testDir.listFiles(new FilenameFilter()
+      {
+         
+         public boolean accept(File dir, String name)
+         {
+            return name.startsWith(filePrefix) && name.endsWith(fileExtension);
+         }
+      });
+      
+      for (File file : files)
+      {
+         assertEquals("File "  + file + " doesn't have the expected number of bytes", fileSize, file.length());
+         
+         System.out.println("File " +  file);
+      }
+      
+      super.tearDown();
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()

Added: trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java	2010-06-03 06:00:01 UTC (rev 9288)
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.journal;
+
+import java.util.ArrayList;
+import java.util.Random;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * Simulates the journal being updated, compacted and cleaned up
+ * across multiple restarts,
+ * to make sure the journal survives multiple restarts of the server.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalRestartStressTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testLoad() throws Throwable
+   {
+      HornetQServer server2 = createServer(true, false);
+
+      server2.getConfiguration().setJournalFileSize(10 * 1024 * 1024);
+      server2.getConfiguration().setJournalMinFiles(10);
+      server2.getConfiguration().setJournalCompactMinFiles(3);
+      server2.getConfiguration().setJournalCompactPercentage(50);
+
+      
+      for (int i = 0 ; i < 10; i++)
+      {
+         server2.start();
+         
+         ClientSessionFactory sf = createFactory(false);
+         sf.setMinLargeMessageSize(1024 * 1024);
+         sf.setBlockOnDurableSend(false);
+   
+
+         ClientSession session = sf.createSession(true, true);
+         
+         try
+         {
+            session.createQueue("slow-queue", "slow-queue");
+         }
+         catch (Exception ignored)
+         {
+         }
+
+         session.start();
+         ClientConsumer consumer = session.createConsumer("slow-queue");
+         
+         
+         while (true)
+         {
+            System.out.println("Received message from previous");
+            ClientMessage msg = consumer.receiveImmediate();
+            if (msg == null)
+            {
+               break;
+            }
+            msg.acknowledge();
+         }
+            
+          
+   
+         produceMessages(sf, 30000);
+         
+         server2.stop();
+      }
+
+   }
+   // Package protected ---------------------------------------------
+
+   /**
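+    * @param sf the session factory used to create the sending and receiving sessions
+    * @param NMSGS the number of messages to send and to receive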
+    * @throws HornetQException
+    * @throws InterruptedException
+    * @throws Throwable
+    */
+   private void produceMessages(final ClientSessionFactory sf, final int NMSGS) throws HornetQException,
+                                                                   InterruptedException,
+                                                                   Throwable
+   {
+ 
+      final int TIMEOUT = 5000;
+      
+      System.out.println("sending " + NMSGS + " messages");
+
+
+      final ClientSession sessionSend = sf.createSession(true, true);
+      
+      ClientProducer prod2 = sessionSend.createProducer("slow-queue");
+      
+
+      try
+      {
+         sessionSend.createQueue("Queue", "Queue", true);
+      }
+      catch (Exception ignored)
+      {
+      }
+
+      final ClientSession sessionReceive = sf.createSession(true, true);
+      sessionReceive.start();
+
+      final ArrayList<Throwable> errors = new ArrayList<Throwable>();
+
+      Thread tReceive = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               ClientConsumer consumer = sessionReceive.createConsumer("Queue");
+
+               for (int i = 0; i < NMSGS; i++)
+               {
+                  if (i % 500 == 0)
+                  {
+                     double percent = (double)i / (double) NMSGS;
+                     System.out.println("msgs " + i + " of "  + NMSGS +  ", " + (int)(percent * 100) + "%");
+                     Thread.sleep(100);
+                  }
+
+                  ClientMessage msg = consumer.receive(TIMEOUT);
+                  if (msg == null)
+                  {
+                     errors.add(new Exception("Didn't receive msgs"));
+                     break;
+                  }
+                  msg.acknowledge();
+               }
+            }
+            catch (Exception e)
+            {
+               errors.add(e);
+            }
+         }
+      };
+
+      tReceive.start();
+
+      ClientProducer prod = sessionSend.createProducer("Queue");
+
+      Random random = new Random();
+
+      for (int i = 0; i < NMSGS; i++)
+      {
+         ClientMessage msg = sessionSend.createMessage(true);
+         
+         int size = RandomUtil.randomPositiveInt() % 10024;
+
+         if (size == 0) size = 10 * 1024;
+         
+         byte[] buffer = new byte[size];
+
+         random.nextBytes(buffer);
+
+         msg.getBodyBuffer().writeBytes(buffer);
+
+         prod.send(msg);
+         
+         if (i % 5000 == 0)
+         {
+            prod2.send(msg);
+            System.out.println("Sending slow message");
+         }
+      }
+
+      tReceive.join();
+
+      sessionReceive.close();
+      sessionSend.close();
+
+      for (Throwable e : errors)
+      {
+         throw e;
+      }
+   }
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}



More information about the hornetq-commits mailing list