[jboss-cvs] JBoss Messaging SVN: r2485 - in trunk: src/main/org/jboss/jms/client/container and 8 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Feb 27 21:30:34 EST 2007


Author: ovidiu.feodorov at jboss.com
Date: 2007-02-27 21:30:33 -0500 (Tue, 27 Feb 2007)
New Revision: 2485

Modified:
   trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
   trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java
   trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
   trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
   trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
   trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
   trunk/src/main/org/jboss/jms/client/state/SessionState.java
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryInternalEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
   trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryCreateConnectionDelegateRequest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
Log:
minor

Modified: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -14,7 +14,6 @@
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.delegate.ConnectionFactoryDelegate;
-import org.jboss.jms.message.MessageIdGenerator;
 import org.jboss.jms.server.endpoint.CreateConnectionResult;
 import org.jboss.logging.Logger;
 

Modified: trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -99,8 +99,8 @@
 
       while (attemptCount < MAX_RECONNECT_HOP_COUNT)
       {
-         // since an exceptiong might be captured during an attempt,
-         // this has to be the first operation
+         // since an exception might be captured during an attempt, this has to be the first
+         // operation
          attemptCount++;
          try
          {
@@ -205,7 +205,7 @@
          {
             delegate = null;
             log.warn("Exception captured on createConnection... hopping to a new connection factory", e);
-            //Currently hardcoded
+            // Currently hardcoded
             Thread.sleep(2000);
          }
       }

Modified: trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -118,21 +118,26 @@
          
          // Set retry flag as true on send() and sendTransaction()
          // more details at http://jira.jboss.org/jira/browse/JBMESSAGING-809
+
          if (invocation.getTargetObject() instanceof ClientSessionDelegate &&
             (methodName.equals("send") || methodName.equals("sendTransaction")))
          {
             log.debug(this + " caught " + methodName + "() invocation, enabling check for duplicates");
+
             Object[] arguments = ((MethodInvocation)invocation).getArguments();
             arguments[1] = Boolean.TRUE;
             ((MethodInvocation)invocation).setArguments(arguments);
          }
 
-         //We don't retry the following invocations:
-         //cancelDelivery, cancelDeliveries, cancelInflightMessages - the deliveries will already be cancelled after failover
-         if (methodName.equals("cancelDelivery") || methodName.equals("cancelDeliveries")
-                  || methodName.equals("cancelInflightMessages"))
+         // We don't retry the following invocations:
+         // cancelDelivery(), cancelDeliveries(), cancelInflightMessages() - the deliveries will
+         // already be cancelled after failover.
+
+         if (methodName.equals("cancelDelivery") ||
+            methodName.equals("cancelDeliveries") ||
+            methodName.equals("cancelInflightMessages"))
          {
-            log.debug(this + " NOT resuming " + methodName + "()");
+            log.debug(this + " NOT resuming " + methodName + "(), let it wither and die");
             
             return null;
          }

Modified: trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -167,7 +167,8 @@
       // Generate the message id
       ConnectionState connectionState = (ConnectionState)sessionState.getParent();
       
-      long id = connectionState.getIdGenerator().getId((ConnectionDelegate)connectionState.getDelegate());
+      long id =
+         connectionState.getIdGenerator().getId((ConnectionDelegate)connectionState.getDelegate());
     
       JBossMessage messageToSend;
       boolean foreign = false;

Modified: trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -39,7 +39,6 @@
 import org.jboss.jms.client.state.HierarchicalState;
 import org.jboss.jms.client.state.ProducerState;
 import org.jboss.jms.client.state.SessionState;
-import org.jboss.jms.delegate.ConnectionFactoryDelegate;
 import org.jboss.jms.delegate.ProducerDelegate;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.message.MessageIdGenerator;

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -103,7 +103,8 @@
          }
       }
 
-      throw new MessagingNetworkFailureException("Failed to download and/or install client side AOP stack");
+      throw new MessagingNetworkFailureException(
+         "Failed to download and/or install client side AOP stack");
    }
 
    /**

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -45,8 +45,7 @@
 
 /**
  * The client-side ConnectionFactory delegate class.
- * 
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  *

Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -203,15 +203,17 @@
    
    public JMSException handleThrowable(Throwable t)
    {
-      // ConnectionFailedException could happen during ConnectionFactory.createConnection
+      // ConnectionFailedException could happen during ConnectionFactory.createConnection.
       // IOException could happen during an interrupted exception.
-      // CannotConnectionException could happen during a communication error between a
-      //    connected remoting client and the server (what means.. any new invocation)
+      // CannotConnectException could happen during a communication error between a connected
+      // remoting client and the server (which means any new invocation).
+
       if (t instanceof JMSException)
       {
          return (JMSException)t;
       }
-      else if ((t instanceof CannotConnectException) || (t instanceof IOException) ||
+      else if ((t instanceof CannotConnectException) ||
+         (t instanceof IOException) ||
          (t instanceof ConnectionFailedException))
       {
          log.warn("Captured Exception:" + t, t);

Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -37,9 +37,6 @@
 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
 
 /**
- * 
- * A CallbackManager.
- * 
  * The CallbackManager is an InvocationHandler used for handling callbacks to message consumers.
  * The callback is received and dispatched off to the relevant consumer.
  * 

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -42,7 +42,6 @@
 import org.jboss.messaging.util.prioritylinkedlist.BasicPriorityLinkedList;
 import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
 
-import EDU.oswego.cs.dl.util.concurrent.Executor;
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
 /**

Modified: trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConsumerState.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/state/ConsumerState.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -23,8 +23,6 @@
 
 import java.util.Collections;
 
-import javax.jms.Destination;
-
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.remoting.CallbackManager;
@@ -32,7 +30,6 @@
 import org.jboss.jms.delegate.ConsumerDelegate;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.server.Version;
-import org.jboss.jms.util.MessageQueueNameHelper;
 
 /**
  * State corresponding to a Consumer. This state is acessible inside aspects/interceptors.

Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -199,7 +199,8 @@
       sessionID = newState.sessionID;
       
       // We need to clear anything waiting in the session executor - since there may be messages
-      // from before failover waiting in there and we don't want them to get delivered after failover
+      // from before failover waiting in there and we don't want them to get delivered after
+      // failover.
       executor.shutdownAfterProcessingCurrentTask();
       
       createExecutor();

Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -123,10 +123,10 @@
 
    private Object failoverStatusLock;
    
-   //Default is 1 minute
+   // Default is 1 minute
    private long failoverStartTimeout = 60 * 1000;
    
-   //Default is 5 minutes
+   // Default is 5 minutes
    private long failoverCompleteTimeout = 5 * 60 * 1000;
    
    private Map sessions;

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -31,10 +31,9 @@
 
 
 /**
+ * Represents the set of methods from the ConnectionDelegate that are handled on the server. The
+ * rest of the methods are handled in the advice stack.
  * 
- * Represents the set of methods from the ConnectionDelegate that are handled on the server.
- * The rest of the methods are handled in the advice stack.
- * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision$</tt>
  *

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryInternalEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryInternalEndpoint.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryInternalEndpoint.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -26,8 +26,6 @@
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
 /**
- * A ConnectionFactoryInternalEndpoint
- * 
  * The interface only exists so the connection factory requests can call through the AOP stack
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -37,7 +37,6 @@
 import org.jboss.jms.wireformat.ConnectionFactoryUpdate;
 import org.jboss.jms.wireformat.Dispatcher;
 import org.jboss.logging.Logger;
-import org.jboss.messaging.core.plugin.IDBlock;
 import org.jboss.remoting.callback.Callback;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 import org.jboss.security.SecurityAssociation;

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -53,9 +53,8 @@
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
 /**
- * Concrete implementation of ConsumerEndpoint. Lives on the boundary between
- * Messaging Core and the JMS Facade. Handles delivery of messages from the
- * server to the client side consumer.
+ * Concrete implementation of ConsumerEndpoint. Lives on the boundary between Messaging Core and the
+ * JMS Facade. Handles delivery of messages from the server to the client side consumer.
  * 
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -64,17 +63,14 @@
  */
 public class ServerConsumerEndpoint implements Receiver, ConsumerEndpoint
 {
-   // Constants
-   // ------------------------------------------------------------------------------------
+   // Constants ------------------------------------------------------------------------------------
 
    private static final Logger log = Logger
             .getLogger(ServerConsumerEndpoint.class);
 
-   // Static
-   // ---------------------------------------------------------------------------------------
+   // Static ---------------------------------------------------------------------------------------
 
-   // Attributes
-   // -----------------------------------------------------------------------------------
+   // Attributes -----------------------------------------------------------------------------------
 
    private boolean trace = log.isTraceEnabled();
 
@@ -110,8 +106,7 @@
 
    private boolean storeDeliveries;
    
-   // Constructors
-   // ---------------------------------------------------------------------------------
+   // Constructors ---------------------------------------------------------------------------------
 
    ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
             ServerSessionEndpoint sessionEndpoint, String selector,
@@ -152,8 +147,7 @@
 
       if (dest.isTopic() && !messageQueue.isRecoverable())
       {
-         // This is a consumer of a non durable topic subscription. We don't
-         // need to store
+         // This is a consumer of a non durable topic subscription. We don't need to store
          // deliveries since if the consumer is closed or dies the refs go too.
          this.storeDeliveries = false;
       } else
@@ -165,11 +159,10 @@
 
       if (selector != null)
       {
-         if (trace)
-            log.trace("creating selector:" + selector);
+         if (trace) { log.trace("creating selector:" + selector); }
+
          this.messageSelector = new Selector(selector);
-         if (trace)
-            log.trace("created selector");
+         if (trace) { log.trace("created selector"); }
       }
 
       this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();
@@ -177,14 +170,12 @@
       // adding the consumer to the queue
       this.messageQueue.add(this);
 
-      // We don't need to prompt delivery - this will come from the client in a
-      // changeRate request
+      // We don't need to prompt delivery - this will come from the client in a changeRate request
 
       log.debug(this + " constructed");
    }
 
-   // Receiver implementation
-   // ----------------------------------------------------------------------
+   // Receiver implementation ----------------------------------------------------------------------
 
    /*
     * The queue ensures that handle is never called concurrently by more than
@@ -201,10 +192,7 @@
       // This is ok to have outside lock - is volatile
       if (!clientAccepting)
       {
-         if (trace)
-         {
-            log.trace(this + "'s client is NOT accepting messages!");
-         }
+         if (trace) { log.trace(this + "'s client is NOT accepting messages!"); }
 
          return null;
       }
@@ -226,25 +214,16 @@
 
       synchronized (startStopLock)
       {
-         // If the consumer is stopped then we don't accept the message, it
-         // should go back into the
+         // If the consumer is stopped then we don't accept the message, it should go back into the
          // queue for delivery later.
          if (!started)
          {
-            if (trace)
-            {
-               log.trace(this + " NOT started yet!");
-            }
+            if (trace) { log.trace(this + " NOT started yet!"); }
 
             return null;
          }
 
-         if (trace)
-         {
-            log
-                     .trace(this
-                              + " has startStopLock lock, preparing the message for delivery");
-         }
+         if (trace) { log.trace(this + " has startStopLock lock, preparing the message for delivery"); }
 
          Message message = ref.getMessage();
 
@@ -269,10 +248,8 @@
             deliveryId = -1;
          }
 
-         // We send the message to the client on the current thread. The message
-         // is written onto the
-         // transport and then the thread returns immediately without waiting
-         // for a response.
+         // We send the message to the client on the current thread. The message is written onto the
+         // transport and then the thread returns immediately without waiting for a response.
 
          Client callbackClient = callbackHandler.getCallbackClient();
 
@@ -282,18 +259,12 @@
 
          try
          {
-            // FIXME - due a design (flaw??) in the socket based transports,
-            // they use a pool of TCP
-            // connections, so subsequent invocations can end up using different
-            // underlying
-            // connections meaning that later invocations can overtake earlier
-            // invocations, if there
-            // are more than one user concurrently invoking on the same
-            // transport. We need someway
-            // of pinning the client object to the underlying invocation. For
-            // now we just serialize
-            // all access so that only the first connection in the pool is ever
-            // used - bit this is
+            // FIXME - due to a design (flaw??) in the socket based transports, they use a pool of
+            // TCP connections, so subsequent invocations can end up using different underlying
+            // connections meaning that later invocations can overtake earlier invocations, if there
+            // is more than one user concurrently invoking on the same transport. We need some way
+            // of pinning the client object to the underlying invocation. For now we just serialize
+            // all access so that only the first connection in the pool is ever used - but this is
             // far from ideal!!!
             // See http://jira.jboss.com/jira/browse/JBMESSAGING-789
 
@@ -304,10 +275,8 @@
                invoker = callbackClient.getInvoker();
             } else
             {
-               // TODO: dummy synchronization object, in case there's no
-               // clientInvoker. This will
-               // happen during the first invocation anyway. It's a kludge, I
-               // know, but this whole
+               // TODO: dummy synchronization object, in case there's no clientInvoker. This will
+               // happen during the first invocation anyway. It's a kludge, I know, but this whole
                // synchronization thing is a huge kludge. Needs to be reviewed.
                invoker = new Object();
             }
@@ -315,30 +284,18 @@
             synchronized (invoker)
             {
                // one way invocation, no acknowledgment sent back by the client
-               if (trace)
-               {
-                  log
-                           .trace(this
-                                    + " submitting message "
-                                    + message
-                                    + " to the remoting layer to be sent asynchronously");
-               }
-               callbackHandler.handleCallbackOneway(callback);
+               if (trace) { log.trace(this + " submitting message " + message + " to the remoting layer to be sent asynchronously"); }               callbackHandler.handleCallbackOneway(callback);
             }
          } catch (HandleCallbackException e)
          {
-            // it's an oneway callback, so exception could only have happened on
-            // the server, while
-            // trying to send the callback. This is a good reason to smack the
-            // whole connection.
-            // I trust remoting to have already done its own cleanup via a
-            // CallbackErrorHandler,
+            // it's a oneway callback, so exception could only have happened on the server, while
+            // trying to send the callback. This is a good reason to smack the whole connection.
+            // I trust remoting to have already done its own cleanup via a CallbackErrorHandler,
             // I need to do my own cleanup at ConnectionManager level.
 
             log.debug(this + " failed to handle callback", e);
 
-            ServerConnectionEndpoint sce = sessionEndpoint
-                     .getConnectionEndpoint();
+            ServerConnectionEndpoint sce = sessionEndpoint.getConnectionEndpoint();
             ConnectionManager cm = sce.getServerPeer().getConnectionManager();
 
             cm.handleClientFailure(sce.getRemotingClientSessionID(), false);
@@ -352,8 +309,7 @@
       }
    }
 
-   // Filter implementation
-   // ------------------------------------------------------------------------
+   // Filter implementation ------------------------------------------------------------------------
 
    public boolean accept(Message msg)
    {
@@ -361,19 +317,13 @@
 
       if (destination.isQueue())
       {
-         // For subscriptions message selection is handled in the Subscription
-         // itself
-         // we do not want to do the check twice
+         // For subscriptions, message selection is handled in the Subscription itself; we do not
+         // want to do the check twice.
          if (messageSelector != null)
          {
             accept = messageSelector.accept(msg);
 
-            if (trace)
-            {
-               log.trace("message selector "
-                        + (accept ? "accepts " : "DOES NOT accept ")
-                        + "the message");
-            }
+            if (trace) { log.trace("message selector " + (accept ? "accepts " : "DOES NOT accept ") + "the message"); }
          }
       }
 
@@ -383,41 +333,27 @@
          {
             int conId = ((JBossMessage) msg).getConnectionID();
 
-            if (trace)
-            {
-               log.trace("message connection id: "
-                        + conId
-                        + " current connection connection id: "
-                        + sessionEndpoint.getConnectionEndpoint()
-                                 .getConnectionID());
-            }
+            if (trace) { log.trace("message connection id: " + conId + " current connection connection id: " + sessionEndpoint.getConnectionEndpoint().getConnectionID()); }
 
-            accept = conId != sessionEndpoint.getConnectionEndpoint()
-                     .getConnectionID();
+            accept = conId != sessionEndpoint.getConnectionEndpoint().getConnectionID();
 
-            if (trace)
-            {
-               log.trace("accepting? " + accept);
-            }
+            if (trace) { log.trace("accepting? " + accept); }
          }
       }
       return accept;
    }
 
-   // Closeable implementation
-   // ---------------------------------------------------------------------
+   // Closeable implementation ---------------------------------------------------------------------
 
    public void closing() throws JMSException
    {
       try
       {
-         if (trace)
-         {
-            log.trace(this + " closing");
-         }
+         if (trace) { log.trace(this + " closing");}
 
          stop();
-      } catch (Throwable t)
+      }
+      catch (Throwable t)
       {
          throw ExceptionUtil.handleJMSInvocation(t, this + " closing");
       }
@@ -435,14 +371,14 @@
          localClose();
 
          sessionEndpoint.removeConsumer(id);
-      } catch (Throwable t)
+      }
+      catch (Throwable t)
       {
          throw ExceptionUtil.handleJMSInvocation(t, this + " close");
       }
    }
 
-   // ConsumerEndpoint implementation
-   // --------------------------------------------------------------
+   // ConsumerEndpoint implementation --------------------------------------------------------------
 
    public void changeRate(float newRate) throws JMSException
    {
@@ -454,20 +390,14 @@
       try
       {
          // For now we just support a binary on/off.
-         // The client will send newRate = 0, to say it does not want any more
-         // messages when its
-         // client side buffer gets full or it will send an arbitrary non zero
-         // number to say it
-         // does want more messages, when its client side buffer empties to half
-         // its full size.
-         // Note the client does not wait until the client side buffer is empty
-         // before sending a
+         // The client will send newRate = 0, to say it does not want any more messages when its
+         // client side buffer gets full or it will send an arbitrary non zero number to say it
+         // does want more messages, when its client side buffer empties to half its full size.
+         // Note the client does not wait until the client side buffer is empty before sending a
          // newRate(+ve) message since this would add extra latency.
 
-         // In the future we can fine tune this by allowing the client to
-         // specify an actual rate in
-         // the newRate value so this is basically a placeholder for the future
-         // so we don't have to
+         // In the future we can fine tune this by allowing the client to specify an actual rate in
+         // the newRate value so this is basically a placeholder for the future so we don't have to
          // change the wire format when we support it.
 
          // No need to synchronize - clientAccepting is volatile.
@@ -485,40 +415,35 @@
          {
             promptDelivery();
          }
-      } catch (Throwable t)
+      }
+      catch (Throwable t)
       {
          throw ExceptionUtil.handleJMSInvocation(t, this + " changeRate");
       }
    }
 
-   /*
-    * This method is always called between closing() and close() being called
-    * Instead of having a new method we could perhaps somehow pass the last
-    * delivery id in with closing - then we don't need another message
+   /**
+    * This method is always called between closing() and close() being called. Instead of having a
+    * new method we could perhaps somehow pass the last delivery id in with closing - then we don't
+    * need another message.
     */
    public void cancelInflightMessages(long lastDeliveryId) throws JMSException
    {
-      if (trace)
-      {
-         log.trace(this + " cancelInflightMessages: " + lastDeliveryId);
-      }
+      if (trace) { log.trace(this + " cancelInflightMessages: " + lastDeliveryId); }
 
       try
       {
-         // Cancel all deliveries made by this consumer with delivery id >
-         // lastDeliveryId
+         // Cancel all deliveries made by this consumer with delivery id > lastDeliveryId
 
-         sessionEndpoint.cancelDeliveriesForConsumerAfterDeliveryId(id,
-                  lastDeliveryId);
-      } catch (Throwable t)
+         sessionEndpoint.cancelDeliveriesForConsumerAfterDeliveryId(id, lastDeliveryId);
+      }
+      catch (Throwable t)
       {
-         throw ExceptionUtil.handleJMSInvocation(t, this
-                  + " cancelInflightMessages");
+         throw ExceptionUtil.handleJMSInvocation(t, this + " cancelInflightMessages");
       }
    }
 
-   // Public
-   // ---------------------------------------------------------------------------------------
+   // Public ---------------------------------------------------------------------------------------
 
    public String toString()
    {
@@ -535,8 +460,7 @@
       return sessionEndpoint;
    }
 
-   // Package protected
-   // ----------------------------------------------------------------------------
+   // Package protected ----------------------------------------------------------------------------
 
    Queue getDLQ()
    {
@@ -555,17 +479,13 @@
 
    void localClose() throws Throwable
    {
-      if (trace)
-      {
-         log.trace(this + " grabbed the main lock in close() " + this);
-      }
+      if (trace) { log.trace(this + " grabbed the main lock in close() " + this); }
 
       messageQueue.remove(this);
 
       Dispatcher.instance.unregisterTarget(id, this);
 
-      // If this is a consumer of a non durable subscription then we want to
-      // unbind the
+      // If this is a consumer of a non durable subscription then we want to unbind the
       // subscription and delete all its data.
 
       if (destination.isTopic())
@@ -575,11 +495,9 @@
 
          Binding binding = postOffice.getBindingForQueueName(queueName);
 
-         // Note binding can be null since there can many competing subscribers
-         // for the subscription -
-         // in which case the first will have removed the subscription and
-         // subsequently
-         // ones won't find it
+         // Note binding can be null since there can be many competing subscribers for the
+         // subscription - in which case the first will have removed the subscription and
+         // subsequent ones won't find it
 
          if (binding != null && !binding.getQueue().isRecoverable())
          {
@@ -587,14 +505,13 @@
             if (!queue.isClustered())
             {
                postOffice.unbindQueue(queue.getName());
-            } else
+            }
+            else
             {
-               ((ClusteredPostOffice) postOffice).unbindClusteredQueue(queue
-                        .getName());
+               ((ClusteredPostOffice)postOffice).unbindClusteredQueue(queue.getName());
             }
 
-            String counterName = TopicService.SUBSCRIPTION_MESSAGECOUNTER_PREFIX
-                     + queueName;
+            String counterName = TopicService.SUBSCRIPTION_MESSAGECOUNTER_PREFIX + queueName;
 
             MessageCounter counter = sessionEndpoint.getConnectionEndpoint()
                      .getServerPeer().getMessageCounterManager()
@@ -602,8 +519,7 @@
 
             if (counter == null)
             {
-               throw new IllegalStateException("Cannot find counter to remove "
-                        + counterName);
+               throw new IllegalStateException("Cannot find counter to remove " + counterName);
             }
          }
       }
@@ -637,57 +553,46 @@
 
          started = false;
 
-         // Any message deliveries already transit to the consumer, will just be
-         // ignored by the
+         // Any message deliveries already in transit to the consumer will just be ignored by the
          // MessageCallbackHandler since it will be closed.
          //
          // To clarify, the close protocol (from connection) is as follows:
          //
-         // 1) MessageCallbackHandler::close() - any messages in buffer are
-         // cancelled to the server
+         // 1) MessageCallbackHandler::close() - any messages in buffer are cancelled to the server
          // session, and any subsequent receive messages will be ignored.
          //
-         // 2) ServerConsumerEndpoint::closing() causes stop() this flushes any
-         // deliveries yet to
+         // 2) ServerConsumerEndpoint::closing() causes stop(); this flushes any deliveries yet to
          // deliver to the client callback handler.
          //
-         // 3) MessageCallbackHandler::cancelInflightMessages(long
-         // lastDeliveryId) - any deliveries
-         // after lastDeliveryId for the consumer will be considered in flight
-         // and cancelled.
+         // 3) MessageCallbackHandler::cancelInflightMessages(long lastDeliveryId) - any deliveries
+         // after lastDeliveryId for the consumer will be considered in flight and cancelled.
          //
          // 4) ServerConsumerEndpoint:close() - endpoint is deregistered.
          //
-         // 5) Session.close() - acks or cancels any remaining deliveries in the
-         // SessionState as
+         // 5) Session.close() - acks or cancels any remaining deliveries in the SessionState as
          // appropriate.
          //
-         // 6) ServerSessionEndpoint::close() - cancels any remaining deliveries
-         // and deregisters
+         // 6) ServerSessionEndpoint::close() - cancels any remaining deliveries and deregisters
          // session.
          //
          // 7) Client side session executor is shutdown.
          //
          // 8) ServerConnectionEndpoint::close() - connection is deregistered.
          //
-         // 9) Remoting connection listener is removed and remoting connection
-         // stopped.
+         // 9) Remoting connection listener is removed and remoting connection stopped.
 
       }
    }
 
-   // Protected
-   // ------------------------------------------------------------------------------------
+   // Protected ------------------------------------------------------------------------------------
 
-   // Private
-   // --------------------------------------------------------------------------------------
+   // Private --------------------------------------------------------------------------------------
 
    private void promptDelivery()
    {
       sessionEndpoint.promptDelivery(messageQueue);
    }
 
-   // Inner classes
-   // --------------------------------------------------------------------------------
+   // Inner classes --------------------------------------------------------------------------------
 
 }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -37,17 +37,18 @@
  *
  * ConnectionFactoryAdvised.java,v 1.3 2006/03/01 22:56:51 ovidiu Exp
  */
-public class ConnectionFactoryAdvised extends AdvisedSupport implements ConnectionFactoryInternalEndpoint
+public class ConnectionFactoryAdvised extends AdvisedSupport
+   implements ConnectionFactoryInternalEndpoint
 {
-   // Constants -----------------------------------------------------
+   // Constants ------------------------------------------------------------------------------------
 
-   // Static --------------------------------------------------------
+   // Static ---------------------------------------------------------------------------------------
 
-   // Attributes ----------------------------------------------------
+   // Attributes -----------------------------------------------------------------------------------
 
    protected ConnectionFactoryEndpoint endpoint;
 
-   // Constructors --------------------------------------------------
+   // Constructors ---------------------------------------------------------------------------------
 
    public ConnectionFactoryAdvised()
    {
@@ -58,7 +59,7 @@
       this.endpoint = endpoint;
    }
 
-   // ConnectionFactoryEndpoint implementation -----------------------
+   // ConnectionFactoryEndpoint implementation -----------------------------------------------------
 
    public CreateConnectionResult createConnectionDelegate(String username,
                                                           String password,
@@ -73,41 +74,43 @@
       return endpoint.getClientAOPStack();
    }
    
-   // ConnectionFactoryInternalEndpoint implementation -----------------------
-   public CreateConnectionResult createConnectionDelegate(String username,
-                                                          String password,
-                                                          int failedNodeID,
-                                                          String remotingSessionID,
-                                                          String clientVMID,
-                                                          byte versionToUse,
-                                                          ServerInvokerCallbackHandler callbackHandler)
+   // ConnectionFactoryInternalEndpoint implementation ---------------------------------------------
+   public CreateConnectionResult
+      createConnectionDelegate(String username,
+                               String password,
+                               int failedNodeID,
+                               String remotingSessionID,
+                               String clientVMID,
+                               byte versionToUse,
+                               ServerInvokerCallbackHandler callbackHandler)
       throws JMSException
    {
-      return ((ServerConnectionFactoryEndpoint)endpoint).createConnectionDelegate(username, password, failedNodeID,
-                                                      remotingSessionID, clientVMID,
-                                                      versionToUse, callbackHandler);
+      return ((ServerConnectionFactoryEndpoint)endpoint).
+         createConnectionDelegate(username, password, failedNodeID,
+                                  remotingSessionID, clientVMID,
+                                  versionToUse, callbackHandler);
    }
 
-   // AdvisedSupport override ---------------------------------------
+   // AdvisedSupport override ----------------------------------------------------------------------
 
    public Object getEndpoint()
    {
       return endpoint;
    }
 
-   // Public --------------------------------------------------------
+   // Public ---------------------------------------------------------------------------------------
 
    public String toString()
    {
       return "ConnectionFactoryAdvised->" + endpoint;
    }
 
-   // Package protected ---------------------------------------------
+   // Package protected ----------------------------------------------------------------------------
 
-   // Protected -----------------------------------------------------
+   // Protected ------------------------------------------------------------------------------------
 
-   // Private -------------------------------------------------------
+   // Private --------------------------------------------------------------------------------------
 
-   // Inner classes -------------------------------------------------
+   // Inner classes --------------------------------------------------------------------------------
 
 }

Modified: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryCreateConnectionDelegateRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryCreateConnectionDelegateRequest.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryCreateConnectionDelegateRequest.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -25,7 +25,6 @@
 import java.io.DataOutputStream;
 
 import org.jboss.jms.server.endpoint.CreateConnectionResult;
-import org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint;
 import org.jboss.jms.server.endpoint.advised.ConnectionFactoryAdvised;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -2133,6 +2133,25 @@
             //Ok
          }
                          
+//         session1.close();
+//
+//         session2.close();
+//
+//         Session session3 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//
+//         MessageConsumer cons3 = session3.createConsumer(queue[0]);
+//
+//         TextMessage rm3 = (TextMessage)cons3.receive(2000);
+//
+//         assertNotNull(rm3);
+//
+//         assertEquals(tm3.getText(), rm3.getText());
+//
+//         rm3 = (TextMessage)cons3.receive(2000);
+//
+//         assertNull(rm3);
+
+
       }
       finally
       {

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -52,9 +52,6 @@
 import org.jboss.test.messaging.tools.ServerManagement;
 
 /**
- * 
- * A HATest
- *
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -15,20 +15,14 @@
 import javax.jms.TextMessage;
 
 import org.jboss.jms.client.JBossConnection;
-import org.jboss.jms.client.FailoverEvent;
 import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
 import org.jboss.test.messaging.tools.ServerManagement;
 
-
 /**
- * 
- * A MergeQueueTest
- *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision: 1.1 $</tt>
  *
  * $Id$
- *
  */
 public class MergeQueueTest extends ClusteringTestBase
 {

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java	2007-02-28 01:59:13 UTC (rev 2484)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java	2007-02-28 02:30:33 UTC (rev 2485)
@@ -32,9 +32,6 @@
 import java.util.HashSet;
 
 /**
- * 
- * A XAFailoverTest
- *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision: 1.1 $</tt>
  *




More information about the jboss-cvs-commits mailing list