[hornetq-commits] JBoss hornetq SVN: r11604 - trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Oct 26 08:14:52 EDT 2011


Author: borges
Date: 2011-10-26 08:14:51 -0400 (Wed, 26 Oct 2011)
New Revision: 11604

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
Avoid locking the locator to close it, (ii) track factories at static connector with connectingFactory,
(iii) add 'state' field and remove all other fields used to track state.

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-10-26 12:05:13 UTC (rev 11603)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-10-26 12:14:51 UTC (rev 11604)
@@ -59,6 +59,11 @@
 {
    private static final long serialVersionUID = -1615857864410205260L;
 
+   private enum STATE
+   {
+      INITIALIZED, CLOSING, CLOSED,
+   };
+
    private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
 
    private final boolean ha;
@@ -97,8 +102,6 @@
 
    private ConnectionLoadBalancingPolicy loadBalancingPolicy;
 
-   private boolean readOnly;
-
    // Settable attributes:
 
    private boolean cacheLargeMessagesClient;
@@ -155,10 +158,8 @@
 
    private int initialMessagePacketSize;
 
-   private volatile boolean closed;
+   private volatile STATE state;
 
-   private volatile boolean closing;
-
    private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
 
    private static ExecutorService globalThreadPool;
@@ -332,10 +333,14 @@
 
    private synchronized void initialise() throws HornetQException
    {
-      if (readOnly)
+      if (state == STATE.INITIALIZED)
       {
          return;
       }
+      if (state == STATE.CLOSING)
+      {
+         throw new IllegalStateException("Cannot initialize 'closing' locator");
+      }
       try
       {
          setThreadPools();
@@ -369,7 +374,7 @@
             discoveryGroup.start();
          }
 
-         readOnly = true;
+         state = STATE.INITIALIZED;
       }
       catch (Exception e)
       {
@@ -550,7 +555,7 @@
             }
             catch (Exception e)
             {
-               if (!closing)
+               if (!isClosed())
                {
                   log.warn("did not connect the cluster connection to other nodes", e);
                }
@@ -600,21 +605,20 @@
 
    public boolean isClosed()
    {
-      return closed || closing;
+      return state == STATE.CLOSED || state == STATE.CLOSING;
    }
 
    public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
    {
       assertOpen();
 
-         initialise();
+      initialise();
 
       synchronized (this)
       {
          assertOpen();
          ClientSessionFactoryInternal factory =
-                  new ClientSessionFactoryImpl(this,
-                                                                          transportConfiguration,
+                  new ClientSessionFactoryImpl(this, transportConfiguration,
                                                                           callTimeout,
                                                                           clientFailureCheckPeriod,
                                                                           connectionTTL,
@@ -625,17 +629,36 @@
                                                                           threadPool,
                                                                           scheduledThreadPool,
                                                                           interceptors);
+         addToConnecting(factory);
+         try
+         {
+            factory.connect(reconnectAttempts, failoverOnInitialConnection);
+            addFactory(factory);
+            return factory;
+         }
+         finally
+         {
+            removeFromConnecting(factory);
+         }
+      }
+   }
+
+   private void removeFromConnecting(ClientSessionFactoryInternal factory)
+   {
+      connectingFactories.remove(factory);
+   }
+
+   private void addToConnecting(ClientSessionFactoryInternal factory)
+   {
+      synchronized (connectingFactories)
+      {
+         assertOpen();
          connectingFactories.add(factory);
-         factory.connect(reconnectAttempts, failoverOnInitialConnection);
-         connectingFactories.remove(factory);
-         addFactory(factory);
-         return factory;
       }
    }
-
    private void assertOpen()
    {
-      if (closed || closing)
+      if (state != null && state != STATE.INITIALIZED)
       {
          throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
       }
@@ -677,27 +700,24 @@
 
             try
             {
-               factory = new ClientSessionFactoryImpl(this,
-                                                      tc,
-                                                      callTimeout,
-                                                      clientFailureCheckPeriod,
-                                                      connectionTTL,
-                                                      retryInterval,
-                                                      retryIntervalMultiplier,
-                                                      maxRetryInterval,
-                                                      reconnectAttempts,
-                                                      threadPool,
-                                                      scheduledThreadPool,
-                                                      interceptors);
-               connectingFactories.add(factory);
-               factory.connect(initialConnectAttempts, failoverOnInitialConnection);
-               connectingFactories.remove(factory);
+               factory =
+                        new ClientSessionFactoryImpl(this, tc, callTimeout, clientFailureCheckPeriod, connectionTTL,
+                                                     retryInterval, retryIntervalMultiplier, maxRetryInterval,
+                                                     reconnectAttempts, threadPool, scheduledThreadPool, interceptors);
+               addToConnecting(factory);
+               try
+               {
+                  factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+               }
+               finally
+               {
+                  removeFromConnecting(factory);
+               }
             }
             catch (HornetQException e)
             {
                if (factory != null)
                {
-                  connectingFactories.remove(factory);
                   factory.close();
                }
                factory = null;
@@ -728,9 +748,7 @@
          if (ha || clusterConnection)
          {
             long timeout = System.currentTimeMillis() + 30000;
-            while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing &&
-                   !receivedTopology &&
-                   timeout > System.currentTimeMillis())
+            while (!isClosed() && !receivedTopology && timeout > System.currentTimeMillis())
             {
                // Now wait for the topology
 
@@ -744,7 +762,7 @@
 
             }
 
-            if (System.currentTimeMillis() > timeout && !receivedTopology && !closed && !closing)
+            if (System.currentTimeMillis() > timeout && !receivedTopology)
             {
                throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
                                           "Timed out waiting to receive cluster topology. Group:" + discoveryGroup);
@@ -755,7 +773,6 @@
 
          return factory;
       }
-
    }
 
    public boolean isHA()
@@ -1108,7 +1125,7 @@
 
    private void checkWrite()
    {
-      if (readOnly)
+      if (state == STATE.INITIALIZED)
       {
          throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
       }
@@ -1187,7 +1204,7 @@
 
    protected void doClose(final boolean sendClose)
    {
-      if (closed)
+      if (state == STATE.CLOSED)
       {
          if (log.isDebugEnabled())
          {
@@ -1201,7 +1218,7 @@
          log.debug(this + " is calling close", new Exception("trace"));
       }
 
-      closing = true;
+      state = STATE.CLOSING;
 
       if (discoveryGroup != null)
       {
@@ -1219,28 +1236,33 @@
          staticConnector.disconnect();
       }
 
-      for (ClientSessionFactoryInternal factory : connectingFactories)
+      synchronized (connectingFactories)
       {
-         factory.causeExit();
-         factory.close();
+         for (ClientSessionFactoryInternal factory : connectingFactories)
+         {
+            factory.causeExit();
+            factory.close();
+         }
+         connectingFactories.clear();
       }
-      synchronized (this)
+
+      synchronized (factories)
       {
-      Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
+         Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
 
-      for (ClientSessionFactory factory : clonedFactory)
-      {
-         if (sendClose)
+         for (ClientSessionFactory factory : clonedFactory)
          {
-            factory.close();
+            if (sendClose)
+            {
+               factory.close();
+            }
+            else
+            {
+               factory.cleanup();
+            }
          }
-         else
-         {
-            factory.cleanup();
-         }
-      }
 
-      factories.clear();
+         factories.clear();
       }
 
       if (shutdownPool)
@@ -1277,9 +1299,7 @@
             }
          }
       }
-      readOnly = false;
-
-      closed = true;
+      state = STATE.CLOSED;
    }
 
    /** This is directly called when the connection to the node is gone,
@@ -1370,9 +1390,6 @@
       }
    }
 
-   /* (non-Javadoc)
-    * @see java.lang.Object#toString()
-    */
    @Override
    public String toString()
    {
@@ -1385,21 +1402,19 @@
                 discoveryGroupConfiguration +
                 "]";
       }
-      else
-      {
-         return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
+      return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
                 ", discoveryGroupConfiguration=" +
                 discoveryGroupConfiguration +
                 "]";
-      }
    }
 
    private synchronized void updateArraysAndPairs()
    {
+      assertOpen();
       Collection<TopologyMember> membersCopy = topology.getMembers();
 
-      topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class,
-                                                                                                membersCopy.size());
+      topologyArray =
+               (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class, membersCopy.size());
 
       int count = 0;
       for (TopologyMember pair : membersCopy)
@@ -1472,20 +1487,22 @@
       topology.removeClusterTopologyListener(listener);
    }
 
-   public synchronized void addFactory(ClientSessionFactoryInternal factory)
+   private synchronized void addFactory(ClientSessionFactoryInternal factory)
    {
       if (factory == null)
       {
          return;
       }
 
-      if (closed || closing)
+      synchronized (factories)
       {
-         factory.close();
-         return;
-      }
+         if (isClosed())
+         {
+            factory.close();
+            return;
+         }
 
-      TransportConfiguration backup = null;
+         TransportConfiguration backup = null;
 
          if (topology != null)
          {
@@ -1494,7 +1511,7 @@
 
          factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
          factories.add(factory);
-
+      }
    }
 
    final class StaticConnector implements Serializable
@@ -1505,7 +1522,7 @@
 
       public ClientSessionFactory connect() throws HornetQException
       {
-         if (closed || closing)
+         if (isClosed())
          {
             throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
          }
@@ -1520,7 +1537,7 @@
          {
 
             int retryNumber = 0;
-            while (csf == null && !ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing)
+            while (csf == null && isClosed())
             {
                retryNumber++;
                for (Connector conn : connectors)
@@ -1573,7 +1590,7 @@
                   break;
                }
 
-               if (!closed && !closing)
+               if (!isClosed())
                {
                   Thread.sleep(retryInterval);
                }
@@ -1586,7 +1603,7 @@
             throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
          }
 
-         if (csf == null && !closed)
+         if (csf == null && !isClosed())
          {
             log.warn("Failed to connecto to any static connector, throwing exception now");
             throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
@@ -1647,7 +1664,7 @@
       @Override
       public void finalize() throws Throwable
       {
-         if (!closed && finalizeCheck)
+         if (state != STATE.CLOSED && finalizeCheck)
          {
             log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
                      System.identityHashCode(this));
@@ -1692,7 +1709,16 @@
                ClientSessionFactoryInternal factoryToUse = factory;
                if (factoryToUse != null)
                {
-                  factory.connect(1, false);
+                  addToConnecting(factoryToUse);
+
+                  try
+                  {
+                     factoryToUse.connect(1, false);
+                  }
+                  finally
+                  {
+                     removeFromConnecting(factoryToUse);
+                  }
                }
                return factoryToUse;
             }
@@ -1715,9 +1741,6 @@
             }
          }
 
-         /* (non-Javadoc)
-          * @see java.lang.Object#toString()
-          */
          @Override
          public String toString()
          {



More information about the hornetq-commits mailing list