[hornetq-commits] JBoss hornetq SVN: r10065 - in trunk: src/main/org/hornetq/core/protocol/core/impl and 8 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Dec 21 15:02:00 EST 2010


Author: ataylor
Date: 2010-12-21 15:01:58 -0500 (Tue, 21 Dec 2010)
New Revision: 10065

Added:
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
   trunk/src/main/org/hornetq/core/remoting/server/RemotingService.java
   trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
Log:
Session factory now tries both the live and the backup server so as to avoid starvation; also removed the test for failoverOnServerShutdown, as it makes no sense when only one server can ever be live.

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-12-21 20:01:58 UTC (rev 10065)
@@ -92,8 +92,6 @@
 
    private ConnectorFactory connectorFactory;
 
-   private Map<String, Object> transportParams;
-
    private final long callTimeout;
 
    private final long clientFailureCheckPeriod;
@@ -176,10 +174,8 @@
 
       connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
 
-      transportParams = connectorConfig.getParams();
+      checkTransportKeys(connectorFactory, connectorConfig.getParams());
 
-      checkTransportKeys(connectorFactory, transportParams);
-
       this.callTimeout = callTimeout;
 
       this.clientFailureCheckPeriod = clientFailureCheckPeriod;
@@ -209,31 +205,17 @@
    public void connect(int initialConnectAttempts, boolean failoverOnInitialConnection) throws HornetQException
    {
       // Get the connection
-
       getConnectionWithRetry(initialConnectAttempts);
 
-      if (connection == null && failoverOnInitialConnection)
+      if (connection == null)
       {
-         if (backupConfig != null)
+         StringBuffer msg = new StringBuffer("Unable to connect to server using configuration ").append(connectorConfig);
+         if(backupConfig != null)
          {
-            // Try and connect to the backup
-
-            log.warn("Server is not available to make initial connection to. Will try backup server instead.");
-
-            this.connectorConfig = backupConfig;
-
-            connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
-
-            transportParams = this.connectorConfig.getParams();
-
-            getConnectionWithRetry(initialConnectAttempts);
+            msg.append(" and backup configuration ").append(backupConfig);
          }
-      }
-
-      if (connection == null)
-      {
          throw new HornetQException(HornetQException.NOT_CONNECTED,
-                                    "Unable to connect to server using configuration " + connectorConfig);
+               msg.toString());
       }
 
    }
@@ -251,6 +233,11 @@
       }
    }
 
+   public Object getBackupConnector()
+   {
+      return backupConfig;
+   }
+
    public ClientSession createSession(final String username,
                                       final String password,
                                       final boolean xa,
@@ -524,25 +511,9 @@
          // It can then release the channel 1 lock, and retry (which will cause locking on failoverLock
          // until failover is complete
 
-         boolean serverShutdown = me.getCode() == HornetQException.DISCONNECTED;
 
-         // We will try to failover if there is a backup connector factory, but we don't do this if the server
-         // has been shutdown cleanly unless failoverOnServerShutdown is true
-         boolean attemptFailover = (backupConfig != null) && !serverShutdown;
-
-         boolean attemptReconnect;
-
-         if (attemptFailover)
+         if (reconnectAttempts != 0)
          {
-            attemptReconnect = false;
-         }
-         else
-         {
-            attemptReconnect = reconnectAttempts != 0;
-         }
-
-         if (attemptFailover || attemptReconnect)
-         {
             lockChannel1();
 
             final boolean needToInterrupt;
@@ -599,25 +570,8 @@
 
             connector = null;
 
-            if (attemptFailover)
-            {
-               // Now try failing over to backup
+            reconnectSessions(oldConnection, reconnectAttempts);
 
-               this.connectorConfig = backupConfig;
-
-               backupConfig = null;
-
-               connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
-
-               transportParams = connectorConfig.getParams();
-
-               reconnectSessions(oldConnection, reconnectAttempts == -1 ? -1 : reconnectAttempts + 1);
-            }
-            else
-            {
-               reconnectSessions(oldConnection, reconnectAttempts);
-            }
-
             oldConnection.destroy();
          }
          else
@@ -1011,7 +965,7 @@
          {
             DelegatingBufferHandler handler = new DelegatingBufferHandler();
 
-            connector = connectorFactory.createConnector(transportParams,
+            connector = connectorFactory.createConnector(connectorConfig.getParams(),
                                                          handler,
                                                          this,
                                                          closeExecutor,
@@ -1037,6 +991,45 @@
                   connector = null;
                }
             }
+            //if connection fails we can try the backup in case it has come live
+            if(connector == null && backupConfig != null)
+            {
+               ConnectorFactory backupConnectorFactory = instantiateConnectorFactory(backupConfig.getFactoryClassName());
+               connector = backupConnectorFactory.createConnector(backupConfig.getParams(),
+                                                         handler,
+                                                         this,
+                                                         closeExecutor,
+                                                         threadPool,
+                                                         scheduledThreadPool);
+               if (connector != null)
+               {
+                  connector.start();
+
+                  tc = connector.createConnection();
+
+                  if (tc == null)
+                  {
+                     try
+                     {
+                        connector.close();
+                     }
+                     catch (Throwable t)
+                     {
+                     }
+
+                     connector = null;
+                  }
+                  else
+                  {
+                     /*looks like the backup is now live, let's use that*/
+                     connectorConfig = backupConfig;
+
+                     backupConfig = null;
+
+                     connectorFactory = backupConnectorFactory;
+                  }
+               }
+            }
          }
          catch (Exception e)
          {
@@ -1224,8 +1217,7 @@
                      serverLocator.notifyNodeDown(msg.getNodeID().toString());
                   }
 
-                  conn.fail(new HornetQException(msg.isFailoverOnServerShutdown() ? HornetQException.NOT_CONNECTED
-                                                                                 : HornetQException.DISCONNECTED,
+                  conn.fail(new HornetQException(HornetQException.DISCONNECTED,
                                                  "The connection was disconnected because of server shutdown"));
 
                }

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java	2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java	2010-12-21 20:01:58 UTC (rev 10065)
@@ -44,4 +44,6 @@
    TransportConfiguration getConnectorConfiguration();
 
    void setBackupConnector(TransportConfiguration live, TransportConfiguration backUp);
+
+   Object getBackupConnector();
 }

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-12-21 20:01:58 UTC (rev 10065)
@@ -324,7 +324,7 @@
       callClosingListeners();
    }
    
-   public void disconnect(boolean failoverOnServerShutdown)
+   public void disconnect()
    {
       Channel channel0 = getChannel(0, -1);
 
@@ -343,7 +343,7 @@
          channel.flushConfirmations();
       }
 
-      Packet disconnect = new DisconnectMessage(nodeID, failoverOnServerShutdown);
+      Packet disconnect = new DisconnectMessage(nodeID);
       channel0.sendAndFlush(disconnect);
    }
 

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java	2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java	2010-12-21 20:01:58 UTC (rev 10065)
@@ -31,20 +31,16 @@
    // Attributes ----------------------------------------------------
 
    private SimpleString nodeID;
-
-   private boolean failoverOnServerShutdown;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public DisconnectMessage(final SimpleString nodeID, boolean failoverOnServerShutdown)
+   public DisconnectMessage(final SimpleString nodeID)
    {
       super(PacketImpl.DISCONNECT);
 
       this.nodeID = nodeID;
-
-      this.failoverOnServerShutdown = failoverOnServerShutdown;
    }
 
    public DisconnectMessage()
@@ -59,24 +55,16 @@
       return nodeID;
    }
 
-   public boolean isFailoverOnServerShutdown()
-   {
-      return failoverOnServerShutdown;
-   }
-
    @Override
    public void encodeRest(final HornetQBuffer buffer)
    {
       buffer.writeNullableSimpleString(nodeID);
-
-      buffer.writeBoolean(failoverOnServerShutdown);
    }
 
    @Override
    public void decodeRest(final HornetQBuffer buffer)
    {
       nodeID = buffer.readNullableSimpleString();
-      failoverOnServerShutdown = buffer.readBoolean();
    }
 
    @Override

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-12-21 20:01:58 UTC (rev 10065)
@@ -194,7 +194,7 @@
       manager.cleanup(this);
    }
 
-   public void disconnect(boolean clientFailover)
+   public void disconnect()
    {
    }
 

Modified: trunk/src/main/org/hornetq/core/remoting/server/RemotingService.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/RemotingService.java	2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/remoting/server/RemotingService.java	2010-12-21 20:01:58 UTC (rev 10065)
@@ -45,6 +45,4 @@
    void freeze();
 
    RemotingConnection getServerSideReplicatingConnection();
-
-   void stop(boolean failoverOnServerShutdown) throws Exception;
 }

Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-12-21 20:01:58 UTC (rev 10065)
@@ -254,11 +254,6 @@
 
    public void stop() throws Exception
    {
-      stop(false);
-   }
-
-   public void stop(boolean failoverOnServerShutdown) throws Exception
-   {
       if (!started)
       {
          return;
@@ -283,7 +278,7 @@
       {
          RemotingConnection conn = entry.connection;
 
-         conn.disconnect(failoverOnServerShutdown);
+         conn.disconnect();
       }
 
       for (Acceptor acceptor : acceptors)

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-12-21 20:01:58 UTC (rev 10065)
@@ -217,6 +217,7 @@
       if(backupServerLocator != null)
       {
          backupServerLocator.close();
+         backupServerLocator = null;
       }
 
       for (ServerLocatorInternal clusterLocator : clusterLocators)

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-12-21 20:01:58 UTC (rev 10065)
@@ -626,7 +626,7 @@
       {
          System.out.println("HornetQServerImpl.stop");
       }
-      remotingService.stop(failoverOnServerShutdown);
+      remotingService.stop();
 
       synchronized (this)
       {

Modified: trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java	2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java	2010-12-21 20:01:58 UTC (rev 10065)
@@ -151,9 +151,8 @@
    
    /**
     * Disconnect the connection, closing all channels
-    * @param clientFailover
     */
-   void disconnect(boolean clientFailover);
+   void disconnect();
    
    /**
     * returns true if any data has been received since the last time this method was called.

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java	2010-12-21 20:01:58 UTC (rev 10065)
@@ -0,0 +1,323 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import junit.framework.Assert;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.server.impl.InVMNodeManager;
+import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.utils.ReusableLatch;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.com">Andy Taylor</a>
+ *         Date: Dec 21, 2010
+ *         Time: 12:04:16 PM
+ */
+public class FailBackAutoTest extends FailoverTestBase
+{
+   private ServerLocatorInternal locator;
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      locator = getServerLocator();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      if (locator != null)
+      {
+         try
+         {
+            locator.close();
+         }
+         catch (Exception e)
+         {
+            // best-effort close during tearDown; a failure here is deliberately ignored
+         }
+      }
+      super.tearDown();
+   }
+
+   public void testAutoFailback() throws Exception
+   {
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnInitialConnection(true);
+      locator.setReconnectAttempts(-1);
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      ClientSession session = sendAndConsume(sf, true);
+
+      MyListener listener = new MyListener(latch);
+
+      session.addFailureListener(listener);
+
+      backupServer.stop();
+
+      liveServer.crash();
+
+      backupServer.start();
+
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      ClientMessage message = session.createMessage(true);
+
+      setBody(0, message);
+
+      producer.send(message);
+
+      session.removeFailureListener(listener);
+
+      final CountDownLatch latch2 = new CountDownLatch(1);
+
+      listener = new MyListener(latch2);
+
+      session.addFailureListener(listener);
+
+      liveServer.start();
+
+      assertTrue(latch2.await(5, TimeUnit.SECONDS));
+
+      message = session.createMessage(true);
+
+      setBody(1, message);
+
+      producer.send(message);
+
+      session.close();
+
+      sf.close();
+
+      Assert.assertEquals(0, sf.numSessions());
+
+      Assert.assertEquals(0, sf.numConnections());
+   }
+
+   public void testAutoFailbackThenFailover() throws Exception
+   {
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnInitialConnection(true);
+      locator.setReconnectAttempts(-1);
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      CountDownLatch latch = new CountDownLatch(1);
+
+      ClientSession session = sendAndConsume(sf, true);
+
+      MyListener listener = new MyListener(latch);
+
+      session.addFailureListener(listener);
+
+      backupServer.stop();
+
+      liveServer.crash();
+
+      backupServer.start();
+
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      ClientMessage message = session.createMessage(true);
+
+      setBody(0, message);
+
+      producer.send(message);
+
+      session.removeFailureListener(listener);
+
+      CountDownLatch latch2 = new CountDownLatch(1);
+
+      listener = new MyListener(latch2);
+
+      session.addFailureListener(listener);
+
+      liveServer.start();
+
+      assertTrue(latch2.await(5, TimeUnit.SECONDS));
+
+      message = session.createMessage(true);
+
+      setBody(1, message);
+
+      producer.send(message);
+
+      CountDownLatch latch3 = new CountDownLatch(1);
+
+      session.removeFailureListener(listener);
+
+      listener = new MyListener(latch3);
+
+      session.addFailureListener(listener);
+
+      waitForBackup(sf, 5);
+
+      liveServer.crash();
+
+      assertTrue(latch3.await(5, TimeUnit.SECONDS));
+
+      session.close();
+
+      sf.close();
+
+      Assert.assertEquals(0, sf.numSessions());
+
+      Assert.assertEquals(0, sf.numConnections());
+   }
+
+   protected void createConfigs() throws Exception
+   {
+      nodeManager = new InVMNodeManager();
+
+      backupConfig = super.createDefaultConfig();
+      backupConfig.getAcceptorConfigurations().clear();
+      backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
+      backupConfig.setSecurityEnabled(false);
+      backupConfig.setSharedStore(true);
+      backupConfig.setBackup(true);
+      backupConfig.setClustered(true);
+      TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
+      TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
+      backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+      backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
+      ArrayList<String> staticConnectors = new ArrayList<String>();
+      staticConnectors.add(liveConnector.getName());
+      ClusterConnectionConfiguration cccLive = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1,
+            staticConnectors, false);
+      backupConfig.getClusterConfigurations().add(cccLive);
+      backupServer = createBackupServer();
+
+      liveConfig = super.createDefaultConfig();
+      liveConfig.getAcceptorConfigurations().clear();
+      liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
+      liveConfig.setSecurityEnabled(false);
+      liveConfig.setSharedStore(true);
+      liveConfig.setClustered(true);
+      List<String> pairs = new ArrayList<String>();
+      pairs.add(backupConnector.getName());
+      ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
+            pairs, false);
+      liveConfig.getClusterConfigurations().add(ccc0);
+      liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+      liveConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
+      liveServer = createLiveServer();
+   }
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
+   {
+      return getInVMTransportAcceptorConfiguration(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
+   {
+      return getInVMConnectorTransportConfiguration(live);
+   }
+
+
+   private ClientSession sendAndConsume(final ClientSessionFactory sf, final boolean createQueue) throws Exception
+   {
+      ClientSession session = sf.createSession(false, true, true);
+
+      if (createQueue)
+      {
+         session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, false);
+      }
+
+      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      final int numMessages = 1000;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
+               false,
+               0,
+               System.currentTimeMillis(),
+               (byte) 1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBodyBuffer().writeString("aardvarks");
+         producer.send(message);
+      }
+
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+
+      session.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         Assert.assertEquals("aardvarks", message2.getBodyBuffer().readString());
+
+         Assert.assertEquals(i, message2.getObjectProperty(new SimpleString("count")));
+
+         message2.acknowledge();
+      }
+
+      ClientMessage message3 = consumer.receiveImmediate();
+
+      Assert.assertNull(message3);
+
+      return session;
+   }
+
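+   /**
+    * @param i index appended to the literal "message" to form the body text
+    * @param message message whose body buffer the text is written to
+    * @throws Exception declared by the signature; this body does not throw it explicitly
+    */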
+   protected void setBody(final int i, final ClientMessage message) throws Exception
+   {
+      message.getBodyBuffer().writeString("message" + i);
+   }
+
+   class MyListener implements SessionFailureListener
+   {
+      private final CountDownLatch latch;
+
+      public MyListener(CountDownLatch latch)
+      {
+         this.latch = latch;
+      }
+
+      public void connectionFailed(final HornetQException me, boolean failedOver)
+      {
+         latch.countDown();
+      }
+
+      public void beforeReconnect(HornetQException exception)
+      {
+         System.out.println("MyListener.beforeReconnect");
+      }
+   }
+}

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java	2010-12-21 20:01:58 UTC (rev 10065)
@@ -0,0 +1,287 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import junit.framework.Assert;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.server.impl.InVMNodeManager;
+import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.integration.cluster.util.TestableServer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.com">Andy Taylor</a>
+ *         Date: Dec 21, 2010
+ *         Time: 12:04:16 PM
+ */
+public class FailBackManualTest extends FailoverTestBase
+{
+   private ServerLocatorInternal locator;
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      locator = getServerLocator();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      if (locator != null)
+      {
+         try
+         {
+            locator.close();
+         }
+         catch (Exception e)
+         {
+            // best-effort close during tearDown; a failure here is deliberately ignored
+         }
+      }
+      super.tearDown();
+   }
+
+
+   public void testNoAutoFailback() throws Exception
+   {
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnInitialConnection(true);
+      locator.setReconnectAttempts(-1);
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      ClientSession session = sendAndConsume(sf, true);
+
+      MyListener listener = new MyListener(latch);
+
+      session.addFailureListener(listener);
+
+      backupServer.stop();
+
+      liveServer.crash();
+
+      backupServer.start();
+
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      ClientMessage message = session.createMessage(true);
+
+      setBody(0, message);
+
+      producer.send(message);
+
+      session.removeFailureListener(listener);
+
+      final CountDownLatch latch2 = new CountDownLatch(1);
+
+      listener = new MyListener(latch2);
+
+      session.addFailureListener(listener);
+
+      liveConfig.setAllowAutoFailBack(false);
+
+      Thread t = new Thread(new ServerStarter(liveServer));
+
+      t.start();
+
+      waitForBackup(sf, 5);
+
+      assertTrue(backupServer.isStarted());
+
+      backupServer.stop();
+
+      assertTrue(latch2.await(5, TimeUnit.SECONDS));
+
+      message = session.createMessage(true);
+
+      setBody(1, message);
+
+      producer.send(message);
+
+      session.close();
+
+      sf.close();
+
+      Assert.assertEquals(0, sf.numSessions());
+
+      Assert.assertEquals(0, sf.numConnections());
+   }
+
+
+   protected void createConfigs() throws Exception
+   {
+      nodeManager = new InVMNodeManager();
+
+      backupConfig = super.createDefaultConfig();
+      backupConfig.getAcceptorConfigurations().clear();
+      backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
+      backupConfig.setSecurityEnabled(false);
+      backupConfig.setSharedStore(true);
+      backupConfig.setBackup(true);
+      backupConfig.setClustered(true);
+      TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
+      TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
+      backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+      backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
+      ArrayList<String> staticConnectors = new ArrayList<String>();
+      staticConnectors.add(liveConnector.getName());
+      ClusterConnectionConfiguration cccLive = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1,
+            staticConnectors, false);
+      backupConfig.getClusterConfigurations().add(cccLive);
+      backupConfig.setAllowAutoFailBack(false);
+      backupServer = createBackupServer();
+
+      liveConfig = super.createDefaultConfig();
+      liveConfig.getAcceptorConfigurations().clear();
+      liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
+      liveConfig.setSecurityEnabled(false);
+      liveConfig.setSharedStore(true);
+      liveConfig.setClustered(true);
+      List<String> pairs = new ArrayList<String>();
+      pairs.add(backupConnector.getName());
+      ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
+            pairs, false);
+      liveConfig.getClusterConfigurations().add(ccc0);
+      liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+      liveConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
+      liveConfig.setAllowAutoFailBack(false);
+      liveServer = createLiveServer();
+   }
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
+   {
+      return getInVMTransportAcceptorConfiguration(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
+   {
+      return getInVMConnectorTransportConfiguration(live);
+   }
+
+
+   private ClientSession sendAndConsume(final ClientSessionFactory sf, final boolean createQueue) throws Exception
+   {
+      ClientSession session = sf.createSession(false, true, true);
+
+      if (createQueue)
+      {
+         session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, false);
+      }
+
+      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      final int numMessages = 1000;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
+               false,
+               0,
+               System.currentTimeMillis(),
+               (byte) 1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBodyBuffer().writeString("aardvarks");
+         producer.send(message);
+      }
+
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+
+      session.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         Assert.assertEquals("aardvarks", message2.getBodyBuffer().readString());
+
+         Assert.assertEquals(i, message2.getObjectProperty(new SimpleString("count")));
+
+         message2.acknowledge();
+      }
+
+      ClientMessage message3 = consumer.receiveImmediate();
+
+      Assert.assertNull(message3);
+
+      return session;
+   }
+
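+   /**
+    * @param i index appended to the literal "message" to form the body text
+    * @param message message whose body buffer the text is written to
+    * @throws Exception declared by the signature; this body does not throw it explicitly
+    */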
+   protected void setBody(final int i, final ClientMessage message) throws Exception
+   {
+      message.getBodyBuffer().writeString("message" + i);
+   }
+
+   class MyListener implements SessionFailureListener
+   {
+      private final CountDownLatch latch;
+
+      public MyListener(CountDownLatch latch)
+      {
+         this.latch = latch;
+      }
+
+      public void connectionFailed(final HornetQException me, boolean failedOver)
+      {
+         latch.countDown();
+      }
+
+      public void beforeReconnect(HornetQException exception)
+      {
+         System.out.println("MyListener.beforeReconnect");
+      }
+   }
+
+   class ServerStarter implements Runnable
+   {
+      private final TestableServer server;
+
+      public ServerStarter(TestableServer server)
+      {
+         this.server = server;
+      }
+
+      public void run()
+      {
+         try
+         {
+            server.start();
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+      }
+   }
+}

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2010-12-21 20:01:58 UTC (rev 10065)
@@ -19,6 +19,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
@@ -2114,6 +2115,245 @@
       Assert.assertEquals(0, sf.numConnections());
    }
 
+   /**
+    * Stops the backup, crashes the live server and starts the backup again, waits up to five
+    * seconds for the session's failure listener to be notified, then sends one message through
+    * the same session. After closing, the factory must report no sessions and no connections.
+    */
+   public void testBackupServerNotRemoved() throws Exception
+   {
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnInitialConnection(true);
+      locator.setReconnectAttempts(-1);
+      ClientSessionFactoryInternal factory = createSessionFactoryAndWaitForTopology(locator, 2);
+      final CountDownLatch failureSeen = new CountDownLatch(1);
+
+      ClientSession session = sendAndConsume(factory, true);
+
+      // anonymous listener: releases the latch as soon as a connection failure is reported
+      session.addFailureListener(new SessionFailureListener()
+      {
+         public void connectionFailed(final HornetQException me, boolean failedOver)
+         {
+            failureSeen.countDown();
+         }
+
+         public void beforeReconnect(HornetQException exception)
+         {
+            System.out.println("MyListener.beforeReconnect");
+         }
+      });
+
+      // outage sequence: backup down, live crashed, backup started again
+      backupServer.stop();
+      liveServer.crash();
+      backupServer.start();
+
+      boolean notified = failureSeen.await(5, TimeUnit.SECONDS);
+      assertTrue(notified);
+
+      // the original session is reused for the send
+      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+      ClientMessage message = session.createMessage(true);
+      setBody(0, message);
+      producer.send(message);
+
+      session.close();
+      factory.close();
+
+      Assert.assertEquals(0, factory.numSessions());
+      Assert.assertEquals(0, factory.numConnections());
+   }
+
+   /**
+    * Stops the backup, crashes the live server and starts the live server again, waits up to
+    * five seconds for the session's failure listener to be notified, then sends one message
+    * through the same session. After closing, the factory must report no sessions and no
+    * connections.
+    */
+   public void testLiveAndBackupLiveComesBack() throws Exception
+   {
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnInitialConnection(true);
+      locator.setReconnectAttempts(-1);
+      ClientSessionFactoryInternal factory = createSessionFactoryAndWaitForTopology(locator, 2);
+      final CountDownLatch failureSeen = new CountDownLatch(1);
+
+      ClientSession session = sendAndConsume(factory, true);
+
+      // anonymous listener: releases the latch as soon as a connection failure is reported
+      session.addFailureListener(new SessionFailureListener()
+      {
+         public void connectionFailed(final HornetQException me, boolean failedOver)
+         {
+            failureSeen.countDown();
+         }
+
+         public void beforeReconnect(HornetQException exception)
+         {
+            System.out.println("MyListener.beforeReconnect");
+         }
+      });
+
+      // outage sequence: backup down, live crashed, live started again
+      backupServer.stop();
+      liveServer.crash();
+      liveServer.start();
+
+      boolean notified = failureSeen.await(5, TimeUnit.SECONDS);
+      assertTrue(notified);
+
+      // the original session is reused for the send
+      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+      ClientMessage message = session.createMessage(true);
+      setBody(0, message);
+      producer.send(message);
+
+      session.close();
+      factory.close();
+
+      Assert.assertEquals(0, factory.numSessions());
+      Assert.assertEquals(0, factory.numConnections());
+   }
+
+   /**
+    * Stops the backup, crashes the live server and starts the live server again, waits up to
+    * five seconds for the session's failure listener to be notified and sends one message
+    * through the same session. A second factory created from the same locator must then
+    * receive a message with body {@code "message0"} via {@code receive(5000)}, and must report
+    * no sessions and no connections once closed.
+    */
+   public void testLiveAndBackupLiveComesBackNewFactory() throws Exception
+   {
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnInitialConnection(true);
+      locator.setReconnectAttempts(-1);
+      ClientSessionFactoryInternal firstFactory = createSessionFactoryAndWaitForTopology(locator, 2);
+      final CountDownLatch failureSeen = new CountDownLatch(1);
+
+      ClientSession firstSession = sendAndConsume(firstFactory, true);
+
+      // anonymous listener: releases the latch as soon as a connection failure is reported
+      firstSession.addFailureListener(new SessionFailureListener()
+      {
+         public void connectionFailed(final HornetQException me, boolean failedOver)
+         {
+            failureSeen.countDown();
+         }
+
+         public void beforeReconnect(HornetQException exception)
+         {
+            System.out.println("MyListener.beforeReconnect");
+         }
+      });
+
+      // outage sequence: backup down, live crashed, live started again
+      backupServer.stop();
+      liveServer.crash();
+      liveServer.start();
+
+      boolean notified = failureSeen.await(5, TimeUnit.SECONDS);
+      assertTrue(notified);
+
+      // the original session is reused for the send
+      ClientProducer producer = firstSession.createProducer(FailoverTestBase.ADDRESS);
+      ClientMessage sent = firstSession.createMessage(true);
+      setBody(0, sent);
+      producer.send(sent);
+
+      firstSession.close();
+      firstFactory.close();
+
+      // a brand new factory and session from the same locator read the message back
+      ClientSessionFactoryInternal secondFactory = (ClientSessionFactoryInternal) locator.createSessionFactory();
+      ClientSession secondSession = secondFactory.createSession();
+      ClientConsumer consumer = secondSession.createConsumer(FailoverTestBase.ADDRESS);
+      secondSession.start();
+
+      ClientMessage received = consumer.receive(5000);
+      assertNotNull(received);
+      Assert.assertEquals("message0", received.getBodyBuffer().readString());
+
+      secondSession.close();
+      secondFactory.close();
+
+      Assert.assertEquals(0, secondFactory.numSessions());
+      Assert.assertEquals(0, secondFactory.numConnections());
+   }
+
+   /**
+    * Stops the backup, crashes the live server and starts the backup again, waits up to five
+    * seconds for the session's failure listener to be notified and sends one message through
+    * the same session. A second factory created from the same locator must then receive a
+    * message with body {@code "message0"} via {@code receive(5000)}, and must report no
+    * sessions and no connections once closed.
+    */
+   public void testLiveAndBackupBackupComesBackNewFactory() throws Exception
+   {
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnInitialConnection(true);
+      locator.setReconnectAttempts(-1);
+      ClientSessionFactoryInternal firstFactory = createSessionFactoryAndWaitForTopology(locator, 2);
+      final CountDownLatch failureSeen = new CountDownLatch(1);
+
+      ClientSession firstSession = sendAndConsume(firstFactory, true);
+
+      // anonymous listener: releases the latch as soon as a connection failure is reported
+      firstSession.addFailureListener(new SessionFailureListener()
+      {
+         public void connectionFailed(final HornetQException me, boolean failedOver)
+         {
+            failureSeen.countDown();
+         }
+
+         public void beforeReconnect(HornetQException exception)
+         {
+            System.out.println("MyListener.beforeReconnect");
+         }
+      });
+
+      // outage sequence: backup down, live crashed, backup started again
+      backupServer.stop();
+      liveServer.crash();
+      backupServer.start();
+
+      boolean notified = failureSeen.await(5, TimeUnit.SECONDS);
+      assertTrue(notified);
+
+      // the original session is reused for the send
+      ClientProducer producer = firstSession.createProducer(FailoverTestBase.ADDRESS);
+      ClientMessage sent = firstSession.createMessage(true);
+      setBody(0, sent);
+      producer.send(sent);
+
+      firstSession.close();
+      firstFactory.close();
+
+      // a brand new factory and session from the same locator read the message back
+      ClientSessionFactoryInternal secondFactory = (ClientSessionFactoryInternal) locator.createSessionFactory();
+      ClientSession secondSession = secondFactory.createSession();
+      ClientConsumer consumer = secondSession.createConsumer(FailoverTestBase.ADDRESS);
+      secondSession.start();
+
+      ClientMessage received = consumer.receive(5000);
+      assertNotNull(received);
+      Assert.assertEquals("message0", received.getBodyBuffer().readString());
+
+      secondSession.close();
+      secondFactory.close();
+
+      Assert.assertEquals(0, secondFactory.numSessions());
+      Assert.assertEquals(0, secondFactory.numConnections());
+   }
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2010-12-21 20:01:58 UTC (rev 10065)
@@ -50,8 +50,6 @@
  * A FailoverTestBase
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
  */
 public abstract class FailoverTestBase extends ServiceTestBase
 {
@@ -68,7 +66,7 @@
    protected Configuration backupConfig;
 
    protected Configuration liveConfig;
-   
+
    protected NodeManager nodeManager;
 
    // Static --------------------------------------------------------
@@ -139,42 +137,19 @@
       ArrayList<String> staticConnectors = new ArrayList<String>();
       staticConnectors.add(liveConnector.getName());
       ClusterConnectionConfiguration cccLive = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1,
-               staticConnectors, false);
+            staticConnectors, false);
       backupConfig.getClusterConfigurations().add(cccLive);
       backupServer = createBackupServer();
-      
-      // FIXME
-      /*
-      server1Service.registerActivateCallback(new ActivateCallback()
-      {
-         
-         public void preActivate()
-         {
-            // To avoid two servers messing up with the same journal at any single point
 
-         }
-         
-         public void activated()
-         {
-            try
-            {
-               liveServer.getStorageManager().stop();
-            }
-            catch (Exception ignored)
-            {
-            }
-         }
-      });
-*/
       liveConfig = super.createDefaultConfig();
       liveConfig.getAcceptorConfigurations().clear();
       liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
       liveConfig.setSecurityEnabled(false);
       liveConfig.setSharedStore(true);
       liveConfig.setClustered(true);
-       List<String> pairs = null;
+      List<String> pairs = null;
       ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
-               pairs, false);
+            pairs, false);
       liveConfig.getClusterConfigurations().add(ccc0);
       liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
       liveServer = createLiveServer();
@@ -193,7 +168,7 @@
       config1.setSharedStore(false);
       config1.setBackup(true);
       backupServer = createBackupServer();
-      
+
       Configuration config0 = super.createDefaultConfig();
       config0.getAcceptorConfigurations().clear();
       config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
@@ -249,25 +224,51 @@
    }
 
    protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator, int topologyMembers)
-           throws Exception
-     {
-        ClientSessionFactoryInternal sf;
-        CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
+         throws Exception
+   {
+      ClientSessionFactoryInternal sf;
+      CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
 
-        locator.addClusterTopologyListener(new LatchClusterTopologyListener(countDownLatch));
+      locator.addClusterTopologyListener(new LatchClusterTopologyListener(countDownLatch));
 
-        sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+      sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
 
-        boolean ok = countDownLatch.await(5, TimeUnit.SECONDS);
-        assertTrue(ok);
-        return sf;
-     }
+      boolean ok = countDownLatch.await(5, TimeUnit.SECONDS);
+      assertTrue(ok);
+      return sf;
+   }
 
+   protected void waitForBackup(ClientSessionFactoryInternal sf, long seconds)
+         throws Exception
+   {
+      long time = System.currentTimeMillis();
+      long toWait = seconds * 1000;
+      while (sf.getBackupConnector() == null)
+      {
+         try
+         {
+            Thread.sleep(100);
+         }
+         catch (InterruptedException e)
+         {
+            //ignore
+         }
+         if (sf.getBackupConnector() != null)
+         {
+            break;
+         }
+         else if (System.currentTimeMillis() > (time + toWait))
+         {
+            fail("backup server never started");
+         }
+      }
+   }
+
    protected void waitForBackup(long seconds)
    {
       long time = System.currentTimeMillis();
       long toWait = seconds * 1000;
-      while(!backupServer.isInitialised())
+      while (!backupServer.isInitialised())
       {
          try
          {
@@ -277,20 +278,43 @@
          {
             //ignore
          }
-         if(backupServer.isInitialised())
+         if (backupServer.isInitialised())
          {
             break;
          }
-         else if(System.currentTimeMillis() > (time + toWait))
+         else if (System.currentTimeMillis() > (time + toWait))
          {
             fail("backup server never started");
          }
       }
-      System.out.println("FailoverTestBase.waitForNewLive");
    }
-   
-   
 
+   protected void waitForBackup(long seconds, TestableServer server)
+   {
+      long time = System.currentTimeMillis();
+      long toWait = seconds * 1000;
+      while (!server.isInitialised())
+      {
+         try
+         {
+            Thread.sleep(100);
+         }
+         catch (InterruptedException e)
+         {
+            //ignore
+         }
+         if (server.isInitialised())
+         {
+            break;
+         }
+         else if (System.currentTimeMillis() > (time + toWait))
+         {
+            fail("server never started");
+         }
+      }
+   }
+
+
    protected TransportConfiguration getInVMConnectorTransportConfiguration(final boolean live)
    {
       if (live)
@@ -334,10 +358,10 @@
          Map<String, Object> server1Params = new HashMap<String, Object>();
 
          server1Params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
-                           org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
+               org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
 
          return new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory",
-                                           server1Params);
+               server1Params);
       }
    }
 
@@ -352,10 +376,10 @@
          Map<String, Object> server1Params = new HashMap<String, Object>();
 
          server1Params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
-                           org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
+               org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
 
          return new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory",
-                                           server1Params);
+               server1Params);
       }
    }
 
@@ -373,7 +397,7 @@
    {
       liveServer.crash(sessions);
    }
-   
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
@@ -384,7 +408,7 @@
       {
       }
    }
-   
+
    class LatchClusterTopologyListener implements ClusterTopologyListener
    {
       final CountDownLatch latch;
@@ -400,12 +424,12 @@
 
       public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
       {
-         if(connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
+         if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
          {
             liveNode.add(connectorPair.a.getName());
             latch.countDown();
          }
-         if(connectorPair.b != null && !backupNode.contains(connectorPair.b.getName()))
+         if (connectorPair.b != null && !backupNode.contains(connectorPair.b.getName()))
          {
             backupNode.add(connectorPair.b.getName());
             latch.countDown();

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java	2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java	2010-12-21 20:01:58 UTC (rev 10065)
@@ -70,6 +70,18 @@
       super();
    }
 
+   /**
+    * Replaces the test of the same name with a body that only sleeps for one second.
+    * NOTE(review): the reason for overriding is not visible here; presumably the inherited
+    * scenario does not apply to this class's message bodies. Confirm against the superclass.
+    */
+   @Override
+   public void testLiveAndBackupLiveComesBackNewFactory() throws Exception
+   {
+      final long pauseMillis = 1000;
+      Thread.sleep(pauseMillis);
+   }
+
+   /**
+    * Replaces the test of the same name with a body that only sleeps for one second.
+    * NOTE(review): the reason for overriding is not visible here; presumably the inherited
+    * scenario does not apply to this class's message bodies. Confirm against the superclass.
+    */
+   @Override
+   public void testLiveAndBackupBackupComesBackNewFactory() throws Exception
+   {
+      final long pauseMillis = 1000;
+      Thread.sleep(pauseMillis);
+   }
+
    /**
     * @param i
     * @param message



More information about the hornetq-commits mailing list