[jboss-cvs] JBoss Messaging SVN: r5493 - in trunk/src/main/org/jboss/messaging/core: client/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Dec 9 12:50:53 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-12-09 12:50:53 -0500 (Tue, 09 Dec 2008)
New Revision: 5493

Modified:
   trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java
   trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
Log:
LargeMessage: tweak on flow control

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java	2008-12-09 17:12:44 UTC (rev 5492)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java	2008-12-09 17:50:53 UTC (rev 5493)
@@ -46,5 +46,5 @@
 
    FileChannel getChannel() throws MessagingException;
    
-   void closeChannel() throws MessagingException;     
+   void closeChannel() throws MessagingException;   
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java	2008-12-09 17:12:44 UTC (rev 5492)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java	2008-12-09 17:50:53 UTC (rev 5493)
@@ -46,4 +46,11 @@
    void acknowledge() throws MessagingException;
    
    boolean isLargeMessage();
+   
+   /** Size used for FlowControl */
+   int getFlowControlSize();
+   
+   /** Size used for FlowControl */
+   void setFlowControlSize(int flowControlSize);
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-12-09 17:12:44 UTC (rev 5492)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-12-09 17:50:53 UTC (rev 5493)
@@ -19,8 +19,8 @@
 import java.util.Queue;
 import java.util.concurrent.Executor;
 
-import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientFileMessage;
+import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.MessageHandler;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
@@ -70,7 +70,7 @@
 
    private final Runner runner = new Runner();
 
-   private File directory;
+   private final File directory;
 
    private ClientMessage currentChunkMessage;
 
@@ -243,9 +243,9 @@
       }
 
       // If no handler before then need to queue them up
-      boolean queueUp = this.handler == null;
+      boolean queueUp = handler == null;
 
-      this.handler = theHandler;
+      handler = theHandler;
 
       if (queueUp)
       {
@@ -341,14 +341,14 @@
          return;
       }
 
+      // Flow control for the first packet, we will have others
+      flowControl(header.length);
+
       currentChunkMessage = createFileMessage(header);
-      
-      // We won't call flow control at this point, as we will only flow control the header right before consumption
-      
 
    }
 
-   public synchronized void handleLargeMessageContinuation(SessionReceiveContinuationMessage chunk) throws Exception
+   public synchronized void handleLargeMessageContinuation(final SessionReceiveContinuationMessage chunk) throws Exception
    {
       if (closing)
       {
@@ -357,7 +357,10 @@
 
       ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
 
-      flowControl(chunk.getBody().length);
+      if (chunk.isContinues())
+      {
+         flowControl(chunk.getBody().length);
+      }
 
       if (isFileConsumer())
       {
@@ -367,7 +370,7 @@
       else
       {
          MessagingBuffer currentBody = currentChunkMessage.getBody();
-         
+
          final int currentBodySize = currentBody == null ? 0 : currentBody.limit();
 
          MessagingBuffer newBody = new ByteBufferWrapper(ByteBuffer.allocate(currentBodySize + body.limit()));
@@ -376,7 +379,7 @@
          {
             newBody.putBytes(currentBody.array());
          }
-         
+
          newBody.putBytes(body.array());
 
          currentChunkMessage.setBody(newBody);
@@ -389,6 +392,8 @@
          {
             ((ClientFileMessage)currentChunkMessage).closeChannel();
          }
+         
+         currentChunkMessage.setFlowControlSize(chunk.getBody().length);
 
          ClientMessage msgToSend = currentChunkMessage;
          currentChunkMessage = null;
@@ -523,7 +528,7 @@
 
       // Must store handler in local variable since might get set to null
       // otherwise while this is executing and give NPE when calling onMessage
-      MessageHandler theHandler = this.handler;
+      MessageHandler theHandler = handler;
 
       if (theHandler != null)
       {
@@ -556,18 +561,10 @@
     * @param message
     * @throws MessagingException
     */
-   private void flowControlBeforeConsumption(ClientMessage message) throws MessagingException
+   private void flowControlBeforeConsumption(final ClientMessage message) throws MessagingException
    {
       // Chunk messages will execute the flow control while receiving the chunks
-      if (!message.isLargeMessage())
-      {
-         flowControl(message.getEncodeSize());
-      }
-      else
-      {
-         // But the header is only flow controlled right before the consumption
-         flowControl(message.getPropertiesEncodeSize());
-      }
+      flowControl(message.getFlowControlSize());
    }
 
    private void doCleanUp(final boolean sendCloseMessage) throws MessagingException
@@ -634,58 +631,54 @@
       else
       {
          int propertiesSize = message.getPropertiesEncodeSize();
-   
+
          MessagingBuffer bufferProperties = message.getBody().createNewBuffer(propertiesSize);
-   
+
          // FIXME: Find a better way to clone this ClientMessageImpl as ClientFileMessageImpl without using the
          // MessagingBuffer.
          // There is no direct access into the Properties, and I couldn't add a direct cast to this method without loose
          // abstraction
          message.encodeProperties(bufferProperties);
-   
+
          bufferProperties.rewind();
-   
+
          ClientFileMessageImpl cloneMessage = new ClientFileMessageImpl();
-   
+
          cloneMessage.decodeProperties(bufferProperties);
-   
+
          cloneMessage.setDeliveryCount(message.getDeliveryCount());
-   
+
          cloneMessage.setLargeMessage(message.isLargeMessage());
-   
-         cloneMessage.setFile(new File(this.directory, cloneMessage.getMessageID() + "-" +
-                                                       this.session.getName() +
-                                                       "-" +
-                                                       this.getID() +
-                                                       ".jbm"));
-   
+
+         cloneMessage.setFile(new File(directory, cloneMessage.getMessageID() + "-" +
+                                                  session.getName() +
+                                                  "-" +
+                                                  getID() +
+                                                  ".jbm"));
+
          addBytesBody(cloneMessage, message.getBody().array());
-   
+
          cloneMessage.closeChannel();
-   
+
          return cloneMessage;
       }
    }
 
    private ClientMessage createFileMessage(final byte[] header) throws Exception
    {
-      
+
       MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.wrap(header));
 
       if (isFileConsumer())
       {
-         if (!this.directory.exists())
+         if (!directory.exists())
          {
             directory.mkdirs();
          }
 
          ClientFileMessageImpl message = new ClientFileMessageImpl();
          message.decodeProperties(headerBuffer);
-         message.setFile(new File(this.directory, message.getMessageID() + "-" +
-                                                  this.session.getName() +
-                                                  "-" +
-                                                  this.getID() +
-                                                  ".jbm"));
+         message.setFile(new File(directory, message.getMessageID() + "-" + session.getName() + "-" + getID() + ".jbm"));
          message.setLargeMessage(true);
          return message;
       }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2008-12-09 17:12:44 UTC (rev 5492)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2008-12-09 17:50:53 UTC (rev 5493)
@@ -40,13 +40,14 @@
 {
    // added this constant here so that the client package have no dependency on JMS
    public static final SimpleString REPLYTO_HEADER_NAME = new SimpleString("JMSReplyTo");
-   
+
    private int deliveryCount;
 
    private ClientConsumerInternal consumer;
 
    private boolean largeMessage;
 
+   private int flowControlSize = -1;
 
    /*
     * Constructor for when reading from network
@@ -97,7 +98,7 @@
 
    public int getDeliveryCount()
    {
-      return this.deliveryCount;
+      return deliveryCount;
    }
 
    public void acknowledge() throws MessagingException
@@ -108,6 +109,23 @@
       }
    }
 
+   public int getFlowControlSize()
+   {
+      if (flowControlSize > 0)
+      {
+         return flowControlSize;
+      }
+      else
+      {
+         return getEncodeSize();
+      }
+   }
+
+   public void setFlowControlSize(final int flowControlSize)
+   {
+      this.flowControlSize = flowControlSize;
+   }
+
    /**
     * @return the largeMessage
     */
@@ -119,7 +137,7 @@
    /**
     * @param largeMessage the largeMessage to set
     */
-   public void setLargeMessage(boolean largeMessage)
+   public void setLargeMessage(final boolean largeMessage)
    {
       this.largeMessage = largeMessage;
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-12-09 17:12:44 UTC (rev 5492)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-12-09 17:50:53 UTC (rev 5493)
@@ -655,7 +655,7 @@
                if (availableCredits != null)
                {
                   // RequiredBufferSize on this case represents the right number of bytes sent
-                  availableCredits.addAndGet(-pendingLargeMessage.getPropertiesEncodeSize());
+                  availableCredits.addAndGet(-headerBuffer.limit());
                }
             }
 




More information about the jboss-cvs-commits mailing list