[jboss-cvs] JBoss Messaging SVN: r7673 - in trunk: src/main/org/jboss/messaging/core/client/impl and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Aug 6 13:09:25 EDT 2009


Author: timfox
Date: 2009-08-06 13:09:24 -0400 (Thu, 06 Aug 2009)
New Revision: 7673

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/client/FailureDeadlockTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/JBossConnectionFactoryTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1702

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java	2009-08-06 09:27:50 UTC (rev 7672)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java	2009-08-06 17:09:24 UTC (rev 7673)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.core.client;
 
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
@@ -172,4 +173,6 @@
    void setDiscoveryRefreshTimeout(long discoveryRefreshTimeout);
 
    void close();
+   
+   Executor getThreadPool();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-08-06 09:27:50 UTC (rev 7672)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-08-06 17:09:24 UTC (rev 7673)
@@ -20,6 +20,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -847,6 +848,11 @@
 
       closed = true;
    }
+   
+   public Executor getThreadPool()
+   {
+      return threadPool;
+   }
 
    // DiscoveryListener implementation --------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-08-06 09:27:50 UTC (rev 7672)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-08-06 17:09:24 UTC (rev 7673)
@@ -515,7 +515,9 @@
       {
          return false;
       }
-
+      
+      boolean done = false;
+      
       synchronized (failoverLock)
       {
          if (connectionID != null && !connections.containsKey(connectionID))
@@ -554,9 +556,7 @@
          // until failover is complete
 
          boolean attemptFailover = (backupConnectorFactory) != null && (failoverOnServerShutdown || me.getCode() != MessagingException.DISCONNECTED);
-
-         boolean done = false;
-
+         
          if (attemptFailover || reconnectAttempts != 0)
          {
             lockAllChannel1s();
@@ -607,7 +607,7 @@
             }
 
             closePingers();
-
+            
             connections.clear();
 
             refCount = 0;
@@ -666,16 +666,16 @@
          else
          {
             // Just fail the connections
-
-            closePingers();
-
-            failConnection(me);
+            
+            failConnections(me);
          }
 
          inFailoverOrReconnect = false;
 
-         return done;
       }
+
+      return done;
+      
    }
 
    private void closePingers()
@@ -829,7 +829,7 @@
    }
 
    private void checkCloseConnections()
-   {
+   {      
       if (refCount == 0)
       {
          // Close connections
@@ -863,6 +863,7 @@
 
          connector = null;
       }
+      
    }
 
    public RemotingConnection getConnection(final int initialRefCount)
@@ -1020,23 +1021,25 @@
       {
          // Can be legitimately null if session was closed before then went to remove session from csf
          // and locked since failover had started then after failover removes it but it's already been failed
-      }
+      } 
    }
 
-   private void failConnection(final MessagingException me)
+   private void failConnections(final MessagingException me)
    {
       synchronized (failConnectionLock)
       {
          // When a single connection fails, we fail *all* the connections
-
+         
          Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());
 
          for (ConnectionEntry entry : copy)
          {
             entry.connection.fail(me);
          }
-
+         
          refCount = 0;
+         
+         checkCloseConnections();
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java	2009-08-06 09:27:50 UTC (rev 7672)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java	2009-08-06 17:09:24 UTC (rev 7673)
@@ -126,7 +126,7 @@
    public void close()
    {
       if (future != null)
-      {             
+      {                      
          future.cancel(false);
       }
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-08-06 09:27:50 UTC (rev 7672)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-08-06 17:09:24 UTC (rev 7673)
@@ -249,15 +249,6 @@
 
       for (Channel channel : channels.values())
       {
-         // channel.lock.lock();
-         // try
-         // {
-         // channel.sendCondition.signalAll();
-         // }
-         // finally
-         // {
-         // channel.lock.unlock();
-         // }
          channel.returnBlocking();
       }
    }
@@ -447,14 +438,4 @@
       }
    }
    
-   private static class DelegatingBufferHandler extends AbstractBufferHandler
-   {
-      RemotingConnection conn;
-
-      public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
-      {
-         conn.bufferReceived(connectionID, buffer);
-      }
-   }
-
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-08-06 09:27:50 UTC (rev 7672)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-08-06 17:09:24 UTC (rev 7673)
@@ -297,7 +297,7 @@
       // arrive the connection will get closed
 
       scheduledThreadPool.schedule(runnable, INITIAL_PING_TIMEOUT, TimeUnit.MILLISECONDS);
-
+      
       if (config.isBackup())
       {
          serverSideReplicatingConnection = rc;
@@ -385,7 +385,7 @@
 
       pingRunnable.setFuture(pingFuture);
 
-      pingers.put(conn.getID(), pingRunnable);
+      pingers.put(conn.getID(), pingRunnable);           
    }
 
    private RemotingConnection closeConnection(final Object connectionID)
@@ -393,12 +393,12 @@
       RemotingConnection connection = connections.remove(connectionID);
       
       Pinger pinger = pingers.remove(connectionID);
-
+      
       if (pinger != null)
       {
          pinger.close();
       }
-
+      
       return connection;
    }
 
@@ -441,7 +441,7 @@
       public synchronized void run()
       {
          if (!gotInitialPing)
-         {
+         {            
             // Never received initial ping
             log.warn("Did not receive initial ping from " + conn.getRemoteAddress() + ", connection will be closed");
 

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java	2009-08-06 09:27:50 UTC (rev 7672)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java	2009-08-06 17:09:24 UTC (rev 7673)
@@ -24,6 +24,7 @@
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionConsumer;
@@ -53,6 +54,7 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.FailureListener;
 import org.jboss.messaging.core.version.Version;
+import org.jboss.messaging.utils.OrderedExecutorFactory;
 import org.jboss.messaging.utils.SimpleString;
 import org.jboss.messaging.utils.UUIDGenerator;
 import org.jboss.messaging.utils.VersionLoader;
@@ -121,6 +123,8 @@
    private final int transactionBatchSize;
 
    private ClientSession initialSession;
+   
+   private final Executor executor;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -141,6 +145,8 @@
       this.clientID = clientID;
 
       this.sessionFactory = sessionFactory;
+      
+      this.executor = new OrderedExecutorFactory(sessionFactory.getThreadPool()).getExecutor();
 
       uid = UUIDGenerator.getInstance().generateSimpleStringUUID();
 
@@ -543,11 +549,20 @@
 
          if (exceptionListener != null)
          {
-            JMSException je = new JMSException(me.toString());
+            final JMSException je = new JMSException(me.toString());
 
             je.initCause(me);
-
-            exceptionListener.onException(je);
+            
+            executor.execute(new Runnable()
+            {
+               public void run()
+               {
+                  synchronized (exceptionListener)
+                  {
+                     exceptionListener.onException(je);
+                  }
+               }
+            });           
          }
          
          failed = true;

Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/FailureDeadlockTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/FailureDeadlockTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/FailureDeadlockTest.java	2009-08-06 17:09:24 UTC (rev 7673)
@@ -0,0 +1,191 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.jms.client.JBossConnectionFactory;
+import org.jboss.messaging.jms.client.JBossSession;
+import org.jboss.messaging.jms.server.impl.JMSServerManagerImpl;
+import org.jboss.messaging.tests.integration.jms.server.management.NullInitialContext;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * 
+ * A FailureDeadlockTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class FailureDeadlockTest extends UnitTestCase
+{
+   private static final Logger log = Logger.getLogger(FailureDeadlockTest.class);
+
+   private MessagingServer server;
+
+   private JMSServerManagerImpl jmsServer;
+
+   private JBossConnectionFactory cf1;
+
+   private JBossConnectionFactory cf2;
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      Configuration conf = new ConfigurationImpl();
+      conf.setSecurityEnabled(false);
+      conf.getAcceptorConfigurations()
+          .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+      server = Messaging.newMessagingServer(conf, false);
+      jmsServer = new JMSServerManagerImpl(server);
+      jmsServer.setContext(new NullInitialContext());
+      jmsServer.start();
+      cf1 = new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      cf2 = new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      if (server != null && server.isStarted())
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+         server = null;
+
+      }
+
+      super.tearDown();
+   }
+
+   // https://jira.jboss.org/jira/browse/JBMESSAGING-1702
+   //Test that two failures concurrently executing and calling the same exception listener
+   //don't deadlock
+   public void testDeadlock() throws Exception
+   {
+      for (int i = 0; i < 100; i++)
+      {
+         final Connection conn1 = cf1.createConnection();
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         RemotingConnection rc1 = ((ClientSessionImpl)((JBossSession)sess1).getCoreSession()).getConnection();
+
+         final Connection conn2 = cf2.createConnection();
+
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         RemotingConnection rc2 = ((ClientSessionImpl)((JBossSession)sess2).getCoreSession()).getConnection();
+
+         ExceptionListener listener1 = new ExceptionListener()
+         {
+            public void onException(JMSException exception)
+            {
+               try
+               {
+                  conn2.close();
+               }
+               catch (Exception e)
+               {
+                  log.error("Failed to close connection2", e);
+               }
+            }
+         };
+
+         conn1.setExceptionListener(listener1);
+
+         conn2.setExceptionListener(listener1);
+
+         Failer f1 = new Failer(rc1);
+
+         Failer f2 = new Failer(rc2);
+
+         f1.start();
+
+         f2.start();
+
+         f1.join();
+
+         f2.join();  
+         
+         conn1.close();
+         
+         conn2.close();
+      }      
+   }
+   
+   private class Failer extends Thread
+   {
+      RemotingConnection conn;
+
+      Failer(RemotingConnection conn)
+      {
+         this.conn = conn;
+      }
+
+      public void run()
+      {
+         conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+      }
+   }
+
+      
+   //https://jira.jboss.org/jira/browse/JBMESSAGING-1702
+   //Make sure that failing a connection removes it from the connection manager and can't be returned in a subsequent call
+   public void testUsingDeadConnection() throws Exception
+   {
+      for (int i = 0; i < 100; i++)
+      {
+         final Connection conn1 = cf1.createConnection();
+   
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         RemotingConnection rc1 = ((ClientSessionImpl)((JBossSession)sess1).getCoreSession()).getConnection();      
+   
+         rc1.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+   
+         Session sess2 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+   
+         conn1.close();
+      }
+   }
+
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/JBossConnectionFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/JBossConnectionFactoryTest.java	2009-08-06 09:27:50 UTC (rev 7672)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/JBossConnectionFactoryTest.java	2009-08-06 17:09:24 UTC (rev 7673)
@@ -58,15 +58,6 @@
 {
    private static final Logger log = Logger.getLogger(JBossConnectionFactoryTest.class);
 
-   // private MessagingServer server;
-   //
-   // private JMSServerManagerImpl jmsServer;
-   //
-   // private JBossConnectionFactory cf;
-   //
-   // private static final String Q_NAME = "ConnectionTestQueue";
-   //   
-
    private final String groupAddress = "230.1.2.3";
 
    private final int groupPort = 8765;




More information about the jboss-cvs-commits mailing list