[hornetq-commits] JBoss hornetq SVN: r11781 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Nov 28 14:19:19 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-11-28 14:19:19 -0500 (Mon, 28 Nov 2011)
New Revision: 11781

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-5747 - synchronizing access on consumers and producers avoiding close / rollback through multiple threads from unbehaved applications

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-11-28 17:03:24 UTC (rev 11780)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-11-28 19:19:19 UTC (rev 11781)
@@ -594,13 +594,11 @@
          stop();
       }
 
-      synchronized (consumers)
+      
+      // We need to make sure we don't get any inflight messages
+      for (ClientConsumerInternal consumer : cloneConsumers())
       {
-         // We need to make sure we don't get any inflight messages
-         for (ClientConsumerInternal consumer : consumers.values())
-         {
-            consumer.clear(true);
-         }
+         consumer.clear(true);
       }
 
       // Acks must be flushed here *after connection is stopped and all onmessages finished executing
@@ -675,7 +673,7 @@
 
       if (!started)
       {
-         for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+         for (ClientConsumerInternal clientConsumerInternal : cloneConsumers())
          {
             clientConsumerInternal.start();
          }
@@ -697,12 +695,9 @@
 
       if (started)
       {
-         synchronized (consumers)
+         for (ClientConsumerInternal clientConsumerInternal : cloneConsumers())
          {
-            for (ClientConsumerInternal clientConsumerInternal : consumers.values())
-            {
-               clientConsumerInternal.stop(waitForOnMessage);
-            }
+            clientConsumerInternal.stop(waitForOnMessage);
          }
 
          channel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP));
@@ -1102,7 +1097,7 @@
                   // Now start the session if it was already started
                   if (started)
                   {
-                     for (ClientConsumerInternal consumer : consumers.values())
+                     for (ClientConsumerInternal consumer : cloneConsumers())
                      {
                         consumer.clearAtFailover();
                         consumer.start();
@@ -1538,13 +1533,10 @@
             stop(false);
          }
 
-         synchronized (consumers)
+         // We need to make sure we don't get any inflight messages
+         for (ClientConsumerInternal consumer : cloneConsumers())
          {
-            // We need to make sure we don't get any inflight messages
-            for (ClientConsumerInternal consumer : consumers.values())
-            {
-               consumer.clear(false);
-            }
+            consumer.clear(false);
          }
 
          flushAcks();
@@ -1928,51 +1920,57 @@
 
    private void cleanUpChildren() throws Exception
    {
-      Set<ClientConsumerInternal> consumersClone;
-      
-      synchronized (consumers)
-      {
-         consumersClone = new HashSet<ClientConsumerInternal>(consumers.values());
-      }
+      Set<ClientConsumerInternal> consumersClone = cloneConsumers();
 
       for (ClientConsumerInternal consumer : consumersClone)
       {
          consumer.cleanUp();
       }
 
+      Set<ClientProducerInternal> producersClone = cloneProducers();
+
+      for (ClientProducerInternal producer : producersClone)
+      {
+         producer.cleanUp();
+      }
+   }
+
+   /**
+    * @return
+    */
+   private Set<ClientProducerInternal> cloneProducers()
+   {
       Set<ClientProducerInternal> producersClone;
       
       synchronized (producers)
       {
          producersClone = new HashSet<ClientProducerInternal>(producers);
       }
+      return producersClone;
+   }
 
-      for (ClientProducerInternal producer : producersClone)
+   /**
+    * @return
+    */
+   private Set<ClientConsumerInternal> cloneConsumers()
+   {
+      synchronized (consumers)
       {
-         producer.cleanUp();
+         return new HashSet<ClientConsumerInternal>(consumers.values());
       }
    }
 
    private void closeChildren() throws HornetQException
    {
-      Set<ClientConsumer> consumersClone;
-      synchronized (consumers)
-      {
-         consumersClone = new HashSet<ClientConsumer>(consumers.values());
-      }
+      Set<ClientConsumerInternal> consumersClone = cloneConsumers();
 
       for (ClientConsumer consumer : consumersClone)
       {
          consumer.close();
       }
 
-      Set<ClientProducer> producersClone;
+      Set<ClientProducerInternal> producersClone = cloneProducers();
 
-      synchronized (producers)
-      {
-         producersClone =  new HashSet<ClientProducer>(producers);
-      }
-
       for (ClientProducer producer : producersClone)
       {
          producer.close();
@@ -1981,12 +1979,9 @@
 
    private void flushAcks() throws HornetQException
    {
-      synchronized (consumers)
+      for (ClientConsumerInternal consumer : cloneConsumers())
       {
-         for (ClientConsumerInternal consumer : consumers.values())
-         {
-            consumer.flushAcks();
-         }
+         consumer.flushAcks();
       }
    }
 



More information about the hornetq-commits mailing list