[jboss-cvs] JBoss Messaging SVN: r2045 - in trunk: src/main/org/jboss/jms/server and 10 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jan 24 20:55:38 EST 2007


Author: ovidiu.feodorov at jboss.com
Date: 2007-01-24 20:55:37 -0500 (Wed, 24 Jan 2007)
New Revision: 2045

Modified:
   trunk/src/main/org/jboss/jms/client/JBossConnection.java
   trunk/src/main/org/jboss/jms/server/ConnectionManager.java
   trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
   trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   trunk/src/main/org/jboss/jms/server/container/InjectionAspect.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
   trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
   trunk/tests/src/org/jboss/test/messaging/jms/crash/CallbackFailureTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashLargeLeaseTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashNegativeLeaseTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTwoConnectionsTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashZeroLeaseTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateClientOnServerCommand.java
   trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateHangingConsumerCommand.java
   trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateTwoClientOnServerCommand.java
   trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
   trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
Log:
minor refactoring in preparation of http://jira.jboss.org/jira/browse/JBMESSAGING-779

Modified: trunk/src/main/org/jboss/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnection.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/jms/client/JBossConnection.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -220,7 +220,7 @@
 
    // Public ---------------------------------------------------------------------------------------
 
-   public String getRemotingClientSessionId()
+   public String getRemotingClientSessionID()
    {
       ConnectionState state = (ConnectionState)((ClientConnectionDelegate)delegate).getState();
       

Modified: trunk/src/main/org/jboss/jms/server/ConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ConnectionManager.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/jms/server/ConnectionManager.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -27,6 +27,9 @@
 
 
 /**
+ * An interface that allows management of ConnectionEndpoints and their association with remoting
+ * clients.
+ *
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision$</tt>
@@ -42,16 +45,16 @@
    /**
     * @return null if there is no such connection.
     */
-   ConnectionEndpoint unregisterConnection(String jmsClientVMId, String remotingClientSessionID);
+   ConnectionEndpoint unregisterConnection(String jmsClientVMID, String remotingClientSessionID);
    
-   void handleClientFailure(String remotingSessionID);
-   
-   boolean containsSession(String remotingClientSessionID);
+   boolean containsRemotingSession(String remotingClientSessionID);
 
    /**
-    * Returns a list of active connections currently maintained by an instance of this manager.
-    * The implementation should make a copy of the list to avoid ConcurrentModificationException.
-    * The list could be empty, but never null.
+    * Returns a list of active connection endpoints currently maintained by an instance of this
+    * manager. The implementation should make a copy of the list to avoid
+    * ConcurrentModificationException. The list could be empty, but never null.
+    *
+    * @return List<ConnectionEndpoint>
     */
    List getActiveConnections();
 }

Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -109,10 +109,14 @@
          
          if (refCount == 1 && enablePing)
          {
+            // TODO Something is not quite right here, we can detect failure even if pinging is not
+            // enabled, for example if we try to send a callback to the client and sending the
+            // callback fails
+
             // install the connection listener that listens for failed connections            
             server.invoke(connectorObjectName, "addConnectionListener",
                   new Object[] {connectionManager},
-                  new String[] {"org.jboss.remoting.ConnectionListener"});                     
+                  new String[] {"org.jboss.remoting.ConnectionListener"});
          }
          
          // We use the MBean service name to uniquely identify the connection factory

Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -55,44 +55,47 @@
 
    // Static ---------------------------------------------------------------------------------------
 
+   private static boolean trace = log.isTraceEnabled();
+
    // Attributes -----------------------------------------------------------------------------------
 
+   // Map<jmsClientVMID<String> - Map<remotingClientSessionID<String> - ConnectionEndpoint>>
    protected Map jmsClients;
-   
-   protected Map sessions;
 
-   protected Set activeConnections; 
+   // Map<remotingClientSessionID<String> - jmsClientVMID<String>>
+   protected Map remotingSessions;
 
+   // Set<ConnectionEndpoint>
+   protected Set activeConnectionEndpoints;
+
    // Constructors ---------------------------------------------------------------------------------
 
    public SimpleConnectionManager()
    {
       jmsClients = new HashMap();
-      
-      sessions = new HashMap();
-
-      activeConnections = new HashSet();
+      remotingSessions = new HashMap();
+      activeConnectionEndpoints = new HashSet();
    }
 
-   // ConnectionManager ----------------------------------------------------------------------------
+   // ConnectionManager implementation -------------------------------------------------------------
 
-   public synchronized void registerConnection(String jmsClientVMId,
+   public synchronized void registerConnection(String jmsClientVMID,
                                                String remotingClientSessionID,
                                                ConnectionEndpoint endpoint)
    {    
-      Map endpoints = (Map)jmsClients.get(jmsClientVMId);
+      Map endpoints = (Map)jmsClients.get(jmsClientVMID);
       
       if (endpoints == null)
       {
          endpoints = new HashMap();
-         jmsClients.put(jmsClientVMId, endpoints);                  
+         jmsClients.put(jmsClientVMID, endpoints);
       }
       
       endpoints.put(remotingClientSessionID, endpoint);
       
-      sessions.put(remotingClientSessionID, jmsClientVMId);
+      remotingSessions.put(remotingClientSessionID, jmsClientVMID);
 
-      activeConnections.add(endpoint);
+      activeConnectionEndpoints.add(endpoint);
       
       log.debug("registered connection " + endpoint + " as " +
                 Util.guidToString(remotingClientSessionID));
@@ -120,85 +123,29 @@
             jmsClients.remove(jmsClientVMId);
          }
          
-         sessions.remove(remotingClientSessionID);
+         remotingSessions.remove(remotingClientSessionID);
          
          return e;
       }
       return null;
    }
    
-   public synchronized void handleClientFailure(String remotingSessionID)
+   public synchronized List getActiveConnections()
    {
-      String jmsClientId = (String)sessions.get(remotingSessionID);
-      
-      if (jmsClientId != null)
-      {
-         log.warn("A problem has been detected with the connection to remote client " +
-                  remotingSessionID + ". It is possible the client has exited without closing " +
-                  "its connection(s) or there is a network problem. All connection resources " +
-                  "corresponding to that client process will now be removed.");
+      // I will make a copy to avoid ConcurrentModification
+      ArrayList list = new ArrayList();
+      list.addAll(activeConnectionEndpoints);
+      return list;
+   }
 
-         // Remoting only provides one pinger per invoker, not per connection therefore when the
-         // pinger dies we must close ALL the connections corresponding to that jms client id
-         
-         Map endpoints = (Map)jmsClients.get(jmsClientId);                  
-         
-         if (endpoints != null)
-         {
-            List sces = new ArrayList();
-            
-            Iterator iter = endpoints.entrySet().iterator();
-            
-            while (iter.hasNext())
-            {
-               Map.Entry entry = (Map.Entry)iter.next();
-               
-               ConnectionEndpoint sce = (ConnectionEndpoint)entry.getValue();
-               
-               sces.add(sce);                             
-            }
-            
-            //Now close the end points - this will result in a callback into unregisterConnection
-            //to remove the data from the jmsClients and sessions maps.
-            //Note we do this outside the loop to prevent ConcurrentModificationException
-            
-            iter = sces.iterator();
-            
-            while (iter.hasNext())
-            {
-               ConnectionEndpoint sce = (ConnectionEndpoint)iter.next();
-               
-               try
-               {
-                  sce.closing();
-                  sce.close();
-                  log.debug("cleared up state for connection " + sce);
-               }
-               catch (JMSException e)
-               {
-                  log.error("Failed to close connection", e);
-               }
-            }
-         }
-      } 
-   }
-   
    /*
     * Used in testing only
     */
-   public synchronized boolean containsSession(String remotingClientSessionID)
+   public synchronized boolean containsRemotingSession(String remotingClientSessionID)
    {
-      return sessions.containsKey(remotingClientSessionID);
+      return remotingSessions.containsKey(remotingClientSessionID);
    }
 
-   public synchronized List getActiveConnections()
-   {
-      // I will make a copy to avoid ConcurrentModification
-      ArrayList list = new ArrayList();
-      list.addAll(activeConnections);
-      return list;
-   }
-
    /*
     * Used in testing only
     */
@@ -207,24 +154,26 @@
       return Collections.unmodifiableMap(jmsClients);
    }
 
-   // ConnectionListener ---------------------------------------------------------------------------
+   // ConnectionListener implementation ------------------------------------------------------------
 
    /**
     * Be aware that ConnectionNotifier uses to call this method with null Throwables.
-    * @param t - expect it to be null!
+    *
+    * @param t - plan for it to be null!
     */
    public void handleConnectionException(Throwable t, Client client)
    {  
       if (t instanceof ClientDisconnectedException)
       {
          // This is OK
-         if (log.isTraceEnabled()) { log.trace(this + " Notified that client " + client + " has disconnected"); }
+         if (trace) { log.trace(this + " notified that client " + client + " has disconnected"); }
          return;
       }
       else
       {
-         if (log.isTraceEnabled()) { log.trace(this + " Failure on client " + client, t); }
+         if (trace) { log.trace(this + " detected failure on client " + client, t); }
       }
+
       String remotingSessionID = client.getSessionId();
       
       if (remotingSessionID != null)
@@ -247,6 +196,65 @@
 
    // Public ---------------------------------------------------------------------------------------
 
+   /**
+    * TODO - this method shouldn't be part of the public interface
+    */
+   public synchronized void handleClientFailure(String remotingSessionID)
+   {
+      String jmsClientID = (String)remotingSessions.get(remotingSessionID);
+
+      if (jmsClientID != null)
+      {
+         log.warn("A problem has been detected with the connection to remote client " +
+                  remotingSessionID + ". It is possible the client has exited without closing " +
+                  "its connection(s) or there is a network problem. All connection resources " +
+                  "corresponding to that client process will now be removed.");
+
+         // Remoting only provides one pinger per invoker, not per connection therefore when the
+         // pinger dies we must close ALL the connections corresponding to that jms client id
+
+         Map endpoints = (Map)jmsClients.get(jmsClientID);
+
+         if (endpoints != null)
+         {
+            List sces = new ArrayList();
+
+            Iterator iter = endpoints.entrySet().iterator();
+
+            while (iter.hasNext())
+            {
+               Map.Entry entry = (Map.Entry)iter.next();
+
+               ConnectionEndpoint sce = (ConnectionEndpoint)entry.getValue();
+
+               sces.add(sce);
+            }
+
+            // Now close the end points - this will result in a callback into unregisterConnection
+            // to remove the data from the jmsClients and sessions maps.
+            // Note we do this outside the loop to prevent ConcurrentModificationException
+
+            iter = sces.iterator();
+
+            while (iter.hasNext())
+            {
+               ConnectionEndpoint sce = (ConnectionEndpoint)iter.next();
+
+               try
+               {
+                  sce.closing();
+                  sce.close();
+                  log.debug("cleared up state for connection " + sce);
+               }
+               catch (JMSException e)
+               {
+                  log.error("Failed to close connection", e);
+               }
+            }
+         }
+      }
+   }
+
    public String toString()
    {
       return "ConnectionManager[" + Integer.toHexString(hashCode()) + "]";

Modified: trunk/src/main/org/jboss/jms/server/container/InjectionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/container/InjectionAspect.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/jms/server/container/InjectionAspect.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -40,18 +40,16 @@
  */
 public class InjectionAspect
 {
-    // Constants -----------------------------------------------------
+    // Constants -----------------------------------------------------------------------------------
    
-    // Static --------------------------------------------------------
+    // Static --------------------------------------------------------------------------------------
 
-    // Attributes ----------------------------------------------------
+    // Attributes ----------------------------------------------------------------------------------
 
-    // Constructors --------------------------------------------------
+    // Constructors --------------------------------------------------------------------------------
 
-    // Public --------------------------------------------------------
+    // Public --------------------------------------------------------------------------------------
 
-    // Interceptor implementation ------------------------------------
-
     public Object handleCreateConnectionDelegate(Invocation invocation) throws Throwable
     {
        MethodInvocation mi = (MethodInvocation)invocation;
@@ -80,33 +78,28 @@
           endpoint.setCallbackHandler(handler);
           
           // Then we inject the remoting session id of the client
-          String sessionId =
-             (String)mi.getMetaData(MetaDataConstants.JMS,
-                                    MetaDataConstants.REMOTING_SESSION_ID);
-          
+          String sessionId = (String)mi.getMetaData(MetaDataConstants.JMS,
+                                                    MetaDataConstants.REMOTING_SESSION_ID);
           if (sessionId == null)
           {
-             throw new IllegalStateException("Can't find session id");
+             throw new IllegalStateException("Can't find remoting session ID");
           }
           
-          // Then we inject the unique id of the client VM
-          String jmsClientVMID =
-             (String)mi.getMetaData(MetaDataConstants.JMS,
-                                    MetaDataConstants.JMS_CLIENT_VM_ID);
+          // Then we inject the unique ID of the client VM
+          String jmsClientVMID = (String)mi.getMetaData(MetaDataConstants.JMS,
+                                                        MetaDataConstants.JMS_CLIENT_VM_ID);
           
           if (jmsClientVMID == null)
           {
-             throw new IllegalStateException("Can't find jms client id");
+             throw new IllegalStateException("Can't find jms client ID");
           }
           
           endpoint.setRemotingInformation(jmsClientVMID, sessionId);       
           
           // Then we inject the version number from to be used
           
-          Byte ver =
-             (Byte)mi.getMetaData(MetaDataConstants.JMS,
-                                  MetaDataConstants.VERSION_NUMBER);
-          
+          Byte ver = (Byte)mi.getMetaData(MetaDataConstants.JMS,
+                                          MetaDataConstants.VERSION_NUMBER);
           if (ver == null)
           {
              throw new IllegalStateException("Can't find version");

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -91,8 +91,8 @@
    private String username;
    private String password;
 
-   private String remotingClientSessionId;
-   private String jmsClientVMId;
+   private String remotingClientSessionID;
+   private String jmsClientVMID;
 
    // the server itself
    private ServerPeer serverPeer;
@@ -347,7 +347,7 @@
             temporaryDestinations.clear();
          }
    
-         cm.unregisterConnection(jmsClientVMId, remotingClientSessionId);
+         cm.unregisterConnection(jmsClientVMID, remotingClientSessionID);
    
          JMSDispatcher.instance.unregisterTarget(new Integer(id));
 
@@ -459,8 +459,10 @@
    {
       return sm;
    }
-   
-   //IOC
+
+   /**
+    * IOC
+    */
    public void setCallbackHandler(ServerInvokerCallbackHandler handler)
    {
       callbackHandler = handler;
@@ -493,15 +495,16 @@
       return callbackHandler;
    }
 
-   // IOC
-   public void setRemotingInformation(String jmsClientVMId, String remotingClientSessionId)
+   /**
+    * IOC
+    */
+   public void setRemotingInformation(String jmsClientVMID, String remotingClientSessionID)
    {
-      this.remotingClientSessionId = remotingClientSessionId;
+      this.remotingClientSessionID = remotingClientSessionID;
+      this.jmsClientVMID = jmsClientVMID;
       
-      this.jmsClientVMId = jmsClientVMId;
-      
       this.serverPeer.getConnectionManager().
-         registerConnection(jmsClientVMId, remotingClientSessionId, this);
+         registerConnection(jmsClientVMID, remotingClientSessionID, this);
    }
    
    public void setUsingVersion(byte version)
@@ -596,9 +599,9 @@
       }
    }
    
-   String getRemotingClientSessionId()
+   String getRemotingClientSessionID()
    {
-      return remotingClientSessionId;
+      return remotingClientSessionID;
    }
    
    void sendMessage(JBossMessage msg, Transaction tx) throws Exception

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -250,7 +250,7 @@
          }
          catch (HandleCallbackException e)
          {
-            log.error("Failed to handle callback", e);
+            log.error(this + " failed to handle callback", e);
             
             return null;
          }

Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -163,7 +163,7 @@
 
    public void removeListener(InvokerCallbackHandler callbackHandler)
    {
-      log.debug("removing callback handler: " + callbackHandler);
+      log.debug("removing callback handler " + callbackHandler);
 
       synchronized(callbackHandlers)
       {

Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -737,38 +737,37 @@
       {
          //Have reached maximum size - will drop message
          
-         log.warn("Queue " + this + " has reached maximum size, ref " + ref + " will be dropped");
+         log.warn(this + " has reached maximum size, " + ref + " will be dropped");
          
          return null;
       }
    
-      //Each channel has its own copy of the reference
+      // Each channel has its own copy of the reference
       ref = ref.copy();
 
       try
       {
          if (ref.isReliable() && !recoverable)
          {
-            //Reliable reference in a non recoverable channel-
-            //We handle it as a non reliable reference
-            //It's important that we set it to non reliable otherwise if the channel
-            //pages and is non recoverable a reliable ref will be paged in the database as reliable
-            //which makes them hard to remove on server restart.
-            //If we always page them as unreliable then it is easy to remove them.
+            // Reliable reference in a non recoverable channel. We handle it as a non reliable
+            // reference. It's important that we set it to non reliable otherwise if the channel
+            // pages and is non recoverable a reliable ref will be paged in the database as reliable
+            // which makes them hard to remove on server restart. If we always page them as
+            // unreliable then it is easy to remove them.
             ref.setReliable(false);               
          }
          
          if (tx == null)
          {
-            // Don't even attempt synchronous delivery for a reliable message
-            // when we have an non-recoverable state that doesn't accept reliable messages. If
-            // we do, we may get into the situation where we need to reliably store an active
-            // delivery of a reliable message, which in these conditions cannot be done.
+            // Don't even attempt synchronous delivery for a reliable message when we have a
+            // non-recoverable state that doesn't accept reliable messages. If we do, we may get
+            // into the situation where we need to reliably store an active delivery of a reliable
+            // message, which in these conditions cannot be done.
 
             if (ref.isReliable() && !acceptReliableMessages)
             {
-               log.error("Cannot handle reliable message " + ref
-                        + " because the channel has a non-recoverable state!");
+               log.error("Cannot handle reliable message " + ref +
+                  " because the channel has a non-recoverable state!");
                return null;
             }
         
@@ -777,12 +776,12 @@
                // Reliable message in a recoverable state - also add to db
                if (trace) { log.trace(this + " adding " + ref + " to database non-transactionally"); }
 
-               //TODO - this db access could safely be done outside the event loop
+               // TODO - this db access could safely be done outside the event loop
                pm.addReference(channelID, ref, null);        
             }
             
-            //If the ref has a scheduled delivery time then we don't add to the in memory queue
-            //instead we create a timeout, so when that time comes delivery will attempted directly
+            // If the ref has a scheduled delivery time then we don't add to the in memory queue
+            // instead we create a timeout, so when that time comes delivery is attempted directly
             
             if (!checkAndSchedule(ref))
             {               
@@ -791,8 +790,8 @@
                   addReferenceInMemory(ref);
                }
                
-               // We only do delivery if there are receivers that haven't said they don't want
-               // any more references.
+               // We only do delivery if there are receivers that haven't said they don't want any
+               // more references.
                if (receiversReady)
                {
                   // Prompt delivery

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -62,9 +62,6 @@
 import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
 
 /**
- * 
- * A DefaultPostOffice
- *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -75,39 +72,41 @@
  */
 public class DefaultPostOffice extends JDBCSupport implements PostOffice
 {
+   // Constants ------------------------------------------------------------------------------------
+
    private static final Logger log = Logger.getLogger(DefaultPostOffice.class);
-   
-   private boolean trace = log.isTraceEnabled();
-          
-   private String officeName;
-   
-   //This lock protects the condition and name maps
-   protected ReadWriteLock lock;
-   
-   protected MessageStore ms;     
-   
+
+   // Static ---------------------------------------------------------------------------------------
+
+   private static boolean trace = log.isTraceEnabled();
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   protected MessageStore ms;
    protected PersistenceManager pm;
-   
    protected TransactionRepository tr;
-   
+   protected FilterFactory filterFactory;
+   protected ConditionFactory conditionFactory;
+   protected QueuedExecutorPool pool;
    protected int currentNodeId;
-   
+
    // Map <NodeID, Map<queueName, Binding>>
    protected Map nameMaps;
-   
+
    // Map <Condition, Bindings>
    protected Map conditionMap;
-   
-   protected FilterFactory filterFactory;
-   
-   protected ConditionFactory conditionFactory;
-   
-   protected QueuedExecutorPool pool;
-   
+
+   // this lock protects the condition and name maps
+   protected ReadWriteLock lock;
+
+   private String officeName;
+
+   // Constructors ---------------------------------------------------------------------------------
+
    public DefaultPostOffice()
-   {      
+   {
    }
-   
+
    public DefaultPostOffice(DataSource ds, TransactionManager tm, Properties sqlProperties,
                          boolean createTablesOnStartup,
                          int nodeId, String officeName, MessageStore ms,
@@ -115,42 +114,36 @@
                          TransactionRepository tr, FilterFactory filterFactory,
                          ConditionFactory conditionFactory,
                          QueuedExecutorPool pool)
-   {            
+   {
       super (ds, tm, sqlProperties, createTablesOnStartup);
-      
+
       lock = new ReentrantWriterPreferenceReadWriteLock();
-      
+
       nameMaps = new LinkedHashMap();
-       
-      conditionMap = new LinkedHashMap(); 
-      
+
+      conditionMap = new LinkedHashMap();
+
       this.currentNodeId = nodeId;
-      
-      this.officeName = officeName;
-      
       this.ms = ms;
-      
       this.pm = pm;
-      
       this.tr = tr;
-      
       this.filterFactory = filterFactory;
-      
       this.conditionFactory = conditionFactory;
-      
       this.pool = pool;
+      this.officeName = officeName;
+
    }
-   
-   // MessagingComponent implementation --------------------------------
-   
+
+   // MessagingComponent implementation ------------------------------------------------------------
+
    public void start() throws Exception
    {
       if (trace) { log.trace(this + " starting"); }
-      
+
       super.start();
-      
+
       loadBindings();
-      
+
       log.debug(this + " started");
    }
 
@@ -162,114 +155,114 @@
    public void stop(boolean sendNotification) throws Exception
    {
       if (trace) { log.trace(this + " stopping"); }
-      
+
       super.stop();
-      
+
       log.debug(this + " stopped");
    }
-     
-   // PostOffice implementation ---------------------------------------
 
+   // PostOffice implementation --------------------------------------------------------------------
+
    public String getOfficeName()
    {
       return officeName;
    }
-         
+
    public Binding bindQueue(Condition condition, Queue queue) throws Exception
    {
       if (trace) { log.trace(this + " binding queue " + queue.getName() + " with condition " + condition); }
-      
+
       if (queue.getName() == null)
       {
          throw new IllegalArgumentException("Queue name is null");
       }
-      
+
       if (condition == null)
       {
          throw new IllegalArgumentException("Condition is null");
       }
-       
+
       lock.writeLock().acquire();
 
       try
-      {         
+      {
          //We currently only allow one binding per name per node
          Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
-         
+
          Binding binding = null;
-         
+
          if (nameMap != null)
          {
             binding = (Binding)nameMap.get(queue.getName());
          }
-         
+
          if (binding != null)
          {
             throw new IllegalArgumentException("Binding already exists for name " + queue.getName());
          }
-                 
+
          binding = new DefaultBinding(currentNodeId, condition, queue, false);
-         
+
          addBinding(binding);
-               
+
          if (queue.isRecoverable())
          {
-            //Need to write the binding to the db            
-            insertBinding(binding);       
+            //Need to write the binding to the db
+            insertBinding(binding);
          }
-                           
-         return binding;   
+
+         return binding;
       }
       finally
       {
          lock.writeLock().release();
       }
-   }   
-            
+   }
+
    public Binding unbindQueue( String queueName) throws Throwable
    {
       if (trace) { log.trace(this + " unbinding queue " + queueName); }
-             
+
       if (queueName == null)
       {
          throw new IllegalArgumentException("Queue name is null");
       }
-      
+
       lock.writeLock().acquire();
 
       try
-      {         
+      {
          Binding binding = removeBinding(currentNodeId,queueName);
-      
+
          if (binding.getQueue().isRecoverable())
          {
             //Need to remove from db too
-            
+
             deleteBinding(currentNodeId, binding.getQueue().getName());
          }
-         
-         binding.getQueue().removeAllReferences();         
-         
-         return binding;     
+
+         binding.getQueue().removeAllReferences();
+
+         return binding;
       }
       finally
       {
          lock.writeLock().release();
       }
-   }   
-   
+   }
+
    public Collection getBindingsForCondition(Condition condition) throws Exception
    {
       return listBindingsForConditionInternal(condition, true);
-   }  
-   
+   }
+
    public Binding getBindingForQueueName(String queueName) throws Exception
    {
       if (queueName == null)
       {
          throw new IllegalArgumentException("Queue name is null");
       }
-      
+
       lock.readLock().acquire();
 
       try
@@ -282,163 +275,252 @@
       }
    }
 
-   /**
-    * Internal methods (e.g. failOver) will already hold a lock and will need to call
-    * getBindingForQueueNames() without a lock. (Also... I dind't move this method to the protected
-    * section of the code as this is related to getBindingForQueueNames).
-    */
-   protected Binding internalGetBindingForQueueName(String queueName)
+   public boolean route(MessageReference ref, Condition condition, Transaction tx) throws Exception
    {
-      Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
+      if (trace) { log.trace(this + " routing " + ref + " with condition '" + condition + "' " + (tx == null ? "non-transactionally" : " in " + tx)); }
 
-      Binding binding = null;
-
-      if (nameMap != null)
-      {
-         binding = (Binding)nameMap.get(queueName);
-      }
-
-      return binding;
-   }
-
-   public boolean route(MessageReference ref, Condition condition, Transaction tx) throws Exception
-   {
-      if (trace) { log.trace(this + "  routing ref " + ref + " with condition " + condition + " and transaction " + tx); }
-            
       if (ref == null)
       {
          throw new IllegalArgumentException("Message reference is null");
       }
-      
+
       if (condition == null)
       {
          throw new IllegalArgumentException("Condition key is null");
       }
-      
+
       boolean routed = false;
-      
+
       lock.readLock().acquire();
-                   
+
       try
-      {                 
+      {
          Bindings bd = (Bindings)conditionMap.get(condition);
-                             
+
          if (bd != null)
-         {            
+         {
             boolean startInternalTx = false;
-            
+
             if (tx == null && ref.isReliable())
             {
                if (bd.getDurableCount() > 1)
                {
-                  // When routing a persistent message without a transaction then we may need to start an 
-                  // internal transaction in order to route it.
-                  // This is so we can guarantee the message is delivered to all or none of the subscriptions.
-                  // We need to do this if there is more than one durable sub
+                  // When routing a persistent message without a transaction then we may need to
+                  // start an internal transaction in order to route it. This is so we can guarantee
+                  // the message is delivered to all or none of the subscriptions. We need to do
+                  // this if there is more than one durable subscription.
                   startInternalTx = true;
                }
             }
-            
+
             if (startInternalTx)
             {
                tx = tr.createTransaction();
             }
-                        
+
             Collection bindings = bd.getAllBindings();
-            
+
             Iterator iter = bindings.iterator();
-               
+
             while (iter.hasNext())
             {
                Binding binding = (Binding)iter.next();
-               
+
                //Sanity check
                if (binding.getNodeID() != this.currentNodeId)
                {
                   throw new IllegalStateException("Local post office has foreign bindings!");
                }
-                                
+
                Queue queue = binding.getQueue();
-                
+
                Delivery del = queue.handle(null, ref, tx);
-               
+
                if (del != null && del.isSelectorAccepted())
                {
                   routed = true;
-               }                                                                     
-            } 
-            
+               }
+            }
+
             if (startInternalTx)
             {
                //TODO - do we need to rollback if an exception is thrown??
                tx.commit();
             }
          }
-                 
+
          return routed;
       }
       finally
-      {                  
+      {
          lock.readLock().release();
       }
-   } 
-   
+   }
+
    public boolean isLocal()
    {
       return true;
    }
-   
+
    public Binding getBindingforChannelId(long channelId) throws Exception
    {
       lock.readLock().acquire();
-      
+
       try
-      {         
+      {
          Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
-   
+
          if (nameMap == null)
          {
             throw new IllegalStateException("Cannot find name map for current node " + currentNodeId);
          }
-         
+
          Binding binding = null;
-         
+
          for (Iterator iterbindings = nameMap.values().iterator(); iterbindings.hasNext();)
          {
             Binding itemBinding = (Binding)iterbindings.next();
-            
+
             if (itemBinding.getQueue().getChannelID() == channelId)
             {
                binding = itemBinding;
                break;
             }
          }
-         
+
          return binding;
       }
       finally
       {
          lock.readLock().release();
-      }      
+      }
    }
-     
-   // Protected -----------------------------------------------------
-   
-   protected Collection listBindingsForConditionInternal(Condition condition, boolean localOnly) throws Exception
+
+   // Public ---------------------------------------------------------------------------------------
+
+   public String printBindingInformation()
    {
+       StringWriter buffer = new StringWriter();
+       PrintWriter out = new PrintWriter(buffer);
+
+       out.println("Ocurrencies of nameMaps:");
+       out.println("<table border=1>");
+       for (Iterator mapIterator = nameMaps.entrySet().iterator();mapIterator.hasNext();)
+       {
+           Map.Entry entry = (Map.Entry)mapIterator.next();
+           out.println("<tr><td colspan=3><b>Map on node " + entry.getKey() + "</b></td></tr>");
+           Map valuesOnNode = (Map)entry.getValue();
+
+           out.println("<tr><td>Key</td><td>Value</td><td>Class of Value</td></tr>");
+           for (Iterator valuesIterator=valuesOnNode.entrySet().iterator();valuesIterator.hasNext();)
+           {
+               Map.Entry entry2 = (Map.Entry)valuesIterator.next();
+
+               out.println("<tr>");
+               out.println("<td>" + entry2.getKey() + "</td><td>" + entry2.getValue()+
+                  "</td><td>" + entry2.getValue().getClass().getName() + "</td>");
+               out.println("</tr>");
+
+               if (entry2.getValue() instanceof Binding &&
+                  ((Binding)entry2.getValue()).getQueue() instanceof PagingFilteredQueue)
+               {
+                   PagingFilteredQueue queue =
+                      (PagingFilteredQueue)((Binding)entry2.getValue()).getQueue();
+                   List undelivered = queue.undelivered(null);
+                   if (!undelivered.isEmpty())
+                   {
+                       out.println("<tr><td>List of undelivered messages on Paging</td>");
+
+                       out.println("<td colspan=2><table border=1>");
+                       out.println("<tr><td>Reference#</td><td>Message</td></tr>");
+                       for (Iterator i = undelivered.iterator();i.hasNext();)
+                       {
+                           MessageReference reference = (MessageReference)i.next();
+                           out.println("<tr><td>" + reference.getInMemoryChannelCount() +
+                              "</td><td>" + reference.getMessage() +"</td></tr>");
+                       }
+                       out.println("</table></td>");
+                       out.println("</tr>");
+                   }
+               }
+           }
+       }
+
+       out.println("</table>");
+       out.println("<br>Ocurrencies of conditionMap:");
+       out.println("<table border=1>");
+       out.println("<tr><td>EntryName</td><td>Value</td>");
+
+       for (Iterator iterConditions = conditionMap.entrySet().iterator();iterConditions.hasNext();)
+       {
+           Map.Entry entry = (Map.Entry)iterConditions.next();
+           out.println("<tr><td>" + entry.getKey() + "</td><td>" + entry.getValue() + "</td></tr>");
+
+           if (entry.getValue() instanceof Bindings)
+           {
+               out.println("<tr><td>Binding Information:</td><td>");
+               out.println("<table border=1>");
+               out.println("<tr><td>Binding</td><td>Queue</td></tr>");
+               Bindings bindings = (Bindings)entry.getValue();
+               for (Iterator i = bindings.getAllBindings().iterator();i.hasNext();)
+               {
+
+                   Binding binding = (Binding)i.next();
+                   out.println("<tr><td>" + binding + "</td><td>" + binding.getQueue() +
+                      "</td></tr>");
+               }
+               out.println("</table></td></tr>");
+           }
+       }
+       out.println("</table>");
+
+       return buffer.toString();
+   }
+
+   public String toString()
+   {
+      return "DefaultPostOffice[" + Integer.toHexString(hashCode()) + "]";
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   /**
+    * Internal methods (e.g. failOver) will already hold a lock and will need to call
+    * getBindingForQueueNames() without a lock. (Also... I didn't move this method to the protected
+    * section of the code as this is related to getBindingForQueueNames).
+    */
+   protected Binding internalGetBindingForQueueName(String queueName)
+   {
+      Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
+
+      Binding binding = null;
+
+      if (nameMap != null)
+      {
+         binding = (Binding)nameMap.get(queueName);
+      }
+
+      return binding;
+   }
+
+   protected Collection listBindingsForConditionInternal(Condition condition, boolean localOnly)
+      throws Exception
+   {
       if (condition == null)
       {
          throw new IllegalArgumentException("Condition is null");
       }
-      
+
       lock.readLock().acquire();
-      
+
       try
       {
          //We should only list the bindings for the local node
-         
-         Bindings cb = (Bindings)conditionMap.get(condition);                  
-                  
+
+         Bindings cb = (Bindings)conditionMap.get(condition);
+
          if (cb == null)
          {
             return Collections.EMPTY_LIST;
@@ -446,21 +528,21 @@
          else
          {
             List list = new ArrayList();
-            
+
             Collection bindings = cb.getAllBindings();
-            
+
             Iterator iter = bindings.iterator();
-            
+
             while (iter.hasNext())
             {
                Binding binding = (Binding)iter.next();
-               
+
                if (!localOnly || (binding.getNodeID() == this.currentNodeId))
                {
                   list.add(binding);
                }
             }
-            
+
             return list;
          }
       }
@@ -469,13 +551,12 @@
          lock.readLock().release();
       }
    }
-    
+
    //FIXME - this is not quite right
-   //We should not load any bindings at startup - since then we do not have to create any queues internally
-   //Creating queues is problematic since there are params we do not know until destination deploy time
-   //e.g. paging params, maxsize etc.
-   //This means we have to load the queues disabled and then set the params and re-activate them
-   //which is not clean
+   // We should not load any bindings at startup - since then we do not have to create any queues
+   // internally. Creating queues is problematic since there are params we do not know until
+   // destination deploy time e.g. paging params, maxsize etc. This means we have to load the queues
+   // disabled and then set the params and re-activate them which is not clean.
    protected void loadBindings() throws Exception
    {
       lock.writeLock().acquire();
@@ -484,29 +565,29 @@
       PreparedStatement ps  = null;
       ResultSet rs = null;
       TransactionWrapper wrap = new TransactionWrapper();
-         
+
       try
       {
          conn = ds.getConnection();
-         
+
          ps = conn.prepareStatement(getSQLStatement("LOAD_BINDINGS"));
-                 
+
          ps.setString(1, officeName);
 
          rs = ps.executeQuery();
-              
+
          while (rs.next())
          {
             int nodeID = rs.getInt(1);
             String queueName = rs.getString(2);
             String conditionText = rs.getString(3);
             String selector = rs.getString(4);
-            
+
             if (rs.wasNull())
             {
                selector = null;
             }
-            
+
             long channelID = rs.getLong(5);
             boolean failed = rs.getString(6).equals("Y");
 
@@ -527,12 +608,12 @@
                //Don't load other nodes binding
             }
             else
-            {               
+            {
                Binding binding = createBinding(nodeID, condition, queueName, channelID,
                                                selector, true, failed, failedNodeID);
-      
+
                log.debug(this + " loaded from database " + binding);
-               
+
                binding.getQueue().deactivate();
                addBinding(binding);
             }
@@ -564,7 +645,7 @@
    protected Binding createBinding(int nodeID, Condition condition, String queueName,
                                    long channelID, String filterString, boolean durable,
                                    boolean failed, Integer failedNodeID) throws Exception
-   {      
+   {
       Filter filter = filterFactory.createFilter(filterString);
       return createBinding(nodeID, condition, queueName, channelID,
                            filter, durable, failed, failedNodeID);
@@ -590,30 +671,30 @@
          throw new IllegalStateException("This is a non clustered post office - should not " +
             "have bindings from different nodes!");
       }
-      
+
       return new DefaultBinding(nodeID, condition, queue, failed);
    }
-   
+
    protected void insertBinding(Binding binding) throws Exception
    {
       Connection conn = null;
-      PreparedStatement ps  = null;     
+      PreparedStatement ps  = null;
       TransactionWrapper wrap = new TransactionWrapper();
-      
+
       try
       {
          conn = ds.getConnection();
-         
+
          ps = conn.prepareStatement(getSQLStatement("INSERT_BINDING"));
-         
+
          String filterString =
             binding.getQueue().getFilter() == null ?
                null : binding.getQueue().getFilter().getFilterString();
-                  
+
          ps.setString(1, officeName);
          ps.setInt(2, currentNodeId);
          ps.setString(3, binding.getQueue().getName());
-         ps.setString(4, binding.getCondition().toText());         
+         ps.setString(4, binding.getCondition().toText());
          if (filterString != null)
          {
             ps.setString(5, filterString);
@@ -648,28 +729,28 @@
             conn.close();
          }
          wrap.end();
-      }     
+      }
    }
-   
+
    protected boolean deleteBinding(int parameterNodeId, String queueName) throws Exception
    {
       if (parameterNodeId<0) parameterNodeId=this.currentNodeId;
       Connection conn = null;
       PreparedStatement ps  = null;
       TransactionWrapper wrap = new TransactionWrapper();
-      
+
       try
       {
          conn = ds.getConnection();
 
          ps = conn.prepareStatement(getSQLStatement("DELETE_BINDING"));
-         
+
          ps.setString(1, this.officeName);
          ps.setInt(2, parameterNodeId);
          ps.setString(3, queueName);
 
          int rows = ps.executeUpdate();
-         
+
          return rows == 1;
       }
       finally
@@ -683,104 +764,24 @@
             conn.close();
          }
          wrap.end();
-      }     
+      }
    }
 
-   public String printBindingInformation()
-   {
-       StringWriter buffer = new StringWriter();
-       PrintWriter out = new PrintWriter(buffer);
-
-       out.println("Ocurrencies of nameMaps:");
-       out.println("<table border=1>");
-       for (Iterator mapIterator = nameMaps.entrySet().iterator();mapIterator.hasNext();)
-       {
-           Map.Entry entry = (Map.Entry)mapIterator.next();
-           out.println("<tr><td colspan=3><b>Map on node " + entry.getKey() + "</b></td></tr>");
-           Map valuesOnNode = (Map)entry.getValue();
-
-           out.println("<tr><td>Key</td><td>Value</td><td>Class of Value</td></tr>");
-           for (Iterator valuesIterator=valuesOnNode.entrySet().iterator();valuesIterator.hasNext();)
-           {
-               Map.Entry entry2 = (Map.Entry)valuesIterator.next();
-
-               out.println("<tr>");
-               out.println("<td>" + entry2.getKey() + "</td><td>" + entry2.getValue()+ "</td><td>" + entry2.getValue().getClass().getName() + "</td>");
-               out.println("</tr>");
-
-               if (entry2.getValue() instanceof Binding && ((Binding)entry2.getValue()).getQueue() instanceof PagingFilteredQueue)
-               {
-                   PagingFilteredQueue queue = (PagingFilteredQueue)((Binding)entry2.getValue()).getQueue();
-                   List undelivered = queue.undelivered(null);
-                   if (!undelivered.isEmpty())
-                   {
-                       out.println("<tr><td>List of undelivered messages on Paging</td>");
-
-                       out.println("<td colspan=2><table border=1>");
-                       out.println("<tr><td>Reference#</td><td>Message</td></tr>");
-                       for (Iterator iterUndelivered = undelivered.iterator();iterUndelivered.hasNext();)
-                       {
-                           MessageReference reference = (MessageReference)iterUndelivered.next();
-                           out.println("<tr><td>" + reference.getInMemoryChannelCount() + "</td><td>" + reference.getMessage() +"</td></tr>");
-                       }
-                       out.println("</table></td>");
-                       out.println("</tr>");
-                   }
-               }
-               //out.println("   bindingName=" +entry2.getKey() + " value = " + entry2.getValue() + " valueClass=" + entry2.getValue().getClass().getName());
-           }
-       }
-       out.println("</table>");
-
-
-
-       out.println("<br>Ocurrencies of conditionMap:");
-
-       out.println("<table border=1>");
-       out.println("<tr><td>EntryName</td><td>Value</td>");
-       for (Iterator iterConditions = conditionMap.entrySet().iterator();iterConditions.hasNext();)
-       {
-           Map.Entry entry = (Map.Entry)iterConditions.next();
-           out.println("<tr><td>" + entry.getKey() + "</td><td>" + entry.getValue() + "</td></tr>");
-
-           if (entry.getValue() instanceof Bindings)
-           {
-               out.println("<tr><td>Binding Information:</td><td>");
-               out.println("<table border=1>");
-               out.println("<tr><td>Binding</td><td>Queue</td></tr>");
-               Bindings bindings = (Bindings)entry.getValue();
-               for (Iterator iterBindings = bindings.getAllBindings().iterator();iterBindings.hasNext();)
-               {
-
-                   Binding binding = (Binding)iterBindings.next();
-                   out.println("<tr><td>" + binding + "</td><td>" + binding.getQueue() + "</td></tr>");
-               }
-               out.println("</table></td></tr>");
-           }
-       }
-       out.println("</table>");
-
-
-       return buffer.toString();
-
-
-   }
-
    protected void addBinding(Binding binding)
    {
       addToNameMap(binding);
       addToConditionMap(binding);
-   }   
-   
+   }
+
    protected Binding removeBinding(int nodeId, String queueName)
    {
       Binding binding = removeFromNameMap(nodeId, queueName);
-                  
+
       removeFromConditionMap(binding);
-      
+
       return binding;
    }
-   
+
    protected void addToNameMap(Binding binding)
    {
       Integer nodeID = new Integer(binding.getNodeID());
@@ -796,84 +797,84 @@
 
       if (trace) { log.trace(this + " added " + binding + " to name map"); }
    }
-   
+
    protected void addToConditionMap(Binding binding)
    {
       Condition condition = binding.getCondition();
-      
+
       Bindings bindings = (Bindings)conditionMap.get(condition);
-      
+
       if (bindings == null)
       {
          bindings = new DefaultBindings();
-         
+
          conditionMap.put(condition, bindings);
       }
-      
+
       bindings.addBinding(binding);
 
       if (trace) { log.trace(this + " added " + binding + " to condition map"); }
    }
-   
+
    protected Binding removeFromNameMap(int nodeId, String queueName)
    {
       if (queueName == null)
       {
          throw new IllegalArgumentException("Queue name is null");
       }
-             
+
       Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
-      
+
       if (nameMap == null)
       {
          throw new IllegalArgumentException("Cannot find any bindings for node Id: " + nodeId);
       }
-      
-      Binding binding = null;            
-      
+
+      Binding binding = null;
+
       if (nameMap != null)
       {
          binding = (Binding)nameMap.remove(queueName);
       }
-      
+
       if (binding == null)
       {
          throw new IllegalArgumentException("Name map does not contain binding for " + queueName);
       }
-              
+
       if (nameMap.isEmpty())
       {
          nameMaps.remove(new Integer(nodeId));
       }
-      
+
       return binding;
    }
-   
+
    protected void removeFromConditionMap(Binding binding)
    {
       Bindings bindings = (Bindings)conditionMap.get(binding.getCondition());
-      
+
       if (bindings == null)
       {
          throw new IllegalStateException("Cannot find condition bindings for " +
             binding.getCondition());
       }
-      
+
       boolean removed = bindings.removeBinding(binding);
-      
+
       if (!removed)
       {
          throw new IllegalStateException("Cannot find binding in condition binding list");
-      }           
-      
+      }
+
       if (bindings.isEmpty())
       {
          conditionMap.remove(binding.getCondition());
-      }        
-   }         
+      }
+   }
 
    protected Map getDefaultDMLStatements()
-   {                
+   {
       Map map = new LinkedHashMap();
 
       map.put("INSERT_BINDING",
@@ -904,7 +905,7 @@
 
       return map;
    }
-   
+
    protected Map getDefaultDDLStatements()
    {
       Map map = new LinkedHashMap();
@@ -915,8 +916,12 @@
               "FAILED_NODE_ID INTEGER)");
       return map;
    }
-   
-   // Private -------------------------------------------------------             
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
+   // Private -------------------------------------------------------
                  
    // Inner classes ------------------------------------------------- 
       

Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/CallbackFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/CallbackFailureTest.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/CallbackFailureTest.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -33,9 +33,6 @@
 import org.jboss.test.messaging.tools.jndi.InVMInitialContextFactory;
 
 /**
- * 
- * A CallbackFailureTest.
- * 
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision$</tt>
  *
@@ -43,39 +40,38 @@
  */
 public class CallbackFailureTest extends MessagingTestCase
 {
-   // Constants -----------------------------------------------------
+   // Constants ------------------------------------------------------------------------------------
 
-   // Static --------------------------------------------------------
+   // Static ---------------------------------------------------------------------------------------
    
-   // Attributes ----------------------------------------------------
+   // Attributes -----------------------------------------------------------------------------------
    
    protected Server localServer;
-   
    protected Server remoteServer;
 
-   // Constructors --------------------------------------------------
+   // Constructors ---------------------------------------------------------------------------------
 
    public CallbackFailureTest(String name)
    {
       super(name);
    }
 
-   // Public --------------------------------------------------------
+   // Public ---------------------------------------------------------------------------------------
 
    public void setUp() throws Exception
    {
       super.setUp();
       
-      //Start the local server
+      // Start the local server
       localServer = new LocalTestServer();
       
-      //Start all the services locally
+      // Start all the services locally
       localServer.start("all", true);
             
       localServer.deployQueue("Queue", null, false);
            
-      //Connect to the remote server, but don't start a servicecontainer on it
-      //We are only using the remote server to open a client connection to the local server
+      // Connect to the remote server, but don't start a servicecontainer on it. We are only using
+      // the remote server to open a client connection to the local server.
       ServerManagement.create();
           
       remoteServer = ServerManagement.getServer();
@@ -87,21 +83,26 @@
    }
         
    /*
-    * Test that when a client callback fails, server side resources for connections are cleaned-up
+    * Test that when a client callback fails, server side resources for connections are cleaned-up.
     */
    public void testCallbackFailure() throws Exception
    {
-      if (!ServerManagement.isRemote()) return;
+      if (!ServerManagement.isRemote())
+      {
+         fail("this test should be run in a remote configuration");
+      }
+
+      // we need to disable exception listener otherwise it will clear up the connection itself
       
-      //We need to disable exception listener otherwise it will clear up the connection itself
+      ObjectName remoteConnectorName = ServiceContainer.REMOTING_OBJECT_NAME;
       
-      ObjectName connectorName = ServiceContainer.REMOTING_OBJECT_NAME;
-      
       ConnectionManager cm = localServer.getServerPeer().getConnectionManager();
       
-      localServer.getServerPeer().getServer().invoke(connectorName, "removeConnectionListener",
-                                                     new Object[] {cm},
-                                                     new String[] {"org.jboss.remoting.ConnectionListener"}); 
+      localServer.getServerPeer().getServer().
+         invoke(remoteConnectorName,
+                "removeConnectionListener",
+                new Object[] {cm},
+                new String[] {"org.jboss.remoting.ConnectionListener"});
        
       InitialContext ic = new InitialContext(InVMInitialContextFactory.getJNDIEnvironment());
       
@@ -115,46 +116,47 @@
       
       remoteServer.kill();
         
-      //we have removed the exception listener so the server side resouces shouldn't be cleared up
-      
-      Thread.sleep(20000);
+      // we have removed the exception listener so the server side resources shouldn't be cleared up
+
+      log.info("sleeping for 1 min ...");
+      Thread.sleep(60000);
                  
-      assertTrue(cm.containsSession(remotingSessionId));
+      assertTrue(cm.containsRemotingSession(remotingSessionId));
       
-      //Now we send a message which should prompt delivery to the dead consumer causing
-      //an exception which should cause connection cleanup
+      // Now we send a message which should prompt delivery to the dead consumer causing
+      // an exception which should cause connection cleanup
                   
       Connection conn = cf.createConnection();
       
       Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         
       MessageProducer prod = sess.createProducer(queue);
-      
+
+      // sending just one message should be enough to trigger the failure and client smacking
       prod.send(sess.createMessage());
-      prod.send(sess.createMessage());
-      prod.send(sess.createMessage());
-      prod.send(sess.createMessage());
-      prod.send(sess.createMessage());
-      prod.send(sess.createMessage());
-      prod.send(sess.createMessage());
-      prod.send(sess.createMessage());
-      prod.send(sess.createMessage());
-      prod.send(sess.createMessage());
-      prod.send(sess.createMessage());
+
+      log.info("sleeping for 45 secs ...");
+      Thread.sleep(45000);
       
-      Thread.sleep(45000);
-      
-      assertFalse(cm.containsSession(remotingSessionId));   
+      assertFalse(cm.containsRemotingSession(remotingSessionId));
+
+      // make sure the message is still in queue
+
+      MessageConsumer cons = sess.createConsumer(queue);
+      Message m = cons.receive(1000);
+      assertNotNull(m);
+
+
+      cons.close();
                
    }
    
-   // Package protected ---------------------------------------------
+   // Package protected ----------------------------------------------------------------------------
    
-   // Protected -----------------------------------------------------
+   // Protected ------------------------------------------------------------------------------------
    
-   // Private -------------------------------------------------------
+   // Private --------------------------------------------------------------------------------------
    
-  
-   // Inner classes -------------------------------------------------
+   // Inner classes --------------------------------------------------------------------------------
 
 }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashLargeLeaseTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashLargeLeaseTest.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashLargeLeaseTest.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -111,7 +111,7 @@
       
       ConnectionManager cm = localServer.getServerPeer().getConnectionManager();
             
-      assertTrue(cm.containsSession(remotingSessionId));
+      assertTrue(cm.containsRemotingSession(remotingSessionId));
       
       // Now we should have a client connection from the remote server to the local server
       
@@ -124,7 +124,7 @@
       // See if we still have a connection with this id
       
       //Connection state shouldn't have been cleared up by now
-      assertTrue(cm.containsSession(remotingSessionId));            
+      assertTrue(cm.containsRemotingSession(remotingSessionId));
    }
    
    

Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashNegativeLeaseTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashNegativeLeaseTest.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashNegativeLeaseTest.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -113,7 +113,7 @@
       
       ConnectionManager cm = localServer.getServerPeer().getConnectionManager();
             
-      assertTrue(cm.containsSession(remotingSessionId));
+      assertTrue(cm.containsRemotingSession(remotingSessionId));
       
       // Now we should have a client connection from the remote server to the local server
       
@@ -124,7 +124,7 @@
       Thread.sleep(15000);
            
       // See if we still have a connection with this id
-      assertTrue(cm.containsSession(remotingSessionId));            
+      assertTrue(cm.containsRemotingSession(remotingSessionId));
    }
    
    

Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTest.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTest.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -115,7 +115,7 @@
       
       ConnectionManager cm = localServer.getServerPeer().getConnectionManager();
             
-      assertTrue(cm.containsSession(remotingSessionId));
+      assertTrue(cm.containsRemotingSession(remotingSessionId));
       
       // Now we should have a client connection from the remote server to the local server
       
@@ -126,7 +126,7 @@
       Thread.sleep(15000);
            
       // See if we still have a connection with this id
-      assertFalse(cm.containsSession(remotingSessionId));            
+      assertFalse(cm.containsRemotingSession(remotingSessionId));
    }
    
    // Package protected ---------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTwoConnectionsTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTwoConnectionsTest.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTwoConnectionsTest.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -120,8 +120,8 @@
       log.info("server(1) = " + remotingSessionId[1]);
       log.info("we have = " + ((SimpleConnectionManager)cm).getClients().size() + " clients registered on SimpleconnectionManager");
       
-      assertFalse(cm.containsSession(remotingSessionId[0]));            
-      assertTrue(cm.containsSession(remotingSessionId[1]));          
+      assertFalse(cm.containsRemotingSession(remotingSessionId[0]));
+      assertTrue(cm.containsRemotingSession(remotingSessionId[1]));
       
       // Now we should have a client connection from the remote server to the local server
       remoteServer.kill();
@@ -133,8 +133,8 @@
       // See if we still have a connection with this id
       
       //Connection state should have been cleared up by now
-      assertFalse(cm.containsSession(remotingSessionId[0]));            
-      assertFalse(cm.containsSession(remotingSessionId[1]));            
+      assertFalse(cm.containsRemotingSession(remotingSessionId[0]));
+      assertFalse(cm.containsRemotingSession(remotingSessionId[1]));
       
       log.info("Servers = " + ((SimpleConnectionManager)cm).getClients().size());
       

Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashZeroLeaseTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashZeroLeaseTest.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashZeroLeaseTest.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -113,7 +113,7 @@
       
       ConnectionManager cm = localServer.getServerPeer().getConnectionManager();
             
-      assertTrue(cm.containsSession(remotingSessionId));
+      assertTrue(cm.containsRemotingSession(remotingSessionId));
       
       // Now we should have a client connection from the remote server to the local server
       
@@ -124,7 +124,7 @@
       Thread.sleep(15000);
            
       // See if we still have a connection with this id
-      assertTrue(cm.containsSession(remotingSessionId));            
+      assertTrue(cm.containsRemotingSession(remotingSessionId));
    }
    
    

Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateClientOnServerCommand.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateClientOnServerCommand.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateClientOnServerCommand.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -91,7 +91,7 @@
       //Leave the connection unclosed
       
       //Return the remoting client session id for the connection
-      return ((JBossConnection)conn).getRemotingClientSessionId();      
+      return ((JBossConnection)conn).getRemotingClientSessionID();
    }
 
 }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateHangingConsumerCommand.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateHangingConsumerCommand.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateHangingConsumerCommand.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -33,9 +33,6 @@
 import org.jboss.test.messaging.tools.jmx.rmi.Command;
 
 /**
- * 
- * A CreateHangingConsumerCommand.
- * 
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision$</tt>
  *
@@ -46,15 +43,11 @@
    private static final long serialVersionUID = -997724797145152821L;
    
    private ConnectionFactory cf;
-   
    private Queue queue;
-   
-   private static MessageConsumer consumer;
-   
+
    public CreateHangingConsumerCommand(ConnectionFactory cf, Queue queue)
    {
       this.cf = cf;
-      
       this.queue = queue;
    }
    
@@ -66,21 +59,20 @@
        
       conn.start();
       
-      consumer = sess.createConsumer(queue);
+      MessageConsumer cons = sess.createConsumer(queue);
+
+      cons.setMessageListener(new Listener());
       
-      consumer.setMessageListener(new Listener());
+      // leave the connection unclosed
       
-      //Leave the connection unclosed
-      
-      //Return the remoting client session id for the connection
-      return ((JBossConnection)conn).getRemotingClientSessionId();      
+      // return the remoting client session id for the connection
+      return ((JBossConnection)conn).getRemotingClientSessionID();      
    }
    
    class Listener implements MessageListener
    {
       public void onMessage(Message m)
       {
-         
       }
    }
 

Modified: trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateTwoClientOnServerCommand.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateTwoClientOnServerCommand.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/crash/CreateTwoClientOnServerCommand.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -85,8 +85,8 @@
       conn2.start();
 
       String arrays[] = new String[2];
-      arrays[0] = ((JBossConnection)conn1).getRemotingClientSessionId();
-      arrays[1] = ((JBossConnection)conn2).getRemotingClientSessionId();
+      arrays[0] = ((JBossConnection)conn1).getRemotingClientSessionID();
+      arrays[1] = ((JBossConnection)conn2).getRemotingClientSessionID();
       
       conn1.close();
       

Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -201,12 +201,12 @@
       cm.registerConnection("jvm3", "sessionid5", e5);
       cm.registerConnection("jvm3", "sessionid6", e6);
 
-      assertTrue(cm.containsSession("sessionid1"));
-      assertTrue(cm.containsSession("sessionid2"));
-      assertTrue(cm.containsSession("sessionid3"));
-      assertTrue(cm.containsSession("sessionid4"));
-      assertTrue(cm.containsSession("sessionid5"));
-      assertTrue(cm.containsSession("sessionid6"));
+      assertTrue(cm.containsRemotingSession("sessionid1"));
+      assertTrue(cm.containsRemotingSession("sessionid2"));
+      assertTrue(cm.containsRemotingSession("sessionid3"));
+      assertTrue(cm.containsRemotingSession("sessionid4"));
+      assertTrue(cm.containsRemotingSession("sessionid5"));
+      assertTrue(cm.containsRemotingSession("sessionid6"));
 
       ConnectionEndpoint r1 = cm.unregisterConnection("jvm3", "sessionid6");
       assertEquals(e6, r1);
@@ -214,21 +214,21 @@
 
       assertNull(cm.unregisterConnection("blah", "blah"));
 
-      assertFalse(cm.containsSession("sessionid6"));
+      assertFalse(cm.containsRemotingSession("sessionid6"));
 
       ConnectionEndpoint r2 = cm.unregisterConnection("jvm3", "sessionid5");
       assertEquals(e5, r2);
       assertFalse(e5.isClosed());
 
-      assertFalse(cm.containsSession("sessionid5"));
+      assertFalse(cm.containsRemotingSession("sessionid5"));
 
       cm.handleClientFailure("sessionid4");
 
       assertNull(cm.unregisterConnection("jvm2", "sessionid4"));
       assertNull(cm.unregisterConnection("jvm2", "sessionid3"));
 
-      assertFalse(cm.containsSession("sessionid4"));
-      assertFalse(cm.containsSession("sessionid3"));
+      assertFalse(cm.containsRemotingSession("sessionid4"));
+      assertFalse(cm.containsRemotingSession("sessionid3"));
 
       assertTrue(e3.isClosed());
       assertTrue(e4.isClosed());
@@ -241,8 +241,8 @@
       assertEquals(e2, r4);
       assertFalse(e2.isClosed());
 
-      assertFalse(cm.containsSession("sessionid2"));
-      assertFalse(cm.containsSession("sessionid1"));
+      assertFalse(cm.containsRemotingSession("sessionid2"));
+      assertFalse(cm.containsRemotingSession("sessionid1"));
 
    }
 

Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java	2007-01-25 00:05:49 UTC (rev 2044)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java	2007-01-25 01:55:37 UTC (rev 2045)
@@ -1224,7 +1224,8 @@
                       "callbackStore=org.jboss.remoting.callback.BlockingCallbackStore&" +
                       "clientSocketClass=org.jboss.jms.client.remoting.ClientSocketWrapper&" +
                       "serverSocketClass=org.jboss.jms.server.remoting.ServerSocketWrapper&" +
-                      "NumberOfRetries=1&NumberOfCallRetries=1";
+                      "NumberOfRetries=1&" +
+                      "NumberOfCallRetries=1";
 
       // specific parameters per transport
 




More information about the jboss-cvs-commits mailing list