[jboss-cvs] JBoss Messaging SVN: r7515 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/remoting and 11 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jul 2 12:25:40 EDT 2009


Author: timfox
Date: 2009-07-02 12:25:39 -0400 (Thu, 02 Jul 2009)
New Revision: 7515

Modified:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMRegistry.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/ServerSession.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/Replicator.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareAtomicLong.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
   branches/Branch_MultiThreaded_Replication/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java
   branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java
Log:
mt replication

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -727,15 +727,17 @@
 
          remotingConnection = backupConnection;
 
-         Packet request = new ReattachSessionMessage(name, channel.getLastReceivedCommandID());
+         Packet request = new ReattachSessionMessage(name, channel.getLastConfirmedCommandID());
 
          Channel channel1 = backupConnection.getChannel(1, -1, false);
 
          ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
+         
+         log.info("Got a response with a last received command id " + response.getLastConfirmedCommandID());
 
          if (!response.isRemoved())
          {
-            channel.replayCommands(response.getLastReceivedCommandID());
+            channel.replayCommands(response.getLastConfirmedCommandID());
 
             ok = true;
          }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -41,9 +41,9 @@
 
    void transferConnection(RemotingConnection newConnection);
    
-   void replayCommands(int lastReceivedCommandID);
+   void replayCommands(int lastConfirmedCommandID);
 
-   int getLastReceivedCommandID();
+   int getLastConfirmedCommandID();
 
    void lock();
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -68,7 +68,7 @@
 
    private volatile int firstStoredCommandID;
 
-   private volatile int lastReceivedCommandID = -1;
+   private volatile int lastConfirmedCommandID = -1;
 
    private volatile RemotingConnection connection;
 
@@ -140,9 +140,9 @@
       return id;
    }
 
-   public int getLastReceivedCommandID()
+   public int getLastConfirmedCommandID()
    {
-      return lastReceivedCommandID;
+      return lastConfirmedCommandID;
    }
 
    public Lock getLock()
@@ -515,13 +515,14 @@
       }
    }
 
-   public void replayCommands(final int otherLastReceivedCommandID)
+   public void replayCommands(final int otherLastConfirmedCommandID)
    {
-      clearUpTo(otherLastReceivedCommandID);
+      clearUpTo(otherLastConfirmedCommandID);
 
       for (final Packet packet : resendCache)
       {
-         log.info("Replaying command " + packet);
+         //log.info("Replaying command " + packet);
+         
          doWrite(packet);
       }
    }
@@ -557,7 +558,7 @@
       {
          receivedBytes = 0;
 
-         final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+         final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID);
 
          confirmed.setChannelID(id);
 
@@ -569,7 +570,7 @@
    {
       if (resendCache != null && packet.isRequiresConfirmations())
       {
-         lastReceivedCommandID++;
+         lastConfirmedCommandID++;
 
          receivedBytes += packet.getPacketSize();
 
@@ -579,7 +580,8 @@
 
             if (connection.isActive())
             {
-               final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+              // log.info("sending packet confirmed message " + lastConfirmedCommandID);
+               final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID);
 
                confirmed.setChannelID(id);
 
@@ -731,13 +733,16 @@
       connection.getTransportConnection().write(buffer);
    }
 
-   private void clearUpTo(final int lastReceivedCommandID)
+   private void clearUpTo(final int lastConfirmedCommandID)
    {
-      final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
+      //log.info("client " + connection.isClient() + " clearing up to " + lastConfirmedCommandID);
+      
+      final int numberToClear = 1 + lastConfirmedCommandID - firstStoredCommandID;
 
-      if (numberToClear == -1)
+      if (numberToClear < 0)
       {
-         throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
+         throw new IllegalArgumentException("Invalid lastConfirmedCommandID: " + lastConfirmedCommandID + 
+                                            " firstStoredCommandID " + firstStoredCommandID + " client " + connection.isClient());
       }
 
       int sizeToFree = 0;
@@ -750,7 +755,7 @@
          {
             throw new IllegalStateException(System.identityHashCode(this) + " Can't find packet to clear: " +
                                             " last received command id " +
-                                            lastReceivedCommandID +
+                                            lastConfirmedCommandID +
                                             " first stored command id " +
                                             firstStoredCommandID);
          }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMRegistry.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMRegistry.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMRegistry.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -42,6 +42,7 @@
 
    public void registerAcceptor(final int id, final InVMAcceptor acceptor)
    {     
+      //log.info("Registering acceptor with id " + id, new Exception());
       if (acceptors.putIfAbsent(id, acceptor) != null)
       {
          throw new IllegalArgumentException("Acceptor with id " + id + " already registered");
@@ -68,6 +69,11 @@
 
    public int size()
    {
+      log.info("** remaining acceptors");
+      for (Integer i: acceptors.keySet())
+      {
+         log.info("id: " + i);
+      }
       return this.acceptors.size();
    }
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -40,19 +40,19 @@
 
    private String name;
    
-   private int lastReceivedCommandID;
+   private int lastConfirmedCommandID;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public ReattachSessionMessage(final String name, final int lastReceivedCommandID)
+   public ReattachSessionMessage(final String name, final int lastConfirmedCommandID)
    {
       super(REATTACH_SESSION);
 
       this.name = name;
       
-      this.lastReceivedCommandID = lastReceivedCommandID;
+      this.lastConfirmedCommandID = lastConfirmedCommandID;
    }
    
    public ReattachSessionMessage()
@@ -67,9 +67,9 @@
       return name;
    }
    
-   public int getLastReceivedCommandID()
+   public int getLastConfirmedCommandID()
    {
-      return lastReceivedCommandID;
+      return lastConfirmedCommandID;
    }
    
    public int getRequiredBufferSize()
@@ -81,13 +81,13 @@
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.writeString(name);
-      buffer.writeInt(lastReceivedCommandID);
+      buffer.writeInt(lastConfirmedCommandID);
    }
    
    public void decodeBody(final MessagingBuffer buffer)
    {
       name = buffer.readString();
-      lastReceivedCommandID = buffer.readInt();
+      lastConfirmedCommandID = buffer.readInt();
    }
 
    public boolean equals(Object other)

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -38,7 +38,7 @@
 
    // Attributes ----------------------------------------------------
 
-   private int lastReceivedCommandID;
+   private int lastConfirmedCommandID;
    
    //Is this flag really necessary - try removing it
    private boolean removed;
@@ -47,11 +47,11 @@
 
    // Constructors --------------------------------------------------
 
-   public ReattachSessionResponseMessage(final int lastReceivedCommandID, final boolean removed)
+   public ReattachSessionResponseMessage(final int lastConfirmedCommandID, final boolean removed)
    {
       super(REATTACH_SESSION_RESP);
 
-      this.lastReceivedCommandID = lastReceivedCommandID;
+      this.lastConfirmedCommandID = lastConfirmedCommandID;
       
       this.removed = removed;
    }
@@ -63,9 +63,9 @@
 
    // Public --------------------------------------------------------
 
-   public int getLastReceivedCommandID()
+   public int getLastConfirmedCommandID()
    {
-      return lastReceivedCommandID;
+      return lastConfirmedCommandID;
    }
    
    public boolean isRemoved()
@@ -81,13 +81,13 @@
 
    public void encodeBody(final MessagingBuffer buffer)
    { 
-      buffer.writeInt(lastReceivedCommandID);
+      buffer.writeInt(lastConfirmedCommandID);
       buffer.writeBoolean(removed);
    }
    
    public void decodeBody(final MessagingBuffer buffer)
    { 
-      lastReceivedCommandID = buffer.readInt();
+      lastConfirmedCommandID = buffer.readInt();
       removed = buffer.readBoolean();
    }
    
@@ -105,7 +105,7 @@
             
       ReattachSessionResponseMessage r = (ReattachSessionResponseMessage)other;
       
-      return super.equals(other) && this.lastReceivedCommandID == r.lastReceivedCommandID;
+      return super.equals(other) && this.lastConfirmedCommandID == r.lastConfirmedCommandID;
    }
    
    public final boolean isRequiresConfirmations()

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -35,19 +35,15 @@
 
    private List<Long> sequences;
 
-   private boolean requiresResponse;
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public ReplicateLockSequenceMessage(final List<Long> sequences, final boolean requiresResponse)
+   public ReplicateLockSequenceMessage(final List<Long> sequences)
    {
       super(REPLICATE_LOCK_SEQUENCES);
 
       this.sequences = sequences;
-
-      this.requiresResponse = requiresResponse;
    }
 
    // Public --------------------------------------------------------
@@ -59,10 +55,7 @@
 
    public int getRequiredBufferSize()
    {
-      return BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
-             sequences.size() *
-             DataConstants.SIZE_LONG +
-             DataConstants.SIZE_BOOLEAN;
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_INT + sequences.size() * DataConstants.SIZE_LONG;
    }
 
    @Override
@@ -73,7 +66,6 @@
       {
          buffer.writeLong(sequence);
       }
-      buffer.writeBoolean(requiresResponse);
    }
 
    @Override
@@ -85,18 +77,12 @@
       {
          sequences.add(buffer.readLong());
       }
-      requiresResponse = buffer.readBoolean();
    }
 
    public List<Long> getSequences()
    {
       return sequences;
    }
-   
-   public boolean isRequiresResponse()
-   {
-      return requiresResponse;
-   }
 
    // Package protected ---------------------------------------------
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -235,6 +235,8 @@
       for (Acceptor acceptor : acceptors)
       {
          acceptor.stop();
+         
+         log.info("Stopping acceptor " + acceptor);
       }
 
       acceptors.clear();

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -67,7 +67,7 @@
    
    void unregisterActivateCallback(ActivateCallback callback);
     
-   ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
+   ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastConfirmedCommandID) throws Exception;
 
    CreateSessionResponseMessage createSession(String name,
                                               long channelID,                                              

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/ServerSession.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/ServerSession.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -140,7 +140,7 @@
 
    void handleClose(Packet packet);
 
-   int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
+   int transferConnection(RemotingConnection newConnection, int lastConfirmedCommandID);
 
    Channel getChannel();
    

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -286,7 +286,9 @@
       // We start the remoting service here - if the server is a backup remoting service needs to be started
       // so it can be initialised by the live node
       remotingService.start();
-
+      
+      started = true;
+      
       log.info("JBoss Messaging Server version " + getVersion().getFullVersion() + " started");
    }
 
@@ -309,6 +311,8 @@
          session.getChannel().flushConfirmations();
       }
 
+      log.info("Stopping remoting service on backup " + configuration.isBackup());
+      
       remotingService.stop();
 
       // Stop the deployers
@@ -333,18 +337,35 @@
 
       managementService.stop();
 
-      storageManager.stop();
+      if (storageManager != null)
+      {
+         storageManager.stop();
+      }
 
       if (securityManager != null)
       {
          securityManager.stop();
       }
 
-      resourceManager.stop();
+      if (resourceManager != null)
+      {
+         resourceManager.stop();
+      }
 
-      clusterQueueStateManager.stop();
+      if (clusterQueueStateManager != null)
+      {
+         clusterQueueStateManager.stop();
+      }
 
-      postOffice.stop();
+      if (postOffice != null)
+      {
+         postOffice.stop();
+      }
+      
+      if (replicatingConnectionManager != null)
+      {
+         replicatingConnectionManager.close();
+      }
 
       // Need to shutdown pools before shutting down paging manager to make sure everything is written ok
 
@@ -371,7 +392,10 @@
       scheduledPool = null;
       threadPool = null;
 
-      pagingManager.stop();
+      if (pagingManager != null)
+      {
+         pagingManager.stop();
+      }
 
       pagingManager = null;
       securityStore = null;
@@ -386,6 +410,9 @@
       sessions.clear();
 
       started = false;
+      
+      log.info(System.identityHashCode(this) + " called stop ");
+      
       initialised = false;
       uuid = null;
       nodeID = null;
@@ -462,7 +489,7 @@
 
    public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
                                                          final String name,
-                                                         final int lastReceivedCommandID) throws Exception
+                                                         final int lastConfirmedCommandID) throws Exception
    {
       ServerSession session = sessions.get(name);
 
@@ -480,11 +507,11 @@
       else
       {
          // Reconnect the channel to the new connection
-         int serverLastReceivedCommandID = session.transferConnection(connection, lastReceivedCommandID);
-
-         log.info("Reattached session ok");
+         int serverLastConfirmedCommandID = session.transferConnection(connection, lastConfirmedCommandID);
+                  
+         log.info("Reattached session ok, server last received command id is " + serverLastConfirmedCommandID);
          
-         return new ReattachSessionResponseMessage(serverLastReceivedCommandID, false);
+         return new ReattachSessionResponseMessage(serverLastConfirmedCommandID, false);
       }
    }
 
@@ -940,28 +967,29 @@
    // can't find message in queue since active was delivered immediately
    private void freezeBackupConnection()
    {
-      // Sanity check
-      // All replicated sessions should be on the same connection
-      RemotingConnection replConnection = null;
+//      // Sanity check
+//      // All replicated sessions should be on the same connection
+//      RemotingConnection replConnection = null;
 
       for (ServerSession session : sessions.values())
       {
          RemotingConnection rc = session.getChannel().getConnection();
 
-         if (replConnection == null)
-         {
-            replConnection = rc;
-         }
-         else if (replConnection != rc)
-         {
-            throw new IllegalStateException("More than one replicating connection!");
-         }
+//         if (replConnection == null)
+//         {
+//            replConnection = rc;
+//         }
+//         else if (replConnection != rc)
+//         {
+//            throw new IllegalStateException("More than one replicating connection!");
+//         }
+         rc.freeze();
       }
-
-      if (replConnection != null)
-      {
-         replConnection.freeze();
-      }
+//
+//      if (replConnection != null)
+//      {
+//         replConnection.freeze();
+//      }
    }
 
    private void initialisePart1() throws Exception
@@ -1162,7 +1190,7 @@
 
       initialised = true;
 
-      started = true;
+      log.info(System.identityHashCode(this) + " called initialise part2");
    }
 
    private void deployQueuesFromConfiguration() throws Exception

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -61,7 +61,7 @@
 
    private volatile List<Long> sequences;
    
-   private volatile boolean requiresReplicationResponse;
+  // private volatile boolean requiresReplicationResponse;
 
    public MessagingServerPacketHandler(final MessagingServer server,
                                        final Channel channel1,
@@ -95,7 +95,7 @@
             
             sequences = msg.getSequences();  
             
-            requiresReplicationResponse = msg.isRequiresResponse();
+          //  requiresReplicationResponse = msg.isRequiresResponse();
             
             return;
          }
@@ -161,7 +161,7 @@
             {
                if (replicator != null)
                {
-                  replicator.execute(action);
+                  replicator.execute(action, null);
                }
                else
                {
@@ -188,7 +188,7 @@
       
       // send the response message
 
-      if (server.getConfiguration().isBackup() && requiresReplicationResponse || type == REPLICATE_STARTUP_INFO)
+      if (server.getConfiguration().isBackup() || type == REPLICATE_STARTUP_INFO)
       {
          channel1.send(new ReplicationResponseMessage());
       }
@@ -236,7 +236,7 @@
 
       try
       {
-         response = server.reattachSession(connection, request.getName(), request.getLastReceivedCommandID());
+         response = server.reattachSession(connection, request.getName(), request.getLastConfirmedCommandID());
       }
       catch (Exception e)
       {

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -1714,7 +1714,7 @@
 
          do
          {
-            replicator.execute(action);
+            replicator.execute(action, null);
 
             handled = action.getResult();
          }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -56,7 +56,7 @@
 
    private volatile List<Long> sequences;
    
-   private volatile boolean requiresReplicationResponse;
+   //private volatile boolean requiresReplicationResponse;
    
    private final PostOffice postOffice;
    
@@ -81,7 +81,7 @@
             
             sequences = msg.getSequences();
             
-            requiresReplicationResponse = msg.isRequiresResponse();
+            //requiresReplicationResponse = msg.isRequiresResponse();
             
             break;
          }
@@ -114,10 +114,7 @@
 //               log.info("*** delivered message on backup");
 //            }
             
-            if (this.requiresReplicationResponse)
-            {
-               channel.send(new ReplicationResponseMessage());
-            }
+            channel.send(new ReplicationResponseMessage());            
             
             thread.setNoReplayOrRecord();
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -491,7 +491,7 @@
          
          if ((flowControl && availableCredits <= 0) || !started)
          {
-            log.info("busy");
+            //log.info("busy");
             return HandleStatus.BUSY;
          }
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -351,7 +351,7 @@
    {
       setStarted(true);
 
-      channel.confirm(packet);
+      ////channel.confirm(packet);
    }
 
    public void handleStop(final Packet packet)
@@ -360,7 +360,7 @@
 
       setStarted(false);
 
-      channel.confirm(packet);
+      ////channel.confirm(packet);
 
       channel.send(response);
    }
@@ -430,7 +430,7 @@
          }
       }
 
-      channel.confirm(packet);
+      ////channel.confirm(packet);
 
       channel.send(response);
    }
@@ -549,7 +549,7 @@
          }
       }
 
-      channel.confirm(packet);
+      ////channel.confirm(packet);
 
       channel.send(response);
    }
@@ -625,7 +625,7 @@
          }
       }
 
-      channel.confirm(packet);
+      ////channel.confirm(packet);
 
       channel.send(response);
    }
@@ -663,7 +663,7 @@
          }
       }
 
-      channel.confirm(packet);
+      ////channel.confirm(packet);
 
       channel.send(response);
    }
@@ -716,7 +716,7 @@
          }
       }
 
-      channel.confirm(packet);
+      ////channel.confirm(packet);
 
       channel.send(response);
    }
@@ -762,7 +762,7 @@
          }
       }
 
-      channel.confirm(packet);
+      ////channel.confirm(packet);
 
       channel.send(response);
    }
@@ -799,7 +799,7 @@
          }
       }
 
-      channel.confirm(packet);
+      ////channel.confirm(packet);
 
       if (response != null)
       {
@@ -824,7 +824,7 @@
          log.error("Failed to acknowledge", e);
       }
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
    }
 
    public void handleCommit(final Packet packet)
@@ -855,7 +855,7 @@
          tx = new TransactionImpl(storageManager);
       }
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
 
       channel.send(response);
    }
@@ -884,7 +884,7 @@
          }
       }
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
 
       channel.send(response);
    }
@@ -947,7 +947,7 @@
          }
       }
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
 
       channel.send(response);
    }
@@ -1021,7 +1021,7 @@
          }
       }
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
 
       channel.send(response);
    }
@@ -1033,7 +1033,7 @@
 
       Packet response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
 
       channel.send(response);
    }
@@ -1084,7 +1084,7 @@
          }
       }
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
 
       channel.send(response);
    }
@@ -1146,7 +1146,7 @@
          }
       }
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
 
       channel.send(response);
    }
@@ -1209,7 +1209,7 @@
          }
       }
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
 
       channel.send(response);
    }
@@ -1260,7 +1260,7 @@
          }
       }
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
 
       channel.send(response);
    }
@@ -1309,7 +1309,7 @@
          }
       }
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
 
       channel.send(response);
    }
@@ -1369,7 +1369,7 @@
          }
       }
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
 
       channel.send(response);
    }
@@ -1378,7 +1378,7 @@
    {
       Packet response = new SessionXAGetInDoubtXidsResponseMessage(resourceManager.getPreparedTransactions());
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
 
       channel.send(response);
    }
@@ -1387,7 +1387,7 @@
    {
       Packet response = new SessionXAGetTimeoutResponseMessage(resourceManager.getTimeoutSeconds());
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
 
       channel.send(response);
    }
@@ -1396,7 +1396,7 @@
    {
       Packet response = new SessionXASetTimeoutResponseMessage(resourceManager.setTimeoutSeconds(packet.getTimeoutSeconds()));
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
 
       channel.send(response);
    }
@@ -1446,7 +1446,7 @@
          }
       }
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
 
       if (response != null)
       {
@@ -1503,7 +1503,7 @@
          }
       }
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
 
       if (response != null)
       {
@@ -1535,7 +1535,7 @@
          }
       }
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
 
       // We flush the confirmations to make sure any send confirmations get handled on the client side
       channel.flushConfirmations();
@@ -1545,7 +1545,7 @@
       channel.close();
    }
 
-   public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
+   public int transferConnection(final RemotingConnection newConnection, final int lastConfirmedCommandID)
    {
       boolean wasStarted = this.started;
 
@@ -1577,10 +1577,10 @@
       remotingConnection.addFailureListener(this);
       remotingConnection.addCloseListener(this);
 
-      int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
+      int serverLastConfirmedCommandID = channel.getLastConfirmedCommandID();
 
       log.info("replaying commands");
-      channel.replayCommands(lastReceivedCommandID);
+      channel.replayCommands(lastConfirmedCommandID);
 
       if (wasStarted)
       {
@@ -1589,7 +1589,7 @@
 
       log.info("Transferred connection");
       
-      return serverLastReceivedCommandID;
+      return serverLastConfirmedCommandID;
    }
 
    public Channel getChannel()
@@ -1698,7 +1698,7 @@
       {
          Packet response = null;
 
-         channel.confirm(packet);
+         //channel.confirm(packet);
 
          if (response != null)
          {
@@ -1721,7 +1721,7 @@
       {
          log.error("Failed to receive credits", e);
       }
-      channel.confirm(packet);
+      //channel.confirm(packet);
    }
 
    private void doSendLargeMessage(final SessionSendLargeMessage packet)
@@ -1737,7 +1737,7 @@
          log.error("Failed to send message", e);
       }
 
-      channel.confirm(packet);
+      //channel.confirm(packet);
    }
 
    private void handleManagementMessage(final ServerMessage message) throws Exception

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -104,7 +104,7 @@
    
    private volatile List<Long> sequences;
    
-   private volatile boolean requiresReplicationResponse;
+ //  private volatile boolean requiresReplicationResponse;
 
    private final Channel channel;
 
@@ -151,10 +151,12 @@
          handlePacket();
 
          // send the response message
-
-         if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES && this.requiresReplicationResponse)
+                  
+         if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES)
          {
-            log.info("sending back replication response");
+            channel.confirm(packet);
+            
+            //log.info("sending back replication response");
             channel.send(new ReplicationResponseMessage());
          }
          
@@ -164,11 +166,20 @@
       {
          if (replicator != null)
          {
-            replicator.execute(this);
+            replicator.execute(this, 
+                               new Runnable()
+            {
+               public void run()
+               {
+                  channel.confirm(packet);
+               }
+            });
          }
          else
          {
             handlePacket();
+            
+            channel.confirm(packet);
          }
       }
    }
@@ -195,7 +206,7 @@
 
                sequences = msg.getSequences();
                
-               this.requiresReplicationResponse = msg.isRequiresResponse();
+             //  this.requiresReplicationResponse = msg.isRequiresResponse();
 
                // dumpSequences(sequences);
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/Replicator.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/Replicator.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/Replicator.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -35,13 +35,9 @@
  */
 public interface Replicator
 {
-   void execute(ReplicableAction action);
+   void execute(ReplicableAction action, Runnable postReplicateAction);
    
    void registerWaitingChannel(Channel channel);
    
-  // boolean isResponseReceived();
-   
    void replicationResponseReceived();
-   
-  // long getReplicateSequence();
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareAtomicLong.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareAtomicLong.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareAtomicLong.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -69,7 +69,10 @@
       {
          long sequence = al.getAndIncrement();
 
-         thread.addSequence(sequence);
+         if (thread.isRecording())
+         {
+            thread.addSequence(sequence);
+         }
          
          return sequence;
       }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -144,7 +144,10 @@
 
          if (ok)
          {
-            thread.addSequence(counter.getAndIncrement());
+            if (thread.isRecording())
+            {
+               thread.addSequence(counter.getAndIncrement());
+            }
 
             addOwner(thread);
          }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -48,14 +48,17 @@
 
    private final Channel replicatingChannel;
    
-   private final Queue<Set<Channel>> waitingChannelsQueue = new ConcurrentLinkedQueue<Set<Channel>>();
+   private final Queue<WaitingChannelsHolder> waitingChannelsQueue = new ConcurrentLinkedQueue<WaitingChannelsHolder>();
 
    private Set<Channel> currentChannels;
 
-  // private long responseSequence;
-   
-  // private long replicateSequence;
+   private static class WaitingChannelsHolder
+   {
+      Runnable postReplicateAction;
       
+      Set<Channel> channels;
+   }
+      
    public ReplicatorImpl(final Channel replicatingChannel)
    {
       this.replicatingChannel = replicatingChannel;
@@ -68,17 +71,20 @@
 
    public void replicationResponseReceived()
    {
-      //long sequence = responseSequence++; 
-      
-      Set<Channel> waitingChannels = waitingChannelsQueue.remove();
+      WaitingChannelsHolder waitingChannelsHolder = waitingChannelsQueue.remove();
             
-      for (Channel channel : waitingChannels)
+      for (Channel channel : waitingChannelsHolder.channels)
       {        
          channel.replicationResponseReceived(this);
+      }         
+      
+      if (waitingChannelsHolder.postReplicateAction != null)
+      {
+         waitingChannelsHolder.postReplicateAction.run();
       }
    }
 
-   public void execute(final ReplicableAction action)
+   public void execute(final ReplicableAction action, final Runnable postReplicateAction)
    {
       // First we execute the action
        
@@ -98,12 +104,13 @@
 
       // We then send the sequences to the backup
       
-      if (!currentChannels.isEmpty())
-      {
-         waitingChannelsQueue.add(currentChannels);
-      }
+      WaitingChannelsHolder holder = new WaitingChannelsHolder();
+      holder.channels = currentChannels;
+      holder.postReplicateAction = postReplicateAction;
+      
+      waitingChannelsQueue.add(holder);      
 
-      Packet packet = new ReplicateLockSequenceMessage(sequences, !currentChannels.isEmpty());
+      Packet packet = new ReplicateLockSequenceMessage(sequences);
 
       replicatingChannel.send(packet);
 

Modified: branches/Branch_MultiThreaded_Replication/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -356,6 +356,7 @@
 
             TextMessage m = session.createTextMessage("message one");
 
+                      
             prod.send(m);
          }
 

Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java	2009-07-02 11:27:55 UTC (rev 7514)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java	2009-07-02 16:25:39 UTC (rev 7515)
@@ -85,140 +85,122 @@
 
    public void testReplication1() throws Exception
    {
-      for (int j = 0; j < 5000; j++)
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      sf.setProducerWindowSize(32 * 1024);
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(ADDRESS, ADDRESS, null, false);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 100;
+
+      long start = System.currentTimeMillis();
+
+      for (int i = 0; i < numMessages; i++)
       {
-         log.info("Iteration " + j);
-         
-         ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-   
-         sf.setProducerWindowSize(32 * 1024);
-   
-         ClientSession session = sf.createSession(false, true, true);
-         
-         session.createQueue(ADDRESS, ADDRESS, null, false);
-         
-         ClientProducer producer = session.createProducer(ADDRESS);
-         
-         final int numMessages = 100;
-         
-         long start = System.currentTimeMillis();
-   
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
-                                                                false,
-                                                                0,
-                                                                System.currentTimeMillis(),
-                                                                (byte)1);
-            message.putIntProperty(new SimpleString("count"), i);
-            message.getBody().writeString("aardvarks");
-            producer.send(message);
-         }
-         
-         //Thread.sleep(500);
-         
-         ClientConsumer consumer = session.createConsumer(ADDRESS);
-         
-         log.info("sent messages");
-         
-         session.start();
-         
-         log.info("Started session");
-         
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage message2 = consumer.receive();
-   
-            assertEquals("aardvarks", message2.getBody().readString());
-            assertEquals(i, message2.getProperty(new SimpleString("count")));
-            
-            message2.acknowledge();
-         }
-         
-         long end = System.currentTimeMillis();
-         
-         log.info("That took " + (end - start));
-         
-         ClientMessage message3 = consumer.receive(250);
-   
-         assertNull(message3);
-   
-         session.close();
-         
-         tearDown();
-         
-         setUp();
+         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().writeString("aardvarks");
+         producer.send(message);
       }
+
+      // Thread.sleep(500);
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      log.info("sent messages");
+
+      session.start();
+
+      log.info("Started session");
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         assertEquals("aardvarks", message2.getBody().readString());
+         assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+         message2.acknowledge();
+      }
+
+      long end = System.currentTimeMillis();
+
+      log.info("That took " + (end - start));
+
+      ClientMessage message3 = consumer.receive(250);
+
+      assertNull(message3);
+
+      session.close();
    }
-   
+
    public void testReplication2() throws Exception
    {
-      for (int j = 0; j < 5000; j++)
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      sf.setProducerWindowSize(32 * 1024);
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(ADDRESS, ADDRESS, null, false);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      log.info("sent messages");
+
+      session.start();
+
+      // Thread.sleep(500);
+
+      final int numMessages = 1000;
+
+      long start = System.currentTimeMillis();
+
+      for (int i = 0; i < numMessages; i++)
       {
-         log.info("Iteration " + j);
-         
-         ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-   
-         sf.setProducerWindowSize(32 * 1024);
-   
-         ClientSession session = sf.createSession(false, true, true);
-         
-         session.createQueue(ADDRESS, ADDRESS, null, false);
-         
-         ClientProducer producer = session.createProducer(ADDRESS);
-         
-         ClientConsumer consumer = session.createConsumer(ADDRESS);
-         
-         log.info("sent messages");
-         
-         session.start();
-         
-         //Thread.sleep(500);
-         
-         final int numMessages = 1000;
-         
-         long start = System.currentTimeMillis();
-   
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
-                                                                false,
-                                                                0,
-                                                                System.currentTimeMillis(),
-                                                                (byte)1);
-            message.putIntProperty(new SimpleString("count"), i);
-            message.getBody().writeString("aardvarks");
-            producer.send(message);
-         }
-         
-         //Thread.sleep(500);
-                          
-         log.info("Started session");
-         
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage message2 = consumer.receive();
-   
-            assertEquals("aardvarks", message2.getBody().readString());
-            assertEquals(i, message2.getProperty(new SimpleString("count")));
-            
-            message2.acknowledge();
-         }
-         
-         long end = System.currentTimeMillis();
-         
-         log.info("That took " + (end - start));
-         
-         ClientMessage message3 = consumer.receive(250);
-   
-         assertNull(message3);
-   
-         session.close();
-         
-         tearDown();
-         
-         setUp();
+         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().writeString("aardvarks");
+         producer.send(message);
       }
+
+      // Thread.sleep(500);
+
+      log.info("Started session");
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         assertEquals("aardvarks", message2.getBody().readString());
+         assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+         message2.acknowledge();
+      }
+
+      long end = System.currentTimeMillis();
+
+      log.info("That took " + (end - start));
+
+      ClientMessage message3 = consumer.receive(250);
+
+      assertNull(message3);
+
+      session.close();
    }
 
    public void testFailoverSameConnectionFactory() throws Exception
@@ -248,7 +230,7 @@
          message.getBody().writeString("aardvarks");
          producer.send(message);
       }
-      
+
       RemotingConnection conn1 = ((ClientSessionImpl)session).getConnection();
 
       // Simulate failure on connection
@@ -684,8 +666,7 @@
       ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
                                                                      new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
                                                                                                 backupParams));
-      
-      
+
       sf.setFailoverOnServerShutdown(true);
       sf.setRetryInterval(retryInterval);
       sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -780,17 +761,14 @@
 
    public void testFailoverOnCreateSession() throws Exception
    {
-      stopServers();
-
       for (int j = 0; j < 10; j++)
       {
-         startServers();
-
+         log.info("Iteration " + j);
+         
          ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
                                                                         new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
                                                                                                    backupParams));
-         
-         
+
          sf.setFailoverOnServerShutdown(true);
          sf.setRetryInterval(100);
          sf.setRetryIntervalMultiplier(1);
@@ -854,8 +832,10 @@
          assertEquals(0, sf.numConnections());
 
          sf.close();
-
-         stopServers();
+         
+         tearDown();
+         
+         setUp();
       }
    }
 
@@ -952,16 +932,16 @@
       // We fail on the replicating connection and the client connection
 
       MessagingException me = new MessagingException(MessagingException.NOT_CONNECTED);
-      
-      //Note we call the remoting service impl handler which is what would happen in event
-      //of real connection failure
-      
+
+      // Note we call the remoting service impl handler which is what would happen in event
+      // of real connection failure
+
       RemotingConnection serverSideReplicatingConnection = backupService.getRemotingService()
                                                                         .getServerSideReplicatingConnection();
-      
-            
-      ((ConnectionLifeCycleListener)backupService.getRemotingService()).connectionException(serverSideReplicatingConnection.getID(), me);
 
+      ((ConnectionLifeCycleListener)backupService.getRemotingService()).connectionException(serverSideReplicatingConnection.getID(),
+                                                                                            me);
+
       conn1.fail(new MessagingException(MessagingException.NOT_CONNECTED));
 
       ClientConsumer consumer = session.createConsumer(ADDRESS);
@@ -1046,6 +1026,10 @@
       {
          backupService.stop();
       }
+      else
+      {
+         log.info("*** not stopping backup server since not started");
+      }
 
       if (liveService.isStarted())
       {




More information about the jboss-cvs-commits mailing list