[jboss-cvs] JBoss Messaging SVN: r5612 - in branches/Branch_Failover_Page: src/main/org/jboss/messaging/core/persistence/impl/nullpm and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Jan 10 18:07:50 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-10 18:07:50 -0500 (Sat, 10 Jan 2009)
New Revision: 5612

Modified:
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/ServerMessage.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
Log:
Backup

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java	2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java	2009-01-10 23:07:50 UTC (rev 5612)
@@ -182,6 +182,11 @@
       return currentRefCount;
    }
 
+   public boolean isLargeMessage()
+   {
+      return true;
+   }
+
    public void deleteFile() throws MessagingException
    {
       if (file != null)

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2009-01-10 23:07:50 UTC (rev 5612)
@@ -103,6 +103,11 @@
       
    }
 
+   public boolean isLargeMessage()
+   {
+      return true;
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-01-10 23:07:50 UTC (rev 5612)
@@ -54,6 +54,8 @@
  */
 public interface PostOffice extends MessagingComponent
 {
+   boolean isBackup(); // Remove-me... debug for now only
+   
    boolean addDestination(SimpleString address, boolean durable) throws Exception;
 
    boolean removeDestination(SimpleString address, boolean durable) throws Exception;

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-10 23:07:50 UTC (rev 5612)
@@ -263,6 +263,11 @@
    {
       return addressManager.getDestinations();
    }
+   
+   public boolean isBackup()  // Remove-me... debug for now only
+   {
+      return backup;
+   }
 
    // TODO - needs to be synchronized to prevent happening concurrently with activate().
    // (and possible removeBinding and other methods)

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/ServerMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/ServerMessage.java	2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/ServerMessage.java	2009-01-10 23:07:50 UTC (rev 5612)
@@ -53,6 +53,8 @@
    
    int getMemoryEstimate();
    
+   boolean isLargeMessage();
+   
    void setReload();
    
    boolean isReload();

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-10 23:07:50 UTC (rev 5612)
@@ -309,8 +309,18 @@
       {
          int previous = availableCredits.getAndAdd(credits);
 
+         log.info("Had " + previous +
+                  ", received " +
+                  credits +
+                  " = " +
+                  availableCredits +
+                  " backup = " +
+                  messageQueue.isBackup() + 
+                  Thread.currentThread().getName()); // remove me
+
          if (previous <= 0 && previous + credits > 0)
          {
+//            log.info("********************* Calling promptDelivery, backup = " + messageQueue.isBackup()); -- remove me
             promptDelivery();
          }
       }
@@ -422,7 +432,11 @@
 
       if (handled != HandleStatus.HANDLED)
       {
-         throw new IllegalStateException("Reference was not handled " + ref + " " + handled);
+         throw new IllegalStateException("Reference was not handled " + ref +
+                                         " " +
+                                         handled +
+                                         " backup = " +
+                                         messageQueue.isBackup());
       }
    }
 
@@ -496,28 +510,37 @@
    {
       if (availableCredits != null && availableCredits.get() <= 0)
       {
+         log.info("No Credits " + availableCredits + " backup=" + this.messageQueue.isBackup() + " ref = " + ref,
+                  new Exception("trace"));
          return HandleStatus.BUSY;
       }
+      else
+      {
+         log.info("Accepted ref = " + ref + " backup = " + messageQueue.isBackup(), new Exception("trace"));
+      }
 
       lock.lock();
 
       try
       {
-         // If there is a pendingLargeMessage we can't take another message
-         // This has to be checked inside the lock as the set to null is done inside the lock
-         if (largeMessageSender != null)
-         {
-            return HandleStatus.BUSY;
-         }
 
          // If the consumer is stopped then we don't accept the message, it
          // should go back into the
          // queue for delivery later.
          if (!started)
          {
+            log.info("! started");
             return HandleStatus.BUSY;
          }
 
+         // If there is a pendingLargeMessage we can't take another message
+         // This has to be checked inside the lock as the set to null is done inside the lock
+         if (largeMessageSender != null)
+         {
+            log.info("LargeMessageSender != null, backup = " + messageQueue.isBackup());
+            return HandleStatus.BUSY;
+         }
+
          final ServerMessage message = ref.getMessage();
 
          if (filter != null && !filter.match(message))
@@ -541,31 +564,9 @@
             doAck(ref);
          }
 
-         // TODO: get rid of the instanceof by something like message.isLargeMessage()
-         if (message instanceof LargeServerMessage)
+         if (message.isLargeMessage())
          {
-            // FIXME - please put the replication logic in the sendLargeMessage method
-
-            DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
-                                                                                               message.getMessageID(),
-                                                                                               message.getDestination()));
-
-            if (result == null)
-            {
-               sendLargeMessage(ref, message);
-            }
-            else
-            {
-               // Send when replicate delivery response comes back
-               result.setResultRunner(new Runnable()
-               {
-                  public void run()
-                  {
-                     sendLargeMessage(ref, message);
-                  }
-               });
-            }
-
+            sendLargeMessage(ref, message);
          }
          else
          {
@@ -582,9 +583,31 @@
 
    private void sendLargeMessage(final MessageReference ref, final ServerMessage message)
    {
-      largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
 
-      largeMessageSender.sendLargeMessage();
+      DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
+                                                                                         message.getMessageID(),
+                                                                                         message.getDestination()));
+
+      if (result == null)
+      {
+         largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
+
+         largeMessageSender.sendLargeMessage();
+      }
+      else
+      {
+         // Send when replicate delivery response comes back
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
+
+               largeMessageSender.sendLargeMessage();
+            }
+         });
+      }
+
    }
 
    /**
@@ -642,8 +665,6 @@
       /** The current position on the message being processed */
       private volatile long positionPendingLargeMessage;
 
-      private volatile SessionReceiveContinuationMessage readAheadChunk;
-
       public LargeMessageSender(final LargeServerMessage message, final MessageReference ref)
       {
          pendingLargeMessage = message;
@@ -657,6 +678,8 @@
       {
          lock.lock();
 
+         log.info("Entering SendLargeMessage (backup = " + messageQueue.isBackup() + ")");
+         
          try
          {
             if (pendingLargeMessage == null)
@@ -666,11 +689,13 @@
 
             if (availableCredits != null && availableCredits.get() <= 0)
             {
+               log.info("Leaving send LargeMessage because of credits, backup = " + messageQueue.isBackup() + " even before it started sending messages");
                return false;
             }
 
             if (!sentFirstMessage)
             {
+               log.info("Sending first message = " + messageQueue.isBackup());
                sentFirstMessage = true;
 
                MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
@@ -687,33 +712,19 @@
                {
                   // RequiredBufferSize on this case represents the right number of bytes sent
                   availableCredits.addAndGet(-initialMessage.getRequiredBufferSize());
+                  log.info("Initial send, taking out " + initialMessage.getRequiredBufferSize() +
+                           " credits, current = " +
+                           availableCredits +
+                           " isBackup = " +
+                           messageQueue.isBackup());
                }
             }
 
-            if (readAheadChunk != null)
-            {
-               int chunkLen = readAheadChunk.getBody().length;
-
-               positionPendingLargeMessage += chunkLen;
-
-               if (availableCredits != null)
-               {
-                  availableCredits.addAndGet(-readAheadChunk.getRequiredBufferSize());
-               }
-
-               channel.send(readAheadChunk);
-
-               readAheadChunk = null;
-            }
-
             while (positionPendingLargeMessage < sizePendingLargeMessage)
             {
                if (availableCredits != null && availableCredits.get() <= 0)
                {
-                  if (readAheadChunk == null)
-                  {
-                     readAheadChunk = createChunkSend();
-                  }
+                  log.info("Leaving loop of send LargeMessage because of credits, backup = " + messageQueue.isBackup());
                   return false;
                }
 
@@ -726,11 +737,15 @@
                   availableCredits.addAndGet(-chunk.getRequiredBufferSize());
                }
 
+               log.info("Sending " + chunk.getRequiredBufferSize() + " availableCredits now is " + availableCredits + " isBackup = " + messageQueue.isBackup());
+
                channel.send(chunk);
 
                positionPendingLargeMessage += chunkLen;
             }
 
+            log.info("Finished sendLargeMessage isBackup = " + messageQueue.isBackup());
+
             pendingLargeMessage.releaseResources();
 
             largeMessageSender = null;

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2009-01-10 23:07:50 UTC (rev 5612)
@@ -138,6 +138,11 @@
       return reload;
    }
 
+   public boolean isLargeMessage()
+   {
+      return false;
+   }
+
    public void setReload()
    {
       this.reload = true;
@@ -158,4 +163,5 @@
              getDestination() +
              "]";
    }
+
 }

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-10 23:07:50 UTC (rev 5612)
@@ -371,7 +371,7 @@
    public void handleDeleteQueue(final SessionDeleteQueueMessage packet)
    {
       final SendLock lock;
-      
+
       if (channel.getReplicatingChannel() != null)
       {
          Binding binding = postOffice.getBinding(packet.getQueueName());
@@ -991,20 +991,22 @@
    {
       DelayedResult result = channel.replicatePacket(packet);
 
-      try
+      if (result == null)
       {
-         // Note we don't wait for response before handling this
+         try
+         {
+            log.info("Receiving credits... first option.. backup = " + postOffice.isBackup() +
+                     " and it is receiving " +
+                     packet.getCredits());
+            consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to receive credits", e);
+         }
 
-         consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to receive credits", e);
-      }
+         channel.confirm(packet);
 
-      if (result == null)
-      {
-         channel.confirm(packet);
       }
       else
       {
@@ -1012,7 +1014,21 @@
          {
             public void run()
             {
+               log.info("Receiving credits... second option.. backup = " + postOffice.isBackup() +
+                        " and it is receiving " +
+                        packet.getCredits() +
+                        " thread = " +
+                        Thread.currentThread().getName());
+               try
+               {
+                  consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+               }
+               catch (Exception e)
+               {
+                  log.error("Failed to receive credits", e);
+               }
                channel.confirm(packet);
+
             }
          });
       }
@@ -1433,7 +1449,7 @@
          {
             throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot delete queue - it has consumers");
          }
-         
+
          if (queue.isDurable())
          {
             queue.deleteAllReferences(storageManager, postOffice, queueSettingsRepository);

Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java	2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java	2009-01-10 23:07:50 UTC (rev 5612)
@@ -62,7 +62,7 @@
 
    public void testLargeMessageReplicatedNoFailover() throws Exception
    {
-      testLargeMessage(-1, 500);
+      testLargeMessage(-1, 2);
    }
 
    public void testLargeMessageFailOnProducing() throws Exception
@@ -84,7 +84,7 @@
 //   
    public void testLargeMessageFailOnConsume() throws Exception
    {
-      testLargeMessage(2, 500);
+      testLargeMessage(2, 2);
    }
 
    private void testLargeMessage(final int placeToFail, final int numberOfMessages) throws Exception
@@ -93,7 +93,7 @@
 
       factory.setMinLargeMessageSize(10 * 1024);
 
-      final int messageSize = 25000;
+      final int messageSize = 1024*1024;
 
       try
       {
@@ -105,7 +105,6 @@
       }
       finally
       {
-         System.out.println("Giving up!!!!!!");
       }
 
    }
@@ -143,7 +142,7 @@
                }
             }
 
-            ClientMessage message = consumer.receive(5000);
+            ClientMessage message = consumer.receive(10000);
 
             assertNotNull(message);
 

Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2009-01-10 23:07:50 UTC (rev 5612)
@@ -209,4 +209,12 @@
    {
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.postoffice.PostOffice#isBackup()
+    */
+   public boolean isBackup()
+   {
+      return false;
+   }
+
 }




More information about the jboss-cvs-commits mailing list