Author: timfox
Date: 2009-11-06 05:34:51 -0500 (Fri, 06 Nov 2009)
New Revision: 8236
Removed:
trunk/tests/jms-tests/src/org/hornetq/jms/tests/stress/ConnectionConsumerStressTest.java
Modified:
trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
Log:
refactored so each JMS connection maintains its own session factory
Modified: trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java 2009-11-06 10:25:52
UTC (rev 8235)
+++ trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java 2009-11-06 10:34:51
UTC (rev 8236)
@@ -175,4 +175,6 @@
boolean removeInterceptor(Interceptor interceptor);
void close();
+
+ ClientSessionFactory copy();
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-11-06
10:25:52 UTC (rev 8235)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-11-06
10:34:51 UTC (rev 8236)
@@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.ConnectionLoadBalancingPolicy;
import org.hornetq.core.cluster.DiscoveryEntry;
import org.hornetq.core.cluster.DiscoveryGroup;
@@ -313,6 +314,67 @@
// Constructors
// ---------------------------------------------------------------------------------
+ public ClientSessionFactoryImpl(final ClientSessionFactory other)
+ {
+ discoveryAddress = other.getDiscoveryAddress();
+
+ discoveryPort = other.getDiscoveryPort();
+
+ staticConnectors = other.getStaticConnectors();
+
+ discoveryRefreshTimeout = other.getDiscoveryRefreshTimeout();
+
+ clientFailureCheckPeriod = other.getClientFailureCheckPeriod();
+
+ connectionTTL = other.getConnectionTTL();
+
+ callTimeout = other.getCallTimeout();
+
+ minLargeMessageSize = other.getMinLargeMessageSize();
+
+ consumerWindowSize = other.getConsumerWindowSize();
+
+ consumerMaxRate = other.getConsumerMaxRate();
+
+ confirmationWindowSize = other.getConfirmationWindowSize();
+
+ producerWindowSize = other.getProducerWindowSize();
+
+ producerMaxRate = other.getProducerMaxRate();
+
+ blockOnAcknowledge = other.isBlockOnAcknowledge();
+
+ blockOnPersistentSend = other.isBlockOnPersistentSend();
+
+ blockOnNonPersistentSend = other.isBlockOnNonPersistentSend();
+
+ autoGroup = other.isAutoGroup();
+
+ preAcknowledge = other.isPreAcknowledge();
+
+ ackBatchSize = other.getAckBatchSize();
+
+ connectionLoadBalancingPolicyClassName =
other.getConnectionLoadBalancingPolicyClassName();
+
+ discoveryInitialWaitTimeout = other.getDiscoveryInitialWaitTimeout();
+
+ useGlobalPools = other.isUseGlobalPools();
+
+ scheduledThreadPoolMaxSize = other.getScheduledThreadPoolMaxSize();
+
+ threadPoolMaxSize = other.getThreadPoolMaxSize();
+
+ retryInterval = other.getRetryInterval();
+
+ retryIntervalMultiplier = other.getRetryIntervalMultiplier();
+
+ maxRetryInterval = other.getMaxRetryInterval();
+
+ reconnectAttempts = other.getReconnectAttempts();
+
+ failoverOnServerShutdown = other.isFailoverOnServerShutdown();
+ }
+
public ClientSessionFactoryImpl()
{
discoveryRefreshTimeout = DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
@@ -884,7 +946,12 @@
closed = true;
}
-
+
+ public ClientSessionFactory copy()
+ {
+ return new ClientSessionFactoryImpl(this);
+ }
+
// DiscoveryListener implementation
--------------------------------------------------------
public synchronized void connectorsChanged()
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2009-11-06 10:25:52 UTC
(rev 8235)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2009-11-06 10:34:51 UTC
(rev 8236)
@@ -281,6 +281,8 @@
initialSession.close();
}
}
+
+ sessionFactory.close();
closed = true;
}
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-11-06
10:25:52 UTC (rev 8235)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-11-06
10:34:51 UTC (rev 8236)
@@ -11,7 +11,6 @@
* permissions and limitations under the License.
*/
-
package org.hornetq.jms.client;
import java.io.Serializable;
@@ -48,7 +47,7 @@
* @version <tt>$Revision$</tt> $Id$
*/
public class HornetQConnectionFactory implements ConnectionFactory,
QueueConnectionFactory, TopicConnectionFactory,
- XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory,
Serializable, Referenceable
+ XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory,
Serializable, Referenceable
{
// Constants
------------------------------------------------------------------------------------
@@ -93,7 +92,7 @@
}
public HornetQConnectionFactory(final TransportConfiguration connectorConfig,
- final TransportConfiguration backupConnectorConfig)
+ final TransportConfiguration backupConnectorConfig)
{
sessionFactory = new ClientSessionFactoryImpl(connectorConfig,
backupConnectorConfig);
}
@@ -432,7 +431,7 @@
{
sessionFactory.setRetryInterval(retryInterval);
}
-
+
public synchronized long getMaxRetryInterval()
{
return sessionFactory.getMaxRetryInterval();
@@ -462,7 +461,7 @@
{
sessionFactory.setReconnectAttempts(reconnectAttempts);
}
-
+
public synchronized boolean isFailoverOnServerShutdown()
{
return sessionFactory.isFailoverOnServerShutdown();
@@ -518,24 +517,28 @@
// Protected
------------------------------------------------------------------------------------
protected synchronized HornetQConnection createConnectionInternal(final String
username,
- final String
password,
- final boolean isXA,
- final int type) throws
JMSException
+ final String
password,
+ final boolean isXA,
+ final int type)
throws JMSException
{
readOnly = true;
+
+ //Note that each JMS connection gets its own copy of the session factory
+ //This means there is one underlying remoting connection per jms connection (if not
load balanced)
+ ClientSessionFactory factory = sessionFactory.copy();
HornetQConnection connection = new HornetQConnection(username,
- password,
- type,
- clientID,
- dupsOKBatchSize,
- transactionBatchSize,
- sessionFactory);
+ password,
+ type,
+ clientID,
+ dupsOKBatchSize,
+ transactionBatchSize,
+ factory);
-
- try {
+ try
+ {
connection.authorize();
- }
+ }
catch (JMSException e)
{
try
Deleted:
trunk/tests/jms-tests/src/org/hornetq/jms/tests/stress/ConnectionConsumerStressTest.java
===================================================================
---
trunk/tests/jms-tests/src/org/hornetq/jms/tests/stress/ConnectionConsumerStressTest.java 2009-11-06
10:25:52 UTC (rev 8235)
+++
trunk/tests/jms-tests/src/org/hornetq/jms/tests/stress/ConnectionConsumerStressTest.java 2009-11-06
10:34:51 UTC (rev 8236)
@@ -1,54 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.tests.stress;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-
-/**
- *
- * A ConnectionConsumerStressTest.
- *
- * @author <a href="tim.fox(a)jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 2349 $</tt>
- *
- * $Id: StressTest.java 2349 2007-02-19 14:15:53Z timfox $
- */
-public class ConnectionConsumerStressTest extends JMSStressTestBase
-{
-
- public void testConnectionConsumer() throws Exception
- {
- Connection conn = cf.createConnection();
- conn.start();
-
- Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Session sessReceive = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sessSend.createProducer(queue1);
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- Runner[] runners = new Runner[] { new Sender("prod1", sessSend, prod,
100000),
- new Receiver(conn, sessReceive, 100000, queue1)
};
-
- runRunners(runners);
-
- conn.close();
- }
-
-}