[jboss-cvs] JBoss Messaging SVN: r2838 - trunk/src/main/org/jboss/jms/server/endpoint.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jul 4 12:46:37 EDT 2007


Author: timfox
Date: 2007-07-04 12:46:36 -0400 (Wed, 04 Jul 2007)
New Revision: 2838

Modified:
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
Log:
Work around remoting bug


Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-07-04 13:09:09 UTC (rev 2837)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-07-04 16:46:36 UTC (rev 2838)
@@ -51,6 +51,9 @@
 import org.jboss.remoting.callback.Callback;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
+import EDU.oswego.cs.dl.util.concurrent.Callable;
+import EDU.oswego.cs.dl.util.concurrent.TimedCallable;
+
 /**
  * Concrete implementation of ConsumerEndpoint. Lives on the boundary between Messaging Core and the
  * JMS Facade. Handles delivery of messages from the server to the client side consumer.
@@ -310,14 +313,13 @@
 
             if (callbackClient != null)
             {
-               invoker = callbackClient.getInvoker();
-                              
+               invoker = callbackClient.getInvoker();                              
             }
             else
             {
                // TODO: dummy synchronization object, in case there's no clientInvoker. This will
-               // happen during the first invocation anyway. It's a kludge, I know, but this whole
-               // synchronization thing is a huge kludge. Needs to be reviewed.
+               // happen during the first invocation anyway. It's a kludge, I know, but we will get rid of this
+            	// when we get rid of remoting
                invoker = new Object();
             }
             
@@ -325,9 +327,25 @@
             {
                // one way invocation, no acknowledgment sent back by the client
                if (trace) { log.trace(this + " submitting message " + message + " to the remoting layer to be sent asynchronously"); }
+                              
+               if (remote)
+               {
+               	// FIXME - there is a bug in remoting when using the bisocket transport that if an invocation is attempted when the connection is closed
+               	// it sends a request to create a socket, then waits for ever for the socket to be created.
+               	// Therefore if this is a message sucker, we excuted using a TimedCallable which is guaranteed to throw an exception if it
+               	// takes too long.
+               	// In this case the consumer will be stopped and later on it will get cleaned up
+               	               	
+               	//TODO make the timeout configurable
+               	Callable callable = new TimedCallable(new HandleCallbackCallable(callbackHandler, callback), 1000);
+               	
+               	callable.call();               	               
+               }
+               else
+               {               
+               	callbackHandler.handleCallbackOneway(callback);
+               }               
                
-               callbackHandler.handleCallbackOneway(callback);
-               
                //We store the delivery id so we know to wait for any deliveries in transit on close
                this.lastDeliveryID = deliveryId;
             }
@@ -355,7 +373,7 @@
          return delivery;
       }
    }
-
+      
    // Filter implementation ------------------------------------------------------------------------
 
    public boolean accept(Message msg)
@@ -633,5 +651,27 @@
    }
 
    // Inner classes --------------------------------------------------------------------------------
+   
+   private class HandleCallbackCallable implements Callable
+   {
+   	private ServerInvokerCallbackHandler handler;
+   	
+   	private Callback callback;
+   	
+   	HandleCallbackCallable(ServerInvokerCallbackHandler handler, Callback callback)
+   	{   		
+   		this.handler = handler;
+   		
+   		this.callback = callback;
+   	}
 
+		public Object call() throws Exception
+		{
+			handler.handleCallback(callback);
+			
+			return null;
+		}
+   	
+   }
+
 }




More information about the jboss-cvs-commits mailing list