[hornetq-commits] JBoss hornetq SVN: r8236 - in trunk: src/main/org/hornetq/core/client/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 6 05:34:51 EST 2009


Author: timfox
Date: 2009-11-06 05:34:51 -0500 (Fri, 06 Nov 2009)
New Revision: 8236

Removed:
   trunk/tests/jms-tests/src/org/hornetq/jms/tests/stress/ConnectionConsumerStressTest.java
Modified:
   trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
   trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
Log:
refactored so each JMS connection maintains its own session factory

Modified: trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java	2009-11-06 10:25:52 UTC (rev 8235)
+++ trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java	2009-11-06 10:34:51 UTC (rev 8236)
@@ -175,4 +175,6 @@
    boolean removeInterceptor(Interceptor interceptor);
 
    void close();
+   
+   ClientSessionFactory copy();
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2009-11-06 10:25:52 UTC (rev 8235)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2009-11-06 10:34:51 UTC (rev 8236)
@@ -29,6 +29,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
 import org.hornetq.core.client.ConnectionLoadBalancingPolicy;
 import org.hornetq.core.cluster.DiscoveryEntry;
 import org.hornetq.core.cluster.DiscoveryGroup;
@@ -313,6 +314,67 @@
    // Constructors
    // ---------------------------------------------------------------------------------
 
+   public ClientSessionFactoryImpl(final ClientSessionFactory other)
+   {
+      discoveryAddress = other.getDiscoveryAddress();
+      
+      discoveryPort = other.getDiscoveryPort();
+      
+      staticConnectors = other.getStaticConnectors();
+      
+      discoveryRefreshTimeout = other.getDiscoveryRefreshTimeout();
+
+      clientFailureCheckPeriod = other.getClientFailureCheckPeriod();
+
+      connectionTTL = other.getConnectionTTL();
+
+      callTimeout = other.getCallTimeout();
+
+      minLargeMessageSize = other.getMinLargeMessageSize();
+
+      consumerWindowSize = other.getConsumerWindowSize();
+
+      consumerMaxRate = other.getConsumerMaxRate();
+
+      confirmationWindowSize = other.getConfirmationWindowSize();
+      
+      producerWindowSize = other.getProducerWindowSize();
+
+      producerMaxRate = other.getProducerMaxRate();
+
+      blockOnAcknowledge = other.isBlockOnAcknowledge();
+
+      blockOnPersistentSend = other.isBlockOnPersistentSend();
+
+      blockOnNonPersistentSend = other.isBlockOnNonPersistentSend();
+
+      autoGroup = other.isAutoGroup();
+
+      preAcknowledge = other.isPreAcknowledge();
+
+      ackBatchSize = other.getAckBatchSize();
+
+      connectionLoadBalancingPolicyClassName = other.getConnectionLoadBalancingPolicyClassName();
+
+      discoveryInitialWaitTimeout = other.getDiscoveryInitialWaitTimeout();
+
+      useGlobalPools = other.isUseGlobalPools();
+
+      scheduledThreadPoolMaxSize = other.getScheduledThreadPoolMaxSize();
+
+      threadPoolMaxSize = other.getThreadPoolMaxSize();
+
+      retryInterval = other.getRetryInterval();
+
+      retryIntervalMultiplier = other.getRetryIntervalMultiplier();
+
+      maxRetryInterval = other.getMaxRetryInterval();
+
+      reconnectAttempts = other.getReconnectAttempts();
+
+      failoverOnServerShutdown = other.isFailoverOnServerShutdown();
+   }
+   
    public ClientSessionFactoryImpl()
    {
       discoveryRefreshTimeout = DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
@@ -884,7 +946,12 @@
 
       closed = true;
    }
-
+   
+   public ClientSessionFactory copy()
+   {
+      return new ClientSessionFactoryImpl(this);
+   }
+   
    // DiscoveryListener implementation --------------------------------------------------------
 
    public synchronized void connectorsChanged()

Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnection.java	2009-11-06 10:25:52 UTC (rev 8235)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnection.java	2009-11-06 10:34:51 UTC (rev 8236)
@@ -281,6 +281,8 @@
                initialSession.close();
             }
          }
+         
+         sessionFactory.close();
 
          closed = true;
       }

Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java	2009-11-06 10:25:52 UTC (rev 8235)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java	2009-11-06 10:34:51 UTC (rev 8236)
@@ -11,7 +11,6 @@
  * permissions and limitations under the License.
  */
 
-
 package org.hornetq.jms.client;
 
 import java.io.Serializable;
@@ -48,7 +47,7 @@
  * @version <tt>$Revision$</tt> $Id$
  */
 public class HornetQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory,
-         XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory, Serializable, Referenceable 
+         XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory, Serializable, Referenceable
 {
    // Constants ------------------------------------------------------------------------------------
 
@@ -93,7 +92,7 @@
    }
 
    public HornetQConnectionFactory(final TransportConfiguration connectorConfig,
-                                 final TransportConfiguration backupConnectorConfig)
+                                   final TransportConfiguration backupConnectorConfig)
    {
       sessionFactory = new ClientSessionFactoryImpl(connectorConfig, backupConnectorConfig);
    }
@@ -432,7 +431,7 @@
    {
       sessionFactory.setRetryInterval(retryInterval);
    }
-   
+
    public synchronized long getMaxRetryInterval()
    {
       return sessionFactory.getMaxRetryInterval();
@@ -462,7 +461,7 @@
    {
       sessionFactory.setReconnectAttempts(reconnectAttempts);
    }
-   
+
    public synchronized boolean isFailoverOnServerShutdown()
    {
       return sessionFactory.isFailoverOnServerShutdown();
@@ -518,24 +517,28 @@
    // Protected ------------------------------------------------------------------------------------
 
    protected synchronized HornetQConnection createConnectionInternal(final String username,
-                                                                   final String password,
-                                                                   final boolean isXA,
-                                                                   final int type) throws JMSException
+                                                                     final String password,
+                                                                     final boolean isXA,
+                                                                     final int type) throws JMSException
    {
       readOnly = true;
+      
+      //Note that each JMS connection gets its own copy of the session factory
+      //This means there is one underlying remoting connection per JMS connection (if not load balanced)
+      ClientSessionFactory factory = sessionFactory.copy();
 
       HornetQConnection connection = new HornetQConnection(username,
-                                                       password,
-                                                       type,
-                                                       clientID,
-                                                       dupsOKBatchSize,
-                                                       transactionBatchSize,
-                                                       sessionFactory);
+                                                           password,
+                                                           type,
+                                                           clientID,
+                                                           dupsOKBatchSize,
+                                                           transactionBatchSize,
+                                                           factory);
 
-      
-      try {
+      try
+      {
          connection.authorize();
-      } 
+      }
       catch (JMSException e)
       {
          try

Deleted: trunk/tests/jms-tests/src/org/hornetq/jms/tests/stress/ConnectionConsumerStressTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/stress/ConnectionConsumerStressTest.java	2009-11-06 10:25:52 UTC (rev 8235)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/stress/ConnectionConsumerStressTest.java	2009-11-06 10:34:51 UTC (rev 8236)
@@ -1,54 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.tests.stress;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-
-/**
- * 
- * A ConnectionConsumerStressTest.
- * 
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 2349 $</tt>
- *
- * $Id: StressTest.java 2349 2007-02-19 14:15:53Z timfox $
- */
-public class ConnectionConsumerStressTest extends JMSStressTestBase
-{
-   
-   public void testConnectionConsumer() throws Exception
-   {
-      Connection conn = cf.createConnection();
-      conn.start();
-      
-      Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      
-      Session sessReceive = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      
-      MessageProducer prod = sessSend.createProducer(queue1);
-      prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-      
-      Runner[] runners = new Runner[] { new Sender("prod1", sessSend, prod, 100000),
-                                        new Receiver(conn, sessReceive, 100000, queue1) };
-
-      runRunners(runners);
-
-      conn.close();      
-   }
-
-}



More information about the hornetq-commits mailing list