Author: timfox
Date: 2010-01-05 11:40:35 -0500 (Tue, 05 Jan 2010)
New Revision: 8715
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerInternal.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-255
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java 2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java 2010-01-05 16:40:35 UTC (rev 8715)
@@ -33,4 +33,8 @@
void reset();
void close();
+
+ int creditsMapSize();
+
+ int unReferencedCreditsSize();
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2010-01-05 16:40:35 UTC (rev 8715)
@@ -31,11 +31,11 @@
{
private static final Logger log = Logger.getLogger(ClientProducerCreditManagerImpl.class);
- private static final int MAX_ANON_CREDITS_CACHE_SIZE = 1000;
+ public static final int MAX_UNREFERENCED_CREDITS_CACHE_SIZE = 1000;
private final Map<SimpleString, ClientProducerCredits> producerCredits = new LinkedHashMap<SimpleString, ClientProducerCredits>();
- private final Map<SimpleString, ClientProducerCredits> anonCredits = new LinkedHashMap<SimpleString, ClientProducerCredits>();
+ private final Map<SimpleString, ClientProducerCredits> unReferencedCredits = new LinkedHashMap<SimpleString, ClientProducerCredits>();
private final ClientSessionInternal session;
@@ -47,7 +47,7 @@
this.windowSize = windowSize;
}
-
+
public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon)
{
ClientProducerCredits credits = producerCredits.get(address);
@@ -58,25 +58,20 @@
credits = new ClientProducerCreditsImpl(session, address, windowSize);
producerCredits.put(address, credits);
-
- if (anon)
- {
- addToAnonCache(address, credits);
- }
}
if (!anon)
{
credits.incrementRefCount();
-
- //Remove from anon credits (if there)
- anonCredits.remove(address);
+
+ // Remove from anon credits (if there)
+ unReferencedCredits.remove(address);
}
else
{
- credits.setAnon();
+ addToUnReferencedCache(address, credits);
}
-
+
return credits;
}
@@ -86,15 +81,7 @@
if (credits != null && credits.decrementRefCount() == 0)
{
- if (!credits.isAnon())
- {
- removeEntry(address, credits);
- }
- else
- {
- //All the producer refs have been removed but it's been used anonymously too so we add to the anon cache
- addToAnonCache(address, credits);
- }
+ addToUnReferencedCache(address, credits);
}
}
@@ -124,30 +111,42 @@
}
producerCredits.clear();
+
+ unReferencedCredits.clear();
}
- private void addToAnonCache(final SimpleString address, final ClientProducerCredits credits)
+ public synchronized int creditsMapSize()
{
- anonCredits.put(address, credits);
-
- if (anonCredits.size() > MAX_ANON_CREDITS_CACHE_SIZE)
+ return producerCredits.size();
+ }
+
+ public synchronized int unReferencedCreditsSize()
+ {
+ return unReferencedCredits.size();
+ }
+
+ private void addToUnReferencedCache(final SimpleString address, final ClientProducerCredits credits)
+ {
+ unReferencedCredits.put(address, credits);
+
+ if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE)
{
- //Remove the oldest entry
-
- Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = anonCredits.entrySet().iterator();
-
+ // Remove the oldest entry
+
+ Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = unReferencedCredits.entrySet().iterator();
+
Map.Entry<SimpleString, ClientProducerCredits> oldest = iter.next();
-
+
iter.remove();
-
- removeEntry(oldest.getKey(), oldest.getValue());
+
+ removeEntry(oldest.getKey(), oldest.getValue());
}
}
-
+
private void removeEntry(final SimpleString address, final ClientProducerCredits credits)
{
producerCredits.remove(address);
-
+
credits.releaseOutstanding();
credits.close();
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java 2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java 2010-01-05 16:40:35 UTC (rev 8715)
@@ -34,9 +34,5 @@
int decrementRefCount();
- void setAnon();
-
- boolean isAnon();
-
void releaseOutstanding();
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2010-01-05 16:40:35 UTC (rev 8715)
@@ -41,8 +41,6 @@
private int refCount;
- private boolean anon;
-
public ClientProducerCreditsImpl(final ClientSessionInternal session,
final SimpleString address,
final int windowSize)
@@ -116,16 +114,6 @@
session.sendProducerCreditsMessage(permits, address);
}
- public synchronized boolean isAnon()
- {
- return anon;
- }
-
- public synchronized void setAnon()
- {
- this.anon = true;
- }
-
private void checkCredits(final int credits)
{
int needed = Math.max(credits, windowSize);
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-01-05 16:40:35 UTC (rev 8715)
@@ -187,6 +187,11 @@
// Public ---------------------------------------------------------------------------------------
+ public ClientProducerCredits getProducerCredits()
+ {
+ return credits;
+ }
+
// Protected ------------------------------------------------------------------------------------
// Package Private ------------------------------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerInternal.java 2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerInternal.java 2010-01-05 16:40:35 UTC (rev 8715)
@@ -25,4 +25,6 @@
public interface ClientProducerInternal extends ClientProducer
{
void cleanUp();
+
+ ClientProducerCredits getProducerCredits();
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-01-05 16:40:35 UTC (rev 8715)
@@ -1061,6 +1061,11 @@
{
producerCreditManager.receiveCredits(address, credits, offset);
}
+
+ public ClientProducerCreditManager getProducerCreditManager()
+ {
+ return producerCreditManager;
+ }
// CommandConfirmationHandler implementation ------------------------------------
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-01-05 16:40:35 UTC (rev 8715)
@@ -75,4 +75,6 @@
void returnCredits(SimpleString address);
void handleReceiveProducerCredits(SimpleString address, int credits, int offset);
+
+ ClientProducerCreditManager getProducerCreditManager();
}
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-01-05 16:40:35 UTC (rev 8715)
@@ -533,4 +533,9 @@
{
session.handleReceiveProducerCredits(address, credits, offset);
}
+
+ public ClientProducerCreditManager getProducerCreditManager()
+ {
+ return session.getProducerCreditManager();
+ }
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2010-01-05 16:40:35 UTC (rev 8715)
@@ -12,6 +12,9 @@
*/
package org.hornetq.tests.integration.client;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -24,6 +27,10 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.core.client.impl.ClientProducerCreditManagerImpl;
+import org.hornetq.core.client.impl.ClientProducerCredits;
+import org.hornetq.core.client.impl.ClientProducerInternal;
+import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.server.HornetQServer;
@@ -809,4 +816,332 @@
server.stop();
}
+ public void testProducerCreditsCaching1() throws Exception
+ {
+ HornetQServer server = createServer(false, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(isNetty());
+
+ final ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue("address", "queue1", null, false);
+
+ ClientProducerCredits credits = null;
+
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
+ {
+ ClientProducer prod = session.createProducer("address");
+
+ ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
+
+ if (credits != null)
+ {
+ assertTrue(newCredits == credits);
+ }
+
+ credits = newCredits;
+
+ assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ }
+
+ session.close();
+
+ server.stop();
+ }
+
+ public void testProducerCreditsCaching2() throws Exception
+ {
+ HornetQServer server = createServer(false, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(isNetty());
+
+ final ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue("address", "queue1", null, false);
+
+ ClientProducerCredits credits = null;
+
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
+ {
+ ClientProducer prod = session.createProducer("address");
+
+ ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
+
+ if (credits != null)
+ {
+ assertTrue(newCredits == credits);
+ }
+
+ credits = newCredits;
+
+ prod.close();
+
+ assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ }
+
+ session.close();
+
+ server.stop();
+ }
+
+ public void testProducerCreditsCaching3() throws Exception
+ {
+ HornetQServer server = createServer(false, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(isNetty());
+
+ final ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue("address", "queue1", null, false);
+
+ ClientProducerCredits credits = null;
+
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer("address" + i);
+
+ ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
+
+ if (credits != null)
+ {
+ assertFalse(newCredits == credits);
+ }
+
+ credits = newCredits;
+
+ assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ }
+
+ session.close();
+
+ server.stop();
+ }
+
+ public void testProducerCreditsCaching4() throws Exception
+ {
+ HornetQServer server = createServer(false, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(isNetty());
+
+ final ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue("address", "queue1", null, false);
+
+ ClientProducerCredits credits = null;
+
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer("address" + i);
+
+ ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
+
+ if (credits != null)
+ {
+ assertFalse(newCredits == credits);
+ }
+
+ credits = newCredits;
+
+ prod.close();
+
+ assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ }
+
+ session.close();
+
+ server.stop();
+ }
+
+ public void testProducerCreditsCaching5() throws Exception
+ {
+ HornetQServer server = createServer(false, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(isNetty());
+
+ final ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue("address", "queue1", null, false);
+
+ ClientProducerCredits credits = null;
+
+ List<ClientProducerCredits> creditsList = new
ArrayList<ClientProducerCredits>();
+
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer("address" + i);
+
+ ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
+
+ if (credits != null)
+ {
+ assertFalse(newCredits == credits);
+ }
+
+ credits = newCredits;
+
+ assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+
+ creditsList.add(credits);
+ }
+
+ Iterator<ClientProducerCredits> iter = creditsList.iterator();
+
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer("address" + i);
+
+ ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
+
+ assertTrue(newCredits == iter.next());
+
+ assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ }
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientProducer prod = session.createProducer("address" + (i +
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE));
+
+ assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE + i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ }
+
+ session.close();
+
+ server.stop();
+ }
+
+ public void testProducerCreditsCaching6() throws Exception
+ {
+ HornetQServer server = createServer(false, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(isNetty());
+
+ final ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue("address", "queue1", null, false);
+
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer((String)null);
+
+ prod.send("address", session.createMessage(false));
+
+ assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ }
+
+ session.close();
+
+ server.stop();
+ }
+
+ public void testProducerCreditsCaching7() throws Exception
+ {
+ HornetQServer server = createServer(false, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(isNetty());
+
+ final ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue("address", "queue1", null, false);
+
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer((String)null);
+
+ prod.send("address" + i, session.createMessage(false));
+
+ assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ }
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientProducer prod = session.createProducer((String)null);
+
+ prod.send("address" + i, session.createMessage(false));
+
+ assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ }
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientProducer prod = session.createProducer((String)null);
+
+ prod.send("address2-" + i, session.createMessage(false));
+
+ assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ }
+
+ session.close();
+
+ server.stop();
+ }
+
+ public void testProducerCreditsRefCounting() throws Exception
+ {
+ HornetQServer server = createServer(false, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(isNetty());
+
+ final ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue("address", "queue1", null, false);
+
+ ClientProducer prod1 = session.createProducer("address");
+ assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+
+ ClientProducer prod2 = session.createProducer("address");
+ assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+
+ ClientProducer prod3 = session.createProducer("address");
+ assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+
+ prod1.close();
+
+ assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+
+ prod2.close();
+
+ assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+
+ prod3.close();
+
+ assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+
+ session.close();
+
+ server.stop();
+ }
+
}