[hornetq-commits] JBoss hornetq SVN: r8064 - in trunk: src/main/org/hornetq/core/deployers/impl and 24 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Oct 7 17:01:21 EDT 2009


Author: timfox
Date: 2009-10-07 17:01:20 -0400 (Wed, 07 Oct 2009)
New Revision: 8064

Added:
   trunk/src/main/org/hornetq/core/server/RoutingContext.java
   trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
Modified:
   trunk/.classpath
   trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/hornetq/core/persistence/StorageManager.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/src/main/org/hornetq/core/postoffice/Bindings.java
   trunk/src/main/org/hornetq/core/postoffice/PostOffice.java
   trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/server/Bindable.java
   trunk/src/main/org/hornetq/core/server/MessageReference.java
   trunk/src/main/org/hornetq/core/server/Queue.java
   trunk/src/main/org/hornetq/core/server/ServerMessage.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
   trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
   trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/core/transaction/Transaction.java
   trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java
   trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
   trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
   trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
   trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
   trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
   trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java
   trunk/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
   trunk/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
   trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
routing refactoring, plus fixed some tests

Modified: trunk/.classpath
===================================================================
--- trunk/.classpath	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/.classpath	2009-10-07 21:01:20 UTC (rev 8064)
@@ -7,7 +7,7 @@
 	<classpathentry kind="src" path="tests/config"/>
 	<classpathentry excluding="**/.svn/**/*" kind="src" path="tests/src">
 		<attributes>
-			<attribute name="org.eclipse.jdt.launching.CLASSPATH_ATTR_LIBRARY_PATH_ENTRY" value="hornet/native/bin"/>
+			<attribute name="org.eclipse.jdt.launching.CLASSPATH_ATTR_LIBRARY_PATH_ENTRY" value="trunk/native/bin"/>
 		</attributes>
 	</classpathentry>
 	<classpathentry kind="src" path="tests/jms-tests/src"/>

Modified: trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -229,7 +229,7 @@
                try
                {
                   Deployer deployer = entry.getValue().deployer;
-                  log.info("Undeploying " + deployer + " with url " + pair.a);
+                  log.debug("Undeploying " + deployer + " with url " + pair.a);
                   deployer.undeploy(pair.a);
                   toRemove.add(pair);
                }

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -1441,9 +1441,8 @@
 
       try
       {
+         log.debug("Starting compacting operation on journal");
 
-         log.info("Starting compacting operation on journal");
-
          // We need to guarantee that the journal is frozen for this short time
          // We don't freeze the journal as we compact, only for the short time where we replace records
          compactingLock.writeLock().lock();
@@ -1582,7 +1581,7 @@
          renameFiles(dataFilesToProcess, newDatafiles);
          deleteControlFile(controlFile);
 
-         log.info("Finished compacting on journal");
+         log.debug("Finished compacting on journal");
 
       }
       finally
@@ -2167,7 +2166,7 @@
          try
          {
 
-            log.info("Cleaning up file " + file);
+            log.debug("Cleaning up file " + file);
 
             if (file.getPosCount() == 0)
             {
@@ -2221,7 +2220,7 @@
       finally
       {
          compactingLock.readLock().unlock();
-         log.info("Clean up on file " + file + " done");
+         log.debug("Clean up on file " + file + " done");
       }
 
    }
@@ -2767,7 +2766,7 @@
       {
          if (state != STATE_LOADED)
          {
-            throw new IllegalStateException("The journal was stopped");
+            throw new IllegalStateException("The journal is not loaded " + state);
          }
 
          int size = bb.capacity();
@@ -2845,11 +2844,14 @@
             currentFile.getFile().write(bb, sync);
          }
 
-         return currentFile;
+         return currentFile;         
       }
       finally
       {
-         currentFile.getFile().enableAutoFlush();
+         if (currentFile != null)
+         {
+            currentFile.getFile().enableAutoFlush();
+         }
       }
 
    }

Modified: trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -67,6 +67,7 @@
 import org.hornetq.core.server.cluster.Bridge;
 import org.hornetq.core.server.cluster.BroadcastGroup;
 import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.impl.RoutingContextImpl;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
@@ -728,7 +729,7 @@
 
                notificationMessage.putTypedProperties(notifProps);
 
-               postOffice.route(notificationMessage, null);
+               postOffice.route(notificationMessage, new RoutingContextImpl(null));
             }
          }
       }

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -38,7 +38,9 @@
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.RoutingContextImpl;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
@@ -593,6 +595,7 @@
       if (onDepage(page.getPageId(), storeName, messages))
       {
          page.delete();
+         
          return true;
       }
       else
@@ -755,16 +758,14 @@
       // back to where it was
 
       Transaction depageTransaction = new TransactionImpl(storageManager);
-
+      
       depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
 
       HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
 
       for (PagedMessage pagedMessage : pagedMessages)
       {
-         ServerMessage message = null;
-
-         message = pagedMessage.getMessage(storageManager);
+         ServerMessage message = pagedMessage.getMessage(storageManager);
          
          if (message.isLargeMessage())
          {
@@ -787,7 +788,6 @@
                log.warn("Transaction " + pagedMessage.getTransactionID() +
                         " used during paging not found, ignoring message " +
                         message);
-
                continue;
             }
 
@@ -815,7 +815,7 @@
                if (isTrace)
                {
                   trace("Rollback was called after prepare, ignoring message " + message);
-               }
+               }               
                continue;
             }
 
@@ -827,7 +827,7 @@
             }
          }
 
-         postOffice.route(message, depageTransaction);
+         postOffice.route(message, new RoutingContextImpl(depageTransaction));
       }
 
       if (!running)

Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -21,6 +21,7 @@
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
@@ -100,7 +101,8 @@
    
    void deleteHeuristicCompletion(long id) throws Exception;
    
-   void loadMessageJournal(PagingManager pagingManager,
+   void loadMessageJournal(PostOffice postOffice,
+                           PagingManager pagingManager,
                            ResourceManager resourceManager,
                            Map<Long, Queue> queues,
                            Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception;

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -51,6 +51,7 @@
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.remoting.impl.wireformat.XidCodecSupport;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.core.server.JournalType;
@@ -143,11 +144,11 @@
    private final String journalDir;
 
    private final String largeMessagesDirectory;
-
+   
    public JournalStorageManager(final Configuration config, final Executor executor)
    {
       this.executor = executor;
-
+      
       if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO)
       {
          throw new IllegalArgumentException("Only NIO and AsyncIO are supported journals");
@@ -250,7 +251,7 @@
 
       this.persistentID = id;
    }
-
+     
    public long generateUniqueID()
    {
       long id = idGenerator.generateID();
@@ -518,7 +519,8 @@
       
    }
 
-   public void loadMessageJournal(final PagingManager pagingManager,
+   public void loadMessageJournal(final PostOffice postOffice,
+                                  final PagingManager pagingManager,
                                   final ResourceManager resourceManager,
                                   final Map<Long, Queue> queues,
                                   final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
@@ -730,9 +732,9 @@
             {
                record.message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, scheduledDeliveryTime);
             }
+            
+            MessageReference ref = postOffice.reroute(record.message, queue, null);
 
-            MessageReference ref = queue.reroute(record.message, null);
-
             ref.setDeliveryCount(record.deliveryCount);
 
             if (scheduledDeliveryTime != 0)
@@ -742,13 +744,13 @@
          }
       }
 
-      loadPreparedTransactions(pagingManager, resourceManager, queues, preparedTransactions, duplicateIDMap);
+      loadPreparedTransactions(postOffice, pagingManager, resourceManager, queues, preparedTransactions, duplicateIDMap);
 
       for (LargeServerMessage msg : largeMessages)
       {
          if (msg.getRefCount() == 0)
          {
-            log.info("Large message: " + msg.getMessageID() + " didn't have any associated reference, file will be deleted");
+            log.debug("Large message: " + msg.getMessageID() + " didn't have any associated reference, file will be deleted");
             msg.decrementRefCount();
          }
       }
@@ -796,7 +798,8 @@
       return largeMessage;
    }
 
-   private void loadPreparedTransactions(final PagingManager pagingManager,
+   private void loadPreparedTransactions(final PostOffice postOffice,
+                                         final PagingManager pagingManager,
                                          final ResourceManager resourceManager,
                                          final Map<Long, Queue> queues,
                                          final List<PreparedTransactionInfo> preparedTransactions,
@@ -867,7 +870,7 @@
                      throw new IllegalStateException("Cannot find message with id " + messageID);
                   }
 
-                  queue.reroute(message, tx);
+                  postOffice.reroute(message, queue, tx);
 
                   break;
                }

Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -25,6 +25,7 @@
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
@@ -232,7 +233,8 @@
    {
    }
    
-   public void loadMessageJournal(PagingManager pagingManager,
+   public void loadMessageJournal(PostOffice postOffice,
+                                  PagingManager pagingManager,
                                   ResourceManager resourceManager,
                                   Map<Long, Queue> queues,
                                   Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception

Modified: trunk/src/main/org/hornetq/core/postoffice/Bindings.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/Bindings.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/postoffice/Bindings.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -16,8 +16,8 @@
 import java.util.Collection;
 
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.transaction.Transaction;
 
 /**
  * A Bindings
@@ -32,13 +32,13 @@
 {
    Collection<Binding> getBindings();
 
-   boolean route(ServerMessage message, Transaction tx) throws Exception;
-
    void addBinding(Binding binding);
 
    void removeBinding(Binding binding);
 
    void setRouteWhenNoConsumers(boolean takePriorityIntoAccount);
 
-   boolean redistribute(ServerMessage message, Queue originatingQueue, Transaction tx) throws Exception;
+   void redistribute(ServerMessage message, Queue originatingQueue, RoutingContext context) throws Exception;
+   
+   void route(ServerMessage message, RoutingContext context) throws Exception;
 }

Modified: trunk/src/main/org/hornetq/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/PostOffice.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/postoffice/PostOffice.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -15,7 +15,9 @@
 
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.utils.SimpleString;
@@ -50,11 +52,11 @@
    
    Bindings getMatchingBindings(SimpleString address);
 
-   void route(ServerMessage message) throws Exception;
+   void route(ServerMessage message, RoutingContext context) throws Exception;
    
-   void route(ServerMessage message, Transaction tx) throws Exception;
+   MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception;
    
-   boolean redistribute(ServerMessage message, final Queue originatingQueue, Transaction tx) throws Exception;
+   boolean redistribute(ServerMessage message, final Queue originatingQueue, RoutingContext context) throws Exception;
 
    PagingManager getPagingManager();
 

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -30,8 +30,8 @@
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.server.Bindable;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.transaction.Transaction;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -56,7 +56,7 @@
    private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
 
    private volatile boolean routeWhenNoConsumers;
-   
+
    public void setRouteWhenNoConsumers(final boolean routeWhenNoConsumers)
    {
       this.routeWhenNoConsumers = routeWhenNoConsumers;
@@ -122,51 +122,14 @@
 
       bindingsMap.remove(binding.getID());
    }
-
-   private boolean routeFromCluster(final ServerMessage message, final Transaction tx) throws Exception
+   
+   public void redistribute(final ServerMessage message, final Queue originatingQueue, final RoutingContext context) throws Exception
    {
-      byte[] ids = (byte[])message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
-
-      ByteBuffer buff = ByteBuffer.wrap(ids);
-
-      Set<Bindable> chosen = new HashSet<Bindable>();
-
-      while (buff.hasRemaining())
-      {
-         long bindingID = buff.getLong();
-
-         Binding binding = bindingsMap.get(bindingID);
-
-         if (binding == null)
-         {
-            return false;
-         }
-
-         binding.willRoute(message);
-
-         chosen.add(binding.getBindable());
-      }
-
-      for (Bindable bindable : chosen)
-      {
-         bindable.preroute(message, tx);
-      }
-
-      for (Bindable bindable : chosen)
-      {
-         bindable.route(message, tx);
-      }
-      
-      return true;
-   }
-
-   public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final Transaction tx) throws Exception
-   {
       if (routeWhenNoConsumers)
       {
-         return false;
+         return;
       }
-      
+
       SimpleString routingName = originatingQueue.getName();
 
       List<Binding> bindings = routingNameBindingMap.get(routingName);
@@ -175,7 +138,7 @@
       {
          // The value can become null if it's concurrently removed while we're iterating - this is expected
          // ConcurrentHashMap behaviour!
-         return false;
+         return;
       }
 
       Integer ipos = routingNamePositions.get(routingName);
@@ -238,30 +201,22 @@
       {
          theBinding.willRoute(message);
 
-         theBinding.getBindable().preroute(message, tx);
-
-         theBinding.getBindable().route(message, tx);
-
-         return true;
+         theBinding.getBindable().route(message, context);
       }
-      else
-      {
-         return false;
-      }
    }
 
-   public boolean route(final ServerMessage message, final Transaction tx) throws Exception
+   public void route(final ServerMessage message, final RoutingContext context) throws Exception
    {
       boolean routed = false;
-      
+
       if (!exclusiveBindings.isEmpty())
       {
          for (Binding binding : exclusiveBindings)
          {
             if (binding.getFilter() == null || binding.getFilter().match(message))
             {
-               binding.getBindable().route(message, tx);
-               
+               binding.getBindable().route(message, context);
+
                routed = true;
             }
          }
@@ -271,7 +226,7 @@
       {
          if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
          {
-            routed = routeFromCluster(message, tx);
+            routeFromCluster(message, context);
          }
          else
          {
@@ -393,24 +348,34 @@
 
                routingNamePositions.put(routingName, pos);
             }
-
-            // TODO refactor to do this is one iteration
-
+            
             for (Bindable bindable : chosen)
             {
-               bindable.preroute(message, tx);
+               bindable.route(message, context);
             }
+         }
+      }
+   }
+   
+   private void routeFromCluster(final ServerMessage message, final RoutingContext context) throws Exception
+   {
+      byte[] ids = (byte[])message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
 
-            for (Bindable bindable : chosen)
-            {               
-               bindable.route(message, tx);
-               
-               routed = true;
-            }
+      ByteBuffer buff = ByteBuffer.wrap(ids);
+
+      while (buff.hasRemaining())
+      {
+         long bindingID = buff.getLong();
+
+         Binding binding = bindingsMap.get(bindingID);
+
+         if (binding != null)
+         {
+            binding.willRoute(message);
+
+            binding.getBindable().route(message, context);
          }
       }
-      
-      return routed;
    }
 
    private final int incrementPos(int pos, int length)

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -36,6 +36,7 @@
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.AddressManager;
@@ -45,9 +46,12 @@
 import org.hornetq.core.postoffice.DuplicateIDCache;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.postoffice.QueueInfo;
+import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.RoutingContextImpl;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
@@ -423,7 +427,7 @@
    public synchronized void addBinding(final Binding binding) throws Exception
    {
       addressManager.addBinding(binding);
-      
+
       TypedProperties props = new TypedProperties();
 
       props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, binding.getType().toInt());
@@ -515,8 +519,8 @@
       return addressManager.getMatchingBindings(address);
    }
 
-   public void route(final ServerMessage message, Transaction tx) throws Exception
-   {
+   public void route(final ServerMessage message, final RoutingContext context) throws Exception
+   {      
       SimpleString address = message.getDestination();
       
       byte[] duplicateIDBytes = null;
@@ -540,15 +544,15 @@
 
          if (cache.contains(duplicateIDBytes))
          {
-            if (tx == null)
+            if (context.getTransaction() == null)
             {
-               log.trace("Duplicate message detected - message will not be routed");
+               log.trace("Duplicate message detected - message will not be routed");               
             }
             else
             {
-               log.trace("Duplicate message detected - transaction will be rejected");
+               log.trace("Duplicate message detected - transaction will be rejected");               
 
-               tx.markAsRollbackOnly(null);
+               context.getTransaction().markAsRollbackOnly(null);
             }
 
             return;
@@ -559,23 +563,26 @@
 
       if (cache != null)
       {
-         if (tx == null)
+         if (context.getTransaction() == null)
          {
             // We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
 
-            tx = new TransactionImpl(storageManager);
+            Transaction tx = new TransactionImpl(storageManager);
+            
+            context.setTransaction(tx);
 
             startedTx = true;
          }
 
-         cache.addToCache(duplicateIDBytes, tx);
+         cache.addToCache(duplicateIDBytes, context.getTransaction());
       }
 
-      if (tx == null)
+      if (context.getTransaction() == null)
       {
          if (pagingManager.page(message, true))
          {
             message.setStored();
+            
             return;
          }
       }
@@ -583,80 +590,142 @@
       {
          SimpleString destination = message.getDestination();
 
-         boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
+         boolean depage = context.getTransaction().getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
 
          if (!depage && pagingManager.isPaging(destination))
          {
-            getPageOperation(tx).addMessageToPage(message);
+            getPageOperation(context.getTransaction()).addMessageToPage(message);
 
             return;
          }
       }
 
       Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
-
-      boolean routed;
       
       if (bindings != null)
       {
-         routed = bindings.route(message, tx);
+         context.incrementDepth();
+         
+         bindings.route(message, context);
+         
+         context.decrementDepth();
       }
-      else
-      {
-         routed = false;
-      }
 
-      if (!routed)
+      //The depth allows for recursion e.g. with diverts - we only want to process the route after any recursed routes
+      //have been processed
+      
+      if (context.getDepth() == 0)
       {
-         AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
-         
-         boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
-         
-         if (sendToDLA)
+         if (context.getQueues().isEmpty())
          {
-            //Send to the DLA for the address
-            
-            SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
-            
-            if (dlaAddress == null)
+            // Send to DLA if appropriate
+   
+            AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
+   
+            boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
+   
+            if (sendToDLA)
             {
-               log.warn("Did not route to any bindings for address " + address + " and sendToDLAOnNoRoute is true " + 
-                        "but there is no DLA configured for the address, the message will be ignored.");
+               // Send to the DLA for the address
+   
+               SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
+   
+               if (dlaAddress == null)
+               {
+                  log.warn("Did not route to any bindings for address " + address +
+                           " and sendToDLAOnNoRoute is true " +
+                           "but there is no DLA configured for the address, the message will be ignored.");
+               }
+               else
+               {
+                  message.setOriginalHeaders(message, false);
+   
+                  message.setDestination(dlaAddress);
+   
+                  route(message, context);
+               }
             }
-            else
-            {
-               message.setOriginalHeaders(message, false);
-               
-               message.setDestination(dlaAddress);
-               
-               route(message, tx);
-            }
          }
+         else
+         {
+            processRoute(message, context);
+         }
+   
+         if (startedTx)
+         {
+            context.getTransaction().commit();
+         }
       }
+   }
+   
+   public MessageReference reroute(final ServerMessage message, final Queue queue, final Transaction tx) throws Exception
+   {
+      MessageReference reference = message.createReference(queue);
 
-      if (startedTx)
+      Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
+
+      if (scheduledDeliveryTime != null)
       {
-         tx.commit();
+         reference.setScheduledDeliveryTime(scheduledDeliveryTime);
       }
+
+      message.incrementDurableRefCount();
+
+      message.setStored();
+         
+      int refCount = message.incrementRefCount();
+
+      PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+      if (refCount == 1)
+      {
+         store.addSize(message.getMemoryEstimate());
+      }
+
+      store.addSize(reference.getMemoryEstimate());
+
+      if (tx == null)
+      {
+         queue.addLast(reference);
+      }
+      else
+      {
+         List<MessageReference> refs = new ArrayList<MessageReference>(1);
+         
+         refs.add(reference);
+         
+         tx.addOperation(new AddOperation(refs));
+      }
+      
+      return reference;
    }
-
+   
    public void route(final ServerMessage message) throws Exception
    {
-      route(message, null);
+      route(message, new RoutingContextImpl(null));
    }
 
-   public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final Transaction tx) throws Exception
-   {
+   public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final RoutingContext context) throws Exception
+   {      
       Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getDestination());
 
+      boolean res = false;
+      
       if (bindings != null)
       {
-         return bindings.redistribute(message, originatingQueue, tx);
+         bindings.redistribute(message, originatingQueue, context);
+         
+         if (!context.getQueues().isEmpty())
+         {
+            processRoute(message, context);
+            
+            res = true;
+         }         
       }
-      else
-      {
-         return false;
-      }
+      
+      log.info("redistribute called res is " + res);
+      
+      return res;
    }
 
    public PagingManager getPagingManager()
@@ -711,8 +780,9 @@
          message.setBody(ChannelBuffers.EMPTY_BUFFER);
          message.setDestination(queueName);
          message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
-         queue.preroute(message, null);
-         queue.route(message, null);
+         // queue.preroute(message, null);
+         // queue.route(message, null);
+         routeDirect(message, queue, false);
 
          for (QueueInfo info : queueInfos.values())
          {
@@ -727,7 +797,7 @@
                message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, info.getFilterString());
                message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
 
-               routeDirect(queue, message);
+               routeDirect(message, queue, true);
 
                int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
 
@@ -740,7 +810,7 @@
                   message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
                   message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
 
-                  routeDirect(queue, message);
+                  routeDirect(message, queue, true);
                }
 
                if (info.getFilterStrings() != null)
@@ -755,7 +825,7 @@
                      message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
                      message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
 
-                     routeDirect(queue, message);
+                     routeDirect(message, queue, true);
                   }
                }
             }
@@ -766,9 +836,107 @@
 
    // Private -----------------------------------------------------------------
 
+   private void routeDirect(final ServerMessage message, final Queue queue, final boolean applyFilters) throws Exception
+   {
+      if (!applyFilters || queue.getFilter() == null || queue.getFilter().match(message))
+      {
+         RoutingContext context = new RoutingContextImpl(null);
+
+         queue.route(message, context);
+
+         processRoute(message, context);
+      }
+   }
+
+   private void processRoute(final ServerMessage message, final RoutingContext context) throws Exception
+   {
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+
+      Transaction tx = context.getTransaction();
+      
+      for (Queue queue : context.getQueues())
+      {
+         MessageReference reference = message.createReference(queue);
+
+         refs.add(reference);
+
+         Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
+
+         if (scheduledDeliveryTime != null)
+         {
+            reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+         }
+
+         if (message.isDurable() && queue.isDurable())
+         {
+            int durableRefCount = message.incrementDurableRefCount();
+
+            if (durableRefCount == 1)
+            {
+               if (tx != null)
+               {
+                  storageManager.storeMessageTransactional(tx.getID(), message);
+               }
+               else
+               {
+                  storageManager.storeMessage(message);
+               }
+
+               message.setStored();
+            }
+
+            if (tx != null)
+            {
+               storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
+
+               tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+            }
+            else
+            {
+               storageManager.storeReference(queue.getID(), message.getMessageID());
+            }
+
+            if (scheduledDeliveryTime != null)
+            {
+               if (tx != null)
+               {
+                  storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
+               }
+               else
+               {
+                  storageManager.updateScheduledDeliveryTime(reference);
+               }
+            }
+         }
+
+         int refCount = message.incrementRefCount();
+
+         PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+         if (refCount == 1)
+         {
+            store.addSize(message.getMemoryEstimate());
+         }
+
+         store.addSize(reference.getMemoryEstimate());
+      }
+      
+      if (tx != null)
+      {
+         tx.addOperation(new AddOperation(refs));
+      }
+      else
+      {         
+         for (MessageReference ref : refs)
+         {
+
+            ref.getQueue().addLast(ref);
+         }
+      }
+   }
+   
    private synchronized void startExpiryScanner()
    {
-
       if (reaperPeriod > 0)
       {
          reaperThread = new Thread(reaperRunnable, "HornetQ-expiry-reaper");
@@ -779,15 +947,6 @@
       }
    }
 
-   private void routeDirect(final Queue queue, final ServerMessage message) throws Exception
-   {
-      if (queue.getFilter() == null || queue.getFilter().match(message))
-      {
-         queue.preroute(message, null);
-         queue.route(message, null);
-      }
-   }
-
    private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName)
    {
       ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
@@ -1008,7 +1167,7 @@
                   // This could happen when the PageStore left the pageState
 
                   // TODO is this correct - don't we lose transactionality here???
-                  route(message, null);
+                  route(message, new RoutingContextImpl(null));
                }
                first = false;
             }
@@ -1026,4 +1185,64 @@
          }
       }
    }
+   
+   private class AddOperation implements TransactionOperation
+   {
+      private final List<MessageReference> refs;
+
+      AddOperation(final List<MessageReference> refs)
+      {
+         this.refs = refs;
+      }
+
+      public void afterCommit(Transaction tx) throws Exception
+      {        
+         for (MessageReference ref : refs)
+         {
+            ref.getQueue().addLast(ref);
+         }
+      }
+
+      public void afterPrepare(Transaction tx) throws Exception
+      {
+      }
+
+      public void afterRollback(Transaction tx) throws Exception
+      {
+      }
+
+      public void beforeCommit(Transaction tx) throws Exception
+      {
+      }
+
+      public void beforePrepare(Transaction tx) throws Exception
+      {
+      }
+
+      public void beforeRollback(Transaction tx) throws Exception
+      {
+         // Reverse the ref counts, and paging sizes
+
+         for (MessageReference ref : refs)
+         {
+            ServerMessage message = ref.getMessage();
+
+            if (message.isDurable() && ref.getQueue().isDurable())
+            {
+               message.decrementDurableRefCount();
+            }
+
+            int count = message.decrementRefCount();
+
+            PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+            if (count == 0)
+            {
+               store.addSize(-message.getMemoryEstimate());
+            }
+
+            store.addSize(-ref.getMemoryEstimate());
+         }
+      }
+   }
 }

Modified: trunk/src/main/org/hornetq/core/server/Bindable.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Bindable.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/Bindable.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -13,7 +13,6 @@
 
 package org.hornetq.core.server;
 
-import org.hornetq.core.transaction.Transaction;
 
 /**
  * A Bindable
@@ -26,8 +25,6 @@
  */
 public interface Bindable
 {
-   void preroute(ServerMessage message, Transaction tx) throws Exception;
-   
-   void route(ServerMessage message, Transaction tx) throws Exception;
+   void route(ServerMessage message, RoutingContext context) throws Exception;
 }
 

Modified: trunk/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/MessageReference.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/MessageReference.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -51,4 +51,6 @@
    void decrementDeliveryCount();
 
    Queue getQueue();
+   
+   void handled();
 }

Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/Queue.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -19,7 +19,6 @@
 import java.util.concurrent.Executor;
 
 import org.hornetq.core.filter.Filter;
-import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.utils.SimpleString;
 
@@ -34,8 +33,6 @@
  */
 public interface Queue extends Bindable
 {
-   MessageReference reroute(ServerMessage message, Transaction tx) throws Exception;
-
    SimpleString getName();
 
    long getID();

Added: trunk/src/main/org/hornetq/core/server/RoutingContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/RoutingContext.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/server/RoutingContext.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server;
+
+import java.util.List;
+
+import org.hornetq.core.transaction.Transaction;
+
+/**
+ * A RoutingContext
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface RoutingContext
+{
+   Transaction getTransaction();
+   
+   void setTransaction(Transaction transaction);
+   
+   void addQueue(Queue queue);
+   
+   List<Queue> getQueues();
+   
+   void incrementDepth();
+   
+   void decrementDepth();
+   
+   int getDepth();
+}

Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -44,6 +44,7 @@
 
    int getMemoryEstimate();
 
+   //TODO - do we really need this? Can't we use durable ref count?
    void setStored() throws Exception;
 
    boolean isStored();

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -392,7 +392,7 @@
 
       synchronized (this)
       {
-         ref.getQueue().referenceHandled();
+         ref.handled();
 
          ServerMessage message = ref.getMessage();
 
@@ -500,10 +500,6 @@
 
    private void fail()
    {
-      log.info("bridge " + name + " has failed");
-      
-      //executor.execute(new FailRunnable());
-      
       //This will get called even after the bridge reconnects - in this case
       //we want to cancel all unacked refs so they get resent
       //duplicate detection will ensure no dups are routed on the other side
@@ -670,8 +666,6 @@
 
          queue.deliverAsync(executor);
          
-         log.info("Bridge " + name + " is now connected to destination ");
-
          return true;
       }
       catch (Exception e)

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -23,6 +23,7 @@
 import org.hornetq.core.server.HandleStatus;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.impl.RoutingContextImpl;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.utils.Future;
@@ -122,7 +123,7 @@
       
       final Transaction tx = new TransactionImpl(storageManager);
 
-      boolean routed = postOffice.redistribute(reference.getMessage(), queue, tx);
+      boolean routed = postOffice.redistribute(reference.getMessage(), queue, new RoutingContextImpl(tx));
 
       if (routed)
       {    
@@ -138,7 +139,7 @@
    
    private void doRedistribute(final MessageReference reference, final Transaction tx) throws Exception
    {
-      queue.referenceHandled();
+      reference.handled();
 
       queue.acknowledge(tx, reference);
 

Modified: trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -17,14 +17,11 @@
 
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.Divert;
+import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.cluster.Transformer;
-import org.hornetq.core.transaction.Transaction;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -54,19 +51,13 @@
 
    private final Transformer transformer;
    
-   private final PagingManager pagingManager;
-   
-   private final StorageManager storageManager;
-
    public DivertImpl(final SimpleString forwardAddress,
                      final SimpleString uniqueName,
                      final SimpleString routingName,
                      final boolean exclusive,
                      final Filter filter,
                      final Transformer transformer,
-                     final PostOffice postOffice,
-                     final PagingManager pagingManager,
-                     final StorageManager storageManager)
+                     final PostOffice postOffice)
    {
       this.forwardAddress = forwardAddress;
 
@@ -81,40 +72,14 @@
       this.transformer = transformer;
 
       this.postOffice = postOffice;
-      
-      this.pagingManager = pagingManager;
-      
-      this.storageManager = storageManager;
    }
 
-   public void preroute(final ServerMessage message, final Transaction tx) throws Exception
-   {
-      //We need to increment ref count here to ensure that the message doesn't get stored, deleted and stored again in a single route which
-      //can occur if the message is routed to a queue, then acked before it's routed here
-      
-      //TODO - combine with similar code in QueueImpl.accept()
-      
-      int count = message.incrementRefCount();
-      
-      if (count == 1)
-      {
-         PagingStore store = pagingManager.getPageStore(message.getDestination());
-         
-         store.addSize(message.getMemoryEstimate());
-      }
-    
-      if (message.isDurable())
-      {
-         message.incrementDurableRefCount();
-      }     
-   }
-
-   public void route(ServerMessage message, final Transaction tx) throws Exception
+   public void route(ServerMessage message, final RoutingContext context) throws Exception
    {      
       SimpleString originalDestination = message.getDestination();
 
       message.setDestination(forwardAddress);
-
+      
       message.putStringProperty(HDR_ORIGINAL_DESTINATION, originalDestination);
 
       if (transformer != null)
@@ -122,30 +87,7 @@
          message = transformer.transform(message);
       }
 
-      postOffice.route(message, tx);
-      
-      //Decrement the ref count here - and delete the message if necessary
-      
-      //TODO combine this with code in QueueImpl::postAcknowledge
-      
-      if (message.isDurable())
-      {
-         int count = message.decrementDurableRefCount();
-
-         if (count == 0)
-         {
-            storageManager.deleteMessage(message.getMessageID());
-         }
-      }
-
-      // TODO: We could optimize this by storing the paging-store for the address on the Queue. We would need to know
-      // the Address for the Queue
-      PagingStore store = pagingManager.getPageStore(message.getDestination());
-
-      if (message.decrementRefCount() == 0)
-      {
-         store.addSize(-message.getMemoryEstimate());         
-      }
+      postOffice.route(message, context);          
    }
 
    public SimpleString getRoutingName()

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -80,7 +80,6 @@
 import org.hornetq.core.server.Divert;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.MemoryManager;
-import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.QueueFactory;
 import org.hornetq.core.server.ServerSession;
@@ -91,9 +90,7 @@
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
 import org.hornetq.core.transaction.ResourceManager;
-import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.impl.ResourceManagerImpl;
-import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.core.version.Version;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.HornetQThreadFactory;
@@ -572,7 +569,7 @@
                                                               executorFactory.getExecutor(),
                                                               channel,
                                                               managementService,
-                                                              queueFactory,
+                                                              // queueFactory,
                                                               this,
                                                               configuration.getManagementAddress());
 
@@ -763,35 +760,6 @@
       return nodeID;
    }
 
-   public void handleReplicateRedistribution(final SimpleString queueName, final long messageID) throws Exception
-   {
-      Binding binding = postOffice.getBinding(queueName);
-
-      if (binding == null)
-      {
-         throw new IllegalStateException("Cannot find queue " + queueName);
-      }
-
-      Queue queue = (Queue)binding.getBindable();
-
-      MessageReference reference = queue.removeFirstReference(messageID);
-
-      Transaction tx = new TransactionImpl(storageManager);
-
-      boolean routed = postOffice.redistribute(reference.getMessage(), queue, tx);
-
-      if (routed)
-      {
-         queue.acknowledge(tx, reference);
-
-         tx.commit();
-      }
-      else
-      {
-         throw new IllegalStateException("Must be routed");
-      }
-   }
-
    public Queue createQueue(final SimpleString address,
                             final SimpleString queueName,
                             final SimpleString filterString,
@@ -924,8 +892,6 @@
          {
             // Complete the startup procedure
 
-            log.info("Activating server");
-
             configuration.setBackup(false);
 
             initialisePart2();
@@ -977,9 +943,8 @@
 
    private void initialiseLogging()
    {
-      LogDelegateFactory logDelegateFactory =
-         (LogDelegateFactory)instantiateInstance(configuration.getLogDelegateFactoryClassName());
-      
+      LogDelegateFactory logDelegateFactory = (LogDelegateFactory)instantiateInstance(configuration.getLogDelegateFactoryClassName());
+
       Logger.setDelegateFactory(logDelegateFactory);
    }
 
@@ -1188,7 +1153,7 @@
 
       Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
 
-      storageManager.loadMessageJournal(pagingManager, resourceManager, queues, duplicateIDMap);
+      storageManager.loadMessageJournal(postOffice, pagingManager, resourceManager, queues, duplicateIDMap);
 
       for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet())
       {
@@ -1267,8 +1232,13 @@
          filter = new FilterImpl(filterString);
       }
 
-      final Queue queue = queueFactory.createQueue(storageManager.generateUniqueID(), address, queueName, filter, durable, temporary);
-      
+      final Queue queue = queueFactory.createQueue(storageManager.generateUniqueID(),
+                                                   address,
+                                                   queueName,
+                                                   filter,
+                                                   durable,
+                                                   temporary);
+
       binding = new LocalQueueBinding(address, queue, nodeID);
 
       if (durable)
@@ -1335,9 +1305,9 @@
                                         config.isExclusive(),
                                         filter,
                                         transformer,
-                                        postOffice,
-                                        pagingManager,
-                                        storageManager);
+                                        postOffice);
+         // pagingManager,
+         // storageManager);
 
          Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
 

Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -12,55 +12,48 @@
  */
 package org.hornetq.core.server.impl;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.transaction.Transaction;
 import org.hornetq.utils.SimpleString;
 
 /**
  * A queue that will discard messages if a newer message with the same MessageImpl.HDR_LAST_VALUE_NAME property value.
  * In other words it only retains the last value
+ * 
  * This is useful for example, for stock prices, where you're only interested in the latest value
  * for a particular stock
+ * 
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a> rewrite
  */
 public class LastValueQueue extends QueueImpl
 {
    private static final Logger log = Logger.getLogger(LastValueQueue.class);
 
-   private final Map<SimpleString, ServerMessage> map = new HashMap<SimpleString, ServerMessage>();
+   private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<SimpleString, HolderReference>();
 
-   private final PagingManager pagingManager;
-
-   private final StorageManager storageManager;
-
    public LastValueQueue(final long persistenceID,
-                        final SimpleString address,
-                        final SimpleString name,
-                        final Filter filter,
-                        final boolean durable,
-                        final boolean temporary,
-                        final ScheduledExecutorService scheduledExecutor,
-                        final PostOffice postOffice,
-                        final StorageManager storageManager,
-                        final HierarchicalRepository<AddressSettings> addressSettingsRepository)
+                         final SimpleString address,
+                         final SimpleString name,
+                         final Filter filter,
+                         final boolean durable,
+                         final boolean temporary,
+                         final ScheduledExecutorService scheduledExecutor,
+                         final PostOffice postOffice,
+                         final StorageManager storageManager,
+                         final HierarchicalRepository<AddressSettings> addressSettingsRepository)
    {
       super(persistenceID,
             address,
@@ -72,213 +65,158 @@
             postOffice,
             storageManager,
             addressSettingsRepository);
-      this.pagingManager = postOffice.getPagingManager();
-      this.storageManager = storageManager;
    }
 
-   public void route(final ServerMessage message, final Transaction tx) throws Exception
+   public synchronized void add(final MessageReference ref, final boolean first)
    {
-      SimpleString prop = (SimpleString)message.getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
+      SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
+
       if (prop != null)
       {
-         synchronized (map)
-         {
-            ServerMessage msg = map.put(prop, message);
-            // if an older message existed then we discard it
-            if (msg != null)
+         HolderReference hr = map.get(prop);
+
+         if (!first)
+         {            
+            if (hr != null)
             {
-               MessageReference ref;
-               if (tx != null)
+               // We need to overwrite the old ref with the new one and ack the old one
+
+               MessageReference oldRef = hr.getReference();
+
+               super.referenceHandled();
+
+               try
                {
-                  discardMessage(msg.getMessageID(), tx);
+                  super.acknowledge(oldRef);
                }
-               else
+               catch (Exception e)
                {
-                  ref = removeReferenceWithID(msg.getMessageID());
-                  if (ref != null)
-                  {
-                     discardMessage(ref, tx);
-                  }
+                  log.error("Failed to ack old reference", e);
                }
 
+               hr.setReference(ref);
+
             }
-         }
-      }
-      super.route(message, tx);
-   }
+            else
+            {
+               hr = new HolderReference(prop, ref);
 
-   public MessageReference reroute(final ServerMessage message, final Transaction tx) throws Exception
-   {
-      SimpleString prop = (SimpleString)message.getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
-      if (prop != null)
-      {
-         synchronized (map)
+               map.put(prop, hr);
+
+               super.add(hr, first);
+            }
+         }
+         else
          {
-            ServerMessage msg = map.put(prop, message);
-            if (msg != null)
+            // Add to front
+
+            if (hr != null)
             {
-               if (tx != null)
+               // We keep the current ref and ack the one we are returning
+
+               super.referenceHandled();
+
+               try
                {
-                  rediscardMessage(msg.getMessageID(), tx);
+                  super.acknowledge(ref);
                }
-               else
+               catch (Exception e)
                {
-                  MessageReference ref = removeReferenceWithID(msg.getMessageID());
-                  rediscardMessage(ref);
+                  log.error("Failed to ack old reference", e);
                }
             }
+            else
+            {
+               map.put(prop, (HolderReference)ref);
+
+               super.add(ref, first);
+            }
          }
       }
-      return super.reroute(message, tx);
+      else
+      {
+         super.add(ref, first);
+      }
    }
 
-   public void acknowledge(final MessageReference ref) throws Exception
+   private class HolderReference implements MessageReference
    {
-      super.acknowledge(ref);
-      SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
-      if (prop != null)
+      private final SimpleString prop;
+
+      private volatile MessageReference ref;
+
+      HolderReference(final SimpleString prop, final MessageReference ref)
       {
-         synchronized (map)
-         {
-            ServerMessage serverMessage = map.get(prop);
-            if (serverMessage != null && ref.getMessage().getMessageID() == serverMessage.getMessageID())
-            {
-               map.remove(prop);
-            }
-         }
+         this.prop = prop;
+
+         this.ref = ref;
       }
-   }
 
-   public void cancel(final Transaction tx, final MessageReference ref) throws Exception
-   {
-      SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
-      if (prop != null)
+      MessageReference getReference()
       {
-         synchronized (map)
-         {
-            ServerMessage msg = map.get(prop);
-            if (msg.getMessageID() == ref.getMessage().getMessageID())
-            {
-               super.cancel(tx, ref);
-            }
-            else
-            {
-               discardMessage(ref, tx);
-            }
-         }
+         return ref;
       }
-      else
+
+      public void handled()
       {
-         super.cancel(tx, ref);
+         // We need to remove the entry from the map just before it gets delivered
+
+         map.remove(prop);
       }
-   }
 
-   void postRollback(final LinkedList<MessageReference> refs) throws Exception
-   {
-      List<MessageReference> refsToDiscard = new ArrayList<MessageReference>();
-      List<SimpleString> refsToClear = new ArrayList<SimpleString>();
-      synchronized (map)
+      void setReference(final MessageReference ref)
       {
-         for (MessageReference ref : refs)
-         {
-            SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
-            if (prop != null)
-            {
-               ServerMessage msg = map.get(prop);
-               if (msg != null)
-               {
-                  if (msg.getMessageID() != ref.getMessage().getMessageID())
-                  {
-                     refsToDiscard.add(ref);
-                  }
-                  else
-                  {
-                     refsToClear.add(prop);
-                  }
-               }
-            }
-         }
-         for (SimpleString simpleString : refsToClear)
-         {
-            map.remove(simpleString);
-         }
+         this.ref = ref;
       }
-      for (MessageReference ref : refsToDiscard)
+
+      public MessageReference copy(Queue queue)
       {
-         refs.remove(ref);
-         discardMessage(ref, null);
+         return ref.copy(queue);
       }
-      super.postRollback(refs);
-   }
 
-   final void discardMessage(MessageReference ref, Transaction tx) throws Exception
-   {
-      deliveringCount.decrementAndGet();
-      PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination());
-      store.addSize(-ref.getMemoryEstimate());
-      QueueImpl queue = (QueueImpl)ref.getQueue();
-      ServerMessage msg = ref.getMessage();
-      boolean durableRef = msg.isDurable() && queue.isDurable();
+      public void decrementDeliveryCount()
+      {
+         ref.decrementDeliveryCount();
+      }
 
-      if (durableRef)
+      public int getDeliveryCount()
       {
-         int count = msg.decrementDurableRefCount();
+         return ref.getDeliveryCount();
+      }
 
-         if (count == 0)
-         {
-            if (tx == null)
-            {
-               storageManager.deleteMessage(msg.getMessageID());
-            }
-            else
-            {
-               storageManager.deleteMessageTransactional(tx.getID(), getID(), msg.getMessageID());
-            }
-         }
+      public int getMemoryEstimate()
+      {
+         return ref.getMemoryEstimate();
       }
-   }
 
-   final void discardMessage(Long id, Transaction tx) throws Exception
-   {
-      RefsOperation oper = getRefsOperation(tx);
-      Iterator<MessageReference> iterator = oper.refsToAdd.iterator();
+      public ServerMessage getMessage()
+      {
+         return ref.getMessage();
+      }
 
-      while (iterator.hasNext())
+      public Queue getQueue()
       {
-         MessageReference ref = iterator.next();
+         return ref.getQueue();
+      }
 
-         if (ref.getMessage().getMessageID() == id)
-         {
-            iterator.remove();
-            discardMessage(ref, tx);
-            break;
-         }
+      public long getScheduledDeliveryTime()
+      {
+         return ref.getScheduledDeliveryTime();
       }
 
-   }
+      public void incrementDeliveryCount()
+      {
+         ref.incrementDeliveryCount();
+      }
 
-   final void rediscardMessage(long id, Transaction tx) throws Exception
-   {
-      RefsOperation oper = getRefsOperation(tx);
-      Iterator<MessageReference> iterator = oper.refsToAdd.iterator();
-
-      while (iterator.hasNext())
+      public void setDeliveryCount(int deliveryCount)
       {
-         MessageReference ref = iterator.next();
+         ref.setDeliveryCount(deliveryCount);
+      }
 
-         if (ref.getMessage().getMessageID() == id)
-         {
-            iterator.remove();
-            rediscardMessage(ref);
-            break;
-         }
+      public void setScheduledDeliveryTime(long scheduledDeliveryTime)
+      {
+         ref.setScheduledDeliveryTime(scheduledDeliveryTime);
       }
    }
-
-   final void rediscardMessage(MessageReference ref) throws Exception
-   {
-      deliveringCount.decrementAndGet();
-      PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination());
-      store.addSize(-ref.getMemoryEstimate());
-   }
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -119,6 +119,11 @@
    {
       return queue;
    }
+   
+   public void handled()
+   {
+      queue.referenceHandled(); // forwards to referenceHandled() on this reference's queue field
+   }
 
    // Public --------------------------------------------------------
 

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -14,7 +14,6 @@
 package org.hornetq.core.server.impl;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -47,6 +46,7 @@
 import org.hornetq.core.server.HandleStatus;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ScheduledDeliveryHandler;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.cluster.impl.Redistributor;
@@ -210,135 +210,12 @@
    {
       return false;
    }
-
-   public void preroute(final ServerMessage message, final Transaction tx) throws Exception
+   
+   public void route(final ServerMessage message, final RoutingContext context) throws Exception
    {
-      int count = message.incrementRefCount();
-
-      if (count == 1)
-      {
-         PagingStore store = pagingManager.getPageStore(message.getDestination());
-
-         store.addSize(message.getMemoryEstimate());
-      }
-
-      boolean durableRef = message.isDurable() && durable;
-
-      if (durableRef)
-      {
-         message.incrementDurableRefCount();
-      }
+      context.addQueue(this);
    }
 
-   public void route(final ServerMessage message, final Transaction tx) throws Exception
-   {
-      boolean durableRef = message.isDurable() && durable;
-
-      // If durable, must be persisted before anything is routed
-      MessageReference ref = message.createReference(this);
-
-      PagingStore store = pagingManager.getPageStore(message.getDestination());
-
-      store.addSize(ref.getMemoryEstimate());
-
-      Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
-
-      if (scheduledDeliveryTime != null)
-      {
-         ref.setScheduledDeliveryTime(scheduledDeliveryTime);
-      }
-
-      if (tx == null)
-      {
-         if (durableRef)
-         {
-            if (!message.isStored())
-            {
-               storageManager.storeMessage(message);
-
-               message.setStored();
-            }
-
-            storageManager.storeReference(ref.getQueue().getID(), message.getMessageID());
-         }
-
-         if (scheduledDeliveryTime != null && durableRef)
-         {
-            storageManager.updateScheduledDeliveryTime(ref);
-         }
-
-         addLast(ref);
-      }
-      else
-      {
-         if (durableRef)
-         {
-            if (!message.isStored())
-            {
-               storageManager.storeMessageTransactional(tx.getID(), message);
-
-               message.setStored();
-            }
-
-            tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
-
-            storageManager.storeReferenceTransactional(tx.getID(),
-                                                       ref.getQueue().getID(),
-                                                       message.getMessageID());
-         }
-
-         if (scheduledDeliveryTime != null && durableRef)
-         {
-            storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), ref);
-         }
-
-         getRefsOperation(tx).addRef(ref);
-      }
-   }
-
-   public MessageReference reroute(final ServerMessage message, final Transaction tx) throws Exception
-   {
-      MessageReference ref = message.createReference(this);
-
-      int count = message.incrementRefCount();
-
-      PagingStore store = pagingManager.getPageStore(message.getDestination());
-
-      if (count == 1)
-      {
-         store.addSize(message.getMemoryEstimate());
-      }
-
-      store.addSize(ref.getMemoryEstimate());
-
-      boolean durableRef = message.isDurable() && durable;
-
-      if (durableRef)
-      {
-         message.incrementDurableRefCount();
-      }
-
-      Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
-
-      if (scheduledDeliveryTime != null)
-      {
-         ref.setScheduledDeliveryTime(scheduledDeliveryTime);
-      }
-
-      if (tx == null)
-      {
-         addLast(ref);
-      }
-      else
-      {
-         getRefsOperation(tx).addRef(ref);
-      }
-
-      message.setStored();
-
-      return ref;
-   }
-
    // Queue implementation ----------------------------------------------------------------------------------------
 
    public void lockDelivery()
@@ -384,7 +261,7 @@
    }
 
    public void addLast(final MessageReference ref)
-   {
+   {      
       add(ref, false);
    }
 
@@ -1071,7 +948,7 @@
 
       copyMessage.setDestination(toAddress);
 
-      postOffice.route(copyMessage, tx);
+      postOffice.route(copyMessage, new RoutingContextImpl(tx));
 
       acknowledge(tx, ref);
    }
@@ -1158,7 +1035,7 @@
 
       copyMessage.setDestination(address);
 
-      postOffice.route(copyMessage, tx);
+      postOffice.route(copyMessage, new RoutingContextImpl(tx));
 
       acknowledge(tx, ref);
 
@@ -1253,7 +1130,7 @@
                   iterator.remove();
                }
 
-               referenceHandled();
+               reference.handled();
 
                try
                {
@@ -1316,7 +1193,7 @@
       }
    }
 
-   private synchronized void add(final MessageReference ref, final boolean first)
+   protected synchronized void add(final MessageReference ref, final boolean first)
    {
       if (!first)
       {
@@ -1558,14 +1435,11 @@
    {
       synchronized (this)
       {
+         direct = false;
+         
          for (MessageReference ref : refs)
          {
-            ServerMessage msg = ref.getMessage();
-
-            if (!scheduledDeliveryHandler.checkAndSchedule(ref))
-            {
-               messageReferences.addFirst(ref, msg.getPriority());
-            }
+            add(ref, true);            
          }
 
          deliver();
@@ -1633,15 +1507,8 @@
 
    final class RefsOperation implements TransactionOperation
    {
-      List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
-
       List<MessageReference> refsToAck = new ArrayList<MessageReference>();
 
-      synchronized void addRef(final MessageReference ref)
-      {
-         refsToAdd.add(ref);
-      }
-
       synchronized void addAck(final MessageReference ref)
       {
          refsToAck.add(ref);
@@ -1689,28 +1556,8 @@
          }
       }
 
-      /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.TransactionOperation#getDistinctQueues()
-       */
-      public synchronized Collection<Queue> getDistinctQueues()
-      {
-         HashSet<Queue> queues = new HashSet<Queue>();
-
-         for (MessageReference ref : refsToAck)
-         {
-            queues.add(ref.getQueue());
-         }
-
-         return queues;
-      }
-
       public void afterCommit(final Transaction tx) throws Exception
       {
-         for (MessageReference ref : refsToAdd)
-         {
-            ref.getQueue().addLast(ref);
-         }
-
          for (MessageReference ref : refsToAck)
          {
             synchronized (ref.getQueue())
@@ -1726,25 +1573,6 @@
 
       public void beforeRollback(final Transaction tx) throws Exception
       {
-         Set<ServerMessage> msgs = new HashSet<ServerMessage>();
-
-         for (MessageReference ref : refsToAdd)
-         {
-            ServerMessage msg = ref.getMessage();
-
-            // Optimise this
-            PagingStore store = pagingManager.getPageStore(msg.getDestination());
-
-            store.addSize(-ref.getMemoryEstimate());
-
-            if (!msgs.contains(msg))
-            {
-               store.addSize(-msg.getMemoryEstimate());
-               msg.decrementRefCount();
-            }
-
-            msgs.add(msg);
-         }
       }
    }
 

Added: trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
+import org.hornetq.core.transaction.Transaction;
+
+/**
+ * A RoutingContextImpl: plain holder for the queues added while a message is
+ * routed, the transaction in use (may be null) and a depth counter.
+ * Nothing here is synchronized; getQueues() returns the internal list, not a copy.
+ *
+ * @author tim
+ */
+public class RoutingContextImpl implements RoutingContext
+{
+   private List<Queue> queues = new ArrayList<Queue>(); // grows via addQueue(); never replaced
+   
+   private Transaction transaction; // not final: setTransaction() can swap it
+   
+   private int depth; // starts at 0; moved only by incrementDepth()/decrementDepth()
+   
+   public RoutingContextImpl(final Transaction transaction)
+   {
+      this.transaction = transaction;
+   }
+
+   public void addQueue(final Queue queue)
+   {  
+      queues.add(queue);
+   }
+
+   public Transaction getTransaction()
+   {
+      return transaction;
+   }
+   
+   public void setTransaction(final Transaction tx)
+   {
+      this.transaction = tx;
+   }
+   
+   public List<Queue> getQueues()
+   {
+      return queues;
+   }
+
+   public void decrementDepth()
+   {
+      depth--;
+   }
+
+   public int getDepth()
+   {
+      return depth;
+   }
+
+   public void incrementDepth()
+   {
+      depth++;
+   }
+
+}

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -499,7 +499,7 @@
                deliveringRefs.add(ref);
             }
 
-            ref.getQueue().referenceHandled();
+            ref.handled();
 
             ref.incrementDeliveryCount();
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -85,7 +85,7 @@
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerConsumer;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.ServerSession;
@@ -159,10 +159,6 @@
 
    private final SimpleString managementAddress;
 
-   private final QueueFactory queueFactory;
-
-   private final SimpleString nodeID;
-
    // The current currentLargeMessage being processed
    // In case of replication, currentLargeMessage should only be accessed within the replication callbacks
    private volatile LargeServerMessage currentLargeMessage;
@@ -187,8 +183,7 @@
                             final SecurityStore securityStore,
                             final Executor executor,
                             final Channel channel,
-                            final ManagementService managementService,
-                            final QueueFactory queueFactory,
+                            final ManagementService managementService,                  
                             final HornetQServer server,
                             final SimpleString managementAddress) throws Exception
    {
@@ -235,10 +230,6 @@
 
       this.managementAddress = managementAddress;
 
-      this.queueFactory = queueFactory;
-
-      this.nodeID = server.getNodeID();
-
       remotingConnection.addFailureListener(this);
 
       remotingConnection.addCloseListener(this);
@@ -1671,11 +1662,6 @@
    // Public
    // ----------------------------------------------------------------------------
 
-   public Transaction getTransaction()
-   {
-      return tx;
-   }
-
    // Private
    // ----------------------------------------------------------------------------
 
@@ -1816,14 +1802,18 @@
          }
          throw e;
       }
-
+      
+      RoutingContext context;
+      
       if (tx == null || autoCommitSends)
       {
-         postOffice.route(msg);
+         context = new RoutingContextImpl(null);
       }
       else
       {
-         postOffice.route(msg, tx);
+         context = new RoutingContextImpl(tx);
       }
+      
+      postOffice.route(msg, context);     
    }
 }

Modified: trunk/src/main/org/hornetq/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/Transaction.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/transaction/Transaction.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -13,12 +13,9 @@
 
 package org.hornetq.core.transaction;
 
-import java.util.Set;
-
 import javax.transaction.xa.Xid;
 
 import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.server.Queue;
 
 /**
  * A HornetQ internal transaction
@@ -62,8 +59,6 @@
    
    Object getProperty(int index);
    
-   Set<Queue> getDistinctQueues();
-
    static enum State
    {
       ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY

Modified: trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -13,10 +13,7 @@
 
 package org.hornetq.core.transaction;
 
-import java.util.Collection;
 
-import org.hornetq.core.server.Queue;
-
 /**
  * 
  * A TransactionOperation
@@ -26,10 +23,6 @@
  */
 public interface TransactionOperation
 {
-   
-   /** rollback will need a distinct list of Queues in order to lock those queues before calling rollback */
-   Collection<Queue> getDistinctQueues();
-   
    void beforePrepare(Transaction tx) throws Exception;
    
    void beforeCommit(Transaction tx) throws Exception;

Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -97,29 +97,6 @@
    // Transaction implementation
    // -----------------------------------------------------------
 
-   public Set<Queue> getDistinctQueues()
-   {
-      HashSet<Queue> queues = new HashSet<Queue>();
-
-      if (operations != null)
-      {
-         for (TransactionOperation op : operations)
-         {
-            Collection<Queue> q = op.getDistinctQueues();
-            if (q == null)
-            {
-               log.warn("Operation " + op + " returned null getDistinctQueues");
-            }
-            else
-            {
-               queues.addAll(q);
-            }
-         }
-      }
-
-      return queues;
-   }
-
    public long getID()
    {
       return id;
@@ -184,7 +161,7 @@
    }
 
    public void commit(boolean onePhase) throws Exception
-   {
+   {      
       synchronized (timeoutLock)
       {
          if (state == State.ROLLBACK_ONLY)

Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -59,8 +59,7 @@
    {
       ObjectName objectName = managementService.getObjectNameBuilder().getJMSServerObjectName();
       JMSServerControlImpl control = new JMSServerControlImpl(server);
-      managementService.registerInJMX(objectName,
-                                      control);
+      managementService.registerInJMX(objectName, control);
       managementService.registerInRegistry(ResourceNames.JMS_SERVER, control);
       return control;
    }
@@ -72,8 +71,7 @@
       managementService.unregisterFromRegistry(ResourceNames.JMS_SERVER);
    }
 
-   public synchronized void registerQueue(final HornetQQueue queue,
-                             final String jndiBinding) throws Exception
+   public synchronized void registerQueue(final HornetQQueue queue, final String jndiBinding) throws Exception
    {
       QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue.getAddress());
       MessageCounterManager messageCounterManager = managementService.getMessageCounterManager();
@@ -85,10 +83,7 @@
                                                   messageCounterManager.getMaxDayCount());
       messageCounterManager.registerMessageCounter(queue.getName(), counter);
       ObjectName objectName = managementService.getObjectNameBuilder().getJMSQueueObjectName(queue.getQueueName());
-      JMSQueueControlImpl control = new JMSQueueControlImpl(queue,
-                                                    coreQueueControl,
-                                                    jndiBinding,
-                                                    counter);
+      JMSQueueControlImpl control = new JMSQueueControlImpl(queue, coreQueueControl, jndiBinding, counter);
       managementService.registerInJMX(objectName, control);
       managementService.registerInRegistry(ResourceNames.JMS_QUEUE + queue.getQueueName(), control);
    }
@@ -100,8 +95,7 @@
       managementService.unregisterFromRegistry(ResourceNames.JMS_QUEUE + name);
    }
 
-   public synchronized void registerTopic(final HornetQTopic topic,
-                             final String jndiBinding) throws Exception
+   public synchronized void registerTopic(final HornetQTopic topic, final String jndiBinding) throws Exception
    {
       ObjectName objectName = managementService.getObjectNameBuilder().getJMSTopicObjectName(topic.getTopicName());
       AddressControl addressControl = (AddressControl)managementService.getResource(ResourceNames.CORE_ADDRESS + topic.getAddress());
@@ -118,8 +112,8 @@
    }
 
    public synchronized void registerConnectionFactory(final String name,
-                                         final HornetQConnectionFactory connectionFactory,
-                                         final List<String> bindings) throws Exception
+                                                      final HornetQConnectionFactory connectionFactory,
+                                                      final List<String> bindings) throws Exception
    {
       ObjectName objectName = managementService.getObjectNameBuilder().getConnectionFactoryObjectName(name);
       JMSConnectionFactoryControlImpl control = new JMSConnectionFactoryControlImpl(connectionFactory, name, bindings);

Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -149,6 +149,8 @@
             ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
 
             assertNotNull(message2);
+            
+            log.info("got message " + message2.getProperty(new SimpleString("id")));
 
             assertEquals(i, ((Integer)message2.getProperty(new SimpleString("id"))).intValue());
 

Modified: trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -92,7 +92,7 @@
 
       ClientConsumer consumer2 = session.createConsumer(queueName2);
 
-      final int numMessages = 10;
+      final int numMessages = 1;
 
       final SimpleString propKey = new SimpleString("testkey");
 

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -12,6 +12,8 @@
  */
 package org.hornetq.tests.integration.jms.connection;
 
+import java.lang.ref.WeakReference;
+
 import javax.jms.Connection;
 import javax.jms.Session;
 
@@ -54,16 +56,14 @@
    public void testCloseOneConnectionOnGC() throws Exception
    {
       Connection conn = cf.createConnection();
+      
+      WeakReference<Connection> wr = new WeakReference<Connection>(conn);
            
       assertEquals(1, server.getRemotingService().getConnections().size());
       
       conn = null;
 
-      System.gc();
-      System.gc();
-      System.gc();
-      
-      Thread.sleep(2000);
+      checkWeakReferences(wr);
                   
       assertEquals(0, server.getRemotingService().getConnections().size());
    }
@@ -74,17 +74,17 @@
       Connection conn2 = cf.createConnection();
       Connection conn3 = cf.createConnection();     
       
+      WeakReference<Connection> wr1 = new WeakReference<Connection>(conn1);
+      WeakReference<Connection> wr2 = new WeakReference<Connection>(conn2);
+      WeakReference<Connection> wr3 = new WeakReference<Connection>(conn3);
+      
       assertEquals(1, server.getRemotingService().getConnections().size());
       
       conn1 = null;
       conn2 = null;
       conn3 = null;
 
-      System.gc();
-      System.gc();
-      System.gc();
-      
-      Thread.sleep(2000);
+      checkWeakReferences(wr1, wr2, wr3);
                      
       assertEquals(0, server.getRemotingService().getConnections().size());
    }
@@ -93,8 +93,12 @@
    {
       Connection conn1 = cf.createConnection();
       Connection conn2 = cf.createConnection();
-      Connection conn3 = cf.createConnection();    
+      Connection conn3 = cf.createConnection();   
       
+      WeakReference<Connection> wr1 = new WeakReference<Connection>(conn1);
+      WeakReference<Connection> wr2 = new WeakReference<Connection>(conn2);
+      WeakReference<Connection> wr3 = new WeakReference<Connection>(conn3);
+      
       Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
       Session sess2 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
       Session sess3 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -109,13 +113,40 @@
       conn2 = null;
       conn3 = null;
       
-      System.gc();
-      System.gc();
-      System.gc();
-      
-      Thread.sleep(2000);
+      checkWeakReferences(wr1, wr2, wr3);
                      
       assertEquals(0, server.getRemotingService().getConnections().size());
    }
    
+   public static void checkWeakReferences(WeakReference<?>... references)
+   {
+      // Polls the given references, forcing GC between polls, then asserts every one was cleared.
+      int i = 0;
+      boolean hasValue = false;
+
+      do
+      {
+         hasValue = false;
+
+         if (i > 0) // first pass only inspects; later passes call forceGC() (defined outside this hunk)
+         {
+            forceGC();
+         }
+
+         for (WeakReference<?> ref : references)
+         {
+            if (ref.get() != null)
+            {
+               hasValue = true;
+            }
+         }
+      }
+      while (i++ <= 30 && hasValue); // stops once no referent remains, or after at most 31 forceGC() calls
+
+      for (WeakReference<?> ref : references)
+      {
+         assertNull(ref.get());
+      }
+   }
+   
 }

Modified: trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -47,7 +47,7 @@
    public void testLargeMessageCopy() throws Exception
    {
       clearData();
-      
+
       Configuration configuration = createDefaultConfig();
 
       configuration.start();
@@ -63,13 +63,15 @@
       byte[] data = new byte[1024];
 
       for (int i = 0; i < 110; i++)
+      {
          msg.addBytes(data);
+      }
 
       ServerMessage msg2 = msg.copy(2);
-      
+
       assertEquals(110 * 1024, msg.getBodySize());
       assertEquals(110 * 1024, msg2.getBodySize());
-      
+
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -24,8 +24,10 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.JournalType;
 import org.hornetq.core.server.Queue;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
 import org.hornetq.tests.util.ServiceTestBase;
 
 /**
@@ -42,7 +44,7 @@
 
    // Constants -----------------------------------------------------
    private static final Logger log = Logger.getLogger(RestartSMTest.class);
-                                                      
+
    // Attributes ----------------------------------------------------
 
    // Static --------------------------------------------------------
@@ -62,6 +64,8 @@
 
       configuration.setJournalType(JournalType.ASYNCIO);
 
+      PostOffice postOffice = new FakePostOffice();
+
       final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
       try
       {
@@ -74,7 +78,7 @@
 
          Map<Long, Queue> queues = new HashMap<Long, Queue>();
 
-         journal.loadMessageJournal(null, null, queues, null);
+         journal.loadMessageJournal(postOffice, null, null, queues, null);
 
          journal.stop();
 
@@ -84,7 +88,7 @@
 
          queues = new HashMap<Long, Queue>();
 
-         journal.loadMessageJournal(null, null, queues, null);
+         journal.loadMessageJournal(postOffice, null, null, queues, null);
 
          queueBindingInfos = new ArrayList<QueueBindingInfo>();
 

Modified: trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -21,6 +21,7 @@
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.server.HornetQ;
 import org.hornetq.core.server.HornetQServer;
@@ -33,6 +34,8 @@
  */
 public class LVQTest extends UnitTestCase
 {
+   private static final Logger log = Logger.getLogger(LVQTest.class);
+
    private HornetQServer server;
 
    private ClientSession clientSession;
@@ -127,13 +130,14 @@
       clientSession.start();
       ClientMessage m = consumer.receive(1000);
       assertNotNull(m);
+      assertEquals(m.getBody().readString(), "m1");
       producer.send(m2);
       consumer.close();
       consumer = clientSession.createConsumer(qName1);
       m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m2");
+      assertNotNull(m);      
+      assertEquals("m2", m.getBody().readString());
+      m.acknowledge();      
       m = consumer.receive(1000);
       assertNull(m);
    }

Modified: trunk/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -23,9 +23,11 @@
 import org.hornetq.core.config.impl.FileConfiguration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.JournalType;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
 import org.hornetq.tests.util.UnitTestCase;
 import org.hornetq.utils.SimpleString;
 
@@ -117,12 +119,15 @@
 
       configuration.setJournalType(journalType);
 
-      final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
+      PostOffice postOffice = new FakePostOffice();
+
+      final JournalStorageManager journal = new JournalStorageManager(configuration,
+                                                                      Executors.newCachedThreadPool());
       journal.start();
 
       HashMap<Long, Queue> queues = new HashMap<Long, Queue>();
 
-      journal.loadMessageJournal(null, null, queues, null);
+      journal.loadMessageJournal(postOffice, null, null, queues, null);
 
       final byte[] bytes = new byte[900];
 
@@ -163,11 +168,10 @@
                   final SimpleString address = new SimpleString("Destination " + i);
 
                   ServerMessageImpl implMsg = new ServerMessageImpl(/* type */(byte)1, /* durable */
-                                                                    true, /* expiration */
-                                                                    0,
-                                                                    /* timestamp */0, /* priority */
-                                                                    (byte)0,
-                                                                    ChannelBuffers.wrappedBuffer(new byte[1024]));
+                  true, /* expiration */
+                  0,
+                  /* timestamp */0, /* priority */
+                  (byte)0, ChannelBuffers.wrappedBuffer(new byte[1024]));
 
                   implMsg.putStringProperty(new SimpleString("Key"), new SimpleString("This String is worthless!"));
 

Modified: trunk/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -414,7 +414,7 @@
       bridge.setSourceDestinationFactory(sourceDF);
       bridge.setTargetConnectionFactoryFactory(targetCFF);
       bridge.setTargetDestinationFactory(targetDF);
-      bridge.setFailureRetryInterval(10);
+      bridge.setFailureRetryInterval(100);
       bridge.setMaxRetries(1);
       bridge.setMaxBatchSize(1);
       bridge.setMaxBatchTime(-1);

Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -741,7 +741,7 @@
       {
          return 0;
       }
-      
+
       public SimpleString[] getStoreNames()
       {
          return null;
@@ -940,7 +940,8 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager, java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)
        */
-      public void loadMessageJournal(PagingManager pagingManager,
+      public void loadMessageJournal(PostOffice postOffice,
+                                     PagingManager pagingManager,
                                      ResourceManager resourceManager,
                                      Map<Long, Queue> queues,
                                      Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
@@ -1038,7 +1039,7 @@
       {
          return -1;
       }
-      
+
       public void deleteHeuristicCompletion(long txID) throws Exception
       {
       }

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -30,6 +30,7 @@
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.RoutingContextImpl;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.TransactionOperation;
 import org.hornetq.tests.util.UnitTestCase;
@@ -107,11 +108,11 @@
       {
          if (route)
          {
-            bind.route(new FakeMessage(), new FakeTransaction());
+            bind.route(new FakeMessage(), new RoutingContextImpl(new FakeTransaction()));
          }
          else
          {
-            bind.redistribute(new FakeMessage(), queue, new FakeTransaction());
+            bind.redistribute(new FakeMessage(), queue, new RoutingContextImpl(new FakeTransaction()));
          }
       }
    }

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -33,6 +33,7 @@
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.transaction.impl.ResourceManagerImpl;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.utils.Pair;
@@ -47,7 +48,6 @@
  */
 public class DuplicateDetectionUnitTest extends ServiceTestBase
 {
-
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------
@@ -77,6 +77,8 @@
 
          Configuration configuration = createDefaultConfig();
 
+         PostOffice postOffice = new FakePostOffice();
+
          configuration.start();
 
          configuration.setJournalType(JournalType.ASYNCIO);
@@ -90,7 +92,8 @@
 
          HashMap<SimpleString, List<Pair<byte[], Long>>> mapDups = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
 
-         journal.loadMessageJournal(new FakePagingManager(),
+         journal.loadMessageJournal(postOffice,
+                                    new FakePagingManager(),
                                     new ResourceManagerImpl(0, 0, scheduledThreadPool),
                                     new HashMap<Long, Queue>(),
                                     mapDups);
@@ -110,7 +113,8 @@
          journal.start();
          journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
 
-         journal.loadMessageJournal(new FakePagingManager(),
+         journal.loadMessageJournal(postOffice,
+                                    new FakePagingManager(),
                                     new ResourceManagerImpl(0, 0, scheduledThreadPool),
                                     new HashMap<Long, Queue>(),
                                     mapDups);
@@ -137,7 +141,8 @@
          journal.start();
          journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
 
-         journal.loadMessageJournal(new FakePagingManager(),
+         journal.loadMessageJournal(postOffice,
+                                    new FakePagingManager(),
                                     new ResourceManagerImpl(0, 0, scheduledThreadPool),
                                     new HashMap<Long, Queue>(),
                                     mapDups);
@@ -196,7 +201,7 @@
       {
          return 0;
       }
-      
+
       public SimpleString[] getStoreNames()
       {
          return null;

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -23,32 +23,27 @@
 import org.hornetq.core.server.Distributor;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.utils.SimpleString;
 
 public class FakeQueue implements Queue
 {
-
    private SimpleString name;
    
    public FakeQueue(SimpleString name)
    {
       this.name = name;
    }
-   
-   public void setExpiryAddress(SimpleString expiryAddress)
-   {
-      // TODO Auto-generated method stub
-      
-   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.server.Queue#acknowledge(org.hornetq.core.server.MessageReference)
     */
    public void acknowledge(MessageReference ref) throws Exception
    {
-
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
@@ -56,32 +51,17 @@
     */
    public void acknowledge(Transaction tx, MessageReference ref) throws Exception
    {
-
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#activate()
-    */
-   public boolean activate()
-   {
-
-      return false;
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#activateNow(java.util.concurrent.Executor)
-    */
-   public void activateNow(Executor executor)
-   {
-
-   }
-
-   /* (non-Javadoc)
     * @see org.hornetq.core.server.Queue#addConsumer(org.hornetq.core.server.Consumer)
     */
    public void addConsumer(Consumer consumer) throws Exception
    {
-
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
@@ -89,7 +69,8 @@
     */
    public void addFirst(MessageReference ref)
    {
-
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
@@ -97,7 +78,8 @@
     */
    public void addLast(MessageReference ref)
    {
-
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
@@ -109,21 +91,22 @@
       
    }
 
-
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#cancel(org.hornetq.core.transaction.Transaction, org.hornetq.core.server.MessageReference)
+    * @see org.hornetq.core.server.Queue#cancel(org.hornetq.core.server.MessageReference)
     */
-   public void cancel(Transaction tx, MessageReference ref) throws Exception
+   public void cancel(MessageReference reference) throws Exception
    {
-
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#cancel(org.hornetq.core.server.MessageReference)
+    * @see org.hornetq.core.server.Queue#cancel(org.hornetq.core.transaction.Transaction, org.hornetq.core.server.MessageReference)
     */
-   public void cancel(MessageReference reference) throws Exception
+   public void cancel(Transaction tx, MessageReference ref) throws Exception
    {
-
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
@@ -131,7 +114,8 @@
     */
    public void cancelRedistributor() throws Exception
    {
-
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
@@ -139,7 +123,7 @@
     */
    public boolean changeReferencePriority(long messageID, byte newPriority) throws Exception
    {
-
+      // TODO Auto-generated method stub
       return false;
    }
 
@@ -148,25 +132,16 @@
     */
    public boolean checkDLQ(MessageReference ref) throws Exception
    {
-
+      // TODO Auto-generated method stub
       return false;
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#consumerFailedOver()
-    */
-   public boolean consumerFailedOver()
-   {
-
-      return false;
-   }
-
-   /* (non-Javadoc)
     * @see org.hornetq.core.server.Queue#deleteAllReferences()
     */
    public int deleteAllReferences() throws Exception
    {
-
+      // TODO Auto-generated method stub
       return 0;
    }
 
@@ -175,7 +150,7 @@
     */
    public int deleteMatchingReferences(Filter filter) throws Exception
    {
-
+      // TODO Auto-generated method stub
       return 0;
    }
 
@@ -184,7 +159,7 @@
     */
    public boolean deleteReference(long messageID) throws Exception
    {
-
+      // TODO Auto-generated method stub
       return false;
    }
 
@@ -193,7 +168,8 @@
     */
    public void deliverAsync(Executor executor)
    {
-
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
@@ -201,7 +177,8 @@
     */
    public void deliverNow()
    {
-
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
@@ -209,7 +186,8 @@
     */
    public void expire(MessageReference ref) throws Exception
    {
-
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
@@ -217,25 +195,26 @@
     */
    public boolean expireReference(long messageID) throws Exception
    {
-
+      // TODO Auto-generated method stub
       return false;
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#expireReferences(org.hornetq.core.filter.Filter)
+    * @see org.hornetq.core.server.Queue#expireReferences()
     */
-   public int expireReferences(Filter filter) throws Exception
+   public void expireReferences() throws Exception
    {
-
-      return 0;
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#expireReferences()
+    * @see org.hornetq.core.server.Queue#expireReferences(org.hornetq.core.filter.Filter)
     */
-   public void expireReferences() throws Exception
+   public int expireReferences(Filter filter) throws Exception
    {
-
+      // TODO Auto-generated method stub
+      return 0;
    }
 
    /* (non-Javadoc)
@@ -243,7 +222,7 @@
     */
    public int getConsumerCount()
    {
-
+      // TODO Auto-generated method stub
       return 0;
    }
 
@@ -252,7 +231,7 @@
     */
    public Set<Consumer> getConsumers()
    {
-
+      // TODO Auto-generated method stub
       return null;
    }
 
@@ -261,7 +240,7 @@
     */
    public int getDeliveringCount()
    {
-
+      // TODO Auto-generated method stub
       return 0;
    }
 
@@ -270,7 +249,7 @@
     */
    public Distributor getDistributionPolicy()
    {
-
+      // TODO Auto-generated method stub
       return null;
    }
 
@@ -279,7 +258,7 @@
     */
    public Filter getFilter()
    {
-
+      // TODO Auto-generated method stub
       return null;
    }
 
@@ -288,7 +267,7 @@
     */
    public int getMessageCount()
    {
-
+      // TODO Auto-generated method stub
       return 0;
    }
 
@@ -297,7 +276,7 @@
     */
    public int getMessagesAdded()
    {
-
+      // TODO Auto-generated method stub
       return 0;
    }
 
@@ -314,7 +293,7 @@
     */
    public long getID()
    {
-
+      // TODO Auto-generated method stub
       return 0;
    }
 
@@ -323,7 +302,7 @@
     */
    public MessageReference getReference(long id)
    {
-
+      // TODO Auto-generated method stub
       return null;
    }
 
@@ -332,7 +311,7 @@
     */
    public int getScheduledCount()
    {
-
+      // TODO Auto-generated method stub
       return 0;
    }
 
@@ -341,25 +320,25 @@
     */
    public List<MessageReference> getScheduledMessages()
    {
-
+      // TODO Auto-generated method stub
       return null;
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#isBackup()
+    * @see org.hornetq.core.server.Queue#isDurable()
     */
-   public boolean isBackup()
+   public boolean isDurable()
    {
-
+      // TODO Auto-generated method stub
       return false;
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#isDurable()
+    * @see org.hornetq.core.server.Queue#isPaused()
     */
-   public boolean isDurable()
+   public boolean isPaused()
    {
-
+      // TODO Auto-generated method stub
       return false;
    }
 
@@ -368,22 +347,35 @@
     */
    public boolean isTemporary()
    {
-
+      // TODO Auto-generated method stub
       return false;
    }
 
    /* (non-Javadoc)
+    * @see org.hornetq.core.server.Queue#iterator()
+    */
+   public Iterator<MessageReference> iterator()
+   {
+      // TODO Auto-generated method stub
+      return null;
+   }
+
+   /* (non-Javadoc)
     * @see org.hornetq.core.server.Queue#list(org.hornetq.core.filter.Filter)
     */
    public List<MessageReference> list(Filter filter)
    {
-
+      // TODO Auto-generated method stub
       return null;
    }
-   
-   public Iterator<MessageReference> iterator()
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.Queue#lockDelivery()
+    */
+   public void lockDelivery()
    {
-      return null;
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
@@ -391,7 +383,7 @@
     */
    public boolean moveReference(long messageID, SimpleString toAddress) throws Exception
    {
-
+      // TODO Auto-generated method stub
       return false;
    }
 
@@ -400,16 +392,26 @@
     */
    public int moveReferences(Filter filter, SimpleString toAddress) throws Exception
    {
-
+      // TODO Auto-generated method stub
       return 0;
    }
 
    /* (non-Javadoc)
+    * @see org.hornetq.core.server.Queue#pause()
+    */
+   public void pause()
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   /* (non-Javadoc)
     * @see org.hornetq.core.server.Queue#reacknowledge(org.hornetq.core.transaction.Transaction, org.hornetq.core.server.MessageReference)
     */
    public void reacknowledge(Transaction tx, MessageReference ref) throws Exception
    {
-
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
@@ -417,7 +419,8 @@
     */
    public void referenceHandled()
    {
-
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
@@ -425,7 +428,7 @@
     */
    public boolean removeConsumer(Consumer consumer) throws Exception
    {
-
+      // TODO Auto-generated method stub
       return false;
    }
 
@@ -434,7 +437,7 @@
     */
    public MessageReference removeFirstReference(long id) throws Exception
    {
-
+      // TODO Auto-generated method stub
       return null;
    }
 
@@ -443,17 +446,17 @@
     */
    public MessageReference removeReferenceWithID(long id) throws Exception
    {
-
+      // TODO Auto-generated method stub
       return null;
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#reroute(org.hornetq.core.server.ServerMessage, org.hornetq.core.transaction.Transaction)
+    * @see org.hornetq.core.server.Queue#resume()
     */
-   public MessageReference reroute(ServerMessage message, Transaction tx) throws Exception
+   public void resume()
    {
-
-      return null;
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
@@ -461,82 +464,48 @@
     */
    public boolean sendMessageToDeadLetterAddress(long messageID) throws Exception
    {
-
+      // TODO Auto-generated method stub
       return false;
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#setBackup()
-    */
-   public void setBackup()
-   {
-
-   }
-
-   /* (non-Javadoc)
     * @see org.hornetq.core.server.Queue#setDistributionPolicy(org.hornetq.core.server.Distributor)
     */
    public void setDistributionPolicy(Distributor policy)
    {
-
+      // TODO Auto-generated method stub
+      
    }
 
-
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Bindable#preroute(org.hornetq.core.server.ServerMessage, org.hornetq.core.transaction.Transaction)
+    * @see org.hornetq.core.server.Queue#setExpiryAddress(org.hornetq.utils.SimpleString)
     */
-   public void preroute(ServerMessage message, Transaction tx) throws Exception
+   public void setExpiryAddress(SimpleString expiryAddress)
    {
-
+      // TODO Auto-generated method stub
+      
    }
 
+      // TODO Auto-generated method stub
+      
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Bindable#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.transaction.Transaction)
+    * @see org.hornetq.core.server.Queue#unlockDelivery()
     */
-   public void route(ServerMessage message, Transaction tx) throws Exception
-   {
-
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#lock()
-    */
-   public void lockDelivery()
-   {
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#unlock()
-    */
    public void unlockDelivery()
    {
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#isPaused()
-    */
-   public boolean isPaused()
-   {
       // TODO Auto-generated method stub
-      return false;
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#pause()
-    */
-   public void pause()
-   {
-      // TODO Auto-generated method stub
       
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#resume()
+    * @see org.hornetq.core.server.Bindable#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.RoutingContext)
     */
-   public void resume()
+   public void route(ServerMessage message, RoutingContext context) throws Exception
    {
       // TODO Auto-generated method stub
       
    }
+   
+   
 
 }
\ No newline at end of file

Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java	2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java	2009-10-07 21:01:20 UTC (rev 8064)
@@ -14,14 +14,14 @@
 
 package org.hornetq.tests.unit.core.server.impl.fakes;
 
-import java.util.List;
-
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.DuplicateIDCache;
 import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.utils.SimpleString;
@@ -29,125 +29,141 @@
 public class FakePostOffice implements PostOffice
 {
 
-   public Object getNotificationLock()
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.HornetQComponent#isStarted()
+    */
+   public boolean isStarted()
    {
-      return null;
+      // TODO Auto-generated method stub
+      return false;
    }
 
-   public Bindings getMatchingBindings(SimpleString address)
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.HornetQComponent#start()
+    */
+   public void start() throws Exception
    {
-      return null;
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.postoffice.PostOffice#activate()
+    * @see org.hornetq.core.server.HornetQComponent#stop()
     */
-   public List<Queue> activate()
+   public void stop() throws Exception
    {
-      return null;
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.postoffice.PostOffice#addBinding(org.hornetq.core.postoffice.Binding)
     */
-   public void addBinding(final Binding binding) throws Exception
+   public void addBinding(Binding binding) throws Exception
    {
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.postoffice.PostOffice#getBinding(org.hornetq.utils.SimpleString)
     */
-   public Binding getBinding(final SimpleString uniqueName)
+   public Binding getBinding(SimpleString uniqueName)
    {
+      // TODO Auto-generated method stub
       return null;
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.postoffice.PostOffice#getBindingsForAddress(org.hornetq.utils.SimpleString)
     */
-   public Bindings getBindingsForAddress(final SimpleString address) throws Exception
+   public Bindings getBindingsForAddress(SimpleString address) throws Exception
    {
+      // TODO Auto-generated method stub
       return null;
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.postoffice.PostOffice#getDuplicateIDCache(org.hornetq.utils.SimpleString)
     */
-   public DuplicateIDCache getDuplicateIDCache(final SimpleString address)
+   public DuplicateIDCache getDuplicateIDCache(SimpleString address)
    {
+      // TODO Auto-generated method stub
       return null;
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.postoffice.PostOffice#getPagingManager()
+    * @see org.hornetq.core.postoffice.PostOffice#getMatchingBindings(org.hornetq.utils.SimpleString)
     */
-   public PagingManager getPagingManager()
+   public Bindings getMatchingBindings(SimpleString address)
    {
+      // TODO Auto-generated method stub
       return null;
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.postoffice.PostOffice#redistribute(org.hornetq.core.server.ServerMessage, org.hornetq.utils.SimpleString, org.hornetq.core.transaction.Transaction)
+    * @see org.hornetq.core.postoffice.PostOffice#getNotificationLock()
     */
-   public boolean redistribute(final ServerMessage message, final Queue queue, final Transaction tx) throws Exception
+   public Object getNotificationLock()
    {
-      return false;
+      // TODO Auto-generated method stub
+      return null;
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.postoffice.PostOffice#removeBinding(org.hornetq.utils.SimpleString)
+    * @see org.hornetq.core.postoffice.PostOffice#getPagingManager()
     */
-   public Binding removeBinding(final SimpleString uniqueName) throws Exception
+   public PagingManager getPagingManager()
    {
+      // TODO Auto-generated method stub
       return null;
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.postoffice.PostOffice#route(org.hornetq.core.server.ServerMessage)
+    * @see org.hornetq.core.postoffice.PostOffice#redistribute(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.Queue, org.hornetq.core.server.RoutingContext)
     */
-   public void route(final ServerMessage message) throws Exception
+   public boolean redistribute(ServerMessage message, Queue originatingQueue, RoutingContext context) throws Exception
    {
-
+      // TODO Auto-generated method stub
+      return false;
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.postoffice.PostOffice#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.transaction.Transaction)
+    * @see org.hornetq.core.postoffice.PostOffice#removeBinding(org.hornetq.utils.SimpleString)
     */
-   public void route(final ServerMessage message, final Transaction tx) throws Exception
+   public Binding removeBinding(SimpleString uniqueName) throws Exception
    {
-
+      // TODO Auto-generated method stub
+      return null;
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.postoffice.PostOffice#sendQueueInfoToQueue(org.hornetq.utils.SimpleString, org.hornetq.utils.SimpleString)
+    * @see org.hornetq.core.postoffice.PostOffice#reroute(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.Queue, org.hornetq.core.transaction.Transaction)
     */
-   public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
+   public MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception
    {
-
+      // TODO Auto-generated method stub
+      return null;
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.HornetQComponent#isStarted()
+    * @see org.hornetq.core.postoffice.PostOffice#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.RoutingContext)
     */
-   public boolean isStarted()
+   public void route(ServerMessage message, RoutingContext context) throws Exception
    {
-      return false;
+      // TODO Auto-generated method stub
+      
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.HornetQComponent#start()
+    * @see org.hornetq.core.postoffice.PostOffice#sendQueueInfoToQueue(org.hornetq.utils.SimpleString, org.hornetq.utils.SimpleString)
     */
-   public void start() throws Exception
+   public void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception
    {
-
+      // TODO Auto-generated method stub
+      
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.server.HornetQComponent#stop()
-    */
-   public void stop() throws Exception
-   {
 
-   }
 
 }
\ No newline at end of file



More information about the hornetq-commits mailing list