[jboss-cvs] JBoss Messaging SVN: r1482 - branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Oct 17 03:46:06 EDT 2006


Author: ron_sigal
Date: 2006-10-17 03:46:05 -0400 (Tue, 17 Oct 2006)
New Revision: 1482

Modified:
   branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
Log:
JBMESSAGING-207:  Uses standard Remoting callbacks.

Modified: branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-10-17 07:39:06 UTC (rev 1481)
+++ branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-10-17 07:46:05 UTC (rev 1482)
@@ -23,10 +23,14 @@
 
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidSelectorException;
@@ -56,6 +60,9 @@
 import org.jboss.messaging.core.tx.TransactionException;
 import org.jboss.messaging.core.tx.TxCallback;
 import org.jboss.messaging.util.Future;
+import org.jboss.remoting.callback.Callback;
+import org.jboss.remoting.callback.CallbackListener;
+import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
 import EDU.oswego.cs.dl.util.concurrent.Executor;
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -70,7 +77,7 @@
  *
  * $Id$
  */
-public class ServerConsumerEndpoint implements Receiver, ConsumerEndpoint
+public class ServerConsumerEndpoint implements Receiver, ConsumerEndpoint, CallbackListener
 {
    // Constants -----------------------------------------------------
 
@@ -123,6 +130,8 @@
 
    private Map deliveries;
    
+   private Set pendingCallbacks;
+
    // Constructors --------------------------------------------------
 
    protected ServerConsumerEndpoint(int id, PagingFilteredQueue messageQueue, String queueName,
@@ -194,6 +203,8 @@
       // prompt delivery
       messageQueue.deliver(false);
       
+      this.pendingCallbacks = new HashSet();
+      
       log.debug(this + " constructed");
    }
 
@@ -634,10 +645,13 @@
          // Flush any messages waiting to be sent to the client.
          this.executor.execute(new Deliverer());
          
-         // Now wait for it to execute.
-         Future result = new Future();
-         this.executor.execute(new Waiter(result));
-         result.getResult();
+         //Now wait for it to execute
+         synchronized (pendingCallbacks)
+         {
+            System.out.println("waiting on pendingCallbacks");
+            if (!pendingCallbacks.isEmpty())
+               pendingCallbacks.wait();
+         }
              
          // Now we know any deliverer has delivered any outstanding messages to the client buffer.
       }
@@ -760,20 +774,30 @@
             // concurrently to the same consumer on different threads?
             MessagingMarshallable mm = new MessagingMarshallable(connection.getUsingVersion(), del);
 
-            MessagingMarshallable resp = (MessagingMarshallable)connection.getCallbackClient().invoke(mm);
-
-            if (trace) { log.trace(ServerConsumerEndpoint.this + " handed messages over to the remoting layer"); }
-
-            HandleMessageResponse result = (HandleMessageResponse)resp.getLoad();
-
-            // For now we don't look at how many messages are accepted since they all will be.
-            // The field is a placeholder for the future.
-            if (result.clientIsFull())
+            Map metadata = new HashMap();
+            metadata.put(ServerInvokerCallbackHandler.CALLBACK_POSTPROCESS_LISTENER,
+                         ServerConsumerEndpoint.this);
+            Callback callback = new Callback(mm);
+            callback.setReturnPayload(metadata);
+            
+            synchronized (pendingCallbacks)
             {
-               // Stop the server sending any more messages to the client.
-               // This is ok outside lock.
-               clientConsumerFull = true;
+               pendingCallbacks.add(callback);
             }
+            
+            connection.getCallbackHandler().handleCallback(callback);
+            if (trace) { log.trace(ServerConsumerEndpoint.this + " handed messages over to the remoting layer"); }
+//
+//            HandleMessageResponse result = (HandleMessageResponse)resp.getLoad();
+//
+//            // For now we don't look at how many messages are accepted since they all will be.
+//            // The field is a placeholder for the future.
+//            if (result.clientIsFull())
+//            {
+//               // Stop the server sending any more messages to the client.
+//               // This is ok outside lock.
+//               clientConsumerFull = true;
+//            }
          }
          catch(Throwable t)
          {
@@ -870,4 +894,20 @@
       }
    }
    
+   /**
+    * To implement org.jboss.remoting.callback.CallbackListener
+    */
+   public void callbackSent(Callback callback)
+   {  
+      System.out.println("entering callbackSent(): " + callback);
+      synchronized (pendingCallbacks)
+      {
+         if (!pendingCallbacks.remove(callback))
+            log.warn("callbackSent() received unknown Callback: " + callback);
+         
+         if (pendingCallbacks.isEmpty())
+            pendingCallbacks.notify();
+      }
+   }
+   
 }




More information about the jboss-cvs-commits mailing list