Author: clebert.suconic(a)jboss.com
Date: 2010-12-17 13:59:02 -0500 (Fri, 17 Dec 2010)
New Revision: 10053
Added:
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java
Modified:
trunk/src/main/org/hornetq/api/core/client/ServerLocator.java
trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
Log:
JBPAPP-5521 - removing warning
Modified: trunk/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-12-17 15:54:31 UTC
(rev 10052)
+++ trunk/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-12-17 18:59:02 UTC
(rev 10053)
@@ -28,7 +28,16 @@
*/
public interface ServerLocator
{
+
/**
+ * This method will disable any checks when a GarbageCollection happens leaving
connections open.
+ * The JMS Layer will make specific usage of this method, since the
ConnectionFactory.finalize should release this.
+ *
+ * Warn: You may leave resources unnatended if you call this method and don't take
care of cleaning the resources yourself.
+ */
+ void disableFinalizeCheck();
+
+ /**
* Create a ClientSessionFactory using whatever load balancing policy is in force
* @return The ClientSessionFactory
* @throws Exception
Modified: trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-12-17 15:54:31
UTC (rev 10052)
+++ trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-12-17 18:59:02
UTC (rev 10053)
@@ -51,6 +51,8 @@
private final boolean ha;
+ private boolean finalizeCheck = true;
+
private boolean clusterConnection;
private final Set<ClusterTopologyListener> topologyListeners = new
HashSet<ClusterTopologyListener>();
@@ -68,7 +70,7 @@
private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
private boolean receivedTopology;
-
+
private boolean compressLargeMessage;
private ExecutorService threadPool;
@@ -156,6 +158,10 @@
private boolean backup;
private final Exception e = new Exception();
+
+ // To be called when there are ServerLocator being finalized.
+ // To be used on test assertions
+ public static Runnable finalizeCallback = null;
private static synchronized ExecutorService getGlobalThreadPool()
{
@@ -174,12 +180,12 @@
if (globalScheduledThreadPool == null)
{
ThreadFactory factory = new
HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
- true,
- getThisClassLoader());
+ true,
+ getThisClassLoader());
globalScheduledThreadPool =
Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- factory);
+ factory);
}
return globalScheduledThreadPool;
@@ -196,8 +202,8 @@
else
{
ThreadFactory factory = new
HornetQThreadFactory("HornetQ-client-factory-threads-" +
System.identityHashCode(this),
- true,
- getThisClassLoader());
+ true,
+ getThisClassLoader());
if (threadPoolMaxSize == -1)
{
@@ -209,8 +215,8 @@
}
factory = new
HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" +
System.identityHashCode(this),
- true,
- getThisClassLoader());
+ true,
+ getThisClassLoader());
scheduledThreadPool =
Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
}
@@ -243,14 +249,14 @@
try
{
Class<?> clazz =
loader.loadClass(connectionLoadBalancingPolicyClassName);
- loadBalancingPolicy = (ConnectionLoadBalancingPolicy)
clazz.newInstance();
+ loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
return null;
}
catch (Exception e)
{
throw new IllegalArgumentException("Unable to instantiate load
balancing policy \"" + connectionLoadBalancingPolicyClassName +
- "\"",
- e);
+ "\"",
+ e);
}
}
});
@@ -280,11 +286,11 @@
}
discoveryGroup = new DiscoveryGroupImpl(nodeID,
- discoveryGroupConfiguration.getName(),
- lbAddress,
- groupAddress,
- discoveryGroupConfiguration.getGroupPort(),
- discoveryGroupConfiguration.getRefreshTimeout());
+
discoveryGroupConfiguration.getName(),
+ lbAddress,
+ groupAddress,
+
discoveryGroupConfiguration.getGroupPort(),
+
discoveryGroupConfiguration.getRefreshTimeout());
discoveryGroup.registerListener(this);
@@ -363,7 +369,7 @@
initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
+
compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
clusterConnection = false;
@@ -424,7 +430,7 @@
}
catch (Exception e)
{
- if(!closing)
+ if (!closing)
{
log.warn("did not connect the cluster connection to other
nodes", e);
}
@@ -433,18 +439,26 @@
});
}
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck()
+ */
+ public void disableFinalizeCheck()
+ {
+ finalizeCheck = false;
+ }
+
public ClientSessionFactory connect() throws Exception
{
ClientSessionFactoryInternal sf;
// static list of initial connectors
if (initialConnectors != null && discoveryGroup == null)
{
- sf = (ClientSessionFactoryInternal) staticConnector.connect();
+ sf = (ClientSessionFactoryInternal)staticConnector.connect();
}
// wait for discovery group to get the list of initial connectors
else
{
- sf = (ClientSessionFactoryInternal) createSessionFactory();
+ sf = (ClientSessionFactoryInternal)createSessionFactory();
}
addFactory(sf);
return sf;
@@ -467,17 +481,17 @@
}
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
- transportConfiguration,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
+
transportConfiguration,
+ callTimeout,
+
clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+
retryIntervalMultiplier,
+
maxRetryInterval,
+
reconnectAttempts,
+ threadPool,
+
scheduledThreadPool,
+ interceptors);
factory.connect(reconnectAttempts, failoverOnInitialConnection);
@@ -505,13 +519,13 @@
if (initialConnectors == null && discoveryGroup != null)
{
// Wait for an initial broadcast to give us at least one node in the cluster
- long timeout =
clusterConnection?0:discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
+ long timeout = clusterConnection ? 0 :
discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
boolean ok = discoveryGroup.waitForBroadcast(timeout);
if (!ok)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive initial broadcast from
cluster");
+ "Timed out waiting to receive initial
broadcast from cluster");
}
}
@@ -532,17 +546,17 @@
try
{
factory = new ClientSessionFactoryImpl(this,
- tc,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
+ tc,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
factory.connect(initialConnectAttempts, failoverOnInitialConnection);
}
catch (HornetQException e)
@@ -556,12 +570,12 @@
if (topologyArray != null && attempts == topologyArray.length)
{
throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Cannot connect to server(s). Tried with all available
servers.");
+ "Cannot connect to server(s). Tried
with all available servers.");
}
if (topologyArray == null && initialConnectors != null
&& attempts == initialConnectors.length)
{
throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Cannot connect to server(s). Tried with all available
servers.");
+ "Cannot connect to server(s). Tried
with all available servers.");
}
retry = true;
}
@@ -599,7 +613,7 @@
if (toWait <= 0)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive cluster topology");
+ "Timed out waiting to receive cluster
topology");
}
}
@@ -1008,7 +1022,10 @@
@Override
protected void finalize() throws Throwable
{
- close();
+ if (finalizeCheck)
+ {
+ close();
+ }
super.finalize();
}
@@ -1138,7 +1155,8 @@
{
for (ClientSessionFactory factory : factories)
{
- ((ClientSessionFactoryInternal)
factory).setBackupConnector(actMember.getConnector().a, actMember.getConnector().b);
+
((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
+
actMember.getConnector().b);
}
}
@@ -1163,8 +1181,8 @@
private void updateArraysAndPairs()
{
- topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])
Array.newInstance(Pair.class,
- topology.members());
+ topologyArray = (Pair<TransportConfiguration,
TransportConfiguration>[])Array.newInstance(Pair.class,
+
topology.members());
int count = 0;
for (TopologyMember pair : topology.getMembers())
@@ -1177,7 +1195,8 @@
{
List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
- this.initialConnectors = (TransportConfiguration[])
Array.newInstance(TransportConfiguration.class, newConnectors.size());
+ this.initialConnectors =
(TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+
newConnectors.size());
int count = 0;
for (DiscoveryEntry entry : newConnectors)
@@ -1195,7 +1214,7 @@
}
catch (Exception e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings
| File Templates.
+ e.printStackTrace(); // To change body of catch statement use File | Settings
| File Templates.
}
}
}
@@ -1239,6 +1258,7 @@
factories.add(factory);
}
}
+
public static void shutdown()
{
if (globalScheduledThreadPool != null)
@@ -1286,7 +1306,7 @@
try
{
csf = future.get();
- if(csf != null)
+ if (csf != null)
break;
}
catch (Exception e)
@@ -1317,22 +1337,21 @@
for (TransportConfiguration initialConnector : initialConnectors)
{
ClientSessionFactoryInternal factory = new
ClientSessionFactoryImpl(ServerLocatorImpl.this,
- initialConnector,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
+
initialConnector,
+
callTimeout,
+
clientFailureCheckPeriod,
+
connectionTTL,
+
retryInterval,
+
retryIntervalMultiplier,
+
maxRetryInterval,
+
reconnectAttempts,
+
threadPool,
+
scheduledThreadPool,
+
interceptors);
connectors.add(new Connector(initialConnector, factory));
}
}
-
public synchronized void disconnect()
{
if (connectors != null)
@@ -1344,14 +1363,19 @@
}
}
- public void finalize() throws Throwable
+ public void finalize() throws Throwable
{
- if (!closed)
+ if (!closed && finalizeCheck)
{
log.warn("I'm closing a core ServerLocator you left open. Please
make sure you close all ServerLocators explicitly " + "before letting them go
out of scope! " +
- System.identityHashCode(this));
+ System.identityHashCode(this));
log.warn("The ServerLocator you didn't close was created
here:", e);
+
+ if (ServerLocatorImpl.finalizeCallback != null)
+ {
+ ServerLocatorImpl.finalizeCallback.run();
+ }
close();
}
@@ -1362,9 +1386,13 @@
class Connector implements Callable<ClientSessionFactory>
{
private TransportConfiguration initialConnector;
+
private volatile ClientSessionFactoryInternal factory;
+
private boolean isConnected = false;
+
private boolean interrupted = false;
+
private Exception e;
public Connector(TransportConfiguration initialConnector,
ClientSessionFactoryInternal factory)
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-12-17
15:54:31 UTC (rev 10052)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-12-17
18:59:02 UTC (rev 10053)
@@ -75,6 +75,8 @@
public HornetQConnectionFactory(final ServerLocator serverLocator)
{
this.serverLocator = serverLocator;
+
+ serverLocator.disableFinalizeCheck();
}
public HornetQConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration
groupConfiguration)
@@ -87,6 +89,8 @@
{
serverLocator = HornetQClient.createServerLocatorWithoutHA(groupConfiguration);
}
+
+ serverLocator.disableFinalizeCheck();
}
public HornetQConnectionFactory(final boolean ha, final TransportConfiguration...
initialConnectors)
@@ -99,6 +103,8 @@
{
serverLocator = HornetQClient.createServerLocatorWithoutHA(initialConnectors);
}
+
+ serverLocator.disableFinalizeCheck();
}
// ConnectionFactory implementation
-------------------------------------------------------------
@@ -709,6 +715,7 @@
}
catch (Exception e)
{
+ e.printStackTrace();
//not much we can do here
}
super.finalize();
Added:
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java 2010-12-17
18:59:02 UTC (rev 10053)
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.jms.connection;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintStream;
+import java.io.StringReader;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.JMSFactoryType;
+import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ *
+ * A CloseConnectionOnGCTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class CloseConnectionFactoryOnGCest extends JMSTestBase
+{
+ private static final Logger log =
Logger.getLogger(CloseConnectionFactoryOnGCest.class);
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public void testCloseCFOnGC() throws Exception
+ {
+
+ final AtomicInteger valueGC = new AtomicInteger(0);
+
+ ServerLocatorImpl.finalizeCallback = new Runnable()
+ {
+ public void run()
+ {
+ valueGC.incrementAndGet();
+ }
+ };
+
+ try
+ {
+ // System.setOut(out);
+ for (int i = 0; i < 100; i++)
+ {
+ HornetQConnectionFactory cf =
HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
+
new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ Connection conn = cf.createConnection();
+ cf = null;
+ conn.close();
+ conn = null;
+ }
+ forceGC();
+ }
+ finally
+ {
+ ServerLocatorImpl.finalizeCallback = null;
+ }
+
+ assertEquals("The code is throwing exceptions", 0, valueGC.get());
+
+ }
+}