[jboss-cvs] JBoss Messaging SVN: r5062 - in trunk: src/main/org/jboss/messaging/core/remoting and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 3 08:24:02 EDT 2008


Author: timfox
Date: 2008-10-03 08:24:02 -0400 (Fri, 03 Oct 2008)
New Revision: 5062

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleManualFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SystematicFailoverTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
   trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java
   trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
More failover and session replication malarkey


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-10-03 12:24:02 UTC (rev 5062)
@@ -474,7 +474,7 @@
                                                    autoCommitSends,
                                                    autoCommitAcks);
 
-         Channel channel1 = connection.getChannel(1, false, -1, true);
+         Channel channel1 = connection.getChannel(1, false, -1, false, true);
 
          try
          {
@@ -499,9 +499,16 @@
 
          CreateSessionResponseMessage response = (CreateSessionResponseMessage)pResponse;
 
+         int packetConfirmationBatchSize = response.getPacketConfirmationBatchSize();
+         
+         //TODO - don't need the packet confirmation batch size on the client side - it's only really
+         //needed on the server side,
+         //since confirmations are only sent from server to client
+         
          Channel sessionChannel = connection.getChannel(sessionChannelID,
                                                         false,
-                                                        response.getPacketConfirmationBatchSize(),
+                                                        -1,
+                                                        packetConfirmationBatchSize != -1,
                                                         !hasBackup);
 
          ClientSessionInternal session = new ClientSessionImpl(this,

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-03 12:24:02 UTC (rev 5062)
@@ -735,9 +735,9 @@
 
          remotingConnection = backupConnection;
 
-         Packet request = new ReattachSessionMessage(name, channel.getLastReceivedCommandID());
+         Packet request = new ReattachSessionMessage(name);
 
-         Channel channel1 = backupConnection.getChannel(1, false, -1, true);
+         Channel channel1 = backupConnection.getChannel(1, false, -1, false, true);
 
          ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Channel.java	2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Channel.java	2008-10-03 12:24:02 UTC (rev 5062)
@@ -42,7 +42,7 @@
 
    void transferConnection(RemotingConnection newConnection);
 
-   int replayCommands(int lastReceivedCommandID);
+   void replayCommands(int lastReceivedCommandID);
 
    int getLastReceivedCommandID();
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2008-10-03 12:24:02 UTC (rev 5062)
@@ -26,14 +26,11 @@
 {
    Object getID();
 
-   Channel getChannel(long channelID, boolean ordered, int packetConfirmationBatchSize, boolean interruptBlockOnFailure);
+   Channel getChannel(long channelID, boolean ordered, int packetConfirmationBatchSize,
+                      boolean hasResendCache, boolean interruptBlockOnFailure);
 
    long generateChannelID();
 
-   public void setReplicating(boolean backup);
-
-   boolean isReplicating();
-
    void addFailureListener(FailureListener listener);
 
    boolean removeFailureListener(FailureListener listener);
@@ -51,4 +48,6 @@
    void syncIDGeneratorSequence(long id);
 
    long getIDGeneratorSequence();
+   
+   void activate();
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java	2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java	2008-10-03 12:24:02 UTC (rev 5062)
@@ -103,7 +103,7 @@
                                                                     pingExecutor,
                                                                     null,
                                                                     null,
-                                                                    true);
+                                                                    true);        
 
          handler.conn = connection;
 
@@ -146,7 +146,7 @@
                                                                  pingExecutor,
                                                                  null,
                                                                  null,
-                                                                 true);
+                                                                 true);        
 
       handler.conn = connection;
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-03 12:24:02 UTC (rev 5062)
@@ -203,12 +203,8 @@
 
    private final RemotingConnection replicatingConnection;
 
-   private volatile boolean replicating;
+   private volatile boolean active;
 
-   private final boolean client;
-
-   private boolean writePackets;
-
    private final long pingPeriod;
 
    private final ScheduledExecutorService pingExecutor;
@@ -230,7 +226,7 @@
                                  final ScheduledExecutorService pingExecutor,
                                  final List<Interceptor> interceptors,
                                  final RemotingConnection replicatingConnection,
-                                 final boolean client)
+                                 final boolean active)
 
    {
       this.transportConnection = transportConnection;
@@ -250,16 +246,14 @@
 
       this.replicatingConnection = replicatingConnection;
 
-      this.client = client;
-
-      writePackets = client; // Gets changed when setReplicating is called
-
+      this.active = active;
+      
       this.pingPeriod = pingPeriod;
 
       this.pingExecutor = pingExecutor;
 
       // Channel zero is reserved for pinging
-      pingChannel = getChannel(0, false, -1, false);
+      pingChannel = getChannel(0, false, -1, false, false);
 
       final ChannelHandler ppHandler = new PingPongHandler();
 
@@ -293,13 +287,14 @@
    public synchronized Channel getChannel(final long channelID,
                                           final boolean ordered,
                                           final int packetConfirmationBatchSize,
+                                          final boolean hasResendCache,
                                           final boolean interruptBlockOnFailure)
    {
       ChannelImpl channel = channels.get(channelID);
 
       if (channel == null)
       {
-         channel = new ChannelImpl(this, channelID, ordered, packetConfirmationBatchSize, interruptBlockOnFailure);
+         channel = new ChannelImpl(this, channelID, ordered, packetConfirmationBatchSize, hasResendCache, interruptBlockOnFailure);
 
          channels.put(channelID, channel);
       }
@@ -307,19 +302,6 @@
       return channel;
    }
 
-   // This is a bit hacky - can we somehow do this in the constructor?
-   public void setReplicating(final boolean replicating)
-   {
-      this.replicating = replicating;
-
-      writePackets = client || !replicating;
-   }
-
-   public boolean isReplicating()
-   {
-      return replicating;
-   }
-
    public void addFailureListener(final FailureListener listener)
    {
       if (listener == null)
@@ -442,6 +424,11 @@
          }
       }
    }
+   
+   public void activate()
+   {
+      active = true;
+   }
 
    // Package protected
    // ----------------------------------------------------------------------------
@@ -844,11 +831,18 @@
       private volatile boolean closed;
 
       private final boolean interruptBlockOnFailure;
+      
+      private final Object waitLock = new Object();
 
+      private Thread blockThread;
+
+      private ResponseNotifier responseNotifier;
+
       private ChannelImpl(final RemotingConnectionImpl connection,
                           final long id,
                           final boolean ordered,
                           final int packetConfirmationBatchSize,
+                          final boolean hasResendCache,
                           final boolean interruptBlockOnFailure)
       {
          this.connection = connection;
@@ -863,10 +857,9 @@
          {
             executor = null;
          }
+         
 
-         this.packetConfirmationBatchSize = packetConfirmationBatchSize;
-
-         if (packetConfirmationBatchSize != -1 && ((connection.client && !connection.replicating) || (!connection.client && connection.replicatingConnection == null)))
+         if (hasResendCache)
          {
             resendCache = new ConcurrentLinkedQueue<Packet>();
 
@@ -879,12 +872,17 @@
 
          if (connection.replicatingConnection != null)
          {
-            replicatingChannel = connection.replicatingConnection.getChannel(id, ordered, -1, interruptBlockOnFailure);
+            //Don't want to send confirmations if replicating to backup
+            this.packetConfirmationBatchSize = -1;
+            
+            replicatingChannel = connection.replicatingConnection.getChannel(id, ordered, -1, false, interruptBlockOnFailure);
 
             replicatingChannel.setHandler(new ReplicatedPacketsConfirmedChannelHandler());
          }
          else
          {
+            this.packetConfirmationBatchSize = packetConfirmationBatchSize;
+            
             replicatingChannel = null;
          }
 
@@ -905,36 +903,27 @@
       // This must never called by more than one thread concurrently
       public void send(final Packet packet)
       {
-         synchronized (this)
+         if (connection.active || packet.isWriteAlways())
          {
-            packet.setChannelID(id);
-
-            lock.lock();
-            try
+            synchronized (this)
             {
-               if (resendCache != null)
+               packet.setChannelID(id);
+   
+               lock.lock();
+               try
                {
-                  addToCache(packet);
+                  addToCache(packet);                  
                }
+               finally
+               {
+                  lock.unlock();
+               }
+                  
+               connection.doWrite(packet);               
             }
-            finally
-            {
-               lock.unlock();
-            }
-
-            if (connection.writePackets || packet.isWriteAlways())
-            {
-               connection.doWrite(packet);
-            }
          }
       }
-
-      private final Object waitLock = new Object();
-
-      private Thread blockThread;
-
-      private ResponseNotifier responseNotifier;
-
+     
       public Executor getExecutor()
       {
          return executor;
@@ -967,10 +956,7 @@
                lock.lock();
                try
                {
-                  if (resendCache != null)
-                  {
-                     addToCache(packet);
-                  }
+                  addToCache(packet);                  
                }
                finally
                {
@@ -1149,7 +1135,7 @@
          }
       }
 
-      public int replayCommands(final int otherLastReceivedCommandID)
+      public void replayCommands(final int otherLastReceivedCommandID)
       {
          clearUpTo(otherLastReceivedCommandID);
 
@@ -1157,8 +1143,6 @@
          {
             connection.doWrite(packet);
          }
-
-         return lastReceivedCommandID;
       }
 
       public void lock()
@@ -1287,16 +1271,12 @@
          }
       }
 
-      private volatile Packet lastReceivedPacket;
-      
       private void checkConfirmation(final Packet packet)
       {
-         if (resendCache != null && packet.isRequiresConfirmations())
+         if (packet.isRequiresConfirmations() && packetConfirmationBatchSize != -1)
          {
             lastReceivedCommandID++;
             
-            lastReceivedPacket = packet;
-
             if (lastReceivedCommandID == nextConfirmation)
             {
                final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
@@ -1308,12 +1288,14 @@
                connection.doWrite(confirmed);
             }
          }
-
       }
 
       private void addToCache(final Packet packet)
       {
-         resendCache.add(packet);
+         if (resendCache != null)
+         {
+            resendCache.add(packet);
+         }
       }
 
       private void clearUpTo(final int lastReceivedCommandID)
@@ -1331,8 +1313,7 @@
 
             if (packet == null)
             {
-               throw new IllegalStateException("Can't find packet to clear, client: " + connection.client + 
-                                               " replicating: " + connection.replicating +
+               throw new IllegalStateException("Can't find packet to clear: " +                                             
                                                " last received command id " + lastReceivedCommandID +
                                                " first stored command id " + firstStoredCommandID + 
                                                " channel id " + id);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java	2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java	2008-10-03 12:24:02 UTC (rev 5062)
@@ -239,12 +239,10 @@
                                                          null,
                                                          interceptors,
                                                          replicatingConnection,
-                                                         false);
+                                                         !backup);
 
-      rc.setReplicating(backup);
+      Channel channel1 = rc.getChannel(1, false, -1, false, true);
 
-      Channel channel1 = rc.getChannel(1, false, -1, true);
-
       ChannelHandler handler = new MessagingServerPacketHandler(server, channel1, rc);
 
       channel1.setHandler(handler);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java	2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java	2008-10-03 12:24:02 UTC (rev 5062)
@@ -39,19 +39,15 @@
 
    private String name;
    
-   private int lastReceivedCommandID;
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public ReattachSessionMessage(final String name, final int lastReceivedCommandID)
+   public ReattachSessionMessage(final String name)
    {
       super(REATTACH_SESSION);
 
       this.name = name;
-       
-      this.lastReceivedCommandID = lastReceivedCommandID;
    }
    
    public ReattachSessionMessage()
@@ -66,21 +62,14 @@
       return name;
    }
    
-   public int getLastReceivedCommandID()
-   {
-      return lastReceivedCommandID;
-   }
-   
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.putString(name);
-      buffer.putInt(lastReceivedCommandID);
    }
    
    public void decodeBody(final MessagingBuffer buffer)
    {
       name = buffer.getString();
-      lastReceivedCommandID = buffer.getInt();
    }
 
    public boolean equals(Object other)
@@ -92,8 +81,7 @@
             
       ReattachSessionMessage r = (ReattachSessionMessage)other;
       
-      return super.equals(other) && this.name.equals(r.name) &&
-             this.lastReceivedCommandID == r.lastReceivedCommandID;
+      return super.equals(other) && this.name.equals(r.name);
    }
    
    public final boolean isRequiresConfirmations()

Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-10-03 12:24:02 UTC (rev 5062)
@@ -61,7 +61,7 @@
 
    Version getVersion();
 
-   ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
+   ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name) throws Exception;
 
    CreateSessionResponseMessage createSession(String name,
                                               long channelID,

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-03 12:24:02 UTC (rev 5062)
@@ -125,10 +125,8 @@
    
    void browserReset(long browserID) throws Exception;
    
-   void transferConnection(RemotingConnection newConnection);
+   int transferConnection(RemotingConnection newConnection);
    
-   int replayCommands(int lastReceivedCommandID);
-
    void handleManagementMessage(SessionSendManagementMessage message) throws Exception;
    
    void failedOver() throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-10-03 12:24:02 UTC (rev 5062)
@@ -384,8 +384,7 @@
    }
 
    public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
-                                                         final String name,
-                                                         final int lastReceivedCommandID) throws Exception
+                                                         final String name) throws Exception
    {
       ServerSession session = sessions.get(name);
 
@@ -395,25 +394,16 @@
       }
       
       // Reconnect the channel to the new connection
-      session.transferConnection(connection);
+      int serverLastReceivedCommandID = session.transferConnection(connection);
       
-      // This is necessary for invm since the replicating connection will be the
-      // same connection
-      // as the original replicating connection since the key is the same in the
-      // registry, and that connection
-      // won't have any resend buffer etc
-      connection.setReplicating(false);
+      connection.activate();
 
-      int serverLastReceivedCommandID = session.replayCommands(lastReceivedCommandID);
-
       postOffice.activate();
 
       configuration.setBackup(false);
 
       remotingService.setBackup(false);
 
-      connection.setReplicating(false);
-
       session.failedOver();
 
       return new ReattachSessionResponseMessage(serverLastReceivedCommandID);
@@ -448,7 +438,9 @@
 
       securityStore.authenticate(username, password);
 
-      Channel channel = connection.getChannel(channelID, true, configuration.getPacketConfirmationBatchSize(), false);
+      //Server side connections never have a resend cache
+      
+      Channel channel = connection.getChannel(channelID, true, configuration.getPacketConfirmationBatchSize(), false, false);
 
       final ServerSessionImpl session = new ServerSessionImpl(name,
                                                               channelID,
@@ -505,10 +497,7 @@
          RemotingConnection replicatingConnection = ConnectionRegistryImpl.instance.getConnectionNoCache(backupConnectorFactory,
                                                                                                          backupConnectorParams,
                                                                                                          -1,
-                                                                                                         30000);
-
-         replicatingConnection.setReplicating(true);
-
+                                                                                                         30000); 
          return replicatingConnection;
       }
       else

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-10-03 12:24:02 UTC (rev 5062)
@@ -87,7 +87,7 @@
             {
                ReattachSessionMessage request = (ReattachSessionMessage)packet;
    
-               response = server.reattachSession(connection, request.getName(), request.getLastReceivedCommandID());
+               response = server.reattachSession(connection, request.getName());
                
                break;
             }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-03 10:06:40 UTC (rev 5061)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-03 12:24:02 UTC (rev 5062)
@@ -1075,7 +1075,7 @@
       producers.get(producerID).send(message);
    }
 
-   public void transferConnection(final RemotingConnection newConnection)
+   public int transferConnection(final RemotingConnection newConnection)
    {
       remotingConnection.removeFailureListener(this);
 
@@ -1089,13 +1089,14 @@
       remotingConnection = newConnection;
 
       remotingConnection.addFailureListener(this);
+      
+      int lastReceivedCommandID =  channel.getLastReceivedCommandID();
+     
+      //TODO resend any dup responses
+      
+      return lastReceivedCommandID;
    }
 
-   public int replayCommands(final int lastReceivedCommandID)
-   {
-      return channel.replayCommands(lastReceivedCommandID);
-   }
-
    public void handleManagementMessage(final SessionSendManagementMessage message) throws Exception
    {
       ServerMessage serverMessage = message.getServerMessage();

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java	2008-10-03 12:24:02 UTC (rev 5062)
@@ -0,0 +1,1300 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ConnectionRegistryImpl;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A RandomFailoverTest
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class RandomFailoverTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(RandomFailoverTest.class); // logger scoped to this test class
+
+   // Constants -----------------------------------------------------
+   
+   private static final int RECEIVE_TIMEOUT = 5000;
+
+   // Attributes ----------------------------------------------------
+
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   private MessagingService liveService;
+
+   private MessagingService backupService;
+
+   private final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   //private volatile Thread failThread;
+   
+   private Timer timer = new Timer();
+   
+   private volatile Failer failer;
+   
+   private void startFailer(final long time)
+   {  // Creates a fresh Failer and schedules it on the shared timer; time is the upper bound (ms) of the random initial delay.
+      failer = new Failer();
+      // Initial delay is random in [0, time); the period is Long.MAX_VALUE, so a second run is not expected in practice.
+      timer.schedule(failer, (long)(time * Math.random()), Long.MAX_VALUE);
+   }
+
+   private static class Failer extends TimerTask // Timer task that fails the connection of the assigned session once, then cancels itself.
+   {
+      volatile ClientSession session; // assigned by testA; reset to null here after the failure has been injected
+                    
+      public void run()
+      {
+         if (session != null) // does nothing if no session has been assigned when the timer fires
+         {
+            log.info("** Failing connection");
+            // The casts assume the session is a ClientSessionImpl whose connection is a RemotingConnectionImpl.
+            RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+            // Inject the failure by calling fail() on the connection with a NOT_CONNECTED exception.
+            conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+            
+            log.info("** Fail complete");
+            // testFailureA polls this field: null means the failure has happened.
+            session = null;
+            
+            cancel();
+         }
+      }
+   }
+   
+   public void testFailureA() throws Exception
+   {  // Repeats 1000 times: start(), arm a failer, run testA until the failer has fired, then stop().
+      for (int its = 0; its < 1000; its++)
+      {      
+         start();                  
+         // Arm a one-shot connection failure with a random delay below 3000 ms.
+         startFailer(3000);
+         // Two in-VM connector configurations; the second carries backupParams (presumably the backup server - confirm).
+         ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                                new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                           backupParams));
+         // Keep running the workload until Failer.run() has failed the connection and nulled failer.session.
+         do
+         {         
+            testA(sf);
+         }
+         while (failer.session != null);
+         // NOTE(review): if the timer fires before testA assigns failer.session, run() is a no-op and this loop may never exit - confirm.
+         stop();
+      }
+   }
+   
+   public void testA(final ClientSessionFactory sf) throws Exception
+   {  // One send/consume round against sf; called from testFailureA (it takes a parameter, so JUnit 3 will not run it directly).
+      long start = System.currentTimeMillis();
+      // Control session: handed to the failer as the failure target and used at the end to delete the queues.
+      ClientSession s = sf.createSession(false, false, false, false);
+      
+      failer.session = s;
+                              
+      final int numMessages = 100;
+
+      final int numSessions = 50;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+      // Per iteration: one started session, one queue "sub" + i on ADDRESS, and one consumer on that queue.
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+         
+         ClientSession sessConsume = sf.createSession(false, true, true, false);
+         
+         sessConsume.start();
+         
+         sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+         
+         sessions.add(sessConsume);
+      }
+      // Separate session for producing; each message carries its index in the "count" property.
+      ClientSession sessSend = sf.createSession(false, true, true, false);         
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().flip();
+         producer.send(message);
+      }
+      
+     // log.info("sent messages");
+            
+      class MyHandler implements MessageHandler
+      {
+         final CountDownLatch latch = new CountDownLatch(1);
+         // Number of messages seen so far; also the "count" value expected on the next message.
+         volatile int count;
+         // NOTE(review): if onMessage runs on a delivery thread, assertion failures thrown here may not fail the test directly - confirm.
+         public void onMessage(ClientMessage message)
+         {
+            if (count == numMessages)
+            {
+               fail("Too many messages");
+            }
+            // Messages must arrive in send order.
+            assertEquals(count, message.getProperty(new SimpleString("count")));
+
+            count++;
+            // Release the waiting test thread once every message has been received.
+            if (count == numMessages)
+            {
+               latch.countDown();
+            }
+         }
+      }
+
+      Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+      // Wait up to 10 seconds per handler for all numMessages messages.
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         assertTrue(ok);
+      }
+
+      sessSend.close();
+      for (ClientSession session: sessions)
+      {
+         session.close();
+      }
+      // Delete the queues created above (presumably so the next round can create them again - confirm).
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+                                 
+         s.deleteQueue(subName);
+      }
+      
+      s.close();
+                         
+      long end = System.currentTimeMillis();
+
+      log.info("duration " + (end - start));
+        
+   }
+   
+   public void testB() throws Exception
+   {  // Handler-based consume of 1000 messages on 100 queues; consumer sessions are started only after all sends. No failer is armed here.
+      start();
+
+      long start = System.currentTimeMillis();
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      final int numMessages = 1000;
+
+      final int numSessions = 100;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+      // Per iteration: one (not yet started) session, one queue "sub" + i on ADDRESS, and one consumer.
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+         
+         ClientSession sessConsume = sf.createSession(false, true, true, false);
+         
+         sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+         
+         sessions.add(sessConsume);
+      }
+      // Producer session; each message carries its index in the "count" property.
+      ClientSession sessSend = sf.createSession(false, true, true, false);         
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().flip();
+         producer.send(message);
+      }
+      // Start the consumer sessions only now, after every message has been sent.
+      for (ClientSession session: sessions)
+      {
+         session.start();
+      }
+            
+      class MyHandler implements MessageHandler
+      {
+         final CountDownLatch latch = new CountDownLatch(1);
+         // Number of messages seen so far; also the "count" value expected on the next message.
+         volatile int count;
+
+         public void onMessage(ClientMessage message)
+         {
+            if (count == numMessages)
+            {
+               fail("Too many messages");
+            }
+            // Messages must arrive in send order.
+            assertEquals(count, message.getProperty(new SimpleString("count")));
+
+            count++;
+
+            if (count == numMessages)
+            {
+               latch.countDown();
+            }
+         }
+      }
+
+      Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+      // Wait up to 10 seconds per handler for all numMessages messages.
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         assertTrue(ok);
+      }
+
+      sessSend.close();
+      
+      for (ClientSession session: sessions)
+      {
+         session.close();
+      }
+
+      long end = System.currentTimeMillis();
+
+      log.info("duration " + (end - start));
+
+      // this.waitForFailThread();
+
+      stop();
+   }
+   
+   public void testC() throws Exception
+   {  // Handler-based consume with rollback: send/rollback/resend/commit, consume all, roll back the consumer sessions, consume all again, commit.
+      start();
+
+      long start = System.currentTimeMillis();
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      final int numMessages = 1000;
+
+      final int numSessions = 100;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+      // Consumer sessions use flags (false, false, false, false) - presumably transacted, since rollback()/commit() are called on them below; confirm.
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+         
+         ClientSession sessConsume = sf.createSession(false, false, false, false);
+         
+         sessConsume.start();
+         
+         sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+         
+         sessions.add(sessConsume);
+      }
+      // NOTE(review): sessSend is created with flags (false, true, true, false) yet rollback()/commit() are called on it below - confirm intent.
+      ClientSession sessSend = sf.createSession(false, true, true, false);         
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().flip();
+         producer.send(message);
+      }
+      // Roll back the first batch, then send the same numbered batch again and commit it.
+      sessSend.rollback();
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().flip();
+         producer.send(message);
+      }
+      
+      sessSend.commit();
+                  
+      class MyHandler implements MessageHandler
+      {
+         final CountDownLatch latch = new CountDownLatch(1);
+         // Number of messages seen so far; also the "count" value expected on the next message.
+         volatile int count;
+
+         public void onMessage(ClientMessage message)
+         {
+            if (count == numMessages)
+            {
+               fail("Too many messages");
+            }
+            // Messages must arrive in send order.
+            assertEquals(count, message.getProperty(new SimpleString("count")));
+
+            count++;
+
+            if (count == numMessages)
+            {
+               latch.countDown();
+            }
+         }
+      }
+
+      Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+      // First pass: every handler must see exactly numMessages messages within 10 seconds.
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         assertTrue(ok);
+      }
+      
+      handlers.clear();
+      
+      //New handlers
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+      // Roll back the consumer sessions; the fresh handlers below expect the full batch to be delivered again.
+      for (ClientSession session: sessions)
+      {
+         session.rollback();
+      }
+      
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         assertTrue(ok);
+      }
+      
+      for (ClientSession session: sessions)
+      {
+         session.commit();
+      }
+
+      sessSend.close();
+      for (ClientSession session: sessions)
+      {
+         session.close();
+      }
+
+      long end = System.currentTimeMillis();
+
+      log.info("duration " + (end - start));
+
+      // this.waitForFailThread();
+
+      stop();
+   }
+   
+   public void testD() throws Exception
+   {  // Same flow as testC but with 100 messages on 10 queues, and the consumer sessions are started only after the committed send.
+      start();
+
+      long start = System.currentTimeMillis();
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+      // Consumer sessions use flags (false, false, false, false) - presumably transacted, since rollback()/commit() are called on them below; confirm.
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+         
+         ClientSession sessConsume = sf.createSession(false, false, false, false);
+         
+         sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+         
+         sessions.add(sessConsume);
+      }
+      // NOTE(review): sessSend is created with flags (false, true, true, false) yet rollback()/commit() are called on it below - confirm intent.
+      ClientSession sessSend = sf.createSession(false, true, true, false);         
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().flip();
+         producer.send(message);
+      }
+      // Roll back the first batch, then send the same numbered batch again and commit it.
+      sessSend.rollback();
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().flip();
+         producer.send(message);
+      }
+      
+      sessSend.commit();
+      // Start the consumer sessions only now, after the send has been committed.
+      for (ClientSession session: sessions)
+      {
+         session.start();
+      }
+                  
+      class MyHandler implements MessageHandler
+      {
+         final CountDownLatch latch = new CountDownLatch(1);
+         // Number of messages seen so far; also the "count" value expected on the next message.
+         volatile int count;
+
+         public void onMessage(ClientMessage message)
+         {
+            if (count == numMessages)
+            {
+               fail("Too many messages");
+            }
+            // Messages must arrive in send order.
+            assertEquals(count, message.getProperty(new SimpleString("count")));
+
+            count++;
+
+            if (count == numMessages)
+            {
+               latch.countDown();
+            }
+         }
+      }
+
+      Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+      // First pass: every handler must see exactly numMessages messages within 10 seconds.
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         assertTrue(ok);
+      }
+      
+      handlers.clear();
+      
+      //New handlers
+      for (ClientConsumer consumer : consumers)
+      {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+      // Roll back the consumer sessions; the fresh handlers below expect the full batch to be delivered again.
+      for (ClientSession session: sessions)
+      {
+         session.rollback();
+      }
+      
+      for (MyHandler handler : handlers)
+      {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         assertTrue(ok);
+      }
+      
+      for (ClientSession session: sessions)
+      {
+         session.commit();
+      }
+
+      sessSend.close();
+      for (ClientSession session: sessions)
+      {
+         session.close();
+      }
+
+      long end = System.currentTimeMillis();
+
+      log.info("duration " + (end - start));
+
+      // this.waitForFailThread();
+
+      stop();
+   }
+   
+   // Now with synchronous receive()
+   
+   
+   public void testE() throws Exception
+   {  // Synchronous-receive variant: 1000 messages on 100 queues, consumer sessions started before sending.
+      start();
+
+      long start = System.currentTimeMillis();
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      final int numMessages = 1000;
+
+      final int numSessions = 100;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+      // Per iteration: one started session, one queue "sub" + i on ADDRESS, and one consumer.
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+         
+         ClientSession sessConsume = sf.createSession(false, true, true, false);
+         
+         sessConsume.start();
+         
+         sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+         
+         sessions.add(sessConsume);
+      }
+      // Producer session; each message carries its index in the "count" property.
+      ClientSession sessSend = sf.createSession(false, true, true, false);         
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().flip();
+         producer.send(message);
+      }
+      // Round-robin over the consumers: the i-th message received from each must carry count == i.
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+            
+            assertNotNull(msg);
+            
+            assertEquals(i, msg.getProperty(new SimpleString("count")));
+            
+            msg.processed();
+         }
+      }
+      // Afterwards no consumer may have a further message available.
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receiveImmediate();
+            
+            assertNull(msg);
+         }
+      }
+      
+      sessSend.close();
+      for (ClientSession session: sessions)
+      {
+         session.close();
+      }
+
+      long end = System.currentTimeMillis();
+
+      log.info("duration " + (end - start));
+
+      // this.waitForFailThread();
+
+      stop();
+   }
+   
+   public void testF() throws Exception
+   {  // Synchronous-receive variant of testE in which the consumer sessions are started only after all messages are sent.
+      start();
+
+      long start = System.currentTimeMillis();
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      final int numMessages = 1000;
+
+      final int numSessions = 100;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+      // Per iteration: one (not yet started) session, one queue "sub" + i on ADDRESS, and one consumer.
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+         
+         ClientSession sessConsume = sf.createSession(false, true, true, false);
+         
+         sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+         
+         sessions.add(sessConsume);
+      }
+      // Producer session; each message carries its index in the "count" property.
+      ClientSession sessSend = sf.createSession(false, true, true, false);         
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().flip();
+         producer.send(message);
+      }
+      // Start the consumer sessions only now, after every message has been sent.
+      for (ClientSession session: sessions)
+      {
+         session.start();
+      }
+      // Round-robin over the consumers: the i-th message received from each must carry count == i.
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+            
+            assertNotNull(msg);
+            
+            assertEquals(i, msg.getProperty(new SimpleString("count")));
+            
+            msg.processed();
+         }
+      }
+      // Afterwards no consumer may have a further message available.
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receiveImmediate();
+            
+            assertNull(msg);
+         }
+      }
+      
+      sessSend.close();
+      for (ClientSession session: sessions)
+      {
+         session.close();
+      }
+
+      long end = System.currentTimeMillis();
+
+      log.info("duration " + (end - start));
+
+      // this.waitForFailThread();
+
+      stop();
+   }
+   
+   public void testG() throws Exception
+   {  // Synchronous-receive variant with rollback: send/rollback/resend/commit, receive all, roll back consumers, receive all again, commit.
+      start();
+
+      long start = System.currentTimeMillis();
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+      // Consumer sessions use flags (false, false, false, false) - presumably transacted, since rollback()/commit() are called on them below; confirm.
+      for (int i = 0; i < numSessions; i++)
+      {
+         SimpleString subName = new SimpleString("sub" + i);
+         
+         ClientSession sessConsume = sf.createSession(false, false, false, false);
+         
+         sessConsume.start();
+         
+         sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+         
+         sessions.add(sessConsume);
+      }
+      // The producer session here uses the same (false, false, false, false) flags as the consumer sessions.
+      ClientSession sessSend = sf.createSession(false, false, false, false);         
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().flip();
+         producer.send(message);
+      }
+      // Roll back the first batch, then send the same numbered batch again and commit it.
+      sessSend.rollback();
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().flip();
+         producer.send(message);
+      }
+      
+      sessSend.commit();
+      // First pass: the i-th message received from each consumer must carry count == i.
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+            
+            assertNotNull(msg);
+            
+            assertEquals(i, msg.getProperty(new SimpleString("count")));
+            
+            msg.processed();
+         }
+      }
+      // Only one copy of the batch may be available, so nothing further should be receivable.
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receiveImmediate();
+            
+            assertNull(msg);
+         }
+      }
+      // Roll back the consumer sessions; the loops below expect the full batch to be delivered again, in order.
+      for (ClientSession session: sessions)
+      {
+         session.rollback();
+      }
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+            
+            assertNotNull(msg);
+            
+            assertEquals(i, msg.getProperty(new SimpleString("count")));
+            
+            msg.processed();
+         }
+      }
+                  
+      for (int i = 0; i < numMessages; i++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage msg = consumer.receiveImmediate();
+            
+            assertNull(msg);
+         }
+      }
+         
+      for (ClientSession session: sessions)
+      {
+         session.commit();
+      }
+      
+      sessSend.close();
+      for (ClientSession session: sessions)
+      {
+         session.close();
+      }
+
+      long end = System.currentTimeMillis();
+
+      log.info("duration " + (end - start));
+
+      // this.waitForFailThread();
+
+      stop();
+   }
+   
+      
+   /**
+    * Transacted fan-out test: ten sessions each create their own queue on ADDRESS and consume
+    * from it. The batch of messages is sent twice (the first send is rolled back, the second is
+    * committed) and is then read twice by every consumer (the consuming sessions roll back after
+    * the first read and commit after the second). After each read no consumer may have a
+    * further message immediately available.
+    */
+   public void testH() throws Exception
+   {
+      start();
+
+      final long startTime = System.currentTimeMillis();
+
+      TransportConfiguration liveConnector =
+         new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory");
+      TransportConfiguration backupConnector =
+         new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                    backupParams);
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(liveConnector, backupConnector);
+
+      final int numMessages = 100;
+      final int numSessions = 10;
+
+      // Key used to look up the sequence number on received messages
+      final SimpleString countKey = new SimpleString("count");
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+      // One transacted session, queue and consumer per subscription
+      for (int index = 0; index < numSessions; index++)
+      {
+         SimpleString queueName = new SimpleString("sub" + index);
+
+         ClientSession consumingSession = sf.createSession(false, false, false, false);
+         consumingSession.createQueue(ADDRESS, queueName, null, false, false);
+
+         consumers.add(consumingSession.createConsumer(queueName));
+         sessions.add(consumingSession);
+      }
+
+      ClientSession sessSend = sf.createSession(false, false, false, false);
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      // Send the batch twice: the first pass is rolled back, the second is committed
+      for (int pass = 0; pass < 2; pass++)
+      {
+         for (int count = 0; count < numMessages; count++)
+         {
+            ClientMessage outgoing = sessSend.createClientMessage(JBossTextMessage.TYPE,
+                                                                  false,
+                                                                  0,
+                                                                  System.currentTimeMillis(),
+                                                                  (byte)1);
+            outgoing.putIntProperty(new SimpleString("count"), count);
+            outgoing.getBody().flip();
+            producer.send(outgoing);
+         }
+
+         if (pass == 0)
+         {
+            sessSend.rollback();
+         }
+         else
+         {
+            sessSend.commit();
+         }
+      }
+
+      for (ClientSession session : sessions)
+      {
+         session.start();
+      }
+
+      // Read the batch twice: the first pass is rolled back, the second is committed
+      for (int pass = 0; pass < 2; pass++)
+      {
+         for (int expected = 0; expected < numMessages; expected++)
+         {
+            for (ClientConsumer consumer : consumers)
+            {
+               ClientMessage received = consumer.receive(RECEIVE_TIMEOUT);
+
+               assertNotNull(received);
+               assertEquals(expected, received.getProperty(countKey));
+
+               received.processed();
+            }
+         }
+
+         // Nothing beyond the batch may be immediately available
+         for (int attempt = 0; attempt < numMessages; attempt++)
+         {
+            for (ClientConsumer consumer : consumers)
+            {
+               ClientMessage unexpected = consumer.receiveImmediate();
+
+               assertNull(unexpected);
+            }
+         }
+
+         for (ClientSession session : sessions)
+         {
+            if (pass == 0)
+            {
+               session.rollback();
+            }
+            else
+            {
+               session.commit();
+            }
+         }
+      }
+
+      sessSend.close();
+      for (ClientSession session : sessions)
+      {
+         session.close();
+      }
+
+      final long endTime = System.currentTimeMillis();
+
+      log.info("duration " + (endTime - startTime));
+
+      // this.waitForFailThread();
+
+      stop();
+   }
+   
+   /**
+    * Runs 1000 iterations of: create the ADDRESS queue on one session, then on a second session
+    * send a single message, receive it and mark it processed, and finally delete the queue.
+    * <p>
+    * Both sessions are closed and the servers are stopped in finally blocks, so a failed
+    * assertion in one iteration does not leave sessions or running servers behind for the next
+    * test. Note that a failure raised by stop() in the finally block replaces any earlier one.
+    */
+   public void testI() throws Exception
+   {
+      start();
+
+      try
+      {
+         ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                                new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                           backupParams));
+
+         final int numIts = 1000;
+
+         for (int i = 0; i < numIts; i++)
+         {
+            //log.info("iteration " + i);
+            ClientSession sessCreate = sf.createSession(false, true, true, false);
+
+            try
+            {
+               sessCreate.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+               ClientSession sess = sf.createSession(false, true, true, false);
+
+               try
+               {
+                  sess.start();
+
+                  ClientConsumer consumer = sess.createConsumer(ADDRESS);
+
+                  ClientProducer producer = sess.createProducer(ADDRESS);
+
+                  ClientMessage message = sess.createClientMessage(JBossTextMessage.TYPE,
+                                                                   false,
+                                                                   0,
+                                                                   System.currentTimeMillis(),
+                                                                   (byte)1);
+                  message.getBody().flip();
+
+                  producer.send(message);
+
+                  ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
+
+                  assertNotNull(message2);
+
+                  message2.processed();
+               }
+               finally
+               {
+                  sess.close();
+               }
+
+               // As before, the queue is deleted only after the consuming session is closed
+               sessCreate.deleteQueue(ADDRESS);
+            }
+            finally
+            {
+               sessCreate.close();
+            }
+         }
+      }
+      finally
+      {
+         stop();
+      }
+   }
+   
+   /**
+    * Same create-queue / send / receive / delete-queue cycle as testI (1000 iterations), but with
+    * one extra session ({@code sessHold}) kept open for the whole run and closed only after the
+    * last iteration.
+    * <p>
+    * All sessions are closed and the servers are stopped in finally blocks, so a failed
+    * assertion does not leave sessions or running servers behind for the next test. Note that a
+    * failure raised by stop() in the finally block replaces any earlier one.
+    */
+   public void testJ() throws Exception
+   {
+      start();
+
+      try
+      {
+         ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                                new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                           backupParams));
+
+         ClientSession sessHold = sf.createSession(false, true, true, false);
+
+         try
+         {
+            final int numIts = 1000;
+
+            for (int i = 0; i < numIts; i++)
+            {
+               //log.info("iteration " + i);
+               ClientSession sessCreate = sf.createSession(false, true, true, false);
+
+               try
+               {
+                  sessCreate.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+                  ClientSession sess = sf.createSession(false, true, true, false);
+
+                  try
+                  {
+                     sess.start();
+
+                     ClientConsumer consumer = sess.createConsumer(ADDRESS);
+
+                     ClientProducer producer = sess.createProducer(ADDRESS);
+
+                     ClientMessage message = sess.createClientMessage(JBossTextMessage.TYPE,
+                                                                      false,
+                                                                      0,
+                                                                      System.currentTimeMillis(),
+                                                                      (byte)1);
+                     message.getBody().flip();
+
+                     producer.send(message);
+
+                     ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
+
+                     assertNotNull(message2);
+
+                     message2.processed();
+                  }
+                  finally
+                  {
+                     sess.close();
+                  }
+
+                  // As before, the queue is deleted only after the consuming session is closed
+                  sessCreate.deleteQueue(ADDRESS);
+               }
+               finally
+               {
+                  sessCreate.close();
+               }
+            }
+         }
+         finally
+         {
+            sessHold.close();
+         }
+      }
+      finally
+      {
+         stop();
+      }
+   }
+   
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   /**
+    * Creates and starts the backup server first and the live server second. Both use an in-VM
+    * acceptor with security disabled and a packet confirmation batch size of 10; the backup is
+    * registered under server id 1 and the live server is given a connector to it.
+    */
+   private void start() throws Exception
+   {
+      // Backup server
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      TransportConfiguration backupAcceptor =
+         new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                    backupParams);
+
+      Configuration backupConfig = new ConfigurationImpl();
+      backupConfig.setBackup(true);
+      backupConfig.setSecurityEnabled(false);
+      backupConfig.setPacketConfirmationBatchSize(10);
+      backupConfig.getAcceptorConfigurations().add(backupAcceptor);
+
+      backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConfig);
+      backupService.start();
+
+      // Live server, pointing at the backup started above
+      TransportConfiguration liveAcceptor =
+         new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory");
+      TransportConfiguration connectorToBackup =
+         new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                    backupParams);
+
+      Configuration liveConfig = new ConfigurationImpl();
+      liveConfig.setSecurityEnabled(false);
+      liveConfig.setPacketConfirmationBatchSize(10);
+      liveConfig.getAcceptorConfigurations().add(liveAcceptor);
+      liveConfig.setBackupConnectorConfiguration(connectorToBackup);
+
+      liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConfig);
+      liveService.start();
+   }
+
+   /**
+    * Stops the backup and live servers and verifies that the test left no connections behind.
+    * <p>
+    * The connection counts are sampled at the same points as before (client registry and backup
+    * before the backup stops, live after the backup has stopped), but they are only asserted
+    * once both servers have been stopped. A leak therefore still fails the test, yet no longer
+    * leaves the servers running for the tests that follow.
+    */
+   private void stop() throws Exception
+   {
+      ConnectionRegistryImpl.instance.dump();
+
+      int clientConnections = ConnectionRegistryImpl.instance.size();
+
+      int backupConnections = backupService.getServer().getRemotingService().getConnections().size();
+
+      int liveConnections;
+
+      try
+      {
+         backupService.stop();
+      }
+      finally
+      {
+         // The live server must be stopped even if sampling or the backup shutdown fails
+         try
+         {
+            liveConnections = liveService.getServer().getRemotingService().getConnections().size();
+         }
+         finally
+         {
+            liveService.stop();
+         }
+      }
+
+      assertEquals(0, clientConnections);
+
+      assertEquals(0, backupConnections);
+
+      assertEquals(0, liveConnections);
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleManualFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleManualFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleManualFailoverTest.java	2008-10-03 12:24:02 UTC (rev 5062)
@@ -0,0 +1,212 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ConnectionRegistryImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * A SimpleManualFailoverTest
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class SimpleManualFailoverTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(SimpleManualFailoverTest.class);
+
+   // Constants -----------------------------------------------------
+
+   /** Upper bound, in milliseconds, on the wait for each message that is expected to arrive. */
+   private static final long RECEIVE_TIMEOUT = 5000;
+
+   // Attributes ----------------------------------------------------
+
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   private MessagingService server0Service;
+
+   private MessagingService server1Service;
+
+   private Map<String, Object> server1Params = new HashMap<String, Object>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Creates a session on the given factory, creates the ADDRESS queue, sends 1000 numbered
+    * messages to it and consumes them in order, then checks that nothing further arrives.
+    * <p>
+    * Every expected message is received with a timeout and checked for null, so a lost message
+    * fails the test instead of blocking it forever or surfacing as a NullPointerException.
+    * 
+    * @param sf the factory used to create the session
+    * @return the still-open session; the caller is responsible for closing it
+    */
+   private ClientSession sendAndConsume(final ClientSessionFactory sf) throws Exception
+   {
+      ClientSession session = sf.createSession(false, true, true, false);
+
+      session.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 1000;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().putString("aardvarks");
+         message.getBody().flip();
+         producer.send(message);
+      }
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
+
+         assertNotNull(message2);
+
+         assertEquals("aardvarks", message2.getBody().getString());
+
+         assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+         message2.processed();
+      }
+
+      ClientMessage message3 = consumer.receive(250);
+
+      assertNull(message3);
+      
+      return session;
+   }
+   
+   /**
+    * Sends and consumes against server 0, fails that session's connection by hand and waits for
+    * the failure listener to fire, then repeats the send/consume cycle against server 1 using a
+    * new session factory.
+    */
+   public void testFailover() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sendAndConsume(sf);
+
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      class MyListener implements FailureListener
+      {
+         public void connectionFailed(MessagingException me)
+         {
+            latch.countDown();
+         }
+      }
+
+      RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+      conn.addFailureListener(new MyListener());
+
+      // Simulate failure on connection
+      conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      // Wait to be informed of failure
+
+      boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+
+      assertTrue(ok);
+
+      log.info("closing session");
+      session.close();
+      log.info("closed session");
+
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                   server1Params));
+
+      session = sendAndConsume(sf);
+
+      session.close();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   /**
+    * Starts two independent in-VM servers: server 1 (registered under server id 1) first, then
+    * server 0 with the default acceptor parameters.
+    */
+   protected void setUp() throws Exception
+   {
+      Configuration server1Conf = new ConfigurationImpl();
+      server1Conf.setSecurityEnabled(false);
+      server1Conf.setPacketConfirmationBatchSize(1);
+      server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      server1Conf.getAcceptorConfigurations()
+                 .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                 server1Params));
+      server1Service = MessagingServiceImpl.newNullStorageMessagingServer(server1Conf);
+      server1Service.start();
+
+      Configuration server0Conf = new ConfigurationImpl();
+      server0Conf.setSecurityEnabled(false);
+      server0Conf.setPacketConfirmationBatchSize(1);
+      server0Conf.getAcceptorConfigurations()
+                 .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+      server0Service = MessagingServiceImpl.newNullStorageMessagingServer(server0Conf);
+      server0Service.start();
+   }
+
+   /**
+    * Stops both servers and verifies that the test left no connections behind.
+    * <p>
+    * The connection counts are sampled at the same points as before, but they are asserted only
+    * after both servers have been stopped, so a detected leak cannot leave the servers running
+    * for the next test.
+    */
+   protected void tearDown() throws Exception
+   {
+      int clientConnections = ConnectionRegistryImpl.instance.size();
+
+      int server1Connections = server1Service.getServer().getRemotingService().getConnections().size();
+
+      int server0Connections;
+
+      try
+      {
+         server1Service.stop();
+      }
+      finally
+      {
+         // Server 0 must be stopped even if sampling or the server 1 shutdown fails
+         try
+         {
+            server0Connections = server0Service.getServer().getRemotingService().getConnections().size();
+         }
+         finally
+         {
+            server0Service.stop();
+         }
+      }
+
+      assertEquals(0, clientConnections);
+
+      assertEquals(0, server1Connections);
+
+      assertEquals(0, server0Connections);
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SystematicFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SystematicFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SystematicFailoverTest.java	2008-10-03 12:24:02 UTC (rev 5062)
@@ -0,0 +1,574 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ConnectionRegistryImpl;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A SystematicFailoverTest
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class SystematicFailoverTest extends TestCase
+{
+   // Logger is keyed on this class so that output is attributed to SystematicFailoverTest
+   private static final Logger log = Logger.getLogger(SystematicFailoverTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   private MessagingService liveService;
+
+   private MessagingService backupService;
+
+   private final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   private volatile Thread failThread;
+
+   /**
+    * Starts a background thread that sleeps for a random time below 300 ms and then fails the
+    * given connection with a NOT_CONNECTED exception. The thread is remembered in
+    * {@code failThread} so that waitForFailThread() can join it.
+    * 
+    * @param conn the connection to fail
+    */
+   private void setFailAfterRandomTime(final RemotingConnection conn)
+   {
+      Runnable failTask = new Runnable()
+      {
+         public void run()
+         {
+            long delay = (long)(300 * Math.random());
+
+            try
+            {
+               Thread.sleep(delay);
+            }
+            catch (InterruptedException ignore)
+            {
+               // Interrupted before the delay elapsed: finish without failing the connection
+               return;
+            }
+
+            log.info("Failing");
+            conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+         }
+      };
+
+      Thread failer = new Thread(failTask);
+
+      failThread = failer;
+
+      failer.start();
+   }
+
+   private void waitForFailThread() throws Exception
+   {
+      if (failThread != null)
+      {
+         failThread.join();
+      }
+   }
+
+   /**
+    * Exercises a broad slice of the client API while a background thread fails the first
+    * session's connection after a random delay (see setFailAfterRandomTime).
+    * <p>
+    * Each of the 1000 iterations starts a fresh pair of servers via start(), then runs in order:
+    * non-transacted send/receive across two sessions, transacted send with rollback and commit,
+    * transacted receive with rollback and commit, synchronous consumption from ten queues, and
+    * finally handler-based consumption from the same ten queues. The iteration ends by joining
+    * the failure thread and calling stop().
+    * <p>
+    * NOTE(review): every consumer.receive() below is called without a timeout and its result is
+    * dereferenced without a null check; presumably a lost message would block the test
+    * indefinitely rather than fail it - confirm.
+    */
+   public void testExerciseAPI() throws Exception
+   {
+      for (int its = 0; its < 1000; its++)
+      {         
+         start();
+
+         long start = System.currentTimeMillis();
+
+         // First connector targets the default in-VM server, second one uses backupParams
+         ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                                new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                           backupParams));
+
+         // Non transacted
+         ClientSession session1 = sf.createSession(false, true, true, false);
+
+         RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session1).getConnection();
+
+         // From here on the connection may be failed at any moment by the background thread
+         setFailAfterRandomTime(conn);
+
+         session1.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+         final int numMessages = 1000;
+
+         ClientProducer producer1 = session1.createProducer(ADDRESS);
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = session1.createClientMessage(JBossTextMessage.TYPE,
+                                                                 false,
+                                                                 0,
+                                                                 System.currentTimeMillis(),
+                                                                 (byte)1);
+            message.putIntProperty(new SimpleString("count"), i);
+            message.getBody().putString("aardvarks");
+            message.getBody().flip();
+            producer1.send(message);
+         }
+
+         // Messages sent on session1 are consumed through a second session
+         ClientSession session2 = sf.createSession(false, true, true, false);
+
+         session2.start();
+
+         ClientConsumer consumer1 = session2.createConsumer(ADDRESS);
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = consumer1.receive();
+
+            assertEquals("aardvarks", message.getBody().getString());
+            assertEquals(i, message.getProperty(new SimpleString("count")));
+
+            message.processed();
+         }
+
+         producer1.close();
+
+         // Second producer on session1, created after the first one was closed
+         ClientProducer producer2 = session1.createProducer(ADDRESS);
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            // NOTE(review): the message is created on session2 but sent with producer2, which
+            // belongs to session1 - confirm this mix is intentional
+            ClientMessage message = session2.createClientMessage(JBossTextMessage.TYPE,
+                                                                 false,
+                                                                 0,
+                                                                 System.currentTimeMillis(),
+                                                                 (byte)1);
+            message.putIntProperty(new SimpleString("count"), i);
+            message.getBody().putString("aardvarks");
+            message.getBody().flip();
+            producer2.send(message);
+         }
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = consumer1.receive();
+
+            assertEquals("aardvarks", message.getBody().getString());
+            assertEquals(i, message.getProperty(new SimpleString("count")));
+
+            message.processed();
+         }
+
+         consumer1.close();
+
+         // Transacted
+         ClientSession session3 = sf.createSession(false, false, false, false);
+
+         ClientProducer producer3 = session3.createProducer(ADDRESS);
+
+         // Each message is sent twice: the first send is rolled back, the second is committed
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = session3.createClientMessage(JBossTextMessage.TYPE,
+                                                                 false,
+                                                                 0,
+                                                                 System.currentTimeMillis(),
+                                                                 (byte)1);
+            message.putIntProperty(new SimpleString("count"), i);
+            message.getBody().putString("aardvarks");
+            message.getBody().flip();
+            producer3.send(message);
+
+            session3.rollback();
+
+            message = session3.createClientMessage(JBossTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
+            message.putIntProperty(new SimpleString("count"), i);
+            message.getBody().putString("aardvarks");
+            message.getBody().flip();
+            producer3.send(message);
+
+            session3.commit();
+         }
+
+         ClientConsumer consumer2 = session3.createConsumer(ADDRESS);
+
+         session3.start();
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = consumer2.receive();
+
+            assertEquals("aardvarks", message.getBody().getString());
+            assertEquals(i, message.getProperty(new SimpleString("count")));
+
+            message.processed();
+         }
+
+         // The loop below expects the same messages again, in the same order, after this rollback
+         session3.rollback();
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = consumer2.receive();
+
+            assertEquals("aardvarks", message.getBody().getString());
+            assertEquals(i, message.getProperty(new SimpleString("count")));
+
+            message.processed();
+         }
+
+         session3.commit();
+
+         session1.close();
+
+         session2.close();
+
+         final int numConsumers = 10;
+
+         Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+
+         final ClientSession session4 = sf.createSession(false, true, true, false);
+
+         session4.start();
+
+         // Per consumer: add and remove a throwaway destination, then create its own queue on ADDRESS
+         for (int i = 0; i < numConsumers; i++)
+         {
+            SimpleString wibble = new SimpleString("wibble");
+
+            session4.addDestination(wibble, false, false);
+
+            session4.removeDestination(wibble, false);
+
+            SimpleString subName = new SimpleString("sub" + i);
+
+            session4.createQueue(ADDRESS, subName, null, false, false);
+
+            ClientConsumer consumer = session4.createConsumer(subName);
+
+            consumers.add(consumer);
+         }
+
+         ClientProducer producer4 = session4.createProducer(ADDRESS);
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = session4.createClientMessage(JBossTextMessage.TYPE,
+                                                                 false,
+                                                                 0,
+                                                                 System.currentTimeMillis(),
+                                                                 (byte)1);
+            message.putIntProperty(new SimpleString("count"), i);
+            message.getBody().putString("aardvarks");
+            message.getBody().flip();
+            producer4.send(message);
+         }
+
+         // Consume synchronously
+         for (ClientConsumer consumer : consumers)
+         {
+            for (int i = 0; i < numMessages; i++)
+            {
+               ClientMessage message = consumer.receive();
+
+               assertEquals("aardvarks", message.getBody().getString());
+               assertEquals(i, message.getProperty(new SimpleString("count")));
+
+               message.processed();
+            }
+         }
+
+         // Handler that checks the "count" property against its own running counter and releases
+         // its latch once numMessages messages have been seen.
+         // NOTE(review): if onMessage runs on a thread other than the test thread, a failed
+         // assertion here would presumably only show up as a latch timeout below - confirm.
+         class MyHandler implements MessageHandler
+         {
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            volatile int count;
+
+            public void onMessage(ClientMessage message)
+            {
+               assertEquals(count, message.getProperty(new SimpleString("count")));
+
+               // log.info(this + " got message " + count);
+
+               try
+               {
+                  message.processed();
+               }
+               catch (MessagingException me)
+               {
+                  fail();
+               }
+
+               count++;
+
+               if (count == numMessages)
+               {
+                  latch.countDown();
+               }
+            }
+         }
+
+         Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+         // Switch every consumer from synchronous receive to handler-based delivery
+         for (ClientConsumer consumer : consumers)
+         {
+            MyHandler handler = new MyHandler();
+
+            consumer.setMessageHandler(handler);
+
+            handlers.add(handler);
+         }
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = session4.createClientMessage(JBossTextMessage.TYPE,
+                                                                 false,
+                                                                 0,
+                                                                 System.currentTimeMillis(),
+                                                                 (byte)1);
+            message.putIntProperty(new SimpleString("count"), i);
+            message.getBody().putString("aardvarks");
+            message.getBody().flip();
+            producer4.send(message);
+         }
+
+         // Each handler gets up to one second to see all of its messages
+         for (MyHandler handler : handlers)
+         {
+            boolean ok = handler.latch.await(1000, TimeUnit.MILLISECONDS);
+
+            assertTrue(ok);
+         }
+
+         session3.close();
+
+         session4.close();
+
+         long end = System.currentTimeMillis();
+
+         log.info("iteration" + its + " duration " + (end - start));
+
+         // Make sure the failure thread has finished before stop() runs
+         this.waitForFailThread();
+
+         stop();
+      }
+   }
+
+   /**
+    * Runs 100 iterations of a send/receive cycle against a freshly started live and backup
+    * server pair, consuming via {@link MessageHandler}s.
+    *
+    * Each iteration: starts both servers, creates a session factory configured with an in-VM
+    * connector plus a second in-VM connector built from backupParams, creates 10 queues bound
+    * to ADDRESS with one consumer each, sends 100 messages from a separate session, then
+    * installs a handler on every consumer and waits for each handler to see all 100 messages
+    * in "count" order. Finally both sessions are closed and the servers are stopped.
+    *
+    * Unlike the handler in the preceding test, the handler here never calls
+    * message.processed(), and waitForFailThread() is commented out.
+    */
+   public void testReplicateWithHandlers() throws Exception
+   {
+      for (int its = 0; its < 100; its++)
+      {
+         log.info("Starting iteration " + its);
+
+         start();
+
+         long start = System.currentTimeMillis();
+
+         // First argument: in-VM connector with no params; second: in-VM connector using
+         // backupParams (the same params the backup acceptor is registered with in start()).
+         ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                                new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                           backupParams));
+
+         final int numMessages = 100;
+
+         final int numConsumers = 10;
+
+         Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+
+         ClientSession sessConsume = sf.createSession(false, true, true, false);
+         
+         // The consuming session is started before any queue or consumer exists.
+         sessConsume.start();
+         
+         // One queue ("sub0".."sub9") per consumer, all bound to the same ADDRESS.
+         for (int i = 0; i < numConsumers; i++)
+         {
+            SimpleString subName = new SimpleString("sub" + i);
+
+            sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+            ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+            consumers.add(consumer);
+         }
+         
+         // Messages are produced from a second session, distinct from the consuming one.
+         ClientSession sessSend = sf.createSession(false, true, true, false);         
+
+         ClientProducer producer4 = sessSend.createProducer(ADDRESS);
+
+         // Each message carries its index in the "count" property so ordering can be checked.
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+                                                                false,
+                                                                0,
+                                                                System.currentTimeMillis(),
+                                                                (byte)1);
+            message.putIntProperty(new SimpleString("count"), i);
+            message.getBody().putString("aardvarks");
+            message.getBody().flip();
+            producer4.send(message);
+         }
+         
+         
+         // Verifies that messages arrive with consecutive "count" values starting at 0 and
+         // releases its latch once numMessages have been seen.
+         class MyHandler implements MessageHandler
+         {
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            // Number of messages seen so far; also the next expected "count" value.
+            volatile int count;
+
+            public void onMessage(ClientMessage message)
+            {
+               //log.info("got message " + message.getMessageID());
+               
+               // NOTE(review): this assertion runs on whichever thread invokes the handler;
+               // presumably a failure here surfaces only as a latch timeout below - confirm.
+               assertEquals(count, message.getProperty(new SimpleString("count")));
+
+               //log.info(this + " got message " + count);
+
+               count++;
+
+               if (count == numMessages)
+               {
+                  latch.countDown();
+               }
+            }
+         }
+
+         Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+         // Handlers are installed only after all messages have already been sent.
+         for (ClientConsumer consumer : consumers)
+         {
+            MyHandler handler = new MyHandler();
+
+            consumer.setMessageHandler(handler);
+
+            handlers.add(handler);
+         }
+
+         // Each handler gets up to one second to observe all numMessages messages.
+         for (MyHandler handler : handlers)
+         {
+            boolean ok = handler.latch.await(1000, TimeUnit.MILLISECONDS);
+
+            assertTrue(ok);
+         }
+
+         sessSend.close();
+         sessConsume.close();
+
+         long end = System.currentTimeMillis();
+
+         log.info("duration " + (end - start));
+
+         // this.waitForFailThread();
+
+         // stop() asserts that no connections remain, so both sessions are closed first.
+         stop();
+      }
+   }
+
+   // public void testHangOnCreateSession(final byte type) throws Exception
+   // {
+   // for (int j = 0; j < 1000; j++)
+   // {
+   // start();
+   //
+   // ClientSessionFactory sf = new ClientSessionFactoryImpl(new
+   // TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+   // new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+   // backupParams));
+   //
+   // final int numSessions = 50;
+   //
+   // List<ClientSession> sessions = new ArrayList<ClientSession>();
+   //
+   // for (int i = 0; i < numSessions; i++)
+   // {
+   // ClientSession session = sf.createSession(false, true, true, -1, false);
+   //
+   // sessions.add(session);
+   //
+   // if (i == 0)
+   // {
+   // RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+   //
+   // //conn.failOnPacketType(type);
+   // }
+   // }
+   //
+   // for (ClientSession session : sessions)
+   // {
+   // session.close();
+   // }
+   //
+   // stop();
+   // }
+   //
+   // }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   /**
+    * Boots the backup server and then the live server, both with null storage and in-VM
+    * transport.
+    *
+    * Both configurations disable security and use a packet confirmation batch size of 10.
+    * The backup registers an in-VM acceptor built from backupParams (with the server id
+    * property set to 1) and is flagged as a backup. The live server registers an in-VM
+    * acceptor with no params and is given an in-VM connector, built from the same
+    * backupParams, as its backup connector configuration.
+    *
+    * @throws Exception if either server fails to start
+    */
+   private void start() throws Exception
+   {
+      final String acceptorFactory = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
+      final String connectorFactory = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+
+      // --- Backup server: configured and started first ---
+      Configuration backupConfig = new ConfigurationImpl();
+      backupConfig.setSecurityEnabled(false);
+      backupConfig.setPacketConfirmationBatchSize(10);
+
+      // The server id must be in the params before the acceptor configuration is built.
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      TransportConfiguration backupAcceptor = new TransportConfiguration(acceptorFactory, backupParams);
+      backupConfig.getAcceptorConfigurations().add(backupAcceptor);
+      backupConfig.setBackup(true);
+
+      backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConfig);
+      backupService.start();
+
+      // --- Live server: default in-VM acceptor plus a connector to the backup ---
+      Configuration liveConfig = new ConfigurationImpl();
+      liveConfig.setSecurityEnabled(false);
+      liveConfig.setPacketConfirmationBatchSize(10);
+
+      TransportConfiguration liveAcceptor = new TransportConfiguration(acceptorFactory);
+      liveConfig.getAcceptorConfigurations().add(liveAcceptor);
+
+      TransportConfiguration connectorToBackup = new TransportConfiguration(connectorFactory, backupParams);
+      liveConfig.setBackupConnectorConfiguration(connectorToBackup);
+
+      liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConfig);
+      liveService.start();
+   }
+
+   /**
+    * Verifies that no connections are left over and shuts down both servers, backup first.
+    *
+    * The checks run in a fixed order: the client-side connection registry must be empty,
+    * then each server's remoting service must report zero connections immediately before
+    * that server is stopped, and finally the in-VM registry must be empty once both
+    * servers are down.
+    *
+    * NOTE(review): if any assertion fails, the remaining stop() calls are skipped, so the
+    * servers started by start() are presumably left running - confirm whether that matters
+    * for subsequent tests.
+    *
+    * @throws Exception if stopping either server fails
+    */
+   private void stop() throws Exception
+   {
+      // Presumably prints the registry contents to help diagnose a failure of the next
+      // assertion - confirm against ConnectionRegistryImpl.
+      ConnectionRegistryImpl.instance.dump();
+
+      assertEquals(0, ConnectionRegistryImpl.instance.size());
+
+      // The backup must have no remaining connections before it is stopped.
+      assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+
+      backupService.stop();
+
+      // Same check for the live server, performed after the backup is already down.
+      assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+      liveService.stop();
+
+      // With both servers stopped, nothing should remain in the in-VM registry.
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}




More information about the jboss-cvs-commits mailing list