[jboss-cvs] JBoss Messaging SVN: r7555 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/client/impl and 13 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jul 10 06:59:05 EDT 2009


Author: timfox
Date: 2009-07-10 06:59:04 -0400 (Fri, 10 Jul 2009)
New Revision: 7555

Added:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/utils/WeakHashSet.java
Modified:
   branches/Branch_MultiThreaded_Replication/src/config/stand-alone/non-clustered/logging.properties
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/utils/ConcurrentHashSet.java
   branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
   branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
   branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/SequencedLockTest.java
Log:
MT (multi-threaded) replication

Modified: branches/Branch_MultiThreaded_Replication/src/config/stand-alone/non-clustered/logging.properties
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/config/stand-alone/non-clustered/logging.properties	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/config/stand-alone/non-clustered/logging.properties	2009-07-10 10:59:04 UTC (rev 7555)
@@ -20,7 +20,6 @@
 java.util.logging.FileHandler.level=INFO
 java.util.logging.FileHandler.formatter=org.jboss.messaging.integration.logging.JBMLoggerFormatter
 java.util.logging.FileHandler.pattern=../logs/messaging.log
-java.util.logging.FileHandler.limit=10000
 # Default global logging level.
 # This specifies which kinds of events are logged across
 # all loggers.  For any given facility this global level

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -676,7 +676,11 @@
 
          closedSent = true;
 
+        // log.info(System.identityHashCode(this) + " session sending close message");
+         
          channel.sendBlocking(new SessionCloseMessage());
+         
+         //log.info(System.identityHashCode(this) + " session sent close message");
       }
       catch (Throwable ignore)
       {
@@ -721,26 +725,41 @@
 
       try
       {
+         log.info(System.identityHashCode(this) + " session handling failover");
+         
+         //Prevent any more packets being handled on the old connection
+         channel.getConnection().freeze();
+         
          channel.transferConnection(backupConnection);
 
          backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
 
          remotingConnection = backupConnection;
 
-         Packet request = new ReattachSessionMessage(name, channel.getLastConfirmedCommandID());
+         int lid = channel.getLastConfirmedCommandID();
+         
+         log.info(System.identityHashCode(this) + " last received command id on client side is " + lid);
+         
+         Packet request = new ReattachSessionMessage(name, lid);
 
          Channel channel1 = backupConnection.getChannel(1, -1, false);
 
          ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
          
+       //  log.info(System.identityHashCode(this) + " got response from reattach session");
+         
          if (!response.isRemoved())
          {
+           // log.info(System.identityHashCode(this) + " found session, server last received command id is " + response.getLastConfirmedCommandID());
+            
             channel.replayCommands(response.getLastConfirmedCommandID());
 
             ok = true;
          }
          else
          {
+           // log.info(System.identityHashCode(this) + " didn't find session, closed sent " + closedSent);
+            
             if (closedSent)
             {
                // a session re-attach may fail, if the session close was sent before failover started, hit the server,

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -33,6 +33,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.jboss.messaging.utils.SimpleString;
 
 /**
  *
@@ -109,5 +110,14 @@
       }
       
       channel.confirm(packet);
+      
+      if (packet.getType() == SESS_RECEIVE_MSG)
+      {
+         SessionReceiveMessage message = (SessionReceiveMessage) packet;
+         
+         int cnt = (Integer)message.getClientMessage().getProperty(new SimpleString("count"));
+         
+        // log.info("confirmed on client " + cnt);
+      }
    }
 }
\ No newline at end of file

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -22,6 +22,8 @@
 
 package org.jboss.messaging.core.client.impl;
 
+import java.util.Set;
+
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.RemotingConnection;

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -472,7 +472,7 @@
 
       closed = true;
    }
-   
+
    public void returnConnection(final RemotingConnection conn)
    {
       synchronized (createSessionLock)
@@ -485,7 +485,7 @@
             {
                refCount--;
             }
-            
+
             pingers.remove(conn.getID());
 
             try
@@ -494,10 +494,9 @@
             }
             catch (Throwable ignore)
             {
-            }                       
+            }
          }
       }
-
    }
 
    // Public
@@ -893,7 +892,7 @@
    public RemotingConnection getConnection(final int initialRefCount)
    {
       RemotingConnection conn;
-      
+
       if (connections.size() < maxConnections)
       {
          // Create a new one

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -41,7 +41,6 @@
 * 
 * Valid identifiers that can be used are:
 * 
-* JBMessageID - the message id of the message
 * JBMPriority - the priority of the message
 * JBMTimestamp - the timestamp of the message
 * JBMDurable - "DURABLE" or "NON_DURABLE"

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -1122,7 +1122,7 @@
 
       public BatchingIDGenerator(final long start, final long checkpointSize)
       {
-         this.counter = new ReplicationAwareSharedCounter(backup, start);
+         this.counter = new ReplicationAwareSharedCounter(start);
 
          this.checkpointSize = checkpointSize;
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -67,7 +67,7 @@
    {
       this.backup = backup;
       
-      idSequence = new ReplicationAwareSharedCounter(backup, 0);
+      idSequence = new ReplicationAwareSharedCounter(0);
    }
    
    public UUID getPersistentID()

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -83,8 +83,6 @@
 {
    private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
 
-   // public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("_JBM_RESET_QUEUE_DATA");
-
    private final AddressManager addressManager;
 
    private final QueueFactory queueFactory;
@@ -113,24 +111,6 @@
 
    private final boolean persistIDCache;
 
-   // Each queue has a transient ID which lasts the lifetime of its binding. This is used in clustering when routing
-   // messages to particular queues on nodes. We could
-   // use the queue name on the node to identify it. But sometimes we need to route to maybe 10s of thousands of queues
-   // on a particular node, and all would
-   // have to be specified in the message. Specify 10000 ints takes up a lot less space than 10000 arbitrary queue names
-   // The drawback of this approach is we only allow up to 2^32 queues in memory at any one time
-  // private int transientIDSequence;
-
-  // private Set<Integer> transientIDs = new HashSet<Integer>();
-
-   // private Map<SimpleString, QueueInfo> queueInfos = new HashMap<SimpleString, QueueInfo>();
-
-   // private final Object notificationLock = new Object();
-
-   // private final org.jboss.messaging.utils.ExecutorFactory redistributorExecutorFactory;
-   //
-   // private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
-
    private final ReadWriteLock lock;
 
    public PostOfficeImpl(final StorageManager storageManager,
@@ -143,9 +123,6 @@
                          final boolean backup,
                          final int idCacheSize,
                          final boolean persistIDCache)
-   // final ExecutorFactory orderedExecutorFactory,
-   // HierarchicalRepository<AddressSettings> addressSettingsRepository)
-
    {
       this.storageManager = storageManager;
 
@@ -174,10 +151,6 @@
 
       this.persistIDCache = persistIDCache;
 
-      // this.redistributorExecutorFactory = orderedExecutorFactory;
-
-      // this.addressSettingsRepository = addressSettingsRepository;
-      
       lock = new ReplicationAwareReadWriteLock("postoffice", 0, true);
    }
 
@@ -185,8 +158,6 @@
 
    public synchronized void start() throws Exception
    {
-      // managementService.addNotificationListener(this);
-
       if (pagingManager != null)
       {
          pagingManager.setPostOffice(this);
@@ -205,7 +176,6 @@
 
    public synchronized void stop() throws Exception
    {      
-      // managementService.removeNotificationListener(this);
 
       if (reaper != null)
       {
@@ -216,10 +186,6 @@
 
       addressManager.clear();
 
-      // queueInfos.clear();
-
-     // transientIDs.clear();
-
       started = false;
    }
 
@@ -228,226 +194,7 @@
       return started;
    }
 
-   // NotificationListener implementation -------------------------------------
-
-   // public void onNotification(final Notification notification)
-   // {
-   // synchronized (notificationLock)
-   // {
-   // NotificationType type = notification.getType();
-   //
-   // switch (type)
-   // {
-   // case BINDING_ADDED:
-   // {
-   // TypedProperties props = notification.getProperties();
-   //
-   // Integer bindingType = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_TYPE);
-   //
-   // if (bindingType == null)
-   // {
-   // throw new IllegalArgumentException("Binding type not specified");
-   // }
-   //
-   // if (bindingType == BindingType.DIVERT_INDEX)
-   // {
-   // // We don't propagate diverts
-   // return;
-   // }
-   //
-   // SimpleString routingName = (SimpleString)props.getProperty(ManagementHelper.HDR_ROUTING_NAME);
-   //
-   // SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
-   //
-   // SimpleString address = (SimpleString)props.getProperty(ManagementHelper.HDR_ADDRESS);
-   //
-   // Integer transientID = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_ID);
-   //
-   // SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
-   //
-   // Integer distance = (Integer)props.getProperty(ManagementHelper.HDR_DISTANCE);
-   //
-   // QueueInfo info = new QueueInfo(routingName, clusterName, address, filterString, transientID, distance);
-   //
-   // queueInfos.put(clusterName, info);
-   //
-   // break;
-   // }
-   // case BINDING_REMOVED:
-   // {
-   // TypedProperties props = notification.getProperties();
-   //
-   // SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
-   //
-   // if (clusterName == null)
-   // {
-   // throw new IllegalStateException("No cluster name");
-   // }
-   //
-   // QueueInfo info = queueInfos.remove(clusterName);
-   //
-   // if (info == null)
-   // {
-   // throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
-   // }
-   //
-   // break;
-   // }
-   // case CONSUMER_CREATED:
-   // {
-   // TypedProperties props = notification.getProperties();
-   //
-   // SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
-   //
-   // if (clusterName == null)
-   // {
-   // throw new IllegalStateException("No cluster name");
-   // }
-   //
-   // SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
-   //
-   // QueueInfo info = queueInfos.get(clusterName);
-   //
-   // if (info == null)
-   // {
-   // throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
-   // }
-   //
-   // info.incrementConsumers();
-   //
-   // if (filterString != null)
-   // {
-   // List<SimpleString> filterStrings = info.getFilterStrings();
-   //
-   // if (filterStrings == null)
-   // {
-   // filterStrings = new ArrayList<SimpleString>();
-   //
-   // info.setFilterStrings(filterStrings);
-   // }
-   //
-   // filterStrings.add(filterString);
-   // }
-   //
-   // Integer distance = (Integer)props.getProperty(ManagementHelper.HDR_DISTANCE);
-   //
-   // if (distance == null)
-   // {
-   // throw new IllegalStateException("No distance");
-   // }
-   //
-   // if (distance > 0)
-   // {
-   // SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_ROUTING_NAME);
-   //
-   // if (queueName == null)
-   // {
-   // throw new IllegalStateException("No queue name");
-   // }
-   //
-   // Binding binding = getBinding(queueName);
-   //
-   // if (binding != null)
-   // {
-   // // We have a local queue
-   // Queue queue = (Queue)binding.getBindable();
-   //
-   // AddressSettings addressSettings = addressSettingsRepository.getMatch(binding.getAddress()
-   // .toString());
-   //
-   // long redistributionDelay = addressSettings.getRedistributionDelay();
-   //
-   // if (redistributionDelay != -1)
-   // {
-   // queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
-   // }
-   // }
-   // }
-   //
-   // break;
-   // }
-   // case CONSUMER_CLOSED:
-   // {
-   // TypedProperties props = notification.getProperties();
-   //
-   // SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
-   //
-   // if (clusterName == null)
-   // {
-   // throw new IllegalStateException("No distance");
-   // }
-   //
-   // SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
-   //
-   // QueueInfo info = queueInfos.get(clusterName);
-   //
-   // if (info == null)
-   // {
-   // throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
-   // }
-   //
-   // info.decrementConsumers();
-   //
-   // if (filterString != null)
-   // {
-   // List<SimpleString> filterStrings = info.getFilterStrings();
-   //
-   // filterStrings.remove(filterString);
-   // }
-   //
-   // if (info.getNumberOfConsumers() == 0)
-   // {
-   // Integer distance = (Integer)props.getProperty(ManagementHelper.HDR_DISTANCE);
-   //
-   // if (distance == null)
-   // {
-   // throw new IllegalStateException("No cluster name");
-   // }
-   //
-   // if (distance == 0)
-   // {
-   // SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_ROUTING_NAME);
-   //
-   // if (queueName == null)
-   // {
-   // throw new IllegalStateException("No queue name");
-   // }
-   //
-   // Binding binding = getBinding(queueName);
-   //
-   // if (binding == null)
-   // {
-   // throw new IllegalStateException("No queue " + queueName);
-   // }
-   //
-   // Queue queue = (Queue)binding.getBindable();
-   //
-   // AddressSettings addressSettings = addressSettingsRepository.getMatch(binding.getAddress()
-   // .toString());
-   //
-   // long redistributionDelay = addressSettings.getRedistributionDelay();
-   //
-   // if (redistributionDelay != -1)
-   // {
-   // queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
-   // }
-   // }
-   // }
-   //
-   // break;
-   // }
-   // case SECURITY_AUTHENTICATION_VIOLATION:
-   // case SECURITY_PERMISSION_VIOLATION:
-   // break;
-   // default:
-   // {
-   // throw new IllegalArgumentException("Invalid type " + type);
-   // }
-   //
-   // }
-   // }
-   // }
-
+   
    // PostOffice implementation -----------------------------------------------
 
    // TODO - needs to be locked to prevent happening concurrently with activate().
@@ -458,7 +205,9 @@
    public void addBinding(final Binding binding) throws Exception
    {
       boolean addressExists;
+      
       lock.writeLock().lock();
+      
       try
       {
          addressExists = addressManager.getBindingsForRoutingAddress(binding.getAddress()) != null;
@@ -804,87 +553,7 @@
       return cache;
    }
 
-   // public Object getNotificationLock()
-   // {
-   // return notificationLock;
-   // }
-
-   // public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
-   // {
-   // // We send direct to the queue so we can send it to the same queue that is bound to the notifications adress -
-   // // this is crucial for ensuring
-   // // that queue infos and notifications are received in a contiguous consistent stream
-   // Binding binding = addressManager.getBinding(queueName);
-   //
-   // if (binding == null)
-   // {
-   // throw new IllegalStateException("Cannot find queue " + queueName);
-   // }
-   //
-   // Queue queue = (Queue)binding.getBindable();
-   //
-   // // Need to lock to make sure all queue info and notifications are in the correct order with no gaps
-   // synchronized (notificationLock)
-   // {
-   // // First send a reset message
-   //
-   // ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
-   // message.setBody(ChannelBuffers.EMPTY_BUFFER);
-   // message.setDestination(queueName);
-   // message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
-   // queue.preroute(message, null);
-   // queue.route(message, null);
-   //
-   // for (QueueInfo info : queueInfos.values())
-   // {
-   // if (info.getAddress().startsWith(address))
-   // {
-   // message = createQueueInfoMessage(NotificationType.BINDING_ADDED, queueName);
-   //
-   // message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
-   // message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
-   // message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
-   // message.putIntProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
-   // message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, info.getFilterString());
-   // message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
-   //
-   // routeDirect(queue, message);
-   //
-   // int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
-   //
-   // for (int i = 0; i < info.getNumberOfConsumers() - consumersWithFilters; i++)
-   // {
-   // message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
-   //
-   // message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
-   // message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
-   // message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
-   // message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
-   //
-   // routeDirect(queue, message);
-   // }
-   //
-   // if (info.getFilterStrings() != null)
-   // {
-   // for (SimpleString filterString : info.getFilterStrings())
-   // {
-   // message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
-   //
-   // message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
-   // message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
-   // message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
-   // message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
-   // message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
-   //
-   // routeDirect(queue, message);
-   // }
-   // }
-   // }
-   // }
-   // }
-   //
-   // }
-
+   
    // Private -----------------------------------------------------------------
 
    private synchronized void startExpiryScanner()
@@ -902,56 +571,7 @@
 //      }
    }
 
-   // private void routeDirect(final Queue queue, final ServerMessage message) throws Exception
-   // {
-   // if (queue.getFilter() == null || queue.getFilter().match(message))
-   // {
-   // queue.preroute(message, null);
-   // queue.route(message, null);
-   // }
-   // }
-   //
-   // private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName)
-   // {
-   // ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
-   // message.setBody(ChannelBuffers.EMPTY_BUFFER);
-   //
-   // message.setDestination(queueName);
-   //
-   // String uid = UUIDGenerator.getInstance().generateStringUUID();
-   //
-   // message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));
-   // message.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
-   //
-   // message.putStringProperty(new SimpleString("foobar"), new SimpleString(uid));
-   //
-   // return message;
-   // }
-
-//   private int generateTransientID()
-//   {
-//      int start = transientIDSequence;
-//      do
-//      {
-//         int id = transientIDSequence++;
-//
-//         if (!transientIDs.contains(id))
-//         {
-//            transientIDs.add(id);
-//
-//            return id;
-//         }
-//      }
-//      while (transientIDSequence != start);
-//
-//      throw new IllegalStateException("Run out of queue ids!");
-//   }
-//
-//   private void releaseTransientID(final int id)
-//   {
-//      transientIDs.remove(id);
-//   }
-
+  
    private final PageMessageOperation getPageOperation(final Transaction tx)
    {
       // you could have races on the case two sessions using the same XID

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -75,7 +75,5 @@
    
    long getBlockingCallTimeout();
    
-   Object getTransferLock();  
-   
    RemotingConnection getReplicatingConnection();
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -42,7 +42,10 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.utils.SimpleString;
 
 /**
  * A ChannelImpl
@@ -285,6 +288,10 @@
 
             if (resendCache != null && packet.isRequiresConfirmations())
             {
+//               if (packet.getType() == PacketImpl.SESS_CLOSE)
+//               {
+//                  log.info(System.identityHashCode(this) + " added session close to resend cache");                  
+//               }
                resendCache.add(packet);
             }
 
@@ -372,38 +379,53 @@
 
    public void transferConnection(final RemotingConnection newConnection)
    {
-      // Needs to synchronize on the connection to make sure no packets from
-      // the old connection get processed after transfer has occurred
-      synchronized (connection.getTransferLock())
-      {
-         connection.removeChannel(id);
+      connection.removeChannel(id);
 
-         // if (replicatingChannel != null)
-         // {
-         // // If we're reconnecting to a live node which is replicated then there will be a replicating channel
-         // // too. We need to then make sure that all replication responses come back since packets aren't
-         // // considered confirmed until response comes back and is processed. Otherwise responses to previous
-         // // message sends could come back after reconnection resulting in clients resending same message
-         // // since it wasn't confirmed yet.
-         // replicatingChannel.waitForAllReplicationResponse();
-         // }
+      // if (replicatingChannel != null)
+      // {
+      // // If we're reconnecting to a live node which is replicated then there will be a replicating channel
+      // // too. We need to then make sure that all replication responses come back since packets aren't
+      // // considered confirmed until response comes back and is processed. Otherwise responses to previous
+      // // message sends could come back after reconnection resulting in clients resending same message
+      // // since it wasn't confirmed yet.
+      // replicatingChannel.waitForAllReplicationResponse();
+      // }
 
-         // And switch it
+      // And switch it
 
-         final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl)newConnection;
+      final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl)newConnection;
 
-         rnewConnection.putChannel(id, this);
+      rnewConnection.putChannel(id, this);
 
-         connection = rnewConnection;
-      }
+      connection = rnewConnection;
    }
 
    public void replayCommands(final int otherLastConfirmedCommandID)
    {
-      clearUpTo(otherLastConfirmedCommandID);
+     // log.info("replaying, other last command id " + otherLastConfirmedCommandID);
+      
+      if (otherLastConfirmedCommandID != -1)
+      {
+         clearUpTo(otherLastConfirmedCommandID);
+      }
 
+      //log.info("Resend cache size is " + resendCache.size());
+      
       for (final Packet packet : resendCache)
       {
+         //log.info("Replaying command " + packet);
+         
+//         if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
+//         {
+//            SessionReceiveMessage sm = (SessionReceiveMessage)packet;
+//            
+//            ServerMessage msg = sm.getServerMessage();
+//            
+//            int cnt = (Integer)msg.getProperty(new SimpleString("count"));
+//            
+//            log.info("resending message " + cnt);
+//         }
+         
          doWrite(packet);
       }
    }
@@ -435,16 +457,29 @@
 
    public void flushConfirmations()
    {
-      if (receivedBytes != 0 && connection.isActive())
+      int lcid = this.lastConfirmedCommandID;
+      
+      if (receivedBytes != 0 && connection.isActive() && lcid != -1)
       {
          receivedBytes = 0;
 
-         final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID);
+         //log.info("Sending packets confirmed from flush");
+         
+         sendConfirmation(lcid);     
+      }
+   }
+   
+   private void sendConfirmation(final int lastConfirmedID)
+   {
+      final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedID);
 
-         confirmed.setChannelID(id);
+      confirmed.setChannelID(id);
 
-         doWrite(confirmed);
-      }
+      //We need to queue packet confirmations too
+      if (!queuedWriteManager.tryQueue(confirmed))
+      {
+         doWrite(confirmed);   
+      }                    
    }
 
    public void confirm(final Packet packet)
@@ -452,6 +487,8 @@
       if (resendCache != null && packet.isRequiresConfirmations())
       {
          lastConfirmedCommandID++;
+         
+         //log.info("last confirmed id is now " + lastConfirmedCommandID);
 
          receivedBytes += packet.getPacketSize();
 
@@ -460,12 +497,8 @@
             receivedBytes = 0;
 
             if (connection.isActive())
-            {               
-               final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID);
-
-               confirmed.setChannelID(id);
-
-               doWrite(confirmed);
+            {
+               sendConfirmation(lastConfirmedCommandID);           
             }
          }
       }
@@ -526,16 +559,17 @@
 
    private void clearUpTo(final int lastConfirmedCommandID)
    {
-      final int numberToClear = 1 + lastConfirmedCommandID - firstStoredCommandID;
-
-      if (numberToClear < 0)
+      //log.info(System.identityHashCode(this) + " clear up to " + lastConfirmedCommandID + " on client " + connection.isClient() + " fscid " + this.firstStoredCommandID);
+      
+      if (lastConfirmedCommandID < firstStoredCommandID)
       {
-         throw new IllegalArgumentException("Invalid lastConfirmedCommandID: " + lastConfirmedCommandID +
-                                            " firstStoredCommandID " +
-                                            firstStoredCommandID +
-                                            " client " +
-                                            connection.isClient());
+         //This can legitimately happen: if flushConfirmations() is called on the other side, a packet confirmation is sent; after that,
+         //another packet confirmation can arrive, or on failover the lastConfirmedCommandID retrieved from the other side may overlap
+         //with what the previous flush already confirmed. In this case we can safely ignore it.
+         return;
       }
+      
+      final int numberToClear = 1 + lastConfirmedCommandID - firstStoredCommandID;
 
       int sizeToFree = 0;
 
@@ -551,6 +585,8 @@
                                             " first stored command id " +
                                             firstStoredCommandID);
          }
+         
+        // log.info("cleared packet " + packet);
 
          if (packet.getType() != PACKETS_CONFIRMED)
          {

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -32,6 +32,7 @@
 import org.jboss.messaging.core.remoting.spi.Connector;
 import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.replication.impl.JBMThread;
 import org.jboss.messaging.utils.SimpleIDGenerator;
 
 /**
@@ -111,9 +112,9 @@
 
    private boolean idGeneratorSynced = false;
 
-   private final Object transferLock = new Object();
+   // private final Object freezeLock = new Object();
 
-   private boolean frozen;
+   private volatile boolean frozen;
 
    private final Object failLock = new Object();
 
@@ -331,11 +332,6 @@
       return idGenerator.getCurrentID();
    }
 
-   public Object getTransferLock()
-   {
-      return transferLock;
-   }
-
    public boolean isActive()
    {
       return active;
@@ -364,11 +360,15 @@
    // Buffer Handler implementation
    // ----------------------------------------------------
 
+   private volatile Thread currentThread;
+
    public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
    {
       final Packet packet = decoder.decode(buffer);
 
-      synchronized (transferLock)
+      currentThread = Thread.currentThread();
+
+      try
       {
          if (!frozen)
          {
@@ -402,10 +402,18 @@
             }
             else
             {
-               log.info("cannot find handler for packet " + packet + " client " + this.isClient() + " active " + this.isActive());
+               log.info("cannot find handler for packet " + packet +
+                        " client " +
+                        this.isClient() +
+                        " active " +
+                        this.isActive());
             }
          }
       }
+      finally
+      {
+         currentThread = null;
+      }
    }
 
    public void activate()
@@ -413,14 +421,86 @@
       active = true;
    }
 
+   private static final long FREEZE_TIMEOUT = 5000;
+
    public void freeze()
    {
-      // Prevent any more packets being handled on this connection
+      if (frozen)
+      {
+         return;
+      }
+      
+      frozen = true;
 
-      synchronized (transferLock)
+      if (currentThread != null)
       {
-         frozen = true;
+         // long count = 0;
+         long start = System.currentTimeMillis();
+         while (true)
+         {
+            Thread thread = currentThread;
+            
+            if (thread != null)
+            {                             
+               if (thread instanceof JBMThread)
+               {
+                  JBMThread jthread = (JBMThread)thread;
+      
+                  jthread.setLastLockNonStrict();           
+               }
+               
+               log.info("waiting for thread to complete");
+   
+               Thread.yield();
+   
+               if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
+               {
+                  throw new IllegalStateException("Timed out waiting for thread to complete");
+               }
+   
+               // count++;
+               //            
+               // if (count > 1)
+               // {
+               // Exception e = new Exception();
+               // e.setStackTrace(thread.getStackTrace());
+               // log.info("Waiting for this thread", e);
+               // }
+            }
+            else
+            {
+               break;
+            }
+         }
+         
       }
+      //      
+      //      
+      //      
+      // //TODO check this logic
+      //      
+      // log.info("freezing connection");
+      //      
+      // JBMThread thread = null;
+      //      
+      // if (currentThread != null)
+      // {
+      // thread = (JBMThread)currentThread;
+      //         
+      // thread.jbmInterrupt();
+      // }
+      //            
+      // // Prevent any more packets being handled on this connection
+      //
+      // synchronized (freezeLock)
+      // {
+      // frozen = true;
+      // }
+      //      
+      // if (thread != null)
+      // {
+      // thread.jbmResetInterrupt();
+      // }
    }
 
    // Package protected

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -283,25 +283,28 @@
       }
 
       RemotingConnection replicatingConnection = server.getNonPooledReplicatingConnection();
-           
-      RemotingConnection rc = new RemotingConnectionImpl(connection, replicatingConnection, interceptors, !config.isBackup());
-        
+
+      RemotingConnection rc = new RemotingConnectionImpl(connection,
+                                                         replicatingConnection,
+                                                         interceptors,
+                                                         !config.isBackup());
+
       Channel channel1 = rc.getChannel(1, -1, false);
-      
+
       final Replicator replicator;
-      
+
       if (replicatingConnection != null)
-      {      
+      {
          Channel replicatingChannel = replicatingConnection.getChannel(1, -1, false);
-   
+
          replicator = new ReplicatorImpl(replicatingChannel);
-   
+
          replicatingChannel.setHandler(new ChannelHandler()
          {
             public void handlePacket(final Packet packet)
             {
                if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
-               {                  
+               {
                   replicator.replicationResponseReceived();
                }
                else
@@ -315,7 +318,7 @@
       {
          replicator = null;
       }
-      
+
       ChannelHandler handler = new MessagingServerPacketHandler(server, channel1, rc, replicator);
 
       channel1.setHandler(handler);
@@ -339,7 +342,7 @@
    public void connectionDestroyed(final Object connectionID)
    {
       RemotingConnection conn = connections.get(connectionID);
-      
+
       if (conn != null)
       {
          // if the connection has no failure listeners it means the sesssions etc were already closed so this is a clean
@@ -392,9 +395,7 @@
 
    // Private -------------------------------------------------------
 
-   private void setupPinger(final RemotingConnection conn,
-                                        final long clientFailureCheckPeriod,
-                                        final long connectionTTL)
+   private void setupPinger(final RemotingConnection conn, final long clientFailureCheckPeriod, final long connectionTTL)
    {
       if ((connectionTTL <= 0 || clientFailureCheckPeriod <= 0) && connectionTTL != -1 &&
           clientFailureCheckPeriod != -1)
@@ -411,7 +412,11 @@
 
       long pingPeriod = clientFailureCheckPeriod == -1 ? -1 : clientFailureCheckPeriod / 2;
 
-      Pinger pingRunnable = new Pinger(conn, connectionTTLToUse, null, new FailedConnectionAction(conn), System.currentTimeMillis());
+      Pinger pingRunnable = new Pinger(conn,
+                                       connectionTTLToUse,
+                                       null,
+                                       new FailedConnectionAction(conn),
+                                       System.currentTimeMillis());
 
       Future<?> pingFuture = scheduledThreadPool.scheduleAtFixedRate(pingRunnable, 0, pingPeriod, TimeUnit.MILLISECONDS);
 
@@ -423,19 +428,22 @@
    private RemotingConnection closeConnection(final Object connectionID)
    {
       RemotingConnection connection = connections.remove(connectionID);
-      
+
       Pinger pinger = pingers.remove(connectionID);
 
       if (pinger != null)
       {
          pinger.close();
       }
-      
-      RemotingConnection replConnection = connection.getReplicatingConnection();
-      
-      if (replConnection != null)
+
+      if (connection != null)
       {
-         server.returnNonPooledReplicatingConnection(replConnection);
+         RemotingConnection replConnection = connection.getReplicatingConnection();
+
+         if (replConnection != null)
+         {
+            server.returnNonPooledReplicatingConnection(replConnection);
+         }
       }
 
       return connection;
@@ -452,10 +460,10 @@
       private InitialPingTimeout(final RemotingConnection conn)
       {
          this.conn = conn;
-         
+
          conn.getChannel(0, -1, false).setHandler(this);
       }
-      
+
       public synchronized void handlePacket(final Packet packet)
       {
          final byte type = packet.getType();

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -111,8 +111,6 @@
 
    SimpleString getNodeID();
 
-   // Channel getReplicatingChannel();
-
    RemotingConnection getNonPooledReplicatingConnection();
 
    void returnNonPooledReplicatingConnection(RemotingConnection connection);
@@ -136,4 +134,6 @@
    void destroyQueue(SimpleString queueName, ServerSession session) throws Exception;
 
    void handleReplicateRedistribution(final SimpleString queueName, final long messageID) throws Exception;
+   
+   boolean registerBackupConnection(RemotingConnection connection);
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -82,7 +82,6 @@
 import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
 import org.jboss.messaging.core.remoting.spi.Connection;
 import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
-import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
 import org.jboss.messaging.core.security.CheckType;
 import org.jboss.messaging.core.security.JBMSecurityManager;
 import org.jboss.messaging.core.security.Role;
@@ -118,6 +117,7 @@
 import org.jboss.messaging.utils.UUID;
 import org.jboss.messaging.utils.UUIDGenerator;
 import org.jboss.messaging.utils.VersionLoader;
+import org.jboss.messaging.utils.WeakHashSet;
 
 /**
  * The messaging server implementation
@@ -486,7 +486,7 @@
    {
       return clusterManager;
    }
-   
+
    public ExecutorService getThreadPool()
    {
       return this.threadPool;
@@ -530,24 +530,120 @@
                                                      final int sendWindowSize,
                                                      final boolean direct) throws Exception
    {
+      if (version.getIncrementingVersion() != incrementingVersion)
+      {
+         // For now we need exact compatibility
+
+         throw new MessagingException(MessagingException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS,
+                                      "client version " + incrementingVersion +
+                                               " not compatible with version: " +
+                                               version.getFullVersion());
+      }
+      
+      if (!direct)
+      {
+         if (!registerBackupConnection(connection))
+         {
+            return null;
+         }
+      }
+
+      // Is this comment relevant any more ?
+
+      // Authenticate. Successful authentication will place a new SubjectContext
+      // on thread local,
+      // which will be used in the authorization process. However, we need to
+      // make sure we clean
+      // up thread local immediately after we used the information, otherwise
+      // some other people's
+      // security may be screwed up, on account of thread local security stack
+      // being corrupted.
+
+      securityStore.authenticate(username, password);
+
       if (direct)
       {
          checkActivate(connection);
       }
 
-      return doCreateSession(name,
-                             channelID,
-                             username,
-                             password,
-                             minLargeMessageSize,
-                             incrementingVersion,
-                             connection,
-                             autoCommitSends,
-                             autoCommitAcks,
-                             preAcknowledge,
-                             xa,
-                             sendWindowSize,
-                             false);
+      ServerSession currentSession = sessions.remove(name);
+
+      if (currentSession != null)
+      {
+         // This session may well be on a different connection and different channel id, so we must get rid
+         // of it and create another
+
+         // TODO - is this true any more??
+         currentSession.getChannel().close();
+      }
+
+      Channel channel = connection.getChannel(channelID, sendWindowSize, false);
+
+      RemotingConnection replicatingConnection = connection.getReplicatingConnection();
+
+      final Replicator replicator;
+
+      Channel replicatingChannel;
+
+      if (replicatingConnection != null)
+      {
+         replicatingChannel = replicatingConnection.getChannel(channelID, -1, false);
+
+         replicator = new ReplicatorImpl(replicatingChannel);
+
+         replicatingChannel.setHandler(new ChannelHandler()
+         {
+            public void handlePacket(final Packet packet)
+            {
+               if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
+               {
+                  replicator.replicationResponseReceived();
+               }
+               else
+               {
+                  throw new IllegalArgumentException("Invalid packet " + packet.getType());
+               }
+            }
+         });
+      }
+      else
+      {
+         replicator = null;
+
+         replicatingChannel = null;
+      }
+
+      ServerSessionImpl session = new ServerSessionImpl(name,
+                                                        username,
+                                                        password,
+                                                        minLargeMessageSize,
+                                                        autoCommitSends,
+                                                        autoCommitAcks,
+                                                        preAcknowledge,
+                                                        configuration.isPersistDeliveryCountBeforeDelivery(),
+                                                        xa,
+                                                        connection,
+                                                        storageManager,
+                                                        postOffice,
+                                                        resourceManager,
+                                                        securityStore,
+                                                        executorFactory,
+                                                        channel,
+                                                        replicatingChannel,
+                                                        managementService,
+                                                        queueFactory,
+                                                        this,
+                                                        configuration.getManagementAddress());
+
+      sessions.put(name, session);
+
+      ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, replicator, configuration);
+
+      session.setHandler(handler);
+
+      channel.setHandler(handler);
+      
+      return new CreateSessionResponseMessage(version.getIncrementingVersion());
    }
 
    public void removeSession(final String name) throws Exception
@@ -756,15 +852,15 @@
       activateCallbacks.remove(callback);
    }
 
-   private ConnectorFactory backupConnectorFactory;
+   // private ConnectorFactory backupConnectorFactory;
+   //
+   // private Map<String, Object> backupConnectorParams;
 
-   private Map<String, Object> backupConnectorParams;
+   private volatile ConnectionManager pooledReplicatingConnectionManager;
 
-   private ConnectionManager pooledReplicatingConnectionManager;
+   private volatile ConnectionManager nonPooledReplicatingConnectionManager;
 
-   private ConnectionManager nonPooledReplicatingConnectionManager;
-
-   private void setupBackupConnectorFactory()
+   private void setupConnectionManagers()
    {
       String backupConnectorName = configuration.getBackupConnectorName();
 
@@ -778,22 +874,6 @@
          }
          else
          {
-
-            ClassLoader loader = Thread.currentThread().getContextClassLoader();
-            try
-            {
-               Class<?> clz = loader.loadClass(backupConnector.getFactoryClassName());
-               backupConnectorFactory = (ConnectorFactory)clz.newInstance();
-            }
-            catch (Exception e)
-            {
-               throw new IllegalArgumentException("Error instantiating interceptor \"" + backupConnector.getFactoryClassName() +
-                                                           "\"",
-                                                  e);
-            }
-
-            backupConnectorParams = backupConnector.getParams();
-
             pooledReplicatingConnectionManager = new ConnectionManagerImpl(null, backupConnector, null, false, 10, // TODO
                                                                            // don't
                                                                            // hardcode
@@ -826,7 +906,7 @@
 
    private boolean activatedBackup;
 
-   public synchronized RemotingConnection getPooledReplicatingConnection()
+   public RemotingConnection getPooledReplicatingConnection()
    {
       RemotingConnection conn = null;
 
@@ -838,30 +918,38 @@
       return conn;
    }
 
-   public synchronized RemotingConnection getNonPooledReplicatingConnection()
+   public void returnPooledReplicatingConnection(final RemotingConnection conn)
    {
+      pooledReplicatingConnectionManager.returnConnection(conn);
+   }
+
+   public RemotingConnection getNonPooledReplicatingConnection()
+   {
       RemotingConnection conn = null;
 
       if (nonPooledReplicatingConnectionManager != null)
       {
          conn = nonPooledReplicatingConnectionManager.getConnection(1);
 
-         if (!activatedBackup)
+         synchronized (this)
          {
-            // First time we get channel we send a message down it informing the backup of our node id -
-            // backup and live must have the same node id
+            if (!activatedBackup)
+            {
+               // First time we get channel we send a message down it informing the backup of our node id -
+               // backup and live must have the same node id
 
-            Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
+               Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
 
-            Channel channel1 = conn.getChannel(1, -1, false);
+               Channel channel1 = conn.getChannel(1, -1, false);
 
-            ChannelHandler prevHandler = channel1.getHandler();
+               ChannelHandler prevHandler = channel1.getHandler();
 
-            sendOnReplicatingAndWaitForResponse(packet, channel1);
+               sendOnReplicatingAndWaitForResponse(packet, channel1);
 
-            channel1.setHandler(prevHandler);
+               channel1.setHandler(prevHandler);
 
-            activatedBackup = true;
+               activatedBackup = true;
+            }
          }
 
          // TODO execute outstanding results when failure occurs
@@ -870,11 +958,15 @@
       return conn;
    }
 
-   public synchronized void returnNonPooledReplicatingConnection(final RemotingConnection conn)
+   public void returnNonPooledReplicatingConnection(final RemotingConnection conn)
    {
       nonPooledReplicatingConnectionManager.returnConnection(conn);
    }
 
+   private Set<RemotingConnection> backupConnections = new WeakHashSet<RemotingConnection>();
+
+   
+
    private static class NoCacheConnectionLifeCycleListener implements ConnectionLifeCycleListener
    {
       private RemotingConnection conn;
@@ -939,7 +1031,7 @@
 
          synchronized (this)
          {
-            freezeBackupConnection();
+            freezeBackupConnections();
 
             List<Queue> toActivate = postOffice.activate();
 
@@ -964,6 +1056,7 @@
                queueDeployer.start();
             }
          }
+         log.info("Backup server is now ACTIVATED");
       }
 
       connection.activate();
@@ -978,19 +1071,38 @@
    // connection 1 delivery
    // connection 1 delivery gets replicated
    // can't find message in queue since active was delivered immediately
-   private void freezeBackupConnection()
+   
+   private boolean frozen;
+   
+   private void freezeBackupConnections()
    {
-      // // Sanity check
-      // // All replicated sessions should be on the same connection
-      // RemotingConnection replConnection = null;
-
-      for (ServerSession session : sessions.values())
+      synchronized (backupConnections)
       {
-         RemotingConnection rc = session.getChannel().getConnection();
-
-         rc.freeze();
+         frozen = true;
+         
+         for (RemotingConnection rc : backupConnections)
+         {
+            rc.freeze();
+         }
       }
    }
+   
+   public boolean registerBackupConnection(final RemotingConnection connection)
+   {
+      synchronized (backupConnections)
+      {
+         if (!frozen)
+         {
+            backupConnections.add(connection);
+            
+            return true;
+         }
+         else
+         {
+            return false;
+         }
+      }
+   }
 
    private void initialisePart1() throws Exception
    {
@@ -1022,7 +1134,7 @@
                                                 scheduledPool,
                                                 managementConnectorID);
 
-      setupBackupConnectorFactory();
+      setupConnectionManagers();
    }
 
    private ClusterQueueStateManager clusterQueueStateManager;
@@ -1465,119 +1577,6 @@
       }
    }
 
-   private CreateSessionResponseMessage doCreateSession(final String name,
-                                                        final long channelID,
-                                                        final String username,
-                                                        final String password,
-                                                        final int minLargeMessageSize,
-                                                        final int incrementingVersion,
-                                                        final RemotingConnection connection,
-                                                        final boolean autoCommitSends,
-                                                        final boolean autoCommitAcks,
-                                                        final boolean preAcknowledge,
-                                                        final boolean xa,
-                                                        final int sendWindowSize,
-                                                        final boolean backup) throws Exception
-   {
-      if (version.getIncrementingVersion() < incrementingVersion)
-      {
-         throw new MessagingException(MessagingException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS,
-                                      "client not compatible with version: " + version.getFullVersion());
-      }
-
-      // Is this comment relevant any more ?
-
-      // Authenticate. Successful autentication will place a new SubjectContext
-      // on thread local,
-      // which will be used in the authorization process. However, we need to
-      // make sure we clean
-      // up thread local immediately after we used the information, otherwise
-      // some other people
-      // security my be screwed up, on account of thread local security stack
-      // being corrupted.
-
-      securityStore.authenticate(username, password);
-
-      ServerSession currentSession = sessions.remove(name);
-
-      if (currentSession != null)
-      {
-         // This session may well be on a different connection and different channel id, so we must get rid
-         // of it and create another
-
-         // TODO - is this true any more??
-         currentSession.getChannel().close();
-      }
-
-      Channel channel = connection.getChannel(channelID, sendWindowSize, false);
-
-      RemotingConnection replicatingConnection = connection.getReplicatingConnection();
-
-      final Replicator replicator;
-
-      Channel replicatingChannel;
-
-      if (replicatingConnection != null)
-      {
-         replicatingChannel = replicatingConnection.getChannel(channelID, -1, false);
-
-         replicator = new ReplicatorImpl(replicatingChannel);
-
-         replicatingChannel.setHandler(new ChannelHandler()
-         {
-            public void handlePacket(final Packet packet)
-            {
-               if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
-               {
-                  replicator.replicationResponseReceived();
-               }
-               else
-               {
-                  throw new IllegalArgumentException("Invalid packet " + packet.getType());
-               }
-            }
-         });
-      }
-      else
-      {
-         replicator = null;
-
-         replicatingChannel = null;
-      }
-
-      final ServerSessionImpl session = new ServerSessionImpl(name,
-                                                              username,
-                                                              password,
-                                                              minLargeMessageSize,
-                                                              autoCommitSends,
-                                                              autoCommitAcks,
-                                                              preAcknowledge,
-                                                              configuration.isPersistDeliveryCountBeforeDelivery(),
-                                                              xa,
-                                                              connection,
-                                                              storageManager,
-                                                              postOffice,
-                                                              resourceManager,
-                                                              securityStore,
-                                                              executorFactory,
-                                                              channel,
-                                                              replicatingChannel,
-                                                              managementService,
-                                                              queueFactory,
-                                                              this,
-                                                              configuration.getManagementAddress());
-
-      sessions.put(name, session);
-
-      ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, replicator, configuration);
-
-      session.setHandler(handler);
-
-      channel.setHandler(handler);
-
-      return new CreateSessionResponseMessage(version.getIncrementingVersion());
-   }
-
    private Transformer instantiateTransformer(final String transformerClassName)
    {
       Transformer transformer = null;

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -116,6 +116,8 @@
             RegisterQueueReplicationChannelMessage msg = (RegisterQueueReplicationChannelMessage)packet;
 
             Channel channel = connection.getChannel(msg.getBindingID(), -1, false);
+            
+            server.registerBackupConnection(channel.getConnection());
 
             channel.setHandler(new QueueReplicationPacketHandler(msg.getBindingID(), server.getPostOffice(), channel));
 
@@ -234,7 +236,10 @@
          }
       }
 
-      channel1.send(response);
+      if (response != null)
+      {
+         channel1.send(response);
+      }
    }
 
    private void handleReattachSession(final ReattachSessionMessage request)

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -1523,6 +1523,12 @@
          }
       }
 
+      //Confirm needs to be done here before the response is sent, so the subsequent flush flushes that confirm too, and the flush needs
+      //to be sent before the response so the send acknowledgements are received on the client before the client session closes.
+      //However the actual packets-confirmed packet *cannot* be sent before the response is back from the backup, otherwise
+      //a session close confirmation could be sent before the backup has knowledge of the close, so on failover the client would
+      //hang forever waiting for the response to session close, so these get queued too
+         
       channel.confirm(packet);
       
       channel.flushConfirmations();
@@ -1570,6 +1576,8 @@
 
       int serverLastConfirmedCommandID = channel.getLastConfirmedCommandID();
 
+      //log.info("telling channel to replay commands up to " + lastConfirmedCommandID);
+      
       channel.replayCommands(lastConfirmedCommandID);
 
       if (wasStarted)

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -76,10 +76,12 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
+import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerSession;
 import org.jboss.messaging.core.server.replication.ReplicableAction;
 import org.jboss.messaging.core.server.replication.Replicator;
 import org.jboss.messaging.core.server.replication.impl.JBMThread;
+import org.jboss.messaging.utils.SimpleString;
 
 /**
  * A ServerSessionPacketHandler
@@ -140,6 +142,8 @@
       
       if (config.isBackup())
       {
+         //log.info(System.identityHashCode(this) + " inv on backup");
+         
          JBMThread thread = JBMThread.currentThread();
 
          thread.setReplay(sequences);
@@ -150,7 +154,21 @@
                   
          if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES)
          {
-            checkConfirm(packet);
+            //checkConfirm(packet);
+            
+            channel.confirm(packet);
+            
+//            if (packet.getType() == PacketImpl.SESS_SEND)
+//            {
+//               SessionSendMessage sm = (SessionSendMessage)packet;
+//               
+//               ServerMessage msg = sm.getServerMessage();
+//               
+//               int cnt = (Integer)msg.getProperty(new SimpleString("count"));
+//               
+//               log.info("confirmed send " + cnt);
+//            }
+               
                        
             channel.send(new ReplicationResponseMessage());
          }
@@ -161,6 +179,8 @@
       }
       else
       {
+        // log.info(System.identityHashCode(this) + " inv on live, repl is " + replicator);
+         
          if (replicator != null)
          {
             replicator.execute(this, 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -24,8 +24,8 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
 
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.server.replication.Replicator;
 
 /**
@@ -37,24 +37,23 @@
  */
 public class JBMThread extends Thread
 {
+   private static final Logger log = Logger.getLogger(JBMThread.class);
+
    private static enum ThreadState
    {
       RECORD, REPLAY, NONE;
    }
    
-//   public long getSequence()
-//   {
-//      return sequence;
-//   }
-//   
    private ThreadState state;
 
    private List<Long> objectSequences;
 
    private int pos;
    
-   private Replicator replicator;     
+   private Replicator replicator;    
    
+ //  private volatile boolean jbmInterrupted;
+      
    public static JBMThread currentThread()
    {
       return (JBMThread)Thread.currentThread();
@@ -151,4 +150,38 @@
    {
       this.replicator = replicator;
    }
+   
+//   public boolean isJBMInterrupted()
+//   {
+//      return jbmInterrupted;
+//   }
+   
+   public void setLastLock(final SequencedLock lock)
+   {
+      this.lastLock = lock;
+   }
+   
+   private volatile SequencedLock lastLock;
+   
+   public void setLastLockNonStrict()
+   {
+      if (lastLock != null)
+      {
+         lastLock.setStrict(false);
+      }
+   }
+   
+//   public void jbmInterrupt()
+//   {
+//      log.info("Interrupting jbm thread");
+//      
+//      jbmInterrupted = true;
+//      
+//      interrupt();
+//   }
+//   
+//   public void jbmResetInterrupt()
+//   {
+//      jbmInterrupted = false;
+//   }
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -173,12 +173,12 @@
 //         {
 //            this.acquirers.put(sequence, thread.getStackTrace());
 //         }
-
-         if (!sequencedLock.lock(sequence))
-         {
-           // dumpLocksWithName(name);
+         
+         if (!sequencedLock.lock(sequence, unit.toNanos(time)))
+         {            
+             // dumpLocksWithName(name);            
          }
-
+         
          addOwner(thread);
 
          return true;

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -40,13 +40,9 @@
 
    private final AtomicLong al;
    
-   private final boolean backup;
-
-   public ReplicationAwareSharedCounter(final boolean backup, final long initialValue)
+   public ReplicationAwareSharedCounter(final long initialValue)
    {
       this.al = new AtomicLong(initialValue);
-      
-      this.backup = backup;
    }
 
    public void set(final long l)
@@ -58,15 +54,8 @@
    {
       JBMThread thread = JBMThread.currentThread();
 
-//      if (thread.isReplay())
-//      {
-      if (backup)
+      if (thread.isReplay())
       {
-         if (!thread.isReplay())
-         {
-            throw new IllegalStateException("Thread should be in replay mode");
-         }
-         
          if (thread.isRecording())
          {
             throw new IllegalStateException("Thread should not be recording");
@@ -74,6 +63,7 @@
          
          long sequence = thread.getNextSequence();
          
+         //FIXME - this will be slow - too much contention with many threads
          synchronized (this)
          {
             if (sequence >= al.get())
@@ -82,16 +72,6 @@
             }
          }
             
-   
-//            log.info(System.identityHashCode(this) + " attempting to get sequence " + sequence);
-//   
-//            while (!al.compareAndSet(sequence, sequence + 1))
-//            {
-//               Thread.yield();
-//            }
-//   
-//            log.info(System.identityHashCode(this) + " got sequence " + sequence);
-   
          return sequence;
       }
       else

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -43,113 +43,78 @@
 {
    private static final Logger log = Logger.getLogger(SequencedLock.class);
 
-   private final Queue<QueueEntry> waiting;
+   private final Queue<QueueEntry> queue;
 
    private final AtomicLong currentSequence;
 
    private Thread owner;
 
+   private final AtomicBoolean locked = new AtomicBoolean(false);
+   
+   private volatile boolean strictOrder = true;
+
    public SequencedLock(final long sequence)
    {
-      waiting = new PriorityBlockingQueue<QueueEntry>();
+      queue = new PriorityBlockingQueue<QueueEntry>();
 
-      // this.currentSequence = sequence;
       this.currentSequence = new AtomicLong(sequence);
    }
-
-   private synchronized void dump(final long sequence)
+   
+   public void setStrict(boolean strict)
    {
-      log.error(System.identityHashCode(this) + " Timed out trying to get lock, desired " +
-                sequence +
-                " current " +
-                currentSequence);
+      this.strictOrder = strict;
+      
+      QueueEntry entry = peekEntry();
 
-      log.info("Current owner is " + owner);
-
-      log.info("Dumping entries");
-
-      Object[] sorted = waiting.toArray();
-
-      Arrays.sort(sorted);
-
-      for (Object obj : sorted)
+      if (entry != null)
       {
-         QueueEntry en = (QueueEntry)obj;
-
-         log.info("seq:" + en.sequence + " thread:" + System.identityHashCode(en.thread));
+         LockSupport.unpark(entry.thread);
       }
-
-      try
-      {
-         Thread.sleep(1000000);
-      }
-      catch (Exception e)
-      {
-      }
    }
-
-   private void addEntry(final QueueEntry entry)
+           
+   //TODO parking with a timeout seems to be a lot slower than parking without timeout
+   public boolean lock(final long sequence, final long timeout)
    {
-//      synchronized (waiting)
-//      {
+      JBMThread currentThread = JBMThread.currentThread();
 
-         waiting.add(entry);
-     // }
-   }
+      QueueEntry entry = new QueueEntry(sequence, currentThread);
 
-   private QueueEntry peekEntry()
-   {
-//      synchronized (waiting)
-//      {
-         QueueEntry entry = waiting.peek();
+      queue.add(entry);
 
-         if (entry != null && entry.sequence == currentSequence.get())
-         {
-            return entry;
-         }
-         else
-         {
-            return null;
-         }
-     // }
+      long start = System.nanoTime();
 
-   }
+      long toWait = timeout;
 
-   private void removeEntry()
-   {
-     // synchronized (waiting)
-     // {
-         waiting.remove();
-     // }
-   }
-
-   private final AtomicBoolean locked = new AtomicBoolean(false);
-
-   public boolean lock(final long sequence)
-   {
-      Thread currentThread = Thread.currentThread();
-
-      // log.info("Thread " + System.identityHashCode(currentThread) + " trying to lock " +
-      // System.identityHashCode(this) + " with sequence " + sequence);
-
-      QueueEntry entry = new QueueEntry(sequence, currentThread);
-
-      addEntry(entry);
-
-      boolean wasInterrupted = false;
-
       while (true)
       {
          QueueEntry peeked = peekEntry();
 
          if (peeked == null || peeked.thread != currentThread || !locked.compareAndSet(false, true))
-         {
-            LockSupport.park();
+         {            
+            currentThread.setLastLock(this);
+            
+            LockSupport.parkNanos(toWait);
+            
+//            if (currentThread.isJBMInterrupted())
+//            {
+//               log.info("**** setting strict order to false");
+//               strictOrder = false;
+//               
+//               //Thread.interrupted();
+//            }
 
-            if (Thread.interrupted())
+            long now = System.nanoTime();
+
+            toWait -= now - start;
+
+            if (toWait <= 0)
             {
-               wasInterrupted = true;
+               log.warn("Timed out waiting for sequenced lock, current " + currentSequence.get() + " expected " + sequence);
+               
+               return false;
             }
+
+            start = now;
          }
          else
          {
@@ -157,26 +122,15 @@
          }
       }
 
-      if (wasInterrupted)
-      {
-         currentThread.interrupt();
-      }
+      queue.remove();
 
-      removeEntry();
-
       owner = currentThread;
 
-      // log.info("Thread " + System.identityHashCode(currentThread) + " locked " + System.identityHashCode(this) + "
-      // with sequence " + sequence);
-
       return true;
    }
 
    public void unlock()
    {
-      // log.info("Thread " + System.identityHashCode(Thread.currentThread()) + " unlocking " +
-      // System.identityHashCode(this));
-
       if (owner != Thread.currentThread())
       {
          throw new IllegalMonitorStateException();
@@ -190,10 +144,25 @@
 
       if (entry != null)
       {
-         LockSupport.unpark(peekEntry().thread);
+         LockSupport.unpark(entry.thread);
       }
    }
 
+   private QueueEntry peekEntry()
+   {
+      QueueEntry entry = queue.peek();
+
+      if (entry != null)
+      {
+         if (!strictOrder ||entry.sequence == currentSequence.get())
+         {
+            return entry;
+         }
+      }
+      
+      return null;      
+   }
+
    private static final class QueueEntry implements Comparable<QueueEntry>
    {
       private final long sequence;
@@ -219,4 +188,36 @@
          return sequence < l ? -1 : (sequence == l ? 0 : 1);
       }
    }
+
+   private synchronized void dump(final long sequence)
+   {
+      log.error(System.identityHashCode(this) + " Timed out trying to get lock, desired " +
+                sequence +
+                " current " +
+                currentSequence);
+
+      log.info("Current owner is " + owner);
+
+      log.info("Dumping entries");
+
+      Object[] sorted = queue.toArray();
+
+      Arrays.sort(sorted);
+
+      for (Object obj : sorted)
+      {
+         QueueEntry en = (QueueEntry)obj;
+
+         log.info("seq:" + en.sequence + " thread:" + System.identityHashCode(en.thread));
+      }
+
+      try
+      {
+         Thread.sleep(1000000);
+      }
+      catch (Exception e)
+      {
+      }
+   }
+
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/utils/ConcurrentHashSet.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/utils/ConcurrentHashSet.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/utils/ConcurrentHashSet.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -40,7 +40,7 @@
  */
 public class ConcurrentHashSet<E> extends AbstractSet<E> implements ConcurrentSet<E>
 {
-   private ConcurrentMap<E, Object> theMap;
+   private final ConcurrentMap<E, Object> theMap;
    
    private static final Object dummy = new Object();
    

Added: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/utils/WeakHashSet.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/utils/WeakHashSet.java	                        (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/utils/WeakHashSet.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.utils;
+
+import java.util.AbstractSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+/**
+ * 
+ * A WeakHashSet - a Set implementation that stores its elements as the keys of a WeakHashMap
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * @param <E>
+ *
+ */
+public class WeakHashSet<E> extends AbstractSet<E>
+{
+   private final Map<E, Object> theMap;
+   
+   private static final Object dummy = new Object();
+   
+   public WeakHashSet()
+   {
+      theMap = new WeakHashMap<E, Object>();
+   }
+   
+   public int size()
+   {
+      return theMap.size();
+   }
+   
+   public Iterator<E> iterator()
+   {
+      return theMap.keySet().iterator();
+   }
+   
+   public boolean isEmpty()
+   {
+      return theMap.isEmpty();
+   }
+   
+   public boolean add(E o)
+   {
+      return theMap.put(o, dummy) == null;
+   }
+   
+   public boolean contains(Object o)
+   {
+      return theMap.containsKey(o);
+   }
+   
+   public void clear()
+   {
+      theMap.clear();
+   }
+   
+   public boolean remove(Object o)
+   {
+      return theMap.remove(o) == dummy;
+   }   
+}

Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -120,8 +120,10 @@
 
             private final int threadNum;
 
-            Runner(final RunnableT test, final int threadNum)
+            Runner(final String name, final RunnableT test, final int threadNum)
             {
+               super(name);
+               
                this.test = test;
 
                this.threadNum = threadNum;
@@ -142,7 +144,7 @@
 
                   // Case a failure happened here, it should print the Thread dump
                   // Sending it to System.out, as it would show on the Tests report
-                  System.out.println(threadDump(" - fired by MultiThreadRandomFailoverTestBase::runTestMultipleThreads (" + t.getLocalizedMessage() + ")"));
+                  //System.out.println(threadDump(" - fired by MultiThreadRandomFailoverTestBase::runTestMultipleThreads (" + t.getLocalizedMessage() + ")"));
                }
             }
          }
@@ -153,7 +155,7 @@
 
             for (int i = 0; i < numThreads; i++)
             {
-               Runner runner = new Runner(runnable, i);
+               Runner runner = new Runner("MultiThreadFailoverTest-thread-" + i, runnable, i);
 
                threads.add(runner);
 

Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -40,7 +40,6 @@
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.Pinger;
 import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.jms.client.JBossBytesMessage;
@@ -91,7 +90,7 @@
          {
             doTestA(sf, threadNum);
          }
-      }, NUM_THREADS, false);
+      }, 1, false);
    }
 
    public void testB() throws Exception
@@ -327,7 +326,7 @@
 
       final int numMessages = 100;
 
-      final int numSessions = 10;
+      final int numSessions = 1;
 
       Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
       Set<ClientSession> sessions = new HashSet<ClientSession>();
@@ -1287,7 +1286,7 @@
 
    protected int getNumIterations()
    {
-      return 2;
+      return 1000;
    }
 
    @Override

Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/SequencedLockTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/SequencedLockTest.java	2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/SequencedLockTest.java	2009-07-10 10:59:04 UTC (rev 7555)
@@ -39,6 +39,8 @@
 public class SequencedLockTest extends TestCase
 {
    private static final Logger log = Logger.getLogger(SequencedLockTest.class);
+   
+   private static final long TIMEOUT = 5000000000L;
 
    public void testAscending() throws Exception
    {
@@ -144,7 +146,7 @@
             
           //  log.info("Trying lock " + cnt);
             
-            lock.lock(cnt);
+            lock.lock(cnt, TIMEOUT);
             
             int ob = observedCounter.getAndIncrement();
             
@@ -212,7 +214,7 @@
       {
          // log.info(System.identityHashCode(this) + " Will attempt to obtain lock with sequence " + seq);
 
-         lock.lock(seq);
+         lock.lock(seq, TIMEOUT);
 
          // log.info(System.identityHashCode(this) + " Obtained lock with sequence " + seq);
 




More information about the jboss-cvs-commits mailing list