[jboss-cvs] JBoss Messaging SVN: r7026 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss: jms/client/delegate and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue May 26 10:58:09 EDT 2009


Author: jbertram at redhat.com
Date: 2009-05-26 10:58:09 -0400 (Tue, 26 May 2009)
New Revision: 7026

Modified:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
Log:
[JBPAPP-2030]

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2009-05-26 14:58:09 UTC (rev 7026)
@@ -31,7 +31,6 @@
 import org.jboss.jms.client.delegate.ClientProducerDelegate;
 import org.jboss.jms.client.delegate.ClientSessionDelegate;
 import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.client.state.BrowserState;
 import org.jboss.jms.client.state.ConnectionState;
@@ -92,14 +91,6 @@
          Version versionToUse = connectionDelegate.getVersionToUse();
          JMSRemotingConnection remotingConnection = connectionDelegate.getRemotingConnection();
 
-         // install the consolidated remoting connection listener; it will be de-installed on
-         // connection closing by ConnectionAspect
-
-         ConsolidatedRemotingConnectionListener listener =
-            new ConsolidatedRemotingConnectionListener();
-
-         remotingConnection.addConnectionListener(listener);
-
          if (versionToUse == null)
          {
             throw new IllegalStateException("Connection version is null");
@@ -109,7 +100,8 @@
             new ConnectionState(serverID, connectionDelegate,
                                 remotingConnection, versionToUse);
 
-         listener.setConnectionState(connectionState);
+         remotingConnection.getConnectionListener().setConnectionState(connectionState);
+         remotingConnection.getConnectionListener().start();
           
          connectionDelegate.setState(connectionState);
       }

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2009-05-26 14:58:09 UTC (rev 7026)
@@ -30,6 +30,7 @@
 import javax.jms.JMSException;
 
 import org.jboss.jms.client.container.JMSClientVMIdentifier;
+import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.delegate.ConnectionFactoryDelegate;
 import org.jboss.jms.delegate.CreateConnectionResult;
@@ -149,7 +150,7 @@
       
       try
       {         
-         remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing, strictTck);
+         remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing, strictTck, new ConsolidatedRemotingConnectionListener(), sendAcksAsync);
          
          remotingConnection.start();
    

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java	2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java	2009-05-26 14:58:09 UTC (rev 7026)
@@ -42,6 +42,8 @@
    private ExceptionListener jmsExceptionListener;
 
    private ConnectionFailureListener remotingListener;
+      
+   private boolean started;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -53,6 +55,11 @@
 
    public void handleConnectionException(Throwable throwable, Client client)
    {
+      if (!started)
+      {
+         return;
+      }
+            
       // forward the exception to delegate listener and JMS ExceptionListeners; synchronize
       // to avoid race conditions
 
@@ -162,6 +169,11 @@
       }
       return state + ".ConsolidatedListener";
    }
+      
+   public void start()
+   {
+      started = true;
+   }
 
    // Package protected ----------------------------------------------------------------------------
 

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2009-05-26 14:58:09 UTC (rev 7026)
@@ -256,10 +256,17 @@
 
    public JMSRemotingConnection(String serverLocatorURI, boolean clientPing, boolean strictTck) throws Exception
    {
+      this(serverLocatorURI, clientPing, strictTck, null, sendAcksAsync);
+   }
+      
+   public JMSRemotingConnection(String serverLocatorURI, boolean clientPing, boolean strictTck, ConsolidatedRemotingConnectionListener listener, boolean sendAcksAsync) throws Exception
+   {
       serverLocator = new InvokerLocator(serverLocatorURI);
       this.clientPing = clientPing;
       this.strictTck = strictTck;
-
+      this.sendAcksAsync = sendAcksAsync;
+      this.remotingConnectionListener = listener;
+      
       log.trace(this + " created");
    }
 
@@ -316,7 +323,14 @@
       {
          public Object run() throws Exception
          {
-            client.connect();
+            if (remotingConnectionListener != null)
+            {
+               client.connect(remotingConnectionListener, serverLocator.getParameters());
+            }
+            else
+            {
+               client.connect();
+            }
             onewayClient.connect();
             return null;
          }
@@ -345,7 +359,7 @@
    public void stop()
    {
       log.trace(this + " stop");
-
+      
       // explicitly remove the callback listener, to avoid race conditions on server
       // (http://jira.jboss.org/jira/browse/JBMESSAGING-535)
 
@@ -421,6 +435,11 @@
    public synchronized void setFailed()
    {
       failed = true;
+      
+      if (client == null) 
+      {
+         return;
+      }
 
       // Remoting has the bad habit of letting the job of cleaning after a failed connection up to
       // the application. Here, we take care of that, by disconnecting the remoting client, and
@@ -429,7 +448,7 @@
 
       try
       {
-      	client.setDisconnectTimeout(0);
+         client.setDisconnectTimeout(0);
       }
       catch (Throwable ignore)
       {      	
@@ -465,7 +484,7 @@
       return true;
    }
 
-   public synchronized void addPlainConnectionListener(ConnectionListener listener)
+   public synchronized void addPlainConnectionListener(final ConnectionListener listener)
    {
       client.addConnectionListener(listener);
    }

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java	2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java	2009-05-26 14:58:09 UTC (rev 7026)
@@ -1178,7 +1178,10 @@
    {
       if (sessions.remove(id) == null)
       {
-         throw new IllegalStateException("Cannot find session with id " + id + " to remove");
+         //here we don't throw exception as the session may have been removed already due to server side
+         //failure handler (SimpleConnectionManager), which deemed to be normal behavior in application environment.
+         if (log.isTraceEnabled()) { log.trace("Cannot find session with id " + id + " to remove"); }
+         // throw new IllegalStateException("Cannot find session with id " + id + " to remove");
       }
    }
 

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2009-05-26 14:58:09 UTC (rev 7026)
@@ -29,8 +29,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import javax.jms.JMSException;
-
 import org.jboss.jms.delegate.ConnectionEndpoint;
 import org.jboss.jms.server.ConnectionManager;
 import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
@@ -167,6 +165,7 @@
          
          return e;
       }
+      
       return null;
    }
    
@@ -219,8 +218,22 @@
     */
    public void handleConnectionException(Throwable t, Client client)
    {  
+      if (t instanceof ClientDisconnectedException)
+      {
+         if (log.isTraceEnabled())
+         {
+            log.trace("Connection is closed normally: " + client);
+         }
+      }
+      else
+      {
+         if (log.isTraceEnabled())
+         {
+            log.trace("Connection is closed on failure event: " + client);
+         }
+      }
       String remotingSessionID = client.getSessionId();
-      
+
       if (remotingSessionID != null)
       {
          handleClientFailure(remotingSessionID);
@@ -390,7 +403,7 @@
 
             try
             {
-               ((ServerInvokerCallbackHandler)entry.getValue()).destroy();
+               ((ServerInvokerCallbackHandler)entry.getValue()).shutdown();
             }
             catch (Throwable ignore)
             {
@@ -410,10 +423,10 @@
    {      
    	String jmsClientID = remotingSessions.get(jmsSessionID);
    	
-      log.warn("A problem has been detected " +
-               "with the connection to remote client " +
-               jmsSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
-            "its connection(s) or the network has failed. All associated connection resources will be cleaned up.");
+      log.trace("A problem has been detected " +
+                    "with the connection to remote client " +
+                     jmsSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
+                  "its connection(s) or the network has failed. All associated connection resources will be cleaned up.");
    	
    	if (jmsClientID != null)
    	{      	
@@ -456,7 +469,6 @@
                catch (Throwable ignore)
                {            	
                }
-               
                return;
          	}
          }
@@ -482,7 +494,7 @@
    
                try
                {
-                  ((ServerInvokerCallbackHandler)entry.getValue()).destroy();
+                  ((ServerInvokerCallbackHandler)entry.getValue()).shutdown();
                }
                catch (Throwable ignore)
                {

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2009-05-26 14:58:09 UTC (rev 7026)
@@ -95,7 +95,7 @@
    // Attributes -----------------------------------------------------------------------------------
 
    private String id;
-
+   
    private volatile boolean closed;
    private volatile boolean started;
 
@@ -362,15 +362,17 @@
       }
    }
 
-   //reason for synchronization
-   //Sometimes the server side detects a connection failure but 
-   //client side is normal. So it's possible the client side is calling
-   //connection.close() while in the mean time the server side connection
-   //failure handler call it also.
-   public synchronized void close() throws JMSException
+   public void close() throws JMSException
    {
       try
       {
+         //reason for synchronization
+         //Sometimes the server side detects a connection failure but 
+         //client side is normal. So it's possible the client side is calling
+         //connection.close() while in the mean time the server side connection
+         //failure handler call it also.
+         synchronized (this)
+         {
          if (trace) { log.trace(this + " close()"); }
 
          if (closed)
@@ -442,11 +444,15 @@
             temporaryDestinations.clear();
          }
 
+         closed = true;
+      }
+      
+         //we put this outside the sync loop to avoid dead lock where
+         //SimpleConnectionManager.handleClientFailure() holds itself and then tries to call this close(), which requires lock on this
+         //meanwhile this close() (called from client) holds itself and call unregisterConnection(), which requires lock on SimpleConnectionManager.
          cm.unregisterConnection(jmsClientVMID, remotingClientSessionID);
 
          Dispatcher.instance.unregisterTarget(id, this);
-
-         closed = true;
       }
       catch (Throwable t)
       {
@@ -654,7 +660,10 @@
       {
          if (sessions.remove(sessionId) == null)
          {
-            throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
+            //Here not to throw exception, because it is possible that the session close can be 
+            //called from server side (SimpleConnectionManager) before client side.
+            if (trace) { log.trace("Cannot find session with id " + sessionId + " to remove"); }
+            //throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
          }
       }
    }
@@ -757,7 +766,7 @@
       else if (dest.isQueue())
       {
          if (trace) { log.trace(this + " routing " + msg + " to queue"); }
-        if (!postOffice.route(ref, new JMSCondition(true, dest.getName()), tx))
+         if (!postOffice.route(ref, new JMSCondition(true, dest.getName()), tx))
          {
             throw new JMSException("Failed to route " + ref + " to " + dest.getName());
          }

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2009-05-26 14:58:09 UTC (rev 7026)
@@ -1119,16 +1119,22 @@
       {
          if (consumers.remove(consumerId) == null)
          {
-            throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
+            if (trace) { log.trace("Cannot find consumer with id " + consumerId + " to remove"); }
+            //don't throw, as it maybe called twice from client and server's connection failure handler.
+            //throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
          }
       }
    }
 
    void localClose() throws Throwable
    {
+           
       if (closed)
       {
-         throw new IllegalStateException("Session is already closed");
+         //don't throw the exception as it maybe called twice
+         if (trace) { log.trace("Session is already closed. "); }
+         return;
+         //throw new IllegalStateException("Session is already closed");
       }
 
       if (trace) log.trace(this + " close()");
@@ -1169,10 +1175,10 @@
 
       synchronized (deliveries)
       {
-      List entries = new ArrayList(deliveries.entrySet());
+         List entries = new ArrayList(deliveries.entrySet());
 
-      //Sort them in reverse delivery id order
-      Collections.sort(entries,
+         //Sort them in reverse delivery id order
+         Collections.sort(entries,
                        new Comparator()
                        {
                            public int compare(Object obj1, Object obj2)
@@ -1185,38 +1191,44 @@
                            }
                        });
 
-      Iterator iter = entries.iterator();
+         Iterator iter = entries.iterator();
 
-      Set channels = new HashSet();
+         Set channels = new HashSet();
 
-      if (trace) { log.trace(this + " cancelling " + entries.size() + " deliveries"); }
+         if (trace) { log.trace(this + " cancelling " + entries.size() + " deliveries"); }
 
-      while (iter.hasNext())
-      {
-         Map.Entry entry = (Map.Entry)iter.next();
+         while (iter.hasNext())
+         {
+            Map.Entry entry = (Map.Entry)iter.next();
 
-         if (trace) { log.trace(this + " cancelling delivery with delivery id: " + entry.getKey()); }
+            if (trace) { log.trace(this + " cancelling delivery with delivery id: " + entry.getKey()); }
 
-         DeliveryRecord rec = (DeliveryRecord)entry.getValue();
+            DeliveryRecord rec = (DeliveryRecord)entry.getValue();
 
-         rec.del.cancel();
+            /*
+             * https://jira.jboss.org/jira/browse/JBMESSAGING-1440
+             */
+            if (!rec.del.isXAPrepared())
+            {
+               rec.del.cancel();
+            }
 
-         channels.add(rec.del.getObserver());
-      }
+            channels.add(rec.del.getObserver());
+         }
 
-      promptDelivery(channels);
+         promptDelivery(channels);
 
-      //Close down the executor
+         //Close down the executor
 
-      //Note we need to wait for ALL tasks to complete NOT just one otherwise we can end up with the following situation
-      //prompter is queued and starts to execute
-      //prompter almost finishes executing then a message is cancelled due to this session closing
-      //this causes another prompter to be queued
-      //shutdownAfterProcessingCurrentTask is then called
-      //this means the second prompter never runs and the cancelled message doesn't get redelivered
-      executor.shutdownAfterProcessingCurrentlyQueuedTasks();
+         //Note we need to wait for ALL tasks to complete NOT just one otherwise we can end up with the following situation
+         //prompter is queued and starts to execute
+         //prompter almost finishes executing then a message is cancelled due to this session closing
+         //this causes another prompter to be queued
+         //shutdownAfterProcessingCurrentTask is then called
+         //this means the second prompter never runs and the cancelled message doesn't get redelivered
+         executor.shutdownAfterProcessingCurrentlyQueuedTasks();
 
-      deliveries.clear();
+         deliveries.clear();
       }
       
       sp.removeSession(id);
@@ -1228,7 +1240,11 @@
 
    void cancelDelivery(long deliveryId) throws Throwable
    {
-      DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(deliveryId));
+      DeliveryRecord rec = null;
+      synchronized(deliveries)
+      {
+         rec = (DeliveryRecord)deliveries.remove(new Long(deliveryId));
+      }
 
       if (rec == null)
       {
@@ -1439,7 +1455,7 @@
       {
          // one way invocation, no acknowledgment sent back by the client
          if (trace) { log.trace(this + " submitting message " + ref.getMessage() + " to the remoting layer to be sent asynchronously"); }
-
+         
          callbackHandler.handleCallbackOneway(callback);
 
          //We store the delivery id so we know to wait for any deliveries in transit on close
@@ -1572,7 +1588,11 @@
 
    private Delivery cancelDeliveryInternal(Cancel cancel) throws Throwable
    {
-      DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+      DeliveryRecord rec = null;
+      synchronized (deliveries)
+      {
+         rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+      }
 
       if (rec == null)
       {
@@ -1722,7 +1742,7 @@
 
       DeliveryRecord rec = null;
       
-      //I put synchronized here to prevent the following:
+      //I put synchronized here to prevent the following from happening:
       //a clustered server node detects connection failure and cancel deliveries.
       //but the consumer on it get through to here
       //if not synchronized, the remove may get the record before the above cancel action clear up the deliveries map.

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java	2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java	2009-05-26 14:58:09 UTC (rev 7026)
@@ -32,12 +32,12 @@
  * A NamedThreadQueuedExecutor
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * @deprecated
  *
  */
 public class NamedThreadQueuedExecutor extends QueuedExecutor
-{
-	private static final Logger log = Logger.getLogger(NamedThreadQueuedExecutor.class);
-	  	  
+{	  	  
 	private final String name;
 	
 	private static final ThreadGroup jbmGroup = new ThreadGroup("JBM-threads");
@@ -49,10 +49,6 @@
 		this.name = name;
 		
 		setThreadFactory(new Factory());
-		
-		clearThread();
-		
-		restart();
 	}
 	
 	private class Factory implements ThreadFactory




More information about the jboss-cvs-commits mailing list