[hornetq-commits] JBoss hornetq SVN: r9144 - in trunk: src/main/org/hornetq/core/server and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Apr 21 12:35:33 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-04-21 12:35:31 -0400 (Wed, 21 Apr 2010)
New Revision: 9144

Modified:
   trunk/build-maven.xml
   trunk/src/main/org/hornetq/core/server/ActivateCallback.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
Changes to Asynchronous Failover Test

Modified: trunk/build-maven.xml
===================================================================
--- trunk/build-maven.xml	2010-04-21 13:10:13 UTC (rev 9143)
+++ trunk/build-maven.xml	2010-04-21 16:35:31 UTC (rev 9144)
@@ -13,7 +13,7 @@
   -->
 
 <project default="upload" name="HornetQ">
-   <property name="hornetq.version" value="2.1.0.BETA2"/>
+   <property name="hornetq.version" value="2.1.0.BETA3"/>
    <property name="build.dir" value="build"/>
    <property name="jars.dir" value="${build.dir}/jars"/>
 

Modified: trunk/src/main/org/hornetq/core/server/ActivateCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ActivateCallback.java	2010-04-21 13:10:13 UTC (rev 9143)
+++ trunk/src/main/org/hornetq/core/server/ActivateCallback.java	2010-04-21 16:35:31 UTC (rev 9144)
@@ -22,5 +22,7 @@
  */
 public interface ActivateCallback
 {
+   void preActivate();
+   
    void activated();
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-04-21 13:10:13 UTC (rev 9143)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-04-21 16:35:31 UTC (rev 9144)
@@ -860,6 +860,15 @@
       }
    }
 
+
+   private void callPreActiveCallbacks()
+   {
+      for (ActivateCallback callback : activateCallbacks)
+      {
+         callback.preActivate();
+      }
+   }
+
    public synchronized boolean checkActivate() throws Exception
    {
       if (configuration.isBackup())
@@ -938,6 +947,8 @@
       {
          deploymentManager = new FileDeploymentManager(configuration.getFileDeployerScanPeriod());
       }
+      
+      callPreActiveCallbacks();
 
       startReplication();
 

Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2010-04-21 13:10:13 UTC (rev 9143)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2010-04-21 16:35:31 UTC (rev 9144)
@@ -166,6 +166,11 @@
 
    // ActivateCallback implementation -------------------------------------
 
+   public void preActivate()
+   {
+      
+   }
+   
    public synchronized void activated()
    {
       active = true;

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java	2010-04-21 13:10:13 UTC (rev 9143)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java	2010-04-21 16:35:31 UTC (rev 9144)
@@ -21,6 +21,8 @@
 import junit.framework.Assert;
 
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientConsumer;
 import org.hornetq.api.core.client.ClientMessage;
@@ -50,6 +52,8 @@
 
    private volatile ClientSessionFactoryInternal sf;
 
+   private Object lockFail = new Object();
+
    class MyListener implements SessionFailureListener
    {
       CountDownLatch latch = new CountDownLatch(1);
@@ -87,11 +91,22 @@
    {
       runTest(new TestRunner()
       {
+         volatile boolean running = false;
+
          public void run()
          {
             try
             {
-               doTestTransactional(this);
+               assertFalse(running);
+               running = true;
+               try
+               {
+                  doTestTransactional(this);
+               }
+               finally
+               {
+                  running = false;
+               }
             }
             catch (Throwable e)
             {
@@ -107,7 +122,7 @@
       volatile boolean failed;
 
       ArrayList<Throwable> errors = new ArrayList<Throwable>();
-      
+
       boolean isFailed()
       {
          return failed;
@@ -122,30 +137,27 @@
       {
          failed = false;
       }
-      
+
       synchronized void addException(Throwable e)
       {
          errors.add(e);
       }
-      
+
       void checkForExceptions() throws Throwable
       {
          if (errors.size() > 0)
          {
             log.warn("Exceptions on test:");
-            for (Throwable e: errors)
+            for (Throwable e : errors)
             {
                log.warn(e.getMessage(), e);
             }
             // throwing the first error that happened on the Runnable
             throw errors.get(0);
          }
-         
 
-         
       }
-      
-      
+
    }
 
    private void runTest(final TestRunner runnable) throws Throwable
@@ -173,6 +185,8 @@
 
             Thread t = new Thread(runnable);
 
+            t.setName("MainTEST");
+
             t.start();
 
             long randomDelay = (long)(2000 * Math.random());
@@ -186,7 +200,10 @@
             MyListener listener = this.listener;
 
             // Simulate failure on connection
-            conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+            synchronized (lockFail)
+            {
+               conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+            }
 
             if (listener != null)
             {
@@ -200,7 +217,7 @@
             AsynchronousFailoverTest.log.info("Fail complete");
 
             t.join();
-            
+
             runnable.checkForExceptions();
 
             createSession.close();
@@ -242,24 +259,24 @@
          session.addFailureListener(listener);
 
          this.listener = listener;
-         
+
          ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
          final int numMessages = 1000;
 
          for (int i = 0; i < numMessages; i++)
          {
-            ClientMessage message = session.createMessage(true);
-
-            message.getBodyBuffer().writeString("message" + i);
-
-            message.putIntProperty("counter", i);
-
             boolean retry = false;
             do
             {
                try
                {
+                  ClientMessage message = session.createMessage(true);
+
+                  message.getBodyBuffer().writeString("message" + i);
+
+                  message.putIntProperty("counter", i);
+
                   producer.send(message);
 
                   retry = false;
@@ -343,69 +360,59 @@
 
    private void doTestTransactional(final TestRunner runner) throws Exception
    {
+      // For duplication detection
+      int executionId = 0;
+
       while (!runner.isFailed())
       {
          ClientSession session = null;
 
+         executionId++;
+
          try
          {
-            session = sf.createSession(false, false);
 
             MyListener listener = new MyListener();
 
-            session.addFailureListener(listener);
-
             this.listener = listener;
+            boolean retry = false;
 
-            ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
             final int numMessages = 1000;
 
-            for (int i = 0; i < numMessages; i++)
+            session = sf.createSession(false, false);
+
+            session.addFailureListener(listener);
+
+            do
             {
-               ClientMessage message = session.createMessage(true);
+               try
+               {
+                  ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-               message.getBodyBuffer().writeString("message" + i);
+                  for (int i = 0; i < numMessages; i++)
+                  {
+                     ClientMessage message = session.createMessage(true);
 
-               message.putIntProperty("counter", i);
+                     message.getBodyBuffer().writeString("message" + i);
 
-               boolean retry = false;
-               do
-               {
-                  try
-                  {
+                     message.putIntProperty("counter", i);
+
+                     message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("id:" + i +
+                                                                                                    ",exec:" +
+                                                                                                    executionId));
+
                      producer.send(message);
                   }
-                  catch (HornetQException e)
-                  {
-                     Assert.assertEquals(e.getCode(), HornetQException.UNBLOCKED);
 
-                     retry = true;
-                  }
-               }
-               while (retry);
-            }
-
-            boolean retry = false;
-            while (retry)
-            {
-               try
-               {
                   session.commit();
 
                   retry = false;
                }
                catch (HornetQException e)
                {
-                  if (e.getCode() == HornetQException.TRANSACTION_ROLLED_BACK)
+                  if (e.getCode() == HornetQException.TRANSACTION_ROLLED_BACK || e.getCode() == HornetQException.UNBLOCKED)
                   {
                      // OK
-                     session.close();
-
-                     continue;
-                  }
-                  else if (e.getCode() == HornetQException.UNBLOCKED)
-                  {
                      retry = true;
                   }
                   else
@@ -414,70 +421,79 @@
                   }
                }
             }
+            while (retry);
 
-            ClientConsumer consumer = null;
             
-            while (true)
+            
+            boolean blocked = false;
+
+            retry = false;
+            
+            ClientConsumer consumer = null; 
+            do
             {
+               ArrayList<Integer> msgs = new ArrayList<Integer>();
                try
                {
-                  consumer = session.createConsumer(FailoverTestBase.ADDRESS);
-                  
-                  break;
-               }
-               catch (HornetQException e)
-               {
-                  if (e.getCode() == HornetQException.UNBLOCKED)
+                  if (consumer == null)
                   {
-                     continue;
+                     consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+                     session.start();
                   }
-                  throw e;
-               }
-            }
-            
-            session.start();
 
-            int lastCount = -1;
-            while (true)
-            {
-               ClientMessage message = consumer.receive(500);
+                  for (int i = 0; i < numMessages; i++)
+                  {
+                     ClientMessage message = consumer.receive(500);
+                     if (message == null)
+                     {
+                        break;
+                     }
 
-               if (message == null)
-               {
-                  break;
-               }
+                     int count = message.getIntProperty("counter");
 
-               // There may be some missing or duplicate messages - but the order should be correct
+                     msgs.add(count);
 
-               int count = message.getIntProperty("counter");
+                     message.acknowledge();
+                  }
 
-               Assert.assertTrue("count:" + count + " last count:" + lastCount, count >= lastCount);
+                  session.commit();
+                  
+                  if (blocked)
+                  {
+                     assertTrue("msgs.size is expected to be 0 or "  + numMessages + " but it was " + msgs.size(), msgs.size() == 0 || msgs.size() == numMessages);
+                  }
+                  else
+                  {
+                     assertTrue("msgs.size is expected to be "  + numMessages  + " but it was " + msgs.size(), msgs.size() == numMessages);
+                  }
 
-               lastCount = count;
+                  int i = 0;
+                  for (Integer msg : msgs)
+                  {
+                     assertEquals(i++, (int)msg);
+                  }
 
-               message.acknowledge();
-            }
-
-            retry = false;
-            while (retry)
-            {
-               try
-               {
-                  session.commit();
-
                   retry = false;
+                  blocked = false;
                }
                catch (HornetQException e)
                {
                   if (e.getCode() == HornetQException.TRANSACTION_ROLLED_BACK)
                   {
-                     // OK
-                     session.close();
-
-                     continue;
+                     // TODO: https://jira.jboss.org/jira/browse/HORNETQ-369
+                     // ATM RolledBack exception is being called with the transaction is committed.
+                     // the test will fail if you remove this next line
+                     blocked = true;
                   }
                   else if (e.getCode() == HornetQException.UNBLOCKED)
                   {
+                     // TODO: https://jira.jboss.org/jira/browse/HORNETQ-369
+                     // This part of the test is never being called.
+                     blocked = true;
+                  }
+
+                  if (e.getCode() == HornetQException.UNBLOCKED || e.getCode() == HornetQException.TRANSACTION_ROLLED_BACK)
+                  {
                      retry = true;
                   }
                   else
@@ -486,6 +502,7 @@
                   }
                }
             }
+            while (retry);
          }
          finally
          {

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2010-04-21 13:10:13 UTC (rev 9143)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2010-04-21 16:35:31 UTC (rev 9144)
@@ -27,6 +27,7 @@
 import org.hornetq.core.remoting.impl.invm.InVMConnector;
 import org.hornetq.core.remoting.impl.invm.InVMRegistry;
 import org.hornetq.core.remoting.impl.invm.TransportConstants;
+import org.hornetq.core.server.ActivateCallback;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.tests.util.ServiceTestBase;
 
@@ -98,6 +99,26 @@
       config1.setSharedStore(true);
       config1.setBackup(true);
       server1Service = createServer(true, config1);
+      
+      server1Service.registerActivateCallback(new ActivateCallback()
+      {
+         
+         public void preActivate()
+         {
+            // To avoid two servers messing up with the same journal at any single point
+            try
+            {
+               server0Service.getStorageManager().stop();
+            }
+            catch (Exception ignored)
+            {
+            }
+         }
+         
+         public void activated()
+         {
+         }
+      });
 
       Configuration config0 = super.createDefaultConfig();
       config0.getAcceptorConfigurations().clear();



More information about the hornetq-commits mailing list