[hornetq-commits] JBoss hornetq SVN: r8709 - in trunk/src/main/org/hornetq/core: server/impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Jan 4 12:12:31 EST 2010


Author: timfox
Date: 2010-01-04 12:12:31 -0500 (Mon, 04 Jan 2010)
New Revision: 8709

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-255

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java	2010-01-04 16:56:10 UTC (rev 8708)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java	2010-01-04 17:12:31 UTC (rev 8709)
@@ -24,7 +24,9 @@
  */
 public interface ClientProducerCreditManager
 {
-   ClientProducerCredits getCredits(SimpleString address);
+   ClientProducerCredits getCredits(SimpleString address, boolean anon);
+   
+   void returnCredits(SimpleString address);
 
    void receiveCredits(SimpleString address, int credits, int offset);
 

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java	2010-01-04 16:56:10 UTC (rev 8708)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java	2010-01-04 17:12:31 UTC (rev 8709)
@@ -13,10 +13,12 @@
 
 package org.hornetq.core.client.impl;
 
-import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.hornetq.SimpleString;
+import org.hornetq.core.logging.Logger;
 
 /**
  * A ProducerCreditManager
@@ -27,8 +29,14 @@
  */
 public class ClientProducerCreditManagerImpl implements ClientProducerCreditManager
 {
-   private final Map<SimpleString, ClientProducerCredits> producerCredits = new HashMap<SimpleString, ClientProducerCredits>();
+   private static final Logger log = Logger.getLogger(ClientProducerCreditManagerImpl.class);
 
+   private static final int MAX_ANON_CREDITS_CACHE_SIZE = 1000;
+
+   private final Map<SimpleString, ClientProducerCredits> producerCredits = new LinkedHashMap<SimpleString, ClientProducerCredits>();
+
+   private final Map<SimpleString, ClientProducerCredits> anonCredits = new LinkedHashMap<SimpleString, ClientProducerCredits>();
+
    private final ClientSessionInternal session;
 
    private final int windowSize;
@@ -39,8 +47,8 @@
 
       this.windowSize = windowSize;
    }
-
-   public synchronized ClientProducerCredits getCredits(final SimpleString address)
+      
+   public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon)
    {
       ClientProducerCredits credits = producerCredits.get(address);
 
@@ -50,11 +58,46 @@
          credits = new ClientProducerCreditsImpl(session, address, windowSize);
 
          producerCredits.put(address, credits);
+
+         if (anon)
+         {
+            addToAnonCache(address, credits);
+         }
       }
 
+      if (!anon)
+      {
+         credits.incrementRefCount();
+         
+         //Remove from anon credits (if there)
+         anonCredits.remove(address);                     
+      }
+      else
+      {
+         credits.setAnon();
+      }
+      
       return credits;
    }
 
+   public synchronized void returnCredits(final SimpleString address)
+   {
+      ClientProducerCredits credits = producerCredits.get(address);
+
+      if (credits != null && credits.decrementRefCount() == 0)
+      {
+         if (!credits.isAnon())
+         {
+            removeEntry(address, credits);
+         }
+         else
+         {
+            //All the producer refs have been removed but it's been used anonymously too so we add to the anon cache
+            addToAnonCache(address, credits);
+         }
+      }
+   }
+
    public synchronized void receiveCredits(final SimpleString address, final int credits, final int offset)
    {
       ClientProducerCredits cr = producerCredits.get(address);
@@ -82,4 +125,32 @@
 
       producerCredits.clear();
    }
+   
+   private void addToAnonCache(final SimpleString address, final ClientProducerCredits credits)
+   {
+      anonCredits.put(address, credits);
+      
+      if (anonCredits.size() > MAX_ANON_CREDITS_CACHE_SIZE)
+      {
+         //Remove the oldest entry
+         
+         Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = anonCredits.entrySet().iterator();
+         
+         Map.Entry<SimpleString, ClientProducerCredits> oldest = iter.next();
+         
+         iter.remove();
+         
+         removeEntry(oldest.getKey(), oldest.getValue());
+      }
+   }
+   
+   private void removeEntry(final SimpleString address, final ClientProducerCredits credits)
+   {
+      producerCredits.remove(address);
+      
+      credits.releaseOutstanding();
+
+      credits.close();
+   }
+
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java	2010-01-04 16:56:10 UTC (rev 8708)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java	2010-01-04 17:12:31 UTC (rev 8709)
@@ -29,4 +29,14 @@
    void reset();
 
    void close();
+   
+   void incrementRefCount();
+   
+   int decrementRefCount();
+   
+   void setAnon();
+   
+   boolean isAnon();
+   
+   void releaseOutstanding();
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2010-01-04 16:56:10 UTC (rev 8708)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2010-01-04 17:12:31 UTC (rev 8709)
@@ -19,7 +19,7 @@
 import org.hornetq.core.logging.Logger;
 
 /**
- * A ProducerCredits
+ * A ClientProducerCreditsImpl
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
@@ -38,7 +38,11 @@
    private final ClientSessionInternal session;
 
    private int arriving;
-
+   
+   private int refCount;
+   
+   private boolean anon;
+   
    public ClientProducerCreditsImpl(final ClientSessionInternal session,
                                     final SimpleString address,
                                     final int windowSize)
@@ -94,7 +98,34 @@
 
       semaphore.release(Integer.MAX_VALUE / 2);
    }
-
+    
+   public synchronized void incrementRefCount()
+   {
+      refCount++;
+   }
+   
+   public synchronized int decrementRefCount()
+   {
+      return --refCount;
+   }
+   
+   public synchronized void releaseOutstanding()
+   {
+      // Drain the locally held, unused credits and hand them back; the server treats a negative credit request as a return
+      int permits = semaphore.drainPermits();
+      session.sendProducerCreditsMessage(-permits, address);
+   }
+   
+   public synchronized boolean isAnon()
+   {
+      return anon;
+   }
+   
+   public synchronized void setAnon()
+   {
+      this.anon = true;
+   }
+   
    private void checkCredits(final int credits)
    {
       int needed = Math.max(credits, windowSize);
@@ -119,7 +150,6 @@
 
    private void requestCredits(final int credits)
    {
-
       session.sendProducerCreditsMessage(credits, address);
    }
 

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-01-04 16:56:10 UTC (rev 8708)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-01-04 17:12:31 UTC (rev 8709)
@@ -111,7 +111,7 @@
 
       if (address != null)
       {
-         credits = session.getCredits(address);
+         credits = session.getCredits(address, false);
       }
       else
       {
@@ -151,7 +151,7 @@
       {
          return;
       }
-
+            
       doCleanup();
    }
 
@@ -195,6 +195,11 @@
 
    private void doCleanup()
    {
+      if (address != null)
+      {
+         session.returnCredits(address);
+      }
+      
       session.removeProducer(this);
 
       closed = true;
@@ -211,7 +216,7 @@
          msgI.setAddress(address);
 
          // Anonymous
-         theCredits = session.getCredits(address);
+         theCredits = session.getCredits(address, true);
       }
       else
       {

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-01-04 16:56:10 UTC (rev 8708)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-01-04 17:12:31 UTC (rev 8709)
@@ -1047,10 +1047,15 @@
       channel.send(new SessionRequestProducerCreditsMessage(credits, address));
    }
 
-   public ClientProducerCredits getCredits(final SimpleString address)
+   public ClientProducerCredits getCredits(final SimpleString address, final boolean anon)
    {
-      return producerCreditManager.getCredits(address);
+      return producerCreditManager.getCredits(address, anon);
    }
+   
+   public void returnCredits(final SimpleString address)
+   {
+      producerCreditManager.returnCredits(address);
+   }
 
    public void handleReceiveProducerCredits(final SimpleString address, final int credits, final int offset)
    {

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2010-01-04 16:56:10 UTC (rev 8708)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2010-01-04 17:12:31 UTC (rev 8709)
@@ -70,7 +70,9 @@
 
    void sendProducerCreditsMessage(int credits, SimpleString address);
 
-   ClientProducerCredits getCredits(SimpleString address);
+   ClientProducerCredits getCredits(SimpleString address, boolean anon);
+   
+   void returnCredits(SimpleString address);
 
    void handleReceiveProducerCredits(SimpleString address, int credits, int offset);
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2010-01-04 16:56:10 UTC (rev 8708)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2010-01-04 17:12:31 UTC (rev 8709)
@@ -519,10 +519,15 @@
       session.sendProducerCreditsMessage(credits, address);
    }
 
-   public ClientProducerCredits getCredits(final SimpleString address)
+   public ClientProducerCredits getCredits(final SimpleString address, final boolean anon)
    {
-      return session.getCredits(address);
+      return session.getCredits(address, anon);
    }
+   
+   public void returnCredits(final SimpleString address)
+   {
+      session.returnCredits(address);
+   }
 
    public void handleReceiveProducerCredits(final SimpleString address, final int credits, final int offset)
    {

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-01-04 16:56:10 UTC (rev 8708)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-01-04 17:12:31 UTC (rev 8709)
@@ -1485,7 +1485,7 @@
             throw new HornetQException(HornetQException.ILLEGAL_STATE, "large-message not initialized on server");
          }
 
-         // Immediately release the credits for the continuations- these don't contrinute to the in-memory size
+         // Immediately release the credits for the continuations- these don't contribute to the in-memory size
          // of the message
 
          releaseOutStanding(currentLargeMessage, packet.getPacketSize());
@@ -1535,30 +1535,39 @@
       final CreditManagerHolder holder = getCreditManagerHolder(address);
 
       int credits = packet.getCredits();
-
-      int gotCredits = holder.manager.acquireCredits(credits, new CreditsAvailableRunnable()
+      
+      //Requesting -ve credits means returning them
+      
+      if (credits < 0)
       {
-         public boolean run(final int credits)
+         releaseOutStanding(address, -credits);
+      }
+      else
+      {
+         int gotCredits = holder.manager.acquireCredits(credits, new CreditsAvailableRunnable()
          {
-            synchronized (ServerSessionImpl.this)
+            public boolean run(final int credits)
             {
-               if (!closed)
+               synchronized (ServerSessionImpl.this)
                {
-                  sendProducerCredits(holder, credits, address);
-
-                  return true;
+                  if (!closed)
+                  {
+                     sendProducerCredits(holder, credits, address);
+   
+                     return true;
+                  }
+                  else
+                  {
+                     return false;
+                  }
                }
-               else
-               {
-                  return false;
-               }
             }
+         });
+   
+         if (gotCredits > 0)
+         {
+            sendProducerCredits(holder, gotCredits, address);
          }
-      });
-
-      if (gotCredits > 0)
-      {
-         sendProducerCredits(holder, gotCredits, address);
       }
 
       sendResponse(packet, null, false, false);
@@ -1932,7 +1941,12 @@
     */
    private void releaseOutStanding(final ServerMessage message, final int credits) throws Exception
    {
-      CreditManagerHolder holder = getCreditManagerHolder(message.getAddress());
+      releaseOutStanding(message.getAddress(), credits);
+   }
+   
+   private void releaseOutStanding(final SimpleString address, final int credits) throws Exception
+   {
+      CreditManagerHolder holder = getCreditManagerHolder(address);
 
       holder.outstandingCredits -= credits;
 



More information about the hornetq-commits mailing list