Author: clebert.suconic
Date: 2011-11-01 17:09:46 -0400 (Tue, 01 Nov 2011)
New Revision: 11626
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
Log:
Fixing rare deadlock on flow control
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2011-11-01
19:49:48 UTC (rev 11625)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2011-11-01
21:09:46 UTC (rev 11626)
@@ -48,29 +48,44 @@
this.windowSize = windowSize;
}
- public synchronized ClientProducerCredits getCredits(final SimpleString address, final
boolean anon)
+ public ClientProducerCredits getCredits(final SimpleString address, final boolean
anon)
{
- ClientProducerCredits credits = producerCredits.get(address);
-
- if (credits == null)
+ boolean needInit = false;
+ ClientProducerCredits credits;
+
+ synchronized(this)
{
- // Doesn't need to be fair since session is single threaded
- credits = new ClientProducerCreditsImpl(session, address, windowSize);
-
- producerCredits.put(address, credits);
+ credits = producerCredits.get(address);
+
+ if (credits == null)
+ {
+ // Doesn't need to be fair since session is single threaded
+ credits = new ClientProducerCreditsImpl(session, address, windowSize);
+ needInit = true;
+
+ producerCredits.put(address, credits);
+ }
+
+ if (!anon)
+ {
+ credits.incrementRefCount();
+
+ // Remove from anon credits (if there)
+ unReferencedCredits.remove(address);
+ }
+ else
+ {
+ addToUnReferencedCache(address, credits);
+ }
}
-
- if (!anon)
+
+ // The init is done outside of the lock;
+ // otherwise packets may arrive with flow control
+ // while this is still sending requests, causing a deadlock
+ if (needInit)
{
- credits.incrementRefCount();
-
- // Remove from anon credits (if there)
- unReferencedCredits.remove(address);
+ credits.init();
}
- else
- {
- addToUnReferencedCache(address, credits);
- }
return credits;
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java 2011-11-01
19:49:48 UTC (rev 11625)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java 2011-11-01
21:09:46 UTC (rev 11626)
@@ -27,6 +27,8 @@
void receiveCredits(int credits);
boolean isBlocked();
+
+ void init();
void reset();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2011-11-01
19:49:48 UTC (rev 11625)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2011-11-01
21:09:46 UTC (rev 11626)
@@ -58,7 +58,10 @@
// Doesn't need to be fair since session is single threaded
semaphore = new Semaphore(0, false);
-
+ }
+
+ public void init()
+ {
// We initial request twice as many credits as we request in subsequent requests
// This allows the producer to keep sending as more arrive, minimising pauses
checkCredits(windowSize);