[jboss-cvs] JBoss Messaging SVN: r6164 - trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Mar 25 11:41:59 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-03-25 11:41:58 -0400 (Wed, 25 Mar 2009)
New Revision: 6164

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java
Modified:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1558 - Adding test

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java	2009-03-25 10:41:11 UTC (rev 6163)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java	2009-03-25 15:41:58 UTC (rev 6164)
@@ -36,7 +36,7 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
-import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.tests.util.ServiceTestBase;
 
 /**
  * A MultiThreadFailoverSupport
@@ -48,7 +48,7 @@
  *
  *
  */
-public abstract class MultiThreadFailoverSupport extends UnitTestCase
+public abstract class MultiThreadFailoverSupport extends ServiceTestBase
 {
 
    // Constants -----------------------------------------------------
@@ -57,7 +57,7 @@
 
    // Attributes ----------------------------------------------------
 
-   protected Timer timer;
+   private Timer timer;
 
    // Static --------------------------------------------------------
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-03-25 10:41:11 UTC (rev 6163)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-03-25 15:41:58 UTC (rev 6164)
@@ -26,7 +26,6 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.Timer;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -77,8 +76,6 @@
 
    protected final Map<String, Object> backupParams = new HashMap<String, Object>();
 
-   protected Timer timer;
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -1299,8 +1296,6 @@
       super.setUp();
 
       log.info("************ Starting test " + getName());
-
-      timer = new Timer();
    }
 
    @Override
@@ -1316,7 +1311,6 @@
       {
          backupService.stop();
       }
-      timer.cancel();
 
       super.tearDown();
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java	2009-03-25 10:41:11 UTC (rev 6163)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java	2009-03-25 15:41:58 UTC (rev 6164)
@@ -28,10 +28,12 @@
 import java.util.concurrent.TimeUnit;
 
 import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
 import org.jboss.messaging.core.paging.Page;
 import org.jboss.messaging.core.paging.PagedMessage;
 import org.jboss.messaging.core.paging.PagingManager;
@@ -63,23 +65,45 @@
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
-   
-   public void testPageOrderingLiveAndBackup() throws Exception
+   /**
+    * Page-ordering check with producers only: delegates to
+    * internalTestPageOrderingLiveAndBackup with consumeMessages = false.
+    */
+   public void testPageOrderingLiveAndBackupProducerOnly() throws Exception
    {
+      internalTestPageOrderingLiveAndBackup(false);
+   }
+
+   /**
+    * Page-ordering check with message handlers consuming from the queues: delegates to
+    * internalTestPageOrderingLiveAndBackup with consumeMessages = true.
+    */
+   public void testPageOrderingLiveAndBackupConsume() throws Exception
+   {
+      internalTestPageOrderingLiveAndBackup(true);
+   }
+
+   private void internalTestPageOrderingLiveAndBackup(boolean consumeMessages) throws Exception
+   {
       final SimpleString threadIDKey = new SimpleString("THREAD_ID");
       final SimpleString sequenceIDKey = new SimpleString("SEQUENCE_ID");
       final SimpleString ADDRESS = new SimpleString("SOME_QUEUE");
 
       final int NUMBER_OF_THREADS = 100;
+      final int NUMBER_OF_MESSAGES = 200;
 
+      final int NUMBER_OF_HANDLERS = consumeMessages ? NUMBER_OF_THREADS : 0;
+
       setUpFailoverServers(true, 100 * 1024, 50 * 1024);
 
       final ClientSessionFactory factory = createFailoverFactory();
 
       ClientSession session = factory.createSession(false, true, true);
-      session.createQueue(ADDRESS, ADDRESS, true);
+      for (int i = 0; i < NUMBER_OF_THREADS; i++)
+      {
+         session.createQueue(ADDRESS, ADDRESS.concat("-" + i), true);
+      }
       session.close();
 
+      MyHandler handlers[] = new MyHandler[NUMBER_OF_HANDLERS];
+
+      for (int i = 0; i < handlers.length; i++)
+      {
+         handlers[i] = new MyHandler(factory, ADDRESS.concat("-" + i), NUMBER_OF_MESSAGES * 10);
+      }
+
       final CountDownLatch flagAlign = new CountDownLatch(NUMBER_OF_THREADS);
       final CountDownLatch flagStart = new CountDownLatch(1);
 
@@ -105,7 +129,7 @@
                flagAlign.countDown();
                flagStart.await();
 
-               for (int i = 0; i < 200; i++)
+               for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
                {
                   ClientMessage msg = session.createClientMessage(true);
                   msg.setBody(ChannelBuffers.wrappedBuffer(new byte[512]));
@@ -152,12 +176,37 @@
          }
       }
 
+      Thread.sleep(5000);
+
+      for (MyHandler handler : handlers)
+      {
+         handler.close();
+         if (handler.failure != null)
+         {
+            throw new Exception("Failure on consumer", handler.failure);
+         }
+      }
+
       PagingManager livePagingManager = liveService.getServer().getPostOffice().getPagingManager();
       PagingManager backupPagingManager = backupService.getServer().getPostOffice().getPagingManager();
 
       TestSupportPageStore livePagingStore = (TestSupportPageStore)livePagingManager.getPageStore(ADDRESS);
       TestSupportPageStore backupPagingStore = (TestSupportPageStore)backupPagingManager.getPageStore(ADDRESS);
 
+      System.out.println("Pages: " + livePagingStore.getNumberOfPages() +
+                         " on backup: " +
+                         backupPagingStore.getNumberOfPages());
+
+
+      if (consumeMessages)
+      {
+         if (livePagingStore.getNumberOfPages() == backupPagingStore.getNumberOfPages() - 1)
+         {
+            // The live node may have one extra page in front of the backup
+            backupPagingStore.depage();
+         }
+      }
+
       assertEquals(livePagingStore.getNumberOfPages(), backupPagingStore.getNumberOfPages());
 
       Page livePage = null;
@@ -169,6 +218,7 @@
 
          if (livePage == null)
          {
+            assertNull(backupPagingStore.depage());
             break;
          }
 
@@ -193,7 +243,7 @@
          {
             PagedMessage backupMsg = backupIterator.next();
             assertNotNull(backupMsg);
-            
+
             ServerMessage liveSrvMsg = liveMsg.getMessage(null);
             ServerMessage backupSrvMsg = liveMsg.getMessage(null);
 
@@ -213,4 +263,69 @@
 
    // Inner classes -------------------------------------------------
 
+   /**
+    * Test consumer: owns a session and a consumer on one queue, acknowledges every
+    * message delivered to it and stops its session once <code>msgs</code> messages
+    * have been received. Any error raised while handling a message is kept in
+    * <code>failure</code> so the test thread can inspect it after calling close().
+    */
+   class MyHandler implements MessageHandler
+   {
+      final ClientSession session;
+
+      final ClientConsumer consumer;
+
+      /** True until the expected number of messages has been received. */
+      volatile boolean started = true;
+
+      /** Number of messages to receive before the session is stopped. */
+      final int msgs;
+
+      volatile int receivedMsgs = 0;
+
+      /** Counted down once per acknowledged message; reaches zero after <code>msgs</code> messages. */
+      final CountDownLatch latch;
+
+      /** First error seen in onMessage, or null if none occurred. */
+      volatile Throwable failure;
+
+      /**
+       * @param sf factory used to create this handler's session
+       * @param address name of the queue to consume from
+       * @param msgs number of messages after which the session is stopped
+       * @throws Exception if the session or the consumer cannot be created or started
+       */
+      MyHandler(ClientSessionFactory sf, SimpleString address, final int msgs) throws Exception
+      {
+         // Assign everything onMessage reads before the handler is registered and the
+         // session started; otherwise an early delivery would observe msgs == 0 and a
+         // null latch.
+         this.msgs = msgs;
+         this.latch = new CountDownLatch(msgs);
+         this.session = sf.createSession(null, null, false, true, true, false, 0);
+         this.consumer = session.createConsumer(address);
+         consumer.setMessageHandler(this);
+         this.session.start();
+      }
+
+      /** Closes the session; synchronized so it cannot interleave with onMessage. */
+      public synchronized void close() throws Exception
+      {
+         session.close();
+      }
+
+      /**
+       * Acknowledges the message and, on the <code>msgs</code>-th one, stops the session.
+       * A message delivered after that point is recorded as a failure.
+       */
+      public synchronized void onMessage(ClientMessage message)
+      {
+         try
+         {
+            if (!started)
+            {
+               throw new IllegalStateException("Stopped Handler received message");
+            }
+
+            // Pre-increment: stop on the msgs-th message rather than on the one after it
+            if (++receivedMsgs >= msgs)
+            {
+               System.out.println("done");
+               started = false;
+               session.stop();
+            }
+
+            message.acknowledge();
+
+            // One count per acknowledged message, matching the latch's initial count
+            latch.countDown();
+         }
+         catch (Throwable e)
+         {
+            // Keep the first failure so it is not overwritten by later ones
+            if (failure == null)
+            {
+               failure = e;
+            }
+         }
+      }
+
+   }
 }

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java	2009-03-25 15:41:58 UTC (rev 6164)
@@ -0,0 +1,618 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.jms.client.JBossBytesMessage;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * A PagingFailoverMultiThreadTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PagingFailoverMultiThreadTest extends MultiThreadFailoverSupport
+{
+
+   // Constants -----------------------------------------------------
+   // Timeout passed to ClientConsumer.receive() in consumeMessages (presumably milliseconds -- confirm)
+   private static final int RECEIVE_TIMEOUT = 20000;
+
+
+   // Passed to setUpFailoverServers as pageSize (page size in bytes and global watermark size)
+   final int PAGE_SIZE = 512;
+
+   // Passed to setUpFailoverServers as maxGlobalSize (paging max global size in bytes)
+   final int MAX_GLOBAL = 40 * PAGE_SIZE;
+
+   // When true the queues are created once in start(); otherwise each thread creates
+   // and deletes its own queues in doTestB
+   final boolean CREATE_AT_START = true;
+
+   // Maximum time, in milliseconds, doTestB waits on each handler's latch
+   private final int LATCH_WAIT = 50000;
+
+   // Number of concurrent test threads (one address per thread)
+   private final int NUM_THREADS = 10;
+
+   // Number of consumer sessions (and queues) per thread
+   private final int NUM_SESSIONS = 10;
+
+   private final Logger log = Logger.getLogger(this.getClass());
+
+   // Attributes ----------------------------------------------------
+
+   // Prefix of every per-thread address, see createAddressName
+   protected static final SimpleString ADDRESS_GLOBAL = new SimpleString("FailoverTestAddress");
+
+   protected MessagingService liveService;
+
+   protected MessagingService backupService;
+
+   // Transport parameters identifying the backup server; filled in by setUpFailoverServers
+   protected final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // NOTE(review): intentionally empty. Presumably kept so the class still exposes a
+   // runnable test method while the real test (disabled_testB) is switched off -- confirm.
+   public void testFoo()
+   {
+      
+   }
+   
+   /**
+    * Hands doTestB to runMultipleThreadsFailoverTest so that it is executed by
+    * NUM_THREADS threads.
+    *
+    * Currently disabled - https://jira.jboss.org/jira/browse/JBMESSAGING-1558
+    */
+   public void disabled_testB() throws Exception
+   {
+      final RunnableT perThreadTest = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestB(sf, threadNum);
+         }
+      };
+
+      runMultipleThreadsFailoverTest(perThreadTest, NUM_THREADS, 20, false, 1000);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   /**
+    * Writes 256 zero bytes into the body of the given message.
+    */
+   protected void setBody(final ClientMessage message) throws Exception
+   {
+      final byte[] payload = new byte[256];
+
+      message.getBody().writeBytes(payload);
+   }
+
+   /**
+    * Checks that a received message carries a body of the expected size.
+    *
+    * @return true when the writer index of the message body is exactly 256
+    */
+   protected boolean checkSize(final ClientMessage message)
+   {
+      final int bodySize = message.getBody().writerIndex();
+
+      return bodySize == 256;
+   }
+
+   /**
+    * Builds the address of one test thread: ADDRESS_GLOBAL followed by "_thread-" and
+    * the thread number.
+    */
+   protected SimpleString createAddressName(int threadNum)
+   {
+      final String suffix = "_thread-" + threadNum;
+
+      return ADDRESS_GLOBAL.concat(suffix);
+   }
+
+   /**
+    * Builds a queue name of the form thread + "sub" + sequence.
+    */
+   protected SimpleString createSubName(int thread, int sequence)
+   {
+      final String name = thread + "sub" + sequence;
+
+      return new SimpleString(name);
+   }
+
+   /**
+    * Work done by each test thread: opens NUM_SESSIONS consumers on the thread's queues,
+    * sends the messages to the thread's address, then waits until every consumer's
+    * handler has finished and checks that nothing else is left to receive.
+    *
+    * @param sf factory used to create every session of this thread
+    * @param threadNum index of the calling thread; selects the address and the queue names
+    * @throws Exception if a handler times out or reports a failure
+    */
+   protected void doTestB(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      SimpleString ADDRESS = createAddressName(threadNum);
+
+      long start = System.currentTimeMillis();
+
+      // Only used to delete the queues at the end when they were created by this method
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      Set<MyInfo> infos = new HashSet<MyInfo>();
+
+      for (int i = 0; i < NUM_SESSIONS; i++)
+      {
+         SimpleString subName = createSubName(threadNum, i);
+
+         ClientSession sessConsume = sf.createSession(null, null, false, true, true, false, 0);
+
+         if (!CREATE_AT_START)
+         {
+            sessConsume.createQueue(ADDRESS, subName, null, true, false);
+         }
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         infos.add(new MyInfo(sessConsume, consumer));
+      }
+
+      ClientSession sessSend = sf.createSession(false, true, true);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      sendMessages(sessSend, producer, numMessages, threadNum);
+
+      for (MyInfo info : infos)
+      {
+         info.session.start();
+      }
+
+      Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+      for (MyInfo info : infos)
+      {
+         MyHandler handler = new MyHandler(threadNum, numMessages, info.session, info.consumer);
+
+         handler.start();
+
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
+
+         if (!ok)
+         {
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                                " threadnum " +
+                                threadNum);
+         }
+
+         if (handler.failure != null)
+         {
+            throw new Exception("Handler failed: " + handler.failure);
+         }
+
+         // The handler has seen everything it expected; nothing more may be pending
+         assertNull(handler.consumer.receive(250));
+      }
+
+      sessSend.close();
+
+      for (MyInfo info : infos)
+      {
+         info.session.close();
+      }
+
+      if (!CREATE_AT_START)
+      {
+         // Delete exactly the queues created above: same count and same naming helper
+         for (int i = 0; i < NUM_SESSIONS; i++)
+         {
+            s.deleteQueue(createSubName(threadNum, i));
+         }
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      log.info("duration " + (end - start));
+
+   }
+
+   /**
+    * Stops the backup server and then the live server, and asserts that InVMRegistry
+    * is empty afterwards.
+    *
+    * @throws Exception if stopping either server fails
+    */
+   protected void stop() throws Exception
+   {
+      try
+      {
+         backupService.stop();
+      }
+      finally
+      {
+         // Stop the live server even when stopping the backup failed, so it is not
+         // left running after this method throws
+         liveService.stop();
+      }
+
+      assertEquals(0, InVMRegistry.instance.size());
+
+   }
+
+   /**
+    * Sends numMessages messages through the given producer. Each message carries the
+    * int properties "threadnum" (the sending thread) and "count" (its position in the
+    * sequence, starting at 0) and the body written by setBody.
+    */
+   private void sendMessages(final ClientSession sessSend,
+                             final ClientProducer producer,
+                             final int numMessages,
+                             final int threadNum) throws Exception
+   {
+      int count = 0;
+
+      while (count < numMessages)
+      {
+         final long now = System.currentTimeMillis();
+
+         final ClientMessage outgoing = sessSend.createClientMessage(JBossBytesMessage.TYPE, false, 0, now, (byte)1);
+
+         outgoing.putIntProperty(new SimpleString("threadnum"), threadNum);
+         outgoing.putIntProperty(new SimpleString("count"), count);
+
+         setBody(outgoing);
+
+         producer.send(outgoing);
+
+         count++;
+      }
+   }
+
+   /**
+    * Receives numMessages messages from every consumer, round-robin, and verifies that
+    * the messages sent by thread threadNum arrive with consecutive "count" values
+    * (wrapping to 0 at numMessages). Every received message is acknowledged.
+    *
+    * @throws Exception if a message from threadNum arrives with an unexpected count
+    */
+   private void consumeMessages(final Set<ClientConsumer> consumers, final int numMessages, final int threadNum) throws Exception
+   {
+      // Per consumer: sending thread number -> count expected on its next message
+      final Map<ClientConsumer, Map<Integer, Integer>> expectedByConsumer = new HashMap<ClientConsumer, Map<Integer, Integer>>();
+
+      for (int round = 0; round < numMessages; round++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            Map<Integer, Integer> expectedBySender = expectedByConsumer.get(consumer);
+
+            if (expectedBySender == null)
+            {
+               expectedBySender = new HashMap<Integer, Integer>();
+               expectedByConsumer.put(consumer, expectedBySender);
+            }
+
+            final ClientMessage received = consumer.receive(RECEIVE_TIMEOUT);
+
+            assertNotNull(received);
+
+            final int sender = (Integer)received.getProperty(new SimpleString("threadnum"));
+            final int count = (Integer)received.getProperty(new SimpleString("count"));
+
+            // The first message seen from a sender defines the starting point
+            final Integer stored = expectedBySender.get(sender);
+            final int expected = (stored == null) ? count : stored.intValue();
+
+            if (sender == threadNum && count != expected)
+            {
+               throw new Exception("Invalid count, expected " + sender + ": " + expected + " got " + count);
+            }
+
+            // Advance, wrapping around at numMessages
+            final int next = (expected + 1 == numMessages) ? 0 : expected + 1;
+
+            expectedBySender.put(sender, next);
+
+            received.acknowledge();
+         }
+      }
+   }
+
+   /**
+    * Creates a session factory from two in-VM connector configurations: one with
+    * default parameters and one using backupParams.
+    *
+    * @return the new factory, with its send window size set to 32 * 1024
+    */
+   protected ClientSessionFactoryInternal createSessionFactory()
+   {
+      final String connectorFactory = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+
+      final TransportConfiguration defaultConnector = new TransportConfiguration(connectorFactory);
+      final TransportConfiguration backupConnector = new TransportConfiguration(connectorFactory, backupParams);
+
+      final ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(defaultConnector,
+                                                                                backupConnector,
+                                                                                0,
+                                                                                1,
+                                                                                ClientSessionFactoryImpl.DEFAULT_INITIAL_CONNECT_ATTEMPTS,
+                                                                                ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS);
+
+      factory.setSendWindowSize(32 * 1024);
+
+      return factory;
+   }
+
+   /**
+    * Starts the file-based live and backup servers and, when CREATE_AT_START is set,
+    * creates NUM_SESSIONS queues on the address of each of the NUM_THREADS threads.
+    */
+   @Override
+   protected void start() throws Exception
+   {
+      setUpFailoverServers(true, MAX_GLOBAL, PAGE_SIZE);
+
+      if (!CREATE_AT_START)
+      {
+         return;
+      }
+
+      // TODO: Remove this part here
+      final ClientSessionFactory factory = createSessionFactory();
+
+      final ClientSession setupSession = factory.createSession(false, true, true);
+
+      for (int threadNum = 0; threadNum < NUM_THREADS; threadNum++)
+      {
+         final SimpleString address = createAddressName(threadNum);
+
+         for (int i = 0; i < NUM_SESSIONS; i++)
+         {
+            setupSession.createQueue(address, createSubName(threadNum, i), null, true, false);
+         }
+      }
+
+      setupSession.close();
+   }
+
+   /**
+    * Creates and starts the backup server and then the live server, both using in-VM
+    * acceptors, and stores them in backupService and liveService.
+    *
+    * @param fileBased true to use file-based storage under getTestDir() ("/backup" and
+    *                  "/live"), false to create null-storage services
+    * @param maxGlobalSize value for the paging max global size (bytes) on both servers
+    * @param pageSize value used both as paging global watermark size and as page size
+    *                 (bytes) for all addresses ("#")
+    * @throws Exception if a server cannot be configured or started
+    */
+   protected void setUpFailoverServers(boolean fileBased, final long maxGlobalSize, final int pageSize) throws Exception
+   {
+      // Start from an empty test directory
+      deleteDirectory(new File(getTestDir()));
+
+      // --- Backup server: clustered, marked as backup, reachable in-VM with server id 1
+      Configuration backupConf = new ConfigurationImpl();
+      backupConf.setSecurityEnabled(false);
+      backupConf.setClustered(true);
+      backupConf.setBackup(true);
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      backupConf.getAcceptorConfigurations()
+                .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), backupParams));
+
+      if (fileBased)
+      {
+         clearData(getTestDir() + "/backup");
+
+         backupConf.setJournalDirectory(getJournalDir(getTestDir() + "/backup"));
+         backupConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/backup"));
+         backupConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/backup"));
+         backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+         backupConf.setJournalFileSize(100 * 1024);
+
+         backupConf.setJournalType(JournalType.NIO);
+
+         backupConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
+         backupConf.setPagingGlobalWatermarkSize(pageSize);
+         backupService = Messaging.newMessagingService(backupConf);
+      }
+      else
+      {
+         backupService = Messaging.newNullStorageMessagingService(backupConf);
+      }
+
+      // The backup is started before the live server is even configured
+      backupService.start();
+
+      // NOTE(review): presumably gives the backup a moment to come up before the live
+      // server is created -- confirm whether this pause is still needed
+      Thread.sleep(20);
+
+      // --- Live server: clustered, default in-VM acceptor
+      Configuration liveConf = new ConfigurationImpl();
+      liveConf.setSecurityEnabled(false);
+      liveConf.setClustered(true);
+
+      TransportConfiguration liveTC = new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName());
+      liveConf.getAcceptorConfigurations().add(liveTC);
+
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+      // Register a connector that uses backupParams and name it as the live server's backup connector
+      TransportConfiguration backupTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY,
+                                                                   backupParams,
+                                                                   "backup-connector");
+      connectors.put(backupTC.getName(), backupTC);
+      liveConf.setConnectorConfigurations(connectors);
+      liveConf.setBackupConnectorName(backupTC.getName());
+
+      if (fileBased)
+      {
+         liveConf.setJournalDirectory(getJournalDir(getTestDir() + "/live"));
+         liveConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/live"));
+         liveConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/live"));
+         liveConf.setPagingDirectory(getPageDir(getTestDir() + "/live"));
+
+         liveConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
+         liveConf.setPagingGlobalWatermarkSize(pageSize);
+         liveConf.setJournalFileSize(100 * 1024);
+
+         liveConf.setJournalType(JournalType.NIO);
+      }
+
+      if (fileBased)
+      {
+         liveService = Messaging.newMessagingService(liveConf);
+      }
+      else
+      {
+         liveService = Messaging.newNullStorageMessagingService(liveConf);
+      }
+
+      // Same page size for every address ("#") on both servers
+      AddressSettings settings = new AddressSettings();
+      settings.setPageSizeBytes(pageSize);
+
+      liveService.getServer().getAddressSettingsRepository().addMatch("#", settings);
+      backupService.getServer().getAddressSettingsRepository().addMatch("#", settings);
+
+      // The live data directory is cleared just before the live server is started
+      clearData(getTestDir() + "/live");
+
+      liveService.start();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+   /**
+    * Immutable pair of a consumer and the session it was created from.
+    * Static because it never needs access to the enclosing test instance.
+    */
+   private static class MyInfo
+   {
+      final ClientSession session;
+
+      final ClientConsumer consumer;
+
+      /**
+       * @param session session the consumer was created from
+       * @param consumer consumer to hand to a MyHandler
+       */
+      public MyInfo(final ClientSession session, final ClientConsumer consumer)
+      {
+         this.session = session;
+         this.consumer = consumer;
+      }
+   }
+
+   /**
+    * Message handler used by doTestB. It acknowledges every message, checks that the
+    * messages sent by its own thread arrive with consecutive "count" values and with the
+    * expected body size, and releases <code>latch</code> either when the last expected
+    * message has been seen or when a check fails. A failure is described in
+    * <code>failure</code>, which stays null on success.
+    */
+   private class MyHandler implements MessageHandler
+   {
+      /** Released when the handler is done or a check has failed; replaced by start(). */
+      CountDownLatch latch = new CountDownLatch(1);
+
+      /** Sending thread number -> count expected on its next message. */
+      private final Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
+
+      /** Description of the last failure, or null if none occurred. */
+      volatile String failure;
+
+      /** Number of the test thread this handler belongs to. */
+      final int tn;
+
+      final int numMessages;
+
+      final ClientSession session;
+
+      final ClientConsumer consumer;
+
+      volatile Xid xid;
+
+      /** Set once the last expected message of the own thread has been seen. */
+      volatile boolean done;
+
+      volatile boolean started = false;
+
+      volatile boolean commit = false;
+
+      /**
+       * Resets all per-run state, registers this handler on the consumer and starts
+       * the session.
+       */
+      synchronized void start() throws Exception
+      {
+         counts.clear();
+
+         done = false;
+
+         failure = null;
+
+         latch = new CountDownLatch(1);
+
+         started = true;
+         consumer.setMessageHandler(this);
+         session.start();
+      }
+
+      /** Stops the session and unregisters this handler from the consumer. */
+      synchronized void stop() throws Exception
+      {
+         session.stop();
+         // FIXME: Remove this line when https://jira.jboss.org/jira/browse/JBMESSAGING-1549 is done
+         consumer.setMessageHandler(null);
+         started = false;
+      }
+
+      /** Stops the handler and closes its session. */
+      synchronized void close() throws Exception
+      {
+         stop();
+         session.close();
+      }
+
+      /**
+       * @param threadNum number of the test thread whose messages are order-checked
+       * @param numMessages number of messages the own thread sends
+       * @param session session the consumer was created from
+       * @param consumer consumer this handler is registered on by start()
+       */
+      MyHandler(final int threadNum, final int numMessages, final ClientSession session, final ClientConsumer consumer) throws Exception
+      {
+         tn = threadNum;
+
+         this.numMessages = numMessages;
+
+         this.session = session;
+
+         this.consumer = consumer;
+
+      }
+
+      public void setCommitOnComplete(boolean commit)
+      {
+         this.commit = commit;
+      }
+
+      /**
+       * Acknowledges the message and runs the ordering and size checks on it.
+       */
+      public synchronized void onMessage(final ClientMessage message)
+      {
+
+         if (!started)
+         {
+            this.failure = "Received message with session stopped (thread = " + tn + ")";
+            log.error(failure);
+            return;
+         }
+
+         // log.info("*** handler got message");
+         try
+         {
+            message.acknowledge();
+         }
+         catch (MessagingException me)
+         {
+            // An acknowledge failure is only logged; it does not fail the handler
+            log.error("Failed to process", me);
+         }
+
+         if (done)
+         {
+            return;
+         }
+
+         int threadNum = (Integer)message.getProperty(new SimpleString("threadnum"));
+         int cnt = (Integer)message.getProperty(new SimpleString("count"));
+
+         // The first message seen from a sender defines the starting point
+         Integer c = counts.get(threadNum);
+         if (c == null)
+         {
+            c = new Integer(cnt);
+         }
+
+         // log.info(System.identityHashCode(this) + " consumed message " + threadNum + ":" + cnt);
+
+         if (tn == threadNum && cnt != c.intValue())
+         {
+            failure = "Invalid count, expected " + threadNum + ":" + c + " got " + cnt;
+            log.error(failure);
+
+            latch.countDown();
+         }
+
+         if (!checkSize(message))
+         {
+            failure = "Invalid size on message";
+            log.error(failure);
+            latch.countDown();
+         }
+
+         if (tn == threadNum && c == numMessages - 1)
+         {
+            done = true;
+            try
+            {
+               this.stop();
+            }
+            catch (Exception e)
+            {
+               // e.getMessage() may be null, which would make the handler look
+               // successful; always record a non-null description instead
+               this.failure = "Failed to stop handler: " + e;
+               log.error(failure, e);
+            }
+            latch.countDown();
+         }
+
+         c++;
+         // Wrap around at numMessages
+         if (c == numMessages)
+         {
+            c = 0;
+         }
+
+         counts.put(threadNum, c);
+
+      }
+   }
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	2009-03-25 10:41:11 UTC (rev 6163)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	2009-03-25 15:41:58 UTC (rev 6164)
@@ -38,6 +38,7 @@
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
+import org.jboss.messaging.tests.util.RandomUtil;
 import org.jboss.messaging.utils.SimpleString;
 
 /**
@@ -375,7 +376,8 @@
 
          if (fail)
          {
-            Thread.sleep(1000);
+            // Fail after some time
+            Thread.sleep((long)(1000 * RandomUtil.randomDouble()));
             while (store.getNumberOfPages() == initialNumberOfPages)
             {
                Thread.sleep(100);




More information about the jboss-cvs-commits mailing list