[jboss-cvs] JBoss Messaging SVN: r6014 - in trunk: src/main/org/jboss/messaging/core/remoting/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Mar 5 17:39:24 EST 2009


Author: ataylor
Date: 2009-03-05 17:39:23 -0500 (Thu, 05 Mar 2009)
New Revision: 6014

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
Log:
thread locking fix and connection destroyed fix

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-03-05 17:13:07 UTC (rev 6013)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-03-05 22:39:23 UTC (rev 6014)
@@ -328,6 +328,11 @@
             }
             catch (Throwable t)
             {
+               if(lock != null)
+               {
+                  lock.unlock();
+                  lock = null;
+               }
                if (connection != null)
                {
                   returnConnection(connection.getID());

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-03-05 17:13:07 UTC (rev 6013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-03-05 22:39:23 UTC (rev 6014)
@@ -542,6 +542,18 @@
             log.error("Failed to execute failure listener", t);
          }
       }
+      for (ChannelImpl channel : channels.values())
+      {
+         channel.lock.lock();
+         try
+         {
+            channel.sendCondition.signalAll();
+         }
+         finally
+         {
+            channel.lock.unlock();
+         }
+      }
    }
 
    private void internalClose()
@@ -1076,7 +1088,10 @@
                   catch (InterruptedException e)
                   {
                   }
-
+                  if(closed)
+                  {
+                     break;
+                  }
                   final long now = System.currentTimeMillis();
 
                   toWait -= now - start;
@@ -1438,11 +1453,8 @@
             if (packet.isResponse())
             {
                response = packet;
-
                confirm(packet);
-
                lock.lock();
-
                try
                {
                   sendCondition.signal();

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java	2009-03-05 17:13:07 UTC (rev 6013)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java	2009-03-05 22:39:23 UTC (rev 6014)
@@ -65,99 +65,99 @@
       return false;
    }
    
-//   public void testStopAllStartAll() throws Exception
-//   {
-//      setupCluster();
-//
-//      startServers(0, 1, 2, 3, 4);
-//      
-//      setupSessionFactory(0, isNetty());
-//      setupSessionFactory(1, isNetty());
-//      setupSessionFactory(2, isNetty());
-//      setupSessionFactory(3, isNetty());
-//      setupSessionFactory(4, isNetty());
-//
-//      createQueue(0, "queues.testaddress", "queue0", null, false);
-//      createQueue(1, "queues.testaddress", "queue0", null, false);
-//      createQueue(2, "queues.testaddress", "queue0", null, false);
-//      createQueue(3, "queues.testaddress", "queue0", null, false);
-//      createQueue(4, "queues.testaddress", "queue0", null, false);
-//
-//      addConsumer(0, 0, "queue0", null);
-//      addConsumer(1, 1, "queue0", null);
-//      addConsumer(2, 2, "queue0", null);
-//      addConsumer(3, 3, "queue0", null);
-//      addConsumer(4, 4, "queue0", null);
-//
-//      waitForBindings(0, "queues.testaddress", 1, 1, true);
-//      waitForBindings(1, "queues.testaddress", 1, 1, true);
-//      waitForBindings(2, "queues.testaddress", 1, 1, true);
-//      waitForBindings(3, "queues.testaddress", 1, 1, true);
-//      waitForBindings(4, "queues.testaddress", 1, 1, true);
-//
-//      waitForBindings(0, "queues.testaddress", 4, 4, false);
-//      waitForBindings(1, "queues.testaddress", 4, 4, false);
-//      waitForBindings(2, "queues.testaddress", 4, 4, false);
-//      waitForBindings(3, "queues.testaddress", 4, 4, false);
-//      waitForBindings(4, "queues.testaddress", 4, 4, false);
-//
-//      send(0, "queues.testaddress", 10, false, null);
-//
-//      verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
-//
-//      this.verifyNotReceive(0, 1, 2, 3, 4);
-//      
-//      this.removeConsumer(0);
-//      this.removeConsumer(1);
-//      this.removeConsumer(2);
-//      this.removeConsumer(3);
-//      this.removeConsumer(4);
-//      
-//      this.closeAllSessionFactories();
-//      
-//      stopServers(0, 1, 2, 3, 4);
-//
-//      startServers(0, 1, 2, 3, 4);
-//      
-//      setupSessionFactory(0, isNetty());
-//      setupSessionFactory(1, isNetty());
-//      setupSessionFactory(2, isNetty());
-//      setupSessionFactory(3, isNetty());
-//      setupSessionFactory(4, isNetty());
-//
-//      createQueue(0, "queues.testaddress", "queue0", null, false);
-//      createQueue(1, "queues.testaddress", "queue0", null, false);
-//      createQueue(2, "queues.testaddress", "queue0", null, false);
-//      createQueue(3, "queues.testaddress", "queue0", null, false);
-//      createQueue(4, "queues.testaddress", "queue0", null, false);
-//
-//      addConsumer(0, 0, "queue0", null);
-//      addConsumer(1, 1, "queue0", null);
-//      addConsumer(2, 2, "queue0", null);
-//      addConsumer(3, 3, "queue0", null);
-//      addConsumer(4, 4, "queue0", null);
-//
-//      waitForBindings(0, "queues.testaddress", 1, 1, true);
-//      waitForBindings(1, "queues.testaddress", 1, 1, true);
-//      waitForBindings(2, "queues.testaddress", 1, 1, true);
-//      waitForBindings(3, "queues.testaddress", 1, 1, true);
-//      waitForBindings(4, "queues.testaddress", 1, 1, true);
-//
-//      waitForBindings(0, "queues.testaddress", 4, 4, false);
-//      waitForBindings(1, "queues.testaddress", 4, 4, false);
-//      waitForBindings(2, "queues.testaddress", 4, 4, false);
-//      waitForBindings(3, "queues.testaddress", 4, 4, false);
-//      waitForBindings(4, "queues.testaddress", 4, 4, false);
-//
-//      send(0, "queues.testaddress", 10, false, null);
-//
-//      verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
-//
-//      this.verifyNotReceive(0, 1, 2, 3, 4);
-//
-//      log.info("got here");
-//   }
+   public void testStopAllStartAll() throws Exception
+   {
+      setupCluster();
 
+      startServers(0, 1, 2, 3, 4);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+      setupSessionFactory(3, isNetty());
+      setupSessionFactory(4, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(1, "queues.testaddress", "queue0", null, false);
+      createQueue(2, "queues.testaddress", "queue0", null, false);
+      createQueue(3, "queues.testaddress", "queue0", null, false);
+      createQueue(4, "queues.testaddress", "queue0", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue0", null);
+      addConsumer(2, 2, "queue0", null);
+      addConsumer(3, 3, "queue0", null);
+      addConsumer(4, 4, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+      waitForBindings(2, "queues.testaddress", 1, 1, true);
+      waitForBindings(3, "queues.testaddress", 1, 1, true);
+      waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 4, 4, false);
+      waitForBindings(1, "queues.testaddress", 4, 4, false);
+      waitForBindings(2, "queues.testaddress", 4, 4, false);
+      waitForBindings(3, "queues.testaddress", 4, 4, false);
+      waitForBindings(4, "queues.testaddress", 4, 4, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+
+      this.verifyNotReceive(0, 1, 2, 3, 4);
+
+      this.removeConsumer(0);
+      this.removeConsumer(1);
+      this.removeConsumer(2);
+      this.removeConsumer(3);
+      this.removeConsumer(4);
+
+      this.closeAllSessionFactories();
+
+      stopServers(0, 1, 2, 3, 4);
+
+      startServers(0, 1, 2, 3, 4);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+      setupSessionFactory(3, isNetty());
+      setupSessionFactory(4, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(1, "queues.testaddress", "queue0", null, false);
+      createQueue(2, "queues.testaddress", "queue0", null, false);
+      createQueue(3, "queues.testaddress", "queue0", null, false);
+      createQueue(4, "queues.testaddress", "queue0", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue0", null);
+      addConsumer(2, 2, "queue0", null);
+      addConsumer(3, 3, "queue0", null);
+      addConsumer(4, 4, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+      waitForBindings(2, "queues.testaddress", 1, 1, true);
+      waitForBindings(3, "queues.testaddress", 1, 1, true);
+      waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 4, 4, false);
+      waitForBindings(1, "queues.testaddress", 4, 4, false);
+      waitForBindings(2, "queues.testaddress", 4, 4, false);
+      waitForBindings(3, "queues.testaddress", 4, 4, false);
+      waitForBindings(4, "queues.testaddress", 4, 4, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+
+      verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+
+      this.verifyNotReceive(0, 1, 2, 3, 4);
+
+      log.info("got here");
+   }
+
    public void testBasicRoundRobin() throws Exception
    {
       setupCluster();




More information about the jboss-cvs-commits mailing list