[jboss-cvs] JBoss Messaging SVN: r2485 - in trunk: src/main/org/jboss/jms/client/container and 8 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Feb 27 21:30:34 EST 2007
Author: ovidiu.feodorov at jboss.com
Date: 2007-02-27 21:30:33 -0500 (Tue, 27 Feb 2007)
New Revision: 2485
Modified:
trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java
trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
trunk/src/main/org/jboss/jms/client/state/SessionState.java
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryInternalEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryCreateConnectionDelegateRequest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
Log:
minor
Modified: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -14,7 +14,6 @@
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
-import org.jboss.jms.message.MessageIdGenerator;
import org.jboss.jms.server.endpoint.CreateConnectionResult;
import org.jboss.logging.Logger;
Modified: trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -99,8 +99,8 @@
while (attemptCount < MAX_RECONNECT_HOP_COUNT)
{
- // since an exceptiong might be captured during an attempt,
- // this has to be the first operation
+ // since an exception might be captured during an attempt, this has to be the first
+ // operation
attemptCount++;
try
{
@@ -205,7 +205,7 @@
{
delegate = null;
log.warn("Exception captured on createConnection... hopping to a new connection factory", e);
- //Currently hardcoded
+ // Currently hardcoded
Thread.sleep(2000);
}
}
Modified: trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -118,21 +118,26 @@
// Set retry flag as true on send() and sendTransaction()
// more details at http://jira.jboss.org/jira/browse/JBMESSAGING-809
+
if (invocation.getTargetObject() instanceof ClientSessionDelegate &&
(methodName.equals("send") || methodName.equals("sendTransaction")))
{
log.debug(this + " caught " + methodName + "() invocation, enabling check for duplicates");
+
Object[] arguments = ((MethodInvocation)invocation).getArguments();
arguments[1] = Boolean.TRUE;
((MethodInvocation)invocation).setArguments(arguments);
}
- //We don't retry the following invocations:
- //cancelDelivery, cancelDeliveries, cancelInflightMessages - the deliveries will already be cancelled after failover
- if (methodName.equals("cancelDelivery") || methodName.equals("cancelDeliveries")
- || methodName.equals("cancelInflightMessages"))
+ // We don't retry the following invocations:
+ // cancelDelivery(), cancelDeliveries(), cancelInflightMessages() - the deliveries will
+ // already be cancelled after failover.
+
+ if (methodName.equals("cancelDelivery") ||
+ methodName.equals("cancelDeliveries") ||
+ methodName.equals("cancelInflightMessages"))
{
- log.debug(this + " NOT resuming " + methodName + "()");
+ log.debug(this + " NOT resuming " + methodName + "(), let it wither and die");
return null;
}
Modified: trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -167,7 +167,8 @@
// Generate the message id
ConnectionState connectionState = (ConnectionState)sessionState.getParent();
- long id = connectionState.getIdGenerator().getId((ConnectionDelegate)connectionState.getDelegate());
+ long id =
+ connectionState.getIdGenerator().getId((ConnectionDelegate)connectionState.getDelegate());
JBossMessage messageToSend;
boolean foreign = false;
Modified: trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -39,7 +39,6 @@
import org.jboss.jms.client.state.HierarchicalState;
import org.jboss.jms.client.state.ProducerState;
import org.jboss.jms.client.state.SessionState;
-import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.jms.delegate.ProducerDelegate;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.message.MessageIdGenerator;
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -103,7 +103,8 @@
}
}
- throw new MessagingNetworkFailureException("Failed to download and/or install client side AOP stack");
+ throw new MessagingNetworkFailureException(
+ "Failed to download and/or install client side AOP stack");
}
/**
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -45,8 +45,7 @@
/**
* The client-side ConnectionFactory delegate class.
- *
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
*
Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -203,15 +203,17 @@
public JMSException handleThrowable(Throwable t)
{
- // ConnectionFailedException could happen during ConnectionFactory.createConnection
+ // ConnectionFailedException could happen during ConnectionFactory.createConnection.
// IOException could happen during an interrupted exception.
- // CannotConnectionException could happen during a communication error between a
- // connected remoting client and the server (what means.. any new invocation)
+ // CannotConnectException could happen during a communication error between a connected
+ // remoting client and the server (which means any new invocation).
+
if (t instanceof JMSException)
{
return (JMSException)t;
}
- else if ((t instanceof CannotConnectException) || (t instanceof IOException) ||
+ else if ((t instanceof CannotConnectException) ||
+ (t instanceof IOException) ||
(t instanceof ConnectionFailedException))
{
log.warn("Captured Exception:" + t, t);
Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -37,9 +37,6 @@
import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
/**
- *
- * A CallbackManager.
- *
* The CallbackManager is an InvocationHandler used for handling callbacks to message consumers.
* The callback is received and dispatched off to the relevant consumer.
*
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -42,7 +42,6 @@
import org.jboss.messaging.util.prioritylinkedlist.BasicPriorityLinkedList;
import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
-import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
/**
Modified: trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConsumerState.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/state/ConsumerState.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -23,8 +23,6 @@
import java.util.Collections;
-import javax.jms.Destination;
-
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.remoting.CallbackManager;
@@ -32,7 +30,6 @@
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.server.Version;
-import org.jboss.jms.util.MessageQueueNameHelper;
/**
* State corresponding to a Consumer. This state is acessible inside aspects/interceptors.
Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -199,7 +199,8 @@
sessionID = newState.sessionID;
// We need to clear anything waiting in the session executor - since there may be messages
- // from before failover waiting in there and we don't want them to get delivered after failover
+ // from before failover waiting in there and we don't want them to get delivered after
+ // failover.
executor.shutdownAfterProcessingCurrentTask();
createExecutor();
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -123,10 +123,10 @@
private Object failoverStatusLock;
- //Default is 1 minute
+ // Default is 1 minute
private long failoverStartTimeout = 60 * 1000;
- //Default is 5 minutes
+ // Default is 5 minutes
private long failoverCompleteTimeout = 5 * 60 * 1000;
private Map sessions;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -31,10 +31,9 @@
/**
+ * Represents the set of methods from the ConnectionDelegate that are handled on the server. The
+ * rest of the methods are handled in the advice stack.
*
- * Represents the set of methods from the ConnectionDelegate that are handled on the server.
- * The rest of the methods are handled in the advice stack.
- *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
*
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryInternalEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryInternalEndpoint.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryInternalEndpoint.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -26,8 +26,6 @@
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
/**
- * A ConnectionFactoryInternalEndpoint
- *
* The interface only exists so the connection factory requests can call through the AOP stack
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -37,7 +37,6 @@
import org.jboss.jms.wireformat.ConnectionFactoryUpdate;
import org.jboss.jms.wireformat.Dispatcher;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.plugin.IDBlock;
import org.jboss.remoting.callback.Callback;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
import org.jboss.security.SecurityAssociation;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -53,9 +53,8 @@
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
/**
- * Concrete implementation of ConsumerEndpoint. Lives on the boundary between
- * Messaging Core and the JMS Facade. Handles delivery of messages from the
- * server to the client side consumer.
+ * Concrete implementation of ConsumerEndpoint. Lives on the boundary between Messaging Core and the
+ * JMS Facade. Handles delivery of messages from the server to the client side consumer.
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -64,17 +63,14 @@
*/
public class ServerConsumerEndpoint implements Receiver, ConsumerEndpoint
{
- // Constants
- // ------------------------------------------------------------------------------------
+ // Constants ------------------------------------------------------------------------------------
private static final Logger log = Logger
.getLogger(ServerConsumerEndpoint.class);
- // Static
- // ---------------------------------------------------------------------------------------
+ // Static ---------------------------------------------------------------------------------------
- // Attributes
- // -----------------------------------------------------------------------------------
+ // Attributes -----------------------------------------------------------------------------------
private boolean trace = log.isTraceEnabled();
@@ -110,8 +106,7 @@
private boolean storeDeliveries;
- // Constructors
- // ---------------------------------------------------------------------------------
+ // Constructors ---------------------------------------------------------------------------------
ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
ServerSessionEndpoint sessionEndpoint, String selector,
@@ -152,8 +147,7 @@
if (dest.isTopic() && !messageQueue.isRecoverable())
{
- // This is a consumer of a non durable topic subscription. We don't
- // need to store
+ // This is a consumer of a non durable topic subscription. We don't need to store
// deliveries since if the consumer is closed or dies the refs go too.
this.storeDeliveries = false;
} else
@@ -165,11 +159,10 @@
if (selector != null)
{
- if (trace)
- log.trace("creating selector:" + selector);
+ if (trace) { log.trace("creating selector:" + selector); }
+
this.messageSelector = new Selector(selector);
- if (trace)
- log.trace("created selector");
+ if (trace) { log.trace("created selector"); }
}
this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();
@@ -177,14 +170,12 @@
// adding the consumer to the queue
this.messageQueue.add(this);
- // We don't need to prompt delivery - this will come from the client in a
- // changeRate request
+ // We don't need to prompt delivery - this will come from the client in a changeRate request
log.debug(this + " constructed");
}
- // Receiver implementation
- // ----------------------------------------------------------------------
+ // Receiver implementation ----------------------------------------------------------------------
/*
* The queue ensures that handle is never called concurrently by more than
@@ -201,10 +192,7 @@
// This is ok to have outside lock - is volatile
if (!clientAccepting)
{
- if (trace)
- {
- log.trace(this + "'s client is NOT accepting messages!");
- }
+ if (trace) { log.trace(this + "'s client is NOT accepting messages!"); }
return null;
}
@@ -226,25 +214,16 @@
synchronized (startStopLock)
{
- // If the consumer is stopped then we don't accept the message, it
- // should go back into the
+ // If the consumer is stopped then we don't accept the message, it should go back into the
// queue for delivery later.
if (!started)
{
- if (trace)
- {
- log.trace(this + " NOT started yet!");
- }
+ if (trace) { log.trace(this + " NOT started yet!"); }
return null;
}
- if (trace)
- {
- log
- .trace(this
- + " has startStopLock lock, preparing the message for delivery");
- }
+ if (trace) { log.trace(this + " has startStopLock lock, preparing the message for delivery"); }
Message message = ref.getMessage();
@@ -269,10 +248,8 @@
deliveryId = -1;
}
- // We send the message to the client on the current thread. The message
- // is written onto the
- // transport and then the thread returns immediately without waiting
- // for a response.
+ // We send the message to the client on the current thread. The message is written onto the
+ // transport and then the thread returns immediately without waiting for a response.
Client callbackClient = callbackHandler.getCallbackClient();
@@ -282,18 +259,12 @@
try
{
- // FIXME - due a design (flaw??) in the socket based transports,
- // they use a pool of TCP
- // connections, so subsequent invocations can end up using different
- // underlying
- // connections meaning that later invocations can overtake earlier
- // invocations, if there
- // are more than one user concurrently invoking on the same
- // transport. We need someway
- // of pinning the client object to the underlying invocation. For
- // now we just serialize
- // all access so that only the first connection in the pool is ever
- // used - bit this is
+ // FIXME - due to a design (flaw??) in the socket based transports, they use a pool of TCP
+ // connections, so subsequent invocations can end up using different underlying
+ // connections meaning that later invocations can overtake earlier invocations, if there
+ // are more than one user concurrently invoking on the same transport. We need some way
+ // of pinning the client object to the underlying invocation. For now we just serialize
+ // all access so that only the first connection in the pool is ever used - but this is
// far from ideal!!!
// See http://jira.jboss.com/jira/browse/JBMESSAGING-789
@@ -304,10 +275,8 @@
invoker = callbackClient.getInvoker();
} else
{
- // TODO: dummy synchronization object, in case there's no
- // clientInvoker. This will
- // happen during the first invocation anyway. It's a kludge, I
- // know, but this whole
+ // TODO: dummy synchronization object, in case there's no clientInvoker. This will
+ // happen during the first invocation anyway. It's a kludge, I know, but this whole
// synchronization thing is a huge kludge. Needs to be reviewed.
invoker = new Object();
}
@@ -315,30 +284,18 @@
synchronized (invoker)
{
// one way invocation, no acknowledgment sent back by the client
- if (trace)
- {
- log
- .trace(this
- + " submitting message "
- + message
- + " to the remoting layer to be sent asynchronously");
- }
- callbackHandler.handleCallbackOneway(callback);
+ if (trace) { log.trace(this + " submitting message " + message + " to the remoting layer to be sent asynchronously"); } callbackHandler.handleCallbackOneway(callback);
}
} catch (HandleCallbackException e)
{
- // it's an oneway callback, so exception could only have happened on
- // the server, while
- // trying to send the callback. This is a good reason to smack the
- // whole connection.
- // I trust remoting to have already done its own cleanup via a
- // CallbackErrorHandler,
+ // it's an oneway callback, so exception could only have happened on the server, while
+ // trying to send the callback. This is a good reason to smack the whole connection.
+ // I trust remoting to have already done its own cleanup via a CallbackErrorHandler,
// I need to do my own cleanup at ConnectionManager level.
log.debug(this + " failed to handle callback", e);
- ServerConnectionEndpoint sce = sessionEndpoint
- .getConnectionEndpoint();
+ ServerConnectionEndpoint sce = sessionEndpoint.getConnectionEndpoint();
ConnectionManager cm = sce.getServerPeer().getConnectionManager();
cm.handleClientFailure(sce.getRemotingClientSessionID(), false);
@@ -352,8 +309,7 @@
}
}
- // Filter implementation
- // ------------------------------------------------------------------------
+ // Filter implementation ------------------------------------------------------------------------
public boolean accept(Message msg)
{
@@ -361,19 +317,13 @@
if (destination.isQueue())
{
- // For subscriptions message selection is handled in the Subscription
- // itself
- // we do not want to do the check twice
+ // For subscriptions message selection is handled in the Subscription itself we do not want
+ // to do the check twice
if (messageSelector != null)
{
accept = messageSelector.accept(msg);
- if (trace)
- {
- log.trace("message selector "
- + (accept ? "accepts " : "DOES NOT accept ")
- + "the message");
- }
+ if (trace) { log.trace("message selector " + (accept ? "accepts " : "DOES NOT accept ") + "the message"); }
}
}
@@ -383,41 +333,27 @@
{
int conId = ((JBossMessage) msg).getConnectionID();
- if (trace)
- {
- log.trace("message connection id: "
- + conId
- + " current connection connection id: "
- + sessionEndpoint.getConnectionEndpoint()
- .getConnectionID());
- }
+ if (trace) { log.trace("message connection id: " + conId + " current connection connection id: " + sessionEndpoint.getConnectionEndpoint().getConnectionID()); }
- accept = conId != sessionEndpoint.getConnectionEndpoint()
- .getConnectionID();
+ accept = conId != sessionEndpoint.getConnectionEndpoint().getConnectionID();
- if (trace)
- {
- log.trace("accepting? " + accept);
- }
+ if (trace) { log.trace("accepting? " + accept); }
}
}
return accept;
}
- // Closeable implementation
- // ---------------------------------------------------------------------
+ // Closeable implementation ---------------------------------------------------------------------
public void closing() throws JMSException
{
try
{
- if (trace)
- {
- log.trace(this + " closing");
- }
+ if (trace) { log.trace(this + " closing");}
stop();
- } catch (Throwable t)
+ }
+ catch (Throwable t)
{
throw ExceptionUtil.handleJMSInvocation(t, this + " closing");
}
@@ -435,14 +371,14 @@
localClose();
sessionEndpoint.removeConsumer(id);
- } catch (Throwable t)
+ }
+ catch (Throwable t)
{
throw ExceptionUtil.handleJMSInvocation(t, this + " close");
}
}
- // ConsumerEndpoint implementation
- // --------------------------------------------------------------
+ // ConsumerEndpoint implementation --------------------------------------------------------------
public void changeRate(float newRate) throws JMSException
{
@@ -454,20 +390,14 @@
try
{
// For now we just support a binary on/off.
- // The client will send newRate = 0, to say it does not want any more
- // messages when its
- // client side buffer gets full or it will send an arbitrary non zero
- // number to say it
- // does want more messages, when its client side buffer empties to half
- // its full size.
- // Note the client does not wait until the client side buffer is empty
- // before sending a
+ // The client will send newRate = 0, to say it does not want any more messages when its
+ // client side buffer gets full or it will send an arbitrary non zero number to say it
+ // does want more messages, when its client side buffer empties to half its full size.
+ // Note the client does not wait until the client side buffer is empty before sending a
// newRate(+ve) message since this would add extra latency.
- // In the future we can fine tune this by allowing the client to
- // specify an actual rate in
- // the newRate value so this is basically a placeholder for the future
- // so we don't have to
+ // In the future we can fine tune this by allowing the client to specify an actual rate in
+ // the newRate value so this is basically a placeholder for the future so we don't have to
// change the wire format when we support it.
// No need to synchronize - clientAccepting is volatile.
@@ -485,40 +415,35 @@
{
promptDelivery();
}
- } catch (Throwable t)
+ }
+ catch (Throwable t)
{
throw ExceptionUtil.handleJMSInvocation(t, this + " changeRate");
}
}
- /*
- * This method is always called between closing() and close() being called
- * Instead of having a new method we could perhaps somehow pass the last
- * delivery id in with closing - then we don't need another message
+ /**
+ * This method is always called between closing() and close() being called. Instead of having a
+ * new method we could perhaps somehow pass the last delivery id in with closing - then we don't
+ * need another message.
*/
public void cancelInflightMessages(long lastDeliveryId) throws JMSException
{
- if (trace)
- {
- log.trace(this + " cancelInflightMessages: " + lastDeliveryId);
- }
+ if (trace) { log.trace(this + " cancelInflightMessages: " + lastDeliveryId); }
try
{
- // Cancel all deliveries made by this consumer with delivery id >
- // lastDeliveryId
+ // Cancel all deliveries made by this consumer with delivery id > lastDeliveryId
- sessionEndpoint.cancelDeliveriesForConsumerAfterDeliveryId(id,
- lastDeliveryId);
- } catch (Throwable t)
+ sessionEndpoint.cancelDeliveriesForConsumerAfterDeliveryId(id, lastDeliveryId);
+ }
+ catch (Throwable t)
{
- throw ExceptionUtil.handleJMSInvocation(t, this
- + " cancelInflightMessages");
+ throw ExceptionUtil.handleJMSInvocation(t, this + " cancelInflightMessages");
}
}
- // Public
- // ---------------------------------------------------------------------------------------
+ // Public ---------------------------------------------------------------------------------------
public String toString()
{
@@ -535,8 +460,7 @@
return sessionEndpoint;
}
- // Package protected
- // ----------------------------------------------------------------------------
+ // Package protected ----------------------------------------------------------------------------
Queue getDLQ()
{
@@ -555,17 +479,13 @@
void localClose() throws Throwable
{
- if (trace)
- {
- log.trace(this + " grabbed the main lock in close() " + this);
- }
+ if (trace) { log.trace(this + " grabbed the main lock in close() " + this); }
messageQueue.remove(this);
Dispatcher.instance.unregisterTarget(id, this);
- // If this is a consumer of a non durable subscription then we want to
- // unbind the
+ // If this is a consumer of a non durable subscription then we want to unbind the
// subscription and delete all its data.
if (destination.isTopic())
@@ -575,11 +495,9 @@
Binding binding = postOffice.getBindingForQueueName(queueName);
- // Note binding can be null since there can many competing subscribers
- // for the subscription -
- // in which case the first will have removed the subscription and
- // subsequently
- // ones won't find it
+ // Note binding can be null since there can be many competing subscribers for the
+ // subscription - in which case the first will have removed the subscription and
+ // subsequent ones won't find it
if (binding != null && !binding.getQueue().isRecoverable())
{
@@ -587,14 +505,13 @@
if (!queue.isClustered())
{
postOffice.unbindQueue(queue.getName());
- } else
+ }
+ else
{
- ((ClusteredPostOffice) postOffice).unbindClusteredQueue(queue
- .getName());
+ ((ClusteredPostOffice)postOffice).unbindClusteredQueue(queue.getName());
}
- String counterName = TopicService.SUBSCRIPTION_MESSAGECOUNTER_PREFIX
- + queueName;
+ String counterName = TopicService.SUBSCRIPTION_MESSAGECOUNTER_PREFIX + queueName;
MessageCounter counter = sessionEndpoint.getConnectionEndpoint()
.getServerPeer().getMessageCounterManager()
@@ -602,8 +519,7 @@
if (counter == null)
{
- throw new IllegalStateException("Cannot find counter to remove "
- + counterName);
+ throw new IllegalStateException("Cannot find counter to remove " + counterName);
}
}
}
@@ -637,57 +553,46 @@
started = false;
- // Any message deliveries already transit to the consumer, will just be
- // ignored by the
+ // Any message deliveries already in transit to the consumer will just be ignored by the
// MessageCallbackHandler since it will be closed.
//
// To clarify, the close protocol (from connection) is as follows:
//
- // 1) MessageCallbackHandler::close() - any messages in buffer are
- // cancelled to the server
+ // 1) MessageCallbackHandler::close() - any messages in buffer are cancelled to the server
// session, and any subsequent receive messages will be ignored.
//
- // 2) ServerConsumerEndpoint::closing() causes stop() this flushes any
- // deliveries yet to
+ // 2) ServerConsumerEndpoint::closing() causes stop() this flushes any deliveries yet to
// deliver to the client callback handler.
//
- // 3) MessageCallbackHandler::cancelInflightMessages(long
- // lastDeliveryId) - any deliveries
- // after lastDeliveryId for the consumer will be considered in flight
- // and cancelled.
+ // 3) MessageCallbackHandler::cancelInflightMessages(long lastDeliveryId) - any deliveries
+ // after lastDeliveryId for the consumer will be considered in flight and cancelled.
//
// 4) ServerConsumerEndpoint::close() - endpoint is deregistered.
//
- // 5) Session.close() - acks or cancels any remaining deliveries in the
- // SessionState as
+ // 5) Session.close() - acks or cancels any remaining deliveries in the SessionState as
// appropriate.
//
- // 6) ServerSessionEndpoint::close() - cancels any remaining deliveries
- // and deregisters
+ // 6) ServerSessionEndpoint::close() - cancels any remaining deliveries and deregisters
// session.
//
// 7) Client side session executor is shutdown.
//
// 8) ServerConnectionEndpoint::close() - connection is deregistered.
//
- // 9) Remoting connection listener is removed and remoting connection
- // stopped.
+ // 9) Remoting connection listener is removed and remoting connection stopped.
}
}
- // Protected
- // ------------------------------------------------------------------------------------
+ // Protected ------------------------------------------------------------------------------------
- // Private
- // --------------------------------------------------------------------------------------
+ // Private --------------------------------------------------------------------------------------
private void promptDelivery()
{
sessionEndpoint.promptDelivery(messageQueue);
}
- // Inner classes
- // --------------------------------------------------------------------------------
+ // Inner classes --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -37,17 +37,18 @@
*
* ConnectionFactoryAdvised.java,v 1.3 2006/03/01 22:56:51 ovidiu Exp
*/
-public class ConnectionFactoryAdvised extends AdvisedSupport implements ConnectionFactoryInternalEndpoint
+public class ConnectionFactoryAdvised extends AdvisedSupport
+ implements ConnectionFactoryInternalEndpoint
{
- // Constants -----------------------------------------------------
+ // Constants ------------------------------------------------------------------------------------
- // Static --------------------------------------------------------
+ // Static ---------------------------------------------------------------------------------------
- // Attributes ----------------------------------------------------
+ // Attributes -----------------------------------------------------------------------------------
protected ConnectionFactoryEndpoint endpoint;
- // Constructors --------------------------------------------------
+ // Constructors ---------------------------------------------------------------------------------
public ConnectionFactoryAdvised()
{
@@ -58,7 +59,7 @@
this.endpoint = endpoint;
}
- // ConnectionFactoryEndpoint implementation -----------------------
+ // ConnectionFactoryEndpoint implementation -----------------------------------------------------
public CreateConnectionResult createConnectionDelegate(String username,
String password,
@@ -73,41 +74,43 @@
return endpoint.getClientAOPStack();
}
- // ConnectionFactoryInternalEndpoint implementation -----------------------
- public CreateConnectionResult createConnectionDelegate(String username,
- String password,
- int failedNodeID,
- String remotingSessionID,
- String clientVMID,
- byte versionToUse,
- ServerInvokerCallbackHandler callbackHandler)
+ // ConnectionFactoryInternalEndpoint implementation ---------------------------------------------
+ public CreateConnectionResult
+ createConnectionDelegate(String username,
+ String password,
+ int failedNodeID,
+ String remotingSessionID,
+ String clientVMID,
+ byte versionToUse,
+ ServerInvokerCallbackHandler callbackHandler)
throws JMSException
{
- return ((ServerConnectionFactoryEndpoint)endpoint).createConnectionDelegate(username, password, failedNodeID,
- remotingSessionID, clientVMID,
- versionToUse, callbackHandler);
+ return ((ServerConnectionFactoryEndpoint)endpoint).
+ createConnectionDelegate(username, password, failedNodeID,
+ remotingSessionID, clientVMID,
+ versionToUse, callbackHandler);
}
- // AdvisedSupport override ---------------------------------------
+ // AdvisedSupport override ----------------------------------------------------------------------
public Object getEndpoint()
{
return endpoint;
}
- // Public --------------------------------------------------------
+ // Public ---------------------------------------------------------------------------------------
public String toString()
{
return "ConnectionFactoryAdvised->" + endpoint;
}
- // Package protected ---------------------------------------------
+ // Package protected ----------------------------------------------------------------------------
- // Protected -----------------------------------------------------
+ // Protected ------------------------------------------------------------------------------------
- // Private -------------------------------------------------------
+ // Private --------------------------------------------------------------------------------------
- // Inner classes -------------------------------------------------
+ // Inner classes --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryCreateConnectionDelegateRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryCreateConnectionDelegateRequest.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryCreateConnectionDelegateRequest.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -25,7 +25,6 @@
import java.io.DataOutputStream;
import org.jboss.jms.server.endpoint.CreateConnectionResult;
-import org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint;
import org.jboss.jms.server.endpoint.advised.ConnectionFactoryAdvised;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -2133,6 +2133,25 @@
//Ok
}
+// session1.close();
+//
+// session2.close();;
+//
+// Session session3 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//
+// MessageConsumer cons3 = session3.createConsumer(queue[0]);
+//
+// TextMessage rm3 = (TextMessage)cons3.receive(2000);
+//
+// assertNotNull(rm3);
+//
+// assertEquals(tm3.getText(), rm3.getText());
+//
+// rm3 = (TextMessage)cons3.receive(2000);
+//
+// assertNull(rm3);
+
+
}
finally
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -52,9 +52,6 @@
import org.jboss.test.messaging.tools.ServerManagement;
/**
- *
- * A HATest
- *
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -15,20 +15,14 @@
import javax.jms.TextMessage;
import org.jboss.jms.client.JBossConnection;
-import org.jboss.jms.client.FailoverEvent;
import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
import org.jboss.test.messaging.tools.ServerManagement;
-
/**
- *
- * A MergeQueueTest
- *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
*
* $Id$
- *
*/
public class MergeQueueTest extends ClusteringTestBase
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java 2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java 2007-02-28 02:30:33 UTC (rev 2485)
@@ -32,9 +32,6 @@
import java.util.HashSet;
/**
- *
- * A XAFailoverTest
- *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
*
More information about the jboss-cvs-commits
mailing list