[jboss-cvs] JBoss Messaging SVN: r5653 - in branches/Branch_Failover_Page: src/main/org/jboss/messaging/core/persistence/impl/journal and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jan 16 19:18:23 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-16 19:18:22 -0500 (Fri, 16 Jan 2009)
New Revision: 5653

Modified:
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveContinuationMessage.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendContinuationMessage.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java
Log:
Backup of my current work

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2009-01-17 00:18:22 UTC (rev 5653)
@@ -277,7 +277,7 @@
          
          lastChunk = pos >= bodySize;
 
-         final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.array(), !lastChunk, lastChunk && sendBlocking);
+         final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.array(), !lastChunk, lastChunk && sendBlocking, this.address);
 
          if (sendBlocking && lastChunk)
          {

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java	2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java	2009-01-17 00:18:22 UTC (rev 5653)
@@ -187,7 +187,7 @@
       return true;
    }
 
-   public void deleteFile() throws MessagingException
+   public synchronized void deleteFile() throws MessagingException
    {
       if (file != null)
       {

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java	2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java	2009-01-17 00:18:22 UTC (rev 5653)
@@ -39,19 +39,20 @@
 
    // Constants -----------------------------------------------------
 
+   public static final int SESSION_CONTINUATION_BASE_SIZE = BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
+                                                            DataConstants.SIZE_BOOLEAN;
+
    // Attributes ----------------------------------------------------
 
-   private byte[] body;
+   protected byte[] body;
 
-   private boolean continues;
+   protected boolean continues;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionContinuationMessage(byte type,
-                                     final byte[] body,
-                                     final boolean continues)
+   public SessionContinuationMessage(byte type, final byte[] body, final boolean continues)
    {
       super(type);
       this.body = body;
@@ -84,9 +85,7 @@
    @Override
    public int getRequiredBufferSize()
    {
-      return BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
-             body.length +
-             DataConstants.SIZE_BOOLEAN;
+      return SESSION_CONTINUATION_BASE_SIZE + body.length; 
    }
 
    @Override

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveContinuationMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveContinuationMessage.java	2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveContinuationMessage.java	2009-01-17 00:18:22 UTC (rev 5653)
@@ -39,6 +39,8 @@
 
    // Constants -----------------------------------------------------
 
+   public static final int SESSION_RECEIVE_CONTINUATION_BASE_SIZE = SESSION_CONTINUATION_BASE_SIZE + DataConstants.SIZE_LONG;
+   
    // Attributes ----------------------------------------------------
 
    private long consumerID;
@@ -83,7 +85,7 @@
    @Override
    public int getRequiredBufferSize()
    {
-      return super.getRequiredBufferSize() + DataConstants.SIZE_LONG;
+      return SESSION_RECEIVE_CONTINUATION_BASE_SIZE + body.length; 
    }
 
    @Override

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java	2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java	2009-01-17 00:18:22 UTC (rev 5653)
@@ -22,7 +22,6 @@
 
 package org.jboss.messaging.core.remoting.impl.wireformat;
 
-import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.impl.ClientMessageImpl;
 import org.jboss.messaging.core.client.impl.ClientMessageInternal;
 import org.jboss.messaging.core.logging.Logger;
@@ -41,6 +40,11 @@
 {
    // Constants -----------------------------------------------------
 
+   public static final int SESSIO_RECEIVE_MESSAGE_LARGE_MESSAGE_SIZE = BASIC_PACKET_SIZE + DataConstants.SIZE_LONG +
+                                                                       DataConstants.SIZE_INT +
+                                                                       DataConstants.SIZE_BOOLEAN +
+                                                                       DataConstants.SIZE_INT;
+
    private static final Logger log = Logger.getLogger(SessionReceiveMessage.class);
 
    // Attributes ----------------------------------------------------
@@ -115,7 +119,7 @@
    {
       return largeMessageHeader;
    }
-   
+
    /**
     * @return the largeMessage
     */
@@ -133,11 +137,7 @@
    {
       if (largeMessage)
       {
-         return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG +
-                DataConstants.SIZE_INT +
-                DataConstants.SIZE_BOOLEAN +
-                DataConstants.SIZE_INT +
-                largeMessageHeader.length;
+         return SESSIO_RECEIVE_MESSAGE_LARGE_MESSAGE_SIZE + largeMessageHeader.length;
       }
       else
       {

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendContinuationMessage.java	2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendContinuationMessage.java	2009-01-17 00:18:22 UTC (rev 5653)
@@ -24,6 +24,7 @@
 
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.util.DataConstants;
+import org.jboss.messaging.util.SimpleString;
 
 
 /**
@@ -43,7 +44,9 @@
    // Attributes ----------------------------------------------------
 
    private boolean requiresResponse;
+   private SimpleString address;
 
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -64,10 +67,12 @@
     */
    public SessionSendContinuationMessage(final byte[] body,
                                          final boolean continues,
-                                         final boolean requiresResponse)
+                                         final boolean requiresResponse,
+                                         final SimpleString address)
    {
       super(SESS_SEND_CONTINUATION, body, continues);
       this.requiresResponse = requiresResponse;
+      this.address = address;
    }
 
 
@@ -80,11 +85,16 @@
    {
       return requiresResponse;
    }
+   
+   public SimpleString getAddress()
+   {
+      return address;
+   }
 
    @Override
    public int getRequiredBufferSize()
    {
-      return super.getRequiredBufferSize() + DataConstants.SIZE_BOOLEAN;
+      return super.getRequiredBufferSize() + DataConstants.SIZE_BOOLEAN + SimpleString.sizeofString(address);
    }
 
    @Override
@@ -92,6 +102,7 @@
    {
       super.encodeBody(buffer);
       buffer.putBoolean(requiresResponse);
+      buffer.putSimpleString(address);
    }
 
    @Override
@@ -99,6 +110,7 @@
    {
       super.decodeBody(buffer);
       requiresResponse = buffer.getBoolean();
+      address = buffer.getSimpleString();
    }
 
 

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java	2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java	2009-01-17 00:18:22 UTC (rev 5653)
@@ -103,6 +103,13 @@
    {
       return serverMessage;
    }
+   
+   /** This should be only used to set the serverLargeMessage, 
+    *  which needs to be initialised on the storage */
+   public void setServerLargeMessage(ServerMessage serverLargeMessage)
+   {
+      this.serverMessage = serverLargeMessage;
+   }
 
    public byte[] getLargeMessageHeader()
    {

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-17 00:18:22 UTC (rev 5653)
@@ -21,8 +21,6 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.list.PriorityLinkedList;
@@ -66,8 +64,6 @@
    private static final boolean trace = log.isTraceEnabled();
 
    public static final int NUM_PRIORITIES = 10;
-   
-   private static final int MAX_NUMBER_OF_DEPAGES_ON_BACKUP = 10;
 
    private volatile long persistenceID = -1;
 
@@ -433,8 +429,9 @@
       PagingStore store = pagingManager.getPageStore(address);
       
       if (ref == null)
-      {
-         for (int i = 0; i < MAX_NUMBER_OF_DEPAGES_ON_BACKUP; i++)
+     {
+         // TODO: should we impose a MAX?
+         for (;;)
          {
             // Can't have the same store being depaged in more than one thread
             synchronized (store)
@@ -444,7 +441,10 @@
                if (ref == null)
                {
                   // force a depage
-                  store.readPage();
+                  if (!store.readPage())
+                  {
+                     break;
+                  }
                }
                else
                {

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-17 00:18:22 UTC (rev 5653)
@@ -76,7 +76,8 @@
 
    // Static ---------------------------------------------------------------------------------------
 
-   private static final boolean trace = log.isTraceEnabled();
+   // private static final boolean trace = log.isTraceEnabled();
+   private static final boolean trace = false;
 
    private static void trace(final String message)
    {
@@ -105,6 +106,10 @@
 
    private volatile LargeMessageDeliverer largeMessageDeliverer = null;
 
+   // We will only be sending one largeMessage at any time, however during replication you may have
+   // more than one LargeMessage pending on the replicationBuffer
+   private final AtomicInteger pendingLargeMessagesCounter = new AtomicInteger(0);
+
    /**
     * if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
     */
@@ -314,12 +319,27 @@
       }
       else
       {
+         if (trace)
+         {
+            trace("Receiving " + credits + " credits");
+         }
          int previous = availableCredits.getAndAdd(credits);
 
          if (previous <= 0 && previous + credits > 0)
          {
+            if (trace)
+            {
+               trace("promptDelivery being resumed by receiveCredits");
+            }
             promptDelivery();
          }
+         else
+         {
+            if (trace)
+            {
+               trace("PromptDelivery wasn't negative");
+            }
+         }
       }
    }
 
@@ -489,13 +509,24 @@
 
    private void promptDelivery()
    {
-      if (largeMessageDeliverer != null)
+      lock.lock();
+      try
       {
-         resumeLargeMessage();
+         // largeMessageDeliverer is aways set inside a lock
+         // if we don't acquire a lock, we will have NPE eventually
+         if (largeMessageDeliverer != null)
+         {
+            resumeLargeMessage();
+         }
+         else
+         {
+            session.promptDelivery(messageQueue);
+
+         }
       }
-      else
+      finally
       {
-         session.promptDelivery(messageQueue);
+         lock.unlock();
       }
    }
 
@@ -506,15 +537,16 @@
    {
       if (messageQueue.isBackup())
       {
-         // We are supposed to finish largeMessageDeliverer, or use all the possible credits before we return this method.
-         // If we play the commands on a different order than how they were generated on the live node, we will 
+         // We are supposed to finish largeMessageDeliverer, or use all the possible credits before we return this
+         // method.
+         // If we play the commands on a different order than how they were generated on the live node, we will
          // eventually still be running this largeMessage before the next message come, what would reject messages
          // from the cluster
          largeMessageDeliverer.deliver();
       }
       else
       {
-         executor.execute(largeMessageDeliverer.resumeLargeMessageRunnable);
+         executor.execute(resumeLargeMessageRunnable);
       }
    }
 
@@ -559,12 +591,15 @@
 
          // If there is a pendingLargeMessage we can't take another message
          // This has to be checked inside the lock as the set to null is done inside the lock
-         if (largeMessageDeliverer != null)
+         if (pendingLargeMessagesCounter.get() > 0)
          {
-            if (trace)
+            if (messageQueue.isBackup())
             {
-               trace("doHandle: LargeMessageDeliverer != null, can't send another message while send is pending");
+               log.warn("doHandle: rejecting message while send is pending, ignoring reference = " + ref +
+                        " backup = " +
+                        messageQueue.isBackup());
             }
+
             return HandleStatus.BUSY;
          }
 
@@ -610,36 +645,46 @@
 
    private void deliverLargeMessage(final MessageReference ref, final ServerMessage message)
    {
-      largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage)message, ref);
+      pendingLargeMessagesCounter.incrementAndGet();
 
-      channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID(), message.getDestination()));
 
-      // TODO: Should we block until the replication is done?
-//      DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID(), message.getDestination()));
-//      if (result != null)
-//      {
-//         final CountDownLatch latch = new CountDownLatch(1);
-//         result.setResultRunner(new Runnable()
-//         {
-//            public void run()
-//            {
-//               latch.countDown();
-//            }
-//            
-//         });
-//         try
-//         {
-//            latch.await();
-//         }
-//         catch (InterruptedException ignored)
-//         {
-//         }
-//      }
+      final LargeMessageDeliverer localDeliverer = new LargeMessageDeliverer((LargeServerMessage)message, ref);
 
-      // deliverLargeMessage has to be done on the same thread used on the QueueImpl or we would have problems with flow control credits
-      // credits would arrive while deliver still being done, what would cause interruption on the flowControl
-      largeMessageDeliverer.deliver();
-   
+      DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
+                                                                                         message.getMessageID(),
+                                                                                         message.getDestination()));
+
+      if (result == null)
+      {
+         // it doesn't need lock because deliverLargeMesasge is already inside the lock.lock()
+         largeMessageDeliverer = localDeliverer;
+         largeMessageDeliverer.deliver();
+      }
+      else
+      {
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               // setting & unsetting largeMessageDeliver is done inside the lock,
+               // so this needs to be locked
+               lock.lock();
+               try
+               {
+                  largeMessageDeliverer = localDeliverer;
+                  if (largeMessageDeliverer.deliver())
+                  {
+                     promptDelivery();
+                  }
+               }
+               finally
+               {
+                  lock.unlock();
+               }
+            }
+         });
+      }
+
    }
 
    /**
@@ -681,6 +726,26 @@
    // Inner classes
    // ------------------------------------------------------------------------
 
+   final Runnable resumeLargeMessageRunnable = new Runnable()
+   {
+      public void run()
+      {
+         lock.lock();
+         try
+         {
+            if (largeMessageDeliverer == null || largeMessageDeliverer.deliver())
+            {
+               // prompt Delivery only if chunk was finished
+               session.promptDelivery(messageQueue);
+            }
+         }
+         finally
+         {
+            lock.unlock();
+         }
+      }
+   };
+
    /** Internal encapsulation of the logic on sending LargeMessages.
     *  This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent*/
    private class LargeMessageDeliverer
@@ -697,18 +762,6 @@
       /** The current position on the message being processed */
       private volatile long positionPendingLargeMessage;
 
-      final Runnable resumeLargeMessageRunnable = new Runnable()
-      {
-         public void run()
-         {
-            if (deliver())
-            {
-               // prompt Delivery only if chunk was finished
-               session.promptDelivery(messageQueue);
-            }
-         }
-      };
-
       public LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref)
       {
          pendingLargeMessage = message;
@@ -728,12 +781,25 @@
             {
                return true;
             }
-
+            
+            
+            
             if (availableCredits != null && availableCredits.get() <= 0)
             {
                return false;
             }
 
+            int creditsUsed;
+
+            if (availableCredits != null)
+            {
+               creditsUsed = preCalculateFlowControl();
+            }
+            else
+            {
+               creditsUsed = 0;
+            }
+
             if (!sentFirstMessage)
             {
                if (trace)
@@ -754,13 +820,15 @@
 
                if (availableCredits != null)
                {
-                  // RequiredBufferSize on this case represents the right number of bytes sent
-                  availableCredits.addAndGet(-initialMessage.getRequiredBufferSize());
+                  if ((creditsUsed -= initialMessage.getRequiredBufferSize()) < 0)
+                  {
+                     log.warn("Credit logic is not working properly, too many credits were taken");
+                  }
                   if (trace)
                   {
                      trace("deliverLargeMessage:: Initial send, taking out " + initialMessage.getRequiredBufferSize() +
                            " credits, current = " +
-                           availableCredits +
+                           creditsUsed +
                            " isBackup = " +
                            messageQueue.isBackup());
 
@@ -777,7 +845,7 @@
 
             while (positionPendingLargeMessage < sizePendingLargeMessage)
             {
-               if (availableCredits != null && availableCredits.get() <= 0)
+               if (creditsUsed <= 0)
                {
                   if (trace)
                   {
@@ -792,7 +860,10 @@
 
                if (availableCredits != null)
                {
-                  availableCredits.addAndGet(-chunk.getRequiredBufferSize());
+                  if ((creditsUsed -= chunk.getRequiredBufferSize()) < 0)
+                  {
+                     log.warn("Flowcontrol logic is not working properly, too many credits were taken");
+                  }
                }
 
                if (trace)
@@ -809,6 +880,11 @@
                positionPendingLargeMessage += chunkLen;
             }
 
+            if (creditsUsed != 0)
+            {
+               log.warn("Flowcontrol logic is not working properly... creidts = " + creditsUsed);
+            }
+            
             if (trace)
             {
                trace("Finished deliverLargeMessage isBackup = " + messageQueue.isBackup());
@@ -818,6 +894,8 @@
 
             largeMessageDeliverer = null;
 
+            pendingLargeMessagesCounter.decrementAndGet();
+
             return true;
          }
          finally
@@ -826,6 +904,39 @@
          }
       }
 
+      /**
+       * Credits flow control are calculated in advance.
+       * @return
+       */
+      private int preCalculateFlowControl()
+      {
+         for (;;)
+         {
+            final int currentCredit;
+            int creditsUsed = 0;
+            currentCredit = availableCredits.get();
+            
+            if (!sentFirstMessage)
+            {
+               creditsUsed = SessionReceiveMessage.SESSIO_RECEIVE_MESSAGE_LARGE_MESSAGE_SIZE + pendingLargeMessage.getPropertiesEncodeSize();
+            }
+
+            long chunkLen = 0;
+            for (long i = positionPendingLargeMessage; creditsUsed < currentCredit  && i < sizePendingLargeMessage; i += chunkLen)
+            {
+               chunkLen = (int)Math.min(sizePendingLargeMessage - i, minLargeMessageSize);
+               creditsUsed += chunkLen + SessionReceiveContinuationMessage.SESSION_RECEIVE_CONTINUATION_BASE_SIZE;
+            }
+
+            // The calculation of credits and taking credits out has to be taken atomically.
+            // This is being calculated before the packets are sent, so the Consumer shouldn't be receiving credits during this calculation
+            if (availableCredits.compareAndSet(currentCredit, currentCredit - creditsUsed))
+            {
+               return creditsUsed;
+            }
+         }
+      }
+
       private SessionReceiveContinuationMessage createChunkSend()
       {
          SessionReceiveContinuationMessage chunk;

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-17 00:18:22 UTC (rev 5653)
@@ -990,19 +990,16 @@
    {
       DelayedResult result = channel.replicatePacket(packet);
 
-      try
-      {
-         // Note we don't wait for response before handling this
-
-         consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to receive credits", e);
-      }
-
       if (result == null)
       {
+         try
+         {
+            consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to receive credits", e);
+         }
          channel.confirm(packet);
       }
       else
@@ -1011,6 +1008,14 @@
          {
             public void run()
             {
+               try
+               {
+                  consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+               }
+               catch (Exception e)
+               {
+                  log.error("Failed to receive credits", e);
+               }
                channel.confirm(packet);
             }
          });
@@ -1027,6 +1032,28 @@
          packet.setMessageID(id);
       }
 
+      // need to create the LargeMessage before continue
+      if (!doCreateLargeMessage(packet))
+      {
+         // packet logged an error, and played with channel.returns... and nothing needs to be done now
+         return;
+      }
+      
+      
+      final SendLock lock;
+
+      if (channel.getReplicatingChannel() != null)
+      {
+         lock = postOffice.getAddressLock(packet.getServerMessage().getDestination());
+
+         lock.beforeSend();
+      }
+      else
+      {
+         lock = null;
+      }
+
+      
       DelayedResult result = channel.replicatePacket(packet);
 
       // With a send we must make sure it is replicated to backup before being processed on live
@@ -1043,8 +1070,10 @@
             public void run()
             {
                doSendLargeMessage(packet);
+               lock.afterSend();
             }
          });
+         
       }
 
    }
@@ -1109,9 +1138,22 @@
    {
       DelayedResult result = channel.replicatePacket(packet);
 
-      // With a send we must make sure it is replicated to backup before being processed on live
-      // or can end up with delivery being processed on backup before original send
+      final SendLock lock;
 
+      if (channel.getReplicatingChannel() != null)
+      {
+         // We can't use currentLargeMessage to get the address, as it may be pending on the replication
+         // We would need to require blocked replication for large-messages to avoid getting this address
+         lock = postOffice.getAddressLock(packet.getAddress()); 
+
+         lock.beforeSend();
+      }
+      else
+      {
+         lock = null;
+      }
+
+
       if (result == null)
       {
          doSendContinuations(packet);
@@ -1123,6 +1165,7 @@
             public void run()
             {
                doSendContinuations(packet);
+               lock.afterSend();
             }
          });
       }
@@ -2343,6 +2386,44 @@
 
       started = s;
    }
+   
+   
+   /**
+    * We need to create the LargeMessage before replicating the packe, or else we won't know how to extract the destination,
+    * which is stored on the header
+    * @param packet
+    * @throws Exception
+    */
+   private boolean doCreateLargeMessage(final SessionSendMessage packet)
+   {
+      try
+      {
+         packet.setServerLargeMessage(createLargeMessageStorage(packet.getMessageID(), packet.getLargeMessageHeader()));
+         return true;
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to create large message", e);
+         Packet response = null;
+         if (packet.isRequiresResponse())
+         {
+            if (e instanceof MessagingException)
+            {
+               response = new MessagingExceptionMessage((MessagingException)e);
+            }
+            else
+            {
+               response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+            }
+         }
+         channel.confirm(packet);
+         if (response != null)
+         {
+            channel.send(response);
+         }
+         return false;
+      }
+   }
 
    private void doSendLargeMessage(final SessionSendMessage packet)
    {
@@ -2350,8 +2431,7 @@
 
       try
       {
-         largeMessage = createLargeMessageStorage(packet.getMessageID(), packet.getLargeMessageHeader());
-
+         this.largeMessage = (LargeServerMessage)packet.getServerMessage();
          if (packet.isRequiresResponse())
          {
             response = new NullResponseMessage();
@@ -2511,7 +2591,7 @@
 
       largeMessage.decodeProperties(headerBuffer);
 
-      // client didn send the ID originally
+      // client didn't send the ID originally
       largeMessage.setMessageID(messageID);
 
       return largeMessage;

Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2009-01-17 00:18:22 UTC (rev 5653)
@@ -67,7 +67,7 @@
    // Constants -----------------------------------------------------
 
    final static int RECEIVE_WAIT_TIME = 10000;
-   
+
    // Attributes ----------------------------------------------------
 
    static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
@@ -160,7 +160,7 @@
          mockFactory.setBlockOnPersistentSend(false);
          mockFactory.setBlockOnAcknowledge(false);
 
-         session = mockFactory.createSession(null, null, false, true, true, false,  0);
+         session = mockFactory.createSession(null, null, false, true, true, false, 0);
 
          callback.session = session;
 
@@ -205,7 +205,7 @@
 
       checkFileRead(file, 13333);
    }
-   
+
    public void testClearOnClientBuffer() throws Exception
    {
       clearData();
@@ -215,7 +215,7 @@
 
       final int numberOfIntegers = 10;
       final int numberOfMessages = 100;
-      
+
       try
       {
          ClientSessionFactory sf = createInVMFactory();
@@ -227,7 +227,7 @@
          ClientSession session = sf.createSession(null, null, false, true, false, false, 0);
 
          session.createQueue(ADDRESS, ADDRESS, null, true, false);
-         
+
          messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
 
          ClientProducer producer = session.createProducer(ADDRESS);
@@ -243,16 +243,15 @@
             producer.send(message);
          }
 
-
          ClientConsumer consumer = session.createFileConsumer(new File(getClientLargeMessagesDir()), ADDRESS);;
 
          File clientfiles = new File(getClientLargeMessagesDir());
-         
+
          session.start();
-         
+
          ClientMessage msg = consumer.receive(1000);
          msg.acknowledge();
-         
+
          for (int i = 0; i < 100; i++)
          {
             if (clientfiles.listFiles().length > 0)
@@ -261,16 +260,14 @@
             }
             Thread.sleep(100);
          }
-         
+
          assertTrue(clientfiles.listFiles().length > 0);
 
          session.close();
-         
-         
+
          assertEquals(1, clientfiles.list().length); // 1 message was received, that should be kept
 
          validateNoFilesOnLargeDir();
-         
 
       }
       finally
@@ -283,7 +280,7 @@
          {
          }
       }
-      
+
    }
 
    public void testMessageChunkFilePersistence() throws Exception
@@ -296,11 +293,11 @@
       testChunks(true, false, false, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
    }
 
-//Uncomment when https://jira.jboss.org/jira/browse/JBMESSAGING-1472 is complete   
-//   public void testMessageChunkFilePersistenceBlockedPreCommit() throws Exception
-//   {
-//      testChunks(true, false, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
-//   }
+   // Uncomment when https://jira.jboss.org/jira/browse/JBMESSAGING-1472 is complete
+   // public void testMessageChunkFilePersistenceBlockedPreCommit() throws Exception
+   // {
+   // testChunks(true, false, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+   // }
 
    public void testMessageChunkFilePersistenceDelayed() throws Exception
    {
@@ -393,46 +390,41 @@
 
          session.createQueue(ADDRESS, queue[0], null, true, false);
          session.createQueue(ADDRESS, queue[1], null, true, false);
-         
 
          int numberOfIntegers = 100000;
 
          Message clientFile = createLargeClientMessage(session, numberOfIntegers);
-         //Message clientFile = createLargeClientMessage(session, numberOfIntegers);
+         // Message clientFile = createLargeClientMessage(session, numberOfIntegers);
 
          ClientProducer producer = session.createProducer(ADDRESS);
-         
 
-
          session.start();
-         
+
          producer.send(clientFile);
 
          producer.close();
 
-         
          ClientConsumer consumer = session.createFileConsumer(new File(getClientLargeMessagesDir()), queue[1]);
          ClientMessage msg = consumer.receive(RECEIVE_WAIT_TIME);
-         assertNull(consumer.receive(1000)); 
+         assertNull(consumer.receive(1000));
          assertNotNull(msg);
-         
+
          msg.acknowledge();
          consumer.close();
-         
+
          System.out.println("Stopping");
 
          session.stop();
-         
+
          ClientConsumer consumer1 = session.createFileConsumer(new File(getClientLargeMessagesDir()), queue[0]);
 
          session.start();
-         
 
          msg = consumer1.receive(RECEIVE_WAIT_TIME);
          assertNotNull(msg);
          msg.acknowledge();
          consumer1.close();
-         
+
          session.commit();
 
          session.close();
@@ -484,12 +476,11 @@
 
          session.createQueue(ADDRESS, queue[0], null, true, false);
          session.createQueue(ADDRESS, queue[1], null, true, false);
-         
 
          int numberOfIntegers = 100000;
 
          Message clientFile = createLargeClientMessage(session, numberOfIntegers);
-         //Message clientFile = createLargeClientMessage(session, numberOfIntegers);
+         // Message clientFile = createLargeClientMessage(session, numberOfIntegers);
 
          ClientProducer producer = session.createProducer(ADDRESS);
          producer.send(clientFile);

Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java	2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java	2009-01-17 00:18:22 UTC (rev 5653)
@@ -158,6 +158,8 @@
             ClientFileMessage message = (ClientFileMessage)consumer.receive(20000);
 
             assertNotNull("Message i=" + i + " wasn't received", message);
+            
+//            System.out.println("Received message " + i);
 
             message.acknowledge();
             

Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java	2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java	2009-01-17 00:18:22 UTC (rev 5653)
@@ -1226,13 +1226,13 @@
    {
       long start = System.currentTimeMillis();
 
-      sf.setMinLargeMessageSize(10 * 1024);
+      sf.setMinLargeMessageSize(1024);
       
       sf.setSendWindowSize(1024*1024);
 
       ClientSession s = sf.createSession(false, false, false);
 
-      final int messageSize = 40 * 1024;
+      final int messageSize = 4 * 1024;
       
       final int numMessages = 100;
 
@@ -1277,7 +1277,7 @@
 
       for (MyHandler handler : handlers)
       {
-         boolean ok = handler.latch.await(5000, TimeUnit.MILLISECONDS);
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
 
          if (!ok)
          {
@@ -1422,7 +1422,10 @@
             {
                thread.join();
 
-               assertNull(thread.throwable);
+               if (thread.throwable != null)
+               {
+                  throw new Exception ("Exception on thread " + thread, thread.throwable);
+               }
             }
 
             runnable.checkFail();




More information about the jboss-cvs-commits mailing list