[jboss-cvs] JBoss Messaging SVN: r2507 - in trunk: src/main/org/jboss/jms/client/container and 9 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Feb 28 18:46:51 EST 2007
Author: timfox
Date: 2007-02-28 18:46:51 -0500 (Wed, 28 Feb 2007)
New Revision: 2507
Added:
trunk/src/main/org/jboss/jms/wireformat/ClosingResponse.java
Removed:
trunk/src/main/org/jboss/jms/wireformat/ConsumerCancelInflightMessagesRequest.java
Modified:
trunk/src/main/org/jboss/jms/client/Closeable.java
trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
trunk/src/main/org/jboss/jms/server/remoting/DirectThreadPool.java
trunk/src/main/org/jboss/jms/wireformat/ClosingRequest.java
trunk/src/main/org/jboss/jms/wireformat/ConnectionStartRequest.java
trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java
trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/QueueTest.java
trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
trunk/tests/src/org/jboss/test/thirdparty/remoting/RemotingConnectionFailureTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-899
Modified: trunk/src/main/org/jboss/jms/client/Closeable.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/Closeable.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/Closeable.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -46,5 +46,5 @@
*
* @throws JMSException
*/
- void closing() throws JMSException;
+ long closing() throws JMSException;
}
Modified: trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -135,7 +135,7 @@
{
if (checkClosingAlreadyDone())
{
- return null;
+ return new Long(-1);
}
}
else if (isClose)
Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -115,24 +115,18 @@
public Object handleClosing(Invocation invocation) throws Throwable
{
ConsumerState consumerState = getState(invocation);
-
- // First we call close on the messagecallbackhandler which waits for onMessage invocations
- // to complete any further messages received will be ignored
- consumerState.getMessageCallbackHandler().close();
-
- // Then we make sure closing is called on the ServerConsumerEndpoint.
-
- Object res = invocation.invokeNext();
+
+ // We make sure closing is called on the ServerConsumerEndpoint.
+ // This returns us the last delivery id sent
- //Now we send a message to the server consumer with the last delivery id so
- //it can cancel any inflight messages after that
- //This needs to be done *after* the call to closing has been executed on the server
- //maybe it can be combined with closing
+ Long l = (Long)invocation.invokeNext();
- ConsumerDelegate del = (ConsumerDelegate)invocation.getTargetObject();
+ long lastDeliveryId = l.longValue();
- long lastDeliveryId = consumerState.getMessageCallbackHandler().getLastDeliveryId();
-
+ // First we call close on the messagecallbackhandler which waits for onMessage invocations
+ // to complete and the last delivery to arrive
+ consumerState.getMessageCallbackHandler().close(lastDeliveryId);
+
SessionState sessionState = (SessionState)consumerState.getParent();
ConnectionState connectionState = (ConnectionState)sessionState.getParent();
@@ -141,15 +135,10 @@
CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
cm.unregisterHandler(consumerState.getConsumerID());
- //Now we need to cancel any inflight messages - this must be done before
- //cancelling the message callback handler buffer, so that messages end up back in the channel
- //in the right order
- del.cancelInflightMessages(lastDeliveryId);
-
//And then we cancel any messages still in the message callback handler buffer
consumerState.getMessageCallbackHandler().cancelBuffer();
- return res;
+ return l;
}
public Object handleReceive(Invocation invocation) throws Throwable
Modified: trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -134,8 +134,7 @@
// already be cancelled after failover.
if (methodName.equals("cancelDelivery") ||
- methodName.equals("cancelDeliveries") ||
- methodName.equals("cancelInflightMessages"))
+ methodName.equals("cancelDeliveries"))
{
log.debug(this + " NOT resuming " + methodName + "(), let it wither and die");
Modified: trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -331,7 +331,7 @@
public Object handleClosing(Invocation invocation) throws Throwable
{
- return null;
+ return new Long(-1);
}
public Object handleClose(Invocation invocation) throws Throwable
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -103,11 +103,11 @@
doInvoke(client, req);
}
- public void closing() throws JMSException
+ public long closing() throws JMSException
{
RequestSupport req = new ClosingRequest(id, version);
- doInvoke(client, req);
+ return ((Long)doInvoke(client, req)).longValue();
}
// BrowserDelegate implementation ---------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -151,11 +151,11 @@
doInvoke(client, req);
}
- public void closing() throws JMSException
+ public long closing() throws JMSException
{
RequestSupport req = new ClosingRequest(id, version);
- doInvoke(client, req);
+ return ((Long)doInvoke(client, req)).longValue();
}
// ConnectionDelegate implementation ------------------------------------------------------------
@@ -239,7 +239,7 @@
{
RequestSupport req = new ConnectionStartRequest(id, version);
- doInvokeOneway(client, req);
+ doInvoke(client, req);
}
public void stop() throws JMSException
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -34,7 +34,6 @@
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.wireformat.CloseRequest;
import org.jboss.jms.wireformat.ClosingRequest;
-import org.jboss.jms.wireformat.ConsumerCancelInflightMessagesRequest;
import org.jboss.jms.wireformat.ConsumerChangeRateRequest;
import org.jboss.jms.wireformat.RequestSupport;
import org.jboss.logging.Logger;
@@ -117,22 +116,15 @@
doInvoke(client, req);
}
- public void closing() throws JMSException
+ public long closing() throws JMSException
{
RequestSupport req = new ClosingRequest(id, version);
- doInvoke(client, req);
+ return ((Long)doInvoke(client, req)).longValue();
}
// ConsumerDelegate implementation --------------------------------------------------------------
- public void cancelInflightMessages(long lastDeliveryId) throws JMSException
- {
- RequestSupport req = new ConsumerCancelInflightMessagesRequest(id, version, lastDeliveryId);
-
- doInvoke(client, req);
- }
-
public void changeRate(float newRate) throws JMSException
{
RequestSupport req = new ConsumerChangeRateRequest(id, version, newRate);
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -85,7 +85,7 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void closing() throws JMSException
+ public long closing() throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -142,11 +142,11 @@
doInvoke(client, req);
}
- public void closing() throws JMSException
+ public long closing() throws JMSException
{
RequestSupport req = new ClosingRequest(id, version);
- doInvoke(client, req);
+ return ((Long)doInvoke(client, req)).longValue();
}
// SessionDelegate implementation ---------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -96,11 +96,10 @@
if (handler == null)
{
- // This is OK and can happen if the callback handler is deregistered on consumer close,
- // but there are messages still in transit which arrive later. In this case it is just
- // safe to ignore the message.
+ // This should never happen since we wait for all deliveries to arrive before closing
+ // the consumer
- if (trace) { log.trace(this + " callback handler not found, message arrived after consumer is closed"); }
+ log.warn(this + " callback handler not found, message arrived after consumer is closed. Cancelling it bacdk to queue");
return;
}
Modified: trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -332,6 +332,8 @@
// cleaning up the callback listener
client.setDisconnectTimeout(0);
+
+ client.disconnect();
try
{
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -61,6 +61,9 @@
private static boolean trace;
+ private static final int WAIT_TIMEOUT = 30000;
+
+
static
{
log = Logger.getLogger(MessageCallbackHandler.class);
@@ -109,7 +112,7 @@
return false;
}
}
-
+
//This is static so it can be called by the asf layer too
public static void callOnMessage(SessionDelegate sess,
MessageListener listener,
@@ -200,8 +203,9 @@
private String queueName;
private long lastDeliveryId = -1;
private volatile boolean serverSending = true;
+ private boolean waitingForLastDelivery;
+
-
// Constructors ---------------------------------------------------------------------------------
public MessageCallbackHandler(boolean isCC, int ackMode,
@@ -306,10 +310,10 @@
{
MessageProxy mp = (MessageProxy)i.next();
- DefaultCancel ack =
+ DefaultCancel cancel =
new DefaultCancel(mp.getDeliveryId(), mp.getDeliveryCount(), false, false);
- cancels.add(ack);
+ cancels.add(cancel);
}
sessionDelegate.cancelDeliveries(cancels);
@@ -319,8 +323,12 @@
}
}
- public void close() throws JMSException
- {
+ public void close(long lastDeliveryId) throws JMSException
+ {
+ waitForLastDelivery(lastDeliveryId);
+
+ waitForOnMessageToComplete();
+
synchronized (mainLock)
{
log.debug(this + " closing");
@@ -340,9 +348,7 @@
this.listener = null;
}
-
- waitForOnMessageToComplete();
-
+
if (trace) { log.trace(this + " closed"); }
}
@@ -521,20 +527,54 @@
serverSending = true;
}
- public long getLastDeliveryId()
- {
- synchronized (mainLock)
- {
- return lastDeliveryId;
- }
- }
-
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
// Private --------------------------------------------------------------------------------------
+ /*
+ * Wait for the last delivery to arrive
+ */
+ private void waitForLastDelivery(long id)
+ {
+ if (trace) { log.trace("Waiting for last delivery id " + id); }
+
+ synchronized (mainLock)
+ {
+ waitingForLastDelivery = true;
+ try
+ {
+ long wait = WAIT_TIMEOUT;
+ while (lastDeliveryId != id && wait > 0)
+ {
+ long start = System.currentTimeMillis();
+ try
+ {
+ mainLock.wait(wait);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ wait -= (System.currentTimeMillis() - start);
+ }
+ if (trace && lastDeliveryId == id)
+ {
+ log.trace("Got last delivery");
+ }
+
+ if (lastDeliveryId != id)
+ {
+ log.warn("Timed out waiting for last delivery");
+ }
+ }
+ finally
+ {
+ waitingForLastDelivery = false;
+ }
+ }
+ }
+
private void handleMessageInternal(Object message) throws Exception
{
MessageProxy proxy = (MessageProxy) message;
@@ -545,8 +585,10 @@
{
if (closed)
{
- // Ignore
- if (trace) { log.trace(this + " is closed, so ignore message"); }
+ // This should never happen - we should always wait for all deliveries to arrive
+ // when closing
+ log.warn(this + " is closed, so ignoring message");
+
return;
}
@@ -556,7 +598,7 @@
buffer.addLast(proxy, proxy.getJMSPriority());
lastDeliveryId = proxy.getDeliveryId();
-
+
if (trace) { log.trace(this + " added message(s) to the buffer"); }
messageAdded();
@@ -654,11 +696,16 @@
private void messageAdded()
{
+ boolean notified = false;
+
// If we have a thread waiting on receive() we notify it
if (receiverThread != null)
{
- if (trace) { log.trace(this + " notifying receiver thread"); }
- mainLock.notify();
+ if (trace) { log.trace(this + " notifying receiver/waiter thread"); }
+
+ mainLock.notifyAll();
+
+ notified = true;
}
else if (listener != null)
{
@@ -673,6 +720,12 @@
//TODO - Execute onMessage on same thread for even better throughput
}
+
+ // Make sure we notify any thread waiting for last delivery
+ if (waitingForLastDelivery && !notified)
+ {
+ mainLock.notifyAll();
+ }
}
private long waitOnLock(Object lock, long waitTime) throws InterruptedException
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -42,12 +42,4 @@
* Sent to the server to specify a new maximum rate at which to send messages at
*/
void changeRate(float newRate) throws JMSException;
-
-
- /**
- * Cancels any deliveries with a delivery id > lastDeliveryId - these are inflight
- * @param lastDeliveryId
- */
- void cancelInflightMessages(long lastDeliveryId) throws JMSException;
-
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -206,9 +206,10 @@
}
}
- public void closing() throws JMSException
+ public long closing() throws JMSException
{
// Do nothing
+ return -1;
}
// Public ---------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -424,9 +424,11 @@
}
}
- public void closing() throws JMSException
+ public long closing() throws JMSException
{
log.trace(this + " closing (noop)");
+
+ return -1;
}
public void sendTransaction(TransactionRequest request,
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -58,15 +58,13 @@
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt> $Id: ServerConsumerEndpoint.java 2399
- * 2007-02-23 01:21:29Z ovidiu.feodorov at jboss.com $
+ * @version <tt>$Revision$</tt> $Id$
*/
public class ServerConsumerEndpoint implements Receiver, ConsumerEndpoint
{
// Constants ------------------------------------------------------------------------------------
- private static final Logger log = Logger
- .getLogger(ServerConsumerEndpoint.class);
+ private static final Logger log = Logger.getLogger(ServerConsumerEndpoint.class);
// Static ---------------------------------------------------------------------------------------
@@ -106,6 +104,8 @@
private boolean storeDeliveries;
+ private long lastDeliveryID = -1;
+
// Constructors ---------------------------------------------------------------------------------
ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
@@ -150,11 +150,13 @@
// This is a consumer of a non durable topic subscription. We don't need to store
// deliveries since if the consumer is closed or dies the refs go too.
this.storeDeliveries = false;
- } else
+ }
+ else
{
this.storeDeliveries = true;
}
+ //For now always true - revisit later
storeDeliveries = true;
if (selector != null)
@@ -166,7 +168,7 @@
}
this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();
-
+
// adding the consumer to the queue
this.messageQueue.add(this);
@@ -204,7 +206,8 @@
try
{
sessionEndpoint.expireDelivery(delivery, expiryQueue);
- } catch (Throwable t)
+ }
+ catch (Throwable t)
{
log.error("Failed to expire delivery: " + delivery, t);
}
@@ -222,7 +225,7 @@
return null;
}
-
+
if (trace) { log.trace(this + " has startStopLock lock, preparing the message for delivery"); }
Message message = ref.getMessage();
@@ -241,9 +244,9 @@
if (storeDeliveries)
{
- deliveryId = sessionEndpoint.addDelivery(delivery, id, dlq,
- expiryQueue, redeliveryDelay);
- } else
+ deliveryId = sessionEndpoint.addDelivery(delivery, id, dlq, expiryQueue, redeliveryDelay);
+ }
+ else
{
deliveryId = -1;
}
@@ -273,20 +276,28 @@
if (callbackClient != null)
{
invoker = callbackClient.getInvoker();
- } else
+
+ }
+ else
{
// TODO: dummy synchronization object, in case there's no clientInvoker. This will
// happen during the first invocation anyway. It's a kludge, I know, but this whole
// synchronization thing is a huge kludge. Needs to be reviewed.
invoker = new Object();
}
-
+
synchronized (invoker)
{
// one way invocation, no acknowledgment sent back by the client
- if (trace) { log.trace(this + " submitting message " + message + " to the remoting layer to be sent asynchronously"); } callbackHandler.handleCallbackOneway(callback);
+ if (trace) { log.trace(this + " submitting message " + message + " to the remoting layer to be sent asynchronously"); }
+
+ callbackHandler.handleCallbackOneway(callback);
+
+ //We store the delivery id so we know to wait for any deliveries in transit on close
+ this.lastDeliveryID = deliveryId;
}
- } catch (HandleCallbackException e)
+ }
+ catch (HandleCallbackException e)
{
// it's an oneway callback, so exception could only have happened on the server, while
// trying to send the callback. This is a good reason to smack the whole connection.
@@ -345,13 +356,15 @@
// Closeable implementation ---------------------------------------------------------------------
- public void closing() throws JMSException
+ public long closing() throws JMSException
{
try
{
if (trace) { log.trace(this + " closing");}
stop();
+
+ return lastDeliveryID;
}
catch (Throwable t)
{
@@ -422,27 +435,6 @@
}
}
- /**
- * This method is always called between closing() and close() being called. Instead of having a
- * new method we could perhaps somehow pass the last delivery id in with closing - then we don't
- * need another message.
- */
- public void cancelInflightMessages(long lastDeliveryId) throws JMSException
- {
- if (trace) { log.trace(this + " cancelInflightMessages: " + lastDeliveryId); }
-
- try
- {
- // Cancel all deliveries made by this consumer with delivery id > lastDeliveryId
-
- sessionEndpoint.cancelDeliveriesForConsumerAfterDeliveryId(id, lastDeliveryId);
- }
- catch (Throwable t)
- {
- throw ExceptionUtil.handleJMSInvocation(t, this + " cancelInflightMessages");
- }
- }
-
// Public ---------------------------------------------------------------------------------------
public String toString()
@@ -552,7 +544,7 @@
}
started = false;
-
+
// Any message deliveries already transit to the consumer, will just be ignored by the
// MessageCallbackHandler since it will be closed.
//
@@ -564,8 +556,7 @@
// 2) ServerConsumerEndpoint::closing() causes stop() this flushes any deliveries yet to
// deliver to the client callback handler.
//
- // 3) MessageCallbackHandler::cancelInflightMessages(long lastDeliveryId) - any deliveries
- // after lastDeliveryId for the consumer will be considered in flight and cancelled.
+ // 3) MessageCallbackHandler waits for all deliveries to arrive at client side
//
// 4) ServerConsumerEndpoint:close() - endpoint is deregistered.
//
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -29,7 +29,6 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -302,10 +301,12 @@
}
}
- public void closing() throws JMSException
+ public long closing() throws JMSException
{
// currently does nothing
if (trace) log.trace(this + " closing (noop)");
+
+ return -1;
}
public void send(JBossMessage message, boolean checkForDuplicates) throws JMSException
@@ -747,41 +748,6 @@
}
}
- void cancelDeliveriesForConsumerAfterDeliveryId(int consumerId, long lastDeliveryId)
- throws Throwable
- {
- //Need to cancel in reverse
-
- LinkedList toCancel = new LinkedList();
-
- Iterator iter = deliveries.entrySet().iterator();
-
- while (iter.hasNext())
- {
- Map.Entry entry = (Map.Entry)iter.next();
-
- Long deliveryId = (Long)entry.getKey();
-
- DeliveryRecord record = (DeliveryRecord)entry.getValue();
-
- if (record.consumerId == consumerId && deliveryId.longValue() > lastDeliveryId)
- {
- iter.remove();
-
- toCancel.addFirst(record);
- }
- }
-
- iter = toCancel.iterator();
-
- while (iter.hasNext())
- {
- DeliveryRecord record = (DeliveryRecord)iter.next();
-
- record.del.cancel();
- }
- }
-
void removeBrowser(int browserId) throws Exception
{
synchronized (browsers)
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/BrowserAdvised.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -60,9 +60,9 @@
endpoint.close();
}
- public void closing() throws JMSException
+ public long closing() throws JMSException
{
- endpoint.closing();
+ return endpoint.closing();
}
public void reset() throws JMSException
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -63,9 +63,9 @@
endpoint.close();
}
- public void closing() throws JMSException
+ public long closing() throws JMSException
{
- endpoint.closing();
+ return endpoint.closing();
}
public SessionDelegate createSessionDelegate(boolean transacted,
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -59,9 +59,9 @@
endpoint.close();
}
- public void closing() throws JMSException
+ public long closing() throws JMSException
{
- endpoint.closing();
+ return endpoint.closing();
}
public void changeRate(float newRate) throws JMSException
@@ -69,11 +69,6 @@
endpoint.changeRate(newRate);
}
- public void cancelInflightMessages(long lastDeliveryId) throws JMSException
- {
- endpoint.cancelInflightMessages(lastDeliveryId);
- }
-
// AdvisedSupport overrides --------------------------------------
public Object getEndpoint()
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -70,9 +70,9 @@
endpoint.close();
}
- public void closing() throws JMSException
+ public long closing() throws JMSException
{
- endpoint.closing();
+ return endpoint.closing();
}
public void send(JBossMessage msg, boolean checkForDuplicates) throws JMSException
Modified: trunk/src/main/org/jboss/jms/server/remoting/DirectThreadPool.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/DirectThreadPool.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/server/remoting/DirectThreadPool.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -6,6 +6,8 @@
*/
package org.jboss.jms.server.remoting;
+import org.jboss.jms.server.endpoint.ServerConsumerEndpoint;
+import org.jboss.logging.Logger;
import org.jboss.util.threadpool.ThreadPool;
import org.jboss.util.threadpool.TaskWrapper;
import org.jboss.util.threadpool.Task;
@@ -22,6 +24,8 @@
{
// Constants ------------------------------------------------------------------------------------
+ private static final Logger log = Logger.getLogger(DirectThreadPool.class);
+
// Static ---------------------------------------------------------------------------------------
// Attributes -----------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/wireformat/ClosingRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ClosingRequest.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/wireformat/ClosingRequest.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -59,8 +59,9 @@
throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
}
- endpoint.closing();
- return null;
+ long id = endpoint.closing();
+
+ return new ClosingResponse(id);
}
public void write(DataOutputStream os) throws Exception
Added: trunk/src/main/org/jboss/jms/wireformat/ClosingResponse.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ClosingResponse.java (rev 0)
+++ trunk/src/main/org/jboss/jms/wireformat/ClosingResponse.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -0,0 +1,63 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.wireformat;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+
+public class ClosingResponse extends ResponseSupport
+{
+ private long id;
+
+ public ClosingResponse()
+ {
+ }
+
+ public ClosingResponse(long id)
+ {
+ super(PacketSupport.RESP_CLOSING);
+
+ this.id = id;
+ }
+
+ public Object getResponse()
+ {
+ return new Long(id);
+ }
+
+ public void write(DataOutputStream os) throws Exception
+ {
+ super.write(os);
+
+ os.writeLong(id);
+
+ os.flush();
+ }
+
+ public void read(DataInputStream is) throws Exception
+ {
+ id = is.readLong();
+ }
+
+}
+
Modified: trunk/src/main/org/jboss/jms/wireformat/ConnectionStartRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionStartRequest.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionStartRequest.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -80,16 +80,5 @@
os.flush();
}
- public Object getPayload()
- {
- OnewayInvocation oi = new OnewayInvocation(this);
-
- InvocationRequest request =
- new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
- oi, ONE_WAY_METADATA, null, null);
-
- return request;
- }
-
}
Deleted: trunk/src/main/org/jboss/jms/wireformat/ConsumerCancelInflightMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConsumerCancelInflightMessagesRequest.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/wireformat/ConsumerCancelInflightMessagesRequest.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -1,91 +0,0 @@
-
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.wireformat;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-
-import org.jboss.jms.server.endpoint.ConsumerEndpoint;
-
-/**
- *
- * A ConsumerCancelInflightMessagesRequest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- *
- */
-public class ConsumerCancelInflightMessagesRequest extends RequestSupport
-{
- private long lastDeliveryId;
-
- public ConsumerCancelInflightMessagesRequest()
- {
- }
-
- public ConsumerCancelInflightMessagesRequest(int objectId,
- byte version,
- long lastDeliveryId)
- {
- super(objectId, PacketSupport.REQ_CONSUMER_CANCELINFLIGHTMESSAGES, version);
-
- this.lastDeliveryId = lastDeliveryId;
- }
-
- public void read(DataInputStream is) throws Exception
- {
- super.read(is);
-
- lastDeliveryId = is.readLong();
- }
-
- public ResponseSupport serverInvoke() throws Exception
- {
- ConsumerEndpoint endpoint =
- (ConsumerEndpoint)Dispatcher.instance.getTarget(objectId);
-
- if (endpoint == null)
- {
- throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
- }
-
- endpoint.cancelInflightMessages(lastDeliveryId);
-
- return null;
- }
-
- public void write(DataOutputStream os) throws Exception
- {
- super.write(os);
-
- os.writeLong(lastDeliveryId);
-
- os.flush();
- }
-
-}
-
-
-
Modified: trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -110,7 +110,6 @@
// --------
public static final int REQ_CONSUMER_CHANGERATE = 401;
- public static final int REQ_CONSUMER_CANCELINFLIGHTMESSAGES = 402;
// Browser
// -------
@@ -160,6 +159,8 @@
public static final int RESP_BROWSER_NEXTMESSAGE = 100500;
public static final int RESP_BROWSER_HASNEXTMESSAGE = 100501;
public static final int RESP_BROWSER_NEXTMESSAGEBLOCK = 100502;
+
+ public static final int RESP_CLOSING = 100601;
public static PacketSupport createPacket(int id)
@@ -262,11 +263,7 @@
break;
// Consumer
-
- case REQ_CONSUMER_CANCELINFLIGHTMESSAGES:
- packet = new ConsumerCancelInflightMessagesRequest();
- break;
-
+
// Browser
case REQ_BROWSER_NEXTMESSAGE:
@@ -343,7 +340,10 @@
packet = new BrowserNextMessageBlockResponse();
break;
-
+ case RESP_CLOSING:
+ packet = new ClosingResponse();
+ break;
+
case SERIALIZED:
packet = new SerializedPacket();
break;
Modified: trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -3361,31 +3361,31 @@
TextMessage tm = (TextMessage)m;
count++;
- log.trace("Got message:" + count);
-
+ log.info(this + " Got message:" + count);
+
try
{
- log.trace("message:" + tm.getText());
+ log.info(this + " message:" + tm.getText());
if (count == 1)
{
if (!("a".equals(tm.getText())))
{
- log.trace("Should be a but was " + tm.getText());
+ log.info("Should be a but was " + tm.getText());
failed = true;
latch.release();
}
- log.trace("Throwing exception");
+ log.info("Throwing exception");
throw new RuntimeException("Aardvark");
}
else if (count == 2)
{
- log.trace("ack mode:" + sess.getAcknowledgeMode());
+ log.info("ack mode:" + sess.getAcknowledgeMode());
if (sess.getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE || sess.getAcknowledgeMode() == Session.DUPS_OK_ACKNOWLEDGE)
{
//Message should be immediately redelivered
if (!("a".equals(tm.getText())))
{
- log.trace("Should be a but was " + tm.getText());
+ log.info("Should be a but was " + tm.getText());
failed = true;
latch.release();
}
@@ -3400,7 +3400,7 @@
//Transacted or CLIENT_ACKNOWLEDGE - next message should be delivered
if (!("b".equals(tm.getText())))
{
- log.trace("Should be b but was " + tm.getText());
+ log.info("Should be b but was " + tm.getText());
failed = true;
latch.release();
}
@@ -3412,7 +3412,7 @@
{
if (!("b".equals(tm.getText())))
{
- log.trace("Should be b but was " + tm.getText());
+ log.info("Should be b but was " + tm.getText());
failed = true;
latch.release();
}
@@ -3421,7 +3421,7 @@
{
if (!("c".equals(tm.getText())))
{
- log.trace("Should be c but was " + tm.getText());
+ log.info("Should be c but was " + tm.getText());
failed = true;
latch.release();
}
@@ -3435,7 +3435,7 @@
{
if (!("c".equals(tm.getText())))
{
- log.trace("Should be c but was " + tm.getText());
+ log.info("Should be c but was " + tm.getText());
failed = true;
latch.release();
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/QueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/QueueTest.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/tests/src/org/jboss/test/messaging/jms/QueueTest.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -21,15 +21,19 @@
*/
package org.jboss.test.messaging.jms;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.Queue;
-import javax.jms.Connection;
import javax.jms.Session;
-import javax.jms.MessageProducer;
-import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
+import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
@@ -122,54 +126,84 @@
{
Queue queue = (Queue)ic.lookup("/queue/TestQueue");
- Connection conn1 = cf.createConnection();
+ // Maybe we could remove this counter after we are sure this test is fixed!
+ // I had to use a counter because this can work in some iterations.
+ for (int counter = 0; counter < 20; counter++)
+ {
+ log.info("Iteration = " + counter);
- Connection conn2 = cf.createConnection();
+ Connection conn1 = cf.createConnection();
- try
- {
- Session s = conn1.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ assertEquals(0, ((JBossConnection)conn1).getServerID());
- MessageProducer p = s.createProducer(queue);
+ Connection conn2 = cf.createConnection();
- for (int i = 0; i < 20; i++)
+ assertEquals(0, ((JBossConnection)conn2).getServerID());
+
+ try
{
- p.send(s.createTextMessage("message " + i));
- }
+ Session s = conn1.createSession(true, Session.AUTO_ACKNOWLEDGE);
- s.commit();
+ MessageProducer p = s.createProducer(queue);
- Session s2 = conn2.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ for (int i = 0; i < 20; i++)
+ {
+ p.send(s.createTextMessage("message " + i));
+ }
- // Create a consumer, start the session, close the consumer..
- // This shouldn't cause any message to be lost
- MessageConsumer c2 = s2.createConsumer(queue);
- conn2.start();
- c2.close();
+ s.commit();
- // open a new consumer
- c2 = s2.createConsumer(queue);
+ Session s2 = conn2.createSession(true, Session.AUTO_ACKNOWLEDGE);
- for (int i = 0; i < 20; i++)
- {
- TextMessage txt = (TextMessage)c2.receive(5000);
- assertNotNull(txt);
- assertEquals("message " + i, txt.getText());
- }
+ // these next three lines are an anti-pattern but they shouldn't lose any messages
+ MessageConsumer c2 = s2.createConsumer(queue);
+ conn2.start();
+ c2.close();
- assertNull(c2.receive(1000));
+ c2 = s2.createConsumer(queue);
- s2.commit();
- }
- finally
- {
- if (conn1 != null)
- {
- conn1.close();
+ //There is a possibility the messages arrive out of order if they hit the closed
+ //consumer and are cancelled back before delivery to the other consumer has finished.
+ //There is nothing much we can do about this
+ Set texts = new HashSet();
+
+ for (int i = 0; i < 20; i++)
+ {
+ TextMessage txt = (TextMessage)c2.receive(5000);
+ assertNotNull(txt);
+ texts.add(txt.getText());
+ }
+
+ for (int i = 0; i < 20; i++)
+ {
+ assertTrue(texts.contains("message " + i));
+ }
+
+ // Ovidiu: the test was originally invalid, a locally transacted session that is closed
+ // rolls back its transaction. I added s2.commit() to correct the test.
+ // JMS 1.1 Specifications, Section 4.3.5:
+ // "Closing a connection must roll back the transactions in progress on its
+ // transacted sessions*.
+ // *) The term 'transacted session' refers to the case where a session's commit and
+ // rollback methods are used to demarcate a transaction local to the session. In the
+ // case where a session's work is coordinated by an external transaction manager, a
+ // session's commit and rollback methods are not used and the result of a closed
+ // session's work is determined later by the transaction manager."
+
+ s2.commit();
+
+ assertNull(c2.receive(1000));
}
- if (conn2 != null)
+ finally
{
- conn2.close();
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
}
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -54,23 +54,23 @@
import org.jboss.jms.wireformat.BrowserNextMessageResponse;
import org.jboss.jms.wireformat.CloseRequest;
import org.jboss.jms.wireformat.ClosingRequest;
+import org.jboss.jms.wireformat.ClosingResponse;
import org.jboss.jms.wireformat.ConnectionCreateSessionDelegateRequest;
import org.jboss.jms.wireformat.ConnectionCreateSessionDelegateResponse;
import org.jboss.jms.wireformat.ConnectionFactoryCreateConnectionDelegateRequest;
import org.jboss.jms.wireformat.ConnectionFactoryCreateConnectionDelegateResponse;
import org.jboss.jms.wireformat.ConnectionFactoryGetClientAOPStackRequest;
import org.jboss.jms.wireformat.ConnectionFactoryGetClientAOPStackResponse;
-import org.jboss.jms.wireformat.ConnectionGetIDBlockRequest;
-import org.jboss.jms.wireformat.ConnectionGetIDBlockResponse;
import org.jboss.jms.wireformat.ConnectionGetClientIDRequest;
import org.jboss.jms.wireformat.ConnectionGetClientIDResponse;
+import org.jboss.jms.wireformat.ConnectionGetIDBlockRequest;
+import org.jboss.jms.wireformat.ConnectionGetIDBlockResponse;
import org.jboss.jms.wireformat.ConnectionGetPreparedTransactionsRequest;
import org.jboss.jms.wireformat.ConnectionGetPreparedTransactionsResponse;
import org.jboss.jms.wireformat.ConnectionSendTransactionRequest;
import org.jboss.jms.wireformat.ConnectionSetClientIDRequest;
import org.jboss.jms.wireformat.ConnectionStartRequest;
import org.jboss.jms.wireformat.ConnectionStopRequest;
-import org.jboss.jms.wireformat.ConsumerCancelInflightMessagesRequest;
import org.jboss.jms.wireformat.ConsumerChangeRateRequest;
import org.jboss.jms.wireformat.NullResponse;
import org.jboss.jms.wireformat.PacketSupport;
@@ -275,11 +275,6 @@
wf.testConsumerChangeRateRequest();
}
- public void testConsumerCancelInflightMessagesRequest() throws Exception
- {
- wf.testConsumerCancelInflightMessagesRequest();
- }
-
// Browser
public void testBrowserNextMessageRequest() throws Exception
@@ -392,7 +387,12 @@
wf.testNullResponse();
}
+ public void testClosingResponse() throws Exception
+ {
+ wf.testClosingResponse();
+ }
+
//We just check the first byte to make sure serialization is not being used.
private class TestWireFormat extends JMSWireFormat
@@ -665,14 +665,6 @@
testPacket(req, PacketSupport.REQ_CONSUMER_CHANGERATE);
}
- public void testConsumerCancelInflightMessagesRequest() throws Exception
- {
- RequestSupport req =
- new ConsumerCancelInflightMessagesRequest(23, (byte)77, 123);
-
- testPacket(req, PacketSupport.REQ_CONSUMER_CANCELINFLIGHTMESSAGES);
- }
-
// Browser
public void testBrowserNextMessageRequest() throws Exception
@@ -859,6 +851,13 @@
testPacket(resp, PacketSupport.NULL_RESPONSE);
}
+ public void testClosingResponse() throws Exception
+ {
+ ResponseSupport resp = new ClosingResponse(23);
+
+ testPacket(resp, PacketSupport.RESP_CLOSING);
+ }
+
}
public static class SerializableObject implements Serializable
Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -319,9 +319,9 @@
closed = true;
}
- public void closing() throws JMSException
+ public long closing() throws JMSException
{
-
+ return -1;
}
public IDBlock getIdBlock(int size) throws JMSException
Modified: trunk/tests/src/org/jboss/test/thirdparty/remoting/RemotingConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/thirdparty/remoting/RemotingConnectionFailureTest.java 2007-02-28 23:40:00 UTC (rev 2506)
+++ trunk/tests/src/org/jboss/test/thirdparty/remoting/RemotingConnectionFailureTest.java 2007-02-28 23:46:51 UTC (rev 2507)
@@ -243,7 +243,7 @@
client.setDisconnectTimeout(0);
client.disconnect();
-
+
// the client should be "dead", in that both the connection validator and the lease pinger
// are silenced
More information about the jboss-cvs-commits
mailing list