[hornetq-commits] JBoss hornetq SVN: r9696 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/server/cluster/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Sep 16 09:41:15 EDT 2010


Author: ataylor
Date: 2010-09-16 09:41:15 -0400 (Thu, 16 Sep 2010)
New Revision: 9696

Added:
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java
Modified:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java
Log:
added connector code to connect to static connectors in parallel

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-09-16 13:41:15 UTC (rev 9696)
@@ -157,8 +157,6 @@
                                    final double retryIntervalMultiplier,
                                    final long maxRetryInterval,
                                    final int reconnectAttempts,
-                                   final int initialConnectAttempts,
-                                   final boolean failoverOnInitialConnection,
                                    final ExecutorService threadPool,
                                    final ScheduledExecutorService scheduledThreadPool,
                                    final List<Interceptor> interceptors) throws HornetQException
@@ -197,6 +195,10 @@
 
       this.interceptors = interceptors;
 
+   }
+
+   public void connect(int initialConnectAttempts, boolean failoverOnInitialConnection) throws HornetQException
+   {
       // Get the connection
 
       getConnectionWithRetry(initialConnectAttempts);
@@ -210,9 +212,9 @@
             // Try and connect to the backup
 
             log.warn("Server is not available to make initial connection to. Will try backup server instead.");
-            
+
             this.connectorConfig = backupConfig;
-            
+
             connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
 
             transportParams = this.connectorConfig.getParams();

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java	2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java	2010-09-16 13:41:15 UTC (rev 9696)
@@ -12,6 +12,7 @@
  */
 package org.hornetq.core.client.impl;
 
+import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.SessionFailureListener;
 
@@ -36,4 +37,6 @@
    int numSessions();
    
    void removeSession(final ClientSessionInternal session, boolean failingOver);
+
+   void connect(int reconnectAttempts, boolean failoverOnInitialConnection) throws HornetQException;
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-09-16 13:41:15 UTC (rev 9696)
@@ -18,17 +18,8 @@
 import java.net.InetAddress;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
@@ -71,6 +62,8 @@
 
    private TransportConfiguration[] initialConnectors;
 
+   private StaticConnector staticConnector = new StaticConnector();
+
    private Topology topology = new Topology();
 
    private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
@@ -162,9 +155,9 @@
    private String groupID;
 
    private String nodeID;
-   
+
    private TransportConfiguration clusterTransportConfiguration;
-   
+
    private boolean backup;
 
    private static synchronized ExecutorService getGlobalThreadPool()
@@ -184,8 +177,8 @@
       if (globalScheduledThreadPool == null)
       {
          ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
-                                                          true,
-                                                          getThisClassLoader());
+               true,
+               getThisClassLoader());
 
          globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
 
@@ -428,48 +421,22 @@
    {
       initialise();
    }
-   
-   public void connect()
+
+   public ClientSessionFactory connect() throws Exception
    {
+      ClientSessionFactory sf;
       // static list of initial connectors
       if (initialConnectors != null && discoveryGroup == null)
       {
-         for (TransportConfiguration connector : initialConnectors)
-         {
-            ClientSessionFactory sf = null;
-            do
-            {
-               try
-               {
-                  sf = createSessionFactory(connector);
-               }
-               catch (HornetQException e)
-               {
-                  if (e.getCode() == HornetQException.NOT_CONNECTED)
-                  {
-                     continue;
-                  }
-               }
-               catch (Exception e)
-               {
-                  break;
-               }
-            }
-            while (sf == null);
-         }
+         sf = staticConnector.connect();
       }
       // wait for discovery group to get the list of initial connectors
       else
       {
-         try
-         {
-            ClientSessionFactory sf = createSessionFactory();
-         }
-         catch (Exception e)
-         {
-            e.printStackTrace();
-         }
+         sf = createSessionFactory();
       }
+      factories.add(sf);
+      return sf;
    }
 
    public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
@@ -488,7 +455,7 @@
          throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
       }
 
-      ClientSessionFactory factory = new ClientSessionFactoryImpl(this,
+      ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
             transportConfiguration,
             callTimeout,
             clientFailureCheckPeriod,
@@ -497,14 +464,14 @@
             retryIntervalMultiplier,
             maxRetryInterval,
             reconnectAttempts,
-            initialConnectAttempts,
-            failoverOnInitialConnection,
             threadPool,
             scheduledThreadPool,
             interceptors);
 
+      factory.connect(reconnectAttempts, failoverOnInitialConnection);
+
       factories.add(factory);
-      
+
       return factory;
    }
 
@@ -537,7 +504,7 @@
          }
       }
 
-      ClientSessionFactory factory = null;
+      ClientSessionFactoryInternal factory = null;
 
       synchronized (this)
       {
@@ -562,11 +529,10 @@
                      retryIntervalMultiplier,
                      maxRetryInterval,
                      reconnectAttempts,
-                     initialConnectAttempts,
-                     failoverOnInitialConnection,
                      threadPool,
                      scheduledThreadPool,
                      interceptors);
+               factory.connect(reconnectAttempts, failoverOnInitialConnection);
             }
             catch (HornetQException e)
             {
@@ -582,7 +548,7 @@
                   if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
                   {
                      throw new HornetQException(HornetQException.NOT_CONNECTED,
-                     "Cannot connect to server(s). Tried with all available servers.");
+                           "Cannot connect to server(s). Tried with all available servers.");
                   }
                   retry = true;
                }
@@ -1007,7 +973,7 @@
    {
       this.nodeID = nodeID;
    }
-   
+
    public String getNodeID()
    {
       return nodeID;
@@ -1022,12 +988,12 @@
    {
       return clusterConnection;
    }
-   
+
    public TransportConfiguration getClusterTransportConfiguration()
    {
       return clusterTransportConfiguration;
    }
-   
+
    public void setClusterTransportConfiguration(TransportConfiguration tc)
    {
       this.clusterTransportConfiguration = tc;
@@ -1037,17 +1003,12 @@
    {
       return backup;
    }
-   
+
    public void setBackup(boolean backup)
    {
       this.backup = backup;
    }
 
-   public void announceBackup()
-   {
-      connect();
-   }
-
    @Override
    protected void finalize() throws Throwable
    {
@@ -1074,6 +1035,10 @@
             log.error("Failed to stop discovery group", e);
          }
       }
+      else
+      {
+         staticConnector.disconnect();
+      }
 
       for (ClientSessionFactory factory : factories)
       {
@@ -1214,12 +1179,19 @@
       {
          this.initialConnectors[count++] = entry.getConnector();
       }
-      
+
       if (ha && clusterConnection && !receivedTopology && initialConnectors.length > 0)
       {
          // FIXME the node is alone in the cluster. We create a connection to the new node
          // to trigger the node notification to form the cluster.
-         connect();
+         try
+         {
+            connect();
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+         }
       }
    }
 
@@ -1253,4 +1225,176 @@
       return pairs.get(live);
    }
 
+   class StaticConnector
+   {
+      private List<Connector> connectors;
+
+      public ClientSessionFactory connect() throws HornetQException
+      {
+         if (closed)
+         {
+            throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+         }
+
+         try
+         {
+            initialise();
+         }
+         catch (Exception e)
+         {
+            throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+         }
+         
+         ClientSessionFactory csf = null;
+
+         createConnectors();
+
+         try
+         {
+            List<Future<ClientSessionFactory>> futures = threadPool.invokeAll(connectors);
+            for (int i = 0, futuresSize = futures.size(); i < futuresSize; i++)
+            {
+               Future<ClientSessionFactory> future = futures.get(i);
+               try
+               {
+                  csf = future.get();
+                  if(csf != null)
+                     break;
+               }
+               catch (Exception e)
+               {
+                  log.debug("unable to connect with static connector " + connectors.get(i).initialConnector);
+               }
+            }
+         }
+         catch (InterruptedException e)
+         {
+            throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
+         }
+
+         if (csf == null)
+         {
+            throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
+         }
+         return csf;
+      }
+
+      private synchronized void createConnectors()
+      {
+         connectors = new ArrayList<Connector>();
+         for (TransportConfiguration initialConnector : initialConnectors)
+         {
+            connectors.add(new Connector(initialConnector));
+         }
+      }
+
+
+      public synchronized void disconnect()
+      {
+         if (connectors != null)
+         {
+            for (Connector connector : connectors)
+            {
+               connector.disconnect();
+            }
+         }
+      }
+
+
+      class Connector implements Callable<ClientSessionFactory>
+      {
+         private TransportConfiguration initialConnector;
+         private ClientSessionFactoryInternal factory;
+         private boolean isConnected = false;
+         private boolean interrupted = false;
+         private Exception e;
+
+         public Connector(TransportConfiguration initialConnector)
+         {
+            this.initialConnector = initialConnector;
+         }
+
+         public ClientSessionFactory call() throws HornetQException
+         {
+            factory = getFactory();
+            try
+            {
+               factory.connect(reconnectAttempts, failoverOnInitialConnection);
+            }
+            catch (HornetQException e)
+            {
+               if (!interrupted)
+               {
+                  this.e = e;
+                  throw e;
+               }
+               return null;
+            }
+            isConnected = true;
+            for (Connector connector : connectors)
+            {
+               if (!connector.isConnected())
+               {
+                  connector.disconnect();
+               }
+            }
+            return factory;
+         }
+
+         public boolean isConnected()
+         {
+            return isConnected;
+         }
+
+         public void disconnect()
+         {
+            interrupted = true;
+            try
+            {
+               ClientSessionFactoryInternal factory = getFactory();
+               if (factory != null)
+               {
+                  factory.causeExit();
+               }
+               else
+               {
+                  System.out.println("ServerLocatorImpl$StaticConnector$Connector.disconnect");
+               }
+            }
+            catch (HornetQException e1)
+            {
+               log.debug("exception closing factory");
+            }
+         }
+
+         private synchronized ClientSessionFactoryInternal getFactory() throws HornetQException
+         {
+            if (factory == null)
+            {
+               try
+              {
+                 initialise();
+              }
+              catch (Exception e)
+              {
+                 throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+              }
+
+               factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this,
+                     initialConnector,
+                     callTimeout,
+                     clientFailureCheckPeriod,
+                     connectionTTL,
+                     retryInterval,
+                     retryIntervalMultiplier,
+                     maxRetryInterval,
+                     reconnectAttempts,
+                     threadPool,
+                     scheduledThreadPool,
+                     interceptors);
+            }
+            return factory;
+         }
+      }
+   }
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2010-09-16 13:41:15 UTC (rev 9696)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.client.impl;
 
+import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientSessionFactory;
@@ -37,7 +38,7 @@
 
    String getNodeID();
 
-   void connect();
+   ClientSessionFactory connect() throws  Exception;
 
    void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
 
@@ -55,7 +56,5 @@
    
    void setBackup(boolean backup);
 
-   void announceBackup();
-
    void setInitialConnectAttempts(int reconnectAttempts);
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-09-16 13:41:15 UTC (rev 9696)
@@ -134,7 +134,10 @@
          this.serverLocator.setClusterTransportConfiguration(connector);
          this.serverLocator.setBackup(server.getConfiguration().isBackup());
          this.serverLocator.setReconnectAttempts(-1);
-         this.serverLocator.setRetryInterval(retryInterval);
+         if(retryInterval > 0)
+         {
+            this.serverLocator.setRetryInterval(retryInterval);
+         }
          
          // a cluster connection will connect to other nodes only if they are directly connected
          // through a static list of connectors 

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-09-16 13:41:15 UTC (rev 9696)
@@ -183,7 +183,7 @@
    private void deployBackupListener(BackupConnectorConfiguration connectorConfiguration)
          throws Exception
    {
-      ServerLocator locator;
+      ServerLocatorInternal locator;
       if (connectorConfiguration.getDiscoveryGroupName() != null)
       {
          DiscoveryGroupConfiguration groupConfiguration = configuration.getDiscoveryGroupConfigurations().get(connectorConfiguration.getDiscoveryGroupName());
@@ -217,7 +217,7 @@
             //todo update the topology
          }
       });
-      backupSessionFactory = locator.createSessionFactory();
+      backupSessionFactory = locator.connect();
       backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), true, configuration.getConnectorConfigurations().get(connectorConfiguration.getConnector())));
    }
 
@@ -370,7 +370,7 @@
    }
    
    // backup node becomes live
-   public synchronized void activate()
+  public synchronized void activate()
    {
       if (backup)
       {
@@ -437,10 +437,15 @@
             }
          }
 
-         if (clusterConnections.size() > 0)
+         for (ClusterTopologyListener listener : clientListeners)
          {
-            announceNode();
+            listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
          }
+
+         for (ClusterTopologyListener listener : clusterConnectionListeners)
+         {
+            listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+         }
       }
    }
 

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-09-16 13:41:15 UTC (rev 9696)
@@ -601,9 +601,9 @@
             
             configuration.setBackup(false);
             
+            initialisePart2();
+            
             clusterManager.activate();
-            
-            initialisePart2();
 
             log.info("Backup Server is now live");
 

Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java	2010-09-16 13:41:15 UTC (rev 9696)
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * User: andy
+ * Date: Sep 15, 2010
+ * Time: 2:27:07 PM
+ * @author <a href="mailto:andy.taylor at jboss.com">Andy Taylor</a>
+ */
+public class ServerLocatorConnectTest extends ServiceTestBase
+{
+   private HornetQServer server;
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      Configuration configuration = createDefaultConfig(isNetty());
+      server = createServer(false, configuration);
+      server.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      server.stop();
+      super.tearDown();
+   }
+
+   public void testSingleConnectorSingleServer() throws Exception
+   {
+
+      ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(createTransportConfiguration(isNetty(), false, generateParams(0, isNetty())));
+      ClientSessionFactory csf = locator.createSessionFactory();
+      csf.close();
+   }
+
+   public void testSingleConnectorSingleServerConnect() throws Exception
+   {
+      ServerLocatorInternal locator = (ServerLocatorInternal) HornetQClient.createServerLocatorWithoutHA(createTransportConfiguration(isNetty(), false, generateParams(0, isNetty())));
+      ClientSessionFactoryInternal csf = (ClientSessionFactoryInternal) locator.connect();
+      assertNotNull(csf);
+      assertEquals(csf.numConnections(), 1);
+      locator.close();
+   }
+
+   public void testMultipleConnectorSingleServerConnect() throws Exception
+   {
+      ServerLocatorInternal locator = (ServerLocatorInternal) HornetQClient.createServerLocatorWithoutHA(
+            createTransportConfiguration(isNetty(), false, generateParams(0, isNetty())),
+            createTransportConfiguration(isNetty(), false, generateParams(1, isNetty())),
+            createTransportConfiguration(isNetty(), false, generateParams(2, isNetty())),
+            createTransportConfiguration(isNetty(), false, generateParams(3, isNetty())),
+            createTransportConfiguration(isNetty(), false, generateParams(4, isNetty()))
+      );
+      ClientSessionFactoryInternal csf = (ClientSessionFactoryInternal) locator.connect();
+      assertNotNull(csf);
+      assertEquals(csf.numConnections(), 1);
+      locator.close();
+   }
+
+   public void testMultipleConnectorSingleServerConnectReconnect() throws Exception
+   {
+      ServerLocatorInternal locator = (ServerLocatorInternal) HornetQClient.createServerLocatorWithoutHA(
+            createTransportConfiguration(isNetty(), false, generateParams(0, isNetty())),
+            createTransportConfiguration(isNetty(), false, generateParams(1, isNetty())),
+            createTransportConfiguration(isNetty(), false, generateParams(2, isNetty())),
+            createTransportConfiguration(isNetty(), false, generateParams(3, isNetty())),
+            createTransportConfiguration(isNetty(), false, generateParams(4, isNetty()))
+      );
+      locator.setReconnectAttempts(-1);
+      ClientSessionFactoryInternal csf = (ClientSessionFactoryInternal) locator.connect();
+      assertNotNull(csf);
+      assertEquals(csf.numConnections(), 1);
+      locator.close();
+   }
+
+   public void testMultipleConnectorSingleServerNoConnect() throws Exception
+   {
+      ServerLocatorInternal locator = (ServerLocatorInternal) HornetQClient.createServerLocatorWithoutHA(
+            createTransportConfiguration(isNetty(), false, generateParams(1, isNetty())),
+            createTransportConfiguration(isNetty(), false, generateParams(2, isNetty())),
+            createTransportConfiguration(isNetty(), false, generateParams(3, isNetty())),
+            createTransportConfiguration(isNetty(), false, generateParams(4, isNetty())),
+            createTransportConfiguration(isNetty(), false, generateParams(5, isNetty()))
+      );
+      ClientSessionFactoryInternal csf = null;
+      try
+      {
+         csf = (ClientSessionFactoryInternal) locator.connect();
+      }
+      catch (Exception e)
+      {
+         assertTrue(e instanceof HornetQException);
+         assertEquals(((HornetQException)e).getCode(), HornetQException.NOT_CONNECTED);
+      }
+      assertNull(csf);
+      locator.close();
+   }
+
+   public void testMultipleConnectorSingleServerNoConnectAttemptReconnect() throws Exception
+   {
+      ServerLocatorInternal locator = (ServerLocatorInternal) HornetQClient.createServerLocatorWithoutHA(
+            createTransportConfiguration(isNetty(), false, generateParams(1, isNetty())),
+            createTransportConfiguration(isNetty(), false, generateParams(2, isNetty())),
+            createTransportConfiguration(isNetty(), false, generateParams(3, isNetty())),
+            createTransportConfiguration(isNetty(), false, generateParams(4, isNetty())),
+            createTransportConfiguration(isNetty(), false, generateParams(5, isNetty()))
+      );
+      locator.setReconnectAttempts(-1);
+      ClientSessionFactoryInternal csf = null;
+      CountDownLatch countDownLatch = new CountDownLatch(1);
+      Connector target = new Connector(locator, countDownLatch);
+      Thread t = new Thread(target);
+      t.start();
+      //let them get started
+      Thread.sleep(1500);
+      locator.close();
+      assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
+      assertTrue(target.e instanceof HornetQException);
+      assertEquals(((HornetQException)target.e).getCode(), HornetQException.NOT_CONNECTED);
+   }
+
+   public boolean isNetty()
+   {
+      return false;
+   }
+
+   static class Connector implements Runnable
+   {
+      private ServerLocatorInternal locator;
+      CountDownLatch latch;
+      Exception e;
+      public Connector(ServerLocatorInternal locator, CountDownLatch latch)
+      {
+         this.locator = locator;
+         this.latch = latch;
+      }
+
+      public void run()
+      {
+         try
+         {
+            locator.connect();
+         }
+         catch (Exception e)
+         {
+            this.e = e;
+         }
+         latch.countDown();
+      }
+   }
+}

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java	2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java	2010-09-16 13:41:15 UTC (rev 9696)
@@ -34,14 +34,34 @@
 {
    protected Map<Integer, TestableServer> servers = new HashMap<Integer, TestableServer>();
 
+   @Override
+   protected void tearDown() throws Exception
+   {
+      for (TestableServer testableServer : servers.values())
+      {
+         if(testableServer != null)
+         {
+            try
+            {
+               testableServer.destroy();
+            }
+            catch (Throwable e)
+            {
+               // best effort: ignore any failure destroying a server so tearDown continues with the rest
+            }
+         }
+      }
+      super.tearDown();
+   }
+
    public void testMultipleFailovers2LiveServers() throws Exception
    {
       createLiveConfig(0, 3);
       createBackupConfig(0, 1, true, 0, 3);
-      createBackupConfig(0, 2,true, 0, 3);
+      createBackupConfig(0, 2, true, 0, 3);
       createLiveConfig(3, 0);
-      createBackupConfig(3, 4, true,0, 3);
-      createBackupConfig(3, 5, true,0, 3);
+      createBackupConfig(3, 4, true, 0, 3, 1, 4);
+      createBackupConfig(3, 5, true, 0, 3, 1, 4);
       servers.get(0).start();
       servers.get(3).start();
       servers.get(1).start();
@@ -60,7 +80,7 @@
       servers.get(0).crash(session);
 
       int liveAfter0 = waitForBackup(10000, servers, 1, 2);
-      
+
       ServerLocator locator2 = getServerLocator(3);
       locator2.setBlockOnNonDurableSend(true);
       locator2.setBlockOnDurableSend(true);
@@ -74,23 +94,23 @@
       if (liveAfter0 == 2)
       {
          servers.get(1).stop();
-         servers.get(2).stop();         
+         servers.get(2).stop();
       }
       else
       {
          servers.get(2).stop();
-         servers.get(1).stop();         
+         servers.get(1).stop();
       }
-         
+
       if (liveAfter3 == 4)
       {
          servers.get(5).stop();
-         servers.get(4).stop();         
+         servers.get(4).stop();
       }
       else
       {
          servers.get(4).stop();
-         servers.get(5).stop();         
+         servers.get(5).stop();
       }
    }
 

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java	2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java	2010-09-16 13:41:15 UTC (rev 9696)
@@ -65,7 +65,15 @@
       }
       return false;
    }
-   
+
+   public void destroy()
+   {
+      if(serverProcess != null)
+      {
+         serverProcess.destroy();
+      }
+   }
+
    public void setInitialised(boolean initialised)
    {
       this.initialised = initialised;

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java	2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java	2010-09-16 13:41:15 UTC (rev 9696)
@@ -56,7 +56,12 @@
    {
       return server.isInitialised();
    }
-   
+
+   public void destroy()
+   {
+      // no-op: this implementation does nothing on destroy()
+   }
+
    public void start() throws Exception
    {
       server.start();

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java	2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java	2010-09-16 13:41:15 UTC (rev 9696)
@@ -31,4 +31,6 @@
    public void crash(ClientSession... sessions) throws Exception;
 
    public boolean isInitialised();
+
+   void destroy();
 }



More information about the hornetq-commits mailing list