[hornetq-commits] JBoss hornetq SVN: r10778 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Jun 6 21:45:24 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-06-06 21:45:23 -0400 (Mon, 06 Jun 2011)
New Revision: 10778

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/messagecounter/MessageCounter.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
Log:
JBPAPP-6646 - performance issue on paging - avoiding locking through scheduledExecutor and blockOnExecutor for Counters

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/messagecounter/MessageCounter.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/messagecounter/MessageCounter.java	2011-06-06 20:25:18 UTC (rev 10777)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/messagecounter/MessageCounter.java	2011-06-07 01:45:23 UTC (rev 10778)
@@ -27,7 +27,7 @@
 import java.util.GregorianCalendar;
 import java.util.List;
 
-import org.hornetq.api.core.management.QueueControl;
+import org.hornetq.core.server.Queue;
 
 /**
  * This class stores message count informations for a given queue
@@ -58,8 +58,7 @@
 
    private final boolean destDurable;
 
-   // destination queue
-   private final QueueControl destQueue;
+   private final Queue serverQueue;
 
    // counter
    private long countTotal;
@@ -95,7 +94,7 @@
     */
    public MessageCounter(final String name,
                          final String subscription,
-                         final QueueControl queue,
+                         final Queue serverQueue,
                          final boolean topic,
                          final boolean durable,
                          final int daycountmax)
@@ -105,7 +104,7 @@
       destSubscription = subscription;
       destTopic = topic;
       destDurable = durable;
-      destQueue = queue;
+      this.serverQueue = serverQueue;
 
       // initialize counter
       resetCounter();
@@ -115,7 +114,33 @@
 
       setHistoryLimit(daycountmax);
    }
+   
+   private Runnable onTimeExecutor = new Runnable()
+   {
+      public void run()
+      {
+         long latestMessagesAdded = serverQueue.getInstantMessagesAdded();
 
+         long newMessagesAdded = latestMessagesAdded - lastMessagesAdded;
+
+         countTotal += newMessagesAdded;
+         
+         lastMessagesAdded = latestMessagesAdded;
+
+         if (newMessagesAdded > 0)
+         {
+            timeLastAdd = System.currentTimeMillis();
+         }
+
+         // update timestamp
+         timeLastUpdate = System.currentTimeMillis();
+
+         // update message history
+         updateHistory(newMessagesAdded);
+         
+      }
+   };
+
    // Public --------------------------------------------------------
 
    /*
@@ -123,24 +148,11 @@
     */
    public synchronized void onTimer()
    {
-      long latestMessagesAdded = destQueue.getMessagesAdded();
-
-      long newMessagesAdded = latestMessagesAdded - lastMessagesAdded;
-
-      countTotal += newMessagesAdded;
-
-      lastMessagesAdded = latestMessagesAdded;
-
-      if (newMessagesAdded > 0)
-      {
-         timeLastAdd = System.currentTimeMillis();
-      }
-
-      // update timestamp
-      timeLastUpdate = System.currentTimeMillis();
-
-      // update message history
-      updateHistory(newMessagesAdded);
+      // Actor approach here: Instead of having the Counter locking the queue, we will use the Queue's executor
+      // instead of possibly making an lock on the queue.
+      // This way the scheduled Threads will be free to keep doing their pings in case the server is busy with paging or 
+      // any other deliveries
+      serverQueue.getExecutor().execute(onTimeExecutor);
    }
 
    public String getDestinationName()
@@ -190,7 +202,7 @@
     */
    public long getMessageCount()
    {
-      return destQueue.getMessageCount();
+      return serverQueue.getInstantMessageCount();
    }
 
    /**
@@ -199,7 +211,7 @@
     */
    public long getMessageCountDelta()
    {
-      long current = destQueue.getMessageCount();
+      long current = serverQueue.getInstantMessageCount();
       int delta = (int)(current - depthLast);
 
       depthLast = current;
@@ -334,8 +346,8 @@
              destTopic +
              ", destDurable=" +
              destDurable +
-             ", destQueue=" +
-             destQueue +
+             ", serverQueue =" +
+             serverQueue +
              "]";
    }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java	2011-06-06 20:25:18 UTC (rev 10777)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java	2011-06-07 01:45:23 UTC (rev 10778)
@@ -76,6 +76,9 @@
    void forceDelivery();
 
    long getMessageCount();
+   
+   /** Return the current message count without waiting for scheduled executors to finish */
+   long getInstantMessageCount();
 
    int getDeliveringCount();
 
@@ -86,6 +89,8 @@
    List<MessageReference> getScheduledMessages();
 
    long getMessagesAdded();
+   
+   long getInstantMessagesAdded();
 
    MessageReference removeReferenceWithID(long id) throws Exception;
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-06-06 20:25:18 UTC (rev 10777)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-06-07 01:45:23 UTC (rev 10778)
@@ -727,7 +727,12 @@
    public long getMessageCount()
    {
       blockOnExecutorFuture();
-
+      
+      return getInstantMessageCount();
+   }
+   
+   public long getInstantMessageCount()
+   {
       synchronized (this)
       {
          if (pageSubscription != null)
@@ -892,7 +897,12 @@
    public long getMessagesAdded()
    {
       blockOnExecutorFuture();
-
+      
+      return getInstantMessagesAdded();
+  }
+   
+   public long getInstantMessagesAdded()
+   {
       synchronized (this)
       {
          if (pageSubscription != null)
@@ -904,7 +914,7 @@
             return messagesAdded;
          }
       }
-   }
+    }
 
    public int deleteAllReferences() throws Exception
    {
@@ -1543,6 +1553,8 @@
                holder.iter.remove();
 
                refRemoved(ref);
+               
+               handled++;
 
                continue;
             }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java	2011-06-06 20:25:18 UTC (rev 10777)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java	2011-06-07 01:45:23 UTC (rev 10778)
@@ -246,7 +246,7 @@
       {
          MessageCounter counter = new MessageCounter(queue.getName().toString(),
                                                      null,
-                                                     queueControl,
+                                                     queue,
                                                      false,
                                                      queue.isDurable(),
                                                      messageCounterManager.getMaxDayCount());

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2011-06-06 20:25:18 UTC (rev 10777)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2011-06-07 01:45:23 UTC (rev 10778)
@@ -48,6 +48,7 @@
 import org.hornetq.core.security.Role;
 import org.hornetq.core.server.ActivateCallback;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.impl.HornetQServerImpl;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.ResourceManager;
@@ -193,7 +194,7 @@
    {
       active = true;
 
-      jmsManagementService = new JMSManagementServiceImpl(server.getManagementService(), this);
+      jmsManagementService = new JMSManagementServiceImpl(server.getManagementService(), server, this);
 
       try
       {
@@ -1036,7 +1037,7 @@
             coreFilterString = SelectorTranslator.convertToHornetQFilterString(selectorString);
          }
 
-         server.deployQueue(SimpleString.toSimpleString(hqQueue.getAddress()),
+         Queue queue = server.deployQueue(SimpleString.toSimpleString(hqQueue.getAddress()),
                             SimpleString.toSimpleString(hqQueue.getAddress()),
                             SimpleString.toSimpleString(coreFilterString),
                             durable,

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java	2011-06-06 20:25:18 UTC (rev 10777)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java	2011-06-07 01:45:23 UTC (rev 10778)
@@ -15,6 +15,7 @@
 
 import javax.management.ObjectName;
 
+import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.management.AddressControl;
 import org.hornetq.api.core.management.QueueControl;
 import org.hornetq.api.core.management.ResourceNames;
@@ -24,6 +25,8 @@
 import org.hornetq.api.jms.management.TopicControl;
 import org.hornetq.core.messagecounter.MessageCounter;
 import org.hornetq.core.messagecounter.MessageCounterManager;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.management.ManagementService;
 import org.hornetq.jms.client.HornetQConnectionFactory;
 import org.hornetq.jms.client.HornetQDestination;
@@ -52,13 +55,16 @@
    private final ManagementService managementService;
    
    private final JMSServerManager jmsServerManager;
+   
+   private final HornetQServer server;
 
    // Static --------------------------------------------------------
 
-   public JMSManagementServiceImpl(final ManagementService managementService, final JMSServerManager jmsServerManager)
+   public JMSManagementServiceImpl(final ManagementService managementService, final HornetQServer server, final JMSServerManager jmsServerManager)
    {
       this.managementService = managementService;
       this.jmsServerManager = jmsServerManager;
+      this.server = server;
    }
 
    // Public --------------------------------------------------------
@@ -83,11 +89,12 @@
 
    public synchronized void registerQueue(final HornetQQueue queue) throws Exception
    {
+      Queue serverQueue = server.locateQueue(new SimpleString(queue.getName()));
       QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue.getAddress());
       MessageCounterManager messageCounterManager = managementService.getMessageCounterManager();
       MessageCounter counter = new MessageCounter(queue.getName(),
                                                   null,
-                                                  coreQueueControl,
+                                                  serverQueue,
                                                   false,
                                                   coreQueueControl.isDurable(),
                                                   messageCounterManager.getMaxDayCount());

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2011-06-06 20:25:18 UTC (rev 10777)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2011-06-07 01:45:23 UTC (rev 10778)
@@ -665,4 +665,22 @@
       
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.Queue#getInstantMessageCount()
+    */
+   public long getInstantMessageCount()
+   {
+      // TODO Auto-generated method stub
+      return 0;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.Queue#getInstantMessagesAdded()
+    */
+   public long getInstantMessagesAdded()
+   {
+      // TODO Auto-generated method stub
+      return 0;
+   }
+
 }
\ No newline at end of file

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java	2011-06-06 20:25:18 UTC (rev 10777)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java	2011-06-07 01:45:23 UTC (rev 10778)
@@ -659,6 +659,24 @@
          
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.server.Queue#getInstantMessageCount()
+       */
+      public long getInstantMessageCount()
+      {
+         // TODO Auto-generated method stub
+         return 0;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.server.Queue#getInstantMessagesAdded()
+       */
+      public long getInstantMessagesAdded()
+      {
+         // TODO Auto-generated method stub
+         return 0;
+      }
+
    }
 
 }



More information about the hornetq-commits mailing list