[jboss-cvs] JBoss Messaging SVN: r7029 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456: src/main/org/jboss/jms/client/container and 8 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue May 26 11:36:57 EDT 2009


Author: jbertram at redhat.com
Date: 2009-05-26 11:36:56 -0400 (Tue, 26 May 2009)
New Revision: 7029

Added:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java
Modified:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/etc/remoting/remoting-bisocket-service.xml
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/etc/remoting/remoting-sslbisocket-service.xml
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/build.xml
Log:
[JBPAPP-2030]

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/etc/remoting/remoting-bisocket-service.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/etc/remoting/remoting-bisocket-service.xml	2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/etc/remoting/remoting-bisocket-service.xml	2009-05-26 15:36:56 UTC (rev 7029)
@@ -57,7 +57,11 @@
                <attribute name="secondaryBindPort">xyz</attribute>                           
                <attribute name="secondaryConnectPort">abc</attribute>               
                -->
-                          
+
+               <attribute name="failureDisconnectTimeout" isParam="true">0</attribute>
+               <attribute name="callbackErrorsAllowed">1</attribute>
+               <attribute name="useClientConnectionIdentity" isParam="true">true</attribute>
+           
             </invoker>
             <handlers>
                <handler subsystem="JMS">org.jboss.jms.server.remoting.JMSServerInvocationHandler</handler>

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/etc/remoting/remoting-sslbisocket-service.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/etc/remoting/remoting-sslbisocket-service.xml	2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/etc/remoting/remoting-sslbisocket-service.xml	2009-05-26 15:36:56 UTC (rev 7029)
@@ -56,6 +56,10 @@
                <attribute name="secondaryConnectPort">abc</attribute>               
                -->
                
+               <attribute name="failureDisconnectTimeout" isParam="true">0</attribute>
+               <attribute name="callbackErrorsAllowed">1</attribute>
+               <attribute name="useClientConnectionIdentity" isParam="true">true</attribute>
+
             </invoker>
             <handlers>
                <handler subsystem="JMS">org.jboss.jms.server.remoting.JMSServerInvocationHandler</handler>

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2009-05-26 15:36:56 UTC (rev 7029)
@@ -31,7 +31,6 @@
 import org.jboss.jms.client.delegate.ClientProducerDelegate;
 import org.jboss.jms.client.delegate.ClientSessionDelegate;
 import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.client.state.BrowserState;
 import org.jboss.jms.client.state.ConnectionState;
@@ -92,14 +91,6 @@
          Version versionToUse = connectionDelegate.getVersionToUse();
          JMSRemotingConnection remotingConnection = connectionDelegate.getRemotingConnection();
 
-         // install the consolidated remoting connection listener; it will be de-installed on
-         // connection closing by ConnectionAspect
-
-         ConsolidatedRemotingConnectionListener listener =
-            new ConsolidatedRemotingConnectionListener();
-
-         remotingConnection.addConnectionListener(listener);
-
          if (versionToUse == null)
          {
             throw new IllegalStateException("Connection version is null");
@@ -109,7 +100,8 @@
             new ConnectionState(serverID, connectionDelegate,
                                 remotingConnection, versionToUse);
 
-         listener.setConnectionState(connectionState);
+         remotingConnection.getConnectionListener().setConnectionState(connectionState);
+         remotingConnection.getConnectionListener().start();
           
          connectionDelegate.setState(connectionState);
       }

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2009-05-26 15:36:56 UTC (rev 7029)
@@ -30,6 +30,7 @@
 import javax.jms.JMSException;
 
 import org.jboss.jms.client.container.JMSClientVMIdentifier;
+import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.delegate.ConnectionFactoryDelegate;
 import org.jboss.jms.delegate.CreateConnectionResult;
@@ -149,7 +150,7 @@
       
       try
       {         
-         remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing, strictTck);
+         remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing, strictTck, new ConsolidatedRemotingConnectionListener(), sendAcksAsync);
          
          remotingConnection.start();
    

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java	2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java	2009-05-26 15:36:56 UTC (rev 7029)
@@ -42,6 +42,8 @@
    private ExceptionListener jmsExceptionListener;
 
    private ConnectionFailureListener remotingListener;
+      
+   private boolean started;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -53,6 +55,11 @@
 
    public void handleConnectionException(Throwable throwable, Client client)
    {
+      if (!started)
+      {
+         return;
+      }
+            
       // forward the exception to delegate listener and JMS ExceptionListeners; synchronize
       // to avoid race conditions
 
@@ -162,6 +169,11 @@
       }
       return state + ".ConsolidatedListener";
    }
+      
+   public void start()
+   {
+      started = true;
+   }
 
    // Package protected ----------------------------------------------------------------------------
 

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2009-05-26 15:36:56 UTC (rev 7029)
@@ -256,10 +256,17 @@
 
    public JMSRemotingConnection(String serverLocatorURI, boolean clientPing, boolean strictTck) throws Exception
    {
+      this(serverLocatorURI, clientPing, strictTck, null, sendAcksAsync);
+   }
+      
+   public JMSRemotingConnection(String serverLocatorURI, boolean clientPing, boolean strictTck, ConsolidatedRemotingConnectionListener listener, boolean sendAcksAsync) throws Exception
+   {
       serverLocator = new InvokerLocator(serverLocatorURI);
       this.clientPing = clientPing;
       this.strictTck = strictTck;
-
+      this.sendAcksAsync = sendAcksAsync;
+      this.remotingConnectionListener = listener;
+      
       log.trace(this + " created");
    }
 
@@ -316,7 +323,14 @@
       {
          public Object run() throws Exception
          {
-            client.connect();
+            if (remotingConnectionListener != null)
+            {
+               client.connect(remotingConnectionListener, serverLocator.getParameters());
+            }
+            else
+            {
+               client.connect();
+            }
             onewayClient.connect();
             return null;
          }
@@ -345,7 +359,7 @@
    public void stop()
    {
       log.trace(this + " stop");
-
+      
       // explicitly remove the callback listener, to avoid race conditions on server
       // (http://jira.jboss.org/jira/browse/JBMESSAGING-535)
 
@@ -421,6 +435,11 @@
    public synchronized void setFailed()
    {
       failed = true;
+      
+      if (client == null) 
+      {
+         return;
+      }
 
       // Remoting has the bad habit of letting the job of cleaning after a failed connection up to
       // the application. Here, we take care of that, by disconnecting the remoting client, and
@@ -429,7 +448,7 @@
 
       try
       {
-      	client.setDisconnectTimeout(0);
+         client.setDisconnectTimeout(0);
       }
       catch (Throwable ignore)
       {      	
@@ -465,7 +484,7 @@
       return true;
    }
 
-   public synchronized void addPlainConnectionListener(ConnectionListener listener)
+   public synchronized void addPlainConnectionListener(final ConnectionListener listener)
    {
       client.addConnectionListener(listener);
    }

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java	2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java	2009-05-26 15:36:56 UTC (rev 7029)
@@ -1178,7 +1178,10 @@
    {
       if (sessions.remove(id) == null)
       {
-         throw new IllegalStateException("Cannot find session with id " + id + " to remove");
+         //here we don't throw exception as the session may have been removed already due to server side
+         //failure handler (SimpleConnectionManager), which deemed to be normal behavior in application environment.
+         if (log.isTraceEnabled()) { log.trace("Cannot find session with id " + id + " to remove"); }
+         // throw new IllegalStateException("Cannot find session with id " + id + " to remove");
       }
    }
 

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2009-05-26 15:36:56 UTC (rev 7029)
@@ -29,8 +29,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import javax.jms.JMSException;
-
 import org.jboss.jms.delegate.ConnectionEndpoint;
 import org.jboss.jms.server.ConnectionManager;
 import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
@@ -167,6 +165,7 @@
          
          return e;
       }
+      
       return null;
    }
    
@@ -221,17 +220,20 @@
    {  
       if (t instanceof ClientDisconnectedException)
       {
-         // This is OK
-         if (trace) { log.trace(this + " notified that client " + client + " has disconnected"); }
-         return;
+         if (log.isTraceEnabled())
+         {
+            log.trace("Connection is closed normally: " + client);
+         }
       }
       else
       {
-         if (trace) { log.trace(this + " detected failure on client " + client, t); }
+         if (log.isTraceEnabled())
+         {
+            log.trace("Connection is closed on failure event: " + client);
+         }
       }
+      String remotingSessionID = client.getSessionId();
 
-      String remotingSessionID = client.getSessionId();
-      
       if (remotingSessionID != null)
       {
          handleClientFailure(remotingSessionID);
@@ -401,7 +403,7 @@
 
             try
             {
-               ((ServerInvokerCallbackHandler)entry.getValue()).destroy();
+               ((ServerInvokerCallbackHandler)entry.getValue()).shutdown();
             }
             catch (Throwable ignore)
             {
@@ -421,10 +423,10 @@
    {      
    	String jmsClientID = remotingSessions.get(jmsSessionID);
    	
-      log.warn("A problem has been detected " +
-               "with the connection to remote client " +
-               jmsSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
-            "its connection(s) or the network has failed. All associated connection resources will be cleaned up.");
+      log.trace("A problem has been detected " +
+                    "with the connection to remote client " +
+                     jmsSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
+                  "its connection(s) or the network has failed. All associated connection resources will be cleaned up.");
    	
    	if (jmsClientID != null)
    	{      	
@@ -467,7 +469,6 @@
                catch (Throwable ignore)
                {            	
                }
-               
                return;
          	}
          }
@@ -493,7 +494,7 @@
    
                try
                {
-                  ((ServerInvokerCallbackHandler)entry.getValue()).destroy();
+                  ((ServerInvokerCallbackHandler)entry.getValue()).shutdown();
                }
                catch (Throwable ignore)
                {

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2009-05-26 15:36:56 UTC (rev 7029)
@@ -95,7 +95,7 @@
    // Attributes -----------------------------------------------------------------------------------
 
    private String id;
-
+   
    private volatile boolean closed;
    private volatile boolean started;
 
@@ -366,6 +366,13 @@
    {
       try
       {
+         //reason for synchronization
+         //Sometimes the server side detects a connection failure but 
+         //client side is normal. So it's possible the client side is calling
+         //connection.close() while in the mean time the server side connection
+         //failure handler call it also.
+         synchronized (this)
+         {
          if (trace) { log.trace(this + " close()"); }
 
          if (closed)
@@ -437,11 +444,15 @@
             temporaryDestinations.clear();
          }
 
+         closed = true;
+      }
+      
+         //we put this outside the sync loop to avoid dead lock where
+         //SimpleConnectionManager.handleClientFailure() holds itself and then tries to call this close(), which requires lock on this
+         //meanwhile this close() (called from client) holds itself and call unregisterConnection(), which requires lock on SimpleConnectionManager.
          cm.unregisterConnection(jmsClientVMID, remotingClientSessionID);
 
          Dispatcher.instance.unregisterTarget(id, this);
-
-         closed = true;
       }
       catch (Throwable t)
       {
@@ -649,7 +660,10 @@
       {
          if (sessions.remove(sessionId) == null)
          {
-            throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
+            //Here not to throw exception, because it is possible that the session close can be 
+            //called from server side (SimpleConnectionManager) before client side.
+            if (trace) { log.trace("Cannot find session with id " + sessionId + " to remove"); }
+            //throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
          }
       }
    }
@@ -752,7 +766,7 @@
       else if (dest.isQueue())
       {
          if (trace) { log.trace(this + " routing " + msg + " to queue"); }
-        if (!postOffice.route(ref, new JMSCondition(true, dest.getName()), tx))
+         if (!postOffice.route(ref, new JMSCondition(true, dest.getName()), tx))
          {
             throw new JMSException("Failed to route " + ref + " to " + dest.getName());
          }

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2009-05-26 15:36:56 UTC (rev 7029)
@@ -1119,16 +1119,22 @@
       {
          if (consumers.remove(consumerId) == null)
          {
-            throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
+            if (trace) { log.trace("Cannot find consumer with id " + consumerId + " to remove"); }
+            //don't throw, as it maybe called twice from client and server's connection failure handler.
+            //throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
          }
       }
    }
 
    void localClose() throws Throwable
    {
+           
       if (closed)
       {
-         throw new IllegalStateException("Session is already closed");
+         //don't throw the exception as it maybe called twice
+         if (trace) { log.trace("Session is already closed. "); }
+         return;
+         //throw new IllegalStateException("Session is already closed");
       }
 
       if (trace) log.trace(this + " close()");
@@ -1167,10 +1173,12 @@
       //Note we don't maintain order using a LinkedHashMap since then we lose
       //concurrency since we would have to lock it exclusively
 
-      List entries = new ArrayList(deliveries.entrySet());
+      synchronized (deliveries)
+      {
+         List entries = new ArrayList(deliveries.entrySet());
 
-      //Sort them in reverse delivery id order
-      Collections.sort(entries,
+         //Sort them in reverse delivery id order
+         Collections.sort(entries,
                        new Comparator()
                        {
                            public int compare(Object obj1, Object obj2)
@@ -1183,39 +1191,46 @@
                            }
                        });
 
-      Iterator iter = entries.iterator();
+         Iterator iter = entries.iterator();
 
-      Set channels = new HashSet();
+         Set channels = new HashSet();
 
-      if (trace) { log.trace(this + " cancelling " + entries.size() + " deliveries"); }
+         if (trace) { log.trace(this + " cancelling " + entries.size() + " deliveries"); }
 
-      while (iter.hasNext())
-      {
-         Map.Entry entry = (Map.Entry)iter.next();
+         while (iter.hasNext())
+         {
+            Map.Entry entry = (Map.Entry)iter.next();
 
-         if (trace) { log.trace(this + " cancelling delivery with delivery id: " + entry.getKey()); }
+            if (trace) { log.trace(this + " cancelling delivery with delivery id: " + entry.getKey()); }
 
-         DeliveryRecord rec = (DeliveryRecord)entry.getValue();
+            DeliveryRecord rec = (DeliveryRecord)entry.getValue();
 
-         rec.del.cancel();
+            /*
+             * https://jira.jboss.org/jira/browse/JBMESSAGING-1440
+             */
+            if (!rec.del.isXAPrepared())
+            {
+               rec.del.cancel();
+            }
 
-         channels.add(rec.del.getObserver());
-      }
+            channels.add(rec.del.getObserver());
+         }
 
-      promptDelivery(channels);
+         promptDelivery(channels);
 
-      //Close down the executor
+         //Close down the executor
 
-      //Note we need to wait for ALL tasks to complete NOT just one otherwise we can end up with the following situation
-      //prompter is queued and starts to execute
-      //prompter almost finishes executing then a message is cancelled due to this session closing
-      //this causes another prompter to be queued
-      //shutdownAfterProcessingCurrentTask is then called
-      //this means the second prompter never runs and the cancelled message doesn't get redelivered
-      executor.shutdownAfterProcessingCurrentlyQueuedTasks();
+         //Note we need to wait for ALL tasks to complete NOT just one otherwise we can end up with the following situation
+         //prompter is queued and starts to execute
+         //prompter almost finishes executing then a message is cancelled due to this session closing
+         //this causes another prompter to be queued
+         //shutdownAfterProcessingCurrentTask is then called
+         //this means the second prompter never runs and the cancelled message doesn't get redelivered
+         executor.shutdownAfterProcessingCurrentlyQueuedTasks();
 
-      deliveries.clear();
-
+         deliveries.clear();
+      }
+      
       sp.removeSession(id);
 
       Dispatcher.instance.unregisterTarget(id, this);
@@ -1225,7 +1240,11 @@
 
    void cancelDelivery(long deliveryId) throws Throwable
    {
-      DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(deliveryId));
+      DeliveryRecord rec = null;
+      synchronized(deliveries)
+      {
+         rec = (DeliveryRecord)deliveries.remove(new Long(deliveryId));
+      }
 
       if (rec == null)
       {
@@ -1436,7 +1455,7 @@
       {
          // one way invocation, no acknowledgment sent back by the client
          if (trace) { log.trace(this + " submitting message " + ref.getMessage() + " to the remoting layer to be sent asynchronously"); }
-
+         
          callbackHandler.handleCallbackOneway(callback);
 
          //We store the delivery id so we know to wait for any deliveries in transit on close
@@ -1569,7 +1588,11 @@
 
    private Delivery cancelDeliveryInternal(Cancel cancel) throws Throwable
    {
-      DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+      DeliveryRecord rec = null;
+      synchronized (deliveries)
+      {
+         rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+      }
 
       if (rec == null)
       {
@@ -1717,7 +1740,19 @@
    {
       if (trace) { log.trace(this + " acknowledging delivery " + ack); }
 
-      DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+      DeliveryRecord rec = null;
+      
+      //I put synchronized here to prevent the following from happening:
+      //a clustered server node detects connection failure and cancel deliveries.
+      //but the consumer on it get through to here
+      //if not synchronized, the remove may get the record before the above cancel action clear up the deliveries map.
+      //so the cancel action makes the message back to queue and this method cause the delivery count to decrement.
+      //as the cancel will decrease the delivery count once, so this will result the delivery count being decremented twice
+      //for one same message.
+      synchronized (deliveries)
+      {
+         rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+      }
 
       if (rec == null)
       {

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java	2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java	2009-05-26 15:36:56 UTC (rev 7029)
@@ -32,12 +32,12 @@
  * A NamedThreadQueuedExecutor
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * @deprecated
  *
  */
 public class NamedThreadQueuedExecutor extends QueuedExecutor
-{
-	private static final Logger log = Logger.getLogger(NamedThreadQueuedExecutor.class);
-	  	  
+{	  	  
 	private final String name;
 	
 	private static final ThreadGroup jbmGroup = new ThreadGroup("JBM-threads");
@@ -49,10 +49,6 @@
 		this.name = name;
 		
 		setThreadFactory(new Factory());
-		
-		clearThread();
-		
-		restart();
 	}
 	
 	private class Factory implements ThreadFactory

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/build.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/build.xml	2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/build.xml	2009-05-26 15:36:56 UTC (rev 7029)
@@ -346,28 +346,27 @@
          <!--
          <jvmarg line="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=y,suspend=y,address=antjunit"/>
          -->
-         <classpath refid="test.execution.classpath"/>
-         <formatter type="xml" usefile="${junit.formatter.usefile}"/>
-         <batchtest todir="${junit.batchtest.todir}"
-                    haltonfailure="${junit.batchtest.haltonfailure}"
-                    haltonerror="${junit.batchtest.haltonerror}">
-            <formatter type="plain" usefile="${junit.formatter.usefile}"/>
-            <fileset dir="${build.tests.classes}">
-	       <include name="**/messaging/core/**/${test-mask}.class"/>
-	       <include name="**/jms/**/${test-mask}.class"/>
-               <include name="**/messaging/util/**/${test-mask}.class"/>
-               <exclude name="**/jms/MemLeakTest.class"/>
-               <exclude name="**/jms/RemotingConnectionConfigurationTest.class"/>
-               <exclude name="**/jms/XAResourceRecoveryTest.class"/>
-               <exclude name="**/jms/stress/**"/>
-               <exclude name="**/jms/crash/**"/>
-               <exclude name="**/jms/bridge/**"/>
-               <exclude name="**/jms/manual/**"/>
-               <exclude name="**/jms/clustering/**"/>
-            </fileset>
-         </batchtest>
-      </junit>
-   </target>
+			<classpath refid="test.execution.classpath" />
+			<formatter type="xml" usefile="${junit.formatter.usefile}" />
+			<batchtest todir="${junit.batchtest.todir}" haltonfailure="${junit.batchtest.haltonfailure}" haltonerror="${junit.batchtest.haltonerror}">
+				<formatter type="plain" usefile="${junit.formatter.usefile}" />
+				<fileset dir="${build.tests.classes}">
+					<include name="**/messaging/core/**/${test-mask}.class" />
+					<include name="**/jms/**/${test-mask}.class" />
+					<include name="**/messaging/util/**/${test-mask}.class" />
+                                        <exclude name="**/jms/DeliveryOnConnectionFailureTest.class" />
+					<exclude name="**/jms/MemLeakTest.class" />
+					<exclude name="**/jms/RemotingConnectionConfigurationTest.class" />
+					<exclude name="**/jms/XAResourceRecoveryTest.class" />
+					<exclude name="**/jms/stress/**" />
+					<exclude name="**/jms/crash/**" />
+					<exclude name="**/jms/bridge/**" />
+					<exclude name="**/jms/manual/**" />
+					<exclude name="**/jms/clustering/**" />
+				</fileset>
+			</batchtest>
+		</junit>
+	</target>
 
    <target name="invm-thirdparty-tests" depends="tests-jar, prepare-testdirs, clear-test-logs"
            description="Runs all available thirdparty tests an in-VM configuration">

Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java (from rev 6192, branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java	                        (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java	2009-05-26 15:36:56 UTC (rev 7029)
@@ -0,0 +1,291 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.test.messaging.jms;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.server.ServerPeer;
+import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
+import org.jboss.remoting.Client;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.container.Command;
+import org.jboss.test.messaging.tools.container.Server;
+
+/**
+ * A DeliveryOnConnectionFailureTest
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ * 
+ * Created Mar 26, 2009 3:14:28 PM
+ *
+ */
+public class DeliveryOnConnectionFailureTest extends JMSTestCase
+{
+
+   public DeliveryOnConnectionFailureTest(String name)
+   {
+      super(name);
+   }
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   
+   //https://jira.jboss.org/jira/browse/JBMESSAGING-1456
+   //Message Stuck means messages are kept in delivering state and never be delivered again
+   //unless the server is restarted (for persistent messages).
+   //this can happen in the following conditions:
+   //1. The client ping timeout and JBM tries to disconnect from the server (this happens in cluster).
+   //2. Due to the network/remoting issue, the server will receive a 'normal' disconnection event
+   //3. The server assumes the client has already closed it's connection and therefore doesn't clean up
+   //4. So the connection at the server is left open, including the sessions created on that connection.
+   //5. If the sessions contains messages in delivering, those messages will appear stuck.
+   //To fix this, either the server side always tries to clean up the connection whenever a disconnection happens
+   //or the remoting layer handle this correctly.
+   //
+   //Here we simplify the situation. First start the server and get a connection to it. Then
+   //we send a message to the server with client ack. We receive it without ack, 
+   //next we directly call the client.disconnect() from client without closing the connection
+   //the server should cancel the message. Then we receive the message and ack it.
+   public void testMessageStuckOnConnectionFailure() throws Exception
+   {
+      ConnectionFactory cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
+      
+      JBossConnection conn1 = null;
+      JBossConnection conn2 = null;
+
+      try
+      {
+         //create a connection
+         conn1 = (JBossConnection)cf.createConnection();
+         Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageProducer prod1 = sess1.createProducer(queue1);
+         TextMessage msg = sess1.createTextMessage("dont-stuck-me!");
+         conn1.start();
+         
+         //send a message
+         prod1.send(msg);
+         
+         //receive the message but not ack
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         TextMessage rm = (TextMessage)cons1.receive(2000);
+         
+         assertNotNull(rm);
+         assertEquals("dont-stuck-me!", rm.getText());
+         
+         //break connection.
+         JMSRemotingConnection jmsConn = ((ClientConnectionDelegate)conn1.getDelegate()).getRemotingConnection();
+         Client rmClient = jmsConn.getRemotingClient();
+         rmClient.disconnect();
+         
+         //wait for server side cleanup
+         try
+         {
+            Thread.sleep(5000);
+         }
+         catch (InterruptedException e)
+         {
+            //ignore.
+         }
+         
+         //now receive the message 
+         conn2 = (JBossConnection)cf.createConnection();
+         conn2.start();
+         Session sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer cons2 = sess2.createConsumer(queue1);
+         TextMessage rm2 = (TextMessage)cons2.receive(2000);
+
+         assertNotNull(rm2);
+         assertEquals("dont-stuck-me!", rm2.getText());
+         rm2.acknowledge();
+         
+         //Message count should be zero.
+         //this is checked in tearDown().
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+
+   }
+   
+   //https://jira.jboss.org/jira/browse/JBMESSAGING-1456
+   //another issue with jira 1456 is negative message count.
+   //This test guarantees the message count won't go negative
+   //Error Scenario:
+   // 1. Server side detects the connection failure (lease timeout) and close the connection/session
+   // 2. The session endpoint will cancel the messages being delivered to the queue.
+   // 3. At the same time the client side received some of the messages and acknowledge them
+   // 4. The acknowledge action will decrease the delivering count of the queue, and the session endpoint
+   //    cancel also decrease the delivering count.
+   // 5. If not synchronized, one message may be canceled and acked at the same time, so the delivering count
+   //    will be decreased twice for each message, resulting in negative message count.
+   //
+   //The test first creates a connection and send messages, then it receives the messages, then ack the last
+   //msg (client-ack), at the same time, simulate the server side connection failure to trigger server side
+   //clean up. This will create a possibility that when not properly synchronized, the above 
+   //described issue may happen. At the end check the message count, it should always be zero.
+   public void testMessageCountOnConnectionFailure() throws Exception
+   {
+      ConnectionFactory cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
+      
+      JBossConnection conn1 = null;
+      JBossConnection conn2 = null;
+      
+      try
+      {
+         conn1 = (JBossConnection)cf.createConnection();
+         conn1.start();
+         Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         
+         //now send messages
+         MessageProducer prod1 = sess1.createProducer(queue1);
+         
+         final int NUM_MSG = 2000;
+         for (int i = 0; i < NUM_MSG; ++i)
+         {
+            TextMessage tm = sess1.createTextMessage("-m"+i);
+            prod1.send(tm);
+         }
+         
+         //receive the messages
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         for (int j = 0; j < NUM_MSG-1; ++j)
+         {
+            TextMessage rm = (TextMessage)cons1.receive(2000);
+            assertNotNull(rm);
+            assertEquals("-m"+j, rm.getText());
+         }
+         
+         //last message
+         TextMessage lastRm = (TextMessage)cons1.receive(2000);
+         assertNotNull(lastRm);
+         assertEquals("-m"+(NUM_MSG-1), lastRm.getText());
+         
+         final ServerClientFailureCommand cmd = new ServerClientFailureCommand();
+         
+         Thread exeThr = new Thread()
+         {
+            public void run()
+            {
+               try
+               {
+                  ServerManagement.getServer().executeCommand(cmd);
+               }
+               catch (Exception e)
+               {
+                  log.error("failed to invoke command", e);
+                  fail("failure in executing command.");
+               }               
+            }
+         };
+         
+         exeThr.start();
+
+         //ack last message, making server side ack happening.
+         lastRm.acknowledge();
+
+         //receive possible canceled messages
+         TextMessage prm = null;
+         conn2 = (JBossConnection)cf.createConnection();
+         conn2.start();
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons2 = sess2.createConsumer(queue1);
+         prm = (TextMessage)cons2.receive(2000);
+         while (prm != null)
+         {
+            prm = (TextMessage)cons2.receive(2000);
+         }
+         
+         //check message count
+         //tearDown will do the check.
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }      
+   }
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+   public static class ServerClientFailureCommand implements Command
+   {
+
+      private static final long serialVersionUID = 2603154447586447658L;
+
+      public Object execute(Server server) throws Exception
+      {
+         ServerPeer peer = server.getServerPeer();
+
+         SimpleConnectionManager cm = (SimpleConnectionManager)peer.getConnectionManager();
+
+         Map jmsClients = cm.getClients();
+         assertEquals(1, jmsClients.size());
+         Map endpoints = (Map)jmsClients.values().iterator().next();
+         assertEquals(1, endpoints.size());
+         Map.Entry entry = (Map.Entry)endpoints.entrySet().iterator().next();
+         String sessId = (String)entry.getKey();
+
+         // triggering server side clean up
+         cm.handleClientFailure(sessId);
+         return null;
+      }
+      
+   }
+}




More information about the jboss-cvs-commits mailing list