Author: clebert.suconic(a)jboss.com
Date: 2011-06-06 21:45:23 -0400 (Mon, 06 Jun 2011)
New Revision: 10778
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/messagecounter/MessageCounter.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
Log:
JBPAPP-6646 - performance issue on paging - avoiding locking through scheduledExecutor and
blockOnExecutor for Counters
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/messagecounter/MessageCounter.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/messagecounter/MessageCounter.java 2011-06-06
20:25:18 UTC (rev 10777)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/messagecounter/MessageCounter.java 2011-06-07
01:45:23 UTC (rev 10778)
@@ -27,7 +27,7 @@
import java.util.GregorianCalendar;
import java.util.List;
-import org.hornetq.api.core.management.QueueControl;
+import org.hornetq.core.server.Queue;
/**
* This class stores message count informations for a given queue
@@ -58,8 +58,7 @@
private final boolean destDurable;
- // destination queue
- private final QueueControl destQueue;
+ private final Queue serverQueue;
// counter
private long countTotal;
@@ -95,7 +94,7 @@
*/
public MessageCounter(final String name,
final String subscription,
- final QueueControl queue,
+ final Queue serverQueue,
final boolean topic,
final boolean durable,
final int daycountmax)
@@ -105,7 +104,7 @@
destSubscription = subscription;
destTopic = topic;
destDurable = durable;
- destQueue = queue;
+ this.serverQueue = serverQueue;
// initialize counter
resetCounter();
@@ -115,7 +114,33 @@
setHistoryLimit(daycountmax);
}
+
+ private Runnable onTimeExecutor = new Runnable()
+ {
+ public void run()
+ {
+ long latestMessagesAdded = serverQueue.getInstantMessagesAdded();
+ long newMessagesAdded = latestMessagesAdded - lastMessagesAdded;
+
+ countTotal += newMessagesAdded;
+
+ lastMessagesAdded = latestMessagesAdded;
+
+ if (newMessagesAdded > 0)
+ {
+ timeLastAdd = System.currentTimeMillis();
+ }
+
+ // update timestamp
+ timeLastUpdate = System.currentTimeMillis();
+
+ // update message history
+ updateHistory(newMessagesAdded);
+
+ }
+ };
+
// Public --------------------------------------------------------
/*
@@ -123,24 +148,11 @@
*/
public synchronized void onTimer()
{
- long latestMessagesAdded = destQueue.getMessagesAdded();
-
- long newMessagesAdded = latestMessagesAdded - lastMessagesAdded;
-
- countTotal += newMessagesAdded;
-
- lastMessagesAdded = latestMessagesAdded;
-
- if (newMessagesAdded > 0)
- {
- timeLastAdd = System.currentTimeMillis();
- }
-
- // update timestamp
- timeLastUpdate = System.currentTimeMillis();
-
- // update message history
- updateHistory(newMessagesAdded);
+ // Actor approach here: Instead of having the Counter locking the queue, we will
use the Queue's executor
+ // instead of possibly making an lock on the queue.
+ // This way the scheduled Threads will be free to keep doing their pings in case
the server is busy with paging or
+ // any other deliveries
+ serverQueue.getExecutor().execute(onTimeExecutor);
}
public String getDestinationName()
@@ -190,7 +202,7 @@
*/
public long getMessageCount()
{
- return destQueue.getMessageCount();
+ return serverQueue.getInstantMessageCount();
}
/**
@@ -199,7 +211,7 @@
*/
public long getMessageCountDelta()
{
- long current = destQueue.getMessageCount();
+ long current = serverQueue.getInstantMessageCount();
int delta = (int)(current - depthLast);
depthLast = current;
@@ -334,8 +346,8 @@
destTopic +
", destDurable=" +
destDurable +
- ", destQueue=" +
- destQueue +
+ ", serverQueue =" +
+ serverQueue +
"]";
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-06-06
20:25:18 UTC (rev 10777)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-06-07
01:45:23 UTC (rev 10778)
@@ -76,6 +76,9 @@
void forceDelivery();
long getMessageCount();
+
+ /** Return the current message count without waiting for scheduled executors to finish
*/
+ long getInstantMessageCount();
int getDeliveringCount();
@@ -86,6 +89,8 @@
List<MessageReference> getScheduledMessages();
long getMessagesAdded();
+
+ long getInstantMessagesAdded();
MessageReference removeReferenceWithID(long id) throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-06-06
20:25:18 UTC (rev 10777)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-06-07
01:45:23 UTC (rev 10778)
@@ -727,7 +727,12 @@
public long getMessageCount()
{
blockOnExecutorFuture();
-
+
+ return getInstantMessageCount();
+ }
+
+ public long getInstantMessageCount()
+ {
synchronized (this)
{
if (pageSubscription != null)
@@ -892,7 +897,12 @@
public long getMessagesAdded()
{
blockOnExecutorFuture();
-
+
+ return getInstantMessagesAdded();
+ }
+
+ public long getInstantMessagesAdded()
+ {
synchronized (this)
{
if (pageSubscription != null)
@@ -904,7 +914,7 @@
return messagesAdded;
}
}
- }
+ }
public int deleteAllReferences() throws Exception
{
@@ -1543,6 +1553,8 @@
holder.iter.remove();
refRemoved(ref);
+
+ handled++;
continue;
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-06-06
20:25:18 UTC (rev 10777)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-06-07
01:45:23 UTC (rev 10778)
@@ -246,7 +246,7 @@
{
MessageCounter counter = new MessageCounter(queue.getName().toString(),
null,
- queueControl,
+ queue,
false,
queue.isDurable(),
messageCounterManager.getMaxDayCount());
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-06-06
20:25:18 UTC (rev 10777)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-06-07
01:45:23 UTC (rev 10778)
@@ -48,6 +48,7 @@
import org.hornetq.core.security.Role;
import org.hornetq.core.server.ActivateCallback;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.ResourceManager;
@@ -193,7 +194,7 @@
{
active = true;
- jmsManagementService = new JMSManagementServiceImpl(server.getManagementService(),
this);
+ jmsManagementService = new JMSManagementServiceImpl(server.getManagementService(),
server, this);
try
{
@@ -1036,7 +1037,7 @@
coreFilterString =
SelectorTranslator.convertToHornetQFilterString(selectorString);
}
- server.deployQueue(SimpleString.toSimpleString(hqQueue.getAddress()),
+ Queue queue =
server.deployQueue(SimpleString.toSimpleString(hqQueue.getAddress()),
SimpleString.toSimpleString(hqQueue.getAddress()),
SimpleString.toSimpleString(coreFilterString),
durable,
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2011-06-06
20:25:18 UTC (rev 10777)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2011-06-07
01:45:23 UTC (rev 10778)
@@ -15,6 +15,7 @@
import javax.management.ObjectName;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.AddressControl;
import org.hornetq.api.core.management.QueueControl;
import org.hornetq.api.core.management.ResourceNames;
@@ -24,6 +25,8 @@
import org.hornetq.api.jms.management.TopicControl;
import org.hornetq.core.messagecounter.MessageCounter;
import org.hornetq.core.messagecounter.MessageCounterManager;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
@@ -52,13 +55,16 @@
private final ManagementService managementService;
private final JMSServerManager jmsServerManager;
+
+ private final HornetQServer server;
// Static --------------------------------------------------------
- public JMSManagementServiceImpl(final ManagementService managementService, final
JMSServerManager jmsServerManager)
+ public JMSManagementServiceImpl(final ManagementService managementService, final
HornetQServer server, final JMSServerManager jmsServerManager)
{
this.managementService = managementService;
this.jmsServerManager = jmsServerManager;
+ this.server = server;
}
// Public --------------------------------------------------------
@@ -83,11 +89,12 @@
public synchronized void registerQueue(final HornetQQueue queue) throws Exception
{
+ Queue serverQueue = server.locateQueue(new SimpleString(queue.getName()));
QueueControl coreQueueControl =
(QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE +
queue.getAddress());
MessageCounterManager messageCounterManager =
managementService.getMessageCounterManager();
MessageCounter counter = new MessageCounter(queue.getName(),
null,
- coreQueueControl,
+ serverQueue,
false,
coreQueueControl.isDurable(),
messageCounterManager.getMaxDayCount());
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-06-06
20:25:18 UTC (rev 10777)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-06-07
01:45:23 UTC (rev 10778)
@@ -665,4 +665,22 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getInstantMessageCount()
+ */
+ public long getInstantMessageCount()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getInstantMessagesAdded()
+ */
+ public long getInstantMessagesAdded()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
}
\ No newline at end of file
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-06-06
20:25:18 UTC (rev 10777)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-06-07
01:45:23 UTC (rev 10778)
@@ -659,6 +659,24 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getInstantMessageCount()
+ */
+ public long getInstantMessageCount()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getInstantMessagesAdded()
+ */
+ public long getInstantMessagesAdded()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
}
}