[jboss-cvs] JBoss Messaging SVN: r5667 - in trunk: tests/src/org/jboss/messaging/tests/integration/divert and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jan 20 06:58:37 EST 2009


Author: timfox
Date: 2009-01-20 06:58:36 -0500 (Tue, 20 Jan 2009)
New Revision: 5667

Modified:
   trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/divert/PersistentDivertTest.java
Log:
more tweaks

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java	2009-01-20 11:22:15 UTC (rev 5666)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java	2009-01-20 11:58:36 UTC (rev 5667)
@@ -26,6 +26,9 @@
 
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.Divert;
 import org.jboss.messaging.core.server.ServerMessage;
@@ -59,6 +62,10 @@
    private final Filter filter;
 
    private final Transformer transformer;
+   
+   private final PagingManager pagingManager;
+   
+   private final StorageManager storageManager;
 
    public DivertImpl(final SimpleString forwardAddress,
                      final SimpleString uniqueName,
@@ -66,7 +73,9 @@
                      final boolean exclusive,
                      final Filter filter,
                      final Transformer transformer,
-                     final PostOffice postOffice)
+                     final PostOffice postOffice,
+                     final PagingManager pagingManager,
+                     final StorageManager storageManager)
    {
       this.forwardAddress = forwardAddress;
 
@@ -81,6 +90,10 @@
       this.transformer = transformer;
 
       this.postOffice = postOffice;
+      
+      this.pagingManager = pagingManager;
+      
+      this.storageManager = storageManager;
    }
 
    public boolean accept(final ServerMessage message) throws Exception
@@ -91,6 +104,25 @@
       }
       else
       {
+         //We need to increment ref count here to ensure that the message doesn't get stored, deleted and stored again in a single route which
+         //can occur if the message is routed to a queue, then acked before it's routed here
+         
+         //TODO - combine with similar code in QueueImpl.accept()
+         
+         int count = message.incrementRefCount();
+         
+         if (count == 1)
+         {
+            PagingStore store = pagingManager.getPageStore(message.getDestination());
+            
+            store.addSize(message.getMemoryEstimate());
+         }
+       
+         if (message.isDurable())
+         {
+            message.incrementDurableRefCount();
+         }
+         
          return true;
       }
    }
@@ -109,6 +141,29 @@
       }
 
       postOffice.route(message, tx);
+      
+      //Decrement the ref count here - and delete the message if necessary
+      
+      //TODO combine this with code in QueueImpl::postAcknowledge
+      
+      if (message.isDurable())
+      {
+         int count = message.decrementDurableRefCount();
+
+         if (count == 0)
+         {
+            storageManager.deleteMessage(message.getMessageID());
+         }
+      }
+
+      // TODO: We could optimize this by storing the paging-store for the address on the Queue. We would need to know
+      // the Address for the Queue
+      PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+      if (message.decrementRefCount() == 0)
+      {
+         store.addSize(-message.getMemoryEstimate());         
+      }
    }
 
    public SimpleString getRoutingName()

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-20 11:22:15 UTC (rev 5666)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-20 11:58:36 UTC (rev 5667)
@@ -874,7 +874,9 @@
                                         config.isExclusive(),
                                         filter,
                                         transformer,
-                                        postOffice);
+                                        postOffice,
+                                        pagingManager,
+                                        storageManager);
 
          DivertBinding binding = new DivertBindingImpl(sAddress, divert);
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/divert/PersistentDivertTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/divert/PersistentDivertTest.java	2009-01-20 11:22:15 UTC (rev 5666)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/divert/PersistentDivertTest.java	2009-01-20 11:58:36 UTC (rev 5667)
@@ -369,6 +369,8 @@
       
       sf.close();
       
+      assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+      
       messagingService.stop();
    }
    




More information about the jboss-cvs-commits mailing list