[hornetq-commits] JBoss hornetq SVN: r11925 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Dec 21 10:53:48 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-12-21 10:53:48 -0500 (Wed, 21 Dec 2011)
New Revision: 11925

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
JBPAPP-7756 - Disabled flow control shouldn't be looking for credits

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java	2011-12-21 13:42:08 UTC (rev 11924)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java	2011-12-21 15:53:48 UTC (rev 11925)
@@ -50,44 +50,51 @@
 
    public ClientProducerCredits getCredits(final SimpleString address, final boolean anon)
    {
-      boolean needInit = false;
-      ClientProducerCredits credits;
-      
-      synchronized(this)
+      if (windowSize == -1)
       {
-         credits = producerCredits.get(address);
-   
-         if (credits == null)
+         return ClientProducerCreditsNoFlowControl.instance;
+      }
+      else
+      {
+         boolean needInit = false;
+         ClientProducerCredits credits;
+         
+         synchronized(this)
          {
-            // Doesn't need to be fair since session is single threaded
-            credits = new ClientProducerCreditsImpl(session, address, windowSize);
-            needInit = true;
-   
-            producerCredits.put(address, credits);
+            credits = producerCredits.get(address);
+      
+            if (credits == null)
+            {
+               // Doesn't need to be fair since session is single threaded
+               credits = new ClientProducerCreditsImpl(session, address, windowSize);
+               needInit = true;
+      
+               producerCredits.put(address, credits);
+            }
+      
+            if (!anon)
+            {
+               credits.incrementRefCount();
+      
+               // Remove from anon credits (if there)
+               unReferencedCredits.remove(address);
+            }
+            else
+            {
+               addToUnReferencedCache(address, credits);
+            }
          }
-   
-         if (!anon)
+         
+         // The init is done outside of the lock
+         // otherwise packages may arrive with flow control
+         // while this is still sending requests causing a dead lock
+         if (needInit)
          {
-            credits.incrementRefCount();
+            credits.init();
+         }
    
-            // Remove from anon credits (if there)
-            unReferencedCredits.remove(address);
-         }
-         else
-         {
-            addToUnReferencedCache(address, credits);
-         }
+         return credits;
       }
-      
-      // The init is done outside of the lock
-      // otherwise packages may arrive with flow control
-      // while this is still sending requests causing a dead lock
-      if (needInit)
-      {
-         credits.init();
-      }
-
-      return credits;
    }
 
    public synchronized void returnCredits(final SimpleString address)
@@ -166,5 +173,50 @@
 
       credits.close();
    }
+   
+   
+   static class ClientProducerCreditsNoFlowControl implements ClientProducerCredits
+   {
+      static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl();
 
+      public void acquireCredits(int credits) throws InterruptedException
+      {
+      }
+
+      public void receiveCredits(int credits)
+      {
+      }
+
+      public boolean isBlocked()
+      {
+         return false;
+      }
+
+      public void init()
+      {
+      }
+
+      public void reset()
+      {
+      }
+
+      public void close()
+      {
+      }
+
+      public void incrementRefCount()
+      {
+      }
+
+      public int decrementRefCount()
+      {
+         return 1;
+      }
+
+      public void releaseOutstanding()
+      {
+      }
+      
+   }
+
 }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-12-21 13:42:08 UTC (rev 11924)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-12-21 15:53:48 UTC (rev 11925)
@@ -360,6 +360,108 @@
 
    }
 
+   public void testSendOverBlockingNoFlowControl() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          AddressFullMessagePolicy.BLOCK,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 10 * 1024;
+
+      final int numberOfMessages = 500;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+         locator.setProducerWindowSize(-1);
+         locator.setMinLargeMessageSize(1024 * 1024);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         ByteBuffer bb = ByteBuffer.wrap(body);
+
+         for (int j = 1; j <= messageSize; j++)
+         {
+            bb.put(getSamplebyte(j));
+         }
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(true);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+
+            if (i % 10 == 0)
+            {
+               session.commit();
+            }
+         }
+         session.commit();
+         
+         session.start();
+         
+         ClientConsumer cons = session.createConsumer(ADDRESS);
+         
+         for (int i = 0 ; i < numberOfMessages; i++)
+         {
+            message = cons.receive(5000);
+            assertNotNull(message);
+            message.acknowledge();
+            
+            if (i % 10 == 0)
+            {
+               session.commit();
+            }
+         }
+         
+         session.commit();
+         
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    public void testReceiveImmediate() throws Exception
    {
       clearData();



More information about the hornetq-commits mailing list