[jboss-cvs] JBoss Messaging SVN: r5566 - in trunk: src/main/org/jboss/messaging/core/persistence/impl/journal and 10 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jan 2 07:11:25 EST 2009


Author: timfox
Date: 2009-01-02 07:11:25 -0500 (Fri, 02 Jan 2009)
New Revision: 5566

Modified:
   trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/BindableFactory.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/BindableFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/LinkImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java
   trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
Log:
Paging, routing, tx refactoring part II


Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2009-01-02 12:11:25 UTC (rev 5566)
@@ -108,7 +108,7 @@
 
    // Bindings related operations
 
-   void addBinding(Binding binding) throws Exception;
+   void addBinding(Binding binding, boolean duplicateDetection) throws Exception;
 
    void deleteBinding(Binding binding) throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-02 12:11:25 UTC (rev 5566)
@@ -605,7 +605,7 @@
 
    // Bindings operations
 
-   public void addBinding(final Binding binding) throws Exception
+   public void addBinding(final Binding binding, final boolean duplicateDetection) throws Exception
    {
       // We generate the queue id here
 
@@ -644,7 +644,8 @@
                                                             binding.getAddress(),
                                                             filterString,
                                                             binding.isExclusive(),
-                                                            linkAddress);
+                                                            linkAddress,
+                                                            duplicateDetection);
 
       bindingsJournal.appendAddRecord(bindingID, BINDING_RECORD, bindingEncoding);
    }
@@ -741,7 +742,8 @@
                                                      filter,
                                                      true,
                                                      false,
-                                                     bindingEncoding.linkAddress);
+                                                     bindingEncoding.linkAddress,
+                                                     bindingEncoding.duplicateDetection);
             }
 
             Binding binding = new BindingImpl(bindingEncoding.type,
@@ -1113,6 +1115,8 @@
       boolean exclusive;
 
       SimpleString linkAddress;
+      
+      boolean duplicateDetection;
 
       public BindingEncoding()
       {
@@ -1123,7 +1127,8 @@
                              final SimpleString address,
                              final SimpleString filter,
                              final boolean exclusive,
-                             final SimpleString linkAddress)
+                             final SimpleString linkAddress,
+                             final boolean duplicateDetection)
       {
          super();
          this.type = type;
@@ -1132,6 +1137,7 @@
          this.filter = filter;
          this.exclusive = exclusive;
          this.linkAddress = linkAddress;
+         this.duplicateDetection = duplicateDetection;
       }
 
       public void decode(final MessagingBuffer buffer)
@@ -1159,6 +1165,7 @@
          filter = buffer.getNullableSimpleString();
          exclusive = buffer.getBoolean();
          linkAddress = buffer.getNullableSimpleString();
+         duplicateDetection = buffer.getBoolean();
       }
 
       public void encode(final MessagingBuffer buffer)
@@ -1176,6 +1183,7 @@
          buffer.putNullableSimpleString(filter);
          buffer.putBoolean(exclusive);
          buffer.putNullableSimpleString(linkAddress);
+         buffer.putBoolean(duplicateDetection);
       }
       
       public int getEncodeSize()
@@ -1185,7 +1193,8 @@
                 SimpleString.sizeofString(address) +
                 SimpleString.sizeofNullableString(filter) + 
                 SIZE_BOOLEAN +
-                SimpleString.sizeofNullableString(linkAddress);
+                SimpleString.sizeofNullableString(linkAddress) +
+                SIZE_BOOLEAN;
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2009-01-02 12:11:25 UTC (rev 5566)
@@ -57,7 +57,7 @@
 
    private volatile boolean started;
 
-   public void addBinding(final Binding binding) throws Exception
+   public void addBinding(final Binding binding, final boolean duplicateDetection) throws Exception
    {
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-01-02 12:11:25 UTC (rev 5566)
@@ -66,7 +66,8 @@
                           boolean durable,
                           boolean temporary,
                           boolean exclusive,
-                          SimpleString linkAddress) throws Exception;
+                          SimpleString linkAddress,
+                          boolean duplicateDetection) throws Exception;
 
    Binding addQueueBinding(SimpleString name,
                            SimpleString address,

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-02 12:11:25 UTC (rev 5566)
@@ -278,7 +278,7 @@
 
       if (durable)
       {
-         storageManager.addBinding(binding);
+         storageManager.addBinding(binding, false);
       }
 
       return binding;
@@ -290,15 +290,16 @@
                                               final boolean durable,
                                               final boolean temporary,
                                               final boolean exclusive,
-                                              final SimpleString linkAddress) throws Exception
+                                              final SimpleString linkAddress,
+                                              final boolean duplicateDetection) throws Exception
    {
-      Binding binding = createLinkBinding(name, address, filter, durable, temporary, exclusive, linkAddress);
+      Binding binding = createLinkBinding(name, address, filter, durable, temporary, exclusive, linkAddress, duplicateDetection);
 
       addBindingInMemory(binding);
 
       if (durable)
       {
-         storageManager.addBinding(binding);
+         storageManager.addBinding(binding, duplicateDetection);
       }
 
       return binding;
@@ -481,9 +482,10 @@
                                      final boolean durable,
                                      final boolean temporary,
                                      final boolean exclusive,
-                                     final SimpleString linkAddress) throws Exception
+                                     final SimpleString linkAddress,
+                                     final boolean duplicateDetection) throws Exception
    {
-      Bindable bindable = bindableFactory.createLink(-1, name, filter, durable, temporary, linkAddress);
+      Bindable bindable = bindableFactory.createLink(-1, name, filter, durable, temporary, linkAddress, duplicateDetection);
 
       Binding binding = new BindingImpl(BindingType.LINK, address, bindable, exclusive);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/BindableFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/BindableFactory.java	2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/server/BindableFactory.java	2009-01-02 12:11:25 UTC (rev 5566)
@@ -45,7 +45,8 @@
                    Filter filter,
                    boolean durable,
                    boolean temporary,
-                   SimpleString linkAddress);
+                   SimpleString linkAddress,
+                   boolean duplicateDetection);
 
    // TODO - these injectors should not be here!!
 

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java	2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java	2009-01-02 12:11:25 UTC (rev 5566)
@@ -205,7 +205,10 @@
          log.warn("Timed out waiting for batch to be sent");
       }
 
-      session.close();
+      if (session != null)
+      {
+         session.close();
+      }
 
       started = false;
    }
@@ -225,6 +228,7 @@
 
    public HandleStatus handle(final MessageReference reference) throws Exception
    {
+      log.info("Got message, busy: " + busy);
       if (busy)
       {
          return HandleStatus.BUSY;
@@ -236,7 +240,9 @@
          {
             return HandleStatus.BUSY;
          }
+         
          reference.getQueue().referenceHandled();
+         
          refs.add(reference);
 
          if (maxBatchTime != -1)

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java	2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java	2009-01-02 12:11:25 UTC (rev 5566)
@@ -342,7 +342,7 @@
                
                //Create the link
                
-               postOffice.addLinkBinding(linkName, address, filter, true, false, exclusive, queueName);
+               postOffice.addLinkBinding(linkName, address, filter, true, false, exclusive, queueName, useDuplicateDetection);
             }
             
             Queue queue = (Queue)binding.getBindable();

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/BindableFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/BindableFactoryImpl.java	2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/BindableFactoryImpl.java	2009-01-02 12:11:25 UTC (rev 5566)
@@ -99,10 +99,11 @@
                           final Filter filter,
                           final boolean durable,
                           final boolean temporary,
-                          final SimpleString linkAddress)
+                          final SimpleString linkAddress,
+                          final boolean duplicateDetection)
    {
 
-      Link link = new LinkImpl(name, durable, filter, linkAddress, postOffice, storageManager);
+      Link link = new LinkImpl(name, durable, filter, linkAddress, duplicateDetection, postOffice, storageManager);
 
       link.setPersistenceID(persistenceID);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/LinkImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/LinkImpl.java	2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/LinkImpl.java	2009-01-02 12:11:25 UTC (rev 5566)
@@ -22,6 +22,7 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_DUPLICATE_DETECTION_ID;
 import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ORIGIN_QUEUE;
 
 import org.jboss.messaging.core.filter.Filter;
@@ -52,6 +53,8 @@
    private final PostOffice postOffice;
 
    private final SimpleString address;
+   
+   private final boolean duplicateDetection;
 
    private final StorageManager storageManager;
 
@@ -67,6 +70,7 @@
                    final boolean durable,
                    final Filter filter,
                    final SimpleString address,
+                   final boolean useDuplicateDetection,
                    final PostOffice postOffice,
                    final StorageManager storageManager)
    {
@@ -77,6 +81,8 @@
       this.filter = filter;
 
       this.address = address;
+      
+      this.duplicateDetection = useDuplicateDetection;
 
       this.postOffice = postOffice;
 
@@ -95,6 +101,13 @@
       
       copy.putStringProperty(HDR_ORIGIN_QUEUE, originalDestination);
       
+      if (duplicateDetection)
+      {
+         SimpleString duplID = new SimpleString(String.valueOf(copy.getMessageID())).concat(name);
+         
+         copy.putStringProperty(HDR_DUPLICATE_DETECTION_ID, duplID);
+      }
+      
       postOffice.route(copy, tx);
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-02 12:11:25 UTC (rev 5566)
@@ -30,6 +30,7 @@
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.DuplicateIDCache;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.Distributor;
@@ -147,8 +148,37 @@
 
    // Bindable implementation -------------------------------------------------------------------------------------
 
-   public void route(final ServerMessage message, final Transaction tx) throws Exception
+   public void route(final ServerMessage message, Transaction tx) throws Exception
    {     
+      SimpleString duplicateID = (SimpleString)message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
+
+      DuplicateIDCache cache = null;
+
+      if (duplicateID != null)
+      {
+         cache = postOffice.getDuplicateIDCache(message.getDestination());
+
+         if (cache.contains(duplicateID))
+         {
+            log.warn("Duplicate message detected - message will not be routed");
+
+            return;
+         }
+      }
+      
+      boolean durableRef = message.isDurable() && durable;
+      
+      boolean startedTx = false;
+      
+      if (cache != null && tx == null && durableRef)
+      {
+         //We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
+         
+         tx = new TransactionImpl(storageManager, postOffice);
+         
+         startedTx = true;
+      }
+      
       // TODO we can avoid these lookups in the Queue since all messsages in the Queue will be for the same store
       PagingStore store = pagingManager.getPageStore(message.getDestination());
 
@@ -159,9 +189,17 @@
 
          if (!message.isReload())
          {
-            if (message.getDurableRefCount() == 1)
+            if (message.getRefCount() == 1)
             {
-               storageManager.storeMessage(message);
+               if (durableRef)
+               {
+                  storageManager.storeMessage(message);
+               }
+               
+               if (cache != null)
+               {
+                  cache.addToCache(duplicateID);
+               }
             }
 
             Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
@@ -170,7 +208,7 @@
             {
                ref.setScheduledDeliveryTime(scheduledDeliveryTime);
 
-               if (ref.getMessage().isDurable() && durable)
+               if (durableRef)
                {
                   storageManager.updateScheduledDeliveryTime(ref);
                }
@@ -204,17 +242,23 @@
          {
             MessageReference ref = message.createReference(this);
 
-            boolean first = message.getDurableRefCount() == 1;
+            boolean first = message.getRefCount() == 1;
             
-            if (first)
+            if (message.getRefCount() == 1)
             {
-               storageManager.storeMessageTransactional(tx.getID(), message);
+               if (durableRef)
+               {
+                  storageManager.storeMessageTransactional(tx.getID(), message);
+               }
+               
+               if (cache != null)
+               {
+                  cache.addToCache(duplicateID, tx);
+               }
             }
 
             Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
-
-            boolean durableRef = ref.getMessage().isDurable() && durable;
-
+           
             if (scheduledDeliveryTime != null)
             {
                ref.setScheduledDeliveryTime(scheduledDeliveryTime);
@@ -240,6 +284,11 @@
             }
          }
       }
+      
+      if (startedTx)
+      {
+         tx.commit();
+      }
    }
 
    private class AddMessageOperation implements TransactionOperation

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-02 12:11:25 UTC (rev 5566)
@@ -313,8 +313,8 @@
    {
       if (tx != null && tx.getXid() == null)
       {
-         //We only rollback local txs on close, not XA tx branches
-         
+         // We only rollback local txs on close, not XA tx branches
+
          rollback();
       }
 
@@ -348,8 +348,939 @@
       queue.deliverAsync(executor);
    }
 
-   public void doHandleCreateConsumer(final SessionCreateConsumerMessage packet)
+   public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
    {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleCreateConsumer(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleCreateConsumer(packet);
+            }
+         });
+      }
+   }
+
+   public void handleCreateQueue(final SessionCreateQueueMessage packet)
+   {
+      final SendLock lock;
+
+      if (channel.getReplicatingChannel() != null)
+      {
+         lock = postOffice.getAddressLock(packet.getAddress());
+
+         lock.lock();
+      }
+      else
+      {
+         lock = null;
+      }
+
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleCreateQueue(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleCreateQueue(packet);
+
+               lock.unlock();
+            }
+         });
+      }
+   }
+
+   public void handleDeleteQueue(final SessionDeleteQueueMessage packet)
+   {
+      final SendLock lock;
+      if (channel.getReplicatingChannel() != null)
+      {
+         Binding binding = postOffice.getBinding(packet.getQueueName());
+         lock = postOffice.getAddressLock(binding.getAddress());
+
+         lock.lock();
+      }
+      else
+      {
+         lock = null;
+      }
+
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleDeleteQueue(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleDeleteQueue(packet);
+
+               lock.unlock();
+            }
+         });
+      }
+   }
+
+   public void handleExecuteQueueQuery(final SessionQueueQueryMessage packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleExecuteQueueQuery(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleExecuteQueueQuery(packet);
+            }
+         });
+      }
+   }
+
+   public void handleExecuteBindingQuery(final SessionBindingQueryMessage packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleExecuteBindingQuery(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleExecuteBindingQuery(packet);
+            }
+         });
+      }
+   }
+
+   public void handleAcknowledge(final SessionAcknowledgeMessage packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleAcknowledge(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleAcknowledge(packet);
+            }
+         });
+      }
+   }
+
+   public void handleExpired(final SessionExpiredMessage packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleExpired(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleExpired(packet);
+            }
+         });
+      }
+   }
+
+   public void handleCommit(final Packet packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleCommit(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleCommit(packet);
+            }
+         });
+      }
+   }
+
+   public void handleRollback(final Packet packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleRollback(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleRollback(packet);
+            }
+         });
+      }
+   }
+
+   public void handleXACommit(final SessionXACommitMessage packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleXACommit(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleXACommit(packet);
+            }
+         });
+      }
+   }
+
+   public void handleXAEnd(final SessionXAEndMessage packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleXAEnd(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleXAEnd(packet);
+            }
+         });
+      }
+   }
+
+   public void handleXAForget(final SessionXAForgetMessage packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleXAForget(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleXAForget(packet);
+            }
+         });
+      }
+   }
+
+   public void handleXAJoin(final SessionXAJoinMessage packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleXAJoin(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleXAJoin(packet);
+            }
+         });
+      }
+   }
+
+   public void handleXAResume(final SessionXAResumeMessage packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleXAResume(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleXAResume(packet);
+            }
+         });
+      }
+   }
+
+   public void handleXARollback(final SessionXARollbackMessage packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleXARollback(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleXARollback(packet);
+            }
+         });
+      }
+   }
+
+   public void handleXAStart(final SessionXAStartMessage packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleXAStart(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleXAStart(packet);
+            }
+         });
+      }
+   }
+
+   public void handleXASuspend(final Packet packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleXASuspend(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleXASuspend(packet);
+            }
+         });
+      }
+   }
+
+   public void handleXAPrepare(final SessionXAPrepareMessage packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleXAPrepare(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleXAPrepare(packet);
+            }
+         });
+      }
+   }
+
+   public void handleGetInDoubtXids(final Packet packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleGetInDoubtXids(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleGetInDoubtXids(packet);
+            }
+         });
+      }
+   }
+
+   public void handleGetXATimeout(final Packet packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleGetXATimeout(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleGetXATimeout(packet);
+            }
+         });
+      }
+   }
+
+   public void handleSetXATimeout(final SessionXASetTimeoutMessage packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleSetXATimeout(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleSetXATimeout(packet);
+            }
+         });
+      }
+   }
+
+   public void handleAddDestination(final SessionAddDestinationMessage packet)
+   {
+      DelayedResult result = channel.replicatePacket(packet);
+
+      if (result == null)
+      {
+         doHandleAddDestination(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleAddDestination(packet);
+            }
+         });
+      }
+   }
+
+   /**
+    * Handles a remove-destination request.
+    * <p>
+    * The packet is offered for replication first. {@code doHandleRemoveDestination}
+    * is invoked straight away when no delayed result is returned, and otherwise
+    * only once the replication result has arrived.
+    *
+    * @param packet the remove-destination request
+    */
+   public void handleRemoveDestination(final SessionRemoveDestinationMessage packet)
+   {
+      final DelayedResult replication = channel.replicatePacket(packet);
+
+      final Runnable processing = new Runnable()
+      {
+         public void run()
+         {
+            doHandleRemoveDestination(packet);
+         }
+      };
+
+      if (replication != null)
+      {
+         // Hold off until the backup has answered
+         replication.setResultRunner(processing);
+
+         return;
+      }
+
+      // Nothing to wait for - process immediately
+      processing.run();
+   }
+
+   /**
+    * Calls {@code lock()} on every consumer currently held by this session.
+    */
+   private void lockConsumers()
+   {
+      for (final ServerConsumer each : consumers.values())
+      {
+         each.lock();
+      }
+   }
+
+   /**
+    * Calls {@code unlock()} on every consumer currently held by this session.
+    */
+   private void unlockConsumers()
+   {
+      for (final ServerConsumer each : consumers.values())
+      {
+         each.unlock();
+      }
+   }
+
+   /**
+    * Handles a session start request.
+    * <p>
+    * When a replicating channel exists, all consumers are locked while the packet
+    * is replicated and the session is marked as started, then unlocked again.
+    * The session is started immediately; only the confirmation of the packet is
+    * postponed until the replication result arrives (or issued at once when
+    * there is no delayed result).
+    *
+    * @param packet the start request
+    */
+   public void handleStart(final Packet packet)
+   {
+      // Consumers only need locking when there is a backup to replicate to
+      boolean lock = channel.getReplicatingChannel() != null;
+
+      if (lock)
+      {
+         lockConsumers();
+      }
+
+      // We need to prevent any delivery and replication of delivery occurring while the start/stop
+      // is being processed.
+      // Otherwise we can end up with start/stop being processed in different order on backup to live.
+      // Which can result in, say, a delivery arriving at backup, but it's still not started!
+      DelayedResult result = null;
+      try
+      {
+         result = channel.replicatePacket(packet);
+
+         // note we process start before response is back from the backup
+
+         setStarted(true);
+      }
+      finally
+      {
+         // Always release the consumers, even if replication or starting failed
+         if (lock)
+         {
+            unlockConsumers();
+         }
+      }
+
+      if (result == null)
+      {
+         // No delayed result to wait for - confirm straight away
+         channel.confirm(packet);
+      }
+      else
+      {
+         // Don't process until result has come back from backup
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               channel.confirm(packet);
+            }
+         });
+      }
+   }
+
+   // TODO try removing the lock consumers and see what happens!!
+   /**
+    * Handles a session stop request.
+    * <p>
+    * When a replicating channel exists, all consumers are locked for the whole
+    * of this method. The packet is replicated and the session is marked as
+    * stopped immediately; the confirmation and the {@code NullResponseMessage}
+    * are sent at once when there is no delayed result, and otherwise only when
+    * the replication result arrives.
+    *
+    * @param packet the stop request
+    */
+   public void handleStop(final Packet packet)
+   {
+      // Consumers only need locking when there is a backup to replicate to
+      boolean lock = channel.getReplicatingChannel() != null;
+
+      if (lock)
+      {
+         lockConsumers();
+      }
+
+      try
+      {
+         DelayedResult result = channel.replicatePacket(packet);
+
+         // note we process stop before response is back from the backup
+
+         final Packet response = new NullResponseMessage();
+
+         setStarted(false);
+
+         if (result == null)
+         {
+            channel.confirm(packet);
+            // Not clustered - just send now
+            channel.send(response);
+         }
+         else
+         {
+            // Confirm and respond only once the replication result is back
+            result.setResultRunner(new Runnable()
+            {
+               public void run()
+               {
+                  channel.confirm(packet);
+
+                  channel.send(response);
+               }
+            });
+         }
+      }
+      finally
+      {
+         // Always release the consumers, even if something above failed
+         if (lock)
+         {
+            unlockConsumers();
+         }
+      }
+   }
+
+   /**
+    * Notifies every consumer of this session that a fail-over has happened.
+    *
+    * @param packet the fail-over notification (not inspected here)
+    */
+   public void handleFailedOver(final Packet packet)
+   {
+      // Work on a copy of the current consumers rather than on the live collection
+      final Set<ServerConsumer> snapshot = new HashSet<ServerConsumer>(consumers.values());
+
+      for (final ServerConsumer each : snapshot)
+      {
+         each.failedOver();
+      }
+   }
+
+   /**
+    * Handles a session close request.
+    * <p>
+    * All consumers are stopped before the packet is replicated so that no
+    * further deliveries take place. The close itself ({@code doHandleClose})
+    * runs at once when there is no delayed result, and otherwise only when the
+    * replication result arrives - this avoids, for instance, acks turning up
+    * after the close.
+    *
+    * @param packet the close request
+    */
+   public void handleClose(final Packet packet)
+   {
+      for (final ServerConsumer each : consumers.values())
+      {
+         each.setStarted(false);
+      }
+
+      final DelayedResult replication = channel.replicatePacket(packet);
+
+      final Runnable closing = new Runnable()
+      {
+         public void run()
+         {
+            doHandleClose(packet);
+         }
+      };
+
+      if (replication != null)
+      {
+         // Hold off until the backup has answered
+         replication.setResultRunner(closing);
+
+         return;
+      }
+
+      // Nothing to wait for - close immediately
+      closing.run();
+   }
+
+   /**
+    * Handles a request to close a single consumer by delegating to that
+    * consumer's {@code handleClose}.
+    * <p>
+    * NOTE(review): the stop-before-replicate / close-after-replication ordering
+    * needed to keep deliveries and acks consistent is not performed here; it is
+    * presumably implemented by the consumer itself - confirm in
+    * {@code ServerConsumer.handleClose}.
+    *
+    * @param packet the close request identifying the consumer
+    * @throws IllegalStateException if no consumer with the given ID is
+    *         registered with this session
+    */
+   public void handleCloseConsumer(final SessionConsumerCloseMessage packet)
+   {
+      ServerConsumer consumer = consumers.get(packet.getConsumerID());
+
+      // Fail with a descriptive error rather than a bare NullPointerException,
+      // in line with handleReplicatedDelivery
+      if (consumer == null)
+      {
+         throw new IllegalStateException("Cannot close consumer " + packet.getConsumerID() +
+                                         ", consumer is not registered with this session");
+      }
+
+      consumer.handleClose(packet);
+   }
+
+   /**
+    * Handles flow-control credits sent for one of this session's consumers.
+    * <p>
+    * The packet is replicated, but the credits are applied to the consumer
+    * without waiting for the replication result; only the confirmation of the
+    * packet is postponed until that result arrives. A failure while applying
+    * the credits is logged and does not prevent the confirmation.
+    *
+    * @param packet the credit message, naming the consumer and the credits
+    */
+   public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage packet)
+   {
+      final DelayedResult replication = channel.replicatePacket(packet);
+
+      // Applied right away - no need to wait for the backup here
+      try
+      {
+         final ServerConsumer recipient = consumers.get(packet.getConsumerID());
+
+         recipient.receiveCredits(packet.getCredits());
+      }
+      catch (Exception failure)
+      {
+         log.error("Failed to receive credits", failure);
+      }
+
+      if (replication == null)
+      {
+         channel.confirm(packet);
+
+         return;
+      }
+
+      // Confirm only once the replication result is back
+      replication.setResultRunner(new Runnable()
+      {
+         public void run()
+         {
+            channel.confirm(packet);
+         }
+      });
+   }
+
+   /**
+    * Handles the initial packet of a large message send.
+    * <p>
+    * A message ID is assigned before replication when the packet does not carry
+    * a positive one, so that live and backup use the same ID. The send itself
+    * ({@code doSendLargeMessage}) runs at once when there is no delayed result,
+    * and otherwise only after the replication result arrives, so that a
+    * delivery cannot be processed on the backup before the original send.
+    *
+    * @param packet the send packet
+    */
+   public void handleSendLargeMessage(final SessionSendMessage packet)
+   {
+      final boolean needsID = packet.getMessageID() <= 0L;
+
+      if (needsID)
+      {
+         // Generated here so that live and backup agree on the ID
+         packet.setMessageID(storageManager.generateUniqueID());
+      }
+
+      final DelayedResult replication = channel.replicatePacket(packet);
+
+      final Runnable sending = new Runnable()
+      {
+         public void run()
+         {
+            doSendLargeMessage(packet);
+         }
+      };
+
+      if (replication != null)
+      {
+         // Replicate first, send afterwards
+         replication.setResultRunner(sending);
+
+         return;
+      }
+
+      sending.run();
+   }
+
+   /**
+    * Handles a message send.
+    * <p>
+    * With a send we must make sure it is replicated to the backup before being
+    * processed on live, otherwise a delivery could be processed on the backup
+    * before the original send. When a replicating channel exists the address
+    * lock of the message's destination is taken with {@code beforeSend()} and
+    * released with {@code afterSend()} once the send has been processed.
+    * <p>
+    * The lock is released on every path on which it was taken: after the
+    * deferred send has run (even if it throws), or immediately if the send was
+    * processed inline or anything failed before the send could be handed over
+    * to the replication result.
+    *
+    * @param packet the send packet
+    */
+   public void handleSend(final SessionSendMessage packet)
+   {
+      ServerMessage msg = packet.getServerMessage();
+
+      // Evaluated once so that locking and the "clustered" flag always agree
+      final boolean replicating = channel.getReplicatingChannel() != null;
+
+      final SendLock lock;
+
+      if (replicating)
+      {
+         lock = postOffice.getAddressLock(msg.getDestination());
+
+         lock.beforeSend();
+      }
+      else
+      {
+         lock = null;
+      }
+
+      // Becomes true once the result runner is responsible for releasing the lock
+      boolean releaseDeferred = false;
+
+      try
+      {
+         if (packet.getMessageID() <= 0L)
+         {
+            // must generate message id here, so we know they are in sync on live and backup
+            long id = storageManager.generateUniqueID();
+
+            packet.setMessageID(id);
+         }
+
+         if (replicating)
+         {
+            msg.putBooleanProperty(new SimpleString("clustered"), true);
+         }
+
+         DelayedResult result = channel.replicatePacket(packet);
+
+         if (result == null)
+         {
+            doSend(packet);
+         }
+         else
+         {
+            // Set before registering: the runner may execute as soon as it is registered,
+            // and the lock must not be released twice
+            releaseDeferred = true;
+
+            result.setResultRunner(new Runnable()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     doSend(packet);
+                  }
+                  finally
+                  {
+                     // Release even if the send failed, and tolerate there being no lock
+                     if (lock != null)
+                     {
+                        lock.afterSend();
+                     }
+                  }
+               }
+            });
+         }
+      }
+      finally
+      {
+         // Inline send, or a failure before hand-over: release here
+         if (lock != null && !releaseDeferred)
+         {
+            lock.afterSend();
+         }
+      }
+   }
+
+   /**
+    * Handles a continuation packet of a message send.
+    * <p>
+    * As with any send, the packet has to be replicated before it is processed
+    * on live, otherwise a delivery could be processed on the backup before the
+    * original send. {@code doSendContinuations} therefore runs at once when
+    * there is no delayed result, and otherwise only after the replication
+    * result arrives.
+    *
+    * @param packet the continuation packet
+    */
+   public void handleSendContinuations(final SessionSendContinuationMessage packet)
+   {
+      final DelayedResult replication = channel.replicatePacket(packet);
+
+      final Runnable sending = new Runnable()
+      {
+         public void run()
+         {
+            doSendContinuations(packet);
+         }
+      };
+
+      if (replication != null)
+      {
+         // Replicate first, send afterwards
+         replication.setResultRunner(sending);
+
+         return;
+      }
+
+      sending.run();
+   }
+
+   /**
+    * Handles a replicated delivery by passing the message ID on to the
+    * consumer named in the packet.
+    * <p>
+    * A failure inside the consumer is logged and not propagated.
+    *
+    * @param packet the replicated delivery, naming consumer and message
+    * @throws IllegalStateException if this session holds no consumer with the
+    *         given ID
+    */
+   public void handleReplicatedDelivery(final SessionReplicateDeliveryMessage packet)
+   {
+      final ServerConsumer target = consumers.get(packet.getConsumerID());
+
+      if (target == null)
+      {
+         throw new IllegalStateException("Cannot handle replicated delivery, consumer is closed");
+      }
+
+      try
+      {
+         target.deliverReplicated(packet.getMessageID());
+      }
+      catch (Exception failure)
+      {
+         log.error("Failed to handle replicated delivery", failure);
+      }
+   }
+
+   /**
+    * Moves this session from its current remoting connection onto
+    * {@code newConnection}.
+    * <p>
+    * The session is stopped for the duration of the transfer if it was started,
+    * and restarted afterwards. The old connection is destroyed, its replicating
+    * connection and ID generator sequence are carried over to the new one, and
+    * commands are replayed from {@code lastReceivedCommandID}.
+    *
+    * @param newConnection the connection the session is transferred to
+    * @param lastReceivedCommandID the command ID passed to the channel's
+    *        {@code replayCommands}
+    * @return the channel's last received command ID, read before the replay
+    */
+   public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
+   {
+      // Remember the state so it can be restored at the end
+      boolean wasStarted = this.started;
+
+      if (wasStarted)
+      {
+         this.setStarted(false);
+      }
+
+      // Stop listening for failures of the connection that is being replaced
+      remotingConnection.removeFailureListener(this);
+
+      channel.transferConnection(newConnection);
+
+      // Any replicating connection already attached to the new connection is discarded ...
+      RemotingConnection oldReplicatingConnection = newConnection.getReplicatingConnection();
+
+      if (oldReplicatingConnection != null)
+      {
+         oldReplicatingConnection.destroy();
+      }
+
+      // ... and replaced by the one from the old connection
+      newConnection.setReplicatingConnection(remotingConnection.getReplicatingConnection());
+
+      // Detach it from the old connection before that connection is destroyed below
+      remotingConnection.setReplicatingConnection(null);
+
+      newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
+
+      // Destroy the old connection
+      remotingConnection.destroy();
+
+      remotingConnection = newConnection;
+
+      remotingConnection.addFailureListener(this);
+
+      // Read before replaying, this is the value reported back to the caller
+      int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
+
+      channel.replayCommands(lastReceivedCommandID);
+
+      if (wasStarted)
+      {
+         this.setStarted(true);
+      }
+
+      return serverLastReceivedCommandID;
+   }
+
+   /**
+    * @return the channel this session uses
+    */
+   public Channel getChannel()
+   {
+      return this.channel;
+   }
+
+   // FailureListener implementation
+   // --------------------------------------------------------------------
+
+   /**
+    * Called when the connection of this session has failed; clears up the
+    * session's resources.
+    * <p>
+    * Every registered failure runner is executed (a failing runner is logged
+    * and does not stop the others), then the session is closed through
+    * {@code handleClose} so that the close is replicated as well. Any failure
+    * is logged together with its stack trace and never propagated.
+    *
+    * @param me the failure that occurred (not inspected here)
+    * @return always {@code true}
+    */
+   public boolean connectionFailed(final MessagingException me)
+   {
+      try
+      {
+         log.info("Connection timed out, so clearing up resources for session " + name);
+
+         for (Runnable runner : failureRunners)
+         {
+            try
+            {
+               runner.run();
+            }
+            catch (Throwable t)
+            {
+               log.error("Failed to execute failure runner", t);
+            }
+         }
+
+         // We call handleClose() since we need to replicate the close too, if there is a backup
+         handleClose(new PacketImpl(PacketImpl.SESS_CLOSE));
+
+         log.info("Cleared up resources for session " + name);
+      }
+      catch (Throwable t)
+      {
+         // Pass the throwable along so that the cause and stack trace are not lost
+         log.error("Failed to close connection " + this, t);
+      }
+
+      return true;
+   }
+
+   // Public
+   // ----------------------------------------------------------------------------
+
+   /**
+    * @return the transaction currently held by this session
+    */
+   public Transaction getTransaction()
+   {
+      return this.tx;
+   }
+
+   // Private
+   // ----------------------------------------------------------------------------
+
+   private void doHandleCreateConsumer(final SessionCreateConsumerMessage packet)
+   {
       SimpleString queueName = packet.getQueueName();
 
       SimpleString filterString = packet.getFilterString();
@@ -432,29 +1363,8 @@
       channel.send(response);
    }
 
-   public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
+   private void doHandleCreateQueue(final SessionCreateQueueMessage packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleCreateConsumer(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleCreateConsumer(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleCreateQueue(final SessionCreateQueueMessage packet)
-   {
       SimpleString address = packet.getAddress();
 
       SimpleString queueName = packet.getQueueName();
@@ -538,80 +1448,8 @@
       channel.send(response);
    }
 
-   public void handleCreateQueue(final SessionCreateQueueMessage packet)
+   private void doHandleDeleteQueue(final SessionDeleteQueueMessage packet)
    {
-      final SendLock lock;
-
-      if (channel.getReplicatingChannel() != null)
-      {
-         lock = postOffice.getAddressLock(packet.getAddress());
-
-         lock.lock();
-      }
-      else
-      {
-         lock = null;
-      }
-
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleCreateQueue(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleCreateQueue(packet);
-
-               lock.unlock();
-            }
-         });
-      }
-   }
-
-   public void handleDeleteQueue(final SessionDeleteQueueMessage packet)
-   {
-      final SendLock lock;
-      if (channel.getReplicatingChannel() != null)
-      {
-         Binding binding = postOffice.getBinding(packet.getQueueName());
-         lock = postOffice.getAddressLock(binding.getAddress());
-
-         lock.lock();
-      }
-      else
-      {
-         lock = null;
-      }
-
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleDeleteQueue(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleDeleteQueue(packet);
-
-               lock.unlock();
-            }
-         });
-      }
-   }
-
-   public void doHandleDeleteQueue(final SessionDeleteQueueMessage packet)
-   {
       SimpleString queueName = packet.getQueueName();
 
       Packet response = null;
@@ -658,29 +1496,8 @@
       channel.send(response);
    }
 
-   public void handleExecuteQueueQuery(final SessionQueueQueryMessage packet)
+   private void doHandleExecuteQueueQuery(final SessionQueueQueryMessage packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleExecuteQueueQuery(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleExecuteQueueQuery(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleExecuteQueueQuery(final SessionQueueQueryMessage packet)
-   {
       SimpleString queueName = packet.getQueueName();
 
       Packet response = null;
@@ -732,29 +1549,8 @@
       channel.send(response);
    }
 
-   public void handleExecuteBindingQuery(final SessionBindingQueryMessage packet)
+   private void doHandleExecuteBindingQuery(final SessionBindingQueryMessage packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleExecuteBindingQuery(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleExecuteBindingQuery(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleExecuteBindingQuery(final SessionBindingQueryMessage packet)
-   {
       SimpleString address = packet.getAddress();
 
       Packet response = null;
@@ -804,29 +1600,8 @@
       channel.send(response);
    }
 
-   public void handleAcknowledge(final SessionAcknowledgeMessage packet)
+   private void doHandleAcknowledge(final SessionAcknowledgeMessage packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleAcknowledge(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleAcknowledge(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleAcknowledge(final SessionAcknowledgeMessage packet)
-   {
       Packet response = null;
 
       try
@@ -865,29 +1640,8 @@
       }
    }
 
-   public void handleExpired(final SessionExpiredMessage packet)
+   private void doHandleExpired(final SessionExpiredMessage packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleExpired(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleExpired(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleExpired(final SessionExpiredMessage packet)
-   {
       try
       {
          MessageReference ref = consumers.get(packet.getConsumerID()).getExpired(packet.getMessageID());
@@ -906,29 +1660,8 @@
       channel.confirm(packet);
    }
 
-   public void handleCommit(final Packet packet)
+   private void doHandleCommit(final Packet packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleCommit(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleCommit(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleCommit(final Packet packet)
-   {
       Packet response = null;
 
       try
@@ -960,29 +1693,8 @@
       channel.send(response);
    }
 
-   public void handleRollback(final Packet packet)
+   private void doHandleRollback(final Packet packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleRollback(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleRollback(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleRollback(final Packet packet)
-   {
       Packet response = null;
 
       try
@@ -1010,29 +1722,8 @@
       channel.send(response);
    }
 
-   public void handleXACommit(final SessionXACommitMessage packet)
+   private void doHandleXACommit(final SessionXACommitMessage packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleXACommit(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleXACommit(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleXACommit(final SessionXACommitMessage packet)
-   {
       Packet response = null;
 
       Xid xid = packet.getXid();
@@ -1094,29 +1785,8 @@
       channel.send(response);
    }
 
-   public void handleXAEnd(final SessionXAEndMessage packet)
+   private void doHandleXAEnd(final SessionXAEndMessage packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleXAEnd(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleXAEnd(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleXAEnd(final SessionXAEndMessage packet)
-   {
       Packet response = null;
 
       Xid xid = packet.getXid();
@@ -1189,29 +1859,8 @@
       channel.send(response);
    }
 
-   public void handleXAForget(final SessionXAForgetMessage packet)
+   private void doHandleXAForget(final SessionXAForgetMessage packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleXAForget(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleXAForget(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleXAForget(final SessionXAForgetMessage packet)
-   {
       // Do nothing since we don't support heuristic commits / rollback from the
       // resource manager
 
@@ -1222,29 +1871,8 @@
       channel.send(response);
    }
 
-   public void handleXAJoin(final SessionXAJoinMessage packet)
+   private void doHandleXAJoin(final SessionXAJoinMessage packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleXAJoin(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleXAJoin(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleXAJoin(final SessionXAJoinMessage packet)
-   {
       Packet response = null;
 
       Xid xid = packet.getXid();
@@ -1294,29 +1922,8 @@
       channel.send(response);
    }
 
-   public void handleXAResume(final SessionXAResumeMessage packet)
+   private void doHandleXAResume(final SessionXAResumeMessage packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleXAResume(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleXAResume(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleXAResume(final SessionXAResumeMessage packet)
-   {
       Packet response = null;
 
       Xid xid = packet.getXid();
@@ -1377,29 +1984,8 @@
       channel.send(response);
    }
 
-   public void handleXARollback(final SessionXARollbackMessage packet)
+   private void doHandleXARollback(final SessionXARollbackMessage packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleXARollback(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleXARollback(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleXARollback(final SessionXARollbackMessage packet)
-   {
       Packet response = null;
 
       Xid xid = packet.getXid();
@@ -1461,29 +2047,8 @@
       channel.send(response);
    }
 
-   public void handleXAStart(final SessionXAStartMessage packet)
+   private void doHandleXAStart(final SessionXAStartMessage packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleXAStart(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleXAStart(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleXAStart(final SessionXAStartMessage packet)
-   {
       Packet response = null;
 
       Xid xid = packet.getXid();
@@ -1533,29 +2098,8 @@
       channel.send(response);
    }
 
-   public void handleXASuspend(final Packet packet)
+   private void doHandleXASuspend(final Packet packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleXASuspend(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleXASuspend(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleXASuspend(final Packet packet)
-   {
       Packet response = null;
 
       try
@@ -1603,29 +2147,8 @@
       channel.send(response);
    }
 
-   public void handleXAPrepare(final SessionXAPrepareMessage packet)
+   private void doHandleXAPrepare(final SessionXAPrepareMessage packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleXAPrepare(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleXAPrepare(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleXAPrepare(final SessionXAPrepareMessage packet)
-   {
       Packet response = null;
 
       Xid xid = packet.getXid();
@@ -1684,29 +2207,8 @@
       channel.send(response);
    }
 
-   public void handleGetInDoubtXids(final Packet packet)
+   private void doHandleGetInDoubtXids(final Packet packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleGetInDoubtXids(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleGetInDoubtXids(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleGetInDoubtXids(final Packet packet)
-   {
       Packet response = new SessionXAGetInDoubtXidsResponseMessage(resourceManager.getPreparedTransactions());
 
       channel.confirm(packet);
@@ -1714,29 +2216,8 @@
       channel.send(response);
    }
 
-   public void handleGetXATimeout(final Packet packet)
+   private void doHandleGetXATimeout(final Packet packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleGetXATimeout(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleGetXATimeout(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleGetXATimeout(final Packet packet)
-   {
       Packet response = new SessionXAGetTimeoutResponseMessage(resourceManager.getTimeoutSeconds());
 
       channel.confirm(packet);
@@ -1744,29 +2225,8 @@
       channel.send(response);
    }
 
-   public void handleSetXATimeout(final SessionXASetTimeoutMessage packet)
+   private void doHandleSetXATimeout(final SessionXASetTimeoutMessage packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleSetXATimeout(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleSetXATimeout(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleSetXATimeout(final SessionXASetTimeoutMessage packet)
-   {
       Packet response = new SessionXASetTimeoutResponseMessage(resourceManager.setTimeoutSeconds(packet.getTimeoutSeconds()));
 
       channel.confirm(packet);
@@ -1774,29 +2234,8 @@
       channel.send(response);
    }
 
-   public void handleAddDestination(final SessionAddDestinationMessage packet)
+   private void doHandleAddDestination(final SessionAddDestinationMessage packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleAddDestination(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleAddDestination(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleAddDestination(final SessionAddDestinationMessage packet)
-   {
       Packet response = null;
 
       final SimpleString address = packet.getAddress();
@@ -1859,29 +2298,8 @@
       channel.send(response);
    }
 
-   public void handleRemoveDestination(final SessionRemoveDestinationMessage packet)
+   private void doHandleRemoveDestination(final SessionRemoveDestinationMessage packet)
    {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleRemoveDestination(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleRemoveDestination(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleRemoveDestination(final SessionRemoveDestinationMessage packet)
-   {
       Packet response = null;
 
       final SimpleString address = packet.getAddress();
@@ -1919,159 +2337,8 @@
       channel.send(response);
    }
 
-   private void lockConsumers()
+   private void doHandleClose(final Packet packet)
    {
-      for (ServerConsumer consumer : consumers.values())
-      {
-         consumer.lock();
-      }
-   }
-
-   private void unlockConsumers()
-   {
-      for (ServerConsumer consumer : consumers.values())
-      {
-         consumer.unlock();
-      }
-   }
-
-   public void handleStart(final Packet packet)
-   {
-      boolean lock = channel.getReplicatingChannel() != null;
-
-      if (lock)
-      {
-         lockConsumers();
-      }
-
-      // We need to prevent any delivery and replication of delivery occurring while the start/stop
-      // is being processed.
-      // Otherwise we can end up with start/stop being processed in different order on backup to live.
-      // Which can result in, say, a delivery arriving at backup, but it's still not started!
-      DelayedResult result = null;
-      try
-      {
-         result = channel.replicatePacket(packet);
-
-         // note we process start before response is back from the backup
-
-         setStarted(true);
-      }
-      finally
-      {
-         if (lock)
-         {
-            unlockConsumers();
-         }
-      }
-
-      if (result == null)
-      {
-         channel.confirm(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               channel.confirm(packet);
-            }
-         });
-      }
-   }
-
-   // TODO try removing the lock consumers and see what happens!!
-   public void handleStop(final Packet packet)
-   {
-      boolean lock = channel.getReplicatingChannel() != null;
-
-      if (lock)
-      {
-         lockConsumers();
-      }
-
-      try
-      {
-         DelayedResult result = channel.replicatePacket(packet);
-
-         // note we process stop before response is back from the backup
-
-         final Packet response = new NullResponseMessage();
-
-         setStarted(false);
-
-         if (result == null)
-         {
-            channel.confirm(packet);
-            // Not clustered - just send now
-            channel.send(response);
-         }
-         else
-         {
-            result.setResultRunner(new Runnable()
-            {
-               public void run()
-               {
-                  channel.confirm(packet);
-
-                  channel.send(response);
-               }
-            });
-         }
-      }
-      finally
-      {
-         if (lock)
-         {
-            unlockConsumers();
-         }
-      }
-   }
-
-   public void handleFailedOver(final Packet packet)
-   {
-      Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
-
-      for (ServerConsumer consumer : consumersClone)
-      {
-         consumer.failedOver();
-      }
-   }
-
-   public void handleClose(final Packet packet)
-   {
-      // We need to stop the consumers first before replicating, to ensure no deliveries occur after this,
-      // but we need to process the actual close() when the replication response returns, otherwise things
-      // can happen like acks can come in after close
-
-      for (ServerConsumer consumer : consumers.values())
-      {
-         consumer.setStarted(false);
-      }
-
-      DelayedResult result = channel.replicatePacket(packet);
-
-      if (result == null)
-      {
-         doHandleClose(packet);
-      }
-      else
-      {
-         // Don't process until result has come back from backup
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleClose(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleClose(final Packet packet)
-   {
       Packet response = null;
 
       try
@@ -2113,274 +2380,6 @@
       started = s;
    }
 
-   public void handleCloseConsumer(final SessionConsumerCloseMessage packet)
-   {
-      // We need to stop the consumer first before replicating, to ensure no deliveries occur after this,
-      // but we need to process the actual close() when the replication response returns, otherwise things
-      // can happen like acks can come in after close
-
-      ServerConsumer consumer = consumers.get(packet.getConsumerID());
-
-      consumer.handleClose(packet);
-   }
-
-   public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage packet)
-   {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      try
-      {
-         // Note we don't wait for response before handling this
-
-         consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to receive credits", e);
-      }
-
-      if (result == null)
-      {
-         channel.confirm(packet);
-      }
-      else
-      {
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               channel.confirm(packet);
-            }
-         });
-      }
-   }
-
-   public void handleSendLargeMessage(final SessionSendMessage packet)
-   {
-
-      if (packet.getMessageID() <= 0L)
-      {
-         // must generate message id here, so we know they are in sync on live and backup
-         long id = storageManager.generateUniqueID();
-
-         packet.setMessageID(id);
-      }
-
-      DelayedResult result = channel.replicatePacket(packet);
-
-      // With a send we must make sure it is replicated to backup before being processed on live
-      // or can end up with delivery being processed on backup before original send
-
-      if (result == null)
-      {
-         doSendLargeMessage(packet);
-      }
-      else
-      {
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doSendLargeMessage(packet);
-            }
-         });
-      }
-
-   }
-
-   public void handleSend(final SessionSendMessage packet)
-   {
-      // With a send we must make sure it is replicated to backup before being processed on live
-      // or can end up with delivery being processed on backup before original send
-
-      ServerMessage msg = packet.getServerMessage();
-
-      final SendLock lock;
-
-      if (channel.getReplicatingChannel() != null)
-      {
-         lock = postOffice.getAddressLock(msg.getDestination());
-
-         lock.beforeSend();
-      }
-      else
-      {
-         lock = null;
-      }
-
-      if (packet.getMessageID() <= 0L)
-      {
-         // must generate message id here, so we know they are in sync on live and backup
-         long id = storageManager.generateUniqueID();
-
-         packet.setMessageID(id);
-      }
-
-      if (channel.getReplicatingChannel() != null)
-      {
-         msg.putBooleanProperty(new SimpleString("clustered"), true);
-      }
-
-      DelayedResult result = channel.replicatePacket(packet);
-
-      // With a send we must make sure it is replicated to backup before being processed on live
-      // or can end up with delivery being processed on backup before original send
-
-      if (result == null)
-      {
-         doSend(packet);
-      }
-      else
-      {
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doSend(packet);
-
-               lock.afterSend();
-            }
-         });
-      }
-   }
-
-   public void handleSendContinuations(final SessionSendContinuationMessage packet)
-   {
-      DelayedResult result = channel.replicatePacket(packet);
-
-      // With a send we must make sure it is replicated to backup before being processed on live
-      // or can end up with delivery being processed on backup before original send
-
-      if (result == null)
-      {
-         doSendContinuations(packet);
-      }
-      else
-      {
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doSendContinuations(packet);
-            }
-         });
-      }
-   }
-
-   public void handleReplicatedDelivery(final SessionReplicateDeliveryMessage packet)
-   {
-      ServerConsumer consumer = consumers.get(packet.getConsumerID());
-
-      if (consumer == null)
-      {
-         throw new IllegalStateException("Cannot handle replicated delivery, consumer is closed");
-      }
-
-      try
-      {
-         consumer.deliverReplicated(packet.getMessageID());
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to handle replicated delivery", e);
-      }
-   }
-
-   public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
-   {
-      boolean wasStarted = this.started;
-
-      if (wasStarted)
-      {
-         this.setStarted(false);
-      }
-
-      remotingConnection.removeFailureListener(this);
-
-      channel.transferConnection(newConnection);
-
-      RemotingConnection oldReplicatingConnection = newConnection.getReplicatingConnection();
-
-      if (oldReplicatingConnection != null)
-      {
-         oldReplicatingConnection.destroy();
-      }
-
-      newConnection.setReplicatingConnection(remotingConnection.getReplicatingConnection());
-
-      remotingConnection.setReplicatingConnection(null);
-
-      newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
-
-      // Destroy the old connection
-      remotingConnection.destroy();
-
-      remotingConnection = newConnection;
-
-      remotingConnection.addFailureListener(this);
-
-      int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
-
-      channel.replayCommands(lastReceivedCommandID);
-
-      if (wasStarted)
-      {
-         this.setStarted(true);
-      }
-
-      return serverLastReceivedCommandID;
-   }
-
-   public Channel getChannel()
-   {
-      return channel;
-   }
-
-   // FailureListener implementation
-   // --------------------------------------------------------------------
-
-   public boolean connectionFailed(final MessagingException me)
-   {
-      try
-      {
-         log.info("Connection timed out, so clearing up resources for session " + name);
-
-         for (Runnable runner : failureRunners)
-         {
-            try
-            {
-               runner.run();
-            }
-            catch (Throwable t)
-            {
-               log.error("Failed to execute failure runner", t);
-            }
-         }
-
-         // We call handleClose() since we need to replicate the close too, if there is a backup
-         handleClose(new PacketImpl(PacketImpl.SESS_CLOSE));
-
-         log.info("Cleared up resources for session " + name);
-      }
-      catch (Throwable t)
-      {
-         log.error("Failed to close connection " + this);
-      }
-
-      return true;
-   }
-
-   // Public
-   // ----------------------------------------------------------------------------
-
-   public Transaction getTransaction()
-   {
-      return tx;
-   }
-
-   // Private
-   // ----------------------------------------------------------------------------
-
    private void doSendLargeMessage(final SessionSendMessage packet)
    {
       Packet response = null;
@@ -2607,55 +2606,14 @@
       // check the user has write access to this address.
       doSecurity(msg);
 
-      SimpleString duplicateID = (SimpleString)msg.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
-
-      DuplicateIDCache cache = null;
-
-      if (duplicateID != null)
+      if (tx == null || autoCommitSends)
       {
-         cache = postOffice.getDuplicateIDCache(msg.getDestination());
-
-         if (cache.contains(duplicateID))
-         {
-            log.warn("Duplicate message detected - message will not be routed");
-
-            return;
-         }
+         postOffice.route(msg, null);
       }
-
-      Transaction theTx = null;
-      boolean startedTx = false;
-
-      if (!autoCommitSends)
+      else
       {
-         theTx = tx;
+         postOffice.route(msg, tx);
       }
-      else if (cache != null)
-      {
-         theTx = new TransactionImpl(storageManager, postOffice);
-
-         startedTx = true;
-      }
-      
-      if (theTx == null)
-      {
-         postOffice.route(msg, null);         
-      }
-      else
-      { 
-         postOffice.route(msg, theTx);
-
-         // Add to cache in same transaction
-         if (cache != null)
-         {
-            cache.addToCache(duplicateID, theTx);
-         }
-
-         if (startedTx)
-         {
-            theTx.commit();
-         }
-      }
    }
 
    private void doSecurity(final ServerMessage msg) throws Exception

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2009-01-02 12:11:25 UTC (rev 5566)
@@ -365,8 +365,6 @@
             storageManager.commit(id);
          }
 
-         //postOffice.deliver(refsToAdd);
-
          // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
          // transaction until all the messages were added to the queue
          // or else we could deliver the messages out of order
@@ -390,8 +388,7 @@
             {
                operation.afterCommit();
             }
-         }
-                  
+         }                  
       }
    }
 

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java	2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java	2009-01-02 12:11:25 UTC (rev 5566)
@@ -146,9 +146,19 @@
             
             c++;
             
+            //log.info("Got message " + c);
+            
             if (c == num)
             {
                latch.countDown();
+               
+               try
+               {
+                  Thread.sleep(2000);
+               }
+               catch (Exception e)
+               {                  
+               }
             }
          }
          catch (JMSException e)

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2009-01-02 12:11:25 UTC (rev 5566)
@@ -80,9 +80,10 @@
                                  boolean durable,
                                  boolean temporary,
                                  boolean exclusive,
-                                 SimpleString linkAddress) throws Exception
+                                 SimpleString linkAddress,
+                                 boolean duplicateDetection) throws Exception
    {
-      Link link = queueFactory.createLink(-1, queueName, filter, durable, false, linkAddress);
+      Link link = queueFactory.createLink(-1, queueName, filter, durable, false, linkAddress, duplicateDetection);
       Binding binding = new FakeBinding(address, link);
       bindings.put(address, binding);
       return binding;

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeQueueFactory.java	2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeQueueFactory.java	2009-01-02 12:11:25 UTC (rev 5566)
@@ -54,9 +54,9 @@
 	}
 	
 	public Link createLink(long persistenceID, SimpleString name, Filter filter,
-                            boolean durable, boolean temporary, SimpleString linkAddress)
+                            boolean durable, boolean temporary, SimpleString linkAddress, boolean duplicateDetection)
    {
-      Link link =  new LinkImpl(name, durable, filter, linkAddress, postOffice, null);
+      Link link =  new LinkImpl(name, durable, filter, linkAddress, duplicateDetection, postOffice, null);
       
       link.setPersistenceID(persistenceID);
       




More information about the jboss-cvs-commits mailing list