[hornetq-commits] JBoss hornetq SVN: r8625 - trunk/tests/src/org/hornetq/tests/integration/cluster/reattach.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Dec 8 17:05:34 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-12-08 17:05:34 -0500 (Tue, 08 Dec 2009)
New Revision: 8625

Added:
   trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/OrderReattachTest.java
Log:
Adding test that reliably fail on the failover ordering issue

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/OrderReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/OrderReattachTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/OrderReattachTest.java	2009-12-08 22:05:34 UTC (rev 8625)
@@ -0,0 +1,393 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.reattach;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+import junit.framework.TestSuite;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.MessageHandler;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A OrderReattachTest
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class OrderReattachTest extends ServiceTestBase
+{
+
+   // Disabled for now... under investigation (Clebert)
+   public static TestSuite suite()
+   {
+      TestSuite suite = new TestSuite();
+
+      return suite;
+   }
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   private final Logger log = Logger.getLogger(this.getClass());
+
+   private HornetQServer server;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testOrderOnSendInVM() throws Throwable
+   {
+      for (int i = 0; i < 500; i++)
+      {
+         log.info("#" + getName() + " # " + i);
+         doTestOrderOnSend(false);
+         tearDown();
+         setUp();
+      }
+   }
+
+   public void doTestOrderOnSend(final boolean isNetty) throws Throwable
+   {
+      server = createServer(false, isNetty);
+
+      server.start();
+
+      ClientSessionFactory sf = createFactory(isNetty);
+      sf.setReconnectAttempts(-1);
+      sf.setConfirmationWindowSize(100 * 1024 * 1024);
+      sf.setBlockOnNonDurableSend(false);
+      sf.setBlockOnAcknowledge(false);
+
+      final ClientSession session = sf.createSession(false, true, true);
+
+      final LinkedBlockingDeque<Boolean> failureQueue = new LinkedBlockingDeque<Boolean>();
+
+      final CountDownLatch ready = new CountDownLatch(1);
+
+      Thread failer = new Thread()
+      {
+         @Override
+         public void run()
+         {
+            ready.countDown();
+            while (true)
+            {
+               try
+               {
+                  Boolean poll = false;
+                  try
+                  {
+                     poll = failureQueue.poll(60, TimeUnit.SECONDS);
+                  }
+                  catch (InterruptedException e)
+                  {
+                     e.printStackTrace();
+                     break;
+                  }
+
+                  Thread.sleep(1);
+
+                  final RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
+
+                  // True means... fail session
+                  if (poll)
+                  {
+                     conn.fail(new HornetQException(HornetQException.NOT_CONNECTED, "poop"));
+                  }
+                  else
+                  {
+                     // false means... finish thread
+                     break;
+                  }
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace();
+               }
+            }
+         }
+      };
+
+      failer.start();
+
+      ready.await();
+
+      try
+      {
+         int numberOfProducers = 1;
+
+         final CountDownLatch align = new CountDownLatch(numberOfProducers);
+         final CountDownLatch flagStart = new CountDownLatch(1);
+
+         final ClientSessionFactory sessionFactory = sf;
+
+         class ThreadProd extends Thread
+         {
+            Throwable throwable;
+
+            int count;
+
+            public ThreadProd(final int count)
+            {
+               this.count = count;
+            }
+
+            @Override
+            public void run()
+            {
+               try
+               {
+                  align.countDown();
+                  flagStart.await();
+                  doSend2(count, sessionFactory, failureQueue);
+               }
+               catch (Throwable e)
+               {
+                  e.printStackTrace();
+                  throwable = e;
+               }
+            }
+         }
+
+         ThreadProd prod[] = new ThreadProd[numberOfProducers];
+         for (int i = 0; i < prod.length; i++)
+         {
+            prod[i] = new ThreadProd(i);
+            prod[i].start();
+         }
+
+         align.await();
+         flagStart.countDown();
+
+         for (ThreadProd prodT : prod)
+         {
+            prodT.join();
+            if (prodT.throwable != null)
+            {
+               throw prodT.throwable;
+            }
+         }
+
+      }
+      finally
+      {
+         try
+         {
+            session.close();
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+
+         try
+         {
+            sf.close();
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+
+         failureQueue.put(false);
+
+         failer.join();
+      }
+
+   }
+
+   final SimpleString ADDRESS = new SimpleString("address");
+
+   public void doSend2(final int order, final ClientSessionFactory sf, final LinkedBlockingDeque<Boolean> failureQueue) throws Exception
+   {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 500;
+
+      final int numSessions = 100;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+
+         // failureQueue.push(true);
+
+         ClientSession sessConsume = sf.createSession(false, true, true);
+
+         sessConsume.createQueue(ADDRESS, subName, null, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, true, true);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                        false,
+                                                        0,
+                                                        System.currentTimeMillis(),
+                                                        (byte)1);
+
+         if (i % 10 == 0)
+         {
+            // failureQueue.push(true);
+         }
+         message.putIntProperty(new SimpleString("count"), i);
+         producer.send(message);
+      }
+
+      for (ClientSession session : sessions)
+      {
+         session.start();
+      }
+
+      class MyHandler implements MessageHandler
+      {
+         final CountDownLatch latch = new CountDownLatch(1);
+
+         volatile int count;
+
+         public void onMessage(final ClientMessage message)
+         {
+            if (count == numMessages)
+            {
+               Assert.fail("Too many messages");
+            }
+
+            if (message.getIntProperty("count") != count)
+            {
+               log.warn("Error on counter", new Exception("error on counter"));
+               System.exit(-1);
+            }
+            Assert.assertEquals(count, message.getObjectProperty(new SimpleString("count")));
+
+            count++;
+
+            if (count % 100 == 0)
+            {
+               failureQueue.push(true);
+            }
+
+            if (count == numMessages)
+            {
+               latch.countDown();
+            }
+         }
+      }
+
+      Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         Assert.assertTrue(ok);
+      }
+
+      // failureQueue.push(true);
+
+      sessSend.close();
+
+      for (ClientSession session : sessions)
+      {
+
+         // failureQueue.push(true);
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+
+         failureQueue.push(true);
+
+         SimpleString subName = new SimpleString("sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      if (server != null && server.isStarted())
+      {
+         server.stop();
+      }
+
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
\ No newline at end of file



More information about the hornetq-commits mailing list