[jboss-cvs] JBoss Messaging SVN: r5652 - branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jan 16 10:50:10 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-16 10:50:08 -0500 (Fri, 16 Jan 2009)
New Revision: 5652

Modified:
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java
Log:
Just saving a backup before I mess up a lot the test for debugging 

Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java	2009-01-16 06:04:54 UTC (rev 5651)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java	2009-01-16 15:50:08 UTC (rev 5652)
@@ -24,6 +24,8 @@
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientFileMessage;
@@ -33,8 +35,12 @@
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.impl.ClientSessionImpl;
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -51,6 +57,8 @@
 
    // Constants -----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(LargeMessageFailoverTest.class);
+
    // Attributes ----------------------------------------------------
 
    private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
@@ -190,7 +198,321 @@
       }.start();
    }
 
+   
    /**
+    * @throws Exception
+    * @throws InterruptedException
+    * @throws Throwable
+    */
+   private int multiThreadConsume(final boolean connectedOnBackup, final boolean fail) throws Exception,
+                                                                                       InterruptedException,
+                                                                                       Throwable
+   {
+      ClientSession session = null;
+      try
+      {
+         final AtomicInteger numberOfMessages = new AtomicInteger(0);
+
+         final int RECEIVE_TIMEOUT = 10000;
+
+         final ClientSessionFactory factory;
+         final PagingStore store;
+
+         if (connectedOnBackup)
+         {
+            factory = createBackupFactory();
+            store = backupService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+         }
+         else
+         {
+            factory = createFailoverFactory();
+            store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+         }
+
+         session = factory.createSession(false, true, true, false);
+
+         final int initialNumberOfPages = store.getNumberOfPages();
+
+         System.out.println("It has initially " + initialNumberOfPages);
+
+         final int THREAD_CONSUMERS = 20;
+
+         final CountDownLatch startFlag = new CountDownLatch(1);
+         final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_CONSUMERS);
+
+         class Consumer extends Thread
+         {
+            volatile Throwable e;
+
+            ClientSession session;
+
+            public Consumer() throws Exception
+            {
+               session = factory.createSession(null, null, false, true, true, false, 0);
+            }
+
+            @Override
+            public void run()
+            {
+               boolean started = false;
+
+               try
+               {
+
+                  try
+                  {
+                     ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+                     session.start();
+
+                     alignSemaphore.countDown();
+
+                     started = true;
+
+                     startFlag.await();
+
+                     while (true)
+                     {
+                        ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+                        if (msg == null)
+                        {
+                           break;
+                        }
+
+                        if (numberOfMessages.incrementAndGet() % 1000 == 0)
+                        {
+                           System.out.println(numberOfMessages + " messages read");
+                        }
+
+                        msg.acknowledge();
+                     }
+
+                  }
+                  finally
+                  {
+                     session.close();
+                  }
+               }
+               catch (Throwable e)
+               {
+                  log.error(e.getMessage(), e);
+                  if (!started)
+                  {
+                     alignSemaphore.countDown();
+                  }
+                  this.e = e;
+               }
+            }
+         }
+
+         Consumer[] consumers = new Consumer[THREAD_CONSUMERS];
+
+         for (int i = 0; i < THREAD_CONSUMERS; i++)
+         {
+            consumers[i] = new Consumer();
+         }
+
+         for (int i = 0; i < THREAD_CONSUMERS; i++)
+         {
+            consumers[i].start();
+         }
+
+         alignSemaphore.await();
+
+         startFlag.countDown();
+
+         if (fail)
+         {
+            Thread.sleep(1000);
+            while (store.getNumberOfPages() == initialNumberOfPages)
+            {
+               Thread.sleep(100);
+            }
+
+            System.out.println("The system has already depaged " + (initialNumberOfPages - store.getNumberOfPages()) +
+                               ", failing now");
+
+            fail(session);
+         }
+
+         for (Thread t : consumers)
+         {
+            t.join();
+         }
+
+         for (Consumer p : consumers)
+         {
+            if (p.e != null)
+            {
+               throw p.e;
+            }
+         }
+
+         return numberOfMessages.intValue();
+      }
+      finally
+      {
+         if (session != null)
+         {
+            try
+            {
+               session.close();
+            }
+            catch (Exception ignored)
+            {
+            }
+         }
+      }
+   }
+
+   /**
+    * @throws Exception
+    * @throws MessagingException
+    * @throws InterruptedException
+    * @throws Throwable
+    */
+   private int multiThreadProducer(final boolean failover) throws Exception,
+                                                          MessagingException,
+                                                          InterruptedException,
+                                                          Throwable
+   {
+
+      final AtomicInteger numberOfMessages = new AtomicInteger(0);
+      final PagingStore store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+
+      final ClientSessionFactory factory = createFailoverFactory();
+
+      ClientSession session = factory.createSession(false, true, true, false);
+      try
+      {
+         session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+         final int THREAD_PRODUCERS = 30;
+
+         final CountDownLatch startFlag = new CountDownLatch(1);
+         final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_PRODUCERS);
+         final CountDownLatch flagPaging = new CountDownLatch(THREAD_PRODUCERS);
+
+         class Producer extends Thread
+         {
+            volatile Throwable e;
+
+            @Override
+            public void run()
+            {
+               boolean started = false;
+               try
+               {
+                  ClientSession session = factory.createSession(false, true, true);
+                  try
+                  {
+                     ClientProducer producer = session.createProducer(ADDRESS);
+
+                     alignSemaphore.countDown();
+
+                     started = true;
+                     startFlag.await();
+
+                     while (!store.isPaging())
+                     {
+
+                        ClientMessage msg = session.createClientMessage(true);
+
+                        producer.send(msg);
+                        numberOfMessages.incrementAndGet();
+                     }
+
+                     flagPaging.countDown();
+
+                     for (int i = 0; i < 1000; i++)
+                     {
+
+                        ClientMessage msg = session.createClientMessage(true);
+
+                        producer.send(msg);
+                        numberOfMessages.incrementAndGet();
+
+                     }
+
+                  }
+                  finally
+                  {
+                     session.close();
+                  }
+               }
+               catch (Throwable e)
+               {
+                  log.error(e.getMessage(), e);
+                  if (!started)
+                  {
+                     alignSemaphore.countDown();
+                  }
+                  flagPaging.countDown();
+                  this.e = e;
+               }
+            }
+         }
+
+         Producer[] producers = new Producer[THREAD_PRODUCERS];
+
+         for (int i = 0; i < THREAD_PRODUCERS; i++)
+         {
+            producers[i] = new Producer();
+            producers[i].start();
+         }
+
+         alignSemaphore.await();
+
+         // Start producing only when all the sessions are opened
+         startFlag.countDown();
+
+         if (failover)
+         {
+            flagPaging.await(); // for this test I want everybody on the paging part
+
+            Thread.sleep(1500);
+
+            fail(session);
+
+         }
+
+         for (Thread t : producers)
+         {
+            t.join();
+         }
+
+         for (Producer p : producers)
+         {
+            if (p.e != null)
+            {
+               throw p.e;
+            }
+         }
+
+         return numberOfMessages.intValue();
+
+      }
+      finally
+      {
+         session.close();
+         InVMConnector.resetFailures();
+      }
+
+   }
+
+   private void fail(final ClientSession session) throws Exception
+   {
+      RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+
+      InVMConnector.numberOfFailures = 1;
+      InVMConnector.failOnCreateConnection = true;
+      System.out.println("Forcing a failure");
+      conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+
+   }
+
+   
+   /**
     * @param factory
     * @param placeToFail
     * @param numberOfMessages

Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java	2009-01-16 06:04:54 UTC (rev 5651)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java	2009-01-16 15:50:08 UTC (rev 5652)
@@ -12,6 +12,8 @@
 
 package org.jboss.messaging.tests.integration.cluster.failover;
 
+import java.io.File;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -23,8 +25,6 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import junit.framework.TestCase;
-
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
@@ -39,13 +39,17 @@
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
 import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
 import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
 import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossBytesMessage;
 import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -53,8 +57,13 @@
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  */
-public class MultiThreadRandomFailoverTest extends TestCase
+public class MultiThreadRandomFailoverTest extends UnitTestCase
 {
+   /**
+    * 
+    */
+   private static final int DEFAULT_MESSAGE_SIZE = 1024;
+
    private static final Logger log = Logger.getLogger(MultiThreadRandomFailoverTest.class);
 
    // Constants -----------------------------------------------------
@@ -88,7 +97,7 @@
          {
             doTestA(sf, threadNum);
          }
-      }, NUM_THREADS);
+      }, NUM_THREADS, false);
    }
 
    public void testB() throws Exception
@@ -99,7 +108,7 @@
          {
             doTestB(sf, threadNum);
          }
-      }, NUM_THREADS);
+      }, NUM_THREADS, false);
    }
 
    public void testC() throws Exception
@@ -110,7 +119,7 @@
          {
             doTestC(sf, threadNum);
          }
-      }, NUM_THREADS);
+      }, NUM_THREADS, false);
    }
 
    public void testD() throws Exception
@@ -121,7 +130,7 @@
          {
             doTestD(sf, threadNum);
          }
-      }, NUM_THREADS);
+      }, NUM_THREADS, false);
    }
 
    public void testE() throws Exception
@@ -132,7 +141,7 @@
          {
             doTestE(sf, threadNum);
          }
-      }, NUM_THREADS);
+      }, NUM_THREADS, false);
    }
 
    public void testF() throws Exception
@@ -143,7 +152,7 @@
          {
             doTestF(sf, threadNum);
          }
-      }, NUM_THREADS);
+      }, NUM_THREADS, false);
    }
 
    public void testG() throws Exception
@@ -154,7 +163,7 @@
          {
             doTestG(sf, threadNum);
          }
-      }, NUM_THREADS);
+      }, NUM_THREADS, false);
    }
 
    public void testH() throws Exception
@@ -165,7 +174,7 @@
          {
             doTestH(sf, threadNum);
          }
-      }, NUM_THREADS);
+      }, NUM_THREADS, false);
    }
 
    public void testI() throws Exception
@@ -176,7 +185,7 @@
          {
             doTestI(sf, threadNum);
          }
-      }, NUM_THREADS);
+      }, NUM_THREADS, false);
    }
 
    public void testJ() throws Exception
@@ -187,7 +196,7 @@
          {
             doTestJ(sf, threadNum);
          }
-      }, NUM_THREADS);
+      }, NUM_THREADS, false);
    }
 
    public void testK() throws Exception
@@ -198,7 +207,7 @@
          {
             doTestK(sf, threadNum);
          }
-      }, NUM_THREADS);
+      }, NUM_THREADS, false);
    }
 
    public void testL() throws Exception
@@ -209,7 +218,7 @@
          {
             doTestL(sf);
          }
-      }, NUM_THREADS);
+      }, NUM_THREADS, false);
    }
 
    // public void testM() throws Exception
@@ -231,9 +240,20 @@
          {
             doTestN(sf, threadNum);
          }
-      }, NUM_THREADS);
+      }, NUM_THREADS, false);
    }
 
+   public void testLargeMessage() throws Exception
+   {
+      runTestMultipleThreads(new RunnableT()
+      {
+         public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+         {
+            doTestLargeMessage(sf, threadNum);
+         }
+      }, NUM_THREADS, true);
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -272,7 +292,7 @@
 
       ClientProducer producer = sessSend.createProducer(ADDRESS);
 
-      sendMessages(sessSend, producer, numMessages, threadNum);
+      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
 
       Set<MyHandler> handlers = new HashSet<MyHandler>();
 
@@ -354,7 +374,7 @@
 
       ClientProducer producer = sessSend.createProducer(ADDRESS);
 
-      sendMessages(sessSend, producer, numMessages, threadNum);
+      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
 
       for (ClientSession session : sessions)
       {
@@ -445,11 +465,11 @@
 
       ClientProducer producer = sessSend.createProducer(ADDRESS);
 
-      sendMessages(sessSend, producer, numMessages, threadNum);
+      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
 
       sessSend.rollback();
 
-      sendMessages(sessSend, producer, numMessages, threadNum);
+      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
 
       sessSend.commit();
 
@@ -550,11 +570,11 @@
 
       ClientProducer producer = sessSend.createProducer(ADDRESS);
 
-      sendMessages(sessSend, producer, numMessages, threadNum);
+      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
 
       sessSend.rollback();
 
-      sendMessages(sessSend, producer, numMessages, threadNum);
+      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
 
       sessSend.commit();
 
@@ -692,7 +712,7 @@
 
       ClientProducer producer = sessSend.createProducer(ADDRESS);
 
-      sendMessages(sessSend, producer, numMessages, threadNum);
+      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
 
       consumeMessages(consumers, numMessages, threadNum);
 
@@ -748,7 +768,7 @@
 
       ClientProducer producer = sessSend.createProducer(ADDRESS);
 
-      sendMessages(sessSend, producer, numMessages, threadNum);
+      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
 
       for (ClientSession session : sessions)
       {
@@ -811,11 +831,11 @@
 
       ClientProducer producer = sessSend.createProducer(ADDRESS);
 
-      sendMessages(sessSend, producer, numMessages, threadNum);
+      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
 
       sessSend.rollback();
 
-      sendMessages(sessSend, producer, numMessages, threadNum);
+      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
 
       sessSend.commit();
 
@@ -885,11 +905,11 @@
 
       ClientProducer producer = sessSend.createProducer(ADDRESS);
 
-      sendMessages(sessSend, producer, numMessages, threadNum);
+      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
 
       sessSend.rollback();
 
-      sendMessages(sessSend, producer, numMessages, threadNum);
+      sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
 
       sessSend.commit();
 
@@ -1202,6 +1222,96 @@
       sessCreate.close();
    }
 
+   protected void doTestLargeMessage(final ClientSessionFactory sf, final int threadNum) throws Exception
+   {
+      long start = System.currentTimeMillis();
+
+      sf.setMinLargeMessageSize(10 * 1024);
+      
+      sf.setSendWindowSize(1024*1024);
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int messageSize = 40 * 1024;
+      
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, true, true);
+
+         sessConsume.start();
+
+         sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, true, true);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      sendMessages(sessSend, producer, numMessages, messageSize, threadNum);
+
+      Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler(threadNum, numMessages, messageSize);
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(5000, TimeUnit.MILLISECONDS);
+
+         if (!ok)
+         {
+            throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                                " threadnum " +
+                                threadNum);
+         }
+
+         if (handler.failure != null)
+         {
+            throw new Exception("Handler failed: " + handler.failure);
+         }
+      }
+
+      sessSend.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      log.info("duration " + (end - start));
+   }
+
    protected int getNumIterations()
    {
       return 20;
@@ -1235,14 +1345,21 @@
 
    // Private -------------------------------------------------------
 
-   private void runTestMultipleThreads(final RunnableT runnable, final int numThreads) throws Exception
+   private void runTestMultipleThreads(final RunnableT runnable, final int numThreads, final boolean fileBased) throws Exception
    {
       final int numIts = getNumIterations();
 
       for (int its = 0; its < numIts; its++)
       {
          log.info("************ ITERATION: " + its);
-         start();
+         if (fileBased)
+         {
+            startFileBased();
+         }
+         else
+         {
+            start();
+         }
 
          final ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
                                                                               new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
@@ -1333,6 +1450,56 @@
       return failer;
    }
 
+   private void startFileBased() throws Exception
+   {
+
+      deleteDirectory(new File(getTestDir()));
+
+      Configuration backupConf = new ConfigurationImpl();
+
+      backupConf.setJournalDirectory(getJournalDir(getTestDir() + "/backup"));
+      backupConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/backup"));
+      backupConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/backup"));
+      backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+      backupConf.setJournalFileSize(100 * 1024);
+
+      backupConf.setSecurityEnabled(false);
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+
+      backupConf.getAcceptorConfigurations()
+                .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), backupParams));
+      backupConf.setBackup(true);
+
+      backupService = MessagingServiceImpl.newMessagingService(backupConf);
+      backupService.start();
+
+      Configuration liveConf = new ConfigurationImpl();
+
+      liveConf.setJournalDirectory(getJournalDir(getTestDir() + "/live"));
+      liveConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/live"));
+      liveConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/live"));
+      liveConf.setPagingDirectory(getPageDir(getTestDir() + "/live"));
+
+      liveConf.setJournalFileSize(100 * 1024);
+
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName()));
+
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+      TransportConfiguration backupTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY,
+                                                                   backupParams,
+                                                                   "backup-connector");
+      connectors.put(backupTC.getName(), backupTC);
+      liveConf.setConnectorConfigurations(connectors);
+      liveConf.setBackupConnectorName(backupTC.getName());
+      liveService = MessagingServiceImpl.newMessagingService(liveConf);
+
+      liveService.start();
+
+   }
+
    private void start() throws Exception
    {
       Configuration backupConf = new ConfigurationImpl();
@@ -1380,18 +1547,20 @@
    private void sendMessages(final ClientSession sessSend,
                              final ClientProducer producer,
                              final int numMessages,
+                             final int sizeOfMessage,
                              final int threadNum) throws Exception
    {
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+         ClientMessage message = sessSend.createClientMessage(JBossBytesMessage.TYPE,
                                                               false,
                                                               0,
                                                               System.currentTimeMillis(),
                                                               (byte)1);
          message.putIntProperty(new SimpleString("threadnum"), threadNum);
          message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().flip();
+         message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(sizeOfMessage)));
+         //message.getBody().flip();
          producer.send(message);
       }
    }
@@ -1522,16 +1691,26 @@
       final int tn;
 
       final int numMessages;
+      
+      final int messageSize;
 
       volatile boolean done;
 
-      MyHandler(final int threadNum, final int numMessages)
+      MyHandler(final int threadNum, final int numMessages, final int messageSize)
       {
          this.tn = threadNum;
 
          this.numMessages = numMessages;
+         
+         this.messageSize = messageSize;
       }
 
+
+      MyHandler(final int threadNum, final int numMessages)
+      {
+         this(threadNum, numMessages, DEFAULT_MESSAGE_SIZE);
+      }
+
       public void onMessage(ClientMessage message)
       {
          try
@@ -1566,6 +1745,13 @@
 
             latch.countDown();
          }
+         
+         if (message.getBody().limit() != messageSize)
+         {
+            failure = "Invalid Message size, expected " + messageSize + " but it was " + message.getBody().limit();
+            
+            latch.countDown();
+         }
 
          if (tn == threadNum && c == numMessages - 1)
          {




More information about the jboss-cvs-commits mailing list