Author: jmesnil
Date: 2009-11-06 10:32:46 -0500 (Fri, 06 Nov 2009)
New Revision: 8241
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
Log:
fixed HornetQConnectionFactory code + test
* added checkWrite() to every HornetQConnectionFactory setter method now that the
ClientSessionFactory is copied for each created connection
* in ClientSessionFactory.close(), null-checked the pools before shutting them down
(they are null if useGlobalPools is false and no session was created)
* fixed HornetQConnectionFactoryTest & CloseConnectionOnGCTest assertions
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-11-06
14:34:10 UTC (rev 8240)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-11-06
15:32:46 UTC (rev 8241)
@@ -917,31 +917,37 @@
if (!useGlobalPools)
{
- threadPool.shutdown();
+ if (threadPool != null)
+ {
+ threadPool.shutdown();
- try
- {
- if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+ try
{
- log.warn("Timed out waiting for pool to terminate");
+ if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+ {
+ log.warn("Timed out waiting for pool to terminate");
+ }
}
+ catch (InterruptedException ignore)
+ {
+ }
}
- catch (InterruptedException ignore)
+
+ if (scheduledThreadPool != null)
{
- }
+ scheduledThreadPool.shutdown();
- scheduledThreadPool.shutdown();
-
- try
- {
- if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+ try
{
- log.warn("Timed out waiting for scheduled pool to terminate");
+ if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+ {
+ log.warn("Timed out waiting for scheduled pool to
terminate");
+ }
}
+ catch (InterruptedException ignore)
+ {
+ }
}
- catch (InterruptedException ignore)
- {
- }
}
closed = true;
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-11-06
14:34:10 UTC (rev 8240)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-11-06
15:32:46 UTC (rev 8241)
@@ -193,6 +193,7 @@
public synchronized void setConnectionLoadBalancingPolicyClassName(String
connectionLoadBalancingPolicyClassName)
{
+ checkWrite();
sessionFactory.setConnectionLoadBalancingPolicyClassName(connectionLoadBalancingPolicyClassName);
}
@@ -203,6 +204,7 @@
public synchronized void setStaticConnectors(List<Pair<TransportConfiguration,
TransportConfiguration>> staticConnectors)
{
+ checkWrite();
sessionFactory.setStaticConnectors(staticConnectors);
}
@@ -213,6 +215,7 @@
public synchronized void setDiscoveryAddress(String discoveryAddress)
{
+ checkWrite();
sessionFactory.setDiscoveryAddress(discoveryAddress);
}
@@ -223,6 +226,7 @@
public synchronized void setDiscoveryPort(int discoveryPort)
{
+ checkWrite();
sessionFactory.setDiscoveryPort(discoveryPort);
}
@@ -233,6 +237,7 @@
public synchronized void setDiscoveryRefreshTimeout(long discoveryRefreshTimeout)
{
+ checkWrite();
sessionFactory.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
}
@@ -243,6 +248,7 @@
public synchronized void setDiscoveryInitialWaitTimeout(long
discoveryInitialWaitTimeout)
{
+ checkWrite();
sessionFactory.setDiscoveryInitialWaitTimeout(discoveryInitialWaitTimeout);
}
@@ -286,6 +292,7 @@
public synchronized void setClientFailureCheckPeriod(long clientFailureCheckPeriod)
{
+ checkWrite();
sessionFactory.setClientFailureCheckPeriod(clientFailureCheckPeriod);
}
@@ -296,6 +303,7 @@
public synchronized void setConnectionTTL(long connectionTTL)
{
+ checkWrite();
sessionFactory.setConnectionTTL(connectionTTL);
}
@@ -306,6 +314,7 @@
public synchronized void setCallTimeout(long callTimeout)
{
+ checkWrite();
sessionFactory.setCallTimeout(callTimeout);
}
@@ -316,6 +325,7 @@
public synchronized void setConsumerWindowSize(int consumerWindowSize)
{
+ checkWrite();
sessionFactory.setConsumerWindowSize(consumerWindowSize);
}
@@ -326,6 +336,7 @@
public synchronized void setConsumerMaxRate(int consumerMaxRate)
{
+ checkWrite();
sessionFactory.setConsumerMaxRate(consumerMaxRate);
}
@@ -336,6 +347,7 @@
public synchronized void setConfirmationWindowSize(int confirmationWindowSize)
{
+ checkWrite();
sessionFactory.setConfirmationWindowSize(confirmationWindowSize);
}
@@ -346,6 +358,7 @@
public synchronized void setProducerMaxRate(int producerMaxRate)
{
+ checkWrite();
sessionFactory.setProducerMaxRate(producerMaxRate);
}
@@ -354,6 +367,7 @@
*/
public synchronized void setCacheLargeMessagesClient(boolean
cacheLargeMessagesClient)
{
+ checkWrite();
sessionFactory.setCacheLargeMessagesClient(cacheLargeMessagesClient);
}
@@ -369,6 +383,7 @@
public synchronized void setMinLargeMessageSize(int minLargeMessageSize)
{
+ checkWrite();
sessionFactory.setMinLargeMessageSize(minLargeMessageSize);
}
@@ -379,6 +394,7 @@
public synchronized void setBlockOnAcknowledge(boolean blockOnAcknowledge)
{
+ checkWrite();
sessionFactory.setBlockOnAcknowledge(blockOnAcknowledge);
}
@@ -389,6 +405,7 @@
public synchronized void setBlockOnNonPersistentSend(boolean
blockOnNonPersistentSend)
{
+ checkWrite();
sessionFactory.setBlockOnNonPersistentSend(blockOnNonPersistentSend);
}
@@ -399,6 +416,7 @@
public synchronized void setBlockOnPersistentSend(boolean blockOnPersistentSend)
{
+ checkWrite();
sessionFactory.setBlockOnPersistentSend(blockOnPersistentSend);
}
@@ -409,6 +427,7 @@
public synchronized void setAutoGroup(boolean autoGroup)
{
+ checkWrite();
sessionFactory.setAutoGroup(autoGroup);
}
@@ -419,6 +438,7 @@
public synchronized void setPreAcknowledge(boolean preAcknowledge)
{
+ checkWrite();
sessionFactory.setPreAcknowledge(preAcknowledge);
}
@@ -429,6 +449,7 @@
public synchronized void setRetryInterval(long retryInterval)
{
+ checkWrite();
sessionFactory.setRetryInterval(retryInterval);
}
@@ -439,6 +460,7 @@
public synchronized void setMaxRetryInterval(long retryInterval)
{
+ checkWrite();
sessionFactory.setMaxRetryInterval(retryInterval);
}
@@ -449,6 +471,7 @@
public synchronized void setRetryIntervalMultiplier(double retryIntervalMultiplier)
{
+ checkWrite();
sessionFactory.setRetryIntervalMultiplier(retryIntervalMultiplier);
}
@@ -459,6 +482,7 @@
public synchronized void setReconnectAttempts(int reconnectAttempts)
{
+ checkWrite();
sessionFactory.setReconnectAttempts(reconnectAttempts);
}
@@ -469,6 +493,7 @@
public synchronized void setFailoverOnServerShutdown(boolean
failoverOnServerShutdown)
{
+ checkWrite();
sessionFactory.setFailoverOnServerShutdown(failoverOnServerShutdown);
}
@@ -479,6 +504,7 @@
public synchronized void setUseGlobalPools(boolean useGlobalPools)
{
+ checkWrite();
sessionFactory.setUseGlobalPools(useGlobalPools);
}
@@ -489,6 +515,7 @@
public synchronized void setScheduledThreadPoolMaxSize(int
scheduledThreadPoolMaxSize)
{
+ checkWrite();
sessionFactory.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
}
@@ -499,6 +526,7 @@
public synchronized void setThreadPoolMaxSize(int threadPoolMaxSize)
{
+ checkWrite();
sessionFactory.setThreadPoolMaxSize(threadPoolMaxSize);
}
@@ -560,7 +588,7 @@
{
if (readOnly)
{
- throw new IllegalStateException("Cannot set attribute on
HornetQRAConnectionFactory after it has been used");
+ throw new IllegalStateException("Cannot set attribute on
HornetQConnectionFactory after it has been used");
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2009-11-06
14:34:10 UTC (rev 8240)
+++
trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2009-11-06
15:32:46 UTC (rev 8241)
@@ -105,6 +105,13 @@
assertNotNull(csi);
}
+ public void testCloseUnusedClientSessionFactoryWithoutGlobalPools() throws Exception
+ {
+ ClientSessionFactoryImpl csf = new ClientSessionFactoryImpl();
+ csf.setUseGlobalPools(false);
+ csf.close();
+ }
+
public void testDefaultConstructor() throws Exception
{
try
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java 2009-11-06
14:34:10 UTC (rev 8240)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java 2009-11-06
15:32:46 UTC (rev 8241)
@@ -117,21 +117,58 @@
log.info("Got here");
+ testSettersThrowException(cf);
+ }
+
+ public void testDefaultConstructorAndSetConnectorPairs() throws Exception
+ {
+ HornetQConnectionFactory cf = new HornetQConnectionFactory();
final List<Pair<TransportConfiguration, TransportConfiguration>>
staticConnectors = new ArrayList<Pair<TransportConfiguration,
TransportConfiguration>>();
Pair<TransportConfiguration, TransportConfiguration> pair0 = new
Pair<TransportConfiguration, TransportConfiguration>(this.liveTC,
this.backupTC);
staticConnectors.add(pair0);
cf.setStaticConnectors(staticConnectors);
+
+ assertFactoryParams(cf,
+ staticConnectors,
+ null,
+ 0,
+ ClientSessionFactoryImpl.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
+ null,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
+ ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE,
+ ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+ ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+ ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+ ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
+ ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE,
+
ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE,
+
ClientSessionFactoryImpl.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT,
+ ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS,
+
ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL,
+ ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS,
+ ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
- conn = cf.createConnection();
+ Connection conn = cf.createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
testSettersThrowException(cf);
-
+
conn.close();
}
-
+
public void testDiscoveryConstructor() throws Exception
{
HornetQConnectionFactory cf = new HornetQConnectionFactory(groupAddress,
groupPort);
@@ -319,12 +356,13 @@
public void testGettersAndSetters()
{
- ClientSessionFactory cf = new ClientSessionFactoryImpl();
+ ClientSessionFactory csf = new ClientSessionFactoryImpl();
List<Pair<TransportConfiguration, TransportConfiguration>>
staticConnectors = new ArrayList<Pair<TransportConfiguration,
TransportConfiguration>>();
Pair<TransportConfiguration, TransportConfiguration> pair0 = new
Pair<TransportConfiguration, TransportConfiguration>(this.liveTC,
this.backupTC);
staticConnectors.add(pair0);
+ HornetQConnectionFactory cf = new HornetQConnectionFactory(csf);
String discoveryAddress = randomString();
int discoveryPort = RandomUtil.randomPositiveInt();
@@ -343,7 +381,6 @@
boolean autoGroup = RandomUtil.randomBoolean();
boolean preAcknowledge = RandomUtil.randomBoolean();
String loadBalancingPolicyClassName = RandomUtil.randomString();
- int ackBatchSize = RandomUtil.randomPositiveInt();
long initialWaitTimeout = RandomUtil.randomPositiveLong();
boolean useGlobalPools = RandomUtil.randomBoolean();
int scheduledThreadPoolMaxSize = RandomUtil.randomPositiveInt();
@@ -371,7 +408,6 @@
cf.setAutoGroup(autoGroup);
cf.setPreAcknowledge(preAcknowledge);
cf.setConnectionLoadBalancingPolicyClassName(loadBalancingPolicyClassName);
- cf.setAckBatchSize(ackBatchSize);
cf.setDiscoveryInitialWaitTimeout(initialWaitTimeout);
cf.setUseGlobalPools(useGlobalPools);
cf.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
@@ -399,7 +435,6 @@
assertEquals(autoGroup, cf.isAutoGroup());
assertEquals(preAcknowledge, cf.isPreAcknowledge());
assertEquals(loadBalancingPolicyClassName,
cf.getConnectionLoadBalancingPolicyClassName());
- assertEquals(ackBatchSize, cf.getAckBatchSize());
assertEquals(initialWaitTimeout, cf.getDiscoveryInitialWaitTimeout());
assertEquals(useGlobalPools, cf.isUseGlobalPools());
assertEquals(scheduledThreadPoolMaxSize, cf.getScheduledThreadPoolMaxSize());
@@ -409,6 +444,7 @@
assertEquals(reconnectAttempts, cf.getReconnectAttempts());
assertEquals(failoverOnServerShutdown, cf.isFailoverOnServerShutdown());
+ cf.close();
}
private void testSettersThrowException(HornetQConnectionFactory cf)
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-11-06
14:34:10 UTC (rev 8240)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-11-06
15:32:46 UTC (rev 8241)
@@ -54,16 +54,6 @@
super.tearDown();
}
- public void testFoo() throws Exception
- {
- for (int i = 0; i < 100; i++)
- {
- log.info("Iteration " + i);
-
- testCloseOneConnectionOnGC();
- }
- }
-
public void testCloseOneConnectionOnGC() throws Exception
{
//Debug - don't remove this until intermittent failure with this test is fixed
@@ -95,7 +85,7 @@
WeakReference<Connection> wr2 = new WeakReference<Connection>(conn2);
WeakReference<Connection> wr3 = new WeakReference<Connection>(conn3);
- assertEquals(1, server.getRemotingService().getConnections().size());
+ assertEquals(3, server.getRemotingService().getConnections().size());
conn1 = null;
conn2 = null;