[hornetq-commits] JBoss hornetq SVN: r11781 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Nov 28 14:19:19 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-11-28 14:19:19 -0500 (Mon, 28 Nov 2011)
New Revision: 11781
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-5747 - synchronizing access on consumers and producers avoiding close / rollback through multiple threads from unbehaved applications
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-11-28 17:03:24 UTC (rev 11780)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-11-28 19:19:19 UTC (rev 11781)
@@ -594,13 +594,11 @@
stop();
}
- synchronized (consumers)
+
+ // We need to make sure we don't get any inflight messages
+ for (ClientConsumerInternal consumer : cloneConsumers())
{
- // We need to make sure we don't get any inflight messages
- for (ClientConsumerInternal consumer : consumers.values())
- {
- consumer.clear(true);
- }
+ consumer.clear(true);
}
// Acks must be flushed here *after connection is stopped and all onmessages finished executing
@@ -675,7 +673,7 @@
if (!started)
{
- for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+ for (ClientConsumerInternal clientConsumerInternal : cloneConsumers())
{
clientConsumerInternal.start();
}
@@ -697,12 +695,9 @@
if (started)
{
- synchronized (consumers)
+ for (ClientConsumerInternal clientConsumerInternal : cloneConsumers())
{
- for (ClientConsumerInternal clientConsumerInternal : consumers.values())
- {
- clientConsumerInternal.stop(waitForOnMessage);
- }
+ clientConsumerInternal.stop(waitForOnMessage);
}
channel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP));
@@ -1102,7 +1097,7 @@
// Now start the session if it was already started
if (started)
{
- for (ClientConsumerInternal consumer : consumers.values())
+ for (ClientConsumerInternal consumer : cloneConsumers())
{
consumer.clearAtFailover();
consumer.start();
@@ -1538,13 +1533,10 @@
stop(false);
}
- synchronized (consumers)
+ // We need to make sure we don't get any inflight messages
+ for (ClientConsumerInternal consumer : cloneConsumers())
{
- // We need to make sure we don't get any inflight messages
- for (ClientConsumerInternal consumer : consumers.values())
- {
- consumer.clear(false);
- }
+ consumer.clear(false);
}
flushAcks();
@@ -1928,51 +1920,57 @@
private void cleanUpChildren() throws Exception
{
- Set<ClientConsumerInternal> consumersClone;
-
- synchronized (consumers)
- {
- consumersClone = new HashSet<ClientConsumerInternal>(consumers.values());
- }
+ Set<ClientConsumerInternal> consumersClone = cloneConsumers();
for (ClientConsumerInternal consumer : consumersClone)
{
consumer.cleanUp();
}
+ Set<ClientProducerInternal> producersClone = cloneProducers();
+
+ for (ClientProducerInternal producer : producersClone)
+ {
+ producer.cleanUp();
+ }
+ }
+
+ /**
+ * @return
+ */
+ private Set<ClientProducerInternal> cloneProducers()
+ {
Set<ClientProducerInternal> producersClone;
synchronized (producers)
{
producersClone = new HashSet<ClientProducerInternal>(producers);
}
+ return producersClone;
+ }
- for (ClientProducerInternal producer : producersClone)
+ /**
+ * @return
+ */
+ private Set<ClientConsumerInternal> cloneConsumers()
+ {
+ synchronized (consumers)
{
- producer.cleanUp();
+ return new HashSet<ClientConsumerInternal>(consumers.values());
}
}
private void closeChildren() throws HornetQException
{
- Set<ClientConsumer> consumersClone;
- synchronized (consumers)
- {
- consumersClone = new HashSet<ClientConsumer>(consumers.values());
- }
+ Set<ClientConsumerInternal> consumersClone = cloneConsumers();
for (ClientConsumer consumer : consumersClone)
{
consumer.close();
}
- Set<ClientProducer> producersClone;
+ Set<ClientProducerInternal> producersClone = cloneProducers();
- synchronized (producers)
- {
- producersClone = new HashSet<ClientProducer>(producers);
- }
-
for (ClientProducer producer : producersClone)
{
producer.close();
@@ -1981,12 +1979,9 @@
private void flushAcks() throws HornetQException
{
- synchronized (consumers)
+ for (ClientConsumerInternal consumer : cloneConsumers())
{
- for (ClientConsumerInternal consumer : consumers.values())
- {
- consumer.flushAcks();
- }
+ consumer.flushAcks();
}
}
More information about the hornetq-commits
mailing list