[hornetq-commits] JBoss hornetq SVN: r10053 - in trunk: src/main/org/hornetq/core/client/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Dec 17 13:59:02 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-12-17 13:59:02 -0500 (Fri, 17 Dec 2010)
New Revision: 10053

Added:
   trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java
Modified:
   trunk/src/main/org/hornetq/api/core/client/ServerLocator.java
   trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
Log:
JBPAPP-5521 - removing warning

Modified: trunk/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ServerLocator.java	2010-12-17 15:54:31 UTC (rev 10052)
+++ trunk/src/main/org/hornetq/api/core/client/ServerLocator.java	2010-12-17 18:59:02 UTC (rev 10053)
@@ -28,7 +28,16 @@
  */
 public interface ServerLocator
 {
+   
    /**
+    * This method will disable any checks when a GarbageCollection happens leaving connections open.
+    * The JMS Layer will make specific usage of this method, since the ConnectionFactory.finalize should release this.
+    * 
+    * Warn: You may leave resources unnatended if you call this method and don't take care of cleaning the resources yourself.
+    */
+   void disableFinalizeCheck();
+   
+   /**
     * Create a ClientSessionFactory using whatever load balancing policy is in force
     * @return The ClientSessionFactory
     * @throws Exception

Modified: trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-12-17 15:54:31 UTC (rev 10052)
+++ trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-12-17 18:59:02 UTC (rev 10053)
@@ -51,6 +51,8 @@
 
    private final boolean ha;
 
+   private boolean finalizeCheck = true;
+
    private boolean clusterConnection;
 
    private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
@@ -68,7 +70,7 @@
    private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
 
    private boolean receivedTopology;
-   
+
    private boolean compressLargeMessage;
 
    private ExecutorService threadPool;
@@ -156,6 +158,10 @@
    private boolean backup;
 
    private final Exception e = new Exception();
+   
+   // To be called when there are ServerLocator being finalized.
+   // To be used on test assertions
+   public static Runnable finalizeCallback = null;
 
    private static synchronized ExecutorService getGlobalThreadPool()
    {
@@ -174,12 +180,12 @@
       if (globalScheduledThreadPool == null)
       {
          ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
-               true,
-               getThisClassLoader());
+                                                          true,
+                                                          getThisClassLoader());
 
          globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
 
-               factory);
+                                                                      factory);
       }
 
       return globalScheduledThreadPool;
@@ -196,8 +202,8 @@
       else
       {
          ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
-               true,
-               getThisClassLoader());
+                                                          true,
+                                                          getThisClassLoader());
 
          if (threadPoolMaxSize == -1)
          {
@@ -209,8 +215,8 @@
          }
 
          factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
-               true,
-               getThisClassLoader());
+                                            true,
+                                            getThisClassLoader());
 
          scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
       }
@@ -243,14 +249,14 @@
             try
             {
                Class<?> clazz = loader.loadClass(connectionLoadBalancingPolicyClassName);
-               loadBalancingPolicy = (ConnectionLoadBalancingPolicy) clazz.newInstance();
+               loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
                return null;
             }
             catch (Exception e)
             {
                throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
-                     "\"",
-                     e);
+                                                           "\"",
+                                                  e);
             }
          }
       });
@@ -280,11 +286,11 @@
             }
 
             discoveryGroup = new DiscoveryGroupImpl(nodeID,
-                  discoveryGroupConfiguration.getName(),
-                  lbAddress,
-                  groupAddress,
-                  discoveryGroupConfiguration.getGroupPort(),
-                  discoveryGroupConfiguration.getRefreshTimeout());
+                                                    discoveryGroupConfiguration.getName(),
+                                                    lbAddress,
+                                                    groupAddress,
+                                                    discoveryGroupConfiguration.getGroupPort(),
+                                                    discoveryGroupConfiguration.getRefreshTimeout());
 
             discoveryGroup.registerListener(this);
 
@@ -363,7 +369,7 @@
       initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
 
       cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-      
+
       compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
 
       clusterConnection = false;
@@ -424,7 +430,7 @@
             }
             catch (Exception e)
             {
-               if(!closing)
+               if (!closing)
                {
                   log.warn("did not connect the cluster connection to other nodes", e);
                }
@@ -433,18 +439,26 @@
       });
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck()
+    */
+   public void disableFinalizeCheck()
+   {
+      finalizeCheck = false;
+   }
+
    public ClientSessionFactory connect() throws Exception
    {
       ClientSessionFactoryInternal sf;
       // static list of initial connectors
       if (initialConnectors != null && discoveryGroup == null)
       {
-         sf = (ClientSessionFactoryInternal) staticConnector.connect();
+         sf = (ClientSessionFactoryInternal)staticConnector.connect();
       }
       // wait for discovery group to get the list of initial connectors
       else
       {
-         sf = (ClientSessionFactoryInternal) createSessionFactory();
+         sf = (ClientSessionFactoryInternal)createSessionFactory();
       }
       addFactory(sf);
       return sf;
@@ -467,17 +481,17 @@
       }
 
       ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
-            transportConfiguration,
-            callTimeout,
-            clientFailureCheckPeriod,
-            connectionTTL,
-            retryInterval,
-            retryIntervalMultiplier,
-            maxRetryInterval,
-            reconnectAttempts,
-            threadPool,
-            scheduledThreadPool,
-            interceptors);
+                                                                          transportConfiguration,
+                                                                          callTimeout,
+                                                                          clientFailureCheckPeriod,
+                                                                          connectionTTL,
+                                                                          retryInterval,
+                                                                          retryIntervalMultiplier,
+                                                                          maxRetryInterval,
+                                                                          reconnectAttempts,
+                                                                          threadPool,
+                                                                          scheduledThreadPool,
+                                                                          interceptors);
 
       factory.connect(reconnectAttempts, failoverOnInitialConnection);
 
@@ -505,13 +519,13 @@
       if (initialConnectors == null && discoveryGroup != null)
       {
          // Wait for an initial broadcast to give us at least one node in the cluster
-         long timeout = clusterConnection?0:discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
+         long timeout = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
          boolean ok = discoveryGroup.waitForBroadcast(timeout);
 
          if (!ok)
          {
             throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
-                  "Timed out waiting to receive initial broadcast from cluster");
+                                       "Timed out waiting to receive initial broadcast from cluster");
          }
       }
 
@@ -532,17 +546,17 @@
             try
             {
                factory = new ClientSessionFactoryImpl(this,
-                     tc,
-                     callTimeout,
-                     clientFailureCheckPeriod,
-                     connectionTTL,
-                     retryInterval,
-                     retryIntervalMultiplier,
-                     maxRetryInterval,
-                     reconnectAttempts,
-                     threadPool,
-                     scheduledThreadPool,
-                     interceptors);
+                                                      tc,
+                                                      callTimeout,
+                                                      clientFailureCheckPeriod,
+                                                      connectionTTL,
+                                                      retryInterval,
+                                                      retryIntervalMultiplier,
+                                                      maxRetryInterval,
+                                                      reconnectAttempts,
+                                                      threadPool,
+                                                      scheduledThreadPool,
+                                                      interceptors);
                factory.connect(initialConnectAttempts, failoverOnInitialConnection);
             }
             catch (HornetQException e)
@@ -556,12 +570,12 @@
                   if (topologyArray != null && attempts == topologyArray.length)
                   {
                      throw new HornetQException(HornetQException.NOT_CONNECTED,
-                           "Cannot connect to server(s). Tried with all available servers.");
+                                                "Cannot connect to server(s). Tried with all available servers.");
                   }
                   if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
                   {
                      throw new HornetQException(HornetQException.NOT_CONNECTED,
-                           "Cannot connect to server(s). Tried with all available servers.");
+                                                "Cannot connect to server(s). Tried with all available servers.");
                   }
                   retry = true;
                }
@@ -599,7 +613,7 @@
             if (toWait <= 0)
             {
                throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
-                     "Timed out waiting to receive cluster topology");
+                                          "Timed out waiting to receive cluster topology");
             }
          }
 
@@ -1008,7 +1022,10 @@
    @Override
    protected void finalize() throws Throwable
    {
-      close();
+      if (finalizeCheck)
+      {
+         close();
+      }
 
       super.finalize();
    }
@@ -1138,7 +1155,8 @@
       {
          for (ClientSessionFactory factory : factories)
          {
-            ((ClientSessionFactoryInternal) factory).setBackupConnector(actMember.getConnector().a, actMember.getConnector().b);
+            ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
+                                                                       actMember.getConnector().b);
          }
       }
 
@@ -1163,8 +1181,8 @@
 
    private void updateArraysAndPairs()
    {
-      topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[]) Array.newInstance(Pair.class,
-            topology.members());
+      topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class,
+                                                                                                topology.members());
 
       int count = 0;
       for (TopologyMember pair : topology.getMembers())
@@ -1177,7 +1195,8 @@
    {
       List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
 
-      this.initialConnectors = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, newConnectors.size());
+      this.initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+                                                                           newConnectors.size());
 
       int count = 0;
       for (DiscoveryEntry entry : newConnectors)
@@ -1195,7 +1214,7 @@
          }
          catch (Exception e)
          {
-            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
          }
       }
    }
@@ -1239,6 +1258,7 @@
          factories.add(factory);
       }
    }
+
    public static void shutdown()
    {
       if (globalScheduledThreadPool != null)
@@ -1286,7 +1306,7 @@
                try
                {
                   csf = future.get();
-                  if(csf != null)
+                  if (csf != null)
                      break;
                }
                catch (Exception e)
@@ -1317,22 +1337,21 @@
          for (TransportConfiguration initialConnector : initialConnectors)
          {
             ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this,
-                     initialConnector,
-                     callTimeout,
-                     clientFailureCheckPeriod,
-                     connectionTTL,
-                     retryInterval,
-                     retryIntervalMultiplier,
-                     maxRetryInterval,
-                     reconnectAttempts,
-                     threadPool,
-                     scheduledThreadPool,
-                     interceptors);
+                                                                                initialConnector,
+                                                                                callTimeout,
+                                                                                clientFailureCheckPeriod,
+                                                                                connectionTTL,
+                                                                                retryInterval,
+                                                                                retryIntervalMultiplier,
+                                                                                maxRetryInterval,
+                                                                                reconnectAttempts,
+                                                                                threadPool,
+                                                                                scheduledThreadPool,
+                                                                                interceptors);
             connectors.add(new Connector(initialConnector, factory));
          }
       }
 
-
       public synchronized void disconnect()
       {
          if (connectors != null)
@@ -1344,14 +1363,19 @@
          }
       }
 
-       public void finalize() throws Throwable
+      public void finalize() throws Throwable
       {
-         if (!closed)
+         if (!closed && finalizeCheck)
          {
             log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
-                                       System.identityHashCode(this));
+                     System.identityHashCode(this));
 
             log.warn("The ServerLocator you didn't close was created here:", e);
+            
+            if (ServerLocatorImpl.finalizeCallback != null)
+            {
+               ServerLocatorImpl.finalizeCallback.run();
+            }
 
             close();
          }
@@ -1362,9 +1386,13 @@
       class Connector implements Callable<ClientSessionFactory>
       {
          private TransportConfiguration initialConnector;
+
          private volatile ClientSessionFactoryInternal factory;
+
          private boolean isConnected = false;
+
          private boolean interrupted = false;
+
          private Exception e;
 
          public Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)

Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java	2010-12-17 15:54:31 UTC (rev 10052)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java	2010-12-17 18:59:02 UTC (rev 10053)
@@ -75,6 +75,8 @@
    public HornetQConnectionFactory(final ServerLocator serverLocator)
    {
       this.serverLocator = serverLocator;
+      
+      serverLocator.disableFinalizeCheck();
    }
 
    public HornetQConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
@@ -87,6 +89,8 @@
       {
          serverLocator = HornetQClient.createServerLocatorWithoutHA(groupConfiguration);
       }
+      
+      serverLocator.disableFinalizeCheck();
    }
 
    public HornetQConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors)
@@ -99,6 +103,8 @@
       {
          serverLocator = HornetQClient.createServerLocatorWithoutHA(initialConnectors);
       }
+      
+      serverLocator.disableFinalizeCheck();
    }
 
    // ConnectionFactory implementation -------------------------------------------------------------
@@ -709,6 +715,7 @@
       }
       catch (Exception e)
       {
+         e.printStackTrace();
          //not much we can do here
       }
       super.finalize();

Added: trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java	2010-12-17 18:59:02 UTC (rev 10053)
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.jms.connection;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintStream;
+import java.io.StringReader;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.JMSFactoryType;
+import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ * 
+ * A CloseConnectionOnGCTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class CloseConnectionFactoryOnGCest extends JMSTestBase
+{
+   private static final Logger log = Logger.getLogger(CloseConnectionFactoryOnGCest.class);
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   public void testCloseCFOnGC() throws Exception
+   {
+
+      final AtomicInteger valueGC = new AtomicInteger(0);
+
+      ServerLocatorImpl.finalizeCallback = new Runnable()
+      {
+         public void run()
+         {
+            valueGC.incrementAndGet();
+         }
+      };
+
+      try
+      {
+         // System.setOut(out);
+         for (int i = 0; i < 100; i++)
+         {
+            HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
+                                                                                            new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+            Connection conn = cf.createConnection();
+            cf = null;
+            conn.close();
+            conn = null;
+         }
+         forceGC();
+      }
+      finally
+      {
+         ServerLocatorImpl.finalizeCallback = null;
+      }
+
+      assertEquals("The code is throwing exceptions", 0, valueGC.get());
+
+   }
+}



More information about the hornetq-commits mailing list