[jboss-cvs] JBoss Messaging SVN: r7573 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/paging/impl and 11 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jul 15 08:37:25 EDT 2009


Author: timfox
Date: 2009-07-15 08:37:24 -0400 (Wed, 15 Jul 2009)
New Revision: 7573

Added:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
Removed:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java
Modified:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnector.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/spi/Connector.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
   branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
   branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
   branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/SequencedLockTest.java
Log:
MT replication

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -725,11 +725,22 @@
 
       try
       {
-        // log.info(System.identityHashCode(this) + " session handling failover");
+       //  log.info(System.identityHashCode(this) + " session handling failover");
          
          //Prevent any more packets being handled on the old connection
          channel.getConnection().freeze();
          
+         while (channel.getConnection().getExecutingThread() != null)
+         {
+            try
+            {
+               Thread.sleep(1);
+            }
+            catch (InterruptedException ignore)
+            {               
+            }
+         }
+         
          channel.transferConnection(backupConnection);
 
          backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
@@ -738,7 +749,7 @@
 
          int lid = channel.getLastConfirmedCommandID();
          
-      //   log.info(System.identityHashCode(this) + " last received command id on client side is " + lid);
+       //  log.info(System.identityHashCode(this) + " last received command id on client side is " + lid);
          
          Packet request = new ReattachSessionMessage(name, lid);
 
@@ -746,11 +757,11 @@
 
          ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
          
-       //  log.info(System.identityHashCode(this) + " got response from reattach session");
+        // log.info(System.identityHashCode(this) + " got response from reattach session");
          
          if (!response.isRemoved())
          {
-           // log.info(System.identityHashCode(this) + " found session, server last received command id is " + response.getLastConfirmedCommandID());
+            //log.info(System.identityHashCode(this) + " found session, server last received command id is " + response.getLastConfirmedCommandID());
             
             channel.replayCommands(response.getLastConfirmedCommandID());
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.core.client.impl;
 
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_CONTINUATION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
 
@@ -33,6 +34,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
 import org.jboss.messaging.utils.SimpleString;
 
 /**
@@ -97,7 +99,7 @@
                log.error("Received exception asynchronously from server", mem.getException());
                
                break;
-            }
+            }           
             default:
             {
                throw new IllegalStateException("Invalid packet: " + type);

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -22,8 +22,6 @@
 
 package org.jboss.messaging.core.client.impl;
 
-import java.util.Set;
-
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.RemotingConnection;
@@ -69,4 +67,6 @@
    void returnConnection(RemotingConnection conn);
    
    void close();
+   
+   void setNeverFail();
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -151,6 +151,8 @@
    private Connector connector;
 
    private Map<Object, Pinger> pingers = new ConcurrentHashMap<Object, Pinger>();
+   
+   private volatile boolean neverFail;
 
    // debug
 
@@ -207,8 +209,10 @@
       {
          backupConnectorFactory = null;
 
-         backupTransportParams = null;
+         backupTransportParams = null;                
       }
+      
+    //  log.info(System.identityHashCode(this) + " created cm with bcf " + this.backupConnectorFactory);
 
       this.maxConnections = maxConnections;
 
@@ -498,6 +502,11 @@
          }
       }
    }
+   
+   public void setNeverFail()
+   {
+      neverFail = true;
+   }
 
    // Public
    // ---------------------------------------------------------------------------------------
@@ -534,12 +543,16 @@
 
    private boolean failoverOrReconnect(final MessagingException me, final Object connectionID)
    {
+     // log.info(System.identityHashCode(this) + " connection manager failover or reconnect");
       // To prevent recursion
       if (inFailoverOrReconnect)
       {
+        // log.info("Already in it");
          return false;
       }
 
+     // log.info("Waiting on failover lock");
+      
       synchronized (failoverLock)
       {
          if (connectionID != null && !connections.containsKey(connectionID))
@@ -548,9 +561,13 @@
             // over then a async connection exception or disconnect
             // came in for one of the already closed connections, so we return true - we don't want to call the
             // listeners again
+            
+           // log.info("ALready failed over that connection");
 
             return true;
          }
+         
+        // log.info("Got failover lock");
 
          // Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
          // There are either no threads executing in createSession, or one is blocking on a createSession
@@ -579,11 +596,16 @@
 
          boolean attemptFailover = (backupConnectorFactory) != null && (failoverOnServerShutdown || me.getCode() != MessagingException.DISCONNECTED);
 
+        // log.info("Attempt failover is " + attemptFailover);
+        // log.info("bcf " + backupConnectorFactory + " fss: " + failoverOnServerShutdown + " me code " + me.getCode());
+         
          boolean done = false;
 
          if (attemptFailover || reconnectAttempts != 0)
          {
+          //  log.info("Locking all channels");
             lockAllChannel1s();
+         //   log.info("Locked all channels");
 
             final boolean needToInterrupt;
 
@@ -594,6 +616,8 @@
 
             unlockAllChannel1s();
 
+           // log.info("Need to interrupt is " + needToInterrupt);
+            
             if (needToInterrupt)
             {
                // Forcing return all channels won't guarantee that any blocked thread will return immediately
@@ -616,8 +640,12 @@
                      }
                   }
                }
+               
+              // log.info("waited for create session to exit");
             }
 
+            //log.info("continuing");
+            
             // Now we absolutely know that no threads are executing in or blocked in createSession, and no
             // more will execute it until failover is complete
 
@@ -656,6 +684,8 @@
 
                transportParams = backupTransportParams;
 
+               //log.info(System.identityHashCode(this) + " set bcf to null");
+               
                backupConnectorFactory = null;
 
                backupTransportParams = null;
@@ -779,6 +809,8 @@
          }
       }
 
+    //  log.info("ok is " + ok);
+      
       if (ok)
       {
          // If all connections got ok, then handle failover
@@ -812,7 +844,7 @@
             return null;
          }
 
-         RemotingConnection connection = getConnection(initialRefCount);
+         RemotingConnection connection = internalGetConnection(initialRefCount);
 
          if (connection == null)
          {
@@ -888,11 +920,23 @@
          connector = null;
       }
    }
-
+   
    public RemotingConnection getConnection(final int initialRefCount)
    {
+      synchronized (createSessionLock)
+      {
+         synchronized (failoverLock)
+         {
+            return internalGetConnection(initialRefCount);
+         }
+      }
+   }
+
+   private RemotingConnection internalGetConnection(final int initialRefCount)
+   {
       RemotingConnection conn;
 
+      
       if (connections.size() < maxConnections)
       {
          // Create a new one
@@ -906,6 +950,12 @@
                DelegatingBufferHandler handler = new DelegatingBufferHandler();
 
                connector = connectorFactory.createConnector(transportParams, handler, this, threadPool);
+               
+               //For testing only - this makes sure that invm connector failures don't happen for backup connections
+               if (neverFail)
+               {
+                  connector.setNeverFail();
+               }
 
                if (connector != null)
                {

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -48,7 +48,7 @@
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareMutex;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
@@ -117,7 +117,7 @@
     * */
    private final ReadWriteLock currentPageLock = new ReentrantReadWriteLock();
    
-   //private final Lock lock = ReplicationAwareReadWriteLock.createLock().writeLock();
+   //private final Lock lock = ReplicationAwareMutex.createLock().writeLock();
 
    private volatile boolean running = false;
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -44,7 +44,7 @@
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareMutex;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.utils.ExecutorFactory;
@@ -73,7 +73,7 @@
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
 
-   private final Lock lock;
+   private final ReplicationAwareMutex lock;
 
    private boolean started;
 
@@ -91,9 +91,7 @@
 
       this.server = server;
 
-      ReadWriteLock rwLock = new ReplicationAwareReadWriteLock("clusterqueuestatemanager", 0, true);
-
-      lock = rwLock.writeLock();
+      lock = new ReplicationAwareMutex("clusterqueuestatemanager", 0, true);
    }
 
    public synchronized void start()
@@ -121,16 +119,11 @@
       return started;
    }
 
-   public Lock getNotificationLock()
-   {
-      return lock;
-   }
-
    public void sendNotification(final Notification notification, final SimpleString dest) throws Exception
    {
       ServerMessage notificationMessage;
 
-      lock.lock();
+      lock.lock(0);
 
       try
       {
@@ -198,7 +191,7 @@
       Queue queue = (Queue)binding.getBindable();
 
       // Need to lock to make sure all queue info and notifications are in the correct order with no gaps
-      lock.lock();
+      lock.lock(1);
       try
       {
          // First send a reset message

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -59,7 +59,7 @@
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareMutex;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.core.transaction.Transaction;
@@ -111,7 +111,7 @@
 
    private final boolean persistIDCache;
 
-   private final ReadWriteLock lock;
+   private final ReplicationAwareMutex lock;
 
    public PostOfficeImpl(final StorageManager storageManager,
                          final PagingManager pagingManager,
@@ -151,7 +151,7 @@
 
       this.persistIDCache = persistIDCache;
 
-      lock = new ReplicationAwareReadWriteLock("postoffice", 0, true);
+      lock = new ReplicationAwareMutex("postoffice", 0, true);
    }
 
    // MessagingComponent implementation ---------------------------------------
@@ -175,7 +175,7 @@
    }
 
    public synchronized void stop() throws Exception
-   {      
+   {
 
       if (reaper != null)
       {
@@ -194,7 +194,6 @@
       return started;
    }
 
-   
    // PostOffice implementation -----------------------------------------------
 
    // TODO - needs to be locked to prevent happening concurrently with activate().
@@ -205,39 +204,37 @@
    public void addBinding(final Binding binding) throws Exception
    {
       boolean addressExists;
-      
-      lock.writeLock().lock();
-      
+
+      lock.lock(0);
+
       try
       {
          addressExists = addressManager.getBindingsForRoutingAddress(binding.getAddress()) != null;
 
          addressManager.addBinding(binding);
-         
-      }
-      finally
-      {
-         lock.writeLock().unlock();
-      }
 
-      if (binding.getType() == BindingType.LOCAL_QUEUE)
-      {
-         Queue queue = (Queue)binding.getBindable();
-
-         if (backup)
+         if (binding.getType() == BindingType.LOCAL_QUEUE)
          {
-            queue.setBackup();
-         }
+            Queue queue = (Queue)binding.getBindable();
 
-         managementService.registerQueue(queue, binding.getAddress(), storageManager);
+            if (backup)
+            {
+               queue.setBackup();
+            }
 
-         if (!addressExists)
-         {
-            managementService.registerAddress(binding.getAddress());
+            managementService.registerQueue(queue, binding.getAddress(), storageManager);
+
+            if (!addressExists)
+            {
+               managementService.registerAddress(binding.getAddress());
+            }
          }
       }
-      
-      
+      finally
+      {
+         lock.unlock();
+      }
+
       TypedProperties props = new TypedProperties();
 
       props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, binding.getType().toInt());
@@ -257,16 +254,16 @@
       if (filter != null)
       {
          props.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filter.getFilterString());
-      }        
-      
+      }
+
       String uid = UUIDGenerator.getInstance().generateStringUUID();
-      
+
       managementService.sendNotification(new Notification(uid, NotificationType.BINDING_ADDED, props));
    }
 
    public Binding removeBinding(final SimpleString uniqueName) throws Exception
    {
-      lock.writeLock().lock();
+      lock.lock(1);
       Binding binding;
       try
       {
@@ -274,7 +271,7 @@
       }
       finally
       {
-         lock.writeLock().unlock();
+         lock.unlock();
       }
       if (binding == null)
       {
@@ -298,8 +295,8 @@
          {
             managementService.unregisterAddress(binding.getAddress());
          }
-      }      
-            
+      }
+
       TypedProperties props = new TypedProperties();
 
       props.putStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
@@ -311,13 +308,13 @@
       props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
 
       managementService.sendNotification(new Notification(null, NotificationType.BINDING_REMOVED, props));
-      
+
       return binding;
    }
 
    public Bindings getBindingsForAddress(final SimpleString address)
    {
-      lock.readLock().lock();
+      lock.lock(2);
       try
       {
          Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
@@ -331,108 +328,109 @@
       }
       finally
       {
-         lock.readLock().unlock();
+         lock.unlock();
       }
    }
 
    public Binding getBinding(final SimpleString name)
    {
-      lock.readLock().lock();
+      lock.lock(3);
       try
       {
          return addressManager.getBinding(name);
       }
       finally
       {
-         lock.readLock().unlock();
+         lock.unlock();
       }
    }
-   
+
    public Binding getBindingByID(final long id)
    {
-      lock.readLock().lock();
+      lock.lock(4);
       try
       {
          return addressManager.getBindingByID(id);
       }
       finally
       {
-         lock.readLock().unlock();
+         lock.unlock();
       }
    }
 
    public Bindings getMatchingBindings(final SimpleString address)
    {
-      lock.readLock().lock();
+      lock.lock(5);
       try
       {
          return addressManager.getMatchingBindings(address);
       }
       finally
       {
-         lock.readLock().unlock();
+         lock.unlock();
       }
    }
 
    public void route(final ServerMessage message, Transaction tx) throws Exception
    {
-      SimpleString address = message.getDestination();
+      lock.lock(6);
+      try
+      {
+         SimpleString address = message.getDestination();
 
-      byte[] duplicateIDBytes = null;
+         byte[] duplicateIDBytes = null;
 
-      Object duplicateID = message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
+         Object duplicateID = message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
 
-      DuplicateIDCache cache = null;
+         DuplicateIDCache cache = null;
 
-      if (duplicateID != null)
-      {
-         cache = getDuplicateIDCache(message.getDestination());
-
-         if (duplicateID instanceof SimpleString)
+         if (duplicateID != null)
          {
-            duplicateIDBytes = ((SimpleString)duplicateID).getData();
-         }
-         else
-         {
-            duplicateIDBytes = (byte[])duplicateID;
-         }
+            cache = getDuplicateIDCache(message.getDestination());
 
-         if (cache.contains(duplicateIDBytes))
-         {
-            if (tx == null)
+            if (duplicateID instanceof SimpleString)
             {
-               log.trace("Duplicate message detected - message will not be routed");
+               duplicateIDBytes = ((SimpleString)duplicateID).getData();
             }
             else
             {
-               log.trace("Duplicate message detected - transaction will be rejected");
-
-               tx.markAsRollbackOnly(null);
+               duplicateIDBytes = (byte[])duplicateID;
             }
 
-            return;
+            if (cache.contains(duplicateIDBytes))
+            {
+               if (tx == null)
+               {
+                  log.trace("Duplicate message detected - message will not be routed");
+               }
+               else
+               {
+                  log.trace("Duplicate message detected - transaction will be rejected");
+
+                  tx.markAsRollbackOnly(null);
+               }
+
+               return;
+            }
          }
-      }
 
-      boolean startedTx = false;
+         boolean startedTx = false;
 
-      if (cache != null)
-      {
-         if (tx == null)
+         if (cache != null)
          {
-            // We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
+            if (tx == null)
+            {
+               // We need to store the duplicate id atomically with the message storage, so we need to create a tx for
+               // this
 
-            tx = new TransactionImpl(storageManager);
+               tx = new TransactionImpl(storageManager);
 
-            startedTx = true;
+               startedTx = true;
+            }
+
+            cache.addToCache(duplicateIDBytes, tx);
          }
 
-         cache.addToCache(duplicateIDBytes, tx);
-      }
-
-      lock.readLock().lock();
-      try
-      {
          if (tx == null)
          {
             if (pagingManager.page(message, true))
@@ -468,7 +466,7 @@
       }
       finally
       {
-         lock.readLock().unlock();
+         lock.unlock();
       }
    }
 
@@ -479,7 +477,7 @@
 
    public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final Transaction tx) throws Exception
    {
-      lock.readLock().lock();
+      lock.lock(7);
       try
       {
          Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getDestination());
@@ -495,7 +493,7 @@
       }
       finally
       {
-         lock.readLock().unlock();
+         lock.unlock();
       }
    }
 
@@ -553,25 +551,23 @@
       return cache;
    }
 
-   
    // Private -----------------------------------------------------------------
 
    private synchronized void startExpiryScanner()
    {
-      //TODO disabled for now
-//      if (reaperPeriod > 0)
-//      {
-//         reaper = new Reaper();
-//
-//         expiryReaper = new Thread(reaper, "JBM-expiry-reaper");
-//
-//         expiryReaper.setPriority(reaperPriority);
-//
-//         expiryReaper.start();
-//      }
+      // TODO disabled for now
+      // if (reaperPeriod > 0)
+      // {
+      // reaper = new Reaper();
+      //
+      // expiryReaper = new Thread(reaper, "JBM-expiry-reaper");
+      //
+      // expiryReaper.setPriority(reaperPriority);
+      //
+      // expiryReaper.start();
+      // }
    }
 
-  
    private final PageMessageOperation getPageOperation(final Transaction tx)
    {
       // you could have races on the case two sessions using the same XID

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -64,7 +64,9 @@
    void activate();
 
    void freeze();
-  
+   
+   //void freeze();
+   
    Connection getTransportConnection();
    
    boolean isActive();
@@ -76,4 +78,6 @@
    long getBlockingCallTimeout();
    
    RemotingConnection getReplicatingConnection();
+   
+   Thread getExecutingThread();
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -402,30 +402,17 @@
 
    public void replayCommands(final int otherLastConfirmedCommandID)
    {
-     // log.info("replaying, other last command id " + otherLastConfirmedCommandID);
+     // log.info(connection.isClient() + " replaying, other last command id " + otherLastConfirmedCommandID);
       
       if (otherLastConfirmedCommandID != -1)
       {
          clearUpTo(otherLastConfirmedCommandID);
       }
 
-      //log.info("Resend cache size is " + resendCache.size());
+     // log.info("Resend cache size is " + resendCache.size());
       
       for (final Packet packet : resendCache)
       {
-         //log.info("Replaying command " + packet);
-         
-//         if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
-//         {
-//            SessionReceiveMessage sm = (SessionReceiveMessage)packet;
-//            
-//            ServerMessage msg = sm.getServerMessage();
-//            
-//            int cnt = (Integer)msg.getProperty(new SimpleString("count"));
-//            
-//            log.info("resending message " + cnt);
-//         }
-         
          doWrite(packet);
       }
    }
@@ -463,7 +450,7 @@
       {
          receivedBytes = 0;
 
-         //log.info("Sending packets confirmed from flush");
+        // log.info("Sending packets confirmed from flush");
          
          sendConfirmation(lcid);     
       }
@@ -478,6 +465,8 @@
       //We need to queue packet confirmations too
       if (!queuedWriteManager.tryQueue(confirmed))
       {
+      //   log.info(connection.isClient() + " writing packets confirmed " + lastConfirmedID + " " + System.identityHashCode(this));
+         
          doWrite(confirmed);   
       }                    
    }
@@ -488,7 +477,7 @@
       {
          lastConfirmedCommandID++;
          
-         //log.info("last confirmed id is now " + lastConfirmedCommandID);
+        // log.info("sending confirm from confirm");
 
          receivedBytes += packet.getPacketSize();
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -12,12 +12,13 @@
 
 package org.jboss.messaging.core.remoting.impl;
 
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
 
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
@@ -27,12 +28,9 @@
 import org.jboss.messaging.core.remoting.Interceptor;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
 import org.jboss.messaging.core.remoting.spi.Connection;
-import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
-import org.jboss.messaging.core.remoting.spi.Connector;
-import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.server.replication.impl.JBMThread;
 import org.jboss.messaging.utils.SimpleIDGenerator;
 
 /**
@@ -47,40 +45,6 @@
 
    private static final Logger log = Logger.getLogger(RemotingConnectionImpl.class);
 
-   // Static
-   // ---------------------------------------------------------------------------------------
-
-   public static RemotingConnection createConnection(final ConnectorFactory connectorFactory,
-                                                     final Map<String, Object> params,
-                                                     final long callTimeout,
-                                                     final Executor threadPool,
-                                                     final ConnectionLifeCycleListener listener)
-   {
-      DelegatingBufferHandler handler = new DelegatingBufferHandler();
-
-      Connector connector = connectorFactory.createConnector(params, handler, listener, threadPool);
-
-      if (connector == null)
-      {
-         return null;
-      }
-
-      connector.start();
-
-      Connection tc = connector.createConnection();
-
-      if (tc == null)
-      {
-         return null;
-      }
-
-      RemotingConnection connection = new RemotingConnectionImpl(tc, callTimeout, null);
-
-      handler.conn = connection;
-
-      return connection;
-   }
-
    // Attributes
    // -----------------------------------------------------------------------------------
 
@@ -112,8 +76,6 @@
 
    private boolean idGeneratorSynced = false;
 
-   // private final Object freezeLock = new Object();
-
    private volatile boolean frozen;
 
    private final Object failLock = new Object();
@@ -163,7 +125,7 @@
 
       this.active = active;
 
-      this.client = client;
+      this.client = client;      
    }
 
    // RemotingConnection implementation
@@ -362,6 +324,13 @@
 
    private volatile Thread currentThread;
 
+   
+
+   public void activate()
+   {
+      active = true;
+   }
+
    public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
    {
       final Packet packet = decoder.decode(buffer);
@@ -400,14 +369,6 @@
             {
                channel.handlePacket(packet);
             }
-            else
-            {
-               log.info("cannot find handler for packet " + packet +
-                        " client " +
-                        this.isClient() +
-                        " active " +
-                        this.isActive());
-            }
          }
       }
       finally
@@ -415,93 +376,109 @@
          currentThread = null;
       }
    }
-
-   public void activate()
-   {
-      active = true;
-   }
-
-   private static final long FREEZE_TIMEOUT = 5000;
-
+   
    public void freeze()
    {
-      if (frozen)
-      {
-         return;
-      }
-      
-      frozen = true;
-
-      if (currentThread != null)
-      {
-         // long count = 0;
-         long start = System.currentTimeMillis();
-         while (true)
-         {
-            Thread thread = currentThread;
-            
-            if (thread != null)
-            {                             
-               if (thread instanceof JBMThread)
-               {
-                  JBMThread jthread = (JBMThread)thread;
-      
-                  jthread.setLastLockNonStrict();           
-               }
-               
-               log.info("waiting for thread to complete");
+      frozen = true;          
+   }
    
-               Thread.yield();
-   
-               if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
-               {
-                  throw new IllegalStateException("Timed out waiting for thread to complete");
-               }
-   
-               // count++;
-               //            
-               // if (count > 1)
-               // {
-               // Exception e = new Exception();
-               // e.setStackTrace(thread.getStackTrace());
-               // log.info("Waiting for this thread", e);
-               // }
-            }
-            else
-            {
-               break;
-            }
-         }
-         
-      }
-      //      
-      //      
-      //      
-      // //TODO check this logic
-      //      
-      // log.info("freezing connection");
-      //      
-      // JBMThread thread = null;
-      //      
-      // if (currentThread != null)
-      // {
-      // thread = (JBMThread)currentThread;
-      //         
-      // thread.jbmInterrupt();
-      // }
-      //            
-      // // Prevent any more packets being handled on this connection
-      //
-      // synchronized (freezeLock)
-      // {
-      // frozen = true;
-      // }
-      //      
-      // if (thread != null)
-      // {
-      // thread.jbmResetInterrupt();
-      // }
+   public Thread getExecutingThread()
+   {
+      return currentThread;
    }
+   
+//   public void freeze()
+//   {
+//      if (currentThread != null)
+//      {
+//         long start = System.currentTimeMillis();
+//
+//         while (true)
+//         {
+//            Thread thread = currentThread;
+//
+//            if (thread != null)
+//            {              
+//               log.info("waiting for thread to complete");
+//
+//               try
+//               {
+//                  Thread.sleep(1);
+//               }
+//               catch (InterruptedException ignore)
+//               {
+//               }
+//
+//               if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
+//               {
+//                  Exception e = new Exception();
+//                  e.setStackTrace(thread.getStackTrace());
+//                  log.info("Waiting for this thread", e);
+//                  
+//                  JBMThread jthread = null;
+//
+//                  if (thread instanceof JBMThread)
+//                  {
+//                     jthread = (JBMThread)thread;
+//
+//                     //jthread.setLastLockNonStrict();
+//                  }
+//
+//                  if (jthread != null)
+//                  {
+//                     SequencedLock lastLock = jthread.getLastLock();
+//
+//                     log.info("Owner of the lock which this thread is waiting for is ");
+//
+//                     if (lastLock != null)
+//                     {
+//                        log.info("Dumping last lock");
+//
+//                        lastLock.dump();
+//                     }                  
+//                  }
+//
+//                  // log.info("Gonna try a nudge");
+//                  // SequencedLock.dumpLocks();
+//
+//                  throw new IllegalStateException("Timed out waiting for thread to complete");
+//               }
+//            }
+//            else
+//            {
+//               break;
+//            }
+//         }
+//
+//      }
+//      //      
+//      //      
+//      //      
+//      // //TODO check this logic
+//      //      
+//      // log.info("freezing connection");
+//      //      
+//      // JBMThread thread = null;
+//      //      
+//      // if (currentThread != null)
+//      // {
+//      // thread = (JBMThread)currentThread;
+//      //         
+//      // thread.jbmInterrupt();
+//      // }
+//      //            
+//      // // Prevent any more packets being handled on this connection
+//      //
+//      // synchronized (freezeLock)
+//      // {
+//      // frozen = true;
+//      // }
+//      //      
+//      // if (thread != null)
+//      // {
+//      // thread.jbmResetInterrupt();
+//      // }
+//   }
 
    // Package protected
    // ----------------------------------------------------------------------------

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnector.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnector.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnector.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -52,6 +52,8 @@
    public static volatile int numberOfFailures = -1;
 
    private static volatile int failures;
+   
+   private volatile boolean neverFail;
 
    public static synchronized void resetFailures()
    {
@@ -125,10 +127,10 @@
    {
       return started;
    }
-
+   
    public Connection createConnection()
    {
-      if (failOnCreateConnection)
+      if (failOnCreateConnection && !neverFail)
       {
          incFailures();
          // For testing only
@@ -166,7 +168,13 @@
          conn.close();
       }
    }
+   
+   public void setNeverFail()
+   {
+      this.neverFail = true;
+   }
 
+
    // This may be an injection point for mocks on tests
    protected Connection internalCreateConnection(final BufferHandler handler,
                                                  final ConnectionLifeCycleListener listener,
@@ -218,5 +226,7 @@
       }
 
    }
+   
+   
 
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -18,6 +18,8 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.utils.DataConstants;
+import org.jboss.messaging.utils.Pair;
+import org.jboss.messaging.utils.Triple;
 
 /**
  * 
@@ -33,13 +35,13 @@
 
    // Attributes ----------------------------------------------------
 
-   private List<Long> sequences;
+   private List<Triple<Long, Long, Integer>> sequences;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public ReplicateLockSequenceMessage(final List<Long> sequences)
+   public ReplicateLockSequenceMessage(final List<Triple<Long, Long, Integer>> sequences)
    {
       super(REPLICATE_LOCK_SEQUENCES);
 
@@ -55,16 +57,21 @@
 
    public int getRequiredBufferSize()
    {
-      return BASIC_PACKET_SIZE + DataConstants.SIZE_INT + sequences.size() * DataConstants.SIZE_LONG;
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
+      sequences.size() *
+      (2 * DataConstants.SIZE_LONG +
+      DataConstants.SIZE_INT);
    }
 
    @Override
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.writeInt(sequences.size());
-      for (long sequence : sequences)
+      for (Triple<Long, Long, Integer> sequence : sequences)
       {
-         buffer.writeLong(sequence);
+         buffer.writeLong(sequence.a);
+         buffer.writeLong(sequence.b);
+         buffer.writeInt(sequence.c);
       }
    }
 
@@ -72,14 +79,17 @@
    public void decodeBody(final MessagingBuffer buffer)
    {
       int len = buffer.readInt();
-      sequences = new ArrayList<Long>(len);
+      sequences = new ArrayList<Triple<Long, Long, Integer>>(len);
       for (int i = 0; i < len; i++)
       {
-         sequences.add(buffer.readLong());
+         Triple<Long, Long, Integer> pair = new Triple<Long, Long, Integer>(buffer.readLong(),
+                                                                            buffer.readLong(),
+                                                                            buffer.readInt());
+         sequences.add(pair);
       }
    }
 
-   public List<Long> getSequences()
+   public List<Triple<Long, Long, Integer>> getSequences()
    {
       return sequences;
    }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/spi/Connector.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/spi/Connector.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/spi/Connector.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -45,4 +45,6 @@
     * @return The connection, or null if unable to create a connection (e.g. network is unavailable)
     */
    Connection createConnection();   
+   
+   void setNeverFail();
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -28,6 +28,8 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.management.MBeanServer;
 
@@ -99,7 +101,9 @@
 import org.jboss.messaging.core.server.cluster.impl.ClusterManagerImpl;
 import org.jboss.messaging.core.server.replication.Replicator;
 import org.jboss.messaging.core.server.replication.impl.JBMThread;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareMutex;
 import org.jboss.messaging.core.server.replication.impl.ReplicatorImpl;
+import org.jboss.messaging.core.server.replication.impl.SequencedLock;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
@@ -204,7 +208,12 @@
    private int managementConnectorID;
 
    private static AtomicInteger managementConnectorSequence = new AtomicInteger(0);
+   
+   private ConnectionManager pooledReplicatingConnectionManager;
 
+   private ConnectionManager nonPooledReplicatingConnectionManager;
+
+
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -539,7 +548,7 @@
                                                " not compatible with version: " +
                                                version.getFullVersion());
       }
-      
+
       if (!direct)
       {
          if (!registerBackupConnection(connection))
@@ -642,7 +651,7 @@
       session.setHandler(handler);
 
       channel.setHandler(handler);
-      
+
       return new CreateSessionResponseMessage(version.getIncrementingVersion());
    }
 
@@ -716,7 +725,7 @@
                                             "). You're probably trying to restart a live backup pair after a crash");
          }
 
-         log.info("Backup server is now operational");
+         log.info("Backup server is now ready");
       }
    }
 
@@ -852,14 +861,6 @@
       activateCallbacks.remove(callback);
    }
 
-   // private ConnectorFactory backupConnectorFactory;
-   //
-   // private Map<String, Object> backupConnectorParams;
-
-   private volatile ConnectionManager pooledReplicatingConnectionManager;
-
-   private volatile ConnectionManager nonPooledReplicatingConnectionManager;
-
    private void setupConnectionManagers()
    {
       String backupConnectorName = configuration.getBackupConnectorName();
@@ -886,6 +887,8 @@
                                                                            0,
                                                                            this.threadPool,
                                                                            this.scheduledPool);
+            
+            pooledReplicatingConnectionManager.setNeverFail();
 
             nonPooledReplicatingConnectionManager = new ConnectionManagerImpl(null,
                                                                               backupConnector,
@@ -900,6 +903,7 @@
                                                                               0,
                                                                               this.threadPool,
                                                                               this.scheduledPool);
+            nonPooledReplicatingConnectionManager.setNeverFail();
          }
       }
    }
@@ -965,8 +969,6 @@
 
    private Set<RemotingConnection> backupConnections = new WeakHashSet<RemotingConnection>();
 
-   
-
    private static class NoCacheConnectionLifeCycleListener implements ConnectionLifeCycleListener
    {
       private RemotingConnection conn;
@@ -1056,7 +1058,8 @@
                queueDeployer.start();
             }
          }
-         log.info("Backup server is now ACTIVATED");
+         
+         log.info("Backup server is now activated");
       }
 
       connection.activate();
@@ -1071,30 +1074,183 @@
    // connection 1 delivery
    // connection 1 delivery gets replicated
    // can't find message in queue since active was delivered immediately
-   
+
    private boolean frozen;
-   
+
+   // We have to do a bit of locking jiggery-pokery to ensure we don't get connections being registered and blocking
+   // while freezing is in progress
+   private final Lock flock = new ReentrantLock();
+
+   private static final long FREEZE_TIMEOUT = 2000;
+
    private void freezeBackupConnections()
    {
+      flock.lock();
+      
+     // log.info("** freezing backup connections");
+
       synchronized (backupConnections)
       {
          frozen = true;
+
+         flock.unlock();
+
+         // Needs to be done in two stages - once set locks non strict
+
+         freezeConnections();
+
+         // Wait for threads to exit
          
-         for (RemotingConnection rc : backupConnections)
+         boolean timedOut = false;
+
+         long start = System.currentTimeMillis();
+                  
+         outer: for (RemotingConnection rc : backupConnections)
+         {                      
+            while (true)
+            {
+               Thread executingThread = rc.getExecutingThread();
+               
+               if (executingThread == null)
+               {
+                  break;
+               }
+               
+               try
+               {
+                  Thread.sleep(1);
+               }
+               catch (InterruptedException ignore)
+               {
+               }   
+               
+               if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
+               {
+                  timedOut = true;
+                  
+                  break outer;
+               }
+            }                       
+         }
+         
+         if (timedOut)
          {
-            rc.freeze();
+           // log.info("*** timedout waiting for threads to exit");
+            //Now we assert that all remaining threads are waiting for a SequencedLock
+            
+            for (RemotingConnection rc : backupConnections)
+            {
+               Thread executingThread = rc.getExecutingThread();
+               
+               if (executingThread != null & executingThread instanceof JBMThread)
+               {
+                  JBMThread jthread = (JBMThread)executingThread;
+                  
+                  if (!jthread.isWaitingOnSequencedLock())
+                  {
+                     String msg = "Thread is not waiting on SequencedLock";
+                     Exception e = new Exception();
+                     e.setStackTrace(jthread.getStackTrace());
+                     log.error(msg, e);
+                     throw new IllegalStateException(msg);
+                  }
+               }                              
+            }
+            
+           // log.info("** all threads waiting on sequenced locks");
+            
+            //Now we can freeze out all the locks, after setting all threads to replay = false
+            //and frozen to true
+         
+            for (RemotingConnection rc : backupConnections)
+            {
+               Thread executingThread = rc.getExecutingThread();
+               
+               if (executingThread != null & executingThread instanceof JBMThread)
+               {
+                  JBMThread jthread = (JBMThread)executingThread;
+                  
+                  jthread.setNoReplayOrRecord();
+                  
+                  jthread.setFrozen();
+               }                              
+            }
+            
+           // log.info("** set all threads to frozen");
+            
+            ReplicationAwareMutex.freezeOutAll();
+         
+            //Now we interrupt all those threads
+            
+            for (RemotingConnection rc : backupConnections)
+            {
+               Thread executingThread = rc.getExecutingThread();
+               
+               if (executingThread != null & executingThread instanceof JBMThread)
+               {
+                  JBMThread jthread = (JBMThread)executingThread;
+                  
+                  jthread.interrupt();
+               }                              
+            }
+            
+            //Now we wait again for the threads to complete
+            
+            start = System.currentTimeMillis();
+                        
+            for (RemotingConnection rc : backupConnections)
+            {                         
+               while (true)
+               {
+                  Thread executingThread = rc.getExecutingThread();
+                  
+                  if (executingThread == null)
+                  {
+                     break;
+                  }
+                  
+                  try
+                  {
+                     Thread.sleep(1);
+                  }
+                  catch (InterruptedException ignore)
+                  {
+                  }
+                  
+                  if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
+                  {
+                     String msg = "Thread won't freeze out";
+                     Exception e = new Exception();
+                     e.setStackTrace(executingThread.getStackTrace());
+                     log.error(msg, e);
+                     throw new IllegalStateException(msg);
+                  }
+               }
+            }
          }
       }
    }
-   
+
+   private void freezeConnections()
+   {
+      for (RemotingConnection rc : backupConnections)
+      {
+         rc.freeze();
+      }
+   }
+
    public boolean registerBackupConnection(final RemotingConnection connection)
    {
-      synchronized (backupConnections)
+      flock.lock();
+
+      try
       {
          if (!frozen)
          {
-            backupConnections.add(connection);
-            
+            synchronized (backupConnections)
+            {
+               backupConnections.add(connection);
+            }
             return true;
          }
          else
@@ -1102,6 +1258,10 @@
             return false;
          }
       }
+      finally
+      {
+         flock.unlock();
+      }
    }
 
    private void initialisePart1() throws Exception

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -41,6 +41,7 @@
 import org.jboss.messaging.core.server.replication.ReplicableAction;
 import org.jboss.messaging.core.server.replication.Replicator;
 import org.jboss.messaging.core.server.replication.impl.JBMThread;
+import org.jboss.messaging.utils.Triple;
 
 /**
  * A packet handler for all packets that need to be handled at the server level
@@ -61,7 +62,7 @@
 
    private Replicator replicator;
 
-   private volatile List<Long> sequences;
+   private volatile List<Triple<Long, Long, Integer>> sequences;
 
    public MessagingServerPacketHandler(final MessagingServer server,
                                        final Channel channel1,
@@ -116,19 +117,20 @@
             RegisterQueueReplicationChannelMessage msg = (RegisterQueueReplicationChannelMessage)packet;
 
             Channel channel = connection.getChannel(msg.getBindingID(), -1, false);
-            
-            server.registerBackupConnection(channel.getConnection());
 
-            channel.setHandler(new QueueReplicationPacketHandler(msg.getBindingID(), server.getPostOffice(), channel));
+            if (server.registerBackupConnection(channel.getConnection()))
+            {
+               channel.setHandler(new QueueReplicationPacketHandler(msg.getBindingID(), server.getPostOffice(), channel));
+            }
 
             break;
          }
          case UNREGISTER_QUEUE_REPLICATION_CHANNEL:
-         {            
+         {
             UnregisterQueueReplicationChannelMessage msg = (UnregisterQueueReplicationChannelMessage)packet;
 
             Channel channel = connection.getChannel(msg.getBindingID(), -1, false);
-            
+
             channel.setHandler(null);
 
             channel.close();
@@ -199,7 +201,7 @@
 
       if (server.getConfiguration().isBackup() || type == REPLICATE_STARTUP_INFO)
       {
-         channel1.send(new ReplicationResponseMessage());
+         channel1.send(new ReplicationResponseMessage());                 
       }
    }
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -56,7 +56,7 @@
 import org.jboss.messaging.core.server.cluster.impl.Redistributor;
 import org.jboss.messaging.core.server.replication.ReplicableCall;
 import org.jboss.messaging.core.server.replication.Replicator;
-import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareMutex;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.core.transaction.Transaction;
@@ -143,7 +143,7 @@
 
    private final Set<Consumer> consumers = new HashSet<Consumer>();
 
-   private final Lock lock;
+   private final ReplicationAwareMutex lock;
    
    private final Executor executor;
 
@@ -193,9 +193,9 @@
 
       scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
 
-      ReplicationAwareReadWriteLock rwLock = new ReplicationAwareReadWriteLock(name.toString(), 0, true);
+      lock = new ReplicationAwareMutex(name.toString(), 0, true);
       
-      lock = rwLock.writeLock();
+     // lock = rwLock.writeLock();
 
       this.replicator = replicator;
       
@@ -406,15 +406,15 @@
          return;
       }
 
-      if (waitingToDeliver.compareAndSet(false, true))
-      {         
+     // if (waitingToDeliver.compareAndSet(false, true))
+    //  {         
          executor.execute(deliverRunner);
-      }
+     // }
    }
 
    public void addConsumer(final Consumer consumer) throws Exception
    {
-      lock.lock();
+      lock.lock(0);
       try
       {
          cancelRedistributor();
@@ -431,7 +431,7 @@
 
    public boolean removeConsumer(final Consumer consumer) throws Exception
    {
-      lock.lock();
+      lock.lock(1);
       try
       {
          boolean removed = distributionPolicy.removeConsumer(consumer);
@@ -453,7 +453,7 @@
 
    public void addRedistributor(final long delay, final Executor executor)
    {
-      lock.lock();
+      lock.lock(2);
 
       try
       {
@@ -511,7 +511,7 @@
 
    public int getConsumerCount()
    {
-      lock.lock();
+      lock.lock(3);
 
       try
       {
@@ -530,7 +530,7 @@
 
    public List<MessageReference> list(final Filter filter)
    {
-      lock.lock();
+      lock.lock(4);
 
       try
       {
@@ -561,7 +561,7 @@
 
    public MessageReference removeReferenceWithID(final long id) throws Exception
    {
-      lock.lock();
+      lock.lock(5);
 
       try
       {
@@ -601,7 +601,7 @@
 
    public MessageReference removeFirstReference(final long id) throws Exception
    {
-      lock.lock();
+      lock.lock(6);
 
       try
       {
@@ -628,7 +628,7 @@
 
    public MessageReference getReference(final long id)
    {
-      lock.lock();
+      lock.lock(7);
 
       try
       {
@@ -654,7 +654,7 @@
 
    public int getMessageCount()
    {
-      lock.lock();
+      lock.lock(8);
 
       try
       {
@@ -680,7 +680,7 @@
 
    public int getScheduledCount()
    {
-      lock.lock();
+      lock.lock(9);
 
       try
       {
@@ -694,7 +694,7 @@
 
    public List<MessageReference> getScheduledMessages()
    {
-      lock.lock();
+      lock.lock(10);
 
       try
       {
@@ -761,6 +761,8 @@
 
          if (oper == null)
          {
+            //log.info("Creating new refs operation");
+            
             oper = new RefsOperation();
 
             tx.putProperty(TransactionPropertyIndexes.REFS_OPERATION, oper);
@@ -779,7 +781,7 @@
 
    public void cancel(final MessageReference reference) throws Exception
    {
-      lock.lock();
+      lock.lock(11);
 
       try
       {
@@ -853,7 +855,7 @@
       
       Transaction tx = new TransactionImpl(storageManager);
       
-      lock.lock();
+      lock.lock(12);
 
       try
       {
@@ -899,7 +901,7 @@
 
       Transaction tx = new TransactionImpl(storageManager);
 
-      lock.lock();
+      lock.lock(13);
 
       try
       {         
@@ -930,7 +932,7 @@
 
    public boolean expireReference(final long messageID) throws Exception
    {
-      lock.lock();
+      lock.lock(14);
 
       try
       {
@@ -961,7 +963,7 @@
 
       int count = 0;
       
-      lock.lock();
+      lock.lock(15);
 
       try
       {         
@@ -991,7 +993,7 @@
 
    public void expireReferences() throws Exception
    {
-      lock.lock();
+      lock.lock(16);
 
       try
       {
@@ -1011,7 +1013,7 @@
 
    public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
    {
-      lock.lock();
+      lock.lock(17);
 
       try
       {
@@ -1038,7 +1040,7 @@
 
    public boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
    {
-      lock.lock();
+      lock.lock(18);
 
       try
       {
@@ -1069,7 +1071,7 @@
 
       int count = 0;
       
-      lock.lock();
+      lock.lock(19);
 
       try
       {         
@@ -1111,7 +1113,7 @@
 
    public boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception
    {
-      lock.lock();
+      lock.lock(20);
       try
       {
          List<MessageReference> refs = list(null);
@@ -1204,7 +1206,7 @@
 
    public void lock()
    {
-      lock.lock();
+      lock.lock(21);
    }
 
    public void unlock()
@@ -1417,7 +1419,7 @@
 
    public HandleStatus deliverOne()
    {
-      lock.lock();
+      lock.lock(22);
 
       direct = false;
 
@@ -1518,7 +1520,7 @@
 
    private void add(final MessageReference ref, final boolean first)
    {
-      lock.lock();
+      lock.lock(23);
       try
       {
          if (!first)
@@ -1667,7 +1669,7 @@
 
    protected void postRollback(LinkedList<MessageReference> refs) throws Exception
    {
-      lock.lock();
+      lock.lock(24);
       try
       {
          for (MessageReference ref : refs)
@@ -1696,7 +1698,7 @@
       public void run()
       {
          // Must be set to false *before* executing to avoid race
-         waitingToDeliver.set(false);
+         //waitingToDeliver.set(false);
 
          deliverAll();
       }
@@ -1762,6 +1764,8 @@
       synchronized void addRef(final MessageReference ref)
       {
          refsToAdd.add(ref);
+         
+         //log.info("adding ref, now there are " + refsToAdd.size());
       }
 
       synchronized void addAck(final MessageReference ref)
@@ -1825,6 +1829,8 @@
 
       public void afterCommit(final Transaction tx) throws Exception
       {
+         //log.info("after commit , to add " + refsToAdd.size() + " to ack " + refsToAck.size());
+         
          for (MessageReference ref : refsToAdd)
          {
             ref.getQueue().addLast(ref);
@@ -1883,7 +1889,7 @@
 
       public void run()
       {
-         lock.lock();
+         lock.lock(25);
          try
          {
             internalAddRedistributor(executor);

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -35,9 +35,10 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
-import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.replication.impl.JBMThread;
+import org.jboss.messaging.utils.Pair;
+import org.jboss.messaging.utils.Triple;
 
 /**
  * A QueueReplicationPacketHandler
@@ -54,7 +55,7 @@
    
    private volatile Queue queue;
 
-   private volatile List<Long> sequences;
+   private volatile List<Triple<Long, Long, Integer>> sequences;
    
    private final PostOffice postOffice;
    
@@ -105,7 +106,7 @@
             
             queue.deliverOne();
             
-            channel.send(new ReplicationResponseMessage());            
+            channel.send(new ReplicationResponseMessage());
             
             thread.setNoReplayOrRecord();
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -51,7 +51,7 @@
 import org.jboss.messaging.core.server.ServerConsumer;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerSession;
-import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareMutex;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.utils.TypedProperties;
@@ -87,7 +87,7 @@
 
    private final Executor executor;
 
-   private final Lock lock;
+   private final ReplicationAwareMutex lock;
 
    private int availableCredits;
 
@@ -112,8 +112,6 @@
 
    private final Channel channel;
 
-   private volatile boolean closed;
-
    private final boolean preAcknowledge;
 
    private final ManagementService managementService;
@@ -134,8 +132,7 @@
                              final PagingManager pagingManager,
                              final Channel channel,
                              final boolean preAcknowledge,
-                             final boolean updateDeliveries,
-                            // final Executor executor,
+                             final boolean updateDeliveries,      
                              final ManagementService managementService) throws Exception
    {
       this.id = id;
@@ -168,9 +165,7 @@
 
       this.updateDeliveries = updateDeliveries;
 
-      ReplicationAwareReadWriteLock rwLock = new ReplicationAwareReadWriteLock("consumer " + id, 0, true);
-      
-      lock = rwLock.writeLock();
+      lock = new ReplicationAwareMutex("consumer " + session.getName() + "-" + id, 0, true);
             
       binding.getQueue().addConsumer(this);
    }
@@ -210,8 +205,6 @@
 
       Iterator<MessageReference> iter = refs.iterator();
 
-      closed = true;
-
       Transaction tx = new TransactionImpl(storageManager);
 
       while (iter.hasNext())
@@ -282,7 +275,7 @@
 
    public void setStarted(final boolean started)
    {
-      lock.lock();
+      lock.lock(0);
 
       try
       {
@@ -304,7 +297,7 @@
    {
       boolean promptDelivery = false;
 
-      lock.lock();
+      lock.lock(1);
 
       try
       {
@@ -364,9 +357,7 @@
                                             " backup = " +
                                             messageQueue.isBackup() +
                                             " queue = " +
-                                            messageQueue.getName() +
-                                            " closed = " +
-                                            closed);
+                                            messageQueue.getName());
          }
 
          if (autoCommitAcks)
@@ -412,9 +403,7 @@
       {
          throw new IllegalStateException("Could not find reference with id " + messageID +
                                          " backup " +
-                                         messageQueue.isBackup() +
-                                         " closed " +
-                                         closed);
+                                         messageQueue.isBackup());
       }
 
       return ref;
@@ -436,7 +425,7 @@
    /** To be used on tests only */
    public int getAvailableCredits()
    {
-      lock.lock();
+      lock.lock(2);
       try
       {
          return availableCredits;
@@ -485,7 +474,7 @@
 
    private HandleStatus doHandle(final MessageReference ref) throws Exception
    {
-      lock.lock();
+      lock.lock(3);
 
       try
       {
@@ -591,7 +580,7 @@
    {
       public void run()
       {
-         lock.lock();
+         lock.lock(4);
          try
          {
             if (largeMessageDeliverer == null || largeMessageDeliverer.deliver())
@@ -637,7 +626,7 @@
 
       public boolean deliver()
       {
-         lock.lock();
+         lock.lock(5);
 
          try
          {

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -76,12 +76,12 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
-import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerSession;
 import org.jboss.messaging.core.server.replication.ReplicableAction;
 import org.jboss.messaging.core.server.replication.Replicator;
 import org.jboss.messaging.core.server.replication.impl.JBMThread;
-import org.jboss.messaging.utils.SimpleString;
+import org.jboss.messaging.utils.Pair;
+import org.jboss.messaging.utils.Triple;
 
 /**
  * A ServerSessionPacketHandler
@@ -104,7 +104,7 @@
 
    //TODO the sequences and repl response can be encapsulated in a super class
    
-   private volatile List<Long> sequences;
+   private volatile List<Triple<Long, Long, Integer>> sequences;
    
    private final Channel channel;
 
@@ -156,6 +156,7 @@
          {
             //checkConfirm(packet);
             
+           // log.info("sending confirm on sess handle packet");
             channel.confirm(packet);
             
 //            if (packet.getType() == PacketImpl.SESS_SEND)
@@ -169,7 +170,6 @@
 //               log.info("confirmed send " + cnt);
 //            }
                
-                       
             channel.send(new ReplicationResponseMessage());
          }
          
@@ -210,6 +210,7 @@
       //TODO this is a bit hacky
       if (packet.getType() != PacketImpl.SESS_CLOSE)
       {
+        // log.info("sending confirm from sess close");
          channel.confirm(packet);
       }
    }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -24,9 +24,12 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.server.replication.Replicator;
+import org.jboss.messaging.utils.Triple;
 
 /**
  * A JBMThread
@@ -43,27 +46,76 @@
    {
       RECORD, REPLAY, NONE;
    }
-   
+
    private ThreadState state;
 
-   private List<Long> objectSequences;
+   private List<Triple<Long, Long, Integer>> objectSequences;
 
-   private int pos;
+   private final AtomicInteger pos = new AtomicInteger(0);
+
+   private Replicator replicator;
    
-   private Replicator replicator;    
+   private volatile boolean waitingOnSequencedLock;
    
- //  private volatile boolean jbmInterrupted;
-      
+   public boolean isWaitingOnSequencedLock()
+   {
+      return this.waitingOnSequencedLock;
+   }
+   
+   public void setWaitingOnSequencedLock(final boolean waiting)
+   {
+      this.waitingOnSequencedLock = waiting;
+   }
+   
+   private volatile boolean frozen;
+   
+   public void setFrozen()
+   {
+      this.frozen = true;
+   }
+   
+   public boolean isFrozen()
+   {
+      return frozen;
+   }
+
+   //private volatile SequencedLock lastLock;
+
+//   public void dumpSequences()
+//   {
+//      log.info("Dumping thread sequences, current pos = " + pos + " length is " + objectSequences.size());
+//      int cnt = 0;
+//      for (Triple<Long, Long, Integer> pair : objectSequences)
+//      {
+//         long id = pair.a;
+//         if (id == -1)
+//         {
+//            log.info("[" + cnt++ + ", " + id + "] COUNTER: " + pair.b);
+//         }
+//         else
+//         {
+//            SequencedLock lock = SequencedLock.getLock(id);
+//
+//            log.info("[" + cnt++ + ", " + id + "] " + lock.getName() + ": " + pair.b + ": " + pair.c);
+//         }
+//
+//      }
+//   }
+
    public static JBMThread currentThread()
    {
       return (JBMThread)Thread.currentThread();
    }
-   
+
+   public JBMThread()
+   {
+   }
+
    public JBMThread(final ThreadGroup threadGroup, final String name)
    {
       super(threadGroup, name);
    }
-   
+
    public JBMThread(final ThreadGroup threadGroup, final Runnable target, final String name)
    {
       super(threadGroup, target, name);
@@ -73,115 +125,116 @@
    {
       return state == ThreadState.REPLAY;
    }
-   
+
    public boolean isRecording()
    {
       return state == ThreadState.RECORD;
    }
-   
-   public void setReplay(final List<Long> objectSequences)
+
+   public void setReplay(final List<Triple<Long, Long, Integer>> objectSequences)
    {
       this.objectSequences = objectSequences;
-      
+
       this.state = ThreadState.REPLAY;
-      
-      this.pos = 0;
+
+      this.pos.set(0);
    }
-   
-//   private volatile long sequence;
-//   
-//   private AtomicLong seqCounter = new AtomicLong(0);
-   
+
    public void setRecord(final Replicator replicator)
    {
-      //this.sequence = seqCounter.getAndIncrement();
+      // this.sequence = seqCounter.getAndIncrement();
       if (this.objectSequences == null)
       {
-         this.objectSequences = new ArrayList<Long>();
+         this.objectSequences = new ArrayList<Triple<Long, Long, Integer>>();
       }
       else
       {
          this.objectSequences.clear();
       }
-      
+
       this.state = ThreadState.RECORD;
-      
-      this.pos = 0;
-      
+
+      this.pos.set(0);
+
       this.replicator = replicator;
    }
-   
+
    public void setNoReplayOrRecord()
    {
       this.state = ThreadState.NONE;
+
+      //this.strict = true;
    }
 
    public void resumeRecording()
    {
       this.state = ThreadState.RECORD;
    }
-   
+
    public void resumeReplay()
    {
       this.state = ThreadState.REPLAY;
    }
-   
-   public long getNextSequence()
+
+   public Triple<Long, Long, Integer> getNextSequence()
    {
-      return objectSequences.get(pos++);
+      return objectSequences.get(pos.getAndIncrement());
    }
 
-   public void addSequence(final long currentSequence)
+   public void addSequence(final Triple<Long, Long, Integer> currentSequence)
    {
       objectSequences.add(currentSequence);
    }
-   
-   public List<Long> getSequences()
+
+   public List<Triple<Long, Long, Integer>> getSequences()
    {
       return objectSequences;
    }
-   
+
    public Replicator getReplicator()
    {
       return replicator;
    }
-   
+
    public void setReplicator(final Replicator replicator)
    {
       this.replicator = replicator;
    }
-   
-//   public boolean isJBMInterrupted()
+
+//   private boolean strict = true;
+//
+//   public synchronized SequencedLock getLastLock()
 //   {
-//      return jbmInterrupted;
+//      return this.lastLock;
 //   }
-   
-   public void setLastLock(final SequencedLock lock)
+//
+//   public synchronized void setLastLock(final SequencedLock lock)
+//   {
+//      this.lastLock = lock;
+//
+//      if (!strict)
+//      {
+//         this.lastLock.setNonStrict();
+//      }
+//   }
+//
+//   public synchronized void setLastLockNonStrict()
+//   {
+//      if (lastLock != null)
+//      {
+//         lastLock.setNonStrict();
+//      }
+//
+//      strict = false;
+//   }
+
+   public void jbmPark(final long toWait)
    {
-      this.lastLock = lock;
+      LockSupport.parkNanos(toWait);
    }
-   
-   private volatile SequencedLock lastLock;
-   
-   public void setLastLockNonStrict()
+
+   public synchronized void jbmUnpark()
    {
-      if (lastLock != null)
-      {
-         lastLock.setStrict(false);
-      }
+      LockSupport.unpark(this);
    }
-   
-//   public void jbmInterrupt()
-//   {
-//      log.info("Interrupting jbm thread");
-//      
-//      jbmInterrupted = true;
-//      
-//      interrupt();
-//   }
-//   
-//   public void jbmResetInterrupt()
-//   {
-//      jbmInterrupted = false;
-//   }
 }

Copied: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java (from rev 7559, branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java)
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java	                        (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -0,0 +1,249 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.replication.impl;
+
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.ConcurrentHashSet;
+import org.jboss.messaging.utils.Triple;
+import org.jboss.messaging.utils.WeakHashSet;
+
+/**
+ * A ReplicationAwareMutex
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class ReplicationAwareMutex
+{
+   private static final Logger log = Logger.getLogger(ReplicationAwareMutex.class);
+
+   private final AtomicInteger counter;
+
+   private final SequencedLock sequencedLock;
+
+   private final long id;
+
+   private static final AtomicLong idSequence = new AtomicLong(0);
+
+   private final ReentrantLock lock;
+   
+   private volatile Thread unfreezeOwner;
+
+   private volatile CountDownLatch freezeLatch;
+   
+   private static final Set<ReplicationAwareMutex> allMutexes = new WeakHashSet<ReplicationAwareMutex>();
+
+   public ReplicationAwareMutex(final String name, final int initialCount, final boolean debug)
+   {
+      this.id = idSequence.getAndIncrement();
+
+      lock = new ReentrantLock();
+
+      sequencedLock = new SequencedLock(name, initialCount);
+
+      counter = new AtomicInteger(initialCount);
+      
+      synchronized (allMutexes)
+      {
+         allMutexes.add(this);
+      }
+   }
+   
+   public static void freezeOutAll()
+   {
+      synchronized (allMutexes)
+      {
+         for (ReplicationAwareMutex mutex: allMutexes)
+         {
+            log.info("*** freezing out mutex");
+            mutex.freezeOut();
+         }
+      }
+   }
+   
+   public void lock(final int methodID)
+   {
+      try
+      {
+         doLock(10000, TimeUnit.MILLISECONDS, methodID);
+      }
+      catch (InterruptedException e)
+      {
+      }
+   }
+
+   public void unlock()
+   {
+      this.doUnlock();
+   }
+
+   public void freezeOut()
+   {
+      if (sequencedLock.getOwner() != null)
+      {
+         freezeLatch = new CountDownLatch(1);
+         
+         unfreezeOwner = sequencedLock.getOwner();
+      }           
+   }
+
+   private boolean doLock(long time, TimeUnit unit, int methodID) throws InterruptedException
+   {
+      JBMThread thread = JBMThread.currentThread();
+
+      // debug only
+      if (owners.contains(thread))
+      {
+         // Exception e = new Exception();
+         // e.setStackTrace(stackTraces.get(thread));
+         // log.error("Stateful staticLock is not re-entrant, first obtained here", e);
+         // Exception e2 = new Exception();
+         // log.info("Second attempt to obtain here", e2);
+         throw new IllegalStateException("Lock is NOT re-entrant!");
+      }
+
+      if (thread.isReplay())
+      {
+         Triple<Long, Long, Integer> pair = thread.getNextSequence();
+
+         // // Sanity check
+         // String otherName = SequencedLock.getLock(pair.a).getName();
+         //            
+         // //If sequencedLock
+         //            
+         // if ((!otherName.equals(name) || methodID != pair.c))
+         // {
+         // String msg = "Invalid object id, expecting " + name + ": " + methodID + " got " + otherName + ": " + pair.c
+         // +
+         // " lock id is " + pair.a;
+         //
+         // log.error(msg);
+         //
+         // thread.dumpSequences();
+         //               
+         // SequencedLock.dumpLockMap();
+         //
+         // throw new IllegalStateException(msg);
+         // }
+
+         long sequence = pair.b;
+
+         try
+         {
+            if (!sequencedLock.lock(sequence, unit.toNanos(time)))
+            {
+               // dumpLocksWithName(name);
+            }
+         }
+         catch (InterruptedException e)
+         {
+            if (thread.isFrozen())
+            {
+               log.info("** interrupted and retrying");
+               //We retry and this time it will use the standard mutex - this happens on freezing out
+               return doLock(time, unit, methodID);
+            }
+         }
+
+         addOwner(thread);
+
+         return true;
+      }
+      else
+      {
+         if (unfreezeOwner != null)
+         {
+            //There was an original owner on the SequencedLock when we interrupted so we can't allow any other threads to get the lock
+            //until he has returned
+            freezeLatch.await();
+         }
+         
+         boolean ok = lock.tryLock(time, unit);
+
+         if (ok)
+         {
+            if (thread.isRecording())
+            {
+               long sequence = counter.getAndIncrement();
+
+               thread.addSequence(new Triple<Long, Long, Integer>(id, sequence, methodID));
+            }
+
+            addOwner(thread);
+         }
+
+         return ok;
+      }
+   }
+   
+   private void doUnlock()
+   {
+      JBMThread thread = JBMThread.currentThread();
+
+      if (thread == unfreezeOwner)
+      {
+         // Don't actually unlock this, since we never had the lock - we had the lock on the original SequencedLock
+         
+         unfreezeOwner = null;
+         
+         freezeLatch.countDown();
+      }
+      else
+      {
+         if (thread.isReplay())
+         {
+            sequencedLock.unlock();
+         }
+         else
+         {
+            lock.unlock();
+         }
+      }
+
+      removeOwner(thread);
+   }
+
+   // debug only
+   private void addOwner(final JBMThread thread)
+   {
+      owners.add(thread);
+   }
+
+   // debug only
+   private void removeOwner(final JBMThread thread)
+   {
+      owners.remove(thread);
+   }
+
+   // For debug
+   private Set<Thread> owners = new ConcurrentHashSet<Thread>();
+
+}

Deleted: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -1,333 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.server.replication.impl;
-
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.utils.ConcurrentHashSet;
-
-/**
- * A ReplicationAwareReadWriteLock
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
- */
-public class ReplicationAwareReadWriteLock implements ReadWriteLock
-{
-   private static final Logger log = Logger.getLogger(ReplicationAwareReadWriteLock.class);
-
-   private final Lock writeLock = new StatefulObjectWriteLock();
-
-   private final ReadWriteLock rwLock;
-
-   private final AtomicInteger counter;
-
-   private final SequencedLock sequencedLock;
-
-   private String name;
-
-   private volatile boolean debug;
-
-  // private Map<Long, StackTraceElement[]> acquirers;
-
-   public ReplicationAwareReadWriteLock(final String name, final int initialCount, final boolean debug)
-   {
-      this.name = name;
-
-      rwLock = new ReentrantReadWriteLock();
-
-      sequencedLock = new SequencedLock(initialCount);
-
-      counter = new AtomicInteger(initialCount);
-      
-      this.debug = debug;
-      
-//      if (debug)
-//      {
-//         acquirers = new HashMap<Long, StackTraceElement[]>();
-//         
-//         addLock(this);
-//      }
-   }
-
-   public Lock readLock()
-   {
-      return writeLock;      
-   }
-
-   public Lock writeLock()
-   {
-      return writeLock;
-   }
-
- //  private Map<JBMThread, StackTraceElement[]> stackTraces = new ConcurrentHashMap<JBMThread, StackTraceElement[]>();
-
-   // debug only
-   private void addOwner(final JBMThread thread)
-   {
-      owners.add(thread);
-
-      //stackTraces.put(thread, thread.getStackTrace());
-   }
-
-   // debug only
-   private void removeOwner(final JBMThread thread)
-   {
-      owners.remove(thread);
-
-     // stackTraces.remove(thread);
-   }
-
-   // For debug
-   private Set<Thread> owners = new ConcurrentHashSet<Thread>();
-
-  // private static Map<String, List<ReplicationAwareReadWriteLock>> staticMap = new HashMap<String, List<ReplicationAwareReadWriteLock>>(); 
-   
-//   private static synchronized void addLock(final ReplicationAwareReadWriteLock lock)
-//   {
-//      List<ReplicationAwareReadWriteLock> locks = staticMap.get(lock.name);
-//      
-//      if (locks == null)
-//      {
-//         locks = new ArrayList<ReplicationAwareReadWriteLock>();
-//         
-//         staticMap.put(lock.name, locks);
-//      }
-//      
-//      locks.add(lock);
-//   }
-   
-//   private synchronized static void dumpLocksWithName(final String lockName)
-//   {
-//      List<ReplicationAwareReadWriteLock> locks = staticMap.get(lockName);
-//      
-//      log.info("******* DUMPING LOCKS WITH NAME " + lockName);
-//      log.info("There are " + locks.size() + " locks with name " + lockName);
-//      
-//      int count = 0;
-//      for (ReplicationAwareReadWriteLock lock: locks)
-//      {
-//         log.info("**** DUMPING LOCK " + count++);
-//         for (Map.Entry<Long, StackTraceElement[]> entry : lock.acquirers.entrySet())
-//         {
-//            Exception e = new Exception();
-//            e.setStackTrace(entry.getValue());
-//            log.info("sequence: " + entry.getKey(), e);
-//         }
-//      }
-//   }
-
-   private AtomicLong lastSequence = new AtomicLong(0);
-   
-   private boolean doLock(long time, TimeUnit unit, boolean read) throws InterruptedException
-   {
-      JBMThread thread = JBMThread.currentThread();
-
-      // debug only
-      if (owners.contains(thread))
-      {
-//         Exception e = new Exception();
-//         e.setStackTrace(stackTraces.get(thread));
-//         log.error("Stateful staticLock is not re-entrant, first obtained here", e);
-//         Exception e2 = new Exception();
-//         log.info("Second attempt to obtain here", e2);
-         throw new IllegalStateException("Lock is NOT re-entrant!");
-      }
-
-      if (thread.isReplay())
-      {
-         long sequence;
-         
-         try
-         {
-            sequence = thread.getNextSequence();
-            
-            lastSequence.set(sequence);
-         }
-         catch (IndexOutOfBoundsException e)
-         {
-            //This can occur if waiting for thread to complete on failover and we've allowed threads to executing in a different
-            //order due to setting non strict, so we just set sequence to last sequence + 1
-            sequence = lastSequence.incrementAndGet();            
-         }
-
-//         if (debug)
-//         {
-//            this.acquirers.put(sequence, thread.getStackTrace());
-//         }
-         
-         if (!sequencedLock.lock(sequence, unit.toNanos(time)))
-         {            
-             // dumpLocksWithName(name);            
-         }
-         
-         addOwner(thread);
-
-         return true;
-      }
-      else
-      {
-         boolean ok;
-
-         if (read)
-         {
-            ok = rwLock.readLock().tryLock(time, unit);
-         }
-         else
-         {
-            ok = rwLock.writeLock().tryLock(time, unit);
-         }
-
-         if (ok)
-         {
-            if (thread.isRecording())
-            {
-               long sequence = counter.getAndIncrement();
-
-               thread.addSequence(sequence);
-
-//               if (debug)
-//               {
-//                  this.acquirers.put(sequence, thread.getStackTrace());
-//               }
-            }
-
-            addOwner(thread);
-         }
-
-         return ok;
-      }
-   }
-
-   private void doUnlock(final boolean read)
-   {
-      JBMThread thread = JBMThread.currentThread();
-
-      if (thread.isReplay())
-      {
-         sequencedLock.unlock();
-      }
-      else
-      {
-         if (read)
-         {
-            rwLock.readLock().unlock();
-         }
-         else
-         {
-            rwLock.writeLock().unlock();
-         }
-      }
-
-      removeOwner(thread);
-   }
-
-   private class StatefulObjectReadLock implements Lock
-   {
-      public void lock()
-      {
-         // throw new UnsupportedOperationException();
-         try
-         {
-            doLock(10000, TimeUnit.MILLISECONDS, true);
-         }
-         catch (InterruptedException e)
-         {
-         }
-      }
-
-      public void lockInterruptibly() throws InterruptedException
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      public Condition newCondition()
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      public boolean tryLock()
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
-      {
-         return doLock(time, unit, true);
-      }
-
-      public void unlock()
-      {
-         doUnlock(true);
-      }
-   }
-
-   private class StatefulObjectWriteLock implements Lock
-   {
-      public void lock()
-      {
-         // throw new UnsupportedOperationException();
-         try
-         {
-            doLock(10000, TimeUnit.MILLISECONDS, false);
-         }
-         catch (InterruptedException e)
-         {
-         }
-      }
-
-      public void lockInterruptibly() throws InterruptedException
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      public Condition newCondition()
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      public boolean tryLock()
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
-      {
-         return doLock(time, unit, false);
-      }
-
-      public void unlock()
-      {
-         doUnlock(false);
-      }
-   }
-}

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -26,6 +26,7 @@
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.utils.Pair;
+import org.jboss.messaging.utils.Triple;
 
 /**
  * A ReplicationAwareSharedCounter
@@ -61,7 +62,15 @@
             throw new IllegalStateException("Thread should not be recording");
          }
          
-         long sequence = thread.getNextSequence();
+         Triple<Long, Long, Integer> pair = thread.getNextSequence();
+            
+         //Sanity check
+         if (pair.a != -1)
+         {
+            throw new IllegalStateException("Sequences in wrong order");
+         }
+
+         long sequence = pair.b;
          
          //FIXME - this will be slow - too much contention with many threads
          synchronized (this)
@@ -80,11 +89,9 @@
 
          if (thread.isRecording())
          {
-            thread.addSequence(sequence);
+            thread.addSequence(new Triple<Long, Long, Integer>(-1L, sequence, -1));
          }
 
-      //   log.info(System.identityHashCode(this) + " got sequence non replicated " + sequence);
-
          return sequence;
       }
    }
@@ -93,15 +100,4 @@
    {
       return al.get();
    }
-
-   // public long getAndIncrement(long expected)
-   // {
-   // while (!al.compareAndSet(expected, expected + 1))
-   // {
-   // Thread.yield();
-   // }
-   //
-   // return expected;
-   // }
-
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -32,10 +32,11 @@
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.impl.QueuedWriteManager;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
 import org.jboss.messaging.core.server.replication.ReplicableAction;
 import org.jboss.messaging.core.server.replication.Replicator;
+import org.jboss.messaging.utils.Pair;
+import org.jboss.messaging.utils.Triple;
 
 /**
  * A ReplicatorImpl
@@ -143,8 +144,13 @@
       
       thread.setNoReplayOrRecord();
       
-      List<Long> sequences = JBMThread.currentThread().getSequences();
+      List<Triple<Long, Long, Integer>> sequences = thread.getSequences();
       
+   //   log.info("Replicating:");
+      
+    //  thread.dumpSequences();
+      
+      
      // dumpSequences(sequences);
 
       // We then send the sequences to the backup

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -23,11 +23,12 @@
 package org.jboss.messaging.core.server.replication.impl;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.LockSupport;
 
 import org.jboss.messaging.core.logging.Logger;
 
@@ -50,31 +51,158 @@
    private Thread owner;
 
    private final AtomicBoolean locked = new AtomicBoolean(false);
-   
-   private volatile boolean strictOrder = true;
 
-   public SequencedLock(final long sequence)
+   // private volatile boolean strictOrder = true;
+
+   private static Map<Long, SequencedLock> locks = new HashMap<Long, SequencedLock>();
+
+   // for debug only
+  // private final long id;
+
+   private final String name;
+
+   public Thread getOwner()
    {
-      queue = new PriorityBlockingQueue<QueueEntry>();
+      return owner;
+   }
 
-      this.currentSequence = new AtomicLong(sequence);
+   // private static synchronized void registerLock(final SequencedLock lock)
+   // {
+   // locks.put(lock.id, lock);
+   // }
+   //
+   // public static synchronized SequencedLock getLock(final long id)
+   // {
+   // return locks.get(id);
+   // }
+
+   public String getName()
+   {
+      return name;
    }
-   
-   public void setStrict(boolean strict)
+
+   public static synchronized void dumpLockMap()
    {
-      this.strictOrder = strict;
-      
-      QueueEntry entry = peekEntry();
+      log.info("Dumping lock name map");
+      for (Map.Entry<Long, SequencedLock> entry : locks.entrySet())
+      {
+         log.info(entry.getKey() + ": " + entry.getValue().getName());
+      }
+   }
 
-      if (entry != null)
+   public static synchronized void dumpLocks()
+   {
+      for (SequencedLock lock : locks.values())
       {
-         LockSupport.unpark(entry.thread);
+         if (lock.owner != null)
+         {
+            log.info("************ Found lock with owner");
+            lock.dump();
+
+            // //lock.setNonStrict();
+            //            
+            // log.info("Nudging lock");
+            //            
+            // QueueEntry entry = lock.peekEntry();
+            //
+            // if (entry != null)
+            // {
+            // LockSupport.unpark(entry.thread);
+            // }
+         }
       }
    }
-           
-   //TODO parking with a timeout seems to be a lot slower than parking without timeout
-   public boolean lock(final long sequence, final long timeout)
+
+   public synchronized void dump()
    {
+      log.info("Dumping lock " + System.identityHashCode(this));
+      // log.info("*** This lock is strict " + strictOrder);
+
+      log.info("owner is " + owner);
+
+      if (owner != null)
+      {
+
+         Exception e = new Exception();
+         e.setStackTrace(owner.getStackTrace());
+         log.info("Owner trace", e);
+
+         JBMThread thr = (JBMThread)owner;
+
+//         SequencedLock waitingFor = thr.getLastLock();
+//
+//         if (waitingFor != null)
+//         {
+//            while (true)
+//            {
+//               log.info("waiting for...");
+//               waitingFor.dump();
+//
+//               thr = (JBMThread)waitingFor.owner;
+//
+//               if (thr != null)
+//               {
+//                  waitingFor = thr.getLastLock();
+//
+//                  if (waitingFor != null)
+//                  {
+//                     continue;
+//                  }
+//               }
+//
+//               break;
+//            }
+//         }
+      }
+
+      // log.info("Waiting threads: " + queue.size());
+      // for (QueueEntry entry: queue)
+      // {
+      // e = new Exception();
+      // e.setStackTrace(owner.getStackTrace());
+      // log.info("Waiting trace, sequence " + entry.sequence, e);
+      // }
+   }
+
+   public SequencedLock(final String name, final long sequence)
+   {
+     // this.id = id;
+
+      this.name = name;
+
+      queue = new PriorityBlockingQueue<QueueEntry>();
+
+      this.currentSequence = new AtomicLong(sequence);
+
+      // registerLock(this);
+   }
+
+   // public void setNonStrict()
+   // {
+   // //log.info(System.identityHashCode(this) + " setting non strict");
+   // if (strictOrder)
+   // {
+   // strictOrder = false;
+   //
+   // QueueEntry entry = peekEntry();
+   //
+   // if (entry != null)
+   // {
+   // // log.info(System.identityHashCode(this) + " setting non strict unparking " + entry.thread);
+   // //LockSupport.unpark(entry.thread);
+   // entry.thread.jbmUnpark();
+   // }
+   // }
+   // }
+   //   
+   // public boolean isStrict()
+   // {
+   // return strictOrder;
+   // }
+
+   // TODO parking with a timeout seems to be a lot slower than parking without timeout
+   public boolean lock(final long sequence, final long timeout) throws InterruptedException
+   {
       JBMThread currentThread = JBMThread.currentThread();
 
       QueueEntry entry = new QueueEntry(sequence, currentThread);
@@ -89,19 +217,26 @@
       {
          QueueEntry peeked = peekEntry();
 
-         if (peeked == null || peeked.thread != currentThread || !locked.compareAndSet(false, true))
-         {            
-            currentThread.setLastLock(this);
+         if (peeked == null || // There are higher priority threads
+             peeked.thread != currentThread ||
+             // Next thread is not this one
+             !locked.compareAndSet(false, true)) // Lock is already locked
+         {
+            //currentThread.setLastLock(this);
+
+            currentThread.setWaitingOnSequencedLock(true);
+                        
+            // LockSupport.parkNanos(toWait);
+            // log.info(System.identityHashCode(this) + " parking " + currentThread);
+            currentThread.jbmPark(toWait);
             
-            LockSupport.parkNanos(toWait);
+            currentThread.setWaitingOnSequencedLock(false);
             
-//            if (currentThread.isJBMInterrupted())
-//            {
-//               log.info("**** setting strict order to false");
-//               strictOrder = false;
-//               
-//               //Thread.interrupted();
-//            }
+            if (Thread.interrupted() && currentThread.isFrozen())
+            {
+               log.info("** It's been interrupted");
+               throw new InterruptedException();
+            }
 
             long now = System.nanoTime();
 
@@ -109,8 +244,10 @@
 
             if (toWait <= 0)
             {
-               log.warn("Timed out waiting for sequenced lock, current " + currentSequence.get() + " expected " + sequence);
-               
+               log.warn("Timed out waiting for sequenced lock, current " + currentSequence.get() +
+                        " expected " +
+                        sequence);
+
                return false;
             }
 
@@ -125,6 +262,8 @@
       queue.remove();
 
       owner = currentThread;
+      
+      
 
       return true;
    }
@@ -136,40 +275,64 @@
          throw new IllegalMonitorStateException();
       }
 
+      // log.info(System.identityHashCode(this) + " unlocking");
+
       currentSequence.incrementAndGet();
 
+      owner = null;
+
       locked.set(false);
 
       QueueEntry entry = peekEntry();
 
       if (entry != null)
       {
-         LockSupport.unpark(entry.thread);
+         // log.info(System.identityHashCode(this) + " unparking at unlock " + entry.thread);
+         // LockSupport.unpark(entry.thread);
+         entry.thread.jbmUnpark();
       }
+      // else
+      // {
+      // log.info(System.identityHashCode(this) + " nothing to unpark at unlock");
+      // }
    }
 
+   // private QueueEntry peekEntry()
+   // {
+   // QueueEntry entry = queue.peek();
+   //
+   // if (entry != null)
+   // {
+   // if (!strictOrder || entry.sequence == currentSequence.get())
+   // {
+   // return entry;
+   // }
+   // }
+   //
+   // return null;
+   // }
+
    private QueueEntry peekEntry()
    {
       QueueEntry entry = queue.peek();
 
-      if (entry != null)
+      if (entry != null && entry.sequence == currentSequence.get())
       {
-         if (!strictOrder ||entry.sequence == currentSequence.get())
-         {
-            return entry;
-         }
+         return entry;
       }
-      
-      return null;      
+      else
+      {
+         return null;
+      }
    }
 
    private static final class QueueEntry implements Comparable<QueueEntry>
    {
       private final long sequence;
 
-      private final Thread thread;
+      private final JBMThread thread;
 
-      private QueueEntry(final long sequence, final Thread thread)
+      private QueueEntry(final long sequence, final JBMThread thread)
       {
          this.sequence = sequence;
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -410,6 +410,10 @@
          return null;
       }
    }
+   
+   public void setNeverFail()
+   {      
+   }
 
    // Public --------------------------------------------------------
 

Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -20,7 +20,6 @@
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
-
 package org.jboss.messaging.tests.integration.cluster.failover;
 
 import java.util.ArrayList;
@@ -52,7 +51,7 @@
 {
 
    // Constants -----------------------------------------------------
-   
+
    private final Logger log = Logger.getLogger(this.getClass());
 
    // Attributes ----------------------------------------------------
@@ -72,41 +71,40 @@
    protected abstract void start() throws Exception;
 
    protected abstract void stop() throws Exception;
-   
+
    protected abstract ClientSessionFactoryInternal createSessionFactory();
-   
+
    protected void setUp() throws Exception
    {
       super.setUp();
       timer = new Timer();
    }
-   
+
    protected void tearDown() throws Exception
    {
       timer.cancel();
       super.tearDown();
    }
-   
+
    protected boolean shouldFail()
    {
       return true;
    }
 
-
-   
    protected void runMultipleThreadsFailoverTest(final RunnableT runnable,
-                                       final int numThreads,
-                                       final int numIts,
-                                       final boolean failOnCreateConnection,
-                                       final long failDelay) throws Exception
+                                                 final int numThreads,
+                                                 final int numIts,
+                                                 final boolean failOnCreateConnection,
+                                                 final long failDelay) throws Exception
    {
       for (int its = 0; its < numIts; its++)
       {
          log.info("Beginning iteration " + its);
-         
+
          start();
 
          final ClientSessionFactoryInternal sf = createSessionFactory();
+         log.info("Created client session factory");
 
          final ClientSession session = sf.createSession(false, true, true);
 
@@ -123,7 +121,7 @@
             Runner(final String name, final RunnableT test, final int threadNum)
             {
                super(name);
-               
+
                this.test = test;
 
                this.threadNum = threadNum;
@@ -144,7 +142,8 @@
 
                   // Case a failure happened here, it should print the Thread dump
                   // Sending it to System.out, as it would show on the Tests report
-                  //System.out.println(threadDump(" - fired by MultiThreadRandomFailoverTestBase::runTestMultipleThreads (" + t.getLocalizedMessage() + ")"));
+                  // System.out.println(threadDump(" - fired by
+                  // MultiThreadRandomFailoverTestBase::runTestMultipleThreads (" + t.getLocalizedMessage() + ")"));
                }
             }
          }
@@ -184,14 +183,13 @@
          assertEquals(0, sf.numSessions());
 
          assertEquals(0, sf.numConnections());
-         
+
          sf.close();
 
          stop();
       }
    }
 
-
    // Private -------------------------------------------------------
 
    private Failer startFailer(final long time, final ClientSession session, final boolean failOnCreateConnection)
@@ -207,10 +205,8 @@
       return failer;
    }
 
-
    // Inner classes -------------------------------------------------
 
- 
    protected abstract class RunnableT extends Thread
    {
       private volatile String failReason;
@@ -238,8 +234,6 @@
       public abstract void run(final ClientSessionFactory sf, final int threadNum) throws Exception;
    }
 
-   
-
    private class Failer extends TimerTask
    {
       private final ClientSession session;
@@ -285,6 +279,4 @@
       }
    }
 
-   
-
 }

Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -90,7 +90,7 @@
          {
             doTestA(sf, threadNum);
          }
-      }, 1, false);
+      }, NUM_THREADS, false);
    }
 
    public void testB() throws Exception
@@ -117,7 +117,7 @@
       }, NUM_THREADS, false);
    }
 
-   public void testD() throws Exception
+   public void testD() throws Exception          
    {
       runTestMultipleThreads(new RunnableT()
       {
@@ -1109,7 +1109,7 @@
     */
    protected void doTestL(final ClientSessionFactory sf) throws Exception
    {
-      final int numSessions = 10;
+      final int numSessions = 100;
 
       for (int i = 0; i < numSessions; i++)
       {

Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/SequencedLockTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/SequencedLockTest.java	2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/SequencedLockTest.java	2009-07-15 12:37:24 UTC (rev 7573)
@@ -27,6 +27,7 @@
 import junit.framework.TestCase;
 
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.replication.impl.JBMThread;
 import org.jboss.messaging.core.server.replication.impl.SequencedLock;
 
 /**
@@ -39,64 +40,144 @@
 public class SequencedLockTest extends TestCase
 {
    private static final Logger log = Logger.getLogger(SequencedLockTest.class);
-   
+
    private static final long TIMEOUT = 5000000000L;
 
-   public void testAscending() throws Exception
+   public void testAscendingStrict() throws Exception
    {
       for (int i = 0; i < 1000; i++)
       {
-         //log.info("iter " + i);
-         this.doTestSequences(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+         // log.info("iter " + i);
+         this.doTestSequences(true, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
       }
    }
 
-   public void testDescending() throws Exception
+   public void testDescendingStrict() throws Exception
    {
       for (int i = 0; i < 1000; i++)
       {
-         //log.info("iter " + i);
-         this.doTestSequences(9, 8, 7, 6, 5, 4, 3, 2, 1, 0);
+         // log.info("iter " + i);
+         this.doTestSequences(true, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0);
       }
    }
 
-   public void testAlternate() throws Exception
+   public void testAlternateStrict() throws Exception
    {
       for (int i = 0; i < 1000; i++)
       {
-        // log.info("iter " + i);
-         this.doTestSequences(9, 0, 8, 1, 7, 2, 6, 3, 5, 4);
+         // log.info("iter " + i);
+         this.doTestSequences(true, 9, 0, 8, 1, 7, 2, 6, 3, 5, 4);
       }
    }
 
-   public void testRandom() throws Exception
+   public void testRandomStrict() throws Exception
    {
       for (int i = 0; i < 1000; i++)
       {
-        // log.info("iter " + i);
-         this.doTestSequences(3, 9, 5, 7, 1, 2, 0, 8, 6, 4);
+         // log.info("iter " + i);
+         this.doTestSequences(true, 3, 9, 5, 7, 1, 2, 0, 8, 6, 4);
       }
    }
 
+   public void testAscendingNonStrict() throws Exception
+   {
+      for (int i = 0; i < 1000; i++)
+      {
+         // log.info("iter " + i);
+         this.doTestSequences(false, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+      }
+   }
+
+   public void testDescendingNonStrict() throws Exception
+   {
+      for (int i = 0; i < 1000; i++)
+      {
+         // log.info("iter " + i);
+         this.doTestSequences(false, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0);
+      }
+   }
+
+   public void testAlternateNonStrict() throws Exception
+   {
+      for (int i = 0; i < 1000; i++)
+      {
+         // log.info("iter " + i);
+         this.doTestSequences(false, 9, 0, 8, 1, 7, 2, 6, 3, 5, 4);
+      }
+   }
+
+   public void testRandomNonStrict() throws Exception
+   {
+      for (int i = 0; i < 1000; i++)
+      {
+         // log.info("iter " + i);
+         this.doTestSequences(false, 3, 9, 5, 7, 1, 2, 0, 8, 6, 4);
+      }
+   }
+
    public void testSpeed() throws Exception
    {
       SequencedLock lock = new SequencedLock(0);
 
       AtomicInteger counter = new AtomicInteger(0);
-      
+
       AtomicInteger obcounter = new AtomicInteger(0);
-      
+
       final int numThreads = 10;
-      
+
       final int acquires = 100000;
-      
+
       MyThread2[] threads = new MyThread2[numThreads];
+
+      for (int i = 0; i < numThreads; i++)
+      {
+         threads[i] = new MyThread2(lock, acquires, counter, obcounter, true);
+      }
+
+      long start = System.currentTimeMillis();
+
+      for (int i = 0; i < numThreads; i++)
+      {
+         threads[i].start();
+      }
+
+      for (int i = 0; i < numThreads; i++)
+      {
+         threads[i].join();
+      }
       
       for (int i = 0; i < numThreads; i++)
       {
-         threads[i] = new MyThread2(lock, acquires, counter, obcounter);
+         assertNull(threads[i].exception);
       }
-      
+
+      long end = System.currentTimeMillis();
+
+      double rate = 1000 * ((double)acquires * numThreads) / (end - start);
+
+      System.out.println("Rate " + rate);
+
+   }
+
+   public void testSpeedChangeNonStrict() throws Exception
+   {
+      SequencedLock lock = new SequencedLock(0);
+
+      AtomicInteger counter = new AtomicInteger(0);
+
+      AtomicInteger obcounter = new AtomicInteger(0);
+
+      final int numThreads = 10;
+
+      final int acquires = 10000;
+
+      MyThread2[] threads = new MyThread2[numThreads];
+
+      for (int i = 0; i < numThreads; i++)
+      {
+         threads[i] = new MyThread2(lock, acquires, counter, obcounter, false);
+      }
+
       long start = System.currentTimeMillis();
 
       for (int i = 0; i < numThreads; i++)
@@ -104,68 +185,104 @@
          threads[i].start();
       }
 
+      // Thread.sleep(500);
+
+      log.info("Setting non strict");
+
+      lock.setNonStrict();
+
       for (int i = 0; i < numThreads; i++)
       {
          threads[i].join();
       }
+      
+      for (int i = 0; i < numThreads; i++)
+      {
+         assertNull(threads[i].exception);
+      }
 
+      log.info("done");
+
       long end = System.currentTimeMillis();
-      
+
       double rate = 1000 * ((double)acquires * numThreads) / (end - start);
 
       System.out.println("Rate " + rate);
-     
    }
 
-   class MyThread2 extends Thread
+   class MyThread2 extends JBMThread
    {
       private int acquireCount;
-      
+
       private final AtomicInteger counter;
-      
+
       private final AtomicInteger observedCounter;
-      
+
       private final SequencedLock lock;
 
-      MyThread2(final SequencedLock lock, final int acquires, final AtomicInteger counter, final AtomicInteger observedCounter)
+      private final boolean strict;
+      
+      private volatile Exception exception;
+
+      MyThread2(final SequencedLock lock,
+                final int acquires,
+                final AtomicInteger counter,
+                final AtomicInteger observedCounter,
+                final boolean strict)
       {
-         this.lock= lock;
-         
+         this.lock = lock;
+
          this.acquireCount = acquires;
-         
+
          this.counter = counter;
-         
+
          this.observedCounter = observedCounter;
+
+         this.strict = strict;
       }
-      
+
       public void run()
       {
-         for (int i = 0; i < acquireCount; i++)
+         try
          {
-            int cnt = counter.getAndIncrement();
-            
-          //  log.info("Trying lock " + cnt);
-            
-            lock.lock(cnt, TIMEOUT);
-            
-            int ob = observedCounter.getAndIncrement();
-            
-           // log.info("Got lock " + ob);
-            
-            if (cnt != ob)
+            for (int i = 0; i < acquireCount; i++)
             {
-               System.out.println("Out of order, cnt " + cnt + " ob " + ob);
+               int cnt = counter.getAndIncrement();
+   
+               // log.info("Trying lock " + cnt);
+   
+               lock.lock(cnt, TIMEOUT);
+   
+               int ob = observedCounter.getAndIncrement();
+   
+               // log.info("Got lock " + ob);
+   
+               if (strict && cnt != ob)
+               {
+                  System.out.println("Out of order, cnt " + cnt + " ob " + ob);
+               }
+   
+               lock.unlock();
             }
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to run", e);
             
-            lock.unlock();
+            this.exception = e;
          }
       }
    }
 
-   private void doTestSequences(final int... sequences) throws Exception
+   private void doTestSequences(final boolean strict, final int... sequences) throws Exception
    {
       SequencedLock lock = new SequencedLock(0);
 
+      if (!strict)
+      {
+         lock.setNonStrict();
+      }
+
       AtomicInteger counter = new AtomicInteger(0);
 
       MyThread[] threads = new MyThread[sequences.length];
@@ -184,14 +301,22 @@
       {
          threads[i].join();
       }
-
+      
       for (int i = 0; i < sequences.length; i++)
       {
-         assertEquals(sequences[i], threads[i].getObtainedSeq());
+         assertNull(threads[i].exception);
       }
+
+      if (strict)
+      {
+         for (int i = 0; i < sequences.length; i++)
+         {
+            assertEquals(sequences[i], threads[i].getObtainedSeq());
+         }
+      }
    }
 
-   class MyThread extends Thread
+   class MyThread extends JBMThread
    {
       private AtomicInteger counter;
 
@@ -200,6 +325,8 @@
       private final int seq;
 
       private volatile int obtainedSeq;
+      
+      private volatile Exception exception;
 
       MyThread(final int seq, final AtomicInteger counter, final SequencedLock lock)
       {
@@ -212,19 +339,27 @@
 
       public void run()
       {
-         // log.info(System.identityHashCode(this) + " Will attempt to obtain lock with sequence " + seq);
-
-         lock.lock(seq, TIMEOUT);
-
-         // log.info(System.identityHashCode(this) + " Obtained lock with sequence " + seq);
-
          try
          {
-            obtainedSeq = counter.getAndIncrement();
+            // log.info(System.identityHashCode(this) + " Will attempt to obtain lock with sequence " + seq);
+   
+            lock.lock(seq, TIMEOUT);
+   
+            // log.info(System.identityHashCode(this) + " Obtained lock with sequence " + seq);
+   
+            try
+            {
+               obtainedSeq = counter.getAndIncrement();
+            }
+            finally
+            {
+               lock.unlock();
+            }
          }
-         finally
+         catch (Exception e)
          {
-            lock.unlock();
+            log.error("Failed to run", e);
+            this.exception = e;
          }
       }
 




More information about the jboss-cvs-commits mailing list