[jboss-cvs] JBoss Messaging SVN: r1849 - in trunk: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/client/state src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/jms/server/remoting tests tests/src/org/jboss/test/messaging/jms tests/src/org/jboss/test/messaging/jms/message tests/src/org/jboss/test/messaging/tools/jmx
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Dec 22 15:55:43 EST 2006
Author: timfox
Date: 2006-12-22 15:55:26 -0500 (Fri, 22 Dec 2006)
New Revision: 1849
Removed:
trunk/src/main/org/jboss/jms/client/remoting/HandleMessageResponse.java
Modified:
trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
trunk/src/main/org/jboss/jms/client/state/SessionState.java
trunk/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java
trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
trunk/tests/build.xml
trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
trunk/tests/src/org/jboss/test/messaging/jms/message/JMSExpirationHeaderTest.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
Log:
Mainly http://jira.jboss.com/jira/browse/JBMESSAGING-657
Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -72,10 +72,9 @@
ConnectionState connectionState = (ConnectionState)sessionState.getParent();
SessionDelegate sessionDelegate = (SessionDelegate)invocation.getTargetObject();
ConsumerState consumerState = (ConsumerState)((DelegateSupport)consumerDelegate).getState();
- int serverId = connectionState.getServerID();
int consumerID = consumerState.getConsumerID();
long channelID = consumerState.getChannelId();
- int prefetchSize = consumerState.getPrefetchSize();
+ int prefetchSize = consumerState.getBufferSize();
QueuedExecutor sessionExecutor = sessionState.getExecutor();
int maxDeliveries = consumerState.getMaxDeliveries();
@@ -94,31 +93,43 @@
//Now we have finished creating the client consumer, we can tell the SCD
//we are ready
- consumerDelegate.more();
+ consumerDelegate.changeRate(1);
return consumerDelegate;
}
public Object handleClosing(Invocation invocation) throws Throwable
{
- // First we make sure closing is called on the ServerConsumerEndpoint. This ensures that any
- // in-transit messages are flushed out to the client side.
-
- Object res = invocation.invokeNext();
+ ConsumerState consumerState = getState(invocation);
- ConsumerState consumerState = getState(invocation);
- SessionState sessionState = (SessionState)consumerState.getParent();
- ConnectionState connectionState = (ConnectionState)sessionState.getParent();
-
- // Then we call close on the messagecallbackhandler which waits for onMessage invocations
+ // First we call close on the messagecallbackhandler which waits for onMessage invocations
// to complete and then cancels anything in the client buffer.
+ // any further messages received will be ignored
consumerState.getMessageCallbackHandler().close();
+ long lastDeliveryId = consumerState.getMessageCallbackHandler().getLastDeliveryId();
+
+ SessionState sessionState = (SessionState)consumerState.getParent();
+ ConnectionState connectionState = (ConnectionState)sessionState.getParent();
+
sessionState.removeCallbackHandler(consumerState.getMessageCallbackHandler());
CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
cm.unregisterHandler(consumerState.getConsumerID());
+ // Then we make sure closing is called on the ServerConsumerEndpoint.
+
+ Object res = invocation.invokeNext();
+
+ //Now we send a message to the server consumer with the last delivery id so
+ //it can cancel any inflight messages after that
+ //This needs to be done *after* the call to closing has been executed on the server
+ //maybe it can be combined with closing
+
+ ConsumerDelegate del = (ConsumerDelegate)invocation.getTargetObject();
+
+ del.cancelInflightMessages(lastDeliveryId);
+
return res;
}
Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -38,7 +38,6 @@
import org.jboss.jms.message.MessageProxy;
import org.jboss.jms.server.endpoint.DefaultCancel;
import org.jboss.jms.server.endpoint.DeliveryInfo;
-import org.jboss.jms.tx.ClientTransaction;
import org.jboss.jms.tx.ResourceManager;
import org.jboss.logging.Logger;
@@ -67,31 +66,7 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
- private void ackDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception
- {
- SessionDelegate connectionConsumerSession = delivery.getConnectionConsumerSession();
-
- //If the delivery was obtained via a connection consumer we need to ack via that
- //otherwise we just use this session
-
- SessionDelegate sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : sess;
-
- sessionToUse.acknowledgeDelivery(delivery);
- }
- private void cancelDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception
- {
- SessionDelegate connectionConsumerSession = delivery.getConnectionConsumerSession();
-
- //If the delivery was obtained via a connection consumer we need to cancel via that
- //otherwise we just use this session
-
- SessionDelegate sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : sess;
-
- sessionToUse.cancelDelivery(new DefaultCancel(delivery.getDeliveryId(), delivery.getMessageProxy().getDeliveryCount()));
- }
-
public Object handleClosing(Invocation invocation) throws Throwable
{
MethodInvocation mi = (MethodInvocation)invocation;
@@ -176,6 +151,11 @@
state.getClientAckList().clear();
}
+
+ //TODO - we should also cancel any deliveries remaining in any transaction for the session
+ //so the delivery count gets updated to the server, and not rely on the server side close
+ //cancelling them
+
return invocation.invokeNext();
}
@@ -460,6 +440,30 @@
{
return (SessionState)((DelegateSupport)inv.getTargetObject()).getState();
}
+
+ private void ackDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception
+ {
+ SessionDelegate connectionConsumerSession = delivery.getConnectionConsumerSession();
+
+ //If the delivery was obtained via a connection consumer we need to ack via that
+ //otherwise we just use this session
+
+ SessionDelegate sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : sess;
+
+ sessionToUse.acknowledgeDelivery(delivery);
+ }
+
+ private void cancelDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception
+ {
+ SessionDelegate connectionConsumerSession = delivery.getConnectionConsumerSession();
+
+ //If the delivery was obtained via a connection consumer we need to cancel via that
+ //otherwise we just use this session
+
+ SessionDelegate sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : sess;
+
+ sessionToUse.cancelDelivery(new DefaultCancel(delivery.getDeliveryId(), delivery.getMessageProxy().getDeliveryCount()));
+ }
// Inner Classes -------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -169,7 +169,7 @@
int consumerID = consumerDelegate.getID();
- int prefetchSize = consumerDelegate.getPrefetchSize();
+ int bufferSize = consumerDelegate.getBufferSize();
int maxDeliveries = consumerDelegate.getMaxDeliveries();
@@ -177,7 +177,7 @@
ConsumerState consumerState =
new ConsumerState(sessionState, consumerDelegate, dest, selector, noLocal,
- subscriptionName, consumerID, connectionConsumer, prefetchSize,
+ subscriptionName, consumerID, connectionConsumer, bufferSize,
maxDeliveries, channelId);
delegate.setState(consumerState);
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -71,12 +71,12 @@
}
// ConsumerDelegate implementation -------------------------------
-
+
/**
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void more()
+ public void cancelInflightMessages(long lastDeliveryId) throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
@@ -85,6 +85,15 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
+ public void changeRate(float newRate)
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
+ /**
+ * This invocation should either be handled by the client-side interceptor chain or by the
+ * server-side endpoint.
+ */
public void close() throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
@@ -162,16 +171,6 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public void confirmDelivery(int count)
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
-
// Public --------------------------------------------------------
public String toString()
@@ -179,7 +178,7 @@
return "ConsumerDelegate[" + id + "](ChannelId=" + this.channelId+")" ;
}
- public int getPrefetchSize()
+ public int getBufferSize()
{
return bufferSize;
}
@@ -198,7 +197,7 @@
{
super.copyAttributes(newDelegate);
- this.bufferSize = ((ClientConsumerDelegate)newDelegate).getPrefetchSize();
+ this.bufferSize = ((ClientConsumerDelegate)newDelegate).getBufferSize();
this.maxDeliveries = ((ClientConsumerDelegate)newDelegate).getMaxDeliveries();
Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -21,9 +21,9 @@
*/
package org.jboss.jms.client.remoting;
-import java.util.List;
import java.util.Map;
+import org.jboss.jms.message.MessageProxy;
import org.jboss.jms.server.endpoint.ClientDelivery;
import org.jboss.jms.server.remoting.MessagingMarshallable;
import org.jboss.logging.Logger;
@@ -53,13 +53,21 @@
{
// Constants -----------------------------------------------------
- protected static final Logger log = Logger.getLogger(CallbackManager.class);
+ protected static final Logger log;
public static final String JMS_CALLBACK_SUBSYSTEM = "CALLBACK";
// Static --------------------------------------------------------
protected static CallbackManager theManager;
+
+ private static boolean trace;
+
+ static
+ {
+ log = Logger.getLogger(CallbackManager.class);
+ trace = log.isTraceEnabled();
+ }
// Attributes ----------------------------------------------------
@@ -79,16 +87,20 @@
{
MessagingMarshallable mm = (MessagingMarshallable)callback.getParameter();
ClientDelivery dr = (ClientDelivery)mm.getLoad();
- List msgs = dr.getMessages();
+ MessageProxy msg = dr.getMessage();
MessageCallbackHandler handler = (MessageCallbackHandler)callbackHandlers.get(new Integer(dr.getConsumerId()));
if (handler == null)
{
- throw new IllegalStateException("Cannot find handler for consumer: " + dr.getConsumerId());
+ //This is OK and can happen if the callback handler is deregistered on consumer close,
+ //but there are messages still in transit which arrive later.
+ //In this case it is just safe to ignore the message
+ if (trace) { log.trace(this + " callback handler not found, message arrived after consumer is closed"); }
+ return;
}
- handler.handleMessage(msgs);
+ handler.handleMessage(msg);
}
// Public --------------------------------------------------------
Deleted: trunk/src/main/org/jboss/jms/client/remoting/HandleMessageResponse.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/HandleMessageResponse.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/remoting/HandleMessageResponse.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -1,86 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.client.remoting;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-
-import org.jboss.messaging.util.Streamable;
-
-/**
- * A HandleMessageResponse
- *
- * This is the response the server gets after delivering messages to a client consumer
-
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- *
- */
-public class HandleMessageResponse implements Streamable
-{
- private static final long serialVersionUID = 2500443290413453569L;
-
- private boolean full;
-
- private int messagesAccepted;
-
- public HandleMessageResponse()
- {
- }
-
- public HandleMessageResponse(boolean full, int messagesAccepted)
- {
- this.full = full;
-
- this.messagesAccepted = messagesAccepted;
- }
-
- public boolean clientIsFull()
- {
- return full;
- }
-
- public int getNumberAccepted()
- {
- return messagesAccepted;
- }
-
-
- // Streamable implementation
- // ---------------------------------------------------------------
-
- public void write(DataOutputStream out) throws Exception
- {
- out.writeBoolean(full);
-
- out.writeInt(messagesAccepted);
- }
-
- public void read(DataInputStream in) throws Exception
- {
- full = in.readBoolean();
-
- messagesAccepted = in.readInt();
- }
-}
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -165,12 +165,14 @@
private int ackMode;
private boolean closed;
private Object mainLock;
- private boolean serverSending;
- private int bufferSize;
+ private int maxBufferSize;
+ private int minBufferSize;
private QueuedExecutor sessionExecutor;
private boolean listenerRunning;
private int maxDeliveries;
private long channelID;
+ private boolean startSendingMessageSent;
+ private long lastDeliveryId = -1;
// Constructors --------------------------------------------------
@@ -185,7 +187,8 @@
throw new IllegalArgumentException(this + " bufferSize must be > 0");
}
- this.bufferSize = bufferSize;
+ this.maxBufferSize = bufferSize;
+ this.minBufferSize = bufferSize / 2;
buffer = new LinkedList();
isConnectionConsumer = isCC;
this.ackMode = ackMode;
@@ -193,77 +196,55 @@
this.consumerDelegate = cons;
this.consumerID = consumerID;
this.channelID = channelID;
- this.serverSending = true;
mainLock = new Object();
this.sessionExecutor = sessionExecutor;
this.maxDeliveries = maxDeliveries;
+ this.startSendingMessageSent = true;
}
// Public --------------------------------------------------------
/**
- * Handles a list of messages sent from the server
- * @param msgs The list of messages
- * @return The number of messages handled (placeholder for future - now we always accept all messages)
- * or -1 if closed
+ * Handles a message sent from the server
+ * @param msg The message
*/
- public HandleMessageResponse handleMessage(List msgs) throws HandleCallbackException
+ public void handleMessage(MessageProxy msg)
{
- if (trace)
- {
- StringBuffer sb = new StringBuffer(this + " receiving [");
- for(int i = 0; i < msgs.size(); i++)
- {
- sb.append(((MessageProxy)msgs.get(i)).getMessage().getMessageID());
- if (i < msgs.size() - 1)
- {
- sb.append(",");
- }
- }
- sb.append("] from the remoting layer");
- log.trace(sb.toString());
- }
+ if (trace) { log.trace("Receiving message " + msg + " from the remoting layer"); }
synchronized (mainLock)
{
if (closed)
{
// Ignore
- return new HandleMessageResponse(false, 0);
+ if (trace) { log.trace(this + " is closed, so ignore message"); }
+ return;
}
- // Asynchronously confirm delivery on client
-
- try
- {
- sessionExecutor.execute(new ConfirmDelivery(msgs.size()));
- }
- catch (InterruptedException e)
- {
- log.warn("Thread interrupted", e);
- }
-
- // Put the messages in the buffer and notify any waiting receive()
-
- processMessages(msgs);
+ msg.setSessionDelegate(sessionDelegate, isConnectionConsumer);
+
+ msg.setReceived();
- buffer.addAll(msgs);
+ //Add it to the buffer
+ buffer.add(msg);
+ lastDeliveryId = msg.getDeliveryId();
+
if (trace) { log.trace(this + " added message(s) to the buffer"); }
- boolean full = buffer.size() >= bufferSize;
+ messageAdded();
- messagesAdded();
-
- if (full)
+ if (buffer.size() >= maxBufferSize)
{
- serverSending = false;
if (trace) { log.trace(this + " is full"); }
+
+ //We are full. Send message to server to tell it to stop sending
+
+ startSendingMessageSent = false;
+
+ sendChangeRateMessage(0);
}
-
- // For now we always accept all messages - in the future this may change
- return new HandleMessageResponse(full, msgs.size());
}
}
@@ -282,6 +263,7 @@
if (listener != null && !buffer.isEmpty())
{
listenerRunning = true;
+
this.queueRunner(new ListenerRunner());
}
}
@@ -331,7 +313,9 @@
for(Iterator i = buffer.iterator(); i.hasNext();)
{
MessageProxy mp = (MessageProxy)i.next();
+
DefaultCancel ack = new DefaultCancel(mp.getDeliveryId(), mp.getDeliveryCount());
+
cancels.add(ack);
}
@@ -342,34 +326,6 @@
if (trace) { log.trace(this + " closed"); }
}
-
- private void waitForOnMessageToComplete()
- {
- // Wait for any onMessage() executions to complete
-
- if (Thread.currentThread().equals(sessionExecutor.getThread()))
- {
- // the current thread already closing this MessageCallbackHandler (this happens when the
- // session is closed from within the MessageListener.onMessage(), for example), so no need
- // to register another Closer (see http://jira.jboss.org/jira/browse/JBMESSAGING-542)
- return;
- }
-
- Future result = new Future();
-
- try
- {
- sessionExecutor.execute(new Closer(result));
-
- if (trace) { log.trace(this + " blocking wait for Closer execution"); }
- result.getResult();
- if (trace) { log.trace(this + " got Closer result"); }
- }
- catch (InterruptedException e)
- {
- log.warn("Thread interrupted", e);
- }
- }
/**
* Method used by the client thread to get a Message, if available.
@@ -458,8 +414,6 @@
sessionDelegate.postDeliver(false);
}
- //postDeliver(sessionDelegate, isConnectionConsumer, false);
-
if (!m.getMessage().isExpired())
{
if (trace) { log.trace(this + ": message " + m + " is not expired, pushing it to the caller"); }
@@ -486,11 +440,13 @@
}
//This needs to be outside the lock
- if (buffer.isEmpty() && !serverSending)
+ if (!startSendingMessageSent && buffer.size() <= minBufferSize)
{
- //The server has previously stopped sending because the buffer was full
- //but now it is empty, so we tell the server to start sending again
- consumerDelegate.more();
+ //Tell the server we need more messages - but we don't want to keep sending the message
+ //if we've already sent it - hence the check
+ startSendingMessageSent = true;
+
+ sendChangeRateMessage(1);
}
m.incDeliveryCount();
@@ -498,7 +454,6 @@
return m;
}
-
public MessageListener getMessageListener()
{
return listener;
@@ -525,16 +480,121 @@
{
buffer.addFirst(proxy);
- messagesAdded();
+ messageAdded();
}
}
+
+ public void copyState(MessageCallbackHandler newHandler)
+ {
+ synchronized (mainLock)
+ {
+ this.consumerID = newHandler.consumerID;
+
+ this.consumerDelegate = newHandler.consumerDelegate;
+
+ this.sessionDelegate = newHandler.sessionDelegate;
+
+ this.buffer.clear();
+ }
+ }
+
+ public long getLastDeliveryId()
+ {
+ synchronized (mainLock)
+ {
+ return lastDeliveryId;
+ }
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
- protected long waitOnLock(Object lock, long waitTime) throws InterruptedException
+ private void waitForOnMessageToComplete()
{
+ // Wait for any onMessage() executions to complete
+
+ if (Thread.currentThread().equals(sessionExecutor.getThread()))
+ {
+ // the current thread already closing this MessageCallbackHandler (this happens when the
+ // session is closed from within the MessageListener.onMessage(), for example), so no need
+ // to register another Closer (see http://jira.jboss.org/jira/browse/JBMESSAGING-542)
+ return;
+ }
+
+ Future result = new Future();
+
+ try
+ {
+ sessionExecutor.execute(new Closer(result));
+
+ if (trace) { log.trace(this + " blocking wait for Closer execution"); }
+ result.getResult();
+ if (trace) { log.trace(this + " got Closer result"); }
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("Thread interrupted", e);
+ }
+ }
+
+ private void sendChangeRateMessage(float newRate)
+ {
+ //FIXME - We should be able to execute this invocation as a true
+ //remoting asynchronous invocation - i.e. it is written to the transport
+ //and no response is waited for
+ //Therefore there is no need to execute it here on a separate thread.
+ //Unfortunately remoting does not currently support this so this
+ //will be SLOW now.
+ try
+ {
+ consumerDelegate.changeRate(newRate);
+ }
+ catch (JMSException e)
+ {
+ log.error("Failed to send changeRate message", e);
+ }
+ }
+
+ private void queueRunner(ListenerRunner runner)
+ {
+ try
+ {
+ this.sessionExecutor.execute(runner);
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("Thread interrupted", e);
+ }
+ }
+
+ private void messageAdded()
+ {
+ // If we have a thread waiting on receive() we notify it
+ if (receiverThread != null)
+ {
+ if (trace) { log.trace(this + " notifying receiver thread"); }
+ mainLock.notify();
+ }
+ else if (listener != null)
+ {
+ // We have a message listener
+ if (!listenerRunning)
+ {
+ listenerRunning = true;
+
+ if (trace) { log.trace(this + " scheduled a new ListenerRunner"); }
+ this.queueRunner(new ListenerRunner());
+ }
+
+ //TODO - Execute onMessage on same thread for even better throughput
+ }
+ }
+
+ private long waitOnLock(Object lock, long waitTime) throws InterruptedException
+ {
long start = System.currentTimeMillis();
// Wait for last message to arrive
@@ -545,6 +605,7 @@
if (waited < waitTime)
{
waitTime = waitTime - waited;
+
return waitTime;
}
else
@@ -553,7 +614,7 @@
}
}
- protected MessageProxy getMessage(long timeout) throws JMSException
+ private MessageProxy getMessage(long timeout) throws JMSException
{
if (timeout == -1)
{
@@ -608,62 +669,8 @@
return m;
}
- protected void processMessages(List msgs)
- {
- Iterator iter = msgs.iterator();
-
- while (iter.hasNext())
- {
- MessageProxy msg = (MessageProxy)iter.next();
-
- // If this is the handler for a connection consumer we don't want to set the session
- // delegate since this is only used for client acknowledgement which is illegal for a
- // session used for an MDB
- msg.setSessionDelegate(sessionDelegate, isConnectionConsumer);
-
- msg.setReceived();
- }
- }
-
- // Private -------------------------------------------------------
-
- private void queueRunner(ListenerRunner runner)
- {
- try
- {
- this.sessionExecutor.execute(runner);
- }
- catch (InterruptedException e)
- {
- log.warn("Thread interrupted", e);
- }
- }
-
- private void messagesAdded()
- {
- // If we have a thread waiting on receive() we notify it
- if (receiverThread != null)
- {
- if (trace) { log.trace(this + " notifying receiver thread"); }
- mainLock.notify();
- }
- else if (listener != null)
- {
- // We have a message listener
- if (!listenerRunning)
- {
- listenerRunning = true;
-
- if (trace) { log.trace(this + " scheduled a new ListenerRunner"); }
- this.queueRunner(new ListenerRunner());
- }
-
- //TODO - Execute onMessage on same thread for even better throughput
- }
- }
-
// Inner classes -------------------------------------------------
-
+
/*
* This class is used to put on the listener executor to wait for onMessage
* invocations to complete when closing
@@ -703,7 +710,9 @@
if (listener == null)
{
listenerRunning = false;
+
if (trace) { log.trace("no listener, returning"); }
+
return;
}
@@ -712,6 +721,7 @@
if (buffer.isEmpty())
{
listenerRunning = false;
+
if (trace) { log.trace("no messages in buffer, marking listener as not running"); }
}
else
@@ -745,65 +755,23 @@
}
}
+
+ //Tell the server we need more messages - but we don't want to keep sending the message
+ //if we've already sent it - hence the check
+ if (!startSendingMessageSent && buffer.size() <= minBufferSize)
+ {
+ startSendingMessageSent = true;
+
+ sendChangeRateMessage(1);
+ }
+
if (again)
{
// Queue it up again
queueRunner(this);
- }
- else
- {
- if (!serverSending)
- {
- // Ask server for more messages
- try
- {
- consumerDelegate.more();
- }
- catch (JMSException e)
- {
- log.error("Failed to execute more()", e);
- }
- return;
- }
- }
+ }
}
- }
-
- /*
- * Used to asynchronously confirm to the server message arrival (delivery) on client.
- */
- private class ConfirmDelivery implements Runnable
- {
- int count;
-
- ConfirmDelivery(int count)
- {
- this.count = count;
- }
-
- public void run()
- {
- if (trace) { log.trace("confirming delivery on client of " + count + " message(s)"); }
- consumerDelegate.confirmDelivery(count);
- }
- }
-
- public void copyState(MessageCallbackHandler newHandler)
- {
- synchronized (mainLock)
- {
- this.consumerID = newHandler.consumerID;
-
- this.consumerDelegate = newHandler.consumerDelegate;
-
- this.sessionDelegate = newHandler.sessionDelegate;
-
- this.serverSending = false;
-
- this.buffer.clear();
- }
- }
-
+ }
}
Modified: trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConsumerState.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/state/ConsumerState.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -55,7 +55,7 @@
private MessageCallbackHandler messageCallbackHandler;
- private int prefetchSize;
+ private int bufferSize;
private SessionState parent;
@@ -68,7 +68,7 @@
public ConsumerState(SessionState parent, ConsumerDelegate delegate, Destination dest,
String selector, boolean noLocal, String subscriptionName, int consumerID,
- boolean isCC, int prefetchSize, int maxDeliveries, long channelId)
+ boolean isCC, int bufferSize, int maxDeliveries, long channelId)
{
super(parent, (DelegateSupport)delegate);
children = Collections.EMPTY_SET;
@@ -77,7 +77,7 @@
this.noLocal = noLocal;
this.consumerID = consumerID;
this.isConnectionConsumer = isCC;
- this.prefetchSize = prefetchSize;
+ this.bufferSize = bufferSize;
this.subscriptionName=subscriptionName;
this.maxDeliveries = maxDeliveries;
this.channelId = channelId;
@@ -134,9 +134,9 @@
return parent.getVersionToUse();
}
- public int getPrefetchSize()
+ public int getBufferSize()
{
- return prefetchSize;
+ return bufferSize;
}
public HierarchicalState getParent()
Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -223,12 +223,7 @@
{
callbackHandlers.remove(new Integer(handler.getConsumerId()));
}
-
- public List getCallbackHandlers()
- {
- return new ArrayList(callbackHandlers.values());
- }
-
+
public int getSessionId()
{
return sessionId;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -23,9 +23,6 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.message.MessageProxy;
@@ -35,7 +32,7 @@
/**
*
* A ClientDelivery
- * Encapsulates a delivery of some messages to a client consumer
+ * Encapsulates a delivery of a message to a client consumer
*
* There is no need to specify the server id since the client side CallbackManager is
* unique to the remoting connection
@@ -54,7 +51,7 @@
// Attributes ----------------------------------------------------
- private List msgs;
+ private MessageProxy msg;
private int consumerId;
@@ -64,9 +61,9 @@
{
}
- public ClientDelivery(List msgs, int consumerId)
+ public ClientDelivery(MessageProxy msg, int consumerId)
{
- this.msgs = msgs;
+ this.msg = msg;
this.consumerId = consumerId;
}
@@ -78,55 +75,37 @@
{
out.writeInt(consumerId);
- out.writeInt(msgs.size());
+ out.writeByte(msg.getMessage().getType());
+
+ out.writeInt(msg.getDeliveryCount());
- Iterator iter = msgs.iterator();
-
- while (iter.hasNext())
- {
- MessageProxy mp = (MessageProxy)iter.next();
-
- out.writeByte(mp.getMessage().getType());
+ out.writeLong(msg.getDeliveryId());
- out.writeInt(mp.getDeliveryCount());
-
- out.writeLong(mp.getDeliveryId());
-
- mp.getMessage().write(out);
- }
+ msg.getMessage().write(out);
}
public void read(DataInputStream in) throws Exception
{
consumerId = in.readInt();
- int numMessages = in.readInt();
+ byte type = in.readByte();
- msgs = new ArrayList(numMessages);
+ int deliveryCount = in.readInt();
- for (int i = 0; i < numMessages; i++)
- {
- byte type = in.readByte();
-
- int deliveryCount = in.readInt();
-
- long deliveryId = in.readLong();
-
- JBossMessage m = (JBossMessage)MessageFactory.createMessage(type);
+ long deliveryId = in.readLong();
+
+ JBossMessage m = (JBossMessage)MessageFactory.createMessage(type);
- m.read(in);
+ m.read(in);
- MessageProxy md = JBossMessage.createThinDelegate(deliveryId, m, deliveryCount);
-
- msgs.add(md);
- }
+ msg = JBossMessage.createThinDelegate(deliveryId, m, deliveryCount);
}
// Public --------------------------------------------------------
- public List getMessages()
+ public MessageProxy getMessage()
{
- return msgs;
+ return msg;
}
public int getConsumerId()
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -39,23 +39,15 @@
public interface ConsumerEndpoint extends Closeable
{
/**
- * If the client buffer has previously become full because the server was sending at a faster
- * rate than the client could consume, then the server will stop sending messages. When the
- * client has emptied the buffer it then needs to inform the server that it can receive more
- * messages by calling this method.
- *
- * @throws JMSException
+ * Sent to the server to specify a new maximum rate at which to send messages
*/
- void more() throws JMSException;
-
+ void changeRate(float newRate) throws JMSException;
+
+
/**
- * The server consumer endpoint needs to know at any time how messages are in transit between
- * server and client. That is why it needs to receive confirmations every time the client
- * received one (or more) messages. The confirmation is sent asynchronously from client to server.
- * This is NOT a consumption acknowledgment.
- *
- * @param count - the number of messages received by the client in one batch.
+ * Cancels any deliveries with a delivery id > lastDeliveryId - these are inflight
+ * @param lastDeliveryId
*/
- void confirmDelivery(int count);
+ void cancelInflightMessages(long lastDeliveryId) throws JMSException;
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -21,10 +21,6 @@
*/
package org.jboss.jms.server.endpoint;
-
-import java.util.ArrayList;
-import java.util.List;
-
import javax.jms.IllegalStateException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
@@ -33,8 +29,6 @@
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.message.MessageProxy;
import org.jboss.jms.selector.Selector;
-import org.jboss.jms.server.ConnectionManager;
-import org.jboss.jms.server.QueuedExecutorPool;
import org.jboss.jms.server.remoting.JMSDispatcher;
import org.jboss.jms.server.remoting.MessagingMarshallable;
import org.jboss.jms.util.ExceptionUtil;
@@ -49,15 +43,18 @@
import org.jboss.messaging.core.plugin.contract.PostOffice;
import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.core.tx.Transaction;
-import org.jboss.messaging.util.Future;
import org.jboss.remoting.callback.Callback;
+import org.jboss.remoting.callback.HandleCallbackException;
+import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
-import EDU.oswego.cs.dl.util.concurrent.Executor;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
/**
- * Concrete implementation of ConsumerEndpoint. Lives on the boundary between Messaging Core and the
+ * Concrete implementation of ConsumerEndpoint.
+ *
+ * Lives on the boundary between Messaging Core and the
* JMS Facade.
+ *
+ * Handles delivery of messages from the server to the client side consumer.
+ *
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -73,8 +70,6 @@
// Static --------------------------------------------------------
- private static final int MESSAGES_IN_TRANSIT_WAIT_COUNT = 100;
-
// Attributes ----------------------------------------------------
private boolean trace = log.isTraceEnabled();
@@ -86,84 +81,55 @@
private String queueName;
private ServerSessionEndpoint sessionEndpoint;
+
+ private ServerInvokerCallbackHandler callbackHandler;
+
+ private byte versionToUse;
private boolean noLocal;
private Selector messageSelector;
private JBossDestination destination;
+
+ private boolean started;
+
+ //This lock protects starting and stopping
+ private Object startStopLock;
- private List toDeliver;
-
// Must be volatile
- private volatile boolean clientConsumerFull;
-
- // Must be volatile
- private volatile boolean bufferFull;
-
- // Must be volatile
- private volatile boolean started;
-
- // No need to be volatile - is protected by lock
- private boolean closed;
-
- private Executor executor;
-
- private int prefetchSize;
-
- private Object lock;
-
- private Object messagesInTransitLock;
+ private volatile boolean clientAccepting;
- private int messagesInTransitCount; // access only from a region guarded by messagesInTransitLock
-
// Constructors --------------------------------------------------
ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
ServerSessionEndpoint sessionEndpoint,
- String selector, boolean noLocal, JBossDestination dest,
- int prefetchSize)
- throws InvalidSelectorException
+ String selector, boolean noLocal, JBossDestination dest)
+ throws InvalidSelectorException
{
if (trace) { log.trace("constructing consumer endpoint " + id); }
this.id = id;
+
this.messageQueue = messageQueue;
+
this.queueName = queueName;
+
this.sessionEndpoint = sessionEndpoint;
- this.prefetchSize = prefetchSize;
- // We always created with clientConsumerFull = true. This prevents the SCD sending messages to
- // the client before the client has fully finished creating the MessageCallbackHandler.
- this.clientConsumerFull = true;
-
- // We allocate an executor from the rotating pool for each consumer based on it's id
- // This gives better latency than each consumer for the destination using the same
- // executor
- QueuedExecutorPool pool =
- sessionEndpoint.getConnectionEndpoint().getServerPeer().getQueuedExecutorPool();
-
- this.executor = (QueuedExecutor)pool.get();
-
- // Note that using a PooledExecutor with a linked queue is not sufficient to ensure that
- // deliveries for the same consumer happen serially, since even if they are queued serially
- // the actual deliveries can happen in parallel, resulting in a later one "overtaking" an
- // earlier non-deterministicly depending on thread scheduling.
- // Consequently we use a QueuedExecutor to ensure the deliveries happen sequentially. We do
- // not want each ServerConsumerEndpoint instance to have its own instance - since we would
- // end up using too many threads, neither do we want to share the same instance amongst all
- // consumers - we do not want to serialize delivery to all consumers. So we maintain a bag of
- // QueuedExecutors and give them out to consumers as required. Different consumers can end up
- // using the same queuedexecutor concurrently if there are a lot of active consumers.
-
+ this.callbackHandler = sessionEndpoint.getConnectionEndpoint().getCallbackHandler();
+
+ this.versionToUse = sessionEndpoint.getConnectionEndpoint().getUsingVersion();
+
this.noLocal = noLocal;
this.destination = dest;
- this.toDeliver = new ArrayList();
+ //Always start as false - wait for consumer to initiate
+ this.clientAccepting = false;
- this.lock = new Object();
-
+ this.startStopLock = new Object();
+
if (selector != null)
{
if (trace) log.trace("creating selector:" + selector);
@@ -179,9 +145,6 @@
// prompt delivery
promptDelivery();
- messagesInTransitLock = new Object();
- messagesInTransitCount = 0;
-
log.debug(this + " constructed");
}
@@ -195,30 +158,26 @@
if (trace) { log.trace(this + " receives " + ref + " for delivery"); }
// This is ok to have outside lock - is volatile
- if (bufferFull)
+ if (!clientAccepting)
{
- // We buffer a maximum of PREFETCH_LIMIT messages at once
+ if (trace) { log.trace(this + " the client is not currently accepting messages"); }
- if (trace) { log.trace(this + " has reached prefetch size will not accept any more references"); }
-
return null;
}
-
- // Need to synchronized around the whole block to prevent setting started = false
- // but handle is already running and a message is deposited during the stop procedure.
- synchronized (lock)
- {
+
+ synchronized (startStopLock)
+ {
// If the consumer is stopped then we don't accept the message, it should go back into the
// queue for delivery later.
if (!started)
{
if (trace) { log.trace(this + " NOT started yet!"); }
-
+
return null;
}
-
+
if (trace) { log.trace(this + " has the main lock, preparing the message for delivery"); }
-
+
JBossMessage message = (JBossMessage)ref.getMessage();
boolean selectorRejected = !this.accept(message);
@@ -235,8 +194,8 @@
return delivery;
}
- long deliveryId = sessionEndpoint.addDelivery(delivery);
-
+ long deliveryId = sessionEndpoint.addDelivery(delivery, id);
+
// We don't send the message as-is, instead we create a MessageProxy instance. This allows
// local fields such as deliveryCount to be handled by the proxy but global data to be
// fielded by the same underlying Message instance. This allows us to avoid expensive
@@ -244,37 +203,42 @@
MessageProxy mp = JBossMessage.createThinDelegate(deliveryId, message, ref.getDeliveryCount());
- // Add the proxy to the list to deliver
-
- toDeliver.add(mp);
-
- bufferFull = toDeliver.size() >= prefetchSize;
-
- if (!clientConsumerFull)
- {
- try
- {
- Deliverer deliverer = new Deliverer();
- if (trace) { log.trace(this + " scheduling a new " + deliverer); }
- this.executor.execute(deliverer);
- }
- catch (InterruptedException e)
- {
- log.warn("Thread interrupted", e);
- }
+ //We send the message to the client on the current thread.
+ //The message is written onto the transport and then the thread returns immediately
+ //without waiting for a response
+
+ //FIXME - how can we ensure that a later send doesn't overtake an earlier send - this might
+ //happen if they are using different underlying TCP connections (e.g. from pool)
+
+ ClientDelivery del = new ClientDelivery(mp, id);
+
+ MessagingMarshallable mm = new MessagingMarshallable(versionToUse, del);
+
+ Callback callback = new Callback(mm);
+
+ try
+ {
+ //FIXME - we need to use the asynch callback API, this is the Sync one
+ callbackHandler.handleCallback(callback);
}
+ catch (HandleCallbackException e)
+ {
+ log.error("Failed to handle callback", e);
+
+ return null;
+ }
- return delivery;
+ return delivery;
}
}
-
// Filter implementation -----------------------------------------
public boolean accept(Routable r)
{
boolean accept = true;
- if (this.destination.isQueue())
+
+ if (destination.isQueue())
{
// For subscriptions message selection is handled in the Subscription itself
// we do not want to do the check twice
@@ -323,9 +287,11 @@
{
try
{
+ if (trace) { log.trace(this + " close"); }
+
localClose();
- sessionEndpoint.removeConsumer(id);
+ sessionEndpoint.removeConsumer(id);
}
catch (Throwable t)
{
@@ -335,78 +301,66 @@
// ConsumerEndpoint implementation -------------------------------
- /*
- * This is called by the client consumer to tell the server to wake up and start sending more
- * messages if available
- */
- public void more() throws JMSException
- {
+
+ public void changeRate(float newRate) throws JMSException
+ {
+ if (trace) { log.trace(this + " changeRate: " + newRate); }
+
try
- {
- // Set clientConsumerFull to false.
- //
- // NOTE! This must be done using a Runnable on the delivery executor - this is to prevent
- // the following race condition:
- // 1) Messages are delivered to the client, causing it to be full.
- // 2) The messages are consumed very quickly on the client causing more() to be called.
- // 3) more() hits the server BEFORE the deliverer thread has returned from delivering to
- // the client causing clientConsumerFull to be set to false and adding a deliverer to
- // the queue.
- // 4) The deliverer thread returns and sets clientConsumerFull to true.
- // 5) The next deliverer runs but doesn't do anything since clientConsumerFull = true even
- // though the client needs messages.
-
- executor.execute(new Runnable()
- {
- public void run()
- {
- if (trace) { log.trace(ServerConsumerEndpoint.this + " is notified that client wants more() messages"); }
- clientConsumerFull = false;
- }
- });
-
- // Run a deliverer to deliver any existing ones
- executor.execute(new Deliverer());
+ {
+ //For now we just support a binary on/off
+ //The client will send newRate = 0, to say it does not want any more messages when its client side
+ //buffer gets full
+ //or it will send an arbitrary non zero number to say it does want more messages, when its client side
+ //buffer empties to half its full size.
+ //Note the client does not wait until the client side buffer is empty before sending a newRate(+ve)
+ //message since this would add extra latency.
- // TODO Why do we need to wait for it to execute? Why not just return immediately?
+ //In the future we can fine tune this by allowing the client to specify an actual rate in the newRate value
+ //so this is basically a placeholder for the future so we don't have to change the wire format when
+ //we support it
- // Now wait for it to execute
- Future result = new Future();
- this.executor.execute(new Waiter(result));
- result.getResult();
-
- // Now we know the deliverer has delivered any outstanding messages to the client buffer
+ //No need to synchronize - clientAccepting is volatile
- messageQueue.deliver(false);
- }
- catch (InterruptedException e)
- {
- log.warn("Thread interrupted", e);
- }
+ if (newRate == 0)
+ {
+ clientAccepting = false;
+ }
+ else
+ {
+ clientAccepting = true;
+
+ promptDelivery();
+ }
+ }
catch (Throwable t)
{
- throw ExceptionUtil.handleJMSInvocation(t, this + " more");
+ throw ExceptionUtil.handleJMSInvocation(t, this + " changeRate");
}
}
-
- public void confirmDelivery(int count)
+
+ /*
+ * This method is always called between closing() and close() being called
+ * Instead of having a new method we could perhaps somehow pass the last delivery id
+ * in with closing - then we don't need another message
+ */
+ public void cancelInflightMessages(long lastDeliveryId) throws JMSException
{
- synchronized(messagesInTransitLock)
+ if (trace) { log.trace(this + " cancelInflightMessages: " + lastDeliveryId); }
+
+ try
+ {
+ //Cancel all deliveries made by this consumer with delivery id > lastDeliveryId
+
+ sessionEndpoint.cancelDeliveriesForConsumerAfterDeliveryId(id, lastDeliveryId);
+ }
+ catch (Throwable t)
{
- messagesInTransitCount -= count;
-
- if (trace) { log.trace("confirming delivery of " + count + " message(s), messages in transit " + messagesInTransitCount); }
-
- if (messagesInTransitCount < 0)
- {
- log.error(this + " has an invalid messages in transit count (" +
- messagesInTransitCount + ")");
- }
-
- messagesInTransitLock.notifyAll();
- }
+ throw ExceptionUtil.handleJMSInvocation(t, this + " cancelInflightMessages");
+ }
}
+
public boolean isClosed() throws JMSException
{
throw new IllegalStateException("isClosed should never be handled on the server side");
@@ -433,48 +387,38 @@
void localClose() throws Throwable
{
- synchronized (lock)
- {
- if (trace) { log.trace(this + " grabbed the main lock in close() " + this); }
+ if (trace) { log.trace(this + " grabbed the main lock in close() " + this); }
- messageQueue.remove(this);
+ messageQueue.remove(this);
+
+ JMSDispatcher.instance.unregisterTarget(new Integer(id));
+
+ // If this is a consumer of a non durable subscription then we want to unbind the
+ // subscription and delete all its data.
+
+ if (destination.isTopic())
+ {
+ PostOffice postOffice =
+ sessionEndpoint.getConnectionEndpoint().getServerPeer().getPostOfficeInstance();
- JMSDispatcher.instance.unregisterTarget(new Integer(id));
-
- // If this is a consumer of a non durable subscription then we want to unbind the
- // subscription and delete all its data.
+ Binding binding = postOffice.getBindingForQueueName(queueName);
- if (destination.isTopic())
+ //Note binding can be null since there can be many competing subscribers for the subscription -
+ //in which case the first will have removed the subscription and subsequent
+ //ones won't find it
+
+ if (binding != null && !binding.getQueue().isRecoverable())
{
- PostOffice postOffice =
- sessionEndpoint.getConnectionEndpoint().getServerPeer().getPostOfficeInstance();
-
- Binding binding = postOffice.getBindingForQueueName(queueName);
-
- //Note binding can be null since there can many competing subscribers for the subscription -
- //in which case the first will have removed the subscription and subsequently
- //ones won't find it
-
- if (binding != null && !binding.getQueue().isRecoverable())
- {
- postOffice.unbindQueue(queueName);
- }
+ postOffice.unbindQueue(queueName);
}
-
- closed = true;
}
+
}
void start()
{
- synchronized (lock)
- {
- // Can't start or stop it if it is closed.
- if (closed)
- {
- return;
- }
-
+ synchronized (startStopLock)
+ {
if (started)
{
return;
@@ -482,96 +426,53 @@
started = true;
}
-
+
// Prompt delivery
promptDelivery();
}
void stop() throws Throwable
- {
- // We need to:
- // - Stop accepting any new messages in the SCE.
- // - Flush any messages from the SCE to the buffer.
- // If the client consumer is now full, then we need to cancel the ones in the toDeliver list.
-
- // We need to lock since otherwise we could set started to false but the handle method was
- // already executing and messages might get deposited after.
- synchronized (lock)
- {
+ {
+ synchronized (startStopLock)
+ {
if (!started)
{
return;
}
- started = false;
- }
-
- // Now we know no more messages will be accepted in the SCE.
-
- try
- {
- // Flush any messages waiting to be sent to the client.
- this.executor.execute(new Deliverer());
+ started = false;
- if (trace) { log.trace(this + " flushed all remaining messages (if any) to the client"); }
-
- // Now we know any deliverer has delivered any outstanding messages to the client buffer.
- }
- catch (InterruptedException e)
- {
- log.warn("Thread interrupted", e);
- }
-
- // Make sure there are no messages in transit between server and client
-
- synchronized(messagesInTransitLock)
- {
- int loopCount = 0;
- while(messagesInTransitCount > 0 && loopCount < MESSAGES_IN_TRANSIT_WAIT_COUNT)
- {
- log.debug(this + " waiting for " + messagesInTransitCount + " message(s) in transit " +
- "to reach the client, " + (loopCount + 1) + " lock grab attempts.");
- messagesInTransitLock.wait(500);
- loopCount ++;
- }
-
- if (loopCount >= MESSAGES_IN_TRANSIT_WAIT_COUNT)
- {
- throw new IllegalStateException("Maximum number of lock grab attempts exceeded, " +
- "giving up to wait for messages in transit");
- }
-
- if (trace) { log.trace(this + " has no messages in transit"); }
- }
-
- // Now we know that there are no in flight messages on the way to the client consumer, but
- // there may be messages still in the toDeliver list since the client consumer might be full,
- // so we need to cancel these.
-
- if (!toDeliver.isEmpty())
- {
- synchronized (lock)
- {
- //Cancel in reverse order
- for (int i = toDeliver.size() - 1; i >= 0; i--)
- {
- MessageProxy proxy = (MessageProxy)toDeliver.get(i);
-
- sessionEndpoint.cancelDelivery(proxy.getDeliveryId());
- }
- }
-
- toDeliver.clear();
+ /*
+ *
+ Any message deliveries already in transit to the consumer will just
+ be ignored by the MessageCallbackHandler since it will be closed
+
+ To clarify, the close protocol (from connection) is as follows:
+
+ 1) MessageCallbackHandler::close() - any messages in buffer are cancelled to the server session, and any
+ subsequent receive messages will be ignored
- bufferFull = false;
- }
-
- //Need to prompt delivery
- promptDelivery();
+ 2) ServerConsumerEndpoint::closing() causes stop() this flushes any deliveries yet to deliver to the client callback handler
+
+ 3) MessageCallbackHandler::cancelInflightMessages(long lastDeliveryId) - any deliveries after lastDeliveryId
+ for the consumer will be considered in flight and cancelled.
+
+ 4) ServerConsumerEndpoint::close() - endpoint is deregistered
+
+ 5) Session.close() - acks or cancels any remaining deliveries in the SessionState as appropriate
+
+ 6) ServerSessionEndpoint::close() - cancels any remaining deliveries and deregisters session
+
+ 7) Client side session executor is shutdown
+
+ 8) ServerConnectionEndpoint::close() - connection is deregistered.
+
+ 9) Remoting connection listener is removed and remoting connection stopped.
+
+ */
+ }
}
-
-
-
+
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
@@ -582,122 +483,5 @@
}
// Inner classes -------------------------------------------------
-
- /*
- * Delivers messages to the client
- * TODO - We can make this a bit more intelligent by letting it measure the rate
- * the client is consuming messages and send messages at that rate.
- * This would mean the client consumer wouldn't be full so often and more wouldn't have to be called
- * This should give higher throughput.
- */
- private class Deliverer implements Runnable
- {
- public void run()
- {
- // Is there anything to deliver? This is ok outside lock - is volatile.
- if (clientConsumerFull)
- {
- if (trace) { log.trace(this + " client consumer full, do nothing"); }
- return;
- }
-
- List list = null;
-
- synchronized (lock)
- {
- if (trace) { log.trace(this + " has the main lock, attempting delivery"); }
-
- if (!toDeliver.isEmpty())
- {
- list = new ArrayList(toDeliver);
- toDeliver.clear();
- bufferFull = false;
- }
- }
-
- if (list == null)
- {
- if (trace) { log.trace(this + " has a null list, returning"); }
- return;
- }
-
- ServerConnectionEndpoint connection =
- ServerConsumerEndpoint.this.sessionEndpoint.getConnectionEndpoint();
-
- ClientDelivery del = new ClientDelivery(list, id);
- MessagingMarshallable mm = new MessagingMarshallable(connection.getUsingVersion(), del);
- Callback callback = new Callback(mm);
-
- try
- {
- if (trace)
- {
- StringBuffer sb = new StringBuffer(ServerConsumerEndpoint.this + " handing [");
- for(int i = 0; i < list.size(); i++)
- {
- sb.append(((MessageProxy)list.get(i)).getMessage().getMessageID());
- if (i < list.size() - 1)
- {
- sb.append(",");
- }
- }
- sb.append("] over to the remoting layer");
- log.trace(sb.toString());
- }
-
- synchronized(messagesInTransitLock)
- {
- connection.getCallbackHandler().handleCallback(callback);
- messagesInTransitCount += list.size();
- }
-
- if (trace) { log.trace(ServerConsumerEndpoint.this + " handed messages over to the remoting layer"); }
-
- // We are NOT using Remoting's facility of acknowledging callbacks. A callback is sent
- // asynchronously, and there is no confirmation that the callback reached the client or
- // not.
-
- // TODO Previously, synchronous server-to-client invocations were used by the client
- // to report back whether is full or not. This cannot be achieved with asynchronous
- // callbacks, so the client must explicitely sent this information to the server,
- // with an invocation on its own.
- }
- catch(Throwable t)
- {
- log.warn("Failed to deliver the message to the client. See the server log for more details.");
- log.debug(ServerConsumerEndpoint.this + " failed to deliver the message to the client.", t);
-
- ConnectionManager mgr = connection.getServerPeer().getConnectionManager();
-
- mgr.handleClientFailure(connection.getRemotingClientSessionId());
- }
- }
-
- public String toString()
- {
- return "Deliverer[" + Integer.toHexString(hashCode()) + "]";
- }
- }
-
- /*
- * The purpose of this class is to put it on the QueuedExecutor and wait for it to run
- * We can then ensure that all the Runnables in front of it on the queue have also executed
- * We cannot just call shutdownAfterProcessingCurrentlyQueuedTasks() since the
- * QueueExecutor might be share by other consumers and we don't want to wait for their
- * tasks to complete
- */
- private static class Waiter implements Runnable
- {
- Future result;
-
- Waiter(Future result)
- {
- this.result = result;
- }
-
- public void run()
- {
- result.setResult(null);
- }
- }
+
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -29,6 +29,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -502,7 +503,7 @@
if (trace) { log.trace(this + " Recovered delivery " + deliveryId + ", " + del); }
- deliveries.put(new Long(deliveryId), del);
+ deliveries.put(new Long(deliveryId), new DeliveryRecord(del, -1));
}
}
@@ -702,6 +703,40 @@
}
// Package protected ---------------------------------------------
+
+ void cancelDeliveriesForConsumerAfterDeliveryId(int consumerId, long lastDeliveryId) throws Throwable
+ {
+ //Need to cancel in reverse
+
+ LinkedList toCancel = new LinkedList();
+
+ Iterator iter = deliveries.entrySet().iterator();
+
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+
+ Long deliveryId = (Long)entry.getKey();
+
+ DeliveryRecord record = (DeliveryRecord)entry.getValue();
+
+ if (record.consumerId == consumerId && deliveryId.longValue() > lastDeliveryId)
+ {
+ iter.remove();
+
+ toCancel.addFirst(record);
+ }
+ }
+
+ iter = toCancel.iterator();
+
+ while (iter.hasNext())
+ {
+ DeliveryRecord record = (DeliveryRecord)iter.next();
+
+ record.del.cancel();
+ }
+ }
void removeBrowser(int browserId) throws Exception
{
@@ -786,11 +821,11 @@
if (trace) { log.trace(this + " cancelling delivery with delivery id: " + entry.getKey()); }
- Delivery del = (Delivery)entry.getValue();
+ DeliveryRecord rec = (DeliveryRecord)entry.getValue();
- del.cancel();
+ rec.del.cancel();
- channels.add(del.getObserver());
+ channels.add(rec.del.getObserver());
}
promptDelivery(channels);
@@ -806,26 +841,26 @@
void cancelDelivery(long deliveryId) throws Throwable
{
- Delivery del = (Delivery)deliveries.remove(new Long(deliveryId));
+ DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(deliveryId));
- if (del == null)
+ if (rec == null)
{
throw new IllegalStateException("Cannot find delivery to cancel " + deliveryId);
}
- del.cancel();
+ rec.del.cancel();
}
- long addDelivery(Delivery del)
+ long addDelivery(Delivery del, int consumerId)
{
long deliveryId = deliveryIdSequence.increment();
- deliveries.put(new Long(deliveryId), del);
+ deliveries.put(new Long(deliveryId), new DeliveryRecord(del, consumerId));
if (trace) { log.trace(this + " added delivery " + deliveryId + ": " + del); }
return deliveryId;
- }
+ }
void acknowledgeTransactionally(List acks, Transaction tx) throws Throwable
{
@@ -847,16 +882,16 @@
Long id = new Long(ack.getDeliveryId());
- Delivery del = (Delivery)deliveries.get(id);
+ DeliveryRecord rec = (DeliveryRecord)deliveries.get(id);
- if (del == null)
+ if (rec == null)
{
throw new IllegalStateException("Cannot find delivery to acknowledge " + ack);
}
deliveryCallback.addDeliveryId(id);
- del.acknowledge(tx);
+ rec.del.acknowledge(tx);
}
}
@@ -890,21 +925,21 @@
{
if (trace) { log.trace(this + " acknowledging delivery " + ack.getDeliveryId()); }
- Delivery del = (Delivery)deliveries.remove(new Long(ack.getDeliveryId()));
+ DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryId()));
- if (del == null)
+ if (rec == null)
{
throw new IllegalStateException("Cannot find delivery to acknowledge: " + ack.getDeliveryId());
}
- del.acknowledge(null);
+ rec.del.acknowledge(null);
}
private Delivery cancelDeliveryInternal(Cancel cancel) throws Throwable
{
- Delivery del = (Delivery)deliveries.remove(new Long(cancel.getDeliveryId()));
+ DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
- if (del == null)
+ if (rec == null)
{
throw new IllegalStateException("Cannot find delivery to cancel " + cancel.getDeliveryId());
}
@@ -922,17 +957,17 @@
if (dlq != null)
{
//reset delivery count to zero
- del.getReference().setDeliveryCount(0);
+ rec.del.getReference().setDeliveryCount(0);
- dlq.handle(null, del.getReference(), tx);
+ dlq.handle(null, rec.del.getReference(), tx);
- del.acknowledge(tx);
+ rec.del.acknowledge(tx);
}
else
{
log.warn("Cannot send to DLQ since DLQ has not been deployed! The message will be removed");
- del.acknowledge(tx);
+ rec.del.acknowledge(tx);
}
tx.commit();
@@ -946,12 +981,12 @@
}
else
{
- del.getReference().setDeliveryCount(cancel.getDeliveryCount());
+ rec.del.getReference().setDeliveryCount(cancel.getDeliveryCount());
- del.cancel();
+ rec.del.cancel();
}
- return del;
+ return rec.del;
}
private ConsumerDelegate failoverConsumer(JBossDestination jmsDestination,
@@ -991,7 +1026,7 @@
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(consumerID, binding.getQueue(),
binding.getQueue().getName(), this, selectorString, noLocal,
- jmsDestination, prefetchSize);
+ jmsDestination);
JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
@@ -1263,13 +1298,13 @@
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(),
binding.getQueue().getName(), this, selectorString, noLocal,
- jmsDestination, prefetchSize);
+ jmsDestination);
JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
ClientConsumerDelegate stub =
new ClientConsumerDelegate(consumerID, binding.getQueue().getChannelID(),
- prefetchSize, maxDeliveryAttempts);
+ prefetchSize, maxDeliveryAttempts);
synchronized (consumers)
{
@@ -1294,11 +1329,33 @@
}
}
+ // Inner classes -------------------------------------------------
+ /*
+ * Holds a record of a delivery - we need to store the consumer id as well
+ * hence this class
+ * The only reason we need to store the consumer id is that on consumer close, we need to
+ * cancel any deliveries corresponding to that consumer.
+ * We can't rely on the cancel being driven from the MessageCallbackHandler since
+ * the deliveries may have got lost in transit (ignored) since the consumer might have closed
+ * when they were in transit.
+ * In such a case we might otherwise end up with the consumer closing but not all its deliveries being
+ * cancelled, which would mean they wouldn't be cancelled until the session is closed which is too late
+ */
+ private static class DeliveryRecord
+ {
+ Delivery del;
+
+ int consumerId;
+
+ DeliveryRecord(Delivery del, int consumerId)
+ {
+ this.del = del;
+
+ this.consumerId = consumerId;
+ }
+ }
-
- // Inner classes -------------------------------------------------
-
/**
*
* The purpose of this class is to remove deliveries from the delivery list on commit
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -64,16 +64,16 @@
endpoint.closing();
}
- public void more() throws JMSException
+ public void changeRate(float newRate) throws JMSException
{
- endpoint.more();
+ endpoint.changeRate(newRate);
}
-
- public void confirmDelivery(int count)
+
+ public void cancelInflightMessages(long lastDeliveryId) throws JMSException
{
- endpoint.confirmDelivery(count);
+ endpoint.cancelInflightMessages(lastDeliveryId);
}
-
+
public boolean isClosed() throws JMSException
{
return endpoint.isClosed();
Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -38,7 +38,6 @@
import org.jboss.aop.Dispatcher;
import org.jboss.aop.joinpoint.MethodInvocation;
import org.jboss.jms.client.remoting.CallbackManager;
-import org.jboss.jms.client.remoting.HandleMessageResponse;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.Version;
@@ -97,19 +96,23 @@
protected static final byte CANCEL = 3;
protected static final byte CANCEL_LIST = 4;
protected static final byte SEND = 5;
- protected static final byte MORE = 6;
+ //protected static final byte MORE = 6;
+
+ protected static final byte CHANGE_RATE = 6;
+
protected static final byte SEND_TRANSACTION = 7;
protected static final byte GET_ID_BLOCK = 8;
protected static final byte RECOVER_DELIVERIES = 9;
- protected static final byte CONFIRM_DELIVERY = 10;
+ //protected static final byte CONFIRM_DELIVERY = 10;
+
+
// The response codes - start from 100
- protected static final byte CALLBACK = 100;
+ protected static final byte MESSAGE_DELIVERY = 100;
protected static final byte NULL_RESPONSE = 101;
protected static final byte ID_BLOCK_RESPONSE = 102;
- protected static final byte HANDLE_MESSAGE_RESPONSE = 103;
protected static final byte BROWSE_MESSAGE_RESPONSE = 104;
protected static final byte BROWSE_MESSAGES_RESPONSE = 105;
protected static final byte CALLBACK_LIST = 106;
@@ -223,11 +226,15 @@
if (trace) { log.trace("wrote send()"); }
}
- else if ("more".equals(methodName))
+ else if ("changeRate".equals(methodName))
{
- dos.writeByte(MORE);
+ dos.writeByte(CHANGE_RATE);
writeHeader(mi, dos);
+
+ Float f = (Float)mi.getArguments()[0];
+
+ dos.writeFloat(f.floatValue());
dos.flush();
@@ -332,20 +339,6 @@
if (trace) { log.trace("wrote sendUnackedAckInfos()"); }
}
- else if ("confirmDelivery".equals(methodName))
- {
- dos.writeByte(CONFIRM_DELIVERY);
-
- writeHeader(mi, dos);
-
- Integer count = (Integer)mi.getArguments()[0];
-
- dos.writeInt(count.intValue());
-
- dos.flush();
-
- if (trace) { log.trace("wrote confirmDelivery()"); }
- }
else if ("sendTransaction".equals(methodName))
{
dos.writeByte(SEND_TRANSACTION);
@@ -393,7 +386,7 @@
ClientDelivery dr = (ClientDelivery)param;
- dos.writeByte(CALLBACK);
+ dos.writeByte(MESSAGE_DELIVERY);
dos.writeUTF(req.getSessionId());
@@ -455,19 +448,6 @@
if (trace) { log.trace("wrote id block response"); }
}
- else if (res instanceof HandleMessageResponse)
- {
- //Return value from delivering messages to client
- dos.write(HANDLE_MESSAGE_RESPONSE);
-
- HandleMessageResponse response = (HandleMessageResponse)res;
-
- response.write(dos);
-
- dos.flush();
-
- if (trace) { log.trace("wrote handle message response"); }
- }
else if (res instanceof JBossMessage)
{
//Return value from browsing message
@@ -482,8 +462,7 @@
dos.flush();
if (trace) { log.trace("wrote browse message response"); }
- }
-
+ }
else if (res instanceof Message[])
{
//Return value from browsing messages
@@ -510,7 +489,7 @@
((ArrayList) res).size() > 0 &&
((ArrayList) res).get(0) instanceof Callback)
{
- // Comes from polled Callbacks.
+ // Comes from polled Callbacks. (HTTP transport??)
ArrayList callbackList = (ArrayList)res;
dos.write(CALLBACK_LIST);
dos.writeUTF(resp.getSessionId());
@@ -627,10 +606,16 @@
return request;
}
- case MORE:
+ case CHANGE_RATE:
{
MethodInvocation mi = readHeader(dis);
-
+
+ float f = dis.readFloat();
+
+ Object[] args = new Object[] {new Float(f)};
+
+ mi.setArguments(args);
+
InvocationRequest request =
new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
new MessagingMarshallable(version, mi), null, null, null);
@@ -802,24 +787,6 @@
return request;
}
- case CONFIRM_DELIVERY:
- {
- MethodInvocation mi = readHeader(dis);
-
- int count = dis.readInt();
-
- Object[] args = new Object[] {new Integer(count)};
-
- mi.setArguments(args);
-
- InvocationRequest request =
- new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
- new MessagingMarshallable(version, mi), null, null, null);
-
- if (trace) { log.trace("read confirmDelivery()"); }
-
- return request;
- }
case ID_BLOCK_RESPONSE:
{
IDBlock block = new IDBlock();
@@ -832,18 +799,6 @@
return resp;
}
- case HANDLE_MESSAGE_RESPONSE:
- {
- HandleMessageResponse res = new HandleMessageResponse();
-
- res.read(dis);
-
- InvocationResponse resp = new InvocationResponse(null, new MessagingMarshallable(version, res), false, null);
-
- if (trace) { log.trace("read handle message response"); }
-
- return resp;
- }
case BROWSE_MESSAGE_RESPONSE:
{
byte type = dis.readByte();
@@ -890,7 +845,7 @@
return resp;
}
- case CALLBACK:
+ case MESSAGE_DELIVERY:
{
String sessionId = dis.readUTF();
ClientDelivery dr = new ClientDelivery();
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/tests/build.xml 2006-12-22 20:55:26 UTC (rev 1849)
@@ -356,6 +356,12 @@
<antcall target="clustering-tests"/>
</target>
+ <target name="http-tests" depends="tests-jar, prepare-testdirs, clear-test-logs">
+ <antcall target="remote-tests">
+ <param name="test.remoting" value="http"/>
+ </antcall>
+ </target>
+
<target name="stress-tests" depends="tests-jar, prepare-testdirs, clear-test-logs">
<antcall target="invm-stress-tests"/>
<antcall target="remote-stress-tests"/> <!-- default remoting configuration (socket) -->
Modified: trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -767,12 +767,18 @@
Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sessSend.createProducer(queue);
prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ log.trace("Sending messages");
+
TextMessage tm1 = sessSend.createTextMessage("a");
TextMessage tm2 = sessSend.createTextMessage("b");
TextMessage tm3 = sessSend.createTextMessage("c");
prod.send(tm1);
prod.send(tm2);
prod.send(tm3);
+
+ log.trace("Sent messages");
+
sessSend.close();
assertRemainingMessages(3);
@@ -780,10 +786,20 @@
conn.start();
Session sessReceive = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ log.trace("Creating consumer");
+
MessageConsumer cons = sessReceive.createConsumer(queue);
+ log.trace("Created consumer");
+
MessageListenerAutoAck listener = new MessageListenerAutoAck(sessReceive);
+
+ log.trace("Setting message listener");
+
cons.setMessageListener(listener);
+
+ log.trace("Set message listener");
listener.waitForMessages();
Modified: trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -1081,13 +1081,25 @@
Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer prod = sess.createProducer(queue);
- prod.send(sess.createTextMessage("1"));
- prod.send(sess.createTextMessage("2"));
- prod.send(sess.createTextMessage("3"));
+
+ TextMessage tm1 = sess.createTextMessage("1");
+
+ TextMessage tm2 = sess.createTextMessage("2");
+
+ TextMessage tm3 = sess.createTextMessage("3");
+
+ prod.send(tm1);
+ prod.send(tm2);
+ prod.send(tm3);
+ log.trace("Creating consumer");
MessageConsumer cons1 = sess.createConsumer(queue);
- Message r1 = cons1.receive();
+ log.trace("Waiting for message");
+
+ TextMessage r1 = (TextMessage)cons1.receive();
+
+ assertEquals(tm1.getText(), r1.getText());
log.trace("Got first message");
@@ -1097,12 +1109,16 @@
MessageConsumer cons2 = sess.createConsumer(queue);
- log.trace("Wairting for second message");
- Message r2 = cons2.receive();
+ log.trace("Waiting for second message");
+ TextMessage r2 = (TextMessage)cons2.receive();
+
+ assertEquals(tm2.getText(), r2.getText());
log.trace("got second message");
- Message r3 = cons2.receive();
+ TextMessage r3 = (TextMessage)cons2.receive();
+
+ assertEquals(tm3.getText(), r3.getText());
r1.acknowledge();
r2.acknowledge();
@@ -1137,6 +1153,8 @@
prod.send(sess.createTextMessage("1"));
prod.send(sess.createTextMessage("2"));
prod.send(sess.createTextMessage("3"));
+
+ log.trace("Sent messages");
conn.start();
Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -38,7 +38,6 @@
import org.jboss.aop.Dispatcher;
import org.jboss.aop.joinpoint.MethodInvocation;
-import org.jboss.jms.client.remoting.HandleMessageResponse;
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.SessionDelegate;
@@ -104,7 +103,7 @@
//Consumer
- protected Method moreMethod;
+ protected Method changeRateMethod;
//connection
@@ -150,7 +149,7 @@
//Consumer
- moreMethod = consumerDelegate.getMethod("more", null);
+ changeRateMethod = consumerDelegate.getMethod("changeRate", new Class[] { Float.TYPE });
//Connection
@@ -193,9 +192,9 @@
//Consumer
- public void testMore() throws Exception
+ public void testChangeRate() throws Exception
{
- wf.testMore();
+ wf.testChangeRate();
}
@@ -229,9 +228,9 @@
wf.testSerializableResponse();
}
- public void testCallBack() throws Exception
+ public void testMessageDelivery() throws Exception
{
- wf.testCallback();
+ wf.testMessageDelivery();
}
public void testIDBlockResponse() throws Exception
@@ -239,11 +238,8 @@
wf.testGetIdBlockResponse();
}
- public void testHandleMessageResponse() throws Exception
- {
- wf.testHandleMessageResponse();
- }
-
+ //TODO need a test for the polled callbacks
+
// Public --------------------------------------------------------
public static class SerializableObject implements Serializable
@@ -269,7 +265,7 @@
/**
* We extend the class so we have access to protected fields
*/
- class TestWireFormat extends JMSWireFormat
+ private class TestWireFormat extends JMSWireFormat
{
public void testAcknowledgeDelivery() throws Exception
{
@@ -564,7 +560,127 @@
}
+ public void testCancelDeliveries() throws Exception
+ {
+ long methodHash = 62365354;
+
+ int objectId = 54321;
+
+ List cancels = new ArrayList();
+
+ DefaultCancel cancel1 = new DefaultCancel(65654, 43);
+ DefaultCancel cancel2 = new DefaultCancel(65765, 2);
+ cancels.add(cancel1);
+ cancels.add(cancel2);
+
+ MethodInvocation mi = new MethodInvocation(null, methodHash, cancelDeliveriesMethod, cancelDeliveriesMethod, null);
+
+ mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));
+
+ mi.setArguments(new Object[] {cancels});
+
+ MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
+
+ InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+ OutputStream oos = new DataOutputStream(bos);
+
+ wf.write(ir, oos);
+
+ oos.flush();
+
+ byte[] bytes = bos.toByteArray();
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+
+ DataInputStream dis = new DataInputStream(bis);
+
+ //Check the raw bytes produced by the wire format
+
+ //First byte should be version
+ assertEquals(77, dis.readByte());
+
+ //Next byte should be CANCEL_LIST
+ assertEquals(JMSWireFormat.CANCEL_LIST, dis.readByte());
+
+ //Next int should be objectId
+ assertEquals(objectId, dis.readInt());
+
+ //Next long should be methodHash
+ assertEquals(methodHash, dis.readLong());
+
+ //Next should be the size of the list
+
+ int size = dis.readInt();
+
+ assertEquals(2, size);
+
+ //then the cancels, each written as delivery id (long) followed by delivery count (int)
+ long deliveryId = dis.readLong();
+ int deliveryCount = dis.readInt();
+ DefaultCancel rcancel1 = new DefaultCancel(deliveryId, deliveryCount);
+
+ deliveryId = dis.readLong();
+ deliveryCount = dis.readInt();
+ DefaultCancel rcancel2 = new DefaultCancel(deliveryId, deliveryCount);
+ //Compare what was written (cancelN) with what was read back from the bytes (rcancelN)
+ assertEquals(cancel1.getDeliveryCount(), rcancel1.getDeliveryCount());
+
+ assertEquals(cancel1.getDeliveryId(), rcancel1.getDeliveryId());
+
+ assertEquals(cancel2.getDeliveryCount(), rcancel2.getDeliveryCount());
+
+ assertEquals(cancel2.getDeliveryId(), rcancel2.getDeliveryId());
+
+ //should be eos
+
+ try
+ {
+ dis.readByte();
+ fail("End of stream expected");
+ }
+ catch (EOFException e)
+ {
+ //Ok
+ }
+
+ //Rewind and decode the same bytes through the wire format's own read path
+ bis.reset();
+
+ InputStream ois = new DataInputStream(bis);
+
+ InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
+
+ mm = (MessagingMarshallable)ir2.getParameter();
+
+ assertEquals(77, mm.getVersion());
+
+ MethodInvocation mi2 = (MethodInvocation)mm.getLoad();
+
+ assertEquals(methodHash, mi2.getMethodHash());
+
+ assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());
+
+ List list = (List)mi2.getArguments()[0];
+
+ assertEquals(2, list.size());
+
+ DefaultCancel xack1 = (DefaultCancel)list.get(0);
+ DefaultCancel xack2 = (DefaultCancel)list.get(1);
+
+ assertEquals(cancel1.getDeliveryId(), xack1.getDeliveryId());
+
+ assertEquals(cancel1.getDeliveryCount(), xack1.getDeliveryCount());
+
+ assertEquals(cancel2.getDeliveryId(), xack2.getDeliveryId());
+
+ assertEquals(cancel2.getDeliveryCount(), xack2.getDeliveryCount());
+
+ }
+
/*
* Test that general serializable invocation requests are marshalled correctky
*/
@@ -961,127 +1077,7 @@
}
-
- public void testCancelDeliveries() throws Exception
- {
- long methodHash = 62365354;
-
- int objectId = 54321;
-
- List cancels = new ArrayList();
-
- DefaultCancel cancel1 = new DefaultCancel(65654, 43);
- DefaultCancel cancel2 = new DefaultCancel(65765, 2);
- cancels.add(cancel1);
- cancels.add(cancel2);
-
- MethodInvocation mi = new MethodInvocation(null, methodHash, cancelDeliveriesMethod, cancelDeliveriesMethod, null);
-
- mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));
-
- mi.setArguments(new Object[] {cancels});
-
- MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
-
- InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
-
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
- OutputStream oos = new DataOutputStream(bos);
- wf.write(ir, oos);
-
- oos.flush();
-
- byte[] bytes = bos.toByteArray();
-
- ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
-
- DataInputStream dis = new DataInputStream(bis);
-
- //Check the bytes
-
- //First byte should be version
- assertEquals(77, dis.readByte());
-
- //Next byte should be CANCEL_MESSAGES
- assertEquals(JMSWireFormat.CANCEL_LIST, dis.readByte());
-
- //Next int should be objectId
- assertEquals(objectId, dis.readInt());
-
- //Next long should be methodHash
- assertEquals(methodHash, dis.readLong());
-
- //Next should the size of the list
-
- int size = dis.readInt();
-
- assertEquals(2, size);
-
- //then the AckInfos
- long deliveryId = dis.readLong();
- int deliveryCount = dis.readInt();
- DefaultCancel rcancel1 = new DefaultCancel(deliveryId, deliveryCount);
-
- deliveryId = dis.readLong();
- deliveryCount = dis.readInt();
- DefaultCancel rcancel2 = new DefaultCancel(deliveryId, deliveryCount);
-
- assertEquals(cancel1.getDeliveryCount(), rcancel1.getDeliveryCount());
-
- assertEquals(cancel1.getDeliveryId(), cancel1.getDeliveryId());
-
- assertEquals(cancel2.getDeliveryCount(), rcancel2.getDeliveryCount());
-
- assertEquals(cancel2.getDeliveryId(), cancel2.getDeliveryId());
-
- //should be eos
-
- try
- {
- dis.readByte();
- fail("End of stream expected");
- }
- catch (EOFException e)
- {
- //Ok
- }
-
-
- bis.reset();
-
- InputStream ois = new DataInputStream(bis);
-
- InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
-
- mm = (MessagingMarshallable)ir2.getParameter();
-
- assertEquals(77, mm.getVersion());
-
- MethodInvocation mi2 = (MethodInvocation)mm.getLoad();
-
- assertEquals(methodHash, mi2.getMethodHash());
-
- assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());
-
- List list = (List)mi2.getArguments()[0];
-
- assertEquals(2, list.size());
-
- DefaultCancel xack1 = (DefaultCancel)list.get(0);
- DefaultCancel xack2 = (DefaultCancel)list.get(1);
-
- assertEquals(cancel1.getDeliveryId(), xack1.getDeliveryId());
-
- assertEquals(cancel1.getDeliveryCount(), xack1.getDeliveryCount());
-
- assertEquals(cancel2.getDeliveryId(), xack2.getDeliveryId());
-
- assertEquals(cancel2.getDeliveryCount(), xack2.getDeliveryCount());
-
- }
-
public void testNullResponse() throws Exception
{
MessagingMarshallable mm = new MessagingMarshallable((byte)77, null);
@@ -1129,19 +1125,23 @@
assertNull(mm.getLoad());
}
-
-
-
- public void testMore() throws Exception
+
+ public void testChangeRate() throws Exception
{
long methodHash = 62365354;
int objectId = 54321;
- MethodInvocation mi = new MethodInvocation(null, methodHash, moreMethod, moreMethod, null);
+ float rate = 123.45f;
- mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));
+ MethodInvocation mi = new MethodInvocation(null, methodHash, changeRateMethod, changeRateMethod, null);
+ mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));
+
+ Object[] args = new Object[] { new Float(rate) };
+
+ mi.setArguments(args);
+
MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
@@ -1165,8 +1165,8 @@
//First byte should be version
assertEquals(77, dis.readByte());
- //Second byte should be MORE
- assertEquals(JMSWireFormat.MORE, dis.readByte());
+ //Second byte should be CHANGE_RATE
+ assertEquals(JMSWireFormat.CHANGE_RATE, dis.readByte());
//Next int should be objectId
assertEquals(objectId, dis.readInt());
@@ -1174,6 +1174,11 @@
//Next long should be methodHash
assertEquals(methodHash, dis.readLong());
+ //Next should be the float
+ float f2 = dis.readFloat();
+
+ assertTrue(rate == f2);
+
//Now eos
try
{
@@ -1196,37 +1201,28 @@
MethodInvocation mi2 = (MethodInvocation)mm.getLoad();
+ Float f3 = (Float)mi2.getArguments()[0];
+
+ assertTrue(rate == f3.floatValue());
+
assertEquals(methodHash, mi2.getMethodHash());
assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());
}
-
-
- public void testCallback() throws Exception
+
+ public void testMessageDelivery() throws Exception
{
int consumerID = 12345678;
JBossMessage m1 = new JBossMessage(123);
- JBossMessage m2 = new JBossMessage(456);
- JBossMessage m3 = new JBossMessage(789);
-
- List msgs = new ArrayList();
-
+
MessageProxy del1 = JBossMessage.createThinDelegate(1, m1, 7);
- MessageProxy del2 = JBossMessage.createThinDelegate(2, m2, 8);
- MessageProxy del3 = JBossMessage.createThinDelegate(3, m3, 9);
MessageTest.configureMessage(m1);
- MessageTest.configureMessage(m2);
- MessageTest.configureMessage(m3);
+
+ ClientDelivery dr = new ClientDelivery(del1, consumerID);
- msgs.add(del1);
- msgs.add(del2);
- msgs.add(del3);
-
- ClientDelivery dr = new ClientDelivery(msgs, consumerID);
-
ByteArrayOutputStream bos = new ByteArrayOutputStream();
OutputStream oos = new DataOutputStream(bos);
@@ -1248,18 +1244,15 @@
//First byte should be version
assertEquals(77, dis.readByte());
- //Second byte should be CALLBACK
- assertEquals(JMSWireFormat.CALLBACK, dis.readByte());
+ //Second byte should be MESSAGE_DELIVERY
+ assertEquals(JMSWireFormat.MESSAGE_DELIVERY, dis.readByte());
//Next should be sessionID
assertEquals("dummySessionId", dis.readUTF());
//Next int should be consumer id
assertEquals(12345678, dis.readInt());
-
- //Next int should be number of messages
- assertEquals(3, dis.readInt());
-
+
//Next byte should be type
assertEquals(JBossMessage.TYPE, dis.readByte());
@@ -1272,41 +1265,9 @@
//And now the message itself
JBossMessage r1 = new JBossMessage();
- r1.read(dis);
+ r1.read(dis);
-
- //Next byte should be type
- assertEquals(JBossMessage.TYPE, dis.readByte());
-
- //Next int should be delivery count
- assertEquals(8, dis.readInt());
-
- // Delivery id
- assertEquals(2, dis.readLong());
-
- //And now the message itself
- JBossMessage r2 = new JBossMessage();
-
- r2.read(dis);
-
-
- //Next byte should be type
- assertEquals(JBossMessage.TYPE, dis.readByte());
-
- //Next int should be delivery count
- assertEquals(9, dis.readInt());
-
- // Delivery id
- assertEquals(3, dis.readLong());
-
- //And now the message itself
- JBossMessage r3 = new JBossMessage();
-
- r3.read(dis);
-
MessageTest.ensureEquivalent(m1, r1);
- MessageTest.ensureEquivalent(m2, r2);
- MessageTest.ensureEquivalent(m3, r3);
//eos
try
@@ -1339,25 +1300,16 @@
ClientDelivery dr2 = (ClientDelivery)mm.getLoad();
- List msgs2 = dr2.getMessages();
+ MessageProxy p1 = dr2.getMessage();
assertEquals(consumerID, dr2.getConsumerId());
- MessageProxy p1 = (MessageProxy)msgs2.get(0);
- MessageProxy p2 = (MessageProxy)msgs2.get(1);
- MessageProxy p3 = (MessageProxy)msgs2.get(2);
-
+
assertEquals(del1.getDeliveryCount(), p1.getDeliveryCount());
- assertEquals(del2.getDeliveryCount(), p2.getDeliveryCount());
- assertEquals(del3.getDeliveryCount(), p3.getDeliveryCount());
-
+
JBossMessage q1 = p1.getMessage();
- JBossMessage q2 = p1.getMessage();
- JBossMessage q3 = p1.getMessage();
-
- MessageTest.ensureEquivalent(m1, q1);
- MessageTest.ensureEquivalent(m2, q2);
- MessageTest.ensureEquivalent(m3, q3);
+
+ MessageTest.ensureEquivalent(m1, q1);
}
@@ -1421,66 +1373,6 @@
assertEquals(block.getLow(), block3.getLow());
assertEquals(block.getHigh(), block3.getHigh());
}
-
- public void testHandleMessageResponse() throws Exception
- {
- HandleMessageResponse h = new HandleMessageResponse(true, 76876);
-
- MessagingMarshallable mm = new MessagingMarshallable((byte)77, h);
-
- InvocationResponse ir = new InvocationResponse(null, mm, false, null);
-
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
- OutputStream oos = new DataOutputStream(bos);
-
- wf.write(ir, oos);
-
- oos.flush();
-
- ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
-
- DataInputStream dis = new DataInputStream(bis);
-
- // First byte should be version
- assertEquals(77, dis.readByte());
-
- int b = dis.readByte();
-
- assertEquals(JMSWireFormat.HANDLE_MESSAGE_RESPONSE, b);
-
- HandleMessageResponse h2 = new HandleMessageResponse();
-
- h2.read(dis);
-
- assertEquals(h.clientIsFull(), h2.clientIsFull());
- assertEquals(h.getNumberAccepted(), h2.getNumberAccepted());
-
- //eos
- try
- {
- dis.readByte();
- fail("End of stream expected");
- }
- catch (EOFException e)
- {
- //Ok
- }
-
- bis.reset();
-
- InputStream ois = new DataInputStream(bis);
-
- InvocationResponse ir2 = (InvocationResponse)wf.read(ois, null);
-
- mm = (MessagingMarshallable)ir2.getResult();
-
- assertEquals(77, mm.getVersion());
-
- HandleMessageResponse h3 = (HandleMessageResponse)mm.getLoad();
-
- assertEquals(h.clientIsFull(), h3.clientIsFull());
- assertEquals(h.getNumberAccepted(), h3.getNumberAccepted());
- }
+
}
}
\ No newline at end of file
Modified: trunk/tests/src/org/jboss/test/messaging/jms/message/JMSExpirationHeaderTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/message/JMSExpirationHeaderTest.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/tests/src/org/jboss/test/messaging/jms/message/JMSExpirationHeaderTest.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -88,7 +88,7 @@
{
Message m = queueProducerSession.createMessage();
queueProducer.send(m, DeliveryMode.NON_PERSISTENT, 4, 5000);
- Message result = queueConsumer.receive(100);
+ Message result = queueConsumer.receive(1000);
assertEquals(m.getJMSMessageID(), result.getJMSMessageID());
}
@@ -97,7 +97,7 @@
Message m = queueProducerSession.createMessage();
queueProducer.send(m, DeliveryMode.NON_PERSISTENT, 4, 1000);
Thread.sleep(2000);
- assertNull(queueConsumer.receive(100));
+ assertNull(queueConsumer.receive(1000));
}
public void testExpirationOnReceiveNoWait() throws Exception
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2006-12-22 16:26:50 UTC (rev 1848)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2006-12-22 20:55:26 UTC (rev 1849)
@@ -1157,6 +1157,8 @@
//is always currently used - (we could make this configurable)
String transport = config.getRemotingTransport();
+
+ log.info("*** Using transport: " + transport);
String params = "/?marshaller=org.jboss.jms.server.remoting.JMSWireFormat&" +
"unmarshaller=org.jboss.jms.server.remoting.JMSWireFormat&" +
More information about the jboss-cvs-commits
mailing list