[jboss-cvs] JBoss Messaging SVN: r5982 - in trunk: tests/src/org/jboss/messaging/tests/integration/cluster/failover and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Mar 3 21:48:50 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-03-03 21:48:49 -0500 (Tue, 03 Mar 2009)
New Revision: 5982

Modified:
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
   trunk/tests/src/org/jboss/messaging/tests/stress/failover/PagingFailoverStressTest.java
   trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
Log:
Fixing PageFailoverTest and PagingFailoverStressTest, and
PingTest (verifying if this would fix it on Hudson)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-03-03 21:39:33 UTC (rev 5981)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-03-04 02:48:49 UTC (rev 5982)
@@ -413,7 +413,7 @@
 
    public void deliverReplicated(final long messageID) throws Exception
    {
-      MessageReference ref = messageQueue.removeFirstReference(messageID);
+      MessageReference ref = removeReferenceOnBackup(messageID);
       
       //log.info("handling replicated delivery on backup " + messageID + " session " + session.getName());
 
@@ -462,7 +462,44 @@
    // Public ---------------------------------------------------------------------------------------
 
    // Private --------------------------------------------------------------------------------------
+   
+   private MessageReference removeReferenceOnBackup(final long id) throws Exception
+   {
+      // most of the times, the remove will work ok, so we first try it without any locks
+      MessageReference ref = messageQueue.removeFirstReference(id);
 
+      if (ref == null)
+      {
+         PagingStore store = pagingManager.getPageStore(binding.getAddress());
+
+         while (true)
+         {
+            // Can't have the same store being depaged in more than one thread
+            synchronized (store)
+            {
+               // as soon as it gets the lock, it needs to verify if another thread couldn't find the reference
+               ref = messageQueue.removeFirstReference(id);
+               if (ref == null)
+               {
+                  // force a depage
+                  if (!store.readPage()) // This returns false if there are no pages
+                  {
+                     break;
+                  }
+               }
+               else
+               {
+                  break;
+               }
+            }
+         }
+      }
+
+      return ref;
+
+   }
+
+
    private void promptDelivery()
    {
       lock.lock();

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-03-03 21:39:33 UTC (rev 5981)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-03-04 02:48:49 UTC (rev 5982)
@@ -2424,7 +2424,7 @@
    {
       LargeServerMessage largeMessage = storageManager.createLargeMessage();
 
-      MessagingBuffer headerBuffer = ChannelBuffers.dynamicBuffer(header); 
+      MessagingBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header); 
 
       largeMessage.decodeProperties(headerBuffer);
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java	2009-03-03 21:39:33 UTC (rev 5981)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java	2009-03-04 02:48:49 UTC (rev 5982)
@@ -34,6 +34,7 @@
 import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
 import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.JournalType;
 import org.jboss.messaging.core.server.Messaging;
 import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
@@ -97,6 +98,8 @@
       backupConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/backup"));
       backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
       backupConf.setJournalFileSize(100 * 1024);
+      
+      backupConf.setJournalType(JournalType.NIO);
 
       backupConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
       backupConf.setPagingGlobalWatermarkSize(pageSize);
@@ -123,6 +126,8 @@
       liveConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
       liveConf.setPagingGlobalWatermarkSize(pageSize);
       liveConf.setJournalFileSize(100 * 1024);
+      
+      liveConf.setJournalType(JournalType.NIO);
 
       liveConf.setSecurityEnabled(false);
       liveConf.getAcceptorConfigurations()
@@ -145,7 +150,7 @@
       backupService.getServer().getAddressSettingsRepository().addMatch("#", settings);
 
       clearData(getTestDir() + "/live");
-
+      
       liveService.start();
    }
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	2009-03-03 21:39:33 UTC (rev 5981)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	2009-03-04 02:48:49 UTC (rev 5982)
@@ -23,7 +23,8 @@
 
 package org.jboss.messaging.tests.integration.cluster.failover;
 
-import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
@@ -63,6 +64,50 @@
 
    // Public --------------------------------------------------------
 
+   
+   public void testMultithreadFailoverReplicationOnly() throws Throwable
+   {
+      setUpFileBased(getMaxGlobal(), getPageSize());
+
+      int numberOfProducedMessages = multiThreadProducer(getNumberOfThreads(), false);
+
+      System.out.println(numberOfProducedMessages + " messages produced");
+
+      int numberOfConsumedMessages = multiThreadConsumer(getNumberOfThreads(), false, false);
+
+      assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+   }
+
+   public void testMultithreadFailoverOnProducing() throws Throwable
+   {
+      setUpFileBased(getMaxGlobal(), getPageSize());
+
+      int numberOfProducedMessages = multiThreadProducer(getNumberOfThreads(), true);
+
+      System.out.println(numberOfProducedMessages + " messages produced");
+
+      int numberOfConsumedMessages = multiThreadConsumer(getNumberOfThreads(), true, false);
+
+      assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+   }
+
+   public void testMultithreadFailoverOnConsume() throws Throwable
+   {
+      setUpFileBased(getMaxGlobal(), getPageSize());
+
+      int numberOfProducedMessages = multiThreadProducer(getNumberOfThreads(), false);
+
+      System.out.println(numberOfProducedMessages + " messages produced");
+
+      int numberOfConsumedMessages = multiThreadConsumer(getNumberOfThreads(), false, true);
+
+      assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+   }
+
+   
    public void testFailoverOnPaging() throws Exception
    {
       testPaging(true);
@@ -181,12 +226,27 @@
       return 500;
    }
    
+   protected int getNumberOfThreads()
+   {
+      return 5;
+   }
+   
+   protected int getMaxGlobal()
+   {
+      return 1024;
+   }
+   
+   protected int getPageSize()
+   {
+      return 512;
+   }
+   
    protected void fail(final ClientSession session) throws Exception
    {
       RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
 
-      InVMConnector.numberOfFailures = 1;
-      InVMConnector.failOnCreateConnection = true;
+//      InVMConnector.numberOfFailures = 1;
+//      InVMConnector.failOnCreateConnection = true;
       System.out.println("Forcing a failure");
       conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
 
@@ -194,7 +254,307 @@
 
 
    // Private -------------------------------------------------------
+   
+   /**
+    * @throws Exception
+    * @throws InterruptedException
+    * @throws Throwable
+    */
+   protected int multiThreadConsumer(int numberOfThreads, final boolean connectedOnBackup, final boolean fail) throws Exception,
+                                                                                       InterruptedException,
+                                                                                       Throwable
+   {
+      ClientSession session = null;
+      try
+      {
+         final AtomicInteger numberOfMessages = new AtomicInteger(0);
 
+         final int RECEIVE_TIMEOUT = 2000;
+
+         final ClientSessionFactory factory;
+         final PagingStore store;
+
+         if (connectedOnBackup)
+         {
+            factory = createBackupFactory();
+            store = backupService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+         }
+         else
+         {
+            factory = createFailoverFactory();
+            store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+         }
+
+         session = factory.createSession(false, true, true, false);
+
+         final int initialNumberOfPages = store.getNumberOfPages();
+
+         System.out.println("It has initially " + initialNumberOfPages);
+
+         final CountDownLatch startFlag = new CountDownLatch(1);
+         final CountDownLatch alignSemaphore = new CountDownLatch(numberOfThreads);
+
+         class Consumer extends Thread
+         {
+            volatile Throwable e;
+
+            ClientSession session;
+
+            public Consumer() throws Exception
+            {
+               session = factory.createSession(null, null, false, true, true, false, 0);
+            }
+
+            @Override
+            public void run()
+            {
+               boolean started = false;
+
+               try
+               {
+
+                  try
+                  {
+                     ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+                     session.start();
+
+                     alignSemaphore.countDown();
+
+                     started = true;
+
+                     startFlag.await();
+
+                     while (true)
+                     {
+                        ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+                        if (msg == null)
+                        {
+                           break;
+                        }
+
+                        if (numberOfMessages.incrementAndGet() % 1000 == 0)
+                        {
+                           System.out.println(numberOfMessages + " messages read");
+                        }
+
+                        msg.acknowledge();
+                     }
+
+                  }
+                  finally
+                  {
+                     session.close();
+                  }
+               }
+               catch (Throwable e)
+               {
+                  // Using System.out, as it would appear on the test output
+                  e.printStackTrace(); 
+                  if (!started)
+                  {
+                     alignSemaphore.countDown();
+                  }
+                  this.e = e;
+               }
+            }
+         }
+
+         Consumer[] consumers = new Consumer[numberOfThreads];
+
+         for (int i = 0; i < numberOfThreads; i++)
+         {
+            consumers[i] = new Consumer();
+         }
+
+         for (int i = 0; i < numberOfThreads; i++)
+         {
+            consumers[i].start();
+         }
+
+         alignSemaphore.await();
+
+         startFlag.countDown();
+
+         if (fail)
+         {
+            Thread.sleep(1000);
+            while (store.getNumberOfPages() == initialNumberOfPages)
+            {
+               Thread.sleep(100);
+            }
+
+            System.out.println("The system has already depaged " + (initialNumberOfPages - store.getNumberOfPages()) +
+                               ", failing now");
+
+            fail(session);
+         }
+
+         for (Thread t : consumers)
+         {
+            t.join();
+         }
+
+         for (Consumer p : consumers)
+         {
+            if (p.e != null)
+            {
+               throw p.e;
+            }
+         }
+
+         return numberOfMessages.intValue();
+      }
+      finally
+      {
+         if (session != null)
+         {
+            try
+            {
+               session.close();
+            }
+            catch (Exception ignored)
+            {
+            }
+         }
+      }
+   }
+
+   /**
+    * @throws Exception
+    * @throws MessagingException
+    * @throws InterruptedException
+    * @throws Throwable
+    */
+   protected int multiThreadProducer(final int numberOfThreads, final boolean failover) throws Exception,
+                                                          MessagingException,
+                                                          InterruptedException,
+                                                          Throwable
+   {
+
+      final AtomicInteger numberOfMessages = new AtomicInteger(0);
+      final PagingStore store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+
+      final ClientSessionFactory factory = createFailoverFactory();
+
+      ClientSession session = factory.createSession(false, true, true, false);
+      try
+      {
+         session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+         final CountDownLatch startFlag = new CountDownLatch(1);
+         final CountDownLatch alignSemaphore = new CountDownLatch(numberOfThreads);
+         final CountDownLatch flagPaging = new CountDownLatch(numberOfThreads);
+
+         class Producer extends Thread
+         {
+            volatile Throwable e;
+
+            @Override
+            public void run()
+            {
+               boolean started = false;
+               try
+               {
+                  ClientSession session = factory.createSession(false, true, true);
+                  try
+                  {
+                     ClientProducer producer = session.createProducer(ADDRESS);
+
+                     alignSemaphore.countDown();
+
+                     started = true;
+                     startFlag.await();
+
+                     while (!store.isPaging())
+                     {
+
+                        ClientMessage msg = session.createClientMessage(true);
+
+                        producer.send(msg);
+                        numberOfMessages.incrementAndGet();
+                     }
+
+                     flagPaging.countDown();
+
+                     for (int i = 0; i < 100; i++)
+                     {
+
+                        ClientMessage msg = session.createClientMessage(true);
+
+                        producer.send(msg);
+                        numberOfMessages.incrementAndGet();
+
+                     }
+
+                  }
+                  finally
+                  {
+                     session.close();
+                  }
+               }
+               catch (Throwable e)
+               {
+                  // Using System.out, as it would appear on the test output
+                  e.printStackTrace(); 
+                  if (!started)
+                  {
+                     alignSemaphore.countDown();
+                  }
+                  flagPaging.countDown();
+                  this.e = e;
+               }
+            }
+         }
+
+         Producer[] producers = new Producer[numberOfThreads];
+
+         for (int i = 0; i < numberOfThreads; i++)
+         {
+            producers[i] = new Producer();
+            producers[i].start();
+         }
+
+         alignSemaphore.await();
+
+         // Start producing only when all the sessions are opened
+         startFlag.countDown();
+
+         if (failover)
+         {
+            flagPaging.await(); // for this test I want everybody on the paging part
+
+            Thread.sleep(1500);
+
+            fail(session);
+
+         }
+
+         for (Thread t : producers)
+         {
+            t.join();
+         }
+
+         for (Producer p : producers)
+         {
+            if (p.e != null)
+            {
+               throw p.e;
+            }
+         }
+
+         return numberOfMessages.intValue();
+
+      }
+      finally
+      {
+         session.close();
+         InVMConnector.resetFailures();
+      }
+
+   }
+   
+
    // Inner classes -------------------------------------------------
 
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java	2009-03-03 21:39:33 UTC (rev 5981)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java	2009-03-04 02:48:49 UTC (rev 5982)
@@ -78,7 +78,7 @@
    private static final long PING_INTERVAL = 500;
 
    // Attributes ----------------------------------------------------
-   
+
    private MessagingService messagingService;
 
    // Static --------------------------------------------------------
@@ -110,7 +110,7 @@
       public boolean connectionFailed(MessagingException me)
       {
          this.me = me;
-         
+
          return true;
       }
 
@@ -144,7 +144,7 @@
                                                               DEFAULT_AUTO_GROUP,
                                                               DEFAULT_MAX_CONNECTIONS,
                                                               DEFAULT_PRE_ACKNOWLEDGE,
-                                                              DEFAULT_ACK_BATCH_SIZE,                                                            
+                                                              DEFAULT_ACK_BATCH_SIZE,
                                                               DEFAULT_RETRY_INTERVAL,
                                                               DEFAULT_RETRY_INTERVAL_MULTIPLIER,
                                                               DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
@@ -219,14 +219,14 @@
                                                               DEFAULT_AUTO_GROUP,
                                                               DEFAULT_MAX_CONNECTIONS,
                                                               DEFAULT_PRE_ACKNOWLEDGE,
-                                                              DEFAULT_ACK_BATCH_SIZE,                                                          
+                                                              DEFAULT_ACK_BATCH_SIZE,
                                                               DEFAULT_RETRY_INTERVAL,
                                                               DEFAULT_RETRY_INTERVAL_MULTIPLIER,
                                                               DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
                                                               DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
 
       ClientSession session = csf.createSession(false, true, true);
-      
+
       assertEquals(1, ((ClientSessionFactoryInternal)csf).numConnections());
 
       Listener clientListener = new Listener();
@@ -294,20 +294,20 @@
                                                               DEFAULT_AUTO_GROUP,
                                                               DEFAULT_MAX_CONNECTIONS,
                                                               DEFAULT_PRE_ACKNOWLEDGE,
-                                                              DEFAULT_ACK_BATCH_SIZE,                                                             
+                                                              DEFAULT_ACK_BATCH_SIZE,
                                                               DEFAULT_RETRY_INTERVAL,
                                                               DEFAULT_RETRY_INTERVAL_MULTIPLIER,
                                                               DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
                                                               DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
-      
+
       Listener clientListener = new Listener();
 
       ClientSession session = csf.createSession(false, true, true);
-      
+
       assertEquals(1, ((ClientSessionFactoryInternal)csf).numConnections());
 
       session.addFailureListener(clientListener);
-      
+
       RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
 
       // We need to get it to send one ping then stop
@@ -334,8 +334,21 @@
 
       serverConn.addFailureListener(serverListener);
 
-      Thread.sleep(PING_INTERVAL * 10);
+      for (int i = 0; i < 20; i++)
+      {
+         // a few tries to avoid a possible race caused by GCs or similar issues
+         if (messagingService.getServer().getRemotingService().getConnections().isEmpty())
+         {
+            // Sleep a bit more since it's async
+            // We are not sure about the order in which the listeners are called, so we need another sleep
+            Thread.sleep(PING_INTERVAL);
 
+            break;
+         }
+
+         Thread.sleep(PING_INTERVAL);
+      }
+
       // The client listener should be called too since the server will close it from the server side which will result
       // in the
       // netty detecting closure on the client side and then calling failure listener
@@ -347,7 +360,8 @@
 
       session.close();
    }
-
+   
+   
    /*
    * Test the client triggering failure due to no pong received in time
    */
@@ -360,6 +374,7 @@
             log.info("In interceptor, packet is " + packet.getType());
             if (packet.getType() == PacketImpl.PING)
             {
+               log.info("Ignoring Ping packet.. it will be dropped");
                return false;
             }
             else
@@ -372,7 +387,7 @@
       messagingService.getServer().getRemotingService().addInterceptor(noPongInterceptor);
 
       TransportConfiguration transportConfig = new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyConnectorFactory");
-      
+
       ClientSessionFactory csf = new ClientSessionFactoryImpl(transportConfig,
                                                               null,
                                                               DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
@@ -390,18 +405,18 @@
                                                               DEFAULT_AUTO_GROUP,
                                                               DEFAULT_MAX_CONNECTIONS,
                                                               DEFAULT_PRE_ACKNOWLEDGE,
-                                                              DEFAULT_ACK_BATCH_SIZE,                                                              
+                                                              DEFAULT_ACK_BATCH_SIZE,
                                                               DEFAULT_RETRY_INTERVAL,
                                                               DEFAULT_RETRY_INTERVAL_MULTIPLIER,
                                                               0,
                                                               0);
-      
+
       ClientSession session = csf.createSession(false, true, true);
-      
+
       assertEquals(1, ((ClientSessionFactoryInternal)csf).numConnections());
 
       Listener clientListener = new Listener();
-      
+
       session.addFailureListener(clientListener);
 
       RemotingConnection serverConn = null;
@@ -424,13 +439,22 @@
 
       serverConn.addFailureListener(serverListener);
 
-      Thread.sleep(PING_INTERVAL * 2);
+      for (int i = 0; i < 20; i++)
+      {
+         // a few tries to avoid a possible race caused by GCs or similar issues
+         if (messagingService.getServer().getRemotingService().getConnections().isEmpty())
+         {
+            // Sleep a bit more since it's async
+            // We are not sure about the order in which the listeners are called, so we need another sleep
+            Thread.sleep(PING_INTERVAL);
+            break;
+         }
 
+         Thread.sleep(PING_INTERVAL);
+      }
+
       assertNotNull(clientListener.getException());
 
-      // Sleep a bit more since it's async
-      Thread.sleep(PING_INTERVAL);
-
       // We don't receive an exception on the server in this case
       assertNull(serverListener.getException());
 

Modified: trunk/tests/src/org/jboss/messaging/tests/stress/failover/PagingFailoverStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/failover/PagingFailoverStressTest.java	2009-03-03 21:39:33 UTC (rev 5981)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/failover/PagingFailoverStressTest.java	2009-03-04 02:48:49 UTC (rev 5982)
@@ -22,17 +22,6 @@
 
 package org.jboss.messaging.tests.stress.failover;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.paging.PagingStore;
-import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
 import org.jboss.messaging.tests.integration.cluster.failover.PagingFailoverTest;
 import org.jboss.messaging.utils.SimpleString;
 
@@ -59,359 +48,30 @@
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
-   public void testMultithreadFailoverReplicationOnly() throws Throwable
-   {
-      setUpFileBased(10 * 1024, 5 * 1024);
 
-      int numberOfProducedMessages = multiThreadProducer(false);
+   // Package protected ---------------------------------------------
 
-      System.out.println(numberOfProducedMessages + " messages produced");
-
-      int numberOfConsumedMessages = multiThreadConsumer(false, false);
-
-      assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
-
-   }
-
-   public void testMultithreadFailoverOnProducing() throws Throwable
+   // Protected -----------------------------------------------------
+   protected int getNumberOfMessages()
    {
-      setUpFileBased(10 * 1024, 5 * 1024);
-
-      int numberOfProducedMessages = multiThreadProducer(true);
-
-      System.out.println(numberOfProducedMessages + " messages produced");
-
-      int numberOfConsumedMessages = multiThreadConsumer(true, false);
-
-      assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
-
+      return 5000;
    }
-
-   public void testMultithreadFailoverOnConsume() throws Throwable
+   protected int getNumberOfThreads()
    {
-      setUpFileBased(10 * 1024, 5 * 1024);
-
-      int numberOfProducedMessages = multiThreadProducer(false);
-
-      System.out.println(numberOfProducedMessages + " messages produced");
-
-      int numberOfConsumedMessages = multiThreadConsumer(false, true);
-
-      assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
-
+      return 10;
    }
-
-   /**
-    * @throws Exception
-    * @throws InterruptedException
-    * @throws Throwable
-    */
-   private int multiThreadConsumer(final boolean connectedOnBackup, final boolean fail) throws Exception,
-                                                                                       InterruptedException,
-                                                                                       Throwable
+   
+   protected int getMaxGlobal()
    {
-      ClientSession session = null;
-      try
-      {
-         final AtomicInteger numberOfMessages = new AtomicInteger(0);
-
-         final int RECEIVE_TIMEOUT = 10000;
-
-         final ClientSessionFactory factory;
-         final PagingStore store;
-
-         if (connectedOnBackup)
-         {
-            factory = createBackupFactory();
-            store = backupService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
-         }
-         else
-         {
-            factory = createFailoverFactory();
-            store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
-         }
-
-         session = factory.createSession(false, true, true, false);
-
-         final int initialNumberOfPages = store.getNumberOfPages();
-
-         System.out.println("It has initially " + initialNumberOfPages);
-
-         final int THREAD_CONSUMERS = 20;
-
-         final CountDownLatch startFlag = new CountDownLatch(1);
-         final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_CONSUMERS);
-
-         class Consumer extends Thread
-         {
-            volatile Throwable e;
-
-            ClientSession session;
-
-            public Consumer() throws Exception
-            {
-               session = factory.createSession(null, null, false, true, true, false, 0);
-            }
-
-            @Override
-            public void run()
-            {
-               boolean started = false;
-
-               try
-               {
-
-                  try
-                  {
-                     ClientConsumer consumer = session.createConsumer(ADDRESS);
-
-                     session.start();
-
-                     alignSemaphore.countDown();
-
-                     started = true;
-
-                     startFlag.await();
-
-                     while (true)
-                     {
-                        ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
-                        if (msg == null)
-                        {
-                           break;
-                        }
-
-                        if (numberOfMessages.incrementAndGet() % 1000 == 0)
-                        {
-                           System.out.println(numberOfMessages + " messages read");
-                        }
-
-                        msg.acknowledge();
-                     }
-
-                  }
-                  finally
-                  {
-                     session.close();
-                  }
-               }
-               catch (Throwable e)
-               {
-                  // Using System.out, as it would appear on the test output
-                  e.printStackTrace(); 
-                  if (!started)
-                  {
-                     alignSemaphore.countDown();
-                  }
-                  this.e = e;
-               }
-            }
-         }
-
-         Consumer[] consumers = new Consumer[THREAD_CONSUMERS];
-
-         for (int i = 0; i < THREAD_CONSUMERS; i++)
-         {
-            consumers[i] = new Consumer();
-         }
-
-         for (int i = 0; i < THREAD_CONSUMERS; i++)
-         {
-            consumers[i].start();
-         }
-
-         alignSemaphore.await();
-
-         startFlag.countDown();
-
-         if (fail)
-         {
-            Thread.sleep(1000);
-            while (store.getNumberOfPages() == initialNumberOfPages)
-            {
-               Thread.sleep(100);
-            }
-
-            System.out.println("The system has already depaged " + (initialNumberOfPages - store.getNumberOfPages()) +
-                               ", failing now");
-
-            fail(session);
-         }
-
-         for (Thread t : consumers)
-         {
-            t.join();
-         }
-
-         for (Consumer p : consumers)
-         {
-            if (p.e != null)
-            {
-               throw p.e;
-            }
-         }
-
-         return numberOfMessages.intValue();
-      }
-      finally
-      {
-         if (session != null)
-         {
-            try
-            {
-               session.close();
-            }
-            catch (Exception ignored)
-            {
-            }
-         }
-      }
+      return 10 * 1024;
    }
-
-   /**
-    * @throws Exception
-    * @throws MessagingException
-    * @throws InterruptedException
-    * @throws Throwable
-    */
-   private int multiThreadProducer(final boolean failover) throws Exception,
-                                                          MessagingException,
-                                                          InterruptedException,
-                                                          Throwable
+   
+   protected int getPageSize()
    {
-
-      final AtomicInteger numberOfMessages = new AtomicInteger(0);
-      final PagingStore store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
-
-      final ClientSessionFactory factory = createFailoverFactory();
-
-      ClientSession session = factory.createSession(false, true, true, false);
-      try
-      {
-         session.createQueue(ADDRESS, ADDRESS, null, true, false);
-
-         final int THREAD_PRODUCERS = 30;
-
-         final CountDownLatch startFlag = new CountDownLatch(1);
-         final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_PRODUCERS);
-         final CountDownLatch flagPaging = new CountDownLatch(THREAD_PRODUCERS);
-
-         class Producer extends Thread
-         {
-            volatile Throwable e;
-
-            @Override
-            public void run()
-            {
-               boolean started = false;
-               try
-               {
-                  ClientSession session = factory.createSession(false, true, true);
-                  try
-                  {
-                     ClientProducer producer = session.createProducer(ADDRESS);
-
-                     alignSemaphore.countDown();
-
-                     started = true;
-                     startFlag.await();
-
-                     while (!store.isPaging())
-                     {
-
-                        ClientMessage msg = session.createClientMessage(true);
-
-                        producer.send(msg);
-                        numberOfMessages.incrementAndGet();
-                     }
-
-                     flagPaging.countDown();
-
-                     for (int i = 0; i < 100; i++)
-                     {
-
-                        ClientMessage msg = session.createClientMessage(true);
-
-                        producer.send(msg);
-                        numberOfMessages.incrementAndGet();
-
-                     }
-
-                  }
-                  finally
-                  {
-                     session.close();
-                  }
-               }
-               catch (Throwable e)
-               {
-                  // Using System.out, as it would appear on the test output
-                  e.printStackTrace(); 
-                  if (!started)
-                  {
-                     alignSemaphore.countDown();
-                  }
-                  flagPaging.countDown();
-                  this.e = e;
-               }
-            }
-         }
-
-         Producer[] producers = new Producer[THREAD_PRODUCERS];
-
-         for (int i = 0; i < THREAD_PRODUCERS; i++)
-         {
-            producers[i] = new Producer();
-            producers[i].start();
-         }
-
-         alignSemaphore.await();
-
-         // Start producing only when all the sessions are opened
-         startFlag.countDown();
-
-         if (failover)
-         {
-            flagPaging.await(); // for this test I want everybody on the paging part
-
-            Thread.sleep(1500);
-
-            fail(session);
-
-         }
-
-         for (Thread t : producers)
-         {
-            t.join();
-         }
-
-         for (Producer p : producers)
-         {
-            if (p.e != null)
-            {
-               throw p.e;
-            }
-         }
-
-         return numberOfMessages.intValue();
-
-      }
-      finally
-      {
-         session.close();
-         InVMConnector.resetFailures();
-      }
-
+      return 5 * 1024;
    }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-   protected int getNumberOfMessages()
-   {
-      return 5000;
-   }
    
+  
 
    // Private -------------------------------------------------------
 

Modified: trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java	2009-03-03 21:39:33 UTC (rev 5981)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java	2009-03-04 02:48:49 UTC (rev 5982)
@@ -21,6 +21,7 @@
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.JournalType;
 import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.tests.util.ServiceTestBase;
@@ -320,6 +321,8 @@
       config.setJournalFileSize(10 * 1024 * 1024);
       config.setJournalMinFiles(5);
       
+      config.setJournalType(JournalType.NIO);
+      
       return config;
    }
 




More information about the jboss-cvs-commits mailing list