[hornetq-commits] JBoss hornetq SVN: r8221 - in trunk: src/main/org/hornetq/core/client/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 5 03:42:21 EST 2009


Author: timfox
Date: 2009-11-05 03:42:21 -0500 (Thu, 05 Nov 2009)
New Revision: 8221

Modified:
   trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-206

Modified: trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
===================================================================
--- trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java	2009-11-05 07:56:15 UTC (rev 8220)
+++ trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java	2009-11-05 08:42:21 UTC (rev 8221)
@@ -47,7 +47,9 @@
 
    // The message we will send is size 256MB, even though we are only running in 50MB of RAM on both client and server.
    // HornetQ will support much larger message sizes, but we use 512MB so the example runs in reasonable time.
-   private final long FILE_SIZE = 256 * 1024 * 1024;
+  // private final long FILE_SIZE = 256L * 1024 * 1024;
+   
+   private final long FILE_SIZE = 2L * 1024 * 1024 * 1024; // 2 GiB message
 
    public boolean runExample() throws Exception
    {

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2009-11-05 07:56:15 UTC (rev 8220)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2009-11-05 08:42:21 UTC (rev 8221)
@@ -22,8 +22,8 @@
 import org.hornetq.core.buffers.ChannelBuffers;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.LargeMessageEncodingContext;
 import org.hornetq.core.message.Message;
-import org.hornetq.core.message.LargeMessageEncodingContext;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
@@ -250,7 +250,7 @@
 
       if (isLarge)
       {
-         largeMessageSend(sendBlocking, msg);
+         largeMessageSend(sendBlocking, msg, theCredits);
       }
       else if (sendBlocking)
       {
@@ -269,14 +269,8 @@
          // Not the continuations, but this is ok since we are only interested in limiting the amount of
          // data in *memory* and continuations go straight to the disk
 
-         if (isLarge)
+         if (!isLarge)
          {
-            // TODO this is pretty hacky - we should define consistent meanings of encode size
-
-            theCredits.acquireCredits(msg.getHeadersAndPropertiesEncodeSize());
-         }
-         else
-         {
             theCredits.acquireCredits(msg.getEncodeSize());
          }
       }
@@ -292,15 +286,14 @@
          throw new HornetQException(HornetQException.OBJECT_CLOSED, "Producer is closed");
       }
    }
-   
-   
+
    // Methods to send Large Messages----------------------------------------------------------------
-   
+
    /**
     * @param msg
     * @throws HornetQException
     */
-   private void largeMessageSend(final boolean sendBlocking, final Message msg) throws HornetQException
+   private void largeMessageSend(final boolean sendBlocking, final Message msg, final ClientProducerCredits credits) throws HornetQException
    {
       int headerSize = msg.getHeadersAndPropertiesEncodeSize();
 
@@ -323,15 +316,23 @@
 
       channel.send(initialChunk);
 
+      try
+      {
+         credits.acquireCredits(msg.getHeadersAndPropertiesEncodeSize());
+      }
+      catch (InterruptedException e)
+      {
+      }
+
       InputStream input = msg.getBodyInputStream();
 
       if (input != null)
       {
-         largeMessageSendStreamed(sendBlocking, input);
+         largeMessageSendStreamed(sendBlocking, input, credits);
       }
       else
       {
-         largeMessageSendBuffered(sendBlocking, msg);
+         largeMessageSendBuffered(sendBlocking, msg, credits);
       }
    }
 
@@ -340,7 +341,9 @@
     * @param msg
     * @throws HornetQException
     */
-   private void largeMessageSendBuffered(final boolean sendBlocking, final Message msg) throws HornetQException
+   private void largeMessageSendBuffered(final boolean sendBlocking,
+                                         final Message msg,
+                                         final ClientProducerCredits credits) throws HornetQException
    {
       final long bodySize = msg.getLargeBodySize();
 
@@ -373,6 +376,14 @@
          {
             channel.send(chunk);
          }
+
+         try
+         {
+            credits.acquireCredits(chunk.getRequiredBufferSize());
+         }
+         catch (InterruptedException e)
+         {
+         }
       }
    }
 
@@ -381,7 +392,9 @@
     * @param input
     * @throws HornetQException
     */
-   private void largeMessageSendStreamed(final boolean sendBlocking, InputStream input) throws HornetQException
+   private void largeMessageSendStreamed(final boolean sendBlocking,
+                                         final InputStream input,
+                                         final ClientProducerCredits credits) throws HornetQException
    {
       boolean lastPacket = false;
 
@@ -441,6 +454,14 @@
          {
             channel.send(chunk);
          }
+
+         try
+         {
+            credits.acquireCredits(chunk.getRequiredBufferSize());
+         }
+         catch (InterruptedException e)
+         {
+         }
       }
 
       try
@@ -455,8 +476,6 @@
       }
    }
 
-
-
    // Inner Classes --------------------------------------------------------------------------------
    class DecodingContext implements LargeMessageEncodingContext
    {

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-05 07:56:15 UTC (rev 8220)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-05 08:42:21 UTC (rev 8221)
@@ -1478,7 +1478,7 @@
       {
          try
          {
-            releaseOutStanding(message);
+            releaseOutStanding(message, message.getEncodeSize());
          }
          catch (Exception e)
          {
@@ -1499,20 +1499,23 @@
          {
             throw new HornetQException(HornetQException.ILLEGAL_STATE, "large-message not initialized on server");
          }
-
+         
+         //Immediately release the credits for the continuations- these don't contribute to the in-memory size
+         //of the message
+         
+         releaseOutStanding(currentLargeMessage, packet.getRequiredBufferSize());
+         
          currentLargeMessage.addBytes(packet.getBody());
 
          if (!packet.isContinues())
-         {
-            final LargeServerMessage message = currentLargeMessage;
+         {                        
+            currentLargeMessage.releaseResources();
 
+            send(currentLargeMessage);
+
+            releaseOutStanding(currentLargeMessage, currentLargeMessage.getEncodeSize());
+            
             currentLargeMessage = null;
-
-            message.releaseResources();
-
-            send(message);
-
-            releaseOutStanding(message);
          }
 
          if (packet.isRequiresResponse())
@@ -1910,17 +1913,15 @@
     * returned. When a session closes any outstanding credits will be returned.
     * 
     */
-   private void releaseOutStanding(final ServerMessage message) throws Exception
+   private void releaseOutStanding(final ServerMessage message, final int credits) throws Exception
    {
       CreditManagerHolder holder = getCreditManagerHolder(message.getDestination());
 
-      int size = message.getEncodeSize();
+      holder.outstandingCredits -= credits;
 
-      holder.outstandingCredits -= size;
-
-      holder.store.returnProducerCredits(size);
+      holder.store.returnProducerCredits(credits);
    }
-
+   
    private void send(final ServerMessage msg) throws Exception
    {
       // check the user has write access to this address.



More information about the hornetq-commits mailing list