[jboss-cvs] JBoss Messaging SVN: r3772 - in trunk/src/main/org/jboss: messaging/util and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Feb 22 11:48:44 EST 2008


Author: timfox
Date: 2008-02-22 11:48:43 -0500 (Fri, 22 Feb 2008)
New Revision: 3772

Removed:
   trunk/src/main/org/jboss/messaging/util/Future.java
Modified:
   trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
Log:
Some improvements to ClientConsumerImpl; removed the old Future class


Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java	2008-02-22 15:34:18 UTC (rev 3771)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java	2008-02-22 16:48:43 UTC (rev 3772)
@@ -22,6 +22,8 @@
 package org.jboss.jms.client.impl;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.jboss.jms.client.api.MessageHandler;
 import org.jboss.jms.client.remoting.MessagingRemotingConnection;
@@ -32,7 +34,6 @@
 import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
 import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
 import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
-import org.jboss.messaging.util.Future;
 import org.jboss.messaging.util.Logger;
 import org.jboss.messaging.util.MessagingException;
 
@@ -54,6 +55,8 @@
    private static final Logger log = Logger.getLogger(ClientConsumerImpl.class);
 
    private static final boolean trace = log.isTraceEnabled();
+   
+   private static final long CLOSE_TIMEOUT_SECONDS = 10;
 
    // Attributes
    // -----------------------------------------------------------------------------------
@@ -66,16 +69,12 @@
    
    private volatile Thread receiverThread;
    
-   private MessageHandler handler;
+   private volatile MessageHandler handler;
    
    private volatile boolean closed;
-   
-   private Object mainLock = new Object();
-   
+    
    private ExecutorService sessionExecutor;
    
-   private boolean listenerRunning;
-   
    private MessagingRemotingConnection remotingConnection;
    
    private long ignoreDeliveryMark = -1;
@@ -110,85 +109,82 @@
    // ClientConsumer implementation
    // -----------------------------------------------------------------
 
-   public Message receive(long timeout) throws MessagingException
+   public synchronized Message receive(long timeout) throws MessagingException
    {
       checkClosed();
 
-      synchronized (mainLock)
+      if (handler != null)
       {
-         if (handler != null)
-         {
-            throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot call receive(...) - a MessageHandler is set");
-         }
+         throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot call receive(...) - a MessageHandler is set");
+      }
 
-         receiverThread = Thread.currentThread();
+      receiverThread = Thread.currentThread();
 
-         if (timeout == 0)            
-         {
-            //Effectively infinite
-            timeout = Long.MAX_VALUE;
-         }
-         
-         long start = System.currentTimeMillis();
-         
-         long toWait = timeout;
+      if (timeout == 0)            
+      {
+         //Effectively infinite
+         timeout = Long.MAX_VALUE;
+      }
+      
+      long start = System.currentTimeMillis();
+      
+      long toWait = timeout;
 
-         try
+      try
+      {
+         while (true)
          {
-            while (true)
+            while (!closed && buffer.isEmpty() && toWait > 0)
             {
-               while (!closed && buffer.isEmpty() && toWait > 0)
+               try
                {
-                  try
+                  wait(toWait);
+               }
+               catch (InterruptedException e)
+               {
+               }
+               
+               long now = System.currentTimeMillis();
+               
+               toWait -= now - start;
+               
+               start = now;
+            }
+                    
+            if (!closed && !buffer.isEmpty())
+            {                              
+               DeliverMessage m = buffer.removeFirst();
+               
+               boolean expired = m.getMessage().isExpired();
+               
+               session.delivered(m.getDeliveryID(), expired);
+               
+               flowControl();
+                                 
+               if (expired)
+               {
+                  if (toWait > 0)
                   {
-                     mainLock.wait(toWait);
+                     continue;
                   }
-                  catch (InterruptedException e)
+                  else
                   {
+                     return null;
                   }
-                  
-                  long now = System.currentTimeMillis();
-                  
-                  toWait -= now - start;
-                  
-                  start = now;
                }
-                       
-               if (!closed && !buffer.isEmpty())
-               {                              
-                  DeliverMessage m = buffer.removeFirst();
-                  
-                  boolean expired = m.getMessage().isExpired();
-                  
-                  session.delivered(m.getDeliveryID(), expired);
-                  
-                  flowControl();
-                                    
-                  if (expired)
-                  {
-                     if (toWait > 0)
-                     {
-                        continue;
-                     }
-                     else
-                     {
-                        return null;
-                     }
-                  }
-                                    
-                  return m.getMessage();
-               }
-               else
-               {
-                  return null;
-               }
+                                 
+               return m.getMessage();
             }
+            else
+            {
+               return null;
+            }
          }
-         finally
-         {
-            receiverThread = null;
-         }
       }
+      finally
+      {
+         receiverThread = null;
+      }      
    }
 
    public Message receiveImmediate() throws MessagingException
@@ -211,18 +207,10 @@
       {
          throw new MessagingException(MessagingException.ILLEGAL_STATE,"Cannot set MessageHandler - consumer is in receive(...)");
       }
-
-      synchronized (mainLock)
-      {         
-         this.handler = handler;
-
-         if (handler != null && !buffer.isEmpty())
-         {
-            listenerRunning = true;
-
-            queueRunner();
-         }
-      }
+        
+   	waitForOnMessageToComplete();   	
+   	
+      this.handler = handler;      
    }
 
    public void close() throws MessagingException
@@ -231,25 +219,26 @@
       {
          return;
       }
-
+ 
       try
       {
-         // We set the handler to null so the next ListenerRunner won't run
-         handler = null;
-
          // Now we wait for any current handler runners to run.
          waitForOnMessageToComplete();
 
-         synchronized (mainLock)
+         closed = true;
+                           
+         if (receiverThread != null)
          {
-            closed = true;
-
-            if (receiverThread != null)
-            {
+            synchronized (this)
+            {   
                // Wake up any receive() thread that might be waiting
-               mainLock.notify();
+               notify();
             }
          }
+                          
+         handler = null;
+         
+         receiverThread = null;
 
          remotingConnection.send(id, new CloseMessage());
 
@@ -258,7 +247,7 @@
       finally
       {
          session.removeConsumer(this);
-      }
+      }   	
    }
 
    public boolean isClosed()
@@ -276,76 +265,74 @@
 
    public void handleMessage(final DeliverMessage message) throws Exception
    {
-      synchronized (mainLock)
+      if (closed)
       {
-         if (closed)
+         // This is ok - we just ignore the message
+         return;
+      }
+
+      if (ignoreDeliveryMark >= 0)
+      {
+         long delID = message.getDeliveryID();
+
+         if (delID > ignoreDeliveryMark)
          {
-            // This is ok - we just ignore the message
+            // Ignore - the session is recovering and these are inflight
+            // messages
             return;
          }
-
-         if (ignoreDeliveryMark >= 0)
+         else
          {
-            long delID = message.getDeliveryID();
-
-            if (delID > ignoreDeliveryMark)
-            {
-               // Ignore - the session is recovering and these are inflight
-               // messages
-               return;
-            }
-            else
-            {
-               // We have hit the begining of the recovered messages - we can
-               // stop ignoring
-               ignoreDeliveryMark = -1;
-            }
+            // We have hit the beginning of the recovered messages - we can
+            // stop ignoring
+            ignoreDeliveryMark = -1;
          }
-
-         // Add it to the buffer
-         Message coreMessage = message.getMessage();
-
-         buffer.addLast(message, coreMessage.getPriority());
-
-         if (receiverThread != null)
+      }
+      
+      if (handler != null)
+      {
+         if (direct)
          {
-            mainLock.notify();
-         }
-         else if (handler != null)
-         {
-            if (direct)
-            {
-               //Dispatch it directly on remoting thread
-               
-               boolean expired = message.getMessage().isExpired();
+            //Dispatch it directly on remoting thread
+            
+            boolean expired = message.getMessage().isExpired();
 
-               session.delivered(message.getDeliveryID(), expired);
-               
-               flowControl();
+            session.delivered(message.getDeliveryID(), expired);
+            
+            flowControl();
 
-               if (!expired)
-               {
-                  handler.onMessage(message.getMessage());
-               }
-            }
-            else if (!listenerRunning)
+            if (!expired)
             {
-               listenerRunning = true;
-
-               queueRunner();
+               handler.onMessage(message.getMessage());
             }
          }
+         else
+         {
+         	//Execute using executor
+         	
+         	buffer.addLast(message, message.getMessage().getPriority());
+         	            	
+         	sessionExecutor.execute(new Runnable() { public void run() { callOnMessage(); } } );
+         }
       }
+      else
+      {
+      	 // Add it to the buffer
+
+      	buffer.addLast(message, message.getMessage().getPriority());
+      	
+      	synchronized (this)
+      	{
+      		notify();
+      	}
+      }      
    }
 
-   public void recover(long lastDeliveryID)
+   public synchronized void recover(long lastDeliveryID)
    {
-      synchronized (mainLock)
-      {
-         ignoreDeliveryMark = lastDeliveryID;
+      ignoreDeliveryMark = lastDeliveryID;
 
-         buffer.clear();
-      }
+      buffer.clear();      
    }
 
    // Public
@@ -377,26 +364,36 @@
    
    private void waitForOnMessageToComplete()
    {
-      // Wait for any onMessage() executions to complete
-
+   	if (handler == null)
+   	{
+   		return;
+   	}
+   	
       if (Thread.currentThread() == onMessageThread)
       {
          // If called from inside onMessage then return immediately - otherwise would block forever
          return;
       }
 
-      Future result = new Future();
+      Future<?> future = sessionExecutor.submit(new Runnable() { public void run() {} });
 
-      sessionExecutor.execute(new Closer(result));
-
-      result.getResult();
+      long start = System.currentTimeMillis();
+      try
+      {
+      	future.get(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+      }
+      catch (Exception e)
+      {
+      	//Ignore
+      }
+      long end = System.currentTimeMillis();
+      
+      if (end - start >= CLOSE_TIMEOUT_SECONDS * 1000)
+      {
+      	log.warn("Timed out waiting for handler to complete processing");
+      }
    }
 
-   private void queueRunner()
-   {
-      sessionExecutor.execute(new ListenerRunner());
-   }
-
    private void checkClosed() throws MessagingException
    {
       if (closed)
@@ -405,89 +402,48 @@
       }
    }
    
-   private void onMessageLoop()
+   private void callOnMessage()
    {
-      try
-      {
-         onMessageThread = Thread.currentThread();
-
-         DeliverMessage msg = null;
-
-         MessageHandler theListener = null;
-                  
-         synchronized (mainLock)
-         {
-            if (handler == null || buffer.isEmpty())
-            {
-               listenerRunning = false;
-
-               return;
-            }
-
-            theListener = handler;
-
-            msg = buffer.removeFirst();              
-         }
-
-         if (msg != null)
-         {
-            boolean expired = msg.getMessage().isExpired();
-
-            session.delivered(msg.getDeliveryID(), expired);
+   	try
+		{
+   		if (closed)
+   		{
+   			return;
+   		}
+   		
+   		//We pull the message from the buffer from inside the Runnable so we can ensure priority
+   		//ordering. If we just added a Runnable with the message to the executor immediately as we get it
+   		//we could not do that
+   		
+   		DeliverMessage message = buffer.removeFirst();
+   		
+   		if (message != null)
+   		{      		
+      		boolean expired = message.getMessage().isExpired();
+   
+            session.delivered(message.getDeliveryID(), expired);
             
             flowControl();
-
+   
             if (!expired)
             {
-               theListener.onMessage(msg.getMessage());
+         		onMessageThread = Thread.currentThread();
+         		
+               handler.onMessage(message.getMessage());
             }
-         }
-
-         synchronized (mainLock)
-         {
-            if (!buffer.isEmpty())
-            {
-               queueRunner();
-            }
-            else
-            {
-               listenerRunning = false;
-            }
-         }
-      }
-      catch (MessagingException e)
-      {
-         log.error("Failure in ListenerRunner", e);
-      }
+   		}
+		}
+		catch (MessagingException e)
+		{
+			log.error("Failed to execute", e);
+		}
+		catch (RuntimeException e)
+		{
+			log.error("RuntimeException thrown from handler", e);
+		}
    }
    
    // Inner classes
    // --------------------------------------------------------------------------------
 
-   /*
-    * This class is used to put on the handler executor to wait for onMessage
-    * invocations to complete when closing
-    */
-   private class Closer implements Runnable
-   {
-      Future result;
-
-      Closer(Future result)
-      {
-         this.result = result;
-      }
-
-      public void run()
-      {
-         result.setResult(null);
-      }
-   }
-     
-   private class ListenerRunner implements Runnable
-   {
-      public void run()
-      {
-         onMessageLoop();
-      }
-   }
 }

Deleted: trunk/src/main/org/jboss/messaging/util/Future.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/Future.java	2008-02-22 15:34:18 UTC (rev 3771)
+++ trunk/src/main/org/jboss/messaging/util/Future.java	2008-02-22 16:48:43 UTC (rev 3772)
@@ -1,85 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.util;
-
-
-/**
- * A Future
-
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- *
- */
-public class Future
-{
-   private static final Logger log = Logger.getLogger(Future.class);
-   
-   private Object result;
-   
-   private boolean isException;
-   
-   private boolean resultSet;
-   
-   public synchronized Object getResult()
-   {
-      while (!resultSet)
-      {
-         try
-         {
-            wait();
-         }
-         catch (InterruptedException e)
-         {
-            log.warn("Thread interrupted", e);
-         }
-      }
-      return result;
-   }
-   
-   public synchronized void setResult(Object theResult)
-   {
-      result = theResult;
-      
-      resultSet = true;
-      
-      notify();
-   }  
-   
-   public synchronized void setException(Throwable t)
-   {
-      result = t;
-      
-      isException = true;
-      
-      resultSet = true;
-      
-      notify();
-   }
-   
-   public boolean isException()
-   {
-      return isException;
-   }
-}
-




More information about the jboss-cvs-commits mailing list