[hornetq-commits] JBoss hornetq SVN: r9595 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/api/core and 7 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Aug 25 04:39:02 EDT 2010


Author: ataylor
Date: 2010-08-25 04:39:01 -0400 (Wed, 25 Aug 2010)
New Revision: 9595

Modified:
   branches/2_2_0_HA_Improvements/hornetq.ipr
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/TransportConfiguration.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
fixes for shared store backup

Modified: branches/2_2_0_HA_Improvements/hornetq.ipr
===================================================================
--- branches/2_2_0_HA_Improvements/hornetq.ipr	2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/hornetq.ipr	2010-08-25 08:39:01 UTC (rev 9595)
@@ -342,11 +342,8 @@
     <option name="SKIP_IMPORT_STATEMENTS" value="false" />
   </component>
   <component name="EclipseCompilerSettings">
-    <option name="DEBUGGING_INFO" value="true" />
     <option name="GENERATE_NO_WARNINGS" value="true" />
     <option name="DEPRECATION" value="false" />
-    <option name="ADDITIONAL_OPTIONS_STRING" value="" />
-    <option name="MAXIMUM_HEAP_SIZE" value="128" />
   </component>
   <component name="EclipseEmbeddedCompilerSettings">
     <option name="DEBUGGING_INFO" value="true" />
@@ -423,14 +420,6 @@
     <option name="LOCALE" />
     <option name="OPEN_IN_BROWSER" value="true" />
   </component>
-  <component name="JikesSettings">
-    <option name="JIKES_PATH" value="" />
-    <option name="DEBUGGING_INFO" value="true" />
-    <option name="DEPRECATION" value="true" />
-    <option name="GENERATE_NO_WARNINGS" value="false" />
-    <option name="IS_EMACS_ERRORS_MODE" value="true" />
-    <option name="ADDITIONAL_OPTIONS_STRING" value="" />
-  </component>
   <component name="LogConsolePreferences">
     <option name="FILTER_ERRORS" value="false" />
     <option name="FILTER_WARNINGS" value="false" />

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/TransportConfiguration.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/TransportConfiguration.java	2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/TransportConfiguration.java	2010-08-25 08:39:01 UTC (rev 9595)
@@ -167,13 +167,13 @@
 
       if (factoryClassName.equals(kother.factoryClassName))
       {
-         if (params == null)
+         if (params == null || params.isEmpty())
          {
-            return kother.params == null;
+            return kother.params == null || kother.params.isEmpty();
          }
          else
          {
-            if (kother.params == null)
+            if (kother.params == null || kother.params.isEmpty())
             {
                return false;
             }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java	2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java	2010-08-25 08:39:01 UTC (rev 9595)
@@ -657,4 +657,8 @@
    void close();
 
    boolean isHA();
+
+   void addClusterTopologyListener(ClusterTopologyListener listener);
+
+   void removeClusterTopologyListener(ClusterTopologyListener listener);
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-08-25 08:39:01 UTC (rev 9595)
@@ -39,13 +39,9 @@
    String getNodeID();
 
    void connect();
-   
-   void addClusterTopologyListener(ClusterTopologyListener listener);
-   
-   void removeClusterTopologyListener(ClusterTopologyListener listener);
-   
+
    void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
-   
+
    void notifyNodeDown(String nodeID);
 
    void setClusterConnection(boolean clusterConnection);

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java	2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java	2010-08-25 08:39:01 UTC (rev 9595)
@@ -38,16 +38,57 @@
     */
    private Map<String, TopologyMember> topology = new HashMap<String, TopologyMember>();
 
+   int nodes = 0;
+
    public synchronized boolean addMember(String nodeId, TopologyMember member)
    {
-      boolean replaced = topology.containsKey(nodeId);
-      topology.put(nodeId, member);
+      boolean replaced = false;
+      TopologyMember currentMember = topology.get(nodeId);
+      if(currentMember == null)
+      {
+         topology.put(nodeId, member);
+         replaced = true;
+         if(member.getConnector().a != null)
+         {
+            nodes++;
+         }
+         if(member.getConnector().b != null)
+         {
+            nodes++;
+         }
+      }
+      else
+      {
+         if(currentMember.getConnector().a == null && member.getConnector().a != null)
+         {
+            currentMember.getConnector().a =  member.getConnector().a;
+            replaced = true;
+            nodes++;
+         }
+         if(currentMember.getConnector().b == null && member.getConnector().b != null)
+         {
+            currentMember.getConnector().b =  member.getConnector().b;
+            replaced = true;
+            nodes++;
+         }
+      }
       return replaced;
    }
 
    public synchronized boolean removeMember(String nodeId)
    {
       TopologyMember member = topology.remove(nodeId);
+      if(member != null)
+      {
+         if(member.getConnector().a != null)
+         {
+            nodes--;
+         }
+         if(member.getConnector().b != null)
+         {
+            nodes--;
+         }
+      }
       return (member != null);
    }
 
@@ -77,7 +118,7 @@
 
    public int size()
    {
-      return topology.size();
+      return nodes;
    }
 
    public String describe()
@@ -94,5 +135,6 @@
    public void clear()
    {
       topology.clear();
+      nodes = 0;
    }
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java	2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java	2010-08-25 08:39:01 UTC (rev 9595)
@@ -113,7 +113,15 @@
       buffer.writeString(nodeID);
       if (!exit)
       {         
-         pair.a.encode(buffer);
+         if (pair.a != null)
+         {
+            buffer.writeBoolean(true);
+            pair.a.encode(buffer);
+         }
+         else
+         {
+            buffer.writeBoolean(false);
+         }
          if (pair.b != null)
          {
             buffer.writeBoolean(true);
@@ -134,9 +142,18 @@
       exit = buffer.readBoolean();
       nodeID = buffer.readString();
       if (!exit)
-      {         
-         TransportConfiguration a = new TransportConfiguration();
-         a.decode(buffer);
+      {
+         boolean hasLive = buffer.readBoolean();
+         TransportConfiguration a;
+         if(hasLive)
+         {
+            a = new TransportConfiguration();
+            a.decode(buffer);
+         }
+         else
+         {
+            a = null;
+         }
          boolean hasBackup = buffer.readBoolean();
          TransportConfiguration b;
          if (hasBackup)

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-08-25 08:39:01 UTC (rev 9595)
@@ -244,26 +244,22 @@
                                    boolean last,
                                    int distance)
    {
-      if (nodeID.equals(nodeUUID.toString()))
-      {
-         return;
-      }
-      
       boolean updated = topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
-
-      if (distance >= topology.size() || updated)
+      if(!updated)
       {
          return;
       }
-      
       for (ClusterTopologyListener listener : clientListeners)
       {
          listener.nodeUP(nodeID, connectorPair, last, distance);
       }
 
-      for (ClusterTopologyListener listener : clusterConnectionListeners)
+      if (distance < topology.size())
       {
-         listener.nodeUP(nodeID, connectorPair, last, distance);
+         for (ClusterTopologyListener listener : clusterConnectionListeners)
+         {
+            listener.nodeUP(nodeID, connectorPair, last, distance);
+         }
       }
    }
    

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java	2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java	2010-08-25 08:39:01 UTC (rev 9595)
@@ -13,9 +13,12 @@
 
 package org.hornetq.core.server.cluster.impl;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.WeakHashMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -40,10 +43,9 @@
 
    private final String directory;
    
-   private static Map<String, Lock> locks = new WeakHashMap<String, Lock>();
-   
-   private Lock lock;
-   
+   private final static Map<String, Semaphore> locks = new WeakHashMap<String, Semaphore>();
+
+   private Semaphore semaphore;
    /**
     * @param fileName
     * @param directory
@@ -56,15 +58,32 @@
       
       synchronized (locks)
       {
-         String key = directory + fileName;
+         String key = directory + "/" + fileName;
          
-         lock = locks.get(key);
+         semaphore = locks.get(key);
          
-         if (lock == null)
+         if (semaphore == null)
          {
-            lock = new ReentrantLock(true);
+            semaphore = new Semaphore(1, true);
             
-            locks.put(key, lock);
+            locks.put(key, semaphore);
+
+            File f = new File(directory, fileName);
+
+            try
+            {
+               f.createNewFile();
+            }
+            catch (IOException e)
+            {
+               e.printStackTrace();
+               throw new IllegalStateException(e);
+            }
+
+            if(!f.exists())
+            {
+               throw new IllegalStateException("unable to create " + directory + fileName);
+            }
          }
       }
    }
@@ -81,13 +100,38 @@
 
    public void lock() throws IOException
    {
-      lock.lock();
+      try
+      {
+         semaphore.acquire();
+      }
+      catch (InterruptedException e)
+      {
+         throw new IOException(e);
+      }
    }
 
    public boolean unlock() throws IOException
    {
-      lock.unlock();
+      semaphore.release();
       
       return true;
    }
+
+   public static void unlock(final String fileName, final String directory)
+   {
+      String key = directory + "/" + fileName;
+
+      Semaphore semaphore = locks.get(key);
+
+      semaphore.release();
+   }
+
+   public static void clearLocks()
+   {
+      for (Semaphore semaphore : locks.values())
+      {
+         semaphore.drainPermits();
+      }
+      locks.clear();
+   }
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-08-25 08:39:01 UTC (rev 9595)
@@ -557,7 +557,6 @@
             initialisePart1();
             
             //TODO TODO at this point the clustermanager needs to announce it's presence so the cluster can know about the backup
-            
             // We now look for the live.lock file - if it doesn't exist it means the live isn't started yet, so we wait
             // for that
             
@@ -574,17 +573,20 @@
 
                liveLock = createLockFile("live.lock", configuration.getJournalDirectory());
 
+
+              clusterManager.start();
+
                log.info("Live server is up - waiting for failover");
 
                liveLock.lock();
-
+               //todo check if we need this or not
                // We need to test if the file exists again, since the live might have shutdown
-               if (!liveLockFile.exists())
-               {
-                  liveLock.unlock();
+              // if (!liveLockFile.exists())
+              // {
+              //    liveLock.unlock();
 
-                  continue;
-               }
+              //    continue;
+             //  }
 
                log.info("Obtained live lock");
                
@@ -600,13 +602,13 @@
 
             initialisePart2();
 
-            log.info("Server is now live");
+            log.info("Back Up Server is now live");
 
             backupLock.unlock();
          }
          catch (InterruptedException e)
          {
-            // This can occur when closing if the thread is blocked - it's ok
+            System.out.println("HornetQServerImpl$SharedStoreBackupActivation.run");
          }
          catch (Exception e)
          {

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java	2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java	2010-08-25 08:39:01 UTC (rev 9595)
@@ -24,11 +24,7 @@
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientConsumer;
-import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.api.core.client.*;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.client.impl.DelegatingSession;
@@ -171,9 +167,9 @@
          for (int i = 0; i < numIts; i++)
          {
             AsynchronousFailoverTest.log.info("Iteration " + i);
+            ServerLocator locator = getServerLocator();
+            sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
 
-            sf = getSessionFactory();
-
             sf.getServerLocator().setBlockOnNonDurableSend(true);
             sf.getServerLocator().setBlockOnDurableSend(true);
 

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2010-08-25 08:39:01 UTC (rev 9595)
@@ -26,16 +26,13 @@
 
 import junit.framework.Assert;
 
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Message;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.*;
 import org.hornetq.api.core.client.*;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.impl.invm.TransportConstants;
+import org.hornetq.core.server.cluster.impl.FakeLockFile;
 import org.hornetq.core.transaction.impl.XidImpl;
 import org.hornetq.jms.client.HornetQTextMessage;
 import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -84,11 +81,17 @@
 
    public void testNonTransacted() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ClientSessionFactoryInternal sf;
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      ServerLocator locator = getServerLocator();
 
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
+
+      sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       ClientSession session = sf.createSession(true, true);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -146,6 +149,8 @@
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -153,11 +158,15 @@
 
    public void testConsumeTransacted() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       ClientSession session = sf.createSession(false, false);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -242,6 +251,8 @@
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -251,11 +262,15 @@
     *  and the servers should be able to connect without any problems. */
    public void testRestartServers() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       ClientSession session = sf.createSession(true, true);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -279,17 +294,14 @@
 
       session.close();
 
+      server1Service.stop();
       server0Service.stop();
-      server1Service.stop();
-
+      FakeLockFile.clearLocks();
       server1Service.start();
       server0Service.start();
 
-      sf = getSessionFactory();
+      sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
-
       session = sf.createSession(true, true);
 
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
@@ -311,6 +323,8 @@
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -319,12 +333,16 @@
    // https://jira.jboss.org/jira/browse/HORNETQ-285
    public void testFailoverOnInitialConnection() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
-      sf.getServerLocator().setFailoverOnInitialConnection(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnInitialConnection(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       // Stop live server
 
       this.server0Service.stop();
@@ -367,6 +385,8 @@
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -377,28 +397,32 @@
     * @param latch
     * @throws InterruptedException
     */
-   private void fail(final ClientSession session, final CountDownLatch latch) throws InterruptedException
+   private void fail(final ClientSession session, final CountDownLatch latch) throws Exception
    {
 
-      RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
+      //RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
 
       // Simulate failure on connection
-      conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
+      //conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+      server0Service.stop();
       // Wait to be informed of failure
 
-      boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+      boolean ok = latch.await(10000, TimeUnit.MILLISECONDS);
 
       Assert.assertTrue(ok);
    }
 
    public void testTransactedMessagesSentSoRollback() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       ClientSession session = sf.createSession(false, false);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -456,6 +480,8 @@
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -467,11 +493,15 @@
     */
    public void testTransactedMessagesSentSoRollbackAndContinueWork() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       ClientSession session = sf.createSession(false, false);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -541,6 +571,8 @@
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -548,11 +580,15 @@
 
    public void testTransactedMessagesNotSentSoNoRollback() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       ClientSession session = sf.createSession(false, false);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -622,6 +658,8 @@
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -629,11 +667,15 @@
 
    public void testTransactedMessagesWithConsumerStartedBeforeFailover() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       ClientSession session = sf.createSession(false, false);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -711,6 +753,8 @@
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -718,11 +762,15 @@
 
    public void testTransactedMessagesConsumedSoRollback() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       ClientSession session1 = sf.createSession(false, false);
 
       session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -794,6 +842,8 @@
 
       session2.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -801,11 +851,15 @@
 
    public void testTransactedMessagesNotConsumedSoNoRollback() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       ClientSession session1 = sf.createSession(false, false);
 
       session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -889,6 +943,8 @@
 
       session2.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -896,11 +952,15 @@
 
    public void testXAMessagesSentSoRollbackOnEnd() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       ClientSession session = sf.createSession(true, false, false);
 
       Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -959,6 +1019,8 @@
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -966,11 +1028,15 @@
 
    public void testXAMessagesSentSoRollbackOnPrepare() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       ClientSession session = sf.createSession(true, false, false);
 
       Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -1031,6 +1097,8 @@
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -1039,11 +1107,15 @@
    // This might happen if 1PC optimisation kicks in
    public void testXAMessagesSentSoRollbackOnCommit() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+     ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       ClientSession session = sf.createSession(true, false, false);
 
       Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -1106,6 +1178,8 @@
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -1113,11 +1187,15 @@
 
    public void testXAMessagesNotSentSoNoRollbackOnCommit() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       ClientSession session = sf.createSession(true, false, false);
 
       Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -1195,6 +1273,8 @@
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -1202,11 +1282,15 @@
 
    public void testXAMessagesConsumedSoRollbackOnEnd() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       ClientSession session1 = sf.createSession(false, false);
 
       session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1280,6 +1364,8 @@
 
       session2.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -1287,11 +1373,15 @@
 
    public void testXAMessagesConsumedSoRollbackOnPrepare() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       ClientSession session1 = sf.createSession(false, false);
 
       session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1376,6 +1466,8 @@
 
       session2.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -1384,11 +1476,14 @@
    // 1PC optimisation
    public void testXAMessagesConsumedSoRollbackOnCommit() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
       ClientSession session1 = sf.createSession(false, false);
 
       session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1466,6 +1561,8 @@
 
       session2.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -1473,8 +1570,10 @@
 
    public void testCreateNewFactoryAfterFailover() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       ClientSession session = sendAndConsume(sf, true);
 
       final CountDownLatch latch = new CountDownLatch(1);
@@ -1502,13 +1601,14 @@
 
       session.close();
 
-      ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
       sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
 
       session = sendAndConsume(sf, false);
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -1516,11 +1616,15 @@
 
    public void testFailoverMultipleSessionsWithConsumers() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       final int numSessions = 5;
 
       final int numConsumersPerSession = 5;
@@ -1620,6 +1724,8 @@
 
       sendSession.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -1630,11 +1736,14 @@
     */
    public void testFailWithBrowser() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
       ClientSession session = sf.createSession(true, true);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1703,6 +1812,8 @@
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -1710,11 +1821,15 @@
 
    public void testFailThenReceiveMoreMessagesAfterFailover() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       ClientSession session = sf.createSession(true, true);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1785,6 +1900,8 @@
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -1792,11 +1909,12 @@
 
    public void testFailThenReceiveMoreMessagesAfterFailover2() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
-      sf.getServerLocator().setBlockOnAcknowledge(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(true);
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
 
       ClientSession session = sf.createSession(true, true, 0);
 
@@ -1878,6 +1996,8 @@
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -1905,11 +2025,14 @@
 
    private void testSimpleSendAfterFailover(final boolean durable, final boolean temporary) throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
-      sf.getServerLocator().setBlockOnAcknowledge(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
 
       ClientSession session = sf.createSession(true, true, 0);
 
@@ -1970,6 +2093,8 @@
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -1977,8 +2102,10 @@
 
    public void testForceBlockingReturn() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+
       // Add an interceptor to delay the send method so we can get time to cause failover before it returns
 
       server0Service.getRemotingService().addInterceptor(new DelayInterceptor());
@@ -1986,6 +2113,8 @@
       sf.getServerLocator().setBlockOnNonDurableSend(true);
       sf.getServerLocator().setBlockOnDurableSend(true);
       sf.getServerLocator().setBlockOnAcknowledge(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
       final ClientSession session = sf.createSession(true, true, 0);
 
@@ -2043,6 +2172,8 @@
 
       session.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -2050,12 +2181,16 @@
 
    public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception
    {
-      final ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
-      sf.getServerLocator().setBlockOnAcknowledge(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
 
+      locator.setBlockOnAcknowledge(true);
+      final ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       final ClientSession session = sf.createSession(false, false);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -2203,6 +2338,8 @@
 
       session2.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -2210,11 +2347,14 @@
 
    public void testCommitDidNotOccurUnblockedAndResend() throws Exception
    {
-      ClientSessionFactoryInternal sf = getSessionFactory();
+      ServerLocator locator = getServerLocator();
 
-      sf.getServerLocator().setBlockOnNonDurableSend(true);
-      sf.getServerLocator().setBlockOnDurableSend(true);
-      sf.getServerLocator().setBlockOnAcknowledge(true);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(true);
+      locator.setFailoverOnServerShutdown(true);
+      locator.setReconnectAttempts(-1);
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
 
       final ClientSession session = sf.createSession(false, false);
 
@@ -2340,6 +2480,8 @@
 
       session2.close();
 
+      sf.close();
+
       Assert.assertEquals(0, sf.numSessions());
 
       Assert.assertEquals(0, sf.numConnections());
@@ -2452,4 +2594,6 @@
    }
 
    // Inner classes -------------------------------------------------
+
+
 }

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2010-08-25 08:39:01 UTC (rev 9595)
@@ -13,24 +13,33 @@
 
 package org.hornetq.tests.integration.cluster.failover;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
 
+import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.remoting.impl.invm.InVMConnector;
 import org.hornetq.core.remoting.impl.invm.InVMRegistry;
 import org.hornetq.core.remoting.impl.invm.TransportConstants;
 import org.hornetq.core.server.ActivateCallback;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.impl.FakeLockFile;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.tests.util.UnitTestCase;
 
@@ -80,6 +89,7 @@
    {
       super.setUp();
       clearData();
+      FakeLockFile.clearLocks();
       createConfigs();
 
       if (server1Service != null)
@@ -101,6 +111,16 @@
       config1.setSecurityEnabled(false);
       config1.setSharedStore(true);
       config1.setBackup(true);
+      config1.setClustered(true);
+      TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
+      TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
+      List<String> staticConnectors = new ArrayList<String>();
+      staticConnectors.add(liveConnector.getName());
+       ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1,
+               staticConnectors);
+      config1.getClusterConfigurations().add(ccc1);
+      config1.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+      config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
       server1Service = createFakeLockServer(true, config1);
       
       server1Service.registerActivateCallback(new ActivateCallback()
@@ -129,6 +149,12 @@
       config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
       config0.setSecurityEnabled(false);
       config0.setSharedStore(true);
+      config0.setClustered(true);
+       List<String> pairs = null;
+      ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
+               pairs);
+      config0.getClusterConfigurations().add(ccc0);
+      config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
       server0Service = createFakeLockServer(true, config0);
 
    }
@@ -179,6 +205,20 @@
       super.tearDown();
    }
 
+   protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator, int topologyMembers)
+           throws Exception
+     {
+        ClientSessionFactoryInternal sf;
+        CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
+
+        locator.addClusterTopologyListener(new LatchClusterTopologyListener(countDownLatch));
+
+        sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+
+        assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
+        return sf;
+     }
+
    protected TransportConfiguration getInVMConnectorTransportConfiguration(final boolean live)
    {
       if (live)
@@ -251,14 +291,45 @@
 
    protected abstract TransportConfiguration getConnectorTransportConfiguration(final boolean live);
 
-   protected ClientSessionFactoryInternal getSessionFactory() throws Exception
+   protected ServerLocatorInternal getServerLocator() throws Exception
    {
-      ServerLocator locator = HornetQClient.createServerLocatorWithHA(getConnectorTransportConfiguration(true), getConnectorTransportConfiguration(false));
-      return (ClientSessionFactoryInternal) locator.createSessionFactory();
+      ServerLocator locator = HornetQClient.createServerLocatorWithHA(getConnectorTransportConfiguration(true));
+      return (ServerLocatorInternal) locator;
    }
 
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
+   class LatchClusterTopologyListener implements ClusterTopologyListener
+   {
+      final CountDownLatch latch;
+      int liveNodes = 0;
+      int backUpNodes = 0;
+      List<String> liveNode = new ArrayList<String>();
+      List<String> backupNode = new ArrayList<String>();
 
+      public LatchClusterTopologyListener(CountDownLatch latch)
+      {
+         this.latch = latch;
+      }
+
+      public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
+      {
+         if(connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
+         {
+            liveNode.add(connectorPair.a.getName());
+            latch.countDown();
+         }
+         if(connectorPair.b != null && !backupNode.contains(connectorPair.b.getName()))
+         {
+            backupNode.add(connectorPair.b.getName());
+            latch.countDown();
+         }
+      }
+
+      public void nodeDown(String nodeID)
+      {
+         // Intentionally empty: this listener only counts connectors reported through nodeUP.
+      }
+   }
 }

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java	2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java	2010-08-25 08:39:01 UTC (rev 9595)
@@ -17,7 +17,9 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.tests.util.UnitTestCase;
 
@@ -84,11 +86,11 @@
    }
    
 
-   protected ClientSessionFactoryInternal getSessionFactory() throws Exception
+   protected ServerLocatorInternal getServerLocator() throws Exception
    {
-      ClientSessionFactoryInternal sf = super.getSessionFactory();
-      sf.getServerLocator().setMinLargeMessageSize(LARGE_MESSAGE_SIZE);
-      return sf;
+      ServerLocator locator = super.getServerLocator();
+      locator.setMinLargeMessageSize(LARGE_MESSAGE_SIZE);
+      return (ServerLocatorInternal) locator;
    }
 
 

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java	2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java	2010-08-25 08:39:01 UTC (rev 9595)
@@ -22,11 +22,7 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientConsumer;
-import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.api.core.client.*;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.config.Configuration;
@@ -79,11 +75,16 @@
 
    public void internalTestPage(final boolean transacted, final boolean failBeforeConsume) throws Exception
    {
-      ClientSessionFactoryInternal factory = getSessionFactory();
-      factory.getServerLocator().setBlockOnDurableSend(true);
-      factory.getServerLocator().setBlockOnAcknowledge(true);
-      ClientSession session = factory.createSession(!transacted, !transacted, 0);
+      ServerLocator locator = getServerLocator();
 
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+
+      //waitForTopology(locator, 1, 1);
+
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+      ClientSession session = sf.createSession(!transacted, !transacted, 0);
+
       try
       {
 
@@ -170,7 +171,7 @@
 
          session.close();
 
-         session = factory.createSession(true, true, 0);
+         session = sf.createSession(true, true, 0);
 
          cons = session.createConsumer(PagingFailoverTest.ADDRESS);
 

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java	2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java	2010-08-25 08:39:01 UTC (rev 9595)
@@ -10,6 +10,7 @@
  *  implied.  See the License for the specific language governing
  *  permissions and limitations under the License.
  */
+
 package org.hornetq.tests.util;
 
 import org.hornetq.core.config.Configuration;

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java	2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java	2010-08-25 08:39:01 UTC (rev 9595)
@@ -65,6 +65,7 @@
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.cluster.impl.FakeLockFile;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.transaction.impl.XidImpl;
 import org.hornetq.jms.client.HornetQTextMessage;



More information about the hornetq-commits mailing list