[jboss-cvs] JBoss Messaging SVN: r7596 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/management/impl and 10 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jul 20 18:34:58 EDT 2009


Author: timfox
Date: 2009-07-20 18:34:58 -0400 (Mon, 20 Jul 2009)
New Revision: 7596

Modified:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java
   branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java
   branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
   branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
Log:
MT replication

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -277,7 +277,9 @@
    {
       checkClosed();
 
+     // log.info("sending delete queue");
       channel.sendBlocking(new SessionDeleteQueueMessage(queueName));
+     // log.info("sent delete queue");
    }
 
    public void deleteQueue(final String queueName) throws MessagingException
@@ -676,11 +678,11 @@
 
          closedSent = true;
 
-        // log.info(System.identityHashCode(this) + " session sending close message");
-         
+         // log.info(System.identityHashCode(this) + " session sending close message");
+
          channel.sendBlocking(new SessionCloseMessage());
-         
-         //log.info(System.identityHashCode(this) + " session sent close message");
+
+         // log.info(System.identityHashCode(this) + " session sent close message");
       }
       catch (Throwable ignore)
       {
@@ -725,23 +727,58 @@
 
       try
       {
-       //  log.info(System.identityHashCode(this) + " session handling failover");
-         
-         //Prevent any more packets being handled on the old connection
-         channel.getConnection().freeze();
-         
-         while (channel.getConnection().getExecutingThread() != null)
+         // log.info(System.identityHashCode(this) + " session handling failover");
+
+          channel.getConnection().freeze();
+          
+          while (true)
+          {
+             // Set<Thread> executingThreads = channel.getConnection().getExecutingThreads();
+
+             Thread thread = channel.getConnection().getExecutingThread();
+
+             if (thread == null)
+             {
+                break;
+             }
+
+             try
+             {
+                Thread.sleep(1);
+             }
+             catch (InterruptedException ignore)
+             {
+             }
+          }
+          
+         // Prevent any more packets being handled on the old connection
+         channel.setFrozen(true);
+
+         while (true)
          {
+            // Set<Thread> executingThreads = channel.getConnection().getExecutingThreads();
+
+            Thread thread = channel.getExecutingThread();
+
+            if (thread == null)
+            {
+               break;
+            }
+
             try
             {
                Thread.sleep(1);
             }
             catch (InterruptedException ignore)
-            {               
+            {
             }
          }
+
+         channel.waitForAllExecutions();
+
+         channel.transferConnection(backupConnection);
          
-         channel.transferConnection(backupConnection);
+     //    log.info("unfreezing");
 
          backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
 
@@ -749,28 +786,31 @@
 
          int lid = channel.getLastConfirmedCommandID();
          
-       //  log.info(System.identityHashCode(this) + " last received command id on client side is " + lid);
+         channel.setFrozen(false);
          
+         // log.info(System.identityHashCode(this) + " last received command id on client side is " + lid);
+
          Packet request = new ReattachSessionMessage(name, lid);
 
-         Channel channel1 = backupConnection.getChannel(1, -1, false);
+         Channel channel1 = backupConnection.getChannel(1, -1, false, false);
 
          ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
-         
-        // log.info(System.identityHashCode(this) + " got response from reattach session");
-         
+
+         // log.info(System.identityHashCode(this) + " got response from reattach session");
+
          if (!response.isRemoved())
          {
-            //log.info(System.identityHashCode(this) + " found session, server last received command id is " + response.getLastConfirmedCommandID());
-            
+            // log.info(System.identityHashCode(this) + " found session, server last received command id is " +
+            // response.getLastConfirmedCommandID());
+
             channel.replayCommands(response.getLastConfirmedCommandID());
 
             ok = true;
          }
          else
          {
-           // log.info(System.identityHashCode(this) + " didn't find session, closed sent " + closedSent);
-            
+            // log.info(System.identityHashCode(this) + " didn't find session, closed sent " + closedSent);
+
             if (closedSent)
             {
                // a session re-attach may fail, if the session close was sent before failover started, hit the server,

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -112,14 +112,5 @@
       }
       
       channel.confirm(packet);
-      
-      if (packet.getType() == SESS_RECEIVE_MSG)
-      {
-         SessionReceiveMessage message = (SessionReceiveMessage) packet;
-         
-         int cnt = (Integer)message.getClientMessage().getProperty(new SimpleString("count"));
-         
-        // log.info("confirmed on client " + cnt);
-      }
    }
 }
\ No newline at end of file

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -299,7 +299,7 @@
                                                   "Unable to connect to server using configuration " + connectorConfig);
                   }
 
-                  channel1 = connection.getChannel(1, -1, false);
+                  channel1 = connection.getChannel(1, -1, false, false);
 
                   // Lock it - this must be done while the failoverLock is held
                   channel1.getLock().lock();
@@ -346,7 +346,8 @@
 
                   Channel sessionChannel = connection.getChannel(sessionChannelID,
                                                                  producerWindowSize,
-                                                                 producerWindowSize != -1);
+                                                                 producerWindowSize != -1,
+                                                                 false);
 
                   ClientSessionInternal session = new ClientSessionImpl(this,
                                                                         name,
@@ -1039,7 +1040,7 @@
 
          Ping ping = new Ping(clientFailureCheckPeriod, connectionTTL);
 
-         Channel channel0 = conn.getChannel(0, -1, false);
+         Channel channel0 = conn.getChannel(0, -1, false, false);
 
          channel0.send(ping);
 
@@ -1133,7 +1134,7 @@
    {
       for (ConnectionEntry entry : connections.values())
       {
-         Channel channel1 = entry.connection.getChannel(1, -1, false);
+         Channel channel1 = entry.connection.getChannel(1, -1, false, false);
 
          channel1.getLock().lock();
       }
@@ -1143,7 +1144,7 @@
    {
       for (ConnectionEntry entry : connections.values())
       {
-         Channel channel1 = entry.connection.getChannel(1, -1, false);
+         Channel channel1 = entry.connection.getChannel(1, -1, false, false);
 
          channel1.getLock().unlock();
       }
@@ -1153,7 +1154,7 @@
    {
       for (ConnectionEntry entry : connections.values())
       {
-         Channel channel1 = entry.connection.getChannel(1, -1, false);
+         Channel channel1 = entry.connection.getChannel(1, -1, false, false);
 
          channel1.returnBlocking();
       }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -116,8 +116,6 @@
 
    private MessagingServerControlImpl messagingServerControl;
 
-  // private MessagingServer messagingServer;
-
    private final MessageCounterManager messageCounterManager;
 
    private final SimpleString managementNotificationAddress;
@@ -136,8 +134,6 @@
 
    private boolean notificationsEnabled;
 
- //  private final Set<NotificationListener> listeners = new org.jboss.messaging.utils.ConcurrentHashSet<NotificationListener>();
-
    private ReplicationOperationInvoker replicationInvoker;
 
    private ClusterQueueStateManager clusterQueueStateManager;
@@ -210,7 +206,6 @@
       this.securityRepository = securityRepository;
       this.storageManager = storageManager;
       this.clusterQueueStateManager = clusterQueueStateManager;
-    //  this.messagingServer = messagingServer;
 
       JBMSecurityManager sm = messagingServer.getSecurityManager();
       if (sm != null)
@@ -687,9 +682,6 @@
                {
                   continue;
                }
-               // System.out.format("param=%s, expecting=%s\n", params[i].getClass(), paramTypes[i]);
-               // System.out.println(!paramTypes[i].isAssignableFrom(params[i].getClass()));
-               // System.out.println(paramTypes[i] == Long.TYPE && params[i].getClass() == Integer.class);
                if (paramTypes[i].isAssignableFrom(params[i].getClass()) || (paramTypes[i] == Long.TYPE && params[i].getClass() == Integer.class) ||
                    (paramTypes[i] == Double.TYPE && params[i].getClass() == Integer.class) ||
                    (paramTypes[i] == Long.TYPE && params[i].getClass() == Long.class) ||
@@ -718,22 +710,7 @@
       {
          throw new IllegalArgumentException("no operation " + operation + "/" + params.length);
       }
-      // System.out.println(method.getName());
-      // for (Class<?> parameters : method.getParameterTypes())
-      // {
-      // System.out.println(parameters);
-      // }
-      // System.out.println("===");
-      // for (Object object : params)
-      // {
-      // if (object == null)
-      // {
-      // System.out.println("null");
-      // } else
-      // {
-      // System.out.println(object.getClass());
-      // }
-      // }
+
       Object result = method.invoke(resource, params);
 
       return result;

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -73,9 +73,5 @@
 
    PagingManager getPagingManager();
 
-   DuplicateIDCache getDuplicateIDCache(SimpleString address);
-   
-   //void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception;
-   
-  // Object getNotificationLock();     
+   DuplicateIDCache getDuplicateIDCache(SimpleString address);  
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -68,4 +68,13 @@
    //void waitForAllReplicationResponse();
    
    //void replicationResponseReceived(Replicator replicator, int count);
+   
+   Thread getExecutingThread();
+   
+   //void freeze();
+   
+   void setFrozen(boolean frozen);
+   
+   void waitForAllExecutions();
+   
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -12,13 +12,14 @@
 
 package org.jboss.messaging.core.remoting;
 
+import java.util.List;
+import java.util.Set;
+
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.spi.BufferHandler;
 import org.jboss.messaging.core.remoting.spi.Connection;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
-import java.util.List;
-
 /**
  * A RemotingConnection
  * 
@@ -31,7 +32,7 @@
 
    String getRemoteAddress();
 
-   Channel getChannel(long channelID, int windowSize, boolean block);
+   Channel getChannel(long channelID, int windowSize, boolean block, boolean async);
    
    void putChannel(long channelID, Channel channel);
    
@@ -80,4 +81,12 @@
    RemotingConnection getReplicatingConnection();
    
    Thread getExecutingThread();
+   
+   List<Interceptor> getInterceptors();
+   
+   void setFrozenAllChannels(boolean frozen);
+   
+   void waitForAllExecutions();
+   
+   Set<Thread> getExecutingThreads();
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -25,7 +25,9 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
 
+import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
@@ -37,15 +39,14 @@
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.CommandConfirmationHandler;
+import org.jboss.messaging.core.remoting.Interceptor;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.utils.SimpleString;
+import org.jboss.messaging.utils.Future;
 
 /**
  * A ChannelImpl
@@ -92,11 +93,17 @@
 
    private final Semaphore sendSemaphore;
 
+   private final Executor executor;
+
    private int receivedBytes;
 
    private CommandConfirmationHandler commandConfirmationHandler;
 
-   public ChannelImpl(final RemotingConnection connection, final long id, final int windowSize, final boolean block)
+   public ChannelImpl(final RemotingConnection connection,
+                      final long id,
+                      final int windowSize,
+                      final boolean block,
+                      final Executor executor)
    {
       this.connection = connection;
 
@@ -125,6 +132,7 @@
 
          sendSemaphore = null;
       }
+      this.executor = executor;
    }
 
    public long getID()
@@ -288,10 +296,6 @@
 
             if (resendCache != null && packet.isRequiresConfirmations())
             {
-//               if (packet.getType() == PacketImpl.SESS_CLOSE)
-//               {
-//                  log.info(System.identityHashCode(this) + " added session close to resend cache");                  
-//               }
                resendCache.add(packet);
             }
 
@@ -402,15 +406,15 @@
 
    public void replayCommands(final int otherLastConfirmedCommandID)
    {
-     // log.info(connection.isClient() + " replaying, other last command id " + otherLastConfirmedCommandID);
-      
+      // log.info(connection.isClient() + " replaying, other last command id " + otherLastConfirmedCommandID);
+
       if (otherLastConfirmedCommandID != -1)
       {
          clearUpTo(otherLastConfirmedCommandID);
       }
 
-     // log.info("Resend cache size is " + resendCache.size());
-      
+      // log.info("Resend cache size is " + resendCache.size());
+
       for (final Packet packet : resendCache)
       {
          doWrite(packet);
@@ -445,30 +449,28 @@
    public void flushConfirmations()
    {
       int lcid = this.lastConfirmedCommandID;
-      
+
       if (receivedBytes != 0 && connection.isActive() && lcid != -1)
       {
          receivedBytes = 0;
 
-        // log.info("Sending packets confirmed from flush");
-         
-         sendConfirmation(lcid);     
+         // log.info("Sending packets confirmed from flush");
+
+         sendConfirmation(lcid);
       }
    }
-   
+
    private void sendConfirmation(final int lastConfirmedID)
    {
       final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedID);
 
       confirmed.setChannelID(id);
 
-      //We need to queue packet confirmations too
+      // We need to queue packet confirmations too
       if (!queuedWriteManager.tryQueue(confirmed))
       {
-      //   log.info(connection.isClient() + " writing packets confirmed " + lastConfirmedID + " " + System.identityHashCode(this));
-         
-         doWrite(confirmed);   
-      }                    
+         doWrite(confirmed);
+      }
    }
 
    public void confirm(final Packet packet)
@@ -476,9 +478,9 @@
       if (resendCache != null && packet.isRequiresConfirmations())
       {
          lastConfirmedCommandID++;
-         
-        // log.info("sending confirm from confirm");
 
+         // log.info("sending confirm from confirm");
+
          receivedBytes += packet.getPacketSize();
 
          if (receivedBytes >= confWindowSize)
@@ -487,7 +489,7 @@
 
             if (connection.isActive())
             {
-               sendConfirmation(lastConfirmedCommandID);           
+               sendConfirmation(lastConfirmedCommandID);
             }
          }
       }
@@ -495,46 +497,183 @@
 
    public void handlePacket(final Packet packet)
    {
-      if (packet.getType() == PACKETS_CONFIRMED)
+      if (executor == null)
       {
-         if (resendCache != null)
+         this.doHandlePacket(packet);
+      }
+      else
+      {
+         executor.execute(new Runnable()
          {
-            final PacketsConfirmedMessage msg = (PacketsConfirmedMessage)packet;
+            public void run()
+            {
+               try
+               {
+                  doHandlePacket(packet);
+               }
+               catch (Throwable t)
+               {
+                  log.error("Failed to handle packet", t);
+               }
+            }
+         });
+      }
+   }
 
-            clearUpTo(msg.getCommandID());
-         }
+   private volatile boolean frozen;
 
-         if (!connection.isClient())
+   private volatile Thread currentThread;
+   
+   public void setFrozen(final boolean f)
+   {
+      if (f)
+      {
+         this.frozen = f;
+      }
+      else
+      {
+         if (this.executor == null)
          {
-            handler.handlePacket(packet);
+            this.frozen = false;
          }
-
-         return;
+         else
+         {
+            executor.execute(new Runnable()
+            {
+               public void run()
+               {
+                  frozen = false;
+               }
+            });
+         }
       }
-      else
+   }
+   
+   public Thread getExecutingThread()
+   {
+      return currentThread;
+   }
+   
+   public void waitForAllExecutions()
+   {
+      if (executor != null)
       {
-         if (packet.isResponse())
+         Future f = new Future();
+         
+         executor.execute(f);
+         
+         boolean ok = f.await(5000);
+         
+         if (!ok)
          {
-            response = packet;
+            throw new IllegalStateException("Timedout out waiting for channel executions to complete");
+         }
+      }
+   }
+   
+   /*
+    * Thread sat on A) (below)
+    * rc set frozen
+    * all channels set frozen
+    * channel waited for
+    * channel unfrozen - thread executes!!!
+    * 
+    * 
+    * 
+    * set rc frozen and wait for all rc threads to exit
+    * freeze all channels
+    * wait for all channels to exit
+    * add an executor to unfreeze channels
+    * 
+    * there's another race - if a thread is on A
+    * then set rc frozen
+    * freeze all channels
+    * 
+    */
 
-            confirm(packet);
+   private void doHandlePacket(final Packet packet)
+   {
+      //A
+      currentThread = Thread.currentThread();
 
-            lock.lock();
+      try
+      {
+         if (!frozen)
+         {
+            List<Interceptor> interceptors = connection.getInterceptors();
 
-            try
+            if (interceptors != null)
             {
-               sendCondition.signal();
+               for (final Interceptor interceptor : interceptors)
+               {
+                  try
+                  {
+                     boolean callNext = interceptor.intercept(packet, connection);
+
+                     if (!callNext)
+                     {
+                        // abort
+
+                        return;
+                     }
+                  }
+                  catch (final Throwable e)
+                  {
+                     log.warn("Failure in calling interceptor: " + interceptor, e);
+                  }
+               }
             }
-            finally
+
+            if (packet.getType() == PACKETS_CONFIRMED)
             {
-               lock.unlock();
+               if (resendCache != null)
+               {
+                  final PacketsConfirmedMessage msg = (PacketsConfirmedMessage)packet;
+
+                  clearUpTo(msg.getCommandID());
+               }
+
+               if (!connection.isClient())
+               {
+                  handler.handlePacket(packet);
+               }
+
+               return;
             }
+            else
+            {
+               if (packet.isResponse())
+               {
+                  response = packet;
+
+                  confirm(packet);
+
+                  lock.lock();
+
+                  try
+                  {
+                     sendCondition.signal();
+                  }
+                  finally
+                  {
+                     lock.unlock();
+                  }
+               }
+               else if (handler != null)
+               {
+                  handler.handlePacket(packet);
+               }
+            }
          }
-         else if (handler != null)
-         {
-            handler.handlePacket(packet);
-         }
+//         else
+//         {
+//            log.info("It's frozen");
+//         }
       }
+      finally
+      {
+         currentThread = null;
+      }
    }
 
    private void doWrite(final Packet packet)
@@ -548,16 +687,18 @@
 
    private void clearUpTo(final int lastConfirmedCommandID)
    {
-      //log.info(System.identityHashCode(this) + " clear up to " + lastConfirmedCommandID + " on client " + connection.isClient() + " fscid " + this.firstStoredCommandID);
+      //log.info(System.identityHashCode(this) + " clearupto " + lastConfirmedCommandID +  " first stored " + firstStoredCommandID);
       
       if (lastConfirmedCommandID < firstStoredCommandID)
       {
-         //This can legitimately happen, if the flushConfirmations() is called from the other side which causes a packet confirmation to be sent, after that
-         //another packet confirmation can come or on failover when the lastConfirmedCommandID is retrieved from the other side there may be overlap
-         //because of the previously flush. In this case we can safely ignore it.
+         // This can legitimately happen, if the flushConfirmations() is called from the other side which causes a
+         // packet confirmation to be sent, after that
+         // another packet confirmation can come or on failover when the lastConfirmedCommandID is retrieved from the
+         // other side there may be overlap
+         // because of the previously flush. In this case we can safely ignore it.
          return;
       }
-      
+
       final int numberToClear = 1 + lastConfirmedCommandID - firstStoredCommandID;
 
       int sizeToFree = 0;
@@ -569,13 +710,11 @@
          if (packet == null)
          {
             throw new IllegalStateException(System.identityHashCode(this) + " Can't find packet to clear: " +
-                                            " last received command id " +
+                                            " last confirmed command id " +
                                             lastConfirmedCommandID +
                                             " first stored command id " +
                                             firstStoredCommandID);
          }
-         
-        // log.info("cleared packet " + packet);
 
          if (packet.getType() != PACKETS_CONFIRMED)
          {

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -71,7 +71,7 @@
       
       this.connectionFailedAction = connectionFailedAction;
       
-      this.channel0 = conn.getChannel(0, -1, false); 
+      this.channel0 = conn.getChannel(0, -1, false, false); 
       
       this.lastPingReceived = lastPingReceived;
       

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -13,10 +13,13 @@
 package org.jboss.messaging.core.remoting.impl;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
@@ -28,6 +31,8 @@
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.spi.Connection;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.ExecutorFactory;
+import org.jboss.messaging.utils.OrderedExecutorFactory;
 import org.jboss.messaging.utils.SimpleIDGenerator;
 
 /**
@@ -73,12 +78,14 @@
 
    private boolean idGeneratorSynced = false;
 
-   private volatile boolean frozen;
+   // private volatile boolean frozen;
 
    private final Object failLock = new Object();
 
    private final PacketDecoder decoder = new PacketDecoder();
 
+   private final ExecutorFactory orderedFactory;
+
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -89,7 +96,7 @@
                                  final long blockingCallTimeout,
                                  final List<Interceptor> interceptors)
    {
-      this(transportConnection, null, blockingCallTimeout, interceptors, true, true);
+      this(transportConnection, null, blockingCallTimeout, interceptors, true, true, null);
    }
 
    /*
@@ -98,10 +105,11 @@
    public RemotingConnectionImpl(final Connection transportConnection,
                                  final RemotingConnection replicatingConnection,
                                  final List<Interceptor> interceptors,
-                                 final boolean active)
+                                 final boolean active,
+                                 final Executor threadPool)
 
    {
-      this(transportConnection, replicatingConnection, -1, interceptors, active, false);
+      this(transportConnection, replicatingConnection, -1, interceptors, active, false, threadPool);
    }
 
    private RemotingConnectionImpl(final Connection transportConnection,
@@ -109,7 +117,8 @@
                                   final long blockingCallTimeout,
                                   final List<Interceptor> interceptors,
                                   final boolean active,
-                                  final boolean client)
+                                  final boolean client,
+                                  final Executor threadPool)
 
    {
       this.transportConnection = transportConnection;
@@ -122,12 +131,26 @@
 
       this.active = active;
 
-      this.client = client;      
+      this.client = client;
+
+      if (threadPool != null)
+      {
+         this.orderedFactory = new OrderedExecutorFactory(threadPool);
+      }
+      else
+      {
+         this.orderedFactory = null;
+      }
    }
 
    // RemotingConnection implementation
    // ------------------------------------------------------------
 
+   public List<Interceptor> getInterceptors()
+   {
+      return interceptors;
+   }
+
    public Connection getTransportConnection()
    {
       return this.transportConnection;
@@ -155,13 +178,16 @@
       return transportConnection.getRemoteAddress();
    }
 
-   public synchronized Channel getChannel(final long channelID, final int windowSize, final boolean block)
+   public synchronized Channel getChannel(final long channelID,
+                                          final int windowSize,
+                                          final boolean block,
+                                          final boolean async)
    {
       Channel channel = channels.get(channelID);
 
       if (channel == null)
       {
-         channel = new ChannelImpl(this, channelID, windowSize, block);
+         channel = new ChannelImpl(this, channelID, windowSize, block, async ? this.orderedFactory.getExecutor() : null);
 
          channels.put(channelID, channel);
       }
@@ -319,51 +345,48 @@
    // Buffer Handler implementation
    // ----------------------------------------------------
 
-   private volatile Thread currentThread;
-   
+   // private volatile Thread currentThread;
+
    public void activate()
    {
       active = true;
    }
 
+   private volatile Thread currentThread;
+
+   private volatile boolean frozen;
+   
+
    public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
    {
       final Packet packet = decoder.decode(buffer);
 
+      // if (packet.getType() == PacketImpl.REPLICATE_LOCK_SEQUENCES)
+      // {
+      // ReplicateLockSequenceMessage msg = (ReplicateLockSequenceMessage)packet;
+      //         
+      // log.info("received sequences " + msg.getID());
+      // }
+
       currentThread = Thread.currentThread();
 
       try
       {
          if (!frozen)
          {
-            if (interceptors != null)
-            {
-               for (final Interceptor interceptor : interceptors)
-               {
-                  try
-                  {
-                     boolean callNext = interceptor.intercept(packet, this);
+            Channel channel = channels.get(packet.getChannelID());
 
-                     if (!callNext)
-                     {
-                        // abort
-
-                        return;
-                     }
-                  }
-                  catch (final Throwable e)
-                  {
-                     log.warn("Failure in calling interceptor: " + interceptor, e);
-                  }
-               }
-            }
-
-            final Channel channel = channels.get(packet.getChannelID());
-
+            //A
+            
+            
             if (channel != null)
             {
                channel.handlePacket(packet);
             }
+            // else
+            // {
+            // log.info("channel is null");
+            // }
          }
       }
       finally
@@ -372,16 +395,61 @@
       }
    }
    
+   /*
+    * Thread sitting on A (the point marked "//A" in bufferReceived)
+    * 
+    * connection is frozen
+    * channels are frozen
+    * wait for all threads
+    * unfreeze channels
+    * ** channel processes invocation (by thread sat on A)
+    * 
+    */
+
    public void freeze()
    {
-      frozen = true;          
+      frozen = true;
    }
-   
+
    public Thread getExecutingThread()
    {
       return currentThread;
    }
-   
+
+   public void setFrozenAllChannels(final boolean frozen)
+   {
+      for (Channel channel : this.channels.values())
+      {
+         channel.setFrozen(frozen);
+      }
+   }
+
+   public void waitForAllExecutions()
+   {
+      for (Channel channel : this.channels.values())
+      {
+         // channel.setFrozen(frozen);
+         channel.waitForAllExecutions();
+      }
+   }
+
+   public Set<Thread> getExecutingThreads()
+   {
+      Set<Thread> threads = new HashSet<Thread>();
+
+      for (Channel channel : this.channels.values())
+      {
+         Thread thread = channel.getExecutingThread();
+
+         if (thread != null)
+         {
+            threads.add(thread);
+         }
+      }
+
+      return threads;
+   }
+
    // Package protected
    // ----------------------------------------------------------------------------
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -36,17 +36,26 @@
    // Attributes ----------------------------------------------------
 
    private List<Triple<Long, Long, Integer>> sequences;
+   
+   private long id;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public ReplicateLockSequenceMessage(final List<Triple<Long, Long, Integer>> sequences)
+   public ReplicateLockSequenceMessage(final long id, final List<Triple<Long, Long, Integer>> sequences)
    {
       super(REPLICATE_LOCK_SEQUENCES);
 
       this.sequences = sequences;
+      
+      this.id = id;
    }
+   
+   public long getID()
+   {
+      return id;
+   }
 
    // Public --------------------------------------------------------
 
@@ -60,7 +69,7 @@
       return BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
       sequences.size() *
       (2 * DataConstants.SIZE_LONG +
-      DataConstants.SIZE_INT);
+      DataConstants.SIZE_INT) + DataConstants.SIZE_LONG;
    }
 
    @Override
@@ -73,6 +82,7 @@
          buffer.writeLong(sequence.b);
          buffer.writeInt(sequence.c);
       }
+      buffer.writeLong(id);
    }
 
    @Override
@@ -87,6 +97,7 @@
                                                                             buffer.readInt());
          sequences.add(pair);
       }
+      id = buffer.readLong();
    }
 
    public List<Triple<Long, Long, Integer>> getSequences()

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -21,8 +21,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -92,7 +95,7 @@
 
    private volatile RemotingConnection serverSideReplicatingConnection;
 
-   private final Executor threadPool;
+   private final ExecutorService threadPool;
 
    private final ScheduledExecutorService scheduledThreadPool;
 
@@ -107,7 +110,7 @@
    public RemotingServiceImpl(final Configuration config,
                               final MessagingServer server,
                               final ManagementService managementService,
-                              final Executor threadPool,
+                              final ExecutorService threadPool,
                               final ScheduledExecutorService scheduledThreadPool,
                               final int managementConnectorID)
    {
@@ -228,7 +231,7 @@
 
       for (RemotingConnection connection : connections.values())
       {
-         connection.getChannel(0, -1, false).sendAndFlush(new PacketImpl(DISCONNECT));
+         connection.getChannel(0, -1, false, false).sendAndFlush(new PacketImpl(DISCONNECT));
       }
 
       for (Acceptor acceptor : acceptors)
@@ -282,22 +285,46 @@
          throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
       }
 
-      RemotingConnection replicatingConnection = server.getNonPooledReplicatingConnection();
+      Future<RemotingConnection> result = threadPool.submit(new Callable<RemotingConnection>()
+      {
+         public RemotingConnection call()
+         {
+            return server.getNonPooledReplicatingConnection();
+         }
+      });
 
+      RemotingConnection replicatingConnection;
+
+      try
+      {
+         replicatingConnection = result.get();
+      }
+      catch (ExecutionException e)
+      {
+         log.error("Failed to get replicating conection", e);
+         return;
+      }
+      catch (InterruptedException e)
+      {
+         log.error("Interrupted", e);
+         return;
+      }
+
       RemotingConnection rc = new RemotingConnectionImpl(connection,
                                                          replicatingConnection,
                                                          interceptors,
-                                                         !config.isBackup());
+                                                         !config.isBackup(),
+                                                         threadPool);
 
-      Channel channel1 = rc.getChannel(1, -1, false);
+      Channel channel1 = rc.getChannel(1, -1, false, false);
 
       final Replicator replicator;
 
       if (replicatingConnection != null)
       {
-         Channel replicatingChannel = replicatingConnection.getChannel(1, -1, false);
+         Channel replicatingChannel = replicatingConnection.getChannel(1, -1, false, false);
 
-         replicator = new ReplicatorImpl(replicatingChannel);
+         replicator = new ReplicatorImpl("mess server", replicatingChannel);
 
          replicatingChannel.setHandler(new ChannelHandler()
          {
@@ -461,7 +488,7 @@
       {
          this.conn = conn;
 
-         conn.getChannel(0, -1, false).setHandler(this);
+         conn.getChannel(0, -1, false, false).setHandler(this);
       }
 
       public synchronized void handlePacket(final Packet packet)

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -20,7 +20,9 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.Map.Entry;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -585,7 +587,7 @@
          currentSession.getChannel().close();
       }
 
-      Channel channel = connection.getChannel(channelID, sendWindowSize, false);
+      Channel channel = connection.getChannel(channelID, sendWindowSize, false, configuration.isBackup());
 
       RemotingConnection replicatingConnection = connection.getReplicatingConnection();
 
@@ -595,9 +597,9 @@
 
       if (replicatingConnection != null)
       {
-         replicatingChannel = replicatingConnection.getChannel(channelID, -1, false);
+         replicatingChannel = replicatingConnection.getChannel(channelID, -1, false, false);
 
-         replicator = new ReplicatorImpl(replicatingChannel);
+         replicator = new ReplicatorImpl("session " + channelID, replicatingChannel);
 
          replicatingChannel.setHandler(new ChannelHandler()
          {
@@ -842,7 +844,7 @@
 
       if (replicator != null)
       {
-         Channel channel1 = replicator.getReplicatingChannel().getConnection().getChannel(1, -1, false);
+         Channel channel1 = replicator.getReplicatingChannel().getConnection().getChannel(1, -1, false, false);
 
          channel1.send(new UnregisterQueueReplicationChannelMessage(queue.getID()));
       }
@@ -943,7 +945,7 @@
 
                Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
 
-               Channel channel1 = conn.getChannel(1, -1, false);
+               Channel channel1 = conn.getChannel(1, -1, false, false);
 
                ChannelHandler prevHandler = channel1.getHandler();
 
@@ -1028,7 +1030,8 @@
    {
       if (configuration.isBackup())
       {
-         log.info("A connection has been made to the backup server so it will be activated! This will result in the live server being considered failed.");
+         log.info("A connection has been made to the backup server so it will be activated! This will result in the live server being considered failed."
+                  );
 
          synchronized (this)
          {
@@ -1094,41 +1097,90 @@
 
          flock.unlock();
 
-         freezeConnections();
+         for (RemotingConnection rc : backupConnections)
+         {
+            rc.freeze();
+            
+            rc.setFrozenAllChannels(true);
+         }
+         
+         for (RemotingConnection rc : backupConnections)
+         {
+            while (true)
+            {
+               Thread thr = rc.getExecutingThread();
+               
+               if (thr == null)
+               {
+                  break;
+               }
+               
+               try
+               {
+                  Thread.sleep(1);
+               }
+               catch (InterruptedException ignore)
+               {
+               }
+            }
+         }
 
-         //We set a latch on each ReplicationAwareMutex
+         // We set a latch on each ReplicationAwareMutex
          ReplicationAwareMutex.setLatchAll();
 
          long start = System.currentTimeMillis();
 
-         //We wait for all executing threads to end up on this latch. If they instead end up on a SequencedLock we interrupt them
-         //until they fall through to the other latch
-         
+         // We wait for all executing threads to end up on this latch. If they instead end up on a SequencedLock we
+         // interrupt them
+         // until they fall through to the other latch
+
          for (RemotingConnection rc : backupConnections)
          {
             while (true)
             {
-               JBMThread executingThread = (JBMThread)rc.getExecutingThread();
+              // log.info("in loop");
+               Set<Thread> executingThreads = rc.getExecutingThreads();
 
-               if (executingThread == null)
+               boolean exit = true;
+               
+              // log.info("executing threads " + executingThreads.isEmpty());
+               
+               for (Thread executingThread : executingThreads)
                {
-                  break;
+                  if (executingThread instanceof JBMThread)
+                  {
+                     JBMThread jthread = (JBMThread)executingThread;
+
+                     if (jthread.isWaitingOnMutex())
+                     {
+                        //log.info("Thread " + jthread + " is waiting on mutex");
+                        
+                        jthread.setNoReplayOrRecord(0);      
+                     }
+                     else if (jthread.isWaitingOnSequencedLock())
+                     {
+                        jthread.setFrozen();
+
+                        executingThread.interrupt();
+                        
+                        exit = false;
+                     }
+                     else
+                     {
+                        exit = false;
+                     }
+                  }
+                  else
+                  {
+                     exit = false;
+                  }
                }
-
-               if (executingThread.isWaitingOnMutex())
+               
+               if (exit)
                {
-                  executingThread.setNoReplayOrRecord(0);
-                  
                   break;
                }
 
-               if (executingThread.isWaitingOnSequencedLock())
-               {
-                  executingThread.setFrozen();
-
-                  executingThread.interrupt();
-               }
-
                try
                {
                   Thread.sleep(1);
@@ -1141,23 +1193,28 @@
                {
                   throw new IllegalStateException("Timed out waiting for threads to exit or reach latch");
                }
+
             }
          }
+         
+       //  log.info("all on latch");
 
-         //Now we release the latch and wait for all threads to exit
-         
+         // Now we release the latch and wait for all threads to exit
+
          ReplicationAwareMutex.setOwnerLatchAll();
 
          start = System.currentTimeMillis();
+                  
 
          // Wait for everything to exit
          for (RemotingConnection rc : backupConnections)
          {
             while (true)
             {
-               JBMThread executingThread = (JBMThread)rc.getExecutingThread();
+              // log.info("in loop2");
+               Set<Thread> executingThreads = rc.getExecutingThreads();
 
-               if (executingThread == null)
+               if (executingThreads.isEmpty())
                {
                   break;
                }
@@ -1173,25 +1230,46 @@
                if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
                {
                   Exception e = new Exception();
+                  Thread executingThread = executingThreads.iterator().next();
                   e.setStackTrace(executingThread.getStackTrace());
                   log.error("Waiting for this thread " + executingThread, e);
-                  log.error("Replay " + executingThread.isReplay());
+                  // log.error("Replay " + executingThread.isReplay());
                   throw new IllegalStateException("Timed out waiting for threads to exit");
                }
             }
          }
+         
+       //  log.info("all exited");
+         
+         
+         
+         //FIXME this is not sufficient - since there may still be queued executions waiting from before the freeze
+         //need to wait for all executions on the channel to complete too
+         for (RemotingConnection rc : backupConnections)
+         {
+            rc.setFrozenAllChannels(false);
+         }
+         
+         //Now we need to wait for all executions to finish on the channel - they may be queued
+         
+         for (RemotingConnection rc : backupConnections)
+         {
+            rc.waitForAllExecutions();
+         }
 
          ReplicationAwareMutex.clearLatchAll();
+         
+         log.info("freeze complete");
       }
    }
 
-   private void freezeConnections()
-   {
-      for (RemotingConnection rc : backupConnections)
-      {
-         rc.freeze();
-      }
-   }
+   // private void freezeConnections()
+   // {
+   // for (RemotingConnection rc : backupConnections)
+   // {
+   // rc.freeze();
+   // }
+   // }
 
    public boolean registerBackupConnection(final RemotingConnection connection)
    {
@@ -1440,15 +1518,27 @@
       }
    }
 
-   private Replicator getReplicatorForQueue(final long queueID)
+   private Replicator getReplicatorForQueue(final long queueID) throws Exception
    {
-      RemotingConnection replicatingConnection = this.getPooledReplicatingConnection();
+      RemotingConnection replicatingConnection;
 
+      // Needs to be executed on a different thread since Netty doesn't like new connections created on
+      // handler threads
+      java.util.concurrent.Future<RemotingConnection> result = threadPool.submit(new Callable<RemotingConnection>()
+      {
+         public RemotingConnection call()
+         {
+            return getPooledReplicatingConnection();
+         }
+      });
+
+      replicatingConnection = result.get();
+
       final Replicator replicator;
 
       if (replicatingConnection != null)
       {
-         Channel channel1 = replicatingConnection.getChannel(1, -1, false);
+         Channel channel1 = replicatingConnection.getChannel(1, -1, false, false);
 
          JBMThread thread = JBMThread.currentThread();
 
@@ -1458,9 +1548,9 @@
 
          thread.resumeRecording();
 
-         Channel replChannel = replicatingConnection.getChannel(queueID, -1, false);
+         Channel replChannel = replicatingConnection.getChannel(queueID, -1, false, false);
 
-         replicator = new ReplicatorImpl(replChannel);
+         replicator = new ReplicatorImpl("queue " + queueID, replChannel);
 
          replChannel.setHandler(new ChannelHandler()
          {

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -87,6 +87,8 @@
          throw new IllegalStateException("First packet must be startup info for backup " + type);
       }
 
+     // log.info("handling packet " + packet + " on backup " + this.server.getConfiguration().isBackup());
+      
       switch (type)
       {
          case REPLICATE_LOCK_SEQUENCES:
@@ -94,6 +96,8 @@
             ReplicateLockSequenceMessage msg = (ReplicateLockSequenceMessage)packet;
 
             sequences = msg.getSequences();
+            
+           // dumpSequences(sequences);
 
             return;
          }
@@ -116,7 +120,7 @@
          {
             RegisterQueueReplicationChannelMessage msg = (RegisterQueueReplicationChannelMessage)packet;
 
-            Channel channel = connection.getChannel(msg.getBindingID(), -1, false);
+            Channel channel = connection.getChannel(msg.getBindingID(), -1, false, true);
 
             if (server.registerBackupConnection(channel.getConnection()))
             {
@@ -129,7 +133,7 @@
          {
             UnregisterQueueReplicationChannelMessage msg = (UnregisterQueueReplicationChannelMessage)packet;
 
-            Channel channel = connection.getChannel(msg.getBindingID(), -1, false);
+            Channel channel = connection.getChannel(msg.getBindingID(), -1, false, true);
 
             channel.setHandler(null);
 
@@ -195,7 +199,7 @@
             log.error("Invalid packet " + packet);
          }
       }
-      sequences = null;
+      //sequences = null;
 
       // send the response message
 
@@ -268,5 +272,14 @@
 
       channel1.send(response);
    }
+   
+   private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
+   {
+      log.info(Thread.currentThread() + " Got on messaging server ph Sequences size is " + sequences.size());
+      for (Triple<Long, Long, Integer> sequence: sequences)
+      {
+         log.info(sequence.a + ": " + sequence.b);
+      }
+   }
 
 }
\ No newline at end of file

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -658,7 +658,7 @@
 
       try
       {
-         int count = messageReferences.size() + getScheduledCount() + getDeliveringCount();
+         int count = messageReferences.size() + scheduledDeliveryHandler.getScheduledCount() + deliveringCount.get();
 
          // log.info(System.identityHashCode(this) + " message count is " +
          // count +

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -79,6 +79,9 @@
             ReplicateLockSequenceMessage msg = (ReplicateLockSequenceMessage)packet;
             
             sequences = msg.getSequences();
+            
+           // log.info("got sequences on queue " + msg.getID());
+           // dumpSequences(sequences);
  
             break;
          }
@@ -118,4 +121,14 @@
          }
       }
    }
+   
+   private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
+   {
+      log.info(Thread.currentThread() + " Got on queue replication ph Sequences size is " + sequences.size());
+      
+      for (Triple<Long, Long, Integer> sequence: sequences)
+      {
+         log.info(sequence.a + ": " + sequence.b);
+      }
+   }
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -119,11 +119,14 @@
    private final Binding binding;
 
    private boolean flowControl = true;
+   
+   private boolean backup;
 
    // Constructors ---------------------------------------------------------------------------------
 
    public ServerConsumerImpl(final long id,
                              final ServerSession session,
+                             final Executor executor,
                              final QueueBinding binding,
                              final Filter filter,
                              final boolean started,
@@ -132,9 +135,12 @@
                              final PagingManager pagingManager,
                              final Channel channel,
                              final boolean preAcknowledge,
-                             final boolean updateDeliveries,      
-                             final ManagementService managementService) throws Exception
+                             final boolean updateDeliveries,
+                             final ManagementService managementService,
+                             boolean backup) throws Exception
    {
+      this.backup = backup;
+      
       this.id = id;
 
       this.filter = filter;
@@ -145,7 +151,7 @@
 
       this.messageQueue = binding.getQueue();
 
-      this.executor = null;
+      this.executor = executor;
 
       this.started = browseOnly || started;
 
@@ -164,11 +170,15 @@
       this.minLargeMessageSize = session.getMinLargeMessageSize();
 
       this.updateDeliveries = updateDeliveries;
+      
+      name = "consumer " + session.getName() + "-" + id;
 
-      lock = new ReplicationAwareMutex("consumer " + session.getName() + "-" + id, 0, true);
-            
+      lock = new ReplicationAwareMutex(name, 0, true);
+
       binding.getQueue().addConsumer(this);
    }
+   
+   String name;
 
    // ServerConsumer implementation
    // ----------------------------------------------------------------------
@@ -292,10 +302,12 @@
          promptDelivery();
       }
    }
-   
+
    public void receiveCredits(final int credits) throws Exception
    {
       boolean promptDelivery = false;
+      
+     // log.info(name + "backup " + backup + " received credits");
 
       lock.lock(1);
 
@@ -312,7 +324,7 @@
             {
                promptDelivery = true;
             }
-            
+
             availableCredits += credits;
          }
       }
@@ -421,19 +433,10 @@
    }
 
    // Public ---------------------------------------------------------------------------------------
-
-   /** To be used on tests only */
-   public int getAvailableCredits()
+   
+   public boolean isFlowControl()
    {
-      lock.lock(2);
-      try
-      {
-         return availableCredits;
-      }
-      finally
-      {
-         lock.unlock();
-      }
+      return flowControl;
    }
 
    // Private --------------------------------------------------------------------------------------
@@ -464,7 +467,15 @@
          // If we play the commands on a different order than how they were generated on the live node, we will
          // eventually still be running this largeMessage before the next message come, what would reject messages
          // from the cluster
-         largeMessageDeliverer.deliver();
+         lock.lock(1234);
+         try
+         {
+            largeMessageDeliverer.deliver();
+         }
+         finally
+         {
+            lock.unlock();
+         }
       }
       else
       {
@@ -474,12 +485,13 @@
 
    private HandleStatus doHandle(final MessageReference ref) throws Exception
    {
+    //  log.info(name + " backup " + backup + " handle");
       lock.lock(3);
 
       try
       {
          if ((flowControl && availableCredits <= 0) || !started)
-         {            
+         {
             return HandleStatus.BUSY;
          }
 
@@ -569,7 +581,7 @@
       {
          availableCredits -= packet.getRequiredBufferSize();
       }
-      
+
       channel.send(packet);
    }
 
@@ -626,98 +638,89 @@
 
       public boolean deliver()
       {
-         lock.lock(5);
+         if (pendingLargeMessage == null)
+         {
+            return true;
+         }
 
-         try
+         if (flowControl && availableCredits <= 0)
          {
-            if (pendingLargeMessage == null)
-            {
-               return true;
-            }
+            return false;
+         }
+         SessionReceiveMessage initialMessage;
 
-            if (flowControl && availableCredits <= 0)
-            {
-               return false;
-            }
-            SessionReceiveMessage initialMessage;
+         if (sentFirstMessage)
+         {
+            initialMessage = null;
+         }
+         else
+         {
+            sentFirstMessage = true;
 
-            if (sentFirstMessage)
-            {
-               initialMessage = null;
-            }
-            else
-            {
-               sentFirstMessage = true;
+            MessagingBuffer headerBuffer = ChannelBuffers.buffer(pendingLargeMessage.getPropertiesEncodeSize());
 
-               MessagingBuffer headerBuffer = ChannelBuffers.buffer(pendingLargeMessage.getPropertiesEncodeSize());
+            pendingLargeMessage.encodeProperties(headerBuffer);
 
-               pendingLargeMessage.encodeProperties(headerBuffer);
+            initialMessage = new SessionReceiveMessage(id,
+                                                       headerBuffer.array(),
+                                                       pendingLargeMessage.getLargeBodySize(),
+                                                       ref.getDeliveryCount());
+         }
 
-               initialMessage = new SessionReceiveMessage(id,
-                                                          headerBuffer.array(),
-                                                          pendingLargeMessage.getLargeBodySize(),
-                                                          ref.getDeliveryCount());
-            }
+         int precalculateAvailableCredits;
 
-            int precalculateAvailableCredits;
+         if (flowControl)
+         {
+            // Flow control needs to be done in advance.
+            precalculateAvailableCredits = preCalculateFlowControl(initialMessage);
+         }
+         else
+         {
+            precalculateAvailableCredits = 0;
+         }
 
+         if (initialMessage != null)
+         {
+            channel.send(initialMessage);
+
             if (flowControl)
             {
-               // Flow control needs to be done in advance.
-               precalculateAvailableCredits = preCalculateFlowControl(initialMessage);
+               precalculateAvailableCredits -= initialMessage.getRequiredBufferSize();
             }
-            else
+         }
+
+         while (positionPendingLargeMessage < sizePendingLargeMessage)
+         {
+            if (flowControl && precalculateAvailableCredits <= 0)
             {
-               precalculateAvailableCredits = 0;
+               return false;
             }
 
-            if (initialMessage != null)
-            {
-               channel.send(initialMessage);
+            SessionReceiveContinuationMessage chunk = createChunkSend();
 
-               if (flowControl)
-               {
-                  precalculateAvailableCredits -= initialMessage.getRequiredBufferSize();
-               }
-            }
+            int chunkLen = chunk.getBody().length;
 
-            while (positionPendingLargeMessage < sizePendingLargeMessage)
+            if (flowControl)
             {
-               if (flowControl && precalculateAvailableCredits <= 0)
+               if ((precalculateAvailableCredits -= chunk.getRequiredBufferSize()) < 0)
                {
-                  return false;
+                  log.warn("Flowcontrol logic is not working properly, too many credits were taken");
                }
-
-               SessionReceiveContinuationMessage chunk = createChunkSend();
-
-               int chunkLen = chunk.getBody().length;
-
-               if (flowControl)
-               {
-                  if ((precalculateAvailableCredits -= chunk.getRequiredBufferSize()) < 0)
-                  {
-                     log.warn("Flowcontrol logic is not working properly, too many credits were taken");
-                  }
-               }
-
-               channel.send(chunk);
-
-               positionPendingLargeMessage += chunkLen;
             }
 
-            if (precalculateAvailableCredits != 0)
-            {
-               log.warn("Flowcontrol logic is not working properly... creidts = " + precalculateAvailableCredits);
-            }
+            channel.send(chunk);
 
-            close();
+            positionPendingLargeMessage += chunkLen;
+         }
 
-            return true;
-         }
-         finally
+         if (precalculateAvailableCredits != 0)
          {
-            lock.unlock();
+            log.warn("Flowcontrol logic is not working properly... creidts = " + precalculateAvailableCredits);
          }
+
+         close();
+
+         return true;
       }
 
       /**

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -454,6 +454,7 @@
 
          ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
                                                           this,
+                                                          executorFactory.getExecutor(),
                                                           (QueueBinding)binding,
                                                           filter,
                                                           started,
@@ -463,7 +464,8 @@
                                                           channel,
                                                           preAcknowledge,
                                                           updateDeliveries,
-                                                          managementService);
+                                                          managementService,
+                                                          server.getConfiguration().isBackup());
 
          consumers.put(consumer.getID(), consumer);
 
@@ -525,6 +527,8 @@
       boolean durable = packet.isDurable();
 
       Packet response = null;
+      
+     // log.info("** handling create queue on backup " + this.server.getConfiguration().isBackup());
 
       try
       {
@@ -1575,6 +1579,8 @@
       remotingConnection.addCloseListener(this);
 
       int serverLastConfirmedCommandID = channel.getLastConfirmedCommandID();
+      
+      channel.setFrozen(false);
 
       //log.info("telling channel to replay commands up to " + lastConfirmedCommandID);
       

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -80,7 +80,6 @@
 import org.jboss.messaging.core.server.replication.ReplicableAction;
 import org.jboss.messaging.core.server.replication.Replicator;
 import org.jboss.messaging.core.server.replication.impl.JBMThread;
-import org.jboss.messaging.utils.Pair;
 import org.jboss.messaging.utils.Triple;
 
 /**
@@ -102,10 +101,10 @@
 
    private Configuration config;
 
-   //TODO the sequences and repl response can be encapsulated in a super class
-   
+   // TODO the sequences and repl response can be encapsulated in a super class
+
    private volatile List<Triple<Long, Long, Integer>> sequences;
-   
+
    private final Channel channel;
 
    public ServerSessionPacketHandler(final ServerSession session,
@@ -139,57 +138,45 @@
    public void handlePacket(final Packet packet)
    {
       this.packet = packet;
-      
+
       if (config.isBackup())
       {
-         //log.info(System.identityHashCode(this) + " inv on backup");
-         
+         // log.info(System.identityHashCode(this) + " inv on backup");
+
          JBMThread thread = JBMThread.currentThread();
+         
+         if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES)
+         {            
+            thread.setReplay(sequences);           
+         }
 
-         thread.setReplay(sequences);
-
          handlePacket();
 
          // send the response message
-                  
+
          if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES)
-         {
-            //checkConfirm(packet);
-            
-           // log.info("sending confirm on sess handle packet");
+         {           
             channel.confirm(packet);
-            
-//            if (packet.getType() == PacketImpl.SESS_SEND)
-//            {
-//               SessionSendMessage sm = (SessionSendMessage)packet;
-//               
-//               ServerMessage msg = sm.getServerMessage();
-//               
-//               int cnt = (Integer)msg.getProperty(new SimpleString("count"));
-//               
-//               log.info("confirmed send " + cnt);
-//            }
-               
+
             channel.send(new ReplicationResponseMessage());
+            
+            thread.setNoReplayOrRecord(4);
          }
+
          
-         thread.setNoReplayOrRecord(4);
-         
-         checkCloseSessionChannels(packet);         
+
+         checkCloseSessionChannels(packet);
       }
       else
       {
-        // log.info(System.identityHashCode(this) + " inv on live, repl is " + replicator);
-         
          if (replicator != null)
          {
-            replicator.execute(this, 
-                               new Runnable()
+            replicator.execute(this, new Runnable()
             {
                public void run()
                {
                   checkConfirm(packet);
-                  
+
                   checkCloseSessionChannels(packet);
                }
             });
@@ -197,41 +184,40 @@
          else
          {
             handlePacket();
-            
+
             checkConfirm(packet);
-            
+
             checkCloseSessionChannels(packet);
          }
       }
    }
-   
+
    private void checkConfirm(final Packet packet)
    {
-      //TODO this is a bit hacky
+      // TODO this is a bit hacky
       if (packet.getType() != PacketImpl.SESS_CLOSE)
-      {
-        // log.info("sending confirm from sess close");
+      {         
          channel.confirm(packet);
       }
    }
-   
+
    private void checkCloseSessionChannels(final Packet packet)
    {
       if (packet.getType() == PacketImpl.SESS_CLOSE)
       {
-         //Close channels once we have the response back from the backup
+         // Close channels once we have the response back from the backup
          session.closeChannels();
       }
    }
 
-//   private static synchronized void dumpSequences(List<Long> sequences)
-//   {
-//      log.info("dumping sequences");
-//      for (long sequence : sequences)
-//      {
-//         log.info(sequence);
-//      }
-//   }
+   // private static synchronized void dumpSequences(List<Long> sequences)
+   // {
+   // log.info("dumping sequences");
+   // for (long sequence : sequences)
+   // {
+   // log.info(sequence);
+   // }
+   // }
 
    private void handlePacket()
    {
@@ -244,11 +230,12 @@
             case REPLICATE_LOCK_SEQUENCES:
             {
                ReplicateLockSequenceMessage msg = (ReplicateLockSequenceMessage)packet;
-
+             
                sequences = msg.getSequences();
-               
-               //dumpSequences(sequences);
-               
+
+             //  log.info("session got sequences");
+             //  dumpSequences(sequences);
+
                break;
             }
             case SESS_CREATECONSUMER:
@@ -429,4 +416,13 @@
          log.error("Caught unexpected exception", t);
       }
    }
+
+   private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
+   {
+      log.info(Thread.currentThread() + " Got on serversession ph Sequences size is " + sequences.size());
+      for (Triple<Long, Long, Integer> sequence : sequences)
+      {
+         log.info(sequence.a + ": " + sequence.b + ": " + sequence.c);
+      }
+   }
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -121,6 +121,10 @@
 
    public void setReplay(final List<Triple<Long, Long, Integer>> objectSequences)
    {
+      if (objectSequences == null)
+      {
+         throw new NullPointerException("sequences cannot be null");
+      }
      // log.info(this + " set replay");
       this.objectSequences = objectSequences;
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -22,6 +22,8 @@
 
 package org.jboss.messaging.core.server.replication.impl;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -62,14 +64,21 @@
    private static final Set<ReplicationAwareMutex> allMutexes = new WeakHashSet<ReplicationAwareMutex>();
 
    private volatile CountDownLatch otherLatch;
+   
+   private final String name;
+   
+   //debug
+  // private List<Exception> history = new ArrayList<Exception>();
 
    public ReplicationAwareMutex(final String name, final int initialCount, final boolean debug)
    {
+      this.name = name;
+      
       this.id = idSequence.getAndIncrement();
 
       lock = new ReentrantLock();
 
-      sequencedLock = new SequencedLock(name, initialCount);
+      sequencedLock = new SequencedLock(id, name, initialCount);
 
       counter = new AtomicInteger(initialCount);
 
@@ -79,6 +88,24 @@
       }
    }
 
+   public static void dumpHistory(final String name)
+   {
+//      synchronized (allMutexes)
+//      {
+//         for (ReplicationAwareMutex mutex: allMutexes)
+//         {
+//            if (mutex.name.equals(name))
+//            {
+//               log.info("Dumping history of mutex with name " + name + " items " + mutex.history.size());
+//               for (Exception e: mutex.history)
+//               {
+//                  log.info("acquirer", e);
+//               }
+//            }
+//         }
+//      }
+   }
+   
    public static void setOwnerLatchAll()
    {
       synchronized (allMutexes)
@@ -110,18 +137,18 @@
       synchronized (allMutexes)
       {
          for (ReplicationAwareMutex mutex : allMutexes)
-         {            
+         {
             mutex.setLatch();
          }
       }
    }
-   
+
    public static void clearLatchAll()
    {
       synchronized (allMutexes)
       {
          for (ReplicationAwareMutex mutex : allMutexes)
-         {            
+         {
             mutex.clearLatches();
          }
       }
@@ -139,7 +166,7 @@
          otherLatch.countDown();
       }
    }
-   
+
    public void clearLatches()
    {
       otherLatch = freezeLatch = null;
@@ -163,8 +190,19 @@
 
    private boolean doLock(long time, TimeUnit unit, int methodID) throws InterruptedException
    {
-      JBMThread thread = JBMThread.currentThread();
+      // TODO optimise this
+      Thread thread = Thread.currentThread();
 
+      JBMThread jthread;
+      if (thread instanceof JBMThread)
+      {
+         jthread = (JBMThread)thread;
+      }
+      else
+      {
+         jthread = null;
+      }
+
       // debug only
       if (owners.contains(thread))
       {
@@ -176,10 +214,10 @@
          throw new IllegalStateException("Lock is NOT re-entrant!");
       }
 
-      if (otherLatch != null)
+      if (jthread != null && otherLatch != null)
       {
-         thread.setWaitingOnMutex(true);
-        
+         jthread.setWaitingOnMutex(true);
+
          while (true)
          {
             try
@@ -187,60 +225,69 @@
                otherLatch.await();
             }
             catch (InterruptedException e)
-            {       
-               //This might get interrupted by mistake when we interrupt the thread thinking it's on the sequenced lock
-               //in which case we just wait again
+            {
+               // This might get interrupted by mistake when we interrupt the thread thinking it's on the sequenced lock
+               // in which case we just wait again
                continue;
             }
-            
+
             break;
          }
 
-         thread.setWaitingOnMutex(false);
+         jthread.setWaitingOnMutex(false);
       }
 
-      if (thread.isReplay())
+      if (jthread != null && jthread.isReplay())
       {
-         //log.info("Thread " + thread + " is replay");
-         Triple<Long, Long, Integer> pair = thread.getNextSequence();
+         // log.info("Thread " + thread + " is replay");
+         Triple<Long, Long, Integer> pair = jthread.getNextSequence();
 
-         // // Sanity check
-         // String otherName = SequencedLock.getLock(pair.a).getName();
-         //            
-         // //If sequencedLock
-         //            
-         // if ((!otherName.equals(name) || methodID != pair.c))
-         // {
-         // String msg = "Invalid object id, expecting " + name + ": " + methodID + " got " + otherName + ": " + pair.c
-         // +
-         // " lock id is " + pair.a;
-         //
-         // log.error(msg);
-         //
-         // thread.dumpSequences();
-         //               
-         // SequencedLock.dumpLockMap();
-         //
-         // throw new IllegalStateException(msg);
-         // }
+//          // Sanity check
+//         String otherName = SequencedLock.getLock(pair.a).getName();
+//
+//         // If sequencedLock
+//
+//         if ((!otherName.equals(name) || methodID != pair.c))
+//         {
+//            String msg = "Invalid object id, expecting " + name +
+//                         ": " +
+//                         methodID +
+//                         " got " +
+//                         otherName +
+//                         ": " +
+//                         pair.c +
+//                         " lock id is " +
+//                         pair.a;
+//
+//            log.error(msg);
+//
+//            dumpSequences(jthread.getSequences());
+//
+//            SequencedLock.dumpLockMap();
+//
+//            throw new IllegalStateException(msg);
+//         }
 
          long sequence = pair.b;
 
          try
          {
-            if (!thread.isReplay())
+            if (!jthread.isReplay())
             {
                throw new IllegalStateException("How can it be non replay?");
             }
-            
+
             if (!sequencedLock.lock(sequence, unit.toNanos(time)))
             {
                // dumpLocksWithName(name);
+               log.error("Timedout out waiting for lock " + name + " method id " + methodID);
+               
+               dumpHistory(name);
             }
          }
          catch (InterruptedException e)
          {
-            if (thread.isFrozen())
+            if (jthread.isFrozen())
             {
                // We retry and this time it will use the standard mutex - this happens on freezing out
                return doLock(time, unit, methodID);
@@ -248,6 +295,10 @@
          }
 
          addOwner(thread);
+         
+//         Exception hist = new Exception();
+//         hist.setStackTrace(thread.getStackTrace());
+//         this.history.add(hist);
 
          return true;
       }
@@ -265,14 +316,18 @@
 
          if (ok)
          {
-            if (thread.isRecording())
+            if (jthread != null && jthread.isRecording())
             {
                long sequence = counter.getAndIncrement();
 
-               thread.addSequence(new Triple<Long, Long, Integer>(id, sequence, methodID));
+               jthread.addSequence(new Triple<Long, Long, Integer>(id, sequence, methodID));
             }
 
             addOwner(thread);
+            
+//            Exception hist = new Exception();
+//            hist.setStackTrace(thread.getStackTrace());
+//            this.history.add(hist);
          }
 
          return ok;
@@ -281,8 +336,18 @@
 
    private void doUnlock()
    {
-      JBMThread thread = JBMThread.currentThread();
+      Thread thread = Thread.currentThread();
 
+      JBMThread jthread;
+      if (thread instanceof JBMThread)
+      {
+         jthread = (JBMThread)thread;
+      }
+      else
+      {
+         jthread = null;
+      }
+
       if (thread == unfreezeOwner)
       {
          // Don't actually unlock this, since we never had the lock - we had the lock on the original SequencedLock
@@ -293,7 +358,7 @@
       }
       else
       {
-         if (thread.isReplay())
+         if (jthread != null && jthread.isReplay())
          {
             sequencedLock.unlock();
          }
@@ -304,21 +369,31 @@
       }
 
       removeOwner(thread);
+
    }
 
    // debug only
-   private void addOwner(final JBMThread thread)
+   private void addOwner(final Thread thread)
    {
       owners.add(thread);
    }
 
    // debug only
-   private void removeOwner(final JBMThread thread)
+   private void removeOwner(final Thread thread)
    {
       owners.remove(thread);
    }
 
    // For debug
    private Set<Thread> owners = new ConcurrentHashSet<Thread>();
+   
+   private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
+   {
+      log.info("Sequences size is " + sequences.size());
+      for (Triple<Long, Long, Integer> sequence: sequences)
+      {
+         log.info(sequence.a + ": " + sequence.b);
+      }
+   }
 
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -53,16 +53,27 @@
 
    public long getAndIncrement()
    {
-      JBMThread thread = JBMThread.currentThread();
+     // TODO optimise this
+      Thread thread = Thread.currentThread();
 
-      if (thread.isReplay())
+      JBMThread jthread;
+      if (thread instanceof JBMThread)
       {
-         if (thread.isRecording())
+         jthread = (JBMThread)thread;
+      }
+      else
+      {
+         jthread = null;
+      }
+      
+      if (jthread != null && jthread.isReplay())
+      {
+         if (jthread.isRecording())
          {
             throw new IllegalStateException("Thread should not be recording");
          }
          
-         Triple<Long, Long, Integer> pair = thread.getNextSequence();
+         Triple<Long, Long, Integer> pair = jthread.getNextSequence();
             
          //Sanity check
          if (pair.a != -1)
@@ -87,9 +98,9 @@
       {
          long sequence = al.getAndIncrement();
 
-         if (thread.isRecording())
+         if (jthread != null && jthread.isRecording())
          {
-            thread.addSequence(new Triple<Long, Long, Integer>(-1L, sequence, -1));
+            jthread.addSequence(new Triple<Long, Long, Integer>(-1L, sequence, -1));
          }
 
          return sequence;

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Channel;
@@ -35,7 +36,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
 import org.jboss.messaging.core.server.replication.ReplicableAction;
 import org.jboss.messaging.core.server.replication.Replicator;
-import org.jboss.messaging.utils.Pair;
 import org.jboss.messaging.utils.Triple;
 
 /**
@@ -70,9 +70,13 @@
    {
       int count;
    }
+   
+   private String name;
             
-   public ReplicatorImpl(final Channel replicatingChannel)
+   public ReplicatorImpl(String name, final Channel replicatingChannel)
    {
+      this.name = name;
+      
       this.replicatingChannel = replicatingChannel;
    }
    
@@ -104,15 +108,6 @@
    {      
       WaitingChannelsHolder waitingChannelsHolder = waitingChannelsQueue.remove();
       
-     // log.info("got replication response in replicator for sent packet " + waitingChannelsHolder.sentPacket);
-            
-     // log.info("there are " + waitingChannelsHolder.channelQueuedWriteCounts.size() + " waiting channels");
-            
-//      if (waitingChannelsHolder.sentPacket.getType() == PacketImpl.SESS_CLOSE)
-//      {
-//         log.info("***Got session close response from backup");
-//      }
-      
       for (Map.Entry<QueuedWriteManager, ChannelCount> entry: waitingChannelsHolder.channelQueuedWriteCounts.entrySet())
       {    
          entry.getKey().replicationResponseReceived(this, entry.getValue().count);
@@ -146,13 +141,10 @@
       
       List<Triple<Long, Long, Integer>> sequences = thread.getSequences();
       
-   //   log.info("Replicating:");
-      
-    //  thread.dumpSequences();
-      
-      
+      long id = seq.getAndIncrement();
+     // log.info("replicating " + name + " seq " + id);
      // dumpSequences(sequences);
-
+      
       // We then send the sequences to the backup
       
       WaitingChannelsHolder holder = new WaitingChannelsHolder();
@@ -161,22 +153,26 @@
       holder.sentPacket = action.getPacket();
       
       waitingChannelsQueue.add(holder);      
+      
+      
 
-      Packet packet = new ReplicateLockSequenceMessage(sequences);
+      Packet packet = new ReplicateLockSequenceMessage(id, sequences);
 
       replicatingChannel.send(packet);
 
       // Next we replicate the actual action
-           
+      
       replicatingChannel.send(action.getPacket());     
    }
    
-   private void dumpSequences(List<Long> sequences)
+   private static final AtomicLong seq = new AtomicLong(0);
+   
+   private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
    {
       log.info("Sequences size is " + sequences.size());
-      for (long sequence: sequences)
+      for (Triple<Long, Long, Integer> sequence: sequences)
       {
-         log.info(sequence);
+         log.info(sequence.a + ": " + sequence.b);
       }
    }
    

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -53,12 +53,14 @@
 
    private final AtomicBoolean locked = new AtomicBoolean(false);
 
+   private final long id;
+
    // private volatile boolean strictOrder = true;
 
    private static Map<Long, SequencedLock> locks = new HashMap<Long, SequencedLock>();
 
    // for debug only
-  // private final long id;
+   // private final long id;
 
    private final String name;
 
@@ -67,15 +69,16 @@
       return owner;
    }
 
-   // private static synchronized void registerLock(final SequencedLock lock)
-   // {
-   // locks.put(lock.id, lock);
-   // }
+   private static synchronized void registerLock(final SequencedLock lock)
+   {
+      //locks.put(lock.id, lock);
+   }
+
    //
-   // public static synchronized SequencedLock getLock(final long id)
-   // {
-   // return locks.get(id);
-   // }
+    public static synchronized SequencedLock getLock(final long id)
+   {
+      return locks.get(id);
+   }
 
    public String getName()
    {
@@ -130,30 +133,30 @@
 
          JBMThread thr = (JBMThread)owner;
 
-//         SequencedLock waitingFor = thr.getLastLock();
-//
-//         if (waitingFor != null)
-//         {
-//            while (true)
-//            {
-//               log.info("waiting for...");
-//               waitingFor.dump();
-//
-//               thr = (JBMThread)waitingFor.owner;
-//
-//               if (thr != null)
-//               {
-//                  waitingFor = thr.getLastLock();
-//
-//                  if (waitingFor != null)
-//                  {
-//                     continue;
-//                  }
-//               }
-//
-//               break;
-//            }
-//         }
+         // SequencedLock waitingFor = thr.getLastLock();
+         //
+         // if (waitingFor != null)
+         // {
+         // while (true)
+         // {
+         // log.info("waiting for...");
+         // waitingFor.dump();
+         //
+         // thr = (JBMThread)waitingFor.owner;
+         //
+         // if (thr != null)
+         // {
+         // waitingFor = thr.getLastLock();
+         //
+         // if (waitingFor != null)
+         // {
+         // continue;
+         // }
+         // }
+         //
+         // break;
+         // }
+         // }
       }
 
       // log.info("Waiting threads: " + queue.size());
@@ -165,9 +168,9 @@
       // }
    }
 
-   public SequencedLock(final String name, final long sequence)
+   public SequencedLock(final long id, final String name, final long sequence)
    {
-     // this.id = id;
+      this.id = id;
 
       this.name = name;
 
@@ -175,10 +178,9 @@
 
       this.currentSequence = new AtomicLong(sequence);
 
-      // registerLock(this);
+      registerLock(this);
    }
 
-
    // TODO parking with a timeout seems to be a lot slower than parking without timeout
    public boolean lock(final long sequence, final long timeout) throws InterruptedException
    {
@@ -197,33 +199,33 @@
          while (true)
          {
             QueueEntry peeked = peekEntry();
-   
+
             if (peeked == null || // There are higher priority threads
                 peeked.thread != currentThread ||
                 !locked.compareAndSet(false, true)) // Lock is already locked
             {
                currentThread.setWaitingOnSequencedLock(true);
-                      
+
                LockSupport.parkNanos(toWait);
-               
+
                if (Thread.interrupted() && currentThread.isFrozen())
                {
                   throw new InterruptedException();
                }
-   
+
                long now = System.nanoTime();
-   
+
                toWait -= now - start;
-   
+
                if (toWait <= 0)
                {
                   log.warn("Timed out waiting for sequenced lock, current " + currentSequence.get() +
                            " expected " +
                            sequence);
-   
+
                   return false;
                }
-   
+
                start = now;
             }
             else
@@ -240,7 +242,7 @@
       queue.remove();
 
       owner = currentThread;
-            
+
       return true;
    }
 
@@ -266,7 +268,6 @@
 
    }
 
-
    private QueueEntry peekEntry()
    {
       QueueEntry entry = queue.peek();

Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -18,19 +18,22 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.tests.concurrent.server.impl;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executor;
 
+import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.replication.Replicator;
 import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeConsumer;
 import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeQueueFactory;
 import org.jboss.messaging.tests.util.UnitTestCase;
@@ -48,163 +51,167 @@
 public class QueueTest extends UnitTestCase
 {
    private static final Logger log = Logger.getLogger(QueueTest.class);
-   
+
    private QueueFactory queueFactory = new FakeQueueFactory();
-   
+
    /*
     * Concurrent set consumer not busy, busy then, call deliver while messages are being added and consumed
     */
    public void testConcurrentAddsDeliver() throws Exception
    {
-      Queue queue = queueFactory.createQueue(1, new SimpleString("address1"), new SimpleString("queue1"), null, false, false);
-      
+      Queue queue = queueFactory.createQueue(1,
+                                             new SimpleString("address1"),
+                                             new SimpleString("queue1"),
+                                             null,
+                                             false,
+                                             false,
+                                             null,
+                                             null);
+
       FakeConsumer consumer = new FakeConsumer();
-      
+
       queue.addConsumer(consumer);
-      
+
       final long testTime = 5000;
-      
+
       Sender sender = new Sender(queue, testTime);
-      
+
       Toggler toggler = new Toggler(queue, consumer, testTime);
-      
+
       sender.start();
-      
+
       toggler.start();
-      
+
       sender.join();
-      
+
       toggler.join();
-      
+
       consumer.setStatusImmediate(HandleStatus.HANDLED);
-      
+
       queue.deliverNow();
 
       if (sender.getException() != null)
       {
          throw sender.getException();
       }
-      
+
       if (toggler.getException() != null)
       {
          throw toggler.getException();
       }
-      
+
       assertRefListsIdenticalRefs(sender.getReferences(), consumer.getReferences());
-      
+
       log.info("num refs: " + sender.getReferences().size());
-      
+
       log.info("num toggles: " + toggler.getNumToggles());
-      
+
    }
-   
+
    // Inner classes ---------------------------------------------------------------
-   
+
    class Sender extends Thread
    {
       private volatile Exception e;
-      
+
       private Queue queue;
-      
+
       private long testTime;
-      
+
       private volatile int i;
-      
+
       public Exception getException()
       {
          return e;
       }
-      
+
       private List<MessageReference> refs = new ArrayList<MessageReference>();
-      
+
       public List<MessageReference> getReferences()
       {
          return refs;
       }
-      
+
       Sender(Queue queue, long testTime)
       {
          this.testTime = testTime;
-         
+
          this.queue = queue;
       }
-      
+
       public void run()
       {
          long start = System.currentTimeMillis();
-         
+
          while (System.currentTimeMillis() - start < testTime)
          {
             ServerMessage message = generateMessage(i);
-            
+
             MessageReference ref = message.createReference(queue);
-            
+
             queue.addLast(ref);
-            
+
             refs.add(ref);
-            
+
             i++;
          }
       }
    }
-   
+
    class Toggler extends Thread
    {
       private volatile Exception e;
-      
+
       private Queue queue;
-      
+
       private FakeConsumer consumer;
-      
+
       private long testTime;
-      
+
       private boolean toggle;
-      
+
       private volatile int numToggles;
-      
+
       public int getNumToggles()
       {
          return numToggles;
       }
-      
+
       public Exception getException()
       {
          return e;
       }
-      
+
       Toggler(Queue queue, FakeConsumer consumer, long testTime)
       {
          this.testTime = testTime;
-         
+
          this.queue = queue;
-         
+
          this.consumer = consumer;
       }
-      
+
       public void run()
       {
          long start = System.currentTimeMillis();
-         
+
          while (System.currentTimeMillis() - start < testTime)
          {
             if (toggle)
             {
-               consumer.setStatusImmediate(HandleStatus.BUSY);              
+               consumer.setStatusImmediate(HandleStatus.BUSY);
             }
             else
             {
                consumer.setStatusImmediate(HandleStatus.HANDLED);
-               
+
                queue.deliverNow();
             }
             toggle = !toggle;
-            
+
             numToggles++;
          }
       }
    }
-      
+
 }
-
-
-

Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -925,12 +925,12 @@
                {
                   ServerConsumerImpl consumerImpl = (ServerConsumerImpl)consumer;
                   long timeout = System.currentTimeMillis() + 5000;
-                  while (timeout > System.currentTimeMillis() && consumerImpl.getAvailableCredits() != null)
+                  while (timeout > System.currentTimeMillis() && consumerImpl.isFlowControl())
                   {
                      Thread.sleep(10);
                   }
                   
-                  assertNull(consumerImpl.getAvailableCredits());
+                  assertFalse(consumerImpl.isFlowControl());
                }
             }
          }

Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-07-20 22:34:58 UTC (rev 7596)
@@ -1286,7 +1286,7 @@
 
    protected int getNumIterations()
    {
-      return 1000;
+      return 500;
    }
 
    @Override




More information about the jboss-cvs-commits mailing list