[hornetq-commits] JBoss hornetq SVN: r12015 - trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jan 12 07:10:03 EST 2012


Author: borges
Date: 2012-01-12 07:10:02 -0500 (Thu, 12 Jan 2012)
New Revision: 12015

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
Log:
Make synchronization consistent: declare getMessageHandler() as synchronized; also strip trailing whitespace.

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java	2012-01-12 12:09:47 UTC (rev 12014)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java	2012-01-12 12:10:02 UTC (rev 12015)
@@ -267,7 +267,7 @@
                   {
                      // forced delivery messages are discarded, nothing has been delivered by the queue
                      resetIfSlowConsumer();
-                     
+
                      if (isTrace)
                      {
                         log.trace("There was nothing on the queue, leaving it now:: returning null");
@@ -315,7 +315,7 @@
                {
                   largeMessageReceived = m;
                }
-               
+
                if (isTrace)
                {
                   log.trace("Returning " + m);
@@ -367,7 +367,7 @@
       return receive(0, true);
    }
 
-   public MessageHandler getMessageHandler() throws HornetQException
+   public synchronized MessageHandler getMessageHandler() throws HornetQException
    {
       checkClosed();
 
@@ -479,7 +479,7 @@
    {
       return session;
    }
-   
+
    public SessionQueueQueryResponseMessage getQueueInfo()
    {
       return queueInfo;
@@ -621,7 +621,7 @@
             try
             {
                ClientMessageInternal message = iter.next();
-               
+
                if (message.isLargeMessage())
                {
                   ClientLargeMessageInternal largeMessage = (ClientLargeMessageInternal)message;
@@ -637,7 +637,7 @@
          }
 
          clearBuffer();
-         
+
          try
          {
             if (currentLargeMessageController != null)
@@ -704,11 +704,11 @@
       }
    }
 
-   /** 
-    * 
+   /**
+    *
     * LargeMessageBuffer will call flowcontrol here, while other handleMessage will also be calling flowControl.
     * So, this operation needs to be atomic.
-    * 
+    *
     * @param discountSlowConsumer When dealing with slowConsumers, we need to discount one credit that was pre-sent when the first receive was called. For largeMessage that is only done at the latest packet
     */
    public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException
@@ -716,7 +716,7 @@
       if (clientWindowSize >= 0)
       {
          creditsToSend += messageBytes;
-         
+
          if (log.isTraceEnabled())
          {
             log.trace(this + "::FlowControl::creditsToSend=" + creditsToSend + ", clientWindowSize = " + clientWindowSize + " messageBytes = " + messageBytes);
@@ -774,7 +774,7 @@
    // Private
    // ---------------------------------------------------------------------------------------
 
-   /** 
+   /**
     * Sending a initial credit for slow consumers
     * */
    private void startSlowConsumer()
@@ -808,7 +808,7 @@
       {
          ClientConsumerImpl.log.trace("Adding Runner on Executor for delivery");
       }
-      
+
       sessionExecutor.execute(runner);
    }
 
@@ -891,7 +891,7 @@
                //Ignore, this could be a relic from a previous receiveImmediate();
                return;
             }
-            
+
             boolean expired = message.isExpired();
 
             flowControlBeforeConsumption(message);



More information about the hornetq-commits mailing list