[hornetq-commits] JBoss hornetq SVN: r11600 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Oct 26 06:25:21 EDT 2011


Author: borges
Date: 2011-10-26 06:25:21 -0400 (Wed, 26 Oct 2011)
New Revision: 11600

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
Fix race & dead-locking between ServerLocatorImpl.close() and the creation of new SessionFactories.

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-10-26 10:25:06 UTC (rev 11599)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-10-26 10:25:21 UTC (rev 11600)
@@ -57,6 +57,8 @@
  */
 public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
 {
+   private enum STATE{ INITIALIZED, CLOSED, CLOSING};
+   
    private static final long serialVersionUID = -1615857864410205260L;
 
    private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
@@ -69,7 +71,8 @@
 
    private transient String identity;
 
-   private Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>();
+   private final Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>();
+   private final Set<ClientSessionFactoryInternal> connectingFactories = new HashSet<ClientSessionFactoryInternal>();
 
    private TransportConfiguration[] initialConnectors;
 
@@ -154,10 +157,8 @@
 
    private int initialMessagePacketSize;
 
-   private volatile boolean closed;
+   private volatile STATE state;
 
-   private volatile boolean closing;
-
    private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
 
    private static ExecutorService globalThreadPool;
@@ -252,7 +253,7 @@
       return globalScheduledThreadPool;
    }
 
-   private void setThreadPools()
+   private synchronized void setThreadPools()
    {
       if (threadPool != null)
       {
@@ -329,10 +330,16 @@
       });
    }
 
-   private synchronized void initialise() throws Exception
+   private synchronized void initialise() throws HornetQException
    {
-      if (!readOnly)
+      if (readOnly)
       {
+         return;
+      }
+
+      try
+      {
+         state = STATE.INITIALIZED;
          setThreadPools();
 
          instantiateLoadBalancingPolicy();
@@ -366,6 +373,11 @@
 
          readOnly = true;
       }
+      catch (Exception e)
+      {
+         state = null;
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+      }
    }
 
    private ServerLocatorImpl(final Topology topology,
@@ -515,14 +527,12 @@
 
          return pair.getA();
       }
-      else
-      {
-         // Get from initialconnectors
+      
+      // Get from initialconnectors
 
-         int pos = loadBalancingPolicy.select(initialConnectors.length);
+      int pos = loadBalancingPolicy.select(initialConnectors.length);
 
-         return initialConnectors[pos];
-      }
+      return initialConnectors[pos];
    }
 
    public void start(Executor executor) throws Exception
@@ -541,7 +551,7 @@
             }
             catch (Exception e)
             {
-               if (!closing)
+               if (!isClosed())
                {
                   log.warn("did not connect the cluster connection to other nodes", e);
                }
@@ -565,19 +575,15 @@
 
    public ClientSessionFactoryInternal connect() throws Exception
    {
-      ClientSessionFactoryInternal sf;
       // static list of initial connectors
       if (initialConnectors != null && discoveryGroup == null)
       {
-         sf = (ClientSessionFactoryInternal)staticConnector.connect();
+         ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)staticConnector.connect();
+         addFactory(sf);
+         return sf;
       }
       // wait for discovery group to get the list of initial connectors
-      else
-      {
-         sf = (ClientSessionFactoryInternal)createSessionFactory();
-      }
-      addFactory(sf);
-      return sf;
+      return (ClientSessionFactoryInternal)createSessionFactory();
    }
 
    /* (non-Javadoc)
@@ -593,27 +599,12 @@
       return afterConnectListener;
    }
 
-   public boolean isClosed()
+   public synchronized ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
    {
-      return closed || closing;
-   }
+      assertOpen();
 
-   public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
-   {
-      if (closed)
-      {
-         throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
-      }
-
-      try
-      {
-         initialise();
-      }
-      catch (Exception e)
-      {
-         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
-      }
-
+      initialise();
+   
       ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
                                                                           transportConfiguration,
                                                                           callTimeout,
@@ -627,29 +618,39 @@
                                                                           scheduledThreadPool,
                                                                           interceptors);
 
-      factory.connect(reconnectAttempts, failoverOnInitialConnection);
+      addToConnecting(factory);
+      try
+      {
+         factory.connect(reconnectAttempts, failoverOnInitialConnection);
+         addFactory(factory);
+         return factory;
+      }
+      finally
+      {
+         removeFromConnecting(factory);
+      }
+   }
 
-      addFactory(factory);
-
-      return factory;
+   private void removeFromConnecting(ClientSessionFactoryInternal factory)
+   {
+      connectingFactories.remove(factory);
    }
 
-   public ClientSessionFactory createSessionFactory() throws Exception
+   private void addToConnecting(ClientSessionFactoryInternal factory)
    {
-      if (closed || closing)
+      synchronized (connectingFactories)
       {
-         throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+         assertOpen();
+         connectingFactories.add(factory);
       }
+   }
 
-      try
-      {
-         initialise();
-      }
-      catch (Exception e)
-      {
-         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
-      }
+   public ClientSessionFactory createSessionFactory() throws Exception
+   {
+      assertOpen();
 
+      initialise();
+
       if (initialConnectors == null && discoveryGroup != null)
       {
          // Wait for an initial broadcast to give us at least one node in the cluster
@@ -691,7 +692,15 @@
                                                       threadPool,
                                                       scheduledThreadPool,
                                                       interceptors);
-               factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+               try
+               {
+                  addToConnecting(factory);
+                  factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+               }
+               finally
+               {
+                  removeFromConnecting(factory);
+               }
             }
             catch (HornetQException e)
             {
@@ -723,10 +732,8 @@
 
          if (ha || clusterConnection)
          {
-            long timeout = System.currentTimeMillis() + 30000;
-            while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing &&
-                   !receivedTopology &&
-                   timeout > System.currentTimeMillis())
+            final long timeout = System.currentTimeMillis() + 30000;
+            while (!isClosed() && !receivedTopology && timeout > System.currentTimeMillis())
             {
                // Now wait for the topology
 
@@ -740,7 +747,7 @@
 
             }
 
-            if (System.currentTimeMillis() > timeout && !receivedTopology && !closed && !closing)
+            if (System.currentTimeMillis() > timeout && !receivedTopology && !isClosed())
             {
                throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
                                           "Timed out waiting to receive cluster topology. Group:" + discoveryGroup);
@@ -1184,7 +1191,7 @@
 
    protected void doClose(final boolean sendClose)
    {
-      if (closed)
+      if (state == STATE.CLOSED)
       {
          if (log.isDebugEnabled())
          {
@@ -1193,13 +1200,8 @@
          return;
       }
 
-      if (log.isDebugEnabled())
-      {
-         log.debug(this + " is calling close", new Exception("trace"));
-      }
+      state = STATE.CLOSING;
 
-      closing = true;
-
       if (discoveryGroup != null)
       {
          try
@@ -1215,9 +1217,21 @@
       {
          staticConnector.disconnect();
       }
+      
+      synchronized (connectingFactories)
+      {
+         for (ClientSessionFactoryInternal csf : connectingFactories)
+         {
+            csf.causeExit();
+            csf.close();
+         }
+         connectingFactories.clear();
+      }
+      
+      synchronized (this)
+      {
+         Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
 
-      Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
-
       for (ClientSessionFactory factory : clonedFactory)
       {
          if (sendClose)
@@ -1268,7 +1282,8 @@
       }
       readOnly = false;
 
-      closed = true;
+         state = STATE.CLOSED;
+      }
    }
 
    /** This is directly called when the connection to the node is gone,
@@ -1378,13 +1393,10 @@
                 discoveryGroupConfiguration +
                 "]";
       }
-      else
-      {
-         return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
-                ", discoveryGroupConfiguration=" +
-                discoveryGroupConfiguration +
-                "]";
-      }
+      return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
+             ", discoveryGroupConfiguration=" +
+             discoveryGroupConfiguration +
+             "]";
    }
 
    private synchronized void updateArraysAndPairs()
@@ -1467,18 +1479,27 @@
 
    public synchronized void addFactory(ClientSessionFactoryInternal factory)
    {
-      if (factory != null)
+      if (factory == null)
       {
-         TransportConfiguration backup = null;
+         return;
+      }
 
-         if (topology != null)
-         {
-            backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
-         }
+      if (isClosed())
+      {
+         factory.causeExit();
+         factory.close();
+         return;
+      }
 
-         factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
-         factories.add(factory);
+      TransportConfiguration backup = null;
+
+      if (topology != null)
+      {
+         backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
       }
+
+      factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
+      factories.add(factory);
    }
 
    class StaticConnector implements Serializable
@@ -1489,19 +1510,9 @@
 
       public ClientSessionFactory connect() throws HornetQException
       {
-         if (closed)
-         {
-            throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
-         }
+         assertOpen();
 
-         try
-         {
-            initialise();
-         }
-         catch (Exception e)
-         {
-            throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
-         }
+         initialise();
 
          ClientSessionFactory csf = null;
 
@@ -1511,7 +1522,7 @@
          {
 
             int retryNumber = 0;
-            while (csf == null && !ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing)
+            while (csf == null && !isClosed())
             {
                retryNumber++;
                for (Connector conn : connectors)
@@ -1564,7 +1575,7 @@
                   break;
                }
 
-               if (!closed && !closing)
+               if (!isClosed())
                {
                   Thread.sleep(retryInterval);
                }
@@ -1577,7 +1588,7 @@
             throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
          }
 
-         if (csf == null && !closed)
+         if (csf == null && !isClosed())
          {
             log.warn("Failed to connecto to any static connector, throwing exception now");
             throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
@@ -1636,7 +1647,7 @@
 
       public void finalize() throws Throwable
       {
-         if (!closed && finalizeCheck)
+         if (!isClosed() && finalizeCheck)
          {
             log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
                      System.identityHashCode(this));
@@ -1681,7 +1692,15 @@
                ClientSessionFactoryInternal factoryToUse = factory;
                if (factoryToUse != null)
                {
-                  factory.connect(1, false);
+                  try
+                  {
+                     addToConnecting(factoryToUse);
+                     factoryToUse.connect(1, false);
+                  }
+                  finally
+                  {
+                     removeFromConnecting(factoryToUse);
+                  }
                }
                return factoryToUse;
             }
@@ -1704,9 +1723,6 @@
             }
          }
 
-         /* (non-Javadoc)
-          * @see java.lang.Object#toString()
-          */
          @Override
          public String toString()
          {
@@ -1715,4 +1731,17 @@
 
       }
    }
+
+   private void assertOpen()
+   {
+      if (state != null && state != STATE.INITIALIZED)
+      {
+         throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+      }
+   }
+
+   public boolean isClosed()
+   {
+      return state != STATE.INITIALIZED;
+   }
 }



More information about the hornetq-commits mailing list