[hornetq-commits] JBoss hornetq SVN: r11780 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Nov 28 12:03:24 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-11-28 12:03:24 -0500 (Mon, 28 Nov 2011)
New Revision: 11780
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-5747 - synchronizing access to consumers and producers, to guard against close / rollback being called from multiple threads by misbehaving applications
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-11-28 12:55:00 UTC (rev 11779)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-11-28 17:03:24 UTC (rev 11780)
@@ -594,10 +594,13 @@
stop();
}
- // We need to make sure we don't get any inflight messages
- for (ClientConsumerInternal consumer : consumers.values())
+ synchronized (consumers)
{
- consumer.clear(true);
+ // We need to make sure we don't get any inflight messages
+ for (ClientConsumerInternal consumer : consumers.values())
+ {
+ consumer.clear(true);
+ }
}
// Acks must be flushed here *after connection is stopped and all onmessages finished executing
@@ -694,9 +697,12 @@
if (started)
{
- for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+ synchronized (consumers)
{
- clientConsumerInternal.stop(waitForOnMessage);
+ for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+ {
+ clientConsumerInternal.stop(waitForOnMessage);
+ }
}
channel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP));
@@ -816,7 +822,10 @@
public void addProducer(final ClientProducerInternal producer)
{
- producers.add(producer);
+ synchronized (producers)
+ {
+ producers.add(producer);
+ }
}
public void removeConsumer(final ClientConsumerInternal consumer) throws HornetQException
@@ -829,12 +838,15 @@
public void removeProducer(final ClientProducerInternal producer)
{
- producers.remove(producer);
+ synchronized (producers)
+ {
+ producers.remove(producer);
+ }
}
public void handleReceiveMessage(final long consumerID, final SessionReceiveMessage message) throws Exception
{
- ClientConsumerInternal consumer = consumers.get(consumerID);
+ ClientConsumerInternal consumer = getConsumer(consumerID);
if (consumer != null)
{
@@ -850,7 +862,7 @@
public void handleReceiveLargeMessage(final long consumerID, final SessionReceiveLargeMessage message) throws Exception
{
- ClientConsumerInternal consumer = consumers.get(consumerID);
+ ClientConsumerInternal consumer = getConsumer(consumerID);
if (consumer != null)
{
@@ -860,7 +872,7 @@
public void handleReceiveContinuation(final long consumerID, final SessionReceiveContinuationMessage continuation) throws Exception
{
- ClientConsumerInternal consumer = consumers.get(consumerID);
+ ClientConsumerInternal consumer = getConsumer(consumerID);
if (consumer != null)
{
@@ -1526,10 +1538,13 @@
stop(false);
}
- // We need to make sure we don't get any inflight messages
- for (ClientConsumerInternal consumer : consumers.values())
+ synchronized (consumers)
{
- consumer.clear(false);
+ // We need to make sure we don't get any inflight messages
+ for (ClientConsumerInternal consumer : consumers.values())
+ {
+ consumer.clear(false);
+ }
}
flushAcks();
@@ -1871,6 +1886,19 @@
});
}
+
+ /**
+ * @param consumerID
+ * @return
+ */
+ private ClientConsumerInternal getConsumer(final long consumerID)
+ {
+ synchronized (consumers)
+ {
+ ClientConsumerInternal consumer = consumers.get(consumerID);
+ return consumer;
+ }
+ }
private void doCleanup(boolean failingOver)
{
@@ -1900,14 +1928,24 @@
private void cleanUpChildren() throws Exception
{
- Set<ClientConsumerInternal> consumersClone = new HashSet<ClientConsumerInternal>(consumers.values());
+ Set<ClientConsumerInternal> consumersClone;
+
+ synchronized (consumers)
+ {
+ consumersClone = new HashSet<ClientConsumerInternal>(consumers.values());
+ }
for (ClientConsumerInternal consumer : consumersClone)
{
consumer.cleanUp();
}
- Set<ClientProducerInternal> producersClone = new HashSet<ClientProducerInternal>(producers);
+ Set<ClientProducerInternal> producersClone;
+
+ synchronized (producers)
+ {
+ producersClone = new HashSet<ClientProducerInternal>(producers);
+ }
for (ClientProducerInternal producer : producersClone)
{
@@ -1917,15 +1955,24 @@
private void closeChildren() throws HornetQException
{
- Set<ClientConsumer> consumersClone = new HashSet<ClientConsumer>(consumers.values());
+ Set<ClientConsumer> consumersClone;
+ synchronized (consumers)
+ {
+ consumersClone = new HashSet<ClientConsumer>(consumers.values());
+ }
for (ClientConsumer consumer : consumersClone)
{
consumer.close();
}
- Set<ClientProducer> producersClone = new HashSet<ClientProducer>(producers);
+ Set<ClientProducer> producersClone;
+ synchronized (producers)
+ {
+ producersClone = new HashSet<ClientProducer>(producers);
+ }
+
for (ClientProducer producer : producersClone)
{
producer.close();
More information about the hornetq-commits
mailing list