[jboss-cvs] JBoss Messaging SVN: r4412 - trunk/src/main/org/jboss/messaging/core/client/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jun 9 13:22:40 EDT 2008


Author: timfox
Date: 2008-06-09 13:22:39 -0400 (Mon, 09 Jun 2008)
New Revision: 4412

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1351


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-06-09 16:53:40 UTC (rev 4411)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java	2008-06-09 17:22:39 UTC (rev 4412)
@@ -175,7 +175,6 @@
       {
          remotingConnection = remotingConnectionFactory.createRemotingConnection(location, connectionParams);
        
-         log.info("calling start");
          remotingConnection.start();
          
          long sessionID = remotingConnection.getSessionID();
@@ -207,7 +206,6 @@
          
          if (t instanceof MessagingException)
          {
-            log.info("got messaging excetption");
             throw (MessagingException)t;
          }
          else

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-06-09 16:53:40 UTC (rev 4411)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-06-09 17:22:39 UTC (rev 4412)
@@ -235,7 +235,14 @@
    	
       this.handler = handler;
       
-      flushBuffer(false);
+      //If there are any messages in the buffer, we need to queue up executors for them
+      synchronized (this)
+      {
+         for (int i = 0; i < buffer.size(); i++)
+         {      
+            queueExecutor();
+         }
+      }
    }
 
    public void close() throws MessagingException
@@ -346,7 +353,7 @@
          		maxSize = Math.max(maxSize, buffer.size());
          	}
          	            	
-         	flushBuffer(true);
+         	queueExecutor();
          }
       }
       else
@@ -382,9 +389,9 @@
    // Private
    // --------------------------------------------------------------------------------------
 
-   private void flushBuffer(final boolean singleMessage)
+   private void queueExecutor()
    {
-      sessionExecutor.execute(new Runnable() { public void run() { callOnMessage(singleMessage); } } );
+      sessionExecutor.execute(new Runnable() { public void run() { callOnMessage(); } } );
    }
    
    private void flowControl(final int messageBytes) throws MessagingException
@@ -441,39 +448,29 @@
          throw new MessagingException(MessagingException.OBJECT_CLOSED, "Consumer is closed");
       }
    }
-
    
-   // TODO (JBMESSAGING-1351): I have made this parameter singleMessage as I just wanted to keep the current behavior of handleMessages now, 
-   //                  and as I don't want to mess up with results (performance, tests... everything else)
-   //                  But I have the impression that callOnMessage should aways run until the buffer is empty, on that case singleMessage 
-   //                  should be ignored
-   private void callOnMessage(final boolean singleMessage)
+   private void callOnMessage()
    {
    	try
 		{
-   	   do
-   	   {
-      		if (closed)
-      		{
-      			return;
-      		}
-      		
-      		//We pull the message from the buffer from inside the Runnable so we can ensure priority
-      		//ordering. If we just added a Runnable with the message to the executor immediately as we get it
-      		//we could not do that
-      		
-      		ClientMessage message;
-      		
-      		synchronized (this)
-      		{
-      		   message = buffer.removeFirst();
-      		}
-      		
-      		if (message == null)
-      		{
-      		   break;
-      		}
-      		
+   		if (closed)
+   		{
+   			return;
+   		}
+   		
+   		//We pull the message from the buffer from inside the Runnable so we can ensure priority
+   		//ordering. If we just added a Runnable with the message to the executor immediately as we get it
+   		//we could not do that
+   		
+   		ClientMessage message;
+   		
+   		synchronized (this)
+   		{
+   		   message = buffer.removeFirst();
+   		}
+   		
+   		if (message != null)
+   		{   		      		
       		boolean expired = message.isExpired();
    
             session.delivered(message.getDeliveryID(), expired);
@@ -486,8 +483,7 @@
          		
                handler.onMessage(message);
             }
-   	   } 
-   	   while (!singleMessage);
+   		}
 		}
 		catch (MessagingException e)
 		{




More information about the jboss-cvs-commits mailing list