Author: clebert.suconic(a)jboss.com
Date: 2011-12-21 10:53:48 -0500 (Wed, 21 Dec 2011)
New Revision: 11925
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
JBPAPP-7756 - Disabled flow control shouldn't be looking for credits
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2011-12-21 13:42:08 UTC (rev 11924)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2011-12-21 15:53:48 UTC (rev 11925)
@@ -50,44 +50,51 @@
public ClientProducerCredits getCredits(final SimpleString address, final boolean anon)
{
- boolean needInit = false;
- ClientProducerCredits credits;
-
- synchronized(this)
+ if (windowSize == -1)
{
- credits = producerCredits.get(address);
-
- if (credits == null)
+ return ClientProducerCreditsNoFlowControl.instance;
+ }
+ else
+ {
+ boolean needInit = false;
+ ClientProducerCredits credits;
+
+ synchronized(this)
{
- // Doesn't need to be fair since session is single threaded
- credits = new ClientProducerCreditsImpl(session, address, windowSize);
- needInit = true;
-
- producerCredits.put(address, credits);
+ credits = producerCredits.get(address);
+
+ if (credits == null)
+ {
+ // Doesn't need to be fair since session is single threaded
+ credits = new ClientProducerCreditsImpl(session, address, windowSize);
+ needInit = true;
+
+ producerCredits.put(address, credits);
+ }
+
+ if (!anon)
+ {
+ credits.incrementRefCount();
+
+ // Remove from anon credits (if there)
+ unReferencedCredits.remove(address);
+ }
+ else
+ {
+ addToUnReferencedCache(address, credits);
+ }
}
-
- if (!anon)
+
+ // The init is done outside of the lock
+ // otherwise packages may arrive with flow control
+ // while this is still sending requests causing a dead lock
+ if (needInit)
{
- credits.incrementRefCount();
+ credits.init();
+ }
- // Remove from anon credits (if there)
- unReferencedCredits.remove(address);
- }
- else
- {
- addToUnReferencedCache(address, credits);
- }
+ return credits;
}
-
- // The init is done outside of the lock
- // otherwise packages may arrive with flow control
- // while this is still sending requests causing a dead lock
- if (needInit)
- {
- credits.init();
- }
-
- return credits;
}
public synchronized void returnCredits(final SimpleString address)
@@ -166,5 +173,50 @@
credits.close();
}
+
+
+ static class ClientProducerCreditsNoFlowControl implements ClientProducerCredits
+ {
+ static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl();
+ public void acquireCredits(int credits) throws InterruptedException
+ {
+ }
+
+ public void receiveCredits(int credits)
+ {
+ }
+
+ public boolean isBlocked()
+ {
+ return false;
+ }
+
+ public void init()
+ {
+ }
+
+ public void reset()
+ {
+ }
+
+ public void close()
+ {
+ }
+
+ public void incrementRefCount()
+ {
+ }
+
+ public int decrementRefCount()
+ {
+ return 1;
+ }
+
+ public void releaseOutstanding()
+ {
+ }
+
+ }
+
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-12-21 13:42:08 UTC (rev 11924)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-12-21 15:53:48 UTC (rev 11925)
@@ -360,6 +360,108 @@
}
+ public void testSendOverBlockingNoFlowControl() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ AddressFullMessagePolicy.BLOCK,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 10 * 1024;
+
+ final int numberOfMessages = 500;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setProducerWindowSize(-1);
+ locator.setMinLargeMessageSize(1024 * 1024);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+
+ if (i % 10 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+
+ session.start();
+
+ ClientConsumer cons = session.createConsumer(ADDRESS);
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ message = cons.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ if (i % 10 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testReceiveImmediate() throws Exception
{
clearData();