[hornetq-commits] JBoss hornetq SVN: r11780 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Nov 28 12:03:24 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-11-28 12:03:24 -0500 (Mon, 28 Nov 2011)
New Revision: 11780

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-5747 - synchronizing access to consumers and producers, to avoid concurrent close / rollback from multiple threads in misbehaving applications

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-11-28 12:55:00 UTC (rev 11779)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-11-28 17:03:24 UTC (rev 11780)
@@ -594,10 +594,13 @@
          stop();
       }
 
-      // We need to make sure we don't get any inflight messages
-      for (ClientConsumerInternal consumer : consumers.values())
+      synchronized (consumers)
       {
-         consumer.clear(true);
+         // We need to make sure we don't get any inflight messages
+         for (ClientConsumerInternal consumer : consumers.values())
+         {
+            consumer.clear(true);
+         }
       }
 
       // Acks must be flushed here *after connection is stopped and all onmessages finished executing
@@ -694,9 +697,12 @@
 
       if (started)
       {
-         for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+         synchronized (consumers)
          {
-            clientConsumerInternal.stop(waitForOnMessage);
+            for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+            {
+               clientConsumerInternal.stop(waitForOnMessage);
+            }
          }
 
          channel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP));
@@ -816,7 +822,10 @@
 
    public void addProducer(final ClientProducerInternal producer)
    {
-      producers.add(producer);
+      synchronized (producers)
+      {
+         producers.add(producer);
+      }
    }
 
    public void removeConsumer(final ClientConsumerInternal consumer) throws HornetQException
@@ -829,12 +838,15 @@
 
    public void removeProducer(final ClientProducerInternal producer)
    {
-      producers.remove(producer);
+      synchronized (producers)
+      {
+         producers.remove(producer);
+      }
    }
 
    public void handleReceiveMessage(final long consumerID, final SessionReceiveMessage message) throws Exception
    {
-      ClientConsumerInternal consumer = consumers.get(consumerID);
+      ClientConsumerInternal consumer = getConsumer(consumerID);
 
       if (consumer != null)
       {
@@ -850,7 +862,7 @@
 
    public void handleReceiveLargeMessage(final long consumerID, final SessionReceiveLargeMessage message) throws Exception
    {
-      ClientConsumerInternal consumer = consumers.get(consumerID);
+      ClientConsumerInternal consumer = getConsumer(consumerID);
 
       if (consumer != null)
       {
@@ -860,7 +872,7 @@
 
    public void handleReceiveContinuation(final long consumerID, final SessionReceiveContinuationMessage continuation) throws Exception
    {
-      ClientConsumerInternal consumer = consumers.get(consumerID);
+      ClientConsumerInternal consumer = getConsumer(consumerID);
 
       if (consumer != null)
       {
@@ -1526,10 +1538,13 @@
             stop(false);
          }
 
-         // We need to make sure we don't get any inflight messages
-         for (ClientConsumerInternal consumer : consumers.values())
+         synchronized (consumers)
          {
-            consumer.clear(false);
+            // We need to make sure we don't get any inflight messages
+            for (ClientConsumerInternal consumer : consumers.values())
+            {
+               consumer.clear(false);
+            }
          }
 
          flushAcks();
@@ -1871,6 +1886,19 @@
       });
 
    }
+   
+   /**
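+    * @param consumerID the ID of the consumer to look up (read while holding the lock on consumers)
+    * @return the consumer registered under that ID, or null if there is none (callers null-check the result)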
+    */
+   private ClientConsumerInternal getConsumer(final long consumerID)
+   {
+      synchronized (consumers)
+      {
+         ClientConsumerInternal consumer = consumers.get(consumerID);
+         return consumer;
+      }
+   }
 
    private void doCleanup(boolean failingOver)
    {
@@ -1900,14 +1928,24 @@
 
    private void cleanUpChildren() throws Exception
    {
-      Set<ClientConsumerInternal> consumersClone = new HashSet<ClientConsumerInternal>(consumers.values());
+      Set<ClientConsumerInternal> consumersClone;
+      
+      synchronized (consumers)
+      {
+         consumersClone = new HashSet<ClientConsumerInternal>(consumers.values());
+      }
 
       for (ClientConsumerInternal consumer : consumersClone)
       {
          consumer.cleanUp();
       }
 
-      Set<ClientProducerInternal> producersClone = new HashSet<ClientProducerInternal>(producers);
+      Set<ClientProducerInternal> producersClone;
+      
+      synchronized (producers)
+      {
+         producersClone = new HashSet<ClientProducerInternal>(producers);
+      }
 
       for (ClientProducerInternal producer : producersClone)
       {
@@ -1917,15 +1955,24 @@
 
    private void closeChildren() throws HornetQException
    {
-      Set<ClientConsumer> consumersClone = new HashSet<ClientConsumer>(consumers.values());
+      Set<ClientConsumer> consumersClone;
+      synchronized (consumers)
+      {
+         consumersClone = new HashSet<ClientConsumer>(consumers.values());
+      }
 
       for (ClientConsumer consumer : consumersClone)
       {
          consumer.close();
       }
 
-      Set<ClientProducer> producersClone = new HashSet<ClientProducer>(producers);
+      Set<ClientProducer> producersClone;
 
+      synchronized (producers)
+      {
+         producersClone =  new HashSet<ClientProducer>(producers);
+      }
+
       for (ClientProducer producer : producersClone)
       {
          producer.close();



More information about the hornetq-commits mailing list