[jboss-cvs] JBoss Messaging SVN: r5442 - trunk/src/main/org/jboss/messaging/core/client/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Nov 28 14:00:25 EST 2008


Author: timfox
Date: 2008-11-28 14:00:25 -0500 (Fri, 28 Nov 2008)
New Revision: 5442

Added:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
Log:
Missing file


Added: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-11-28 19:00:25 UTC (rev 5442)
@@ -0,0 +1,690 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.client.impl;
+
+import java.net.InetAddress;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ConnectionLoadBalancingPolicy;
+import org.jboss.messaging.core.client.ConnectionManager;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.server.cluster.DiscoveryListener;
+import org.jboss.messaging.core.server.cluster.impl.DiscoveryGroupImpl;
+import org.jboss.messaging.util.Pair;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ * @version <tt>$Revision: 3602 $</tt>
+ * 
+ * Note! There should never be more than one clientsessionfactory with the same connection params
+ * Otherwise failover won't work properly since channel ids won't match on live and backup
+ */
+public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, DiscoveryListener
+{
+   // Constants
+   // ------------------------------------------------------------------------------------
+
+   private static final long serialVersionUID = 2512460695662741413L;
+
+   private static final Logger log = Logger.getLogger(ClientSessionFactoryImpl.class);
+
+   public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = "org.jboss.messaging.core.client.impl.RoundRobinConnectionLoadBalancingPolicy";
+
+   public static final long DEFAULT_PING_PERIOD = 5000;
+
+   // Any message beyond this size is considered a large message (to be sent in chunks)
+   public static final int DEFAULT_MIN_LARGE_MESSAGE_SIZE = 100 * 1024;
+
+   public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
+
+   public static final int DEFAULT_CONSUMER_MAX_RATE = -1;
+
+   public static final int DEFAULT_SEND_WINDOW_SIZE = 1024 * 1024;
+
+   public static final int DEFAULT_PRODUCER_MAX_RATE = -1;
+
+   public static final boolean DEFAULT_BLOCK_ON_ACKNOWLEDGE = false;
+
+   public static final boolean DEFAULT_BLOCK_ON_PERSISTENT_SEND = false;
+
+   public static final boolean DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND = false;
+
+   public static final boolean DEFAULT_AUTO_GROUP = false;
+
+   public static final long DEFAULT_CALL_TIMEOUT = 30000;
+
+   public static final int DEFAULT_MAX_CONNECTIONS = 8;
+
+   public static final int DEFAULT_ACK_BATCH_SIZE = 1024 * 1024;
+
+   public static final boolean DEFAULT_PRE_ACKNOWLEDGE = false;
+   
+   public static final long DEFAULT_DISCOVERY_INITIAL_WAIT = 10000;
+   
+   // Attributes
+   // -----------------------------------------------------------------------------------
+
+   private final Map<Pair<TransportConfiguration, TransportConfiguration>, ConnectionManager> connectionManagerMap =
+      new LinkedHashMap<Pair<TransportConfiguration, TransportConfiguration>, ConnectionManager>();
+
+   private ConnectionManager[] connectionManagerArray;
+
+   private final long pingPeriod;
+
+   private final long callTimeout;
+
+   private final int maxConnections;
+
+   // Some of these attributes are mutable and can be updated by different threads so
+   // must be volatile
+
+   private volatile int minLargeMessageSize;
+
+   private volatile int consumerWindowSize;
+
+   private volatile int consumerMaxRate;
+
+   private volatile int sendWindowSize;
+
+   private volatile int producerMaxRate;
+
+   private volatile boolean blockOnAcknowledge;
+
+   private volatile boolean blockOnPersistentSend;
+
+   private volatile boolean blockOnNonPersistentSend;
+
+   private volatile boolean autoGroup;
+
+   private boolean preAcknowledge;
+
+   private volatile int ackBatchSize;
+
+   private final ConnectionLoadBalancingPolicy loadBalancingPolicy;
+
+   private final DiscoveryGroup discoveryGroup;
+
+   private boolean receivedBroadcast = false;
+
+   private final long initialWaitTimeout;
+
+   // Static
+   // ---------------------------------------------------------------------------------------
+
+   // Constructors
+   // ---------------------------------------------------------------------------------
+
+   public ClientSessionFactoryImpl(final String discoveryGroupName, final int discoveryGroupPort) throws Exception
+   {
+      this(discoveryGroupName,
+           discoveryGroupPort,
+           ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT,
+           DEFAULT_DISCOVERY_INITIAL_WAIT);
+   }
+
+   public ClientSessionFactoryImpl(final String discoveryGroupName,
+                                   final int discoveryGroupPort,
+                                   final long discoveryRefreshTimeout,
+                                   final long initialWaitTimeout) throws Exception
+   {
+      InetAddress groupAddress = InetAddress.getByName(discoveryGroupName);
+
+      discoveryGroup = new DiscoveryGroupImpl(groupAddress, discoveryGroupPort, discoveryRefreshTimeout);
+
+      discoveryGroup.registerListener(this);
+
+      discoveryGroup.start();
+
+      this.initialWaitTimeout = initialWaitTimeout;
+      this.loadBalancingPolicy = instantiateLoadBalancingPolicy(DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME);
+      this.pingPeriod = DEFAULT_PING_PERIOD;
+      this.callTimeout = DEFAULT_CALL_TIMEOUT;
+      this.consumerWindowSize = DEFAULT_CONSUMER_WINDOW_SIZE;
+      this.consumerMaxRate = DEFAULT_CONSUMER_MAX_RATE;
+      this.sendWindowSize = DEFAULT_SEND_WINDOW_SIZE;
+      this.producerMaxRate = DEFAULT_PRODUCER_MAX_RATE;
+      this.blockOnAcknowledge = DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+      this.blockOnNonPersistentSend = DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+      this.blockOnPersistentSend = DEFAULT_BLOCK_ON_PERSISTENT_SEND;
+      this.minLargeMessageSize = DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+      this.autoGroup = DEFAULT_AUTO_GROUP;
+      this.maxConnections = DEFAULT_MAX_CONNECTIONS;
+      this.ackBatchSize = DEFAULT_ACK_BATCH_SIZE;
+      this.preAcknowledge = DEFAULT_PRE_ACKNOWLEDGE;
+   }
+
+   public ClientSessionFactoryImpl(final String discoveryGroupName,
+                                   final int discoveryGroupPort,
+                                   final long discoveryRefreshTimeout,
+                                   final long initialWaitTimeout,
+                                   final String connectionloadBalancingPolicyClassName,
+                                   final long pingPeriod,
+                                   final long callTimeout,
+                                   final int consumerWindowSize,
+                                   final int consumerMaxRate,
+                                   final int sendWindowSize,
+                                   final int producerMaxRate,
+                                   final int minLargeMessageSize,
+                                   final boolean blockOnAcknowledge,
+                                   final boolean blockOnNonPersistentSend,
+                                   final boolean blockOnPersistentSend,
+                                   final boolean autoGroup,
+                                   final int maxConnections,
+                                   final boolean preAcknowledge,
+                                   final int ackBatchSize) throws MessagingException
+   {
+      try
+      {
+         InetAddress groupAddress = InetAddress.getByName(discoveryGroupName);
+   
+         discoveryGroup = new DiscoveryGroupImpl(groupAddress, discoveryGroupPort, discoveryRefreshTimeout);
+   
+         discoveryGroup.registerListener(this);
+   
+         discoveryGroup.start();
+      }
+      catch (Exception e)
+      {
+         //TODO - better execption
+         throw new MessagingException(MessagingException.INTERNAL_ERROR, "Failed to connect discovery group");
+      }
+
+      this.initialWaitTimeout = initialWaitTimeout;
+      this.loadBalancingPolicy = instantiateLoadBalancingPolicy(connectionloadBalancingPolicyClassName);
+      this.pingPeriod = pingPeriod;
+      this.callTimeout = callTimeout;
+      this.consumerWindowSize = consumerWindowSize;
+      this.consumerMaxRate = consumerMaxRate;
+      this.sendWindowSize = sendWindowSize;
+      this.producerMaxRate = producerMaxRate;
+      this.blockOnAcknowledge = blockOnAcknowledge;
+      this.blockOnNonPersistentSend = blockOnNonPersistentSend;
+      this.blockOnPersistentSend = blockOnPersistentSend;
+      this.minLargeMessageSize = minLargeMessageSize;
+      this.autoGroup = autoGroup;
+      this.maxConnections = maxConnections;
+      this.ackBatchSize = ackBatchSize;
+      this.preAcknowledge = preAcknowledge;
+   }
+
+   public ClientSessionFactoryImpl(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors,
+                                   final String connectionloadBalancingPolicyClassName,
+                                   final long pingPeriod,
+                                   final long callTimeout,
+                                   final int consumerWindowSize,
+                                   final int consumerMaxRate,
+                                   final int sendWindowSize,
+                                   final int producerMaxRate,
+                                   final int minLargeMessageSize,
+                                   final boolean blockOnAcknowledge,
+                                   final boolean blockOnNonPersistentSend,
+                                   final boolean blockOnPersistentSend,
+                                   final boolean autoGroup,
+                                   final int maxConnections,
+                                   final boolean preAcknowledge,
+                                   final int ackBatchSize)
+   {
+      this.loadBalancingPolicy = instantiateLoadBalancingPolicy(connectionloadBalancingPolicyClassName);
+      this.pingPeriod = pingPeriod;
+      this.callTimeout = callTimeout;
+      this.consumerWindowSize = consumerWindowSize;
+      this.consumerMaxRate = consumerMaxRate;
+      this.sendWindowSize = sendWindowSize;
+      this.producerMaxRate = producerMaxRate;
+      this.blockOnAcknowledge = blockOnAcknowledge;
+      this.blockOnNonPersistentSend = blockOnNonPersistentSend;
+      this.blockOnPersistentSend = blockOnPersistentSend;
+      this.minLargeMessageSize = minLargeMessageSize;
+      this.autoGroup = autoGroup;
+      this.maxConnections = maxConnections;
+      this.ackBatchSize = ackBatchSize;
+      this.preAcknowledge = preAcknowledge;
+      this.initialWaitTimeout = -1;
+
+      for (Pair<TransportConfiguration, TransportConfiguration> pair : connectors)
+      {
+         ConnectionManager cm = new ConnectionManagerImpl(pair.a, pair.b, maxConnections, callTimeout, pingPeriod);
+         
+         connectionManagerMap.put(pair, cm);
+      }
+      
+      updateConnectionManagerArray();
+
+      this.discoveryGroup = null;
+   }
+
+   public ClientSessionFactoryImpl(final TransportConfiguration connectorConfig,
+                                   final TransportConfiguration backupConfig)
+   {      
+      this.loadBalancingPolicy = new FirstElementConnectionLoadBalancingPolicy();
+      this.pingPeriod = DEFAULT_PING_PERIOD;
+      this.callTimeout = DEFAULT_CALL_TIMEOUT;
+      this.consumerWindowSize = DEFAULT_CONSUMER_WINDOW_SIZE;
+      this.consumerMaxRate = DEFAULT_CONSUMER_MAX_RATE;
+      this.sendWindowSize = DEFAULT_SEND_WINDOW_SIZE;
+      this.producerMaxRate = DEFAULT_PRODUCER_MAX_RATE;
+      this.blockOnAcknowledge = DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+      this.blockOnNonPersistentSend = DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+      this.blockOnPersistentSend = DEFAULT_BLOCK_ON_PERSISTENT_SEND;
+      this.minLargeMessageSize = DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+      this.autoGroup = DEFAULT_AUTO_GROUP;
+      this.maxConnections = DEFAULT_MAX_CONNECTIONS;
+      this.ackBatchSize = DEFAULT_ACK_BATCH_SIZE;
+      this.preAcknowledge = DEFAULT_PRE_ACKNOWLEDGE;
+      this.initialWaitTimeout = -1;
+
+      Pair<TransportConfiguration, TransportConfiguration> pair = new Pair<TransportConfiguration, TransportConfiguration>(connectorConfig,
+               backupConfig);
+      
+      ConnectionManager cm =  new ConnectionManagerImpl(pair.a, pair.b, maxConnections, callTimeout, pingPeriod);
+      
+      connectionManagerMap.put(pair, cm);
+      
+      updateConnectionManagerArray();
+
+      discoveryGroup = null;
+   }
+   
+   public ClientSessionFactoryImpl(final TransportConfiguration connectorConfig,
+                                   final TransportConfiguration backupConfig,
+                                   final String connectionloadBalancingPolicyClassName,
+                                   final long pingPeriod,
+                                   final long callTimeout,
+                                   final int consumerWindowSize,
+                                   final int consumerMaxRate,
+                                   final int sendWindowSize,
+                                   final int producerMaxRate,
+                                   final int minLargeMessageSize,
+                                   final boolean blockOnAcknowledge,
+                                   final boolean blockOnNonPersistentSend,
+                                   final boolean blockOnPersistentSend,
+                                   final boolean autoGroup,
+                                   final int maxConnections,
+                                   final boolean preAcknowledge,
+                                   final int ackBatchSize)
+   {
+      this.loadBalancingPolicy = instantiateLoadBalancingPolicy(connectionloadBalancingPolicyClassName);
+      this.pingPeriod = pingPeriod;
+      this.callTimeout = callTimeout;
+      this.consumerWindowSize = consumerWindowSize;
+      this.consumerMaxRate = consumerMaxRate;
+      this.sendWindowSize = sendWindowSize;
+      this.producerMaxRate = producerMaxRate;
+      this.blockOnAcknowledge = blockOnAcknowledge;
+      this.blockOnNonPersistentSend = blockOnNonPersistentSend;
+      this.blockOnPersistentSend = blockOnPersistentSend;
+      this.minLargeMessageSize = minLargeMessageSize;
+      this.autoGroup = autoGroup;
+      this.maxConnections = maxConnections;
+      this.ackBatchSize = ackBatchSize;
+      this.preAcknowledge = preAcknowledge;
+      this.initialWaitTimeout = -1;
+
+      Pair<TransportConfiguration, TransportConfiguration> pair = new Pair<TransportConfiguration, TransportConfiguration>(connectorConfig,
+               backupConfig);
+      
+      ConnectionManager cm =  new ConnectionManagerImpl(pair.a, pair.b, maxConnections, callTimeout, pingPeriod);
+      
+      connectionManagerMap.put(pair, cm);
+      
+      updateConnectionManagerArray();
+
+      discoveryGroup = null;
+   }
+   
+   /**
+   * Create a ClientSessionFactoryImpl specify transport type and using defaults
+   */
+   public ClientSessionFactoryImpl(final TransportConfiguration connectorConfig)
+   {
+      this(connectorConfig, null);
+   }
+
+   // ClientSessionFactory implementation------------------------------------------------------------   
+
+   public ClientSession createSession(final String username,
+                                      final String password,
+                                      final boolean xa,
+                                      final boolean autoCommitSends,
+                                      final boolean autoCommitAcks,
+                                      final boolean preAcknowledge,
+                                      final int ackBatchSize) throws MessagingException
+   {
+      return createSessionInternal(username,
+                                   password,
+                                   xa,
+                                   autoCommitSends,
+                                   autoCommitAcks,
+                                   preAcknowledge,
+                                   ackBatchSize);
+   }
+
+   public ClientSession createSession(final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks) throws MessagingException
+   {
+      return createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
+   }
+
+   public ClientSession createSession(final boolean xa,
+                                      final boolean autoCommitSends,
+                                      final boolean autoCommitAcks,
+                                      final boolean preAcknowledge) throws MessagingException
+   {
+      return createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
+   }
+
+   
+   public int getConsumerWindowSize()
+   {
+      return consumerWindowSize;
+   }
+
+   public void setConsumerWindowSize(final int size)
+   {
+      consumerWindowSize = size;
+   }
+
+   public int getSendWindowSize()
+   {
+      return sendWindowSize;
+   }
+
+   public void setSendWindowSize(final int size)
+   {
+      sendWindowSize = size;
+   }
+
+   public int getProducerMaxRate()
+   {
+      return producerMaxRate;
+   }
+
+   public void setProducerMaxRate(final int rate)
+   {
+      producerMaxRate = rate;
+   }
+
+   public int getConsumerMaxRate()
+   {
+      return consumerMaxRate;
+   }
+
+   public void setConsumerMaxRate(final int rate)
+   {
+      consumerMaxRate = rate;
+   }
+
+   public boolean isBlockOnPersistentSend()
+   {
+      return blockOnPersistentSend;
+   }
+
+   public void setBlockOnPersistentSend(final boolean blocking)
+   {
+      blockOnPersistentSend = blocking;
+   }
+
+   public boolean isBlockOnNonPersistentSend()
+   {
+      return blockOnNonPersistentSend;
+   }
+
+   public void setBlockOnNonPersistentSend(final boolean blocking)
+   {
+      blockOnNonPersistentSend = blocking;
+   }
+
+   public boolean isBlockOnAcknowledge()
+   {
+      return blockOnAcknowledge;
+   }
+
+   public void setBlockOnAcknowledge(final boolean blocking)
+   {
+      blockOnAcknowledge = blocking;
+   }
+
+   public boolean isAutoGroup()
+   {
+      return autoGroup;
+   }
+
+   public void setAutoGroup(boolean autoGroup)
+   {
+      this.autoGroup = autoGroup;
+   }
+
+   public int getAckBatchSize()
+   {
+      return ackBatchSize;
+   }
+
+   public void setAckBatchSize(int ackBatchSize)
+   {
+      this.ackBatchSize = ackBatchSize;
+   }
+
+   public boolean isPreAcknowledge()
+   {
+      return preAcknowledge;
+   }
+
+   public void setPreAcknowledge(boolean preAcknowledge)
+   {
+      this.preAcknowledge = preAcknowledge;
+   }
+
+   public long getPingPeriod()
+   {
+      return pingPeriod;
+   }
+
+   public long getCallTimeout()
+   {
+      return callTimeout;
+   }
+
+   public int getMaxConnections()
+   {
+      return maxConnections;
+   }
+
+   /**
+    * @return the minLargeMessageSize
+    */
+   public int getMinLargeMessageSize()
+   {
+      return minLargeMessageSize;
+   }
+
+   /**
+    * @param minLargeMessageSize the minLargeMessageSize to set
+    */
+   public void setMinLargeMessageSize(int minLargeMessageSize)
+   {
+      this.minLargeMessageSize = minLargeMessageSize;
+   }
+
+   public int numSessions()
+   {
+      int num = 0;
+
+      for (ConnectionManager connectionManager: connectionManagerMap.values())
+      {
+         num += connectionManager.numSessions();
+      }
+
+      return num;
+   }
+
+   public int numConnections()
+   {
+      int num = 0;
+
+      for (ConnectionManager connectionManager: connectionManagerMap.values())
+      {
+         num += connectionManager.numConnections();
+      }
+
+      return num;
+   }
+
+   public void close()
+   {
+      if (discoveryGroup != null)
+      {
+         try
+         {
+            discoveryGroup.stop();
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to stop discovery group", e);
+         }
+      }
+   }
+
+   // DiscoveryListener implementation --------------------------------------------------------
+
+   public synchronized void connectorsChanged()
+   {
+      receivedBroadcast = true;
+
+      List<Pair<TransportConfiguration, TransportConfiguration>> newConnectors = discoveryGroup.getConnectors();
+      
+      Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new HashSet<Pair<TransportConfiguration, TransportConfiguration>>();
+
+      connectorSet.addAll(newConnectors);
+
+      Iterator<Map.Entry<Pair<TransportConfiguration,TransportConfiguration>, ConnectionManager>> iter = connectionManagerMap.entrySet().iterator();
+
+      while (iter.hasNext())
+      {
+         Map.Entry<Pair<TransportConfiguration,TransportConfiguration>, ConnectionManager> entry = iter.next();
+
+         if (!connectorSet.contains(entry.getKey()))
+         {
+            // ConnectionManager no longer there - we should remove it
+
+            iter.remove();
+         }
+      }
+
+      for (Pair<TransportConfiguration,TransportConfiguration> connectorPair : newConnectors)
+      {
+         if (!connectionManagerMap.containsKey(connectorPair))
+         {
+            //Create a new ConnectionManager
+             
+            ConnectionManager connectionManager = new ConnectionManagerImpl(connectorPair.a,
+                                                                            connectorPair.b,
+                                                                            maxConnections,
+                                                                            callTimeout,
+                                                                            pingPeriod);
+
+            connectionManagerMap.put(connectorPair, connectionManager);                       
+         }
+      }
+      
+      updateConnectionManagerArray();
+   }
+   
+   // Protected ------------------------------------------------------------------------------
+   
+   protected void finalize() throws Throwable
+   {
+      if (discoveryGroup != null)
+      {
+         discoveryGroup.stop();
+      }
+   }
+   
+   // Private --------------------------------------------------------------------------------
+
+   private synchronized ClientSession createSessionInternal(final String username,
+                                                            final String password,
+                                                            final boolean xa,
+                                                            final boolean autoCommitSends,
+                                                            final boolean autoCommitAcks,
+                                                            final boolean preAcknowledge,
+                                                            final int ackBatchSize) throws MessagingException
+   {
+      if (discoveryGroup != null && !receivedBroadcast)
+      {
+         boolean ok = discoveryGroup.waitForBroadcast(initialWaitTimeout);
+
+         if (!ok)
+         {
+            throw new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+                                         "Timed out waiting to receive intial broadcast from discovery group");
+         }
+      }
+
+      int pos = loadBalancingPolicy.select(connectionManagerArray.length);
+
+      ConnectionManager connectionManager = connectionManagerArray[pos];
+
+      return connectionManager.createSession(username,
+                                             password,
+                                             xa,
+                                             autoCommitSends,
+                                             autoCommitAcks,
+                                             preAcknowledge,
+                                             ackBatchSize,
+                                             minLargeMessageSize,
+                                             blockOnAcknowledge,
+                                             autoGroup,
+                                             sendWindowSize,
+                                             consumerWindowSize,
+                                             consumerMaxRate,
+                                             producerMaxRate,
+                                             blockOnNonPersistentSend,
+                                             blockOnPersistentSend);
+   }
+   
+   private ConnectionLoadBalancingPolicy instantiateLoadBalancingPolicy(final String className)
+   {
+      ClassLoader loader = Thread.currentThread().getContextClassLoader();
+      ConnectionLoadBalancingPolicy lbPolicy;
+      try
+      {
+         Class<?> clazz = loader.loadClass(className);
+         lbPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
+      }
+      catch (Exception e)
+      {
+         throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + className + "\"", e);
+      }
+
+      return lbPolicy;
+   }
+   
+   private void updateConnectionManagerArray()
+   {      
+      connectionManagerArray = new ConnectionManager[connectionManagerMap.size()];
+      
+      connectionManagerMap.values().toArray(connectionManagerArray);
+   }
+
+}




More information about the jboss-cvs-commits mailing list