[jboss-cvs] JBoss Messaging SVN: r7513 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/management/impl and 17 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jul 2 07:26:08 EDT 2009


Author: timfox
Date: 2009-07-02 07:26:07 -0400 (Thu, 02 Jul 2009)
New Revision: 7513

Added:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableCall.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java
Removed:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableAction.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/StatefulObjectReadWriteLock.java
Modified:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/AddressManager.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/Binding.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicationResponseMessage.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/Queue.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/QueueFactory.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/LastValueQueue.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/Replicator.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/PriorityLock.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
   branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java
   branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java
Log:
MT replication continued

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -67,6 +67,7 @@
             case SESS_RECEIVE_CONTINUATION:
             {
                SessionReceiveContinuationMessage continuation = (SessionReceiveContinuationMessage)packet;
+               
                clientSession.handleReceiveContinuation(continuation.getConsumerID(), continuation);
 
                break;

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -613,8 +613,6 @@
 
    public void sendNotification(final Notification notification) throws Exception
    {
-      log.info("messagingservercontrol " + this.messagingServerControl + " ne " + this.notificationsEnabled);
-      
       if (messagingServerControl != null && notificationsEnabled)
       {
          // This needs to be synchronized since we need to ensure notifications are processed in strict sequence

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -48,7 +48,7 @@
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.replication.impl.StatefulObjectReadWriteLock;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
@@ -117,7 +117,7 @@
     * */
    private final ReadWriteLock currentPageLock = new ReentrantReadWriteLock();
    
-   //private final Lock lock = StatefulObjectReadWriteLock.createLock().writeLock();
+   //private final Lock lock = ReplicationAwareReadWriteLock.createLock().writeLock();
 
    private volatile boolean running = false;
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -68,6 +68,7 @@
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareAtomicLong;
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.TransactionOperation;
@@ -250,8 +251,6 @@
    {            
       long id = idGenerator.generateID();
       
-      log.info("Generating unique id on backup " + backup + " id " + id, new Exception());
-
       return id;
    }
 
@@ -1115,7 +1114,7 @@
 
    private class BatchingIDGenerator implements IDGenerator
    {
-      private final AtomicLong counter;
+      private final ReplicationAwareAtomicLong counter;
 
       private final long checkpointSize;
 
@@ -1123,7 +1122,7 @@
 
       public BatchingIDGenerator(final long start, final long checkpointSize)
       {
-         this.counter = new AtomicLong(start);
+         this.counter = new ReplicationAwareAtomicLong(start);
 
          this.checkpointSize = checkpointSize;
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -24,7 +24,6 @@
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.transaction.xa.Xid;
 
@@ -38,6 +37,7 @@
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareAtomicLong;
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.utils.Pair;
 import org.jboss.messaging.utils.SimpleString;
@@ -55,7 +55,7 @@
 {
    private static final Logger log = Logger.getLogger(NullStorageManager.class);
    
-   private final AtomicLong idSequence = new AtomicLong(0);
+   private final ReplicationAwareAtomicLong idSequence = new ReplicationAwareAtomicLong(0);
    
    private UUID id;
 
@@ -193,8 +193,6 @@
    {
       long id = idSequence.getAndIncrement();
       
-    //  log.info("Generating unique id on backup " + backup + " id " + id, new Exception());
-      
       return id;
    }
    

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/AddressManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/AddressManager.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/AddressManager.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -45,6 +45,8 @@
    void clear();
 
    Binding getBinding(SimpleString queueName);
+   
+   Binding getBindingByID(long id);
 
    Map<SimpleString, Binding> getBindings();
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/Binding.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/Binding.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/Binding.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -57,9 +57,7 @@
 
    boolean isExclusive();
    
-   int getID();
+   long getID();
    
-   void setID(int id);
-
    int getDistance();
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -59,6 +59,8 @@
 
    Binding getBinding(SimpleString uniqueName);
    
+   Binding getBindingByID(long id);
+   
    Bindings getMatchingBindings(SimpleString address);
 
    void route(ServerMessage message) throws Exception;

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -49,7 +49,7 @@
    
    private final SimpleString filterString;
    
-   private final int id;
+   private final long id;
    
    private List<SimpleString> filterStrings;
    
@@ -57,7 +57,7 @@
    
    private final int distance;
    
-   public QueueInfo(final SimpleString routingName, final SimpleString clusterName, final SimpleString address, final SimpleString filterString, final int id,
+   public QueueInfo(final SimpleString routingName, final SimpleString clusterName, final SimpleString address, final SimpleString filterString, final long id,
                     final Integer distance)
    {
       if (routingName == null)
@@ -109,7 +109,7 @@
       return distance;
    }
    
-   public int getID()
+   public long getID()
    {
       return id;
    }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -60,7 +60,7 @@
 
    private final Map<SimpleString, Integer> routingNamePositions = new ConcurrentHashMap<SimpleString, Integer>();
 
-   private final Map<Integer, Binding> bindingsMap = new ConcurrentHashMap<Integer, Binding>();
+   private final Map<Long, Binding> bindingsMap = new ConcurrentHashMap<Long, Binding>();
 
    private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -154,13 +154,13 @@
 
             SimpleString address = (SimpleString)props.getProperty(ManagementHelper.HDR_ADDRESS);
 
-            Integer transientID = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_ID);
+            Long bindingID = (Long)props.getProperty(ManagementHelper.HDR_BINDING_ID);
 
             SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
 
             Integer distance = (Integer)props.getProperty(ManagementHelper.HDR_DISTANCE);
 
-            QueueInfo info = new QueueInfo(routingName, clusterName, address, filterString, transientID, distance);
+            QueueInfo info = new QueueInfo(routingName, clusterName, address, filterString, bindingID, distance);
 
             queueInfos.put(clusterName, info);
 
@@ -373,7 +373,7 @@
                message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
                message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
                message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
-               message.putIntProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
+               message.putLongProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
                message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, info.getFilterString());
                message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -54,10 +54,12 @@
    
    private final boolean exclusive;
    
-   private int id;
+   private final long id;
    
-   public DivertBinding(final SimpleString address, final Divert divert)
+   public DivertBinding(final long id, final SimpleString address, final Divert divert)
    {
+      this.id = id;
+      
       this.address = address;
       
       this.divert = divert;
@@ -71,16 +73,11 @@
       this.exclusive = divert.isExclusive();
    }
    
-   public int getID()
+   public long getID()
    {
       return id;
    }
    
-   public void setID(final int id)
-   {
-      this.id = id;
-   }
-   
    public Filter getFilter()
    {
       return filter;

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -57,7 +57,7 @@
    
    private final SimpleString name;
    
-   private int id;
+   private final long id;
    
    private SimpleString clusterName;
    
@@ -72,18 +72,15 @@
       this.name = queue.getName();
       
       this.clusterName = name.concat(nodeID);
+      
+      this.id = queue.getID();
    }
    
-   public int getID()
+   public long getID()
    {
       return id;
    }
    
-   public void setID(final int id)
-   {
-      this.id = id;
-   }
-   
    public Filter getFilter()
    {
       return filter;

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -59,7 +59,7 @@
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-import org.jboss.messaging.core.server.replication.impl.StatefulObjectReadWriteLock;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.core.transaction.Transaction;
@@ -119,9 +119,9 @@
    // on a particular node, and all would
    // have to be specified in the message. Specify 10000 ints takes up a lot less space than 10000 arbitrary queue names
    // The drawback of this approach is we only allow up to 2^32 queues in memory at any one time
-   private int transientIDSequence;
+  // private int transientIDSequence;
 
-   private Set<Integer> transientIDs = new HashSet<Integer>();
+  // private Set<Integer> transientIDs = new HashSet<Integer>();
 
    // private Map<SimpleString, QueueInfo> queueInfos = new HashMap<SimpleString, QueueInfo>();
 
@@ -178,7 +178,7 @@
 
       // this.addressSettingsRepository = addressSettingsRepository;
       
-      lock = new StatefulObjectReadWriteLock("postoffice", storageManager.generateUniqueID(), 0);
+      lock = new ReplicationAwareReadWriteLock("postoffice", 0);
    }
 
    // MessagingComponent implementation ---------------------------------------
@@ -218,7 +218,7 @@
 
       // queueInfos.clear();
 
-      transientIDs.clear();
+     // transientIDs.clear();
 
       started = false;
    }
@@ -460,7 +460,7 @@
       lock.writeLock().lock();
       try
       {
-         binding.setID(generateTransientID());
+         //binding.setID(generateTransientID());
 
          boolean addressExists = addressManager.getBindingsForRoutingAddress(binding.getAddress()) != null;
 
@@ -498,7 +498,7 @@
 
       props.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
 
-      props.putIntProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
+      props.putLongProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
 
       props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
 
@@ -510,8 +510,7 @@
       }        
       
       String uid = UUIDGenerator.getInstance().generateStringUUID();
-
-      log.info("*** sending notification");
+      
       managementService.sendNotification(new Notification(uid, NotificationType.BINDING_ADDED, props));
    }
 
@@ -546,9 +545,7 @@
             }
          }
          
-         releaseTransientID(binding.getID());
-
-         
+         //releaseTransientID(binding.getID());         
       }
       finally
       {
@@ -602,6 +599,19 @@
          lock.readLock().unlock();
       }
    }
+   
+   public Binding getBindingByID(final long id)
+   {
+      lock.readLock().lock();
+      try
+      {
+         return addressManager.getBindingByID(id);
+      }
+      finally
+      {
+         lock.readLock().unlock();
+      }
+   }
 
    public Bindings getMatchingBindings(final SimpleString address)
    {
@@ -880,16 +890,17 @@
 
    private synchronized void startExpiryScanner()
    {
-      if (reaperPeriod > 0)
-      {
-         reaper = new Reaper();
-
-         expiryReaper = new Thread(reaper, "JBM-expiry-reaper");
-
-         expiryReaper.setPriority(reaperPriority);
-
-         expiryReaper.start();
-      }
+      //TODO disabled for now
+//      if (reaperPeriod > 0)
+//      {
+//         reaper = new Reaper();
+//
+//         expiryReaper = new Thread(reaper, "JBM-expiry-reaper");
+//
+//         expiryReaper.setPriority(reaperPriority);
+//
+//         expiryReaper.start();
+//      }
    }
 
    // private void routeDirect(final Queue queue, final ServerMessage message) throws Exception
@@ -918,30 +929,30 @@
    // return message;
    // }
 
-   private int generateTransientID()
-   {
-      int start = transientIDSequence;
-      do
-      {
-         int id = transientIDSequence++;
+//   private int generateTransientID()
+//   {
+//      int start = transientIDSequence;
+//      do
+//      {
+//         int id = transientIDSequence++;
+//
+//         if (!transientIDs.contains(id))
+//         {
+//            transientIDs.add(id);
+//
+//            return id;
+//         }
+//      }
+//      while (transientIDSequence != start);
+//
+//      throw new IllegalStateException("Run out of queue ids!");
+//   }
+//
+//   private void releaseTransientID(final int id)
+//   {
+//      transientIDs.remove(id);
+//   }
 
-         if (!transientIDs.contains(id))
-         {
-            transientIDs.add(id);
-
-            return id;
-         }
-      }
-      while (transientIDSequence != start);
-
-      throw new IllegalStateException("Run out of queue ids!");
-   }
-
-   private void releaseTransientID(final int id)
-   {
-      transientIDs.remove(id);
-   }
-
    private final PageMessageOperation getPageOperation(final Transaction tx)
    {
       // you could have races on the case two sessions using the same XID

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -45,6 +45,8 @@
    private final Map<SimpleString, Bindings> mappings = new HashMap<SimpleString, Bindings>();
 
    private final Map<SimpleString, Binding> nameMap = new HashMap<SimpleString, Binding>();
+   
+   private final Map<Long, Binding> idMap = new HashMap<Long, Binding>();
 
    public void addBinding(final Binding binding)
    {
@@ -55,9 +57,9 @@
       
       nameMap.put(binding.getUniqueName(), binding);
       
-      addMappingInternal(binding.getAddress(), binding);
+      idMap.put(binding.getID(), binding);
       
-      log.info(System.identityHashCode(this) + " adding binding " + binding.getUniqueName());
+      addMappingInternal(binding.getAddress(), binding);          
    }
 
    public Binding removeBinding(final SimpleString uniqueName)
@@ -68,6 +70,8 @@
       {
          return null;
       }
+      
+      idMap.remove(binding.getID());
 
       removeBindingInternal(binding.getAddress(), uniqueName);
       
@@ -81,20 +85,13 @@
 
    public Binding getBinding(final SimpleString bindableName)
    {      
-      log.info(System.identityHashCode(this) + " dumping bindings");
-      
-      for (SimpleString name: nameMap.keySet())
-      {
-         log.info("binding name: " + name);
-      }
-      
-      for (SimpleString address: mappings.keySet())
-      {
-         log.info("address name: " + address);
-      }
-      
       return nameMap.get(bindableName);
    }
+   
+   public Binding getBindingByID(final long id)
+   {      
+      return idMap.get(id);
+   }
 
    public Map<SimpleString, Binding> getBindings()
    {

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -94,8 +94,7 @@
     * @param binding the binding to add
     */
    public void addBinding(final Binding binding)
-   {
-      log.info("Adding binding " + binding.getAddress() + " name " + binding.getUniqueName());
+   {     
       super.addBinding(binding);
       Address add = addAndUpdateAddressMap(binding.getAddress());
       if (add.containsWildCard())

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -24,11 +24,8 @@
 
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
+import java.util.Iterator;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -41,7 +38,6 @@
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.CommandConfirmationHandler;
-import org.jboss.messaging.core.remoting.Interceptor;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
@@ -50,7 +46,6 @@
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.core.server.replication.Replicator;
 import org.jboss.messaging.core.server.replication.impl.JBMThread;
-import org.jboss.messaging.utils.Pair;
 
 /**
  * A ChannelImpl
@@ -180,26 +175,29 @@
    {
       send(packet, false);
    }
-   
+
    // This must never called by more than one thread concurrently
    public void send(final Packet packet, final boolean flush)
    {
-      //FIXME - this is a bit hacky
-      
+      // FIXME - this is a bit hacky
+
       Thread t = Thread.currentThread();
-      
+
       if (t instanceof JBMThread)
-      {         
+      {
          JBMThread thread = (JBMThread)t;
-   
+
          if (thread.isRecording())
          {
             thread.getReplicator().registerWaitingChannel(this);
-   
-            log.info("Queueing write");
-            
-            queuedWrites.add(new Pair<Replicator, Packet>(thread.getReplicator(), packet));
-            
+
+            QueuedWrite qw = new QueuedWrite();
+            qw.replicator = thread.getReplicator();
+            qw.packet = packet;
+            //qw.sequence = qw.replicator.getReplicateSequence();
+
+            queuedWrites.add(qw);
+
             return;
          }
       }
@@ -249,7 +247,6 @@
 
             if (connection.isActive() || packet.isWriteAlways())
             {
-               log.info("actually writing packet " + packet.getType());
                connection.getTransportConnection().write(buffer, flush);
             }
          }
@@ -318,7 +315,7 @@
             {
                resendCache.add(packet);
             }
-           
+
             connection.getTransportConnection().write(buffer);
 
             long toWait = connection.getBlockingCallTimeout();
@@ -498,15 +495,15 @@
       {
          connection.removeChannel(id);
 
-//         if (replicatingChannel != null)
-//         {
-//            // If we're reconnecting to a live node which is replicated then there will be a replicating channel
-//            // too. We need to then make sure that all replication responses come back since packets aren't
-//            // considered confirmed until response comes back and is processed. Otherwise responses to previous
-//            // message sends could come back after reconnection resulting in clients resending same message
-//            // since it wasn't confirmed yet.
-//            replicatingChannel.waitForAllReplicationResponse();
-//         }
+         // if (replicatingChannel != null)
+         // {
+         // // If we're reconnecting to a live node which is replicated then there will be a replicating channel
+         // // too. We need to then make sure that all replication responses come back since packets aren't
+         // // considered confirmed until response comes back and is processed. Otherwise responses to previous
+         // // message sends could come back after reconnection resulting in clients resending same message
+         // // since it wasn't confirmed yet.
+         // replicatingChannel.waitForAllReplicationResponse();
+         // }
 
          // And switch it
 
@@ -524,6 +521,7 @@
 
       for (final Packet packet : resendCache)
       {
+         log.info("Replaying command " + packet);
          doWrite(packet);
       }
    }
@@ -612,7 +610,7 @@
       else
       {
          if (packet.isResponse())
-         {            
+         {
             response = packet;
 
             confirm(packet);
@@ -775,28 +773,76 @@
          sendSemaphore.release(sizeToFree);
       }
    }
-   
-   
-   private java.util.Queue<Pair<Replicator, Packet>> queuedWrites = new ConcurrentLinkedQueue<Pair<Replicator, Packet>>();
 
+   private java.util.Queue<QueuedWrite> queuedWrites = new ConcurrentLinkedQueue<QueuedWrite>();
+
+   private final Object replicationLock = new Object();
+
+   // we only include sequence for debug
+   private static class QueuedWrite
+   {
+      Replicator replicator;
+
+      Packet packet;
+
+     // long sequence;
+
+      boolean done;
+   }
+ 
    public void replicationResponseReceived(final Replicator replicator)
    {
-      Pair<Replicator, Packet> pair = queuedWrites.peek();
-
-      if (pair != null && pair.a == replicator)
-      {
-         do
+      synchronized (replicationLock)
+      {         
+         QueuedWrite qw = queuedWrites.peek();
+         
+         //We assume max of only one queued write per channel per replicator for now - this is true for
+         //all actions
+         
+         //And we only send a replication response if we're waiting for a queued write TODO!
+         
+         if (qw.replicator == replicator)
          {
             queuedWrites.remove();
-
-            log.info("replication response received, sending packet " + pair.b);
-            send(pair.b);
-
-            pair = queuedWrites.peek();
+            
+            send(qw.packet);
+            
+            qw = queuedWrites.peek();
+            
+            while (qw != null)
+            {
+               if (qw.done)
+               {
+                  queuedWrites.remove();
+                  
+                  send(qw.packet);
+                  
+                  qw = queuedWrites.peek();
+               }
+               else
+               {
+                  break;
+               }
+            }
          }
-         while (pair != null && pair.a.isResponseReceived());
+         else
+         {
+            Iterator<QueuedWrite> iter = queuedWrites.iterator();
+            
+            iter.next();
+            
+            while (iter.hasNext())
+            {
+               qw = iter.next();
+               
+               if (qw.replicator == replicator)
+               {
+                  qw.done = true;
+                  
+                  break;
+               }
+            }
+         }                  
       }
    }
-
-   
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -33,10 +33,12 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REGISTER_QUEUE_REPLICATION_CHANNEL;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ACKNOWLEDGE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_CONSUMER;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_QUEUE_BINDING;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_LOCK_SEQUENCES;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_QUEUE_DELIVERY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REDISTRIBUTION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REMOVE_REMOTE_CONSUMER;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REMOVE_REMOTE_QUEUE_BINDING;
@@ -122,6 +124,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.RegisterQueueReplicationChannelMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
@@ -148,8 +151,6 @@
    {
       final byte packetType = in.readByte();
       
-      log.info("decoding packet " + packetType);
-
       Packet packet;
 
       switch (packetType)
@@ -434,6 +435,16 @@
             packet = new ReplicateLockSequenceMessage();
             break;
          }
+         case REPLICATE_QUEUE_DELIVERY:
+         {
+            packet = new PacketImpl(REPLICATE_QUEUE_DELIVERY);
+            break;
+         }   
+         case REGISTER_QUEUE_REPLICATION_CHANNEL:
+         {
+            packet = new RegisterQueueReplicationChannelMessage();
+            break;
+         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + packetType);

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -27,6 +27,7 @@
 import org.jboss.messaging.core.remoting.Interceptor;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.spi.Connection;
 import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
 import org.jboss.messaging.core.remoting.spi.Connector;
@@ -368,8 +369,6 @@
    {
       final Packet packet = decoder.decode(buffer);
       
-      log.info("packet received " + packet.getType());
-
       synchronized (transferLock)
       {
          if (!frozen)

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -122,25 +122,19 @@
    public void write(final MessagingBuffer buffer, final boolean flush)
    {
       try
-      {
-         log.info("writing on invm connection");
+      {        
          executor.execute(new Runnable()
          {
             public void run()
             {
                try
-               {
-                  log.info("runnable running");
+               {                  
                   if (!closed)
                   {
                      buffer.readInt(); // read and discard
 
                      handler.bufferReceived(id, buffer);
                   }
-                  else
-                  {
-                     log.info("it's closed");
-                  }
                }
                catch (Exception e)
                {
@@ -154,7 +148,6 @@
       catch (RejectedExecutionException e)
       {
          // Ignore - this can happen if server/client is shutdown and another request comes in
-         log.error("shutdown", e);
       }
    }
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -160,6 +160,12 @@
    
    public static final byte REPLICATE_LOCK_SEQUENCES = 98;
    
+   public static final byte REPLICATE_QUEUE_DELIVERY = 99;
+   
+   public static final byte REGISTER_QUEUE_REPLICATION_CHANNEL = 100;
+   
+   public static final byte REGISTER_POST_OFFICE_REPLICATION_CHANNEL = 101;
+   
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)
@@ -187,11 +193,6 @@
    
    public int encode(final MessagingBuffer buffer)
    {
-//      if (this.type == PacketImpl.EXCEPTION)
-//      {
-//         log.info("encoding exception", new Exception());
-//      }
-      
       // The standard header fields
       buffer.writeInt(0); // The length gets filled in at the end
       buffer.writeByte(type);

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -18,7 +18,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.utils.DataConstants;
-import org.jboss.messaging.utils.Pair;
 
 /**
  * 
@@ -34,17 +33,21 @@
 
    // Attributes ----------------------------------------------------
 
-   private List<Pair<Long, Integer>> sequences;
+   private List<Long> sequences;
 
+   private boolean requiresResponse;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public ReplicateLockSequenceMessage(final List<Pair<Long, Integer>> sequences)
+   public ReplicateLockSequenceMessage(final List<Long> sequences, final boolean requiresResponse)
    {
       super(REPLICATE_LOCK_SEQUENCES);
 
       this.sequences = sequences;
+
+      this.requiresResponse = requiresResponse;
    }
 
    // Public --------------------------------------------------------
@@ -56,35 +59,44 @@
 
    public int getRequiredBufferSize()
    {
-      return BASIC_PACKET_SIZE + DataConstants.SIZE_INT + sequences.size() * (DataConstants.SIZE_LONG + DataConstants.SIZE_INT);
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
+             sequences.size() *
+             DataConstants.SIZE_LONG +
+             DataConstants.SIZE_BOOLEAN;
    }
 
    @Override
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.writeInt(sequences.size());
-      for (Pair<Long, Integer> sequence : sequences)
+      for (long sequence : sequences)
       {
-         buffer.writeLong(sequence.a);
-         buffer.writeInt(sequence.b);
+         buffer.writeLong(sequence);
       }
+      buffer.writeBoolean(requiresResponse);
    }
 
    @Override
    public void decodeBody(final MessagingBuffer buffer)
    {
       int len = buffer.readInt();
-      sequences = new ArrayList<Pair<Long, Integer>>(len);
+      sequences = new ArrayList<Long>(len);
       for (int i = 0; i < len; i++)
       {
-         sequences.add(new Pair<Long, Integer>(buffer.readLong(), buffer.readInt()));
+         sequences.add(buffer.readLong());
       }
+      requiresResponse = buffer.readBoolean();
    }
 
-   public List<Pair<Long, Integer>> getSequences()
+   public List<Long> getSequences()
    {
       return sequences;
    }
+   
+   public boolean isRequiresResponse()
+   {
+      return requiresResponse;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicationResponseMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicationResponseMessage.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicationResponseMessage.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -22,8 +22,6 @@
 
 package org.jboss.messaging.core.remoting.impl.wireformat.replication;
 
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
-
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 
 /**
@@ -56,6 +54,11 @@
       return true;
    }
    
+   public boolean isRequiresConfirmations()
+   {
+      return false;
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -64,4 +67,3 @@
 
    // Inner classes -------------------------------------------------
 }
-

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -44,6 +44,7 @@
 import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
 import org.jboss.messaging.core.remoting.server.RemotingService;
 import org.jboss.messaging.core.remoting.spi.Acceptor;
 import org.jboss.messaging.core.remoting.spi.AcceptorFactory;
@@ -286,9 +287,6 @@
            
       RemotingConnection rc = new RemotingConnectionImpl(connection, replicatingConnection, interceptors, !config.isBackup());
 
-      log.info("** creating connection " + System.identityHashCode(rc) + " replicating connection is " + replicatingConnection + 
-               " backup is " + config.isBackup());
-      
       Channel channel1 = rc.getChannel(1, -1, false);
       
       final Replicator replicator;
@@ -305,7 +303,7 @@
             {
                if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
                {
-                  log.info("*** got replication response from create session");
+                  ReplicationResponseMessage msg = (ReplicationResponseMessage)packet;
                   replicator.replicationResponseReceived();
                }
                else

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/Queue.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/Queue.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -76,6 +76,10 @@
    void cancel(MessageReference reference) throws Exception;
 
    void deliverAsync(Executor executor);
+   
+   void deliverAll();
+   
+   HandleStatus deliverOne();
 
    List<MessageReference> list(Filter filter);
 
@@ -139,7 +143,7 @@
    void addRedistributor(long delay, Executor executor);
 
    // Only used in testing
-   void deliverNow();
+  // void deliverNow();
 
    boolean checkDLQ(MessageReference ref) throws Exception;
    

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/QueueFactory.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/QueueFactory.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/QueueFactory.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -24,6 +24,7 @@
 
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.replication.Replicator;
 import org.jboss.messaging.utils.SimpleString;
 
 /**
@@ -38,7 +39,8 @@
  */
 public interface QueueFactory
 {
-   Queue createQueue(long persistenceID, final SimpleString address, SimpleString name, Filter filter, boolean durable, boolean temporary);
+   Queue createQueue(long persistenceID, final SimpleString address, SimpleString name, Filter filter, boolean durable,
+                     boolean temporary, Replicator replicator);
 
    /**
     * This is required for delete-all-reference to work correctly with paging

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -106,7 +106,7 @@
 
    private volatile boolean started;
    
-   private int replicationCount;
+  // private int replicationCount;
    
    /*
     * Constructor using static list of connectors
@@ -452,51 +452,51 @@
 
          clearBindings();
          
-         waitForReplicationsToComplete(3000);
+        // waitForReplicationsToComplete(3000);
       }
       
-      private synchronized void waitForReplicationsToComplete(long timeout)
-      {
-         long toWait = timeout;
+//      private synchronized void waitForReplicationsToComplete(long timeout)
+//      {
+//         long toWait = timeout;
+//
+//         long start = System.currentTimeMillis();
+//
+//         while (replicationCount > 0 && toWait > 0)
+//         {
+//            try
+//            {
+//               wait(toWait);
+//            }
+//            catch (InterruptedException e)
+//            {
+//            }
+//
+//            long now = System.currentTimeMillis();
+//
+//            toWait -= now - start;
+//
+//            start = now;
+//         }
+//
+//         if (toWait <= 0)
+//         {
+//            log.warn("Timed out waiting for replication responses to return");
+//         }
+//      
+//      }
+//      
+//      private synchronized void replicationComplete()
+//      {
+//         replicationCount--;
+//         
+//         notify();
+//      }
+//      
+//      private synchronized void beforeReplicate()
+//      {
+//         replicationCount++;
+//      }
 
-         long start = System.currentTimeMillis();
-
-         while (replicationCount > 0 && toWait > 0)
-         {
-            try
-            {
-               wait(toWait);
-            }
-            catch (InterruptedException e)
-            {
-            }
-
-            long now = System.currentTimeMillis();
-
-            toWait -= now - start;
-
-            start = now;
-         }
-
-         if (toWait <= 0)
-         {
-            log.warn("Timed out waiting for replication responses to return");
-         }
-      
-      }
-      
-      private synchronized void replicationComplete()
-      {
-         replicationCount--;
-         
-         notify();
-      }
-      
-      private synchronized void beforeReplicate()
-      {
-         replicationCount++;
-      }
-
       public void activate(final Queue queue) throws Exception
       {
          this.queue = queue;
@@ -631,7 +631,7 @@
             throw new IllegalStateException("queueID is null");
          }
 
-         RemoteQueueBinding binding = new RemoteQueueBindingImpl(queueAddress,
+         RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateUniqueID(), queueAddress,
                                                                     clusterName,
                                                                     routingName,
                                                                     queueID,
@@ -782,7 +782,7 @@
 
       Queue queue = (Queue)queueBinding.getBindable();
 
-      RemoteQueueBinding binding = new RemoteQueueBindingImpl(address,
+      RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateUniqueID(), address,
                                                               uniqueName,
                                                               routingName,
                                                               queueID,

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -59,7 +59,7 @@
    private final SimpleString uniqueName;
 
    private final SimpleString routingName;
-   
+
    private final int remoteQueueID;
 
    private final Filter queueFilter;
@@ -71,20 +71,23 @@
    private int consumerCount;
 
    private final SimpleString idsHeaderName;
-   
-   private int id;
-      
+
+   private final long id;
+
    private final int distance;
-   
-   public RemoteQueueBindingImpl(final SimpleString address,
+
+   public RemoteQueueBindingImpl(final long id,
+                                 final SimpleString address,
                                  final SimpleString uniqueName,
                                  final SimpleString routingName,
                                  final int remoteQueueID,
                                  final SimpleString filterString,
-                                 final Queue storeAndForwardQueue,                     
+                                 final Queue storeAndForwardQueue,
                                  final SimpleString bridgeName,
                                  final int distance) throws Exception
    {
+      this.id = id;
+      
       this.address = address;
 
       this.storeAndForwardQueue = storeAndForwardQueue;
@@ -92,7 +95,7 @@
       this.uniqueName = uniqueName;
 
       this.routingName = routingName;
-      
+
       this.remoteQueueID = remoteQueueID;
 
       if (filterString != null)
@@ -103,22 +106,17 @@
       {
          queueFilter = null;
       }
-      
+
       this.idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(bridgeName);
-      
+
       this.distance = distance;
    }
-   
-   public int getID()
+
+   public long getID()
    {
       return id;
    }
-   
-   public void setID(final int id)
-   {
-      this.id = id;
-   }
-   
+
    public SimpleString getAddress()
    {
       return address;
@@ -128,7 +126,7 @@
    {
       return storeAndForwardQueue;
    }
-   
+
    public Queue getQueue()
    {
       return storeAndForwardQueue;
@@ -143,7 +141,7 @@
    {
       return uniqueName;
    }
-   
+
    public SimpleString getClusterName()
    {
       return uniqueName;
@@ -153,24 +151,24 @@
    {
       return false;
    }
-   
+
    public BindingType getType()
    {
       return BindingType.REMOTE_QUEUE;
    }
-   
+
    public Filter getFilter()
    {
       return queueFilter;
    }
-   
+
    public int getDistance()
    {
       return distance;
    }
 
    public synchronized boolean isHighAcceptPriority(final ServerMessage message)
-   {      
+   {
       if (consumerCount == 0)
       {
          return false;
@@ -193,15 +191,15 @@
 
       return false;
    }
-   
+
    public void willRoute(final ServerMessage message)
-   {               
-      //We add a header with the name of the queue, holding a list of the transient ids of the queues to route to
-      
-      //TODO - this can be optimised
-      
+   {
+      // We add a header with the name of the queue, holding a list of the transient ids of the queues to route to
+
+      // TODO - this can be optimised
+
       byte[] ids = (byte[])message.getProperty(idsHeaderName);
-      
+
       if (ids == null)
       {
          ids = new byte[4];
@@ -209,17 +207,17 @@
       else
       {
          byte[] newIds = new byte[ids.length + 4];
-         
+
          System.arraycopy(ids, 0, newIds, 4, ids.length);
-                          
+
          ids = newIds;
       }
-      
+
       ByteBuffer buff = ByteBuffer.wrap(ids);
-      
+
       buff.putInt(remoteQueueID);
-      
-      message.putBytesProperty(idsHeaderName, ids); 
+
+      message.putBytesProperty(idsHeaderName, ids);
    }
 
    public synchronized void addConsumer(final SimpleString filterString) throws Exception
@@ -271,7 +269,7 @@
 
       consumerCount--;
    }
-   
+
    public synchronized int consumerCount()
    {
       return consumerCount;

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/LastValueQueue.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/LastValueQueue.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/LastValueQueue.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -38,6 +38,7 @@
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.replication.Replicator;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.core.transaction.Transaction;
@@ -69,7 +70,8 @@
                          final ScheduledExecutorService scheduledExecutor,
                          final PostOffice postOffice,
                          final StorageManager storageManager,
-                         final HierarchicalRepository<AddressSettings> addressSettingsRepository)
+                         final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+                         final Replicator replicator)
    {
       super(id,
             address,
@@ -80,7 +82,8 @@
             scheduledExecutor,
             postOffice,
             storageManager,
-            addressSettingsRepository);
+            addressSettingsRepository,
+            replicator);
       this.pagingManager = postOffice.getPagingManager();
       this.storageManager = storageManager;
    }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -32,6 +32,8 @@
 import javax.management.MBeanServer;
 
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ConnectionManager;
+import org.jboss.messaging.core.client.impl.ConnectionManagerImpl;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.cluster.DivertConfiguration;
@@ -74,7 +76,9 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.RegisterQueueReplicationChannelMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateStartupInfoMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
 import org.jboss.messaging.core.remoting.server.RemotingService;
 import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
 import org.jboss.messaging.core.remoting.spi.Connection;
@@ -464,6 +468,8 @@
 
       // Need to activate the connection even if session can't be found - since otherwise response
       // will never get back
+      
+      log.info("** reattaching session");
 
       checkActivate(connection);
 
@@ -476,6 +482,8 @@
          // Reconnect the channel to the new connection
          int serverLastReceivedCommandID = session.transferConnection(connection, lastReceivedCommandID);
 
+         log.info("Reattached session ok");
+         
          return new ReattachSessionResponseMessage(serverLastReceivedCommandID, false);
       }
    }
@@ -713,6 +721,8 @@
 
    private Map<String, Object> backupConnectorParams;
 
+   private ConnectionManager replicatingConnectionManager;
+   
    private void setupBackupConnectorFactory()
    {
       String backupConnectorName = configuration.getBackupConnectorName();
@@ -742,11 +752,37 @@
             }
 
             backupConnectorParams = backupConnector.getParams();
+            
+            replicatingConnectionManager = new ConnectionManagerImpl(null,
+                                                                     backupConnector,
+                                                                     null,
+                                                                     false,
+                                                                     10, // TODO don't hardcode this
+                                                                     ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+                                                                     ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                                                     ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
+                                                                     0,
+                                                                     0d,
+                                                                     0,
+                                                                     this.threadPool,
+                                                                     this.scheduledPool);
          }
       }
    }
 
    private boolean activatedBackup;
+      
+   public synchronized RemotingConnection getPooledReplicatingConnection()
+   {
+      RemotingConnection conn = null;
+      
+      if (replicatingConnectionManager != null)
+      {
+         conn = replicatingConnectionManager.getConnection(1);
+      }
+      
+      return conn;
+   }
 
    public synchronized RemotingConnection getReplicatingConnection()
    {
@@ -779,37 +815,8 @@
 
             ChannelHandler prevHandler = channel1.getHandler();
             
-            log.info("Prev handler is " + prevHandler);
+            sendOnReplicatingAndWaitForResponse(packet, channel1);
             
-            final Future future = new Future();
-
-            channel1.setHandler(new ChannelHandler()
-            {
-               public void handlePacket(final Packet packet)
-               {
-                  if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
-                  {
-                     log.info("&&&&&&&&&&&&&&&& got replication response");
-                     future.run();
-                  }
-                  else
-                  {
-                     throw new IllegalArgumentException("Invalid packet " + packet.getType());
-                  }
-               }
-            });
-
-            channel1.send(packet);
-
-            boolean ok = future.await(10000);
-
-            if (!ok)
-            {
-               throw new IllegalStateException("Timed out waiting for response from backup for initialisation");
-            }
-
-            log.info("got response");
-            
             channel1.setHandler(prevHandler);
 
             activatedBackup = true;
@@ -902,6 +909,8 @@
             }
 
             configuration.setBackup(false);
+            
+            log.info("set backup to false");
 
             if (clusterManager != null)
             {
@@ -1250,7 +1259,72 @@
    //
    // return true;
    // }
+   
+   private void sendOnReplicatingAndWaitForResponse(final Packet packet, final Channel channel)
+   {
+      final Future future = new Future();
+      
+      channel.setHandler(new ChannelHandler()
+      {
+         public void handlePacket(final Packet packet)
+         {
+            future.run();
+         }
+      });
+            
+      channel.send(packet);
+      
+      boolean ok = future.await(10000);
 
+      if (!ok)
+      {
+         throw new IllegalStateException("Timed out waiting for response from backup");
+      }
+   }
+   
+   private Replicator getReplicatorForQueue(final long queueID)
+   {
+      RemotingConnection replicatingConnection = this.getPooledReplicatingConnection();
+      
+      final Replicator replicator;
+      
+      if (replicatingConnection != null)
+      {
+         Channel channel1 = replicatingConnection.getChannel(1, -1, false);
+         
+         JBMThread thread = JBMThread.currentThread();
+         
+         thread.setNoReplayOrRecord();
+         
+         //sendOnReplicatingAndWaitForResponse(new RegisterQueueReplicationChannelMessage(queueID), channel1);
+         
+         //Actually no need to wait for response
+         
+         channel1.send(new RegisterQueueReplicationChannelMessage(queueID));
+         
+         thread.resumeRecording();
+                  
+         Channel replChannel = replicatingConnection.getChannel(queueID, -1, false);
+         
+
+         replicator = new ReplicatorImpl(replChannel);
+         
+         replChannel.setHandler(new ChannelHandler()
+         {
+            public void handlePacket(final Packet packet)
+            {               
+               replicator.replicationResponseReceived();      
+            }
+         });
+      }
+      else
+      {
+         replicator = null;
+      }
+      
+      return replicator;
+   }
+
    private void loadJournal() throws Exception
    {
       List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
@@ -1270,13 +1344,16 @@
          {
             filter = new FilterImpl(queueBindingInfo.getFilterString());
          }
+         
+         Replicator replicator = getReplicatorForQueue(queueBindingInfo.getPersistenceID());
 
          Queue queue = queueFactory.createQueue(queueBindingInfo.getPersistenceID(),
                                                 queueBindingInfo.getAddress(),
                                                 queueBindingInfo.getQueueName(),
                                                 filter,
                                                 true,
-                                                false);
+                                                false,
+                                                replicator);
 
          Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeID);
 
@@ -1344,8 +1421,7 @@
                              final boolean durable,
                              final boolean temporary,
                              final boolean ignoreIfExists) throws Exception
-   {
-      log.info("** creating queue " + queueName);
+   {      
       Binding binding = postOffice.getBinding(queueName);
 
       if (binding != null)
@@ -1366,8 +1442,18 @@
       {
          filter = new FilterImpl(filterString);
       }
+      
+      long queueID;
+      
+      do
+      {
+         queueID = storageManager.generateUniqueID();
+      }
+      while (queueID == 0 || queueID == 1); //0 and 1 are reserved channels
+      
+      Replicator replicator = getReplicatorForQueue(queueID);
 
-      final Queue queue = queueFactory.createQueue(storageManager.generateUniqueID(), address, queueName, filter, durable, temporary);
+      Queue queue = queueFactory.createQueue(queueID, address, queueName, filter, durable, temporary, replicator);
 
       binding = new LocalQueueBinding(address, queue, nodeID);
 
@@ -1436,7 +1522,7 @@
                                         pagingManager,
                                         storageManager);
 
-         Binding binding = new DivertBinding(sAddress, divert);
+         Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
 
          postOffice.addBinding(binding);
 
@@ -1483,6 +1569,8 @@
       {
          // This session may well be on a different connection and different channel id, so we must get rid
          // of it and create another
+         
+         //TODO - is this true any more??
          currentSession.getChannel().close();
       }
 
@@ -1490,10 +1578,6 @@
            
       RemotingConnection replicatingConnection = connection.getReplicatingConnection();
       
-      log.info("getting repl conenction from " + System.identityHashCode(connection) + " it is " + replicatingConnection);
-
-      log.info("Creating session, replicating connection is " + replicatingConnection);
-
       final Replicator replicator;
 
       if (replicatingConnection != null)
@@ -1507,7 +1591,7 @@
             public void handlePacket(final Packet packet)
             {
                if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
-               {
+               {                                    
                   replicator.replicationResponseReceived();
                }
                else
@@ -1547,8 +1631,6 @@
  
       ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, replicator, configuration);
 
-      log.info(System.identityHashCode(handler)+ " ** creating serversessionpackethandler on backup "+ configuration.isBackup() + " replicator is " + replicator);      
-      
       session.setHandler(handler);
 
       channel.setHandler(handler);
@@ -1619,7 +1701,5 @@
          t.setPriority(threadPriority);
          return t;
       }
-
    }
-
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -12,11 +12,10 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CLOSED;
-import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CREATED;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REGISTER_POST_OFFICE_REPLICATION_CHANNEL;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REGISTER_QUEUE_REPLICATION_CHANNEL;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_LOCK_SEQUENCES;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_STARTUP_INFO;
 
@@ -24,35 +23,22 @@
 
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.management.Notification;
-import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.RegisterQueueReplicationChannelMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingAddedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingRemovedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerAddedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerRemovedMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateStartupInfoMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
-import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.cluster.ClusterConnection;
-import org.jboss.messaging.core.server.cluster.RemoteQueueBinding;
 import org.jboss.messaging.core.server.replication.ReplicableAction;
 import org.jboss.messaging.core.server.replication.Replicator;
 import org.jboss.messaging.core.server.replication.impl.JBMThread;
-import org.jboss.messaging.utils.Pair;
 
 /**
  * A packet handler for all packets that need to be handled at the server level
@@ -61,7 +47,7 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
  */
-public class MessagingServerPacketHandler implements ChannelHandler, ReplicableAction
+public class MessagingServerPacketHandler implements ChannelHandler
 {
    private static final Logger log = Logger.getLogger(MessagingServerPacketHandler.class);
 
@@ -70,13 +56,13 @@
    private final Channel channel1;
 
    private final RemotingConnection connection;
-   
+
    private Replicator replicator;
+
+   private volatile List<Long> sequences;
    
-   private Packet packet;
-   
-   private List<Pair<Long, Integer>> sequences;
-      
+   private volatile boolean requiresReplicationResponse;
+
    public MessagingServerPacketHandler(final MessagingServer server,
                                        final Channel channel1,
                                        final RemotingConnection connection,
@@ -87,86 +73,37 @@
       this.channel1 = channel1;
 
       this.connection = connection;
-      
+
       this.replicator = replicator;
    }
-   
-   public void run()
-   {
-      handlePacket();
-   }
-   
-   public Packet getPacket()
-   {
-      return packet;
-   }
-   
+
+
    public void handlePacket(final Packet packet)
    {
-      this.packet = packet;
-      
-      log.info("Handling packet " + packet.getType() + " on backup " + server.getConfiguration().isBackup());
-      
-      if (server.getConfiguration().isBackup())
-      {         
-         log.info("getting current thread");
-         
-         JBMThread thread = JBMThread.currentThread();
-         
-         thread.setReplay(sequences);
-         
-         //thread.setReplay(true);
-         
-         log.info("about to call handle packet");
-         
-         handlePacket();  
-         
-         //send the response message
-         
-         log.info("sending back replication response");
-         
-         if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES)
-         {
-            channel1.send(new ReplicationResponseMessage());
-         }
-      }
-      else
-      {     
-         log.info("not backup");
-         if (replicator != null)
-         {
-            replicator.execute(this);
-         }
-         else
-         {
-            handlePacket();
-         }
-      }
-   }
-
-   private void handlePacket()
-   {
       byte type = packet.getType();
-      
+
       if (!server.isInitialised() && type != PacketImpl.REPLICATE_STARTUP_INFO)
       {
-         throw new IllegalStateException("First packet must be startup info for backup " + type);        
+         throw new IllegalStateException("First packet must be startup info for backup " + type);
       }
 
-      // All these operations need to be idempotent since they are outside of the session
-      // reliability replay functionality
       switch (type)
       {
          case REPLICATE_LOCK_SEQUENCES:
          {
             ReplicateLockSequenceMessage msg = (ReplicateLockSequenceMessage)packet;
-            sequences = msg.getSequences();
-            log.info("Got sequences " + sequences.size());
+            
+            sequences = msg.getSequences();  
+            
+            requiresReplicationResponse = msg.isRequiresResponse();
+            
             return;
          }
          case REPLICATE_STARTUP_INFO:
-         {          
+         {
             ReplicateStartupInfoMessage msg = (ReplicateStartupInfoMessage)packet;
+
+           // log.info("** got replicate startup info");
             
             try
             {
@@ -176,99 +113,94 @@
             {
                log.error("Failed to initialise", e);
             }
-            
-            break;
-         }
-         case CREATESESSION:
-         {
-            CreateSessionMessage request = (CreateSessionMessage)packet;
 
-            log.info("sequences is " + sequences);
-            handleCreateSession(request, sequences == null);
-
             break;
          }
-         case REATTACH_SESSION:
-         {
-            ReattachSessionMessage request = (ReattachSessionMessage)packet;
-
-            handleReattachSession(request);
-
-            break;
-         }
-         case CREATE_QUEUE:
-         {
-            // Create queue can also be fielded here in the case of a replicated store and forward queue creation
-
-            CreateQueueMessage request = (CreateQueueMessage)packet;
+         case REGISTER_QUEUE_REPLICATION_CHANNEL:
+         {           
+            RegisterQueueReplicationChannelMessage msg = (RegisterQueueReplicationChannelMessage)packet;
+                        
+            Channel channel = connection.getChannel(msg.getBindingID(), -1, false);
             
-            handleCreateQueue(request);
-
+            channel.setHandler(new QueueReplicationPacketHandler(msg.getBindingID(), server.getPostOffice(), channel));
+            
             break;
          }
-         case PacketImpl.REPLICATE_ADD_REMOTE_QUEUE_BINDING:
+         case REGISTER_POST_OFFICE_REPLICATION_CHANNEL:
          {
-            ReplicateRemoteBindingAddedMessage request = (ReplicateRemoteBindingAddedMessage)packet;
-
-            handleAddRemoteQueueBinding(request);
-
             break;
          }
-         case PacketImpl.REPLICATE_REMOVE_REMOTE_QUEUE_BINDING:
+         case CREATESESSION:
          {
-            ReplicateRemoteBindingRemovedMessage request = (ReplicateRemoteBindingRemovedMessage)packet;
+            final CreateSessionMessage request = (CreateSessionMessage)packet;
 
-            handleRemoveRemoteQueueBinding(request);
+            ReplicableAction action = new ReplicableAction()
+            {
+               public void run()
+               {
+                  handleCreateSession(request, sequences == null);
+               }
+               
+               public Packet getPacket()
+               {
+                  return packet;
+               }
+            };
+            
+            if (server.getConfiguration().isBackup())
+            {
+               JBMThread thread = JBMThread.currentThread();
 
-            break;
-         }
-         case PacketImpl.REPLICATE_ADD_REMOTE_CONSUMER:
-         {
-            ReplicateRemoteConsumerAddedMessage request = (ReplicateRemoteConsumerAddedMessage)packet;
+               thread.setReplay(sequences);
 
-            handleAddRemoteConsumer(request);
+               action.run();
+               
+               thread.setNoReplayOrRecord();
+            }
+            else
+            {
+               if (replicator != null)
+               {
+                  replicator.execute(action);
+               }
+               else
+               {
+                  action.run();
+               }
+            }
 
             break;
          }
-         case PacketImpl.REPLICATE_REMOVE_REMOTE_CONSUMER:
+         case REATTACH_SESSION:
          {
-            ReplicateRemoteConsumerRemovedMessage request = (ReplicateRemoteConsumerRemovedMessage)packet;
+            ReattachSessionMessage request = (ReattachSessionMessage)packet;
 
-            handleRemoveRemoteConsumer(request);
+            handleReattachSession(request);
 
             break;
          }
-         case PacketImpl.REPLICATE_ACKNOWLEDGE:
-         {
-            ReplicateAcknowledgeMessage request = (ReplicateAcknowledgeMessage)packet;
-
-            handleReplicateAcknowledge(request);
-
-            break;
-         }
-         case PacketImpl.REPLICATE_REDISTRIBUTION:
-         {
-            ReplicateRedistributionMessage message = (ReplicateRedistributionMessage)packet;
-            
-            handleReplicateRedistribution(message);
-            
-            break;
-         }
          default:
          {
             log.error("Invalid packet " + packet);
          }
       }
       sequences = null;
-   }
+      
+      // send the response message
 
+      if (server.getConfiguration().isBackup() && requiresReplicationResponse || type == REPLICATE_STARTUP_INFO)
+      {
+         channel1.send(new ReplicationResponseMessage());
+      }
+   }
+   
    private void handleCreateSession(final CreateSessionMessage request, final boolean activate)
    {
       Packet response;
       try
-      {         
+      {
          response = server.createSession(request.getName(),
-                                         request.getSessionChannelID(),                                         
+                                         request.getSessionChannelID(),
                                          request.getUsername(),
                                          request.getPassword(),
                                          request.getMinLargeMessageSize(),
@@ -294,10 +226,10 @@
             response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
          }
       }
-      
-      channel1.send(response); 
+
+      channel1.send(response);
    }
-   
+
    private void handleReattachSession(final ReattachSessionMessage request)
    {
       Packet response;
@@ -323,163 +255,4 @@
       channel1.send(response);
    }
 
-   private void handleCreateQueue(final CreateQueueMessage request)
-   {
-      try
-      {
-         server.createQueue(request.getAddress(), request.getQueueName(), request.getFilterString(), request.isDurable(), request.isTemporary());
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to handle create queue", e);
-      }
-   }
-
-   private void handleAddRemoteQueueBinding(final ReplicateRemoteBindingAddedMessage request)
-   {
-      ClusterConnection cc = server.getClusterManager().getClusterConnection(request.getClusterConnectionName());
-
-      if (cc == null)
-      {
-         throw new IllegalStateException("No cluster connection found with name " + request.getClusterConnectionName());
-      }
-
-      try
-      {
-         cc.handleReplicatedAddBinding(request.getAddress(),
-                                       request.getUniqueName(),
-                                       request.getRoutingName(),
-                                       request.getRemoteQueueID(),
-                                       request.getFilterString(),
-                                       request.getSfQueueName(),
-                                       request.getDistance());
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to handle add remote queue binding", e);
-      }
-   }
-
-   private void handleRemoveRemoteQueueBinding(final ReplicateRemoteBindingRemovedMessage request)
-   {
-      try
-      {
-         Binding binding = server.getPostOffice().removeBinding(request.getUniqueName());
-
-         if (binding == null)
-         {
-            throw new IllegalStateException("Cannot find binding to remove " + request.getUniqueName());
-         }
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to handle remove remote queue binding", e);
-      }
-   }
-
-   private void handleAddRemoteConsumer(final ReplicateRemoteConsumerAddedMessage request)
-   {
-      RemoteQueueBinding binding = (RemoteQueueBinding)server.getPostOffice()
-                                                             .getBinding(request.getUniqueBindingName());
-
-      if (binding == null)
-      {
-         throw new IllegalStateException("Cannot find binding to remove " + request.getUniqueBindingName());
-      }
-
-      try
-      {
-         binding.addConsumer(request.getFilterString());
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to handle add remote consumer", e);
-      }
-      
-      // Need to propagate the consumer add
-      Notification notification = new Notification(null, CONSUMER_CREATED, request.getProperties());
-
-      try
-      {
-         server.getManagementService().sendNotification(notification);
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to handle add remote consumer", e);
-      }
-   }
-
-   private void handleRemoveRemoteConsumer(final ReplicateRemoteConsumerRemovedMessage request)
-   {
-      RemoteQueueBinding binding = (RemoteQueueBinding)server.getPostOffice()
-                                                             .getBinding(request.getUniqueBindingName());
-
-      if (binding == null)
-      {
-         throw new IllegalStateException("Cannot find binding to remove " + request.getUniqueBindingName());
-      }
-
-      try
-      {
-         binding.removeConsumer(request.getFilterString());
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to handle remove remote consumer", e);
-      }
-      
-      // Need to propagate the consumer close
-      Notification notification = new Notification(null, CONSUMER_CLOSED, request.getProperties());
-
-      try
-      {
-         server.getManagementService().sendNotification(notification);
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to handle remove remote consumer", e);
-      }
-   }
-
-   private void handleReplicateAcknowledge(final ReplicateAcknowledgeMessage request)
-   {
-      Binding binding = server.getPostOffice().getBinding(request.getUniqueName());
-
-      if (binding == null)
-      {
-         throw new IllegalStateException("Cannot find binding " + request.getUniqueName());
-      }
-
-      try
-      {
-         Queue queue = (Queue)binding.getBindable();
-         
-         MessageReference ref = queue.removeFirstReference(request.getMessageID());
-         
-         queue.acknowledge(ref);
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to handle remove remote consumer", e);
-      }
-   }
-   
-   private void handleReplicateRedistribution(final ReplicateRedistributionMessage request)
-   {
-      Binding binding = server.getPostOffice().getBinding(request.getQueueName());
-
-      if (binding == null)
-      {
-         throw new IllegalStateException("Cannot find binding " + request.getQueueName());
-      }
-
-      try
-      {
-         server.handleReplicateRedistribution(request.getQueueName(), request.getMessageID());
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to handle remove remote consumer", e);
-      }
-   }
 }
\ No newline at end of file

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -29,6 +29,7 @@
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
+import org.jboss.messaging.core.server.replication.Replicator;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.utils.SimpleString;
@@ -73,7 +74,8 @@
                             final SimpleString name,
                             final Filter filter,
                             final boolean durable,
-                            final boolean temporary)
+                            final boolean temporary,
+                            final Replicator replicator)
    {
       AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
 
@@ -89,7 +91,8 @@
                                    scheduledExecutor,
                                    postOffice,
                                    storageManager,
-                                   addressSettingsRepository);
+                                   addressSettingsRepository,
+                                   replicator);
       }
       else
       {
@@ -102,7 +105,8 @@
                                scheduledExecutor,
                                postOffice,
                                storageManager,
-                               addressSettingsRepository);
+                               addressSettingsRepository,
+                               replicator);
       }
 
       queue.setDistributionPolicy(addressSettings.getDistributionPolicy());

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -15,6 +15,7 @@
 import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ACTUAL_EXPIRY_TIME;
 import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ORIGINAL_DESTINATION;
 import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ORIG_MESSAGE_ID;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_QUEUE_DELIVERY;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -43,6 +44,8 @@
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Bindings;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.Distributor;
 import org.jboss.messaging.core.server.HandleStatus;
@@ -51,7 +54,9 @@
 import org.jboss.messaging.core.server.ScheduledDeliveryHandler;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.cluster.impl.Redistributor;
-import org.jboss.messaging.core.server.replication.impl.StatefulObjectReadWriteLock;
+import org.jboss.messaging.core.server.replication.ReplicableCall;
+import org.jboss.messaging.core.server.replication.Replicator;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.core.transaction.Transaction;
@@ -78,6 +83,8 @@
 
    public static final int NUM_PRIORITIES = 10;
 
+   private final Replicator replicator;
+
    private final long id;
 
    private final SimpleString name;
@@ -147,7 +154,8 @@
                     final ScheduledExecutorService scheduledExecutor,
                     final PostOffice postOffice,
                     final StorageManager storageManager,
-                    final HierarchicalRepository<AddressSettings> addressSettingsRepository)
+                    final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+                    final Replicator replicator)
    {
       this.id = id;
 
@@ -181,8 +189,10 @@
       direct = true;
 
       scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
-      
-      lock = new StatefulObjectReadWriteLock(name.toString(), id, 0).writeLock();
+
+      lock = new ReplicationAwareReadWriteLock(name.toString(), 0).writeLock();
+
+      this.replicator = replicator;
    }
 
    // Bindable implementation -------------------------------------------------------------------------------------
@@ -274,9 +284,7 @@
 
             tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
 
-            storageManager.storeReferenceTransactional(tx.getID(),
-                                                       ref.getQueue().getID(),
-                                                       message.getMessageID());
+            storageManager.storeReferenceTransactional(tx.getID(), ref.getQueue().getID(), message.getMessageID());
          }
 
          if (scheduledDeliveryTime != null && durableRef)
@@ -372,26 +380,33 @@
    {
       // Prevent too many executors running at once
 
-      if (waitingToDeliver.compareAndSet(false, true))
+      if (backup)
       {
-         executor.execute(deliverRunner);
+         // We don't deliver async directly on the backup
+         return;
       }
-   }
 
-   // Only used in testing - do not call directly!
-   public void deliverNow()
-   {
-      lock.lock();
-      try
+      if (waitingToDeliver.compareAndSet(false, true))
       {
-         deliver();
+         // log.info("delivering async on backup " + backup, new Exception());
+         executor.execute(deliverRunner);
       }
-      finally
-      {
-         lock.unlock();
-      }
    }
 
+   // // Only used in testing - do not call directly!
+   // public void deliverNow()
+   // {
+   // lock.lock();
+   // try
+   // {
+   // deliver();
+   // }
+   // finally
+   // {
+   // lock.unlock();
+   // }
+   // }
+
    public void addConsumer(final Consumer consumer) throws Exception
    {
       lock.lock();
@@ -1126,81 +1141,57 @@
    {
       backup = true;
 
-      direct = false;
+      // direct = false;
    }
 
-   public boolean activate()
+   public synchronized boolean activate()
    {
-      lock.lock();
-      try
+      consumersToFailover = consumers.size();
+
+      if (consumersToFailover == 0)
       {
-         consumersToFailover = consumers.size();
+         backup = false;
 
-         if (consumersToFailover == 0)
-         {
-            backup = false;
-
-            return true;
-         }
-         else
-         {
-            return false;
-         }
+         return true;
       }
-      finally
+      else
       {
-         lock.unlock();
+         return false;
       }
    }
 
-   public void activateNow(final Executor executor)
+   public synchronized void activateNow(final Executor executor)
    {
-      lock.lock();
-      try
+      if (backup)
       {
-         if (backup)
-         {
-            log.info("Timed out waiting for all consumers to reconnect to queue " + name +
-                     " so queue will be activated now");
+         log.info("Timed out waiting for all consumers to reconnect to queue " + name +
+                  " so queue will be activated now");
 
-            backup = false;
+         backup = false;
 
-            scheduledDeliveryHandler.reSchedule();
+         scheduledDeliveryHandler.reSchedule();
 
-            deliverAsync(executor);
-         }
+         deliverAsync(executor);
       }
-      finally
-      {
-         lock.unlock();
-      }
    }
 
-   public boolean consumerFailedOver()
+   public synchronized boolean consumerFailedOver()
    {
-      lock.lock();
-      try
+      consumersToFailover--;
+
+      if (consumersToFailover == 0)
       {
-         consumersToFailover--;
+         // All consumers for the queue have failed over, can re-activate it now
 
-         if (consumersToFailover == 0)
-         {
-            // All consumers for the queue have failed over, can re-activate it now
+         backup = false;
 
-            backup = false;
+         scheduledDeliveryHandler.reSchedule();
 
-            scheduledDeliveryHandler.reSchedule();
-
-            return true;
-         }
-         else
-         {
-            return false;
-         }
+         return true;
       }
-      finally
+      else
       {
-         lock.unlock();
+         return false;
       }
    }
 
@@ -1417,33 +1408,21 @@
       tx.commit();
    }
 
-   /*
-    * Attempt to deliver all the messages in the queue
-    */
-   private void deliver()
+   public HandleStatus deliverOne()
    {
       lock.lock();
 
+      direct = false;
+
       try
       {
-         // We don't do actual delivery if the queue is on a backup node - this is
-         // because it's async and could get out of step
-         // with the live node. Instead, when we replicate the delivery we remove
-         // the ref from the queue
+         Iterator<MessageReference> iterator = null;
 
-         if (backup)
+         HandleStatus status;
+         do
          {
-            return;
-         }
+            MessageReference reference;
 
-         direct = false;
-
-         MessageReference reference;
-
-         Iterator<MessageReference> iterator = null;
-
-         while (true)
-         {
             if (iterator == null)
             {
                reference = messageReferences.peekFirst();
@@ -1484,49 +1463,45 @@
 
                   promptDelivery = false;
                }
-               return;
+
+               status = HandleStatus.BUSY;
             }
-
-            // PagingManager would be null only on testcases
-            if (pagingStore == null && pagingManager != null)
+            else
             {
-               // TODO: It would be better if we could initialize the pagingStore during the construction
-               try
+               // PagingManager would be null only on testcases
+               if (pagingStore == null && pagingManager != null)
                {
-                  pagingStore = pagingManager.getPageStore(reference.getMessage().getDestination());
+                  // TODO: It would be better if we could initialize the pagingStore during the construction
+                  try
+                  {
+                     pagingStore = pagingManager.getPageStore(reference.getMessage().getDestination());
+                  }
+                  catch (Exception e)
+                  {
+                     // This shouldn't happen, and if it happens, this shouldn't abort the route
+
+                     log.error("Failed to get page store", e);
+                  }
                }
-               catch (Exception e)
-               {
-                  // This shouldn't happen, and if it happens, this shouldn't abort the route
-               }
-            }
 
-            HandleStatus status = deliver(reference);
+               status = deliverReference(reference);
 
-            if (status == HandleStatus.HANDLED)
-            {
-               if (iterator == null)
+               if (status == HandleStatus.HANDLED)
                {
-                  messageReferences.removeFirst();
+                  if (iterator == null)
+                  {
+                     messageReferences.removeFirst();
+                  }
+                  else
+                  {
+                     iterator.remove();
+                  }
                }
-               else
-               {
-                  iterator.remove();
-               }
             }
-            else if (status == HandleStatus.BUSY)
-            {
-               // All consumers busy - give up
-               break;
-            }
-            else if (status == HandleStatus.NO_MATCH && iterator == null)
-            {
-               // Consumers not all busy - but filter not accepting - iterate
-               // back
-               // through the queue
-               iterator = messageReferences.iterator();
-            }
          }
+         while (status == HandleStatus.NO_MATCH);
+
+         return status;
       }
       finally
       {
@@ -1551,11 +1526,12 @@
 
          boolean add = false;
 
-         if (direct && !backup)
+         if (direct)
          {
             // Deliver directly
 
-            HandleStatus status = deliver(ref);
+            // log.info("delivering direct on backup " + backup);
+            HandleStatus status = deliverReference(ref);
 
             if (status == HandleStatus.HANDLED)
             {
@@ -1577,6 +1553,7 @@
          }
          else
          {
+            // log.info("Not delivering direct on backup " + backup);
             add = true;
          }
 
@@ -1604,7 +1581,7 @@
                // filters with queues - in most cases
                // it's an ant-pattern since it would cause a queue scan on each
                // message
-               deliver();
+               deliverAll();
             }
          }
       }
@@ -1614,7 +1591,7 @@
       }
    }
 
-   private HandleStatus deliver(final MessageReference reference)
+   private HandleStatus deliverReference(final MessageReference reference)
    {
       HandleStatus status = distributionPolicy.distribute(reference);
 
@@ -1693,17 +1670,17 @@
             ServerMessage msg = ref.getMessage();
 
             if (!scheduledDeliveryHandler.checkAndSchedule(ref, backup))
-            {
+            {               
                messageReferences.addFirst(ref, msg.getPriority());
             }
          }
-
-         deliver();
       }
       finally
       {
          lock.unlock();
       }
+
+      deliverAll();
    }
 
    // Inner classes
@@ -1716,16 +1693,69 @@
          // Must be set to false *before* executing to avoid race
          waitingToDeliver.set(false);
 
-         deliver();
+         // log.info("** calling deliver runner " + backup, new Exception());
+
+         deliverAll();
       }
    }
 
-   final class RefsOperation implements TransactionOperation
+   /*
+    * Attempt to deliver all the messages in the queue
+    */
+   public void deliverAll()
    {
-      List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
+      // direct = false;
 
-      List<MessageReference> refsToAck = new ArrayList<MessageReference>();
+      HandleStatus handled;
 
+      if (replicator != null)
+      {
+         ReplicableCall<HandleStatus> action = new DeliverAction();
+
+         do
+         {
+            replicator.execute(action);
+
+            handled = action.getResult();
+         }
+         while (handled != HandleStatus.BUSY);
+      }
+      else
+      {
+         do
+         {
+            handled = deliverOne();
+         }
+         while (handled != HandleStatus.BUSY);
+      }
+   }
+
+   private class DeliverAction implements ReplicableCall<HandleStatus>
+   {
+      public Packet getPacket()
+      {
+         return new PacketImpl(REPLICATE_QUEUE_DELIVERY);
+      }
+
+      private HandleStatus status;
+
+      public void run()
+      {
+         status = deliverOne();
+      }
+
+      public HandleStatus getResult()
+      {
+         return status;
+      }
+   }
+
+   protected final class RefsOperation implements TransactionOperation
+   {
+      protected final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
+
+      protected final List<MessageReference> refsToAck = new ArrayList<MessageReference>();
+
       synchronized void addRef(final MessageReference ref)
       {
          refsToAdd.add(ref);
@@ -1771,15 +1801,7 @@
 
             QueueImpl queue = entry.getKey();
 
-            queue.lock.lock();
-            try
-            {
-               queue.postRollback(refs);
-            }
-            finally
-            {
-               queue.lock.unlock();
-            }
+            queue.postRollback(refs);
          }
       }
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -51,7 +51,7 @@
 import org.jboss.messaging.core.server.ServerConsumer;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerSession;
-import org.jboss.messaging.core.server.replication.impl.StatefulObjectReadWriteLock;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.utils.TypedProperties;
@@ -168,7 +168,7 @@
 
       this.updateDeliveries = updateDeliveries;
 
-      lock = new StatefulObjectReadWriteLock("consumer " + id, storageManager.generateUniqueID(), 0).writeLock();
+      lock = new ReplicationAwareReadWriteLock("consumer " + id, 0).writeLock();
 
       binding.getQueue().addConsumer(this);
    }
@@ -487,8 +487,11 @@
 
       try
       {
+         //log.info("handling message");
+         
          if ((flowControl && availableCredits <= 0) || !started)
          {
+            log.info("busy");
             return HandleStatus.BUSY;
          }
 
@@ -578,7 +581,7 @@
       {
          availableCredits -= packet.getRequiredBufferSize();
       }
-
+      
       channel.send(packet);
    }
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -132,8 +132,6 @@
 
    private RemotingConnection remotingConnection;
 
-   // private Channel replicatingChannel;
-
    private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
 
    private final Executor executor;
@@ -451,8 +449,6 @@
       {
          Binding binding = postOffice.getBinding(name);
 
-         log.info("binding is " + binding);
-
          if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
          {
             throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST, "Binding " + name + " does not exist");
@@ -479,7 +475,8 @@
                                                 name,
                                                 filter,
                                                 false,
-                                                true);
+                                                true,
+                                                null);
 
             // There's no need for any special locking since the list method is synchronized
             List<MessageReference> refs = ((Queue)binding.getBindable()).list(filter);
@@ -496,8 +493,6 @@
             theQueue = (Queue)binding.getBindable();
          }
 
-         log.info("*********** creating consumer");
-
          ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
                                                           this,
                                                           (QueueBinding)binding,
@@ -560,8 +555,7 @@
    }
 
    public void handleCreateQueue(final CreateQueueMessage packet)
-   {
-      log.info("processing create queue");
+   {      
       SimpleString address = packet.getAddress();
 
       final SimpleString name = packet.getQueueName();
@@ -634,8 +628,6 @@
       channel.confirm(packet);
 
       channel.send(response);
-
-      log.info("processed create queue");
    }
 
    public void handleDeleteQueue(final SessionDeleteQueueMessage packet)
@@ -1561,6 +1553,8 @@
       {
          this.setStarted(false);
       }
+      
+      log.info("Transferring connection");
 
       // backup = false;
 
@@ -1585,6 +1579,7 @@
 
       int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
 
+      log.info("replaying commands");
       channel.replayCommands(lastReceivedCommandID);
 
       if (wasStarted)
@@ -1592,6 +1587,8 @@
          this.setStarted(true);
       }
 
+      log.info("Transferred connection");
+      
       return serverLastReceivedCommandID;
    }
 
@@ -1715,13 +1712,9 @@
    private void doReceiveCredits(final SessionConsumerFlowCreditMessage packet)
    {
       try
-      {
-         log.info("packet consumer id is "+ packet.getConsumerID());
-         
+      { 
          ServerConsumer consumer = consumers.get(packet.getConsumerID());
          
-         log.info("consumer is " + consumer);
-         
          consumer.receiveCredits(packet.getCredits());
       }
       catch (Exception e)

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -80,7 +80,6 @@
 import org.jboss.messaging.core.server.replication.ReplicableAction;
 import org.jboss.messaging.core.server.replication.Replicator;
 import org.jboss.messaging.core.server.replication.impl.JBMThread;
-import org.jboss.messaging.utils.Pair;
 
 /**
  * A ServerSessionPacketHandler
@@ -94,27 +93,31 @@
    private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
 
    private final ServerSession session;
-   
+
    private final Replicator replicator;
-   
+
    private Packet packet;
-   
-   //private boolean backup;
-   
+
    private Configuration config;
+
+   //TODO the sequences and repl response can be encapsulated in a super class
    
-   private List<Pair<Long, Integer>> sequences;
+   private volatile List<Long> sequences;
    
+   private volatile boolean requiresReplicationResponse;
+
    private final Channel channel;
-   
-   public ServerSessionPacketHandler(final ServerSession session, final Replicator replicator, final Configuration config)
+
+   public ServerSessionPacketHandler(final ServerSession session,
+                                     final Replicator replicator,
+                                     final Configuration config)
    {
       this.session = session;
-      
+
       this.replicator = replicator;
-      
+
       this.channel = session.getChannel();
-      
+
       this.config = config;
    }
 
@@ -122,60 +125,59 @@
    {
       return session.getID();
    }
-     
+
    public void run()
    {
       handlePacket();
    }
-   
+
    public Packet getPacket()
-   {      
+   {
       return packet;
    }
-   
+
    public void handlePacket(final Packet packet)
    {
       this.packet = packet;
       
-      log.info(System.identityHashCode(this)+ " Handling packet " + packet.getType() + " on backup " + config.isBackup());
-      
+     // log.info("Got packet " + packet + " at server session packet handler backup is " + config.isBackup());
+
       if (config.isBackup())
-      {         
+      {
          JBMThread thread = JBMThread.currentThread();
-         
+
          thread.setReplay(sequences);
-         
-        // thread.setReplay(true);
-         
-         handlePacket();  
-         
-         //send the response message
-         
-         if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES)
+
+         handlePacket();
+
+         // send the response message
+
+         if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES && this.requiresReplicationResponse)
          {
+            log.info("sending back replication response");
             channel.send(new ReplicationResponseMessage());
-         }                
+         }
+         
+         thread.setNoReplayOrRecord();
       }
       else
-      {         
+      {
          if (replicator != null)
          {
             replicator.execute(this);
          }
          else
          {
-            log.info("replicator is null");
             handlePacket();
          }
       }
    }
-   
-   private void dumpSequences(List<Pair<Long, Integer>> sequences)
+
+   private void dumpSequences(List<Long> sequences)
    {
-      log.info("Sequences size is " + sequences.size());
-      for (Pair<Long, Integer> pair: sequences)
+      for (long sequence : sequences)
       {
-         log.info(pair.a + ": " + pair.b);
+         log.info(sequence);
       }
    }
 
@@ -190,11 +192,13 @@
             case REPLICATE_LOCK_SEQUENCES:
             {
                ReplicateLockSequenceMessage msg = (ReplicateLockSequenceMessage)packet;
+
                sequences = msg.getSequences();
                
-               log.info("Session, set sequences");
-               dumpSequences(sequences);
-               
+               this.requiresReplicationResponse = msg.isRequiresResponse();
+
+               // dumpSequences(sequences);
+
                break;
             }
             case SESS_CREATECONSUMER:
@@ -204,16 +208,15 @@
                break;
             }
             case CREATE_QUEUE:
-            {               
-               log.info("Got create queue message");
-               CreateQueueMessage request = (CreateQueueMessage)packet;               
-               session.handleCreateQueue(request);             
+            {
+               CreateQueueMessage request = (CreateQueueMessage)packet;
+               session.handleCreateQueue(request);
                break;
             }
             case DELETE_QUEUE:
             {
                SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
-               session.handleDeleteQueue(request);             
+               session.handleDeleteQueue(request);
                break;
             }
             case SESS_QUEUEQUERY:
@@ -354,20 +357,20 @@
             case SESS_SEND:
             {
                SessionSendMessage message = (SessionSendMessage)packet;
-               session.handleSend(message);               
-               break;              
+               session.handleSend(message);
+               break;
             }
             case SESS_SEND_LARGE:
             {
                SessionSendLargeMessage message = (SessionSendLargeMessage)packet;
-               session.handleSendLargeMessage(message);     
-               break;              
+               session.handleSendLargeMessage(message);
+               break;
             }
             case SESS_SEND_CONTINUATION:
             {
                SessionSendContinuationMessage message = (SessionSendContinuationMessage)packet;
                session.handleSendContinuations(message);
-               break;               
+               break;
             }
          }
       }

Deleted: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableAction.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableAction.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableAction.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -1,38 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.server.replication;
-
-import org.jboss.messaging.core.remoting.Packet;
-
-/**
- * A ReplicableAction
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
- */
-public interface ReplicableAction extends Runnable
-{
-   Packet getPacket();
-}

Copied: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableCall.java (from rev 7478, branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableAction.java)
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableCall.java	                        (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableCall.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -0,0 +1,37 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.replication;
+
+
+/**
+ * A ReplicableCall
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public interface ReplicableCall<R> extends ReplicableAction
+{
+   R getResult();
+}

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/Replicator.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/Replicator.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/Replicator.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -39,7 +39,9 @@
    
    void registerWaitingChannel(Channel channel);
    
-   boolean isResponseReceived();
+  // boolean isResponseReceived();
    
    void replicationResponseReceived();
+   
+  // long getReplicateSequence();
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -24,9 +24,9 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.jboss.messaging.core.server.replication.Replicator;
-import org.jboss.messaging.utils.Pair;
 
 /**
  * A JBMThread
@@ -39,12 +39,17 @@
 {
    private static enum ThreadState
    {
-      RECORD, REPLAY, END_RECORD;
+      RECORD, REPLAY, NONE;
    }
    
+//   public long getSequence()
+//   {
+//      return sequence;
+//   }
+//   
    private ThreadState state;
 
-   private List<Pair<Long, Integer>> objectSequences;
+   private List<Long> objectSequences;
 
    private int pos;
    
@@ -75,7 +80,7 @@
       return state == ThreadState.RECORD;
    }
    
-   public void setReplay(final List<Pair<Long, Integer>> objectSequences)
+   public void setReplay(final List<Long> objectSequences)
    {
       this.objectSequences = objectSequences;
       
@@ -84,11 +89,16 @@
       this.pos = 0;
    }
    
+//   private volatile long sequence;
+//   
+//   private AtomicLong seqCounter = new AtomicLong(0);
+   
    public void setRecord(final Replicator replicator)
    {
+      //this.sequence = seqCounter.getAndIncrement();
       if (this.objectSequences == null)
       {
-         this.objectSequences = new ArrayList<Pair<Long, Integer>>();
+         this.objectSequences = new ArrayList<Long>();
       }
       else
       {
@@ -102,22 +112,32 @@
       this.replicator = replicator;
    }
    
-   public void endRecord()
+   public void setNoReplayOrRecord()
    {
-      this.state = ThreadState.END_RECORD;
+      this.state = ThreadState.NONE;
    }
 
-   public Pair<Long, Integer> getNextSequence()
+   public void resumeRecording()
    {
+      this.state = ThreadState.RECORD;
+   }
+   
+   public void resumeReplay()
+   {
+      this.state = ThreadState.REPLAY;
+   }
+   
+   public long getNextSequence()
+   {
       return objectSequences.get(pos++);
    }
 
-   public void addSequence(final Pair<Long, Integer> currentSequence)
+   public void addSequence(final long currentSequence)
    {
       objectSequences.add(currentSequence);
    }
    
-   public List<Pair<Long, Integer>> getSequences()
+   public List<Long> getSequences()
    {
       return objectSequences;
    }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/PriorityLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/PriorityLock.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/PriorityLock.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -41,21 +41,19 @@
 
    private final Queue<QueueEntry> waiting;
 
-   private volatile int currentSequence;
+   private volatile long currentSequence;
 
    private Thread owner;
-   
-   public PriorityLock(final int sequence)
+
+   public PriorityLock(final long sequence)
    {
       waiting = new PriorityQueue<QueueEntry>();
-      
+
       this.currentSequence = sequence;
    }
 
-   public void lock(final int sequence)
+   public void lock(final long sequence)
    {
-      //log.info(this + " trying to get lock on backup " + sequence);
-      
       Thread currentThread = Thread.currentThread();
 
       if (sequence != currentSequence)
@@ -68,15 +66,12 @@
          }
 
          while (sequence != currentSequence)
-         {            
-            log.info("parking lock, expected " + sequence + " current " + currentSequence);
+         {
             LockSupport.park();
          }
       }
 
       owner = currentThread;
-      
-      //log.info(this + " got lock om backup " + sequence, new Exception());
    }
 
    public void unlock()
@@ -93,15 +88,13 @@
       synchronized (waiting)
       {
          entry = waiting.peek();
-         
+
          if (entry != null && entry.thread == owner)
          {
             waiting.poll();
          }
-         
+
          entry = waiting.peek();
-         
-         log.info("size " + waiting.size());
       }
 
       if (entry != null)
@@ -112,11 +105,11 @@
 
    private static final class QueueEntry implements Comparable<QueueEntry>
    {
-      private final int sequence;
+      private final long sequence;
 
       private final Thread thread;
 
-      private QueueEntry(final int sequence, final Thread thread)
+      private QueueEntry(final long sequence, final Thread thread)
       {
          this.sequence = sequence;
 
@@ -130,9 +123,9 @@
 
       public int compareTo(final QueueEntry entry)
       {
-         int i = entry.sequence;
+         long l = entry.sequence;
 
-         return sequence < i ? -1 : (sequence == i ? 0 : 1);
+         return sequence < l ? -1 : (sequence == l ? 0 : 1);
       }
    }
 }

Copied: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java (from rev 7478, branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/StatefulObjectReadWriteLock.java)
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java	                        (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -0,0 +1,258 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.replication.impl;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.ConcurrentHashSet;
+
+/**
+ * A ReplicationAwareReadWriteLock
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class ReplicationAwareReadWriteLock implements ReadWriteLock
+{
+   private static final Logger log = Logger.getLogger(ReplicationAwareReadWriteLock.class);
+
+   private final Lock readLock = new StatefulObjectReadLock();
+
+   private final Lock writeLock = new StatefulObjectWriteLock();
+
+ //  private long objectID;
+
+   private final ReadWriteLock rwLock;
+
+   private final AtomicInteger counter;
+
+   private final PriorityLock sequencedLock;
+
+   private String name;
+
+   public ReplicationAwareReadWriteLock(final String name, final int initialCount)
+   {
+      this.name = name;
+
+    //  this.objectID = objectID;
+
+      rwLock = new ReentrantReadWriteLock();
+
+      sequencedLock = new PriorityLock(initialCount);
+
+      counter = new AtomicInteger(initialCount);
+   }
+
+   public Lock readLock()
+   {
+      return readLock;
+   }
+
+   public Lock writeLock()
+   {
+      return writeLock;
+   }
+   
+   private Map<JBMThread, StackTraceElement[]> stackTraces = new ConcurrentHashMap<JBMThread, StackTraceElement[]>();
+   
+   //debug only
+   private void addOwner(final JBMThread thread)
+   {
+      owners.add(thread);
+      
+      stackTraces.put(thread, thread.getStackTrace());
+   }
+   
+   //debug only
+   private void removeOwner(final JBMThread thread)
+   {
+      owners.remove(thread);
+      
+      stackTraces.remove(thread);
+   }
+
+   // For debug
+   private Set<Thread> owners = new ConcurrentHashSet<Thread>();
+
+   private boolean doLock(long time, TimeUnit unit, boolean read) throws InterruptedException
+   {
+      JBMThread thread = JBMThread.currentThread();
+
+      // debug only
+      if (owners.contains(thread))
+      {
+         Exception e = new Exception();
+         e.setStackTrace(stackTraces.get(thread));
+         log.error("Stateful lock is not re-entrant, first obtained here", e);
+         Exception e2 = new Exception();
+         log.info("Second attempt to obtain here", e2);
+         throw new IllegalStateException("Stateful lock is NOT re-entrant!");
+      }
+
+      if (thread.isReplay())
+      {
+         long sequence = thread.getNextSequence();
+
+         sequencedLock.lock(sequence);
+
+         addOwner(thread);
+
+         return true;
+      }
+      else
+      {
+         boolean ok;
+
+         if (read)
+         {
+            ok = rwLock.readLock().tryLock(time, unit);
+         }
+         else
+         {
+            ok = rwLock.writeLock().tryLock(time, unit);
+         }
+
+         if (ok)
+         {
+            thread.addSequence(counter.getAndIncrement());
+
+            addOwner(thread);
+         }
+
+         return ok;
+      }
+   }
+
+   private void doUnlock(final boolean read)
+   {
+      JBMThread thread = JBMThread.currentThread();
+
+      if (thread.isReplay())
+      {
+         sequencedLock.unlock();
+      }
+      else
+      {
+         if (read)
+         {
+            rwLock.readLock().unlock();
+         }
+         else
+         {
+            rwLock.writeLock().unlock();
+         }
+      }
+
+      removeOwner(thread);
+   }
+
+   private class StatefulObjectReadLock implements Lock
+   {
+      public void lock()
+      {
+         // throw new UnsupportedOperationException();
+         try
+         {
+            doLock(10000, TimeUnit.MILLISECONDS, true);
+         }
+         catch (InterruptedException e)
+         {
+         }
+      }
+
+      public void lockInterruptibly() throws InterruptedException
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      public Condition newCondition()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      public boolean tryLock()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
+      {
+         return doLock(time, unit, true);
+      }
+
+      public void unlock()
+      {
+         doUnlock(true);
+      }
+   }
+
+   private class StatefulObjectWriteLock implements Lock
+   {
+      public void lock()
+      {
+         // throw new UnsupportedOperationException();
+         try
+         {
+            doLock(10000, TimeUnit.MILLISECONDS, false);
+         }
+         catch (InterruptedException e)
+         {
+         }
+      }
+
+      public void lockInterruptibly() throws InterruptedException
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      public Condition newCondition()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      public boolean tryLock()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
+      {
+         return doLock(time, unit, false);
+      }
+
+      public void unlock()
+      {
+         doUnlock(false);
+      }
+   }
+}

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -22,10 +22,11 @@
 
 package org.jboss.messaging.core.server.replication.impl;
 
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Channel;
@@ -33,7 +34,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
 import org.jboss.messaging.core.server.replication.ReplicableAction;
 import org.jboss.messaging.core.server.replication.Replicator;
-import org.jboss.messaging.utils.Pair;
 
 /**
  * A ReplicatorImpl
@@ -46,12 +46,16 @@
 {
    private static final Logger log = Logger.getLogger(ReplicatorImpl.class);
 
-   private Channel replicatingChannel;
+   private final Channel replicatingChannel;
+   
+   private final Queue<Set<Channel>> waitingChannelsQueue = new ConcurrentLinkedQueue<Set<Channel>>();
 
-   private Set<Channel> waitingChannels = new HashSet<Channel>();
+   private Set<Channel> currentChannels;
 
-   private boolean responseReceived;
-
+  // private long responseSequence;
+   
+  // private long replicateSequence;
+      
    public ReplicatorImpl(final Channel replicatingChannel)
    {
       this.replicatingChannel = replicatingChannel;
@@ -59,69 +63,61 @@
 
    public void registerWaitingChannel(final Channel channel)
    {
-      this.waitingChannels.add(channel);
+      currentChannels.add(channel);
    }
 
-   public synchronized void replicationResponseReceived()
+   public void replicationResponseReceived()
    {
-      log.info("** got replication response in replicator");
+      //long sequence = responseSequence++; 
       
+      Set<Channel> waitingChannels = waitingChannelsQueue.remove();
+            
       for (Channel channel : waitingChannels)
-      {
+      {        
          channel.replicationResponseReceived(this);
       }
-
-      responseReceived = true;
    }
 
-   public synchronized boolean isResponseReceived()
-   {
-      return responseReceived;
-   }
-
    public void execute(final ReplicableAction action)
    {
       // First we execute the action
-      
-      log.info("Running action locally");
-      
+       
       JBMThread thread = JBMThread.currentThread();
       
-      //List<Pair<Long, Integer>> sequences = new ArrayList<Pair<Long, Integer>>(); 
+      this.currentChannels = new HashSet<Channel>();
       
       thread.setRecord(this);
-
+      
       action.run();
       
-      thread.endRecord();
+      thread.setNoReplayOrRecord();
       
-      log.info("Ran action locally");
+      List<Long> sequences = JBMThread.currentThread().getSequences();
       
-      List<Pair<Long, Integer>> sequences = JBMThread.currentThread().getSequences();
-      
-      dumpSequences(sequences);
+     // dumpSequences(sequences);
 
       // We then send the sequences to the backup
-
-      log.info("Replicated sequences");
       
-      Packet packet = new ReplicateLockSequenceMessage(sequences);
+      if (!currentChannels.isEmpty())
+      {
+         waitingChannelsQueue.add(currentChannels);
+      }
 
+      Packet packet = new ReplicateLockSequenceMessage(sequences, !currentChannels.isEmpty());
+
       replicatingChannel.send(packet);
 
       // Next we replicate the actual action
            
-      replicatingChannel.send(action.getPacket());
-      
-      log.info("replicated packet " + action.getPacket().getType());
+      replicatingChannel.send(action.getPacket());     
    }
    
-   private void dumpSequences(List<Pair<Long, Integer>> sequences)
+   private void dumpSequences(List<Long> sequences)
    {
       log.info("Sequences size is " + sequences.size());
-      for (Pair<Long, Integer> pair: sequences)
+      for (long sequence: sequences)
       {
-         log.info(pair.a + ": " + pair.b);
+         log.info(sequence);
       }
    }
    

Deleted: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/StatefulObjectReadWriteLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/StatefulObjectReadWriteLock.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/StatefulObjectReadWriteLock.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -1,243 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.server.replication.impl;
-
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.utils.ConcurrentHashSet;
-import org.jboss.messaging.utils.Pair;
-
-/**
- * A StatefulObjectReadWriteLock
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
- */
-public class StatefulObjectReadWriteLock implements ReadWriteLock
-{
-   private static final Logger log = Logger.getLogger(StatefulObjectReadWriteLock.class);
-
-   private final Lock readLock = new StatefulObjectReadLock();
-
-   private final Lock writeLock = new StatefulObjectWriteLock();
-   
-   private long objectID;
-   
-   private final ReadWriteLock rwLock;
-   
-   private final AtomicInteger counter;
-   
-   private final PriorityLock sequencedLock;
-   
-   private String name;
-
-   public StatefulObjectReadWriteLock(final String name, final long objectID, final int initialCount)
-   {
-      this.name = name;
-      
-      this.objectID = objectID;
-      
-      rwLock = new ReentrantReadWriteLock();
-      
-      sequencedLock = new PriorityLock(initialCount);
-      
-      counter = new AtomicInteger(initialCount);
-   }
-
-   public Lock readLock()
-   {
-      return readLock;
-   }
-
-   public Lock writeLock()
-   {
-      return writeLock;
-   }
-   
-   //For debug
-   private Set<Thread> owners = new ConcurrentHashSet<Thread>();
-   
-   private boolean doLock(long time, TimeUnit unit, boolean read) throws InterruptedException
-   {
-      JBMThread thread = JBMThread.currentThread();
-      
-      //debug only
-      if (owners.contains(thread))
-      {
-         throw new IllegalStateException("Stateful lock is NOT re-entrant!");
-      }
-      
-      if (thread.isReplay())
-      {
-         Pair<Long, Integer> sequence = thread.getNextSequence();
-         
-         log.info(name + " Attempting to get lock on backup " + sequence.b, new Exception());
-         
-         if (sequence.a != objectID)
-         {
-            throw new IllegalStateException("Invalid object id " + sequence.a + " expected " + objectID);
-         }
-         
-         sequencedLock.lock(sequence.b);
-         
-         owners.add(thread);
-         
-         return true;
-      }
-      else
-      {
-         boolean ok;
-         
-         if (read)
-         {
-            ok = rwLock.readLock().tryLock(time, unit);
-         }
-         else
-         {
-            ok = rwLock.writeLock().tryLock(time, unit);
-         }
-                  
-         if (ok)
-         {
-            log.info(name + " added sequence on live " + counter.get(), new Exception());
-            
-            thread.addSequence(new Pair<Long, Integer>(objectID, counter.getAndIncrement()));
-            
-            owners.add(thread);
-         }
-         
-         return ok;
-      }
-   }
-   
-   private void doUnlock(final boolean read)
-   {
-      JBMThread thread = JBMThread.currentThread();
-      
-      if (thread.isReplay())
-      {
-         sequencedLock.unlock();
-      }
-      else
-      {         
-         if (read)
-         {
-            rwLock.readLock().unlock();
-         }
-         else
-         {
-            rwLock.writeLock().unlock();
-         }
-      }  
-      
-      owners.remove(thread);
-   }
-
-   private class StatefulObjectReadLock implements Lock
-   {
-      public void lock()
-      {
-         //throw new UnsupportedOperationException();
-         try
-         {
-            doLock(10000, TimeUnit.MILLISECONDS, true);
-         }
-         catch (InterruptedException e)
-         {
-         }
-      }
-
-      public void lockInterruptibly() throws InterruptedException
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      public Condition newCondition()
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      public boolean tryLock()
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
-      {
-         return doLock(time, unit, true);
-      }
-
-      public void unlock()
-      {
-         doUnlock(true);
-      }
-   }
-   
-   private class StatefulObjectWriteLock implements Lock
-   {
-      public void lock()
-      {
-         //throw new UnsupportedOperationException();
-         try
-         {
-            doLock(10000, TimeUnit.MILLISECONDS, false);
-         }
-         catch (InterruptedException e)
-         {
-         }
-      }
-
-      public void lockInterruptibly() throws InterruptedException
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      public Condition newCondition()
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      public boolean tryLock()
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
-      {
-         return doLock(time, unit, false);
-      }
-
-      public void unlock()
-      {
-         doUnlock(false);
-      }
-   }
-}

Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -83,67 +83,142 @@
 
    // Public --------------------------------------------------------
 
-   public void testReplication() throws Exception
+   public void testReplication1() throws Exception
    {
-      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-
-      sf.setProducerWindowSize(32 * 1024);
-
-      log.info("creating session");
-      
-      ClientSession session = sf.createSession(false, true, true);
-      
-      log.info("created session");
-
-      session.createQueue(ADDRESS, ADDRESS, null, false);
-      
-      log.info("created queue");
-
-      ClientProducer producer = session.createProducer(ADDRESS);
-      
-      log.info("created producer");
-
-      final int numMessages = 1;
-
-      for (int i = 0; i < numMessages; i++)
+      for (int j = 0; j < 5000; j++)
       {
-         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
-                                                             false,
-                                                             0,
-                                                             System.currentTimeMillis(),
-                                                             (byte)1);
-         message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().writeString("aardvarks");
-         producer.send(message);
+         log.info("Iteration " + j);
+         
+         ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+   
+         sf.setProducerWindowSize(32 * 1024);
+   
+         ClientSession session = sf.createSession(false, true, true);
+         
+         session.createQueue(ADDRESS, ADDRESS, null, false);
+         
+         ClientProducer producer = session.createProducer(ADDRESS);
+         
+         final int numMessages = 100;
+         
+         long start = System.currentTimeMillis();
+   
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                                false,
+                                                                0,
+                                                                System.currentTimeMillis(),
+                                                                (byte)1);
+            message.putIntProperty(new SimpleString("count"), i);
+            message.getBody().writeString("aardvarks");
+            producer.send(message);
+         }
+         
+         //Thread.sleep(500);
+         
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+         
+         log.info("sent messages");
+         
+         session.start();
+         
+         log.info("Started session");
+         
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message2 = consumer.receive();
+   
+            assertEquals("aardvarks", message2.getBody().readString());
+            assertEquals(i, message2.getProperty(new SimpleString("count")));
+            
+            message2.acknowledge();
+         }
+         
+         long end = System.currentTimeMillis();
+         
+         log.info("That took " + (end - start));
+         
+         ClientMessage message3 = consumer.receive(250);
+   
+         assertNull(message3);
+   
+         session.close();
+         
+         tearDown();
+         
+         setUp();
       }
-      
-     // Thread.sleep(30000);
-
-      ClientConsumer consumer = session.createConsumer(ADDRESS);
-      
-      log.info("created consumer");
-
-      session.start();
-      
-      log.info("started session");
-
-      for (int i = 0; i < numMessages; i++)
+   }
+   
+   public void testReplication2() throws Exception
+   {
+      for (int j = 0; j < 5000; j++)
       {
-         ClientMessage message2 = consumer.receive();
-
-         assertEquals("aardvarks", message2.getBody().readString());
-         assertEquals(i, message2.getProperty(new SimpleString("count")));
-
-         message2.acknowledge();
+         log.info("Iteration " + j);
+         
+         ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+   
+         sf.setProducerWindowSize(32 * 1024);
+   
+         ClientSession session = sf.createSession(false, true, true);
+         
+         session.createQueue(ADDRESS, ADDRESS, null, false);
+         
+         ClientProducer producer = session.createProducer(ADDRESS);
+         
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+         
+         log.info("sent messages");
+         
+         session.start();
+         
+         //Thread.sleep(500);
+         
+         final int numMessages = 1000;
+         
+         long start = System.currentTimeMillis();
+   
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                                false,
+                                                                0,
+                                                                System.currentTimeMillis(),
+                                                                (byte)1);
+            message.putIntProperty(new SimpleString("count"), i);
+            message.getBody().writeString("aardvarks");
+            producer.send(message);
+         }
+         
+         //Thread.sleep(500);
+                          
+         log.info("Started session");
+         
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message2 = consumer.receive();
+   
+            assertEquals("aardvarks", message2.getBody().readString());
+            assertEquals(i, message2.getProperty(new SimpleString("count")));
+            
+            message2.acknowledge();
+         }
+         
+         long end = System.currentTimeMillis();
+         
+         log.info("That took " + (end - start));
+         
+         ClientMessage message3 = consumer.receive(250);
+   
+         assertNull(message3);
+   
+         session.close();
+         
+         tearDown();
+         
+         setUp();
       }
-      
-      log.info("consumed messages");
-
-      ClientMessage message3 = consumer.receive(250);
-
-      assertNull(message3);
-
-      session.close();
    }
 
    public void testFailoverSameConnectionFactory() throws Exception
@@ -173,7 +248,7 @@
          message.getBody().writeString("aardvarks");
          producer.send(message);
       }
-
+      
       RemotingConnection conn1 = ((ClientSessionImpl)session).getConnection();
 
       // Simulate failure on connection

Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java	2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java	2009-07-02 11:26:07 UTC (rev 7513)
@@ -954,7 +954,7 @@
       /* (non-Javadoc)
        * @see org.jboss.messaging.core.postoffice.Binding#getID()
        */
-      public int getID()
+      public long getID()
       {
 
          return 0;




More information about the jboss-cvs-commits mailing list