[hornetq-commits] JBoss hornetq SVN: r7972 - in trunk: src/main/org/hornetq/core/management/impl and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Sep 18 12:59:06 EDT 2009


Author: timfox
Date: 2009-09-18 12:59:05 -0400 (Fri, 18 Sep 2009)
New Revision: 7972

Modified:
   trunk/src/main/org/hornetq/core/management/QueueControl.java
   trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
   trunk/src/main/org/hornetq/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java
   trunk/src/main/org/hornetq/core/server/Queue.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
   trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
   trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-82 merged patch

Modified: trunk/src/main/org/hornetq/core/management/QueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/QueueControl.java	2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/src/main/org/hornetq/core/management/QueueControl.java	2009-09-18 16:59:05 UTC (rev 7972)
@@ -124,4 +124,13 @@
 
    @Operation(desc = "List the message counters history HTML", impact = INFO)
    String listMessageCounterHistoryAsHTML() throws Exception;
+   
+   @Operation(desc = "Pauses the Queue", impact = ACTION)
+   void pause() throws Exception;
+   
+   @Operation(desc = "Resumes delivery of queued messages and gets the queue out of paused state.", impact = ACTION)
+   void resume() throws Exception;
+   
+   @Operation(desc = "Inspects if the queue is paused", impact = INFO)
+   boolean isPaused() throws Exception; 
 }

Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2009-09-18 16:59:05 UTC (rev 7972)
@@ -19,6 +19,7 @@
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.filter.impl.FilterImpl;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.management.MessageCounterInfo;
 import org.hornetq.core.management.QueueControl;
 import org.hornetq.core.message.Message;
@@ -42,8 +43,9 @@
  */
 public class QueueControlImpl implements QueueControl
 {
-
    // Constants -----------------------------------------------------
+   
+   private static final Logger log = Logger.getLogger(QueueControlImpl.class);
 
    // Attributes ----------------------------------------------------
 
@@ -65,11 +67,11 @@
       for (int i = 0; i < messages.length; i++)
       {
          Map<String, Object> message = messages[i];
-         array.put(new JSONObject(message));         
+         array.put(new JSONObject(message));
       }
       return array.toString();
    }
-   
+
    /**
     * Returns null if the string is null or empty
     */
@@ -78,7 +80,8 @@
       if (filterStr == null || filterStr.trim().length() == 0)
       {
          return null;
-      } else
+      }
+      else
       {
          return new FilterImpl(new SimpleString(filterStr));
       }
@@ -87,9 +90,9 @@
    // Constructors --------------------------------------------------
 
    public QueueControlImpl(final Queue queue,
-                       final String address,
-                       final PostOffice postOffice,
-                       final HierarchicalRepository<AddressSettings> addressSettingsRepository)
+                           final String address,
+                           final PostOffice postOffice,
+                           final HierarchicalRepository<AddressSettings> addressSettingsRepository)
    {
       this.queue = queue;
       this.address = address;
@@ -211,12 +214,12 @@
       AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
 
       SimpleString sExpiryAddress = new SimpleString(expiryAddress);
-      
+
       if (expiryAddress != null)
       {
          addressSettings.setExpiryAddress(sExpiryAddress);
       }
-      
+
       queue.setExpiryAddress(sExpiryAddress);
    }
 
@@ -232,14 +235,14 @@
       }
       return messages;
    }
-   
+
    public String listScheduledMessagesAsJSON() throws Exception
    {
       return toJSON(listScheduledMessages());
    }
 
    public Map<String, Object>[] listMessages(final String filterStr) throws Exception
-   {     
+   {
       try
       {
          Filter filter = createFilter(filterStr);
@@ -258,7 +261,7 @@
          throw new IllegalStateException(e.getMessage());
       }
    }
-   
+
    public String listMessagesAsJSON(String filter) throws Exception
    {
       return toJSON(listMessages(filter));
@@ -408,6 +411,24 @@
       return MessageCounterHelper.listMessageCounterHistoryAsHTML(new MessageCounter[] { counter });
    }
 
+   public void pause()
+   {
+      log.info("calling pause");
+      queue.pause();
+   }
+
+   public void resume()
+   {
+      log.info("calling resume");
+      queue.resume();
+   }
+
+   public boolean isPaused() throws Exception
+   {
+      log.info("calling isPaused");
+      return queue.isPaused();
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java	2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/src/main/org/hornetq/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java	2009-09-18 16:59:05 UTC (rev 7972)
@@ -246,6 +246,21 @@
                            info.getNotifications());
    }
 
+   public boolean isPaused() throws Exception
+   {
+      return localQueueControl.isPaused();
+   }
+
+   public void pause() throws Exception
+   {
+      localQueueControl.pause();
+   }
+
+   public void resume() throws Exception
+   {
+      localQueueControl.resume();
+   }
+
    // Public --------------------------------------------------------
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java	2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/src/main/org/hornetq/core/server/Queue.java	2009-09-18 16:59:05 UTC (rev 7972)
@@ -150,4 +150,22 @@
    Iterator<MessageReference> iterator();
    
    void setExpiryAddress(SimpleString expiryAddress);
+   /**
+    * Pauses the queue. It will receive messages but won't give them to the consumers until resumed.
+    * If a queue is paused, pausing it again will only throw a warning. 
+    * To check if a queue is paused, invoke <i>isPaused()</i>
+    */
+   void pause();
+   /**
+    * Resumes the delivery of message for the queue. 
+    * If a queue is resumed, resuming it again will only throw a warning. 
+    * To check if a queue is resumed, invoke <i>isPaused()</i>
+    */
+   void resume();
+   /**
+    * 
+    * @return true if paused, false otherwise.
+    */
+   boolean isPaused();
+
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-09-18 16:59:05 UTC (rev 7972)
@@ -113,6 +113,8 @@
 
    private final AtomicBoolean waitingToDeliver = new AtomicBoolean(false);
 
+   private boolean paused;
+
    private final Runnable deliverRunner = new DeliverRunner();
 
    private final PagingManager pagingManager;
@@ -146,7 +148,7 @@
    private final Map<Consumer, Iterator<MessageReference>> iterators = new HashMap<Consumer, Iterator<MessageReference>>();
 
    private ConcurrentMap<SimpleString, Consumer> groups = new ConcurrentHashMap<SimpleString, Consumer>();
-   
+
    private volatile SimpleString expiryAddress;
 
    public QueueImpl(final long persistenceID,
@@ -192,7 +194,7 @@
       direct = true;
 
       scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
-      
+
       if (addressSettingsRepository != null)
       {
          expiryAddress = addressSettingsRepository.getMatch(address.toString()).getExpiryAddress();
@@ -202,7 +204,7 @@
          expiryAddress = null;
       }
    }
-   
+
    // Bindable implementation -------------------------------------------------------------------------------------
 
    public SimpleString getRoutingName()
@@ -748,21 +750,21 @@
             messageReferences.addFirst(reference, reference.getMessage().getPriority());
          }
       }
-   }     
+   }
 
    public void expire(final MessageReference ref) throws Exception
-   {      
+   {
       log.info("expiring ref " + this.expiryAddress);
       if (expiryAddress != null)
       {
-         move(expiryAddress, ref, true);         
+         move(expiryAddress, ref, true);
       }
       else
-      {         
+      {
          acknowledge(ref);
       }
    }
-   
+
    public void setExpiryAddress(final SimpleString expiryAddress)
    {
       this.expiryAddress = expiryAddress;
@@ -1289,7 +1291,7 @@
       // with the live node. Instead, when we replicate the delivery we remove
       // the ref from the queue
 
-      if (backup)
+      if (backup || paused)
       {
          return;
       }
@@ -1307,11 +1309,11 @@
 
       Iterator<MessageReference> iterator = null;
 
-      //TODO - this needs to be optimised!! Creating too much stuff on an inner loop
+      // TODO - this needs to be optimised!! Creating too much stuff on an inner loop
       int totalConsumers = distributionPolicy.getConsumerCount();
       Set<Consumer> busyConsumers = new HashSet<Consumer>();
       Set<Consumer> nullReferences = new HashSet<Consumer>();
-      
+
       while (true)
       {
          consumer = distributionPolicy.getNextConsumer();
@@ -1331,7 +1333,7 @@
             else
             {
                reference = null;
-               
+
                if (consumer.getFilter() != null)
                {
                   // we have iterated on the whole queue for
@@ -1344,7 +1346,7 @@
 
          if (reference == null)
          {
-            nullReferences.add(consumer);            
+            nullReferences.add(consumer);
             if (nullReferences.size() + busyConsumers.size() == totalConsumers)
             {
                startDepaging();
@@ -1358,10 +1360,10 @@
          else
          {
             nullReferences.remove(consumer);
-            
+
             if (reference.getMessage().isExpired())
             {
-               //We expire messages on the server too
+               // We expire messages on the server too
                if (iterator == null)
                {
                   messageReferences.removeFirst();
@@ -1370,9 +1372,9 @@
                {
                   iterator.remove();
                }
-               
+
                referenceHandled();
-               
+
                try
                {
                   expire(reference);
@@ -1381,7 +1383,7 @@
                {
                   log.error("Failed to expire ref", e);
                }
-               
+
                continue;
             }
          }
@@ -1447,7 +1449,7 @@
 
       boolean add = false;
 
-      if (direct && !backup)
+      if (direct && !backup && !paused)
       {
          // Deliver directly
 
@@ -1667,7 +1669,7 @@
 
       if (message.decrementRefCount() == 0 && store != null)
       {
-         store.addSize(-ref.getMessage().getMemoryEstimate());         
+         store.addSize(-ref.getMessage().getMemoryEstimate());
       }
    }
 
@@ -1732,6 +1734,7 @@
    {
       public void run()
       {
+
          // Must be set to false *before* executing to avoid race
          waitingToDeliver.set(false);
 
@@ -1887,4 +1890,24 @@
          }
       }
    }
+
+   public synchronized void pause()
+   {
+      paused = true;
+      
+      log.info("Paused is now " + paused);
+   }
+
+   public synchronized void resume()
+   {
+      paused = false;
+      
+      deliver();
+   }
+
+   public synchronized boolean isPaused()
+   {
+      log.info("return ispaused " + paused);
+      return paused;
+   }
 }

Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java	2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java	2009-09-18 16:59:05 UTC (rev 7972)
@@ -32,6 +32,7 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.management.DayCounterInfo;
 import org.hornetq.core.management.HornetQServerControl;
 import org.hornetq.core.management.MessageCounterInfo;
@@ -307,7 +308,7 @@
 
       session.deleteQueue(queue);
    }
-   
+
    public void testListScheduledMessagesAsJSON() throws Exception
    {
       long delay = 2000;
@@ -369,7 +370,7 @@
       consumer.close();
       session.deleteQueue(queue);
    }
-   
+
    public void testListMessagesAsJSONWithNullFilter() throws Exception
    {
       SimpleString address = randomSimpleString();
@@ -384,7 +385,7 @@
       message.putIntProperty(new SimpleString("key"), intValue);
       producer.send(message);
 
-      String jsonString =  queueControl.listMessagesAsJSON(null);
+      String jsonString = queueControl.listMessagesAsJSON(null);
       assertNotNull(jsonString);
       JSONArray array = new JSONArray(jsonString);
       assertEquals(1, array.length());
@@ -392,7 +393,7 @@
 
       consumeMessages(1, session, queue);
 
-      jsonString =  queueControl.listMessagesAsJSON(null);
+      jsonString = queueControl.listMessagesAsJSON(null);
       assertNotNull(jsonString);
       array = new JSONArray(jsonString);
       assertEquals(0, array.length());
@@ -455,7 +456,7 @@
 
       session.deleteQueue(queue);
    }
-   
+
    public void testListMessagesWithEmptyFilter() throws Exception
    {
       SimpleString address = randomSimpleString();
@@ -478,7 +479,7 @@
 
       session.deleteQueue(queue);
    }
-   
+
    public void testListMessagesAsJSONWithFilter() throws Exception
    {
       SimpleString key = new SimpleString("key");
@@ -515,7 +516,7 @@
 
       session.deleteQueue(queue);
    }
-   
+
    /**
     * <ol>
     * <li>send a message to queue</li>
@@ -636,8 +637,7 @@
       assertEquals(2, queueControl.getMessageCount());
 
       // moved matching messages to otherQueue
-      int movedMatchedMessagesCount = queueControl.moveMessages(key + " =" + matchingValue,
-                                                                        otherQueue.toString());
+      int movedMatchedMessagesCount = queueControl.moveMessages(key + " =" + matchingValue, otherQueue.toString());
       assertEquals(1, movedMatchedMessagesCount);
       assertEquals(1, queueControl.getMessageCount());
 
@@ -682,7 +682,7 @@
       assertEquals(0, otherQueueControl.getMessageCount());
 
       // the message IDs are set on the server
-      Map<String, Object>[] messages = queueControl.listMessages(null);   
+      Map<String, Object>[] messages = queueControl.listMessages(null);
       assertEquals(2, messages.length);
       long messageID = (Long)messages[0].get("messageID");
 
@@ -714,7 +714,7 @@
       assertEquals(1, queueControl.getMessageCount());
 
       // the message IDs are set on the server
-      Map<String, Object>[] messages = queueControl.listMessages(null); 
+      Map<String, Object>[] messages = queueControl.listMessages(null);
       assertEquals(1, messages.length);
       long messageID = (Long)messages[0].get("messageID");
 
@@ -805,7 +805,7 @@
       int removedMatchedMessagesCount = queueControl.removeMessages(null);
       assertEquals(2, removedMatchedMessagesCount);
       assertEquals(0, queueControl.getMessageCount());
-      
+
       session.deleteQueue(queue);
    }
 
@@ -828,11 +828,10 @@
       int removedMatchedMessagesCount = queueControl.removeMessages("");
       assertEquals(2, removedMatchedMessagesCount);
       assertEquals(0, queueControl.getMessageCount());
-      
+
       session.deleteQueue(queue);
    }
-   
-   
+
    public void testRemoveMessage() throws Exception
    {
       SimpleString address = randomSimpleString();
@@ -849,7 +848,7 @@
       assertEquals(2, queueControl.getMessageCount());
 
       // the message IDs are set on the server
-      Map<String, Object>[] messages = queueControl.listMessages(null); 
+      Map<String, Object>[] messages = queueControl.listMessages(null);
       assertEquals(2, messages.length);
       long messageID = (Long)messages[0].get("messageID");
 
@@ -958,7 +957,7 @@
       assertEquals(0, expiryQueueControl.getMessageCount());
 
       // the message IDs are set on the server
-      Map<String, Object>[] messages = queueControl.listMessages(null);       
+      Map<String, Object>[] messages = queueControl.listMessages(null);
       assertEquals(1, messages.length);
       long messageID = (Long)messages[0].get("messageID");
 
@@ -996,7 +995,7 @@
       assertEquals(2, queueControl.getMessageCount());
 
       // the message IDs are set on the server
-      Map<String, Object>[] messages = queueControl.listMessages(null);       
+      Map<String, Object>[] messages = queueControl.listMessages(null);
       assertEquals(2, messages.length);
       long messageID = (Long)messages[0].get("messageID");
 
@@ -1015,7 +1014,7 @@
       consumeMessages(1, session, deadLetterQueue);
 
       session.deleteQueue(queue);
-      session.deleteQueue(deadLetterQueue);           
+      session.deleteQueue(deadLetterQueue);
    }
 
    public void testChangeMessagePriority() throws Exception
@@ -1037,7 +1036,7 @@
       assertEquals(1, queueControl.getMessageCount());
 
       // the message IDs are set on the server
-      Map<String, Object>[] messages = queueControl.listMessages(null);       
+      Map<String, Object>[] messages = queueControl.listMessages(null);
       assertEquals(1, messages.length);
       long messageID = (Long)messages[0].get("messageID");
 
@@ -1070,7 +1069,7 @@
       assertEquals(1, queueControl.getMessageCount());
 
       // the message IDs are set on the server
-      Map<String, Object>[] messages = queueControl.listMessages(null);       
+      Map<String, Object>[] messages = queueControl.listMessages(null);
       assertEquals(1, messages.length);
       long messageID = (Long)messages[0].get("messageID");
 
@@ -1099,14 +1098,14 @@
 
       session.createQueue(address, queue, null, false);
       QueueControl queueControl = createManagementControl(address, queue);
-      
+
       HornetQServerControl serverControl = createHornetQServerControl(mbeanServer);
       serverControl.enableMessageCounters();
       serverControl.setMessageCounterSamplePeriod(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD);
 
       String jsonString = queueControl.listMessageCounter();
       MessageCounterInfo info = MessageCounterInfo.fromJSON(jsonString);
-      
+
       assertEquals(0, info.getDepth());
       assertEquals(0, info.getCount());
 
@@ -1133,7 +1132,7 @@
 
       consumeMessages(2, session, queue);
 
-      Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);      
+      Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
       jsonString = queueControl.listMessageCounter();
       info = MessageCounterInfo.fromJSON(jsonString);
       assertEquals(0, info.getDepth());
@@ -1143,7 +1142,7 @@
 
       session.deleteQueue(queue);
    }
-   
+
    public void testResetMessageCounter() throws Exception
    {
       SimpleString address = randomSimpleString();
@@ -1151,7 +1150,7 @@
 
       session.createQueue(address, queue, null, false);
       QueueControl queueControl = createManagementControl(address, queue);
-      
+
       HornetQServerControl serverControl = createHornetQServerControl(mbeanServer);
       serverControl.enableMessageCounters();
       serverControl.setMessageCounterSamplePeriod(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD);
@@ -1175,7 +1174,7 @@
 
       consumeMessages(1, session, queue);
 
-      Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);      
+      Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
       jsonString = queueControl.listMessageCounter();
       info = MessageCounterInfo.fromJSON(jsonString);
       assertEquals(0, info.getDepth());
@@ -1184,18 +1183,18 @@
       assertEquals(0, info.getCountDelta());
 
       queueControl.resetMessageCounter();
-      
-      Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);      
+
+      Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
       jsonString = queueControl.listMessageCounter();
       info = MessageCounterInfo.fromJSON(jsonString);
       assertEquals(0, info.getDepth());
       assertEquals(0, info.getDepthDelta());
       assertEquals(0, info.getCount());
       assertEquals(0, info.getCountDelta());
-      
+
       session.deleteQueue(queue);
    }
-   
+
    public void testListMessageCounterAsHTML() throws Exception
    {
       SimpleString address = randomSimpleString();
@@ -1203,10 +1202,10 @@
 
       session.createQueue(address, queue, null, false);
       QueueControl queueControl = createManagementControl(address, queue);
-      
+
       String history = queueControl.listMessageCounterAsHTML();
       assertNotNull(history);
-      
+
       session.deleteQueue(queue);
    }
 
@@ -1218,7 +1217,7 @@
 
       session.createQueue(address, queue, null, false);
       QueueControl queueControl = createManagementControl(address, queue);
-      
+
       HornetQServerControl serverControl = createHornetQServerControl(mbeanServer);
       serverControl.enableMessageCounters();
       serverControl.setMessageCounterSamplePeriod(counterPeriod);
@@ -1238,7 +1237,7 @@
 
       session.createQueue(address, queue, null, false);
       QueueControl queueControl = createManagementControl(address, queue);
-      
+
       HornetQServerControl serverControl = createHornetQServerControl(mbeanServer);
       serverControl.enableMessageCounters();
       serverControl.setMessageCounterSamplePeriod(counterPeriod);
@@ -1247,8 +1246,36 @@
       assertNotNull(history);
 
       session.deleteQueue(queue);
+
    }
-   
+
+   public void testPauseAndResume()
+   {
+      long counterPeriod = 1000;
+      SimpleString address = randomSimpleString();
+      SimpleString queue = randomSimpleString();
+
+      try
+      {
+         session.createQueue(address, queue, null, false);
+         QueueControl queueControl = createManagementControl(address, queue);
+
+         HornetQServerControl serverControl = createHornetQServerControl(mbeanServer);
+         serverControl.enableMessageCounters();
+         serverControl.setMessageCounterSamplePeriod(counterPeriod);
+         assertFalse(queueControl.isPaused());
+         queueControl.pause();
+         assertTrue(queueControl.isPaused());
+         queueControl.resume();
+         assertFalse(queueControl.isPaused());
+      }
+      catch (Exception e)
+      {
+         // TODO Auto-generated catch block
+         e.printStackTrace();
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -1278,9 +1305,9 @@
       session.close();
 
       server.stop();
-      
+
       session = null;
-      
+
       server = null;
 
       super.tearDown();

Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java	2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java	2009-09-18 16:59:05 UTC (rev 7972)
@@ -252,6 +252,21 @@
             proxy.invokeOperation("setExpiryAddress", expiryAddres);
          }
 
+         public void pause() throws Exception
+         {
+            proxy.invokeOperation("pause");
+         }
+
+         public void resume() throws Exception
+         {
+            proxy.invokeOperation("pause");
+         }
+
+         public boolean isPaused() throws Exception
+         {
+            return (Boolean)proxy.invokeOperation("isPaused");
+         }
+
       };
    }
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2009-09-18 16:59:05 UTC (rev 7972)
@@ -517,4 +517,31 @@
    {
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.Queue#isPaused()
+    */
+   public boolean isPaused()
+   {
+      // TODO Auto-generated method stub
+      return false;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.Queue#pause()
+    */
+   public void pause()
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.Queue#resume()
+    */
+   public void resume()
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
 }
\ No newline at end of file

Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java	2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java	2009-09-18 16:59:05 UTC (rev 7972)
@@ -13,7 +13,6 @@
 
 package org.hornetq.tests.unit.core.server.impl;
 
-
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -76,7 +75,6 @@
       assertEquals(name, queue.getName());
    }
 
-
    public void testDurable()
    {
       Queue queue = new QueueImpl(1, address1, queue1, null, false, false, scheduledExecutor, null, null, null);
@@ -194,7 +192,7 @@
 
    }
 
-   public void testSimpleDirectDelivery()  throws Exception
+   public void testSimpleDirectDelivery() throws Exception
    {
       Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
@@ -222,7 +220,7 @@
       assertRefListsIdenticalRefs(refs, consumer.getReferences());
    }
 
-   public void testSimpleNonDirectDelivery()  throws Exception
+   public void testSimpleNonDirectDelivery() throws Exception
    {
       Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
@@ -243,7 +241,7 @@
       assertEquals(0, queue.getScheduledCount());
       assertEquals(0, queue.getDeliveringCount());
 
-      //Now add a consumer
+      // Now add a consumer
       FakeConsumer consumer = new FakeConsumer();
 
       queue.addConsumer(consumer);
@@ -260,7 +258,7 @@
       assertEquals(numMessages, queue.getDeliveringCount());
    }
 
-   public void testBusyConsumer()  throws Exception
+   public void testBusyConsumer() throws Exception
    {
       Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
@@ -425,10 +423,18 @@
       assertRefListsIdenticalRefs(allRefs, consumer.getReferences());
    }
 
-
    public void testChangeConsumersAndDeliver() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, new FakePostOffice(), null, null);
+      Queue queue = new QueueImpl(1,
+                                  address1,
+                                  queue1,
+                                  null,
+                                  false,
+                                  true,
+                                  scheduledExecutor,
+                                  new FakePostOffice(),
+                                  null,
+                                  null);
 
       final int numMessages = 10;
 
@@ -609,7 +615,7 @@
       }
       catch (IllegalStateException e)
       {
-         //Ok
+         // Ok
       }
    }
 
@@ -623,7 +629,7 @@
 
       List<MessageReference> refs = new ArrayList<MessageReference>();
 
-      //Test first with queueing
+      // Test first with queueing
 
       for (int i = 0; i < numMessages; i++)
       {
@@ -713,7 +719,7 @@
       {
          MessageReference ref = generateReference(queue, i);
 
-         ref.getMessage().setPriority((byte) i);
+         ref.getMessage().setPriority((byte)i);
 
          refs.add(ref);
 
@@ -728,7 +734,7 @@
 
       List<MessageReference> receivedRefs = consumer.getReferences();
 
-      //Should be in reverse order
+      // Should be in reverse order
 
       assertEquals(refs.size(), receivedRefs.size());
 
@@ -737,8 +743,8 @@
          assertEquals(refs.get(i), receivedRefs.get(9 - i));
       }
 
-      //But if we send more - since we are now in direct mode - the order will be the send order
-      //since the refs don't get queued
+      // But if we send more - since we are now in direct mode - the order will be the send order
+      // since the refs don't get queued
 
       consumer.clearReferences();
 
@@ -748,7 +754,7 @@
       {
          MessageReference ref = generateReference(queue, i);
 
-         ref.getMessage().setPriority((byte) i);
+         ref.getMessage().setPriority((byte)i);
 
          refs.add(ref);
 
@@ -839,7 +845,16 @@
 
    public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, new FakePostOffice(), null, null);
+      Queue queue = new QueueImpl(1,
+                                  address1,
+                                  queue1,
+                                  null,
+                                  false,
+                                  true,
+                                  scheduledExecutor,
+                                  new FakePostOffice(),
+                                  null,
+                                  null);
 
       Filter filter = new FakeFilter("fruit", "orange");
 
@@ -863,7 +878,6 @@
 
       refs.add(ref2);
 
-
       assertEquals(2, queue.getMessageCount());
 
       assertEquals(1, consumer.getReferences().size());
@@ -880,7 +894,6 @@
 
       queue.deliverNow();
 
-
       refs.clear();
 
       consumer.clearReferences();
@@ -1020,7 +1033,6 @@
       assertEquals(0, queue.getScheduledCount());
       assertEquals(10, queue.getDeliveringCount());
 
-
       for (int i = numMessages * 2; i < numMessages * 3; i++)
       {
          MessageReference ref = generateReference(queue, i);
@@ -1038,12 +1050,20 @@
       assertEquals(20, queue.getDeliveringCount());
    }
 
-
    // Private ------------------------------------------------------------------------------
 
    private void testConsumerWithFilters(boolean direct) throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, new FakePostOffice(), null, null);
+      Queue queue = new QueueImpl(1,
+                                  address1,
+                                  queue1,
+                                  null,
+                                  false,
+                                  true,
+                                  scheduledExecutor,
+                                  new FakePostOffice(),
+                                  null,
+                                  null);
 
       Filter filter = new FakeFilter("fruit", "orange");
 
@@ -1139,11 +1159,11 @@
       queue.addFirst(messageReference);
       queue.addLast(messageReference2);
       queue.addFirst(messageReference3);
-      
+
       assertEquals(0, consumer.getReferences().size());
       queue.addConsumer(consumer);
       queue.deliverNow();
-      
+
       assertEquals(3, consumer.getReferences().size());
       assertEquals(messageReference3, consumer.getReferences().get(0));
       assertEquals(messageReference, consumer.getReferences().get(1));
@@ -1162,7 +1182,6 @@
       assertEquals(queue.getMessagesAdded(), 3);
    }
 
-   
    public void testGetReference() throws Exception
    {
       Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
@@ -1189,16 +1208,120 @@
 
    }
 
+   /**
+    * Test the paused and resumed states with async deliveries.
+    * @throws Exception
+    */
+   public void testPauseAndResumeWithAsync() throws Exception
+   {
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+      
+      // pauses the queue
+      queue.pause();
 
+      final int numMessages = 10;
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+
+         refs.add(ref);
+
+         queue.addLast(ref);
+      }
+      // even as this queue is paused, it will receive the messages anyway
+      assertEquals(10, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+
+      // Now add a consumer
+      FakeConsumer consumer = new FakeConsumer();
+
+      queue.addConsumer(consumer);
+
+      assertTrue(consumer.getReferences().isEmpty());
+      assertEquals(10, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      // explicit order of delivery
+      queue.deliverNow();
+      // As the queue is paused, even an explicit order of delivery will not work.
+      assertEquals(0, consumer.getReferences().size());
+      assertEquals(numMessages, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+      // resuming work
+      queue.resume();
+
+      // after resuming the delivery begins.
+      assertRefListsIdenticalRefs(refs, consumer.getReferences());
+      assertEquals(numMessages, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(numMessages, queue.getDeliveringCount());
+
+   }
+
+   /**
+    * Test the paused and resumed states with direct deliveries.
+    * @throws Exception
+    */
+
+   public void testPauseAndResumeWithDirect() throws Exception
+   {
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+
+      // Now add a consumer
+      FakeConsumer consumer = new FakeConsumer();
+
+      queue.addConsumer(consumer);
+
+      // brings to queue to paused state
+      queue.pause();
+
+      final int numMessages = 10;
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         MessageReference ref = generateReference(queue, i);
+         refs.add(ref);
+         queue.addLast(ref);
+      }
+
+      // the queue even if it's paused will receive the message but won't forward
+      // directly to the consumer until resumed.
+      assertEquals(numMessages, queue.getMessageCount());
+      assertEquals(0, queue.getScheduledCount());
+      assertEquals(0, queue.getDeliveringCount());
+      assertTrue(consumer.getReferences().isEmpty());
+
+      // brings the queue to resumed state.
+      queue.resume();
+      // resuming delivery of messages
+      assertRefListsIdenticalRefs(refs, consumer.getReferences());
+      assertEquals(numMessages, queue.getMessageCount());
+      assertEquals(numMessages, queue.getDeliveringCount());
+
+   }
+
    class AddtoQueueRunner implements Runnable
    {
       Queue queue;
+
       MessageReference messageReference;
+
       boolean added = false;
+
       CountDownLatch countDownLatch;
+
       boolean first;
 
-      public AddtoQueueRunner(boolean first, Queue queue, MessageReference messageReference, CountDownLatch countDownLatch)
+      public AddtoQueueRunner(boolean first,
+                              Queue queue,
+                              MessageReference messageReference,
+                              CountDownLatch countDownLatch)
       {
          this.queue = queue;
          this.messageReference = messageReference;
@@ -1226,7 +1349,7 @@
       Consumer consumer;
 
       public List<Consumer> getConsumers()
-      {         
+      {
          return null;
       }
 



More information about the hornetq-commits mailing list