[hornetq-commits] JBoss hornetq SVN: r7972 - in trunk: src/main/org/hornetq/core/management/impl and 6 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Sep 18 12:59:06 EDT 2009
Author: timfox
Date: 2009-09-18 12:59:05 -0400 (Fri, 18 Sep 2009)
New Revision: 7972
Modified:
trunk/src/main/org/hornetq/core/management/QueueControl.java
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-82 merged patch
Modified: trunk/src/main/org/hornetq/core/management/QueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/QueueControl.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/src/main/org/hornetq/core/management/QueueControl.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -124,4 +124,13 @@
@Operation(desc = "List the message counters history HTML", impact = INFO)
String listMessageCounterHistoryAsHTML() throws Exception;
+
+ @Operation(desc = "Pauses the Queue", impact = ACTION)
+ void pause() throws Exception;
+
+ @Operation(desc = "Resumes delivery of queued messages and gets the queue out of paused state.", impact = ACTION)
+ void resume() throws Exception;
+
+ @Operation(desc = "Inspects if the queue is paused", impact = INFO)
+ boolean isPaused() throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -19,6 +19,7 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.MessageCounterInfo;
import org.hornetq.core.management.QueueControl;
import org.hornetq.core.message.Message;
@@ -42,8 +43,9 @@
*/
public class QueueControlImpl implements QueueControl
{
-
// Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(QueueControlImpl.class);
// Attributes ----------------------------------------------------
@@ -65,11 +67,11 @@
for (int i = 0; i < messages.length; i++)
{
Map<String, Object> message = messages[i];
- array.put(new JSONObject(message));
+ array.put(new JSONObject(message));
}
return array.toString();
}
-
+
/**
* Returns null if the string is null or empty
*/
@@ -78,7 +80,8 @@
if (filterStr == null || filterStr.trim().length() == 0)
{
return null;
- } else
+ }
+ else
{
return new FilterImpl(new SimpleString(filterStr));
}
@@ -87,9 +90,9 @@
// Constructors --------------------------------------------------
public QueueControlImpl(final Queue queue,
- final String address,
- final PostOffice postOffice,
- final HierarchicalRepository<AddressSettings> addressSettingsRepository)
+ final String address,
+ final PostOffice postOffice,
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository)
{
this.queue = queue;
this.address = address;
@@ -211,12 +214,12 @@
AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
SimpleString sExpiryAddress = new SimpleString(expiryAddress);
-
+
if (expiryAddress != null)
{
addressSettings.setExpiryAddress(sExpiryAddress);
}
-
+
queue.setExpiryAddress(sExpiryAddress);
}
@@ -232,14 +235,14 @@
}
return messages;
}
-
+
public String listScheduledMessagesAsJSON() throws Exception
{
return toJSON(listScheduledMessages());
}
public Map<String, Object>[] listMessages(final String filterStr) throws Exception
- {
+ {
try
{
Filter filter = createFilter(filterStr);
@@ -258,7 +261,7 @@
throw new IllegalStateException(e.getMessage());
}
}
-
+
public String listMessagesAsJSON(String filter) throws Exception
{
return toJSON(listMessages(filter));
@@ -408,6 +411,24 @@
return MessageCounterHelper.listMessageCounterHistoryAsHTML(new MessageCounter[] { counter });
}
+ public void pause()
+ {
+ log.info("calling pause");
+ queue.pause();
+ }
+
+ public void resume()
+ {
+ log.info("calling resume");
+ queue.resume();
+ }
+
+ public boolean isPaused() throws Exception
+ {
+ log.info("calling isPaused");
+ return queue.isPaused();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/src/main/org/hornetq/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -246,6 +246,21 @@
info.getNotifications());
}
+ public boolean isPaused() throws Exception
+ {
+ return localQueueControl.isPaused();
+ }
+
+ public void pause() throws Exception
+ {
+ localQueueControl.pause();
+ }
+
+ public void resume() throws Exception
+ {
+ localQueueControl.resume();
+ }
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -150,4 +150,22 @@
Iterator<MessageReference> iterator();
void setExpiryAddress(SimpleString expiryAddress);
+ /**
+ * Pauses the queue. It will receive messages but won't give them to the consumers until resumed.
+ * If a queue is paused, pausing it again will only throw a warning.
+ * To check if a queue is paused, invoke <i>isPaused()</i>
+ */
+ void pause();
+ /**
+ * Resumes the delivery of message for the queue.
+ * If a queue is resumed, resuming it again will only throw a warning.
+ * To check if a queue is resumed, invoke <i>isPaused()</i>
+ */
+ void resume();
+ /**
+ *
+ * @return true if paused, false otherwise.
+ */
+ boolean isPaused();
+
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -113,6 +113,8 @@
private final AtomicBoolean waitingToDeliver = new AtomicBoolean(false);
+ private boolean paused;
+
private final Runnable deliverRunner = new DeliverRunner();
private final PagingManager pagingManager;
@@ -146,7 +148,7 @@
private final Map<Consumer, Iterator<MessageReference>> iterators = new HashMap<Consumer, Iterator<MessageReference>>();
private ConcurrentMap<SimpleString, Consumer> groups = new ConcurrentHashMap<SimpleString, Consumer>();
-
+
private volatile SimpleString expiryAddress;
public QueueImpl(final long persistenceID,
@@ -192,7 +194,7 @@
direct = true;
scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
-
+
if (addressSettingsRepository != null)
{
expiryAddress = addressSettingsRepository.getMatch(address.toString()).getExpiryAddress();
@@ -202,7 +204,7 @@
expiryAddress = null;
}
}
-
+
// Bindable implementation -------------------------------------------------------------------------------------
public SimpleString getRoutingName()
@@ -748,21 +750,21 @@
messageReferences.addFirst(reference, reference.getMessage().getPriority());
}
}
- }
+ }
public void expire(final MessageReference ref) throws Exception
- {
+ {
log.info("expiring ref " + this.expiryAddress);
if (expiryAddress != null)
{
- move(expiryAddress, ref, true);
+ move(expiryAddress, ref, true);
}
else
- {
+ {
acknowledge(ref);
}
}
-
+
public void setExpiryAddress(final SimpleString expiryAddress)
{
this.expiryAddress = expiryAddress;
@@ -1289,7 +1291,7 @@
// with the live node. Instead, when we replicate the delivery we remove
// the ref from the queue
- if (backup)
+ if (backup || paused)
{
return;
}
@@ -1307,11 +1309,11 @@
Iterator<MessageReference> iterator = null;
- //TODO - this needs to be optimised!! Creating too much stuff on an inner loop
+ // TODO - this needs to be optimised!! Creating too much stuff on an inner loop
int totalConsumers = distributionPolicy.getConsumerCount();
Set<Consumer> busyConsumers = new HashSet<Consumer>();
Set<Consumer> nullReferences = new HashSet<Consumer>();
-
+
while (true)
{
consumer = distributionPolicy.getNextConsumer();
@@ -1331,7 +1333,7 @@
else
{
reference = null;
-
+
if (consumer.getFilter() != null)
{
// we have iterated on the whole queue for
@@ -1344,7 +1346,7 @@
if (reference == null)
{
- nullReferences.add(consumer);
+ nullReferences.add(consumer);
if (nullReferences.size() + busyConsumers.size() == totalConsumers)
{
startDepaging();
@@ -1358,10 +1360,10 @@
else
{
nullReferences.remove(consumer);
-
+
if (reference.getMessage().isExpired())
{
- //We expire messages on the server too
+ // We expire messages on the server too
if (iterator == null)
{
messageReferences.removeFirst();
@@ -1370,9 +1372,9 @@
{
iterator.remove();
}
-
+
referenceHandled();
-
+
try
{
expire(reference);
@@ -1381,7 +1383,7 @@
{
log.error("Failed to expire ref", e);
}
-
+
continue;
}
}
@@ -1447,7 +1449,7 @@
boolean add = false;
- if (direct && !backup)
+ if (direct && !backup && !paused)
{
// Deliver directly
@@ -1667,7 +1669,7 @@
if (message.decrementRefCount() == 0 && store != null)
{
- store.addSize(-ref.getMessage().getMemoryEstimate());
+ store.addSize(-ref.getMessage().getMemoryEstimate());
}
}
@@ -1732,6 +1734,7 @@
{
public void run()
{
+
// Must be set to false *before* executing to avoid race
waitingToDeliver.set(false);
@@ -1887,4 +1890,24 @@
}
}
}
+
+ public synchronized void pause()
+ {
+ paused = true;
+
+ log.info("Paused is now " + paused);
+ }
+
+ public synchronized void resume()
+ {
+ paused = false;
+
+ deliver();
+ }
+
+ public synchronized boolean isPaused()
+ {
+ log.info("return ispaused " + paused);
+ return paused;
+ }
}
Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -32,6 +32,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.management.DayCounterInfo;
import org.hornetq.core.management.HornetQServerControl;
import org.hornetq.core.management.MessageCounterInfo;
@@ -307,7 +308,7 @@
session.deleteQueue(queue);
}
-
+
public void testListScheduledMessagesAsJSON() throws Exception
{
long delay = 2000;
@@ -369,7 +370,7 @@
consumer.close();
session.deleteQueue(queue);
}
-
+
public void testListMessagesAsJSONWithNullFilter() throws Exception
{
SimpleString address = randomSimpleString();
@@ -384,7 +385,7 @@
message.putIntProperty(new SimpleString("key"), intValue);
producer.send(message);
- String jsonString = queueControl.listMessagesAsJSON(null);
+ String jsonString = queueControl.listMessagesAsJSON(null);
assertNotNull(jsonString);
JSONArray array = new JSONArray(jsonString);
assertEquals(1, array.length());
@@ -392,7 +393,7 @@
consumeMessages(1, session, queue);
- jsonString = queueControl.listMessagesAsJSON(null);
+ jsonString = queueControl.listMessagesAsJSON(null);
assertNotNull(jsonString);
array = new JSONArray(jsonString);
assertEquals(0, array.length());
@@ -455,7 +456,7 @@
session.deleteQueue(queue);
}
-
+
public void testListMessagesWithEmptyFilter() throws Exception
{
SimpleString address = randomSimpleString();
@@ -478,7 +479,7 @@
session.deleteQueue(queue);
}
-
+
public void testListMessagesAsJSONWithFilter() throws Exception
{
SimpleString key = new SimpleString("key");
@@ -515,7 +516,7 @@
session.deleteQueue(queue);
}
-
+
/**
* <ol>
* <li>send a message to queue</li>
@@ -636,8 +637,7 @@
assertEquals(2, queueControl.getMessageCount());
// moved matching messages to otherQueue
- int movedMatchedMessagesCount = queueControl.moveMessages(key + " =" + matchingValue,
- otherQueue.toString());
+ int movedMatchedMessagesCount = queueControl.moveMessages(key + " =" + matchingValue, otherQueue.toString());
assertEquals(1, movedMatchedMessagesCount);
assertEquals(1, queueControl.getMessageCount());
@@ -682,7 +682,7 @@
assertEquals(0, otherQueueControl.getMessageCount());
// the message IDs are set on the server
- Map<String, Object>[] messages = queueControl.listMessages(null);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
assertEquals(2, messages.length);
long messageID = (Long)messages[0].get("messageID");
@@ -714,7 +714,7 @@
assertEquals(1, queueControl.getMessageCount());
// the message IDs are set on the server
- Map<String, Object>[] messages = queueControl.listMessages(null);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
assertEquals(1, messages.length);
long messageID = (Long)messages[0].get("messageID");
@@ -805,7 +805,7 @@
int removedMatchedMessagesCount = queueControl.removeMessages(null);
assertEquals(2, removedMatchedMessagesCount);
assertEquals(0, queueControl.getMessageCount());
-
+
session.deleteQueue(queue);
}
@@ -828,11 +828,10 @@
int removedMatchedMessagesCount = queueControl.removeMessages("");
assertEquals(2, removedMatchedMessagesCount);
assertEquals(0, queueControl.getMessageCount());
-
+
session.deleteQueue(queue);
}
-
-
+
public void testRemoveMessage() throws Exception
{
SimpleString address = randomSimpleString();
@@ -849,7 +848,7 @@
assertEquals(2, queueControl.getMessageCount());
// the message IDs are set on the server
- Map<String, Object>[] messages = queueControl.listMessages(null);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
assertEquals(2, messages.length);
long messageID = (Long)messages[0].get("messageID");
@@ -958,7 +957,7 @@
assertEquals(0, expiryQueueControl.getMessageCount());
// the message IDs are set on the server
- Map<String, Object>[] messages = queueControl.listMessages(null);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
assertEquals(1, messages.length);
long messageID = (Long)messages[0].get("messageID");
@@ -996,7 +995,7 @@
assertEquals(2, queueControl.getMessageCount());
// the message IDs are set on the server
- Map<String, Object>[] messages = queueControl.listMessages(null);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
assertEquals(2, messages.length);
long messageID = (Long)messages[0].get("messageID");
@@ -1015,7 +1014,7 @@
consumeMessages(1, session, deadLetterQueue);
session.deleteQueue(queue);
- session.deleteQueue(deadLetterQueue);
+ session.deleteQueue(deadLetterQueue);
}
public void testChangeMessagePriority() throws Exception
@@ -1037,7 +1036,7 @@
assertEquals(1, queueControl.getMessageCount());
// the message IDs are set on the server
- Map<String, Object>[] messages = queueControl.listMessages(null);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
assertEquals(1, messages.length);
long messageID = (Long)messages[0].get("messageID");
@@ -1070,7 +1069,7 @@
assertEquals(1, queueControl.getMessageCount());
// the message IDs are set on the server
- Map<String, Object>[] messages = queueControl.listMessages(null);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
assertEquals(1, messages.length);
long messageID = (Long)messages[0].get("messageID");
@@ -1099,14 +1098,14 @@
session.createQueue(address, queue, null, false);
QueueControl queueControl = createManagementControl(address, queue);
-
+
HornetQServerControl serverControl = createHornetQServerControl(mbeanServer);
serverControl.enableMessageCounters();
serverControl.setMessageCounterSamplePeriod(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD);
String jsonString = queueControl.listMessageCounter();
MessageCounterInfo info = MessageCounterInfo.fromJSON(jsonString);
-
+
assertEquals(0, info.getDepth());
assertEquals(0, info.getCount());
@@ -1133,7 +1132,7 @@
consumeMessages(2, session, queue);
- Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
+ Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
jsonString = queueControl.listMessageCounter();
info = MessageCounterInfo.fromJSON(jsonString);
assertEquals(0, info.getDepth());
@@ -1143,7 +1142,7 @@
session.deleteQueue(queue);
}
-
+
public void testResetMessageCounter() throws Exception
{
SimpleString address = randomSimpleString();
@@ -1151,7 +1150,7 @@
session.createQueue(address, queue, null, false);
QueueControl queueControl = createManagementControl(address, queue);
-
+
HornetQServerControl serverControl = createHornetQServerControl(mbeanServer);
serverControl.enableMessageCounters();
serverControl.setMessageCounterSamplePeriod(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD);
@@ -1175,7 +1174,7 @@
consumeMessages(1, session, queue);
- Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
+ Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
jsonString = queueControl.listMessageCounter();
info = MessageCounterInfo.fromJSON(jsonString);
assertEquals(0, info.getDepth());
@@ -1184,18 +1183,18 @@
assertEquals(0, info.getCountDelta());
queueControl.resetMessageCounter();
-
- Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
+
+ Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
jsonString = queueControl.listMessageCounter();
info = MessageCounterInfo.fromJSON(jsonString);
assertEquals(0, info.getDepth());
assertEquals(0, info.getDepthDelta());
assertEquals(0, info.getCount());
assertEquals(0, info.getCountDelta());
-
+
session.deleteQueue(queue);
}
-
+
public void testListMessageCounterAsHTML() throws Exception
{
SimpleString address = randomSimpleString();
@@ -1203,10 +1202,10 @@
session.createQueue(address, queue, null, false);
QueueControl queueControl = createManagementControl(address, queue);
-
+
String history = queueControl.listMessageCounterAsHTML();
assertNotNull(history);
-
+
session.deleteQueue(queue);
}
@@ -1218,7 +1217,7 @@
session.createQueue(address, queue, null, false);
QueueControl queueControl = createManagementControl(address, queue);
-
+
HornetQServerControl serverControl = createHornetQServerControl(mbeanServer);
serverControl.enableMessageCounters();
serverControl.setMessageCounterSamplePeriod(counterPeriod);
@@ -1238,7 +1237,7 @@
session.createQueue(address, queue, null, false);
QueueControl queueControl = createManagementControl(address, queue);
-
+
HornetQServerControl serverControl = createHornetQServerControl(mbeanServer);
serverControl.enableMessageCounters();
serverControl.setMessageCounterSamplePeriod(counterPeriod);
@@ -1247,8 +1246,36 @@
assertNotNull(history);
session.deleteQueue(queue);
+
}
-
+
+ public void testPauseAndResume()
+ {
+ long counterPeriod = 1000;
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ try
+ {
+ session.createQueue(address, queue, null, false);
+ QueueControl queueControl = createManagementControl(address, queue);
+
+ HornetQServerControl serverControl = createHornetQServerControl(mbeanServer);
+ serverControl.enableMessageCounters();
+ serverControl.setMessageCounterSamplePeriod(counterPeriod);
+ assertFalse(queueControl.isPaused());
+ queueControl.pause();
+ assertTrue(queueControl.isPaused());
+ queueControl.resume();
+ assertFalse(queueControl.isPaused());
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -1278,9 +1305,9 @@
session.close();
server.stop();
-
+
session = null;
-
+
server = null;
super.tearDown();
Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -252,6 +252,21 @@
proxy.invokeOperation("setExpiryAddress", expiryAddres);
}
+ public void pause() throws Exception
+ {
+ proxy.invokeOperation("pause");
+ }
+
+ public void resume() throws Exception
+ {
+ proxy.invokeOperation("pause");
+ }
+
+ public boolean isPaused() throws Exception
+ {
+ return (Boolean)proxy.invokeOperation("isPaused");
+ }
+
};
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -517,4 +517,31 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#isPaused()
+ */
+ public boolean isPaused()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#pause()
+ */
+ public void pause()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#resume()
+ */
+ public void resume()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
\ No newline at end of file
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -13,7 +13,6 @@
package org.hornetq.tests.unit.core.server.impl;
-
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -76,7 +75,6 @@
assertEquals(name, queue.getName());
}
-
public void testDurable()
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, false, scheduledExecutor, null, null, null);
@@ -194,7 +192,7 @@
}
- public void testSimpleDirectDelivery() throws Exception
+ public void testSimpleDirectDelivery() throws Exception
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
@@ -222,7 +220,7 @@
assertRefListsIdenticalRefs(refs, consumer.getReferences());
}
- public void testSimpleNonDirectDelivery() throws Exception
+ public void testSimpleNonDirectDelivery() throws Exception
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
@@ -243,7 +241,7 @@
assertEquals(0, queue.getScheduledCount());
assertEquals(0, queue.getDeliveringCount());
- //Now add a consumer
+ // Now add a consumer
FakeConsumer consumer = new FakeConsumer();
queue.addConsumer(consumer);
@@ -260,7 +258,7 @@
assertEquals(numMessages, queue.getDeliveringCount());
}
- public void testBusyConsumer() throws Exception
+ public void testBusyConsumer() throws Exception
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
@@ -425,10 +423,18 @@
assertRefListsIdenticalRefs(allRefs, consumer.getReferences());
}
-
public void testChangeConsumersAndDeliver() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, new FakePostOffice(), null, null);
+ Queue queue = new QueueImpl(1,
+ address1,
+ queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ new FakePostOffice(),
+ null,
+ null);
final int numMessages = 10;
@@ -609,7 +615,7 @@
}
catch (IllegalStateException e)
{
- //Ok
+ // Ok
}
}
@@ -623,7 +629,7 @@
List<MessageReference> refs = new ArrayList<MessageReference>();
- //Test first with queueing
+ // Test first with queueing
for (int i = 0; i < numMessages; i++)
{
@@ -713,7 +719,7 @@
{
MessageReference ref = generateReference(queue, i);
- ref.getMessage().setPriority((byte) i);
+ ref.getMessage().setPriority((byte)i);
refs.add(ref);
@@ -728,7 +734,7 @@
List<MessageReference> receivedRefs = consumer.getReferences();
- //Should be in reverse order
+ // Should be in reverse order
assertEquals(refs.size(), receivedRefs.size());
@@ -737,8 +743,8 @@
assertEquals(refs.get(i), receivedRefs.get(9 - i));
}
- //But if we send more - since we are now in direct mode - the order will be the send order
- //since the refs don't get queued
+ // But if we send more - since we are now in direct mode - the order will be the send order
+ // since the refs don't get queued
consumer.clearReferences();
@@ -748,7 +754,7 @@
{
MessageReference ref = generateReference(queue, i);
- ref.getMessage().setPriority((byte) i);
+ ref.getMessage().setPriority((byte)i);
refs.add(ref);
@@ -839,7 +845,16 @@
public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, new FakePostOffice(), null, null);
+ Queue queue = new QueueImpl(1,
+ address1,
+ queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ new FakePostOffice(),
+ null,
+ null);
Filter filter = new FakeFilter("fruit", "orange");
@@ -863,7 +878,6 @@
refs.add(ref2);
-
assertEquals(2, queue.getMessageCount());
assertEquals(1, consumer.getReferences().size());
@@ -880,7 +894,6 @@
queue.deliverNow();
-
refs.clear();
consumer.clearReferences();
@@ -1020,7 +1033,6 @@
assertEquals(0, queue.getScheduledCount());
assertEquals(10, queue.getDeliveringCount());
-
for (int i = numMessages * 2; i < numMessages * 3; i++)
{
MessageReference ref = generateReference(queue, i);
@@ -1038,12 +1050,20 @@
assertEquals(20, queue.getDeliveringCount());
}
-
// Private ------------------------------------------------------------------------------
private void testConsumerWithFilters(boolean direct) throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, new FakePostOffice(), null, null);
+ Queue queue = new QueueImpl(1,
+ address1,
+ queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ new FakePostOffice(),
+ null,
+ null);
Filter filter = new FakeFilter("fruit", "orange");
@@ -1139,11 +1159,11 @@
queue.addFirst(messageReference);
queue.addLast(messageReference2);
queue.addFirst(messageReference3);
-
+
assertEquals(0, consumer.getReferences().size());
queue.addConsumer(consumer);
queue.deliverNow();
-
+
assertEquals(3, consumer.getReferences().size());
assertEquals(messageReference3, consumer.getReferences().get(0));
assertEquals(messageReference, consumer.getReferences().get(1));
@@ -1162,7 +1182,6 @@
assertEquals(queue.getMessagesAdded(), 3);
}
-
public void testGetReference() throws Exception
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
@@ -1189,16 +1208,120 @@
}
+ /**
+ * Test the paused and resumed states with async deliveries.
+ * @throws Exception
+ */
+ public void testPauseAndResumeWithAsync() throws Exception
+ {
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+
+ // pauses the queue
+ queue.pause();
+ final int numMessages = 10;
+
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ MessageReference ref = generateReference(queue, i);
+
+ refs.add(ref);
+
+ queue.addLast(ref);
+ }
+ // even as this queue is paused, it will receive the messages anyway
+ assertEquals(10, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ assertEquals(0, queue.getDeliveringCount());
+
+ // Now add a consumer
+ FakeConsumer consumer = new FakeConsumer();
+
+ queue.addConsumer(consumer);
+
+ assertTrue(consumer.getReferences().isEmpty());
+ assertEquals(10, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ // explicit order of delivery
+ queue.deliverNow();
+ // As the queue is paused, even an explicit order of delivery will not work.
+ assertEquals(0, consumer.getReferences().size());
+ assertEquals(numMessages, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ assertEquals(0, queue.getDeliveringCount());
+ // resuming work
+ queue.resume();
+
+ // after resuming the delivery begins.
+ assertRefListsIdenticalRefs(refs, consumer.getReferences());
+ assertEquals(numMessages, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ assertEquals(numMessages, queue.getDeliveringCount());
+
+ }
+
+ /**
+ * Test the paused and resumed states with direct deliveries.
+ * @throws Exception
+ */
+
+ public void testPauseAndResumeWithDirect() throws Exception
+ {
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+
+ // Now add a consumer
+ FakeConsumer consumer = new FakeConsumer();
+
+ queue.addConsumer(consumer);
+
+ // brings to queue to paused state
+ queue.pause();
+
+ final int numMessages = 10;
+
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ MessageReference ref = generateReference(queue, i);
+ refs.add(ref);
+ queue.addLast(ref);
+ }
+
+ // the queue even if it's paused will receive the message but won't forward
+ // directly to the consumer until resumed.
+ assertEquals(numMessages, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ assertEquals(0, queue.getDeliveringCount());
+ assertTrue(consumer.getReferences().isEmpty());
+
+ // brings the queue to resumed state.
+ queue.resume();
+ // resuming delivery of messages
+ assertRefListsIdenticalRefs(refs, consumer.getReferences());
+ assertEquals(numMessages, queue.getMessageCount());
+ assertEquals(numMessages, queue.getDeliveringCount());
+
+ }
+
class AddtoQueueRunner implements Runnable
{
Queue queue;
+
MessageReference messageReference;
+
boolean added = false;
+
CountDownLatch countDownLatch;
+
boolean first;
- public AddtoQueueRunner(boolean first, Queue queue, MessageReference messageReference, CountDownLatch countDownLatch)
+ public AddtoQueueRunner(boolean first,
+ Queue queue,
+ MessageReference messageReference,
+ CountDownLatch countDownLatch)
{
this.queue = queue;
this.messageReference = messageReference;
@@ -1226,7 +1349,7 @@
Consumer consumer;
public List<Consumer> getConsumers()
- {
+ {
return null;
}
More information about the hornetq-commits
mailing list