[hornetq-commits] JBoss hornetq SVN: r8715 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jan 5 11:40:35 EST 2010


Author: timfox
Date: 2010-01-05 11:40:35 -0500 (Tue, 05 Jan 2010)
New Revision: 8715

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerInternal.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
   trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-255

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java	2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java	2010-01-05 16:40:35 UTC (rev 8715)
@@ -33,4 +33,8 @@
    void reset();
 
    void close();
+   
+   int creditsMapSize();
+   
+   int unReferencedCreditsSize();
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java	2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java	2010-01-05 16:40:35 UTC (rev 8715)
@@ -31,11 +31,11 @@
 {
    private static final Logger log = Logger.getLogger(ClientProducerCreditManagerImpl.class);
 
-   private static final int MAX_ANON_CREDITS_CACHE_SIZE = 1000;
+   public static final int MAX_UNREFERENCED_CREDITS_CACHE_SIZE = 1000;
 
    private final Map<SimpleString, ClientProducerCredits> producerCredits = new LinkedHashMap<SimpleString, ClientProducerCredits>();
 
-   private final Map<SimpleString, ClientProducerCredits> anonCredits = new LinkedHashMap<SimpleString, ClientProducerCredits>();
+   private final Map<SimpleString, ClientProducerCredits> unReferencedCredits = new LinkedHashMap<SimpleString, ClientProducerCredits>();
 
    private final ClientSessionInternal session;
 
@@ -47,7 +47,7 @@
 
       this.windowSize = windowSize;
    }
-      
+
    public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon)
    {
       ClientProducerCredits credits = producerCredits.get(address);
@@ -58,25 +58,20 @@
          credits = new ClientProducerCreditsImpl(session, address, windowSize);
 
          producerCredits.put(address, credits);
-
-         if (anon)
-         {
-            addToAnonCache(address, credits);
-         }
       }
 
       if (!anon)
       {
          credits.incrementRefCount();
-         
-         //Remove from anon credits (if there)
-         anonCredits.remove(address);                     
+
+         // Remove from the unreferenced credits cache (if there)
+         unReferencedCredits.remove(address);
       }
       else
       {
-         credits.setAnon();
+         addToUnReferencedCache(address, credits);
       }
-      
+
       return credits;
    }
 
@@ -86,15 +81,7 @@
 
       if (credits != null && credits.decrementRefCount() == 0)
       {
-         if (!credits.isAnon())
-         {
-            removeEntry(address, credits);
-         }
-         else
-         {
-            //All the producer refs have been removed but it's been used anonymously too so we add to the anon cache
-            addToAnonCache(address, credits);
-         }
+         addToUnReferencedCache(address, credits);
       }
    }
 
@@ -124,30 +111,42 @@
       }
 
       producerCredits.clear();
+      
+      unReferencedCredits.clear();
    }
    
-   private void addToAnonCache(final SimpleString address, final ClientProducerCredits credits)
+   public synchronized int creditsMapSize()
    {
-      anonCredits.put(address, credits);
-      
-      if (anonCredits.size() > MAX_ANON_CREDITS_CACHE_SIZE)
+      return producerCredits.size();
+   }
+   
+   public synchronized int unReferencedCreditsSize()
+   {
+      return unReferencedCredits.size();
+   }
+
+   private void addToUnReferencedCache(final SimpleString address, final ClientProducerCredits credits)
+   {
+      unReferencedCredits.put(address, credits);
+
+      if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE)
       {
-         //Remove the oldest entry
-         
-         Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = anonCredits.entrySet().iterator();
-         
+         // Remove the oldest entry
+
+         Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = unReferencedCredits.entrySet().iterator();
+
          Map.Entry<SimpleString, ClientProducerCredits> oldest = iter.next();
-         
+
          iter.remove();
-         
-         removeEntry(oldest.getKey(), oldest.getValue());
+
+         removeEntry(oldest.getKey(), oldest.getValue());                 
       }
    }
-   
+
    private void removeEntry(final SimpleString address, final ClientProducerCredits credits)
    {
       producerCredits.remove(address);
-      
+
       credits.releaseOutstanding();
 
       credits.close();

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java	2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java	2010-01-05 16:40:35 UTC (rev 8715)
@@ -34,9 +34,5 @@
    
    int decrementRefCount();
    
-   void setAnon();
-   
-   boolean isAnon();
-   
    void releaseOutstanding();
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2010-01-05 16:40:35 UTC (rev 8715)
@@ -41,8 +41,6 @@
    
    private int refCount;
    
-   private boolean anon;
-   
    public ClientProducerCreditsImpl(final ClientSessionInternal session,
                                     final SimpleString address,
                                     final int windowSize)
@@ -116,16 +114,6 @@
       session.sendProducerCreditsMessage(permits, address);
    }
    
-   public synchronized boolean isAnon()
-   {
-      return anon;
-   }
-   
-   public synchronized void setAnon()
-   {
-      this.anon = true;
-   }
-   
    private void checkCredits(final int credits)
    {
       int needed = Math.max(credits, windowSize);

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-01-05 16:40:35 UTC (rev 8715)
@@ -187,6 +187,11 @@
 
    // Public ---------------------------------------------------------------------------------------
 
+   public ClientProducerCredits getProducerCredits()
+   {
+      return credits;
+   }
+   
    // Protected ------------------------------------------------------------------------------------
 
    // Package Private ------------------------------------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerInternal.java	2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerInternal.java	2010-01-05 16:40:35 UTC (rev 8715)
@@ -25,4 +25,6 @@
 public interface ClientProducerInternal extends ClientProducer
 {
    void cleanUp();
+   
+   ClientProducerCredits getProducerCredits();
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-01-05 16:40:35 UTC (rev 8715)
@@ -1061,6 +1061,11 @@
    {
       producerCreditManager.receiveCredits(address, credits, offset);
    }
+   
+   public ClientProducerCreditManager getProducerCreditManager()
+   {
+      return producerCreditManager;
+   }
 
    // CommandConfirmationHandler implementation ------------------------------------
 

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2010-01-05 16:40:35 UTC (rev 8715)
@@ -75,4 +75,6 @@
    void returnCredits(SimpleString address);
 
    void handleReceiveProducerCredits(SimpleString address, int credits, int offset);
+   
+   ClientProducerCreditManager getProducerCreditManager();
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2010-01-05 16:40:35 UTC (rev 8715)
@@ -533,4 +533,9 @@
    {
       session.handleReceiveProducerCredits(address, credits, offset);
    }
+
+   public ClientProducerCreditManager getProducerCreditManager()
+   {
+      return session.getProducerCreditManager();
+   }
 }

Modified: trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java	2010-01-05 16:25:50 UTC (rev 8714)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java	2010-01-05 16:40:35 UTC (rev 8715)
@@ -12,6 +12,9 @@
  */
 package org.hornetq.tests.integration.client;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -24,6 +27,10 @@
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.core.client.impl.ClientProducerCreditManagerImpl;
+import org.hornetq.core.client.impl.ClientProducerCredits;
+import org.hornetq.core.client.impl.ClientProducerInternal;
+import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.impl.TestSupportPageStore;
 import org.hornetq.core.server.HornetQServer;
@@ -809,4 +816,332 @@
       server.stop();
    }
 
+   public void testProducerCreditsCaching1() throws Exception
+   {
+      HornetQServer server = createServer(false, isNetty());
+
+      server.start();
+
+      ClientSessionFactory sf = createFactory(isNetty());
+
+      final ClientSession session = sf.createSession(false, true, true, true);
+
+      session.createQueue("address", "queue1", null, false);
+      
+      ClientProducerCredits credits = null;
+      
+      for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
+      {
+         ClientProducer prod = session.createProducer("address");
+         
+         ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();         
+         
+         if (credits != null)
+         {            
+            assertTrue(newCredits == credits);
+         }
+         
+         credits = newCredits;
+         
+         assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+         assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+      }
+      
+      session.close();
+
+      server.stop();
+   }
+   
+   public void testProducerCreditsCaching2() throws Exception
+   {
+      HornetQServer server = createServer(false, isNetty());
+
+      server.start();
+
+      ClientSessionFactory sf = createFactory(isNetty());
+
+      final ClientSession session = sf.createSession(false, true, true, true);
+
+      session.createQueue("address", "queue1", null, false);
+      
+      ClientProducerCredits credits = null;
+      
+      for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
+      {
+         ClientProducer prod = session.createProducer("address");
+         
+         ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();         
+         
+         if (credits != null)
+         {            
+            assertTrue(newCredits == credits);
+         }
+         
+         credits = newCredits;
+         
+         prod.close();
+         
+         assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+         assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+      }
+      
+      session.close();
+
+      server.stop();
+   }
+   
+   public void testProducerCreditsCaching3() throws Exception
+   {
+      HornetQServer server = createServer(false, isNetty());
+
+      server.start();
+
+      ClientSessionFactory sf = createFactory(isNetty());
+
+      final ClientSession session = sf.createSession(false, true, true, true);
+
+      session.createQueue("address", "queue1", null, false);
+      
+      ClientProducerCredits credits = null;
+      
+      for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+      {
+         ClientProducer prod = session.createProducer("address" + i);
+         
+         ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();         
+         
+         if (credits != null)
+         {            
+            assertFalse(newCredits == credits);
+         }
+         
+         credits = newCredits;
+         
+         assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+         assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+      }
+      
+      session.close();
+
+      server.stop();
+   }
+   
+   public void testProducerCreditsCaching4() throws Exception
+   {
+      HornetQServer server = createServer(false, isNetty());
+
+      server.start();
+
+      ClientSessionFactory sf = createFactory(isNetty());
+
+      final ClientSession session = sf.createSession(false, true, true, true);
+
+      session.createQueue("address", "queue1", null, false);
+      
+      ClientProducerCredits credits = null;
+      
+      for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+      {
+         ClientProducer prod = session.createProducer("address" + i);
+         
+         ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();         
+         
+         if (credits != null)
+         {            
+            assertFalse(newCredits == credits);
+         }
+         
+         credits = newCredits;
+         
+         prod.close();
+         
+         assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+         assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+      }
+      
+      session.close();
+
+      server.stop();
+   }
+   
+   public void testProducerCreditsCaching5() throws Exception
+   {
+      HornetQServer server = createServer(false, isNetty());
+
+      server.start();
+
+      ClientSessionFactory sf = createFactory(isNetty());
+
+      final ClientSession session = sf.createSession(false, true, true, true);
+
+      session.createQueue("address", "queue1", null, false);
+      
+      ClientProducerCredits credits = null;
+      
+      List<ClientProducerCredits> creditsList = new ArrayList<ClientProducerCredits>();
+      
+      for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+      {
+         ClientProducer prod = session.createProducer("address" + i);
+         
+         ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();         
+         
+         if (credits != null)
+         {            
+            assertFalse(newCredits == credits);
+         }
+         
+         credits = newCredits;
+         
+         assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+         assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+         
+         creditsList.add(credits);
+      }
+      
+      Iterator<ClientProducerCredits> iter = creditsList.iterator();
+      
+      for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+      {
+         ClientProducer prod = session.createProducer("address" + i);
+         
+         ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();    
+         
+         assertTrue(newCredits == iter.next());
+         
+         assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+         assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+      }
+      
+      for (int i = 0; i < 10; i++)
+      {
+         ClientProducer prod = session.createProducer("address" + (i + ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE));
+         
+         assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE + i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+         assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+      }
+      
+      session.close();
+
+      server.stop();
+   }
+   
+   public void testProducerCreditsCaching6() throws Exception
+   {
+      HornetQServer server = createServer(false, isNetty());
+
+      server.start();
+
+      ClientSessionFactory sf = createFactory(isNetty());
+
+      final ClientSession session = sf.createSession(false, true, true, true);
+
+      session.createQueue("address", "queue1", null, false);
+      
+      for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+      {
+         ClientProducer prod = session.createProducer((String)null);
+         
+         prod.send("address", session.createMessage(false));
+         
+         assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+         assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+      }
+      
+      session.close();
+
+      server.stop();
+   }
+   
+   public void testProducerCreditsCaching7() throws Exception
+   {
+      HornetQServer server = createServer(false, isNetty());
+
+      server.start();
+
+      ClientSessionFactory sf = createFactory(isNetty());
+
+      final ClientSession session = sf.createSession(false, true, true, true);
+
+      session.createQueue("address", "queue1", null, false);
+      
+      for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+      {
+         ClientProducer prod = session.createProducer((String)null);
+         
+         prod.send("address" + i, session.createMessage(false));
+         
+         assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+         assertEquals(i + 1, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+      }
+      
+      for (int i = 0; i < 10; i++)
+      {
+         ClientProducer prod = session.createProducer((String)null);
+         
+         prod.send("address" + i, session.createMessage(false));
+         
+         assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+         assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+      }
+      
+      for (int i = 0; i < 10; i++)
+      {
+         ClientProducer prod = session.createProducer((String)null);
+         
+         prod.send("address2-" + i, session.createMessage(false));
+         
+         assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+         assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+      }
+      
+      session.close();
+
+      server.stop();
+   }
+   
+   public void testProducerCreditsRefCounting() throws Exception
+   {
+      HornetQServer server = createServer(false, isNetty());
+
+      server.start();
+
+      ClientSessionFactory sf = createFactory(isNetty());
+
+      final ClientSession session = sf.createSession(false, true, true, true);
+
+      session.createQueue("address", "queue1", null, false);
+      
+      ClientProducer prod1 = session.createProducer("address");
+      assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+      assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+      
+      ClientProducer prod2 = session.createProducer("address");
+      assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+      assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+      
+      ClientProducer prod3 = session.createProducer("address");
+      assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+      assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+      
+      prod1.close();
+      
+      assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+      assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+      
+      prod2.close();
+      
+      assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+      assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+      
+      prod3.close();
+      
+      assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+      assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+      
+      session.close();
+
+      server.stop();
+   }
+   
 }



More information about the hornetq-commits mailing list