[jboss-cvs] JBoss Messaging SVN: r4367 - trunk/src/main/org/jboss/messaging/core/client/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri May 30 22:14:51 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-05-30 22:14:51 -0400 (Fri, 30 May 2008)
New Revision: 4367

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
Log:
JBMESSAGING-1351

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-05-30 22:24:56 UTC (rev 4366)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-05-31 02:14:51 UTC (rev 4367)
@@ -232,7 +232,9 @@
         
    	waitForOnMessageToComplete();   	
    	
-      this.handler = handler;      
+      this.handler = handler;
+      
+      flushBuffer(false);
    }
 
    public void close() throws MessagingException
@@ -343,13 +345,12 @@
          		maxSize = Math.max(maxSize, buffer.size());
          	}
          	            	
-         	sessionExecutor.execute(new Runnable() { public void run() { callOnMessage(); } } );
+         	flushBuffer(true);
          }
       }
       else
       {
       	 // Add it to the buffer
-      	
       	synchronized (this)
       	{
       		buffer.addLast(message, message.getPriority());
@@ -358,7 +359,7 @@
       	}
       }      
    }
-   
+
    int maxSize = 0;
 
    public void recover(final long lastDeliveryID)
@@ -380,6 +381,11 @@
    // Private
    // --------------------------------------------------------------------------------------
 
+   private void flushBuffer(final boolean singleMessage)
+   {
+      sessionExecutor.execute(new Runnable() { public void run() { callOnMessage(singleMessage); } } );
+   }
+   
    private void flowControl(final int messageBytes) throws MessagingException
    {
       if (clientWindowSize > 0)
@@ -434,29 +440,39 @@
          throw new MessagingException(MessagingException.OBJECT_CLOSED, "Consumer is closed");
       }
    }
-        
-   private void callOnMessage()
+
+   
+   // TODO (JBMESSAGING-1351): I have made this parameter singleMessage as I just wanted to keep the current behavior of handleMessages now, 
+   //                  and as I don't want to mess up with results (performance, tests... everything else)
+   //                  But I have the impression that callOnMessage should aways run until the buffer is empty, on that case singleMessage 
+   //                  should be ignored
+   private void callOnMessage(final boolean singleMessage)
    {
    	try
 		{
-   		if (closed)
-   		{
-   			return;
-   		}
-   		
-   		//We pull the message from the buffer from inside the Runnable so we can ensure priority
-   		//ordering. If we just added a Runnable with the message to the executor immediately as we get it
-   		//we could not do that
-   		
-   		ClientMessage message;
-   		
-   		synchronized (this)
-   		{
-   		   message = buffer.removeFirst();
-   		}
-   		
-   		if (message != null)
-   		{      		
+   	   do
+   	   {
+      		if (closed)
+      		{
+      			return;
+      		}
+      		
+      		//We pull the message from the buffer from inside the Runnable so we can ensure priority
+      		//ordering. If we just added a Runnable with the message to the executor immediately as we get it
+      		//we could not do that
+      		
+      		ClientMessage message;
+      		
+      		synchronized (this)
+      		{
+      		   message = buffer.removeFirst();
+      		}
+      		
+      		if (message == null)
+      		{
+      		   break;
+      		}
+      		
       		boolean expired = message.isExpired();
    
             session.delivered(message.getDeliveryID(), expired);
@@ -469,7 +485,8 @@
          		
                handler.onMessage(message);
             }
-   		}
+   	   } 
+   	   while (!singleMessage);
 		}
 		catch (MessagingException e)
 		{




More information about the jboss-cvs-commits mailing list