[jboss-cvs] JBoss Messaging SVN: r4755 - in branches/Branch_JBMESSAGING-1303: src/main/org/jboss/messaging/core/message/impl and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jul 31 05:30:40 EDT 2008


Author: jmesnil
Date: 2008-07-31 05:30:40 -0400 (Thu, 31 Jul 2008)
New Revision: 4755

Modified:
   branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
   branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java
   branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
   branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
   branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
Log:
JBMESSAGING-1303: Revisit management interfaces
* added method Queue.expireMessage()
* added property HDR_ORIGIN_QUEUE in MessageReferenceImpl.makeCopyForDLQOrExpiry() to store the origin queue before the message is moved to the DLQ or expiry queue

Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java	2008-07-31 07:37:20 UTC (rev 4754)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java	2008-07-31 09:30:40 UTC (rev 4755)
@@ -219,17 +219,7 @@
 
    public boolean expireMessage(long messageID) throws Exception
    {
-      List<MessageReference> refs = queue.list(null);
-      for (MessageReference ref : refs)
-      {
-         if (ref.getMessage().getMessageID() == messageID)
-         {
-            ref.expire(storageManager, postOffice, queueSettingsRepository);
-            queue.removeReferenceWithID(messageID);
-            return true;
-         }
-      }
-      return false;
+      return queue.expireMessage(messageID, storageManager, postOffice, queueSettingsRepository);
    }
 
    public int expireMessages(String filterStr) throws Exception
@@ -244,8 +234,7 @@
          List<MessageReference> refs = queue.list(filter);
          for (MessageReference ref : refs)
          {
-            ref.expire(storageManager, postOffice, queueSettingsRepository);
-            queue.removeReferenceWithID(ref.getMessage().getMessageID());
+            queue.expireMessage(ref.getMessage().getMessageID(), storageManager, postOffice, queueSettingsRepository);
          }
          return refs.size();
       } catch (MessagingException e)
@@ -285,8 +274,7 @@
          if (message.getMessageID() == messageID)
          {
             ref.sendToDLQ(storageManager, postOffice, queueSettingsRepository);
-            queue.removeReferenceWithID(messageID);
-            return true;
+            return queue.deleteReference(messageID, storageManager);
          }
       }
       return false;
@@ -309,7 +297,7 @@
             message.setPriority((byte) newPriority);
             // delete and add the reference so that it
             // goes to the right queues for the new priority
-            queue.removeReferenceWithID(messageID);
+            queue.deleteReference(messageID, storageManager);
             queue.addLast(ref);
             return true;
          }

Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-07-31 07:37:20 UTC (rev 4754)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-07-31 09:30:40 UTC (rev 4755)
@@ -58,6 +58,8 @@
 
    public static final SimpleString HDR_ACTUAL_EXPIRY_TIME = new SimpleString("JBMActualExpiryTime");
 
+   public static final SimpleString HDR_ORIGIN_QUEUE = new SimpleString("JBMOriginQueue");
+
    // Attributes ----------------------------------------------------
 
    private SimpleString destination;

Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java	2008-07-31 07:37:20 UTC (rev 4754)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java	2008-07-31 09:30:40 UTC (rev 4755)
@@ -29,6 +29,9 @@
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.FlowController;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.util.SimpleString;
 
 
@@ -118,6 +121,8 @@
    boolean deleteReference(long messageID, StorageManager storageManager)
    throws Exception;
 
+  boolean expireMessage(long messageID, StorageManager storageManager, PostOffice postOffice, HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception; 
+
    void lock();
    
    void unlock();

Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-07-31 07:37:20 UTC (rev 4754)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-07-31 09:30:40 UTC (rev 4755)
@@ -288,9 +288,11 @@
       
       copy.setMessageID(newMessageId);
       
+      SimpleString originalQueue = copy.getDestination();      
+      copy.putStringProperty(MessageImpl.HDR_ORIGIN_QUEUE, originalQueue);
+
       // reset expiry
       copy.setExpiration(0);
-      
       if (expiry)
       {
          long actualExpiryTime = System.currentTimeMillis();

Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-07-31 07:37:20 UTC (rev 4754)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-07-31 09:30:40 UTC (rev 4755)
@@ -44,11 +44,14 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.FlowController;
+import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.DistributionPolicy;
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.util.SimpleString;
@@ -403,7 +406,7 @@
    public void referenceAcknowledged(MessageReference ref) throws Exception
    {
       deliveringCount.decrementAndGet();
-      
+
       sizeBytes.addAndGet(-ref.getMessage().getEncodeSize());
 
 //      if (flowController != null)
@@ -498,7 +501,7 @@
       {
          MessageReference ref = iter.next();
          if (ref.getMessage().getMessageID() == messageID)
-         {         
+         {        
             deliveringCount.incrementAndGet();
             tx.addAcknowledgement(ref);
             iter.remove();
@@ -521,7 +524,25 @@
    {            
       lock.unlock();          
    }
+   
+   public boolean expireMessage(long messageID, StorageManager storageManager, PostOffice postOffice, HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception 
+   {
+      Iterator<MessageReference> iter = messageReferences.iterator();
 
+      while (iter.hasNext())
+      {
+         MessageReference ref = iter.next();
+         if (ref.getMessage().getMessageID() == messageID)
+         {        
+            deliveringCount.incrementAndGet();
+            ref.expire(storageManager, postOffice, queueSettingsRepository);
+            iter.remove();
+            return true;
+         }
+      }
+      return false;
+   }
+
    // Public
    // -----------------------------------------------------------------------------
 

Modified: branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java	2008-07-31 07:37:20 UTC (rev 4754)
+++ branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java	2008-07-31 09:30:40 UTC (rev 4755)
@@ -540,51 +540,37 @@
    {
       long messageID = randomLong();
 
-      List<MessageReference> refs = new ArrayList<MessageReference>();
-      MessageReference ref = createMock(MessageReference.class);
-      ServerMessage message = createMock(ServerMessage.class);
-      expect(message.getMessageID()).andStubReturn(messageID);
-      expect(ref.getMessage()).andReturn(message);
-      refs.add(ref);
       Queue queue = createMock(Queue.class);
       StorageManager storageManager = createMock(StorageManager.class);
-      expect(queue.list(null)).andReturn(refs);
       PostOffice postOffice = createMock(PostOffice.class);
       HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
-      ref.expire(storageManager, postOffice, repository);
-      expect(queue.removeReferenceWithID(messageID)).andReturn(true);
+      expect(queue.expireMessage(messageID, storageManager, postOffice, repository)).andReturn(true);
 
-      replay(queue, storageManager, postOffice, repository, ref, message);
+      replay(queue, storageManager, postOffice, repository);
 
       QueueControlMBean control = new QueueControl(queue, storageManager,
             postOffice, repository);
       assertTrue(control.expireMessage(messageID));
 
-      verify(queue, storageManager, postOffice, repository, ref, message);
+      verify(queue, storageManager, postOffice, repository);
    }
 
    public void testExpireMessageWithNoMatch() throws Exception
    {
       long messageID = randomLong();
 
-      List<MessageReference> refs = new ArrayList<MessageReference>();
-      MessageReference ref = createMock(MessageReference.class);
-      ServerMessage message = createMock(ServerMessage.class);
-      expect(message.getMessageID()).andStubReturn(messageID + 1);
-      expect(ref.getMessage()).andReturn(message);
-      refs.add(ref);
       Queue queue = createMock(Queue.class);
       StorageManager storageManager = createMock(StorageManager.class);
-      expect(queue.list(null)).andReturn(refs);
       PostOffice postOffice = createMock(PostOffice.class);
       HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
-      replay(queue, storageManager, postOffice, repository, ref, message);
+      expect(queue.expireMessage(messageID, storageManager, postOffice, repository)).andReturn(false);
+      replay(queue, storageManager, postOffice, repository);
 
       QueueControlMBean control = new QueueControl(queue, storageManager,
             postOffice, repository);
       assertFalse(control.expireMessage(messageID));
 
-      verify(queue, storageManager, postOffice, repository, ref, message);
+      verify(queue, storageManager, postOffice, repository);
    }
 
    public void testExpireMessagesWithFilter() throws Exception
@@ -608,10 +594,8 @@
       expect(queue.list(isA(Filter.class))).andReturn(refs);
       PostOffice postOffice = createMock(PostOffice.class);
       HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
-      ref_1.expire(storageManager, postOffice, repository);
-      ref_2.expire(storageManager, postOffice, repository);
-      expect(queue.removeReferenceWithID(messageID_1)).andReturn(true);
-      expect(queue.removeReferenceWithID(messageID_2)).andReturn(true);
+      expect(queue.expireMessage(messageID_1, storageManager, postOffice, repository)).andReturn(true);
+      expect(queue.expireMessage(messageID_2, storageManager, postOffice, repository)).andReturn(true);
 
       replay(queue, storageManager, postOffice, repository, ref_1, ref_2,
             message_1, message_2);
@@ -718,7 +702,7 @@
       HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
       Queue queue = createMock(Queue.class);
       expect(queue.list(null)).andReturn(refs);
-      expect(queue.removeReferenceWithID(messageID)).andReturn(true);
+      expect(queue.deleteReference(messageID, storageManager)).andReturn(true);
       expect(queue.addLast(ref)).andReturn(HandleStatus.HANDLED);
 
       replay(queue, storageManager, postOffice, repository, ref, message);
@@ -798,7 +782,7 @@
       PostOffice postOffice = createMock(PostOffice.class);
       HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
       ref.sendToDLQ(storageManager, postOffice, repository);
-      expect(queue.removeReferenceWithID(messageID)).andReturn(true);
+      expect(queue.deleteReference(messageID, storageManager)).andReturn(true);
 
       replay(queue, storageManager, postOffice, repository, ref, message);
 

Modified: branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java	2008-07-31 07:37:20 UTC (rev 4754)
+++ branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java	2008-07-31 09:30:40 UTC (rev 4755)
@@ -124,6 +124,7 @@
       SimpleString dlqName = new SimpleString("testDLQ");
       queueSettings.setDLQ(dlqName);
       Binding dlqBinding = EasyMock.createStrictMock(Binding.class);
+      EasyMock.expect(dlqBinding.getAddress()).andReturn(dlqName);
       StorageManager sm = EasyMock.createNiceMock(StorageManager.class);
       PostOffice po = EasyMock.createStrictMock(PostOffice.class);
       HierarchicalRepository<QueueSettings> repos = EasyMock.createStrictMock(HierarchicalRepository.class);
@@ -143,14 +144,17 @@
       EasyMock.expect(serverMessage.copy()).andReturn(serverMessage);
       EasyMock.expect(sm.generateMessageID()).andReturn(2l);
       serverMessage.setMessageID(2);
+      EasyMock.expect(serverMessage.getDestination()).andReturn(queueName);
+      serverMessage.putStringProperty(MessageImpl.HDR_ORIGIN_QUEUE, queueName);
       serverMessage.setExpiration(0);
+      serverMessage.setDestination(dlqName);
       EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
       EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
       EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
       EasyMock.expect(sm.generateTransactionID()).andReturn(1l);
-      EasyMock.replay(sm, po, repos, serverMessage, queue);
+      EasyMock.replay(sm, po, repos, serverMessage, queue, dlqBinding);
       assertFalse(messageReference.cancel(sm, po, repos));
-      EasyMock.verify(sm, po, repos, serverMessage, queue);
+      EasyMock.verify(sm, po, repos, serverMessage, queue, dlqBinding);
    }
 
    public void testCancelToDLQDoesntExist() throws Exception
@@ -160,6 +164,7 @@
       SimpleString dlqName = new SimpleString("testDLQ");
       queueSettings.setDLQ(dlqName);
       Binding dlqBinding = EasyMock.createStrictMock(Binding.class);
+      EasyMock.expect(dlqBinding.getAddress()).andReturn(dlqName);
       StorageManager sm = EasyMock.createNiceMock(StorageManager.class);
       PostOffice po = EasyMock.createStrictMock(PostOffice.class);
       HierarchicalRepository<QueueSettings> repos = EasyMock.createStrictMock(HierarchicalRepository.class);
@@ -180,14 +185,20 @@
       EasyMock.expect(serverMessage.copy()).andReturn(serverMessage);
       EasyMock.expect(sm.generateMessageID()).andReturn(2l);
       serverMessage.setMessageID(2);
+      EasyMock.expect(serverMessage.getDestination()).andReturn(queueName);
+      serverMessage.putStringProperty(MessageImpl.HDR_ORIGIN_QUEUE, queueName);
       serverMessage.setExpiration(0);
+      serverMessage.setDestination(dlqName);
       EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
       EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
       EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
       EasyMock.expect(sm.generateTransactionID()).andReturn(1l);
-      EasyMock.replay(sm, po, repos, serverMessage, queue);
+      
+      EasyMock.replay(sm, po, repos, serverMessage, queue, dlqBinding);
+      
       assertFalse(messageReference.cancel(sm, po, repos));
-      EasyMock.verify(sm, po, repos, serverMessage, queue);
+      
+      EasyMock.verify(sm, po, repos, serverMessage, queue, dlqBinding);
    }
 
    public void testExpire() throws Exception
@@ -244,6 +255,8 @@
       EasyMock.expect(po.getBinding(expQName)).andReturn(expQBinding);
       EasyMock.expect(serverMessage.copy()).andReturn(serverMessage);
       serverMessage.setMessageID(2);
+      EasyMock.expect(serverMessage.getDestination()).andReturn(queueName);
+      serverMessage.putStringProperty(MessageImpl.HDR_ORIGIN_QUEUE, queueName);
       serverMessage.setExpiration(0);
       serverMessage.putLongProperty(EasyMock.eq(MessageImpl.HDR_ACTUAL_EXPIRY_TIME), EasyMock.anyLong());
       EasyMock.expect(expQBinding.getAddress()).andStubReturn(expQName);
@@ -251,8 +264,11 @@
       EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
       EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
       EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
+
       EasyMock.replay(sm, po, repos, serverMessage, queue, expQBinding);
+      
       messageReference.expire(sm, po, repos);
+
       EasyMock.verify(sm, po, repos, serverMessage, queue, expQBinding);
    }
 
@@ -283,6 +299,8 @@
       EasyMock.expect(po.addBinding(expQName, expQName, null, true, false)).andReturn(expQBinding);
       EasyMock.expect(serverMessage.copy()).andReturn(serverMessage);
       serverMessage.setMessageID(2);
+      EasyMock.expect(serverMessage.getDestination()).andReturn(queueName);
+      serverMessage.putStringProperty(MessageImpl.HDR_ORIGIN_QUEUE, queueName);
       serverMessage.setExpiration(0);
       serverMessage.putLongProperty(EasyMock.eq(MessageImpl.HDR_ACTUAL_EXPIRY_TIME), EasyMock.anyLong());
       EasyMock.expect(expQBinding.getAddress()).andStubReturn(expQName);
@@ -290,8 +308,11 @@
       EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
       EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
       EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
+
       EasyMock.replay(sm, po, repos, serverMessage, queue, expQBinding);
+      
       messageReference.expire(sm, po, repos);
+      
       EasyMock.verify(sm, po, repos, serverMessage, queue, expQBinding);
    }
 




More information about the jboss-cvs-commits mailing list