[jboss-cvs] JBoss Messaging SVN: r5648 - branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jan 16 00:34:40 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-16 00:34:40 -0500 (Fri, 16 Jan 2009)
New Revision: 5648

Modified:
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
Adding failover test on Page

Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	2009-01-15 15:29:32 UTC (rev 5647)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	2009-01-16 05:34:40 UTC (rev 5648)
@@ -23,6 +23,8 @@
 package org.jboss.messaging.tests.integration.cluster.failover;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
@@ -31,10 +33,13 @@
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.impl.ClientSessionImpl;
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -50,6 +55,7 @@
 {
 
    // Constants -----------------------------------------------------
+   private static final Logger log = Logger.getLogger(PagingFailoverTest.class);
 
    // Attributes ----------------------------------------------------
 
@@ -60,21 +66,21 @@
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
-   
-   
+
    public void testFailoverOnPaging() throws Exception
    {
       testPaging(true);
    }
-   
-   
+
    public void testReplicationOnPaging() throws Exception
    {
       testPaging(false);
    }
-   
+
    private void testPaging(boolean fail) throws Exception
    {
+      setUpFileBased(100 * 1024);
+
       ClientSession session = null;
       try
       {
@@ -156,7 +162,7 @@
             assertEquals(i, message.getBody().getInt());
 
          }
-         
+
          session.close();
          session = null;
 
@@ -186,6 +192,354 @@
 
    }
 
+   public void testMultithreadFailoverReplicationOnly() throws Throwable
+   {
+      setUpFileBased(100 * 1024, 20 * 1024);
+
+      int numberOfProducedMessages = multiThreadProducer(false);
+
+      System.out.println(numberOfProducedMessages + " messages produced");
+
+      int numberOfConsumedMessages = multiThreadConsumer(false, false);
+
+      assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+   }
+
+   public void testMultithreadFailoverOnProducing() throws Throwable
+   {
+      setUpFileBased(100 * 1024, 20 * 1024);
+
+      int numberOfProducedMessages = multiThreadProducer(true);
+
+      System.out.println(numberOfProducedMessages + " messages produced");
+
+      int numberOfConsumedMessages = multiThreadConsumer(true, false);
+
+      assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+   }
+
+   public void testMultithreadFailoverOnConsume() throws Throwable
+   {
+      setUpFileBased(100 * 1024, 20 * 1024);
+
+      int numberOfProducedMessages = multiThreadProducer(false);
+
+      System.out.println(numberOfProducedMessages + " messages produced");
+
+      int numberOfConsumedMessages = multiThreadConsumer(false, true);
+
+      assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+   }
+
+   /**
+    * @throws Exception
+    * @throws InterruptedException
+    * @throws Throwable
+    */
+   private int multiThreadConsumer(boolean connectedOnBackup, boolean fail) throws Exception,
+                                                                           InterruptedException,
+                                                                           Throwable
+   {
+      ClientSession session = null;
+      try
+      {
+         final AtomicInteger numberOfMessages = new AtomicInteger(0);
+
+         final int RECEIVE_TIMEOUT = 10000;
+
+         final ClientSessionFactory factory;
+         final PagingStore store;
+
+         if (connectedOnBackup)
+         {
+            factory = createBackupFactory();
+            store = this.backupService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+         }
+         else
+         {
+            factory = createFailoverFactory();
+            store = this.liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+         }
+
+         session = factory.createSession(false, true, true, false);
+
+         final int initialNumberOfPages = store.getNumberOfPages();
+
+         System.out.println("It has initially " + initialNumberOfPages);
+
+         final int THREAD_CONSUMERS = 20;
+
+         final CountDownLatch startFlag = new CountDownLatch(1);
+         final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_CONSUMERS);
+
+         class Consumer extends Thread
+         {
+            volatile Throwable e;
+
+            ClientSession session;
+
+            public Consumer() throws Exception
+            {
+               session = factory.createSession(false, true, true);
+            }
+
+            public void run()
+            {
+               boolean started = false;
+
+               try
+               {
+
+                  try
+                  {
+                     ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+                     session.start();
+
+                     alignSemaphore.countDown();
+
+                     started = true;
+
+                     startFlag.await();
+
+                     while (true)
+                     {
+                        ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+                        if (msg == null)
+                        {
+                           break;
+                        }
+
+                        if (numberOfMessages.incrementAndGet() % 1000 == 0)
+                        {
+                           System.out.println(numberOfMessages + " messages read");
+                        }
+
+                        msg.acknowledge();
+
+                        session.commit();
+                     }
+
+                  }
+                  finally
+                  {
+                     session.close();
+                  }
+               }
+               catch (Throwable e)
+               {
+                  log.error(e.getMessage(), e);
+                  if (!started)
+                  {
+                     alignSemaphore.countDown();
+                  }
+                  this.e = e;
+               }
+            }
+         }
+
+         Consumer[] consumers = new Consumer[THREAD_CONSUMERS];
+
+         for (int i = 0; i < THREAD_CONSUMERS; i++)
+         {
+            consumers[i] = new Consumer();
+         }
+
+         for (int i = 0; i < THREAD_CONSUMERS; i++)
+         {
+            consumers[i].start();
+         }
+
+         alignSemaphore.await();
+
+         startFlag.countDown();
+
+         if (fail)
+         {
+            Thread.sleep(2000);
+            while (store.getNumberOfPages() == initialNumberOfPages)
+            {
+               Thread.sleep(500);
+            }
+
+            System.out.println("The system has already depaged " + (initialNumberOfPages - store.getNumberOfPages()) +
+                               ", failing now");
+
+            fail(session);
+         }
+
+         for (Thread t : consumers)
+         {
+            t.join();
+         }
+
+         for (Consumer p : consumers)
+         {
+            if (p.e != null)
+            {
+               throw p.e;
+            }
+         }
+
+         return numberOfMessages.intValue();
+      }
+      finally
+      {
+         if (session != null)
+         {
+            try
+            {
+               session.close();
+            }
+            catch (Exception ignored)
+            {
+            }
+         }
+      }
+   }
+
+   /**
+    * @throws Exception
+    * @throws MessagingException
+    * @throws InterruptedException
+    * @throws Throwable
+    */
+   private int multiThreadProducer(boolean failover) throws Exception,
+                                                    MessagingException,
+                                                    InterruptedException,
+                                                    Throwable
+   {
+
+      final AtomicInteger numberOfMessages = new AtomicInteger(0);
+      final PagingStore store = this.liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+
+      final ClientSessionFactory factory = createFailoverFactory();
+
+      ClientSession session = factory.createSession(false, true, true, false);
+      try
+      {
+         session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+         // Block on everything ,just because we want to stop sending as soon as the destination gets in page-mode
+         factory.setBlockOnAcknowledge(true);
+         factory.setBlockOnNonPersistentSend(true);
+         factory.setBlockOnPersistentSend(true);
+
+         final int THREAD_PRODUCERS = 20;
+
+         final CountDownLatch startFlag = new CountDownLatch(1);
+         final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_PRODUCERS);
+         final CountDownLatch flagPaging = new CountDownLatch(THREAD_PRODUCERS);
+
+         class Producer extends Thread
+         {
+            volatile Throwable e;
+
+            public void run()
+            {
+               boolean started = false;
+               try
+               {
+                  ClientSession session = factory.createSession(false, true, true);
+                  try
+                  {
+                     ClientProducer producer = session.createProducer(ADDRESS);
+
+                     alignSemaphore.countDown();
+
+                     started = true;
+                     startFlag.await();
+
+                     while (!store.isPaging())
+                     {
+
+                        ClientMessage msg = session.createClientMessage(true);
+
+                        producer.send(msg);
+                        numberOfMessages.incrementAndGet();
+                     }
+
+                     flagPaging.countDown();
+
+                     for (int i = 0; i < 10000; i++)
+                     {
+
+                        ClientMessage msg = session.createClientMessage(true);
+
+                        producer.send(msg);
+                        numberOfMessages.incrementAndGet();
+
+                     }
+
+                  }
+                  finally
+                  {
+                     session.close();
+                  }
+               }
+               catch (Throwable e)
+               {
+                  log.error(e.getMessage(), e);
+                  if (!started)
+                  {
+                     alignSemaphore.countDown();
+                  }
+                  flagPaging.countDown();
+                  this.e = e;
+               }
+            }
+         }
+
+         Producer[] producers = new Producer[THREAD_PRODUCERS];
+
+         for (int i = 0; i < THREAD_PRODUCERS; i++)
+         {
+            producers[i] = new Producer();
+            producers[i].start();
+         }
+
+         alignSemaphore.await();
+
+         // Start producing only when all the sessions are opened
+         startFlag.countDown();
+
+         if (failover)
+         {
+            flagPaging.await(); // for this test I want everybody on the paging part
+
+            Thread.sleep(1500);
+
+            fail(session);
+
+         }
+
+         for (Thread t : producers)
+         {
+            t.join();
+         }
+
+         for (Producer p : producers)
+         {
+            if (p.e != null)
+            {
+               throw p.e;
+            }
+         }
+
+         return numberOfMessages.intValue();
+
+      }
+      finally
+      {
+         session.close();
+         InVMConnector.resetFailures();
+      }
+
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -195,11 +549,20 @@
       super.tearDown();
    }
 
+   protected void fail(final ClientSession session) throws Exception
+   {
+      RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+
+      InVMConnector.numberOfFailures = 1;
+      InVMConnector.failOnCreateConnection = true;
+      System.out.println("Forcing a failure");
+      conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+
+   }
+
    protected void setUp() throws Exception
    {
       super.setUp();
-      setUpFileBased(100 * 1024);
-
    }
 
    // Private -------------------------------------------------------




More information about the jboss-cvs-commits mailing list