[jboss-cvs] JBoss Messaging SVN: r5060 - in trunk: src/main/org/jboss/messaging/core/persistence/impl/nullpm and 13 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Oct 2 09:43:49 EDT 2008


Author: timfox
Date: 2008-10-02 09:43:48 -0400 (Thu, 02 Oct 2008)
New Revision: 5060

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCloseMessage.java
Removed:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CloseSessionMessage.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
   trunk/src/main/org/jboss/messaging/core/remoting/Packet.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/NullResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Ping.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Pong.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAddDestinationMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
   trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java
   trunk/src/main/org/jboss/messaging/util/JBMThreadFactory.java
   trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java
Log:
More failover and session replication


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -13,6 +13,7 @@
 package org.jboss.messaging.core.client.impl;
 
 import java.util.LinkedList;
+import java.util.Queue;
 import java.util.concurrent.Executor;
 
 import org.jboss.messaging.core.client.ClientMessage;
@@ -55,11 +56,9 @@
    private final Executor sessionExecutor;
 
    private final int clientWindowSize;
+ 
+   private final Queue<ClientMessage> buffer = new LinkedList<ClientMessage>();
 
-  // private final PriorityLinkedList<ClientMessage> buffer = new PriorityLinkedListImpl<ClientMessage>(10);
-   
-   private final LinkedList<ClientMessage> buffer = new LinkedList<ClientMessage>();
-
    private final boolean direct;
 
    private final Runner runner = new Runner();
@@ -75,6 +74,10 @@
    private volatile int creditsToSend;
    
    private volatile boolean cleared;
+   
+   private volatile long lastMessageIDProcessed = -1;
+   
+   private volatile long ignoreMessageID = -1;
 
    // Constructors
    // ---------------------------------------------------------------------------------
@@ -148,7 +151,9 @@
 
             if (!closed && !buffer.isEmpty())
             {
-               ClientMessage m = buffer.removeFirst();
+               ClientMessage m = buffer.poll();
+               
+               lastMessageIDProcessed = m.getMessageID();
 
                boolean expired = m.isExpired();
 
@@ -257,7 +262,7 @@
    {
       return id;
    }
-
+   
    public void handleMessage(final ClientMessage message) throws Exception
    {
       if (closed)
@@ -273,6 +278,12 @@
          return;         
       }
       
+      if (message.getMessageID() == ignoreMessageID)
+      {
+         //Ignore this - this is one resent after failover since was processing in onMessage
+         return;
+      }
+      
       message.onReceipt(session, id);
 
       if (handler != null)
@@ -300,7 +311,7 @@
 
             synchronized (this)
             {
-               buffer.addLast(message);
+               buffer.add(message);
             }
 
             queueExecutor();
@@ -311,7 +322,7 @@
          // Add it to the buffer
          synchronized (this)
          {
-            buffer.addLast(message);
+            buffer.add(message);
 
             notify();
          }
@@ -344,18 +355,27 @@
    {
       return creditsToSend;
    }
+   
+   public void failover()
+   {  
+      // We ignore any message that might be resent on redelivery after failover due to it being
+      // in onMessage and processed not having been called yet
+      ignoreMessageID = lastMessageIDProcessed;
+      
+      buffer.clear();  
+   }
 
    // Public
    // ---------------------------------------------------------------------------------------
 
    // Package protected
-   // ----------------------------------------------------------------------------
+   // ---------------------------------------------------------------------------------------
 
    // Protected
-   // ------------------------------------------------------------------------------------
+   // ---------------------------------------------------------------------------------------
 
    // Private
-   // --------------------------------------------------------------------------------------
+   // ---------------------------------------------------------------------------------------
 
    private void queueExecutor()
    {
@@ -409,7 +429,7 @@
          throw new MessagingException(MessagingException.OBJECT_CLOSED, "Consumer is closed");
       }
    }
-
+   
    private void callOnMessage()
    {
       try
@@ -427,7 +447,7 @@
 
          synchronized (this)
          {
-            message = buffer.removeFirst();
+            message = buffer.poll();
          }
 
          if (message != null)
@@ -439,6 +459,8 @@
             if (!expired)
             {
                onMessageThread = Thread.currentThread();
+               
+               lastMessageIDProcessed = message.getMessageID();
 
                handler.onMessage(message);
             }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -49,4 +49,6 @@
    int getCreditsToSend();
    
    void cleanUp() throws Exception;
+   
+   void failover();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -1,33 +1,94 @@
 /*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
- * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
- * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
- * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
- * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 package org.jboss.messaging.core.client.impl;
 
-import org.jboss.messaging.core.client.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.client.ClientBrowser;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.*;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ConnectionRegistry;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.ResponseNotifier;
 import org.jboss.messaging.core.remoting.impl.ConnectionRegistryImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.*;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionProcessedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.*;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.IDGenerator;
+import org.jboss.messaging.util.JBMThreadFactory;
+import org.jboss.messaging.util.OrderedExecutorFactory;
+import org.jboss.messaging.util.SimpleIDGenerator;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TokenBucketLimiterImpl;
 
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
 /*
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * 
@@ -101,11 +162,11 @@
 
    private final IDGenerator idGenerator = new SimpleIDGenerator(0);
 
-   // Constructors ----------------------------------------------------------------------------  
+   // Constructors ----------------------------------------------------------------------------
 
    public ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
                             final String name,
-                            final boolean xa,         
+                            final boolean xa,
                             final boolean cacheProducers,
                             final boolean autoCommitSends,
                             final boolean autoCommitAcks,
@@ -410,27 +471,26 @@
    {
       checkClosed();
 
-      //We need to make sure we don't get any inflight messages
+      // We need to make sure we don't get any inflight messages
       for (ClientConsumerInternal consumer : consumers.values())
       {
-         consumer.clear();                  
+         consumer.clear();
       }
-      
-      channel.sendBlocking(new PacketImpl(PacketImpl.SESS_ROLLBACK),
-                           new ResponseNotifier()
+
+      channel.sendBlocking(new PacketImpl(PacketImpl.SESS_ROLLBACK), new ResponseNotifier()
       {
          public void onResponseReceived()
          {
-            //This needs to be called on before the blocking thread is awoken
-            //hence the ResponseNotifier
+            // This needs to be called on before the blocking thread is awoken
+            // hence the ResponseNotifier
             for (ClientConsumerInternal consumer : consumers.values())
             {
                consumer.resume();
             }
          }
-      }); 
+      });
    }
-  
+
    public synchronized void close() throws MessagingException
    {
       if (closed)
@@ -444,9 +504,7 @@
       {
          closeChildren();
 
-         Channel channel1 = remotingConnection.getChannel(1, false, -1, true);
-
-         channel1.sendBlocking(new CloseSessionMessage(name));
+         channel.sendBlocking(new SessionCloseMessage());
       }
       catch (Throwable ignore)
       {
@@ -547,7 +605,7 @@
    {
       return name;
    }
-   
+
    public void processed(final long consumerID, final long messageID) throws MessagingException
    {
       checkClosed();
@@ -561,7 +619,7 @@
       else
       {
          channel.send(message);
-      }          
+      }
    }
 
    public void addConsumer(final ClientConsumerInternal consumer)
@@ -661,14 +719,18 @@
    public void handleFailover(final RemotingConnection backupConnection)
    {
       // We lock the channel to prevent any packets to be added to the resend
-      // cache
-      // during the failover process
+      // cache during the failover process
       channel.lock();
 
       try
-      {
+      {         
          channel.transferConnection(backupConnection);
-
+         
+         for (ClientConsumerInternal consumer: consumers.values())
+         {
+            consumer.failover();
+         }
+         
          backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
 
          remotingConnection = backupConnection;
@@ -874,29 +936,33 @@
    {
       checkXA();
 
-      //We need to make sure we don't get any inflight messages
+      // We need to make sure we don't get any inflight messages
       for (ClientConsumerInternal consumer : consumers.values())
       {
-         consumer.clear();                  
+         consumer.clear();
       }
-      
+
       SessionXARollbackMessage packet = new SessionXARollbackMessage(xid);
 
       try
       {
          SessionXAResponseMessage response = (SessionXAResponseMessage)channel.sendBlocking(packet,
-                              new ResponseNotifier()
-         {
-            public void onResponseReceived()
-            {
-               //This needs to be called on before the blocking thread is awoken
-               //hence the ResponseNotifier
-               for (ClientConsumerInternal consumer : consumers.values())
-               {
-                  consumer.resume();
-               }
-            }
-         }); 
+                                                                                            new ResponseNotifier()
+                                                                                            {
+                                                                                               public void onResponseReceived()
+                                                                                               {
+                                                                                                  // This needs to be
+                                                                                                   // called on before
+                                                                                                   // the blocking
+                                                                                                   // thread is awoken
+                                                                                                  // hence the
+                                                                                                   // ResponseNotifier
+                                                                                                  for (ClientConsumerInternal consumer : consumers.values())
+                                                                                                  {
+                                                                                                     consumer.resume();
+                                                                                                  }
+                                                                                               }
+                                                                                            });
 
          if (response.isError())
          {
@@ -969,7 +1035,7 @@
    }
 
    // Public
-   //----------------------------------------------------------------------------
+   // ----------------------------------------------------------------------------
 
    public void setForceNotSameRM(final boolean force)
    {
@@ -987,13 +1053,13 @@
    }
 
    // Protected
-   //----------------------------------------------------------------------------
+   // ----------------------------------------------------------------------------
 
    // Package Private
-   //----------------------------------------------------------------------------
+   // ----------------------------------------------------------------------------
 
    // Private
-   //----------------------------------------------------------------------------
+   // ----------------------------------------------------------------------------
 
    private void checkXA() throws XAException
    {
@@ -1067,7 +1133,7 @@
          producerCache.clear();
       }
 
-      channel.close();
+      channel.close(false);
 
       connectionRegistry.returnConnection(remotingConnection.getID());
 

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -22,6 +22,12 @@
 
 package org.jboss.messaging.core.persistence.impl.nullpm;
 
+import java.util.List;
+import java.util.Map;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.LastPageRecord;
 import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.persistence.StorageManager;
@@ -32,13 +38,10 @@
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.transaction.ResourceManager;
+import org.jboss.messaging.util.IDGenerator;
 import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TimeAndCounterIDGenerator;
 
-import javax.transaction.xa.Xid;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * 
  * A NullStorageManager
@@ -49,10 +52,13 @@
  */
 public class NullStorageManager implements StorageManager
 {
-	private final AtomicLong messageIDSequence = new AtomicLong(0);
-	
-	private final AtomicLong transactionIDSequence = new AtomicLong(0);
-	
+   private static final Logger log = Logger.getLogger(NullStorageManager.class);
+
+   
+   //FIXME - these need to use id generators from 1.4 null storage manager since is not unique across
+   //cluster
+	private final IDGenerator idGenerator = new TimeAndCounterIDGenerator();
+
 	private volatile boolean started;
 	
 	public void addBinding(Binding binding) throws Exception
@@ -136,22 +142,9 @@
 	public long generateUniqueID()
 	{
 	   //FIXME - this needs to use Howard's ID generator from JBM 1.4
-		return messageIDSequence.getAndIncrement();
+		return idGenerator.generateID();
 	}
 	
-	public synchronized void setMaxID(final long id)
-   {
-      if (1 + id > messageIDSequence.get())
-      {
-         messageIDSequence.set(id + 1);
-      }
-   }
-	
-	public long generateTransactionID()
-	{
-		return transactionIDSequence.getAndIncrement();
-	}
-	
 	public synchronized void start() throws Exception
 	{
 		if (started)

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -83,7 +83,7 @@
 
    Set<SimpleString> listAllDestinations();
    
-   void setBackup(boolean backup);
+   void activate();
    
    PagingManager getPagingManager();
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -87,7 +87,8 @@
                          final ManagementService managementService,
                          final boolean checkAllowable,
                          final ResourceManager resourceManager,
-                         final boolean enableWildCardRouting)
+                         final boolean enableWildCardRouting,
+                         final boolean backup)
    {
       this.storageManager = storageManager;
 
@@ -109,6 +110,8 @@
       {
          addressManager = new SimpleAddressManager();
       }
+      
+      this.backup = backup;
    }
 
    // MessagingComponent implementation ---------------------------------------
@@ -195,7 +198,8 @@
    public Binding addBinding(final SimpleString address,
                              final SimpleString queueName,
                              final Filter filter,
-                             final boolean durable, boolean temporary) throws Exception
+                             final boolean durable,
+                             boolean temporary) throws Exception
    {
       Binding binding = createBinding(address, queueName, filter, durable, temporary);
 
@@ -289,29 +293,6 @@
 
    }
 
-   // public void routeFromCluster(final String address, final Message message)
-   // throws Exception
-   // {
-   // List<Binding> bindings = mappings.get(address);
-   //      
-   // for (Binding binding: bindings)
-   // {
-   // Queue queue = binding.getQueue();
-   //         
-   // if (binding.getNodeID() == nodeID)
-   // {
-   // if (queue.getFilter() == null || queue.getFilter().match(message))
-   // {
-   // MessageReference ref = message.createReference(queue);
-   //
-   // //We never route durably from other nodes - so no need to persist
-   //
-   // queue.addLast(ref);
-   // }
-   // }
-   // }
-   // }
-
    public PagingManager getPagingManager()
    {
       return pagingManager;
@@ -327,19 +308,16 @@
       return flowControllers.get(address);
    }
 
-   public void setBackup(final boolean backup)
+   public void activate()
    {
-      if (this.backup != backup)
-      {
-         this.backup = backup;
+      this.backup = false;
+      
+      Map<SimpleString, Binding> nameMap = addressManager.getBindings();
 
-         Map<SimpleString, Binding> nameMap = addressManager.getBindings();
-
-         for (Binding binding : nameMap.values())
-         {
-            binding.getQueue().setBackup(backup);
-         }
-      }
+      for (Binding binding : nameMap.values())
+      {
+         binding.getQueue().activate();
+      }            
    }
 
    // Private -----------------------------------------------------------------
@@ -347,11 +325,15 @@
    private Binding createBinding(final SimpleString address,
                                  final SimpleString name,
                                  final Filter filter,
-                                 final boolean durable, final boolean temporary) throws Exception
+                                 final boolean durable,
+                                 final boolean temporary) throws Exception
    {
       Queue queue = queueFactory.createQueue(-1, name, filter, durable, false);
 
-      queue.setBackup(backup);
+      if (backup)
+      {
+         queue.setBackup();
+      }
 
       Binding binding = new BindingImpl(address, queue);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Channel.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Channel.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -11,6 +11,8 @@
  */
 package org.jboss.messaging.core.remoting;
 
+import java.util.concurrent.Executor;
+
 import org.jboss.messaging.core.exception.MessagingException;
 
 /**
@@ -28,11 +30,11 @@
    
    Packet sendBlocking(Packet packet, ResponseNotifier notifier) throws MessagingException;
 
-   void replicatePacket(Packet packet);
-
+   void replicatePacket(Packet packet) throws MessagingException;
+   
    void setHandler(ChannelHandler handler);
 
-   void close();
+   void close(boolean onExecutorThread);
 
    void fail();
 
@@ -47,4 +49,6 @@
    void lock();
 
    void unlock();
+   
+   Executor getExecutor();
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/Packet.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Packet.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Packet.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -44,4 +44,10 @@
    void encode(MessagingBuffer buffer);
       
    void decode(MessagingBuffer buffer);
+   
+   boolean isRequiresConfirmations();
+   
+   boolean isReplicateBlocking();
+   
+   boolean isWriteAlways();
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -12,11 +12,11 @@
 
 package org.jboss.messaging.core.remoting.impl;
 
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CLOSE_SESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.NULL_RESPONSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PONG;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
@@ -29,6 +29,7 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_NEXTMESSAGE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_RESET;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEBROWSER;
@@ -40,14 +41,12 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_MANAGEMENT_SEND;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PACKETS_CONFIRMED;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PROCESSED;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVETOKENS;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECOVER;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
@@ -94,7 +93,6 @@
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.ResponseNotifier;
-import org.jboss.messaging.core.remoting.impl.wireformat.CloseSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
@@ -114,6 +112,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserNextMessageMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserResetMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
@@ -282,7 +281,7 @@
          pinger = null;
       }
    }
-
+   
    // RemotingConnection implementation
    // ------------------------------------------------------------
 
@@ -480,7 +479,7 @@
          future.cancel(false);
       }
 
-      pingChannel.close();
+      pingChannel.close(false);
 
       destroyed = true;
 
@@ -494,7 +493,7 @@
 
       for (Channel channel : channels.values())
       {
-         channel.close();
+         channel.close(false);
       }
    }
 
@@ -530,6 +529,11 @@
             packet = new MessagingExceptionMessage();
             break;
          }
+         case PACKETS_CONFIRMED:
+         {
+            packet = new PacketsConfirmedMessage();
+            break;
+         }
          case CREATESESSION:
          {
             packet = new CreateSessionMessage();
@@ -550,9 +554,9 @@
             packet = new ReattachSessionResponseMessage();
             break;
          }
-         case CLOSE_SESSION:
+         case SESS_CLOSE:
          {
-            packet = new CloseSessionMessage();
+            packet = new SessionCloseMessage();
             break;
          }
          case SESS_CREATECONSUMER:
@@ -585,11 +589,6 @@
             packet = new SessionProcessedMessage();
             break;
          }
-         case SESS_RECOVER:
-         {
-            packet = new PacketImpl(PacketImpl.SESS_RECOVER);
-            break;
-         }
          case SESS_COMMIT:
          {
             packet = new PacketImpl(PacketImpl.SESS_COMMIT);
@@ -775,11 +774,6 @@
             packet = new SessionReceiveMessage();
             break;
          }
-         case SESS_PACKETS_CONFIRMED:
-         {
-            packet = new PacketsConfirmedMessage();
-            break;
-         }
          case SESS_CONSUMER_CLOSE:
          {
             packet = new SessionConsumerCloseMessage();
@@ -797,7 +791,7 @@
          }
          case NULL_RESPONSE:
          {
-            packet = new NullResponseMessage();
+            packet = new NullResponseMessage(false);
             break;
          }
          case SESS_MANAGEMENT_SEND:
@@ -929,8 +923,7 @@
 
             final byte packetType = packet.getType();
 
-            if (connection.writePackets || packetType == SESS_PACKETS_CONFIRMED ||
-                packetType == PONG)
+             if (connection.writePackets || packet.isWriteAlways())
             {
                connection.doWrite(packet);
             }
@@ -942,6 +935,11 @@
       private Thread blockThread;
       
       private ResponseNotifier responseNotifier;
+      
+      public Executor getExecutor()
+      {
+         return executor;
+      }
 
       // This must never called by more than one thread concurrently
       public Packet sendBlocking(final Packet packet) throws MessagingException
@@ -1039,36 +1037,59 @@
          }
       }
 
-      public void replicatePacket(final Packet packet)
+      public void replicatePacket(final Packet packet) throws MessagingException
       {
          if (replicatingChannel != null)
          {
-            replicatingChannel.send(packet);
+            if (packet.isReplicateBlocking())
+            {
+               replicatingChannel.sendBlocking(packet);
+            }
+            else
+            {
+               replicatingChannel.send(packet);
+            }
          }
       }
+      
+      public void replicatePacketBlocking(final Packet packet) throws MessagingException
+      {
+         if (replicatingChannel != null)
+         {
+            replicatingChannel.sendBlocking(packet);
+         }
+      }
 
       public void setHandler(final ChannelHandler handler)
       {
          this.handler = handler;
       }
 
-      public void close()
+      public void close(boolean onExecutorThread)
       {
          if (closed)
          {
             return;
          }
 
-         if (!connection.destroyed && connection.channels.remove(id) == null)
+         synchronized (connection)
+         {         
+            if (!connection.destroyed && connection.channels.remove(id) == null)
+            {
+               throw new IllegalArgumentException("Cannot find channel with id " + id + " to close");
+            }                        
+         }
+         
+         if (!onExecutorThread)
          {
-            throw new IllegalArgumentException("Cannot find channel with id " + id + " to close");
+            waitForExecutorToComplete();
          }
          
-         waitForExecutorToComplete();
-
          if (replicatingChannel != null)
          {
-            replicatingChannel.close();
+            replicatingChannel.close(false);
+            
+            replicatingChannel = null;
          }
 
          closed = true;
@@ -1158,7 +1179,7 @@
 
       private void handlePacket(final Packet packet)
       {
-         if (packet.getType() == PacketImpl.SESS_PACKETS_CONFIRMED)
+         if (packet.getType() == PACKETS_CONFIRMED)
          {
             if (resendCache != null)
             {
@@ -1187,7 +1208,7 @@
                }
             }
             else if (replicatingChannel != null)
-            {
+            {               
                replicatingChannel.send(packet);
             }
             else
@@ -1229,13 +1250,6 @@
 
                   checkConfirmation(packet);
 
-                  // Shouldn't get responses back on replicating connections - since should never be written
-
-                  if (connection.replicating)
-                  {
-                     throw new IllegalStateException("Got response back on replicating connection " + packet.getType());
-                  }
-                  
                   if (responseNotifier != null)
                   {
                      responseNotifier.onResponseReceived();
@@ -1247,10 +1261,10 @@
             else if (handler != null)
             {
                if (executor == null)
-               {
+               {                  
                   checkConfirmation(packet);
-
-                  handler.handlePacket(packet);
+                  
+                  handler.handlePacket(packet);                                   
                }
                else
                {
@@ -1259,9 +1273,9 @@
                      public void run()
                      {
                         try
-                        {
+                        {     
                            checkConfirmation(packet);
-   
+                           
                            handler.handlePacket(packet);
                         }
                         catch (Exception e)
@@ -1281,7 +1295,7 @@
 
       private void checkConfirmation(final Packet packet)
       {                 
-         if (resendCache != null)
+         if (resendCache != null && packet.isRequiresConfirmations())
          {
             lastReceivedCommandID++;
             
@@ -1330,7 +1344,7 @@
       {
          public void handlePacket(final Packet packet)
          {
-            if (packet.getType() == SESS_PACKETS_CONFIRMED)
+            if (packet.getType() == PACKETS_CONFIRMED)
             {
                // Send it straight back to the client
                connection.doWrite(packet);

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CloseSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CloseSessionMessage.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CloseSessionMessage.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -1,83 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
- * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
- * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
- * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
- * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- */
-public class CloseSessionMessage extends PacketImpl
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private String name;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public CloseSessionMessage(final String name)
-   {
-      super(CLOSE_SESSION);
-
-      this.name = name;
-   }
-
-   public CloseSessionMessage()
-   {
-      super(CLOSE_SESSION);
-   }
-
-   // Public --------------------------------------------------------
-
-   public String getName()
-   {
-      return name;
-   }
-
-   @Override
-   public void encodeBody(final MessagingBuffer buffer)
-   {
-      buffer.putString(name);
-   }
-
-   @Override
-   public void decodeBody(final MessagingBuffer buffer)
-   {
-      name = buffer.getString();
-   }
-
-   @Override
-   public boolean equals(final Object other)
-   {
-      if (other instanceof CloseSessionMessage == false)
-      {
-         return false;
-      }
-
-      CloseSessionMessage r = (CloseSessionMessage)other;
-
-      return super.equals(other) && name == r.name;
-   }
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -172,6 +172,11 @@
          
       return matches;
    }
+   
+   public final boolean isRequiresConfirmations()
+   {
+      return false;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -101,6 +101,11 @@
       
       return matches;
    }
+   
+   public final boolean isRequiresConfirmations()
+   {
+      return false;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/NullResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/NullResponseMessage.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/NullResponseMessage.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -22,13 +22,17 @@
 
    // Attributes ----------------------------------------------------
 
+   private final boolean writeAlways;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public NullResponseMessage()
+   public NullResponseMessage(final boolean writeAlways)
    {
       super(NULL_RESPONSE);
+
+      this.writeAlways = writeAlways;
    }
 
    // Public --------------------------------------------------------
@@ -39,6 +43,11 @@
       return true;
    }
 
+   public boolean isWriteAlways()
+   {
+      return writeAlways;
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -45,18 +45,19 @@
    public static final byte EXCEPTION = 20;
 
    public static final byte NULL_RESPONSE = 21;
+   
+   public static final byte PACKETS_CONFIRMED = 22;
 
+
    // Server
    public static final byte CREATESESSION = 30;
 
    public static final byte CREATESESSION_RESP = 31;
+   
+   public static final byte REATTACH_SESSION = 32;
 
-   public static final byte CLOSE_SESSION = 32;
+   public static final byte REATTACH_SESSION_RESP = 33;
 
-   public static final byte REATTACH_SESSION = 33;
-
-   public static final byte REATTACH_SESSION_RESP = 34;
-
    // Session
    public static final byte SESS_CREATECONSUMER = 40;
 
@@ -72,93 +73,89 @@
 
    public static final byte SESS_PROCESSED = 46;
 
-   public static final byte SESS_RECOVER = 47;
+   public static final byte SESS_COMMIT = 47;
 
-   public static final byte SESS_COMMIT = 48;
-
-   public static final byte SESS_ROLLBACK = 49;
+   public static final byte SESS_ROLLBACK = 48;
    
-   public static final byte SESS_QUEUEQUERY = 51;
+   public static final byte SESS_QUEUEQUERY = 49;
 
-   public static final byte SESS_QUEUEQUERY_RESP = 52;
+   public static final byte SESS_QUEUEQUERY_RESP = 50;
 
-   public static final byte SESS_CREATEQUEUE = 53;
+   public static final byte SESS_CREATEQUEUE = 51;
 
-   public static final byte SESS_DELETE_QUEUE = 54;
+   public static final byte SESS_DELETE_QUEUE = 52;
 
-   public static final byte SESS_ADD_DESTINATION = 55;
+   public static final byte SESS_ADD_DESTINATION = 53;
 
-   public static final byte SESS_REMOVE_DESTINATION = 56;
+   public static final byte SESS_REMOVE_DESTINATION = 54;
 
-   public static final byte SESS_BINDINGQUERY = 57;
+   public static final byte SESS_BINDINGQUERY = 55;
 
-   public static final byte SESS_BINDINGQUERY_RESP = 58;
+   public static final byte SESS_BINDINGQUERY_RESP = 56;
 
-   public static final byte SESS_BROWSER_MESSAGE = 59;
+   public static final byte SESS_BROWSER_MESSAGE = 57;
 
-   public static final byte SESS_BROWSER_RESET = 60;
+   public static final byte SESS_BROWSER_RESET = 58;
 
-   public static final byte SESS_BROWSER_HASNEXTMESSAGE = 61;
+   public static final byte SESS_BROWSER_HASNEXTMESSAGE = 59;
 
-   public static final byte SESS_BROWSER_HASNEXTMESSAGE_RESP = 62;
+   public static final byte SESS_BROWSER_HASNEXTMESSAGE_RESP = 60;
 
-   public static final byte SESS_BROWSER_NEXTMESSAGE = 63;
+   public static final byte SESS_BROWSER_NEXTMESSAGE = 61;
 
-   public static final byte SESS_XA_START = 64;
+   public static final byte SESS_XA_START = 62;
 
-   public static final byte SESS_XA_END = 65;
+   public static final byte SESS_XA_END = 63;
 
-   public static final byte SESS_XA_COMMIT = 66;
+   public static final byte SESS_XA_COMMIT = 64;
 
-   public static final byte SESS_XA_PREPARE = 67;
+   public static final byte SESS_XA_PREPARE = 65;
 
-   public static final byte SESS_XA_RESP = 68;
+   public static final byte SESS_XA_RESP = 66;
 
-   public static final byte SESS_XA_ROLLBACK = 69;
+   public static final byte SESS_XA_ROLLBACK = 67;
 
-   public static final byte SESS_XA_JOIN = 70;
+   public static final byte SESS_XA_JOIN = 68;
 
-   public static final byte SESS_XA_SUSPEND = 71;
+   public static final byte SESS_XA_SUSPEND = 69;
 
-   public static final byte SESS_XA_RESUME = 72;
+   public static final byte SESS_XA_RESUME = 70;
 
-   public static final byte SESS_XA_FORGET = 73;
+   public static final byte SESS_XA_FORGET = 71;
 
-   public static final byte SESS_XA_INDOUBT_XIDS = 74;
+   public static final byte SESS_XA_INDOUBT_XIDS = 72;
 
-   public static final byte SESS_XA_INDOUBT_XIDS_RESP = 75;
+   public static final byte SESS_XA_INDOUBT_XIDS_RESP = 73;
 
-   public static final byte SESS_XA_SET_TIMEOUT = 76;
+   public static final byte SESS_XA_SET_TIMEOUT = 74;
 
-   public static final byte SESS_XA_SET_TIMEOUT_RESP = 77;
+   public static final byte SESS_XA_SET_TIMEOUT_RESP = 75;
 
-   public static final byte SESS_XA_GET_TIMEOUT = 78;
+   public static final byte SESS_XA_GET_TIMEOUT = 76;
 
-   public static final byte SESS_XA_GET_TIMEOUT_RESP = 79;
+   public static final byte SESS_XA_GET_TIMEOUT_RESP = 77;
 
-   public static final byte SESS_START = 80;
+   public static final byte SESS_START = 78; 
 
-   public static final byte SESS_STOP = 81;
+   public static final byte SESS_STOP = 79;
+   
+   public static final byte SESS_CLOSE = 80;
 
-   public static final byte SESS_FLOWTOKEN = 82;
+   public static final byte SESS_FLOWTOKEN = 81;
 
-   public static final byte SESS_SEND = 83;
+   public static final byte SESS_SEND = 82;
 
-   public static final byte SESS_RECEIVETOKENS = 84;
+   public static final byte SESS_RECEIVETOKENS = 83;
 
-   public static final byte SESS_CONSUMER_CLOSE = 85;
+   public static final byte SESS_CONSUMER_CLOSE = 84;
 
-   public static final byte SESS_PRODUCER_CLOSE = 86;
+   public static final byte SESS_PRODUCER_CLOSE = 85;
 
-   public static final byte SESS_BROWSER_CLOSE = 87;
+   public static final byte SESS_BROWSER_CLOSE = 86;
 
-   public static final byte SESS_RECEIVE_MSG = 88;
+   public static final byte SESS_RECEIVE_MSG = 87;
 
-   public static final byte SESS_PACKETS_CONFIRMED = 89;
-
-   public static final byte SESS_REPLICATE_DELIVERY = 90;
-
-   public static final byte SESS_MANAGEMENT_SEND = 94;
+   public static final byte SESS_MANAGEMENT_SEND = 88;
    
    // Static --------------------------------------------------------
 
@@ -220,6 +217,21 @@
    public void decodeBody(final MessagingBuffer buffer)
    {
    }
+   
+   public boolean isRequiresConfirmations()
+   {
+      return true;
+   }
+   
+   public boolean isReplicateBlocking()
+   {
+      return false;
+   }
+   
+   public boolean isWriteAlways()
+   {
+      return false;
+   }
 
    @Override
    public String toString()
@@ -239,7 +251,7 @@
 
       return r.type == type && r.channelID == channelID;
    }
-
+      
    // Package protected ---------------------------------------------
 
    protected String getParentString()

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -45,14 +45,14 @@
 
    public PacketsConfirmedMessage(final int commandID)
    {
-      super(SESS_PACKETS_CONFIRMED);
+      super(PACKETS_CONFIRMED);
 
       this.commandID = commandID;
    }
    
    public PacketsConfirmedMessage()
    {
-      super(SESS_PACKETS_CONFIRMED);
+      super(PACKETS_CONFIRMED);
    }
 
    // Public --------------------------------------------------------
@@ -89,6 +89,17 @@
       
       return super.equals(other) && this.commandID == r.commandID;
    }
+   
+   public final boolean isRequiresConfirmations()
+   {
+      return false;
+   }
+   
+   public boolean isWriteAlways()
+   {
+      return true;
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Ping.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Ping.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Ping.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -92,6 +92,11 @@
       
       return super.equals(other) && this.expirePeriod == r.expirePeriod;
    }
+   
+   public final boolean isRequiresConfirmations()
+   {
+      return false;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Pong.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Pong.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Pong.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -77,6 +77,11 @@
    {
       newPeriod = buffer.getLong();
    }
+   
+   public boolean isWriteAlways()
+   {
+      return true;
+   }
 
    @Override
    public String toString()
@@ -98,6 +103,11 @@
       
       return super.equals(other) && this.newPeriod == r.newPeriod;
    }
+   
+   public final boolean isRequiresConfirmations()
+   {
+      return false;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -95,6 +95,11 @@
       return super.equals(other) && this.name.equals(r.name) &&
              this.lastReceivedCommandID == r.lastReceivedCommandID;
    }
+   
+   public final boolean isRequiresConfirmations()
+   {
+      return false;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -88,6 +88,11 @@
       
       return super.equals(other) && this.lastReceivedCommandID == r.lastReceivedCommandID;
    }
+   
+   public final boolean isRequiresConfirmations()
+   {
+      return false;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAddDestinationMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAddDestinationMessage.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAddDestinationMessage.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -96,6 +96,13 @@
       temporary = buffer.getBoolean();
    }
    
+   //Needs to be true so we can ensure packet has reached backup before we start sending messages to it from another
+   //session
+   public boolean isReplicateBlocking()
+   {      
+      return true;
+   }
+   
    @Override
    public String toString()
    {

Copied: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCloseMessage.java (from rev 5046, trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CloseSessionMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCloseMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCloseMessage.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionCloseMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionCloseMessage()
+   {
+      super(SESS_CLOSE);
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public boolean equals(final Object other)
+   {
+      if (other instanceof SessionCloseMessage == false)
+      {
+         return false;
+      }
+
+      SessionCloseMessage r = (SessionCloseMessage)other;
+
+      return super.equals(other);
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -121,6 +121,13 @@
       temporary = buffer.getBoolean();
    }
    
+   //Needs to be true so we can ensure packet has reached backup before we start sending messages to it from another
+   //session
+   public boolean isReplicateBlocking()
+   {      
+      return true;
+   }
+   
    public boolean equals(Object other)
    {
       if (other instanceof SessionCreateQueueMessage == false)

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -116,6 +116,11 @@
       
       clientMessage.getBody().flip();
    }
+   
+   public final boolean isRequiresConfirmations()
+   {
+      return false;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -73,7 +73,7 @@
                                               boolean autoCommitAcks,
                                               boolean xa) throws Exception;
 
-   void closeSession(String name) throws Exception;
+   void removeSession(String name) throws Exception;
 
    boolean isStarted();
 

Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -24,6 +24,7 @@
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 
 import org.jboss.messaging.core.filter.Filter;
@@ -110,6 +111,8 @@
   
    MessageReference removeReferenceWithID(long id);
    
+   MessageReference waitForReferenceWithID(long id, CountDownLatch latch);
+   
    MessageReference getReference(long id);
    
    void deleteAllReferences(StorageManager storageManager) throws Exception;
@@ -135,10 +138,13 @@
    boolean moveMessage(long messageID, Binding toBinding,
          StorageManager storageManager, PostOffice postOffice) throws Exception;
 
-   void setBackup(boolean backup);
+   void setBackup();
    
+   void activate();
+   
    boolean isBackup();
    
    MessageReference removeFirst();
-
+   
+   boolean consumerFailedOver();   
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -44,5 +44,7 @@
 	
 	Queue getQueue();
 
-	MessageReference getReference(long messageID) throws Exception;
+	MessageReference waitForReference(long messageID) throws Exception;
+	
+	void failedOver();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -130,4 +130,6 @@
    int replayCommands(int lastReceivedCommandID);
 
    void handleManagementMessage(SessionSendManagementMessage message) throws Exception;
+   
+   void failedOver() throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -210,7 +210,8 @@
                                       managementService,
                                       configuration.isRequireDestinations(),
                                       resourceManager,
-                                      configuration.isWildcardRoutingEnabled());
+                                      configuration.isWildcardRoutingEnabled(),
+                                      configuration.isBackup());
 
       securityRepository = new HierarchicalObjectRepository<Set<Role>>();
       securityRepository.setDefault(new HashSet<Role>());
@@ -224,7 +225,6 @@
                                                           this);
 
       postOffice.start();
-      postOffice.setBackup(configuration.isBackup());
 
       TransportConfiguration backupConnector = configuration.getBackupConnectorConfiguration();
 
@@ -396,7 +396,7 @@
 
       // Reconnect the channel to the new connection
       session.transferConnection(connection);
-
+      
       // This is necessary for invm since the replicating connection will be the
       // same connection
       // as the original replicating connection since the key is the same in the
@@ -406,7 +406,7 @@
 
       int serverLastReceivedCommandID = session.replayCommands(lastReceivedCommandID);
 
-      postOffice.setBackup(false);
+      postOffice.activate();
 
       configuration.setBackup(false);
 
@@ -414,8 +414,7 @@
 
       connection.setReplicating(false);
 
-      // Re-prompt delivery
-      session.setStarted(true);
+      session.failedOver();
 
       return new ReattachSessionResponseMessage(serverLastReceivedCommandID);
    }
@@ -467,6 +466,7 @@
                                                               executorFactory.getExecutor(),
                                                               channel,
                                                               managementService,
+                                                              this,
                                                               simpleStringIdGenerator);
 
       // If the session already exists that's fine - create session must be idempotent
@@ -486,15 +486,9 @@
                                               configuration.getPacketConfirmationBatchSize());
    }
 
-   // Must also be idempotent
-   public void closeSession(final String name) throws Exception
-   {
-      ServerSession session = sessions.remove(name);
-
-      if (session != null)
-      {
-         session.close();
-      }
+   public void removeSession(final String name) throws Exception
+   {      
+      sessions.remove(name);
    }
 
    public RemotingConnection getReplicatingConnection()

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -12,7 +12,6 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CLOSE_SESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
 
@@ -22,10 +21,8 @@
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.CloseSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
 import org.jboss.messaging.core.server.MessagingServer;
 
@@ -58,12 +55,7 @@
    }
 
    public void handlePacket(final Packet packet)
-   {
-      if (channel1.getReplicatingChannel() != null)
-      {
-         channel1.replicatePacket(packet);
-      }
-      
+   {            
       Packet response = null;
 
       byte type = packet.getType();
@@ -72,6 +64,8 @@
       // reliability replay functionality
       try
       {
+         channel1.replicatePacket(packet);
+                  
          switch (type)
          {
             case CREATESESSION:
@@ -97,16 +91,6 @@
                
                break;
             }
-            case CLOSE_SESSION:
-            {
-               CloseSessionMessage request = (CloseSessionMessage)packet;
-   
-               server.closeSession(request.getName());
-   
-               response = new NullResponseMessage();
-               
-               break;
-            }
             default:
             {
                response = new MessagingExceptionMessage(new MessagingException(MessagingException.UNSUPPORTED_PACKET,

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -12,6 +12,23 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.list.PriorityLinkedList;
 import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
@@ -20,22 +37,18 @@
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.FlowController;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.*;
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.DistributionPolicy;
+import org.jboss.messaging.core.server.HandleStatus;
+import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.util.SimpleString;
 
-import java.util.*;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * Implementation of a Queue TODO use Java 5 concurrent queue
  * 
@@ -78,6 +91,8 @@
 
    private boolean promptDelivery;
 
+   private int pos;
+
    private AtomicInteger sizeBytes = new AtomicInteger(0);
 
    private AtomicInteger messagesAdded = new AtomicInteger(0);
@@ -92,6 +107,10 @@
 
    private volatile boolean backup;
 
+   private int consumersToFailover = -1;
+
+   private Map<Long, CountDownLatch> waitingIDMap = new ConcurrentHashMap<Long, CountDownLatch>();
+
    public QueueImpl(final long persistenceID,
                     final SimpleString name,
                     final Filter filter,
@@ -145,25 +164,33 @@
 
    public HandleStatus addLast(final MessageReference ref)
    {
-      return add(ref, false);
+      HandleStatus status = add(ref, false);
+
+      checkWaiting(ref.getMessage().getMessageID());
+
+      return status;
    }
 
    public HandleStatus addFirst(final MessageReference ref)
    {
       return add(ref, true);
    }
-
-   public void addListFirst(final LinkedList<MessageReference> list)
+   
+   public synchronized void addListFirst(final LinkedList<MessageReference> list)
    {
       ListIterator<MessageReference> iter = list.listIterator(list.size());
 
       while (iter.hasPrevious())
       {
          MessageReference ref = iter.previous();
+         
+         ServerMessage msg = ref.getMessage();
 
-         messageReferences.addFirst(ref, ref.getMessage().getPriority());
+         messageReferences.addFirst(ref, msg.getPriority());
+         
+         checkWaiting(msg.getMessageID());
       }
-      
+
       deliver();
    }
 
@@ -262,7 +289,8 @@
    public synchronized boolean removeConsumer(final Consumer consumer) throws Exception
    {
       boolean removed = distributionPolicy.removeConsumer(consumer);
-      if(removed)
+      
+      if (removed)
       {
          distributionPolicy.removeConsumer(consumer);
       }
@@ -580,24 +608,86 @@
       return backup;
    }
 
-   public void setBackup(final boolean backup)
+   public synchronized void setBackup()
    {
-      this.backup = backup;
+      this.backup = true;
 
       this.direct = false;
+   }
 
-      if (!backup)
+   public MessageReference removeFirst()
+   {
+      return messageReferences.removeFirst();
+   }
+
+   public synchronized void activate()
+   {
+      consumersToFailover = distributionPolicy.getConsumerCount();
+
+      if (consumersToFailover == 0)
       {
+         backup = false;
+      }
+   }
+
+   public synchronized boolean consumerFailedOver()
+   {
+      consumersToFailover--;
+
+      if (consumersToFailover == 0)
+      {
+         // All consumers for the queue have failed over, can re-activate it now
+
+         backup = false;
+
          for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
          {
             scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
          }
+
+         return true;
       }
+      else
+      {
+         return false;
+      }
    }
 
-   public MessageReference removeFirst()
+   public MessageReference waitForReferenceWithID(final long id, final CountDownLatch latch)
    {
-      return messageReferences.removeFirst();
+      MessageReference ref;
+
+      synchronized (this)
+      {
+         ref = removeReferenceWithID(id);
+
+         if (ref == null)
+         {
+            waitingIDMap.put(id, latch);
+         }
+      }
+
+      if (ref == null)
+      {
+         boolean ok = false;
+
+         try
+         {
+            ok = latch.await(10000, TimeUnit.MILLISECONDS);
+         }
+         catch (InterruptedException e)
+         {
+         }
+
+         if (!ok)
+         {
+            throw new IllegalStateException("Timed out or interrupted waiting for ref to arrive on queue " + id);
+         }
+
+         ref = this.removeReferenceWithID(id);
+      }
+
+      return ref;
    }
 
    // Public
@@ -732,7 +822,7 @@
 
    private HandleStatus deliver(final MessageReference reference)
    {
-      if (!distributionPolicy.hasConsumers())
+      if (distributionPolicy.getConsumerCount() == 0)
       {
          return HandleStatus.BUSY;
       }
@@ -747,7 +837,7 @@
       {
          Consumer consumer = distributionPolicy.select(reference.getMessage(), status != null);
          pos = distributionPolicy.getCurrentPosition();
-         if(consumer == null)
+         if (consumer == null)
          {
             if (filterRejected)
             {
@@ -800,11 +890,11 @@
 
             filterRejected = true;
          }
-         if(startPos > distributionPolicy.getConsumerCount() - 1)
+         if (startPos > distributionPolicy.getConsumerCount() - 1)
          {
             startPos = distributionPolicy.getConsumerCount() - 1;
          }
-         if(startPos == pos)
+         if (startPos == pos)
          {
             // Tried all of them
             if (filterRejected)
@@ -815,10 +905,19 @@
             {
                // Give up - all consumers busy
                return HandleStatus.BUSY;
-            }   
+            }
          }
       }
+   }
+   
+   private void checkWaiting(final long messageID)
+   {
+      CountDownLatch latch = waitingIDMap.remove(messageID);
 
+      if (latch != null)
+      {
+         latch.countDown();
+      }
    }
 
    // Inner classes

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.core.server.impl;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.filter.Filter;
@@ -84,11 +85,13 @@
    private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
 
    private final PostOffice postOffice;
-   
+
    private final java.util.Queue<MessageReference> deliveringRefs = new ConcurrentLinkedQueue<MessageReference>();
-   
-   private final Channel channel;
 
+   private final Channel channel;
+   
+   private volatile CountDownLatch waitingLatch;
+   
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -128,9 +131,9 @@
       this.queueSettingsRepository = queueSettingsRepository;
 
       this.postOffice = postOffice;
-      
+
       this.channel = channel;
-       
+
       messageQueue.addConsumer(this);
    }
 
@@ -142,10 +145,10 @@
       return id;
    }
 
-   public HandleStatus handle(MessageReference ref) throws Exception
+   public HandleStatus handle(final MessageReference ref) throws Exception
    {
       if (availableCredits != null && availableCredits.get() <= 0)
-      {
+      {       
          return HandleStatus.BUSY;
       }
 
@@ -177,35 +180,39 @@
          {
             availableCredits.addAndGet(-message.getEncodeSize());
          }
+
+         deliveringRefs.add(ref);
          
-         deliveringRefs.add(ref);         
-         
-         SessionReceiveMessage packet =
-            new SessionReceiveMessage(id, ref.getMessage(), ref.getDeliveryCount() + 1);
-         
+         SessionReceiveMessage packet = new SessionReceiveMessage(id, ref.getMessage(), ref.getDeliveryCount() + 1);
+
          channel.send(packet);
-             
+
          return HandleStatus.HANDLED;
       }
    }
-
+   
    public void close() throws Exception
-   {
+   {     
       setStarted(false);
+      
+      if (waitingLatch != null)
+      {                
+         waitingLatch.countDown();
+      }
 
       messageQueue.removeConsumer(this);
 
       session.removeConsumer(this);
-      
+
       cancelRefs();
    }
-   
+
    public void cancelRefs() throws Exception
    {
       if (!deliveringRefs.isEmpty())
       {
          Transaction tx = new TransactionImpl(storageManager, postOffice);
-         
+
          for (MessageReference ref : deliveringRefs)
          {
             tx.addAcknowledgement(ref);
@@ -214,22 +221,18 @@
          deliveringRefs.clear();
 
          tx.rollback(queueSettingsRepository);
-      }      
+      }
    }
 
    public void setStarted(final boolean started)
    {
-      boolean useStarted;
-
       synchronized (startStopLock)
       {
          this.started = started;
-
-         useStarted = started;
       }
 
       // Outside the lock
-      if (useStarted)
+      if (started)
       {
          promptDelivery();
       }
@@ -240,11 +243,11 @@
       if (availableCredits != null)
       {
          int previous = availableCredits.getAndAdd(credits);
-
-         if (previous <= 0 && (previous + credits) > 0)
+         
+         if (previous <= 0 && previous + credits > 0)
          {
             promptDelivery();
-         }
+         }                 
       }
    }
 
@@ -252,62 +255,45 @@
    {
       return messageQueue;
    }
-
-   private MessageReference deliverMessage(final long messageID) throws Exception
+ 
+   public MessageReference waitForReference(final long messageID) throws Exception
    {
-      // Deliver a specific message from the queue - this is used when
-      // replicating delivery state
-      // We can't just deliver the next message since there may be multiple
-      // sessions on the same queue
-      // delivering concurrently
-      // and we could end up with different delivery state on backup compare to
-      // live
-      // So we need the message id so we can be sure the backup session has the
-      // same delivery state
-      MessageReference ref = messageQueue.removeReferenceWithID(messageID);
-
-      if (ref == null)
-      {
-         throw new IllegalStateException("Cannot find reference " + messageID);
-      }
-
-      HandleStatus handled = handle(ref);
-
-      if (handled != HandleStatus.HANDLED)
-      {
-         throw new IllegalStateException("Failed to handle replicated reference " + messageID);
-      }
-      
-      return ref;
-   }
-   
-   public MessageReference getReference(final long messageID) throws Exception
-   {
-//      MessageReference ref;
-//      do
-//      {
-//         ref = deliveringRefs.poll();
-//      }
-//      while (ref.getMessage().getMessageID() != messageID);
-      
       if (messageQueue.isBackup())
       {
-         return deliverMessage(messageID);
+         waitingLatch = new CountDownLatch(1);
+                                 
+         MessageReference ref = messageQueue.waitForReferenceWithID(messageID, waitingLatch);
+         
+         waitingLatch = null;
+         
+         return ref;
       }
       else
       {
-         
          MessageReference ref = deliveringRefs.poll();
-         
+
          if (ref.getMessage().getMessageID() != messageID)
          {
             throw new IllegalStateException("Invalid order");
          }
-   
+
          return ref;
       }
    }
 
+   public void failedOver()
+   {
+      synchronized (startStopLock)
+      {
+         started = true;
+      }
+
+      if (messageQueue.consumerFailedOver())
+      {
+         promptDelivery();
+      }
+   }
+
    // Public
    // -----------------------------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -116,6 +116,12 @@
 
    private final IDGenerator idGenerator = new SimpleIDGenerator(0);
 
+   private volatile boolean closed;
+
+   private final String name;
+
+   private final MessagingServer server;
+
    private final SimpleStringIdGenerator simpleStringIdGenerator;
 
    // Constructors ---------------------------------------------------------------------------------
@@ -136,6 +142,7 @@
                             final Executor executor,
                             final Channel channel,
                             final ManagementService managementService,
+                            final MessagingServer server,
                             final SimpleStringIdGenerator simpleStringIdGenerator) throws Exception
    {
       this.id = id;
@@ -173,6 +180,10 @@
 
       this.managementService = managementService;
 
+      this.name = name;
+
+      this.server = server;
+
       this.simpleStringIdGenerator = simpleStringIdGenerator;
    }
 
@@ -229,10 +240,24 @@
       started = s;
    }
 
+   public void failedOver() throws Exception
+   {
+      Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
+
+      for (ServerConsumer consumer : consumersClone)
+      {
+         consumer.failedOver();
+      }
+
+      started = true;
+   }
+
    public void close() throws Exception
    {
-      channel.close();
+      closed = true;
 
+      channel.close(true);
+
       Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
 
       for (ServerConsumer consumer : consumersClone)
@@ -260,7 +285,9 @@
 
       producers.clear();
 
-      rollback();
+      rollback(false);
+
+      server.removeSession(name);
    }
 
    public void promptDelivery(final Queue queue)
@@ -305,13 +332,12 @@
       {
          tx.addMessage(msg);
       }
-
    }
 
    public void processed(final long consumerID, final long messageID) throws Exception
    {
-      MessageReference ref = consumers.get(consumerID).getReference(messageID);
-      
+      MessageReference ref = consumers.get(consumerID).waitForReference(messageID);
+
       // Ref = null would imply consumer is already closed so we could ignore it
       if (ref != null)
       {
@@ -328,10 +354,27 @@
             ref.incrementDeliveryCount();
          }
       }
+      else
+      {
+         if (!closed)
+         {
+            throw new IllegalStateException(System.identityHashCode(this) + " Could not find ref with id " + messageID);
+         }
+         else
+         {
+            // If closed then might not find ref since processed might come in before send and send
+            // didn't come in since closed
+         }
+      }
    }
-
+   
    public void rollback() throws Exception
    {
+      rollback(true);
+   }
+
+   private void rollback(final boolean sendResponse) throws Exception
+   {
       if (tx == null)
       {
          // Might be null if XA
@@ -339,31 +382,34 @@
          tx = new TransactionImpl(storageManager, postOffice);
       }
 
-      //Need to write the response now - before redeliveries occur
-      channel.send(new NullResponseMessage());
-      
+      if (sendResponse)
+      {
+         // Need to write the response now - before redeliveries occur
+         channel.send(new NullResponseMessage(false));
+      }
+
       boolean wasStarted = started;
-      
+
       for (ServerConsumer consumer : consumers.values())
-      {            
+      {
          if (wasStarted)
          {
             consumer.setStarted(false);
          }
-         
+
          consumer.cancelRefs();
       }
-      
+
       tx.rollback(queueSettingsRepository);
-      
+
       if (wasStarted)
-      {         
+      {
          for (ServerConsumer consumer : consumers.values())
-         {            
+         {
             consumer.setStarted(true);
          }
       }
-      
+
       tx = new TransactionImpl(storageManager, postOffice);
    }
 
@@ -594,23 +640,23 @@
       }
 
       boolean wasStarted = started;
-      
+
       for (ServerConsumer consumer : consumers.values())
-      {            
+      {
          if (wasStarted)
          {
             consumer.setStarted(false);
          }
-         
+
          consumer.cancelRefs();
       }
-      
+
       theTx.rollback(queueSettingsRepository);
-      
+
       if (wasStarted)
       {
          for (ServerConsumer consumer : consumers.values())
-         {            
+         {
             consumer.setStarted(true);
          }
       }
@@ -1053,20 +1099,20 @@
    public void handleManagementMessage(final SessionSendManagementMessage message) throws Exception
    {
       ServerMessage serverMessage = message.getServerMessage();
-      
+
       if (serverMessage.containsProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS))
       {
          boolean subscribe = (Boolean)serverMessage.getProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS);
-         
+
          final SimpleString replyTo = (SimpleString)serverMessage.getProperty(ManagementHelper.HDR_JMX_REPLYTO);
-         
+
          if (subscribe)
          {
             if (log.isDebugEnabled())
             {
                log.debug("added notification listener " + this);
             }
-            
+
             managementService.addNotificationListener(this, null, replyTo);
          }
          else
@@ -1075,15 +1121,15 @@
             {
                log.debug("removed notification listener " + this);
             }
-            
+
             managementService.removeNotificationListener(this);
          }
          return;
       }
       managementService.handleMessage(message.getServerMessage());
-      
+
       serverMessage.setDestination((SimpleString)serverMessage.getProperty(ManagementHelper.HDR_JMX_REPLYTO));
-      
+
       send(serverMessage);
    }
 
@@ -1106,7 +1152,22 @@
             }
          }
 
-         close();
+         // We execute this on the session's serial executor, then we can avoid complex synchronization
+         // and ensure no operations are fielded on the session after it is closed
+         channel.getExecutor().execute(new Runnable()
+         {
+            public void run()
+            {
+               try
+               {
+                  close();
+               }
+               catch (Exception e)
+               {
+                  log.error("Failed to close session", e);
+               }
+            }
+         });
       }
       catch (Throwable t)
       {
@@ -1141,7 +1202,7 @@
    {
       return tx;
    }
-   
+
    // Private
    // ----------------------------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -12,20 +12,91 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_CLOSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_NEXTMESSAGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_RESET;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEBROWSER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_MANAGEMENT_SEND;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PROCESSED;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CLOSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_END;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_FORGET;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_JOIN;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_PREPARE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESUME;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_ROLLBACK;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
+
+import java.util.List;
+
+import javax.transaction.xa.Xid;
+
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.wireformat.*;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.*;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserNextMessageMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserResetMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionProcessedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerSession;
 
-import javax.transaction.xa.Xid;
-import java.util.List;
-
 /**
  * A ServerSessionPacketHandler
  * 
@@ -73,24 +144,17 @@
          {
             // must generate message id here, so we know they are in sync
             long id = storageManager.generateUniqueID();
-               
+
             send.getServerMessage().setMessageID(id);
          }
-//         else
-//         {
-//            log.info("Got replicated send");
-//         }
       }
-
-      if (channel.getReplicatingChannel() != null)
-      {
-         channel.replicatePacket(packet);
-      }
-
+      
       Packet response = null;
 
       try
       {
+         channel.replicatePacket(packet);         
+         
          switch (type)
          {
             case SESS_CREATECONSUMER:
@@ -110,14 +174,14 @@
                                    request.getFilterString(),
                                    request.isDurable(),
                                    request.isTemporary());
-               response = new NullResponseMessage();
+               response = new NullResponseMessage(true);
                break;
             }
             case SESS_DELETE_QUEUE:
             {
                SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
                session.deleteQueue(request.getQueueName());
-               response = new NullResponseMessage();
+               response = new NullResponseMessage(false);
                break;
             }
             case SESS_QUEUEQUERY:
@@ -136,7 +200,7 @@
             {
                SessionCreateBrowserMessage request = (SessionCreateBrowserMessage)packet;
                session.createBrowser(request.getQueueName(), request.getFilterString());
-               response = new NullResponseMessage();
+               response = new NullResponseMessage(false);
                break;
             }
             case SESS_CREATEPRODUCER:
@@ -151,14 +215,14 @@
                session.processed(message.getConsumerID(), message.getMessageID());
                if (message.isRequiresResponse())
                {
-                  response = new NullResponseMessage();
+                  response = new NullResponseMessage(false);
                }
                break;
             }
             case SESS_COMMIT:
             {
                session.commit();
-               response = new NullResponseMessage();
+               response = new NullResponseMessage(false);
                break;
             }
             case SESS_ROLLBACK:
@@ -241,14 +305,14 @@
             {
                SessionAddDestinationMessage message = (SessionAddDestinationMessage)packet;
                session.addDestination(message.getAddress(), message.isDurable(), message.isTemporary());
-               response = new NullResponseMessage();
+               response = new NullResponseMessage(true);
                break;
             }
             case SESS_REMOVE_DESTINATION:
             {
                SessionRemoveDestinationMessage message = (SessionRemoveDestinationMessage)packet;
                session.removeDestination(message.getAddress(), message.isDurable());
-               response = new NullResponseMessage();
+               response = new NullResponseMessage(false);
                break;
             }
             case SESS_START:
@@ -259,28 +323,34 @@
             case SESS_STOP:
             {
                session.setStarted(false);
-               response = new NullResponseMessage();
+               response = new NullResponseMessage(false);
                break;
             }
+            case SESS_CLOSE:
+            {
+               session.close();
+               response = new NullResponseMessage(false);
+               break;
+            }
             case SESS_CONSUMER_CLOSE:
             {
                SessionConsumerCloseMessage message = (SessionConsumerCloseMessage)packet;
                session.closeConsumer(message.getConsumerID());
-               response = new NullResponseMessage();
+               response = new NullResponseMessage(false);
                break;
             }
             case SESS_PRODUCER_CLOSE:
             {
                SessionProducerCloseMessage message = (SessionProducerCloseMessage)packet;
                session.closeProducer(message.getProducerID());
-               response = new NullResponseMessage();
+               response = new NullResponseMessage(false);
                break;
             }
             case SESS_BROWSER_CLOSE:
             {
                SessionBrowserCloseMessage message = (SessionBrowserCloseMessage)packet;
                session.closeBrowser(message.getBrowserID());
-               response = new NullResponseMessage();
+               response = new NullResponseMessage(false);
                break;
             }
             case SESS_FLOWTOKEN:
@@ -295,7 +365,7 @@
                session.sendProducerMessage(message.getProducerID(), message.getServerMessage());
                if (message.isRequiresResponse())
                {
-                  response = new NullResponseMessage();
+                  response = new NullResponseMessage(false);
                }
                break;
             }
@@ -316,7 +386,7 @@
             {
                SessionBrowserResetMessage message = (SessionBrowserResetMessage)packet;
                session.browserReset(message.getBrowserID());
-               response = new NullResponseMessage();
+               response = new NullResponseMessage(false);
                break;
             }
             case SESS_MANAGEMENT_SEND:

Modified: trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -75,7 +75,7 @@
          return;
       }
       
-      if (this.transactedOrClientAck)
+      if (transactedOrClientAck)
       {
          try
          {

Modified: trunk/src/main/org/jboss/messaging/util/JBMThreadFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/JBMThreadFactory.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/util/JBMThreadFactory.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -39,7 +39,7 @@
 
    public JBMThreadFactory(final String groupName)
    {
-      group = new ThreadGroup(groupName);
+      group = new ThreadGroup(groupName + "-" + System.identityHashCode(this));
    }
 
    public Thread newThread(final Runnable command)

Modified: trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -18,109 +18,72 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */ 
 
 package org.jboss.messaging.util;
 
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.Set;
 import java.util.concurrent.Executor;
 
 /**
- * A OrderedExecutorFactory2
- *
- * @author <a href="mailto:david.lloyd at jboss.com">David LLoyd</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * This factory creates a hierarchy of Executor which shares the threads of the
+ * parent Executor (typically, the root parent is a Thread pool).
  * 
+ * @author <a href="david.lloyd at jboss.com">David Lloyd</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
  */
 public final class OrderedExecutorFactory implements ExecutorFactory
 {
    private final Executor parent;
+   private final Set<ChildExecutor> runningChildren = Collections.synchronizedSet(new HashSet<ChildExecutor>());
 
-   /**
-    * Construct a new instance delegating to the given parent executor.
-    *
-    * @param parent the parent executor
-    */
    public OrderedExecutorFactory(final Executor parent)
    {
       this.parent = parent;
    }
 
-   /**
-    * Get an executor that always executes tasks in order.
-    *
-    * @return an ordered executor
-    */
    public Executor getExecutor()
    {
-      return new OrderedExecutor(parent);
+      return new ChildExecutor();
    }
 
-   private static final class OrderedExecutor implements Executor
+   private final class ChildExecutor implements Executor, Runnable
    {
-      // @protectedby tasks
       private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
 
-      // @protectedby tasks
-      private boolean running;
-
-      private final Executor parent;
-
-      private final Runnable runner;
-
-      /**
-       * Construct a new instance.
-       *
-       * @param parent the parent executor
-       */
-      public OrderedExecutor(final Executor parent)
+      public void execute(Runnable command)
       {
-         this.parent = parent;
-         
-         runner = new Runnable()
+         synchronized (tasks)
          {
-            public void run()
+            tasks.add(command);
+            if (tasks.size() == 1 && runningChildren.add(this))
             {
-               for (;;)
-               {
-                  final Runnable task;
-                  synchronized (tasks)
-                  {
-                     task = tasks.poll();
-                     if (task == null)
-                     {
-                        running = false;
-                        return;
-                     }
-                  }
-                  try
-                  {
-                     task.run();
-                  }
-                  catch (Throwable t)
-                  {
-                     // eat it!
-                  }
-               }
+               parent.execute(this);
             }
-         };
+         }
       }
 
-      /**
-       * Run a task.
-       *
-       * @param command the task to run.
-       */
-      public void execute(Runnable command)
+      public void run()
       {
-         synchronized (tasks)
+         for (;;)
          {
-            tasks.add(command);
-            if (!running)
+            final Runnable task;
+            synchronized (tasks)
             {
-               running = true;
-               parent.execute(runner);
+               task = tasks.poll();
+               if (task == null)
+               {
+                  runningChildren.remove(this);
+                  return;
+               }
             }
+            task.run();
          }
       }
    }

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -3763,7 +3763,7 @@
       }
    }
 
-// http://jira.jboss.org/jira/browse/JBMESSAGING-1294 - commented out until 2.0 beta
+   // http://jira.jboss.org/jira/browse/JBMESSAGING-1294 - commented out until 2.0 beta
 //   public void testExceptionMessageListener1() throws Exception
 //   {
 //   	Connection conn = null;

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -128,9 +128,9 @@
          message2.processed();
       }
 
-      ClientMessage message3 = consumer.receive(250);
+      //ClientMessage message3 = consumer.receive(250);
 
-      assertNull(message3);
+      //assertNull(message3);
 
       session.close();
    }
@@ -548,15 +548,17 @@
       {
          ClientConsumer cons = consumers.get(i);
 
-         ClientSession sess = sessions.get(i);
-
          for (int j = 0; j < numMessages; j++)
          {
             ClientMessage message2 = cons.receive();
 
             assertEquals("aardvarks", message2.getBody().getString());
+            
+           // log.info("actually got message " + message2.getMessageID());
 
             assertEquals(j, message2.getProperty(new SimpleString("count")));
+            
+            
 
             message2.processed();
          }

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -133,7 +133,7 @@
       return null;
    }
    
-   public void setBackup(boolean backup)
+   public void activate()
    {
    }
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -60,7 +60,7 @@
 {
    private final QueueFactory queueFactory = new FakeQueueFactory();
 
-   protected boolean wildCardRoutingEnabled = false;
+   protected boolean wildCardRoutingEnabled;
 
    public void testPostOfficeStart() throws Exception
    {
@@ -69,7 +69,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -90,7 +90,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -125,7 +125,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -174,7 +174,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -222,7 +222,7 @@
 
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -273,7 +273,7 @@
 
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -321,7 +321,7 @@
       PagingStore store = EasyMock.createNiceMock(PagingStore.class);
       EasyMock.expect(pgm.getPageStore(address1)).andReturn(store);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -355,7 +355,7 @@
 
       PagingStore pgstore = EasyMock.createNiceMock(PagingStore.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -418,7 +418,7 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
       PagingStore pgstore = EasyMock.createNiceMock(PagingStore.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -479,7 +479,7 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
       PagingStore pgstore = EasyMock.createNiceMock(PagingStore.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -541,7 +541,7 @@
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
       PagingStore pgstore = EasyMock.createNiceMock(PagingStore.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -600,7 +600,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice po = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+      PostOffice po = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
 
       final long id = 324;
       final SimpleString name = new SimpleString("wibb22");
@@ -642,7 +642,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice po = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+      PostOffice po = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
 
       final long id = 324;
       final SimpleString name = new SimpleString("wibb22");
@@ -693,7 +693,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice po = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+      PostOffice po = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
 
       final SimpleString condition1 = new SimpleString("queue.wibble");
 
@@ -781,7 +781,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -809,7 +809,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -843,7 +843,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -874,7 +874,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -919,7 +919,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -929,7 +929,6 @@
                       (ResourceManager)EasyMock.anyObject());
       EasyMock.expect(qf.createQueue(-1, queueName, filter, true, false)).andReturn(queue);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setBackup(false);
       queue.setFlowController((FlowController)EasyMock.anyObject());
       pm.addBinding((Binding)EasyMock.anyObject());
       EasyMock.replay(pm, qf, queue);
@@ -954,7 +953,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -966,13 +965,10 @@
       EasyMock.expect(qf.createQueue(-1, queueName2, filter, true, false)).andReturn(queue2);
       EasyMock.expect(qf.createQueue(-1, queueName3, filter, true, false)).andReturn(queue3);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setBackup(false);
       queue.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
-      queue2.setBackup(false);
       queue2.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
-      queue3.setBackup(false);
       queue3.setFlowController((FlowController)EasyMock.anyObject());
       pm.addBinding((Binding)EasyMock.anyObject());
       pm.addBinding((Binding)EasyMock.anyObject());
@@ -999,7 +995,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -1009,7 +1005,6 @@
                       (ResourceManager)EasyMock.anyObject());
       EasyMock.expect(qf.createQueue(-1, queueName, filter, false, false)).andReturn(queue);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setBackup(false);
       queue.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.replay(pm, qf, queue);
       postOffice.start();
@@ -1033,7 +1028,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -1045,13 +1040,10 @@
       EasyMock.expect(qf.createQueue(-1, queueName2, filter, false, false)).andReturn(queue2);
       EasyMock.expect(qf.createQueue(-1, queueName3, filter, false, false)).andReturn(queue3);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setBackup(false);
       queue.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
-      queue2.setBackup(false);
       queue2.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
-      queue3.setBackup(false);
       queue3.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.replay(pm, qf, queue, queue2, queue3);
       postOffice.start();
@@ -1075,7 +1067,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -1086,10 +1078,8 @@
       EasyMock.expect(qf.createQueue(-1, queueName, filter, true, false)).andReturn(queue);
       EasyMock.expect(qf.createQueue(-1, queueName, filter, true, false)).andReturn(queue);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setBackup(false);
       queue.setFlowController((FlowController)EasyMock.anyObject());
       pm.addBinding((Binding)EasyMock.anyObject());
-      queue.setBackup(false);
       EasyMock.replay(pm, qf, queue);
       postOffice.start();
 
@@ -1117,7 +1107,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -1127,7 +1117,6 @@
                       (ResourceManager)EasyMock.anyObject());
       EasyMock.expect(qf.createQueue(-1, queueName, filter, true, false)).andReturn(queue);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setBackup(false);
       queue.setFlowController((FlowController)EasyMock.anyObject());
       pm.addBinding((Binding)EasyMock.anyObject());
       EasyMock.expect(queue.isDurable()).andStubReturn(true);
@@ -1156,7 +1145,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -1168,13 +1157,10 @@
       EasyMock.expect(qf.createQueue(-1, queueName2, filter, true, false)).andReturn(queue2);
       EasyMock.expect(qf.createQueue(-1, queueName3, filter, true, false)).andReturn(queue3);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setBackup(false);
       queue.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
-      queue2.setBackup(false);
       queue2.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
-      queue3.setBackup(false);
       queue3.setFlowController((FlowController)EasyMock.anyObject());
       pm.addBinding((Binding)EasyMock.anyObject());
       pm.addBinding((Binding)EasyMock.anyObject());
@@ -1209,7 +1195,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -1219,7 +1205,6 @@
                       (ResourceManager)EasyMock.anyObject());
       EasyMock.expect(qf.createQueue(-1, queueName, filter, false, false)).andReturn(queue);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setBackup(false);
       queue.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue.isDurable()).andStubReturn(false);
       queue.setFlowController(null);
@@ -1246,7 +1231,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -1258,13 +1243,10 @@
       EasyMock.expect(qf.createQueue(-1, queueName2, filter, false, false)).andReturn(queue2);
       EasyMock.expect(qf.createQueue(-1, queueName3, filter, false, false)).andReturn(queue3);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setBackup(false);
       queue.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
-      queue2.setBackup(false);
       queue2.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
-      queue3.setBackup(false);
       queue3.setFlowController((FlowController)EasyMock.anyObject());
 
       EasyMock.expect(queue.isDurable()).andStubReturn(false);
@@ -1294,7 +1276,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -1328,7 +1310,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -1361,7 +1343,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -1390,7 +1372,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -1403,7 +1385,6 @@
       EasyMock.expect(qf.createQueue(-1, queueName, null, false, false)).andReturn(queue);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
       EasyMock.expect(queue.getFilter()).andStubReturn(null);
-      queue.setBackup(false);
       queue.setFlowController((FlowController)EasyMock.anyObject());
 
       EasyMock.expect(message.createReference(queue)).andReturn(messageReference);
@@ -1439,7 +1420,7 @@
 
       EasyMock.expect(pgm.addSize(EasyMock.isA(ServerMessage.class))).andReturn(-1l);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -1453,7 +1434,6 @@
       EasyMock.expect(qf.createQueue(-1, queueName, null, false, false)).andReturn(queue);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
       EasyMock.expect(queue.getFilter()).andStubReturn(null);
-      queue.setBackup(false);
       queue.setFlowController((FlowController)EasyMock.anyObject());
 
       EasyMock.replay(pm, pgm, qf, message, queue);
@@ -1480,7 +1460,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -1494,7 +1474,6 @@
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
       EasyMock.expect(queue.getFilter()).andStubReturn(filter);
       EasyMock.expect(filter.match(message)).andReturn(true);
-      queue.setBackup(false);
       queue.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.expect(message.createReference(queue)).andReturn(messageReference);
       EasyMock.replay(pm, qf, message, queue, messageReference, filter);
@@ -1519,7 +1498,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -1533,7 +1512,6 @@
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
       EasyMock.expect(queue.getFilter()).andStubReturn(filter);
       EasyMock.expect(filter.match(message)).andReturn(false);
-      queue.setBackup(false);
       queue.setFlowController((FlowController)EasyMock.anyObject());
       EasyMock.replay(pm, qf, message, queue, messageReference, filter);
       postOffice.start();
@@ -1561,7 +1539,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -1580,9 +1558,6 @@
       EasyMock.expect(queue2.getFilter()).andStubReturn(null);
       EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
       EasyMock.expect(queue3.getFilter()).andStubReturn(null);
-      queue.setBackup(false);
-      queue2.setBackup(false);
-      queue3.setBackup(false);
       queue.setFlowController((FlowController)EasyMock.anyObject());
       queue2.setFlowController((FlowController)EasyMock.anyObject());
       queue3.setFlowController((FlowController)EasyMock.anyObject());
@@ -1622,7 +1597,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -1643,9 +1618,6 @@
       EasyMock.expect(queue3.getFilter()).andStubReturn(filter2);
       EasyMock.expect(filter.match(message)).andReturn(false);
       EasyMock.expect(filter2.match(message)).andReturn(true);
-      queue.setBackup(false);
-      queue2.setBackup(false);
-      queue3.setBackup(false);
       queue.setFlowController((FlowController)EasyMock.anyObject());
       queue2.setFlowController((FlowController)EasyMock.anyObject());
       queue3.setFlowController((FlowController)EasyMock.anyObject());

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java	2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java	2008-10-02 13:43:48 UTC (rev 5060)
@@ -67,7 +67,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -81,7 +81,6 @@
       EasyMock.expect(message3.getDestination()).andStubReturn(address3);
       EasyMock.expect(qf.createQueue(-1, queueName, null, false, false)).andReturn(queue);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
-      queue.setBackup(false);
       queue.setFlowController(null);
       EasyMock.expect(queue.getFilter()).andStubReturn(null);
       EasyMock.expect(pgm.addSize(message)).andStubReturn(1000l);
@@ -130,7 +129,7 @@
       ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
       PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
 
-      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+      PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
 
       qf.setPostOffice(postOffice);
 
@@ -148,8 +147,6 @@
       EasyMock.expect(qf.createQueue(-1, queueName2, null, false, false)).andReturn(queue2);
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
       EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
-      queue.setBackup(false);
-      queue2.setBackup(false);
       queue.setFlowController(null);
       queue2.setFlowController(null);
       EasyMock.expect(queue.getFilter()).andStubReturn(null);




More information about the jboss-cvs-commits mailing list