Author: timfox
Date: 2010-01-04 12:12:31 -0500 (Mon, 04 Jan 2010)
New Revision: 8709
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-255
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java 2010-01-04
16:56:10 UTC (rev 8708)
+++
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java 2010-01-04
17:12:31 UTC (rev 8709)
@@ -24,7 +24,9 @@
*/
public interface ClientProducerCreditManager
{
- ClientProducerCredits getCredits(SimpleString address);
+ ClientProducerCredits getCredits(SimpleString address, boolean anon);
+
+ void returnCredits(SimpleString address);
void receiveCredits(SimpleString address, int credits, int offset);
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2010-01-04
16:56:10 UTC (rev 8708)
+++
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2010-01-04
17:12:31 UTC (rev 8709)
@@ -13,10 +13,12 @@
package org.hornetq.core.client.impl;
-import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.Map;
import org.hornetq.SimpleString;
+import org.hornetq.core.logging.Logger;
/**
* A ProducerCreditManager
@@ -27,8 +29,14 @@
*/
public class ClientProducerCreditManagerImpl implements ClientProducerCreditManager
{
- private final Map<SimpleString, ClientProducerCredits> producerCredits = new
HashMap<SimpleString, ClientProducerCredits>();
+ private static final Logger log =
Logger.getLogger(ClientProducerCreditManagerImpl.class);
+ private static final int MAX_ANON_CREDITS_CACHE_SIZE = 1000;
+
+ private final Map<SimpleString, ClientProducerCredits> producerCredits = new
LinkedHashMap<SimpleString, ClientProducerCredits>();
+
+ private final Map<SimpleString, ClientProducerCredits> anonCredits = new
LinkedHashMap<SimpleString, ClientProducerCredits>();
+
private final ClientSessionInternal session;
private final int windowSize;
@@ -39,8 +47,8 @@
this.windowSize = windowSize;
}
-
- public synchronized ClientProducerCredits getCredits(final SimpleString address)
+
+ public synchronized ClientProducerCredits getCredits(final SimpleString address, final
boolean anon)
{
ClientProducerCredits credits = producerCredits.get(address);
@@ -50,11 +58,46 @@
credits = new ClientProducerCreditsImpl(session, address, windowSize);
producerCredits.put(address, credits);
+
+ if (anon)
+ {
+ addToAnonCache(address, credits);
+ }
}
+ if (!anon)
+ {
+ credits.incrementRefCount();
+
+ //Remove from anon credits (if there)
+ anonCredits.remove(address);
+ }
+ else
+ {
+ credits.setAnon();
+ }
+
return credits;
}
+ public synchronized void returnCredits(final SimpleString address)
+ {
+ ClientProducerCredits credits = producerCredits.get(address);
+
+ if (credits != null && credits.decrementRefCount() == 0)
+ {
+ if (!credits.isAnon())
+ {
+ removeEntry(address, credits);
+ }
+ else
+ {
+ //All the producer refs have been removed but it's been used anonymously
too so we add to the anon cache
+ addToAnonCache(address, credits);
+ }
+ }
+ }
+
public synchronized void receiveCredits(final SimpleString address, final int credits,
final int offset)
{
ClientProducerCredits cr = producerCredits.get(address);
@@ -82,4 +125,32 @@
producerCredits.clear();
}
+
+ private void addToAnonCache(final SimpleString address, final ClientProducerCredits
credits)
+ {
+ anonCredits.put(address, credits);
+
+ if (anonCredits.size() > MAX_ANON_CREDITS_CACHE_SIZE)
+ {
+ //Remove the oldest entry
+
+ Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter =
anonCredits.entrySet().iterator();
+
+ Map.Entry<SimpleString, ClientProducerCredits> oldest = iter.next();
+
+ iter.remove();
+
+ removeEntry(oldest.getKey(), oldest.getValue());
+ }
+ }
+
+ private void removeEntry(final SimpleString address, final ClientProducerCredits
credits)
+ {
+ producerCredits.remove(address);
+
+ credits.releaseOutstanding();
+
+ credits.close();
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java 2010-01-04
16:56:10 UTC (rev 8708)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java 2010-01-04
17:12:31 UTC (rev 8709)
@@ -29,4 +29,14 @@
void reset();
void close();
+
+ void incrementRefCount();
+
+ int decrementRefCount();
+
+ void setAnon();
+
+ boolean isAnon();
+
+ void releaseOutstanding();
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2010-01-04
16:56:10 UTC (rev 8708)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2010-01-04
17:12:31 UTC (rev 8709)
@@ -19,7 +19,7 @@
import org.hornetq.core.logging.Logger;
/**
- * A ProducerCredits
+ * A ClientProducerCreditsImpl
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
@@ -38,7 +38,11 @@
private final ClientSessionInternal session;
private int arriving;
-
+
+ private int refCount;
+
+ private boolean anon;
+
public ClientProducerCreditsImpl(final ClientSessionInternal session,
final SimpleString address,
final int windowSize)
@@ -94,7 +98,34 @@
semaphore.release(Integer.MAX_VALUE / 2);
}
-
+
+ public synchronized void incrementRefCount()
+ {
+ refCount++;
+ }
+
+ public synchronized int decrementRefCount()
+ {
+ return --refCount;
+ }
+
+ public synchronized void releaseOutstanding()
+ {
+ int permits = semaphore.drainPermits();
+
+ session.sendProducerCreditsMessage(permits, address);
+ }
+
+ public synchronized boolean isAnon()
+ {
+ return anon;
+ }
+
+ public synchronized void setAnon()
+ {
+ this.anon = true;
+ }
+
private void checkCredits(final int credits)
{
int needed = Math.max(credits, windowSize);
@@ -119,7 +150,6 @@
private void requestCredits(final int credits)
{
-
session.sendProducerCreditsMessage(credits, address);
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-01-04
16:56:10 UTC (rev 8708)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-01-04
17:12:31 UTC (rev 8709)
@@ -111,7 +111,7 @@
if (address != null)
{
- credits = session.getCredits(address);
+ credits = session.getCredits(address, false);
}
else
{
@@ -151,7 +151,7 @@
{
return;
}
-
+
doCleanup();
}
@@ -195,6 +195,11 @@
private void doCleanup()
{
+ if (address != null)
+ {
+ session.returnCredits(address);
+ }
+
session.removeProducer(this);
closed = true;
@@ -211,7 +216,7 @@
msgI.setAddress(address);
// Anonymous
- theCredits = session.getCredits(address);
+ theCredits = session.getCredits(address, true);
}
else
{
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-01-04 16:56:10
UTC (rev 8708)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-01-04 17:12:31
UTC (rev 8709)
@@ -1047,10 +1047,15 @@
channel.send(new SessionRequestProducerCreditsMessage(credits, address));
}
- public ClientProducerCredits getCredits(final SimpleString address)
+ public ClientProducerCredits getCredits(final SimpleString address, final boolean
anon)
{
- return producerCreditManager.getCredits(address);
+ return producerCreditManager.getCredits(address, anon);
}
+
+ public void returnCredits(final SimpleString address)
+ {
+ producerCreditManager.returnCredits(address);
+ }
public void handleReceiveProducerCredits(final SimpleString address, final int
credits, final int offset)
{
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-01-04
16:56:10 UTC (rev 8708)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-01-04
17:12:31 UTC (rev 8709)
@@ -70,7 +70,9 @@
void sendProducerCreditsMessage(int credits, SimpleString address);
- ClientProducerCredits getCredits(SimpleString address);
+ ClientProducerCredits getCredits(SimpleString address, boolean anon);
+
+ void returnCredits(SimpleString address);
void handleReceiveProducerCredits(SimpleString address, int credits, int offset);
}
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-01-04 16:56:10
UTC (rev 8708)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-01-04 17:12:31
UTC (rev 8709)
@@ -519,10 +519,15 @@
session.sendProducerCreditsMessage(credits, address);
}
- public ClientProducerCredits getCredits(final SimpleString address)
+ public ClientProducerCredits getCredits(final SimpleString address, final boolean
anon)
{
- return session.getCredits(address);
+ return session.getCredits(address, anon);
}
+
+ public void returnCredits(final SimpleString address)
+ {
+ session.returnCredits(address);
+ }
public void handleReceiveProducerCredits(final SimpleString address, final int
credits, final int offset)
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-04 16:56:10
UTC (rev 8708)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-04 17:12:31
UTC (rev 8709)
@@ -1485,7 +1485,7 @@
throw new HornetQException(HornetQException.ILLEGAL_STATE,
"large-message not initialized on server");
}
- // Immediately release the credits for the continuations- these don't
contrinute to the in-memory size
+ // Immediately release the credits for the continuations- these don't
contribute to the in-memory size
// of the message
releaseOutStanding(currentLargeMessage, packet.getPacketSize());
@@ -1535,30 +1535,39 @@
final CreditManagerHolder holder = getCreditManagerHolder(address);
int credits = packet.getCredits();
-
- int gotCredits = holder.manager.acquireCredits(credits, new
CreditsAvailableRunnable()
+
+ //Requesting -ve credits means returning them
+
+ if (credits < 0)
{
- public boolean run(final int credits)
+ releaseOutStanding(address, -credits);
+ }
+ else
+ {
+ int gotCredits = holder.manager.acquireCredits(credits, new
CreditsAvailableRunnable()
{
- synchronized (ServerSessionImpl.this)
+ public boolean run(final int credits)
{
- if (!closed)
+ synchronized (ServerSessionImpl.this)
{
- sendProducerCredits(holder, credits, address);
-
- return true;
+ if (!closed)
+ {
+ sendProducerCredits(holder, credits, address);
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
- else
- {
- return false;
- }
}
+ });
+
+ if (gotCredits > 0)
+ {
+ sendProducerCredits(holder, gotCredits, address);
}
- });
-
- if (gotCredits > 0)
- {
- sendProducerCredits(holder, gotCredits, address);
}
sendResponse(packet, null, false, false);
@@ -1932,7 +1941,12 @@
*/
private void releaseOutStanding(final ServerMessage message, final int credits) throws
Exception
{
- CreditManagerHolder holder = getCreditManagerHolder(message.getAddress());
+ releaseOutStanding(message.getAddress(), credits);
+ }
+
+ private void releaseOutStanding(final SimpleString address, final int credits) throws
Exception
+ {
+ CreditManagerHolder holder = getCreditManagerHolder(address);
holder.outstandingCredits -= credits;