[jboss-cvs] JBoss Messaging SVN: r1623 - in branches/Branch_HTTP_Experiment: src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised tests/src/org/jboss/test/messaging/jms

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Nov 22 22:28:37 EST 2006


Author: ovidiu.feodorov at jboss.com
Date: 2006-11-22 22:28:32 -0500 (Wed, 22 Nov 2006)
New Revision: 1623

Added:
   branches/Branch_HTTP_Experiment/tests/src/org/jboss/test/messaging/jms/ConsumerClosedTest.java
Modified:
   branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
   branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java
   branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
Log:
The client ashynchronously confirms message delivery to the server; it doesn't rely on server-to-client RPC anymore

Modified: branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2006-11-23 03:27:42 UTC (rev 1622)
+++ branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2006-11-23 03:28:32 UTC (rev 1623)
@@ -158,6 +158,16 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
 
+   /**
+    * This invocation should either be handled by the client-side interceptor chain or by the
+    * server-side endpoint.
+    */
+   public void confirmDelivery(int count)
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
+
    // Public --------------------------------------------------------
 
    public void init()

Modified: branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-11-23 03:27:42 UTC (rev 1622)
+++ branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-11-23 03:28:32 UTC (rev 1623)
@@ -247,7 +247,18 @@
             // Ignore
             return new HandleMessageResponse(false, 0);
          }
-                                      
+
+         // Asynchronously confirm delivery on client
+
+         try
+         {
+            sessionExecutor.execute(new ConfirmDelivery(msgs.size()));
+         }
+         catch (InterruptedException e)
+         {
+            log.warn("Thread interrupted", e);
+         }
+
          // Put the messages in the buffer and notify any waiting receive()
          
          processMessages(msgs);
@@ -756,6 +767,26 @@
          }
       }
    }
+
+   /*
+    * Used to asynchronously confirm to the server message arrival (delivery) on client.
+    */
+   private class ConfirmDelivery implements Runnable
+   {
+      int count;
+
+      ConfirmDelivery(int count)
+      {
+         this.count = count;
+      }
+
+      public void run()
+      {
+         if (trace) { log.trace("confirming delivery on client of " + count + " message(s)"); }
+         consumerDelegate.confirmDelivery(count);
+      }
+   }
+
 }
 
 

Modified: branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java
===================================================================
--- branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java	2006-11-23 03:27:42 UTC (rev 1622)
+++ branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java	2006-11-23 03:28:32 UTC (rev 1623)
@@ -30,6 +30,8 @@
  * The rest of the methods are handled in the advice stack.
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ *
  * @version <tt>$Revision$</tt>
  *
  * $Id$
@@ -37,11 +39,23 @@
 public interface ConsumerEndpoint extends Closeable
 {  
    /**
-    * If the client buffer has previously become full because the server was sending at a faster rate than the
-    * client could consume, then the server will stop sending messages.
-    * When the client has emptied the buffer it then needs to inform the server that it can receive more messages
-    * by calling this method
+    * If the client buffer has previously become full because the server was sending at a faster
+    * rate than the client could consume, then the server will stop sending messages. When the
+    * client has emptied the buffer it then needs to inform the server that it can receive more
+    * messages by calling this method.
+    *
     * @throws JMSException
     */
-   void more() throws JMSException;   
+   void more() throws JMSException;
+
+   /**
+    * The server consumer endpoint needs to know at any time how messages are in transit between
+    * server and client. That is why it needs to receive confirmations every time the client
+    * received one (or more) messages. The confirmation is sent asynchronously from client to server.
+    * This is NOT a consumption acknowledgment.
+    *
+    * @param count - the number of messages received by the client in one batch.
+    */
+   void confirmDelivery(int count);
+
 }

Modified: branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-11-23 03:27:42 UTC (rev 1622)
+++ branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-11-23 03:28:32 UTC (rev 1623)
@@ -27,7 +27,6 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
 
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidSelectorException;
@@ -57,7 +56,6 @@
 import org.jboss.messaging.core.tx.TxCallback;
 import org.jboss.messaging.util.Future;
 import org.jboss.remoting.callback.Callback;
-import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
 import EDU.oswego.cs.dl.util.concurrent.Executor;
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -81,6 +79,7 @@
    // Static --------------------------------------------------------
 
    private static final int MAX_DELIVERY_ATTEMPTS = 10;
+   private static final int MESSAGES_IN_TRANSIT_WAIT_COUNT = 100;
 
    // Attributes ----------------------------------------------------
 
@@ -124,6 +123,9 @@
    private Object lock;
 
    private Map deliveries;
+
+   private Object messagesInTransitLock;
+   private int messagesInTransitCount; // access only from a region guarded by messagesInTransitLock
    
    // Constructors --------------------------------------------------
 
@@ -195,6 +197,9 @@
       
       // prompt delivery
       messageQueue.deliver(false);
+
+      messagesInTransitLock = new Object();
+      messagesInTransitCount = 0;
       
       log.debug(this + " constructed");
    }
@@ -256,9 +261,9 @@
          {
             return delivery;
          }
+
+         deliveries.put(new Long(ref.getMessageID()), delivery);
    
-         deliveries.put(new Long(ref.getMessageID()), delivery);                 
-   
          // We don't send the message as-is, instead we create a MessageProxy instance. This allows
          // local fields such as deliveryCount to be handled by the proxy but global data to be
          // fielded by the same underlying Message instance. This allows us to avoid expensive
@@ -406,34 +411,39 @@
    {           
       try
       {
-         /*
-         Set clientConsumerFull to false
-         NOTE! This must be done using a Runnable on the delivery executor - this is to
-         prevent the following race condition:
-         1) Messages are delivered to the client, causing it to be full
-         2) The messages are consumed very quickly on the client causing more to be called()
-         3) more() hits the server BEFORE the deliverer thread has returned from delivering to the client
-         causing clientConsumerFull to be set to false and adding a deliverer to the queue.
-         4) The deliverer thread returns and sets clientConsumerFull to true
-         5) The next deliverer runs but doesn't do anything since clientConsumerFull = true even
-         though the client needs messages
-         */
-         this.executor.execute(new Runnable() { public void run() { clientConsumerFull = false; } });         
+         // Set clientConsumerFull to false.
+         //
+         // NOTE! This must be done using a Runnable on the delivery executor - this is to prevent
+         // the following race condition:
+         //  1) Messages are delivered to the client, causing it to be full.
+         //  2) The messages are consumed very quickly on the client causing more() to be called.
+         //  3) more() hits the server BEFORE the deliverer thread has returned from delivering to
+         //     the client causing clientConsumerFull to be set to false and adding a deliverer to
+         //     the queue.
+         //  4) The deliverer thread returns and sets clientConsumerFull to true.
+         //  5) The next deliverer runs but doesn't do anything since clientConsumerFull = true even
+         //     though the client needs messages.
+
+         executor.execute(new Runnable()
+         {
+            public void run()
+            {
+               if (trace) { log.trace(ServerConsumerEndpoint.this + " is notified that client wants more() messages"); }
+               clientConsumerFull = false;
+            }
+         });
                            
-         //Run a deliverer to deliver any existing ones
-         this.executor.execute(new Deliverer());
+         // Run a deliverer to deliver any existing ones
+         executor.execute(new Deliverer());
          
-         //TODO Why do we need to wait for it to execute??
-         //Why not just return immediately?
+         // TODO Why do we need to wait for it to execute? Why not just return immediately?
          
-         //Now wait for it to execute
+         // Now wait for it to execute
          Future result = new Future();
-         
          this.executor.execute(new Waiter(result));
-         
          result.getResult();
                   
-         //Now we know the deliverer has delivered any outstanding messages to the client buffer
+         // Now we know the deliverer has delivered any outstanding messages to the client buffer
          
          messageQueue.deliver(false);
       }
@@ -447,6 +457,24 @@
       }
    }
 
+   public void confirmDelivery(int count)
+   {
+      synchronized(messagesInTransitLock)
+      {
+         messagesInTransitCount -= count;
+
+         if (trace) { log.trace("confirming delivery of " + count + " message(s), messages in transit " + messagesInTransitCount); }
+
+         if (messagesInTransitCount < 0)
+         {
+            log.error(this + " has an invalid messages in transit count (" +
+                      messagesInTransitCount + ")");
+         }
+
+         messagesInTransitLock.notifyAll();
+      }
+   }
+
    // Public --------------------------------------------------------
    
    public String toString()
@@ -635,10 +663,7 @@
          // Flush any messages waiting to be sent to the client.
          this.executor.execute(new Deliverer());
          
-         // Now wait for it to execute.
-         Future result = new Future();
-         this.executor.execute(new Waiter(result));
-         result.getResult();
+         if (trace) { log.trace(this + " flushed all remaining messages (if any) to the client"); }
 
          // Now we know any deliverer has delivered any outstanding messages to the client buffer.
       }
@@ -646,7 +671,29 @@
       {
          log.warn("Thread interrupted", e);
       }
-            
+
+      // Make sure there are no messages in transit between server and client
+
+      synchronized(messagesInTransitLock)
+      {
+         int loopCount = 0;
+         while(messagesInTransitCount > 0 && loopCount < MESSAGES_IN_TRANSIT_WAIT_COUNT)
+         {
+            log.debug(this + " waiting for " + messagesInTransitCount + " message(s) in transit " +
+                      "to reach the client, " + (loopCount + 1) + " lock grab attempts.");
+            messagesInTransitLock.wait(500);
+            loopCount ++;
+         }
+
+         if (loopCount >= MESSAGES_IN_TRANSIT_WAIT_COUNT)
+         {
+            throw new IllegalStateException("Maximum number of lock grab attempts exceeded, " +
+                                            "giving up to wait for messages in transit");
+         }
+
+         if (trace) { log.trace(this + " has no messages in transit"); }
+      }
+
       // Now we know that there are no in flight messages on the way to the client consumer, but
       // there may be messages still in the toDeliver list since the client consumer might be full,
       // so we need to cancel these.
@@ -687,7 +734,7 @@
      
    private void checkDeliveryCount(SimpleDelivery del)
    {
-      //TODO - We need to put the message in a DLQ
+      // TODO - We need to put the message in a DLQ
       // For now we just ack it otherwise the message will keep being retried and we'll never get
       // anywhere
       if (del.getReference().getDeliveryCount() > MAX_DELIVERY_ATTEMPTS)
@@ -750,17 +797,22 @@
          
          int serverId = connection.getServerPeer().getServerPeerID();
 
+         // TODO How can we ensure that messages for the same consumer aren't delivered
+         // concurrently to the same consumer on different threads?
+
+         ClientDelivery del = new ClientDelivery(list, serverId, id);
+         MessagingMarshallable mm = new MessagingMarshallable(connection.getUsingVersion(), del);
+         Callback callback = new Callback(mm);
+
          try
          {
-            // TODO How can we ensure that messages for the same consumer aren't delivered
-            // concurrently to the same consumer on different threads?
-            ClientDelivery del = new ClientDelivery(list, serverId, id);
-            MessagingMarshallable mm = new MessagingMarshallable(connection.getUsingVersion(), del);
-            Callback callback = new Callback(mm);
-
             if (trace) { log.trace(ServerConsumerEndpoint.this + " handing " + list.size() + " message(s) over to the remoting layer"); }
 
-            connection.getCallbackHandler().handleCallback(callback);
+            synchronized(messagesInTransitLock)
+            {
+               connection.getCallbackHandler().handleCallback(callback);
+               messagesInTransitCount += list.size();
+            }
 
             if (trace) { log.trace(ServerConsumerEndpoint.this + " handed messages over to the remoting layer"); }
 

Modified: branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
===================================================================
--- branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java	2006-11-23 03:27:42 UTC (rev 1622)
+++ branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java	2006-11-23 03:28:32 UTC (rev 1623)
@@ -73,6 +73,12 @@
    {
       endpoint.more();
    }
+
+   public void confirmDelivery(int count)
+   {
+      endpoint.confirmDelivery(count);
+   }
+
   
    // AdvisedSupport overrides --------------------------------------
 

Added: branches/Branch_HTTP_Experiment/tests/src/org/jboss/test/messaging/jms/ConsumerClosedTest.java
===================================================================
--- branches/Branch_HTTP_Experiment/tests/src/org/jboss/test/messaging/jms/ConsumerClosedTest.java	2006-11-23 03:27:42 UTC (rev 1622)
+++ branches/Branch_HTTP_Experiment/tests/src/org/jboss/test/messaging/jms/ConsumerClosedTest.java	2006-11-23 03:28:32 UTC (rev 1623)
@@ -0,0 +1,112 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.jms;
+
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+import javax.naming.InitialContext;
+import javax.jms.ConnectionFactory;
+import javax.jms.Queue;
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.management.ObjectName;
+
+/**
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class ConsumerClosedTest extends MessagingTestCase
+{
+   // Constants -----------------------------------------------------
+
+   public static final int NUMBER_OF_MESSAGES = 10;
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   InitialContext ic;
+
+   // Constructors --------------------------------------------------
+
+   public ConsumerClosedTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+
+
+   public void testMessagesSentDuringClose() throws Exception
+   {
+      ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+      Queue queue = (Queue)ic.lookup("/queue/ConsumerClosedTestQueue");
+
+      Connection c = cf.createConnection();
+      c.start();
+
+      Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer p = s.createProducer(queue);
+
+      for(int i = 0; i < NUMBER_OF_MESSAGES; i++)
+      {
+         p.send(s.createTextMessage("message" + i));
+      }
+
+      log.debug("all messages sent");
+
+      MessageConsumer cons = s.createConsumer(queue);
+      cons.close();
+
+      log.debug("consumer closed");
+
+      // make sure that all messages are in queue
+      ObjectName on =
+         new ObjectName("jboss.messaging.destination:service=Queue,name=ConsumerClosedTestQueue");
+      Integer count = (Integer)ServerManagement.getAttribute(on, "MessageCount");
+      assertEquals(NUMBER_OF_MESSAGES, count.intValue());
+
+      //Thread.sleep(900000000);
+
+
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      ServerManagement.start("all");
+
+      ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+
+      ServerManagement.deployQueue("ConsumerClosedTestQueue");
+
+      log.debug("setup done");
+   }
+
+   protected void tearDown() throws Exception
+   {
+      ServerManagement.undeployQueue("ConsumerClosedTestQueue");
+
+      ic.close();
+
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}




More information about the jboss-cvs-commits mailing list