[jboss-cvs] JBoss Messaging SVN: r5690 - in trunk: src/main/org/jboss/messaging/core/paging/impl and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jan 22 15:03:37 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-22 15:03:36 -0500 (Thu, 22 Jan 2009)
New Revision: 5690

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReplicateDeliveryMessage.java
   trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
Log:
Tweaks on paging & failover

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2009-01-22 14:37:22 UTC (rev 5689)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2009-01-22 20:03:36 UTC (rev 5690)
@@ -73,7 +73,7 @@
     * @return
     * @throws Exception 
     */
-   PagingStore createPageStore(SimpleString destination, boolean createDir) throws Exception;
+   PagingStore createPageStore(SimpleString destination) throws Exception;
 
    /** To return the PageStore associated with the address */
    PagingStore getPageStore(SimpleString address) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2009-01-22 14:37:22 UTC (rev 5689)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2009-01-22 20:03:36 UTC (rev 5690)
@@ -136,14 +136,14 @@
    /* (non-Javadoc)
     * @see org.jboss.messaging.core.paging.PagingManager#reloadStores()
     */
-   public void reloadStores() throws Exception
+   public synchronized void reloadStores() throws Exception
    {
       List<PagingStore> destinations = pagingStoreFactory.reloadStores(queueSettingsRepository);
 
       for (PagingStore store: destinations)
       {
-         stores.put(store.getStoreName(), store);
          store.start();
+         stores.put(store.getStoreName(), store);
       }
 
    }
@@ -152,7 +152,7 @@
     * @param destination
     * @return
     */
-   public synchronized PagingStore createPageStore(final SimpleString storeName, final boolean createDir) throws Exception
+   public synchronized PagingStore createPageStore(final SimpleString storeName) throws Exception
    {
       PagingStore store = stores.get(storeName);
 
@@ -160,16 +160,9 @@
       {
          store = newStore(storeName);
 
-         PagingStore oldStore = stores.putIfAbsent(storeName, store);
+         store.start();
 
-         if (oldStore != null)
-         {
-            store = oldStore;
-         }
-         else
-         {
-            store.start();
-         }
+         stores.put(storeName, store);
       }
 
       return store;
@@ -181,7 +174,7 @@
 
       if (store == null)
       {
-         store = createPageStore(storeName, true);
+         store = createPageStore(storeName);
       }
 
       return store;

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-22 14:37:22 UTC (rev 5689)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-22 20:03:36 UTC (rev 5690)
@@ -303,7 +303,7 @@
                          (maxGlobalSize - pagingManager.getDefaultPageSize()));
             }
 
-            if (pagingManager.isGlobalPageMode() && currentGlobalSize < maxGlobalSize - pagingManager.getDefaultPageSize())
+            if (maxGlobalSize > 0 && pagingManager.isGlobalPageMode() && currentGlobalSize < maxGlobalSize - pagingManager.getDefaultPageSize())
             {
                pagingManager.startGlobalDepage();
             }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReplicateDeliveryMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReplicateDeliveryMessage.java	2009-01-22 14:37:22 UTC (rev 5689)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReplicateDeliveryMessage.java	2009-01-22 20:03:36 UTC (rev 5690)
@@ -13,8 +13,6 @@
 package org.jboss.messaging.core.remoting.impl.wireformat;
 
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.DataConstants;
-import org.jboss.messaging.util.SimpleString;
 
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -30,21 +28,17 @@
 
    private long messageID;
 
-   private SimpleString address;
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionReplicateDeliveryMessage(final long consumerID, final long messageID, final SimpleString address)
+   public SessionReplicateDeliveryMessage(final long consumerID, final long messageID)
    {
       super(SESS_REPLICATE_DELIVERY);
 
       this.consumerID = consumerID;
 
       this.messageID = messageID;
-
-      this.address = address;
    }
 
    public SessionReplicateDeliveryMessage()
@@ -64,18 +58,11 @@
       return messageID;
    }
 
-   public SimpleString getAddress()
-   {
-      return address;
-   }
-
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.putLong(consumerID);
 
       buffer.putLong(messageID);
-
-      buffer.putSimpleString(address);
    }
 
    public void decodeBody(final MessagingBuffer buffer)
@@ -83,20 +70,13 @@
       consumerID = buffer.getLong();
 
       messageID = buffer.getLong();
-
-      address = buffer.getSimpleString();
    }
-
+   
    public boolean isRequiresConfirmations()
-   {
+   {      
       return false;
    }
 
-   public int getRequiredBufferSize()
-   {
-      return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + SimpleString.sizeofString(address);
-   }
-
    public boolean equals(Object other)
    {
       if (other instanceof SessionReplicateDeliveryMessage == false)
@@ -108,7 +88,7 @@
 
       return super.equals(other) && this.consumerID == r.consumerID && this.messageID == r.messageID;
    }
-
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2009-01-22 14:37:22 UTC (rev 5689)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2009-01-22 20:03:36 UTC (rev 5690)
@@ -57,7 +57,7 @@
 	
 	void failedOver();
 	
-	void deliverReplicated(SimpleString address, long messageID) throws Exception;
+	void deliverReplicated(long messageID) throws Exception;
 	
 	void lock();
 	

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-22 14:37:22 UTC (rev 5689)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-22 20:03:36 UTC (rev 5690)
@@ -88,6 +88,8 @@
    private final long id;
 
    private final Queue messageQueue;
+   
+   private final SimpleString bindingAddress;
 
    private final Filter filter;
 
@@ -115,7 +117,7 @@
    private final boolean browseOnly;
 
    private final StorageManager storageManager;
-   
+
    private final PagingManager pagingManager;
 
    private final java.util.Queue<MessageReference> deliveringRefs = new ConcurrentLinkedQueue<MessageReference>();
@@ -130,6 +132,7 @@
 
    public ServerConsumerImpl(final long id,
                              final ServerSession session,
+                             final SimpleString bindingAddress,
                              final Queue messageQueue,
                              final Filter filter,
                              final boolean started,
@@ -143,6 +146,8 @@
       this.id = id;
 
       this.messageQueue = messageQueue;
+      
+      this.bindingAddress = bindingAddress;
 
       this.filter = filter;
 
@@ -159,7 +164,7 @@
       this.channel = channel;
 
       this.preAcknowledge = preAcknowledge;
-      
+
       this.pagingManager = pagingManager;
 
       messageQueue.addConsumer(this);
@@ -361,7 +366,7 @@
          else
          {
             ref.getQueue().acknowledge(tx, ref);
-            
+
             // Del count is not actually updated in storage unless it's
             // cancelled
             ref.incrementDeliveryCount();
@@ -409,11 +414,11 @@
       return ref;
    }
 
-   public void deliverReplicated(final SimpleString address, final long messageID) throws Exception
+   public void deliverReplicated(final long messageID) throws Exception
    {
       // It may not be the first in the queue - since there may be multiple producers
       // sending to the queue
-      MessageReference ref = removeReferenceOnBackup(address, messageID);
+      MessageReference ref = removeReferenceOnBackup(messageID);
 
       if (ref == null)
       {
@@ -459,7 +464,7 @@
 
    // Private --------------------------------------------------------------------------------------
 
-   private MessageReference removeReferenceOnBackup(SimpleString address, long id) throws Exception
+   private MessageReference removeReferenceOnBackup(long id) throws Exception
    {
 
       // most of the times, the remove will work ok, so we first try it without any locks
@@ -467,9 +472,9 @@
 
       if (ref == null)
       {
-         PagingStore store = pagingManager.getPageStore(address);
+         PagingStore store = pagingManager.getPageStore(bindingAddress);
 
-         for (;;)
+         while (true)
          {
             // Can't have the same store being depaged in more than one thread
             synchronized (store)
@@ -478,8 +483,9 @@
                ref = messageQueue.removeReferenceWithID(id);
                if (ref == null)
                {
+                  System.out.println("Forcing depage");
                   // force a depage
-                  if (!store.readPage())
+                  if (!store.readPage()) // This returns false if there are no pages
                   {
                      break;
                   }
@@ -558,8 +564,8 @@
             return HandleStatus.BUSY;
          }
 
-         //TODO use a null or boolean check here for performance
-         
+         // Note: because deliveries are scheduled to start under replication, pending large messages are tracked with a counter (pendingLargeMessagesCounter).
+
          // If there is a pendingLargeMessage we can't take another message
          // This has to be checked inside the lock as the set to null is done inside the lock
          if (pendingLargeMessagesCounter.get() > 0)
@@ -620,9 +626,7 @@
 
       final LargeMessageDeliverer localDeliverer = new LargeMessageDeliverer((LargeServerMessage)message, ref);
 
-      DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
-                                                                                         message.getMessageID(),
-                                                                                         message.getDestination()));
+      DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID()));
 
       if (result == null)
       {
@@ -670,9 +674,7 @@
 
       final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount() + 1);
 
-      DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
-                                                                                         message.getMessageID(),
-                                                                                         message.getDestination()));
+      DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID()));
 
       if (result == null)
       {
@@ -757,16 +759,16 @@
                return false;
             }
 
-            int creditsUsed;
+            int precalculateAvailableCredits;
 
             if (availableCredits != null)
             {
                // Flow control needs to be done in advance.
-               creditsUsed = preCalculateFlowControl();
+               precalculateAvailableCredits = preCalculateFlowControl();
             }
             else
             {
-               creditsUsed = 0;
+               precalculateAvailableCredits = 0;
             }
 
             if (!sentFirstMessage)
@@ -789,7 +791,7 @@
 
                if (availableCredits != null)
                {
-                  if ((creditsUsed -= initialMessage.getRequiredBufferSize()) < 0)
+                  if ((precalculateAvailableCredits -= initialMessage.getRequiredBufferSize()) < 0)
                   {
                      log.warn("Credit logic is not working properly, too many credits were taken");
                   }
@@ -798,7 +800,7 @@
                   {
                      trace("deliverLargeMessage:: Initial send, taking out " + initialMessage.getRequiredBufferSize() +
                            " credits, current = " +
-                           creditsUsed +
+                           precalculateAvailableCredits +
                            " isBackup = " +
                            messageQueue.isBackup());
 
@@ -815,7 +817,7 @@
 
             while (positionPendingLargeMessage < sizePendingLargeMessage)
             {
-               if (creditsUsed <= 0)
+               if (precalculateAvailableCredits <= 0)
                {
                   if (trace)
                   {
@@ -830,7 +832,7 @@
 
                if (availableCredits != null)
                {
-                  if ((creditsUsed -= chunk.getRequiredBufferSize()) < 0)
+                  if ((precalculateAvailableCredits -= chunk.getRequiredBufferSize()) < 0)
                   {
                      log.warn("Flowcontrol logic is not working properly, too many credits were taken");
                   }
@@ -850,9 +852,9 @@
                positionPendingLargeMessage += chunkLen;
             }
 
-            if (creditsUsed != 0)
+            if (precalculateAvailableCredits != 0)
             {
-               log.warn("Flowcontrol logic is not working properly... creidts = " + creditsUsed);
+               log.warn("Flowcontrol logic is not working properly... creidts = " + precalculateAvailableCredits);
             }
 
             if (trace)
@@ -880,29 +882,28 @@
        */
       private int preCalculateFlowControl()
       {
-         for (;;)
+         while (true)
          {
-            final int currentCredit;
-            int creditsUsed = 0;
-            currentCredit = availableCredits.get();
+            final int currentCredit = availableCredits.get();
+            int precalculatedCredits = 0;
 
             if (!sentFirstMessage)
             {
-               creditsUsed = SessionReceiveMessage.SESSION_RECEIVE_MESSAGE_LARGE_MESSAGE_SIZE + pendingLargeMessage.getPropertiesEncodeSize();
+               precalculatedCredits = SessionReceiveMessage.SESSION_RECEIVE_MESSAGE_LARGE_MESSAGE_SIZE + pendingLargeMessage.getPropertiesEncodeSize();
             }
 
             long chunkLen = 0;
-            for (long i = positionPendingLargeMessage; creditsUsed < currentCredit && i < sizePendingLargeMessage; i += chunkLen)
+            for (long i = positionPendingLargeMessage; precalculatedCredits < currentCredit && i < sizePendingLargeMessage; i += chunkLen)
             {
                chunkLen = (int)Math.min(sizePendingLargeMessage - i, minLargeMessageSize);
-               creditsUsed += chunkLen + SessionReceiveContinuationMessage.SESSION_RECEIVE_CONTINUATION_BASE_SIZE;
+               precalculatedCredits += chunkLen + SessionReceiveContinuationMessage.SESSION_RECEIVE_CONTINUATION_BASE_SIZE;
             }
 
             // The calculation of credits and taking credits out has to be taken atomically.
             // Since we are not sending anything to the client during this calculation, this is unlikely to happen
-            if (availableCredits.compareAndSet(currentCredit, currentCredit - creditsUsed))
+            if (availableCredits.compareAndSet(currentCredit, currentCredit - precalculatedCredits))
             {
-               return creditsUsed;
+               return precalculatedCredits;
             }
          }
       }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-22 14:37:22 UTC (rev 5689)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-22 20:03:36 UTC (rev 5690)
@@ -1029,16 +1029,19 @@
    {
       DelayedResult result = channel.replicatePacket(packet);
 
+      try
+      {
+         // Note we don't wait for response before handling this
+
+         consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to receive credits", e);
+      }
+
       if (result == null)
       {
-         try
-         {
-            consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
-         }
-         catch (Exception e)
-         {
-            log.error("Failed to receive credits", e);
-         }
          channel.confirm(packet);
       }
       else
@@ -1047,14 +1050,6 @@
          {
             public void run()
             {
-               try
-               {
-                  consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
-               }
-               catch (Exception e)
-               {
-                  log.error("Failed to receive credits", e);
-               }
                channel.confirm(packet);
             }
          });
@@ -1249,7 +1244,7 @@
 
       try
       {
-         consumer.deliverReplicated(packet.getAddress(), packet.getMessageID());
+         consumer.deliverReplicated(packet.getMessageID());
       }
       catch (Exception e)
       {
@@ -1403,6 +1398,7 @@
 
          ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
                                                           this,
+                                                          binding.getAddress(),
                                                           theQueue,
                                                           filter,
                                                           started,

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	2009-01-22 20:03:36 UTC (rev 5690)
@@ -0,0 +1,560 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * Integration tests for paging on a live/backup server pair, with and without an induced connection failure.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Dec 8, 2008 10:53:16 AM
+ *
+ *
+ */
+public class PagingFailoverTest extends FailoverTestBase
+{
+
+   // Constants -----------------------------------------------------
+   private static final Logger log = Logger.getLogger(PagingFailoverTest.class);
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testFailoverOnPaging() throws Exception
+   {
+      testPaging(true); // fail == true: the client connection is failed halfway through consuming
+   }
+
+   public void testReplicationOnPaging() throws Exception
+   {
+      testPaging(false); // fail == false: no failure is induced, so both live and backup sizes are asserted
+   }
+
+   private void testPaging(final boolean fail) throws Exception
+   {
+      setUpFileBased(100 * 1024);
+      // Sends numMessages messages to ADDRESS and consumes them back in order; when fail is true the connection is failed halfway
+      ClientSession session = null;
+      try
+      {
+         ClientSessionFactory sf1 = createFailoverFactory();
+
+         session = sf1.createSession(null, null, false, true, true, false, 0);
+
+         session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         final int numMessages = 50000;
+         // Paging manager and ADDRESS paging store of the live server, then of the backup server
+         PagingManager pmLive = liveService.getServer().getPostOffice().getPagingManager();
+         PagingStore storeLive = pmLive.getPageStore(ADDRESS);
+
+         PagingManager pmBackup = backupService.getServer().getPostOffice().getPagingManager();
+         PagingStore storeBackup = pmBackup.getPageStore(ADDRESS);
+         // Each message body is a 1000-byte buffer whose first int is the message index
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = session.createClientMessage(true);
+            ByteBuffer buffer = ByteBuffer.allocate(1000);
+
+            buffer.putInt(i);
+
+            buffer.rewind();
+
+            message.setBody(new ByteBufferWrapper(buffer));
+
+            producer.send(message);
+            // Whenever the live store is paging, the backup store must be paging as well
+            if (storeLive.isPaging())
+            {
+               assertTrue(storeBackup.isPaging());
+            }
+         }
+
+         session.close();
+         session = sf1.createSession(null, null, false, true, true, false, 0);
+         session.start();
+
+         final RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+         // Live and backup must report the same global size, and on each server the ADDRESS store must account for all of it
+         assertEquals("GloblSize", pmLive.getGlobalSize(), pmBackup.getGlobalSize());
+
+         assertEquals("PageSizeLive", storeLive.getAddressSize(), pmLive.getGlobalSize());
+
+         assertEquals("PageSizeBackup", storeBackup.getAddressSize(), pmBackup.getGlobalSize());
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            // When fail is set, fail the connection once half of the messages have been consumed
+            if (fail && i == numMessages / 2)
+            {
+               conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+            }
+
+            ClientMessage message = consumer.receive(10000);
+
+
+            assertNotNull(message);
+
+            message.acknowledge();
+
+            message.getBody().rewind();
+            // Messages must come back in the order they were sent
+            assertEquals(i, message.getBody().getInt());
+
+         }
+
+         session.close();
+         session = null;
+         // The live server's sizes are only asserted when no failure was induced; the backup's must always be back to 0
+         if (!fail)
+         {
+            assertEquals(0, pmLive.getGlobalSize());
+            assertEquals(0, storeLive.getAddressSize());
+         }
+         assertEquals(0, pmBackup.getGlobalSize());
+         assertEquals(0, storeBackup.getAddressSize());
+
+      }
+      finally
+      {
+         if (session != null)
+         {
+            try
+            {
+               session.close();
+            }
+            catch (Exception ignored)
+            {
+               // eat it
+            }
+         }
+      }
+
+   }
+
+   public void testMultithreadFailoverReplicationOnly() throws Throwable
+   {
+      setUpFileBased(100 * 1024, 20 * 1024);
+
+      int numberOfProducedMessages = multiThreadProducer(false);
+
+      System.out.println(numberOfProducedMessages + " messages produced");
+      // connectedOnBackup = false, fail = false: consume through the failover factory with no induced failure
+      int numberOfConsumedMessages = multiThreadConsumer(false, false);
+
+      assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+   }
+
+   public void testMultithreadFailoverOnProducing() throws Throwable
+   {
+      setUpFileBased(100 * 1024, 20 * 1024);
+      // NOTE(review): failover = true presumably makes the producer side fail over while sending - confirm in multiThreadProducer
+      int numberOfProducedMessages = multiThreadProducer(true);
+
+      System.out.println(numberOfProducedMessages + " messages produced");
+      // connectedOnBackup = true: consume through createBackupFactory(), without inducing a failure while consuming
+      int numberOfConsumedMessages = multiThreadConsumer(true, false);
+
+      assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+   }
+
+   public void testMultithreadFailoverOnConsume() throws Throwable
+   {
+      setUpFileBased(100 * 1024, 20 * 1024);
+
+      int numberOfProducedMessages = multiThreadProducer(false);
+
+      System.out.println(numberOfProducedMessages + " messages produced");
+      // fail = true: the consumer side calls fail(session) once the live store's page count has changed
+      int numberOfConsumedMessages = multiThreadConsumer(false, true);
+
+      assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+   }
+
+   /** Consumes ADDRESS from THREAD_CONSUMERS concurrent threads and returns the total number of messages they received.
+    * @throws Exception may be thrown while creating sessions or looking up the paging store
+    * @throws InterruptedException if interrupted while sleeping, awaiting the latch or joining the consumer threads
+    * @throws Throwable the failure recorded by a consumer thread, rethrown after all threads have been joined
+    */
+   private int multiThreadConsumer(final boolean connectedOnBackup, final boolean fail) throws Exception,
+                                                                                       InterruptedException,
+                                                                                       Throwable
+   {
+      ClientSession session = null;
+      try
+      {
+         final AtomicInteger numberOfMessages = new AtomicInteger(0);
+
+         final int RECEIVE_TIMEOUT = 10000;
+
+         final ClientSessionFactory factory;
+         final PagingStore store;
+         // Pick the session factory and the paging store to watch according to connectedOnBackup
+         if (connectedOnBackup)
+         {
+            factory = createBackupFactory();
+            store = backupService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+         }
+         else
+         {
+            factory = createFailoverFactory();
+            store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+         }
+
+         session = factory.createSession(false, true, true, false);
+
+         final int initialNumberOfPages = store.getNumberOfPages();
+
+         System.out.println("It has initially " + initialNumberOfPages);
+
+         final int THREAD_CONSUMERS = 20;
+
+         final CountDownLatch startFlag = new CountDownLatch(1);
+         final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_CONSUMERS);
+         // Each Consumer thread owns its own session and stores any failure it hits in the field e
+         class Consumer extends Thread
+         {
+            volatile Throwable e;
+
+            ClientSession session;
+
+            public Consumer() throws Exception
+            {
+               session = factory.createSession(null, null, false, true, true, false, 0);
+            }
+
+            @Override
+            public void run()
+            {
+               boolean started = false;
+
+               try
+               {
+
+                  try
+                  {
+                     ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+                     session.start();
+                     // Signal readiness, then wait for the common start signal
+                     alignSemaphore.countDown();
+
+                     started = true;
+
+                     startFlag.await();
+                     // Keep receiving and acknowledging until receive() returns null
+                     while (true)
+                     {
+                        ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+                        if (msg == null)
+                        {
+                           break;
+                        }
+
+                        if (numberOfMessages.incrementAndGet() % 1000 == 0)
+                        {
+                           System.out.println(numberOfMessages + " messages read");
+                        }
+
+                        msg.acknowledge();
+                     }
+
+                  }
+                  finally
+                  {
+                     session.close();
+                  }
+               }
+               catch (Throwable e)
+               {
+                  log.error(e.getMessage(), e);
+                  if (!started)
+                  {
+                     alignSemaphore.countDown();
+                  }
+                  this.e = e;
+               }
+            }
+         }
+
+         Consumer[] consumers = new Consumer[THREAD_CONSUMERS];
+
+         for (int i = 0; i < THREAD_CONSUMERS; i++)
+         {
+            consumers[i] = new Consumer();
+         }
+
+         for (int i = 0; i < THREAD_CONSUMERS; i++)
+         {
+            consumers[i].start();
+         }
+         // Wait until every consumer is ready (or has failed before starting), then release them all at once
+         alignSemaphore.await();
+
+         startFlag.countDown();
+         // When fail is set, wait until the store's page count changes and then fail the session
+         if (fail)
+         {
+            Thread.sleep(1000);
+            while (store.getNumberOfPages() == initialNumberOfPages)
+            {
+               Thread.sleep(100);
+            }
+
+            System.out.println("The system has already depaged " + (initialNumberOfPages - store.getNumberOfPages()) +
+                               ", failing now");
+
+            fail(session);
+         }
+         // Wait for all consumers to finish, then rethrow the first recorded failure, if any
+         for (Thread t : consumers)
+         {
+            t.join();
+         }
+
+         for (Consumer p : consumers)
+         {
+            if (p.e != null)
+            {
+               throw p.e;
+            }
+         }
+
+         return numberOfMessages.intValue();
+      }
+      finally
+      {
+         if (session != null)
+         {
+            try
+            {
+               session.close();
+            }
+            catch (Exception ignored)
+            {
+            }
+         }
+      }
+   }
+
+   /**
+    * @throws Exception
+    * @throws MessagingException
+    * @throws InterruptedException
+    * @throws Throwable
+    */
+   private int multiThreadProducer(final boolean failover) throws Exception,
+                                                          MessagingException,
+                                                          InterruptedException,
+                                                          Throwable
+   {
+
+      final AtomicInteger numberOfMessages = new AtomicInteger(0);
+      final PagingStore store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+
+      final ClientSessionFactory factory = createFailoverFactory();
+
+      ClientSession session = factory.createSession(false, true, true, false);
+      try
+      {
+         session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+         final int THREAD_PRODUCERS = 30;
+
+         final CountDownLatch startFlag = new CountDownLatch(1);
+         final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_PRODUCERS);
+         final CountDownLatch flagPaging = new CountDownLatch(THREAD_PRODUCERS);
+
+         class Producer extends Thread
+         {
+            volatile Throwable e;
+
+            @Override
+            public void run()
+            {
+               boolean started = false;
+               try
+               {
+                  ClientSession session = factory.createSession(false, true, true);
+                  try
+                  {
+                     ClientProducer producer = session.createProducer(ADDRESS);
+
+                     alignSemaphore.countDown();
+
+                     started = true;
+                     startFlag.await();
+
+                     while (!store.isPaging())
+                     {
+
+                        ClientMessage msg = session.createClientMessage(true);
+
+                        producer.send(msg);
+                        numberOfMessages.incrementAndGet();
+                     }
+
+                     flagPaging.countDown();
+
+                     for (int i = 0; i < 1000; i++)
+                     {
+
+                        ClientMessage msg = session.createClientMessage(true);
+
+                        producer.send(msg);
+                        numberOfMessages.incrementAndGet();
+
+                     }
+
+                  }
+                  finally
+                  {
+                     session.close();
+                  }
+               }
+               catch (Throwable e)
+               {
+                  log.error(e.getMessage(), e);
+                  if (!started)
+                  {
+                     alignSemaphore.countDown();
+                  }
+                  flagPaging.countDown();
+                  this.e = e;
+               }
+            }
+         }
+
+         Producer[] producers = new Producer[THREAD_PRODUCERS];
+
+         for (int i = 0; i < THREAD_PRODUCERS; i++)
+         {
+            producers[i] = new Producer();
+            producers[i].start();
+         }
+
+         alignSemaphore.await();
+
+         // Start producing only when all the sessions are opened
+         startFlag.countDown();
+
+         if (failover)
+         {
+            flagPaging.await(); // for this test I want everybody on the paging part
+
+            Thread.sleep(1500);
+
+            fail(session);
+
+         }
+
+         for (Thread t : producers)
+         {
+            t.join();
+         }
+
+         for (Producer p : producers)
+         {
+            if (p.e != null)
+            {
+               throw p.e;
+            }
+         }
+
+         return numberOfMessages.intValue();
+
+      }
+      finally
+      {
+         session.close();
+         InVMConnector.resetFailures();
+      }
+
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      super.tearDown(); // no cleanup of its own: this override only delegates to the superclass
+   }
+
+   protected void fail(final ClientSession session) throws Exception
+   {
+      RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+
+      InVMConnector.numberOfFailures = 1;
+      InVMConnector.failOnCreateConnection = true;
+      System.out.println("Forcing a failure");
+      conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+
+   }
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp(); // no setup of its own: this override only delegates to the superclass
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2009-01-22 14:37:22 UTC (rev 5689)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2009-01-22 20:03:36 UTC (rev 5690)
@@ -77,7 +77,7 @@
 
       managerImpl.start();
 
-      TestSupportPageStore store = (TestSupportPageStore)managerImpl.createPageStore(new SimpleString("simple-test"), true);
+      TestSupportPageStore store = (TestSupportPageStore)managerImpl.createPageStore(new SimpleString("simple-test"));
 
       ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
 
@@ -127,7 +127,7 @@
                                                             false);
       managerImpl.start();
 
-      managerImpl.createPageStore(new SimpleString("simple-test"), true);
+      managerImpl.createPageStore(new SimpleString("simple-test"));
 
       ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(100));
 




More information about the jboss-cvs-commits mailing list