[jboss-cvs] JBoss Messaging SVN: r1623 - in branches/Branch_HTTP_Experiment: src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised tests/src/org/jboss/test/messaging/jms
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Nov 22 22:28:37 EST 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-11-22 22:28:32 -0500 (Wed, 22 Nov 2006)
New Revision: 1623
Added:
branches/Branch_HTTP_Experiment/tests/src/org/jboss/test/messaging/jms/ConsumerClosedTest.java
Modified:
branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java
branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
Log:
The client ashynchronously confirms message delivery to the server; it doesn't rely on server-to-client RPC anymore
Modified: branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2006-11-23 03:27:42 UTC (rev 1622)
+++ branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2006-11-23 03:28:32 UTC (rev 1623)
@@ -158,6 +158,16 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
+ /**
+ * This invocation should either be handled by the client-side interceptor chain or by the
+ * server-side endpoint.
+ */
+ public void confirmDelivery(int count)
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
+
// Public --------------------------------------------------------
public void init()
Modified: branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-11-23 03:27:42 UTC (rev 1622)
+++ branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-11-23 03:28:32 UTC (rev 1623)
@@ -247,7 +247,18 @@
// Ignore
return new HandleMessageResponse(false, 0);
}
-
+
+ // Asynchronously confirm delivery on client
+
+ try
+ {
+ sessionExecutor.execute(new ConfirmDelivery(msgs.size()));
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("Thread interrupted", e);
+ }
+
// Put the messages in the buffer and notify any waiting receive()
processMessages(msgs);
@@ -756,6 +767,26 @@
}
}
}
+
+ /*
+ * Used to asynchronously confirm to the server message arrival (delivery) on client.
+ */
+ private class ConfirmDelivery implements Runnable
+ {
+ int count;
+
+ ConfirmDelivery(int count)
+ {
+ this.count = count;
+ }
+
+ public void run()
+ {
+ if (trace) { log.trace("confirming delivery on client of " + count + " message(s)"); }
+ consumerDelegate.confirmDelivery(count);
+ }
+ }
+
}
Modified: branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java
===================================================================
--- branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java 2006-11-23 03:27:42 UTC (rev 1622)
+++ branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java 2006-11-23 03:28:32 UTC (rev 1623)
@@ -30,6 +30,8 @@
* The rest of the methods are handled in the advice stack.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ *
* @version <tt>$Revision$</tt>
*
* $Id$
@@ -37,11 +39,23 @@
public interface ConsumerEndpoint extends Closeable
{
/**
- * If the client buffer has previously become full because the server was sending at a faster rate than the
- * client could consume, then the server will stop sending messages.
- * When the client has emptied the buffer it then needs to inform the server that it can receive more messages
- * by calling this method
+ * If the client buffer has previously become full because the server was sending at a faster
+ * rate than the client could consume, then the server will stop sending messages. When the
+ * client has emptied the buffer it then needs to inform the server that it can receive more
+ * messages by calling this method.
+ *
* @throws JMSException
*/
- void more() throws JMSException;
+ void more() throws JMSException;
+
+ /**
+ * The server consumer endpoint needs to know at any time how messages are in transit between
+ * server and client. That is why it needs to receive confirmations every time the client
+ * received one (or more) messages. The confirmation is sent asynchronously from client to server.
+ * This is NOT a consumption acknowledgment.
+ *
+ * @param count - the number of messages received by the client in one batch.
+ */
+ void confirmDelivery(int count);
+
}
Modified: branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-11-23 03:27:42 UTC (rev 1622)
+++ branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-11-23 03:28:32 UTC (rev 1623)
@@ -27,7 +27,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.HashMap;
import javax.jms.IllegalStateException;
import javax.jms.InvalidSelectorException;
@@ -57,7 +56,6 @@
import org.jboss.messaging.core.tx.TxCallback;
import org.jboss.messaging.util.Future;
import org.jboss.remoting.callback.Callback;
-import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -81,6 +79,7 @@
// Static --------------------------------------------------------
private static final int MAX_DELIVERY_ATTEMPTS = 10;
+ private static final int MESSAGES_IN_TRANSIT_WAIT_COUNT = 100;
// Attributes ----------------------------------------------------
@@ -124,6 +123,9 @@
private Object lock;
private Map deliveries;
+
+ private Object messagesInTransitLock;
+ private int messagesInTransitCount; // access only from a region guarded by messagesInTransitLock
// Constructors --------------------------------------------------
@@ -195,6 +197,9 @@
// prompt delivery
messageQueue.deliver(false);
+
+ messagesInTransitLock = new Object();
+ messagesInTransitCount = 0;
log.debug(this + " constructed");
}
@@ -256,9 +261,9 @@
{
return delivery;
}
+
+ deliveries.put(new Long(ref.getMessageID()), delivery);
- deliveries.put(new Long(ref.getMessageID()), delivery);
-
// We don't send the message as-is, instead we create a MessageProxy instance. This allows
// local fields such as deliveryCount to be handled by the proxy but global data to be
// fielded by the same underlying Message instance. This allows us to avoid expensive
@@ -406,34 +411,39 @@
{
try
{
- /*
- Set clientConsumerFull to false
- NOTE! This must be done using a Runnable on the delivery executor - this is to
- prevent the following race condition:
- 1) Messages are delivered to the client, causing it to be full
- 2) The messages are consumed very quickly on the client causing more to be called()
- 3) more() hits the server BEFORE the deliverer thread has returned from delivering to the client
- causing clientConsumerFull to be set to false and adding a deliverer to the queue.
- 4) The deliverer thread returns and sets clientConsumerFull to true
- 5) The next deliverer runs but doesn't do anything since clientConsumerFull = true even
- though the client needs messages
- */
- this.executor.execute(new Runnable() { public void run() { clientConsumerFull = false; } });
+ // Set clientConsumerFull to false.
+ //
+ // NOTE! This must be done using a Runnable on the delivery executor - this is to prevent
+ // the following race condition:
+ // 1) Messages are delivered to the client, causing it to be full.
+ // 2) The messages are consumed very quickly on the client causing more() to be called.
+ // 3) more() hits the server BEFORE the deliverer thread has returned from delivering to
+ // the client causing clientConsumerFull to be set to false and adding a deliverer to
+ // the queue.
+ // 4) The deliverer thread returns and sets clientConsumerFull to true.
+ // 5) The next deliverer runs but doesn't do anything since clientConsumerFull = true even
+ // though the client needs messages.
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ if (trace) { log.trace(ServerConsumerEndpoint.this + " is notified that client wants more() messages"); }
+ clientConsumerFull = false;
+ }
+ });
- //Run a deliverer to deliver any existing ones
- this.executor.execute(new Deliverer());
+ // Run a deliverer to deliver any existing ones
+ executor.execute(new Deliverer());
- //TODO Why do we need to wait for it to execute??
- //Why not just return immediately?
+ // TODO Why do we need to wait for it to execute? Why not just return immediately?
- //Now wait for it to execute
+ // Now wait for it to execute
Future result = new Future();
-
this.executor.execute(new Waiter(result));
-
result.getResult();
- //Now we know the deliverer has delivered any outstanding messages to the client buffer
+ // Now we know the deliverer has delivered any outstanding messages to the client buffer
messageQueue.deliver(false);
}
@@ -447,6 +457,24 @@
}
}
+ public void confirmDelivery(int count)
+ {
+ synchronized(messagesInTransitLock)
+ {
+ messagesInTransitCount -= count;
+
+ if (trace) { log.trace("confirming delivery of " + count + " message(s), messages in transit " + messagesInTransitCount); }
+
+ if (messagesInTransitCount < 0)
+ {
+ log.error(this + " has an invalid messages in transit count (" +
+ messagesInTransitCount + ")");
+ }
+
+ messagesInTransitLock.notifyAll();
+ }
+ }
+
// Public --------------------------------------------------------
public String toString()
@@ -635,10 +663,7 @@
// Flush any messages waiting to be sent to the client.
this.executor.execute(new Deliverer());
- // Now wait for it to execute.
- Future result = new Future();
- this.executor.execute(new Waiter(result));
- result.getResult();
+ if (trace) { log.trace(this + " flushed all remaining messages (if any) to the client"); }
// Now we know any deliverer has delivered any outstanding messages to the client buffer.
}
@@ -646,7 +671,29 @@
{
log.warn("Thread interrupted", e);
}
-
+
+ // Make sure there are no messages in transit between server and client
+
+ synchronized(messagesInTransitLock)
+ {
+ int loopCount = 0;
+ while(messagesInTransitCount > 0 && loopCount < MESSAGES_IN_TRANSIT_WAIT_COUNT)
+ {
+ log.debug(this + " waiting for " + messagesInTransitCount + " message(s) in transit " +
+ "to reach the client, " + (loopCount + 1) + " lock grab attempts.");
+ messagesInTransitLock.wait(500);
+ loopCount ++;
+ }
+
+ if (loopCount >= MESSAGES_IN_TRANSIT_WAIT_COUNT)
+ {
+ throw new IllegalStateException("Maximum number of lock grab attempts exceeded, " +
+ "giving up to wait for messages in transit");
+ }
+
+ if (trace) { log.trace(this + " has no messages in transit"); }
+ }
+
// Now we know that there are no in flight messages on the way to the client consumer, but
// there may be messages still in the toDeliver list since the client consumer might be full,
// so we need to cancel these.
@@ -687,7 +734,7 @@
private void checkDeliveryCount(SimpleDelivery del)
{
- //TODO - We need to put the message in a DLQ
+ // TODO - We need to put the message in a DLQ
// For now we just ack it otherwise the message will keep being retried and we'll never get
// anywhere
if (del.getReference().getDeliveryCount() > MAX_DELIVERY_ATTEMPTS)
@@ -750,17 +797,22 @@
int serverId = connection.getServerPeer().getServerPeerID();
+ // TODO How can we ensure that messages for the same consumer aren't delivered
+ // concurrently to the same consumer on different threads?
+
+ ClientDelivery del = new ClientDelivery(list, serverId, id);
+ MessagingMarshallable mm = new MessagingMarshallable(connection.getUsingVersion(), del);
+ Callback callback = new Callback(mm);
+
try
{
- // TODO How can we ensure that messages for the same consumer aren't delivered
- // concurrently to the same consumer on different threads?
- ClientDelivery del = new ClientDelivery(list, serverId, id);
- MessagingMarshallable mm = new MessagingMarshallable(connection.getUsingVersion(), del);
- Callback callback = new Callback(mm);
-
if (trace) { log.trace(ServerConsumerEndpoint.this + " handing " + list.size() + " message(s) over to the remoting layer"); }
- connection.getCallbackHandler().handleCallback(callback);
+ synchronized(messagesInTransitLock)
+ {
+ connection.getCallbackHandler().handleCallback(callback);
+ messagesInTransitCount += list.size();
+ }
if (trace) { log.trace(ServerConsumerEndpoint.this + " handed messages over to the remoting layer"); }
Modified: branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
===================================================================
--- branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java 2006-11-23 03:27:42 UTC (rev 1622)
+++ branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java 2006-11-23 03:28:32 UTC (rev 1623)
@@ -73,6 +73,12 @@
{
endpoint.more();
}
+
+ public void confirmDelivery(int count)
+ {
+ endpoint.confirmDelivery(count);
+ }
+
// AdvisedSupport overrides --------------------------------------
Added: branches/Branch_HTTP_Experiment/tests/src/org/jboss/test/messaging/jms/ConsumerClosedTest.java
===================================================================
--- branches/Branch_HTTP_Experiment/tests/src/org/jboss/test/messaging/jms/ConsumerClosedTest.java 2006-11-23 03:27:42 UTC (rev 1622)
+++ branches/Branch_HTTP_Experiment/tests/src/org/jboss/test/messaging/jms/ConsumerClosedTest.java 2006-11-23 03:28:32 UTC (rev 1623)
@@ -0,0 +1,112 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.jms;
+
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+import javax.naming.InitialContext;
+import javax.jms.ConnectionFactory;
+import javax.jms.Queue;
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.management.ObjectName;
+
+/**
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class ConsumerClosedTest extends MessagingTestCase
+{
+ // Constants -----------------------------------------------------
+
+ public static final int NUMBER_OF_MESSAGES = 10;
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ InitialContext ic;
+
+ // Constructors --------------------------------------------------
+
+ public ConsumerClosedTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+
+ public void testMessagesSentDuringClose() throws Exception
+ {
+ ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+ Queue queue = (Queue)ic.lookup("/queue/ConsumerClosedTestQueue");
+
+ Connection c = cf.createConnection();
+ c.start();
+
+ Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer p = s.createProducer(queue);
+
+ for(int i = 0; i < NUMBER_OF_MESSAGES; i++)
+ {
+ p.send(s.createTextMessage("message" + i));
+ }
+
+ log.debug("all messages sent");
+
+ MessageConsumer cons = s.createConsumer(queue);
+ cons.close();
+
+ log.debug("consumer closed");
+
+ // make sure that all messages are in queue
+ ObjectName on =
+ new ObjectName("jboss.messaging.destination:service=Queue,name=ConsumerClosedTestQueue");
+ Integer count = (Integer)ServerManagement.getAttribute(on, "MessageCount");
+ assertEquals(NUMBER_OF_MESSAGES, count.intValue());
+
+ //Thread.sleep(900000000);
+
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ ServerManagement.start("all");
+
+ ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+
+ ServerManagement.deployQueue("ConsumerClosedTestQueue");
+
+ log.debug("setup done");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ ServerManagement.undeployQueue("ConsumerClosedTestQueue");
+
+ ic.close();
+
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
More information about the jboss-cvs-commits
mailing list