[hornetq-commits] JBoss hornetq SVN: r9643 - in branches/Branch_2_1: examples/soak/tx-restarts and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Sep 3 17:24:49 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-09-03 17:24:48 -0400 (Fri, 03 Sep 2010)
New Revision: 9643

Modified:
   branches/Branch_2_1/.classpath
   branches/Branch_2_1/examples/soak/tx-restarts/README
   branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-configuration.xml
   branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java
   branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
   branches/Branch_2_1/src/main/org/hornetq/utils/ReusableLatch.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java
Log:
Tweaks on soak-test

Modified: branches/Branch_2_1/.classpath
===================================================================
--- branches/Branch_2_1/.classpath	2010-09-03 10:20:17 UTC (rev 9642)
+++ branches/Branch_2_1/.classpath	2010-09-03 21:24:48 UTC (rev 9643)
@@ -1,6 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <classpath>
 	<classpathentry excluding="**/.svn/**/*" kind="src" path="src/main"/>
+	<classpathentry kind="src" path="examples/soak/tx-restarts/src"/>
 	<classpathentry kind="src" path="src/config/common"/>
 	<classpathentry kind="src" path="build/src"/>
 	<classpathentry kind="src" path="tests/jms-tests/config"/>
@@ -17,7 +18,7 @@
 	<classpathentry kind="src" path="examples/core/microcontainer/src"/>
 	<classpathentry kind="src" path="examples/core/embedded-remote/src"/>
 	<classpathentry kind="src" path="examples/core/perf/src"/>
-   <classpathentry kind="src" path="examples/core/twitter-connector/src"/>
+	<classpathentry kind="src" path="examples/core/twitter-connector/src"/>
 	<classpathentry kind="src" path="examples/jms/applet/src"/>
 	<classpathentry kind="src" path="examples/jms/application-layer-failover/src"/>
 	<classpathentry kind="src" path="examples/jms/bridge/src"/>

Modified: branches/Branch_2_1/examples/soak/tx-restarts/README
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/README	2010-09-03 10:20:17 UTC (rev 9642)
+++ branches/Branch_2_1/examples/soak/tx-restarts/README	2010-09-03 21:24:48 UTC (rev 9643)
@@ -17,4 +17,13 @@
 
 You can start the server directly if you want, you can just start the server as:
 
+./run.sh PATH_TO_HORNETQ/examples/soak/tx-restarts/server0
 
+
+Then you can run the test as:
+
+./build.sh runRemote
+
+
+And you can now kill and restart the server manually as many times as you want.
+

Modified: branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-configuration.xml	2010-09-03 10:20:17 UTC (rev 9642)
+++ branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-configuration.xml	2010-09-03 21:24:48 UTC (rev 9643)
@@ -2,6 +2,9 @@
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
 
+
+   <journal-file-size>102400</journal-file-size>
+
    <!-- Connectors -->
    <connectors>
       <connector name="netty-connector">

Modified: branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java	2010-09-03 10:20:17 UTC (rev 9642)
+++ branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java	2010-09-03 21:24:48 UTC (rev 9643)
@@ -15,11 +15,14 @@
 
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.Queue;
 
+import org.hornetq.utils.ReusableLatch;
+
 /**
  * A Receiver
  *
@@ -36,16 +39,24 @@
    
    private Queue queue;
    
-   private final Semaphore sem = new Semaphore(0);
+   // We should leave some messages on paging. We don't want to consume all for this test
+   private final Semaphore minConsume = new Semaphore(0);
    
-   private final Semaphore max = new Semaphore(10000);
+   private final ReusableLatch latchMax = new ReusableLatch(0);
    
+   private static final int MAX_DIFF = 10000;
+   
+   // The difference between producer and consuming
+   private final AtomicInteger currentDiff = new AtomicInteger(0);
+   
    private final String queueJNDI;
    
-   protected volatile long msgs = 0;
+   protected long msgs = 0;
    
-   protected volatile long pendingMsgs = 0;
+   protected int pendingMsgs = 0;
    
+   protected int pendingSemaphores = 0;
+   
    protected MessageConsumer cons;
 
 
@@ -73,11 +84,6 @@
             
             for (int i = 0 ; i < 1000; i++)
             {
-               if (!sem.tryAcquire(1, 5, TimeUnit.SECONDS))
-               {
-                  break;
-               }
-               max.release();
                Message msg = cons.receive(5000);
                if (msg == null)
                {
@@ -91,6 +97,10 @@
                }
                
                pendingMsgs++;
+               if (!minConsume.tryAcquire(1, 5, TimeUnit.SECONDS))
+               {
+                  break;
+               }
                
             }
             
@@ -126,8 +136,10 @@
    protected void onCommit()
    {
       msgs += pendingMsgs;
+      this.currentDiff.addAndGet(-pendingMsgs);
+      latchMax.countDown(pendingMsgs);
       pendingMsgs = 0;
-      System.out.println("Commit on consumer " + queueJNDI + ", msgs=" + msgs);
+      System.out.println("Commit on consumer " + queueJNDI + ", msgs=" + msgs + " currentDiff = " + currentDiff);
    }
 
    /* (non-Javadoc)
@@ -137,6 +149,7 @@
    protected void onRollback()
    {
       System.out.println("Rollback on consumer " + queueJNDI + ", msgs=" + msgs);
+      minConsume.release(pendingMsgs);
       pendingMsgs = 0;
    }
    
@@ -148,17 +161,24 @@
    /**
     * @param pendingMsgs2
     */
-   public void messageProduced(int pendingMsgs2)
+   public void messageProduced(int producedMessages)
    {
-      sem.release(pendingMsgs2);
-      try
+      minConsume.release(producedMessages);
+      currentDiff.addAndGet(producedMessages);
+      System.out.println("Msg produced on " + this.queueJNDI + ", currentDiff = " + currentDiff);
+      if (currentDiff.get() > MAX_DIFF)
       {
-         max.tryAcquire(pendingMsgs2, 5, TimeUnit.SECONDS);
+         System.out.println("Holding producer for 5 seconds");
+         latchMax.setCount(currentDiff.get() - MAX_DIFF);
+         try
+         {
+            latchMax.await(5, TimeUnit.SECONDS);
+         }
+         catch (InterruptedException e)
+         {
+            e.printStackTrace();
+         }
       }
-      catch (InterruptedException e)
-      {
-         e.printStackTrace();
-      }
    }
 
    // Package protected ---------------------------------------------

Modified: branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java	2010-09-03 10:20:17 UTC (rev 9642)
+++ branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java	2010-09-03 21:24:48 UTC (rev 9643)
@@ -113,10 +113,10 @@
          rec1.start();
          rec2.start();
          
+         long timeEnd = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
          
          if (runServer)
          {
-            long timeEnd = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
             while (timeEnd > System.currentTimeMillis())
             {
                System.out.println("Letting the service run for 20 seconds");
@@ -148,10 +148,14 @@
          }
          else
          {
-            long timeEnd = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
             while (timeEnd > System.currentTimeMillis())
             {
-
+               if (send.getErrorsCount() != 0 || rec1.getErrorsCount() != 0 || rec2.getErrorsCount() != 0)
+               {
+                  System.out.println("There are sequence errors in some of the clients, please look at the logs");
+                  break;
+               }
+               Thread.sleep(10000);
             }
          }
          

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java	2010-09-03 10:20:17 UTC (rev 9642)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java	2010-09-03 21:24:48 UTC (rev 9643)
@@ -268,8 +268,7 @@
    {
       if (file.getFile().size() != fileSize)
       {
-         JournalFilesRepository.log.warn("Deleting " + file + ".. as it doesn't have the configured size",
-                                         new Exception("trace"));
+         JournalFilesRepository.log.warn("Deleting " + file + ".. as it doesn't have the configured size");
          file.getFile().delete();
       }
       else

Modified: branches/Branch_2_1/src/main/org/hornetq/utils/ReusableLatch.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/utils/ReusableLatch.java	2010-09-03 10:20:17 UTC (rev 9642)
+++ branches/Branch_2_1/src/main/org/hornetq/utils/ReusableLatch.java	2010-09-03 21:24:48 UTC (rev 9643)
@@ -87,6 +87,11 @@
             }
 
             int newState = actualState - numberOfReleases;
+            
+            if (newState < 0)
+            {
+               newState = 0;
+            }
 
             if (compareAndSetState(actualState, newState))
             {
@@ -128,6 +133,12 @@
       control.releaseShared(1);
    }
 
+
+   public void countDown(final int count)
+   {
+      control.releaseShared(count);
+   }
+
    public void await() throws InterruptedException
    {
       control.acquireSharedInterruptibly(1);

Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java	2010-09-03 10:20:17 UTC (rev 9642)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java	2010-09-03 21:24:48 UTC (rev 9643)
@@ -30,6 +30,19 @@
 {
    private static final Logger log = Logger.getLogger(ReusableLatchTest.class);
 
+   
+   public void testLatchWithParameterizedDown() throws Exception
+   {
+      ReusableLatch latch = new ReusableLatch(1000);
+      
+      latch.countDown(5000);
+      
+      assertTrue(latch.await(1000));
+      
+      
+      assertEquals(0, latch.getCount());
+   }
+   
    public void testLatchOnSingleThread() throws Exception
    {
       ReusableLatch latch = new ReusableLatch();



More information about the hornetq-commits mailing list