[jboss-cvs] JBoss Messaging SVN: r5615 - in branches/Branch_Failover_Page: src/main/org/jboss/messaging/core/postoffice/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jan 12 22:52:51 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-12 22:52:51 -0500 (Mon, 12 Jan 2009)
New Revision: 5615

Modified:
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
Log:
LargeMessage & Failover fixes

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-01-12 16:45:17 UTC (rev 5614)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-01-13 03:52:51 UTC (rev 5615)
@@ -54,8 +54,6 @@
  */
 public interface PostOffice extends MessagingComponent
 {
-   boolean isBackup(); // Remove-me... debug for now only
-   
    boolean addDestination(SimpleString address, boolean durable) throws Exception;
 
    boolean removeDestination(SimpleString address, boolean durable) throws Exception;

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-12 16:45:17 UTC (rev 5614)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-13 03:52:51 UTC (rev 5615)
@@ -264,11 +264,6 @@
       return addressManager.getDestinations();
    }
    
-   public boolean isBackup()  // Remove-me... debug for now only
-   {
-      return backup;
-   }
-
    // TODO - needs to be synchronized to prevent happening concurrently with activate().
    // (and possible removeBinding and other methods)
    // Otherwise can have situation where createQueue comes in before failover, then failover occurs

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-12 16:45:17 UTC (rev 5614)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-13 03:52:51 UTC (rev 5615)
@@ -26,6 +26,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -69,19 +70,21 @@
  */
 public class ServerConsumerImpl implements ServerConsumer
 {
-   // Constants
-   // ------------------------------------------------------------------------------------
+   // Constants ------------------------------------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(ServerConsumerImpl.class);
 
-   // Static
-   // ---------------------------------------------------------------------------------------
+   // Static ---------------------------------------------------------------------------------------
 
-   // Attributes
-   // -----------------------------------------------------------------------------------
+   private static final boolean trace = log.isTraceEnabled();
 
-   private final boolean trace = log.isTraceEnabled();
+   private static void trace(final String message)
+   {
+      log.trace(message);
+   }
 
+   // Attributes -----------------------------------------------------------------------------------
+
    private final long id;
 
    private final Queue messageQueue;
@@ -119,8 +122,7 @@
 
    private final boolean preAcknowledge;
 
-   // Constructors
-   // ---------------------------------------------------------------------------------
+   // Constructors ---------------------------------------------------------------------------------
 
    public ServerConsumerImpl(final long id,
                              final ServerSession session,
@@ -309,18 +311,8 @@
       {
          int previous = availableCredits.getAndAdd(credits);
 
-         log.info("Had " + previous +
-                  ", received " +
-                  credits +
-                  " = " +
-                  availableCredits +
-                  " backup = " +
-                  messageQueue.isBackup() + 
-                  Thread.currentThread().getName()); // remove me
-
          if (previous <= 0 && previous + credits > 0)
          {
-//            log.info("********************* Calling promptDelivery, backup = " + messageQueue.isBackup()); -- remove me
             promptDelivery();
          }
       }
@@ -510,13 +502,22 @@
    {
       if (availableCredits != null && availableCredits.get() <= 0)
       {
-         log.info("No Credits " + availableCredits + " backup=" + this.messageQueue.isBackup() + " ref = " + ref,
-                  new Exception("trace"));
+         if (trace)
+         {
+            trace("doHandle: No Credits " + availableCredits + " backup=" + messageQueue.isBackup() + " ref = " + ref);
+         }
          return HandleStatus.BUSY;
       }
       else
       {
-         log.info("Accepted ref = " + ref + " backup = " + messageQueue.isBackup(), new Exception("trace"));
+         if (trace)
+         {
+            trace("doHandle: Accepted ref = " + ref +
+                  " backup = " +
+                  messageQueue.isBackup() +
+                  " Thread = " +
+                  Thread.currentThread().getName());
+         }
       }
 
       lock.lock();
@@ -529,7 +530,10 @@
          // queue for delivery later.
          if (!started)
          {
-            log.info("! started");
+            if (trace)
+            {
+               trace("doHandle: ignore reference " + ref + " as consumer is not started!");
+            }
             return HandleStatus.BUSY;
          }
 
@@ -537,7 +541,10 @@
          // This has to be checked inside the lock as the set to null is done inside the lock
          if (largeMessageSender != null)
          {
-            log.info("LargeMessageSender != null, backup = " + messageQueue.isBackup());
+            if (trace)
+            {
+               trace("doHandle: LargeMessageSender != null, can't send another message while send is pending");
+            }
             return HandleStatus.BUSY;
          }
 
@@ -583,29 +590,30 @@
 
    private void sendLargeMessage(final MessageReference ref, final ServerMessage message)
    {
-
       DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
                                                                                          message.getMessageID(),
                                                                                          message.getDestination()));
 
+      largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
+
       if (result == null)
       {
-         largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
-
          largeMessageSender.sendLargeMessage();
       }
       else
       {
+         
+         final CountDownLatch latchDone = new CountDownLatch(1);
          // Send when replicate delivery response comes back
          result.setResultRunner(new Runnable()
          {
             public void run()
             {
-               largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
-
-               largeMessageSender.sendLargeMessage();
+               latchDone.countDown();
             }
          });
+
+         largeMessageSender.sendLargeMessage();
       }
 
    }
@@ -678,8 +686,6 @@
       {
          lock.lock();
 
-         log.info("Entering SendLargeMessage (backup = " + messageQueue.isBackup() + ")");
-         
          try
          {
             if (pendingLargeMessage == null)
@@ -689,13 +695,15 @@
 
             if (availableCredits != null && availableCredits.get() <= 0)
             {
-               log.info("Leaving send LargeMessage because of credits, backup = " + messageQueue.isBackup() + " even before it started sending messages");
                return false;
             }
 
             if (!sentFirstMessage)
             {
-               log.info("Sending first message = " + messageQueue.isBackup());
+               if (trace)
+               {
+                  trace("sendLargeMessage:: sending initialMessage, backup = " + messageQueue.isBackup());
+               }
                sentFirstMessage = true;
 
                MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
@@ -712,19 +720,33 @@
                {
                   // RequiredBufferSize on this case represents the right number of bytes sent
                   availableCredits.addAndGet(-initialMessage.getRequiredBufferSize());
-                  log.info("Initial send, taking out " + initialMessage.getRequiredBufferSize() +
+                  if (trace)
+                  {
+                     trace("sendLargeMessage:: Initial send, taking out " + initialMessage.getRequiredBufferSize() +
                            " credits, current = " +
                            availableCredits +
                            " isBackup = " +
                            messageQueue.isBackup());
+
+                  }
                }
             }
+            else
+            {
+               if (trace)
+               {
+                  trace("sendLargeMessage: Summarizing sendLargeMessage, currentPosition = " + positionPendingLargeMessage);
+               }
+            }
 
             while (positionPendingLargeMessage < sizePendingLargeMessage)
             {
                if (availableCredits != null && availableCredits.get() <= 0)
                {
-                  log.info("Leaving loop of send LargeMessage because of credits, backup = " + messageQueue.isBackup());
+                  if (trace)
+                  {
+                     trace("sendLargeMessage: Leaving loop of send LargeMessage because of credits, backup = " + messageQueue.isBackup());
+                  }
                   return false;
                }
 
@@ -737,14 +759,24 @@
                   availableCredits.addAndGet(-chunk.getRequiredBufferSize());
                }
 
-               log.info("Sending " + chunk.getRequiredBufferSize() + " availableCredits now is " + availableCredits + " isBackup = " + messageQueue.isBackup());
+               if (trace)
+               {
+                  trace("sendLargeMessage: Sending " + chunk.getRequiredBufferSize() +
+                        " availableCredits now is " +
+                        availableCredits +
+                        " isBackup = " +
+                        messageQueue.isBackup());
+               }
 
                channel.send(chunk);
 
                positionPendingLargeMessage += chunkLen;
             }
 
-            log.info("Finished sendLargeMessage isBackup = " + messageQueue.isBackup());
+            if (trace)
+            {
+               trace("Finished sendLargeMessage isBackup = " + messageQueue.isBackup());
+            }
 
             pendingLargeMessage.releaseResources();
 

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-12 16:45:17 UTC (rev 5614)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-13 03:52:51 UTC (rev 5615)
@@ -89,7 +89,6 @@
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.core.transaction.TransactionOperation;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.util.IDGenerator;
 import org.jboss.messaging.util.SimpleIDGenerator;
@@ -995,9 +994,6 @@
       {
          try
          {
-            log.info("Receiving credits... first option.. backup = " + postOffice.isBackup() +
-                     " and it is receiving " +
-                     packet.getCredits());
             consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
          }
          catch (Exception e)
@@ -1014,11 +1010,6 @@
          {
             public void run()
             {
-               log.info("Receiving credits... second option.. backup = " + postOffice.isBackup() +
-                        " and it is receiving " +
-                        packet.getCredits() +
-                        " thread = " +
-                        Thread.currentThread().getName());
                try
                {
                   consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());

Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java	2009-01-12 16:45:17 UTC (rev 5614)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java	2009-01-13 03:52:51 UTC (rev 5615)
@@ -22,9 +22,11 @@
 
 package org.jboss.messaging.tests.integration.cluster.failover;
 
+import java.io.File;
 import java.nio.ByteBuffer;
 
 import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientFileMessage;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
@@ -33,7 +35,6 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -62,39 +63,42 @@
 
    public void testLargeMessageReplicatedNoFailover() throws Exception
    {
-      testLargeMessage(-1, 2);
+      testLargeMessage(-1, 500, 1024 * 1024);
    }
 
+
+   public void testLargeMessageReplicatedNoFailoverSmallMessageSize() throws Exception
+   {
+      testLargeMessage(-1, 500, 50 * 1024);
+   }
+
    public void testLargeMessageFailOnProducing() throws Exception
    {
-      testLargeMessage(1, 500);
+      testLargeMessage(1, 500, 1024 * 1024);
    }
 
-   
-//   public void testFail() throws Exception
-//   {
-//      for (int i = 0; i < 100; i++)
-//      {
-//         System.out.println ("****************** " + i);
-//         testLargeMessageFailOnConsume();
-//         tearDown();
-//         setUp();
-//      }
-//   }
-//   
+   // public void testFail() throws Exception
+   // {
+   // for (int i = 0; i < 100; i++)
+   // {
+   // System.out.println ("****************** " + i);
+   // testLargeMessageFailOnConsume();
+   // tearDown();
+   // setUp();
+   // }
+   // }
+   //   
    public void testLargeMessageFailOnConsume() throws Exception
    {
-      testLargeMessage(2, 2);
+      testLargeMessage(2, 10, 1024 * 1024);
    }
 
-   private void testLargeMessage(final int placeToFail, final int numberOfMessages) throws Exception
+   private void testLargeMessage(final int placeToFail, final int numberOfMessages, final int messageSize) throws Exception
    {
       ClientSessionFactory factory = createFailoverFactory();
 
       factory.setMinLargeMessageSize(10 * 1024);
 
-      final int messageSize = 1024*1024;
-
       try
       {
 
@@ -126,7 +130,8 @@
       try
       {
 
-         ClientConsumer consumer = session.createConsumer(ADDRESS);
+         ClientConsumer consumer = session.createFileConsumer(new File(getTestDir() + File.separator +
+                                                                       getClientLargeMessagesDir("live")), ADDRESS);
 
          session.start();
 
@@ -142,20 +147,17 @@
                }
             }
 
-            ClientMessage message = consumer.receive(10000);
+            ClientFileMessage message = (ClientFileMessage)consumer.receive(20000);
 
-            assertNotNull(message);
+            assertNotNull("Message i=" + i + " wasn't received", message);
 
             message.acknowledge();
+            
+            File file = message.getFile();
 
-            MessagingBuffer buffer = message.getBody();
+            assertEquals(messageSize, file.length());
+         }
 
-            buffer.rewind();
-
-            assertEquals(messageSize, buffer.limit());
-
-            assertEquals(i, buffer.getInt());
-         }
          assertNull(consumer.receive(500));
       }
       finally
@@ -181,7 +183,7 @@
             {
                e.printStackTrace();
             }
-            
+
             System.out.println("***************************************** Forcing failure");
             conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
          }
@@ -218,7 +220,7 @@
                {
                   forceFailure(conn);
                }
-           }
+            }
 
             ClientMessage message = session.createClientMessage(true);
 

Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2009-01-12 16:45:17 UTC (rev 5614)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2009-01-13 03:52:51 UTC (rev 5615)
@@ -209,12 +209,5 @@
    {
    }
 
-   /* (non-Javadoc)
-    * @see org.jboss.messaging.core.postoffice.PostOffice#isBackup()
-    */
-   public boolean isBackup()
-   {
-      return false;
-   }
 
 }




More information about the jboss-cvs-commits mailing list